8 #include <libp2p/connection/loopback_stream.hpp> 19 using libp2p::connection::LoopbackStream;
25 std::shared_ptr<boost::asio::io_context> io_context,
27 std::shared_ptr<consensus::grandpa::GrandpaObserver> grandpa_observer,
29 std::shared_ptr<StreamEngine> stream_engine,
30 std::shared_ptr<PeerManager> peer_manager,
32 std::shared_ptr<libp2p::basic::Scheduler> scheduler)
47 auto res =
stream_engine_->addBidirectional(stream, shared_from_this());
48 if (not res.has_value()) {
51 read(std::move(stream));
60 BOOST_ASSERT(stream->remotePeerId().has_value());
65 [wp = weak_from_this(), stream](outcome::result<void> res) {
66 auto self = wp.lock();
72 auto peer_id = stream->remotePeerId().value();
74 if (not res.has_value()) {
75 SL_VERBOSE(self->base_.logger(),
76 "Handshake failed on incoming {} stream with {}: {}",
79 res.error().message());
84 res =
self->stream_engine_->addIncoming(stream,
self);
85 if (not res.has_value()) {
86 SL_VERBOSE(self->base_.logger(),
87 "Can't register incoming {} stream with {}: {}",
90 res.error().message());
95 SL_VERBOSE(self->base_.logger(),
96 "Fully established incoming {} stream with {}",
104 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
108 [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
109 auto &&stream_res)
mutable {
110 auto self = wp.lock();
116 if (not stream_res.has_value()) {
117 SL_VERBOSE(self->base_.logger(),
118 "Can't create outgoing {} stream with {}: {}",
119 self->protocolName(),
121 stream_res.error().message());
122 cb(stream_res.as_failure());
125 auto &stream_and_proto = stream_res.value();
128 stream = stream_and_proto.stream,
129 protocol = stream_and_proto.protocol,
130 cb = std::move(cb)](outcome::result<void> res) {
131 auto self = wp.lock();
137 if (not res.has_value()) {
138 SL_VERBOSE(self->base_.logger(),
139 "Handshake failed on outgoing {} stream with {}: {}",
141 stream->remotePeerId().value(),
142 res.error().message());
144 cb(res.as_failure());
148 res =
self->stream_engine_->addOutgoing(stream,
self);
149 if (not res.has_value()) {
150 SL_VERBOSE(self->base_.logger(),
151 "Can't register outgoing {} stream with {}: {}",
153 stream->remotePeerId().value(),
154 res.error().message());
156 cb(res.as_failure());
160 SL_VERBOSE(self->base_.logger(),
161 "Fully established outgoing {} stream with {}",
163 stream->remotePeerId().value());
166 auto own_peer_state =
167 self->peer_manager_->getPeerState(self->own_info_.id);
168 if (own_peer_state.has_value()) {
170 .
round_number = own_peer_state->get().round_number.value_or(1),
171 .voter_set_id = own_peer_state->get().set_id.value_or(0),
172 .last_finalized = own_peer_state->get().last_finalized};
174 SL_DEBUG(self->base_.logger(),
175 "Send initial neighbor message: grandpa round number {}",
182 self->stream_engine_->send(
183 stream->remotePeerId().value(),
self, std::move(shared_msg));
186 cb(std::move(stream));
189 self->writeHandshake(std::move(stream_and_proto.stream),
196 std::shared_ptr<Stream> stream,
198 std::function<
void(outcome::result<void>)> &&cb) {
199 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
201 read_writer->read<
Roles>(
202 [stream, direction, wp = weak_from_this(), cb = std::move(cb)](
203 auto &&remote_roles_res)
mutable {
204 auto self = wp.lock();
211 if (not remote_roles_res.has_value()) {
212 SL_VERBOSE(self->base_.logger(),
213 "Can't read handshake from {}: {}",
214 stream->remotePeerId().
value(),
215 remote_roles_res.error().message());
217 cb(remote_roles_res.as_failure());
222 SL_TRACE(self->base_.logger(),
223 "Handshake has received from {}",
224 stream->remotePeerId().
value());
228 cb(outcome::success());
231 self->writeHandshake(
239 std::shared_ptr<Stream> stream,
241 std::function<
void(outcome::result<void>)> &&cb) {
242 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
246 read_writer->write(roles,
247 [stream = std::move(stream),
249 wp = weak_from_this(),
250 cb = std::move(cb)](
auto &&write_res)
mutable {
251 auto self = wp.lock();
258 if (not write_res.has_value()) {
259 SL_VERBOSE(self->base_.logger(),
260 "Can't send handshake to {}: {}",
261 stream->remotePeerId().value(),
262 write_res.error().message());
264 cb(write_res.as_failure());
268 SL_TRACE(self->base_.logger(),
269 "Handshake has sent to {}",
270 stream->remotePeerId().value());
275 std::move(stream), direction, std::move(cb));
278 cb(outcome::success());
279 self->read(std::move(stream));
286 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
289 wp = weak_from_this()](
290 auto &&grandpa_message_res)
mutable {
291 auto self = wp.lock();
297 if (not grandpa_message_res.has_value()) {
298 SL_VERBOSE(self->base_.logger(),
299 "Can't read grandpa message from {}: {}",
300 stream->remotePeerId().value(),
301 grandpa_message_res.error().message());
306 auto peer_id = stream->remotePeerId().value();
307 auto &grandpa_message = grandpa_message_res.value();
312 SL_VERBOSE(self->base_.logger(),
313 "VoteMessage has received from {}",
315 self->grandpa_observer_->onVoteMessage(peer_id, vote_message);
318 SL_VERBOSE(self->base_.logger(),
319 "CommitMessage has received from {}",
321 self->grandpa_observer_->onCommitMessage(peer_id, commit_message);
324 SL_VERBOSE(self->base_.logger(),
325 "NeighborMessage has received from {}",
327 self->grandpa_observer_->onNeighborMessage(peer_id,
331 SL_VERBOSE(self->base_.logger(),
332 "CatchUpRequest has received from {}",
334 self->grandpa_observer_->onCatchUpRequest(peer_id,
338 SL_VERBOSE(self->base_.logger(),
339 "CatchUpResponse has received from {}",
341 self->grandpa_observer_->onCatchUpResponse(peer_id,
344 self->read(std::move(stream));
350 std::optional<const libp2p::peer::PeerId> peer_id) {
352 "Send vote message: grandpa round number {}",
353 vote_message.round_number);
355 auto filter = [&, &msg = vote_message](
const PeerId &peer_id) {
357 if (not info_opt.has_value()) {
359 "Vote signed by {} with set_id={} in round={} " 360 "has not been sent to {}: peer is not connected",
367 const auto &info = info_opt.value();
369 if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
371 "Vote signed by {} with set_id={} in round={} " 372 "has not been sent to {}: set id or round number unknown",
383 if (msg.counter != info.get().set_id) {
385 "Vote signed by {} with set_id={} in round={} " 386 "has not been sent to {} as impolite: their set id is {}",
391 info.get().set_id.value());
397 if (msg.round_number + 2 < info.get().round_number.value()) {
400 "Vote signed by {} with set_id={} in round={} " 401 "has not been sent to {} as impolite: their round is already {}",
406 info.get().round_number.value());
412 if (msg.round_number > info.get().round_number.value()) {
414 "Vote signed by {} with set_id={} in round={} " 415 "has not been sent to {} as impolite: their round is old: {}",
420 info.get().round_number.value());
431 if (not peer_id.has_value()) {
433 shared_from_this(), std::move(shared_msg), filter);
436 peer_id.value(), shared_from_this(), std::move(shared_msg));
442 "Send neighbor message: grandpa round number {}",
461 std::optional<const libp2p::peer::PeerId> peer_id) {
463 "Send commit message: grandpa round number {}",
468 round_number = msg.round,
470 msg.message.target_number](
const PeerId &peer_id) {
472 if (not info_opt.has_value()) {
474 "Commit with set_id={} in round={} " 475 "has not been sent to {}: peer is not connected",
481 const auto &info = info_opt.value();
483 if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
485 "Commit with set_id={} in round={} " 486 "has not been sent to {}: set id or round number unknown",
495 if (set_id != info.get().set_id) {
497 "Commit with set_id={} in round={} " 498 "has not been sent to {} as impolite: their set id is {}",
502 info.get().set_id.value());
507 if (round_number < info.get().round_number.value()) {
510 "Commit with set_id={} in round={} " 511 "has not been sent to {} as impolite: their round is already {}",
515 info.get().round_number.value());
521 if (finalizing < info.get().last_finalized) {
524 "Commit with set_id={} in round={} " 525 "has not been sent to {} as impolite: their round is already {}",
529 info.get().round_number.value());
540 if (not peer_id.has_value()) {
542 shared_from_this(), std::move(shared_msg), filter);
545 peer_id.value(), shared_from_this(), std::move(shared_msg));
553 "Send catch-up-request to {} beginning with grandpa round number {}",
555 catch_up_request.round_number);
558 if (not info_opt.has_value()) {
560 "Catch-up-request with set_id={} in round={} " 561 "has not been sent to {}: peer is not connected",
562 catch_up_request.voter_set_id,
563 catch_up_request.round_number,
567 const auto &info = info_opt.value();
569 if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
571 "Catch-up-request with set_id={} in round={} " 572 "has not been sent to {}: set id or round number unknown",
573 catch_up_request.voter_set_id,
574 catch_up_request.round_number,
580 if (catch_up_request.voter_set_id != info.get().set_id) {
582 "Catch-up-request with set_id={} in round={} " 583 "has not been sent to {}: different set id",
584 catch_up_request.voter_set_id,
585 catch_up_request.round_number,
592 if (catch_up_request.round_number < info.get().round_number.value() - 1) {
594 "Catch-up-request with set_id={} in round={} " 595 "has not been sent to {}: too old round for requested",
596 catch_up_request.voter_set_id,
597 catch_up_request.round_number,
602 auto round_id = std::tuple(info.get().round_number.value(), info.get().set_id.value());
604 auto [iter_by_round, ok_by_round] =
607 if (not ok_by_round) {
609 "Catch-up-request with set_id={} in round={} " 610 "has not been sent to {}: " 611 "the same catch-up request had sent to another peer",
612 catch_up_request.voter_set_id,
613 catch_up_request.round_number,
618 auto [iter_by_peer, ok_by_peer] =
622 if (not ok_by_peer) {
625 "Catch-up-request with set_id={} in round={} " 626 "has not been sent to {}: impolite to replay catch-up request",
627 catch_up_request.voter_set_id,
628 catch_up_request.round_number,
634 [wp = weak_from_this(), round_id, peer_id] {
635 if (
auto self = wp.lock()) {
636 self->recent_catchup_requests_by_round_.erase(round_id);
637 self->recent_catchup_requests_by_peer_.erase(peer_id);
646 stream_engine_->send(peer_id, shared_from_this(), std::move(shared_msg));
652 "Send catch-up response: beginning with grandpa round number {}",
653 catch_up_response.round_number);
656 if (not info_opt.has_value()) {
658 "Catch-up-response with set_id={} in round={} " 659 "has not been sent to {}: peer is not connected",
660 catch_up_response.voter_set_id,
661 catch_up_response.round_number,
665 const auto &info = info_opt.value();
667 if (not info.get().set_id.has_value() or not info.get().round_number.has_value()) {
669 "Catch-up-response with set_id={} in round={} " 670 "has not been sent to {}: set id or round number unknown",
671 catch_up_response.voter_set_id,
672 catch_up_response.round_number,
678 if (catch_up_response.voter_set_id != info.get().set_id) {
680 "Catch-up-response with set_id={} in round={} " 681 "has not been sent to {}: {} set id",
682 catch_up_response.voter_set_id,
683 catch_up_response.round_number,
685 info.get().set_id.has_value() ?
"different" :
"unknown");
690 if (catch_up_response.round_number < info.get().round_number) {
692 "Catch-up-response with set_id={} in round={} " 693 "has not been sent to {}: is already not actual",
694 catch_up_response.voter_set_id,
695 catch_up_response.round_number,
704 stream_engine_->send(peer_id, shared_from_this(), std::move(shared_msg));
boost::variant< GrandpaVote, FullCommitMessage, GrandpaNeighborMessage, CatchUpRequest, CatchUpResponse > GrandpaMessage
std::string hex_lower(const gsl::span< const uint8_t > bytes) noexcept
Converts bytes to hex representation.
const libp2p::peer::Protocol kGrandpaProtocol
std::shared_ptr< boost::asio::io_context > io_context_
virtual network::Roles roles() const =0
void readHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
void finalize(FullCommitMessage &&msg, std::optional< const libp2p::peer::PeerId > peer_id)
void read(std::shared_ptr< Stream > stream)
log::Logger const & logger() const
std::shared_ptr< PeerManager > peer_manager_
libp2p::peer::PeerInfo PeerInfo
std::set< libp2p::peer::PeerId > recent_catchup_requests_by_peer_
libp2p::peer::PeerId PeerId
KAGOME_DEFINE_CACHE(BlockAnnounceProtocol)
virtual int32_t luckyPeers() const =0
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
void catchUpRequest(const libp2p::peer::PeerId &peer_id, CatchUpRequest &&catch_up_request)
const OwnPeerInfo & own_info_
void neighbor(GrandpaNeighborMessage &&msg)
Protocols const & protocolIds() const
bool start(std::weak_ptr< T > wptr)
const libp2p::peer::Protocol kGrandpaProtocolLegacy
std::shared_ptr< consensus::grandpa::GrandpaObserver > grandpa_observer_
void onIncomingStream(std::shared_ptr< Stream > stream) override
std::set< std::tuple< consensus::grandpa::RoundNumber, consensus::grandpa::VoterSetId > > recent_catchup_requests_by_round_
void writeHandshake(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
void catchUpResponse(const libp2p::peer::PeerId &peer_id, CatchUpResponse &&catch_up_response)
const application::AppConfiguration & app_config_
std::shared_ptr< StreamEngine > stream_engine_
void vote(network::GrandpaVote &&vote_message, std::optional< const libp2p::peer::PeerId > peer_id)
static constexpr std::chrono::milliseconds kRecentnessDuration
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override