Kagome
Polkadot Runtime Engine in C++17
authority_manager_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <stack>
9 #include <unordered_set>
10 
11 #include <boost/range/adaptor/reversed.hpp>
12 #include <scale/scale.hpp>
13 
18 #include "common/visitor.hpp"
22 #include "crypto/hasher.hpp"
23 #include "log/profiling_logger.hpp"
27 
30 
31 namespace kagome::authority {
32 
34  Config config,
35  std::shared_ptr<application::AppStateManager> app_state_manager,
36  std::shared_ptr<blockchain::BlockTree> block_tree,
37  std::shared_ptr<storage::trie::TrieStorage> trie_storage,
38  std::shared_ptr<runtime::GrandpaApi> grandpa_api,
39  std::shared_ptr<crypto::Hasher> hasher,
40  std::shared_ptr<storage::BufferStorage> persistent_storage,
41  std::shared_ptr<blockchain::BlockHeaderRepository> header_repo)
42  : config_{std::move(config)},
43  block_tree_(std::move(block_tree)),
44  trie_storage_(std::move(trie_storage)),
45  grandpa_api_(std::move(grandpa_api)),
46  hasher_(std::move(hasher)),
47  persistent_storage_{std::move(persistent_storage)},
48  header_repo_{std::move(header_repo)},
49  log_{log::createLogger("AuthorityManager", "authority")} {
50  BOOST_ASSERT(block_tree_ != nullptr);
51  BOOST_ASSERT(grandpa_api_ != nullptr);
52  BOOST_ASSERT(trie_storage_ != nullptr);
53  BOOST_ASSERT(hasher_ != nullptr);
54  BOOST_ASSERT(persistent_storage_ != nullptr);
55  BOOST_ASSERT(header_repo_ != nullptr);
56 
57  BOOST_ASSERT(app_state_manager != nullptr);
58  app_state_manager->atPrepare([&] { return prepare(); });
59  }
60 
64  };
65 
66  outcome::result<std::stack<ConsensusMessages>> collectMsgsFromNonFinalBlocks(
67  blockchain::BlockTree const &block_tree,
68  primitives::BlockHash const &finalized_block_hash) {
69  std::stack<ConsensusMessages> collected;
70 
71  std::unordered_set<primitives::BlockHash> observed;
72  for (auto &leaf : block_tree.getLeaves()) {
73  for (auto hash = leaf;;) {
74  if (hash == finalized_block_hash) {
75  break;
76  }
77 
78  // if we already checked this block (and thus the rest of the branch)
79  if (not observed.emplace(hash).second) {
80  break;
81  }
82 
83  // just obtained from block tree
84  auto header = block_tree.getBlockHeader(hash).value();
85 
86  // observe possible changes of authorities
87  for (auto &digest_item : boost::adaptors::reverse(header.digest)) {
88  visit_in_place(
89  digest_item,
90  [&](const primitives::Consensus &consensus_message) {
91  collected.emplace(ConsensusMessages{
92  primitives::BlockInfo(header.number, hash),
93  consensus_message});
94  },
95  [](const auto &) {});
96  }
97 
98  hash = header.parent_hash;
99  }
100  }
101  return collected;
102  }
103 
104  outcome::result<std::optional<AuthoritySetId>> fetchSetIdFromTrieStorage(
105  storage::trie::TrieBatch const &trie_batch,
106  crypto::Hasher const &hasher,
107  storage::trie::RootHash const &state) {
108  std::optional<AuthoritySetId> set_id_opt;
109  auto current_set_id_keypart =
110  hasher.twox_128(Buffer::fromString("CurrentSetId"));
111  for (auto prefix : {"GrandpaFinality", "Grandpa"}) {
112  auto prefix_key_part = hasher.twox_128(Buffer::fromString(prefix));
113  auto set_id_key =
114  Buffer().put(prefix_key_part).put(current_set_id_keypart);
115 
116  OUTCOME_TRY(val_opt, trie_batch.tryGet(set_id_key));
117  if (val_opt.has_value()) {
118  auto &val = val_opt.value();
119  set_id_opt.emplace(scale::decode<AuthoritySetId>(val.get()).value());
120  break;
121  }
122  }
123  return set_id_opt;
124  }
125 
127  common::Buffer::fromString(":authority_manager:schedule_graph_root");
128 
129  outcome::result<std::optional<std::unique_ptr<ScheduleNode>>>
131  OUTCOME_TRY(opt_root, storage.tryLoad(kScheduleGraphRootKey));
132  if (!opt_root) return std::nullopt;
133  auto &encoded_root = opt_root.value();
134  OUTCOME_TRY(root, scale::decode<ScheduleNode>(encoded_root));
135  return std::make_unique<ScheduleNode>(std::move(root));
136  }
137 
138  outcome::result<void> storeScheduleGraphRoot(storage::BufferStorage &storage,
139  ScheduleNode const &root) {
140  OUTCOME_TRY(enc_root, scale::encode(root));
141  OUTCOME_TRY(storage.put(kScheduleGraphRootKey,
142  common::Buffer{std::move(enc_root)}));
143  return outcome::success();
144  }
145 
146  outcome::result<void> clearScheduleGraphRoot(
147  storage::BufferStorage &storage) {
148  OUTCOME_TRY(storage.remove(kScheduleGraphRootKey));
149  return outcome::success();
150  }
151 
161  outcome::result<primitives::BlockInfo>
163  std::stack<ConsensusMessages> &collected_msgs,
164  const primitives::BlockInfo &finalized_block,
165  const blockchain::BlockTree &block_tree,
166  log::Logger &log) {
167  bool found_set_change = false;
168  bool is_unapplied_change = false;
169 
170  for (auto hash = finalized_block.hash; !found_set_change;) {
171  auto header_res = block_tree.getBlockHeader(hash);
172  if (!header_res) {
173  SL_ERROR(
174  log, "Failed to obtain the last finalized block header {}", hash);
175  }
176  OUTCOME_TRY(header, header_res);
177 
178  if (header.number == 0) {
179  found_set_change = true;
180  } else {
181  // observe possible changes of authorities
182  for (auto &digest_item : boost::adaptors::reverse(header.digest)) {
183  visit_in_place(
184  digest_item,
185  [&](const primitives::Consensus &consensus_message) {
186  const bool is_grandpa = consensus_message.consensus_engine_id
188  if (not is_grandpa) {
189  return;
190  }
191 
192  auto decoded_res = consensus_message.decode();
193  if (decoded_res.has_error()) {
194  log->critical("Error decoding consensus message: {}",
195  decoded_res.error().message());
196  }
197  auto &grandpa_digest = decoded_res.value().asGrandpaDigest();
198 
199  auto scheduled_change =
200  boost::get<primitives::ScheduledChange>(&grandpa_digest);
201  if (scheduled_change != nullptr) {
202  found_set_change = true;
203  is_unapplied_change =
204  header.number + scheduled_change->subchain_length
205  >= finalized_block.number;
206  if (is_unapplied_change) {
207  collected_msgs.emplace(ConsensusMessages{
208  primitives::BlockInfo(header.number, hash),
209  consensus_message});
210  }
211  return;
212  }
213 
214  auto forced_change =
215  boost::get<primitives::ForcedChange>(&grandpa_digest);
216  if (forced_change != nullptr) {
217  found_set_change = true;
218  is_unapplied_change =
219  header.number + forced_change->subchain_length
220  >= finalized_block.number;
221  if (is_unapplied_change) {
222  collected_msgs.emplace(ConsensusMessages{
223  primitives::BlockInfo(header.number, hash),
224  consensus_message});
225  }
226  return;
227  }
228  },
229  [](const auto &...) {}); // Other variants are ignored
230  }
231  }
232 
233  if (found_set_change) {
234  auto block = is_unapplied_change ? primitives::BlockInfo(
235  header.number - 1, header.parent_hash)
236  : finalized_block;
237 
238  return block;
239  } else {
240  hash = header.parent_hash;
241  }
242  }
243  BOOST_UNREACHABLE_RETURN({})
244  }
245 
247 
249  const auto finalized_block = block_tree_->getLastFinalized();
250  auto res = initializeAt(finalized_block);
251  if (!res) {
252  SL_ERROR(log_,
253  "Error initializing authority manager: {}",
254  res.error().message());
255  }
256  return res.has_value();
257  }
258 
259  outcome::result<void> AuthorityManagerImpl::initializeAt(
260  const primitives::BlockInfo &root_block) {
261  OUTCOME_TRY(collected_msgs,
263 
264  OUTCOME_TRY(graph_root_block,
266  collected_msgs, root_block, *block_tree_, log_));
267 
268  OUTCOME_TRY(root_header,
269  block_tree_->getBlockHeader(graph_root_block.hash));
270 
271  auto set_id_from_runtime_res = readSetIdFromRuntime(root_header);
272  auto set_id_from_runtime_opt = set_id_from_runtime_res.has_value()
273  ? set_id_from_runtime_res.value()
274  : std::nullopt;
275 
276  OUTCOME_TRY(opt_root, fetchScheduleGraphRoot(*persistent_storage_));
277  auto last_finalized_block = block_tree_->getLastFinalized();
278 
279  if (opt_root
280  && opt_root.value()->current_block.number
281  <= last_finalized_block.number) {
282  // TODO(Harrm): #1334
283  // Correction to bypass the bug where after finishing syncing
284  // and restarting the node we get a set id off by one
285  if (set_id_from_runtime_opt.has_value()
286  && opt_root.value()->current_authorities->id
287  == set_id_from_runtime_opt.value() - 1) {
288  auto &authority_list =
289  opt_root.value()->current_authorities->authorities;
290  opt_root.value()->current_authorities =
291  std::make_shared<primitives::AuthoritySet>(
292  set_id_from_runtime_opt.value(), authority_list);
293  }
294 
295  root_ = std::move(opt_root.value());
296  SL_DEBUG(log_,
297  "Fetched authority set graph root from database with id {}",
298  root_->current_authorities->id);
299 
300  } else if (last_finalized_block.number == 0) {
301  auto &genesis_hash = block_tree_->getGenesisBlockHash();
302  OUTCOME_TRY(initial_authorities, grandpa_api_->authorities(genesis_hash));
304  std::make_shared<primitives::AuthoritySet>(
305  0, std::move(initial_authorities)),
306  {0, genesis_hash});
307  } else if (set_id_from_runtime_res.has_value()
308  && set_id_from_runtime_opt.has_value()) {
309  SL_WARN(
310  log_,
311  "Storage does not contain valid info about the root authority set; "
312  "Fall back to obtaining it from the runtime storage (which may "
313  "fail after a forced authority change happened on chain)");
314  OUTCOME_TRY(authorities,
315  grandpa_api_->authorities(graph_root_block.hash));
316 
317  auto authority_set = std::make_shared<primitives::AuthoritySet>(
318  set_id_from_runtime_opt.value(), std::move(authorities));
320  graph_root_block);
321 
322  OUTCOME_TRY(storeScheduleGraphRoot(*persistent_storage_, *root_));
323  SL_TRACE(log_,
324  "Create authority set graph root with id {}, taken from runtime "
325  "storage",
326  root_->current_authorities->id);
327  } else {
328  SL_ERROR(
329  log_,
330  "Failed to initialize authority manager; Try running recovery mode");
331  return set_id_from_runtime_res.as_failure();
332  }
333 
334  while (not collected_msgs.empty()) {
335  const auto &args = collected_msgs.top();
336  OUTCOME_TRY(onConsensus(args.block, args.message));
337 
338  collected_msgs.pop();
339  }
340 
341  // prune to reorganize collected changes
342  prune(root_block);
343 
344  SL_DEBUG(log_,
345  "Current grandpa authority set (id={}):",
346  root_->current_authorities->id);
347  size_t index = 0;
348  for (const auto &authority : *root_->current_authorities) {
349  SL_TRACE(log_,
350  "{}/{}: id={} weight={}",
351  ++index,
352  root_->current_authorities->authorities.size(),
353  authority.id.id,
354  authority.weight);
355  }
356 
357  return outcome::success();
358  }
359 
360  outcome::result<std::optional<AuthoritySetId>>
362  primitives::BlockHeader const &header) const {
363  AuthoritySetId set_id{};
364 
365  auto batch_res = trie_storage_->getEphemeralBatchAt(header.state_root);
366  if (batch_res.has_error()) {
367  if (batch_res.error() == storage::DatabaseError::NOT_FOUND) {
368  SL_DEBUG(log_,
369  "Failed to fetch set id from trie storage: state {} is not in "
370  "the storage",
371  header.state_root);
372  return std::nullopt;
373  }
374  return batch_res.as_failure();
375  }
376 
377  OUTCOME_TRY(hash, primitives::calculateBlockHash(header, *hasher_));
378  auto set_id_res = grandpa_api_->current_set_id(hash);
379  if (set_id_res) {
380  set_id = set_id_res.value();
381  } else {
382  auto &batch = batch_res.value();
383 
384  OUTCOME_TRY(
385  set_id_opt,
386  fetchSetIdFromTrieStorage(*batch, *hasher_, header.state_root));
387  if (set_id_opt) return set_id_opt.value();
388 
389  SL_DEBUG(log_,
390  "Failed to read authority set id from runtime (attempted both "
391  "GrandpaApi_current_set_id and trie storage)");
392  return std::nullopt;
393  }
394  return set_id;
395  }
396 
398  primitives::BlockNumber last_finalized_number) {
399  auto genesis_hash = block_tree_->getGenesisBlockHash();
400 
401  OUTCOME_TRY(initial_authorities, grandpa_api_->authorities(genesis_hash));
402  primitives::BlockInfo genesis_info{0, block_tree_->getGenesisBlockHash()};
403 
405  std::make_shared<primitives::AuthoritySet>(0, initial_authorities),
406  {0, genesis_hash});
407  SL_INFO(log_,
408  "Recovering authority manager state... (might take a few minutes)");
409  // if state is pruned
410  if (header_repo_->getBlockHeader(1).has_error()) {
411  SL_WARN(log_,
412  "Can't recalculate authority set id on a prune state, fall"
413  " back to fetching from runtime");
415  }
416 
417  auto start = std::chrono::steady_clock::now();
418  for (primitives::BlockNumber number = 0; number <= last_finalized_number;
419  number++) {
420  OUTCOME_TRY(header, header_repo_->getBlockHeader(number));
421  OUTCOME_TRY(hash, header_repo_->getHashByNumber(number));
422  primitives::BlockInfo info{number, hash};
423 
424  for (auto &msg : header.digest) {
425  if (auto consensus_msg = boost::get<primitives::Consensus>(&msg);
426  consensus_msg != nullptr) {
427  onConsensus(info, *consensus_msg).value();
428  }
429  }
430  auto justification_res = block_tree_->getBlockJustification(hash);
431  if (justification_res.has_value()) prune(info);
432  auto end = std::chrono::steady_clock::now();
433  auto duration = end - start;
434  using namespace std::chrono_literals;
435  // 5 seconds is nothing special, just a random more-or-like convenient
436  // duration.
437  if (duration > 5s) {
438  SL_VERBOSE(log_,
439  "Processed {} out of {} blocks",
440  number,
441  last_finalized_number);
442  start = end;
443  }
444  }
445  return outcome::success();
446  }
447 
449  if (not root_) {
450  log_->critical("Authority manager has null root");
451  std::terminate();
452  }
453  return root_->current_block;
454  }
455 
456  std::optional<std::shared_ptr<const primitives::AuthoritySet>>
458  IsBlockFinalized finalized) const {
459  auto node = getAppropriateAncestor(target_block);
460 
461  if (node == nullptr) {
462  return std::nullopt;
463  }
464 
465  IsBlockFinalized node_in_finalized_chain =
466  node->current_block == target_block
467  ? (bool)finalized
468  : node->current_block.number
469  <= block_tree_->getLastFinalized().number;
470 
471  auto adjusted_node =
472  node->makeDescendant(target_block, node_in_finalized_chain);
473 
474  if (adjusted_node->enabled) {
475  // Original authorities
476  SL_DEBUG(log_,
477  "Pick authority set with id {} for block {}",
478  adjusted_node->current_authorities->id,
479  target_block);
480  for (auto &authority : adjusted_node->current_authorities->authorities) {
481  SL_TRACE(log_, "Authority {}: {}", authority.id.id, authority.weight);
482  }
483  return adjusted_node->current_authorities;
484  }
485 
486  // Zero-weighted authorities
487  auto authorities = std::make_shared<primitives::AuthoritySet>(
488  *adjusted_node->current_authorities);
489  std::for_each(authorities->begin(),
490  authorities->end(),
491  [](auto &authority) { authority.weight = 0; });
492  return authorities;
493  }
494 
496  const primitives::BlockInfo &block,
498  primitives::BlockNumber activate_at) {
499  SL_DEBUG(log_,
500  "Applying scheduled change on block {} to activate at block {}",
501  block,
502  activate_at);
503  KAGOME_PROFILE_START(get_appropriate_ancestor)
504  auto ancestor_node = getAppropriateAncestor(block);
505  KAGOME_PROFILE_END(get_appropriate_ancestor)
506 
507  if (not ancestor_node) {
509  }
510 
511  SL_DEBUG(log_,
512  "Authorities for block {} found on block {} with set id {}",
513  block,
514  ancestor_node->current_block,
515  ancestor_node->current_authorities->id);
516 
517  auto schedule_change = [&](const std::shared_ptr<ScheduleNode> &node)
518  -> outcome::result<void> {
519  auto new_authorities = std::make_shared<primitives::AuthoritySet>(
520  node->current_authorities->id + 1, authorities);
521 
522  node->action =
523  ScheduleNode::ScheduledChange{activate_at, new_authorities};
524 
525  SL_VERBOSE(
526  log_,
527  "Authority set change is scheduled after block #{} (set id={})",
528  activate_at,
529  new_authorities->id);
530 
531  size_t index = 0;
532  for (auto &authority : *new_authorities) {
533  SL_TRACE(log_,
534  "New authority ({}/{}): id={} weight={}",
535  ++index,
536  new_authorities->authorities.size(),
537  authority.id.id,
538  authority.weight);
539  }
540 
541  return outcome::success();
542  };
543 
544  KAGOME_PROFILE_START(is_ancestor_node_finalized)
545  IsBlockFinalized is_ancestor_node_finalized =
546  ancestor_node->current_block == block_tree_->getLastFinalized()
547  or directChainExists(ancestor_node->current_block,
548  block_tree_->getLastFinalized());
549  KAGOME_PROFILE_END(is_ancestor_node_finalized)
550 
551  // maybe_set contains last planned authority set, if present
552  std::optional<std::shared_ptr<const primitives::AuthoritySet>> maybe_set =
553  std::nullopt;
554  if (not is_ancestor_node_finalized) {
555  std::shared_ptr<const ScheduleNode> last_node = ancestor_node;
556  while (last_node and last_node != root_) {
557  if (const auto *action =
558  boost::get<ScheduleNode::ScheduledChange>(&last_node->action);
559  action != nullptr) {
560  if (block.number <= action->applied_block) {
561  // It's mean, that new Scheduled Changes would be scheduled before
562  // previous is activated. So we ignore it
563  return outcome::success();
564  }
565 
566  if (action->new_authorities->id
567  > ancestor_node->current_authorities->id) {
568  maybe_set = action->new_authorities;
569  }
570  break;
571  }
572 
573  last_node = last_node->parent.lock();
574  }
575  }
576 
577  if (ancestor_node->current_block == block) {
578  if (maybe_set.has_value()) {
579  ancestor_node->current_authorities = maybe_set.value();
580  } else {
581  ancestor_node->adjust(is_ancestor_node_finalized);
582  }
583 
584  OUTCOME_TRY(schedule_change(ancestor_node));
585  } else {
586  KAGOME_PROFILE_START(make_descendant)
587  auto new_node = ancestor_node->makeDescendant(block, true);
588  KAGOME_PROFILE_END(make_descendant)
589 
590  if (maybe_set.has_value()) {
591  new_node->current_authorities = maybe_set.value();
592  }
593 
594  SL_DEBUG(log_,
595  "Make a schedule node for block {}, with actual set id {}",
596  block,
597  new_node->current_authorities->id);
598 
599  KAGOME_PROFILE_START(schedule_change)
600  OUTCOME_TRY(schedule_change(new_node));
601  KAGOME_PROFILE_END(schedule_change)
602 
603  // Reorganize ancestry
605  reorganize(ancestor_node, new_node);
607  }
608 
609  return outcome::success();
610  }
611 
613  const primitives::BlockInfo &current_block,
615  primitives::BlockNumber delay_start,
616  size_t delay) {
617  SL_DEBUG(log_,
618  "Applying forced change (delay start: {}, delay: {}) on block {} "
619  "to activate at block {}",
620  delay_start,
621  delay,
622  current_block,
623  delay_start + delay);
624  auto delay_start_hash_res = header_repo_->getHashByNumber(delay_start);
625  if (delay_start_hash_res.has_error()) {
626  SL_ERROR(log_, "Failed to obtain hash by number {}", delay_start);
627  }
628  OUTCOME_TRY(delay_start_hash, delay_start_hash_res);
629  auto ancestor_node =
630  getAppropriateAncestor({delay_start, delay_start_hash});
631 
632  if (not ancestor_node) {
634  }
635 
636  SL_DEBUG(log_,
637  "Found previous authority change at block {} with set id {}",
638  ancestor_node->current_block,
639  ancestor_node->current_authorities->id);
640 
641  auto force_change = [&](const std::shared_ptr<ScheduleNode> &node)
642  -> outcome::result<void> {
643  auto new_authorities = std::make_shared<primitives::AuthoritySet>(
644  node->current_authorities->id + 1, authorities);
645 
646  // Force changes
647  if (node->current_block.number >= delay_start + delay) {
648  node->current_authorities = new_authorities;
649  SL_VERBOSE(log_,
650  "Change has been forced on block #{} (set id={})",
651  delay_start + delay,
652  node->current_authorities->id);
653  } else {
654  node->action =
655  ScheduleNode::ForcedChange{delay_start, delay, new_authorities};
656  SL_VERBOSE(log_,
657  "Change will be forced on block #{} (set id={})",
658  delay_start + delay,
659  new_authorities->id);
660  }
661 
662  size_t index = 0;
663  for (auto &authority : *new_authorities) {
664  SL_TRACE(log_,
665  "New authority ({}/{}): id={} weight={}",
666  ++index,
667  new_authorities->authorities.size(),
668  authority.id.id,
669  authority.weight);
670  }
671 
672  return outcome::success();
673  };
674 
675  auto new_node =
676  ancestor_node->makeDescendant({delay_start, delay_start_hash}, true);
677 
678  OUTCOME_TRY(force_change(new_node));
679 
680  // Reorganize ancestry
681  ancestor_node->descendants.clear();
682  ancestor_node->descendants.push_back(new_node);
683  new_node->descendants.clear(); // reset all pending scheduled changes
684 
685  return outcome::success();
686  }
687 
689  const primitives::BlockInfo &block, uint64_t authority_index) {
691  SL_TRACE(log_, "Ignore 'on disabled' message due to config");
692  return outcome::success();
693  }
694  SL_DEBUG(log_, "Applying disable authority on block {}", block);
695 
696  auto node = getAppropriateAncestor(block);
697 
698  if (not node) {
700  }
701 
702  auto disable_authority = [&](const std::shared_ptr<ScheduleNode> &node)
703  -> outcome::result<void> {
704  // Make changed authorities
705  auto new_authority_set = std::make_shared<primitives::AuthoritySet>(
706  *node->current_authorities);
707 
708  // Check if index not out of bound
709  if (authority_index >= node->current_authorities->authorities.size()) {
711  }
712 
713  new_authority_set->authorities[authority_index].weight = 0;
714  node->current_authorities = std::move(new_authority_set);
715 
716  SL_VERBOSE(
717  log_,
718  "Authority id={} (index={} in set id={}) is disabled on block #{}",
719  node->current_authorities->authorities[authority_index].id.id,
720  authority_index,
721  node->current_authorities->id,
722  node->current_block.number);
723 
724  return outcome::success();
725  };
726 
727  IsBlockFinalized node_in_finalized_chain =
728  node->current_block.number <= block_tree_->getLastFinalized().number;
729 
730  if (node->current_block == block) {
731  node->adjust(node_in_finalized_chain);
732  OUTCOME_TRY(disable_authority(node));
733  } else {
734  auto new_node = node->makeDescendant(block, node_in_finalized_chain);
735 
736  OUTCOME_TRY(disable_authority(new_node));
737 
738  // Reorganize ancestry
739  auto descendants = std::move(node->descendants);
740  for (auto &descendant : descendants) {
741  if (directChainExists(block, descendant->current_block)) {
742  // Propagate change to descendants
743  if (descendant->current_authorities == node->current_authorities) {
744  descendant->current_authorities = new_node->current_authorities;
745  }
746  new_node->descendants.emplace_back(std::move(descendant));
747  } else {
748  node->descendants.emplace_back(std::move(descendant));
749  }
750  }
751  node->descendants.emplace_back(std::move(new_node));
752  }
753 
754  return outcome::success();
755  }
756 
757  outcome::result<void> AuthorityManagerImpl::applyPause(
758  const primitives::BlockInfo &block, primitives::BlockNumber activate_at) {
759  SL_DEBUG(log_, "Applying pause on block {}", block);
760 
761  auto node = getAppropriateAncestor(block);
762 
763  if (not node) {
765  }
766 
767  IsBlockFinalized node_in_finalized_chain =
768  node->current_block.number <= block_tree_->getLastFinalized().number;
769 
770  auto new_node = node->makeDescendant(block, node_in_finalized_chain);
771 
772  new_node->action = ScheduleNode::Pause{activate_at};
773 
774  SL_VERBOSE(log_,
775  "Scheduled pause after block #{}",
776  new_node->current_block.number);
777 
778  // Reorganize ancestry
779  auto descendants = std::move(node->descendants);
780  for (auto &descendant : descendants) {
781  auto &ancestor =
782  block.number <= descendant->current_block.number ? new_node : node;
783  ancestor->descendants.emplace_back(std::move(descendant));
784  }
785  node->descendants.emplace_back(std::move(new_node));
786 
787  return outcome::success();
788  }
789 
790  outcome::result<void> AuthorityManagerImpl::applyResume(
791  const primitives::BlockInfo &block, primitives::BlockNumber activate_at) {
792  auto node = getAppropriateAncestor(block);
793 
794  if (not node) {
796  }
797 
798  IsBlockFinalized node_in_finalized_chain =
799  node->current_block.number <= block_tree_->getLastFinalized().number;
800 
801  auto new_node = node->makeDescendant(block, node_in_finalized_chain);
802 
803  new_node->action = ScheduleNode::Resume{activate_at};
804 
805  SL_VERBOSE(log_,
806  "Resuming will be done at block #{}",
807  new_node->current_block.number);
808 
809  // Reorganize ancestry
810  reorganize(node, new_node);
811 
812  return outcome::success();
813  }
814 
815  outcome::result<void> AuthorityManagerImpl::onConsensus(
816  const primitives::BlockInfo &block,
817  const primitives::Consensus &message) {
819  SL_TRACE(log_,
820  "Apply consensus message from block {}, engine {}",
821  block,
822  message.consensus_engine_id.toString());
823 
824  OUTCOME_TRY(decoded, message.decode());
825  return visit_in_place(
826  decoded.asGrandpaDigest(),
827  [this, &block](
828  const primitives::ScheduledChange &msg) -> outcome::result<void> {
829  return applyScheduledChange(
830  block, msg.authorities, block.number + msg.subchain_length);
831  },
832  [this, &block](const primitives::ForcedChange &msg) {
833  return applyForcedChange(
834  block, msg.authorities, msg.delay_start, msg.subchain_length);
835  },
836  [this, &block](const primitives::OnDisabled &msg) {
837  SL_DEBUG(log_, "OnDisabled {}", msg.authority_index);
838  return applyOnDisabled(block, msg.authority_index);
839  },
840  [this, &block](const primitives::Pause &msg) {
841  SL_DEBUG(log_, "Pause {}", msg.subchain_length);
842  return applyPause(block, block.number + msg.subchain_length);
843  },
844  [this, &block](const primitives::Resume &msg) {
845  SL_DEBUG(log_, "Resume {}", msg.subchain_length);
846  return applyResume(block, block.number + msg.subchain_length);
847  },
848  [](auto &) {
850  });
851  } else if (message.consensus_engine_id == primitives::kBabeEngineId
852  or message.consensus_engine_id
854  or message.consensus_engine_id
856  // ignore
857  return outcome::success();
858 
859  } else {
860  SL_DEBUG(log_,
861  "Unknown consensus engine id in block {}: {}",
862  block,
863  message.consensus_engine_id.toString());
864  return outcome::success();
865  }
866  }
867 
869  if (block == root_->current_block) {
870  return;
871  }
872 
873  if (block.number < root_->current_block.number) {
874  return;
875  }
876 
877  auto node = getAppropriateAncestor(block);
878 
879  if (not node) {
880  return;
881  }
882 
883  if (node->current_block == block) {
884  // Rebase
885  root_ = std::move(node);
886 
887  } else {
888  // Reorganize ancestry
889  auto new_node = node->makeDescendant(block, true);
890  auto descendants = std::move(node->descendants);
891  for (auto &descendant : descendants) {
892  if (directChainExists(block, descendant->current_block)) {
893  new_node->descendants.emplace_back(std::move(descendant));
894  }
895  }
896 
897  root_ = std::move(new_node);
898  }
900 
901  SL_DEBUG(log_, "Prune authority manager upto block {}", block);
902  }
903 
904  std::shared_ptr<ScheduleNode> AuthorityManagerImpl::getAppropriateAncestor(
905  const primitives::BlockInfo &block) const {
906  BOOST_ASSERT(root_ != nullptr);
907 
908  // Target block is not descendant of the current root
909  if (root_->current_block.number > block.number
910  || (root_->current_block != block
911  && not directChainExists(root_->current_block, block))) {
912  return nullptr;
913  }
914  std::shared_ptr<ScheduleNode> ancestor = root_;
915  while (ancestor->current_block != block) {
916  bool goto_next_generation = false;
917  for (const auto &node : ancestor->descendants) {
918  if (node->current_block == block) {
919  return node;
920  }
921  if (directChainExists(node->current_block, block)) {
922  ancestor = node;
923  goto_next_generation = true;
924  break;
925  }
926  }
927  if (not goto_next_generation) {
928  break;
929  }
930  }
931  return ancestor;
932  }
933 
935  const primitives::BlockInfo &ancestor,
936  const primitives::BlockInfo &descendant) const {
937  SL_TRACE(log_,
938  "Looking if direct chain exists between {} and {}",
939  ancestor,
940  descendant);
941  KAGOME_PROFILE_START(direct_chain_exists)
942  // Any block is descendant of genesis
943  if (ancestor.number <= 1 && ancestor.number < descendant.number) {
944  return true;
945  }
946  auto result =
947  ancestor.number < descendant.number
948  && block_tree_->hasDirectChain(ancestor.hash, descendant.hash);
949  KAGOME_PROFILE_END(direct_chain_exists)
950  return result;
951  }
952 
954  std::shared_ptr<ScheduleNode> node,
955  std::shared_ptr<ScheduleNode> new_node) {
956  auto descendants = std::move(node->descendants);
957  for (auto &descendant : descendants) {
958  auto &ancestor =
959  new_node->current_block.number < descendant->current_block.number
960  ? new_node
961  : node;
962 
963  // Apply if delay will be passed for descendant
964  if (auto *forced_change =
965  boost::get<ScheduleNode::ForcedChange>(&ancestor->action)) {
966  if (descendant->current_block.number
967  >= forced_change->delay_start + forced_change->delay_length) {
968  descendant->current_authorities = forced_change->new_authorities;
969  descendant->action = ScheduleNode::NoAction{};
970  }
971  }
972  if (auto *resume = boost::get<ScheduleNode::Resume>(&ancestor->action)) {
973  if (descendant->current_block.number >= resume->applied_block) {
974  descendant->enabled = true;
975  descendant->action = ScheduleNode::NoAction{};
976  }
977  }
978 
979  ancestor->descendants.emplace_back(std::move(descendant));
980  }
981  node->descendants.emplace_back(std::move(new_node));
982  }
983 
985  auto ancestor = getAppropriateAncestor(block);
986 
987  if (ancestor == nullptr) {
988  SL_TRACE(log_, "No scheduled changes on block {}: no ancestor", block);
989  return;
990  }
991 
992  if (ancestor == root_) {
993  // Can't remove root
994  SL_TRACE(log_,
995  "Can't cancel scheduled changes on block {}: it is root",
996  block);
997  return;
998  }
999 
1000  if (ancestor->current_block == block) {
1001  ancestor = std::const_pointer_cast<ScheduleNode>(ancestor->parent.lock());
1002  }
1003 
1004  auto it = std::find_if(ancestor->descendants.begin(),
1005  ancestor->descendants.end(),
1006  [&block](std::shared_ptr<ScheduleNode> node) {
1007  return node->current_block == block;
1008  });
1009 
1010  if (it != ancestor->descendants.end()) {
1011  if (not(*it)->descendants.empty()) {
1012  // Has descendants - is not a leaf
1013  SL_TRACE(log_, "No scheduled changes on block {}: not found", block);
1014  return;
1015  }
1016 
1017  SL_DEBUG(log_, "Scheduled changes on block {} has removed", block);
1018  ancestor->descendants.erase(it);
1019  }
1020  }
1021 } // namespace kagome::authority
static const common::Buffer kScheduleGraphRootKey
std::shared_ptr< storage::trie::TrieStorage > trie_storage_
outcome::result< std::stack< ConsensusMessages > > collectMsgsFromNonFinalBlocks(blockchain::BlockTree const &block_tree, primitives::BlockHash const &finalized_block_hash)
#define KAGOME_PROFILE_END(scope)
Class represents arbitrary (including empty) byte buffer.
Definition: buffer.hpp:29
void cancel(const primitives::BlockInfo &block) override
Cancel changes. Should be called when the block is rolled back.
virtual outcome::result< void > remove(const K &key)=0
Remove value by key.
virtual outcome::result< primitives::BlockHeader > getBlockHeader(const primitives::BlockId &block) const =0
std::shared_ptr< ScheduleNode > getAppropriateAncestor(const primitives::BlockInfo &block) const
Find schedule_node according to the block.
AuthorityManagerImpl(Config config, std::shared_ptr< application::AppStateManager > app_state_manager, std::shared_ptr< blockchain::BlockTree > block_tree, std::shared_ptr< storage::trie::TrieStorage > trie_storage, std::shared_ptr< runtime::GrandpaApi > grandpa_api, std::shared_ptr< crypto::Hasher > hash, std::shared_ptr< storage::BufferStorage > persistent_storage, std::shared_ptr< blockchain::BlockHeaderRepository > header_repo)
const auto kUnsupportedEngineId_POL1
Definition: digest.hpp:31
outcome::result< std::optional< std::unique_ptr< ScheduleNode > > > fetchScheduleGraphRoot(storage::BufferStorage const &storage)
const auto kUnsupportedEngineId_BEEF
Definition: digest.hpp:34
outcome::result< void > applyPause(const primitives::BlockInfo &block, primitives::BlockNumber activate_at) override
A signal to pause the current authority set after the given delay, is a block finalized by the finali...
outcome::result< void > onConsensus(const primitives::BlockInfo &block, const primitives::Consensus &message) override
storage::trie::RootHash state_root
root of the Merkle tree
outcome::result< void > applyForcedChange(const primitives::BlockInfo &block, const primitives::AuthorityList &authorities, primitives::BlockNumber delay_start, size_t delay) override
Force an authority set change after the given delay of N blocks, after next one would be imported blo...
outcome::result< BlockHash > calculateBlockHash(BlockHeader const &header, crypto::Hasher const &hasher)
Definition: block_header.cpp:5
virtual Hash128 twox_128(gsl::span< const uint8_t > buffer) const =0
twox_128 calculates 16-byte twox hash
outcome::result< void > applyScheduledChange(const primitives::BlockInfo &block, const primitives::AuthorityList &authorities, primitives::BlockNumber activate_at) override
Schedule an authority set change after the given delay of N blocks, after next one would be finalized...
std::string toString() const noexcept
Definition: blob.hpp:153
outcome::result< void > applyOnDisabled(const primitives::BlockInfo &block, uint64_t authority_index) override
An index of the individual authority in the current authority list that should be immediately disable...
uint32_t BlockNumber
Definition: common.hpp:18
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
Definition: buffer.hpp:244
SLBuffer & put(std::string_view view)
Put a string into byte buffer.
Definition: buffer.hpp:117
outcome::result< std::optional< primitives::AuthoritySetId > > readSetIdFromRuntime(primitives::BlockHeader const &targetBlock) const
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
uint64_t AuthoritySetId
Definition: authority.hpp:20
outcome::result< std::optional< AuthoritySetId > > fetchSetIdFromTrieStorage(storage::trie::TrieBatch const &trie_batch, crypto::Hasher const &hasher, storage::trie::RootHash const &state)
outcome::result< primitives::BlockInfo > collectConsensusMsgsUntilNearestSetChangeTo(std::stack< ConsensusMessages > &collected_msgs, const primitives::BlockInfo &finalized_block, const blockchain::BlockTree &block_tree, log::Logger &log)
outcome::result< void > storeScheduleGraphRoot(storage::BufferStorage &storage, ScheduleNode const &root)
outcome::result< void > initializeAt(const primitives::BlockInfo &root_block)
std::shared_ptr< crypto::Hasher > hasher_
void prune(const primitives::BlockInfo &block) override
Prunes data which was needed only till {.
virtual std::vector< primitives::BlockHash > getLeaves() const =0
void reorganize(std::shared_ptr< ScheduleNode > node, std::shared_ptr< ScheduleNode > new_node)
primitives::BlockInfo base() const override
static SLBuffer fromString(const std::string_view &src)
stores content of a string to byte array
Definition: buffer.hpp:205
virtual outcome::result< void > put(const K &key, const V &value)=0
Store value by key.
bool directChainExists(const primitives::BlockInfo &ancestor, const primitives::BlockInfo &descendant) const
Check if one block is direct ancestor of second one.
static std::shared_ptr< ScheduleNode > createAsRoot(std::shared_ptr< const primitives::AuthoritySet > current_authorities, primitives::BlockInfo block)
std::shared_ptr< runtime::GrandpaApi > grandpa_api_
Base::iterator emplace(Iter pos, Args &&...args)
#define KAGOME_PROFILE_START(scope)
const auto kGrandpaEngineId
Definition: digest.hpp:28
detail::BlockInfoT< struct BlockInfoTag > BlockInfo
Definition: common.hpp:63
std::shared_ptr< storage::BufferStorage > persistent_storage_
virtual outcome::result< std::optional< ConstValueView > > tryGet(const Key &key) const =0
Get value by key.
const auto kBabeEngineId
Definition: digest.hpp:25
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::shared_ptr< const blockchain::BlockTree > block_tree_
std::shared_ptr< blockchain::BlockHeaderRepository > header_repo_
outcome::result< DecodedConsensusMessage > decode() const
Definition: digest.hpp:158
virtual outcome::result< std::optional< V > > tryLoad(const Key &key) const =0
Load value by key.
outcome::result< void > clearScheduleGraphRoot(storage::BufferStorage &storage)
std::optional< std::shared_ptr< const primitives::AuthoritySet > > authorities(const primitives::BlockInfo &target_block, IsBlockFinalized finalized) const override
Returns authorities according specified block.
outcome::result< void > recalculateStoredState(primitives::BlockNumber last_finalized_number) override
outcome::result< void > applyResume(const primitives::BlockInfo &block, primitives::BlockNumber activate_at) override
A signal to resume the current authority set after the given delay, is an imported block and validate...
Node of scheduler tree. Contains actual authorities for the accorded block and all its descendant blo...