Kagome
Polkadot Runtime Engine in C++17
babe_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <boost/assert.hpp>
9 #include <boost/range/adaptor/transformed.hpp>
10 
14 #include "common/buffer.hpp"
25 #include "network/synchronizer.hpp"
30 #include "scale/scale.hpp"
32 
33 namespace {
34  constexpr const char *kBlockProposalTime =
35  "kagome_proposer_block_constructed";
36 }
37 
38 using namespace std::literals::chrono_literals;
39 
40 namespace kagome::consensus::babe {
42 
43  BabeImpl::BabeImpl(
44  const application::AppConfiguration &app_config,
45  std::shared_ptr<application::AppStateManager> app_state_manager,
46  std::shared_ptr<BabeLottery> lottery,
47  std::shared_ptr<consensus::babe::BabeConfigRepository> babe_config_repo,
48  std::shared_ptr<authorship::Proposer> proposer,
49  std::shared_ptr<blockchain::BlockTree> block_tree,
50  std::shared_ptr<network::BlockAnnounceTransmitter>
51  block_announce_transmitter,
52  std::shared_ptr<crypto::Sr25519Provider> sr25519_provider,
53  const std::shared_ptr<crypto::Sr25519Keypair> &keypair,
54  std::shared_ptr<clock::SystemClock> clock,
55  std::shared_ptr<crypto::Hasher> hasher,
56  std::unique_ptr<clock::Timer> timer,
57  std::shared_ptr<authority::AuthorityUpdateObserver>
58  authority_update_observer,
59  std::shared_ptr<network::Synchronizer> synchronizer,
60  std::shared_ptr<BabeUtil> babe_util,
62  std::shared_ptr<runtime::OffchainWorkerApi> offchain_worker_api,
63  std::shared_ptr<runtime::Core> core,
64  std::shared_ptr<babe::ConsistencyKeeper> consistency_keeper)
65  : app_config_(app_config),
66  lottery_{std::move(lottery)},
67  babe_config_repo_{std::move(babe_config_repo)},
68  proposer_{std::move(proposer)},
69  block_tree_{std::move(block_tree)},
70  block_announce_transmitter_{std::move(block_announce_transmitter)},
71  keypair_{keypair},
72  clock_{std::move(clock)},
73  hasher_{std::move(hasher)},
74  sr25519_provider_{std::move(sr25519_provider)},
75  timer_{std::move(timer)},
76  authority_update_observer_(std::move(authority_update_observer)),
77  synchronizer_(std::move(synchronizer)),
78  babe_util_(std::move(babe_util)),
79  chain_events_engine_(std::move(chain_events_engine)),
80  chain_sub_([&] {
81  BOOST_ASSERT(chain_events_engine_ != nullptr);
82  return std::make_shared<primitives::events::ChainEventSubscriber>(
84  }()),
85  offchain_worker_api_(std::move(offchain_worker_api)),
86  runtime_core_(std::move(core)),
87  consistency_keeper_(std::move(consistency_keeper)),
88  log_{log::createLogger("Babe", "babe")},
90  BOOST_ASSERT(lottery_);
91  BOOST_ASSERT(babe_config_repo_);
92  BOOST_ASSERT(proposer_);
93  BOOST_ASSERT(block_tree_);
94  BOOST_ASSERT(block_announce_transmitter_);
95  BOOST_ASSERT(sr25519_provider_);
96  BOOST_ASSERT(clock_);
97  BOOST_ASSERT(hasher_);
98  BOOST_ASSERT(timer_);
99  BOOST_ASSERT(log_);
100  BOOST_ASSERT(authority_update_observer_);
101  BOOST_ASSERT(synchronizer_);
102  BOOST_ASSERT(babe_util_);
103  BOOST_ASSERT(offchain_worker_api_);
104  BOOST_ASSERT(runtime_core_);
105  BOOST_ASSERT(consistency_keeper_);
106 
107  BOOST_ASSERT(app_state_manager);
108 
109  // Register metrics
110  metrics_registry_->registerHistogramFamily(
111  kBlockProposalTime, "Time taken to construct new block");
112  metric_block_proposal_time_ = metrics_registry_->registerHistogramMetric(
113  kBlockProposalTime,
114  {0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10});
115 
116  app_state_manager->takeControl(*this);
117  }
118 
120  auto res = getInitialEpochDescriptor();
121  if (res.has_error()) {
122  SL_CRITICAL(log_,
123  "Can't get initial epoch descriptor: {}",
124  res.error().message());
125  return false;
126  }
127 
128  current_epoch_ = res.value();
129 
130  chain_sub_->subscribe(chain_sub_->generateSubscriptionSetId(),
132  chain_sub_->setCallback([wp = weak_from_this()](
134  auto &&,
137  &event) {
139  if (auto self = wp.lock()) {
140  if (self->current_state_ != Babe::State::HEADERS_LOADING
141  and self->current_state_ != Babe::State::STATE_LOADING) {
142  const auto &header =
143  boost::get<primitives::events::HeadsEventParams>(event).get();
144  auto hash =
145  self->hasher_->blake2b_256(scale::encode(header).value());
146 
147  auto version_res = self->runtime_core_->version(hash);
148  if (version_res.has_value()) {
149  auto &version = version_res.value();
150  if (not self->actual_runtime_version_.has_value()
151  || self->actual_runtime_version_ != version) {
152  self->actual_runtime_version_ = version;
153  self->chain_events_engine_->notify(
155  kFinalizedRuntimeVersion,
156  version);
157  }
158  }
159  }
160  }
161  }
162  });
163 
164  return true;
165  }
166 
168  best_block_ = block_tree_->deepestLeaf();
169 
170  SL_DEBUG(log_, "Babe is starting with syncing from block {}", best_block_);
171 
172  SL_DEBUG(log_,
173  "Starting in epoch {} and slot {}",
176 
177  if (keypair_) {
178  auto babe_config =
180  if (babe_config == nullptr) {
181  SL_CRITICAL(
182  log_,
183  "Can't obtain digest of epoch {} from block tree for block {}",
185  best_block_);
186  return false;
187  }
188 
189  const auto &authorities = babe_config->authorities;
190  if (authorities.size() == 1
191  and authorities[0].id.id == keypair_->public_key) {
192  SL_INFO(log_, "Starting single validating node.");
193  onSynchronized();
194  return true;
195  }
196  }
197 
198  switch (app_config_.syncMethod()) {
199  case SyncMethod::Full:
201  break;
202 
203  case SyncMethod::Fast:
204  if (synchronizer_->hasIncompleteRequestOfStateSync()) {
205  // Has incomplete downloading state; continue loading of state
207  } else {
208  // No incomplete downloading state; load headers first
210  }
211  break;
212  }
213 
214  return true;
215  }
216 
217  void BabeImpl::stop() {}
218 
225  std::optional<uint64_t> getAuthorityIndex(
226  const primitives::AuthorityList &authorities,
227  const primitives::BabeSessionKey &authority_key) {
228  uint64_t n = 0;
229  for (auto &authority : authorities) {
230  if (authority.id.id == authority_key) {
231  return n;
232  }
233  ++n;
234  }
235  return std::nullopt;
236  }
237 
238  outcome::result<EpochDescriptor> BabeImpl::getInitialEpochDescriptor() {
239  auto best_block = block_tree_->deepestLeaf();
240 
241  if (best_block.number == 0) {
242  EpochDescriptor epoch_descriptor{
243  .epoch_number = 0,
244  .start_slot =
245  static_cast<BabeSlotNumber>(clock_->now().time_since_epoch()
246  / babe_config_repo_->slotDuration())
247  + 1};
248  return outcome::success(epoch_descriptor);
249  }
250 
251  // Look up slot number of best block
252  auto best_block_header_res = block_tree_->getBlockHeader(best_block.hash);
253  BOOST_ASSERT_MSG(best_block_header_res.has_value(),
254  "Best block must be known whenever");
255  const auto &best_block_header = best_block_header_res.value();
256  auto babe_digest_res = getBabeDigests(best_block_header);
257  BOOST_ASSERT_MSG(babe_digest_res.has_value(),
258  "Any non genesis block must contain babe digest");
259  auto last_slot_number = babe_digest_res.value().second.slot_number;
260 
261  EpochDescriptor epoch_descriptor{
262  .epoch_number = babe_util_->slotToEpoch(last_slot_number),
263  .start_slot =
264  last_slot_number - babe_util_->slotInEpoch(last_slot_number)};
265 
266  return outcome::success(epoch_descriptor);
267  }
268 
270  auto first_slot_number = babe_util_->syncEpoch([&]() {
271  auto res = block_tree_->getBlockHeader(primitives::BlockNumber(1));
272  if (res.has_error()) {
273  SL_TRACE(log_,
274  "First block slot is {}: no first block (at adjusting)",
275  babe_util_->getCurrentSlot());
276  return std::tuple(babe_util_->getCurrentSlot(), false);
277  }
278 
279  const auto &first_block_header = res.value();
280  auto babe_digest_res = consensus::getBabeDigests(first_block_header);
281  BOOST_ASSERT_MSG(babe_digest_res.has_value(),
282  "Any non genesis block must contain babe digest");
283  auto first_slot_number = babe_digest_res.value().second.slot_number;
284 
285  auto is_first_block_finalized =
286  block_tree_->getLastFinalized().number > 0;
287 
288  SL_TRACE(
289  log_,
290  "First block slot is {}: by {}finalized first block (at adjusting)",
291  first_slot_number,
292  is_first_block_finalized ? "" : "non-");
293  return std::tuple(first_slot_number, is_first_block_finalized);
294  });
295 
296  auto current_epoch_start_slot =
297  first_slot_number
298  + current_epoch_.epoch_number * babe_config_repo_->epochLength();
299 
300  if (current_epoch_.start_slot != current_epoch_start_slot) {
301  SL_WARN(log_,
302  "Start-slot of current epoch {} has updated from {} to {}",
305  current_epoch_start_slot);
306 
307  current_epoch_.start_slot = current_epoch_start_slot;
308  }
309  }
310 
312  bool already_active = false;
313  if (not active_.compare_exchange_strong(already_active, true)) {
314  return;
315  }
316 
317  BOOST_ASSERT(keypair_ != nullptr);
318 
320 
321  SL_DEBUG(
322  log_,
323  "Starting an epoch {}. Session key: {:l}. Secondary slots allowed={}",
324  epoch.epoch_number,
325  keypair_->public_key,
327  ->isSecondarySlotsAllowed());
328  current_epoch_ = epoch;
329  current_slot_ = current_epoch_.start_slot;
330 
331  runSlot();
332  }
333 
335  return current_state_;
336  }
337 
339  const network::Status &status) {
340  // If state is loading, just to ping of loading
342  startStateSyncing(peer_id);
343  return;
344  }
345 
346  const auto &last_finalized_block = block_tree_->getLastFinalized();
347 
348  auto current_best_block_res =
349  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
350  BOOST_ASSERT(current_best_block_res.has_value());
351  const auto &current_best_block = current_best_block_res.value();
352 
353  if (current_best_block == status.best_block) {
356  startStateSyncing(peer_id);
359  onSynchronized();
360  }
361  return;
362  }
363 
364  // Remote peer is lagged
365  if (status.best_block.number <= last_finalized_block.number) {
366  return;
367  }
368 
369  startCatchUp(peer_id, status.best_block);
370  }
371 
373  const network::BlockAnnounce &announce) {
374  // If state is loading, just to ping of loading
376  startStateSyncing(peer_id);
377  return;
378  }
379 
380  const auto &last_finalized_block = block_tree_->getLastFinalized();
381 
382  auto current_best_block_res =
383  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
384  BOOST_ASSERT(current_best_block_res.has_value());
385  const auto &current_best_block = current_best_block_res.value();
386 
387  // Skip obsoleted announce
388  if (announce.header.number < current_best_block.number) {
389  return;
390  }
391 
392  // Start catching up if gap recognized
395  if (announce.header.number > current_best_block.number + 1) {
396  auto block_hash =
397  hasher_->blake2b_256(scale::encode(announce.header).value());
398  const primitives::BlockInfo announced_block(announce.header.number,
399  block_hash);
400  startCatchUp(peer_id, announced_block);
401  return;
402  }
403  }
404 
405  // Received announce that has the same block number as ours best,
406  // or greater by one. Using of simple way to load block
407  synchronizer_->syncByBlockHeader(
408  announce.header,
409  peer_id,
410  [wp = weak_from_this(), announce = announce, peer_id](
411  outcome::result<primitives::BlockInfo> block_res) mutable {
412  if (auto self = wp.lock()) {
413  if (block_res.has_error()) {
414  return;
415  }
416  const auto &block = block_res.value();
417 
418  // Headers are loaded; Start sync of state
419  if (self->current_state_ == Babe::State::HEADERS_LOADING) {
420  self->current_state_ = Babe::State::HEADERS_LOADED;
421  self->startStateSyncing(peer_id);
422  return;
423  }
424 
425  // Just synced
426  if (self->current_state_ == Babe::State::CATCHING_UP) {
427  SL_INFO(self->log_, "Catching up is finished on block {}", block);
428  self->current_state_ = Babe::State::SYNCHRONIZED;
429  self->was_synchronized_ = true;
430  self->telemetry_->notifyWasSynchronized();
431  }
432 
433  // Synced
434  if (self->current_state_ == Babe::State::SYNCHRONIZED) {
435  self->onSynchronized();
436  // Propagate announce
437  self->block_announce_transmitter_->blockAnnounce(
438  std::move(announce));
439  return;
440  }
441  }
442  });
443  }
444 
446  const primitives::BlockInfo &target_block) {
448 
449  // synchronize missing blocks with their bodies
450  auto is_ran = synchronizer_->syncByBlockInfo(
451  target_block,
452  peer_id,
453  [wp = weak_from_this(), block = target_block, peer_id](
454  outcome::result<primitives::BlockInfo> res) {
455  if (auto self = wp.lock()) {
456  if (res.has_error()) {
457  SL_DEBUG(self->log_,
458  "Catching up {} to block {} is failed: {}",
459  peer_id,
460  block,
461  res.error().message());
462  return;
463  }
464 
465  SL_DEBUG(self->log_,
466  "Catching up {} to block {} is going; on block {} now",
467  peer_id,
468  block,
469  res.value());
470  }
471  },
472  false);
473 
474  if (is_ran) {
475  SL_VERBOSE(
476  log_, "Catching up {} to block {} is ran", peer_id, target_block);
483  }
484  }
485  }
486 
492  SL_WARN(log_, "Syncing of state can not be start: Bad state of babe");
493  return;
494  }
495 
497 
498  // Switch to last finalized to have a state on it
499  auto block_at_state = block_tree_->getLastFinalized();
500 
501  SL_DEBUG(log_,
502  "Rolling back non-finalized blocks. Last known finalized is {}",
503  block_at_state);
504 
505  // Next do-while-loop serves for removal non finalized blocks
506  bool affected;
507  do {
508  affected = false;
509 
510  auto block_tree_leaves = block_tree_->getLeaves();
511 
512  for (const auto &hash : block_tree_leaves) {
513  if (hash == block_at_state.hash) continue;
514 
515  auto header_res = block_tree_->getBlockHeader(hash);
516  if (header_res.has_error()) {
517  SL_CRITICAL(log_,
518  "Can't get header of one of removing leave_block: {}",
519  header_res.error().message());
520  continue;
521  }
522 
523  const auto &header = header_res.value();
524 
525  // Block below last finalized must not being. Don't touch just in case
526  if (header.number < block_at_state.number) {
527  continue;
528  }
529 
530  std::ignore = consistency_keeper_->start(
531  primitives::BlockInfo(header.number, hash));
532 
533  affected = true;
534  }
535  } while (affected);
536 
537  SL_TRACE(log_,
538  "Trying to sync state on block {} from {}",
539  block_at_state,
540  peer_id);
541 
542  synchronizer_->syncState(
543  peer_id,
544  block_at_state,
545  [wp = weak_from_this(), block_at_state, peer_id](auto res) mutable {
546  if (auto self = wp.lock()) {
547  if (res.has_error()) {
548  SL_WARN(self->log_,
549  "Syncing of state with {} on block {} is failed: {}",
550  peer_id,
551  block_at_state,
552  res.error().message());
553  return;
554  }
555 
556  SL_INFO(self->log_,
557  "State on block {} is synced successfully",
558  block_at_state);
559  self->current_state_ = Babe::State::CATCHING_UP;
560  }
561  });
562  }
563 
565  // won't start block production without keypair
566  if (not keypair_) {
568  return;
569  }
570 
572  was_synchronized_ = true;
573  telemetry_->notifyWasSynchronized();
574 
575  if (not active_) {
576  best_block_ = block_tree_->deepestLeaf();
577 
578  SL_DEBUG(log_, "Babe is synchronized on block {}", best_block_);
579 
581  }
582  }
583 
585  return was_synchronized_;
586  }
587 
589  BOOST_ASSERT(keypair_ != nullptr);
590 
591  bool rewind_slots; // NOLINT
592  auto slot = current_slot_;
593 
594  do {
595  // check that we are really in the middle of the slot, as expected; we
596  // can cooperate with a relatively little (kMaxLatency) latency, as our
597  // node will be able to retrieve
598  auto now = clock_->now();
599 
600  auto finish_time = babe_util_->slotFinishTime(current_slot_);
601 
602  rewind_slots =
603  now > finish_time
604  and (now - finish_time) > babe_config_repo_->slotDuration();
605 
606  if (rewind_slots) {
607  // we are too far behind; after skipping some slots (but not epochs)
608  // control will be returned to this method
609 
610  ++current_slot_;
611 
613  != babe_util_->slotToEpoch(current_slot_)) {
614  startNextEpoch();
615  } else {
617  }
618  } else if (slot < current_slot_) {
619  SL_VERBOSE(log_, "Slots {}..{} was skipped", slot, current_slot_ - 1);
620  }
621  } while (rewind_slots);
622 
623  // Slot processing begins in 1/3 slot time before end
624  auto finish_time = babe_util_->slotFinishTime(current_slot_)
625  - babe_config_repo_->slotDuration() / 3;
626 
627  SL_VERBOSE(log_,
628  "Starting a slot {} in epoch {} (remains {:.2f} sec.)",
631  std::chrono::duration_cast<std::chrono::milliseconds>(
632  babe_util_->remainToFinishOfSlot(current_slot_))
633  .count()
634  / 1000.);
635 
636  // everything is OK: wait for the end of the slot
637  timer_->expiresAt(finish_time);
638  timer_->asyncWait([this](auto &&ec) {
639  if (ec) {
640  log_->error("error happened while waiting on the timer: {}",
641  ec.message());
642  return;
643  }
644  processSlot();
645  });
646  }
647 
649  BOOST_ASSERT(keypair_ != nullptr);
650 
651  best_block_ = block_tree_->deepestLeaf();
652 
653  // Resolve slot collisions: if best block slot greater than current slot,
654  // that select his ancestor as best
655  for (;;) {
656  const auto &hash = best_block_.hash;
657  const auto header_res = block_tree_->getBlockHeader(hash);
658  BOOST_ASSERT(header_res.has_value());
659  const auto &header = header_res.value();
660  const auto babe_digests_res = getBabeDigests(header);
661  if (babe_digests_res.has_value()) {
662  const auto &babe_digests = babe_digests_res.value();
663  auto best_block_slot = babe_digests.second.slot_number;
664  if (current_slot_ > best_block_slot) { // Condition met
665  break;
666  }
667  SL_DEBUG(log_, "Detected collision in slot {}", current_slot_);
668  // Shift to parent block and check again
669  best_block_ =
670  primitives::BlockInfo(header.number - 1, header.parent_hash);
671  continue;
672  }
673  if (best_block_.number == 0) {
674  // Only genesis block header might not have a babe digest
675  break;
676  }
677  BOOST_ASSERT(babe_digests_res.has_value());
678  }
679 
680  auto babe_config =
682  if (babe_config) {
683  auto authority_index_res =
684  getAuthorityIndex(babe_config->authorities, keypair_->public_key);
685  if (not authority_index_res) {
686  SL_ERROR(log_,
687  "Authority not known, skipping slot processing. "
688  "Probably authority list has changed.");
689  } else {
690  const auto &authority_index = authority_index_res.value();
691 
692  if (lottery_->getEpoch() != current_epoch_) {
693  changeLotteryEpoch(current_epoch_, babe_config);
694  }
695 
696  auto slot_leadership = lottery_->getSlotLeadership(current_slot_);
697 
698  if (slot_leadership.has_value()) {
699  const auto &vrf_result = slot_leadership.value();
700  SL_DEBUG(log_,
701  "Babe author {} is leader (vrfOutput: {}, proof: {})",
702  keypair_->public_key,
703  common::Buffer(vrf_result.output),
704  common::Buffer(vrf_result.proof));
705 
707  SlotType::Primary, std::cref(vrf_result), authority_index);
708  } else if (babe_config->allowed_slots
710  or babe_config->allowed_slots
712  auto expected_author =
713  lottery_->secondarySlotAuthor(current_slot_,
714  babe_config->authorities.size(),
715  babe_config->randomness);
716 
717  if (expected_author.has_value()
718  and authority_index == expected_author.value()) {
719  if (babe_config->allowed_slots
721  auto vrf = lottery_->slotVrfSignature(current_slot_);
723  SlotType::SecondaryVRF, std::cref(vrf), authority_index);
724  } else { // plain secondary slots mode
726  SlotType::SecondaryPlain, std::nullopt, authority_index);
727  }
728  }
729  }
730  }
731  } else {
732  SL_ERROR(log_, "Can not get epoch; Skipping slot processing");
733  }
734 
735  SL_DEBUG(log_,
736  "Slot {} in epoch {} has finished",
739 
740  ++current_slot_;
741 
742  if (current_epoch_.epoch_number != babe_util_->slotToEpoch(current_slot_)) {
743  startNextEpoch();
744  } else {
746  }
747 
748  auto start_time = babe_util_->slotStartTime(current_slot_);
749 
750  SL_DEBUG(log_,
751  "Slot {} in epoch {} will start after {:.2f} sec.",
754  std::chrono::duration_cast<std::chrono::milliseconds>(
755  babe_util_->remainToStartOfSlot(current_slot_))
756  .count()
757  / 1000.);
758 
759  // everything is OK: wait for the end of the slot
760  timer_->expiresAt(start_time);
761  timer_->asyncWait([this](auto &&ec) {
762  if (ec) {
763  log_->error("error happened while waiting on the timer: {}",
764  ec.message());
765  return;
766  }
767  runSlot();
768  });
769  }
770 
771  outcome::result<primitives::PreRuntime> BabeImpl::babePreDigest(
772  SlotType slot_type,
773  std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
774  primitives::AuthorityIndex authority_index) const {
775  BabeBlockHeader babe_header{
776  .slot_assignment_type = slot_type,
777  .authority_index = authority_index,
778  .slot_number = current_slot_,
779  };
780 
781  if (babe_header.needVRFCheck()) {
782  if (not output.has_value()) {
783  SL_ERROR(
784  log_,
785  "VRF proof is required to build block header but was not passed");
787  }
788  babe_header.vrf_output = output.value();
789  }
790 
791  auto encoded_header_res = scale::encode(babe_header);
792  if (!encoded_header_res) {
793  SL_ERROR(log_,
794  "cannot encode BabeBlockHeader: {}",
795  encoded_header_res.error().message());
796  return encoded_header_res.error();
797  }
798  common::Buffer encoded_header{encoded_header_res.value()};
799 
800  return primitives::PreRuntime{{primitives::kBabeEngineId, encoded_header}};
801  }
802 
803  outcome::result<primitives::Seal> BabeImpl::sealBlock(
804  const primitives::Block &block) const {
805  BOOST_ASSERT(keypair_ != nullptr);
806 
807  auto pre_seal_encoded_block = scale::encode(block.header).value();
808 
809  auto pre_seal_hash = hasher_->blake2b_256(pre_seal_encoded_block);
810 
811  Seal seal{};
812 
813  if (auto signature = sr25519_provider_->sign(*keypair_, pre_seal_hash);
814  signature) {
815  seal.signature = signature.value();
816  } else {
817  SL_ERROR(
818  log_, "Error signing a block seal: {}", signature.error().message());
819  return signature.error();
820  }
821  auto encoded_seal = common::Buffer(scale::encode(seal).value());
822  return primitives::Seal{{primitives::kBabeEngineId, encoded_seal}};
823  }
824 
826  SlotType slot_type,
827  std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
828  primitives::AuthorityIndex authority_index) {
829  BOOST_ASSERT(keypair_ != nullptr);
830 
831  // build a block to be announced
832  SL_VERBOSE(log_,
833  "Obtained {} slot leadership in slot {} epoch {}",
834  slot_type == SlotType::Primary ? "primary"
835  : slot_type == SlotType::SecondaryVRF ? "secondary-vrf"
836  : slot_type == SlotType::SecondaryPlain ? "secondary-plain"
837  : "unknown",
840 
841  SL_INFO(log_, "Babe builds block on top of block {}", best_block_);
842 
843  primitives::InherentData inherent_data;
844  auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
845  clock_->now().time_since_epoch())
846  .count();
847 
848  if (auto res = inherent_data.putData<uint64_t>(kTimestampId, now);
849  res.has_error()) {
850  SL_ERROR(log_, "cannot put an inherent data: {}", res.error().message());
851  return;
852  }
853 
854  if (auto res = inherent_data.putData(kBabeSlotId, current_slot_);
855  res.has_error()) {
856  SL_ERROR(log_, "cannot put an inherent data: {}", res);
857  return;
858  }
859 
860  ParachainInherentData paras_inherent_data;
861 
862  // TODO: research of filling of bitfield, backed candidates, disputes
863  // issue https://github.com/soramitsu/kagome/issues/1209
864 
865  {
866  auto best_block_header_res =
867  block_tree_->getBlockHeader(best_block_.hash);
868  BOOST_ASSERT_MSG(best_block_header_res.has_value(),
869  "The best block is always known");
870  paras_inherent_data.parent_header =
871  std::move(best_block_header_res.value());
872  }
873 
874  if (auto res = inherent_data.putData(kParachainId, paras_inherent_data);
875  res.has_error()) {
876  SL_ERROR(log_, "cannot put an inherent data: {}", res);
877  return;
878  }
879 
880  auto proposal_start = std::chrono::high_resolution_clock::now();
881  // calculate babe_pre_digest
882  auto babe_pre_digest_res =
883  babePreDigest(slot_type, output, authority_index);
884  if (not babe_pre_digest_res) {
885  SL_ERROR(log_,
886  "cannot propose a block: {}",
887  babe_pre_digest_res.error().message());
888  return;
889  }
890  const auto &babe_pre_digest = babe_pre_digest_res.value();
891 
892  // create new block
893  auto pre_seal_block_res =
894  proposer_->propose(best_block_, inherent_data, {babe_pre_digest});
895  if (!pre_seal_block_res) {
896  SL_ERROR(log_,
897  "Cannot propose a block: {}",
898  pre_seal_block_res.error().message());
899  return;
900  }
901 
902  auto proposal_end = std::chrono::high_resolution_clock::now();
903  auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
904  proposal_end - proposal_start)
905  .count();
906  SL_DEBUG(log_, "Block has been built in {} ms", duration_ms);
907 
908  metric_block_proposal_time_->observe(static_cast<double>(duration_ms)
909  / 1000);
910 
911  auto block = pre_seal_block_res.value();
912 
913  // Ensure block's extrinsics root matches extrinsics in block's body
914  BOOST_ASSERT_MSG(
915  [&block]() {
916  using boost::adaptors::transformed;
917  const auto &ext_root_res = storage::trie::calculateOrderedTrieHash(
918  block.body | transformed([](const auto &ext) {
919  return common::Buffer{scale::encode(ext).value()};
920  }));
921  return ext_root_res.has_value()
922  and (ext_root_res.value()
923  == common::Buffer(block.header.extrinsics_root));
924  }(),
925  "Extrinsics root does not match extrinsics in the block");
926 
927  // seal the block
928  auto seal_res = sealBlock(block);
929  if (!seal_res) {
930  SL_ERROR(
931  log_, "Failed to seal the block: {}", seal_res.error().message());
932  return;
933  }
934 
935  // add seal digest item
936  block.header.digest.emplace_back(seal_res.value());
937 
938  if (babe_util_->remainToFinishOfSlot(current_slot_ + kMaxBlockSlotsOvertime)
939  .count()
940  == 0) {
941  SL_WARN(log_,
942  "Block was not built in time. "
943  "Allowed slots ({}) have passed. "
944  "If you are executing in debug mode, consider to rebuild in "
945  "release",
947  return;
948  }
949 
950  const auto block_hash =
951  hasher_->blake2b_256(scale::encode(block.header).value());
952  const primitives::BlockInfo block_info(block.header.number, block_hash);
953 
954  auto last_finalized_block = block_tree_->getLastFinalized();
955  auto previous_best_block_res =
956  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
957  BOOST_ASSERT(previous_best_block_res.has_value());
958  const auto &previous_best_block = previous_best_block_res.value();
959 
960  // add block to the block tree
961  if (auto add_res = block_tree_->addBlock(block); not add_res) {
962  SL_ERROR(log_,
963  "Could not add block {}: {}",
964  block_info,
965  add_res.error().message());
966  auto removal_res = block_tree_->removeLeaf(block_hash);
967  if (removal_res.has_error()
968  and removal_res
969  != outcome::failure(
971  SL_WARN(log_,
972  "Rolling back of block {} is failed: {}",
973  block_info,
974  removal_res.error().message());
975  }
976  return;
977  }
978  telemetry_->notifyBlockImported(block_info, telemetry::BlockOrigin::kOwn);
979 
980  // observe possible changes of authorities
981  // (must be done strictly after block will be added)
982  for (auto &digest_item : block.header.digest) {
983  auto res = visit_in_place(
984  digest_item,
985  [&](const primitives::Consensus &consensus_message)
986  -> outcome::result<void> {
987  auto res = authority_update_observer_->onConsensus(
988  block_info, consensus_message);
989  if (res.has_error()) {
990  SL_WARN(log_,
991  "Can't process consensus message digest: {}",
992  res.error().message());
993  }
994  return res;
995  },
996  [](const auto &) { return outcome::success(); });
997  if (res.has_error()) {
998  return;
999  }
1000  }
1001 
1002  // finally, broadcast the sealed block
1003  block_announce_transmitter_->blockAnnounce(
1005  SL_DEBUG(
1006  log_,
1007  "Announced block number {} in slot {} (epoch {}) with timestamp {}",
1008  block.header.number,
1009  current_slot_,
1010  babe_util_->slotToEpoch(current_slot_),
1011  now);
1012 
1013  last_finalized_block = block_tree_->getLastFinalized();
1014  auto current_best_block_res =
1015  block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
1016  BOOST_ASSERT(current_best_block_res.has_value());
1017  const auto &current_best_block = current_best_block_res.value();
1018 
1019  // Create new offchain worker for block if it is best only
1020  if (current_best_block.number > previous_best_block.number) {
1021  auto ocw_res = offchain_worker_api_->offchain_worker(
1022  block.header.parent_hash, block.header);
1023  if (ocw_res.has_failure()) {
1024  log_->error("Can't spawn offchain worker for block {}: {}",
1025  block_info,
1026  ocw_res.error().message());
1027  }
1028  }
1029  }
1030 
1032  const EpochDescriptor &epoch,
1033  std::shared_ptr<const primitives::BabeConfiguration> babe_config) const {
1034  BOOST_ASSERT(keypair_ != nullptr);
1035 
1036  auto authority_index_res =
1037  getAuthorityIndex(babe_config->authorities, keypair_->public_key);
1038  if (not authority_index_res) {
1039  SL_CRITICAL(log_,
1040  "Block production failed: This node is not in the list of "
1041  "authorities. (public key: {})",
1042  keypair_->public_key);
1043  return;
1044  }
1045 
1046  auto threshold = calculateThreshold(babe_config->leadership_rate,
1047  babe_config->authorities,
1048  authority_index_res.value());
1049 
1050  lottery_->changeEpoch(epoch, babe_config->randomness, threshold, *keypair_);
1051  }
1052 
1054  SL_DEBUG(log_,
1055  "Epoch {} has finished. Start epoch {}",
1058 
1061 
1062  babe_util_->syncEpoch([&]() {
1063  auto res = block_tree_->getBlockHeader(primitives::BlockNumber(1));
1064  if (res.has_error()) {
1065  SL_WARN(log_,
1066  "First block slot is {}: no first block (at start next epoch)",
1067  babe_util_->getCurrentSlot());
1068  return std::tuple(babe_util_->getCurrentSlot(), false);
1069  }
1070 
1071  const auto &first_block_header = res.value();
1072  auto babe_digest_res = consensus::getBabeDigests(first_block_header);
1073  BOOST_ASSERT_MSG(babe_digest_res.has_value(),
1074  "Any non genesis block must contain babe digest");
1075  auto first_slot_number = babe_digest_res.value().second.slot_number;
1076 
1077  auto is_first_block_finalized =
1078  block_tree_->getLastFinalized().number > 0;
1079 
1080  SL_WARN(log_,
1081  "First block slot is {}: by {}finalized first block (at start "
1082  "next epoch)",
1083  first_slot_number,
1084  is_first_block_finalized ? "" : "non-");
1085  return std::tuple(first_slot_number, is_first_block_finalized);
1086  });
1087  }
1088 
1089 } // namespace kagome::consensus::babe
outcome::result< primitives::PreRuntime > babePreDigest(SlotType slot_type, std::optional< std::reference_wrapper< const crypto::VRFOutput >> output, primitives::AuthorityIndex authority_index) const
Definition: babe_impl.cpp:771
crypto::Sr25519PublicKey BabeSessionKey
Definition: session_key.hpp:17
static constexpr auto kMaxBlockSlotsOvertime
Definition: babe_impl.hpp:67
Class represents arbitrary (including empty) byte buffer.
Definition: buffer.hpp:29
telemetry::Telemetry telemetry_
Definition: babe_impl.hpp:207
uint32_t AuthorityIndex
Definition: authority.hpp:36
std::shared_ptr< TelemetryService > createTelemetryService()
Returns preliminary initialized instance of telemetry service.
Definition: service.cpp:69
std::shared_ptr< BabeUtil > babe_util_
Definition: babe_impl.hpp:182
std::shared_ptr< runtime::OffchainWorkerApi > offchain_worker_api_
Definition: babe_impl.hpp:186
application::AppConfiguration::SyncMethod SyncMethod
Definition: babe_impl.cpp:41
void runEpoch(EpochDescriptor epoch) override
Definition: babe_impl.cpp:311
outcome::result< std::pair< Seal, BabeBlockHeader > > getBabeDigests(const primitives::BlockHeader &block_header)
A secondary deterministic slot assignment.
std::shared_ptr< clock::SystemClock > clock_
Definition: babe_impl.hpp:175
Block class represents polkadot block primitive.
Definition: block.hpp:19
void onRemoteStatus(const libp2p::peer::PeerId &peer_id, const network::Status &status) override
Definition: babe_impl.cpp:338
A secondary deterministic slot assignment with VRF outputs.
outcome::result< EpochDescriptor > getInitialEpochDescriptor()
Definition: babe_impl.cpp:238
std::shared_ptr< authorship::Proposer > proposer_
Definition: babe_impl.hpp:170
outcome::result< primitives::Seal > sealBlock(const primitives::Block &block) const
Definition: babe_impl.cpp:803
std::shared_ptr< consensus::babe::BabeConfigRepository > babe_config_repo_
Definition: babe_impl.hpp:169
primitives::BlockInfo best_block_
Definition: babe_impl.hpp:200
std::shared_ptr< authority::AuthorityUpdateObserver > authority_update_observer_
Definition: babe_impl.hpp:180
primitives::BlockInfo best_block
Definition: status.hpp:35
boost::variant< std::nullopt_t, HeadsEventParams, RuntimeVersionEventParams, NewRuntimeEventParams > ChainEventParams
Definition: event_types.hpp:51
void processSlotLeadership(SlotType slot_type, std::optional< std::reference_wrapper< const crypto::VRFOutput >> output, primitives::AuthorityIndex authority_index)
Definition: babe_impl.cpp:825
Threshold calculateThreshold(const std::pair< uint64_t, uint64_t > &ratio, const primitives::AuthorityList &authorities, primitives::AuthorityIndex authority_index)
crypto::Sr25519Signature signature
Sig_sr25519(Blake2s(block_header))
Definition: seal.hpp:20
std::shared_ptr< network::BlockAnnounceTransmitter > block_announce_transmitter_
Definition: babe_impl.hpp:173
void startCatchUp(const libp2p::peer::PeerId &peer_id, const primitives::BlockInfo &target_block)
Definition: babe_impl.cpp:445
const application::AppConfiguration & app_config_
Definition: babe_impl.hpp:167
BabeSlotNumber start_slot
starting slot of the epoch
string version
Definition: conf.py:16
uint32_t BlockNumber
Definition: common.hpp:18
std::unique_ptr< clock::Timer > timer_
Definition: babe_impl.hpp:178
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
Definition: buffer.hpp:244
std::shared_ptr< ChainSubscriptionEngine > ChainSubscriptionEnginePtr
libp2p::peer::PeerId PeerId
std::shared_ptr< primitives::events::ChainEventSubscriber > chain_sub_
Definition: babe_impl.hpp:184
primitives::BlockHeader header
State getCurrentState() const override
Definition: babe_impl.cpp:334
virtual void observe(const double value)=0
Observe the given amount.
const std::shared_ptr< crypto::Sr25519Keypair > & keypair_
Definition: babe_impl.hpp:174
metrics::Histogram * metric_block_proposal_time_
Definition: babe_impl.hpp:204
std::shared_ptr< runtime::Core > runtime_core_
Definition: babe_impl.hpp:187
void onBlockAnnounce(const libp2p::peer::PeerId &peer_id, const network::BlockAnnounce &announce) override
Definition: babe_impl.cpp:372
Block that was collated by this node.
std::shared_ptr< BabeLottery > lottery_
Definition: babe_impl.hpp:168
uint64_t BabeSlotNumber
slot number of the Babe production
Definition: common.hpp:24
virtual SyncMethod syncMethod() const =0
std::shared_ptr< babe::ConsistencyKeeper > consistency_keeper_
Definition: babe_impl.hpp:188
A primary VRF-based slot assignment.
BlockNumber number
index of the block in the chain
std::optional< uint64_t > getAuthorityIndex(const primitives::AuthorityList &authorities, const primitives::BabeSessionKey &authority_key)
Get index of authority.
Definition: babe_impl.cpp:225
void startStateSyncing(const libp2p::peer::PeerId &peer_id)
Definition: babe_impl.cpp:487
bool wasSynchronized() const override
Definition: babe_impl.cpp:584
BlockHeader header
block header
Definition: block.hpp:22
void changeLotteryEpoch(const EpochDescriptor &epoch, std::shared_ptr< const primitives::BabeConfiguration > babe_config) const
Definition: babe_impl.cpp:1031
detail::BlockInfoT< struct BlockInfoTag > BlockInfo
Definition: common.hpp:63
const auto kBabeEngineId
Definition: digest.hpp:25
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::shared_ptr< crypto::Hasher > hasher_
Definition: babe_impl.hpp:176
std::shared_ptr< crypto::Sr25519Provider > sr25519_provider_
Definition: babe_impl.hpp:177
outcome::result< common::Buffer > calculateOrderedTrieHash(const It &begin, const It &end)
std::shared_ptr< network::Synchronizer > synchronizer_
Definition: babe_impl.hpp:181
outcome::result< void > putData(InherentIdentifier identifier, const T &inherent)
std::shared_ptr< blockchain::BlockTree > block_tree_
Definition: babe_impl.hpp:171
metrics::RegistryPtr metrics_registry_
Definition: babe_impl.hpp:203
primitives::events::ChainSubscriptionEnginePtr chain_events_engine_
Definition: babe_impl.hpp:183