22 std::shared_ptr<StreamEngine> stream_engine,
23 std::shared_ptr<blockchain::BlockTree> block_tree,
24 std::shared_ptr<BlockAnnounceObserver> observer,
25 std::shared_ptr<PeerManager> peer_manager)
29 "BlockAnnounceProtocol"),
55 const auto &last_finalized =
block_tree_->getLastFinalized().hash;
57 block_tree_->getBestContaining(last_finalized, std::nullopt);
58 best_res.has_value()) {
59 best_block = best_res.value();
61 base_.
logger()->error(
"Could not get best block info: {}",
62 best_res.error().message());
66 auto &genesis_hash =
block_tree_->getGenesisBlockHash();
69 .
roles = roles, .best_block = best_block, .genesis_hash = genesis_hash};
73 BOOST_ASSERT(stream->remotePeerId().has_value());
77 [wp = weak_from_this(), stream](outcome::result<void> res) {
78 auto self = wp.lock();
84 auto peer_id = stream->remotePeerId().value();
86 if (not res.has_value()) {
89 "Handshake failed on incoming {} stream with {}: {}",
92 res.error().message());
97 res =
self->stream_engine_->addIncoming(stream,
self);
98 if (not res.has_value()) {
99 SL_VERBOSE(self->base_.logger(),
100 "Can't register incoming {} stream with {}: {}",
101 self->protocolName(),
103 res.error().message());
108 self->peer_manager_->reserveStreams(peer_id);
109 self->peer_manager_->startPingingPeer(peer_id);
111 SL_VERBOSE(self->base_.logger(),
112 "Fully established incoming {} stream with {}",
113 self->protocolName(),
120 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
122 "Connect for {} stream with {}",
129 [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
130 auto &&stream_res)
mutable {
131 auto self = wp.lock();
137 if (not stream_res.has_value()) {
138 SL_VERBOSE(self->base_.logger(),
139 "Can't create outgoing {} stream with {}: {}",
140 self->protocolName(),
142 stream_res.error().message());
143 cb(stream_res.as_failure());
147 const auto &stream_and_proto = stream_res.value();
150 stream = stream_and_proto.stream,
151 protocol = stream_and_proto.protocol,
152 cb = std::move(cb)](outcome::result<void> res) {
153 auto self = wp.lock();
159 if (not res.has_value()) {
160 SL_VERBOSE(self->base_.logger(),
161 "Handshake failed on outgoing {} stream with {}: {}",
163 stream->remotePeerId().value(),
164 res.error().message());
166 cb(res.as_failure());
170 res =
self->stream_engine_->addOutgoing(stream,
self);
171 if (not res.has_value()) {
172 SL_VERBOSE(self->base_.logger(),
173 "Can't register outgoing {} stream with {}: {}",
175 stream->remotePeerId().value(),
176 res.error().message());
178 cb(res.as_failure());
182 SL_VERBOSE(self->base_.logger(),
183 "Fully established outgoing {} stream with {}",
185 stream->remotePeerId().value());
186 cb(std::move(stream));
189 self->writeStatus(std::move(stream_and_proto.stream),
196 std::shared_ptr<Stream> stream,
198 std::function<
void(outcome::result<void>)> &&cb) {
199 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
201 read_writer->read<
Status>(
202 [stream, direction, wp = weak_from_this(), cb = std::move(cb)](
203 auto &&remote_status_res)
mutable {
204 auto self = wp.lock();
211 if (not remote_status_res.has_value()) {
212 SL_VERBOSE(self->base_.logger(),
213 "Can't read handshake from {}: {}",
214 stream->remotePeerId().value(),
215 remote_status_res.error().message());
217 cb(remote_status_res.as_failure());
220 auto &remote_status = remote_status_res.value();
222 SL_TRACE(self->base_.logger(),
223 "Handshake has received from {}",
224 stream->remotePeerId().value());
226 auto &genesis_hash =
self->block_tree_->getGenesisBlockHash();
228 if (remote_status.genesis_hash != genesis_hash) {
229 SL_VERBOSE(self->base_.logger(),
230 "Error while processing status: genesis no match");
236 auto peer_id = stream->remotePeerId().value();
237 SL_TRACE(self->base_.logger(),
238 "Received status from peer_id={} (best block {})",
240 remote_status.best_block.number);
241 self->peer_manager_->updatePeerState(peer_id, remote_status);
245 cb(outcome::success());
253 self->observer_->onRemoteStatus(peer_id, remote_status);
258 std::shared_ptr<Stream> stream,
260 std::function<
void(outcome::result<void>)> &&cb) {
261 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
264 if (not status_res.has_value()) {
270 const auto &status = status_res.value();
272 read_writer->write(status,
273 [stream = std::move(stream),
275 wp = weak_from_this(),
276 cb = std::move(cb)](
auto &&write_res)
mutable {
277 auto self = wp.lock();
284 if (not write_res.has_value()) {
285 SL_VERBOSE(self->base_.logger(),
286 "Can't send handshake to {}: {}",
287 stream->remotePeerId().value(),
288 write_res.error().message());
290 cb(write_res.as_failure());
294 SL_TRACE(self->base_.logger(),
295 "Handshake has sent to {}",
296 stream->remotePeerId().value());
300 self->readStatus(std::move(stream),
305 cb(outcome::success());
306 self->readAnnounce(std::move(stream));
313 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
316 [stream = std::move(stream),
317 wp = weak_from_this()](
auto &&block_announce_res)
mutable {
318 auto self = wp.lock();
324 if (not block_announce_res.has_value()) {
325 SL_DEBUG(self->base_.logger(),
326 "Can't read block announce from {}: {}",
327 stream->remotePeerId().value(),
328 block_announce_res.error().message());
333 auto peer_id = stream->remotePeerId().value();
334 auto &block_announce = block_announce_res.value();
336 SL_VERBOSE(self->base_.logger(),
337 "Announce of block #{} is received from {}",
338 block_announce.header.number,
341 self->observer_->onBlockAnnounce(peer_id, block_announce);
343 BOOST_ASSERT_MSG(stream->remotePeerId().has_value(),
344 "peer_id must be known at this moment");
345 self->peer_manager_->updatePeerState(stream->remotePeerId().value(),
348 self->readAnnounce(std::move(stream));
355 (*shared_msg) = std::move(announce);
358 base_.
logger(),
"Send announce of block #{}", announce.header.number);
const application::AppConfiguration & app_config_
std::shared_ptr< blockchain::BlockTree > block_tree_
void onIncomingStream(std::shared_ptr< Stream > stream) override
outcome::result< Status > createStatus() const
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
virtual network::Roles roles() const =0
const libp2p::peer::Protocol kBlockAnnouncesProtocol
primitives::BlockInfo BlockInfo
log::Logger const & logger() const
libp2p::peer::PeerInfo PeerInfo
void readAnnounce(std::shared_ptr< Stream > stream)
KAGOME_DEFINE_CACHE(BlockAnnounceProtocol)
virtual int32_t luckyPeers() const =0
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
virtual const std::string & protocolId() const =0
Protocols const & protocolIds() const
void blockAnnounce(BlockAnnounce &&announce)
void writeStatus(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
bool start(std::weak_ptr< T > wptr)
std::shared_ptr< StreamEngine > stream_engine_
std::shared_ptr< PeerManager > peer_manager_
BlockAnnounceProtocol()=delete
const std::string & protocolName() const override
void readStatus(std::shared_ptr< Stream > stream, Direction direction, std::function< void(outcome::result< void >)> &&cb)
std::shared_ptr< BlockAnnounceObserver > observer_