9 #include <unordered_set> 11 #include <boost/range/adaptor/reversed.hpp> 12 #include <scale/scale.hpp> 35 std::shared_ptr<application::AppStateManager> app_state_manager,
36 std::shared_ptr<blockchain::BlockTree> block_tree,
37 std::shared_ptr<storage::trie::TrieStorage> trie_storage,
38 std::shared_ptr<runtime::GrandpaApi> grandpa_api,
39 std::shared_ptr<crypto::Hasher> hasher,
40 std::shared_ptr<storage::BufferStorage> persistent_storage,
41 std::shared_ptr<blockchain::BlockHeaderRepository> header_repo)
42 : config_{std::move(config)},
53 BOOST_ASSERT(
hasher_ !=
nullptr);
57 BOOST_ASSERT(app_state_manager !=
nullptr);
58 app_state_manager->atPrepare([&] {
return prepare(); });
69 std::stack<ConsensusMessages> collected;
71 std::unordered_set<primitives::BlockHash> observed;
72 for (
auto &leaf : block_tree.
getLeaves()) {
73 for (
auto hash = leaf;;) {
74 if (hash == finalized_block_hash) {
79 if (not observed.emplace(hash).second) {
87 for (
auto &digest_item : boost::adaptors::reverse(header.digest)) {
98 hash = header.parent_hash;
108 std::optional<AuthoritySetId> set_id_opt;
109 auto current_set_id_keypart =
110 hasher.
twox_128(Buffer::fromString(
"CurrentSetId"));
111 for (
auto prefix : {
"GrandpaFinality",
"Grandpa"}) {
112 auto prefix_key_part = hasher.
twox_128(Buffer::fromString(prefix));
114 Buffer().
put(prefix_key_part).
put(current_set_id_keypart);
116 OUTCOME_TRY(val_opt, trie_batch.
tryGet(set_id_key));
117 if (val_opt.has_value()) {
118 auto &val = val_opt.value();
119 set_id_opt.
emplace(scale::decode<AuthoritySetId>(val.get()).value());
129 outcome::result<std::optional<std::unique_ptr<ScheduleNode>>>
131 OUTCOME_TRY(opt_root, storage.
tryLoad(kScheduleGraphRootKey));
132 if (!opt_root)
return std::nullopt;
133 auto &encoded_root = opt_root.value();
134 OUTCOME_TRY(root, scale::decode<ScheduleNode>(encoded_root));
135 return std::make_unique<ScheduleNode>(std::move(root));
140 OUTCOME_TRY(enc_root, scale::encode(root));
141 OUTCOME_TRY(storage.
put(kScheduleGraphRootKey,
143 return outcome::success();
148 OUTCOME_TRY(storage.
remove(kScheduleGraphRootKey));
149 return outcome::success();
161 outcome::result<primitives::BlockInfo>
163 std::stack<ConsensusMessages> &collected_msgs,
167 bool found_set_change =
false;
168 bool is_unapplied_change =
false;
170 for (
auto hash = finalized_block.
hash; !found_set_change;) {
174 log,
"Failed to obtain the last finalized block header {}", hash);
176 OUTCOME_TRY(header, header_res);
178 if (header.number == 0) {
179 found_set_change =
true;
182 for (
auto &digest_item : boost::adaptors::reverse(header.digest)) {
188 if (not is_grandpa) {
192 auto decoded_res = consensus_message.
decode();
193 if (decoded_res.has_error()) {
194 log->critical(
"Error decoding consensus message: {}",
195 decoded_res.error().message());
197 auto &grandpa_digest = decoded_res.value().asGrandpaDigest();
199 auto scheduled_change =
200 boost::get<primitives::ScheduledChange>(&grandpa_digest);
201 if (scheduled_change !=
nullptr) {
202 found_set_change =
true;
203 is_unapplied_change =
204 header.number + scheduled_change->subchain_length
205 >= finalized_block.
number;
206 if (is_unapplied_change) {
215 boost::get<primitives::ForcedChange>(&grandpa_digest);
216 if (forced_change !=
nullptr) {
217 found_set_change =
true;
218 is_unapplied_change =
219 header.number + forced_change->subchain_length
220 >= finalized_block.
number;
221 if (is_unapplied_change) {
229 [](
const auto &...) {});
233 if (found_set_change) {
235 header.number - 1, header.parent_hash)
240 hash = header.parent_hash;
243 BOOST_UNREACHABLE_RETURN({})
249 const auto finalized_block =
block_tree_->getLastFinalized();
253 "Error initializing authority manager: {}",
254 res.error().message());
256 return res.has_value();
261 OUTCOME_TRY(collected_msgs,
264 OUTCOME_TRY(graph_root_block,
268 OUTCOME_TRY(root_header,
269 block_tree_->getBlockHeader(graph_root_block.hash));
272 auto set_id_from_runtime_opt = set_id_from_runtime_res.has_value()
273 ? set_id_from_runtime_res.value()
277 auto last_finalized_block =
block_tree_->getLastFinalized();
280 && opt_root.value()->current_block.number
281 <= last_finalized_block.number) {
285 if (set_id_from_runtime_opt.has_value()
286 && opt_root.value()->current_authorities->id
287 == set_id_from_runtime_opt.value() - 1) {
288 auto &authority_list =
289 opt_root.value()->current_authorities->authorities;
290 opt_root.value()->current_authorities =
291 std::make_shared<primitives::AuthoritySet>(
292 set_id_from_runtime_opt.value(), authority_list);
295 root_ = std::move(opt_root.value());
297 "Fetched authority set graph root from database with id {}",
298 root_->current_authorities->id);
300 }
else if (last_finalized_block.number == 0) {
301 auto &genesis_hash =
block_tree_->getGenesisBlockHash();
302 OUTCOME_TRY(initial_authorities,
grandpa_api_->authorities(genesis_hash));
304 std::make_shared<primitives::AuthoritySet>(
305 0, std::move(initial_authorities)),
307 }
else if (set_id_from_runtime_res.has_value()
308 && set_id_from_runtime_opt.has_value()) {
311 "Storage does not contain valid info about the root authority set; " 312 "Fall back to obtaining it from the runtime storage (which may " 313 "fail after a forced authority change happened on chain)");
317 auto authority_set = std::make_shared<primitives::AuthoritySet>(
318 set_id_from_runtime_opt.value(), std::move(
authorities));
324 "Create authority set graph root with id {}, taken from runtime " 326 root_->current_authorities->id);
330 "Failed to initialize authority manager; Try running recovery mode");
331 return set_id_from_runtime_res.as_failure();
334 while (not collected_msgs.empty()) {
335 const auto &args = collected_msgs.top();
336 OUTCOME_TRY(
onConsensus(args.block, args.message));
338 collected_msgs.pop();
345 "Current grandpa authority set (id={}):",
346 root_->current_authorities->id);
348 for (
const auto &authority : *
root_->current_authorities) {
350 "{}/{}: id={} weight={}",
352 root_->current_authorities->authorities.size(),
357 return outcome::success();
360 outcome::result<std::optional<AuthoritySetId>>
366 if (batch_res.has_error()) {
369 "Failed to fetch set id from trie storage: state {} is not in " 374 return batch_res.as_failure();
380 set_id = set_id_res.value();
382 auto &batch = batch_res.value();
387 if (set_id_opt)
return set_id_opt.value();
390 "Failed to read authority set id from runtime (attempted both " 391 "GrandpaApi_current_set_id and trie storage)");
399 auto genesis_hash =
block_tree_->getGenesisBlockHash();
401 OUTCOME_TRY(initial_authorities,
grandpa_api_->authorities(genesis_hash));
405 std::make_shared<primitives::AuthoritySet>(0, initial_authorities),
408 "Recovering authority manager state... (might take a few minutes)");
412 "Can't recalculate authority set id on a prune state, fall" 413 " back to fetching from runtime");
417 auto start = std::chrono::steady_clock::now();
420 OUTCOME_TRY(header,
header_repo_->getBlockHeader(number));
421 OUTCOME_TRY(hash,
header_repo_->getHashByNumber(number));
424 for (
auto &msg : header.digest) {
425 if (
auto consensus_msg = boost::get<primitives::Consensus>(&msg);
426 consensus_msg !=
nullptr) {
430 auto justification_res =
block_tree_->getBlockJustification(hash);
431 if (justification_res.has_value())
prune(info);
432 auto end = std::chrono::steady_clock::now();
433 auto duration = end - start;
439 "Processed {} out of {} blocks",
441 last_finalized_number);
445 return outcome::success();
450 log_->critical(
"Authority manager has null root");
453 return root_->current_block;
456 std::optional<std::shared_ptr<const primitives::AuthoritySet>>
461 if (node ==
nullptr) {
466 node->current_block == target_block
468 : node->current_block.number
472 node->makeDescendant(target_block, node_in_finalized_chain);
474 if (adjusted_node->enabled) {
477 "Pick authority set with id {} for block {}",
478 adjusted_node->current_authorities->id,
480 for (
auto &authority : adjusted_node->current_authorities->authorities) {
481 SL_TRACE(
log_,
"Authority {}: {}", authority.id.id, authority.weight);
483 return adjusted_node->current_authorities;
487 auto authorities = std::make_shared<primitives::AuthoritySet>(
488 *adjusted_node->current_authorities);
491 [](
auto &authority) { authority.weight = 0; });
500 "Applying scheduled change on block {} to activate at block {}",
507 if (not ancestor_node) {
512 "Authorities for block {} found on block {} with set id {}",
514 ancestor_node->current_block,
515 ancestor_node->current_authorities->id);
517 auto schedule_change = [&](
const std::shared_ptr<ScheduleNode> &node)
518 -> outcome::result<void> {
519 auto new_authorities = std::make_shared<primitives::AuthoritySet>(
527 "Authority set change is scheduled after block #{} (set id={})",
529 new_authorities->id);
532 for (
auto &authority : *new_authorities) {
534 "New authority ({}/{}): id={} weight={}",
536 new_authorities->authorities.size(),
541 return outcome::success();
546 ancestor_node->current_block ==
block_tree_->getLastFinalized()
552 std::optional<std::shared_ptr<const primitives::AuthoritySet>> maybe_set =
554 if (not is_ancestor_node_finalized) {
555 std::shared_ptr<const ScheduleNode> last_node = ancestor_node;
556 while (last_node and last_node !=
root_) {
557 if (
const auto *action =
558 boost::get<ScheduleNode::ScheduledChange>(&last_node->action);
560 if (block.
number <= action->applied_block) {
563 return outcome::success();
566 if (action->new_authorities->id
567 > ancestor_node->current_authorities->id) {
568 maybe_set = action->new_authorities;
573 last_node = last_node->parent.lock();
577 if (ancestor_node->current_block == block) {
578 if (maybe_set.has_value()) {
579 ancestor_node->current_authorities = maybe_set.value();
581 ancestor_node->adjust(is_ancestor_node_finalized);
584 OUTCOME_TRY(schedule_change(ancestor_node));
587 auto new_node = ancestor_node->makeDescendant(block,
true);
590 if (maybe_set.has_value()) {
591 new_node->current_authorities = maybe_set.value();
595 "Make a schedule node for block {}, with actual set id {}",
597 new_node->current_authorities->id);
600 OUTCOME_TRY(schedule_change(new_node));
609 return outcome::success();
618 "Applying forced change (delay start: {}, delay: {}) on block {} " 619 "to activate at block {}",
623 delay_start + delay);
624 auto delay_start_hash_res =
header_repo_->getHashByNumber(delay_start);
625 if (delay_start_hash_res.has_error()) {
626 SL_ERROR(
log_,
"Failed to obtain hash by number {}", delay_start);
628 OUTCOME_TRY(delay_start_hash, delay_start_hash_res);
632 if (not ancestor_node) {
637 "Found previous authority change at block {} with set id {}",
638 ancestor_node->current_block,
639 ancestor_node->current_authorities->id);
641 auto force_change = [&](
const std::shared_ptr<ScheduleNode> &node)
642 -> outcome::result<void> {
643 auto new_authorities = std::make_shared<primitives::AuthoritySet>(
647 if (node->current_block.number >= delay_start + delay) {
648 node->current_authorities = new_authorities;
650 "Change has been forced on block #{} (set id={})",
652 node->current_authorities->id);
657 "Change will be forced on block #{} (set id={})",
659 new_authorities->id);
663 for (
auto &authority : *new_authorities) {
665 "New authority ({}/{}): id={} weight={}",
667 new_authorities->authorities.size(),
672 return outcome::success();
676 ancestor_node->makeDescendant({delay_start, delay_start_hash},
true);
678 OUTCOME_TRY(force_change(new_node));
681 ancestor_node->descendants.clear();
682 ancestor_node->descendants.push_back(new_node);
683 new_node->descendants.clear();
685 return outcome::success();
691 SL_TRACE(
log_,
"Ignore 'on disabled' message due to config");
692 return outcome::success();
694 SL_DEBUG(
log_,
"Applying disable authority on block {}", block);
702 auto disable_authority = [&](
const std::shared_ptr<ScheduleNode> &node)
703 -> outcome::result<void> {
705 auto new_authority_set = std::make_shared<primitives::AuthoritySet>(
706 *node->current_authorities);
709 if (authority_index >= node->current_authorities->authorities.size()) {
713 new_authority_set->authorities[authority_index].weight = 0;
714 node->current_authorities = std::move(new_authority_set);
718 "Authority id={} (index={} in set id={}) is disabled on block #{}",
719 node->current_authorities->authorities[authority_index].id.id,
721 node->current_authorities->id,
722 node->current_block.number);
724 return outcome::success();
728 node->current_block.number <=
block_tree_->getLastFinalized().number;
730 if (node->current_block == block) {
731 node->adjust(node_in_finalized_chain);
732 OUTCOME_TRY(disable_authority(node));
734 auto new_node = node->makeDescendant(block, node_in_finalized_chain);
736 OUTCOME_TRY(disable_authority(new_node));
739 auto descendants = std::move(node->descendants);
740 for (
auto &descendant : descendants) {
743 if (descendant->current_authorities == node->current_authorities) {
744 descendant->current_authorities = new_node->current_authorities;
746 new_node->descendants.emplace_back(std::move(descendant));
748 node->descendants.emplace_back(std::move(descendant));
751 node->descendants.emplace_back(std::move(new_node));
754 return outcome::success();
759 SL_DEBUG(
log_,
"Applying pause on block {}", block);
768 node->current_block.number <=
block_tree_->getLastFinalized().number;
770 auto new_node = node->makeDescendant(block, node_in_finalized_chain);
775 "Scheduled pause after block #{}",
776 new_node->current_block.number);
779 auto descendants = std::move(node->descendants);
780 for (
auto &descendant : descendants) {
782 block.
number <= descendant->current_block.number ? new_node : node;
783 ancestor->descendants.emplace_back(std::move(descendant));
785 node->descendants.emplace_back(std::move(new_node));
787 return outcome::success();
799 node->current_block.number <=
block_tree_->getLastFinalized().number;
801 auto new_node = node->makeDescendant(block, node_in_finalized_chain);
806 "Resuming will be done at block #{}",
807 new_node->current_block.number);
812 return outcome::success();
820 "Apply consensus message from block {}, engine {}",
824 OUTCOME_TRY(decoded, message.
decode());
825 return visit_in_place(
826 decoded.asGrandpaDigest(),
830 block, msg.authorities, block.
number + msg.subchain_length);
834 block, msg.authorities, msg.delay_start, msg.subchain_length);
837 SL_DEBUG(
log_,
"OnDisabled {}", msg.authority_index);
841 SL_DEBUG(
log_,
"Pause {}", msg.subchain_length);
845 SL_DEBUG(
log_,
"Resume {}", msg.subchain_length);
857 return outcome::success();
861 "Unknown consensus engine id in block {}: {}",
864 return outcome::success();
869 if (block ==
root_->current_block) {
883 if (node->current_block == block) {
885 root_ = std::move(node);
889 auto new_node = node->makeDescendant(block,
true);
890 auto descendants = std::move(node->descendants);
891 for (
auto &descendant : descendants) {
893 new_node->descendants.emplace_back(std::move(descendant));
897 root_ = std::move(new_node);
901 SL_DEBUG(
log_,
"Prune authority manager upto block {}", block);
906 BOOST_ASSERT(
root_ !=
nullptr);
910 || (
root_->current_block != block
914 std::shared_ptr<ScheduleNode> ancestor =
root_;
915 while (ancestor->current_block != block) {
916 bool goto_next_generation =
false;
917 for (
const auto &node : ancestor->descendants) {
918 if (node->current_block == block) {
923 goto_next_generation =
true;
927 if (not goto_next_generation) {
938 "Looking if direct chain exists between {} and {}",
954 std::shared_ptr<ScheduleNode> node,
955 std::shared_ptr<ScheduleNode> new_node) {
956 auto descendants = std::move(node->descendants);
957 for (
auto &descendant : descendants) {
959 new_node->current_block.number < descendant->current_block.number
964 if (
auto *forced_change =
965 boost::get<ScheduleNode::ForcedChange>(&ancestor->action)) {
966 if (descendant->current_block.number
967 >= forced_change->delay_start + forced_change->delay_length) {
968 descendant->current_authorities = forced_change->new_authorities;
972 if (
auto *resume = boost::get<ScheduleNode::Resume>(&ancestor->action)) {
973 if (descendant->current_block.number >= resume->applied_block) {
974 descendant->enabled =
true;
979 ancestor->descendants.emplace_back(std::move(descendant));
981 node->descendants.emplace_back(std::move(new_node));
987 if (ancestor ==
nullptr) {
988 SL_TRACE(
log_,
"No scheduled changes on block {}: no ancestor", block);
992 if (ancestor ==
root_) {
995 "Can't cancel scheduled changes on block {}: it is root",
1000 if (ancestor->current_block == block) {
1001 ancestor = std::const_pointer_cast<
ScheduleNode>(ancestor->parent.lock());
1004 auto it = std::find_if(ancestor->descendants.begin(),
1005 ancestor->descendants.end(),
1006 [&block](std::shared_ptr<ScheduleNode> node) {
1007 return node->current_block == block;
1010 if (it != ancestor->descendants.end()) {
1011 if (not(*it)->descendants.empty()) {
1013 SL_TRACE(
log_,
"No scheduled changes on block {}: not found", block);
1017 SL_DEBUG(
log_,
"Scheduled changes on block {} has removed", block);
1018 ancestor->descendants.erase(it);
static const common::Buffer kScheduleGraphRootKey
std::shared_ptr< storage::trie::TrieStorage > trie_storage_
outcome::result< std::stack< ConsensusMessages > > collectMsgsFromNonFinalBlocks(blockchain::BlockTree const &block_tree, primitives::BlockHash const &finalized_block_hash)
#define KAGOME_PROFILE_END(scope)
Class represents arbitrary (including empty) byte buffer.
std::shared_ptr< ScheduleNode > root_
primitives::Consensus message
void cancel(const primitives::BlockInfo &block) override
Cancel changes. Should be called when the block is rolled back.
virtual outcome::result< void > remove(const K &key)=0
Remove value by key.
virtual outcome::result< primitives::BlockHeader > getBlockHeader(const primitives::BlockId &block) const =0
std::shared_ptr< ScheduleNode > getAppropriateAncestor(const primitives::BlockInfo &block) const
Find schedule_node according to the block.
AuthorityManagerImpl(Config config, std::shared_ptr< application::AppStateManager > app_state_manager, std::shared_ptr< blockchain::BlockTree > block_tree, std::shared_ptr< storage::trie::TrieStorage > trie_storage, std::shared_ptr< runtime::GrandpaApi > grandpa_api, std::shared_ptr< crypto::Hasher > hash, std::shared_ptr< storage::BufferStorage > persistent_storage, std::shared_ptr< blockchain::BlockHeaderRepository > header_repo)
ConsensusEngineId consensus_engine_id
const auto kUnsupportedEngineId_POL1
outcome::result< std::optional< std::unique_ptr< ScheduleNode > > > fetchScheduleGraphRoot(storage::BufferStorage const &storage)
const auto kUnsupportedEngineId_BEEF
outcome::result< void > applyPause(const primitives::BlockInfo &block, primitives::BlockNumber activate_at) override
A signal to pause the current authority set after the given delay, is a block finalized by the finali...
outcome::result< void > onConsensus(const primitives::BlockInfo &block, const primitives::Consensus &message) override
outcome::result< void > applyForcedChange(const primitives::BlockInfo &block, const primitives::AuthorityList &authorities, primitives::BlockNumber delay_start, size_t delay) override
Force an authority set change after the given delay of N blocks, after next one would be imported blo...
outcome::result< BlockHash > calculateBlockHash(BlockHeader const &header, crypto::Hasher const &hasher)
virtual Hash128 twox_128(gsl::span< const uint8_t > buffer) const =0
twox_128 calculates 16-byte twox hash
outcome::result< void > applyScheduledChange(const primitives::BlockInfo &block, const primitives::AuthorityList &authorities, primitives::BlockNumber activate_at) override
Schedule an authority set change after the given delay of N blocks, after next one would be finalized...
std::string toString() const noexcept
outcome::result< void > applyOnDisabled(const primitives::BlockInfo &block, uint64_t authority_index) override
An index of the individual authority in the current authority list that should be immediately disable...
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
SLBuffer & put(std::string_view view)
Put a string into byte buffer.
outcome::result< std::optional< primitives::AuthoritySetId > > readSetIdFromRuntime(primitives::BlockHeader const &targetBlock) const
std::shared_ptr< soralog::Logger > Logger
outcome::result< std::optional< AuthoritySetId > > fetchSetIdFromTrieStorage(storage::trie::TrieBatch const &trie_batch, crypto::Hasher const &hasher, storage::trie::RootHash const &state)
outcome::result< primitives::BlockInfo > collectConsensusMsgsUntilNearestSetChangeTo(std::stack< ConsensusMessages > &collected_msgs, const primitives::BlockInfo &finalized_block, const blockchain::BlockTree &block_tree, log::Logger &log)
primitives::BlockInfo block
outcome::result< void > storeScheduleGraphRoot(storage::BufferStorage &storage, ScheduleNode const &root)
outcome::result< void > initializeAt(const primitives::BlockInfo &root_block)
std::shared_ptr< crypto::Hasher > hasher_
void prune(const primitives::BlockInfo &block) override
Prunes data which was needed only till {.
virtual std::vector< primitives::BlockHash > getLeaves() const =0
void reorganize(std::shared_ptr< ScheduleNode > node, std::shared_ptr< ScheduleNode > new_node)
primitives::BlockInfo base() const override
static SLBuffer fromString(const std::string_view &src)
stores content of a string to byte array
virtual outcome::result< void > put(const K &key, const V &value)=0
Store value by key.
bool directChainExists(const primitives::BlockInfo &ancestor, const primitives::BlockInfo &descendant) const
Check if one block is direct ancestor of second one.
static std::shared_ptr< ScheduleNode > createAsRoot(std::shared_ptr< const primitives::AuthoritySet > current_authorities, primitives::BlockInfo block)
std::shared_ptr< runtime::GrandpaApi > grandpa_api_
Base::iterator emplace(Iter pos, Args &&...args)
#define KAGOME_PROFILE_START(scope)
const auto kGrandpaEngineId
detail::BlockInfoT< struct BlockInfoTag > BlockInfo
std::shared_ptr< storage::BufferStorage > persistent_storage_
virtual outcome::result< std::optional< ConstValueView > > tryGet(const Key &key) const =0
Get value by key.
Logger createLogger(const std::string &tag)
std::shared_ptr< const blockchain::BlockTree > block_tree_
std::shared_ptr< blockchain::BlockHeaderRepository > header_repo_
outcome::result< DecodedConsensusMessage > decode() const
virtual outcome::result< std::optional< V > > tryLoad(const Key &key) const =0
Load value by key.
outcome::result< void > clearScheduleGraphRoot(storage::BufferStorage &storage)
std::optional< std::shared_ptr< const primitives::AuthoritySet > > authorities(const primitives::BlockInfo &target_block, IsBlockFinalized finalized) const override
Returns authorities according specified block.
outcome::result< void > recalculateStoredState(primitives::BlockNumber last_finalized_number) override
outcome::result< void > applyResume(const primitives::BlockInfo &block, primitives::BlockNumber activate_at) override
A signal to resume the current authority set after the given delay, is an imported block and validate...
Node of scheduler tree. Contains actual authorities for the accorded block and all its descendant blo...