Kagome
Polkadot Runtime Engine in C++17
block_executor_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <chrono>
9 
19 #include "primitives/common.hpp"
21 #include "scale/scale.hpp"
23 
26  switch (e) {
27  case E::INVALID_BLOCK:
28  return "Invalid block";
29  case E::PARENT_NOT_FOUND:
30  return "Parent not found";
31  case E::INTERNAL_ERROR:
32  return "Internal error";
33  }
34  return "Unknown error";
35 }
36 
37 namespace {
38  constexpr const char *kBlockExecutionTime = // name used to register the block-execution histogram family and metric
39  "kagome_block_verification_and_import_time";
40 } // namespace
41 
42 namespace kagome::consensus {
43 
45  std::shared_ptr<blockchain::BlockTree> block_tree,
46  std::shared_ptr<runtime::Core> core,
47  std::shared_ptr<consensus::babe::BabeConfigRepository> babe_config_repo,
48  std::shared_ptr<BlockValidator> block_validator,
49  std::shared_ptr<grandpa::Environment> grandpa_environment,
50  std::shared_ptr<transaction_pool::TransactionPool> tx_pool,
51  std::shared_ptr<crypto::Hasher> hasher,
52  std::shared_ptr<blockchain::DigestTracker> digest_tracker,
53  std::shared_ptr<BabeUtil> babe_util,
54  std::shared_ptr<runtime::OffchainWorkerApi> offchain_worker_api,
55  std::shared_ptr<babe::ConsistencyKeeper> consistency_keeper)
56  : block_tree_{std::move(block_tree)},
57  core_{std::move(core)},
58  babe_config_repo_{std::move(babe_config_repo)},
59  block_validator_{std::move(block_validator)},
60  grandpa_environment_{std::move(grandpa_environment)},
61  tx_pool_{std::move(tx_pool)},
62  hasher_{std::move(hasher)},
63  digest_tracker_(std::move(digest_tracker)),
64  babe_util_(std::move(babe_util)),
65  offchain_worker_api_(std::move(offchain_worker_api)),
66  consistency_keeper_(std::move(consistency_keeper)),
67  logger_{log::createLogger("BlockExecutor", "block_executor")},
69  BOOST_ASSERT(block_tree_ != nullptr);
70  BOOST_ASSERT(core_ != nullptr);
71  BOOST_ASSERT(babe_config_repo_ != nullptr);
72  BOOST_ASSERT(block_validator_ != nullptr);
73  BOOST_ASSERT(grandpa_environment_ != nullptr);
74  BOOST_ASSERT(tx_pool_ != nullptr);
75  BOOST_ASSERT(hasher_ != nullptr);
76  BOOST_ASSERT(digest_tracker_ != nullptr);
77  BOOST_ASSERT(babe_util_ != nullptr);
78  BOOST_ASSERT(offchain_worker_api_ != nullptr);
79  BOOST_ASSERT(consistency_keeper_ != nullptr);
80  BOOST_ASSERT(logger_ != nullptr);
81  BOOST_ASSERT(telemetry_ != nullptr);
82 
83  // Register metrics
84  metrics_registry_->registerHistogramFamily(
85  kBlockExecutionTime, "Time taken to verify and import blocks");
86  metric_block_execution_time_ = metrics_registry_->registerHistogramMetric(
87  kBlockExecutionTime,
88  {0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10});
89  }
90 
91  outcome::result<void> BlockExecutorImpl::applyBlock(
93  if (not b.header.has_value()) {
94  logger_->warn("Skipping a block without header");
95  return Error::INVALID_BLOCK;
96  }
97  auto &header = b.header.value();
98 
99  auto block_hash = hasher_->blake2b_256(scale::encode(header).value());
100 
101  primitives::BlockInfo block_info(header.number, block_hash);
102 
103  if (auto header_res = block_tree_->getBlockHeader(header.parent_hash);
104  header_res.has_error()
105  && header_res.error() == blockchain::BlockTreeError::HEADER_NOT_FOUND) {
106  logger_->warn("Skipping a block {} with unknown parent", block_info);
108  } else if (header_res.has_error()) {
109  return header_res.as_failure();
110  }
111 
112  // get current time to measure performance of block execution
113  auto t_start = std::chrono::high_resolution_clock::now();
114 
115  bool block_already_exists = false;
116 
117  // check if block body already exists. If so, do not apply
118  if (auto body_res = block_tree_->getBlockBody(block_hash);
119  body_res.has_value()) {
120  SL_DEBUG(logger_, "Skip existing block: {}", block_info);
121 
122  OUTCOME_TRY(block_tree_->addExistingBlock(block_hash, header));
123  block_already_exists = true;
124  } else if (body_res.error() != blockchain::BlockTreeError::BODY_NOT_FOUND) {
125  return body_res.as_failure();
126  }
127 
128  if (not b.body.has_value()) {
129  logger_->warn("Skipping a block without body.");
130  return Error::INVALID_BLOCK;
131  }
132  auto &body = b.body.value();
133 
134  primitives::Block block{.header = std::move(header),
135  .body = std::move(body)};
136 
137  OUTCOME_TRY(babe_digests, getBabeDigests(block.header));
138 
139  const auto &babe_header = babe_digests.second;
140 
141  auto slot_number = babe_header.slot_number;
142 
143  babe_util_->syncEpoch([&] {
144  auto res = block_tree_->getBlockHeader(primitives::BlockNumber(1));
145  if (res.has_error()) {
146  if (block.header.number == 1) {
147  SL_TRACE(logger_,
148  "First block slot is {}: it is first block (at executing)",
149  slot_number);
150  return std::tuple(slot_number, false);
151  } else {
152  SL_TRACE(logger_,
153  "First block slot is {}: no first block (at executing)",
154  babe_util_->getCurrentSlot());
155  return std::tuple(babe_util_->getCurrentSlot(), false);
156  }
157  }
158 
159  const auto &first_block_header = res.value();
160  auto babe_digest_res = consensus::getBabeDigests(first_block_header);
161  BOOST_ASSERT_MSG(babe_digest_res.has_value(),
162  "Any non genesis block must contain babe digest");
163  auto first_slot_number = babe_digest_res.value().second.slot_number;
164 
165  auto is_first_block_finalized =
166  block_tree_->getLastFinalized().number > 0;
167 
168  SL_TRACE(
169  logger_,
170  "First block slot is {}: by {}finalized first block (at executing)",
171  first_slot_number,
172  is_first_block_finalized ? "" : "non-");
173  return std::tuple(first_slot_number, is_first_block_finalized);
174  });
175 
176  auto epoch_number = babe_util_->slotToEpoch(slot_number);
177 
178  SL_INFO(
179  logger_,
180  "Applying block {} ({} in slot {}, epoch {}, authority #{})", // .
181  block_info,
182  to_string(babe_header.slotType()),
183  slot_number,
184  epoch_number,
185  babe_header.authority_index);
186 
187  auto consistency_guard = consistency_keeper_->start(block_info);
188 
189  // observe digest of block
190  // (must be done strictly after block will be added)
191  auto digest_tracking_res =
192  digest_tracker_->onDigest(block_info, block.header.digest);
193  if (digest_tracking_res.has_error()) {
194  SL_ERROR(logger_,
195  "Error while tracking digest of block {}: {}",
196  block_info,
197  digest_tracking_res.error().message());
198  return digest_tracking_res.as_failure();
199  }
200 
201  auto babe_config = babe_config_repo_->config(block_info, epoch_number);
202  if (babe_config == nullptr) {
203  return Error::INVALID_BLOCK; // TODO Change to more appropriate error
204  }
205 
206  SL_TRACE(logger_,
207  "Actual epoch digest to apply block {} (slot {}, epoch {}). "
208  "Randomness: {}",
209  block_info,
210  slot_number,
211  epoch_number,
212  babe_config->randomness);
213 
214  auto threshold = calculateThreshold(babe_config->leadership_rate,
215  babe_config->authorities,
216  babe_header.authority_index);
217 
218  OUTCOME_TRY(block_validator_->validateHeader(
219  block.header,
220  epoch_number,
221  babe_config->authorities[babe_header.authority_index].id,
222  threshold,
223  *babe_config));
224 
225  auto block_without_seal_digest = block;
226 
227  // block should be applied without last digest which contains the seal
228  block_without_seal_digest.header.digest.pop_back();
229 
230  auto parent = block_tree_->getBlockHeader(block.header.parent_hash).value();
231 
232  auto last_finalized_block = block_tree_->getLastFinalized();
233  auto previous_best_block_res =
234  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
235  BOOST_ASSERT(previous_best_block_res.has_value());
236  const auto &previous_best_block = previous_best_block_res.value();
237 
238  if (not block_already_exists) {
239  auto exec_start = std::chrono::high_resolution_clock::now();
240  SL_DEBUG(logger_,
241  "Execute block {}, state {}, a child of block {}, state {}",
242  block_info,
243  block.header.state_root,
244  primitives::BlockInfo(parent.number, block.header.parent_hash),
245  parent.state_root);
246 
247  OUTCOME_TRY(core_->execute_block(block_without_seal_digest));
248 
249  auto exec_end = std::chrono::high_resolution_clock::now();
250  auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
251  exec_end - exec_start)
252  .count();
253  SL_DEBUG(logger_, "Core_execute_block: {} ms", duration_ms);
254 
255  metric_block_execution_time_->observe(static_cast<double>(duration_ms)
256  / 1000);
257 
258  // add block header if it does not exist
259  OUTCOME_TRY(block_tree_->addBlock(block));
260  }
261 
262  // apply justification if any (must be done strictly after the block has
263  // been added and its consensus digests have been handled)
264  if (b.justification.has_value()) {
265  SL_VERBOSE(logger_, "Justification received for block {}", block_info);
266 
267  // first try to apply the justifications left in the justification store
268  if (not justifications_.empty()) {
269  std::vector<primitives::BlockInfo> to_remove;
270  for (const auto &[block_justified_for, justification] :
271  justifications_) {
272  auto res = applyJustification(block_justified_for, justification);
273  if (res) {
274  to_remove.push_back(block_justified_for);
275  }
276  }
277  if (not to_remove.empty()) {
278  for (const auto &item : to_remove) {
279  justifications_.erase(item);
280  }
281  }
282  }
283 
284  auto res = applyJustification(block_info, b.justification.value());
285  if (res.has_error()) {
286  if (res
287  == outcome::failure(grandpa::VotingRoundError::NOT_ENOUGH_WEIGHT)) {
288  justifications_.emplace(block_info, b.justification.value());
289  } else {
290  return res.as_failure();
291  }
292  } else {
293  // can be safely cleared once the current justification is applied successfully
294  justifications_.clear();
295  }
296  }
297 
298  // remove block's extrinsics from tx pool
299  for (const auto &extrinsic : block.body) {
300  auto res = tx_pool_->removeOne(hasher_->blake2b_256(extrinsic.data));
301  if (res.has_error()
302  && res
303  != outcome::failure(
305  return res.as_failure();
306  }
307  }
308 
309  auto t_end = std::chrono::high_resolution_clock::now();
310 
311  logger_->info(
312  "Imported block {} within {} ms",
313  block_info,
314  std::chrono::duration_cast<std::chrono::milliseconds>(t_end - t_start)
315  .count());
316 
317  last_finalized_block = block_tree_->getLastFinalized();
318  telemetry_->notifyBlockFinalized(last_finalized_block);
319  auto current_best_block_res =
320  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
321  BOOST_ASSERT(current_best_block_res.has_value());
322  const auto &current_best_block = current_best_block_res.value();
323  telemetry_->notifyBlockImported(
324  current_best_block, telemetry::BlockOrigin::kNetworkInitialSync);
325 
326  // Create a new offchain worker for the block only if it is the best one
327  if (current_best_block.number > previous_best_block.number) {
328  auto ocw_res = offchain_worker_api_->offchain_worker(
329  block.header.parent_hash, block.header);
330  if (ocw_res.has_failure()) {
331  logger_->error("Can't spawn offchain worker for block {}: {}",
332  block_info,
333  ocw_res.error().message());
334  }
335  }
336 
337  consistency_guard.commit();
338 
339  return outcome::success();
340  }
341 
343  const primitives::BlockInfo &block_info,
344  const primitives::Justification &justification) {
345  return grandpa_environment_->applyJustification(block_info, justification);
346  }
347 
348 } // namespace kagome::consensus
std::shared_ptr< transaction_pool::TransactionPool > tx_pool_
std::shared_ptr< TelemetryService > createTelemetryService()
Returns preliminary initialized instance of telemetry service.
Definition: service.cpp:69
OUTCOME_CPP_DEFINE_CATEGORY(kagome::consensus, BlockExecutorImpl::Error, e)
std::map< primitives::BlockInfo, primitives::Justification > justifications_
outcome::result< std::pair< Seal, BabeBlockHeader > > getBabeDigests(const primitives::BlockHeader &block_header)
Block class represents polkadot block primitive.
Definition: block.hpp:19
std::string_view to_string(SlotType s)
Definition: slot.hpp:22
std::shared_ptr< runtime::OffchainWorkerApi > offchain_worker_api_
Block is part of the initial sync with the network.
Threshold calculateThreshold(const std::pair< uint64_t, uint64_t > &ratio, const primitives::AuthorityList &authorities, primitives::AuthorityIndex authority_index)
uint32_t BlockNumber
Definition: common.hpp:18
std::shared_ptr< consensus::babe::BabeConfigRepository > babe_config_repo_
virtual void observe(const double value)=0
Observe the given amount.
std::shared_ptr< grandpa::Environment > grandpa_environment_
std::shared_ptr< runtime::Core > core_
std::shared_ptr< crypto::Hasher > hasher_
std::shared_ptr< blockchain::DigestTracker > digest_tracker_
std::shared_ptr< BabeUtil > babe_util_
BlockExecutorImpl(std::shared_ptr< blockchain::BlockTree > block_tree, std::shared_ptr< runtime::Core > core, std::shared_ptr< consensus::babe::BabeConfigRepository > babe_config_repo, std::shared_ptr< BlockValidator > block_validator, std::shared_ptr< grandpa::Environment > grandpa_environment, std::shared_ptr< transaction_pool::TransactionPool > tx_pool, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< blockchain::DigestTracker > digest_tracker, std::shared_ptr< BabeUtil > babe_util, std::shared_ptr< runtime::OffchainWorkerApi > offchain_worker_api, std::shared_ptr< babe::ConsistencyKeeper > consistency_keeper)
std::shared_ptr< blockchain::BlockTree > block_tree_
outcome::result< void > applyJustification(const primitives::BlockInfo &block_info, const primitives::Justification &justification) override
BlockHeader header
block header
Definition: block.hpp:22
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::shared_ptr< babe::ConsistencyKeeper > consistency_keeper_
outcome::result< void > applyBlock(primitives::BlockData &&block) override
std::shared_ptr< BlockValidator > block_validator_