Kagome
Polkadot Runtime Engine in C++17
parachain_processor.cpp
Go to the documentation of this file.
1 
7 
8 #include <gsl/span>
9 
10 #include "crypto/hasher.hpp"
12 #include "network/common.hpp"
15 #include "network/peer_manager.hpp"
16 #include "network/router.hpp"
17 #include "scale/scale.hpp"
18 
21  e) {
23  switch (e) {
24  case E::RESPONSE_ALREADY_RECEIVED:
25  return "Response already present";
26  case E::COLLATION_NOT_FOUND:
27  return "Collation not found";
28  case E::KEY_NOT_PRESENT:
29  return "Private key is not present";
30  }
31  return "Unknown parachain processor error";
32 }
33 
34 namespace kagome::parachain {
35 
37  std::shared_ptr<network::PeerManager> pm,
38  std::shared_ptr<crypto::Sr25519Provider> crypto_provider,
39  std::shared_ptr<network::Router> router,
40  std::shared_ptr<boost::asio::io_context> this_context,
41  std::shared_ptr<crypto::Sr25519Keypair> keypair,
42  std::shared_ptr<crypto::Hasher> hasher)
43  : pm_(std::move(pm)),
44  crypto_provider_(std::move(crypto_provider)),
45  router_(std::move(router)),
46  this_context_(std::move(this_context)),
47  keypair_(std::move(keypair)),
48  hasher_(std::move(hasher)) {
49  BOOST_ASSERT(pm_);
50  BOOST_ASSERT(crypto_provider_);
51  BOOST_ASSERT(this_context_);
52  BOOST_ASSERT(router_);
53  BOOST_ASSERT(hasher_);
54  }
55 
58 #ifndef NDEBUG
59  for (auto &w : workers_) {
60  BOOST_ASSERT(!w);
61  }
62 #endif // NDEBUG
63  }
64 
66  context_ = std::make_shared<WorkersContext>();
67  work_guard_ = std::make_shared<WorkGuard>(context_->get_executor());
68  return true;
69  }
70 
72  BOOST_ASSERT(context_);
73  BOOST_ASSERT(work_guard_);
74  for (auto &worker : workers_)
75  if (!worker)
76  worker = std::make_unique<std::thread>(
77  [wptr{weak_from_this()}, context{context_}]() {
78  if (auto self = wptr.lock())
79  self->logger_->debug("Started parachain worker with id: {}",
80  std::this_thread::get_id());
81 
82  context->run();
83  });
84  return true;
85  }
86 
88  work_guard_.reset();
89  if (context_) {
90  context_->stop();
91  context_.reset();
92  }
93  for (auto &worker : workers_) {
94  if (worker) {
95  BOOST_ASSERT(worker->joinable());
96  worker->join();
97  worker.reset();
98  }
99  }
100  }
101 
102  outcome::result<ParachainProcessorImpl::FetchedCollation>
104  primitives::BlockHash const &relay_parent,
105  libp2p::peer::PeerId const &peer_id) {
106  return collations_.sharedAccess(
107  [&](auto const &collations)
108  -> outcome::result<ParachainProcessorImpl::FetchedCollation> {
109  if (auto para_it = collations.find(id); para_it != collations.end()) {
110  if (auto relay_it = para_it->second.find(relay_parent);
111  relay_it != para_it->second.end()) {
112  if (auto peer_it = relay_it->second.find(peer_id);
113  peer_it != relay_it->second.end()) {
114  return peer_it->second;
115  }
116  }
117  }
119  });
120  }
121 
122  outcome::result<ParachainProcessorImpl::FetchedCollation>
125  primitives::BlockHash const &relay_parent,
126  libp2p::peer::PeerId const &peer_id,
129  [&](auto &collations)
130  -> outcome::result<ParachainProcessorImpl::FetchedCollation> {
131  auto &peer_map = collations[id][relay_parent];
132  auto const &[it, inserted] = peer_map.insert(
133  std::make_pair(peer_id,
134  std::make_shared<FetchedCollationState>(
135  std::move(response),
138 
139  if (!inserted) return Error::RESPONSE_ALREADY_RECEIVED;
140  return it->second;
141  });
142  }
143 
146  primitives::BlockHash const &relay_parent,
147  libp2p::peer::PeerId const &peer_id,
149  auto store_result =
150  loadIntoResponseSlot(id, relay_parent, peer_id, std::move(response));
151  if (!store_result) {
152  logger_->warn("Fetch collation from {}:{} store failed with: {}",
153  peer_id.toBase58(),
154  relay_parent,
155  store_result.error().message());
156  return;
157  }
158 
159  FetchedCollationState const &collation = *store_result.value();
160  network::CandidateDescriptor const &descriptor =
161  candidateDescriptorFrom(collation);
162  primitives::BlockHash const candidate_hash = candidateHashFrom(collation);
163 
164  auto const candidate_para_id = descriptor.para_id;
165  if (candidate_para_id != our_current_state_.assignment) {
166  logger_->warn("Try to second for para_id {} out of our assignment {}.",
167  candidate_para_id,
168  our_current_state_.assignment
169  ? std::to_string(*our_current_state_.assignment)
170  : "{no assignment}");
171  return;
172  }
173 
174  if (our_current_state_.seconded) {
175  logger_->debug("Already have seconded block {} instead of {}.",
176  our_current_state_.seconded->toString(),
177  candidate_hash);
178  return;
179  }
180 
181  if (our_current_state_.issued_statements.count(candidate_hash) != 0) {
182  logger_->debug("Statement of {} already issued.", candidate_hash);
183  return;
184  }
185 
186  appendAsyncValidationTask(id, relay_parent, peer_id);
187  }
188 
190  logger_->error("Not implemented.");
191  }
192 
194  AttestingData &attesting_data) {
197 
198  auto opt_descriptor = candidateDescriptorFrom(attesting_data.statement);
199  BOOST_ASSERT(opt_descriptor);
200 
201  auto const &collator_id = collatorIdFromDescriptor(*opt_descriptor);
202  if (our_current_state_.required_collator
203  && collator_id != *our_current_state_.required_collator) {
204  our_current_state_.issued_statements.insert(
205  attesting_data.candidate_hash);
206  return;
207  }
208 
209  notify_internal(context_, [wptr{weak_from_this()}] {
210  if (auto self = wptr.lock()) {
212  self->requestPoV();
214  }
215  });
216  }
217 
220  primitives::BlockHash const &relay_parent,
221  libp2p::peer::PeerId const &peer_id) {
225  context_, [id, relay_parent, peer_id, wptr{weak_from_this()}] {
226  if (auto self = wptr.lock()) {
227  self->logger_->debug(
228  "Got an async task for VALIDATION execution {}:{}:{}",
229  id,
230  relay_parent,
231  peer_id.toBase58());
232 
233  if (auto collation_result =
234  self->getFromSlot(id, relay_parent, peer_id))
235  self->validateAndMakeAvailable(
236  peer_id,
237  relay_parent,
238  std::move(collation_result).value(),
239  [wptr](auto const &peer_id,
240  auto &&validate_and_second_result) {
241  if (auto self = wptr.lock())
242  self->onValidationComplete(
243  peer_id, std::move(validate_and_second_result));
244  });
245  else
246  self->logger_->template warn(
247  "In job retrieve collation result error: {}",
248  collation_result.error().message());
249  }
250  });
251  }
252 
253  template <typename T>
254  outcome::result<network::Signature> ParachainProcessorImpl::sign(
255  T const &t) const {
256  if (!keypair_) return Error::KEY_NOT_PRESENT;
257 
258  auto payload = scale::encode(t).value();
259  return crypto_provider_->sign(*keypair_, payload).value();
260  }
261 
262  std::optional<network::ValidatorIndex> ParachainProcessorImpl::getOurIndex() {
263  logger_->error("Not implemented. Return my validator index.");
264  return std::nullopt;
265  }
266 
268  libp2p::peer::PeerId const &peer_id,
269  primitives::BlockHash const &relay_parent,
270  std::shared_ptr<network::Statement> const &statement) {
271  if (auto result = importStatement(statement)) {
272  if (result->group_id != our_current_state_.assignment) {
273  logger_->warn(
274  "Registered statement from not our group(our: {}, registered: {}).",
275  our_current_state_.assignment
276  ? std::to_string(*our_current_state_.assignment)
277  : "{not assigned}",
278  result->group_id);
279  return;
280  }
281 
282  std::optional<std::reference_wrapper<AttestingData>> attesting_ref =
283  visit_in_place(
284  statement->candidate_state,
285  [&](network::Dummy const &)
286  -> std::optional<std::reference_wrapper<AttestingData>> {
287  BOOST_ASSERT(!"Not used!");
288  return std::nullopt;
289  },
290  [&](network::CommittedCandidateReceipt const &seconded)
291  -> std::optional<std::reference_wrapper<AttestingData>> {
292  auto const &candidate_hash = result->candidate;
293  auto const &[it, _] =
294  our_current_state_.fallbacks.insert(std::make_pair(
295  candidate_hash,
296  AttestingData{.statement = statement,
297  .candidate_hash = candidate_hash,
298  .pov_hash = seconded.descriptor.pov_hash,
299  .from_validator = statement->validator_ix,
300  .backing = {}}));
301  return it->second;
302  },
303  [&](primitives::BlockHash const &candidate_hash)
304  -> std::optional<std::reference_wrapper<AttestingData>> {
305  auto it = our_current_state_.fallbacks.find(candidate_hash);
306  if (it == our_current_state_.fallbacks.end())
307  return std::nullopt;
308  if (!getOurIndex() || *getOurIndex() == statement->validator_ix)
309  return std::nullopt;
310  if (our_current_state_.awaiting_validation.count(candidate_hash)
311  > 0) {
312  it->second.backing.push(statement->validator_ix);
313  return std::nullopt;
314  }
315  it->second.from_validator = statement->validator_ix;
316  return it->second;
317  });
318 
319  if (attesting_ref) kickOffValidationWork(attesting_ref->get());
320  }
321  }
322 
323  std::optional<ParachainProcessorImpl::ImportStatementSummary>
325  primitives::BlockHash const &candidate_hash,
326  std::shared_ptr<network::Statement> const &statement) {
327  logger_->error("Not implemented. Should be inserted in statement table.");
328  return std::nullopt;
329  }
330 
332  std::shared_ptr<network::Statement> const &statement) {
333  logger_->error(
334  "Not implemented. Should notify somebody that backed candidate "
335  "appeared.");
336  }
337 
338  std::optional<ParachainProcessorImpl::ImportStatementSummary>
340  std::shared_ptr<network::Statement> const &statement) {
341  auto import_result =
342  importStatementToTable(candidateHashFrom(statement), statement);
343  if (import_result && import_result->attested) {
344  notifyBackedCandidate(statement);
345  }
346  return import_result;
347  }
348 
349  template <ParachainProcessorImpl::StatementType kStatementType>
350  std::shared_ptr<network::Statement>
352  ValidateAndSecondResult &validation_result) {
353  static_assert(kStatementType == StatementType::kSeconded
354  || kStatementType == StatementType::kValid);
355 
356  auto const our_ix = getOurIndex();
357  if (!our_ix) {
358  logger_->template warn(
359  "We are not validators or we have no validator index.");
360  return nullptr;
361  }
362 
363  if constexpr (kStatementType == StatementType::kSeconded) {
365  .descriptor =
366  candidateDescriptorFrom(*validation_result.fetched_collation),
367  .commitments = std::move(*validation_result.commitments)};
368 
371  auto sign_result = sign(receipt);
372  if (!sign_result) {
373  logger_->error(
374  "Unable to sign Commited Candidate Receipt. Failed with error: {}",
375  sign_result.error().message());
376  return nullptr;
377  }
378 
379  return std::make_shared<network::Statement>(
380  network::Statement{.candidate_state = std::move(receipt),
381  .validator_ix = *our_ix,
382  .signature = std::move(sign_result.value())});
383  } else if constexpr (kStatementType == StatementType::kValid) {
384  auto const candidate_hash =
385  candidateHashFrom(*validation_result.fetched_collation);
386 
389  auto sign_result = sign(candidate_hash);
390  if (!sign_result) {
391  logger_->error(
392  "Unable to sign deemed valid hash. Failed with error: {}",
393  sign_result.error().message());
394  return nullptr;
395  }
396 
397  return std::make_shared<network::Statement>(
398  network::Statement{.candidate_state = candidate_hash,
399  .validator_ix = *our_ix,
400  .signature = std::move(sign_result.value())});
401  }
402  }
403 
405  libp2p::peer::PeerId const &peer_id,
406  primitives::BlockHash const &relay_parent) {
407  auto stream_engine = pm_->getStreamEngine();
408  BOOST_ASSERT(stream_engine);
409 
410  auto collation_protocol = router_->getCollationProtocol();
411  BOOST_ASSERT(collation_protocol);
412 
413  if (stream_engine->reserveOutgoing(peer_id, collation_protocol)) {
414  collation_protocol->newOutgoingStream(
415  libp2p::peer::PeerInfo{.id = peer_id, .addresses = {}},
416  [relay_parent,
417  protocol{collation_protocol},
418  peer_id,
419  wptr{weak_from_this()}](auto &&stream_result) {
420  auto self = wptr.lock();
421  if (not self) return;
422 
423  auto stream_engine = self->pm_->getStreamEngine();
424  stream_engine->dropReserveOutgoing(peer_id, protocol);
425 
426  if (!stream_result.has_value()) {
427  self->logger_->warn("Unable to create stream {} with {}: {}",
428  protocol->protocolName(),
429  peer_id,
430  stream_result.error().message());
431  return;
432  }
433 
434  if (auto add_result = stream_engine->addOutgoing(
435  std::move(stream_result.value()), protocol);
436  !add_result) {
437  self->logger_->error("Unable to store stream {} with {}: {}",
438  protocol->protocolName(),
439  peer_id,
440  add_result.error().message());
441  }
442  self->handleNotify(peer_id, relay_parent);
443  });
444  return;
445  }
446 
447  auto &statements_queue = our_current_state_.seconded_statements[peer_id];
448  while (!statements_queue.empty()) {
449  std::shared_ptr<network::Statement> statement(statements_queue.front());
450  statements_queue.pop_front();
451 
452  stream_engine->send(
453  peer_id,
454  collation_protocol,
455  std::make_shared<network::WireMessage>(network::ProtocolMessage(
457  .relay_parent = relay_parent, .statement = *statement}))));
458  }
459  }
460 
462  libp2p::peer::PeerId const &peer_id,
463  primitives::BlockHash const &relay_parent,
464  std::shared_ptr<network::Statement> const &statement) {
465  our_current_state_.seconded_statements[peer_id].emplace_back(statement);
466  handleNotify(peer_id, relay_parent);
467  }
468 
470  libp2p::peer::PeerId const &peer_id,
471  ValidateAndSecondResult &&validation_result) {
472  BOOST_ASSERT(validation_result.fetched_collation);
473 
474  auto const candidate_hash =
475  candidateHashFrom(*validation_result.fetched_collation);
476  if (!validation_result.result) {
477  logger_->warn("Candidate {} validation failed with: {}",
478  candidate_hash,
479  validation_result.result.error().message());
480  // send invalid
481  } else if (!our_current_state_.seconded
482  && our_current_state_.issued_statements.count(candidate_hash)
483  == 0) {
484  validation_result.fetched_collation->collation_state =
486  our_current_state_.seconded = candidate_hash;
487  our_current_state_.issued_statements.insert(candidate_hash);
488  if (auto statement = createAndSignStatement<StatementType::kSeconded>(
489  validation_result)) {
490  importStatement(statement);
492  notify(peer_id, validation_result.relay_parent, statement);
493  }
494  }
495  }
496 
498  std::shared_ptr<network::Statement> const &statement) {
500  }
501 
503  FetchedCollation & /*fetched_collation*/) {
504  logger_->error("Candidate Validation is not implemented!");
505  return outcome::success();
506  }
507 
509  FetchedCollation & /*fetched_collation*/) {
510  logger_->error("Erasure coding Validation is not implemented!");
511  return outcome::success();
512  }
513 
515  logger_->error("Availability notification is not implemented!");
516  }
517 
518  template <typename F>
520  libp2p::peer::PeerId const &peer_id,
521  primitives::BlockHash const &relay_parent,
522  FetchedCollation fetched_collation,
523  F &&callback) {
524  BOOST_ASSERT(fetched_collation);
525 
526  switch (fetched_collation->pov_state) {
528  break;
530  default: {
531  logger_->error("Unexpected PoV state processing.");
532  }
533  return;
534  }
535 
536  if (auto validation_result = validateCandidate(fetched_collation);
537  !validation_result) {
538  logger_->warn("Candidate {} validation failed with error: {}",
539  candidateHashFrom(*fetched_collation),
540  validation_result.error().message());
541  return;
542  }
543 
544  if (auto erasure_coding_result = validateErasureCoding(fetched_collation);
545  !erasure_coding_result) {
546  logger_->warn("Candidate {} validation failed with error: {}",
547  candidateHashFrom(*fetched_collation),
548  erasure_coding_result.error().message());
549  return;
550  }
551 
554  [peer_id,
555  callback{std::forward<F>(callback)},
556  validate_and_second_result{ValidateAndSecondResult{
557  .result = outcome::success(),
558  .fetched_collation = fetched_collation,
559  .relay_parent = relay_parent,
560  .commitments = nullptr}}]() mutable {
561  std::forward<F>(callback)(
562  peer_id, std::move(validate_and_second_result));
563  });
564  }
565 
567  libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result) {
568  auto const candidate_hash = candidateHashFrom(*result.fetched_collation);
569  our_current_state_.fallbacks.erase(candidate_hash);
570 
571  if (our_current_state_.issued_statements.count(candidate_hash) == 0) {
572  if (result.result) {
573  if (auto statement =
574  createAndSignStatement<StatementType::kValid>(result)) {
575  importStatement(statement);
577  }
578  }
579  our_current_state_.issued_statements.insert(candidate_hash);
580  }
581  }
582 
584  libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result) {
585  auto const candidate_hash = candidateHashFrom(*result.fetched_collation);
586  auto it = our_current_state_.fallbacks.find(candidate_hash);
587 
588  if (it == our_current_state_.fallbacks.end()) {
589  logger_->error(
590  "Internal error. Fallbacks doesn't contain candidate hash {}",
591  candidate_hash);
592  return;
593  }
594 
595  AttestingData &attesting = it->second;
596  if (!attesting.backing.empty()) {
597  attesting.from_validator = attesting.backing.front();
598  attesting.backing.pop();
599  kickOffValidationWork(attesting);
600  }
601  }
602 
604  std::optional<network::ParachainId> para_id) {
605  our_current_state_.assignment = para_id;
606  }
607 
609  network::PendingCollation const &pending_collation) {
610  router_->getReqCollationProtocol()->request(
611  pending_collation.peer_id,
613  .relay_parent = pending_collation.relay_parent,
614  .para_id = pending_collation.para_id},
615  [para_id{pending_collation.para_id},
616  relay_parent{pending_collation.relay_parent},
617  peer_id{pending_collation.peer_id},
618  wptr{weak_from_this()}](
619  outcome::result<network::CollationFetchingResponse> &&result) {
620  auto self = wptr.lock();
621  if (!self) return;
622 
623  if (!result) {
624  self->logger_->warn("Fetch collation from {}:{} failed with: {}",
625  peer_id.toBase58(),
626  relay_parent,
627  result.error().message());
628  return;
629  }
630 
631  self->handleFetchedCollation(
632  para_id, relay_parent, peer_id, std::move(result).value());
633  });
634  }
635 
636 } // namespace kagome::parachain
primitives::BlockHash candidateHashFrom(FetchedCollationState const &fetched_collation_state)
void kickOffValidationWork(AttestingData &attesting_data)
outcome::result< FetchedCollation > getFromSlot(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id)
void onAttestComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result)
primitives::BlockHash relay_parent
void notify(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< crypto::Sr25519Provider > crypto_provider_
void handleFetchedCollation(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id, network::CollationFetchingResponse &&response)
boost::variant< CollatorDeclaration, CollatorAdvertisement, Dummy, Dummy, Seconded > CollationMessage
std::string_view to_string(SlotType s)
Definition: slot.hpp:22
std::optional< ImportStatementSummary > importStatement(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< crypto::Hasher > hasher_
void notifyBackedCandidate(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< network::PeerManager > pm_
void setAssignedParachain(std::optional< network::ParachainId > para_id)
STL namespace.
void handleNotify(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent)
void onValidationComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result)
void requestCollations(network::PendingCollation const &pending_collation)
std::shared_ptr< network::Statement > createAndSignStatement(ValidateAndSecondResult &validation_result)
std::shared_ptr< FetchedCollationState > FetchedCollation
struct kagome::parachain::ParachainProcessorImpl::@12 our_current_state_
boost::variant< CollationMessage > ProtocolMessage
outcome::result< FetchedCollation > loadIntoResponseSlot(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id, network::CollationFetchingResponse &&response)
auto sharedAccess(F &&f) const
Definition: safe_object.hpp:49
std::shared_ptr< WorkersContext > this_context_
network::CandidateDescriptor const & candidateDescriptorFrom(FetchedCollationState const &fetched_collation_state)
std::optional< ImportStatementSummary > importStatementToTable(primitives::BlockHash const &candidate_hash, std::shared_ptr< network::Statement > const &statement)
libp2p::peer::PeerInfo PeerInfo
std::unique_ptr< std::thread > workers_[kBackgroundWorkers]
libp2p::peer::PeerId PeerId
auto exclusiveAccess(F &&f)
Definition: safe_object.hpp:43
void notify_internal(std::shared_ptr< WorkersContext > &context, F &&func)
std::tuple<> Dummy
NU element.
ParachainProcessorImpl(std::shared_ptr< network::PeerManager > pm, std::shared_ptr< crypto::Sr25519Provider > crypto_provider, std::shared_ptr< network::Router > router, std::shared_ptr< boost::asio::io_context > this_context, std::shared_ptr< crypto::Sr25519Keypair > keypair, std::shared_ptr< crypto::Hasher > hasher)
network::CollatorPublicKey const & collatorIdFromDescriptor(network::CandidateDescriptor const &descriptor)
void onAttestNoPoVComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result)
outcome::result< void > validateCandidate(FetchedCollation &fetched_collation)
void notifyStatementDistributionSystem(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< network::Router > router_
std::shared_ptr< crypto::Sr25519Keypair > keypair_
outcome::result< network::Signature > sign(T const &t) const
void appendAsyncValidationTask(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id)
libp2p::peer::PeerId const & peer_id
std::optional< network::ValidatorIndex > getOurIndex()
void validateAndMakeAvailable(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, FetchedCollation fetched_collation, F &&callback)
outcome::result< void > validateErasureCoding(FetchedCollation &fetched_collation)
OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ParachainProcessorImpl::Error, e)
void handleStatement(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< WorkersContext > context_