Kagome — Polkadot Runtime Engine in C++17
synchronizer_impl.cpp (extract of the source file; some lines are elided by the documentation generator)
1 
7 
8 #include <random>
9 
14 #include "primitives/common.hpp"
20 
  // Maps SynchronizerImpl::Error values to human-readable messages.
  // NOTE(review): this is the body of an OUTCOME error-category definition;
  // its opening line is elided from this extract.
  switch (e) {
    case E::SHUTTING_DOWN:
      return "Node is shutting down";
    case E::EMPTY_RESPONSE:
      return "Response is empty";
    case E::RESPONSE_WITHOUT_BLOCK_HEADER:
      return "Response does not contain header of some block";
    case E::RESPONSE_WITHOUT_BLOCK_BODY:
      return "Response does not contain body of some block";
    case E::DISCARDED_BLOCK:
      return "Block is discarded";
    case E::WRONG_ORDER:
      return "Wrong order of blocks/headers in response";
    case E::INVALID_HASH:
      return "Hash does not match";
    case E::ALREADY_IN_QUEUE:
      return "Block is already enqueued";
    case E::PEER_BUSY:
      return "Peer is busy";
    case E::ARRIVED_TOO_EARLY:
      return "Block is arrived too early. Try to process it late";
    case E::DUPLICATE_REQUEST:
      return "Duplicate of recent request has been detected";
  }
  // Defensive fallback for out-of-range enum values; every enumerator is
  // handled above.
  return "unknown error";
}
49 
namespace {
  // Prometheus gauge name: number of blocks submitted to the import queue
  constexpr const char *kImportQueueLength =
      "kagome_import_queue_blocks_submitted";

  // Maps the configured sync method to the BlockAttributes requested from
  // peers.
  // NOTE(review): the parameter line and the return statements of both
  // cases are elided from this extract — presumably Full requests
  // header+body(+justification) while Fast requests headers only; confirm
  // against the full source.
  kagome::network::BlockAttributes attributesForSync(
    switch (method) {
      case SM::Full:
      case SM::Fast:
    }
  }
}  // namespace
67 
68 namespace kagome::network {
69 
71  const application::AppConfiguration &app_config,
72  std::shared_ptr<application::AppStateManager> app_state_manager,
73  std::shared_ptr<blockchain::BlockTree> block_tree,
74  std::shared_ptr<storage::changes_trie::ChangesTracker> changes_tracker,
75  std::shared_ptr<consensus::BlockAppender> block_appender,
76  std::shared_ptr<consensus::BlockExecutor> block_executor,
77  std::shared_ptr<storage::trie::TrieSerializer> serializer,
78  std::shared_ptr<storage::trie::TrieStorage> storage,
79  std::shared_ptr<network::Router> router,
80  std::shared_ptr<libp2p::basic::Scheduler> scheduler,
81  std::shared_ptr<crypto::Hasher> hasher,
82  std::shared_ptr<storage::BufferStorage> buffer_storage)
83  : app_state_manager_(std::move(app_state_manager)),
84  block_tree_(std::move(block_tree)),
85  trie_changes_tracker_(std::move(changes_tracker)),
86  block_appender_(std::move(block_appender)),
87  block_executor_(std::move(block_executor)),
88  serializer_(std::move(serializer)),
89  storage_(std::move(storage)),
90  router_(std::move(router)),
91  scheduler_(std::move(scheduler)),
92  hasher_(std::move(hasher)),
93  buffer_storage_(std::move(buffer_storage)) {
94  BOOST_ASSERT(app_state_manager_);
95  BOOST_ASSERT(block_tree_);
96  BOOST_ASSERT(trie_changes_tracker_);
97  BOOST_ASSERT(block_executor_);
98  BOOST_ASSERT(serializer_);
99  BOOST_ASSERT(storage_);
100  BOOST_ASSERT(router_);
101  BOOST_ASSERT(scheduler_);
102  BOOST_ASSERT(hasher_);
103  BOOST_ASSERT(buffer_storage_);
104 
105  sync_method_ = app_config.syncMethod();
106 
107  // Register metrics
108  metrics_registry_->registerGaugeFamily(
109  kImportQueueLength, "Number of blocks submitted to the import queue");
111  metrics_registry_->registerGaugeMetric(kImportQueueLength);
113 
114  app_state_manager_->takeControl(*this);
115  }
116 
    // Looks for a persisted record of an interrupted state sync and, if one
    // exists, restores it into state_sync_on_block_ so syncing can resume.
    // Returns false only on storage/decoding failure; "no record found" is a
    // success case.
    // NOTE(review): the buffer-storage lookup expression is elided from this
    // extract (the line after "auto opt_res =").
    auto opt_res =
    if (opt_res.has_error()) {
      SL_ERROR(
          log_, "Can't check of incomplete state sync: {}", opt_res.error());
      return false;
    }
    if (opt_res.value().has_value()) {
      auto &encoded_block = opt_res.value().value();
      // Decode the SCALE-encoded block info stored on the previous run
      auto block_res =
          scale::decode<decltype(state_sync_on_block_)::value_type>(
              std::move(encoded_block));
      if (block_res.has_error()) {
        SL_ERROR(log_,
                 "Can't decode data of incomplete state sync: {}",
                 block_res.error());
        return false;
      }
      auto &block = block_res.value();
      SL_WARN(log_,
              "Found incomplete state sync on block {}; "
              "State sync will be restarted",
              block);
      // Remember the block so syncState() restarts from it
      state_sync_on_block_.emplace(std::move(block));
    }
    return true;
  }
146 
// NOTE(review): the signatures of the two short methods below are elided
    // from this extract.  Judging by their bodies, the first reports
    // successful start and the second raises the shutdown flag that async
    // callbacks check before doing work — confirm against the full source.
    return true;
  }

    node_is_shutting_down_ = true;
  }
156 
158  const primitives::BlockInfo &block_info, SyncResultHandler &&handler) {
159  // Check if block is already in tree
160  if (block_tree_->hasBlockHeader(block_info.hash)) {
161  scheduler_->schedule(
162  [handler = std::move(handler), block_info] { handler(block_info); });
163  return false;
164  }
165 
166  auto last_finalized_block = block_tree_->getLastFinalized();
167  // Check if block from discarded side-chain
168  if (last_finalized_block.number <= block_info.number) {
169  scheduler_->schedule(
170  [handler = std::move(handler)] { handler(Error::DISCARDED_BLOCK); });
171  return false;
172  }
173 
174  // Check if block has arrived too early
175  auto best_block_res =
176  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
177  BOOST_ASSERT(best_block_res.has_value());
178  const auto &best_block = best_block_res.value();
179  if (best_block.number + kMaxDistanceToBlockForSubscription
180  < block_info.number) {
181  scheduler_->schedule([handler = std::move(handler)] {
182  handler(Error::ARRIVED_TOO_EARLY);
183  });
184  return false;
185  }
186 
187  subscriptions_.emplace(block_info, std::move(handler));
188  return true;
189  }
190 
      outcome::result<void> res) {
    // Fires and removes every subscription registered for `block`.  Each
    // handler is detached from the multimap via a node handle (extract) so
    // the container can be mutated safely while iterating, then invoked
    // asynchronously through the scheduler with either the error or the
    // block info.
    // NOTE(review): the function-name/first-parameter line is elided from
    // this extract.
    auto range = subscriptions_.equal_range(block);
    for (auto it = range.first; it != range.second;) {
      auto cit = it++;  // advance before extract() invalidates `cit`
      if (auto node = subscriptions_.extract(cit)) {
        if (res.has_error()) {
          auto error = res.as_failure();
          scheduler_->schedule(
              [handler = std::move(node.mapped()), error] { handler(error); });
        } else {
          scheduler_->schedule(
              [handler = std::move(node.mapped()), block] { handler(block); });
        }
      }
    }
  }
208 
      const primitives::BlockInfo &block_info,
      const libp2p::peer::PeerId &peer_id,
      // NOTE(review): a SyncResultHandler&& handler parameter line is elided
      // from this extract between peer_id and subscribe_to_block.
      bool subscribe_to_block) {
    // Catches the node up towards `block_info` known by `peer_id`: finds the
    // best common block with the peer, then starts loading blocks from it.
    // Returns true if a load was actually started.

    // Subscribe on demand
    if (subscribe_to_block) {
      subscribeToBlock(block_info, std::move(handler));
    }

    // If provided block is already enqueued, just remember peer
    // NOTE(review): `handler` is used below after possibly being moved into
    // subscribeToBlock() above; a moved-from std::function is empty, so the
    // calls below are effectively skipped when subscribe_to_block is true —
    // confirm this is intended.
    if (auto it = known_blocks_.find(block_info.hash);
        it != known_blocks_.end()) {
      auto &block_in_queue = it->second;
      block_in_queue.peers.emplace(peer_id);
      if (handler) handler(block_info);
      return false;
    }

    // We are communicating with one peer only for one issue.
    // If peer is already in use, don't start an additional issue.
    auto peer_is_busy = not busy_peers_.emplace(peer_id).second;
    if (peer_is_busy) {
      SL_TRACE(
          log_,
          "Can't syncByBlockHeader block {} is received from {}: Peer busy",
          block_info,
          peer_id);
      return false;
    }
    SL_TRACE(log_, "Peer {} marked as busy", peer_id);

    const auto &last_finalized_block = block_tree_->getLastFinalized();

    auto best_block_res =
        block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
    BOOST_ASSERT(best_block_res.has_value());
    const auto &best_block = best_block_res.value();

    // Provided block is equal our best one. Nothing needs to do.
    if (block_info == best_block) {
      if (handler) handler(block_info);
      return false;
    }

    // First we need to find the best common block to avoid manipulations with
    // blocks what already exists on node.
    //
    // Find will be doing in interval between definitely known common block and
    // potentially unknown.
    //
    // Best candidate for lower bound is last finalized (it must be known for
    // all synchronized nodes).
    const auto lower = last_finalized_block.number;

    // Best candidate for upper bound is next potentially known block (next for
    // min of provided and our best)
    const auto upper = std::min(block_info.number, best_block.number) + 1;

    // Search starts with potentially known block (min of provided and our best)
    const auto hint = std::min(block_info.number, best_block.number);

    BOOST_ASSERT(lower < upper);

    // Callback what will be called at the end of finding the best common block
    auto find_handler =
        [wp = weak_from_this(), peer_id, handler = std::move(handler)](
            outcome::result<primitives::BlockInfo> res) mutable {
          if (auto self = wp.lock()) {
            // Remove peer from list of busy peers
            if (self->busy_peers_.erase(peer_id) > 0) {
              SL_TRACE(self->log_, "Peer {} unmarked as busy", peer_id);
            }

            // Finding the best common block was failed
            if (not res.has_value()) {
              if (handler) handler(res.as_failure());
              return;
            }

            // If provided block is already enqueued, just remember peer
            auto &block_info = res.value();
            if (auto it = self->known_blocks_.find(block_info.hash);
                it != self->known_blocks_.end()) {
              auto &block_in_queue = it->second;
              block_in_queue.peers.emplace(peer_id);
              if (handler) handler(std::move(block_info));
              return;
            }

            // Start to load blocks since found
            SL_DEBUG(self->log_,
                     "Start to load blocks from {} since block {}",
                     peer_id,
                     block_info);
            self->loadBlocks(peer_id, block_info, std::move(handler));
          }
        };

    // Find the best common block
    SL_DEBUG(log_,
             "Start to find common block with {} in #{}..#{} to catch up",
             peer_id,
             lower,
             upper);
    findCommonBlock(peer_id, lower, upper, hint, std::move(find_handler));
    return true;
  }
317 
      const primitives::BlockHeader &header,
      const libp2p::peer::PeerId &peer_id,
      // NOTE(review): a SyncResultHandler&& handler parameter line (with the
      // closing paren of the signature) is elided from this extract.
    // Handles an announced block header: computes its hash, skips known
    // blocks, optionally watches it, and loads it (directly if the parent is
    // known, otherwise via the generic syncByBlockInfo path).
    auto block_hash = hasher_->blake2b_256(scale::encode(header).value());
    const primitives::BlockInfo block_info(header.number, block_hash);

    // Block was applied before
    if (block_tree_->getBlockHeader(block_hash).has_value()) {
      return false;
    }

    // Block is already enqueued
    if (auto it = known_blocks_.find(block_info.hash);
        it != known_blocks_.end()) {
      auto &block_in_queue = it->second;
      block_in_queue.peers.emplace(peer_id);
      return false;
    }

    // Number of provided block header greater currently watched.
    // Reset watched blocks list and start to watch the block with new number
    if (watched_blocks_number_ < header.number) {
      // NOTE(review): the assignment `watched_blocks_number_ = header.number;`
      // appears to be elided from this extract here — without it the branch
      // below would never match; confirm against the full source.
      watched_blocks_.clear();
    }
    // If number of provided block header is the same of watched, add handler
    // for this block
    if (watched_blocks_number_ == header.number) {
      watched_blocks_.emplace(block_hash, std::move(handler));
    }

    // If parent of provided block is in chain, start to load it immediately
    bool parent_is_known =
        known_blocks_.find(header.parent_hash) != known_blocks_.end()
        or block_tree_->getBlockHeader(header.parent_hash).has_value();

    if (parent_is_known) {
      loadBlocks(peer_id, block_info, [wp = weak_from_this()](auto res) {
        if (auto self = wp.lock()) {
          SL_TRACE(self->log_, "Block(s) enqueued to apply by announce");
        }
      });
      return true;
    }

    // Otherwise, is using base way to enqueue
    return syncByBlockInfo(
        block_info,
        peer_id,
        [wp = weak_from_this()](auto res) {
          if (auto self = wp.lock()) {
            SL_TRACE(self->log_, "Block(s) enqueued to load by announce");
          }
        },
        false);
  }
375 
      const PeerId &peer_id,
      primitives::BlockInfo target_block,
      std::optional<uint32_t> limit,
      // NOTE(review): the handler parameter line is elided from this extract.
    // Loads missing justifications from `peer_id` starting at `target_block`.
    // If the peer is busy with another issue, the whole call is re-scheduled
    // instead of running two requests against one peer.
    if (busy_peers_.find(peer_id) != busy_peers_.end()) {
      SL_DEBUG(
          log_,
          "Justifications load since block {} was rescheduled, peer {} is busy",
          target_block,
          peer_id);
      scheduler_->schedule([wp = weak_from_this(),
                            peer_id,
                            block = std::move(target_block),
                            limit = std::move(limit),
                            handler = std::move(handler)]() mutable {
        auto self = wp.lock();
        if (not self) {
          return;
        }
        // Re-enter with the same arguments once the scheduler fires
        self->syncMissingJustifications(
            peer_id, std::move(block), std::move(limit), std::move(handler));
      });
      return;
    }

    // Peer is free: request the justifications right away.
    // NOTE(review): the call-target line (presumably loadJustifications) is
    // elided from this extract.
        peer_id, std::move(target_block), std::move(limit), std::move(handler));
  }
405 
      const libp2p::peer::PeerId &peer_id,
      // NOTE(review): the lower/upper/hint BlockNumber parameter lines are
      // elided from this extract.
      SyncResultHandler &&handler,
      std::map<primitives::BlockNumber, primitives::BlockHash> &&observed) {
    // Binary search for the highest block present both locally and on the
    // peer, within (lower, upper).  `observed` caches numbers already probed
    // so repeated probes need no network round-trip.

    // Interrupts process if node is shutting down
    // NOTE(review): the `if (node_is_shutting_down_) {` guard line is elided
    // from this extract.
      handler(Error::SHUTTING_DOWN);
      return;
    }

    // Request exactly one block at number `hint`
    // NOTE(review): the first fields of the BlocksRequest initializer are
    // elided from this extract.
        hint,
        1};

    auto request_fingerprint = request.fingerprint();

    // Drop exact duplicates of recently sent requests
    if (not recent_requests_.emplace(peer_id, request_fingerprint).second) {
      SL_VERBOSE(
          log_,
          "Can't check if block #{} in #{}..#{} is common with {}: {}",
          hint,
          lower,
          upper - 1,
          peer_id,
          outcome::result<void>(Error::DUPLICATE_REQUEST).error().message());
      handler(Error::DUPLICATE_REQUEST);
      return;
    }

    scheduleRecentRequestRemoval(peer_id, request_fingerprint);

    auto response_handler = [wp = weak_from_this(),
                             lower,
                             upper,
                             target = hint,
                             peer_id,
                             handler = std::move(handler),
                             observed = std::move(observed),
                             request_fingerprint](auto &&response_res) mutable {
      auto self = wp.lock();
      if (not self) {
        return;
      }

      // Any error interrupts finding common block
      if (response_res.has_error()) {
        SL_VERBOSE(self->log_,
                   "Can't check if block #{} in #{}..#{} is common with {}: {}",
                   target,
                   lower,
                   upper - 1,
                   peer_id,
                   response_res.error().message());
        handler(response_res.as_failure());
        return;
      }
      auto &blocks = response_res.value().blocks;

      // No block in response is abnormal situation. Requested block must be
      // existed because finding in interval of numbers of blocks that must
      // exist
      if (blocks.empty()) {
        SL_VERBOSE(self->log_,
                   "Can't check if block #{} in #{}..#{} is common with {}: "
                   "Response does not have any blocks",
                   target,
                   lower,
                   upper - 1,
                   peer_id);
        handler(Error::EMPTY_RESPONSE);
        // Forget the fingerprint so the same probe may be retried
        self->recent_requests_.erase(std::tuple(peer_id, request_fingerprint));
        return;
      }

      auto hash = blocks.front().hash;

      // Remember the peer's answer for this number
      observed.emplace(target, hash);

      for (;;) {
        // Check if block is known (is already enqueued or is in block tree)
        bool block_is_known =
            self->known_blocks_.find(hash) != self->known_blocks_.end()
            or self->block_tree_->getBlockHeader(hash).has_value();

        // Interval of finding is totally narrowed. Common block should be found
        if (target == lower) {
          if (block_is_known) {
            // Common block is found
            SL_DEBUG(self->log_,
                     "Found best common block with {}: {}",
                     peer_id,
                     BlockInfo(target, hash));
            handler(BlockInfo(target, hash));
            return;
          }

          // Common block is not found. It is abnormal situation. Requested
          // block must be existed because finding in interval of numbers of
          // blocks that must exist
          SL_WARN(self->log_, "Not found any common block with {}", peer_id);
          handler(Error::EMPTY_RESPONSE);
          return;
        }

        // NOTE(review): a line is elided from this extract here — possibly a
        // local declaration of `hint`; confirm against the full source.

        // Narrowing interval for next iteration
        if (block_is_known) {
          SL_TRACE(self->log_,
                   "Block {} of {} is found locally",
                   BlockInfo(target, hash),
                   peer_id);

          // Narrowing interval to continue above
          lower = target;
          hint = lower + (upper - lower) / 2;
        } else {
          SL_TRACE(self->log_,
                   "Block {} of {} is not found locally",
                   BlockInfo(target, hash),
                   peer_id,
                   lower,
                   upper - 1);

          // Step for next iteration
          auto step = upper - target;

          // Narrowing interval to continue below
          upper = target;
          hint = upper - std::min(step, (upper - lower) / 2);
        }
        // NOTE(review): this unconditional assignment overwrites the hint
        // values computed in both branches above, making the `step`-based
        // formula dead code — confirm which midpoint formula is intended.
        hint = lower + (upper - lower) / 2;

        // Try again with narrowed interval

        auto it = observed.find(hint);

        // This block number was observed early
        if (it != observed.end()) {
          // Cached answer: continue the search loop without a new request
          target = hint;
          hash = it->second;

          SL_TRACE(
              self->log_,
              "Block {} of {} is already observed. Continue without request",
              BlockInfo(target, hash),
              peer_id);
          continue;
        }

        // This block number has not observed yet
        self->findCommonBlock(peer_id,
                              lower,
                              upper,
                              hint,
                              std::move(handler),
                              std::move(observed));
        break;
      }
    };

    SL_TRACE(log_,
             "Check if block #{} in #{}..#{} is common with {}",
             hint,
             lower,
             upper - 1,
             peer_id);

    auto protocol = router_->getSyncProtocol();
    BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol");
    protocol->request(peer_id, std::move(request), std::move(response_handler));
  }
582 
585  SyncResultHandler &&handler) {
586  // Interrupts process if node is shutting down
588  if (handler) handler(Error::SHUTTING_DOWN);
589  return;
590  }
591 
592  network::BlocksRequest request{attributesForSync(sync_method_),
593  from.hash,
595  std::nullopt};
596 
597  auto request_fingerprint = request.fingerprint();
598 
599  if (not recent_requests_.emplace(peer_id, request_fingerprint).second) {
600  SL_ERROR(
601  log_,
602  "Can't load blocks from {} beginning block {}: {}",
603  peer_id,
604  from,
605  outcome::result<void>(Error::DUPLICATE_REQUEST).error().message());
606  if (handler) handler(Error::DUPLICATE_REQUEST);
607  return;
608  }
609 
610  scheduleRecentRequestRemoval(peer_id, request_fingerprint);
611 
612  auto response_handler = [wp = weak_from_this(),
613  from,
614  peer_id,
615  handler = std::move(handler),
616  parent_hash = primitives::BlockHash{}](
617  auto &&response_res) mutable {
618  auto self = wp.lock();
619  if (not self) {
620  return;
621  }
622 
623  // Any error interrupts loading of blocks
624  if (response_res.has_error()) {
625  SL_ERROR(self->log_,
626  "Can't load blocks from {} beginning block {}: {}",
627  peer_id,
628  from,
629  response_res.error().message());
630  if (handler) handler(response_res.as_failure());
631  return;
632  }
633  auto &blocks = response_res.value().blocks;
634 
635  // No block in response is abnormal situation.
636  // At least one starting block should be returned as existing
637  if (blocks.empty()) {
638  SL_ERROR(self->log_,
639  "Can't load blocks from {} beginning block {}: "
640  "Response does not have any blocks",
641  peer_id,
642  from);
643  if (handler) handler(Error::EMPTY_RESPONSE);
644  return;
645  }
646 
647  SL_TRACE(self->log_,
648  "{} blocks are loaded from {} beginning block {}",
649  blocks.size(),
650  peer_id,
651  from);
652 
653  bool some_blocks_added = false;
654  primitives::BlockInfo last_loaded_block;
655 
656  for (auto &block : blocks) {
657  // Check if header is provided
658  if (not block.header.has_value()) {
659  SL_ERROR(self->log_,
660  "Can't load blocks from {} starting from block {}: "
661  "Received block without header",
662  peer_id,
663  from);
664  if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER);
665  return;
666  }
667  // Check if body is provided
668  if (not block.header.has_value()) {
669  SL_ERROR(self->log_,
670  "Can't load blocks from {} starting from block {}: "
671  "Received block without body",
672  peer_id,
673  from);
674  if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_BODY);
675  return;
676  }
677  auto &header = block.header.value();
678 
679  const auto &last_finalized_block =
680  self->block_tree_->getLastFinalized();
681 
682  // Check by number if block is not finalized yet
683  if (last_finalized_block.number >= header.number) {
684  if (last_finalized_block.number == header.number) {
685  if (last_finalized_block.hash != block.hash) {
686  SL_ERROR(self->log_,
687  "Can't load blocks from {} starting from block {}: "
688  "Received discarded block {}",
689  peer_id,
690  from,
691  BlockInfo(header.number, block.hash));
692  if (handler) handler(Error::DISCARDED_BLOCK);
693  return;
694  }
695 
696  SL_TRACE(self->log_,
697  "Skip block {} received from {}: "
698  "it is finalized with block #{}",
699  BlockInfo(header.number, block.hash),
700  peer_id,
701  last_finalized_block.number);
702  continue;
703  }
704 
705  SL_TRACE(self->log_,
706  "Skip block {} received from {}: "
707  "it is below the last finalized block #{}",
708  BlockInfo(header.number, block.hash),
709  peer_id,
710  last_finalized_block.number);
711  continue;
712  }
713 
714  // Check if block is not discarded
715  if (last_finalized_block.number + 1 == header.number) {
716  if (last_finalized_block.hash != header.parent_hash) {
717  SL_ERROR(self->log_,
718  "Can't complete blocks loading from {} starting from "
719  "block {}: Received discarded block {}",
720  peer_id,
721  from,
722  BlockInfo(header.number, header.parent_hash));
723  if (handler) handler(Error::DISCARDED_BLOCK);
724  return;
725  }
726 
727  // Start to check parents
728  parent_hash = header.parent_hash;
729  }
730 
731  // Check if block is in chain
732  static const primitives::BlockHash zero_hash;
733  if (parent_hash != header.parent_hash && parent_hash != zero_hash) {
734  SL_ERROR(self->log_,
735  "Can't complete blocks loading from {} starting from "
736  "block {}: Received block is not descendant of previous",
737  peer_id,
738  from);
739  if (handler) handler(Error::WRONG_ORDER);
740  return;
741  }
742 
743  // Check if hash is valid
744  auto calculated_hash =
745  self->hasher_->blake2b_256(scale::encode(header).value());
746  if (block.hash != calculated_hash) {
747  SL_ERROR(self->log_,
748  "Can't complete blocks loading from {} starting from "
749  "block {}: "
750  "Received block whose hash does not match the header",
751  peer_id,
752  from);
753  if (handler) handler(Error::INVALID_HASH);
754  return;
755  }
756 
757  last_loaded_block = {header.number, block.hash};
758 
759  parent_hash = block.hash;
760 
761  // Add block in queue and save peer or just add peer for existing record
762  auto it = self->known_blocks_.find(block.hash);
763  if (it == self->known_blocks_.end()) {
764  self->known_blocks_.emplace(block.hash, KnownBlock{block, {peer_id}});
765  self->metric_import_queue_length_->set(self->known_blocks_.size());
766  } else {
767  it->second.peers.emplace(peer_id);
768  SL_TRACE(self->log_,
769  "Skip block {} received from {}: already enqueued",
770  BlockInfo(header.number, block.hash),
771  peer_id);
772  continue;
773  }
774 
775  SL_TRACE(self->log_,
776  "Enqueue block {} received from {}",
777  BlockInfo(header.number, block.hash),
778  peer_id);
779 
780  self->generations_.emplace(header.number, block.hash);
781  self->ancestry_.emplace(header.parent_hash, block.hash);
782 
783  some_blocks_added = true;
784  }
785 
786  SL_TRACE(self->log_, "Block loading is finished");
787  if (handler) {
788  handler(last_loaded_block);
789  }
790 
791  if (some_blocks_added) {
792  SL_TRACE(self->log_, "Enqueued some new blocks: schedule applying");
793  self->scheduler_->schedule([wp] {
794  if (auto self = wp.lock()) {
795  self->applyNextBlock();
796  }
797  });
798  }
799  };
800 
801  auto protocol = router_->getSyncProtocol();
802  BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol");
803  protocol->request(peer_id, std::move(request), std::move(response_handler));
804  }
805 
      primitives::BlockInfo target_block,
      std::optional<uint32_t> limit,
      SyncResultHandler &&handler) {
    // Requests justifications from `peer_id` starting at `target_block` (up
    // to `limit` blocks), stores each received justification into
    // justifications_ and schedules applyNextJustification().
    // NOTE(review): the function-name line and the shutdown-guard line are
    // elided from this extract.
      if (handler) handler(Error::SHUTTING_DOWN);
      return;
    }

    busy_peers_.insert(peer_id);
    // Unmark the peer as busy on every exit path of this function
    auto cleanup = gsl::finally([this, peer_id] {
      auto peer = busy_peers_.find(peer_id);
      if (peer != busy_peers_.end()) {
        busy_peers_.erase(peer);
      }
    });

    BlocksRequest request{
        // NOTE(review): the attributes field line is elided here
        target_block.hash,
        // NOTE(review): the direction field line is elided here
        limit};

    auto request_fingerprint = request.fingerprint();
    // Drop exact duplicates of recently sent requests
    if (not recent_requests_.emplace(peer_id, request_fingerprint).second) {
      SL_ERROR(
          log_,
          "Can't load justification from {} for block {}: {}",
          peer_id,
          target_block,
          outcome::result<void>(Error::DUPLICATE_REQUEST).error().message());
      if (handler) {
        handler(Error::DUPLICATE_REQUEST);
      }
      return;
    }

    scheduleRecentRequestRemoval(peer_id, request_fingerprint);

    auto response_handler = [wp = weak_from_this(),
                             peer_id,
                             target_block,
                             handler = std::move(handler)](
                                auto &&response_res) mutable {
      auto self = wp.lock();
      if (not self) {
        return;
      }

      if (response_res.has_error()) {
        SL_ERROR(self->log_,
                 "Can't load justification from {} for block {}: {}",
                 peer_id,
                 target_block,
                 response_res.error().message());
        if (handler) {
          handler(response_res.as_failure());
        }
        return;
      }

      auto &blocks = response_res.value().blocks;

      if (blocks.empty()) {
        SL_ERROR(self->log_,
                 "Can't load block justification from {} for block {}: "
                 "Response does not have any contents",
                 peer_id,
                 target_block);
        if (handler) handler(Error::EMPTY_RESPONSE);
        return;
      }

      bool justification_received = false;
      BlockInfo last_justified_block;
      for (auto &block : blocks) {
        if (not block.header) {
          SL_ERROR(self->log_,
                   "No header was provided from {} for block {} while "
                   "requesting justifications",
                   peer_id,
                   target_block);
          if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER);
          return;
        }
        if (block.justification) {
          justification_received = true;
          last_justified_block =
              primitives::BlockInfo{block.header->number, block.hash};
          {
            // justifications_ is shared with the applying path
            std::lock_guard lock(self->justifications_mutex_);
            self->justifications_.emplace(last_justified_block,
                                          *block.justification);
          }
        }
      }

      if (justification_received) {
        SL_TRACE(self->log_, "Enqueued new justifications: schedule applying");
        self->scheduler_->schedule([wp] {
          if (auto self = wp.lock()) {
            self->applyNextJustification();
          }
        });
      }
      if (handler) {
        handler(last_justified_block);
      }
    };

    auto protocol = router_->getSyncProtocol();
    BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol");
    protocol->request(peer_id, std::move(request), std::move(response_handler));
  }
920 
      const primitives::BlockInfo &block,
      SyncResultHandler &&handler) {
    // Requests (a continuation of) the trie state for `block` from `peer_id`
    // and writes the received entries into persistent trie batches until the
    // peer reports completion; then verifies the resulting root hash.
    // NOTE(review): the function-name line is elided from this extract.
    if (state_sync_request_.has_value()
        and state_sync_request_->hash != block.hash) {
      SL_WARN(log_,
              "SyncState was not requested to {}: "
              "state sync for other block is not completed yet",
              peer_id);
      return;
    }

    // Only one state-sync request may be in flight at a time
    bool bool_val = false;
    if (not state_sync_request_in_progress_.compare_exchange_strong(bool_val,
                                                                    true)) {
      SL_TRACE(log_,
               "State sync request was not sent to {} for block {}: "
               "previous request in progress",
               peer_id,
               block);
      return;
    }

    if (not state_sync_request_.has_value()) {
      SL_INFO(log_, "Sync of state for block {} has started", block);
    }

    SL_TRACE(
        log_, "State sync request has sent to {} for block {}", peer_id, block);

    // Continue a saved partial request, or start from the beginning
    auto request = state_sync_request_.value_or(
        network::StateRequest{block.hash, {{}}, true});

    auto protocol = router_->getStateProtocol();
    BOOST_ASSERT_MSG(protocol, "Router did not provide state protocol");

    auto response_handler =
        [wp = weak_from_this(), block, peer_id, handler = std::move(handler)](
            auto &&response_res) mutable {
          auto self = wp.lock();
          if (not self) {
            return;
          }

          // Request failed
          if (response_res.has_error()) {
            self->state_sync_request_in_progress_ = false;

            SL_WARN(self->log_,
                    "State syncing failed with error: {}",
                    response_res.error().message());
            if (handler) handler(response_res.as_failure());
            return;
          }

          // Processing of response
          for (unsigned i = 0; i < response_res.value().entries.size(); ++i) {
            const auto &state_entry = response_res.value().entries[i];

            // get or create batch
            auto it = self->batches_store_.find(state_entry.state_root);
            auto batch = (it != self->batches_store_.end())
                             ? std::get<2>(it->second)
                             : self->storage_
                                   ->getPersistentBatchAt(
                                       self->serializer_->getEmptyRootHash())
                                   .value();

            // main storage entries size empty at child storage state syncing
            if (state_entry.entries.size()) {
              SL_TRACE(self->log_,
                       "Syncing {}th item. Current key {}. Keys received {}.",
                       i,
                       state_entry.entries[0].key.toHex(),
                       state_entry.entries.size());
              for (const auto &entry : state_entry.entries) {
                std::ignore = batch->put(entry.key, entry.value);
              }

              // store batch to continue at next state_entry
              if (!state_entry.complete) {
                self->batches_store_[state_entry.state_root] = {
                    state_entry.entries.back().key, i, batch};
              } else {
                self->batches_store_.erase(state_entry.state_root);
              }
            }

            // Handle completion of syncing
            if (state_entry.complete) {
              auto res = batch->commit();
              if (res.has_value()) {
                // Entry 0 is the main state and must match the root hash
                // recorded in the block header; child states carry their own
                // expected roots.
                const auto &expected = [&] {
                  if (i != 0) {  // Child state
                    return state_entry.state_root;
                  } else {  // Main state
                    auto header_res =
                        self->block_tree_->getBlockHeader(block.hash);
                    BOOST_ASSERT_MSG(header_res.has_value(),
                                     "It is state of existing block; head "
                                     "must be existing");
                    const auto &header = header_res.value();
                    return header.state_root;
                  }
                }();
                const auto &actual = res.value();

                if (actual == expected) {
                  SL_INFO(self->log_,
                          "Syncing of {}state on block {} has finished. "
                          "Root hashes match: {}",
                          i != 0 ? "child " : "",
                          block,
                          actual);
                } else {
                  SL_WARN(self->log_,
                          "Syncing of {}state on block {} has finished. "
                          "Root hashes mismatch: expected={}, actual={}",
                          i != 0 ? "child " : "",
                          block,
                          expected,
                          actual);
                }
              }

              self->trie_changes_tracker_->onBlockAdded(block.hash);
            }

            // just calculate state entries in main storage for trace log
            if (!i) {
              self->entries_ += state_entry.entries.size();
            }
          }

          // not well-formed way to place 0th batch key to front
          std::map<unsigned, common::Buffer> keymap;
          for (const auto &[_, val] : self->batches_store_) {
            unsigned i = std::get<1>(val);
            keymap[i] = std::get<0>(val);
            SL_TRACE(self->log_, "Index: {}, Key: {}", i, keymap[i]);
          }

          std::vector<common::Buffer> keys;
          for (const auto &[_, val] : keymap) {
            keys.push_back(val);
          }

          if (not response_res.value().entries[0].complete) {
            SL_TRACE(self->log_,
                     "State syncing continues. {} entries loaded",
                     self->entries_);
            // Save data of incomplete state sync once
            if (not self->state_sync_on_block_.has_value()) {
              // NOTE(review): the storage-key argument line is elided here.
              auto res = self->buffer_storage_->put(
                  common::Buffer(scale::encode(block).value()));
              if (res.has_error()) {
                SL_WARN(self->log_,
                        "Can't save data of incomplete state sync: {}",
                        res.error());
              }
              self->state_sync_on_block_.emplace(block);
            }
            // Ask for the next chunk, continuing from the saved batch keys
            self->state_sync_request_ =
                StateRequest{block.hash, std::move(keys), true};
            self->state_sync_request_in_progress_ = false;
            self->syncState(peer_id, block, std::move(handler));
            return;
          }

          // State syncing has completed; Switch to the full syncing
          // NOTE(review): the sync-method switch line is elided here.
          // Forget saved data of incomplete state sync
          // NOTE(review): the storage-key argument line is elided here.
          auto res = self->buffer_storage_->remove(
          if (res.has_error()) {
            SL_WARN(self->log_,
                    "Can't remove data of incomplete state sync: {}",
                    res.error());
          }
          self->state_sync_request_.reset();
          self->state_sync_on_block_.reset();
          self->state_sync_request_in_progress_ = false;
          if (handler) {
            handler(block);
          }
        };

    protocol->request(peer_id, std::move(request), std::move(response_handler));
  }
1111 
1113  if (generations_.empty()) {
1114  SL_TRACE(log_, "No block for applying");
1115  return;
1116  }
1117 
1118  bool false_val = false;
1119  if (not applying_in_progress_.compare_exchange_strong(false_val, true)) {
1120  SL_TRACE(log_, "Applying in progress");
1121  return;
1122  }
1123  SL_TRACE(log_, "Begin applying");
1124  auto cleanup = gsl::finally([this] {
1125  SL_TRACE(log_, "End applying");
1126  applying_in_progress_ = false;
1127  });
1128 
1129  primitives::BlockHash hash;
1130 
1131  while (true) {
1132  auto generation_node = generations_.extract(generations_.begin());
1133  if (generation_node) {
1134  hash = generation_node.mapped();
1135  break;
1136  }
1137  if (generations_.empty()) {
1138  SL_TRACE(log_, "No block for applying");
1139  return;
1140  }
1141  }
1142 
1143  auto node = known_blocks_.extract(hash);
1144  if (node) {
1145  auto &block = node.mapped().data;
1146  BOOST_ASSERT(block.header.has_value());
1147  const BlockInfo block_info(block.header->number, block.hash);
1148 
1149  const auto &last_finalized_block = block_tree_->getLastFinalized();
1150 
1151  SyncResultHandler handler;
1152 
1153  if (watched_blocks_number_ == block.header->number) {
1154  if (auto wbn_node = watched_blocks_.extract(hash)) {
1155  handler = std::move(wbn_node.mapped());
1156  }
1157  }
1158 
1159  // Skip applied and finalized blocks and
1160  // discard side-chain below last finalized
1161  if (block.header->number <= last_finalized_block.number) {
1162  auto header_res = block_tree_->getBlockHeader(hash);
1163  if (not header_res.has_value()) {
1164  auto n = discardBlock(block.hash);
1165  SL_WARN(
1166  log_,
1167  "Block {} {} not applied as discarded",
1168  block_info,
1169  n ? fmt::format("and {} others have", n) : fmt::format("has"));
1170  if (handler) handler(Error::DISCARDED_BLOCK);
1171  }
1172 
1173  } else {
1174  outcome::result<void> applying_res = outcome::success();
1175 
1177  // Regular syncing
1178  applying_res = block_executor_->applyBlock(std::move(block));
1179 
1180  } else {
1181  // Fast syncing
1182  if (not state_sync_request_.has_value()) {
1183  // Headers loading
1184  applying_res = block_appender_->appendBlock(std::move(block));
1185 
1186  } else {
1187  // State syncing in progress; Temporary discard all new blocks
1188  auto n = discardBlock(block.hash);
1189  SL_WARN(
1190  log_,
1191  "Block {} {} not applied as discarded: "
1192  "state syncing on block in progress",
1193  block_info,
1194  n ? fmt::format("and {} others have", n) : fmt::format("has"));
1195  if (handler) handler(Error::DISCARDED_BLOCK);
1196  return;
1197  }
1198  }
1199 
1200  notifySubscribers(block_info, applying_res);
1201 
1202  if (not applying_res.has_value()) {
1203  if (applying_res
1204  != outcome::failure(blockchain::BlockTreeError::BLOCK_EXISTS)) {
1205  notifySubscribers(block_info, applying_res.as_failure());
1206  auto n = discardBlock(block.hash);
1207  SL_WARN(
1208  log_,
1209  "Block {} {} been discarded: {}",
1210  block_info,
1211  n ? fmt::format("and {} others have", n) : fmt::format("has"),
1212  applying_res.error().message());
1213  if (handler) handler(Error::DISCARDED_BLOCK);
1214  } else {
1215  SL_DEBUG(log_, "Block {} is skipped as existing", block_info);
1216  if (handler) handler(block_info);
1217  }
1218  } else {
1219  telemetry_->notifyBlockImported(
1221  if (handler) handler(block_info);
1222  }
1223  }
1224  }
1225  ancestry_.erase(hash);
1226 
1227  auto minPreloadedBlockAmount =
1231 
1232  if (known_blocks_.size() < minPreloadedBlockAmount) {
1233  SL_TRACE(log_,
1234  "{} blocks in queue: ask next portion of block",
1235  known_blocks_.size());
1237  } else {
1238  SL_TRACE(log_, "{} blocks in queue", known_blocks_.size());
1239  }
1241  scheduler_->schedule([wp = weak_from_this()] {
1242  if (auto self = wp.lock()) {
1243  self->applyNextBlock();
1244  }
1245  });
1246  }
1247 
// applyNextJustification(): drains the pending-justification queue and
// applies each entry via the block executor.
// NOTE(review): the function signature (original line 1248) is missing
// from this extracted listing — restore from VCS before modifying.
1249  // Operate over the same lock as for the whole blocks application
// Same single-flight CAS guard as applyNextBlock(); justification and
// block application are mutually exclusive.
1250  bool false_val = false;
1251  if (not applying_in_progress_.compare_exchange_strong(false_val, true)) {
1252  SL_TRACE(log_, "Applying justification in progress");
1253  return;
1254  }
1255  SL_TRACE(log_, "Begin justification applying");
1256  auto cleanup = gsl::finally([this] {
1257  SL_TRACE(log_, "End justification applying");
1258  applying_in_progress_ = false;
1259  });
1260 
// Swap the shared queue out under the mutex so the (potentially slow)
// application loop below runs without holding the lock.
1261  std::queue<JustificationPair> justifications;
1262  {
1263  std::lock_guard lock(justifications_mutex_);
1264  justifications.swap(justifications_);
1265  }
1266 
1267  while (not justifications.empty()) {
1268  auto [block_info, justification] = std::move(justifications.front());
1269  const auto &block = block_info; // SL_WARN compilation WA
1270  justifications.pop();
// Failures are logged but do not abort the loop: remaining queued
// justifications are still attempted.
1271  auto res = block_executor_->applyJustification(block_info, justification);
1272  if (res.has_error()) {
1273  SL_WARN(log_,
1274  "Justification for block {} was not applied: {}",
1275  block,
1276  res.error().message());
1277  } else {
1278  SL_TRACE(log_, "Applied justification for block {}", block);
1279  }
1280  }
1281  }
1282 
// discardBlock(): BFS over ancestry_ removing the given block and all of
// its known descendants from the queues; returns how many blocks were
// actually removed from known_blocks_.
// NOTE(review): the signature's first line (original 1283) and original
// line 1309 (likely the import-queue-length metric update) are missing
// from this extracted listing — restore from VCS before modifying.
1284  const primitives::BlockHash &hash_of_discarding_block) {
1285  std::queue<primitives::BlockHash> queue;
1286  queue.emplace(hash_of_discarding_block);
1287 
1288  size_t affected = 0;
1289  while (not queue.empty()) {
// Reference into the queue's front stays valid across emplace() below
// (std::deque does not invalidate references on push_back).
1290  const auto &hash = queue.front();
1291 
1292  if (auto it = known_blocks_.find(hash); it != known_blocks_.end()) {
// NOTE(review): header is dereferenced without has_value() check here —
// presumably enqueue guarantees a header is present; confirm upstream.
1293  auto number = it->second.data.header->number;
1294  notifySubscribers({number, hash}, Error::DISCARDED_BLOCK);
1295 
1296  known_blocks_.erase(it);
1297  affected++;
1298  }
1299 
// Enqueue all children of this hash, then drop their ancestry links so
// each edge is visited once.
1300  auto range = ancestry_.equal_range(hash);
1301  for (auto it = range.first; it != range.second; ++it) {
1302  queue.emplace(it->second);
1303  }
1304  ancestry_.erase(range.first, range.second);
1305 
1306  queue.pop();
1307  }
1308 
1310  return affected;
1311  }
1312 
// prune(): drops queued blocks made obsolete by finalization — everything
// strictly below the finalized number, plus same-number siblings that are
// not the finalized block itself.
// NOTE(review): original line 1341 (likely a queue-length metric update)
// is missing from this extracted listing — restore from VCS before edits.
1313  void SynchronizerImpl::prune(const primitives::BlockInfo &finalized_block) {
1314  // Remove blocks whose numbers less finalized one
1315  while (not generations_.empty()) {
// NOTE(review): extract() removes the entry, but when the loop breaks at
// number >= finalized the extracted node handle is destroyed WITHOUT
// being re-inserted — that first non-prunable generation entry appears to
// be silently lost. Looks like a bug; confirm against upstream before
// relying on this behavior.
1316  auto generation_node = generations_.extract(generations_.begin());
1317  if (generation_node) {
1318  const auto &number = generation_node.key();
1319  if (number >= finalized_block.number) {
1320  break;
1321  }
1322  const auto &hash = generation_node.mapped();
1323  notifySubscribers({number, hash}, Error::DISCARDED_BLOCK);
1324 
1325  known_blocks_.erase(hash);
1326  ancestry_.erase(hash);
1327  }
1328  }
1329 
1330  // Remove blocks whose numbers equal finalized one, excluding finalized
1331  // one
// Post-increment copy (cit) keeps iteration valid while discardBlock()
// mutates the containers.
1332  auto range = generations_.equal_range(finalized_block.number);
1333  for (auto it = range.first; it != range.second;) {
1334  auto cit = it++;
1335  const auto &hash = cit->second;
1336  if (hash != finalized_block.hash) {
1337  discardBlock(hash);
1338  }
1339  }
1340 
1342  }
1343 
// scheduleRecentRequestRemoval(): arranges for the (peer, fingerprint)
// pair to expire from recent_requests_ after the recentness window, so an
// identical request to the same peer is allowed again later.
// NOTE(review): the signature's first line (original 1344) and the delay
// argument line (original 1353, presumably kRecentnessDuration) are
// missing from this extracted listing — restore from VCS before edits.
1345  const libp2p::peer::PeerId &peer_id,
1346  const BlocksRequest::Fingerprint &fingerprint) {
// peer_id and fingerprint are captured by value: the timer may fire long
// after the caller's references are gone; weak_ptr guards self lifetime.
1347  scheduler_->schedule(
1348  [wp = weak_from_this(), peer_id, fingerprint] {
1349  if (auto self = wp.lock()) {
1350  self->recent_requests_.erase(std::tuple(peer_id, fingerprint));
1351  }
1352  },
1354  }
1355 
// askNextPortionOfBlocks(): walks the queued generations from newest to
// oldest, picks the first block that has a non-busy announcing peer, and
// requests the next portion of blocks from that peer (possibly after
// finding a common ancestor first).
// NOTE(review): this extracted listing is missing original lines 1356
// (function signature), 1426 (the condition choosing findCommonBlock vs
// direct loadBlocks), 1437 (the findCommonBlock( call line) and 1480
// (likely resetting asking_blocks_portion_in_progress_ when no request
// was sent) — restore them from VCS before modifying this function.
// Guard: only one "ask portion" round in flight; the flag is cleared by
// the completion handler below (or on the none-found exit path).
1357  bool false_val = false;
1358  if (not asking_blocks_portion_in_progress_.compare_exchange_strong(
1359  false_val, true)) {
1360  SL_TRACE(log_, "Asking portion of blocks in progress");
1361  return;
1362  }
1363  SL_TRACE(log_, "Begin asking portion of blocks");
1364 
// Iterate newest-first over queued generations.
1365  for (auto g_it = generations_.rbegin(); g_it != generations_.rend();
1366  ++g_it) {
1367  const auto &hash = g_it->second;
1368 
1369  auto b_it = known_blocks_.find(hash);
1370  if (b_it == known_blocks_.end()) {
1371  SL_TRACE(log_,
1372  "Block {} is unknown. Go to next one",
1373  primitives::BlockInfo(g_it->first, hash));
1374  continue;
1375  }
1376 
1377  primitives::BlockInfo block_info(g_it->first, hash);
1378 
1379  auto &peers = b_it->second.peers;
1380  if (peers.empty()) {
1381  SL_TRACE(
1382  log_, "Block {} don't have any peer. Go to next one", block_info);
1383  continue;
1384  }
1385 
// Post-increment copy (cp_it) keeps iteration valid while the chosen
// peer's node is extracted from the set below.
1386  for (auto p_it = peers.begin(); p_it != peers.end();) {
1387  auto cp_it = p_it++;
1388 
1389  auto peer_id = *cp_it;
1390 
1391  if (busy_peers_.find(peer_id) != busy_peers_.end()) {
1392  SL_TRACE(log_,
1393  "Peer {} for block {} is busy",
1394  peer_id,
1395  primitives::BlockInfo(g_it->first, hash));
1396  continue;
1397  }
1398 
// Move the peer's node from this block's peer set into busy_peers_
// (node-handle move, no reallocation).
1399  busy_peers_.insert(peers.extract(cp_it));
1400  SL_TRACE(log_, "Peer {} marked as busy", peer_id);
1401 
// Completion handler: unmark the peer, clear the in-progress flag, and
// if the queue emptied in the meantime, immediately ask for more.
1402  auto handler = [wp = weak_from_this(), peer_id](const auto &res) {
1403  if (auto self = wp.lock()) {
1404  if (self->busy_peers_.erase(peer_id) > 0) {
1405  SL_TRACE(self->log_, "Peer {} unmarked as busy", peer_id);
1406  }
1407  SL_TRACE(self->log_, "End asking portion of blocks");
1408  self->asking_blocks_portion_in_progress_ = false;
1409  if (not res.has_value()) {
1410  SL_DEBUG(self->log_,
1411  "Loading next portion of blocks from {} is failed: {}",
1412  peer_id,
1413  res.error().message());
1414  return;
1415  }
1416  SL_DEBUG(self->log_,
1417  "Portion of blocks from {} is loaded till {}",
1418  peer_id,
1419  res.value());
1420  if (self->known_blocks_.empty()) {
1421  self->askNextPortionOfBlocks();
1422  }
1423  }
1424  };
1425 
// Branch (condition at missing original line 1426): search for a common
// ancestor within the currently queued number range, then load from it.
1427  auto lower = generations_.begin()->first;
1428  auto upper = generations_.rbegin()->first + 1;
1429  auto hint = generations_.rbegin()->first;
1430 
1431  SL_DEBUG(
1432  log_,
1433  "Start to find common block with {} in #{}..#{} to fill queue",
1434  peer_id,
1435  generations_.begin()->first,
1436  generations_.rbegin()->first);
1438  peer_id,
1439  lower,
1440  upper,
1441  hint,
1442  [wp = weak_from_this(), peer_id, handler = std::move(handler)](
1443  outcome::result<primitives::BlockInfo> res) {
1444  if (auto self = wp.lock()) {
1445  if (not res.has_value()) {
1446  SL_DEBUG(self->log_,
1447  "Can't load next portion of blocks from {}: {}",
1448  peer_id,
1449  res.error().message());
1450  handler(res);
1451  return;
1452  }
1453  auto &common_block_info = res.value();
1454  SL_DEBUG(self->log_,
1455  "Start to load next portion of blocks from {} "
1456  "since block {}",
1457  peer_id,
1458  common_block_info);
1459  self->loadBlocks(
1460  peer_id, common_block_info, std::move(handler));
1461  }
1462  });
1463  } else {
// Otherwise request directly starting from the known block.
1464  SL_DEBUG(log_,
1465  "Start to load next portion of blocks from {} "
1466  "since block {}",
1467  peer_id,
1468  block_info);
1469  loadBlocks(peer_id, block_info, std::move(handler));
1470  }
// One request per round: stop scanning once a request has been issued.
1471  return;
1472  }
1473 
1474  SL_TRACE(log_,
1475  "Block {} doesn't have appropriate peer. Go to next one",
1476  primitives::BlockInfo(g_it->first, hash));
1477  }
1478 
1479  SL_TRACE(log_, "End asking portion of blocks: none");
1481  }
1482 
1483 } // namespace kagome::network
Class represents arbitrary (including empty) byte buffer.
Definition: buffer.hpp:29
std::shared_ptr< network::Router > router_
size_t discardBlock(const primitives::BlockHash &block)
Include a justification for the block.
virtual void set(double val)=0
Set the gauge to the given value.
void askNextPortionOfBlocks()
Tries to request another portion of blocks.
void applyNextJustification()
Pops next justification from queue and tries to apply it.
application::AppConfiguration::SyncMethod sync_method_
static constexpr size_t kMinPreloadedBlockAmountForFastSyncing
std::unordered_map< primitives::BlockHash, KnownBlock > known_blocks_
SynchronizerImpl(const application::AppConfiguration &app_config, std::shared_ptr< application::AppStateManager > app_state_manager, std::shared_ptr< blockchain::BlockTree > block_tree, std::shared_ptr< storage::changes_trie::ChangesTracker > changes_tracker, std::shared_ptr< consensus::BlockAppender > block_appender, std::shared_ptr< consensus::BlockExecutor > block_executor, std::shared_ptr< storage::trie::TrieSerializer > serializer, std::shared_ptr< storage::trie::TrieStorage > storage, std::shared_ptr< network::Router > router, std::shared_ptr< libp2p::basic::Scheduler > scheduler, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< storage::BufferStorage > buffer_storage)
std::shared_ptr< storage::trie::TrieSerializer > serializer_
void findCommonBlock(const libp2p::peer::PeerId &peer_id, primitives::BlockNumber lower, primitives::BlockNumber upper, primitives::BlockNumber hint, SyncResultHandler &&handler, std::map< primitives::BlockNumber, primitives::BlockHash > &&observed={})
std::shared_ptr< blockchain::BlockTree > block_tree_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
STL namespace.
std::shared_ptr< storage::changes_trie::ChangesTracker > trie_changes_tracker_
Block is part of the initial sync with the network.
std::unordered_multimap< primitives::BlockHash, primitives::BlockHash > ancestry_
void syncState(const libp2p::peer::PeerId &peer_id, const primitives::BlockInfo &block, SyncResultHandler &&handler) override
primitives::BlockInfo BlockInfo
Definition: structs.hpp:29
std::optional< primitives::BlockInfo > state_sync_on_block_
OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, SynchronizerImpl::Error, e)
std::atomic_bool asking_blocks_portion_in_progress_
uint32_t BlockNumber
Definition: common.hpp:18
std::set< libp2p::peer::PeerId > busy_peers_
libp2p::peer::PeerId PeerId
std::shared_ptr< application::AppStateManager > app_state_manager_
std::optional< network::StateRequest > state_sync_request_
const common::Buffer kBlockOfIncompleteSyncStateLookupKey
void scheduleRecentRequestRemoval(const libp2p::peer::PeerId &peer_id, const BlocksRequest::Fingerprint &fingerprint)
void loadJustifications(const libp2p::peer::PeerId &peer_id, primitives::BlockInfo target_block, std::optional< uint32_t > limit, SyncResultHandler &&handler)
bool subscribeToBlock(const primitives::BlockInfo &block_info, SyncResultHandler &&handler)
std::set< std::tuple< libp2p::peer::PeerId, BlocksRequest::Fingerprint > > recent_requests_
std::function< void(outcome::result< primitives::BlockInfo >)> SyncResultHandler
std::shared_ptr< storage::BufferStorage > buffer_storage_
void loadBlocks(const libp2p::peer::PeerId &peer_id, primitives::BlockInfo from, SyncResultHandler &&handler)
std::shared_ptr< storage::trie::TrieStorage > storage_
std::shared_ptr< consensus::BlockAppender > block_appender_
void prune(const primitives::BlockInfo &finalized_block)
std::multimap< primitives::BlockInfo, SyncResultHandler > subscriptions_
void syncMissingJustifications(const PeerId &peer_id, primitives::BlockInfo target_block, std::optional< uint32_t > limit, SyncResultHandler &&handler) override
static constexpr size_t kMinPreloadedBlockAmount
virtual SyncMethod syncMethod() const =0
static constexpr size_t kMaxDistanceToBlockForSubscription
BlockNumber number
index of the block in the chain
std::shared_ptr< crypto::Hasher > hasher_
bool syncByBlockInfo(const primitives::BlockInfo &block_info, const libp2p::peer::PeerId &peer_id, SyncResultHandler &&handler, bool subscribe_to_block) override
void notifySubscribers(const primitives::BlockInfo &block_info, outcome::result< void > res)
Notifies subscribers about arrived block.
std::unordered_multimap< primitives::BlockHash, SyncResultHandler > watched_blocks_
primitives::BlockNumber watched_blocks_number_
std::queue< JustificationPair > justifications_
BlockHash parent_hash
32-byte Blake2s hash of parent header
std::multimap< primitives::BlockNumber, primitives::BlockHash > generations_
static constexpr std::chrono::milliseconds kRecentnessDuration
std::shared_ptr< consensus::BlockExecutor > block_executor_
void applyNextBlock()
Pops the next block from the queue and tries to apply it.
bool syncByBlockHeader(const primitives::BlockHeader &header, const libp2p::peer::PeerId &peer_id, SyncResultHandler &&handler) override
static constexpr BlockAttributes kBasicAttributes
includes HEADER, BODY and JUSTIFICATION