6 #ifndef KAGOME_NETWORK_COLLATIONPROTOCOL 7 #define KAGOME_NETWORK_COLLATIONPROTOCOL 13 #include <libp2p/connection/stream.hpp> 14 #include <libp2p/host/host.hpp> 15 #include <libp2p/peer/peer_id.hpp> 35 public std::enable_shared_from_this<CollationProtocol>,
45 std::shared_ptr<CollationObserver> observer);
50 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
53 bool start()
override;
61 template <
bool DirectionIncoming,
typename F>
63 std::shared_ptr<kagome::network::Stream>
const &stream, F &&func) {
64 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
65 if constexpr (DirectionIncoming) {
66 read_writer->read<
Roles>([wptr{weak_from_this()},
67 func{std::forward<F>(func)},
68 stream](
auto &&result)
mutable {
69 auto self = wptr.lock();
70 if (!result || !
self)
return std::forward<F>(func)(std::move(result));
72 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
74 self->app_config_.roles(),
75 [func{std::forward<F>(func)}, stream](
auto &&result)
mutable {
76 return std::forward<F>(func)(std::move(result));
82 [wptr{weak_from_this()}, func{std::forward<F>(func)}, stream](
83 auto &&result)
mutable {
84 auto self = wptr.lock();
86 return std::forward<F>(func)(std::move(result));
89 std::make_shared<ScaleMessageReadWriter>(stream);
90 read_writer->read<
Roles>(
91 [func{std::forward<F>(func)}, stream](
auto &&result)
mutable {
92 return std::forward<F>(func)(std::move(result));
98 template <
bool DirectionIncoming,
typename F>
100 std::shared_ptr<kagome::network::Stream>
const &stream, F &&func) {
101 exchangeHandshake<DirectionIncoming>(
103 [func{std::forward<F>(func)}, stream, wptr{weak_from_this()}](
104 auto &&result)
mutable {
105 if (
auto self = wptr.lock()) {
107 SL_WARN(self->base_.logger(),
108 "Handshake with {} failed with error {}",
109 stream->remotePeerId().value(),
110 result.error().message());
111 self->base_.closeStream(wptr, stream);
112 std::forward<F>(func)(
nullptr);
116 std::forward<F>(func)(stream);
136 #endif // KAGOME_NETWORK_COLLATIONPROTOCOL
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
boost::variant< CollatorDeclaration, CollatorAdvertisement, Dummy, Dummy, Seconded > CollationMessage
const std::string & protocolName() const override
virtual network::Roles roles() const =0
libp2p::peer::PeerInfo PeerInfo
application::AppConfiguration const & app_config_
libp2p::peer::PeerId PeerId
std::shared_ptr< CollationObserver > observer_
CollationProtocol()=delete
~CollationProtocol() override=default
void onCollationAdvRx(libp2p::peer::PeerId const &peer_id, CollatorAdvertisement &&collation_adv)
void onCollationMessageRx(libp2p::peer::PeerId const &peer_id, CollationMessage &&collation_message)
void exchangeHandshake(std::shared_ptr< kagome::network::Stream > const &stream, F &&func)
const libp2p::peer::Protocol kCollationProtocol
void onIncomingStream(std::shared_ptr< Stream > stream) override
void readCollationMsg(std::shared_ptr< kagome::network::Stream > stream)
void onCollationDeclRx(libp2p::peer::PeerId const &peer_id, CollatorDeclaration &&collation_decl)
void doCollatorHandshake(std::shared_ptr< kagome::network::Stream > const &stream, F &&func)