Kagome
Polkadot Runtime Engine in C++17
peer_manager_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <execinfo.h>
9 #include <limits>
10 #include <memory>
11 
12 #include <libp2p/protocol/kademlia/impl/peer_routing_table.hpp>
13 
14 #include "outcome/outcome.hpp"
15 #include "scale/libp2p_types.hpp"
17 
18 namespace {
19  constexpr const char *syncPeerMetricName = "kagome_sync_peers";
21  static constexpr int32_t kDisconnectReputation = -256;
22 } // namespace
23 
26  switch (e) {
27  case E::UNDECLARED_COLLATOR:
28  return "Process handling from undeclared collator";
29  case E::OUT_OF_VIEW:
30  return "Processing para hash, which is out of view";
31  case E::DUPLICATE:
32  return "Processing duplicated hash";
33  }
34  return "Unknown error in ChainSpecImpl";
35 }
36 
37 namespace kagome::network {
39  std::shared_ptr<application::AppStateManager> app_state_manager,
40  libp2p::Host &host,
41  std::shared_ptr<libp2p::protocol::Identify> identify,
42  std::shared_ptr<libp2p::protocol::kademlia::Kademlia> kademlia,
43  std::shared_ptr<libp2p::basic::Scheduler> scheduler,
44  std::shared_ptr<StreamEngine> stream_engine,
45  const application::AppConfiguration &app_config,
46  std::shared_ptr<clock::SteadyClock> clock,
47  const BootstrapNodes &bootstrap_nodes,
48  const OwnPeerInfo &own_peer_info,
49  std::shared_ptr<network::Router> router,
50  std::shared_ptr<storage::BufferStorage> storage,
51  std::shared_ptr<crypto::Hasher> hasher,
52  std::shared_ptr<ReputationRepository> reputation_repository)
53  : app_state_manager_(std::move(app_state_manager)),
54  host_(host),
55  identify_(std::move(identify)),
56  kademlia_(std::move(kademlia)),
57  scheduler_(std::move(scheduler)),
58  stream_engine_(std::move(stream_engine)),
59  app_config_(app_config),
60  clock_(std::move(clock)),
61  bootstrap_nodes_(bootstrap_nodes),
62  own_peer_info_(own_peer_info),
63  router_{std::move(router)},
64  storage_{std::move(storage)},
65  hasher_{std::move(hasher)},
66  reputation_repository_{std::move(reputation_repository)},
67  log_(log::createLogger("PeerManager", "network")) {
68  BOOST_ASSERT(app_state_manager_ != nullptr);
69  BOOST_ASSERT(identify_ != nullptr);
70  BOOST_ASSERT(kademlia_ != nullptr);
71  BOOST_ASSERT(scheduler_ != nullptr);
72  BOOST_ASSERT(stream_engine_ != nullptr);
73  BOOST_ASSERT(router_ != nullptr);
74  BOOST_ASSERT(storage_ != nullptr);
75  BOOST_ASSERT(hasher_ != nullptr);
76  BOOST_ASSERT(reputation_repository_ != nullptr);
77 
78  // Register metrics
79  registry_->registerGaugeFamily(syncPeerMetricName,
80  "Number of peers we sync with");
81  sync_peer_num_ = registry_->registerGaugeMetric(syncPeerMetricName);
82  sync_peer_num_->set(0);
83 
84  app_state_manager_->takeControl(*this);
85  }
86 
88  if (not app_config_.isRunInDevMode() && bootstrap_nodes_.empty()) {
89  log_->critical(
90  "Does not have any bootstrap nodes. "
91  "Provide them by chain spec or CLI argument `--bootnodes'");
92  return false;
93  }
94 
95  return true;
96  }
97 
99  if (app_config_.isRunInDevMode() && bootstrap_nodes_.empty()) {
100  log_->warn(
101  "Peer manager is started in passive mode, "
102  "because have not any bootstrap nodes.");
103  return true;
104  }
105 
106  // Add themselves into peer routing
107  kademlia_->addPeer(host_.getPeerInfo(), true);
108  // It is used only for DEV mode
109  processDiscoveredPeer(host_.getPeerInfo().id);
110 
112  host_.getBus()
113  .getChannel<libp2p::event::protocol::kademlia::PeerAddedChannel>()
114  .subscribe([wp = weak_from_this()](const PeerId &peer_id) {
115  if (auto self = wp.lock()) {
116  if (auto rating =
117  self->reputation_repository_->reputation(peer_id);
118  rating < 0) {
119  SL_DEBUG(self->log_,
120  "Disconnecting from peer {} due to its negative "
121  "reputation {}",
122  peer_id,
123  rating);
124  self->disconnectFromPeer(peer_id);
125  return;
126  }
127  self->processDiscoveredPeer(peer_id);
128  }
129  });
130 
132  host_.getBus()
133  .getChannel<libp2p::event::network::OnPeerDisconnectedChannel>()
134  .subscribe([wp = weak_from_this()](const PeerId &peer_id) {
135  if (auto self = wp.lock()) {
136  SL_DEBUG(self->log_,
137  "OnPeerDisconnectedChannel handler from peer {}",
138  peer_id);
139  self->stream_engine_->del(peer_id);
140  self->peer_states_.erase(peer_id);
141  self->active_peers_.erase(peer_id);
142  self->connecting_peers_.erase(peer_id);
143  self->sync_peer_num_->set(self->active_peers_.size());
144  SL_DEBUG(self->log_,
145  "Remained {} active peers",
146  self->active_peers_.size());
147  }
148  });
149 
150  identify_->onIdentifyReceived([wp = weak_from_this()](
151  const PeerId &peer_id) {
152  if (auto self = wp.lock()) {
153  SL_DEBUG(self->log_, "Identify received from peer {}", peer_id);
154  if (auto rating = self->reputation_repository_->reputation(peer_id);
155  rating < 0) {
156  SL_DEBUG(
157  self->log_,
158  "Disconnecting from peer {} due to its negative reputation {}",
159  peer_id,
160  rating);
161  self->disconnectFromPeer(peer_id);
162  return;
163  }
164  self->processFullyConnectedPeer(peer_id);
165  }
166  });
167 
168  // Start Identify protocol
169  identify_->start();
170 
171  // Enqueue bootstrap nodes with permanent lifetime
172  for (const auto &bootstrap_node : bootstrap_nodes_) {
173  kademlia_->addPeer(bootstrap_node, true);
174  }
175 
176  // Enqueue last active peers as first peers set but with limited lifetime
177  auto last_active_peers = loadLastActivePeers();
178  SL_DEBUG(log_,
179  "Loaded {} last active peers' record(s)",
180  last_active_peers.size());
181  for (const auto &peer_info : last_active_peers) {
182  kademlia_->addPeer(peer_info, false);
183  }
184 
185  // Start Kademlia (processing incoming message and random walking)
186  kademlia_->start();
187 
188  // Do first alignment of peers count
189  align();
190 
191  return true;
192  }
193 
196  add_peer_handle_.unsubscribe();
197  peer_disconnected_handler_.unsubscribe();
198  }
199 
200  void PeerManagerImpl::connectToPeer(const PeerInfo &peer_info) {
201  auto res = host_.getPeerRepository().getAddressRepository().upsertAddresses(
202  peer_info.id, peer_info.addresses, libp2p::peer::ttl::kTransient);
203  if (res) {
204  connectToPeer(peer_info.id);
205  }
206  }
207 
209  return active_peers_.size();
210  }
211 
212  std::shared_ptr<StreamEngine> PeerManagerImpl::getStreamEngine() {
213  BOOST_ASSERT(stream_engine_);
214  return stream_engine_;
215  }
216 
218  std::function<void(const PeerId &)> func) const {
219  for (auto &it : active_peers_) {
220  func(it.first);
221  }
222  }
223 
225  const PeerId &peer_id,
226  network::CollatorPublicKey const &collator_id,
227  network::ParachainId para_id) {
228  if (auto it = peer_states_.find(peer_id); it != peer_states_.end()) {
229  BOOST_ASSERT(!it->second.collator_state
230  && !!"Collator state should be empty at the time.");
231  it->second.collator_state = CollatorState{.parachain_id = para_id,
232  .collator_id = collator_id,
233  .advertisements = {}};
234  it->second.time = clock_->now();
235  }
236  }
237 
239  const PeerId &peer_id, std::function<void(const PeerId &)> func) const {
240  if (active_peers_.count(peer_id)) {
241  func(peer_id);
242  }
243  }
244 
245  outcome::result<
246  std::pair<network::CollatorPublicKey const &, network::ParachainId>>
248  ParachainState &parachain_state,
249  primitives::BlockHash para_hash) {
250  if (!peer_state.collator_state) return Error::UNDECLARED_COLLATOR;
251 
252  if (parachain_state.our_view.count(para_hash) == 0)
253  return Error::OUT_OF_VIEW;
254 
255  if (peer_state.collator_state.value().advertisements.count(para_hash) != 0)
256  return Error::DUPLICATE;
257 
258  peer_state.collator_state.value().advertisements.insert(
259  std::move(para_hash));
260  return std::make_pair(peer_state.collator_state.value().collator_id,
261  peer_state.collator_state.value().parachain_id);
262  }
263 
265  SL_TRACE(log_, "Try to align peers number");
266 
267  const auto target_count = app_config_.peeringConfig().targetPeerAmount;
268  const auto hard_limit = app_config_.peeringConfig().hardLimit;
269  const auto peer_ttl = app_config_.peeringConfig().peerTtl;
270 
271  align_timer_.cancel();
272 
274 
275  // disconnect from peers with negative reputation
276  using PriorityType = int32_t;
277  using ItemType = std::pair<PriorityType, PeerId>;
278 
279  std::vector<ItemType> peers_list;
280  peers_list.reserve(active_peers_.size());
281 
282  uint64_t const now_ms =
283  std::chrono::time_point_cast<std::chrono::milliseconds>(clock_->now())
284  .time_since_epoch()
285  .count();
286  uint64_t const idle_ms =
287  std::chrono::duration_cast<std::chrono::milliseconds>(peer_ttl).count();
288 
289  for (const auto &[peer_id, desc] : active_peers_) {
290  uint64_t const last_activity_ms =
291  std::chrono::time_point_cast<std::chrono::milliseconds>(
292  desc.time_point)
293  .time_since_epoch()
294  .count();
295 
296  const auto peer_reputation = reputation_repository_->reputation(peer_id);
297  if (peer_reputation < kDisconnectReputation
298  || last_activity_ms + idle_ms < now_ms) {
299  peers_list.push_back(
300  std::make_pair(std::numeric_limits<PriorityType>::min(), peer_id));
301  // we have to store peers somewhere first due to inability to iterate
302  // over active_peers_ and do disconnectFromPeers (which modifies
303  // active_peers_) at the same time
304  } else {
305  peers_list.push_back(std::make_pair(peer_reputation, peer_id));
306  }
307  }
308 
309  std::sort(peers_list.begin(),
310  peers_list.end(),
311  [](auto const &l, auto const &r) { return r.first < l.first; });
312 
313  for (; !peers_list.empty()
314  && (peers_list.size() > hard_limit
315  || peers_list.back().first
316  == std::numeric_limits<PriorityType>::min());
317  peers_list.pop_back()) {
318  const auto &peer_id = peers_list.back().second;
319  disconnectFromPeer(peer_id);
320  }
321 
322  // Not enough active peers
323  if (active_peers_.size() < target_count) {
324  if (not queue_to_connect_.empty()) {
325  for (;;) {
326  auto node = peers_in_queue_.extract(queue_to_connect_.front());
327  auto &peer_id = node.value();
328 
329  queue_to_connect_.pop_front();
330  BOOST_ASSERT(queue_to_connect_.size() == peers_in_queue_.size());
331 
332  if (connecting_peers_.emplace(peer_id).second) {
333  connectToPeer(peer_id);
334 
335  SL_TRACE(log_,
336  "Remained peers in queue for connect: {}",
337  peers_in_queue_.size());
338  break;
339  }
340  }
341  } else if (connecting_peers_.empty()) {
342  SL_DEBUG(log_, "Queue for connect is empty. Reuse bootstrap nodes");
343  for (const auto &bootstrap_node : bootstrap_nodes_) {
344  if (own_peer_info_.id != bootstrap_node.id) {
345  if (connecting_peers_.emplace(bootstrap_node.id).second) {
346  connectToPeer(bootstrap_node.id);
347  }
348  }
349  }
350  } else {
351  SL_DEBUG(log_,
352  "Queue for connect is empty. Connecting peers: {}",
353  connecting_peers_.size());
354  }
355  }
356 
357  const auto aligning_period = app_config_.peeringConfig().aligningPeriod;
358 
359  align_timer_ = scheduler_->scheduleWithHandle(
360  [wp = weak_from_this()] {
361  if (auto self = wp.lock()) {
362  self->align();
363  }
364  },
365  aligning_period);
366  SL_DEBUG(log_, "Active peers = {}", active_peers_.size());
367  }
368 
369  void PeerManagerImpl::connectToPeer(const PeerId &peer_id) {
370  // Skip connection to itself
371  if (isSelfPeer(peer_id)) {
372  connecting_peers_.erase(peer_id);
373  return;
374  }
375 
376  auto peer_info = host_.getPeerRepository().getPeerInfo(peer_id);
377  if (peer_info.addresses.empty()) {
378  SL_DEBUG(log_, "Not found addresses for peer {}", peer_id);
379  connecting_peers_.erase(peer_id);
380  return;
381  }
382 
383  auto connectedness = host_.connectedness(peer_info);
384  if (connectedness == libp2p::Host::Connectedness::CAN_NOT_CONNECT) {
385  SL_DEBUG(log_, "Can not connect to peer {}", peer_id);
386  connecting_peers_.erase(peer_id);
387  return;
388  }
389 
390  SL_DEBUG(log_, "Try to connect to peer {}", peer_info.id);
391  for (auto addr : peer_info.addresses) {
392  SL_DEBUG(log_, " address: {}", addr.getStringAddress());
393  }
394 
395  host_.connect(
396  peer_info,
397  [wp = weak_from_this(), peer_id](auto res) mutable {
398  auto self = wp.lock();
399  if (not self) {
400  return;
401  }
402 
403  if (not res.has_value()) {
404  SL_DEBUG(self->log_,
405  "Connecting to peer {} is failed: {}",
406  peer_id,
407  res.error().message());
408  self->connecting_peers_.erase(peer_id);
409  return;
410  }
411 
412  auto &connection = res.value();
413  auto remote_peer_id_res = connection->remotePeer();
414  if (not remote_peer_id_res.has_value()) {
415  SL_DEBUG(
416  self->log_,
417  "Connected, but not identified yet (expecting peer_id={:l})",
418  peer_id);
419  self->connecting_peers_.erase(peer_id);
420  return;
421  }
422 
423  auto &remote_peer_id = remote_peer_id_res.value();
424  if (remote_peer_id == peer_id) {
425  SL_DEBUG(self->log_, "Connected to peer {}", peer_id);
426 
427  self->processFullyConnectedPeer(peer_id);
428  }
429  },
431  }
432 
434  if (peer_id == own_peer_info_.id) {
435  return;
436  }
437 
438  SL_INFO(log_, "Disconnect from peer {}", peer_id);
439  host_.disconnect(peer_id);
440  }
441 
442  void PeerManagerImpl::keepAlive(const PeerId &peer_id) {
443  auto it = active_peers_.find(peer_id);
444  if (it != active_peers_.end()) {
445  it->second.time_point = clock_->now();
446  }
447  }
448 
450  auto ping_protocol = router_->getPingProtocol();
451  BOOST_ASSERT_MSG(ping_protocol, "Router did not provide ping protocol");
452 
453  auto conn =
454  host_.getNetwork().getConnectionManager().getBestConnectionForPeer(
455  peer_id);
456  if (conn == nullptr) {
457  SL_DEBUG(log_,
458  "Failed to start pinging {}: No connection to this peer exists",
459  peer_id);
460  return;
461  }
463  auto [_, is_emplaced] = pinging_connections_.emplace(conn);
464  if (not is_emplaced) {
465  // Pinging is already going
466  return;
467  }
468 
469  SL_DEBUG(log_,
470  "Start pinging of {} (conn={})",
471  peer_id,
472  static_cast<void *>(conn.get()));
473 
474  ping_protocol->startPinging(
475  conn,
476  [wp = weak_from_this(), peer_id, conn](
477  outcome::result<std::shared_ptr<
478  libp2p::protocol::PingClientSession>> session_res) {
479  if (auto self = wp.lock()) {
480  if (session_res.has_error()) {
481  SL_DEBUG(self->log_,
482  "Stop pinging of {} (conn={}): {}",
483  peer_id,
484  static_cast<void *>(conn.get()),
485  session_res.error().message());
486  self->pinging_connections_.erase(conn);
487  self->disconnectFromPeer(peer_id);
488  } else {
489  SL_DEBUG(self->log_,
490  "Pinging: {} (conn={}) is alive",
491  peer_id,
492  static_cast<void *>(conn.get()));
493  self->keepAlive(peer_id);
494  }
495  }
496  });
497  }
498 
500  const Status &status) {
501  auto [it, is_new] = peer_states_.emplace(peer_id, PeerState{});
502  it->second.time = clock_->now();
503  it->second.roles = status.roles;
504  it->second.best_block = status.best_block;
505  }
506 
508  return parachain_state_;
509  }
510 
512  const BlockAnnounce &announce) {
513  auto hash = hasher_->blake2b_256(scale::encode(announce.header).value());
514 
515  auto [it, _] = peer_states_.emplace(peer_id, PeerState{});
516  it->second.time = clock_->now();
517  it->second.best_block = {announce.header.number, hash};
518  }
519 
521  const PeerId &peer_id, const GrandpaNeighborMessage &neighbor_message) {
522  auto [it, _] = peer_states_.emplace(peer_id, PeerState{});
523  it->second.time = clock_->now();
524  it->second.round_number = neighbor_message.round_number;
525  it->second.set_id = neighbor_message.voter_set_id;
526  it->second.last_finalized = neighbor_message.last_finalized;
527  }
528 
529  std::optional<std::reference_wrapper<PeerState>>
531  auto it = peer_states_.find(peer_id);
532  if (it == peer_states_.end()) {
533  return std::nullopt;
534  }
535  return it->second;
536  }
537 
539  // Ignore himself
540  if (isSelfPeer(peer_id)) {
541  return;
542  }
543 
544  // Skip if peer is already active
545  if (active_peers_.find(peer_id) != active_peers_.end()) {
546  return;
547  }
548 
549  auto [it, added] = peers_in_queue_.emplace(peer_id);
550 
551  // Already in queue
552  if (not added) {
553  return;
554  }
555 
556  queue_to_connect_.emplace_back(*it);
557  BOOST_ASSERT(queue_to_connect_.size() == peers_in_queue_.size());
558 
559  SL_DEBUG(log_,
560  "New peer_id enqueued: {:l}. In queue: {}",
561  peer_id,
562  queue_to_connect_.size());
563  }
564 
566  // Skip connection to itself
567  if (isSelfPeer(peer_id)) {
568  connecting_peers_.erase(peer_id);
569  return;
570  }
571 
572  auto connection =
573  host_.getNetwork().getConnectionManager().getBestConnectionForPeer(
574  peer_id);
575  if (connection == nullptr) {
576  connecting_peers_.erase(peer_id);
577  return;
578  }
579  if (connection->isInitiator()) {
580  auto out_peers_count = std::count_if(
581  active_peers_.begin(), active_peers_.end(), [](const auto &el) {
582  return el.second.peer_type == PeerType::PEER_TYPE_OUT;
583  });
584  if (out_peers_count > app_config_.outPeers()) {
585  connecting_peers_.erase(peer_id);
586  disconnectFromPeer(peer_id);
587  return;
588  }
589  } else {
590  auto in_peers_count = 0u;
591  auto in_light_peers_count = 0u;
592  if (peer_states_[peer_id].roles.flags.full == 1) {
593  for (const auto &peer : active_peers_) {
594  if (peer.second.peer_type == PeerType::PEER_TYPE_IN
595  and peer_states_[peer.first].roles.flags.full == 1) {
596  ++in_peers_count;
597  }
598  }
599  if (in_peers_count >= app_config_.inPeers()) {
600  connecting_peers_.erase(peer_id);
601  disconnectFromPeer(peer_id);
602  return;
603  }
604  } else if (peer_states_[peer_id].roles.flags.light == 1) {
605  for (const auto &peer : active_peers_) {
606  if (peer.second.peer_type == PeerType::PEER_TYPE_IN
607  and peer_states_[peer.first].roles.flags.light == 1) {
608  ++in_light_peers_count;
609  }
610  }
611  if (in_light_peers_count >= app_config_.inPeersLight()) {
612  connecting_peers_.erase(peer_id);
613  disconnectFromPeer(peer_id);
614  return;
615  }
616  }
617  }
618 
619  PeerInfo peer_info{.id = peer_id, .addresses = {}};
620 
621  auto block_announce_protocol = router_->getBlockAnnounceProtocol();
622  BOOST_ASSERT_MSG(block_announce_protocol,
623  "Router did not provide block announce protocol");
624 
625  if (stream_engine_->reserveOutgoing(peer_info.id,
626  block_announce_protocol)) {
627  block_announce_protocol->newOutgoingStream(
628  peer_info,
629  [wp = weak_from_this(),
630  peer_info,
631  protocol = block_announce_protocol,
632  connection](auto &&stream_res) {
633  auto self = wp.lock();
634  if (not self) {
635  return;
636  }
637 
638  auto &peer_id = peer_info.id;
639 
640  self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
641  if (not stream_res.has_value()) {
642  self->log_->warn("Unable to create stream {} with {}: {}",
643  protocol->protocolName(),
644  peer_id,
645  stream_res.error().message());
646  self->connecting_peers_.erase(peer_id);
647  self->disconnectFromPeer(peer_id);
648  return;
649  }
650  PeerType peer_type = connection->isInitiator()
653 
654  // Add to active peer list
655  if (auto [ap_it, added] = self->active_peers_.emplace(
656  peer_id, PeerDescriptor{peer_type, self->clock_->now()});
657  added) {
658  self->recently_active_peers_.insert(peer_id);
659 
660  // And remove from queue
661  if (auto piq_it = self->peers_in_queue_.find(peer_id);
662  piq_it != self->peers_in_queue_.end()) {
663  auto qtc_it =
664  std::find_if(self->queue_to_connect_.cbegin(),
665  self->queue_to_connect_.cend(),
666  [&peer_id = peer_id](const auto &item) {
667  return peer_id == item.get();
668  });
669  self->queue_to_connect_.erase(qtc_it);
670  self->peers_in_queue_.erase(piq_it);
671  BOOST_ASSERT(self->queue_to_connect_.size()
672  == self->peers_in_queue_.size());
673 
674  SL_DEBUG(self->log_,
675  "Remained peers in queue for connect: {}",
676  self->peers_in_queue_.size());
677  }
678  self->sync_peer_num_->set(self->active_peers_.size());
679  }
680 
681  self->connecting_peers_.erase(peer_id);
682 
683  self->reserveStreams(peer_id);
684  self->startPingingPeer(peer_id);
685 
686  // Establish outgoing grandpa stream if node synced
687  auto r_info_opt = self->getPeerState(peer_id);
688  auto o_info_opt = self->getPeerState(self->own_peer_info_.id);
689  if (r_info_opt.has_value() and o_info_opt.has_value()) {
690  auto &r_info = r_info_opt.value();
691  auto &o_info = o_info_opt.value();
692 
693  if (r_info.get().best_block.number
694  <= o_info.get().best_block.number) {
695  auto grandpa_protocol = self->router_->getGrandpaProtocol();
696  BOOST_ASSERT_MSG(grandpa_protocol,
697  "Router did not provide grandpa protocol");
698  grandpa_protocol->newOutgoingStream(peer_info,
699  [](const auto &...) {});
700  }
701  }
702  });
703  } else {
704  SL_DEBUG(log_,
705  "Stream {} with {} is alive",
706  block_announce_protocol->protocolName(),
707  peer_id);
708  connecting_peers_.erase(peer_id);
709  }
710 
711  auto addresses_res =
712  host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
713  if (addresses_res.has_value()) {
714  auto &addresses = addresses_res.value();
715  peer_info.addresses = std::move(addresses);
716  kademlia_->addPeer(peer_info, false);
717  }
718  }
719 
720  void PeerManagerImpl::reserveStreams(const PeerId &peer_id) const {
721  // Reserve stream slots for needed protocols
722  auto grandpa_protocol = router_->getGrandpaProtocol();
723  BOOST_ASSERT_MSG(grandpa_protocol,
724  "Router did not provide grandpa protocol");
725 
726  auto transaction_protocol = router_->getPropagateTransactionsProtocol();
727  BOOST_ASSERT_MSG(transaction_protocol,
728  "Router did not provide propagate transaction protocol");
729 
730  stream_engine_->reserveStreams(peer_id, grandpa_protocol);
731  stream_engine_->reserveStreams(peer_id, transaction_protocol);
732  }
733 
734  bool PeerManagerImpl::isSelfPeer(const PeerId &peer_id) const {
735  return own_peer_info_.id == peer_id;
736  }
737 
738  std::vector<scale::PeerInfoSerializable>
740  auto get_res = storage_->load(storage::kActivePeersKey);
741  if (not get_res) {
742  SL_ERROR(log_,
743  "List of last active peers cannot be obtained from storage. "
744  "Error={}",
745  get_res.error().message());
746  return {};
747  }
748 
749  std::vector<scale::PeerInfoSerializable> last_active_peers;
750  scale::ScaleDecoderStream s{get_res.value()};
751  try {
752  s >> last_active_peers;
753  } catch (...) {
754  SL_ERROR(log_, "Unable to decode list of active peers");
755  return {};
756  }
757  return last_active_peers;
758  }
759 
761  std::vector<libp2p::peer::PeerInfo> last_active_peers;
762  for (const auto &peer_id : recently_active_peers_) {
763  auto peer_info = host_.getPeerRepository().getPeerInfo(peer_id);
764  last_active_peers.push_back(peer_info);
765  }
766 
767  if (last_active_peers.empty()) {
768  SL_DEBUG(log_,
769  "Zero last active peers, won't save zero. Storage will remain "
770  "untouched.");
771  return;
772  }
773 
774  scale::ScaleEncoderStream out;
775  try {
776  out << last_active_peers;
777  } catch (...) {
778  SL_ERROR(log_, "Unable to encode list of active peers");
779  return;
780  }
781 
782  auto save_res = storage_->put(storage::kActivePeersKey,
783  common::Buffer{out.to_vector()});
784  if (not save_res) {
785  SL_ERROR(log_,
786  "Cannot store active peers. Error={}",
787  save_res.error().message());
788  return;
789  }
790  SL_DEBUG(log_,
791  "Saved {} last active peers' record(s)",
792  last_active_peers.size());
793  }
794 
796  for (auto it = pinging_connections_.begin();
797  it != pinging_connections_.end();) {
798  if ((**it).isClosed()) {
799  it = pinging_connections_.erase(it);
800  } else {
801  ++it;
802  }
803  }
804  }
805 } // namespace kagome::network
const BootstrapNodes & bootstrap_nodes_
std::unordered_set< PeerId > peers_in_queue_
std::unordered_set< libp2p::network::ConnectionManager::ConnectionSPtr > pinging_connections_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
std::shared_ptr< application::AppStateManager > app_state_manager_
Class represents arbitrary (including empty) byte buffer.
Definition: buffer.hpp:29
void forEachPeer(std::function< void(const PeerId &)> func) const override
virtual void set(double val)=0
Set the gauge to the given value.
libp2p::peer::PeerId PeerId
network::ParachainId parachain_id
std::unordered_set< PeerId > connecting_peers_
virtual uint32_t outPeers() const =0
void updatePeerState(const PeerId &peer_id, const Status &status) override
virtual uint32_t inPeersLight() const =0
void processFullyConnectedPeer(const PeerId &peer_id)
std::shared_ptr< StreamEngine > stream_engine_
virtual uint32_t inPeers() const =0
STL namespace.
bool isSelfPeer(const PeerId &peer_id) const
Right way to check self peer as it takes into account dev mode.
std::deque< std::reference_wrapper< const PeerId > > queue_to_connect_
std::map< PeerId, PeerDescriptor > active_peers_
primitives::BlockInfo best_block
Definition: status.hpp:35
crypto::Sr25519PublicKey CollatorPublicKey
std::optional< CollatorState > collator_state
PeerManagerImpl(std::shared_ptr< application::AppStateManager > app_state_manager, libp2p::Host &host, std::shared_ptr< libp2p::protocol::Identify > identify, std::shared_ptr< libp2p::protocol::kademlia::Kademlia > kademlia, std::shared_ptr< libp2p::basic::Scheduler > scheduler, std::shared_ptr< StreamEngine > stream_engine, const application::AppConfiguration &app_config, std::shared_ptr< clock::SteadyClock > clock, const BootstrapNodes &bootstrap_nodes, const OwnPeerInfo &own_peer_info, std::shared_ptr< network::Router > router, std::shared_ptr< storage::BufferStorage > storage, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< ReputationRepository > reputation_repository)
void forOnePeer(const PeerId &peer_id, std::function< void(const PeerId &)> func) const override
const common::Buffer kActivePeersKey
ParachainState & parachainState() override
outcome::result< std::pair< network::CollatorPublicKey const &, network::ParachainId > > insert_advertisement(PeerState &peer_state, ParachainState &parachain_state, primitives::BlockHash para_hash) override
const application::AppConfiguration & app_config_
size_t activePeersNumber() const override
std::unordered_map< BlockHash, bool > our_view
libp2p::event::Handle add_peer_handle_
virtual bool isRunInDevMode() const =0
void reserveStreams(const PeerId &peer_id) const override
virtual const network::PeeringConfig & peeringConfig() const =0
primitives::BlockHeader header
std::shared_ptr< ReputationRepository > reputation_repository_
void setCollating(const PeerId &peer_id, network::CollatorPublicKey const &collator_id, network::ParachainId para_id) override
std::shared_ptr< network::Router > router_
libp2p::peer::PeerInfo PeerInfo
std::shared_ptr< libp2p::protocol::kademlia::Kademlia > kademlia_
std::shared_ptr< clock::SteadyClock > clock_
void keepAlive(const PeerId &peer_id) override
OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e)
std::chrono::seconds peerTtl
Time of peer inactivity to disconnect.
void align()
Aligns amount of connected streams.
void startPingingPeer(const PeerId &peer_id) override
static constexpr std::chrono::seconds kTimeoutForConnecting
void processDiscoveredPeer(const PeerId &peer_id)
std::unordered_map< PeerId, PeerState > peer_states_
BlockNumber number
index of the block in the chain
void connectToPeer(const PeerInfo &peer_info) override
libp2p::event::Handle peer_disconnected_handler_
size_t hardLimit
Max peers before start forced disconnection.
libp2p::basic::Scheduler::Handle align_timer_
std::shared_ptr< crypto::Hasher > hasher_
std::optional< std::reference_wrapper< PeerState > > getPeerState(const PeerId &peer_id) override
std::chrono::seconds aligningPeriod
Period of active peers amount aligning.
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
void disconnectFromPeer(const PeerId &peer_id)
Closes all streams of provided peer.
std::shared_ptr< StreamEngine > getStreamEngine() override
std::shared_ptr< storage::BufferStorage > storage_
std::shared_ptr< libp2p::protocol::Identify > identify_
size_t targetPeerAmount
Target amount of active peers.
std::vector< scale::PeerInfoSerializable > loadLastActivePeers()