12 #include <libp2p/protocol/kademlia/impl/peer_routing_table.hpp> 19 constexpr
const char *syncPeerMetricName =
"kagome_sync_peers";
21 static constexpr int32_t kDisconnectReputation = -256;
27 case E::UNDECLARED_COLLATOR:
28 return "Process handling from undeclared collator";
30 return "Processing para hash, which is out of view";
32 return "Processing duplicated hash";
34 return "Unknown error in ChainSpecImpl";
39 std::shared_ptr<application::AppStateManager> app_state_manager,
41 std::shared_ptr<libp2p::protocol::Identify> identify,
42 std::shared_ptr<libp2p::protocol::kademlia::Kademlia> kademlia,
43 std::shared_ptr<libp2p::basic::Scheduler> scheduler,
44 std::shared_ptr<StreamEngine> stream_engine,
46 std::shared_ptr<clock::SteadyClock> clock,
49 std::shared_ptr<network::Router> router,
50 std::shared_ptr<storage::BufferStorage> storage,
51 std::shared_ptr<crypto::Hasher> hasher,
52 std::shared_ptr<ReputationRepository> reputation_repository)
53 : app_state_manager_(
std::move(app_state_manager)),
55 identify_(
std::move(identify)),
56 kademlia_(
std::move(kademlia)),
57 scheduler_(
std::move(scheduler)),
58 stream_engine_(
std::move(stream_engine)),
59 app_config_(app_config),
60 clock_(
std::move(clock)),
61 bootstrap_nodes_(bootstrap_nodes),
62 own_peer_info_(own_peer_info),
63 router_{std::move(router)},
73 BOOST_ASSERT(
router_ !=
nullptr);
75 BOOST_ASSERT(
hasher_ !=
nullptr);
79 registry_->registerGaugeFamily(syncPeerMetricName,
80 "Number of peers we sync with");
90 "Does not have any bootstrap nodes. " 91 "Provide them by chain spec or CLI argument `--bootnodes'");
101 "Peer manager is started in passive mode, " 102 "because have not any bootstrap nodes.");
113 .getChannel<libp2p::event::protocol::kademlia::PeerAddedChannel>()
114 .subscribe([wp = weak_from_this()](
const PeerId &peer_id) {
115 if (
auto self = wp.lock()) {
117 self->reputation_repository_->reputation(peer_id);
120 "Disconnecting from peer {} due to its negative " 124 self->disconnectFromPeer(peer_id);
127 self->processDiscoveredPeer(peer_id);
133 .getChannel<libp2p::event::network::OnPeerDisconnectedChannel>()
134 .subscribe([wp = weak_from_this()](
const PeerId &peer_id) {
135 if (
auto self = wp.lock()) {
137 "OnPeerDisconnectedChannel handler from peer {}",
139 self->stream_engine_->del(peer_id);
140 self->peer_states_.erase(peer_id);
141 self->active_peers_.erase(peer_id);
142 self->connecting_peers_.erase(peer_id);
143 self->sync_peer_num_->set(self->active_peers_.size());
145 "Remained {} active peers",
146 self->active_peers_.size());
150 identify_->onIdentifyReceived([wp = weak_from_this()](
152 if (
auto self = wp.lock()) {
153 SL_DEBUG(self->log_,
"Identify received from peer {}", peer_id);
154 if (
auto rating = self->reputation_repository_->reputation(peer_id);
158 "Disconnecting from peer {} due to its negative reputation {}",
161 self->disconnectFromPeer(peer_id);
164 self->processFullyConnectedPeer(peer_id);
173 kademlia_->addPeer(bootstrap_node,
true);
179 "Loaded {} last active peers' record(s)",
180 last_active_peers.size());
181 for (
const auto &peer_info : last_active_peers) {
201 auto res =
host_.getPeerRepository().getAddressRepository().upsertAddresses(
202 peer_info.id, peer_info.addresses, libp2p::peer::ttl::kTransient);
218 std::function<
void(
const PeerId &)> func)
const {
229 BOOST_ASSERT(!it->second.collator_state
230 && !!
"Collator state should be empty at the time.");
232 .collator_id = collator_id,
233 .advertisements = {}};
234 it->second.time =
clock_->now();
239 const PeerId &peer_id, std::function<
void(
const PeerId &)> func)
const {
246 std::pair<network::CollatorPublicKey const &, network::ParachainId>>
252 if (parachain_state.
our_view.count(para_hash) == 0)
255 if (peer_state.
collator_state.value().advertisements.count(para_hash) != 0)
259 std::move(para_hash));
260 return std::make_pair(peer_state.
collator_state.value().collator_id,
265 SL_TRACE(
log_,
"Try to align peers number");
276 using PriorityType = int32_t;
277 using ItemType = std::pair<PriorityType, PeerId>;
279 std::vector<ItemType> peers_list;
282 uint64_t
const now_ms =
283 std::chrono::time_point_cast<std::chrono::milliseconds>(
clock_->now())
286 uint64_t
const idle_ms =
287 std::chrono::duration_cast<std::chrono::milliseconds>(peer_ttl).count();
290 uint64_t
const last_activity_ms =
291 std::chrono::time_point_cast<std::chrono::milliseconds>(
297 if (peer_reputation < kDisconnectReputation
298 || last_activity_ms + idle_ms < now_ms) {
299 peers_list.push_back(
300 std::make_pair(std::numeric_limits<PriorityType>::min(), peer_id));
305 peers_list.push_back(std::make_pair(peer_reputation, peer_id));
309 std::sort(peers_list.begin(),
311 [](
auto const &l,
auto const &r) {
return r.first < l.first; });
313 for (; !peers_list.empty()
314 && (peers_list.size() > hard_limit
315 || peers_list.back().first
316 == std::numeric_limits<PriorityType>::min());
317 peers_list.pop_back()) {
318 const auto &peer_id = peers_list.back().second;
323 if (active_peers_.size() < target_count) {
327 auto &peer_id = node.value();
336 "Remained peers in queue for connect: {}",
342 SL_DEBUG(
log_,
"Queue for connect is empty. Reuse bootstrap nodes");
352 "Queue for connect is empty. Connecting peers: {}",
360 [wp = weak_from_this()] {
361 if (
auto self = wp.lock()) {
366 SL_DEBUG(
log_,
"Active peers = {}", active_peers_.size());
376 auto peer_info =
host_.getPeerRepository().getPeerInfo(peer_id);
377 if (peer_info.addresses.empty()) {
378 SL_DEBUG(
log_,
"Not found addresses for peer {}", peer_id);
383 auto connectedness =
host_.connectedness(peer_info);
384 if (connectedness == libp2p::Host::Connectedness::CAN_NOT_CONNECT) {
385 SL_DEBUG(
log_,
"Can not connect to peer {}", peer_id);
390 SL_DEBUG(
log_,
"Try to connect to peer {}", peer_info.id);
391 for (
auto addr : peer_info.addresses) {
392 SL_DEBUG(
log_,
" address: {}", addr.getStringAddress());
397 [wp = weak_from_this(), peer_id](
auto res)
mutable {
398 auto self = wp.lock();
403 if (not res.has_value()) {
405 "Connecting to peer {} is failed: {}",
407 res.error().message());
408 self->connecting_peers_.erase(peer_id);
412 auto &connection = res.value();
413 auto remote_peer_id_res = connection->remotePeer();
414 if (not remote_peer_id_res.has_value()) {
417 "Connected, but not identified yet (expecting peer_id={:l})",
419 self->connecting_peers_.erase(peer_id);
423 auto &remote_peer_id = remote_peer_id_res.value();
424 if (remote_peer_id == peer_id) {
425 SL_DEBUG(self->log_,
"Connected to peer {}", peer_id);
427 self->processFullyConnectedPeer(peer_id);
438 SL_INFO(
log_,
"Disconnect from peer {}", peer_id);
439 host_.disconnect(peer_id);
445 it->second.time_point =
clock_->now();
450 auto ping_protocol =
router_->getPingProtocol();
451 BOOST_ASSERT_MSG(ping_protocol,
"Router did not provide ping protocol");
454 host_.getNetwork().getConnectionManager().getBestConnectionForPeer(
456 if (conn ==
nullptr) {
458 "Failed to start pinging {}: No connection to this peer exists",
464 if (not is_emplaced) {
470 "Start pinging of {} (conn={})",
472 static_cast<void *>(conn.get()));
474 ping_protocol->startPinging(
476 [wp = weak_from_this(), peer_id, conn](
477 outcome::result<std::shared_ptr<
478 libp2p::protocol::PingClientSession>> session_res) {
479 if (
auto self = wp.lock()) {
480 if (session_res.has_error()) {
482 "Stop pinging of {} (conn={}): {}",
484 static_cast<void *>(conn.get()),
485 session_res.error().message());
486 self->pinging_connections_.erase(conn);
487 self->disconnectFromPeer(peer_id);
490 "Pinging: {} (conn={}) is alive",
492 static_cast<void *>(conn.get()));
493 self->keepAlive(peer_id);
502 it->second.time =
clock_->now();
503 it->second.roles = status.
roles;
513 auto hash =
hasher_->blake2b_256(scale::encode(announce.
header).value());
516 it->second.time =
clock_->now();
523 it->second.time =
clock_->now();
524 it->second.round_number = neighbor_message.
round_number;
529 std::optional<std::reference_wrapper<PeerState>>
560 "New peer_id enqueued: {:l}. In queue: {}",
573 host_.getNetwork().getConnectionManager().getBestConnectionForPeer(
575 if (connection ==
nullptr) {
579 if (connection->isInitiator()) {
580 auto out_peers_count = std::count_if(
590 auto in_peers_count = 0u;
591 auto in_light_peers_count = 0u;
604 }
else if (
peer_states_[peer_id].roles.flags.light == 1) {
608 ++in_light_peers_count;
619 PeerInfo peer_info{.id = peer_id, .addresses = {}};
621 auto block_announce_protocol =
router_->getBlockAnnounceProtocol();
622 BOOST_ASSERT_MSG(block_announce_protocol,
623 "Router did not provide block announce protocol");
626 block_announce_protocol)) {
627 block_announce_protocol->newOutgoingStream(
629 [wp = weak_from_this(),
631 protocol = block_announce_protocol,
632 connection](
auto &&stream_res) {
633 auto self = wp.lock();
638 auto &peer_id = peer_info.id;
640 self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
641 if (not stream_res.has_value()) {
642 self->log_->warn(
"Unable to create stream {} with {}: {}",
643 protocol->protocolName(),
645 stream_res.error().message());
646 self->connecting_peers_.erase(peer_id);
647 self->disconnectFromPeer(peer_id);
650 PeerType peer_type = connection->isInitiator()
655 if (
auto [ap_it, added] = self->active_peers_.emplace(
658 self->recently_active_peers_.insert(peer_id);
661 if (
auto piq_it = self->peers_in_queue_.find(peer_id);
662 piq_it !=
self->peers_in_queue_.end()) {
664 std::find_if(self->queue_to_connect_.cbegin(),
665 self->queue_to_connect_.cend(),
666 [&peer_id = peer_id](
const auto &item) {
667 return peer_id == item.get();
669 self->queue_to_connect_.erase(qtc_it);
670 self->peers_in_queue_.erase(piq_it);
671 BOOST_ASSERT(self->queue_to_connect_.size()
672 ==
self->peers_in_queue_.size());
675 "Remained peers in queue for connect: {}",
676 self->peers_in_queue_.size());
678 self->sync_peer_num_->set(self->active_peers_.size());
681 self->connecting_peers_.erase(peer_id);
683 self->reserveStreams(peer_id);
684 self->startPingingPeer(peer_id);
687 auto r_info_opt =
self->getPeerState(peer_id);
688 auto o_info_opt =
self->getPeerState(self->own_peer_info_.id);
689 if (r_info_opt.has_value() and o_info_opt.has_value()) {
690 auto &r_info = r_info_opt.value();
691 auto &o_info = o_info_opt.value();
693 if (r_info.get().best_block.number
694 <= o_info.get().best_block.number) {
695 auto grandpa_protocol =
self->router_->getGrandpaProtocol();
696 BOOST_ASSERT_MSG(grandpa_protocol,
697 "Router did not provide grandpa protocol");
698 grandpa_protocol->newOutgoingStream(peer_info,
699 [](
const auto &...) {});
705 "Stream {} with {} is alive",
706 block_announce_protocol->protocolName(),
712 host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
713 if (addresses_res.has_value()) {
714 auto &addresses = addresses_res.value();
715 peer_info.addresses = std::move(addresses);
722 auto grandpa_protocol =
router_->getGrandpaProtocol();
723 BOOST_ASSERT_MSG(grandpa_protocol,
724 "Router did not provide grandpa protocol");
726 auto transaction_protocol =
router_->getPropagateTransactionsProtocol();
727 BOOST_ASSERT_MSG(transaction_protocol,
728 "Router did not provide propagate transaction protocol");
738 std::vector<scale::PeerInfoSerializable>
743 "List of last active peers cannot be obtained from storage. " 745 get_res.error().message());
749 std::vector<scale::PeerInfoSerializable> last_active_peers;
750 scale::ScaleDecoderStream s{get_res.value()};
752 s >> last_active_peers;
754 SL_ERROR(
log_,
"Unable to decode list of active peers");
757 return last_active_peers;
761 std::vector<libp2p::peer::PeerInfo> last_active_peers;
763 auto peer_info =
host_.getPeerRepository().getPeerInfo(peer_id);
764 last_active_peers.push_back(peer_info);
767 if (last_active_peers.empty()) {
769 "Zero last active peers, won't save zero. Storage will remain " 774 scale::ScaleEncoderStream out;
776 out << last_active_peers;
778 SL_ERROR(
log_,
"Unable to encode list of active peers");
786 "Cannot store active peers. Error={}",
787 save_res.error().message());
791 "Saved {} last active peers' record(s)",
792 last_active_peers.size());
798 if ((**it).isClosed()) {
BlockNumber last_finalized
const BootstrapNodes & bootstrap_nodes_
std::unordered_set< PeerId > peers_in_queue_
std::unordered_set< libp2p::network::ConnectionManager::ConnectionSPtr > pinging_connections_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
std::shared_ptr< application::AppStateManager > app_state_manager_
Class represents arbitrary (including empty) byte buffer.
void forEachPeer(std::function< void(const PeerId &)> func) const override
virtual void set(double val)=0
Set the gauge to the given value.
libp2p::peer::PeerId PeerId
network::ParachainId parachain_id
std::unordered_set< PeerId > connecting_peers_
virtual uint32_t outPeers() const =0
void updatePeerState(const PeerId &peer_id, const Status &status) override
metrics::RegistryPtr registry_
virtual uint32_t inPeersLight() const =0
void processFullyConnectedPeer(const PeerId &peer_id)
std::shared_ptr< StreamEngine > stream_engine_
virtual uint32_t inPeers() const =0
bool isSelfPeer(const PeerId &peer_id) const
Right way to check self peer as it takes into account dev mode.
std::deque< std::reference_wrapper< const PeerId > > queue_to_connect_
std::map< PeerId, PeerDescriptor > active_peers_
primitives::BlockInfo best_block
crypto::Sr25519PublicKey CollatorPublicKey
std::optional< CollatorState > collator_state
PeerManagerImpl(std::shared_ptr< application::AppStateManager > app_state_manager, libp2p::Host &host, std::shared_ptr< libp2p::protocol::Identify > identify, std::shared_ptr< libp2p::protocol::kademlia::Kademlia > kademlia, std::shared_ptr< libp2p::basic::Scheduler > scheduler, std::shared_ptr< StreamEngine > stream_engine, const application::AppConfiguration &app_config, std::shared_ptr< clock::SteadyClock > clock, const BootstrapNodes &bootstrap_nodes, const OwnPeerInfo &own_peer_info, std::shared_ptr< network::Router > router, std::shared_ptr< storage::BufferStorage > storage, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< ReputationRepository > reputation_repository)
void forOnePeer(const PeerId &peer_id, std::function< void(const PeerId &)> func) const override
const common::Buffer kActivePeersKey
ParachainState & parachainState() override
outcome::result< std::pair< network::CollatorPublicKey const &, network::ParachainId > > insert_advertisement(PeerState &peer_state, ParachainState ¶chain_state, primitives::BlockHash para_hash) override
const application::AppConfiguration & app_config_
size_t activePeersNumber() const override
std::unordered_map< BlockHash, bool > our_view
libp2p::event::Handle add_peer_handle_
virtual bool isRunInDevMode() const =0
metrics::Gauge * sync_peer_num_
void reserveStreams(const PeerId &peer_id) const override
virtual const network::PeeringConfig & peeringConfig() const =0
primitives::BlockHeader header
std::shared_ptr< ReputationRepository > reputation_repository_
void setCollating(const PeerId &peer_id, network::CollatorPublicKey const &collator_id, network::ParachainId para_id) override
std::shared_ptr< network::Router > router_
libp2p::peer::PeerInfo PeerInfo
std::shared_ptr< libp2p::protocol::kademlia::Kademlia > kademlia_
std::set< PeerId > recently_active_peers_
std::shared_ptr< clock::SteadyClock > clock_
void keepAlive(const PeerId &peer_id) override
OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e)
std::chrono::seconds peerTtl
Time of peer inactivity to disconnect.
void align()
Aligns amount of connected streams.
void startPingingPeer(const PeerId &peer_id) override
static constexpr std::chrono::seconds kTimeoutForConnecting
void processDiscoveredPeer(const PeerId &peer_id)
std::unordered_map< PeerId, PeerState > peer_states_
void connectToPeer(const PeerInfo &peer_info) override
libp2p::event::Handle peer_disconnected_handler_
size_t hardLimit
Max peers before start forced disconnection.
libp2p::basic::Scheduler::Handle align_timer_
std::shared_ptr< crypto::Hasher > hasher_
std::optional< std::reference_wrapper< PeerState > > getPeerState(const PeerId &peer_id) override
std::chrono::seconds aligningPeriod
Period of active peers amount aligning.
Logger createLogger(const std::string &tag)
void disconnectFromPeer(const PeerId &peer_id)
Closes all streams of provided peer.
void clearClosedPingingConnections()
std::shared_ptr< StreamEngine > getStreamEngine() override
std::shared_ptr< storage::BufferStorage > storage_
ParachainState parachain_state_
const OwnPeerInfo & own_peer_info_
std::shared_ptr< libp2p::protocol::Identify > identify_
size_t targetPeerAmount
Target amount of active peers.
std::vector< scale::PeerInfoSerializable > loadLastActivePeers()