Kagome
Polkadot Runtime Engine in C++17
synchronizer_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_NETWORK_SYNCHRONIZERIMPL
7 #define KAGOME_NETWORK_SYNCHRONIZERIMPL
8 
10 
11 #include <atomic>
12 #include <mutex>
13 #include <queue>
14 
15 #include <libp2p/basic/scheduler.hpp>
16 
20 #include "metrics/metrics.hpp"
21 #include "network/router.hpp"
23 #include "telemetry/service.hpp"
24 
25 namespace kagome::application {
26  class AppConfiguration;
27 }
28 
30  class ChangesTracker;
31 }
32 
33 namespace kagome::storage::trie {
34  class PersistentTrieBatch;
35  class TrieSerializer;
36  class TrieStorage;
37 } // namespace kagome::storage::trie
38 
39 namespace kagome::network {
40 
42  : public Synchronizer,
43  public std::enable_shared_from_this<SynchronizerImpl> {
44  public:
48  static constexpr size_t kMinPreloadedBlockAmount = 256;
49 
52  static constexpr size_t kMinPreloadedBlockAmountForFastSyncing =
53  kMinPreloadedBlockAmount * 40;
54 
58  static constexpr size_t kMaxDistanceToBlockForSubscription =
59  kMinPreloadedBlockAmount * 2;
60 
61  static constexpr std::chrono::milliseconds kRecentnessDuration =
62  std::chrono::seconds(60);
63 
64  enum class Error {
65  SHUTTING_DOWN = 1,
66  EMPTY_RESPONSE,
67  RESPONSE_WITHOUT_BLOCK_HEADER,
68  RESPONSE_WITHOUT_BLOCK_BODY,
69  DISCARDED_BLOCK,
70  WRONG_ORDER,
71  INVALID_HASH,
72  ALREADY_IN_QUEUE,
73  PEER_BUSY,
74  ARRIVED_TOO_EARLY,
75  DUPLICATE_REQUEST,
76  };
77 
79  const application::AppConfiguration &app_config,
80  std::shared_ptr<application::AppStateManager> app_state_manager,
81  std::shared_ptr<blockchain::BlockTree> block_tree,
82  std::shared_ptr<storage::changes_trie::ChangesTracker> changes_tracker,
83  std::shared_ptr<consensus::BlockAppender> block_appender,
84  std::shared_ptr<consensus::BlockExecutor> block_executor,
85  std::shared_ptr<storage::trie::TrieSerializer> serializer,
86  std::shared_ptr<storage::trie::TrieStorage> storage,
87  std::shared_ptr<network::Router> router,
88  std::shared_ptr<libp2p::basic::Scheduler> scheduler,
89  std::shared_ptr<crypto::Hasher> hasher,
90  std::shared_ptr<storage::BufferStorage> buffer_storage);
91 
93  bool prepare();
94 
96  bool start();
97 
99  void stop();
100 
106  bool syncByBlockInfo(const primitives::BlockInfo &block_info,
107  const libp2p::peer::PeerId &peer_id,
108  SyncResultHandler &&handler,
109  bool subscribe_to_block) override;
110 
115  bool syncByBlockHeader(const primitives::BlockHeader &header,
116  const libp2p::peer::PeerId &peer_id,
117  SyncResultHandler &&handler) override;
118 
119  // See loadJustifications
120  void syncMissingJustifications(const PeerId &peer_id,
121  primitives::BlockInfo target_block,
122  std::optional<uint32_t> limit,
123  SyncResultHandler &&handler) override;
124 
128  void syncState(const libp2p::peer::PeerId &peer_id,
129  const primitives::BlockInfo &block,
130  SyncResultHandler &&handler) override;
131 
140  void findCommonBlock(const libp2p::peer::PeerId &peer_id,
144  SyncResultHandler &&handler,
145  std::map<primitives::BlockNumber,
146  primitives::BlockHash> &&observed = {});
147 
150  void loadBlocks(const libp2p::peer::PeerId &peer_id,
152  SyncResultHandler &&handler);
153 
157  void loadJustifications(const libp2p::peer::PeerId &peer_id,
158  primitives::BlockInfo target_block,
159  std::optional<uint32_t> limit,
160  SyncResultHandler &&handler);
161 
163  bool hasIncompleteRequestOfStateSync() const override {
164  return state_sync_request_.has_value();
165  }
166 
167  private:
171  bool subscribeToBlock(const primitives::BlockInfo &block_info,
172  SyncResultHandler &&handler);
173 
175  void notifySubscribers(const primitives::BlockInfo &block_info,
176  outcome::result<void> res);
177 
179  void askNextPortionOfBlocks();
180 
182  void applyNextBlock();
183 
185  void applyNextJustification();
186 
189  size_t discardBlock(const primitives::BlockHash &block);
190 
193  void prune(const primitives::BlockInfo &finalized_block);
194 
197  void scheduleRecentRequestRemoval(
198  const libp2p::peer::PeerId &peer_id,
199  const BlocksRequest::Fingerprint &fingerprint);
200 
201  std::shared_ptr<application::AppStateManager> app_state_manager_;
202  std::shared_ptr<blockchain::BlockTree> block_tree_;
203  std::shared_ptr<storage::changes_trie::ChangesTracker>
205  std::shared_ptr<consensus::BlockAppender> block_appender_;
206  std::shared_ptr<consensus::BlockExecutor> block_executor_;
207  std::shared_ptr<storage::trie::TrieSerializer> serializer_;
208  std::shared_ptr<storage::trie::TrieStorage> storage_;
209  std::shared_ptr<network::Router> router_;
210  std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
211  std::shared_ptr<crypto::Hasher> hasher_;
212  std::shared_ptr<storage::BufferStorage> buffer_storage_;
213 
215 
216  // Metrics
219 
220  log::Logger log_ = log::createLogger("Synchronizer", "synchronizer");
222 
223  std::atomic_bool state_sync_request_in_progress_ = false;
224  std::optional<network::StateRequest> state_sync_request_;
225  std::optional<primitives::BlockInfo> state_sync_on_block_;
226 
227  bool node_is_shutting_down_ = false;
228 
229  struct KnownBlock {
233  std::set<libp2p::peer::PeerId> peers;
234  };
235 
236  // Already known (enqueued) but is not applied yet
237  std::unordered_map<primitives::BlockHash, KnownBlock> known_blocks_;
238 
239  std::optional<primitives::BlockInfo> sync_block_;
240 
241  // Blocks grouped by number
242  std::multimap<primitives::BlockNumber, primitives::BlockHash> generations_;
243 
244  // Links parent->child
245  std::unordered_multimap<primitives::BlockHash, primitives::BlockHash>
247 
248  // Loaded justifications to apply
249  using JustificationPair =
250  std::pair<primitives::BlockInfo, primitives::Justification>;
251  std::queue<JustificationPair> justifications_;
253 
254  // BlockNumber of blocks (aka height) that is potentially best now
255  primitives::BlockNumber watched_blocks_number_{};
256 
257  // Handlers what will be called when block is apply
258  std::unordered_multimap<primitives::BlockHash, SyncResultHandler>
260 
261  std::multimap<primitives::BlockInfo, SyncResultHandler> subscriptions_;
262 
263  std::atomic_bool applying_in_progress_ = false;
264  std::atomic_bool asking_blocks_portion_in_progress_ = false;
265  std::set<libp2p::peer::PeerId> busy_peers_;
266 
267  std::set<std::tuple<libp2p::peer::PeerId, BlocksRequest::Fingerprint>>
269 
270  std::unordered_map<
272  std::tuple<common::Buffer,
273  unsigned,
274  std::shared_ptr<storage::trie::PersistentTrieBatch>>>
276  size_t entries_{0};
277  };
278 
279 } // namespace kagome::network
280 
282 
283 #endif // KAGOME_NETWORK_SYNCHRONIZERIMPL
std::shared_ptr< network::Router > router_
bool hasIncompleteRequestOfStateSync() const override
Check if incomplete requests of state sync exists.
std::shared_ptr< TelemetryService > createTelemetryService()
Returns preliminary initialized instance of telemetry service.
Definition: service.cpp:69
application::AppConfiguration::SyncMethod sync_method_
std::unordered_map< storage::trie::RootHash, std::tuple< common::Buffer, unsigned, std::shared_ptr< storage::trie::PersistentTrieBatch > > > batches_store_
std::pair< primitives::BlockInfo, primitives::Justification > JustificationPair
std::unordered_map< primitives::BlockHash, KnownBlock > known_blocks_
std::shared_ptr< storage::trie::TrieSerializer > serializer_
std::shared_ptr< blockchain::BlockTree > block_tree_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
std::set< libp2p::peer::PeerId > peers
Peers who know this block.
std::shared_ptr< storage::changes_trie::ChangesTracker > trie_changes_tracker_
std::unordered_multimap< primitives::BlockHash, primitives::BlockHash > ancestry_
OUTCOME_HPP_DECLARE_ERROR(kagome::api, JRpcServerImpl::Error)
std::unique_ptr< Registry > RegistryPtr
Definition: metrics.hpp:15
std::optional< primitives::BlockInfo > state_sync_on_block_
RegistryPtr createRegistry()
uint32_t BlockNumber
Definition: common.hpp:18
primitives::BlockData data
Data of block.
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
Definition: buffer.hpp:244
std::set< libp2p::peer::PeerId > busy_peers_
libp2p::peer::PeerId PeerId
std::shared_ptr< application::AppStateManager > app_state_manager_
std::optional< network::StateRequest > state_sync_request_
std::optional< primitives::BlockInfo > sync_block_
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
std::set< std::tuple< libp2p::peer::PeerId, BlocksRequest::Fingerprint > > recent_requests_
std::function< void(outcome::result< primitives::BlockInfo >)> SyncResultHandler
std::shared_ptr< storage::BufferStorage > buffer_storage_
std::shared_ptr< storage::trie::TrieStorage > storage_
std::shared_ptr< consensus::BlockAppender > block_appender_
std::multimap< primitives::BlockInfo, SyncResultHandler > subscriptions_
std::shared_ptr< crypto::Hasher > hasher_
std::unordered_multimap< primitives::BlockHash, SyncResultHandler > watched_blocks_
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::queue< JustificationPair > justifications_
std::shared_ptr< TelemetryService > Telemetry
Definition: service.hpp:86
std::multimap< primitives::BlockNumber, primitives::BlockHash > generations_
A gauge metric to represent a value that can arbitrarily go up and down.
Definition: metrics.hpp:49
std::shared_ptr< consensus::BlockExecutor > block_executor_
common::Hash256 RootHash
Definition: types.hpp:13