8 #include <boost/assert.hpp> 9 #include <boost/range/adaptor/transformed.hpp> 30 #include "scale/scale.hpp" 34 constexpr
const char *kBlockProposalTime =
35 "kagome_proposer_block_constructed";
45 std::shared_ptr<application::AppStateManager> app_state_manager,
46 std::shared_ptr<BabeLottery> lottery,
47 std::shared_ptr<consensus::babe::BabeConfigRepository> babe_config_repo,
48 std::shared_ptr<authorship::Proposer> proposer,
49 std::shared_ptr<blockchain::BlockTree> block_tree,
50 std::shared_ptr<network::BlockAnnounceTransmitter>
51 block_announce_transmitter,
52 std::shared_ptr<crypto::Sr25519Provider> sr25519_provider,
53 const std::shared_ptr<crypto::Sr25519Keypair> &keypair,
54 std::shared_ptr<clock::SystemClock> clock,
55 std::shared_ptr<crypto::Hasher> hasher,
56 std::unique_ptr<clock::Timer> timer,
57 std::shared_ptr<authority::AuthorityUpdateObserver>
58 authority_update_observer,
59 std::shared_ptr<network::Synchronizer> synchronizer,
60 std::shared_ptr<BabeUtil> babe_util,
62 std::shared_ptr<runtime::OffchainWorkerApi> offchain_worker_api,
63 std::shared_ptr<runtime::Core> core,
64 std::shared_ptr<babe::ConsistencyKeeper> consistency_keeper)
65 : app_config_(app_config),
66 lottery_{std::move(lottery)},
82 return std::make_shared<primitives::events::ChainEventSubscriber>(
107 BOOST_ASSERT(app_state_manager);
111 kBlockProposalTime,
"Time taken to construct new block");
114 {0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10});
116 app_state_manager->takeControl(*
this);
121 if (res.has_error()) {
123 "Can't get initial epoch descriptor: {}",
124 res.error().message());
132 chain_sub_->setCallback([wp = weak_from_this()](
139 if (
auto self = wp.lock()) {
143 boost::get<primitives::events::HeadsEventParams>(event).
get();
145 self->hasher_->blake2b_256(scale::encode(header).value());
147 auto version_res =
self->runtime_core_->version(hash);
148 if (version_res.has_value()) {
149 auto &
version = version_res.value();
150 if (not self->actual_runtime_version_.has_value()
151 ||
self->actual_runtime_version_ !=
version) {
152 self->actual_runtime_version_ =
version;
153 self->chain_events_engine_->notify(
155 kFinalizedRuntimeVersion,
170 SL_DEBUG(
log_,
"Babe is starting with syncing from block {}",
best_block_);
173 "Starting in epoch {} and slot {}",
180 if (babe_config ==
nullptr) {
183 "Can't obtain digest of epoch {} from block tree for block {}",
189 const auto &authorities = babe_config->authorities;
190 if (authorities.size() == 1
191 and authorities[0].id.id ==
keypair_->public_key) {
192 SL_INFO(
log_,
"Starting single validating node.");
229 for (
auto &authority : authorities) {
230 if (authority.id.id == authority_key) {
241 if (best_block.number == 0) {
248 return outcome::success(epoch_descriptor);
252 auto best_block_header_res =
block_tree_->getBlockHeader(best_block.hash);
253 BOOST_ASSERT_MSG(best_block_header_res.has_value(),
254 "Best block must be known whenever");
255 const auto &best_block_header = best_block_header_res.value();
257 BOOST_ASSERT_MSG(babe_digest_res.has_value(),
258 "Any non genesis block must contain babe digest");
259 auto last_slot_number = babe_digest_res.value().second.slot_number;
264 last_slot_number -
babe_util_->slotInEpoch(last_slot_number)};
266 return outcome::success(epoch_descriptor);
270 auto first_slot_number =
babe_util_->syncEpoch([&]() {
272 if (res.has_error()) {
274 "First block slot is {}: no first block (at adjusting)",
276 return std::tuple(
babe_util_->getCurrentSlot(),
false);
279 const auto &first_block_header = res.value();
281 BOOST_ASSERT_MSG(babe_digest_res.has_value(),
282 "Any non genesis block must contain babe digest");
283 auto first_slot_number = babe_digest_res.value().second.slot_number;
285 auto is_first_block_finalized =
290 "First block slot is {}: by {}finalized first block (at adjusting)",
292 is_first_block_finalized ?
"" :
"non-");
293 return std::tuple(first_slot_number, is_first_block_finalized);
296 auto current_epoch_start_slot =
302 "Start-slot of current epoch {} has updated from {} to {}",
305 current_epoch_start_slot);
312 bool already_active =
false;
313 if (not
active_.compare_exchange_strong(already_active,
true)) {
323 "Starting an epoch {}. Session key: {:l}. Secondary slots allowed={}",
327 ->isSecondarySlotsAllowed());
346 const auto &last_finalized_block =
block_tree_->getLastFinalized();
348 auto current_best_block_res =
349 block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
350 BOOST_ASSERT(current_best_block_res.has_value());
351 const auto &current_best_block = current_best_block_res.value();
353 if (current_best_block == status.
best_block) {
380 const auto &last_finalized_block =
block_tree_->getLastFinalized();
382 auto current_best_block_res =
383 block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
384 BOOST_ASSERT(current_best_block_res.has_value());
385 const auto &current_best_block = current_best_block_res.value();
388 if (announce.
header.
number < current_best_block.number) {
395 if (announce.
header.
number > current_best_block.number + 1) {
397 hasher_->blake2b_256(scale::encode(announce.
header).value());
410 [wp = weak_from_this(), announce = announce, peer_id](
411 outcome::result<primitives::BlockInfo> block_res)
mutable {
412 if (
auto self = wp.lock()) {
413 if (block_res.has_error()) {
416 const auto &block = block_res.value();
421 self->startStateSyncing(peer_id);
427 SL_INFO(self->log_,
"Catching up is finished on block {}", block);
429 self->was_synchronized_ =
true;
430 self->telemetry_->notifyWasSynchronized();
435 self->onSynchronized();
437 self->block_announce_transmitter_->blockAnnounce(
438 std::move(announce));
453 [wp = weak_from_this(), block = target_block, peer_id](
454 outcome::result<primitives::BlockInfo> res) {
455 if (
auto self = wp.lock()) {
456 if (res.has_error()) {
458 "Catching up {} to block {} is failed: {}",
461 res.error().message());
466 "Catching up {} to block {} is going; on block {} now",
476 log_,
"Catching up {} to block {} is ran", peer_id, target_block);
492 SL_WARN(
log_,
"Syncing of state can not be start: Bad state of babe");
499 auto block_at_state =
block_tree_->getLastFinalized();
502 "Rolling back non-finalized blocks. Last known finalized is {}",
512 for (
const auto &hash : block_tree_leaves) {
513 if (hash == block_at_state.hash)
continue;
515 auto header_res =
block_tree_->getBlockHeader(hash);
516 if (header_res.has_error()) {
518 "Can't get header of one of removing leave_block: {}",
519 header_res.error().message());
523 const auto &header = header_res.value();
526 if (header.number < block_at_state.number) {
538 "Trying to sync state on block {} from {}",
545 [wp = weak_from_this(), block_at_state, peer_id](
auto res)
mutable {
546 if (
auto self = wp.lock()) {
547 if (res.has_error()) {
549 "Syncing of state with {} on block {} is failed: {}",
552 res.error().message());
557 "State on block {} is synced successfully",
621 }
while (rewind_slots);
628 "Starting a slot {} in epoch {} (remains {:.2f} sec.)",
631 std::chrono::duration_cast<std::chrono::milliseconds>(
637 timer_->expiresAt(finish_time);
638 timer_->asyncWait([
this](
auto &&ec) {
640 log_->error(
"error happened while waiting on the timer: {}",
657 const auto header_res =
block_tree_->getBlockHeader(hash);
658 BOOST_ASSERT(header_res.has_value());
659 const auto &header = header_res.value();
661 if (babe_digests_res.has_value()) {
662 const auto &babe_digests = babe_digests_res.value();
663 auto best_block_slot = babe_digests.second.slot_number;
677 BOOST_ASSERT(babe_digests_res.has_value());
683 auto authority_index_res =
685 if (not authority_index_res) {
687 "Authority not known, skipping slot processing. " 688 "Probably authority list has changed.");
690 const auto &authority_index = authority_index_res.value();
698 if (slot_leadership.has_value()) {
699 const auto &vrf_result = slot_leadership.value();
701 "Babe author {} is leader (vrfOutput: {}, proof: {})",
708 }
else if (babe_config->allowed_slots
710 or babe_config->allowed_slots
712 auto expected_author =
714 babe_config->authorities.size(),
715 babe_config->randomness);
717 if (expected_author.has_value()
718 and authority_index == expected_author.value()) {
719 if (babe_config->allowed_slots
732 SL_ERROR(
log_,
"Can not get epoch; Skipping slot processing");
736 "Slot {} in epoch {} has finished",
751 "Slot {} in epoch {} will start after {:.2f} sec.",
754 std::chrono::duration_cast<std::chrono::milliseconds>(
760 timer_->expiresAt(start_time);
761 timer_->asyncWait([
this](
auto &&ec) {
763 log_->error(
"error happened while waiting on the timer: {}",
773 std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
777 .authority_index = authority_index,
781 if (babe_header.needVRFCheck()) {
782 if (not output.has_value()) {
785 "VRF proof is required to build block header but was not passed");
788 babe_header.vrf_output = output.value();
791 auto encoded_header_res = scale::encode(babe_header);
792 if (!encoded_header_res) {
794 "cannot encode BabeBlockHeader: {}",
795 encoded_header_res.error().message());
796 return encoded_header_res.error();
807 auto pre_seal_encoded_block = scale::encode(block.
header).value();
809 auto pre_seal_hash =
hasher_->blake2b_256(pre_seal_encoded_block);
818 log_,
"Error signing a block seal: {}", signature.error().message());
819 return signature.error();
827 std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
833 "Obtained {} slot leadership in slot {} epoch {}",
844 auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
845 clock_->now().time_since_epoch())
850 SL_ERROR(
log_,
"cannot put an inherent data: {}", res.error().message());
856 SL_ERROR(
log_,
"cannot put an inherent data: {}", res);
866 auto best_block_header_res =
868 BOOST_ASSERT_MSG(best_block_header_res.has_value(),
869 "The best block is always known");
871 std::move(best_block_header_res.value());
876 SL_ERROR(
log_,
"cannot put an inherent data: {}", res);
880 auto proposal_start = std::chrono::high_resolution_clock::now();
882 auto babe_pre_digest_res =
884 if (not babe_pre_digest_res) {
886 "cannot propose a block: {}",
887 babe_pre_digest_res.error().message());
890 const auto &babe_pre_digest = babe_pre_digest_res.value();
893 auto pre_seal_block_res =
895 if (!pre_seal_block_res) {
897 "Cannot propose a block: {}",
898 pre_seal_block_res.error().message());
902 auto proposal_end = std::chrono::high_resolution_clock::now();
903 auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
904 proposal_end - proposal_start)
906 SL_DEBUG(
log_,
"Block has been built in {} ms", duration_ms);
911 auto block = pre_seal_block_res.value();
916 using boost::adaptors::transformed;
918 block.body | transformed([](
const auto &ext) {
921 return ext_root_res.has_value()
922 and (ext_root_res.value()
925 "Extrinsics root does not match extrinsics in the block");
931 log_,
"Failed to seal the block: {}", seal_res.error().message());
936 block.header.digest.emplace_back(seal_res.value());
942 "Block was not built in time. " 943 "Allowed slots ({}) have passed. " 944 "If you are executing in debug mode, consider to rebuild in " 950 const auto block_hash =
951 hasher_->blake2b_256(scale::encode(block.header).value());
954 auto last_finalized_block =
block_tree_->getLastFinalized();
955 auto previous_best_block_res =
956 block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
957 BOOST_ASSERT(previous_best_block_res.has_value());
958 const auto &previous_best_block = previous_best_block_res.value();
961 if (
auto add_res =
block_tree_->addBlock(block); not add_res) {
963 "Could not add block {}: {}",
965 add_res.error().message());
966 auto removal_res =
block_tree_->removeLeaf(block_hash);
967 if (removal_res.has_error()
972 "Rolling back of block {} is failed: {}",
974 removal_res.error().message());
982 for (
auto &digest_item : block.header.digest) {
983 auto res = visit_in_place(
986 -> outcome::result<void> {
988 block_info, consensus_message);
989 if (res.has_error()) {
991 "Can't process consensus message digest: {}",
992 res.error().message());
996 [](
const auto &) {
return outcome::success(); });
997 if (res.has_error()) {
1007 "Announced block number {} in slot {} (epoch {}) with timestamp {}",
1008 block.header.number,
1013 last_finalized_block =
block_tree_->getLastFinalized();
1014 auto current_best_block_res =
1015 block_tree_->getBestContaining(last_finalized_block.hash, std::nullopt);
1016 BOOST_ASSERT(current_best_block_res.has_value());
1017 const auto &current_best_block = current_best_block_res.value();
1020 if (current_best_block.number > previous_best_block.number) {
1022 block.header.parent_hash, block.header);
1023 if (ocw_res.has_failure()) {
1024 log_->error(
"Can't spawn offchain worker for block {}: {}",
1026 ocw_res.error().message());
1033 std::shared_ptr<const primitives::BabeConfiguration> babe_config)
const {
1036 auto authority_index_res =
1038 if (not authority_index_res) {
1040 "Block production failed: This node is not in the list of " 1041 "authorities. (public key: {})",
1047 babe_config->authorities,
1048 authority_index_res.value());
1050 lottery_->changeEpoch(epoch, babe_config->randomness, threshold, *
keypair_);
1055 "Epoch {} has finished. Start epoch {}",
1064 if (res.has_error()) {
1066 "First block slot is {}: no first block (at start next epoch)",
1068 return std::tuple(
babe_util_->getCurrentSlot(),
false);
1071 const auto &first_block_header = res.value();
1073 BOOST_ASSERT_MSG(babe_digest_res.has_value(),
1074 "Any non genesis block must contain babe digest");
1075 auto first_slot_number = babe_digest_res.value().second.slot_number;
1077 auto is_first_block_finalized =
1081 "First block slot is {}: by {}finalized first block (at start " 1084 is_first_block_finalized ?
"" :
"non-");
1085 return std::tuple(first_slot_number, is_first_block_finalized);
outcome::result< primitives::PreRuntime > babePreDigest(SlotType slot_type, std::optional< std::reference_wrapper< const crypto::VRFOutput >> output, primitives::AuthorityIndex authority_index) const
crypto::Sr25519PublicKey BabeSessionKey
static constexpr auto kMaxBlockSlotsOvertime
Class representing an arbitrary (possibly empty) byte buffer.
telemetry::Telemetry telemetry_
std::shared_ptr< TelemetryService > createTelemetryService()
Returns a preliminarily initialized instance of the telemetry service.
std::shared_ptr< BabeUtil > babe_util_
std::shared_ptr< runtime::OffchainWorkerApi > offchain_worker_api_
application::AppConfiguration::SyncMethod SyncMethod
void runEpoch(EpochDescriptor epoch) override
outcome::result< std::pair< Seal, BabeBlockHeader > > getBabeDigests(const primitives::BlockHeader &block_header)
A secondary deterministic slot assignment.
std::shared_ptr< clock::SystemClock > clock_
The Block class represents the Polkadot block primitive.
void onRemoteStatus(const libp2p::peer::PeerId &peer_id, const network::Status &status) override
BabeSlotNumber current_slot_
primitives::BlockHeader parent_header
A secondary deterministic slot assignment with VRF outputs.
outcome::result< EpochDescriptor > getInitialEpochDescriptor()
std::shared_ptr< authorship::Proposer > proposer_
outcome::result< primitives::Seal > sealBlock(const primitives::Block &block) const
std::shared_ptr< consensus::babe::BabeConfigRepository > babe_config_repo_
primitives::BlockInfo best_block_
std::shared_ptr< authority::AuthorityUpdateObserver > authority_update_observer_
primitives::BlockInfo best_block
boost::variant< std::nullopt_t, HeadsEventParams, RuntimeVersionEventParams, NewRuntimeEventParams > ChainEventParams
void processSlotLeadership(SlotType slot_type, std::optional< std::reference_wrapper< const crypto::VRFOutput >> output, primitives::AuthorityIndex authority_index)
Threshold calculateThreshold(const std::pair< uint64_t, uint64_t > &ratio, const primitives::AuthorityList &authorities, primitives::AuthorityIndex authority_index)
crypto::Sr25519Signature signature
Sig_sr25519(Blake2s(block_header))
std::shared_ptr< network::BlockAnnounceTransmitter > block_announce_transmitter_
void startCatchUp(const libp2p::peer::PeerId &peer_id, const primitives::BlockInfo &target_block)
const application::AppConfiguration & app_config_
BabeSlotNumber start_slot
starting slot of the epoch
std::unique_ptr< clock::Timer > timer_
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
std::shared_ptr< ChainSubscriptionEngine > ChainSubscriptionEnginePtr
libp2p::peer::PeerId PeerId
std::shared_ptr< primitives::events::ChainEventSubscriber > chain_sub_
primitives::BlockHeader header
EpochDescriptor current_epoch_
State getCurrentState() const override
virtual void observe(const double value)=0
Observe the given amount.
const std::shared_ptr< crypto::Sr25519Keypair > & keypair_
metrics::Histogram * metric_block_proposal_time_
std::shared_ptr< runtime::Core > runtime_core_
void onBlockAnnounce(const libp2p::peer::PeerId &peer_id, const network::BlockAnnounce &announce) override
Block that was collated by this node.
std::shared_ptr< BabeLottery > lottery_
uint64_t BabeSlotNumber
slot number of the Babe production
virtual SyncMethod syncMethod() const =0
std::shared_ptr< babe::ConsistencyKeeper > consistency_keeper_
void adjustEpochDescriptor()
A primary VRF-based slot assignment.
std::optional< uint64_t > getAuthorityIndex(const primitives::AuthorityList &authorities, const primitives::BabeSessionKey &authority_key)
Get index of authority.
void startStateSyncing(const libp2p::peer::PeerId &peer_id)
bool wasSynchronized() const override
BlockHeader header
block header
uint32_t SubscriptionSetId
void changeLotteryEpoch(const EpochDescriptor &epoch, std::shared_ptr< const primitives::BabeConfiguration > babe_config) const
detail::BlockInfoT< struct BlockInfoTag > BlockInfo
Logger createLogger(const std::string &tag)
std::shared_ptr< crypto::Hasher > hasher_
std::shared_ptr< crypto::Sr25519Provider > sr25519_provider_
outcome::result< common::Buffer > calculateOrderedTrieHash(const It &begin, const It &end)
std::shared_ptr< network::Synchronizer > synchronizer_
outcome::result< void > putData(InherentIdentifier identifier, const T &inherent)
void onSynchronized() override
std::shared_ptr< blockchain::BlockTree > block_tree_
metrics::RegistryPtr metrics_registry_
primitives::events::ChainSubscriptionEnginePtr chain_events_engine_