6 #ifndef KAGOME_NETWORK_SYNCHRONIZERIMPL 7 #define KAGOME_NETWORK_SYNCHRONIZERIMPL 15 #include <libp2p/basic/scheduler.hpp> 26 class AppConfiguration;
34 class PersistentTrieBatch;
43 public std::enable_shared_from_this<SynchronizerImpl> {
48 static constexpr
size_t kMinPreloadedBlockAmount = 256;
52 static constexpr
size_t kMinPreloadedBlockAmountForFastSyncing =
53 kMinPreloadedBlockAmount * 40;
58 static constexpr
size_t kMaxDistanceToBlockForSubscription =
59 kMinPreloadedBlockAmount * 2;
61 static constexpr std::chrono::milliseconds kRecentnessDuration =
62 std::chrono::seconds(60);
67 RESPONSE_WITHOUT_BLOCK_HEADER,
68 RESPONSE_WITHOUT_BLOCK_BODY,
80 std::shared_ptr<application::AppStateManager> app_state_manager,
81 std::shared_ptr<blockchain::BlockTree> block_tree,
82 std::shared_ptr<storage::changes_trie::ChangesTracker> changes_tracker,
83 std::shared_ptr<consensus::BlockAppender> block_appender,
84 std::shared_ptr<consensus::BlockExecutor> block_executor,
85 std::shared_ptr<storage::trie::TrieSerializer> serializer,
86 std::shared_ptr<storage::trie::TrieStorage> storage,
87 std::shared_ptr<network::Router> router,
88 std::shared_ptr<libp2p::basic::Scheduler> scheduler,
89 std::shared_ptr<crypto::Hasher> hasher,
90 std::shared_ptr<storage::BufferStorage> buffer_storage);
109 bool subscribe_to_block)
override;
120 void syncMissingJustifications(
const PeerId &peer_id,
122 std::optional<uint32_t> limit,
159 std::optional<uint32_t> limit,
164 return state_sync_request_.has_value();
176 outcome::result<void> res);
179 void askNextPortionOfBlocks();
182 void applyNextBlock();
185 void applyNextJustification();
197 void scheduleRecentRequestRemoval(
203 std::shared_ptr<storage::changes_trie::ChangesTracker>
208 std::shared_ptr<storage::trie::TrieStorage>
storage_;
223 std::atomic_bool state_sync_request_in_progress_ =
false;
227 bool node_is_shutting_down_ =
false;
233 std::set<libp2p::peer::PeerId>
peers;
242 std::multimap<primitives::BlockNumber, primitives::BlockHash>
generations_;
245 std::unordered_multimap<primitives::BlockHash, primitives::BlockHash>
250 std::pair<primitives::BlockInfo, primitives::Justification>;
258 std::unordered_multimap<primitives::BlockHash, SyncResultHandler>
263 std::atomic_bool applying_in_progress_ =
false;
264 std::atomic_bool asking_blocks_portion_in_progress_ =
false;
267 std::set<std::tuple<libp2p::peer::PeerId, BlocksRequest::Fingerprint>>
274 std::shared_ptr<storage::trie::PersistentTrieBatch>>>
283 #endif // KAGOME_NETWORK_SYNCHRONIZERIMPL std::mutex justifications_mutex_
std::shared_ptr< network::Router > router_
bool hasIncompleteRequestOfStateSync() const override
Check if incomplete requests of state sync exists.
std::shared_ptr< TelemetryService > createTelemetryService()
Returns preliminary initialized instance of telemetry service.
application::AppConfiguration::SyncMethod sync_method_
std::unordered_map< storage::trie::RootHash, std::tuple< common::Buffer, unsigned, std::shared_ptr< storage::trie::PersistentTrieBatch > > > batches_store_
std::pair< primitives::BlockInfo, primitives::Justification > JustificationPair
std::unordered_map< primitives::BlockHash, KnownBlock > known_blocks_
std::shared_ptr< storage::trie::TrieSerializer > serializer_
std::shared_ptr< blockchain::BlockTree > block_tree_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
std::set< libp2p::peer::PeerId > peers
Peers who know this block.
std::shared_ptr< storage::changes_trie::ChangesTracker > trie_changes_tracker_
std::unordered_multimap< primitives::BlockHash, primitives::BlockHash > ancestry_
OUTCOME_HPP_DECLARE_ERROR(kagome::api, JRpcServerImpl::Error)
std::unique_ptr< Registry > RegistryPtr
std::optional< primitives::BlockInfo > state_sync_on_block_
RegistryPtr createRegistry()
primitives::BlockData data
Data of block.
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
std::set< libp2p::peer::PeerId > busy_peers_
libp2p::peer::PeerId PeerId
std::shared_ptr< application::AppStateManager > app_state_manager_
std::optional< network::StateRequest > state_sync_request_
std::optional< primitives::BlockInfo > sync_block_
std::shared_ptr< soralog::Logger > Logger
std::set< std::tuple< libp2p::peer::PeerId, BlocksRequest::Fingerprint > > recent_requests_
std::function< void(outcome::result< primitives::BlockInfo >)> SyncResultHandler
metrics::Gauge * metric_import_queue_length_
std::shared_ptr< storage::BufferStorage > buffer_storage_
std::shared_ptr< storage::trie::TrieStorage > storage_
std::shared_ptr< consensus::BlockAppender > block_appender_
std::multimap< primitives::BlockInfo, SyncResultHandler > subscriptions_
std::shared_ptr< crypto::Hasher > hasher_
std::unordered_multimap< primitives::BlockHash, SyncResultHandler > watched_blocks_
Logger createLogger(const std::string &tag)
std::queue< JustificationPair > justifications_
std::shared_ptr< TelemetryService > Telemetry
std::multimap< primitives::BlockNumber, primitives::BlockHash > generations_
A gauge metric to represent a value that can arbitrarily go up and down.
std::shared_ptr< consensus::BlockExecutor > block_executor_