Kagome
Polkadot Runtime Engine in C++17
parachain_processor.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_PARACHAIN_PROCESSOR_HPP
7 #define KAGOME_PARACHAIN_PROCESSOR_HPP
8 
9 #include <memory>
10 #include <queue>
11 #include <thread>
12 #include <unordered_map>
13 
14 #include <boost/asio/executor_work_guard.hpp>
15 #include <boost/asio/io_context.hpp>
16 #include <boost/asio/io_service.hpp>
17 #include <boost/asio/post.hpp>
18 #include <boost/asio/signal_set.hpp>
19 #include <libp2p/peer/peer_id.hpp>
20 
21 #include "common/visitor.hpp"
22 #include "crypto/hasher.hpp"
25 #include "outcome/outcome.hpp"
26 #include "primitives/common.hpp"
27 #include "utils/non_copyable.hpp"
28 #include "utils/safe_object.hpp"
29 
// Forward declarations of the kagome::network types this header only names:
// PeerManager and Router are held through std::shared_ptr, PendingCollation
// is taken by const reference in requestCollations().
30 namespace kagome::network {
31  class PeerManager;
32  class Router;
33  struct PendingCollation;
34 } // namespace kagome::network
35 
// Forward declarations of the kagome::crypto types held through
// std::shared_ptr below.
// NOTE(review): "crypto/hasher.hpp" is also included above, so the Hasher
// forward declaration looks redundant - confirm before removing.
36 namespace kagome::crypto {
37  class Sr25519Provider;
38  class Hasher;
39 } // namespace kagome::crypto
40 
41 namespace kagome::parachain {
42 
44  : std::enable_shared_from_this<ParachainProcessorImpl> {
// Error codes of ParachainProcessorImpl. Numbering starts at 1; the two
// enumerators after RESPONSE_ALREADY_RECEIVED take the next consecutive
// values (2 and 3).
// This file also lists
// OUTCOME_HPP_DECLARE_ERROR(kagome::parachain, ParachainProcessorImpl::Error)
// for this enum.
// NOTE(review): the precise condition that raises each code lives in the
// implementation file, which is not visible here.
45  enum class Error {
46  RESPONSE_ALREADY_RECEIVED = 1,
47  COLLATION_NOT_FOUND,
48  KEY_NOT_PRESENT
49  };
// Compile-time size of the workers_ thread array declared further down.
50  static constexpr uint64_t kBackgroundWorkers = 5;
51 
58  uint64_t validity_votes;
60  bool attested;
61  };
62 
64  std::shared_ptr<network::PeerManager> pm,
65  std::shared_ptr<crypto::Sr25519Provider> crypto_provider,
66  std::shared_ptr<network::Router> router,
67  std::shared_ptr<boost::asio::io_context> this_context,
68  std::shared_ptr<crypto::Sr25519Keypair> keypair,
69  std::shared_ptr<crypto::Hasher> hasher);
71 
// Lifecycle entry points.
// NOTE(review): the bool results presumably report success - confirm against
// the implementation; the bodies are not visible here.
72  bool start();
73  void stop();
74  bool prepare();
// Takes the pending collation by const reference; returns nothing.
75  void requestCollations(network::PendingCollation const &pending_collation);
// Accepts an optional parachain id; an empty optional is a valid argument.
// NOTE(review): presumably stored in our_current_state_.assignment - confirm.
76  void setAssignedParachain(std::optional<network::ParachainId> para_id);
// Receives a statement together with the peer it is associated with and the
// relay-parent block hash. The statement is passed as a shared_ptr by const
// reference, so the call itself adds no owner.
77  void handleStatement(libp2p::peer::PeerId const &peer_id,
78  primitives::BlockHash const &relay_parent,
79  std::shared_ptr<network::Statement> const &statement);
80 
81  private:
// State tag passed to the FetchedCollationState constructor and stored in
// its collation_state member.
82  enum struct CollationState { kFetched, kSeconded };
// State tag passed to the FetchedCollationState constructor and stored in
// its pov_state member.
83  enum struct PoVDataState { kComplete, kFetchFromValidator };
// Statement kind used as the non-type template argument of
// createAndSignStatement(). kSeconded is pinned to 0, so kValid is 1.
84  enum struct StatementType { kSeconded = 0, kValid };
85 
90 
92  CollationState cs,
93  PoVDataState ps)
94  : fetched_collation(std::move(fc)),
95  collation_state(cs),
96  pov_state(ps) {}
97  };
98 
// Shared handle to a FetchedCollationState.
99  using FetchedCollation = std::shared_ptr<FetchedCollationState>;
// Shared handle to candidate commitments.
100  using Commitments = std::shared_ptr<network::CandidateCommitments>;
// Three-level lookup: ParachainMap[para id] -> RelayParentMap[block hash]
// -> PeerMap[peer id] -> FetchedCollation.
101  using PeerMap = std::unordered_map<libp2p::peer::PeerId, FetchedCollation>;
102  using RelayParentMap = std::unordered_map<primitives::BlockHash, PeerMap>;
103  using ParachainMap =
104  std::unordered_map<network::ParachainId, RelayParentMap>;
// Type-erased nullary task.
105  using BackgroundTask = std::function<void()>;
// Execution context type used for both context_ and this_context_.
106  using WorkersContext = boost::asio::io_context;
// Boost.Asio work guard bound to the io_context executor type.
107  using WorkGuard = boost::asio::executor_work_guard<
108  boost::asio::io_context::executor_type>;
109 
111  outcome::result<void> result;
115  };
116 
117  struct AttestingData {
118  std::shared_ptr<network::Statement> statement;
122  std::queue<network::ValidatorIndex> backing;
123  };
124 
125  /*
126  * Validation.
127  */
// Both validators take the shared collation handle by non-const reference
// and report success or failure through outcome::result<void>.
128  outcome::result<void> validateCandidate(
129  FetchedCollation &fetched_collation);
130  outcome::result<void> validateErasureCoding(
131  FetchedCollation &fetched_collation);
// Generic over the callback type F (taken as a forwarding reference).
// fetched_collation is passed by value, i.e. the callee gets its own
// shared_ptr copy.
// NOTE(review): the callback's expected signature is not visible in this
// header - check the definition before adding callers.
132  template <typename F>
133  void validateAndMakeAvailable(libp2p::peer::PeerId const &peer_id,
134  primitives::BlockHash const &relay_parent,
135  FetchedCollation fetched_collation,
136  F &&callback);
// Takes no arguments and returns nothing; behavior is defined elsewhere.
137  void requestPoV();
138 
139  /*
140  * Logic.
141  */
// The three completion handlers share one signature: the peer id plus a
// ValidateAndSecondResult taken by rvalue reference, so callers must pass a
// temporary or std::move the result.
142  void onValidationComplete(libp2p::peer::PeerId const &peer_id,
143  ValidateAndSecondResult &&result);
144  void onAttestComplete(libp2p::peer::PeerId const &peer_id,
145  ValidateAndSecondResult &&result);
146  void onAttestNoPoVComplete(libp2p::peer::PeerId const &peer_id,
147  ValidateAndSecondResult &&result);
// Identified by parachain id, relay-parent hash and peer id - the same
// three keys as the ParachainMap / RelayParentMap / PeerMap nesting.
148  void appendAsyncValidationTask(network::ParachainId id,
149  primitives::BlockHash const &relay_parent,
150  libp2p::peer::PeerId const &peer_id);
// Takes AttestingData by non-const reference, so the callee may modify it.
151  void kickOffValidationWork(AttestingData &attesting_data);
152  void handleFetchedCollation(network::ParachainId id,
153  primitives::BlockHash const &relay_parent,
154  libp2p::peer::PeerId const &peer_id,
// Parameterised at compile time by the statement kind (kSeconded / kValid).
// Returns a shared statement; the validation result is taken by non-const
// reference.
156  template <StatementType kStatementType>
157  std::shared_ptr<network::Statement> createAndSignStatement(
158  ValidateAndSecondResult &validation_result);
// Returns an optional summary; an empty optional is a possible result.
// NOTE(review): the conditions under which it is empty are not visible here.
159  std::optional<ImportStatementSummary> importStatement(
160  std::shared_ptr<network::Statement> const &statement);
// Returns an optional validator index; callers must handle the empty case.
161  std::optional<network::ValidatorIndex> getOurIndex();
162 
163  /*
164  * Helpers.
165  */
167  FetchedCollationState const &fetched_collation_state) {
168  return visit_in_place(
169  fetched_collation_state.fetched_collation.response_data,
170  [&](network::CollationResponse const &collation_response)
172  return hasher_->blake2b_256(
173  scale::encode(collation_response.receipt).value());
174  });
175  }
176 
178  FetchedCollationState const &fetched_collation_state) {
179  return visit_in_place(
180  fetched_collation_state.fetched_collation.response_data,
181  [](network::CollationResponse const &collation_response)
182  -> network::CandidateDescriptor const & {
183  return collation_response.receipt.descriptor;
184  });
185  }
186 
187  std::optional<std::reference_wrapper<network::CandidateDescriptor const>>
189  std::shared_ptr<network::Statement> const &statement) {
190  return visit_in_place(
191  statement->candidate_state,
192  [](network::CommittedCandidateReceipt const &receipt)
193  -> std::optional<
194  std::reference_wrapper<network::CandidateDescriptor const>> {
195  return receipt.descriptor;
196  },
197  [](...)
198  -> std::optional<
199  std::reference_wrapper<network::CandidateDescriptor const>> {
200  BOOST_ASSERT(false);
201  return std::nullopt;
202  });
203  }
204 
206  network::CandidateDescriptor const &descriptor) {
207  return descriptor.collator_id;
208  }
209 
211  std::shared_ptr<network::Statement> const &statement) {
212  return visit_in_place(
213  statement->candidate_state,
214  [](network::Dummy const &) {
215  BOOST_ASSERT(!"Not used!");
216  return primitives::BlockHash{};
217  },
218  [&](network::CommittedCandidateReceipt const &data) {
219  return hasher_->blake2b_256(
220  scale::encode(network::CandidateReceipt{
221  .descriptor = data.descriptor,
222  .commitments_hash = hasher_->blake2b_256(
223  scale::encode(data.commitments).value())})
224  .value());
225  },
226  [&](primitives::BlockHash const &candidate_hash) {
227  return candidate_hash;
228  });
229  }
230 
231  /*
232  * Notification
233  */
// Submits `func` for execution on the given io_context.
//   context - shared handle to the target io_context; must be non-null
//             (checked with BOOST_ASSERT only).
//   func    - callable, perfectly forwarded into boost::asio::post.
// The callable is handed to boost::asio::post rather than invoked directly
// here.
234  template <typename F>
235  void notify_internal(std::shared_ptr<WorkersContext> &context, F &&func) {
236  BOOST_ASSERT(context);
237  boost::asio::post(*context, std::forward<F>(func));
238  }
// Notification hooks. All statements are passed as shared_ptr by const
// reference; none of these functions returns a value.
// NOTE(review): which context each one runs on is decided in the
// implementation file - not visible here.
239  void notifyBackedCandidate(
240  std::shared_ptr<network::Statement> const &statement);
241  void notifyAvailableData();
242  void notifyStatementDistributionSystem(
243  std::shared_ptr<network::Statement> const &statement);
// notify() carries the statement; handleNotify() takes only the peer id and
// relay-parent hash.
244  void notify(libp2p::peer::PeerId const &peer_id,
245  primitives::BlockHash const &relay_parent,
246  std::shared_ptr<network::Statement> const &statement);
247  void handleNotify(libp2p::peer::PeerId const &peer_id,
248  primitives::BlockHash const &relay_parent);
249 
250  outcome::result<FetchedCollation> loadIntoResponseSlot(
252  primitives::BlockHash const &relay_parent,
253  libp2p::peer::PeerId const &peer_id,
255 
256  outcome::result<FetchedCollation> getFromSlot(
258  primitives::BlockHash const &relay_parent,
259  libp2p::peer::PeerId const &peer_id);
260 
// Const member template: yields a network::Signature or an error via
// outcome::result for an arbitrary payload type T.
// NOTE(review): presumably signs with keypair_ through crypto_provider_ -
// confirm in the definition.
261  template <typename T>
262  outcome::result<network::Signature> sign(T const &t) const;
263 
// Like importStatement(), but the candidate hash is supplied by the caller.
// Returns an optional summary; callers must handle the empty case.
264  std::optional<ImportStatementSummary> importStatementToTable(
265  primitives::BlockHash const &candidate_hash,
266  std::shared_ptr<network::Statement> const &statement);
267 
// Shared collaborators; their types match the constructor parameters
// pm, crypto_provider and router.
268  std::shared_ptr<network::PeerManager> pm_;
269  std::shared_ptr<crypto::Sr25519Provider> crypto_provider_;
270  std::shared_ptr<network::Router> router_;
// Logger initialised in-class by createLogger with the arguments
// "ParachainProcessorImpl" and "parachain".
271  log::Logger logger_ =
272  log::createLogger("ParachainProcessorImpl", "parachain");
273 
// Anonymous aggregate holding the processor's mutable bookkeeping; its only
// instance is the member our_current_state_.
// NOTE(review): <unordered_set> and <deque> are not among the includes
// visible in this listing - confirm they arrive transitively.
// NOTE(review): no mutex is visible next to this state - confirm which
// context is allowed to touch it.
274  struct {
// Optional values: each may be empty.
// NOTE(review): assignment has the same type as setAssignedParachain()'s
// parameter - presumably written there; confirm.
275  std::optional<network::ParachainId> assignment;
276  std::optional<primitives::BlockHash> seconded;
277  std::optional<network::CollatorPublicKey> required_collator;
278 
// Sets of block hashes (element meaning follows the member names).
279  std::unordered_set<primitives::BlockHash> awaiting_validation;
280  std::unordered_set<primitives::BlockHash> issued_statements;
// Block hash -> AttestingData.
281  std::unordered_map<primitives::BlockHash, AttestingData> fallbacks;
// Peer id -> double-ended queue of shared statements.
282  std::unordered_map<libp2p::peer::PeerId,
283  std::deque<std::shared_ptr<network::Statement>>>
284  seconded_statements;
285  } our_current_state_;
286 
// io_context handle, its work guard and a fixed array of kBackgroundWorkers
// thread slots.
// NOTE(review): presumably context_ is the one the worker threads run and
// work_guard_ keeps it alive while idle - confirm in start()/stop().
287  std::shared_ptr<WorkersContext> context_;
288  std::shared_ptr<WorkGuard> work_guard_;
289  std::unique_ptr<std::thread> workers_[kBackgroundWorkers];
290 
// Remaining members; their types match the constructor parameters
// this_context, keypair and hasher. hasher_ is the object the
// candidateHashFrom() helpers call blake2b_256 on.
292  std::shared_ptr<WorkersContext> this_context_;
293  std::shared_ptr<crypto::Sr25519Keypair> keypair_;
294  std::shared_ptr<crypto::Hasher> hasher_;
295  };
296 
297 } // namespace kagome::parachain
298 
300 
301 #endif // KAGOME_PARACHAIN_PROCESSOR_HPP
primitives::BlockHash candidateHashFrom(FetchedCollationState const &fetched_collation_state)
std::shared_ptr< crypto::Sr25519Provider > crypto_provider_
primitives::BlockHash candidateHashFrom(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< crypto::Hasher > hasher_
std::shared_ptr< network::PeerManager > pm_
STL namespace.
std::shared_ptr< FetchedCollationState > FetchedCollation
crypto::Sr25519PublicKey CollatorPublicKey
std::shared_ptr< WorkersContext > this_context_
network::CandidateDescriptor const & candidateDescriptorFrom(FetchedCollationState const &fetched_collation_state)
boost::asio::executor_work_guard< boost::asio::io_context::executor_type > WorkGuard
libp2p::peer::PeerId PeerId
void notify_internal(std::shared_ptr< WorkersContext > &context, F &&func)
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
uint64_t validity_votes
How many validity votes are currently witnessed.
CollatorPublicKey collator_id
executed in the context of
std::tuple<> Dummy
Not-used (NU) placeholder element.
network::CollatorPublicKey const & collatorIdFromDescriptor(network::CandidateDescriptor const &descriptor)
network::ParachainId group_id
The group that the candidate is in.
std::shared_ptr< network::Router > router_
std::shared_ptr< crypto::Sr25519Keypair > keypair_
std::shared_ptr< network::CandidateCommitments > Commitments
FetchedCollationState(network::CollationFetchingResponse &&fc, CollationState cs, PoVDataState ps)
std::optional< std::reference_wrapper< network::CandidateDescriptor const > > candidateDescriptorFrom(std::shared_ptr< network::Statement > const &statement)
std::unordered_map< libp2p::peer::PeerId, FetchedCollation > PeerMap
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::unordered_map< primitives::BlockHash, PeerMap > RelayParentMap
primitives::BlockHash candidate
The digest of the candidate.
OUTCOME_HPP_DECLARE_ERROR(kagome::parachain, ParachainProcessorImpl::Error)
std::shared_ptr< WorkersContext > context_
std::unordered_map< network::ParachainId, RelayParentMap > ParachainMap