Kagome
Polkadot Runtime Engine in C++17
sync_protocol_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include "common/visitor.hpp"
11 #include "network/common.hpp"
15 #include "network/rpc.hpp"
18 
19 namespace kagome::network {
20 
21  namespace detail {
// BlocksResponseCache constructor (the line that opens the definition is not
// present in this listing).
// Remembers the capacity and the record expiration time, sizes the record
// storage to `capacity` empty slots and registers every slot index as free.
23  std::size_t capacity, std::chrono::seconds expiration_time)
24  : capacity_{capacity}, expiration_time_{expiration_time} {
25  // resize to preallocate elements
26  storage_.resize(capacity_);
27  // preallocate internal storage of containers
28  lookup_table_.reserve(capacity_);
29  free_slots_.reserve(capacity_);
// initially every slot index [0, capacity_) is available
30  for (std::size_t i = 0; i < capacity_; ++i) {
31  free_slots_.emplace(i);
32  }
33  }
34 
// BlocksResponseCache::isDuplicate (the line that opens the definition is not
// present in this listing).
// Registers `request_fingerprint` for `peer_id` and reports whether the peer's
// still-valid record already contains that fingerprint at least twice.
// Returns false, after asking cache() to store the fingerprint, when the peer
// is unknown or its record is missing or expired. Returns false, after
// appending the fingerprint, when it is stored fewer than two times.
// Returns true otherwise; in that case the fingerprint is not appended again.
36  const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint) {
37  auto found = lookup_table_.find(peer_id);
38 
39  // the peer is not cached yet
40  if (found == lookup_table_.end()) {
41  cache(peer_id, request_fingerprint);
42  return false;
43  }
44 
45  // the peer was previously cached
46  auto slot = found->second;
47  auto &entry = storage_[slot];
48  auto now = std::chrono::system_clock::now();
// `entry` is only dereferenced when the slot holds a record: the emptiness
// check comes first and `or` short-circuits
49  if (not storage_[slot] // dangling reference was found
50  or now > entry->valid_till // the record was expired
51  ) {
// re-create the record in the very same slot
52  cache(peer_id, request_fingerprint, slot);
53  return false;
54  }
55 
56  // peer record in cache is valid and not expired
57  entry->valid_till = now + expiration_time_; // prolong expiry time
58  auto &requests = entry->fingerprints;
// a request counts as a duplicate only if its fingerprint is already stored
// at least twice
59  if (std::count(requests.begin(), requests.end(), request_fingerprint)
60  >= 2) {
61  return true;
62  }
63  requests.push_back(request_fingerprint);
64  return false;
65  }
66 
// BlocksResponseCache::cache (the line that opens the definition is not
// present in this listing).
// Writes a fresh record for `peer_id` containing `request_fingerprint` and
// points the lookup table at it.
// If `target_slot` is set, that slot is overwritten. Otherwise a free slot is
// taken, calling purge() first when none is available; if there is still no
// free slot the function returns without storing anything.
68  const PeerId &peer_id,
69  BlocksRequest::Fingerprint request_fingerprint,
70  std::optional<CacheRecordIndex> target_slot) {
71  CacheRecordIndex slot;
72  if (target_slot) {
73  slot = *target_slot;
74  } else {
75  if (free_slots_.empty()) {
76  purge();
77  }
// purge() could not release anything: give up silently
78  if (free_slots_.empty()) {
79  return;
80  }
81  auto free_slot = free_slots_.begin();
82  slot = *free_slot;
83  free_slots_.erase(free_slot);
84  }
85 
// NOTE: the continuation of the declaration below (the constructor argument)
// is not visible in this listing
86  boost::circular_buffer<BlocksRequest::Fingerprint> fingerprints(
// when an occupied slot is being reused, the fingerprints it already holds are
// carried over into the new record
88  if (target_slot and storage_[*target_slot]) {
89  fingerprints = std::move(storage_[*target_slot]->fingerprints);
90  }
91  fingerprints.push_back(request_fingerprint);
92 
93  CacheRecord record{
94  .peer_id = peer_id,
95  .valid_till = std::chrono::system_clock::now() + expiration_time_,
96  .fingerprints = std::move(fingerprints),
97  };
98 
99  storage_[slot] = std::move(record);
100  lookup_table_[peer_id] = slot;
101  }
102 
// Body of the cache clean-up routine; presumably BlocksResponseCache::purge(),
// which cache() calls when no slot is free (the signature line is not present
// in this listing, so confirm against the header).
// Walks the lookup table and drops every entry whose slot is empty or whose
// record has expired, returning the slot index to free_slots_.
104  auto it = lookup_table_.begin();
105  while (it != lookup_table_.end()) {
106  auto slot = it->second;
107  // remove dangling reference from lookup table
108  if (not storage_[slot]) {
109  free_slots_.insert(slot);
110  it = lookup_table_.erase(it); // points to the following element
111  continue;
112  }
113  auto &record = storage_[slot].value();
114  // remove expired entries
// the current time is re-read for every inspected record
115  if (std::chrono::system_clock::now() > record.valid_till) {
116  storage_[slot] = std::nullopt;
117  free_slots_.insert(slot);
118  it = lookup_table_.erase(it); // points to the following element
119  continue;
120  }
// record is still valid: keep it and move on
121  it++;
122  }
123  }
124 
125  } // namespace detail
126 
// SyncProtocolImpl constructor (the line that opens the definition and part of
// the member-initializer list are not present in this listing).
// Builds the protocol base from the host, the protocol id obtained by
// formatting kSyncProtocol with the chain's protocolId(), and the name
// "SyncProtocol"; takes ownership of the observer and the reputation
// repository, both of which must be non-null.
128  libp2p::Host &host,
129  const application::ChainSpec &chain_spec,
130  std::shared_ptr<SyncProtocolObserver> sync_observer,
131  std::shared_ptr<ReputationRepository> reputation_repository)
132  : base_(host,
133  {fmt::format(kSyncProtocol.data(), chain_spec.protocolId())},
134  "SyncProtocol"),
135  sync_observer_(std::move(sync_observer)),
136  reputation_repository_(std::move(reputation_repository)),
// NOTE: the remaining initializer(s) are not visible in this listing
139  BOOST_ASSERT(sync_observer_ != nullptr);
140  BOOST_ASSERT(reputation_repository_ != nullptr);
141  }
142 
// Body of what is presumably SyncProtocolImpl::start() (signature line not
// present in this listing): delegates to the protocol base, handing it a weak
// reference to this object, and forwards the base's result.
144  return base_.start(weak_from_this());
145  }
146 
// Body of what is presumably SyncProtocolImpl::stop() (signature line not
// present in this listing): delegates to the protocol base and forwards its
// result.
148  return base_.stop();
149  }
150 
151  void SyncProtocolImpl::onIncomingStream(std::shared_ptr<Stream> stream) {
152  BOOST_ASSERT(stream->remotePeerId().has_value());
153 
154  readRequest(stream);
155  }
156 
// SyncProtocolImpl::newOutgoingStream (the line that opens the definition is
// not present in this listing).
// Asks the host to open a new stream to `peer_info.id` for the protocol ids of
// the base and reports the outcome through `cb`: the failure if the stream
// could not be opened, the opened stream otherwise.
158  const PeerInfo &peer_info,
159  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
160  SL_DEBUG(base_.logger(),
161  "Connect for {} stream with {}",
162  protocolName(),
163  peer_info.id);
164 
165  base_.host().newStream(
166  peer_info.id,
167  base_.protocolIds(),
// the handler keeps only a weak reference to this object, so it cannot extend
// the protocol's lifetime
168  [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
169  auto &&stream_res) mutable {
170  network::streamReadBuffer(stream_res);
171  auto self = wp.lock();
172  if (not self) {
// NOTE: one line of this branch is not visible in this listing
174  return;
175  }
176 
177  if (not stream_res.has_value()) {
178  SL_VERBOSE(
179  self->base_.logger(),
180  "Error happened while connection over {} stream with {}: {}",
181  self->protocolName(),
182  peer_id,
183  stream_res.error().message());
184  cb(stream_res.as_failure());
185  return;
186  }
187  const auto &stream_and_proto = stream_res.value();
188 
189  SL_DEBUG(self->base_.logger(),
190  "Established connection over {} stream with {}",
191  stream_and_proto.protocol,
192  peer_id);
193 
// NOTE(review): `stream_and_proto` is a const reference, so this std::move
// most likely results in a copy of the stream pointer - confirm
194  cb(std::move(stream_and_proto.stream));
195  });
196  }
197 
// Reads a BlocksRequest from an incoming stream, passes it to the sync
// observer and, unless the request is judged a repeated one, sends the
// observer's response back through writeResponse().
// The stream is reset when this object is gone, when reading fails, when the
// observer returns an error, or when the request is detected as a duplicate.
198  void SyncProtocolImpl::readRequest(std::shared_ptr<Stream> stream) {
199  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
200 
201  SL_DEBUG(base_.logger(),
202  "Read request from incoming {} stream with {}",
203  protocolName(),
204  stream->remotePeerId().value());
205 
206  read_writer->read<BlocksRequest>([stream, wp = weak_from_this()](
207  auto &&block_request_res) mutable {
208  auto self = wp.lock();
209  if (not self) {
210  stream->reset();
211  return;
212  }
213 
214  if (not block_request_res.has_value()) {
215  SL_VERBOSE(self->base_.logger(),
216  "Error at read request from incoming {} stream with {}: {}",
217  self->protocolName(),
218  stream->remotePeerId().value(),
219  block_request_res.error().message());
220 
221  stream->reset();
222  return;
223  }
224  auto &block_request = block_request_res.value();
225 
// the human-readable description of the request is assembled only when the
// logger level is at least VERBOSE
226  if (self->base_.logger()->level() >= log::Level::VERBOSE) {
227  std::string logmsg = fmt::format(
228  "Block request is received from incoming {} stream with {}",
229  self->protocolName(),
230  stream->remotePeerId().value());
231 
// one letter per requested block attribute
232  logmsg += ", fields=";
233  if (block_request.fields & BlockAttribute::HEADER) logmsg += 'H';
234  if (block_request.fields & BlockAttribute::BODY) logmsg += 'B';
235  if (block_request.fields & BlockAttribute::RECEIPT) logmsg += 'R';
236  if (block_request.fields & BlockAttribute::MESSAGE_QUEUE) logmsg += 'M';
237  if (block_request.fields & BlockAttribute::JUSTIFICATION) logmsg += 'J';
238 
239  visit_in_place(block_request.from, [&](const auto &from) {
240  logmsg += fmt::format(", from {}", from);
241  });
242 
// NOTE(review): " anc" is printed for the ASCENDING direction; possibly a typo
// for " asc" - confirm
243  logmsg +=
244  block_request.direction == Direction::ASCENDING ? " anc" : " desc";
245 
246  if (block_request.max.has_value()) {
247  logmsg += fmt::format(", max {}", block_request.max.value());
248  }
249 
250  self->base_.logger()->verbose(std::move(logmsg));
251  }
252 
253  auto block_response_res =
254  self->sync_observer_->onBlocksRequest(block_request);
255 
256  if (not block_response_res) {
257  SL_VERBOSE(
258  self->base_.logger(),
259  "Error at execute request from incoming {} stream with {}: {}",
260  self->protocolName(),
261  stream->remotePeerId().value(),
262  block_response_res.error().message());
263 
264  stream->reset();
265  return;
266  }
267  auto &block_response = block_response_res.value();
268 
// the duplicate check (which also records the fingerprint in the response
// cache) runs only for non-empty responses to a peer whose id is known
269  if ((not block_response.blocks.empty()) and stream->remotePeerId()
270  and self->response_cache_.isDuplicate(stream->remotePeerId().value(),
271  block_request.fingerprint())) {
272  auto peer_id = stream->remotePeerId().value();
273  SL_DEBUG(self->base_.logger(),
274  "Stream {} to {} reset due to repeating non-polite block "
275  "request with fingerprint {}",
276  self->protocolName(),
277  peer_id,
278  block_request.fingerprint());
// NOTE: the remaining arguments of this reputation change call are not visible
// in this listing
279  self->reputation_repository_->changeForATime(
280  peer_id,
283  stream->reset();
284  return;
285  }
286 
287  self->writeResponse(std::move(stream), block_response);
288  });
289  }
290 
291  void SyncProtocolImpl::writeResponse(std::shared_ptr<Stream> stream,
292  const BlocksResponse &block_response) {
293  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
294 
295  read_writer->write(
296  block_response,
297  [stream = std::move(stream),
298  wp = weak_from_this()](auto &&write_res) mutable {
299  auto self = wp.lock();
300  if (not self) {
301  stream->reset();
302  return;
303  }
304 
305  if (not write_res.has_value()) {
306  SL_VERBOSE(
307  self->base_.logger(),
308  "Error at writing response to incoming {} stream with {}: {}",
309  self->protocolName(),
310  stream->remotePeerId().value(),
311  write_res.error().message());
312  stream->reset();
313  return;
314  }
315 
316  stream->close([](auto &&...) {});
317  });
318  }
319 
// SyncProtocolImpl::writeRequest (the line that opens the definition is not
// present in this listing).
// Writes `block_request` into the outgoing `stream` and reports the outcome
// through `cb`: the write failure (after resetting the stream) or success.
321  std::shared_ptr<Stream> stream,
322  BlocksRequest block_request,
323  std::function<void(outcome::result<void>)> &&cb) {
324  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
325 
326  SL_DEBUG(base_.logger(),
327  "Write request info outgoing {} stream with {}",
328  protocolName(),
329  stream->remotePeerId().value());
330 
331  read_writer->write(
332  block_request,
// the handler holds the stream and only a weak reference to this object
333  [stream, wp = weak_from_this(), cb = std::move(cb)](
334  auto &&write_res) mutable {
335  auto self = wp.lock();
336  if (not self) {
337  stream->reset();
// NOTE: one line of this branch is not visible in this listing
339  return;
340  }
341 
342  if (not write_res.has_value()) {
343  SL_VERBOSE(
344  self->base_.logger(),
345  "Error at write request into outgoing {} stream with {}: {}",
346  self->protocolName(),
347  stream->remotePeerId().value(),
348  write_res.error().message());
349 
350  stream->reset();
351  cb(write_res.as_failure());
352  return;
353  }
354 
355  SL_DEBUG(self->base_.logger(),
356  "Request written successful into outgoing {} stream with {}",
357  self->protocolName(),
358  stream->remotePeerId().value());
359 
360  cb(outcome::success());
361  });
362  }
363 
// SyncProtocolImpl::readResponse (the line that opens the definition is not
// present in this listing).
// Reads a BlocksResponse from the outgoing `stream` and hands the result to
// `response_handler`: ProtocolError::GONE if this object no longer exists, the
// read failure if reading failed, otherwise the received response.
// The stream is reset in all three cases, including after a successful read.
365  std::shared_ptr<Stream> stream,
366  std::function<void(outcome::result<BlocksResponse>)> &&response_handler) {
367  auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
368 
369  SL_DEBUG(base_.logger(),
370  "Read response from outgoing {} stream with {}",
371  protocolName(),
372  stream->remotePeerId().value());
373 
374  read_writer->read<BlocksResponse>([stream,
375  wp = weak_from_this(),
376  response_handler =
377  std::move(response_handler)](
378  auto &&block_response_res) mutable {
379  auto self = wp.lock();
380  if (not self) {
381  stream->reset();
382  response_handler(ProtocolError::GONE);
383  return;
384  }
385 
386  if (not block_response_res.has_value()) {
387  SL_VERBOSE(self->base_.logger(),
388  "Error at read response from outgoing {} stream with {}: {}",
389  self->protocolName(),
390  stream->remotePeerId().value(),
391  block_response_res.error().message());
392 
393  stream->reset();
394  response_handler(block_response_res.as_failure());
395  return;
396  }
397  auto &blocks_response = block_response_res.value();
398 
399  SL_DEBUG(self->base_.logger(),
400  "Successful response read from outgoing {} stream with {}",
401  self->protocolName(),
402  stream->remotePeerId().value());
403 
// the stream is reset before the response is handed to the caller
404  stream->reset();
405  response_handler(std::move(blocks_response));
406  });
407  }
408 
// SyncProtocolImpl::request (the line that opens the definition is not present
// in this listing).
// Makes an asynchronous block request to `peer_id` and returns the response
// through `response_handler`.
// Steps visible here: look up the peer's addresses in the host's address
// repository (a failure is reported immediately), optionally log the request,
// then, once a stream is available, write the request with writeRequest() and
// read the answer with readResponse(). Every failure on the way is forwarded
// to `response_handler`; ProtocolError::GONE is used when this object no
// longer exists.
410  const PeerId &peer_id,
411  BlocksRequest block_request,
412  std::function<void(outcome::result<BlocksResponse>)> &&response_handler) {
413  auto addresses_res =
414  base_.host().getPeerRepository().getAddressRepository().getAddresses(
415  peer_id);
416  if (not addresses_res.has_value()) {
417  response_handler(addresses_res.as_failure());
418  return;
419  }
420 
// the human-readable description of the request is assembled only when the
// logger level is at least DEBUG
421  if (base_.logger()->level() >= log::Level::DEBUG) {
422  std::string logmsg = "Requesting blocks: fields=";
423 
424  if (block_request.fields & BlockAttribute::HEADER) logmsg += 'H';
425  if (block_request.fields & BlockAttribute::BODY) logmsg += "B";
426  if (block_request.fields & BlockAttribute::RECEIPT) logmsg += "R";
427  if (block_request.fields & BlockAttribute::MESSAGE_QUEUE) logmsg += "M";
428  if (block_request.fields & BlockAttribute::JUSTIFICATION) logmsg += "J";
429 
430  visit_in_place(block_request.from, [&](const auto &from) {
431  logmsg += fmt::format(" from {}", from);
432  });
433 
// NOTE(review): " anc" is printed for the ASCENDING direction; possibly a typo
// for " asc" - confirm
434  logmsg +=
435  block_request.direction == Direction::ASCENDING ? " anc" : " desc";
436 
437  if (block_request.max.has_value()) {
438  logmsg += fmt::format(", max {}", block_request.max.value());
439  }
440 
441  base_.logger()->debug(std::move(logmsg));
442  }
443 
// NOTE: the head of the call that receives the arguments below is not visible
// in this listing
445  {peer_id, addresses_res.value()},
446  [wp = weak_from_this(),
447  response_handler = std::move(response_handler),
448  block_request = std::move(block_request)](auto &&stream_res) mutable {
449  if (not stream_res.has_value()) {
450  response_handler(stream_res.as_failure());
451  return;
452  }
453  auto &stream = stream_res.value();
454 
455  auto self = wp.lock();
456  if (not self) {
457  stream->reset();
458  response_handler(ProtocolError::GONE);
459  return;
460  }
461 
462  SL_DEBUG(self->base_.logger(),
463  "Established outgoing {} stream with {}",
464  self->protocolName(),
465  stream->remotePeerId().value());
466 
// the weak reference and the response handler are handed over to the
// write-completion handler, which continues the exchange
467  self->writeRequest(stream,
468  std::move(block_request),
469  [stream,
470  wp = std::move(wp),
471  response_handler = std::move(response_handler)](
472  auto &&write_res) mutable {
473  auto self = wp.lock();
474  if (not self) {
475  stream->reset();
476  response_handler(ProtocolError::GONE);
477  return;
478  }
479 
480  if (not write_res.has_value()) {
481  stream->reset();
482  response_handler(write_res.as_failure());
483  return;
484  }
485 
486  self->readResponse(std::move(stream),
487  std::move(response_handler));
488  });
489  });
490  }
491 
492 } // namespace kagome::network
Include a justification for the block.
std::unordered_map< PeerId, CacheRecordIndex > lookup_table_
BlocksResponseCache(std::size_t capacity, std::chrono::seconds expiration_time)
void cache(const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint, std::optional< CacheRecordIndex > target_slot=std::nullopt)
void writeRequest(std::shared_ptr< Stream > stream, BlocksRequest block_request, std::function< void(outcome::result< void >)> &&cb)
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< BlocksResponse >)> &&response_handler)
void readRequest(std::shared_ptr< Stream > stream)
BlockAttributes fields
Bits showing which parts of BlockData to return
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
log::Logger const & logger() const
const ReputationChange DUPLICATE_BLOCK_REQUEST
void streamReadBuffer(libp2p::StreamAndProtocol &result)
libp2p::peer::PeerInfo PeerInfo
bool isDuplicate(const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint)
libp2p::peer::PeerId PeerId
SyncProtocolImpl(libp2p::Host &host, const application::ChainSpec &chain_spec, std::shared_ptr< SyncProtocolObserver > sync_observer, std::shared_ptr< ReputationRepository > reputation_repository)
virtual const std::string & protocolId() const =0
std::vector< std::optional< CacheRecord > > storage_
std::shared_ptr< SyncProtocolObserver > sync_observer_
detail::BlocksResponseCache response_cache_
std::shared_ptr< ReputationRepository > reputation_repository_
Protocols const & protocolIds() const
primitives::BlockId from
start from this block
std::unordered_set< CacheRecordIndex > free_slots_
static constexpr auto kResponsesCacheExpirationTimeout
void request(const PeerId &peer_id, BlocksRequest block_request, std::function< void(outcome::result< BlocksResponse >)> &&response_handler) override
Make async request to peer and return response in callback.
const std::string & protocolName() const override
bool start(std::weak_ptr< T > wptr)
void writeResponse(std::shared_ptr< Stream > stream, const BlocksResponse &block_response)
static constexpr auto kMaxCacheEntriesPerPeer
static constexpr auto kResponsesCacheCapacity
Fingerprint fingerprint() const
std::optional< uint32_t > max
const libp2p::peer::Protocol kSyncProtocol
Definition: common.hpp:18
void onIncomingStream(std::shared_ptr< Stream > stream) override
Direction direction
sequence direction