Kagome
Polkadot Runtime Engine in C++17
collation_protocol.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_NETWORK_COLLATIONPROTOCOL
7 #define KAGOME_NETWORK_COLLATIONPROTOCOL
8 
10 
11 #include <memory>
12 
13 #include <libp2p/connection/stream.hpp>
14 #include <libp2p/host/host.hpp>
15 #include <libp2p/peer/peer_id.hpp>
16 
19 #include "log/logger.hpp"
21 #include "network/common.hpp"
24 #include "network/peer_manager.hpp"
27 #include "network/types/roles.hpp"
28 #include "network/types/status.hpp"
29 #include "utils/non_copyable.hpp"
30 
31 namespace kagome::network {
32 
/// Handler class for the protocol whose name is kCollationProtocol (see
/// protocolName() below). It derives from ProtocolBase, is final, and uses
/// enable_shared_from_this so that asynchronous callbacks can hold a weak
/// reference to the object instead of an owning one.
///
/// NOTE(review): this text is a Doxygen source listing. The number at the
/// start of each line is the Doxygen line number, and listing lines 36, 42,
/// 62, 81, 99, 129 and 131 are not present here. Consult the real header
/// before changing any code in this class.
33  class CollationProtocol final
34  : public ProtocolBase,
35  public std::enable_shared_from_this<CollationProtocol>,
    // NOTE(review): listing line 36 (another base class) is missing here.
37  NonMovable {
38  public:
    /// Default construction is disabled.
39  CollationProtocol() = delete;
40  ~CollationProtocol() override = default;
41 
    /// Tail of a parameter list whose opening line (listing line 42) is
    /// missing; presumably the constructor - TODO confirm. The visible
    /// parameters are the application configuration and chain spec (both by
    /// const reference) and a shared CollationObserver.
43  application::AppConfiguration const &app_config,
44  application::ChainSpec const &chain_spec,
45  std::shared_ptr<CollationObserver> observer);
46 
    /// ProtocolBase overrides; declared here, defined outside this listing.
    /// newOutgoingStream reports through `cb`, which receives an
    /// outcome::result holding a shared Stream pointer.
47  void onIncomingStream(std::shared_ptr<Stream> stream) override;
48  void newOutgoingStream(
49  const PeerInfo &peer_info,
50  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
51  override;
52 
53  bool start() override;
54  bool stop() override;
55 
    /// @return a reference to kCollationProtocol.
56  const std::string &protocolName() const override {
57  return kCollationProtocol;
58  }
59 
60  private:
    /// Two-step Roles exchange over `stream`. The line carrying the function
    /// name (listing line 62) is missing; presumably exchangeHandshake, which
    /// is what the second template below calls - TODO confirm.
    ///
    /// DirectionIncoming == true : read the remote Roles, then write
    ///                             app_config_.roles().
    /// DirectionIncoming == false: write first, then read the remote Roles.
    ///
    /// `func` is called exactly once with the result of the step that ended
    /// the exchange: the first step's result if it failed or if this object
    /// has already been destroyed, otherwise the second step's result.
61  template <bool DirectionIncoming, typename F>
63  std::shared_ptr<kagome::network::Stream> const &stream, F &&func) {
64  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
65  if constexpr (DirectionIncoming) {
    // Only a weak reference to this object is captured; `stream` is captured
    // by value so it stays alive until the callback runs.
66  read_writer->read<Roles>([wptr{weak_from_this()},
67  func{std::forward<F>(func)},
68  stream](auto &&result) mutable {
69  auto self = wptr.lock();
    // Stop early on a read error or if the protocol object is gone.
70  if (!result || !self) return std::forward<F>(func)(std::move(result));
71 
    // A fresh read-writer over the same stream is created for the reply.
72  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
73  read_writer->write(
74  self->app_config_.roles(),
75  [func{std::forward<F>(func)}, stream](auto &&result) mutable {
76  return std::forward<F>(func)(std::move(result));
77  });
78  });
79  } else {
    // NOTE(review): the first argument of this write() (listing line 81) is
    // missing; presumably the local roles, as in the incoming branch -
    // TODO confirm.
80  read_writer->write(
82  [wptr{weak_from_this()}, func{std::forward<F>(func)}, stream](
83  auto &&result) mutable {
84  auto self = wptr.lock();
    // Stop early on a write error or if the protocol object is gone.
85  if (!result || !self)
86  return std::forward<F>(func)(std::move(result));
87 
88  auto read_writer =
89  std::make_shared<ScaleMessageReadWriter>(stream);
90  read_writer->read<Roles>(
91  [func{std::forward<F>(func)}, stream](auto &&result) mutable {
92  return std::forward<F>(func)(std::move(result));
93  });
94  });
95  }
96  }
97 
    /// Runs exchangeHandshake<DirectionIncoming> on `stream` and converts the
    /// outcome into a stream pointer for `func`. The line carrying the
    /// function name (listing line 99) is missing; presumably
    /// doCollatorHandshake - TODO confirm.
    ///
    /// If the handshake failed and this object is still alive: a warning is
    /// logged, the stream is closed through base_.closeStream, and `func`
    /// receives nullptr. In every other case `func` receives `stream`.
    ///
    /// NOTE(review): when the weak pointer has expired, `func(stream)` is
    /// called even if `result` holds an error - confirm this is intended.
    /// NOTE(review): the log statement calls remotePeerId().value() without a
    /// check; presumably a peer id is always present here - verify.
98  template <bool DirectionIncoming, typename F>
100  std::shared_ptr<kagome::network::Stream> const &stream, F &&func) {
101  exchangeHandshake<DirectionIncoming>(
102  stream,
103  [func{std::forward<F>(func)}, stream, wptr{weak_from_this()}](
104  auto &&result) mutable {
105  if (auto self = wptr.lock()) {
106  if (!result) {
107  SL_WARN(self->base_.logger(),
108  "Handshake with {} failed with error {}",
109  stream->remotePeerId().value(),
110  result.error().message());
111  self->base_.closeStream(wptr, stream);
112  std::forward<F>(func)(nullptr);
113  return;
114  }
115  }
116  std::forward<F>(func)(stream);
117  });
118  }
119 
    /// Declared here; defined outside this listing.
120  void readCollationMsg(std::shared_ptr<kagome::network::Stream> stream);
121 
    /// Receive-side handlers, each taking the sending peer's id and the
    /// message by rvalue reference. Declared here; definitions and dispatch
    /// rules are outside this listing.
122  void onCollationMessageRx(libp2p::peer::PeerId const &peer_id,
123  CollationMessage &&collation_message);
124  void onCollationDeclRx(libp2p::peer::PeerId const &peer_id,
125  CollatorDeclaration &&collation_decl);
126  void onCollationAdvRx(libp2p::peer::PeerId const &peer_id,
127  CollatorAdvertisement &&collation_adv);
128 
    // NOTE(review): listing lines 129 and 131 are missing. The members base_
    // and app_config_ used above are presumably declared there - TODO confirm.
    /// Shared pointer to the CollationObserver.
130  std::shared_ptr<CollationObserver> observer_;
132  };
133 
134 } // namespace kagome::network
135 
136 #endif // KAGOME_NETWORK_COLLATIONPROTOCOL
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
boost::variant< CollatorDeclaration, CollatorAdvertisement, Dummy, Dummy, Seconded > CollationMessage
const std::string & protocolName() const override
virtual network::Roles roles() const =0
libp2p::peer::PeerInfo PeerInfo
application::AppConfiguration const & app_config_
libp2p::peer::PeerId PeerId
std::shared_ptr< CollationObserver > observer_
~CollationProtocol() override=default
void onCollationAdvRx(libp2p::peer::PeerId const &peer_id, CollatorAdvertisement &&collation_adv)
void onCollationMessageRx(libp2p::peer::PeerId const &peer_id, CollationMessage &&collation_message)
void exchangeHandshake(std::shared_ptr< kagome::network::Stream > const &stream, F &&func)
const libp2p::peer::Protocol kCollationProtocol
Definition: common.hpp:26
void onIncomingStream(std::shared_ptr< Stream > stream) override
void readCollationMsg(std::shared_ptr< kagome::network::Stream > stream)
void onCollationDeclRx(libp2p::peer::PeerId const &peer_id, CollatorDeclaration &&collation_decl)
void doCollatorHandshake(std::shared_ptr< kagome::network::Stream > const &stream, F &&func)