17 #include "scale/scale.hpp" 24 case E::RESPONSE_ALREADY_RECEIVED:
25 return "Response already present";
26 case E::COLLATION_NOT_FOUND:
27 return "Collation not found";
28 case E::KEY_NOT_PRESENT:
29 return "Private key is not present";
31 return "Unknown parachain processor error";
37 std::shared_ptr<network::PeerManager> pm,
38 std::shared_ptr<crypto::Sr25519Provider> crypto_provider,
39 std::shared_ptr<network::Router> router,
40 std::shared_ptr<boost::asio::io_context> this_context,
41 std::shared_ptr<crypto::Sr25519Keypair> keypair,
42 std::shared_ptr<crypto::Hasher> hasher)
44 crypto_provider_(
std::move(crypto_provider)),
45 router_(
std::move(router)),
46 this_context_(
std::move(this_context)),
47 keypair_(
std::move(keypair)),
48 hasher_(
std::move(hasher)) {
66 context_ = std::make_shared<WorkersContext>();
76 worker = std::make_unique<std::thread>(
77 [wptr{weak_from_this()}, context{
context_}]() {
78 if (
auto self = wptr.lock())
79 self->logger_->debug(
"Started parachain worker with id: {}",
80 std::this_thread::get_id());
95 BOOST_ASSERT(worker->joinable());
102 outcome::result<ParachainProcessorImpl::FetchedCollation>
107 [&](
auto const &collations)
108 -> outcome::result<ParachainProcessorImpl::FetchedCollation> {
109 if (
auto para_it = collations.find(
id); para_it != collations.end()) {
110 if (
auto relay_it = para_it->second.find(relay_parent);
111 relay_it != para_it->second.end()) {
112 if (
auto peer_it = relay_it->second.find(peer_id);
113 peer_it != relay_it->second.end()) {
114 return peer_it->second;
122 outcome::result<ParachainProcessorImpl::FetchedCollation>
129 [&](
auto &collations)
130 -> outcome::result<ParachainProcessorImpl::FetchedCollation> {
131 auto &peer_map = collations[id][relay_parent];
132 auto const &[it, inserted] = peer_map.insert(
133 std::make_pair(peer_id,
134 std::make_shared<FetchedCollationState>(
152 logger_->warn(
"Fetch collation from {}:{} store failed with: {}",
155 store_result.error().message());
164 auto const candidate_para_id = descriptor.
para_id;
166 logger_->warn(
"Try to second for para_id {} out of our assignment {}.",
170 :
"{no assignment}");
175 logger_->debug(
"Already have seconded block {} instead of {}.",
182 logger_->debug(
"Statement of {} already issued.", candidate_hash);
190 logger_->error(
"Not implemented.");
199 BOOST_ASSERT(opt_descriptor);
210 if (
auto self = wptr.lock()) {
225 context_, [
id, relay_parent, peer_id, wptr{weak_from_this()}] {
226 if (
auto self = wptr.lock()) {
227 self->logger_->debug(
228 "Got an async task for VALIDATION execution {}:{}:{}",
233 if (
auto collation_result =
234 self->getFromSlot(
id, relay_parent, peer_id))
235 self->validateAndMakeAvailable(
238 std::move(collation_result).value(),
239 [wptr](
auto const &peer_id,
240 auto &&validate_and_second_result) {
241 if (
auto self = wptr.lock())
242 self->onValidationComplete(
243 peer_id, std::move(validate_and_second_result));
246 self->logger_->template warn(
247 "In job retrieve collation result error: {}",
248 collation_result.error().message());
253 template <
typename T>
258 auto payload = scale::encode(t).value();
263 logger_->error(
"Not implemented. Return my validator index.");
270 std::shared_ptr<network::Statement>
const &statement) {
274 "Registered statement from not our group(our: {}, registered: {}).",
282 std::optional<std::reference_wrapper<AttestingData>> attesting_ref =
284 statement->candidate_state,
286 -> std::optional<std::reference_wrapper<AttestingData>> {
287 BOOST_ASSERT(!
"Not used!");
291 -> std::optional<std::reference_wrapper<AttestingData>> {
292 auto const &candidate_hash = result->candidate;
293 auto const &[it, _] =
294 our_current_state_.fallbacks.insert(std::make_pair(
296 AttestingData{.statement = statement,
297 .candidate_hash = candidate_hash,
298 .pov_hash = seconded.descriptor.pov_hash,
299 .from_validator = statement->validator_ix,
304 -> std::optional<std::reference_wrapper<AttestingData>> {
312 it->second.backing.push(statement->validator_ix);
315 it->second.from_validator = statement->validator_ix;
323 std::optional<ParachainProcessorImpl::ImportStatementSummary>
326 std::shared_ptr<network::Statement>
const &statement) {
327 logger_->error(
"Not implemented. Should be inserted in statement table.");
332 std::shared_ptr<network::Statement>
const &statement) {
334 "Not implemented. Should notify somebody that backed candidate " 338 std::optional<ParachainProcessorImpl::ImportStatementSummary>
340 std::shared_ptr<network::Statement>
const &statement) {
343 if (import_result && import_result->attested) {
346 return import_result;
349 template <ParachainProcessorImpl::StatementType kStatementType>
350 std::shared_ptr<network::Statement>
359 "We are not validators or we have no validator index.");
367 .commitments = std::move(*validation_result.
commitments)};
371 auto sign_result =
sign(receipt);
374 "Unable to sign Commited Candidate Receipt. Failed with error: {}",
375 sign_result.error().message());
379 return std::make_shared<network::Statement>(
381 .validator_ix = *our_ix,
382 .signature = std::move(sign_result.value())});
384 auto const candidate_hash =
389 auto sign_result =
sign(candidate_hash);
392 "Unable to sign deemed valid hash. Failed with error: {}",
393 sign_result.error().message());
397 return std::make_shared<network::Statement>(
399 .validator_ix = *our_ix,
400 .signature = std::move(sign_result.value())});
407 auto stream_engine =
pm_->getStreamEngine();
408 BOOST_ASSERT(stream_engine);
410 auto collation_protocol =
router_->getCollationProtocol();
411 BOOST_ASSERT(collation_protocol);
413 if (stream_engine->reserveOutgoing(peer_id, collation_protocol)) {
414 collation_protocol->newOutgoingStream(
417 protocol{collation_protocol},
419 wptr{weak_from_this()}](
auto &&stream_result) {
420 auto self = wptr.lock();
421 if (not
self)
return;
423 auto stream_engine =
self->pm_->getStreamEngine();
424 stream_engine->dropReserveOutgoing(peer_id, protocol);
426 if (!stream_result.has_value()) {
427 self->logger_->warn(
"Unable to create stream {} with {}: {}",
428 protocol->protocolName(),
430 stream_result.error().message());
434 if (
auto add_result = stream_engine->addOutgoing(
435 std::move(stream_result.value()), protocol);
437 self->logger_->error(
"Unable to store stream {} with {}: {}",
438 protocol->protocolName(),
440 add_result.error().message());
442 self->handleNotify(peer_id, relay_parent);
448 while (!statements_queue.empty()) {
449 std::shared_ptr<network::Statement> statement(statements_queue.front());
450 statements_queue.pop_front();
457 .
relay_parent = relay_parent, .statement = *statement}))));
464 std::shared_ptr<network::Statement>
const &statement) {
472 BOOST_ASSERT(validation_result.fetched_collation);
474 auto const candidate_hash =
476 if (!validation_result.result) {
477 logger_->warn(
"Candidate {} validation failed with: {}",
479 validation_result.result.error().message());
484 validation_result.fetched_collation->collation_state =
488 if (
auto statement = createAndSignStatement<StatementType::kSeconded>(
489 validation_result)) {
492 notify(peer_id, validation_result.relay_parent, statement);
498 std::shared_ptr<network::Statement>
const &statement) {
504 logger_->error(
"Candidate Validation is not implemented!");
505 return outcome::success();
510 logger_->error(
"Erasure coding Validation is not implemented!");
511 return outcome::success();
515 logger_->error(
"Availability notification is not implemented!");
518 template <
typename F>
524 BOOST_ASSERT(fetched_collation);
526 switch (fetched_collation->pov_state) {
531 logger_->error(
"Unexpected PoV state processing.");
537 !validation_result) {
538 logger_->warn(
"Candidate {} validation failed with error: {}",
540 validation_result.error().message());
545 !erasure_coding_result) {
546 logger_->warn(
"Candidate {} validation failed with error: {}",
548 erasure_coding_result.error().message());
555 callback{std::forward<F>(callback)},
557 .
result = outcome::success(),
558 .fetched_collation = fetched_collation,
559 .relay_parent = relay_parent,
560 .commitments =
nullptr}}]()
mutable {
561 std::forward<F>(callback)(
562 peer_id, std::move(validate_and_second_result));
574 createAndSignStatement<StatementType::kValid>(result)) {
590 "Internal error. Fallbacks doesn't contain candidate hash {}",
596 if (!attesting.
backing.empty()) {
604 std::optional<network::ParachainId> para_id) {
610 router_->getReqCollationProtocol()->request(
613 .relay_parent = pending_collation.relay_parent,
614 .para_id = pending_collation.para_id},
615 [para_id{pending_collation.para_id},
616 relay_parent{pending_collation.relay_parent},
617 peer_id{pending_collation.peer_id},
618 wptr{weak_from_this()}](
619 outcome::result<network::CollationFetchingResponse> &&result) {
620 auto self = wptr.lock();
624 self->logger_->warn(
"Fetch collation from {}:{} failed with: {}",
627 result.error().message());
631 self->handleFetchedCollation(
632 para_id, relay_parent, peer_id, std::move(result).value());
primitives::BlockHash candidateHashFrom(FetchedCollationState const &fetched_collation_state)
std::queue< network::ValidatorIndex > backing
void kickOffValidationWork(AttestingData &attesting_data)
outcome::result< FetchedCollation > getFromSlot(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id)
void onAttestComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result)
primitives::BlockHash relay_parent
void notify(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< crypto::Sr25519Provider > crypto_provider_
void handleFetchedCollation(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id, network::CollationFetchingResponse &&response)
boost::variant< CollatorDeclaration, CollatorAdvertisement, Dummy, Dummy, Seconded > CollationMessage
std::string_view to_string(SlotType s)
std::optional< ImportStatementSummary > importStatement(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< crypto::Hasher > hasher_
void notifyBackedCandidate(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< network::PeerManager > pm_
CandidateState candidate_state
void setAssignedParachain(std::optional< network::ParachainId > para_id)
network::ValidatorIndex from_validator
void handleNotify(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent)
void onValidationComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result)
void requestCollations(network::PendingCollation const &pending_collation)
std::shared_ptr< network::Statement > createAndSignStatement(ValidateAndSecondResult &validation_result)
std::shared_ptr< FetchedCollationState > FetchedCollation
outcome::result< void > result
CandidateDescriptor descriptor
struct kagome::parachain::ParachainProcessorImpl::@12 our_current_state_
boost::variant< CollationMessage > ProtocolMessage
outcome::result< FetchedCollation > loadIntoResponseSlot(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id, network::CollationFetchingResponse &&response)
std::shared_ptr< network::Statement > statement
auto sharedAccess(F &&f) const
std::shared_ptr< WorkersContext > this_context_
network::CandidateDescriptor const & candidateDescriptorFrom(FetchedCollationState const &fetched_collation_state)
void notifyAvailableData()
std::optional< ImportStatementSummary > importStatementToTable(primitives::BlockHash const &candidate_hash, std::shared_ptr< network::Statement > const &statement)
libp2p::peer::PeerInfo PeerInfo
~ParachainProcessorImpl()
std::unique_ptr< std::thread > workers_[kBackgroundWorkers]
libp2p::peer::PeerId PeerId
auto exclusiveAccess(F &&f)
std::shared_ptr< WorkGuard > work_guard_
void notify_internal(std::shared_ptr< WorkersContext > &context, F &&func)
SafeObject< ParachainMap > collations_
std::tuple<> Dummy
NU element.
ParachainProcessorImpl(std::shared_ptr< network::PeerManager > pm, std::shared_ptr< crypto::Sr25519Provider > crypto_provider, std::shared_ptr< network::Router > router, std::shared_ptr< boost::asio::io_context > this_context, std::shared_ptr< crypto::Sr25519Keypair > keypair, std::shared_ptr< crypto::Hasher > hasher)
network::CollatorPublicKey const & collatorIdFromDescriptor(network::CandidateDescriptor const &descriptor)
void onAttestNoPoVComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result)
outcome::result< void > validateCandidate(FetchedCollation &fetched_collation)
primitives::BlockHash candidate_hash
void notifyStatementDistributionSystem(std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< network::Router > router_
std::shared_ptr< crypto::Sr25519Keypair > keypair_
outcome::result< network::Signature > sign(T const &t) const
void appendAsyncValidationTask(network::ParachainId id, primitives::BlockHash const &relay_parent, libp2p::peer::PeerId const &peer_id)
FetchedCollation fetched_collation
libp2p::peer::PeerId const & peer_id
std::optional< network::ValidatorIndex > getOurIndex()
void validateAndMakeAvailable(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, FetchedCollation fetched_collation, F &&callback)
outcome::result< void > validateErasureCoding(FetchedCollation &fetched_collation)
OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ParachainProcessorImpl::Error, e)
void handleStatement(libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, std::shared_ptr< network::Statement > const &statement)
std::shared_ptr< WorkersContext > context_