    case E::SHUTTING_DOWN:
      return "Node is shutting down";
    case E::EMPTY_RESPONSE:
      return "Response is empty";
    case E::RESPONSE_WITHOUT_BLOCK_HEADER:
      return "Response does not contain header of some block";
    case E::RESPONSE_WITHOUT_BLOCK_BODY:
      return "Response does not contain body of some block";
    case E::DISCARDED_BLOCK:
      return "Block is discarded";
    case E::WRONG_ORDER:
      return "Wrong order of blocks/headers in response";
    case E::INVALID_HASH:
      return "Hash does not match";
    case E::ALREADY_IN_QUEUE:
      return "Block is already enqueued";
    case E::PEER_BUSY:
      return "Peer is busy";
    case E::ARRIVED_TOO_EARLY:
      return "Block arrived too early. Try to process it later";
    case E::DUPLICATE_REQUEST:
      return "Duplicate of recent request has been detected";
  }
  return "unknown error";
}
constexpr const char *kImportQueueLength =
    "kagome_import_queue_blocks_submitted";
SynchronizerImpl::SynchronizerImpl(
    const application::AppConfiguration &app_config,
    std::shared_ptr<application::AppStateManager> app_state_manager,
    std::shared_ptr<blockchain::BlockTree> block_tree,
    std::shared_ptr<storage::changes_trie::ChangesTracker> changes_tracker,
    std::shared_ptr<consensus::BlockAppender> block_appender,
    std::shared_ptr<consensus::BlockExecutor> block_executor,
    std::shared_ptr<storage::trie::TrieSerializer> serializer,
    std::shared_ptr<storage::trie::TrieStorage> storage,
    std::shared_ptr<network::Router> router,
    std::shared_ptr<libp2p::basic::Scheduler> scheduler,
    std::shared_ptr<crypto::Hasher> hasher,
    std::shared_ptr<storage::BufferStorage> buffer_storage)
    : app_state_manager_(std::move(app_state_manager)),
      block_tree_(std::move(block_tree)),
      trie_changes_tracker_(std::move(changes_tracker)),
      block_appender_(std::move(block_appender)),
      block_executor_(std::move(block_executor)),
      serializer_(std::move(serializer)),
      storage_(std::move(storage)),
      router_(std::move(router)),
      scheduler_(std::move(scheduler)),
      hasher_(std::move(hasher)),
      buffer_storage_(std::move(buffer_storage)) {
  // Register the import-queue gauge
  metrics_registry_->registerGaugeFamily(
      kImportQueueLength, "Number of blocks submitted to the import queue");
  metric_import_queue_length_ =
      metrics_registry_->registerGaugeMetric(kImportQueueLength);
  // …
}
  auto opt_res = buffer_storage_->tryGet(kBlockOfIncompleteSyncStateLookupKey);
  if (opt_res.has_error()) {
    SL_ERROR(log_,
             "Can't check for incomplete state sync: {}",
             opt_res.error());
    // …
  }
  if (opt_res.value().has_value()) {
    auto &encoded_block = opt_res.value().value();
    auto block_res = scale::decode<decltype(state_sync_on_block_)::value_type>(
        std::move(encoded_block));
    if (block_res.has_error()) {
      SL_ERROR(log_,
               "Can't decode data of incomplete state sync: {}",
               block_res.error());
      // …
    }
    auto &block = block_res.value();
    SL_WARN(log_,
            "Found incomplete state sync on block {}; "
            "State sync will be restarted",
            block);
    state_sync_on_block_.emplace(std::move(block));
  }
bool SynchronizerImpl::subscribeToBlock(
    const primitives::BlockInfo &block_info, SyncResultHandler &&handler) {
  // The block has already arrived: report it immediately
  // …
  scheduler_->schedule(
      [handler = std::move(handler), block_info] { handler(block_info); });
  // …

  auto last_finalized_block = block_tree_->getLastFinalized();

  if (last_finalized_block.number <= block_info.number) {
    // …
  }

  auto best_block_res =
      block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
  BOOST_ASSERT(best_block_res.has_value());
  const auto &best_block = best_block_res.value();

  // …
  scheduler_->schedule([handler = std::move(handler)] {
    // …
  });
  // …
}
void SynchronizerImpl::notifySubscribers(
    const primitives::BlockInfo &block_info, outcome::result<void> res) {
  // Hand the result to every handler subscribed to this block
  // …
  for (auto it = range.first; it != range.second;) {
    // …
    if (res.has_error()) {
      auto error = res.as_failure();
      scheduler_->schedule(
          [handler = std::move(node.mapped()), error] { handler(error); });
    } else {
      scheduler_->schedule(
          [handler = std::move(node.mapped()), block] { handler(block); });
    }
  }
  // …
}
bool SynchronizerImpl::syncByBlockInfo(
    const primitives::BlockInfo &block_info,
    const libp2p::peer::PeerId &peer_id,
    SyncResultHandler &&handler,
    bool subscribe_to_block) {
  // Subscribe on demand
  if (subscribe_to_block) {
    subscribeToBlock(block_info, std::move(handler));
  }

  // If the block is already enqueued, just remember the peer
  if (auto it = known_blocks_.find(block_info.hash);
      it != known_blocks_.end()) {
    auto &block_in_queue = it->second;
    block_in_queue.peers.emplace(peer_id);
    if (handler) handler(block_info);
    return false;
  }

  // Interact with one peer per task: a busy peer is not asked again
  auto peer_is_busy = not busy_peers_.emplace(peer_id).second;
  if (peer_is_busy) {
    SL_TRACE(log_,
             "Can't syncByBlockInfo block {} received from {}: peer is busy",
             block_info,
             peer_id);
    return false;
  }
  SL_TRACE(log_, "Peer {} marked as busy", peer_id);

  const auto &last_finalized_block = block_tree_->getLastFinalized();

  auto best_block_res =
      block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
  BOOST_ASSERT(best_block_res.has_value());
  const auto &best_block = best_block_res.value();

  // The target block is our best block already: nothing to load
  if (block_info == best_block) {
    if (handler) handler(block_info);
    return false;
  }

  // The search for the best common block is bounded below by the last
  // finalized block (certainly common) and above by the lesser of the
  // target and the local best block
  const auto lower = last_finalized_block.number;
  const auto upper = std::min(block_info.number, best_block.number) + 1;
  const auto hint = std::min(block_info.number, best_block.number);
  BOOST_ASSERT(lower < upper);
  SL_DEBUG(log_,
           "Start to find common block with {} in #{}..#{} to catch up",
           peer_id,
           lower,
           upper);

  findCommonBlock(
      peer_id,
      lower,
      upper,
      hint,
      [wp = weak_from_this(), peer_id, handler = std::move(handler)](
          outcome::result<primitives::BlockInfo> res) mutable {
        if (auto self = wp.lock()) {
          if (self->busy_peers_.erase(peer_id) > 0) {
            SL_TRACE(self->log_, "Peer {} unmarked as busy", peer_id);
          }

          if (not res.has_value()) {
            if (handler) handler(res.as_failure());
            return;
          }

          auto &block_info = res.value();

          // The common block is already enqueued: just remember the peer
          if (auto it = self->known_blocks_.find(block_info.hash);
              it != self->known_blocks_.end()) {
            auto &block_in_queue = it->second;
            block_in_queue.peers.emplace(peer_id);
            if (handler) handler(std::move(block_info));
            return;
          }

          SL_DEBUG(self->log_,
                   "Start to load blocks from {} since block {}",
                   peer_id,
                   block_info);
          self->loadBlocks(peer_id, block_info, std::move(handler));
        }
      });
  return true;
}
bool SynchronizerImpl::syncByBlockHeader(
    const primitives::BlockHeader &header,
    const libp2p::peer::PeerId &peer_id,
    SyncResultHandler &&handler) {
  auto block_hash = hasher_->blake2b_256(scale::encode(header).value());
  const primitives::BlockInfo block_info(header.number, block_hash);

  // The block is already known: nothing to do
  if (block_tree_->getBlockHeader(block_hash).has_value()) {
    return false;
  }

  // The block is already enqueued: just remember the peer
  if (auto it = known_blocks_.find(block_hash); it != known_blocks_.end()) {
    auto &block_in_queue = it->second;
    block_in_queue.peers.emplace(peer_id);
    return false;
  }

  // …

  bool parent_is_known =
      known_blocks_.find(header.parent_hash) != known_blocks_.end()
      or block_tree_->getBlockHeader(header.parent_hash).has_value();

  // The parent is known: the announced block can be loaded and applied
  if (parent_is_known) {
    loadBlocks(peer_id, block_info, [wp = weak_from_this()](auto res) {
      if (auto self = wp.lock()) {
        SL_TRACE(self->log_, "Block(s) enqueued to apply by announce");
      }
    });
    return true;
  }

  // Otherwise, find the best common block first and load from there
  syncByBlockInfo(
      block_info,
      peer_id,
      [wp = weak_from_this()](auto res) {
        if (auto self = wp.lock()) {
          SL_TRACE(self->log_, "Block(s) enqueued to load by announce");
        }
      },
      false);
  return true;
}
void SynchronizerImpl::syncMissingJustifications(
    const PeerId &peer_id,
    primitives::BlockInfo target_block,
    std::optional<uint32_t> limit,
    SyncResultHandler &&handler) {
  // If the peer is busy, reschedule the whole call for later
  if (busy_peers_.find(peer_id) != busy_peers_.end()) {
    SL_DEBUG(
        log_,
        "Justifications load since block {} was rescheduled, peer {} is busy",
        target_block,
        peer_id);
    scheduler_->schedule([wp = weak_from_this(),
                          peer_id,
                          block = std::move(target_block),
                          limit = std::move(limit),
                          handler = std::move(handler)]() mutable {
      auto self = wp.lock();
      if (not self) {
        return;
      }
      self->syncMissingJustifications(
          peer_id, std::move(block), std::move(limit), std::move(handler));
    });
    return;
  }

  loadJustifications(
      peer_id, std::move(target_block), std::move(limit), std::move(handler));
}
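// ---------------------------------------------------------------------------
// Illustrative sketch (not from the original file) of the self-rescheduling
// pattern used above and throughout this file: the operation captures `this`
// weakly, so the retry only fires if the synchronizer is still alive when
// the scheduler runs the task. `FakeScheduler`, `Worker` and `doWork` are
// hypothetical stand-ins for libp2p::basic::Scheduler and the synchronizer.
#include <functional>
#include <memory>

struct FakeScheduler {
  // Stand-in for libp2p::basic::Scheduler: here the task runs immediately.
  void schedule(std::function<void()> task) { task(); }
};

struct Worker : std::enable_shared_from_this<Worker> {
  FakeScheduler scheduler_;

  void retryLater() {
    // weak_from_this() requires the Worker to be owned by a shared_ptr.
    scheduler_.schedule([wp = weak_from_this()] {
      if (auto self = wp.lock()) {
        self->doWork();  // skipped entirely if the Worker was destroyed
      }
    });
  }

  void doWork() {}
};
// Usage: auto w = std::make_shared<Worker>(); w->retryLater();
// ---------------------------------------------------------------------------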
void SynchronizerImpl::findCommonBlock(
    const libp2p::peer::PeerId &peer_id,
    primitives::BlockNumber lower,
    primitives::BlockNumber upper,
    primitives::BlockNumber hint,
    SyncResultHandler &&handler,
    std::map<primitives::BlockNumber, primitives::BlockHash> &&observed) {
  // Request a single header at the probed height
  // …
  auto request_fingerprint = request.fingerprint();

  // Refuse a duplicate of a recent request
  if (not recent_requests_.emplace(std::tuple(peer_id, request_fingerprint))
              .second) {
    SL_VERBOSE(log_,
               "Can't check if block #{} in #{}..#{} is common with {}: {}",
               hint,
               lower,
               upper,
               peer_id,
               make_error_code(Error::DUPLICATE_REQUEST).message());
    handler(Error::DUPLICATE_REQUEST);
    return;
  }
  scheduleRecentRequestRemoval(peer_id, request_fingerprint);

  auto response_handler = [wp = weak_from_this(),
                           lower,
                           upper,
                           target = hint,
                           peer_id,
                           handler = std::move(handler),
                           observed = std::move(observed),
                           request_fingerprint](auto &&response_res) mutable {
    auto self = wp.lock();
    if (not self) {
      return;
    }

    // Any error interrupts the search for the common block
    if (response_res.has_error()) {
      SL_VERBOSE(self->log_,
                 "Can't check if block #{} in #{}..#{} is common with {}: {}",
                 target,
                 lower,
                 upper,
                 peer_id,
                 response_res.error().message());
      handler(response_res.as_failure());
      return;
    }

    auto &blocks = response_res.value().blocks;

    // An empty response is abnormal: the requested block must exist
    if (blocks.empty()) {
      SL_VERBOSE(self->log_,
                 "Can't check if block #{} in #{}..#{} is common with {}: "
                 "Response does not have any blocks",
                 target,
                 lower,
                 upper,
                 peer_id);
      handler(Error::EMPTY_RESPONSE);
      return;
    }

    self->recent_requests_.erase(std::tuple(peer_id, request_fingerprint));

    auto hash = blocks.front().hash;
    observed.emplace(target, hash);

    bool block_is_known =
        self->known_blocks_.find(hash) != self->known_blocks_.end()
        or self->block_tree_->getBlockHeader(hash).has_value();

    // The bisection has converged to the lower bound
    if (target == lower) {
      if (block_is_known) {
        SL_VERBOSE(self->log_,
                   "Found best common block with {}: {}",
                   peer_id,
                   BlockInfo(target, hash));
        handler(BlockInfo(target, hash));
        return;
      }
      SL_VERBOSE(self->log_, "Not found any common block with {}", peer_id);
      // …
      return;
    }

    // Narrow the interval for the next probe
    primitives::BlockNumber hint;
    if (block_is_known) {
      SL_TRACE(self->log_,
               "Block {} of {} is found locally",
               BlockInfo(target, hash),
               peer_id);
      // The common point is at or above the target: continue above it
      lower = target;
      hint = lower + (upper - lower) / 2;
    } else {
      SL_TRACE(self->log_,
               "Block {} of {} is not found locally",
               BlockInfo(target, hash),
               peer_id);
      // The common point is below the target: continue below it
      auto step = upper - target;
      upper = target;
      // …
      hint = upper - std::min(step, (upper - lower) / 2);
      // …
      hint = lower + (upper - lower) / 2;
    }

    // A block number observed before needs no new request
    auto it = observed.find(hint);
    if (it != observed.end()) {
      SL_TRACE(self->log_,
               "Block {} of {} is already observed. Continue without request",
               BlockInfo(hint, it->second),
               peer_id);
      // …
    }

    self->findCommonBlock(peer_id,
                          lower,
                          upper,
                          hint,
                          std::move(handler),
                          std::move(observed));
  };

  SL_TRACE(log_,
           "Check if block #{} in #{}..#{} is common with {}",
           hint,
           lower,
           upper,
           peer_id);

  auto protocol = router_->getSyncProtocol();
  BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol");
  protocol->request(peer_id, std::move(request), std::move(response_handler));
}
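// ---------------------------------------------------------------------------
// Illustrative sketch (not from the original file) of the search above.
// Locally-known block numbers form a prefix of the chain, so the best common
// block with a peer is the highest "known" number in [lower, upper), which a
// binary search finds in O(log(upper - lower)) probes. `is_known` is a
// hypothetical stand-in for the known_blocks_/block_tree_ lookup.
#include <cstdint>
#include <functional>
#include <optional>

using BlockNumber = uint64_t;

std::optional<BlockNumber> findHighestKnown(
    BlockNumber lower,  // known to be common (e.g. the last finalized block)
    BlockNumber upper,  // exclusive upper bound of the search
    const std::function<bool(BlockNumber)> &is_known) {
  if (not is_known(lower)) {
    return std::nullopt;  // no common block at all
  }
  while (lower + 1 < upper) {
    BlockNumber probe = lower + (upper - lower) / 2;
    if (is_known(probe)) {
      lower = probe;  // the common point is at or above the probe
    } else {
      upper = probe;  // the common point is below the probe
    }
  }
  return lower;
}
// ---------------------------------------------------------------------------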
void SynchronizerImpl::loadBlocks(const libp2p::peer::PeerId &peer_id,
                                  primitives::BlockInfo from,
                                  SyncResultHandler &&handler) {
  // Request blocks (header, body, justification) ascending from `from`
  // …
  auto request_fingerprint = request.fingerprint();

  // Refuse a duplicate of a recent request
  if (not recent_requests_.emplace(std::tuple(peer_id, request_fingerprint))
              .second) {
    SL_VERBOSE(log_,
               "Can't load blocks from {} beginning block {}: {}",
               peer_id,
               from,
               make_error_code(Error::DUPLICATE_REQUEST).message());
    if (handler) handler(Error::DUPLICATE_REQUEST);
    return;
  }
  scheduleRecentRequestRemoval(peer_id, request_fingerprint);

  auto response_handler = [wp = weak_from_this(),
                           from,
                           peer_id,
                           handler = std::move(handler)](
                              auto &&response_res) mutable {
    auto self = wp.lock();
    if (not self) {
      return;
    }

    if (response_res.has_error()) {
      SL_VERBOSE(self->log_,
                 "Can't load blocks from {} beginning block {}: {}",
                 peer_id,
                 from,
                 response_res.error().message());
      if (handler) handler(response_res.as_failure());
      return;
    }

    auto &blocks = response_res.value().blocks;

    if (blocks.empty()) {
      SL_VERBOSE(self->log_,
                 "Can't load blocks from {} beginning block {}: "
                 "Response does not have any blocks",
                 peer_id,
                 from);
      if (handler) handler(Error::EMPTY_RESPONSE);
      return;
    }

    SL_TRACE(self->log_,
             "{} blocks are loaded from {} beginning block {}",
             blocks.size(),
             peer_id,
             from);

    bool some_blocks_added = false;
    primitives::BlockInfo last_loaded_block;

    const primitives::BlockHash zero_hash{};
    primitives::BlockHash parent_hash = zero_hash;

    for (auto &block : blocks) {
      // The header must be provided
      if (not block.header.has_value()) {
        SL_VERBOSE(self->log_,
                   "Can't load blocks from {} starting from block {}: "
                   "Received block without header",
                   peer_id,
                   from);
        if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER);
        return;
      }
      // The body must be provided as well
      if (not block.body.has_value()) {
        SL_VERBOSE(self->log_,
                   "Can't load blocks from {} starting from block {}: "
                   "Received block without body",
                   peer_id,
                   from);
        if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_BODY);
        return;
      }
      auto &header = block.header.value();

      const auto &last_finalized_block =
          self->block_tree_->getLastFinalized();

      // Blocks at or below the finalized number are skipped or discarded
      if (last_finalized_block.number >= header.number) {
        if (last_finalized_block.number == header.number) {
          if (last_finalized_block.hash != block.hash) {
            SL_VERBOSE(self->log_,
                       "Can't load blocks from {} starting from block {}: "
                       "Received discarded block {}",
                       peer_id,
                       from,
                       BlockInfo(header.number, block.hash));
            if (handler) handler(Error::DISCARDED_BLOCK);
            return;
          }
          SL_TRACE(self->log_,
                   "Skip block {} received from {}: "
                   "it is finalized with block #{}",
                   BlockInfo(header.number, block.hash),
                   peer_id,
                   last_finalized_block.number);
          continue;
        }
        SL_TRACE(self->log_,
                 "Skip block {} received from {}: "
                 "it is below the last finalized block #{}",
                 BlockInfo(header.number, block.hash),
                 peer_id,
                 last_finalized_block.number);
        continue;
      }

      // The first block must be a child of the last finalized block
      if (last_finalized_block.number + 1 == header.number) {
        if (last_finalized_block.hash != header.parent_hash) {
          SL_VERBOSE(self->log_,
                     "Can't complete blocks loading from {} starting from "
                     "block {}: Received discarded block {}",
                     peer_id,
                     from,
                     BlockInfo(header.number, header.parent_hash));
          if (handler) handler(Error::DISCARDED_BLOCK);
          return;
        }

        parent_hash = header.parent_hash;
      }

      // Every other block must be a descendant of the previous one
      if (parent_hash != header.parent_hash && parent_hash != zero_hash) {
        SL_VERBOSE(self->log_,
                   "Can't complete blocks loading from {} starting from "
                   "block {}: Received block is not descendant of previous",
                   peer_id,
                   from);
        if (handler) handler(Error::WRONG_ORDER);
        return;
      }

      // The block's hash must match its header
      auto calculated_hash =
          self->hasher_->blake2b_256(scale::encode(header).value());
      if (block.hash != calculated_hash) {
        SL_VERBOSE(self->log_,
                   "Can't complete blocks loading from {} starting from "
                   "block {}: "
                   "Received block whose hash does not match the header",
                   peer_id,
                   from);
        if (handler) handler(Error::INVALID_HASH);
        return;
      }

      last_loaded_block = {header.number, block.hash};
      parent_hash = block.hash;

      // Enqueue the block unless it is already in the import queue
      auto it = self->known_blocks_.find(block.hash);
      if (it == self->known_blocks_.end()) {
        self->known_blocks_.emplace(block.hash, KnownBlock{block, {peer_id}});
        self->metric_import_queue_length_->set(self->known_blocks_.size());
      } else {
        it->second.peers.emplace(peer_id);
        SL_TRACE(self->log_,
                 "Skip block {} received from {}: already enqueued",
                 BlockInfo(header.number, block.hash),
                 peer_id);
        continue;
      }

      SL_TRACE(self->log_,
               "Enqueue block {} received from {}",
               BlockInfo(header.number, block.hash),
               peer_id);

      self->generations_.emplace(header.number, block.hash);
      self->ancestry_.emplace(header.parent_hash, block.hash);

      some_blocks_added = true;
    }

    SL_TRACE(self->log_, "Block loading is finished");
    if (handler) {
      handler(last_loaded_block);
    }

    if (some_blocks_added) {
      SL_TRACE(self->log_, "Enqueued some new blocks: schedule applying");
      self->scheduler_->schedule([wp] {
        if (auto self = wp.lock()) {
          self->applyNextBlock();
        }
      });
    }
  };

  auto protocol = router_->getSyncProtocol();
  BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol");
  protocol->request(peer_id, std::move(request), std::move(response_handler));
}
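// ---------------------------------------------------------------------------
// Illustrative sketch (simplified types, not the original code) of the
// linkage check performed in the response handler above: every block after
// the first must reference the previous block's hash as its parent,
// otherwise the response is rejected (WRONG_ORDER / DISCARDED_BLOCK).
#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

using Hash = std::array<uint8_t, 32>;

struct MiniBlock {
  Hash hash;
  Hash parent_hash;
};

bool formsChain(const std::vector<MiniBlock> &blocks) {
  for (std::size_t i = 1; i < blocks.size(); ++i) {
    if (blocks[i].parent_hash != blocks[i - 1].hash) {
      return false;
    }
  }
  return true;
}
// ---------------------------------------------------------------------------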
void SynchronizerImpl::loadJustifications(const libp2p::peer::PeerId &peer_id,
                                          primitives::BlockInfo target_block,
                                          std::optional<uint32_t> limit,
                                          SyncResultHandler &&handler) {
  busy_peers_.insert(peer_id);
  auto cleanup = gsl::finally([this, peer_id] {
    busy_peers_.erase(peer_id);
  });

  // …
  auto request_fingerprint = request.fingerprint();

  // Refuse a duplicate of a recent request
  if (not recent_requests_.emplace(std::tuple(peer_id, request_fingerprint))
              .second) {
    SL_VERBOSE(log_,
               "Can't load justification from {} for block {}: {}",
               peer_id,
               target_block,
               make_error_code(Error::DUPLICATE_REQUEST).message());
    if (handler) handler(Error::DUPLICATE_REQUEST);
    return;
  }
  scheduleRecentRequestRemoval(peer_id, request_fingerprint);

  auto response_handler = [wp = weak_from_this(),
                           peer_id,
                           target_block,
                           handler = std::move(handler)](
                              auto &&response_res) mutable {
    auto self = wp.lock();
    if (not self) {
      return;
    }

    if (response_res.has_error()) {
      SL_VERBOSE(self->log_,
                 "Can't load justification from {} for block {}: {}",
                 peer_id,
                 target_block,
                 response_res.error().message());
      if (handler) {
        handler(response_res.as_failure());
      }
      return;
    }

    auto &blocks = response_res.value().blocks;

    if (blocks.empty()) {
      SL_VERBOSE(self->log_,
                 "Can't load block justification from {} for block {}: "
                 "Response does not have any contents",
                 peer_id,
                 target_block);
      if (handler) handler(Error::EMPTY_RESPONSE);
      return;
    }

    bool justification_received = false;
    primitives::BlockInfo last_justified_block;
    for (auto &block : blocks) {
      if (not block.header) {
        SL_VERBOSE(self->log_,
                   "No header was provided from {} for block {} while "
                   "requesting justifications",
                   peer_id,
                   target_block);
        if (handler) handler(Error::RESPONSE_WITHOUT_BLOCK_HEADER);
        return;
      }
      if (block.justification) {
        justification_received = true;
        last_justified_block =
            primitives::BlockInfo(block.header->number, block.hash);
        {
          std::lock_guard lock(self->justifications_mutex_);
          self->justifications_.emplace(last_justified_block,
                                        *block.justification);
        }
      }
    }

    if (justification_received) {
      SL_TRACE(self->log_, "Enqueued new justifications: schedule applying");
      self->scheduler_->schedule([wp] {
        if (auto self = wp.lock()) {
          self->applyNextJustification();
        }
      });
    }
    if (handler) {
      handler(last_justified_block);
    }
  };

  auto protocol = router_->getSyncProtocol();
  BOOST_ASSERT_MSG(protocol, "Router did not provide sync protocol");
  protocol->request(peer_id, std::move(request), std::move(response_handler));
}
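// Note on the hand-off above: justifications are buffered into
// justifications_ under justifications_mutex_ inside the network callback,
// and are applied later by applyNextJustification() via the scheduler,
// never inline in the callback itself.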
927 "SyncState was not requested to {}: " 928 "state sync for other block is not completed yet",
933 bool bool_val =
false;
937 "State sync request was not sent to {} for block {}: " 938 "previous request in progress",
945 SL_INFO(
log_,
"Sync of state for block {} has started", block);
949 log_,
"State sync request has sent to {} for block {}", peer_id, block);
954 auto protocol =
router_->getStateProtocol();
955 BOOST_ASSERT_MSG(protocol,
"Router did not provide state protocol");
957 auto response_handler =
958 [wp = weak_from_this(), block, peer_id, handler = std::move(handler)](
959 auto &&response_res)
mutable {
960 auto self = wp.lock();
966 if (response_res.has_error()) {
967 self->state_sync_request_in_progress_ =
false;
970 "State syncing failed with error: {}",
971 response_res.error().message());
972 if (handler) handler(response_res.as_failure());
977 for (
unsigned i = 0; i < response_res.value().entries.size(); ++i) {
978 const auto &state_entry = response_res.value().entries[i];
981 auto it =
self->batches_store_.find(state_entry.state_root);
982 auto batch = (it !=
self->batches_store_.end())
983 ? std::get<2>(it->second)
985 ->getPersistentBatchAt(
986 self->serializer_->getEmptyRootHash())
990 if (state_entry.entries.size()) {
992 "Syncing {}th item. Current key {}. Keys received {}.",
994 state_entry.entries[0].key.toHex(),
995 state_entry.entries.size());
996 for (
const auto &entry : state_entry.entries) {
997 std::ignore = batch->put(entry.key, entry.value);
1001 if (!state_entry.complete) {
1002 self->batches_store_[state_entry.state_root] = {
1003 state_entry.entries.back().key, i, batch};
1005 self->batches_store_.erase(state_entry.state_root);
1010 if (state_entry.complete) {
1011 auto res = batch->commit();
1012 if (res.has_value()) {
1013 const auto &expected = [&] {
1015 return state_entry.state_root;
1018 self->block_tree_->getBlockHeader(block.
hash);
1019 BOOST_ASSERT_MSG(header_res.has_value(),
1020 "It is state of existing block; head " 1021 "must be existing");
1022 const auto &header = header_res.value();
1023 return header.state_root;
1026 const auto &actual = res.value();
1028 if (actual == expected) {
1030 "Syncing of {}state on block {} has finished. " 1031 "Root hashes match: {}",
1032 i != 0 ?
"child " :
"",
1037 "Syncing of {}state on block {} has finished. " 1038 "Root hashes mismatch: expected={}, actual={}",
1039 i != 0 ?
"child " :
"",
1046 self->trie_changes_tracker_->onBlockAdded(block.
hash);
1051 self->entries_ += state_entry.entries.size();
1056 std::map<unsigned, common::Buffer> keymap;
1057 for (
const auto &[_, val] : self->batches_store_) {
1058 unsigned i = std::get<1>(val);
1059 keymap[i] = std::get<0>(val);
1060 SL_TRACE(self->log_,
"Index: {}, Key: {}", i, keymap[i]);
1063 std::vector<common::Buffer> keys;
1064 for (
const auto &[_, val] : keymap) {
1065 keys.push_back(val);
1068 if (not response_res.value().entries[0].complete) {
1069 SL_TRACE(self->log_,
1070 "State syncing continues. {} entries loaded",
1073 if (not self->state_sync_on_block_.has_value()) {
1074 auto res =
self->buffer_storage_->put(
1077 if (res.has_error()) {
1079 "Can't save data of incomplete state sync: {}",
1082 self->state_sync_on_block_.emplace(block);
1084 self->state_sync_request_ =
1086 self->state_sync_request_in_progress_ =
false;
1087 self->syncState(peer_id, block, std::move(handler));
1094 auto res =
self->buffer_storage_->remove(
1096 if (res.has_error()) {
1098 "Can't remove data of incomplete state sync: {}",
1101 self->state_sync_request_.reset();
1102 self->state_sync_on_block_.reset();
1103 self->state_sync_request_in_progress_ =
false;
1109 protocol->request(peer_id, std::move(request), std::move(response_handler));
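// ---------------------------------------------------------------------------
// Illustrative sketch (simplified types, not the original code) of how the
// response handler above rebuilds resume keys from incomplete batches:
// batches_store_ maps state roots to (last key, entry index, batch);
// ordering the last keys by entry index yields the start keys for the next
// state request. The batch element of the tuple is omitted here.
#include <map>
#include <string>
#include <tuple>
#include <vector>

using Buffer = std::string;  // stand-in for common::Buffer

std::vector<Buffer> resumeKeys(
    const std::map<Buffer, std::tuple<Buffer, unsigned>> &batches_store) {
  std::map<unsigned, Buffer> by_index;
  for (const auto &[root, val] : batches_store) {
    by_index[std::get<1>(val)] = std::get<0>(val);  // index -> last key
  }
  std::vector<Buffer> keys;
  for (const auto &[index, key] : by_index) {
    keys.push_back(key);
  }
  return keys;
}
// ---------------------------------------------------------------------------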
void SynchronizerImpl::applyNextBlock() {
  if (generations_.empty()) {
    SL_TRACE(log_, "No block for applying");
    return;
  }

  // Serialize block application
  bool false_val = false;
  if (not applying_in_progress_.compare_exchange_strong(false_val, true)) {
    SL_TRACE(log_, "Applying in progress");
    return;
  }
  SL_TRACE(log_, "Begin applying");
  auto cleanup = gsl::finally([this] {
    SL_TRACE(log_, "End applying");
    applying_in_progress_ = false;
  });

  primitives::BlockHash hash;
  // Take the next (lowest-numbered) block hash from the queue
  auto generation_node = generations_.extract(generations_.begin());
  if (generation_node) {
    hash = generation_node.mapped();
  } else {
    SL_TRACE(log_, "No block for applying");
    return;
  }

  auto node = known_blocks_.extract(hash);
  if (node) {
    auto &block = node.mapped().data;
    BOOST_ASSERT(block.header.has_value());
    const BlockInfo block_info(block.header->number, block.hash);

    const auto &last_finalized_block = block_tree_->getLastFinalized();

    // Pick the handler watching for this block, if any
    SyncResultHandler handler;
    if (auto wbn_node = watched_blocks_.extract(hash)) {
      handler = std::move(wbn_node.mapped());
    }

    // Blocks at or below the finalized number are either known or discarded
    if (block.header->number <= last_finalized_block.number) {
      auto header_res = block_tree_->getBlockHeader(hash);
      if (not header_res.has_value()) {
        auto n = discardBlock(block.hash);
        SL_WARN(
            log_,
            "Block {} {} not applied as discarded",
            block_info,
            n ? fmt::format("and {} others have", n) : fmt::format("has"));
        // …
      }
      // …
    } else {
      outcome::result<void> applying_res = outcome::success();

      // While a state sync is running, loaded blocks are discarded instead
      // of applied:
      //   "Block {} {} not applied as discarded: "
      //   "state syncing on block in progress",
      //   block_info,
      //   n ? fmt::format("and {} others have", n) : fmt::format("has"));
      // …

      if (not applying_res.has_value()) {
        if (/* … the error is not "block already exists" … */) {
          auto n = discardBlock(block_info.hash);
          SL_WARN(
              log_,
              "Block {} {} been discarded: {}",
              block_info,
              n ? fmt::format("and {} others have", n) : fmt::format("has"),
              applying_res.error().message());
        } else {
          SL_DEBUG(log_, "Block {} is skipped as existing", block_info);
          if (handler) handler(block_info);
        }
      } else {
        if (handler) handler(block_info);
      }
    }
  }

  // Ask for the next portion when the import queue runs low
  auto minPreloadedBlockAmount =
      sync_method_ == application::AppConfiguration::SyncMethod::Full
          ? kMinPreloadedBlockAmount
          : kMinPreloadedBlockAmountForFastSyncing;
  if (known_blocks_.size() < minPreloadedBlockAmount) {
    SL_TRACE(log_,
             "{} blocks in queue: ask next portion of blocks",
             known_blocks_.size());
    askNextPortionOfBlocks();
  }

  scheduler_->schedule([wp = weak_from_this()] {
    if (auto self = wp.lock()) {
      self->applyNextBlock();
    }
  });
}
void SynchronizerImpl::applyNextJustification() {
  // Serialize justification application
  bool false_val = false;
  if (not applying_in_progress_.compare_exchange_strong(false_val, true)) {
    SL_TRACE(log_, "Applying justification in progress");
    return;
  }
  SL_TRACE(log_, "Begin justification applying");
  auto cleanup = gsl::finally([this] {
    SL_TRACE(log_, "End justification applying");
    applying_in_progress_ = false;
  });

  // Take the buffered justifications under the mutex
  std::queue<JustificationPair> justifications;
  {
    std::lock_guard lock(justifications_mutex_);
    justifications.swap(justifications_);
  }

  while (not justifications.empty()) {
    auto [block_info, justification] = std::move(justifications.front());
    const auto &block = block_info;
    justifications.pop();

    auto res = block_executor_->applyJustification(block_info, justification);
    if (res.has_error()) {
      SL_ERROR(log_,
               "Justification for block {} was not applied: {}",
               block,
               res.error().message());
    } else {
      SL_TRACE(log_, "Applied justification for block {}", block);
    }
  }
}
size_t SynchronizerImpl::discardBlock(
    const primitives::BlockHash &hash_of_discarding_block) {
  std::queue<primitives::BlockHash> queue;
  queue.emplace(hash_of_discarding_block);

  size_t affected = 0;
  while (not queue.empty()) {
    const auto &hash = queue.front();

    if (auto it = known_blocks_.find(hash); it != known_blocks_.end()) {
      auto number = it->second.data.header->number;
      // Notify waiters and drop the block from all indexes
      // …
      ++affected;
    }

    // Visit and unlink every direct descendant of the discarded block
    auto range = ancestry_.equal_range(hash);
    for (auto it = range.first; it != range.second; ++it) {
      queue.emplace(it->second);
    }
    ancestry_.erase(range.first, range.second);

    queue.pop();
  }
  // …
  return affected;
}
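// ---------------------------------------------------------------------------
// Illustrative sketch (simplified types, not the original code) of the
// traversal above: starting from one hash, every transitive descendant
// recorded in the parent-to-child multimap is visited and removed,
// returning how many queued blocks were affected.
#include <cstddef>
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>

using Hash = std::string;  // stand-in for primitives::BlockHash

size_t discardSubtree(std::unordered_multimap<Hash, Hash> &ancestry,
                      std::unordered_set<Hash> &known,
                      const Hash &root) {
  size_t affected = 0;
  std::queue<Hash> queue;
  queue.push(root);
  while (not queue.empty()) {
    Hash hash = std::move(queue.front());
    queue.pop();
    if (known.erase(hash) > 0) {
      ++affected;  // the block was still enqueued
    }
    auto range = ancestry.equal_range(hash);
    for (auto it = range.first; it != range.second; ++it) {
      queue.push(it->second);  // schedule the child for discarding
    }
    ancestry.erase(range.first, range.second);
  }
  return affected;
}
// ---------------------------------------------------------------------------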
void SynchronizerImpl::prune(const primitives::BlockInfo &finalized_block) {
  // Remove queued blocks that the new finalized block has made obsolete
  // …
  if (generation_node) {
    const auto &number = generation_node.key();
    if (number >= finalized_block.number) {
      // …
    }
    const auto &hash = generation_node.mapped();
    // …
  }
  // …

  for (auto it = range.first; it != range.second;) {
    // …
    const auto &hash = cit->second;
    if (hash != finalized_block.hash) {
      // …
    }
  }
  // …
}
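// Requests are deduplicated by (peer, fingerprint): a served request stays in
// recent_requests_ for kRecentnessDuration and is then erased by the task
// scheduled below; within that window an identical request is rejected with
// Error::DUPLICATE_REQUEST.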
void SynchronizerImpl::scheduleRecentRequestRemoval(
    const libp2p::peer::PeerId &peer_id,
    const BlocksRequest::Fingerprint &fingerprint) {
  scheduler_->schedule(
      [wp = weak_from_this(), peer_id, fingerprint] {
        if (auto self = wp.lock()) {
          self->recent_requests_.erase(std::tuple(peer_id, fingerprint));
        }
      },
      kRecentnessDuration);
}
void SynchronizerImpl::askNextPortionOfBlocks() {
  // Only one portion may be requested at a time
  bool false_val = false;
  if (not asking_blocks_portion_in_progress_.compare_exchange_strong(
          false_val, true)) {
    SL_TRACE(log_, "Asking portion of blocks in progress");
    return;
  }
  SL_TRACE(log_, "Begin asking portion of blocks");

  // Look for a queued block that can be requested, and a peer to ask
  for (auto g_it = generations_.begin(); g_it != generations_.end(); ++g_it) {
    const auto &hash = g_it->second;
    const primitives::BlockInfo block_info(g_it->first, hash);

    auto b_it = known_blocks_.find(hash);
    if (b_it == known_blocks_.end()) {
      SL_TRACE(log_, "Block {} is unknown. Go to next one", block_info);
      continue;
    }

    auto &peers = b_it->second.peers;
    if (peers.empty()) {
      SL_TRACE(log_,
               "Block {} doesn't have any peer. Go to next one",
               block_info);
      continue;
    }

    for (auto p_it = peers.begin(); p_it != peers.end();) {
      auto cp_it = p_it++;
      auto peer_id = *cp_it;

      if (busy_peers_.find(peer_id) != busy_peers_.end()) {
        SL_TRACE(log_, "Peer {} for block {} is busy", peer_id, block_info);
        continue;
      }

      busy_peers_.emplace(peer_id);
      SL_TRACE(log_, "Peer {} marked as busy", peer_id);

      auto handler = [wp = weak_from_this(), peer_id](const auto &res) {
        if (auto self = wp.lock()) {
          if (self->busy_peers_.erase(peer_id) > 0) {
            SL_TRACE(self->log_, "Peer {} unmarked as busy", peer_id);
          }
          SL_TRACE(self->log_, "End asking portion of blocks");
          self->asking_blocks_portion_in_progress_ = false;
          if (not res.has_value()) {
            SL_DEBUG(self->log_,
                     "Loading next portion of blocks from {} failed: {}",
                     peer_id,
                     res.error().message());
            return;
          }
          SL_DEBUG(self->log_,
                   "Portion of blocks from {} is loaded till {}",
                   peer_id,
                   res.value());
          if (self->known_blocks_.empty()) {
            self->askNextPortionOfBlocks();
          }
        }
      };

      // If no common point with the peer is known yet, find it first
      // …
        SL_DEBUG(log_,
                 "Start to find common block with {} in #{}..#{} to fill queue",
                 peer_id,
                 // …
                 );
        findCommonBlock(
            peer_id,
            // …
            [wp = weak_from_this(), peer_id, handler = std::move(handler)](
                outcome::result<primitives::BlockInfo> res) {
              if (auto self = wp.lock()) {
                if (not res.has_value()) {
                  SL_DEBUG(self->log_,
                           "Can't load next portion of blocks from {}: {}",
                           peer_id,
                           res.error().message());
                  return;
                }
                auto &common_block_info = res.value();
                SL_DEBUG(self->log_,
                         "Start to load next portion of blocks from {} "
                         "since block {}",
                         peer_id,
                         common_block_info);
                self->loadBlocks(
                    peer_id, common_block_info, std::move(handler));
              }
            });
      // … otherwise the common block is known already:
        SL_DEBUG(log_,
                 "Start to load next portion of blocks from {} "
                 "since block {}",
                 peer_id,
                 block_info);
        loadBlocks(peer_id, block_info, std::move(handler));
      // …
      return;
    }

    SL_TRACE(log_,
             "Block {} doesn't have an appropriate peer. Go to next one",
             block_info);
  }

  SL_TRACE(log_, "End asking portion of blocks: none");
  asking_blocks_portion_in_progress_ = false;
}
// ---------------------------------------------------------------------------
// SynchronizerImpl: member reference (cross-references of the listing above)
// ---------------------------------------------------------------------------

// Error category
OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, SynchronizerImpl::Error, e);

// Type aliases
using BlockInfo = primitives::BlockInfo;
using PeerId = libp2p::peer::PeerId;
using SyncResultHandler =
    std::function<void(outcome::result<primitives::BlockInfo>)>;

// Constants
static constexpr BlockAttributes kBasicAttributes;  // HEADER, BODY and JUSTIFICATION
static constexpr size_t kMinPreloadedBlockAmount;
static constexpr size_t kMinPreloadedBlockAmountForFastSyncing;
static constexpr size_t kMaxDistanceToBlockForSubscription;
static constexpr std::chrono::milliseconds kRecentnessDuration;
const common::Buffer kBlockOfIncompleteSyncStateLookupKey;

// Construction
SynchronizerImpl(
    const application::AppConfiguration &app_config,
    std::shared_ptr<application::AppStateManager> app_state_manager,
    std::shared_ptr<blockchain::BlockTree> block_tree,
    std::shared_ptr<storage::changes_trie::ChangesTracker> changes_tracker,
    std::shared_ptr<consensus::BlockAppender> block_appender,
    std::shared_ptr<consensus::BlockExecutor> block_executor,
    std::shared_ptr<storage::trie::TrieSerializer> serializer,
    std::shared_ptr<storage::trie::TrieStorage> storage,
    std::shared_ptr<network::Router> router,
    std::shared_ptr<libp2p::basic::Scheduler> scheduler,
    std::shared_ptr<crypto::Hasher> hasher,
    std::shared_ptr<storage::BufferStorage> buffer_storage);

// Public interface
bool syncByBlockHeader(const primitives::BlockHeader &header,
                       const libp2p::peer::PeerId &peer_id,
                       SyncResultHandler &&handler) override;
bool syncByBlockInfo(const primitives::BlockInfo &block_info,
                     const libp2p::peer::PeerId &peer_id,
                     SyncResultHandler &&handler,
                     bool subscribe_to_block) override;
void syncState(const libp2p::peer::PeerId &peer_id,
               const primitives::BlockInfo &block,
               SyncResultHandler &&handler) override;
void syncMissingJustifications(const PeerId &peer_id,
                               primitives::BlockInfo target_block,
                               std::optional<uint32_t> limit,
                               SyncResultHandler &&handler) override;

// Internals
void findCommonBlock(
    const libp2p::peer::PeerId &peer_id,
    primitives::BlockNumber lower,
    primitives::BlockNumber upper,
    primitives::BlockNumber hint,
    SyncResultHandler &&handler,
    std::map<primitives::BlockNumber, primitives::BlockHash> &&observed = {});
void loadBlocks(const libp2p::peer::PeerId &peer_id,
                primitives::BlockInfo from,
                SyncResultHandler &&handler);
void loadJustifications(const libp2p::peer::PeerId &peer_id,
                        primitives::BlockInfo target_block,
                        std::optional<uint32_t> limit,
                        SyncResultHandler &&handler);
// Subscribes the handler to the arrival of a block
bool subscribeToBlock(const primitives::BlockInfo &block_info,
                      SyncResultHandler &&handler);
// Notifies subscribers about an arrived block
void notifySubscribers(const primitives::BlockInfo &block_info,
                       outcome::result<void> res);
// Pops the next block from the queue and tries to apply it
void applyNextBlock();
// Pops the next justification from the queue and tries to apply it
void applyNextJustification();
// Tries to request another portion of blocks
void askNextPortionOfBlocks();
size_t discardBlock(const primitives::BlockHash &block);
void prune(const primitives::BlockInfo &finalized_block);
void scheduleRecentRequestRemoval(
    const libp2p::peer::PeerId &peer_id,
    const BlocksRequest::Fingerprint &fingerprint);

// Injected dependencies
std::shared_ptr<application::AppStateManager> app_state_manager_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
std::shared_ptr<storage::changes_trie::ChangesTracker> trie_changes_tracker_;
std::shared_ptr<consensus::BlockAppender> block_appender_;
std::shared_ptr<consensus::BlockExecutor> block_executor_;
std::shared_ptr<storage::trie::TrieSerializer> serializer_;
std::shared_ptr<storage::trie::TrieStorage> storage_;
std::shared_ptr<network::Router> router_;
std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<storage::BufferStorage> buffer_storage_;
application::AppConfiguration::SyncMethod sync_method_;
metrics::RegistryPtr metrics_registry_;
metrics::Gauge *metric_import_queue_length_;
telemetry::Telemetry telemetry_;

// State
std::unordered_map<primitives::BlockHash, KnownBlock> known_blocks_;
std::multimap<primitives::BlockNumber, primitives::BlockHash> generations_;
std::unordered_multimap<primitives::BlockHash, primitives::BlockHash> ancestry_;
std::multimap<primitives::BlockInfo, SyncResultHandler> subscriptions_;
std::unordered_multimap<primitives::BlockHash, SyncResultHandler> watched_blocks_;
primitives::BlockNumber watched_blocks_number_;
std::queue<JustificationPair> justifications_;
std::mutex justifications_mutex_;
std::set<libp2p::peer::PeerId> busy_peers_;
std::set<std::tuple<libp2p::peer::PeerId, BlocksRequest::Fingerprint>> recent_requests_;
std::optional<network::StateRequest> state_sync_request_;
std::optional<primitives::BlockInfo> state_sync_on_block_;
std::atomic_bool state_sync_request_in_progress_;
std::atomic_bool applying_in_progress_;
std::atomic_bool asking_blocks_portion_in_progress_;
bool node_is_shutting_down_;