Kagome
Polkadot Runtime Engine in C++17
propagate_transactions_protocol.cpp
Go to the documentation of this file.
1 
7 
9 #include "network/common.hpp"
12 
13 namespace {
14  constexpr const char *kPropagatedTransactions =
15  "kagome_sync_propagated_transactions";
16 }
17 
18 namespace kagome::network {
19 
20  KAGOME_DEFINE_CACHE(PropagateTransactionsProtocol);
21 
23  libp2p::Host &host,
24  const application::AppConfiguration &app_config,
25  const application::ChainSpec &chain_spec,
26  std::shared_ptr<consensus::babe::Babe> babe,
27  std::shared_ptr<ExtrinsicObserver> extrinsic_observer,
28  std::shared_ptr<StreamEngine> stream_engine,
29  std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
30  extrinsic_events_engine,
31  std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
32  ext_event_key_repo)
33  : base_(host,
34  {fmt::format(kPropagateTransactionsProtocol.data(),
35  chain_spec.protocolId())},
36  "PropagateTransactionsProtocol"),
37  app_config_(app_config),
38  babe_(std::move(babe)),
39  extrinsic_observer_(std::move(extrinsic_observer)),
40  stream_engine_(std::move(stream_engine)),
41  extrinsic_events_engine_{std::move(extrinsic_events_engine)},
42  ext_event_key_repo_{std::move(ext_event_key_repo)} {
43  BOOST_ASSERT(extrinsic_observer_ != nullptr);
44  BOOST_ASSERT(stream_engine_ != nullptr);
45  BOOST_ASSERT(extrinsic_events_engine_ != nullptr);
46  BOOST_ASSERT(ext_event_key_repo_ != nullptr);
47 
48  // Register metrics
49  metrics_registry_->registerCounterFamily(
50  kPropagatedTransactions,
51  "Number of transactions propagated to at least one peer");
53  metrics_registry_->registerCounterMetric(kPropagatedTransactions);
54  }
55 
57  return base_.start(weak_from_this());
58  }
59 
61  return base_.stop();
62  }
63 
65  std::shared_ptr<Stream> stream) {
66  BOOST_ASSERT(stream->remotePeerId().has_value());
67 
69  stream,
71  [wp = weak_from_this(), stream](outcome::result<void> res) {
72  auto self = wp.lock();
73  if (not self) {
74  stream->reset();
75  return;
76  }
77 
78  auto peer_id = stream->remotePeerId().value();
79 
80  if (not res.has_value()) {
81  SL_VERBOSE(self->base_.logger(),
82  "Handshake failed on incoming {} stream with {}: {}",
83  self->protocolName(),
84  peer_id,
85  res.error().message());
86  stream->reset();
87  return;
88  }
89 
90  res = self->stream_engine_->addIncoming(stream, self);
91  if (not res.has_value()) {
92  SL_VERBOSE(self->base_.logger(),
93  "Can't register incoming {} stream with {}: {}",
94  self->protocolName(),
95  peer_id,
96  res.error().message());
97  stream->reset();
98  return;
99  }
100 
101  SL_VERBOSE(self->base_.logger(),
102  "Fully established incoming {} stream with {}",
103  self->protocolName(),
104  peer_id);
105  });
106  }
107 
109  const PeerInfo &peer_info,
110  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
111  base_.host().newStream(
112  peer_info.id,
113  base_.protocolIds(),
114  [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
115  auto &&stream_res) mutable {
116  auto self = wp.lock();
117  if (not self) {
119  return;
120  }
121 
122  if (not stream_res.has_value()) {
123  SL_VERBOSE(self->base_.logger(),
124  "Can't create outgoing {} stream with {}: {}",
125  self->protocolName(),
126  peer_id,
127  stream_res.error().message());
128  cb(stream_res.as_failure());
129  return;
130  }
131  const auto &stream_and_proto = stream_res.value();
132 
133  auto cb2 = [wp,
134  stream = stream_and_proto.stream,
135  protocol = stream_and_proto.protocol,
136  cb = std::move(cb)](outcome::result<void> res) {
137  auto self = wp.lock();
138  if (not self) {
140  return;
141  }
142 
143  if (not res.has_value()) {
144  SL_VERBOSE(self->base_.logger(),
145  "Handshake failed on outgoing {} stream with {}: {}",
146  protocol,
147  stream->remotePeerId().value(),
148  res.error().message());
149  stream->reset();
150  cb(res.as_failure());
151  return;
152  }
153 
154  res = self->stream_engine_->addOutgoing(stream, self);
155  if (not res.has_value()) {
156  SL_VERBOSE(self->base_.logger(),
157  "Can't register outgoing {} stream with {}: {}",
158  protocol,
159  stream->remotePeerId().value(),
160  res.error().message());
161  stream->reset();
162  cb(res.as_failure());
163  return;
164  }
165 
166  SL_VERBOSE(self->base_.logger(),
167  "Fully established outgoing {} stream with {}",
168  protocol,
169  stream->remotePeerId().value());
170  cb(std::move(stream));
171  };
172 
173  self->writeHandshake(std::move(stream_and_proto.stream),
175  std::move(cb2));
176  });
177  }
178 
180  std::shared_ptr<Stream> stream,
181  Direction direction,
182  std::function<void(outcome::result<void>)> &&cb) {
183  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
184 
185  read_writer->read<NoData>(
186  [stream, direction, wp = weak_from_this(), cb = std::move(cb)](
187  auto &&remote_handshake_res) mutable {
188  auto self = wp.lock();
189  if (not self) {
190  stream->reset();
192  return;
193  }
194 
195  if (not remote_handshake_res.has_value()) {
196  SL_VERBOSE(self->base_.logger(),
197  "Can't read handshake from {}: {}",
198  stream->remotePeerId().value(),
199  remote_handshake_res.error().message());
200  stream->reset();
201  cb(remote_handshake_res.as_failure());
202  return;
203  }
204 
205  SL_TRACE(self->base_.logger(),
206  "Handshake has received from {}",
207  stream->remotePeerId().value());
208 
209  switch (direction) {
210  case Direction::OUTGOING:
211  cb(outcome::success());
212  break;
213  case Direction::INCOMING:
214  self->writeHandshake(
215  std::move(stream), Direction::INCOMING, std::move(cb));
216  break;
217  }
218  });
219  }
220 
222  std::shared_ptr<Stream> stream,
223  Direction direction,
224  std::function<void(outcome::result<void>)> &&cb) {
225  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
226 
227  read_writer->write(NoData{},
228  [stream = std::move(stream),
229  direction,
230  wp = weak_from_this(),
231  cb = std::move(cb)](auto &&write_res) mutable {
232  auto self = wp.lock();
233  if (not self) {
234  stream->reset();
236  return;
237  }
238 
239  if (not write_res.has_value()) {
240  SL_VERBOSE(self->base_.logger(),
241  "Can't send handshake to {}: {}",
242  stream->remotePeerId().value(),
243  write_res.error().message());
244  stream->reset();
245  cb(write_res.as_failure());
246  return;
247  }
248 
249  SL_TRACE(self->base_.logger(),
250  "Handshake has sent to {}",
251  stream->remotePeerId().value());
252 
253  switch (direction) {
254  case Direction::OUTGOING:
255  self->readHandshake(std::move(stream),
257  std::move(cb));
258  break;
259  case Direction::INCOMING:
260  cb(outcome::success());
261  self->readPropagatedExtrinsics(std::move(stream));
262  break;
263  }
264  });
265  }
266 
268  std::shared_ptr<Stream> stream) {
269  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
270 
271  read_writer->read<PropagatedExtrinsics>([stream = std::move(stream),
272  wp = weak_from_this()](
273  auto &&message_res) mutable {
274  auto self = wp.lock();
275  if (not self) {
276  stream->reset();
277  return;
278  }
279 
280  if (not message_res.has_value()) {
281  SL_VERBOSE(self->base_.logger(),
282  "Can't read propagated transactions from {}: {}",
283  stream->remotePeerId().value(),
284  message_res.error().message());
285  stream->reset();
286  return;
287  }
288 
289  auto peer_id = stream->remotePeerId().value();
290  auto &message = message_res.value();
291 
292  SL_VERBOSE(self->base_.logger(),
293  "Received {} propagated transactions from {}",
294  message.extrinsics.size(),
295  peer_id);
296 
297  if (self->babe_->wasSynchronized()) {
298  for (auto &ext : message.extrinsics) {
299  auto result = self->extrinsic_observer_->onTxMessage(ext);
300  if (result) {
301  SL_DEBUG(self->base_.logger(), " Received tx {}", result.value());
302  } else {
303  SL_DEBUG(self->base_.logger(),
304  " Rejected tx: {}",
305  result.error().message());
306  }
307  }
308  } else {
309  SL_TRACE(self->base_.logger(),
310  "Skipping extrinsics processing since the node was not in a "
311  "synchronized state yet.");
312  }
313 
314  self->readPropagatedExtrinsics(std::move(stream));
315  });
316  }
317 
319  gsl::span<const primitives::Transaction> txs) {
320  SL_DEBUG(
321  base_.logger(), "Propagate transactions : {} extrinsics", txs.size());
322 
323  std::vector<libp2p::peer::PeerId> peers;
324  stream_engine_->forEachPeer(
325  [&peers](const libp2p::peer::PeerId &peer_id, const auto &) {
326  peers.push_back(peer_id);
327  });
328  if (peers.size() > 1) { // One of peers is current node itself
329  metric_propagated_tx_counter_->inc(peers.size() - 1);
330  for (const auto &tx : txs) {
331  if (auto key = ext_event_key_repo_->get(tx.hash); key.has_value()) {
332  extrinsic_events_engine_->notify(
333  key.value(),
335  key.value(), peers));
336  }
337  }
338  }
339 
341  exts.extrinsics.resize(txs.size());
342  std::transform(
343  txs.begin(), txs.end(), exts.extrinsics.begin(), [](auto &tx) {
344  return tx.ext;
345  });
346 
349  (*shared_msg) = std::move(exts);
350 
352  shared_from_this(),
353  shared_msg,
355  stream_engine_->outgoingStreamsNumber(shared_from_this()),
357  }
358 
359 } // namespace kagome::network
void readPropagatedExtrinsics(std::shared_ptr< Stream > stream)
void readHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
static ExtrinsicLifecycleEvent Broadcast(SubscribedExtrinsicId id, gsl::span< const libp2p::peer::PeerId > peers)
void writeHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
log::Logger const & logger() const
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > ext_event_key_repo_
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::PeerId PeerId
KAGOME_DEFINE_CACHE(BlockAnnounceProtocol)
virtual int32_t luckyPeers() const =0
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
std::shared_ptr< primitives::events::ExtrinsicSubscriptionEngine > extrinsic_events_engine_
virtual void inc()=0
Increment the counter by 1.
virtual const std::string & protocolId() const =0
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
void onIncomingStream(std::shared_ptr< Stream > stream) override
Protocols const & protocolIds() const
const libp2p::peer::Protocol kPropagateTransactionsProtocol
Definition: common.hpp:19
bool start(std::weak_ptr< T > wptr)
void propagateTransactions(gsl::span< const primitives::Transaction > txs)
std::vector< primitives::Extrinsic > extrinsics