Kagome
Polkadot Runtime Engine in C++17
collation_protocol.cpp
Go to the documentation of this file.
1 
7 
8 #include <iostream>
9 
10 #include "network/common.hpp"
14 
15 namespace kagome::network {
16 
// Constructor parameter list and member initializers (the opening line of the
// signature is absent from this listing).
//  - host: forwarded to base_ together with the single protocol id
//    kCollationProtocol and the name "CollationProtocol".
//  - app_config: bound to app_config_; no copy is visible here, so the
//    configuration object presumably has to outlive this protocol (confirm
//    against the member declaration).
//  - chain_spec: accepted but unused (the parameter name is commented out).
//  - observer: moved into observer_; the receive handlers below test it for
//    null before use.
18  libp2p::Host &host,
19  application::AppConfiguration const &app_config,
20  application::ChainSpec const & /*chain_spec*/,
21  std::shared_ptr<CollationObserver> observer)
22  : base_(host, {kCollationProtocol}, "CollationProtocol"),
23  observer_(std::move(observer)),
24  app_config_{app_config} {}
25 
// Body of the start routine (signature line absent from this listing;
// presumably CollationProtocol::start — confirm against the header).
// Hands base_.start() a weak reference to this object and returns its result
// unchanged.
27  return base_.start(weak_from_this());
28  }
29 
// Body of the stop routine (signature line absent from this listing;
// presumably CollationProtocol::stop — confirm against the header).
// Delegates to base_.stop() and returns its result unchanged.
31  return base_.stop();
32  }
33 
// Parameter list and body of newOutgoingStream (the opening line of the
// signature is absent from this listing).
//
// Logs the attempt, asks the host for a new stream to peer_info.id and, on
// success, runs doCollatorHandshake<false> on that stream.
//  - peer_info: only peer_info.id is read here.
//  - cb: receives the failure when the stream cannot be created, or the
//    stream when the handshake callback is handed a non-null one.
35  const PeerInfo &peer_info,
36  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
37  SL_DEBUG(base_.logger(),
38  "Connect for {} stream with {}",
39  protocolName(),
40  peer_info.id);
41 
42  base_.host().newStream(
43  peer_info.id,
// NOTE(review): the listing jumps from 43 to 45, so one argument line is
// missing here (presumably the protocol id list — confirm in the real file).
// The callback below holds only a weak reference to this object and takes
// ownership of cb.
45  [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
46  auto &&stream_res) mutable {
47  auto self = wp.lock();
48  if (not self) {
// NOTE(review): listing line 49 is missing; whatever is reported to cb when
// the protocol object is already gone is not visible here.
50  return;
51  }
52 
// Stream creation failed: log the reason and pass the failure on to cb.
53  if (not stream_res.has_value()) {
54  SL_VERBOSE(self->base_.logger(),
55  "Can't create outgoing {} stream with {}: {}",
56  self->protocolName(),
57  peer_id,
58  stream_res.error().message());
59  cb(stream_res.as_failure());
60  return;
61  }
62 
63  auto &stream = stream_res.value().stream;
64  BOOST_ASSERT(stream->remotePeerId().has_value());
// Outgoing side of the handshake (template argument false; the incoming
// path uses true).
65  self->doCollatorHandshake<false>(
66  stream,
67  [wptr{wp},
68  cb{std::move(cb)}](std::shared_ptr<Stream> const &stream) {
69  if (!stream)
// NOTE(review): listing line 70 is missing; the statement executed for a
// null stream (presumably an error passed to cb) is not visible here.
71  else
72  cb(stream);
73  });
74  });
75  }
76 
78  libp2p::peer::PeerId const &peer_id,
79  CollatorDeclaration &&collation_decl) {
80  if (observer_) {
81  observer_->onDeclare(peer_id,
82  collation_decl.collator_id,
83  collation_decl.para_id,
84  collation_decl.signature);
85  }
86  }
87 
89  libp2p::peer::PeerId const &peer_id,
90  CollatorAdvertisement &&collation_adv) {
91  if (observer_) {
92  observer_->onAdvertise(peer_id, collation_adv.relay_parent);
93  }
94  }
95 
97  libp2p::peer::PeerId const &peer_id,
98  CollationMessage &&collation_message) {
99  visit_in_place(
100  std::move(collation_message),
101  [&](network::CollatorDeclaration &&collation_decl) {
102  onCollationDeclRx(peer_id, std::move(collation_decl));
103  },
104  [&](network::CollatorAdvertisement &&collation_adv) {
105  onCollationAdvRx(peer_id, std::move(collation_adv));
106  },
107  [&](auto &&) {
108  SL_WARN(base_.logger(), "Unexpected collation message from.");
109  });
110  }
111 
113  std::shared_ptr<kagome::network::Stream> stream) {
114  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
115  read_writer->template read<WireMessage>(
116  [wptr{weak_from_this()},
117  stream{std::move(stream)}](auto &&result) mutable {
118  auto self = wptr.lock();
119  if (!self) {
120  stream->close([](auto &&) {});
121  return;
122  }
123 
124  if (!result) {
125  SL_WARN(self->base_.logger(),
126  "Can't read incoming collation message from stream {} with "
127  "error {}",
128  stream->remotePeerId().value(),
129  result.error().message());
130  self->base_.closeStream(wptr, stream);
131  return;
132  }
133 
134  SL_VERBOSE(self->base_.logger(),
135  "Received collation message from {}",
136  stream->remotePeerId().has_value()
137  ? stream->remotePeerId().value().toBase58()
138  : "{no peerId}");
139 
140  visit_in_place(
141  std::move(result.value()),
142  [&](ViewUpdate &&) {
143  SL_VERBOSE(self->base_.logger(),
144  "Received ViewUpdate from {}",
145  stream->remotePeerId().has_value()
146  ? stream->remotePeerId().value().toBase58()
147  : "{no peerId}");
148  },
149  [&](ProtocolMessage &&p) {
150  visit_in_place(std::move(p), [&](CollationMessage &&m) {
151  BOOST_ASSERT(stream->remotePeerId().has_value());
152  self->onCollationMessageRx(stream->remotePeerId().value(),
153  std::move(m));
154  });
155  });
156  self->readCollationMsg(std::move(stream));
157  });
158  }
159 
160  void CollationProtocol::onIncomingStream(std::shared_ptr<Stream> stream) {
161  BOOST_ASSERT(stream->remotePeerId().has_value());
162  doCollatorHandshake<true>(
163  stream,
164  [wptr{weak_from_this()}](
165  std::shared_ptr<Stream> const &stream) mutable {
166  if (!stream) {
167  return;
168  }
169  if (auto self = wptr.lock()) {
170  self->readCollationMsg(stream);
171  }
172  });
173  }
174 
175 } // namespace kagome::network
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
boost::variant< CollatorDeclaration, CollatorAdvertisement, Dummy, Dummy, Seconded > CollationMessage
Dummy ViewUpdate
ViewUpdate message. May be implemented later.
const std::string & protocolName() const override
boost::variant< CollationMessage > ProtocolMessage
log::Logger const & logger() const
libp2p::peer::PeerInfo PeerInfo
application::AppConfiguration const & app_config_
libp2p::peer::PeerId PeerId
std::shared_ptr< CollationObserver > observer_
void onCollationAdvRx(libp2p::peer::PeerId const &peer_id, CollatorAdvertisement &&collation_adv)
Protocols const & protocolIds() const
void onCollationMessageRx(libp2p::peer::PeerId const &peer_id, CollationMessage &&collation_message)
bool start(std::weak_ptr< T > wptr)
const libp2p::peer::Protocol kCollationProtocol
Definition: common.hpp:26
void onIncomingStream(std::shared_ptr< Stream > stream) override
void readCollationMsg(std::shared_ptr< kagome::network::Stream > stream)
void onCollationDeclRx(libp2p::peer::PeerId const &peer_id, CollatorDeclaration &&collation_decl)