Kagome
Polkadot Runtime Engine in C++17
grandpa_protocol.cpp
Go to the documentation of this file.
1 
7 
8 #include <libp2p/connection/loopback_stream.hpp>
9 
11 #include "network/common.hpp"
14 #include "network/peer_manager.hpp"
16 #include "network/types/roles.hpp"
17 
18 namespace kagome::network {
// Short name for the stream type that is instantiated with own_info_ and
// io_context_ further down in this file.
19  using libp2p::connection::LoopbackStream;
20 
// NOTE(review): the macro is defined outside this file; presumably it sets up
// the per-protocol message cache that KAGOME_EXTRACT_SHARED_CACHE draws
// from - confirm against its definition.
21  KAGOME_DEFINE_CACHE(GrandpaProtocol);
22 
// Constructor parameter list and member-initializer list.
//
// NOTE(review): the listing line that should carry the function header (23)
// and listing line 35 (inside the base_ initializer) are absent from this
// extract, so the text below is not a complete definition.
//
// What the visible part does:
//  - base_ receives the host, a brace-enclosed list whose first entry is
//    kGrandpaProtocol formatted with the lowercase-hex genesis hash (the rest
//    of that list is on the missing line), and the name "GrandpaProtocol";
//  - shared_ptr arguments are moved into their members;
//  - app_config and own_info are taken by const reference and used to
//    initialize app_config_ / own_info_.
// The empty body means construction performs no other work.
24  libp2p::Host &host,
25  std::shared_ptr<boost::asio::io_context> io_context,
26  const application::AppConfiguration &app_config,
27  std::shared_ptr<consensus::grandpa::GrandpaObserver> grandpa_observer,
28  const OwnPeerInfo &own_info,
29  std::shared_ptr<StreamEngine> stream_engine,
30  std::shared_ptr<PeerManager> peer_manager,
31  const primitives::BlockHash &genesis_hash,
32  std::shared_ptr<libp2p::basic::Scheduler> scheduler)
33  : base_(host,
34  {fmt::format(kGrandpaProtocol, hex_lower(genesis_hash)),
36  "GrandpaProtocol"),
37  io_context_(std::move(io_context)),
38  app_config_(app_config),
39  grandpa_observer_(std::move(grandpa_observer)),
40  own_info_(own_info),
41  stream_engine_(std::move(stream_engine)),
42  peer_manager_(std::move(peer_manager)),
43  scheduler_(std::move(scheduler)) {}
44 
// Body of a bool-returning member function; its signature line (listing line
// 45) is absent from this extract - presumably GrandpaProtocol::start().
//
// Creates a LoopbackStream for the node's own peer info, registers it with the
// stream engine as a bidirectional stream of this protocol and starts reading
// from it, then starts base_.
// Returns false if the registration fails; otherwise returns base_.start().
46  auto stream = std::make_shared<LoopbackStream>(own_info_, io_context_);
47  auto res = stream_engine_->addBidirectional(stream, shared_from_this());
48  if (not res.has_value()) {
49  return false;
50  }
// The read loop on the loopback stream is started before base_ is started.
51  read(std::move(stream));
52  return base_.start(weak_from_this());
53  }
54 
// Body of a member function whose signature line (listing line 55) is absent
// from this extract - presumably GrandpaProtocol::stop().
// Forwards to base_.stop() and returns its result.
56  return base_.stop();
57  }
58 
// Handles a stream of this protocol that was opened by a remote peer.
//
// The stream must have a known remote peer id (asserted). A handshake step is
// started with a completion callback that:
//  - resets the stream if this protocol object no longer exists;
//  - logs and resets the stream if the handshake failed;
//  - otherwise registers the stream with the stream engine as incoming, and
//    logs and resets the stream if that registration fails.
//
// NOTE(review): listing lines 62 and 64 (the head of the call and one of its
// arguments) are absent from this extract; presumably the call is
// readHandshake(stream, Direction::INCOMING, <callback>) - confirm.
59  void GrandpaProtocol::onIncomingStream(std::shared_ptr<Stream> stream) {
60  BOOST_ASSERT(stream->remotePeerId().has_value());
61 
63  stream,
// The callback keeps only a weak reference to this object, plus a copy of the
// stream pointer so the stream can be reset in every failure branch.
65  [wp = weak_from_this(), stream](outcome::result<void> res) {
66  auto self = wp.lock();
67  if (not self) {
68  stream->reset();
69  return;
70  }
71 
72  auto peer_id = stream->remotePeerId().value();
73 
74  if (not res.has_value()) {
75  SL_VERBOSE(self->base_.logger(),
76  "Handshake failed on incoming {} stream with {}: {}",
77  self->protocolName(),
78  peer_id,
79  res.error().message());
80  stream->reset();
81  return;
82  }
83 
// `res` is reused to hold the outcome of the registration.
84  res = self->stream_engine_->addIncoming(stream, self);
85  if (not res.has_value()) {
86  SL_VERBOSE(self->base_.logger(),
87  "Can't register incoming {} stream with {}: {}",
88  self->protocolName(),
89  peer_id,
90  res.error().message());
91  stream->reset();
92  return;
93  }
94 
95  SL_VERBOSE(self->base_.logger(),
96  "Fully established incoming {} stream with {}",
97  self->protocolName(),
98  peer_id);
99  });
100  }
101 
// Opens an outgoing stream of this protocol to `peer_info` and reports the
// outcome through `cb`.
//
// NOTE(review): the signature line (listing line 102) and listing lines 112,
// 133, 169, 179 and 190 are absent from this extract; the trailing symbol
// index of the page lists this function as
// `void newOutgoingStream(const PeerInfo &, std::function<...> &&) override`.
//
// Visible flow:
//  1. Ask the host for a new stream to peer_info.id for base_.protocolIds().
//  2. On failure: log and pass the failure to `cb`.
//  3. On success: start writeHandshake on the stream with the completion
//     callback `cb2`.
//  4. In `cb2`: on handshake failure or on failure to register the stream as
//     outgoing, log, reset the stream and pass the failure to `cb`. Otherwise,
//     if the peer manager has a state for the own peer id, send an initial
//     neighbor message built from it, then pass the stream to `cb`.
103  const PeerInfo &peer_info,
104  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
105  base_.host().newStream(
106  peer_info.id,
107  base_.protocolIds(),
108  [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
109  auto &&stream_res) mutable {
110  auto self = wp.lock();
111  if (not self) {
// NOTE(review): listing line 112 is missing here; as shown, `cb` is not
// invoked on this path - verify against the original file.
113  return;
114  }
115 
116  if (not stream_res.has_value()) {
117  SL_VERBOSE(self->base_.logger(),
118  "Can't create outgoing {} stream with {}: {}",
119  self->protocolName(),
120  peer_id,
121  stream_res.error().message());
122  cb(stream_res.as_failure());
123  return;
124  }
125  auto &stream_and_proto = stream_res.value();
126 
// cb2 takes over `cb` (moved) and keeps its own copies of the stream pointer
// and of the negotiated protocol id, which is used in the log messages.
127  auto cb2 = [wp,
128  stream = stream_and_proto.stream,
129  protocol = stream_and_proto.protocol,
130  cb = std::move(cb)](outcome::result<void> res) {
131  auto self = wp.lock();
132  if (not self) {
// NOTE(review): listing line 133 is missing here; as shown, `cb` is not
// invoked on this path - verify against the original file.
134  return;
135  }
136 
137  if (not res.has_value()) {
138  SL_VERBOSE(self->base_.logger(),
139  "Handshake failed on outgoing {} stream with {}: {}",
140  protocol,
141  stream->remotePeerId().value(),
142  res.error().message());
143  stream->reset();
144  cb(res.as_failure());
145  return;
146  }
147 
148  res = self->stream_engine_->addOutgoing(stream, self);
149  if (not res.has_value()) {
150  SL_VERBOSE(self->base_.logger(),
151  "Can't register outgoing {} stream with {}: {}",
152  protocol,
153  stream->remotePeerId().value(),
154  res.error().message());
155  stream->reset();
156  cb(res.as_failure());
157  return;
158  }
159 
160  SL_VERBOSE(self->base_.logger(),
161  "Fully established outgoing {} stream with {}",
162  protocol,
163  stream->remotePeerId().value());
164 
165  // Send neighbor message first
166  auto own_peer_state =
167  self->peer_manager_->getPeerState(self->own_info_.id);
168  if (own_peer_state.has_value()) {
// NOTE(review): listing line 169 (the declaration of `msg`) is missing. The
// visible initializers default the round number to 1 and the voter set id to
// 0 when the own state holds no value for them.
170  .round_number = own_peer_state->get().round_number.value_or(1),
171  .voter_set_id = own_peer_state->get().set_id.value_or(0),
172  .last_finalized = own_peer_state->get().last_finalized};
173 
174  SL_DEBUG(self->base_.logger(),
175  "Send initial neighbor message: grandpa round number {}",
176  msg.round_number);
177 
// NOTE(review): listing line 179 (the initializer of shared_msg) is missing.
178  auto shared_msg =
180  (*shared_msg) = GrandpaMessage(std::move(msg));
181 
182  self->stream_engine_->send(
183  stream->remotePeerId().value(), self, std::move(shared_msg));
184  }
185 
186  cb(std::move(stream));
187  };
188 
// NOTE(review): listing line 190 (presumably the Direction argument) is
// missing between the two visible arguments.
189  self->writeHandshake(std::move(stream_and_proto.stream),
191  std::move(cb2));
192  });
193  }
194 
// Reads the remote side's handshake (a SCALE-encoded Roles value) from
// `stream` and continues according to `direction`.
//
// NOTE(review): the signature line (listing line 195) and listing line 207 are
// absent from this extract; the trailing symbol index of the page lists this
// function as `void readHandshake(std::shared_ptr<Stream>, Direction,
// std::function<void(outcome::result<void>)> &&)`.
//
// Outcomes of the read:
//  - read failure: log, reset the stream, pass the failure to `cb`;
//  - success, OUTGOING: the handshake is complete, `cb` receives success;
//  - success, INCOMING: the local handshake is written back via
//    writeHandshake, which then owns `cb`.
// The received roles value itself is not used.
196  std::shared_ptr<Stream> stream,
197  Direction direction,
198  std::function<void(outcome::result<void>)> &&cb) {
199  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
200 
201  read_writer->read<Roles>(
202  [stream, direction, wp = weak_from_this(), cb = std::move(cb)](
203  auto &&remote_roles_res) mutable {
204  auto self = wp.lock();
205  if (not self) {
206  stream->reset();
// NOTE(review): listing line 207 is missing here; as shown, `cb` is not
// invoked on this path - verify against the original file.
208  return;
209  }
210 
211  if (not remote_roles_res.has_value()) {
212  SL_VERBOSE(self->base_.logger(),
213  "Can't read handshake from {}: {}",
214  stream->remotePeerId().value(),
215  remote_roles_res.error().message());
216  stream->reset();
217  cb(remote_roles_res.as_failure());
218  return;
219  }
220  // auto &remote_roles = remote_roles_res.value();
221 
222  SL_TRACE(self->base_.logger(),
223  "Handshake has received from {}",
224  stream->remotePeerId().value());
225 
226  switch (direction) {
227  case Direction::OUTGOING:
228  cb(outcome::success());
229  break;
230  case Direction::INCOMING:
231  self->writeHandshake(
232  std::move(stream), Direction::INCOMING, std::move(cb));
233  break;
234  }
235  });
236  }
237 
// Writes the local handshake (the roles from the application configuration,
// SCALE-encoded) to `stream` and continues according to `direction`.
//
// NOTE(review): the signature line (listing line 238) and listing line 254 are
// absent from this extract; the trailing symbol index of the page lists this
// function as `void writeHandshake(std::shared_ptr<Stream>, Direction,
// std::function<void(outcome::result<void>)> &&)`.
//
// Outcomes of the write:
//  - write failure: log, reset the stream, pass the failure to `cb`;
//  - success, OUTGOING: wait for the remote handshake via readHandshake,
//    which then owns `cb`;
//  - success, INCOMING: the handshake is complete, `cb` receives success and
//    the message read loop is started on the stream.
239  std::shared_ptr<Stream> stream,
240  Direction direction,
241  std::function<void(outcome::result<void>)> &&cb) {
242  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
243 
244  Roles roles = app_config_.roles();
245 
246  read_writer->write(roles,
247  [stream = std::move(stream),
248  direction,
249  wp = weak_from_this(),
250  cb = std::move(cb)](auto &&write_res) mutable {
251  auto self = wp.lock();
252  if (not self) {
253  stream->reset();
// NOTE(review): listing line 254 is missing here; as shown, `cb` is not
// invoked on this path - verify against the original file.
255  return;
256  }
257 
258  if (not write_res.has_value()) {
259  SL_VERBOSE(self->base_.logger(),
260  "Can't send handshake to {}: {}",
261  stream->remotePeerId().value(),
262  write_res.error().message());
263  stream->reset();
264  cb(write_res.as_failure());
265  return;
266  }
267 
268  SL_TRACE(self->base_.logger(),
269  "Handshake has sent to {}",
270  stream->remotePeerId().value());
271 
272  switch (direction) {
273  case Direction::OUTGOING:
274  self->readHandshake(
275  std::move(stream), direction, std::move(cb));
276  break;
277  case Direction::INCOMING:
// `cb` is completed first; reading starts only afterwards.
278  cb(outcome::success());
279  self->read(std::move(stream));
280  break;
281  }
282  });
283  }
284 
285  void GrandpaProtocol::read(std::shared_ptr<Stream> stream) {
286  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
287 
288  read_writer->read<GrandpaMessage>([stream = std::move(stream),
289  wp = weak_from_this()](
290  auto &&grandpa_message_res) mutable {
291  auto self = wp.lock();
292  if (not self) {
293  stream->reset();
294  return;
295  }
296 
297  if (not grandpa_message_res.has_value()) {
298  SL_VERBOSE(self->base_.logger(),
299  "Can't read grandpa message from {}: {}",
300  stream->remotePeerId().value(),
301  grandpa_message_res.error().message());
302  stream->reset();
303  return;
304  }
305 
306  auto peer_id = stream->remotePeerId().value();
307  auto &grandpa_message = grandpa_message_res.value();
308 
309  visit_in_place(
310  grandpa_message,
311  [&](const network::GrandpaVote &vote_message) {
312  SL_VERBOSE(self->base_.logger(),
313  "VoteMessage has received from {}",
314  peer_id);
315  self->grandpa_observer_->onVoteMessage(peer_id, vote_message);
316  },
317  [&](const FullCommitMessage &commit_message) {
318  SL_VERBOSE(self->base_.logger(),
319  "CommitMessage has received from {}",
320  peer_id);
321  self->grandpa_observer_->onCommitMessage(peer_id, commit_message);
322  },
323  [&](const GrandpaNeighborMessage &neighbor_message) {
324  SL_VERBOSE(self->base_.logger(),
325  "NeighborMessage has received from {}",
326  peer_id);
327  self->grandpa_observer_->onNeighborMessage(peer_id,
328  neighbor_message);
329  },
330  [&](const network::CatchUpRequest &catch_up_request) {
331  SL_VERBOSE(self->base_.logger(),
332  "CatchUpRequest has received from {}",
333  peer_id);
334  self->grandpa_observer_->onCatchUpRequest(peer_id,
335  catch_up_request);
336  },
337  [&](const network::CatchUpResponse &catch_up_response) {
338  SL_VERBOSE(self->base_.logger(),
339  "CatchUpResponse has received from {}",
340  peer_id);
341  self->grandpa_observer_->onCatchUpResponse(peer_id,
342  catch_up_response);
343  });
344  self->read(std::move(stream));
345  });
346  }
347 
// Sends a vote message either to one peer (when `peer_id` holds a value) or to
// all peers accepted by the politeness filter below (when it is empty).
//
// NOTE(review): the signature line (listing line 348) and listing line 428
// (the initializer of shared_msg) are absent from this extract; the trailing
// symbol index of the page lists this function as
// `void vote(network::GrandpaVote &&, std::optional<const PeerId>)`.
//
// The filter rejects a peer when:
//  - the peer manager has no state for it;
//  - its set id or round number is unknown;
//  - its set id differs from the vote's `counter`;
//  - the vote's round + 2 is below the peer's round;
//  - the vote's round is above the peer's round.
// The filter is applied only on the broadcast path; a directly addressed send
// is not filtered.
349  network::GrandpaVote &&vote_message,
350  std::optional<const libp2p::peer::PeerId> peer_id) {
351  SL_DEBUG(base_.logger(),
352  "Send vote message: grandpa round number {}",
353  vote_message.round_number);
354 
// NOTE(review): `msg` is a reference to `vote_message`, which is moved from
// further down before the filter is handed to broadcast(); the filter then
// reads a moved-from object. Harmless only if the fields it reads survive the
// move unchanged - confirm, or capture the needed values by copy as the
// commit filter in this file does. The lambda parameter also shadows the
// function parameter `peer_id`.
355  auto filter = [&, &msg = vote_message](const PeerId &peer_id) {
356  auto info_opt = peer_manager_->getPeerState(peer_id);
357  if (not info_opt.has_value()) {
358  SL_DEBUG(base_.logger(),
359  "Vote signed by {} with set_id={} in round={} "
360  "has not been sent to {}: peer is not connected",
361  msg.id(),
362  msg.counter,
363  msg.round_number,
364  peer_id);
365  return false;
366  }
367  const auto &info = info_opt.value();
368 
369  if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
370  SL_DEBUG(base_.logger(),
371  "Vote signed by {} with set_id={} in round={} "
372  "has not been sent to {}: set id or round number unknown",
373  msg.id(),
374  msg.counter,
375  msg.round_number,
376  peer_id);
377  return false;
378  }
379 
380  // If a peer is at a given voter set, it is impolite to send messages
381  // from an earlier voter set. It is extremely impolite to send messages
382  // from a future voter set.
383  if (msg.counter != info.get().set_id) {
384  SL_DEBUG(base_.logger(),
385  "Vote signed by {} with set_id={} in round={} "
386  "has not been sent to {} as impolite: their set id is {}",
387  msg.id(),
388  msg.counter,
389  msg.round_number,
390  peer_id,
391  info.get().set_id.value());
392  return false;
393  }
394 
395  // If a peer is at round r, is impolite to send messages about r-2 or
396  // earlier
397  if (msg.round_number + 2 < info.get().round_number.value()) {
398  SL_DEBUG(
399  base_.logger(),
400  "Vote signed by {} with set_id={} in round={} "
401  "has not been sent to {} as impolite: their round is already {}",
402  msg.id(),
403  msg.counter,
404  msg.round_number,
405  peer_id,
406  info.get().round_number.value());
407  return false;
408  }
409 
410  // If a peer is at round r, is extremely impolite to send messages about
411  // r+1 or later
412  if (msg.round_number > info.get().round_number.value()) {
413  SL_DEBUG(base_.logger(),
414  "Vote signed by {} with set_id={} in round={} "
415  "has not been sent to {} as impolite: their round is old: {}",
416  msg.id(),
417  msg.counter,
418  msg.round_number,
419  peer_id,
420  info.get().round_number.value());
421  return false;
422  }
423 
424  return true;
425  };
426 
427  auto shared_msg =
429  (*shared_msg) = GrandpaMessage(std::move(vote_message));
430 
431  if (not peer_id.has_value()) {
432  stream_engine_->broadcast<GrandpaMessage>(
433  shared_from_this(), std::move(shared_msg), filter);
434  } else {
435  stream_engine_->send(
436  peer_id.value(), shared_from_this(), std::move(shared_msg));
// The semicolon after the closing brace below is an empty statement.
437  };
438  }
439 
// Body of the neighbor-message sender: records the message as the own peer's
// state in the peer manager and broadcasts it through the stream engine.
//
// NOTE(review): the signature line (listing line 440) and listing lines 448,
// 454 and 456 are absent from this extract, so the shared_msg initializer and
// part of the broadcast() argument list are not visible. The trailing symbol
// index of the page lists this function as
// `void neighbor(GrandpaNeighborMessage &&msg)`.
441  SL_DEBUG(base_.logger(),
442  "Send neighbor message: grandpa round number {}",
443  msg.round_number);
444 
445  peer_manager_->updatePeerState(own_info_.id, msg);
446 
447  auto shared_msg =
// `msg` is copied here, not moved.
449  (*shared_msg) = GrandpaMessage(msg);
450 
451  stream_engine_->broadcast<GrandpaMessage>(
452  shared_from_this(),
453  shared_msg,
// One visible argument is the current number of outgoing streams of this
// protocol; the arguments around it are on the missing lines.
455  stream_engine_->outgoingStreamsNumber(shared_from_this()),
457  }
458 
// Sends a commit message either to one peer (when `peer_id` holds a value) or
// to all peers accepted by the politeness filter below (when it is empty).
//
// NOTE(review): the signature line (listing line 459) and listing line 537
// (the initializer of shared_msg) are absent from this extract; the trailing
// symbol index of the page lists this function as
// `void finalize(FullCommitMessage &&, std::optional<const PeerId>)`.
//
// The filter rejects a peer when:
//  - the peer manager has no state for it;
//  - its set id or round number is unknown;
//  - its set id differs from the commit's set id;
//  - the commit's round is below the peer's round;
//  - the commit's target block number is below the peer's last finalized.
// The filter is applied only on the broadcast path; a directly addressed send
// is not filtered.
460  FullCommitMessage &&msg,
461  std::optional<const libp2p::peer::PeerId> peer_id) {
462  SL_DEBUG(base_.logger(),
463  "Send commit message: grandpa round number {}",
464  msg.round);
465 
// The values the filter needs are copied into the closure, so it does not
// depend on `msg` after `msg` is moved from below.
466  auto filter = [this,
467  set_id = msg.set_id,
468  round_number = msg.round,
469  finalizing =
470  msg.message.target_number](const PeerId &peer_id) {
471  auto info_opt = peer_manager_->getPeerState(peer_id);
472  if (not info_opt.has_value()) {
473  SL_DEBUG(base_.logger(),
474  "Commit with set_id={} in round={} "
475  "has not been sent to {}: peer is not connected",
476  set_id,
477  round_number,
478  peer_id);
479  return false;
480  }
481  const auto &info = info_opt.value();
482 
483  if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
484  SL_DEBUG(base_.logger(),
485  "Commit with set_id={} in round={} "
486  "has not been sent to {}: set id or round number unknown",
487  set_id,
488  round_number,
489  peer_id);
490  return false;
491  }
492 
493  // It is especially impolite to send commits which are invalid, or from
494  // a different Set ID than the receiving peer has indicated.
495  if (set_id != info.get().set_id) {
496  SL_DEBUG(base_.logger(),
497  "Commit with set_id={} in round={} "
498  "has not been sent to {} as impolite: their set id is {}",
499  set_id,
500  round_number,
501  peer_id,
502  info.get().set_id.value());
503  return false;
504  }
505 
506  // Don't send commit if that has not actual for remote peer already
507  if (round_number < info.get().round_number.value()) {
508  SL_DEBUG(
509  base_.logger(),
510  "Commit with set_id={} in round={} "
511  "has not been sent to {} as impolite: their round is already {}",
512  set_id,
513  round_number,
514  peer_id,
515  info.get().round_number.value());
516  return false;
517  }
518 
519  // It is impolite to send commits which are earlier than the last commit
520  // sent.
// NOTE(review): this branch compares against last_finalized, but the log text
// and its last argument report the peer's round (same message as the branch
// above) - looks like a copy-paste; the message should probably mention the
// peer's last finalized block instead.
521  if (finalizing < info.get().last_finalized) {
522  SL_DEBUG(
523  base_.logger(),
524  "Commit with set_id={} in round={} "
525  "has not been sent to {} as impolite: their round is already {}",
526  set_id,
527  round_number,
528  peer_id,
529  info.get().round_number.value());
530  return false;
531  }
532 
533  return true;
534  };
535 
536  auto shared_msg =
538  (*shared_msg) = GrandpaMessage(std::move(msg));
539 
540  if (not peer_id.has_value()) {
541  stream_engine_->broadcast<GrandpaMessage>(
542  shared_from_this(), std::move(shared_msg), filter);
543  } else {
544  stream_engine_->send(
545  peer_id.value(), shared_from_this(), std::move(shared_msg));
546  }
547  }
548 
// Sends a catch-up request to `peer_id`, unless one of the checks below
// rejects it; every rejection is logged at debug level and returns without
// sending.
//
// NOTE(review): the signature line (listing line 549) and listing lines 640
// and 643 are absent from this extract; the trailing symbol index of the page
// lists this function as
// `void catchUpRequest(const libp2p::peer::PeerId &, CatchUpRequest &&)`.
//
// Checks, in order:
//  - the peer manager has a state for the peer;
//  - the peer's set id and round number are known;
//  - the request's voter set id equals the peer's set id;
//  - the request's round is not below (peer's round - 1);
//  - no request is currently recorded for the same (peer round, peer set id);
//  - no request is currently recorded for the same peer.
// When all checks pass, both records stay in place until the scheduled
// callback erases them, and the request is sent to the peer.
550  CatchUpRequest &&catch_up_request) {
551  SL_DEBUG(
552  base_.logger(),
553  "Send catch-up-request to {} beginning with grandpa round number {}",
554  peer_id,
555  catch_up_request.round_number);
556 
557  auto info_opt = peer_manager_->getPeerState(peer_id);
558  if (not info_opt.has_value()) {
559  SL_DEBUG(base_.logger(),
560  "Catch-up-request with set_id={} in round={} "
561  "has not been sent to {}: peer is not connected",
562  catch_up_request.voter_set_id,
563  catch_up_request.round_number,
564  peer_id);
565  return;
566  }
567  const auto &info = info_opt.value();
568 
569  if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
570  SL_DEBUG(base_.logger(),
571  "Catch-up-request with set_id={} in round={} "
572  "has not been sent to {}: set id or round number unknown",
573  catch_up_request.voter_set_id,
574  catch_up_request.round_number,
575  peer_id);
576  return;
577  }
578 
579  // Impolite to send a catch up request to a peer in a new different Set ID.
580  if (catch_up_request.voter_set_id != info.get().set_id) {
581  SL_DEBUG(base_.logger(),
582  "Catch-up-request with set_id={} in round={} "
583  "has not been sent to {}: different set id",
584  catch_up_request.voter_set_id,
585  catch_up_request.round_number,
586  peer_id);
587  return;
588  }
589 
590  // It is impolite to send a catch-up request for a round `R` to a peer
591  // whose announced view is behind `R`.
// NOTE(review): the condition rejects requests whose round is *below* the
// peer's round minus one, which does not match the comment above - confirm
// the intended direction. Also, if the round number type is unsigned and the
// peer's round is 0, `value() - 1` wraps around; writing the comparison as
// `round_number + 1 < value()` would avoid that - confirm the type.
592  if (catch_up_request.round_number < info.get().round_number.value() - 1) {
593  SL_DEBUG(base_.logger(),
594  "Catch-up-request with set_id={} in round={} "
595  "has not been sent to {}: too old round for requested",
596  catch_up_request.voter_set_id,
597  catch_up_request.round_number,
598  peer_id);
599  return;
600  }
601 
// The de-duplication key is built from the peer's announced round and set id,
// not from the round carried by the request.
602  auto round_id = std::tuple(info.get().round_number.value(), info.get().set_id.value());
603 
604  auto [iter_by_round, ok_by_round] =
605  recent_catchup_requests_by_round_.emplace(round_id);
606 
607  if (not ok_by_round) {
608  SL_DEBUG(base_.logger(),
609  "Catch-up-request with set_id={} in round={} "
610  "has not been sent to {}: "
611  "the same catch-up request had sent to another peer",
612  catch_up_request.voter_set_id,
613  catch_up_request.round_number,
614  peer_id);
615  return;
616  }
617 
618  auto [iter_by_peer, ok_by_peer] =
619  recent_catchup_requests_by_peer_.emplace(peer_id);
620 
621  // It is impolite to replay a catch-up request
622  if (not ok_by_peer) {
// Undo the by-round record inserted just above, since nothing is sent.
623  recent_catchup_requests_by_round_.erase(iter_by_round);
624  SL_DEBUG(base_.logger(),
625  "Catch-up-request with set_id={} in round={} "
626  "has not been sent to {}: impolite to replay catch-up request",
627  catch_up_request.voter_set_id,
628  catch_up_request.round_number,
629  peer_id);
630  return;
631  }
632 
// NOTE(review): listing line 640 (the rest of the schedule() arguments,
// presumably the delay) is missing after the callback.
633  scheduler_->schedule(
634  [wp = weak_from_this(), round_id, peer_id] {
635  if (auto self = wp.lock()) {
636  self->recent_catchup_requests_by_round_.erase(round_id);
637  self->recent_catchup_requests_by_peer_.erase(peer_id);
638  }
639  },
641 
642  auto shared_msg =
644  (*shared_msg) = GrandpaMessage(std::move(catch_up_request));
645 
646  stream_engine_->send(peer_id, shared_from_this(), std::move(shared_msg));
647  }
648 
// Sends a catch-up response to `peer_id`, unless one of the checks below
// rejects it; every rejection is logged at debug level and returns without
// sending.
//
// NOTE(review): the signature line (listing line 649) and listing lines 677,
// 689 and 701 are absent from this extract; the trailing symbol index of the
// page lists this function as
// `void catchUpResponse(const libp2p::peer::PeerId &, CatchUpResponse &&)`.
//
// Checks, in order:
//  - the peer manager has a state for the peer;
//  - the peer's set id and round number are known;
//  - the response's voter set id equals the peer's set id;
//  - the response's round is not below the peer's round.
650  CatchUpResponse &&catch_up_response) {
651  SL_DEBUG(base_.logger(),
652  "Send catch-up response: beginning with grandpa round number {}",
653  catch_up_response.round_number);
654 
655  auto info_opt = peer_manager_->getPeerState(peer_id);
656  if (not info_opt.has_value()) {
657  SL_DEBUG(base_.logger(),
658  "Catch-up-response with set_id={} in round={} "
659  "has not been sent to {}: peer is not connected",
660  catch_up_response.voter_set_id,
661  catch_up_response.round_number,
662  peer_id);
663  return;
664  }
665  const auto &info = info_opt.value();
666 
667  if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
668  SL_DEBUG(base_.logger(),
669  "Catch-up-response with set_id={} in round={} "
670  "has not been sent to {}: set id or round number unknown",
671  catch_up_response.voter_set_id,
672  catch_up_response.round_number,
673  peer_id);
674  return;
675  }
676 
// NOTE(review): the guard above already returned when set_id holds no value,
// so the "unknown" alternative in the log argument below cannot be selected.
678  if (catch_up_response.voter_set_id != info.get().set_id) {
679  SL_DEBUG(base_.logger(),
680  "Catch-up-response with set_id={} in round={} "
681  "has not been sent to {}: {} set id",
682  catch_up_response.voter_set_id,
683  catch_up_response.round_number,
684  peer_id,
685  info.get().set_id.has_value() ? "different" : "unknown");
686  return;
687  }
688 
// Here round_number is compared without .value(); it is known to hold a value
// because of the guard above.
690  if (catch_up_response.round_number < info.get().round_number) {
691  SL_DEBUG(base_.logger(),
692  "Catch-up-response with set_id={} in round={} "
693  "has not been sent to {}: is already not actual",
694  catch_up_response.voter_set_id,
695  catch_up_response.round_number,
696  peer_id);
697  return;
698  }
699 
700  auto shared_msg =
702  (*shared_msg) = GrandpaMessage(std::move(catch_up_response));
703 
704  stream_engine_->send(peer_id, shared_from_this(), std::move(shared_msg));
705  }
706 
707 } // namespace kagome::network
boost::variant< GrandpaVote, FullCommitMessage, GrandpaNeighborMessage, CatchUpRequest, CatchUpResponse > GrandpaMessage
std::string hex_lower(const gsl::span< const uint8_t > bytes) noexcept
Converts bytes to hex representation.
Definition: hexutil.cpp:52
const libp2p::peer::Protocol kGrandpaProtocol
Definition: common.hpp:24
std::shared_ptr< boost::asio::io_context > io_context_
virtual network::Roles roles() const =0
void readHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
void finalize(FullCommitMessage &&msg, std::optional< const libp2p::peer::PeerId > peer_id)
void read(std::shared_ptr< Stream > stream)
log::Logger const & logger() const
std::shared_ptr< PeerManager > peer_manager_
libp2p::peer::PeerInfo PeerInfo
std::set< libp2p::peer::PeerId > recent_catchup_requests_by_peer_
libp2p::peer::PeerId PeerId
KAGOME_DEFINE_CACHE(BlockAnnounceProtocol)
virtual int32_t luckyPeers() const =0
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
void catchUpRequest(const libp2p::peer::PeerId &peer_id, CatchUpRequest &&catch_up_request)
void neighbor(GrandpaNeighborMessage &&msg)
Protocols const & protocolIds() const
bool start(std::weak_ptr< T > wptr)
const libp2p::peer::Protocol kGrandpaProtocolLegacy
Definition: common.hpp:23
std::shared_ptr< consensus::grandpa::GrandpaObserver > grandpa_observer_
void onIncomingStream(std::shared_ptr< Stream > stream) override
std::set< std::tuple< consensus::grandpa::RoundNumber, consensus::grandpa::VoterSetId > > recent_catchup_requests_by_round_
void writeHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
void catchUpResponse(const libp2p::peer::PeerId &peer_id, CatchUpResponse &&catch_up_response)
const application::AppConfiguration & app_config_
std::shared_ptr< StreamEngine > stream_engine_
void vote(network::GrandpaVote &&vote_message, std::optional< const libp2p::peer::PeerId > peer_id)
static constexpr std::chrono::milliseconds kRecentnessDuration
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override