Kagome
Polkadot Runtime Engine in C++17
block_announce_protocol.cpp
Go to the documentation of this file.
1 
7 
9 #include "network/common.hpp"
13 
14 namespace kagome::network {
15 
16  KAGOME_DEFINE_CACHE(BlockAnnounceProtocol);
17 
// Constructor parameter list and member initializers.
// NOTE(review): the line carrying the constructor name is not part of this
// listing (its numbering skips from 17 to 19) — confirm against the original.
//
// base_ is built from the host, a brace-initialised list holding one protocol
// id (kBlockAnnouncesProtocol formatted with chain_spec.protocolId()) and the
// name "BlockAnnounceProtocol". app_config is stored as given; the shared
// pointers are taken by value and moved into the members.
19  libp2p::Host &host,
20  const application::AppConfiguration &app_config,
21  const application::ChainSpec &chain_spec,
22  std::shared_ptr<StreamEngine> stream_engine,
23  std::shared_ptr<blockchain::BlockTree> block_tree,
24  std::shared_ptr<BlockAnnounceObserver> observer,
25  std::shared_ptr<PeerManager> peer_manager)
26  : base_(host,
27  {fmt::format(kBlockAnnouncesProtocol.data(),
28  chain_spec.protocolId())},
29  "BlockAnnounceProtocol"),
30  app_config_(app_config),
31  stream_engine_(std::move(stream_engine)),
32  block_tree_(std::move(block_tree)),
33  observer_(std::move(observer)),
34  peer_manager_(std::move(peer_manager)) {
// Every injected collaborator is dereferenced later without a null check,
// so each one is asserted to be non-null here.
35  BOOST_ASSERT(stream_engine_ != nullptr);
36  BOOST_ASSERT(block_tree_ != nullptr);
37  BOOST_ASSERT(observer_ != nullptr);
38  BOOST_ASSERT(peer_manager_ != nullptr);
39  }
40 
// Delegates start-up to base_, handing it a weak reference to this object.
// NOTE(review): the signature line is absent from this listing (numbering
// skips 41); presumably this is BlockAnnounceProtocol::start() — confirm.
42  return base_.start(weak_from_this());
43  }
44 
// Delegates shutdown to base_ and returns its result.
// NOTE(review): the signature line is absent from this listing (numbering
// skips 45); presumably this is BlockAnnounceProtocol::stop() — confirm.
46  return base_.stop();
47  }
48 
49  outcome::result<Status> BlockAnnounceProtocol::createStatus() const {
51  Roles roles = app_config_.roles();
52 
54  BlockInfo best_block;
55  const auto &last_finalized = block_tree_->getLastFinalized().hash;
56  if (auto best_res =
57  block_tree_->getBestContaining(last_finalized, std::nullopt);
58  best_res.has_value()) {
59  best_block = best_res.value();
60  } else {
61  base_.logger()->error("Could not get best block info: {}",
62  best_res.error().message());
64  }
65 
66  auto &genesis_hash = block_tree_->getGenesisBlockHash();
67 
68  return Status{
69  .roles = roles, .best_block = best_block, .genesis_hash = genesis_hash};
70  }
71 
72  void BlockAnnounceProtocol::onIncomingStream(std::shared_ptr<Stream> stream) {
73  BOOST_ASSERT(stream->remotePeerId().has_value());
74 
75  readStatus(stream,
77  [wp = weak_from_this(), stream](outcome::result<void> res) {
78  auto self = wp.lock();
79  if (not self) {
80  stream->reset();
81  return;
82  }
83 
84  auto peer_id = stream->remotePeerId().value();
85 
86  if (not res.has_value()) {
87  SL_VERBOSE(
88  self->base_.logger(),
89  "Handshake failed on incoming {} stream with {}: {}",
90  self->protocolName(),
91  peer_id.toBase58(),
92  res.error().message());
93  stream->reset();
94  return;
95  }
96 
97  res = self->stream_engine_->addIncoming(stream, self);
98  if (not res.has_value()) {
99  SL_VERBOSE(self->base_.logger(),
100  "Can't register incoming {} stream with {}: {}",
101  self->protocolName(),
102  peer_id.toBase58(),
103  res.error().message());
104  stream->reset();
105  return;
106  }
107 
108  self->peer_manager_->reserveStreams(peer_id);
109  self->peer_manager_->startPingingPeer(peer_id);
110 
111  SL_VERBOSE(self->base_.logger(),
112  "Fully established incoming {} stream with {}",
113  self->protocolName(),
114  peer_id.toBase58());
115  });
116  }
117 
// Opens an outgoing stream to peer_info.id over base_.protocolIds() and runs
// the Status handshake on it. cb receives the established stream on success,
// or the failure from stream creation, the handshake, or registration.
// NOTE(review): the line carrying the function name is not part of this
// listing (numbering skips 118) — confirm against the original file.
119  const PeerInfo &peer_info,
120  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
121  SL_DEBUG(base_.logger(),
122  "Connect for {} stream with {}",
123  protocolName(),
124  peer_info.id);
125 
126  base_.host().newStream(
127  peer_info.id,
128  base_.protocolIds(),
129  [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
130  auto &&stream_res) mutable {
131  auto self = wp.lock();
// NOTE(review): the listing's numbering skips line 133 inside this branch; a
// statement (possibly a cb(...) call) may be missing — as shown, cb is never
// invoked when the protocol object is already gone. Confirm.
132  if (not self) {
134  return;
135  }
136 
137  if (not stream_res.has_value()) {
138  SL_VERBOSE(self->base_.logger(),
139  "Can't create outgoing {} stream with {}: {}",
140  self->protocolName(),
141  peer_id,
142  stream_res.error().message());
143  cb(stream_res.as_failure());
144  return;
145  }
146 
147  const auto &stream_and_proto = stream_res.value();
148 
// Completion handler for the handshake started by writeStatus() below: on
// success it registers the stream through stream_engine_->addOutgoing() and
// passes it to cb; on failure it resets the stream and forwards the error.
149  auto cb2 = [wp,
150  stream = stream_and_proto.stream,
151  protocol = stream_and_proto.protocol,
152  cb = std::move(cb)](outcome::result<void> res) {
153  auto self = wp.lock();
// NOTE(review): numbering skips line 155 here as well — same concern as above.
154  if (not self) {
156  return;
157  }
158 
159  if (not res.has_value()) {
160  SL_VERBOSE(self->base_.logger(),
161  "Handshake failed on outgoing {} stream with {}: {}",
162  protocol,
163  stream->remotePeerId().value(),
164  res.error().message());
165  stream->reset();
166  cb(res.as_failure());
167  return;
168  }
169 
170  res = self->stream_engine_->addOutgoing(stream, self);
171  if (not res.has_value()) {
172  SL_VERBOSE(self->base_.logger(),
173  "Can't register outgoing {} stream with {}: {}",
174  protocol,
175  stream->remotePeerId().value(),
176  res.error().message());
177  stream->reset();
178  cb(res.as_failure());
179  return;
180  }
181 
182  SL_VERBOSE(self->base_.logger(),
183  "Fully established outgoing {} stream with {}",
184  protocol,
185  stream->remotePeerId().value());
186  cb(std::move(stream));
187  };
188 
// NOTE(review): stream_and_proto is a const reference, so std::move() on its
// member selects a copy rather than a move. The numbering also skips line 190
// between the two arguments shown — likely the Direction argument that
// writeStatus() takes. Confirm.
189  self->writeStatus(std::move(stream_and_proto.stream),
191  std::move(cb2));
192  });
193  }
194 
// Reads the remote peer's Status from the stream, checks that its genesis
// hash equals the local one, records the status in peer_manager_, and then
// either completes the handshake (OUTGOING) or replies with the local Status
// through writeStatus() (INCOMING). Finally the observer is notified.
// NOTE(review): the line carrying the function name is not part of this
// listing (numbering skips 195) — confirm against the original file.
196  std::shared_ptr<Stream> stream,
197  Direction direction,
198  std::function<void(outcome::result<void>)> &&cb) {
199  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
200 
201  read_writer->read<Status>(
202  [stream, direction, wp = weak_from_this(), cb = std::move(cb)](
203  auto &&remote_status_res) mutable {
204  auto self = wp.lock();
// NOTE(review): numbering skips line 207 inside this branch; a statement
// (possibly a cb(...) call) may be missing — as shown, cb is not invoked when
// the protocol object is already gone. Confirm.
205  if (not self) {
206  stream->reset();
208  return;
209  }
210 
211  if (not remote_status_res.has_value()) {
212  SL_VERBOSE(self->base_.logger(),
213  "Can't read handshake from {}: {}",
214  stream->remotePeerId().value(),
215  remote_status_res.error().message());
216  stream->reset();
217  cb(remote_status_res.as_failure());
218  return;
219  }
220  auto &remote_status = remote_status_res.value();
221 
222  SL_TRACE(self->base_.logger(),
223  "Handshake has received from {}",
224  stream->remotePeerId().value());
225 
226  auto &genesis_hash = self->block_tree_->getGenesisBlockHash();
227 
// A peer reporting a different genesis hash is rejected and its stream reset.
// NOTE(review): numbering skips line 232 before the return; as shown, cb is
// not invoked on this path — confirm against the original file.
228  if (remote_status.genesis_hash != genesis_hash) {
229  SL_VERBOSE(self->base_.logger(),
230  "Error while processing status: genesis no match");
231  stream->reset();
233  return;
234  }
235 
// peer_id is copied out here because stream may be moved from in the
// INCOMING branch below while peer_id is still needed afterwards.
236  auto peer_id = stream->remotePeerId().value();
237  SL_TRACE(self->base_.logger(),
238  "Received status from peer_id={} (best block {})",
239  peer_id,
240  remote_status.best_block.number);
241  self->peer_manager_->updatePeerState(peer_id, remote_status);
242 
// OUTGOING: the local Status was already sent, so the handshake is complete.
// INCOMING: the remote spoke first; answer with the local Status and let
// writeStatus() invoke cb.
243  switch (direction) {
244  case Direction::OUTGOING:
245  cb(outcome::success());
246  break;
247  case Direction::INCOMING:
248  self->writeStatus(
249  std::move(stream), Direction::INCOMING, std::move(cb));
250  break;
251  }
252 
253  self->observer_->onRemoteStatus(peer_id, remote_status);
254  });
255  }
256 
// Builds the local Status with createStatus() and writes it to the stream.
// After a successful write, an OUTGOING handshake continues by reading the
// remote Status, while an INCOMING handshake is reported as complete and the
// block-announce read loop is started on the stream.
// NOTE(review): the line carrying the function name is not part of this
// listing (numbering skips 257) — confirm against the original file.
258  std::shared_ptr<Stream> stream,
259  Direction direction,
260  std::function<void(outcome::result<void>)> &&cb) {
261  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
262 
263  auto status_res = createStatus();
// NOTE(review): numbering skips line 266 before the return; as shown, cb is
// not invoked when the local Status cannot be created — confirm.
264  if (not status_res.has_value()) {
265  stream->reset();
267  return;
268  }
269 
270  const auto &status = status_res.value();
271 
272  read_writer->write(status,
273  [stream = std::move(stream),
274  direction,
275  wp = weak_from_this(),
276  cb = std::move(cb)](auto &&write_res) mutable {
277  auto self = wp.lock();
// NOTE(review): numbering skips line 280 inside this branch; a statement
// (possibly a cb(...) call) may be missing — confirm.
278  if (not self) {
279  stream->reset();
281  return;
282  }
283 
284  if (not write_res.has_value()) {
285  SL_VERBOSE(self->base_.logger(),
286  "Can't send handshake to {}: {}",
287  stream->remotePeerId().value(),
288  write_res.error().message());
289  stream->reset();
290  cb(write_res.as_failure());
291  return;
292  }
293 
294  SL_TRACE(self->base_.logger(),
295  "Handshake has sent to {}",
296  stream->remotePeerId().value());
297 
// NOTE(review): in the OUTGOING branch the numbering skips line 301 between
// the two readStatus() arguments shown — likely the Direction argument.
// In the INCOMING branch cb is completed first, then readAnnounce() takes
// over the stream.
298  switch (direction) {
299  case Direction::OUTGOING:
300  self->readStatus(std::move(stream),
302  std::move(cb));
303  break;
304  case Direction::INCOMING:
305  cb(outcome::success());
306  self->readAnnounce(std::move(stream));
307  break;
308  }
309  });
310  }
311 
312  void BlockAnnounceProtocol::readAnnounce(std::shared_ptr<Stream> stream) {
313  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
314 
315  read_writer->read<BlockAnnounce>(
316  [stream = std::move(stream),
317  wp = weak_from_this()](auto &&block_announce_res) mutable {
318  auto self = wp.lock();
319  if (not self) {
320  stream->reset();
321  return;
322  }
323 
324  if (not block_announce_res.has_value()) {
325  SL_DEBUG(self->base_.logger(),
326  "Can't read block announce from {}: {}",
327  stream->remotePeerId().value(),
328  block_announce_res.error().message());
329  stream->reset();
330  return;
331  }
332 
333  auto peer_id = stream->remotePeerId().value();
334  auto &block_announce = block_announce_res.value();
335 
336  SL_VERBOSE(self->base_.logger(),
337  "Announce of block #{} is received from {}",
338  block_announce.header.number,
339  peer_id);
340 
341  self->observer_->onBlockAnnounce(peer_id, block_announce);
342 
343  BOOST_ASSERT_MSG(stream->remotePeerId().has_value(),
344  "peer_id must be known at this moment");
345  self->peer_manager_->updatePeerState(stream->remotePeerId().value(),
346  block_announce);
347 
348  self->readAnnounce(std::move(stream));
349  });
350  }
351 
// Stores the announce in shared_msg and broadcasts it through stream_engine_.
// NOTE(review): several lines of this definition are absent from this listing
// (numbering skips 352, 354, 363 and 365): the signature, the initializer of
// shared_msg and part of the broadcast() argument list — confirm against the
// original file.
353  auto shared_msg =
355  (*shared_msg) = std::move(announce);
356 
// NOTE(review): announce was moved from by the statement above, so the log
// call below reads header.number from a moved-from object. Consider logging
// through shared_msg instead — confirm shared_msg's type first.
357  SL_DEBUG(
358  base_.logger(), "Send announce of block #{}", announce.header.number);
359 
360  stream_engine_->broadcast(
361  shared_from_this(),
362  shared_msg,
364  stream_engine_->outgoingStreamsNumber(shared_from_this()),
366  }
367 
368 } // namespace kagome::network
const application::AppConfiguration & app_config_
std::shared_ptr< blockchain::BlockTree > block_tree_
void onIncomingStream(std::shared_ptr< Stream > stream) override
outcome::result< Status > createStatus() const
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
virtual network::Roles roles() const =0
const libp2p::peer::Protocol kBlockAnnouncesProtocol
Definition: common.hpp:21
primitives::BlockInfo BlockInfo
Definition: structs.hpp:29
log::Logger const & logger() const
libp2p::peer::PeerInfo PeerInfo
void readAnnounce(std::shared_ptr< Stream > stream)
KAGOME_DEFINE_CACHE(BlockAnnounceProtocol)
virtual int32_t luckyPeers() const =0
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
virtual const std::string & protocolId() const =0
Protocols const & protocolIds() const
void blockAnnounce(BlockAnnounce &&announce)
void writeStatus(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
bool start(std::weak_ptr< T > wptr)
std::shared_ptr< StreamEngine > stream_engine_
std::shared_ptr< PeerManager > peer_manager_
const std::string & protocolName() const override
void readStatus(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
std::shared_ptr< BlockAnnounceObserver > observer_