21 std::shared_ptr<CollationObserver> observer)
36 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
38 "Connect for {} stream with {}",
45 [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
46 auto &&stream_res)
mutable {
47 auto self = wp.lock();
53 if (not stream_res.has_value()) {
54 SL_VERBOSE(self->base_.logger(),
55 "Can't create outgoing {} stream with {}: {}",
58 stream_res.error().message());
59 cb(stream_res.as_failure());
63 auto &stream = stream_res.value().stream;
64 BOOST_ASSERT(stream->remotePeerId().has_value());
65 self->doCollatorHandshake<
false>(
68 cb{std::move(cb)}](std::shared_ptr<Stream>
const &stream) {
82 collation_decl.collator_id,
83 collation_decl.para_id,
84 collation_decl.signature);
92 observer_->onAdvertise(peer_id, collation_adv.relay_parent);
100 std::move(collation_message),
108 SL_WARN(
base_.
logger(),
"Unexpected collation message from.");
113 std::shared_ptr<kagome::network::Stream> stream) {
114 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
115 read_writer->template read<WireMessage>(
116 [wptr{weak_from_this()},
117 stream{std::move(stream)}](
auto &&result)
mutable {
118 auto self = wptr.lock();
120 stream->close([](
auto &&) {});
125 SL_WARN(self->base_.logger(),
126 "Can't read incoming collation message from stream {} with " 128 stream->remotePeerId().value(),
129 result.error().message());
130 self->base_.closeStream(wptr, stream);
134 SL_VERBOSE(self->base_.logger(),
135 "Received collation message from {}",
136 stream->remotePeerId().has_value()
137 ? stream->remotePeerId().value().toBase58()
141 std::move(result.value()),
143 SL_VERBOSE(self->base_.logger(),
144 "Received ViewUpdate from {}",
145 stream->remotePeerId().has_value()
146 ? stream->remotePeerId().value().toBase58()
151 BOOST_ASSERT(stream->remotePeerId().has_value());
152 self->onCollationMessageRx(stream->remotePeerId().value(),
156 self->readCollationMsg(std::move(stream));
161 BOOST_ASSERT(stream->remotePeerId().has_value());
162 doCollatorHandshake<true>(
164 [wptr{weak_from_this()}](
165 std::shared_ptr<Stream>
const &stream)
mutable {
169 if (
auto self = wptr.lock()) {
170 self->readCollationMsg(stream);
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
boost::variant< CollatorDeclaration, CollatorAdvertisement, Dummy, Dummy, Seconded > CollationMessage
Dummy ViewUpdate
ViewUpdate message. May be implemented later.
const std::string & protocolName() const override
boost::variant< CollationMessage > ProtocolMessage
log::Logger const & logger() const
libp2p::peer::PeerInfo PeerInfo
application::AppConfiguration const & app_config_
libp2p::peer::PeerId PeerId
std::shared_ptr< CollationObserver > observer_
CollationProtocol()=delete
void onCollationAdvRx(libp2p::peer::PeerId const &peer_id, CollatorAdvertisement &&collation_adv)
Protocols const & protocolIds() const
void onCollationMessageRx(libp2p::peer::PeerId const &peer_id, CollationMessage &&collation_message)
bool start(std::weak_ptr< T > wptr)
const libp2p::peer::Protocol kCollationProtocol
void onIncomingStream(std::shared_ptr< Stream > stream) override
void readCollationMsg(std::shared_ptr< kagome::network::Stream > stream)
void onCollationDeclRx(libp2p::peer::PeerId const &peer_id, CollatorDeclaration &&collation_decl)