Kagome
Polkadot Runtime Engine in C++17
state_protocol_impl.cpp
Go to the documentation of this file.
1 
9 #include "network/common.hpp"
13 
14 namespace kagome::network {
15 
17  libp2p::Host &host,
18  const application::ChainSpec &chain_spec,
19  std::shared_ptr<StateProtocolObserver> state_observer)
20  : host_(host), state_observer_(std::move(state_observer)) {
21  BOOST_ASSERT(state_observer_ != nullptr);
22  const_cast<Protocol &>(protocol_) =
23  fmt::format(kStateProtocol.data(), chain_spec.protocolId());
24  }
25 
// NOTE(review): the line that opens this definition is absent from this
// listing (its own numbering jumps from 25 to 27), so the function's name and
// declared return type cannot be read here; presumably this is the protocol's
// start routine - confirm against the header.
//
// Registers a handler for protocol_ on host_. The handler captures only a
// weak pointer to this object; when the object is already gone the incoming
// stream is ignored. Always returns true.
27  host_.setProtocolHandler(
28  {protocol_}, [wp = weak_from_this()](auto &&stream_and_proto) {
29  if (auto self = wp.lock()) {
// A stream whose remote peer id is available is traced and handed to
// onIncomingStream(); otherwise only a warning is logged.
30  if (auto peer_id = stream_and_proto.stream->remotePeerId()) {
31  SL_TRACE(self->log_,
32  "Handled {} protocol stream from {:l}",
33  self->protocol_,
34  peer_id.value());
35  self->onIncomingStream(
36  std::forward<decltype(stream_and_proto.stream)>(
37  stream_and_proto.stream));
38  return;
39  }
40  self->log_->warn("Handled {} protocol stream from unknown peer",
41  self->protocol_);
42  }
43  });
44  return true;
45  }
46 
// NOTE(review): the line that opens this definition is absent from this
// listing (its own numbering jumps from 46 to 48); presumably this is the
// protocol's stop routine - confirm against the header.
// The visible body performs no work and always returns true.
48  return true;
49  }
50 
51  void StateProtocolImpl::onIncomingStream(std::shared_ptr<Stream> stream) {
52  BOOST_ASSERT(stream->remotePeerId().has_value());
53 
54  readRequest(stream);
55  }
56 
58  const PeerInfo &peer_info,
59  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
60  SL_DEBUG(log_, "Connect for {} stream with {}", protocol_, peer_info.id);
61 
62  host_.newStream(
63  peer_info.id,
64  {protocol_},
65  [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
66  auto &&stream_res) mutable {
67  auto self = wp.lock();
68  if (not self) {
69  cb(ProtocolError::GONE);
70  return;
71  }
72 
73  if (not stream_res.has_value()) {
74  SL_VERBOSE(
75  self->log_,
76  "Error happened while connection over {} stream with {}: {}",
77  self->protocol_,
78  peer_id,
79  stream_res.error().message());
80  cb(stream_res.as_failure());
81  return;
82  }
83  auto &stream_and_proto = stream_res.value();
84 
85  SL_DEBUG(self->log_,
86  "Established connection over {} stream with {}",
87  self->protocol_,
88  peer_id);
89 
90  cb(std::move(stream_and_proto.stream));
91  });
92  }
93 
94  void StateProtocolImpl::readRequest(std::shared_ptr<Stream> stream) {
95  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
96 
97  SL_DEBUG(log_,
98  "Read request from incoming {} stream with {}",
99  protocol_,
100  stream->remotePeerId().value());
101 
102  read_writer->read<StateRequest>([stream, wp = weak_from_this()](
103  auto &&state_request_res) mutable {
104  auto self = wp.lock();
105  if (not self) {
106  stream->reset();
107  return;
108  }
109 
110  if (not state_request_res.has_value()) {
111  SL_VERBOSE(self->log_,
112  "Error at read request from incoming {} stream with {}: {}",
113  self->protocol_,
114  stream->remotePeerId().value(),
115  state_request_res.error().message());
116 
117  stream->reset();
118  return;
119  }
120 
121  auto &state_request = state_request_res.value();
122 
123  if (self->log_->level() >= log::Level::VERBOSE) {
124  std::string keys;
125  if (!state_request.start.empty()) {
126  keys = " starting with keys [";
127  for (const auto &key : state_request.start) {
128  keys += key.toHex();
129  keys += ",";
130  }
131  keys.back() = ']';
132  }
133  SL_VERBOSE(
134  self->log_,
135  "State request is received from incoming {} stream with {} for "
136  "block {}{}.",
137  self->protocol_,
138  stream->remotePeerId().value(),
139  state_request.hash.toHex(),
140  keys);
141  }
142 
143  auto state_response_res =
144  self->state_observer_->onStateRequest(state_request);
145 
146  if (not state_response_res) {
147  SL_VERBOSE(
148  self->log_,
149  "Error at execute request from incoming {} stream with {}: {}",
150  self->protocol_,
151  stream->remotePeerId().value(),
152  state_response_res.error().message());
153 
154  stream->reset();
155  return;
156  }
157 
158  self->writeResponse(std::move(stream),
159  std::move(state_response_res.value()));
160  });
161  }
162 
164  const PeerId &peer_id,
165  StateRequest state_request,
166  std::function<void(outcome::result<StateResponse>)> &&response_handler) {
167  auto addresses_res =
168  host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
169  if (not addresses_res.has_value()) {
170  response_handler(addresses_res.as_failure());
171  return;
172  }
173 
175  {peer_id, std::move(addresses_res.value())},
176  [wp = weak_from_this(),
177  response_handler = std::move(response_handler),
178  state_request = std::move(state_request)](auto &&stream_res) mutable {
179  if (not stream_res.has_value()) {
180  response_handler(stream_res.as_failure());
181  return;
182  }
183  auto &stream = stream_res.value();
184 
185  auto self = wp.lock();
186  if (not self) {
187  stream->reset();
188  response_handler(ProtocolError::GONE);
189  return;
190  }
191 
192  SL_DEBUG(self->log_,
193  "Established outgoing {} stream with {}",
194  self->protocol_,
195  stream->remotePeerId().value());
196 
197  self->writeRequest(stream,
198  std::move(state_request),
199  [stream,
200  wp = std::move(wp),
201  response_handler = std::move(response_handler)](
202  auto &&write_res) mutable {
203  auto self = wp.lock();
204  if (not self) {
205  stream->reset();
206  response_handler(ProtocolError::GONE);
207  return;
208  }
209 
210  SL_DEBUG(self->log_, "State request sent");
211 
212  if (not write_res.has_value()) {
213  SL_WARN(self->log_, "Error getting response");
214  stream->reset();
215  response_handler(write_res.as_failure());
216  return;
217  }
218 
219  self->readResponse(std::move(stream),
220  std::move(response_handler));
221  });
222  });
223  }
224 
225  void StateProtocolImpl::writeResponse(std::shared_ptr<Stream> stream,
226  StateResponse state_response) {
227  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
228 
229  read_writer->write(
230  state_response,
231  [stream = std::move(stream),
232  wp = weak_from_this()](auto &&write_res) mutable {
233  auto self = wp.lock();
234  if (not self) {
235  stream->reset();
236  return;
237  }
238 
239  if (not write_res.has_value()) {
240  SL_VERBOSE(
241  self->log_,
242  "Error at writing response to incoming {} stream with {}: {}",
243  self->protocol_,
244  stream->remotePeerId().value(),
245  write_res.error().message());
246  stream->reset();
247  return;
248  }
249 
250  stream->close([](auto &&...) {});
251  });
252  }
253 
255  std::shared_ptr<Stream> stream,
256  StateRequest state_request,
257  std::function<void(outcome::result<void>)> &&cb) {
258  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
259 
260  SL_DEBUG(log_,
261  "Write request info outgoing {} stream with {}",
262  protocol_,
263  stream->remotePeerId().value());
264 
265  read_writer->write(
266  state_request,
267  [stream, wp = weak_from_this(), cb = std::move(cb)](
268  auto &&write_res) mutable {
269  auto self = wp.lock();
270  if (not self) {
271  stream->reset();
273  return;
274  }
275 
276  if (not write_res.has_value()) {
277  SL_VERBOSE(
278  self->log_,
279  "Error at write request into outgoing {} stream with {}: {}",
280  self->protocol_,
281  stream->remotePeerId().value(),
282  write_res.error().message());
283 
284  stream->reset();
285  cb(write_res.as_failure());
286  return;
287  }
288 
289  SL_DEBUG(self->log_,
290  "Request written successful into outgoing {} stream with {}",
291  self->protocol_,
292  stream->remotePeerId().value());
293 
294  cb(outcome::success());
295  });
296  }
297 
299  std::shared_ptr<Stream> stream,
300  std::function<void(outcome::result<StateResponse>)> &&response_handler) {
301  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
302 
303  SL_DEBUG(log_,
304  "Read response from outgoing {} stream with {}",
305  protocol_,
306  stream->remotePeerId().value());
307 
308  read_writer->read<StateResponse>([stream,
309  wp = weak_from_this(),
310  response_handler =
311  std::move(response_handler)](
312  auto &&state_response_res) mutable {
313  auto self = wp.lock();
314  if (not self) {
315  stream->reset();
316  response_handler(ProtocolError::GONE);
317  return;
318  }
319 
320  if (not state_response_res.has_value()) {
321  SL_WARN(self->log_,
322  "Error at read response from outgoing {} stream with {}: {}",
323  self->protocol_,
324  stream->remotePeerId().value(),
325  state_response_res.error().message());
326 
327  stream->reset();
328  response_handler(state_response_res.as_failure());
329  return;
330  }
331  auto &state_response = state_response_res.value();
332 
333  SL_DEBUG(self->log_,
334  "Successful response read from outgoing {} stream with {}",
335  self->protocol_,
336  stream->remotePeerId().value());
337 
338  stream->reset();
339  response_handler(std::move(state_response));
340  });
341  }
342 
343 } // namespace kagome::network
primitives::BlockHash hash
Block header hash.
StateProtocolImpl(libp2p::Host &host, const application::ChainSpec &chain_spec, std::shared_ptr< StateProtocolObserver > state_observer)
void writeResponse(std::shared_ptr< Stream > stream, StateResponse state_response)
const libp2p::peer::Protocol kStateProtocol
Definition: common.hpp:17
STL namespace.
void writeRequest(std::shared_ptr< Stream > stream, StateRequest state_request, std::function< void(outcome::result< void >)> &&cb)
const libp2p::peer::Protocol protocol_
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
libp2p::peer::PeerId PeerId
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
void request(const PeerId &peer_id, StateRequest state_request, std::function< void(outcome::result< StateResponse >)> &&response_handler) override
Make async request to peer and return response in callback.
virtual const std::string & protocolId() const =0
std::string toHex() const noexcept
Definition: blob.hpp:160
void readRequest(std::shared_ptr< Stream > stream)
void onIncomingStream(std::shared_ptr< Stream > stream) override
std::shared_ptr< StateProtocolObserver > state_observer_
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< StateResponse >)> &&response_handler)