/// Metric name for the ready-transactions counter. It is passed together with
/// the description "Number of transactions in the ready queue" where the
/// metric is set up.
/// NOTE(review): presumably backs the `metric_ready_txs_` gauge — confirm.
18 constexpr
const char *readyTransactionsMetricName =
19 "kagome_ready_transactions_number";
25 using primitives::events::ExtrinsicLifecycleEvent;
28 std::shared_ptr<runtime::TaggedTransactionQueue> ttq,
29 std::shared_ptr<crypto::Hasher> hasher,
30 std::shared_ptr<network::TransactionsTransmitter> tx_transmitter,
31 std::unique_ptr<PoolModerator> moderator,
32 std::shared_ptr<blockchain::BlockHeaderRepository> header_repo,
33 std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
35 std::shared_ptr<subscription::ExtrinsicEventKeyRepository> ext_key_repo,
37 : header_repo_{std::move(header_repo)},
45 BOOST_ASSERT_MSG(
header_repo_ !=
nullptr,
"header repo is nullptr");
46 BOOST_ASSERT_MSG(
ttq_ !=
nullptr,
"tagged-transaction queue is nullptr");
47 BOOST_ASSERT_MSG(
hasher_ !=
nullptr,
"hasher is nullptr");
48 BOOST_ASSERT_MSG(
tx_transmitter_ !=
nullptr,
"tx_transmitter is nullptr");
49 BOOST_ASSERT_MSG(
moderator_ !=
nullptr,
"moderator is nullptr");
50 BOOST_ASSERT_MSG(
sub_engine_ !=
nullptr,
"sub engine is nullptr");
52 "extrinsic event key repository is nullptr");
56 readyTransactionsMetricName,
57 "Number of transactions in the ready queue");
63 outcome::result<primitives::Transaction>
67 OUTCOME_TRY(res,
ttq_->validate_transaction(source, extrinsic));
69 return visit_in_place(
72 return visit_in_place(
75 [](
const auto &validity_error)
76 -> outcome::result<primitives::Transaction> {
77 return validity_error;
81 -> outcome::result<primitives::Transaction> {
83 size_t length = extrinsic.
data.size();
111 return submitOne(std::make_shared<Transaction>(std::move(tx)));
115 const std::shared_ptr<Transaction> &tx) {
116 if (
auto [_, ok] =
imported_txs_.emplace(tx->hash, tx); !ok) {
121 if (processResult.has_error()
123 if (
auto key =
ext_key_repo_->get(tx->hash); key.has_value()) {
125 ExtrinsicLifecycleEvent::Dropped(key.value()));
130 "Extrinsic {} with hash {} was added to the pool",
131 tx->ext.data.toHex(),
135 return processResult;
139 const std::shared_ptr<Transaction> &tx) {
146 return outcome::success();
150 const std::shared_ptr<Transaction> &tx) {
157 return outcome::success();
165 const std::shared_ptr<Transaction> &tx) {
167 if (
auto key =
ext_key_repo_->get(tx->hash); key.has_value()) {
169 ExtrinsicLifecycleEvent::Future(key.value()));
174 const std::shared_ptr<Transaction> &tx) {
179 return outcome::success();
187 return outcome::success();
191 const std::shared_ptr<Transaction> &tx) {
192 for (
auto &tag : tx->requires) {
195 if (
auto key =
ext_key_repo_->get(tx->hash); key.has_value()) {
197 ExtrinsicLifecycleEvent::Future(key.value()));
204 if (tx_node.empty()) {
206 "Extrinsic with hash {} was not found in the pool during remove",
210 const auto &tx = tx_node.mapped();
218 "Extrinsic {} with hash {} was removed from the pool",
219 tx->ext.data.toHex(),
221 return std::move(*tx);
227 while (!postponed_txs.empty()) {
228 auto tx = postponed_txs.front().lock();
229 postponed_txs.pop_front();
232 if (result.has_error()
235 std::make_move_iterator(postponed_txs.begin()),
236 std::make_move_iterator(postponed_txs.end()));
243 const std::shared_ptr<Transaction> &tx) {
244 for (
auto &tag : tx->requires) {
246 for (
auto i = range.first; i != range.second;) {
247 if (i->second.lock() == tx) {
255 std::map<Transaction::Hash, std::shared_ptr<Transaction>>
257 std::map<Transaction::Hash, std::shared_ptr<Transaction>> ready;
259 if (
auto tx = it.second.lock()) {
260 ready.emplace(it.first, std::move(tx));
266 const std::unordered_map<Transaction::Hash, std::shared_ptr<Transaction>>
275 std::vector<Transaction::Hash> remove_to;
279 remove_to.emplace_back(txHash);
283 for (
auto &tx_hash : remove_to) {
285 if (
auto key =
ext_key_repo_->get(tx.hash); key.has_value()) {
287 ExtrinsicLifecycleEvent::Dropped(key.value()));
294 return outcome::success();
298 const std::shared_ptr<const Transaction> &tx)
const {
300 return i !=
ready_txs_.end() && !i->second.expired();
304 const std::shared_ptr<const Transaction> &tx)
const {
306 tx->requires.begin(), tx->requires.end(), [
this](
auto &&tag) {
308 return range.first != range.second;
313 if (
auto [_, ok] =
ready_txs_.emplace(tx->hash, tx); ok) {
314 if (
auto key =
ext_key_repo_->get(tx->hash); key.has_value()) {
316 ExtrinsicLifecycleEvent::Ready(key.value()));
325 const std::shared_ptr<Transaction> &tx) {
326 for (
auto &tag : tx->requires) {
328 for (
auto i = range.first; i != range.second;) {
330 if (ci->second.lock() == tx) {
333 std::move(node.mapped()));
340 const std::shared_ptr<Transaction> &tx) {
341 for (
auto &tag : tx->provides) {
350 for (
auto it = range.first; it != range.second;) {
351 auto tx = (it++)->second.lock();
363 if (
auto tx_node =
ready_txs_.extract(tx->hash); !tx_node.empty()) {
367 if (
auto key =
ext_key_repo_->get(tx->hash); key.has_value()) {
369 ExtrinsicLifecycleEvent::Future(key.value()));
375 const std::shared_ptr<Transaction> &tx) {
376 for (
auto &tag : tx->requires) {
382 const std::shared_ptr<Transaction> &tx) {
383 for (
auto &tag : tx->provides) {
385 for (
auto i = range.first; i != range.second; ++i) {
386 if (i->second.lock() == tx) {
401 if (
auto tx = it->second.lock()) {
std::shared_ptr< blockchain::BlockHeaderRepository > header_repo_
outcome::result< primitives::Transaction > constructTransaction(primitives::TransactionSource source, primitives::Extrinsic extrinsic) const override
outcome::result< Transaction::Hash > submitExtrinsic(primitives::TransactionSource source, primitives::Extrinsic extrinsic) override
virtual void set(double val)=0
Set the gauge to the given value.
const std::unordered_map< Transaction::Hash, std::shared_ptr< Transaction > > & getPendingTransactions() const override
void commitRequiredTags(const std::shared_ptr< Transaction > &tx)
outcome::result< void > processTransactionAsReady(const std::shared_ptr< Transaction > &tx)
metrics::RegistryPtr metrics_registry_
outcome::result< void > ensureSpace() const
std::unique_ptr< PoolModerator > moderator_
Bans stale and invalid transactions for some amount of time.
boost::variant< InvalidTransaction, UnknownTransaction > TransactionValidityError
bool checkForReady(const std::shared_ptr< const Transaction > &tx) const
void setReady(const std::shared_ptr< Transaction > &tx)
void delTransactionAsWaiting(const std::shared_ptr< Transaction > &tx)
metrics::Gauge * metric_ready_txs_
common::Buffer data
extrinsic content as byte array
outcome::result< void > submitOne(Transaction &&tx) override
std::shared_ptr< crypto::Hasher > hasher_
Status getStatus() const override
gsl::span< const uint8_t > make_span(const rocksdb::Slice &s)
void rollbackRequiredTags(const std::shared_ptr< Transaction > &tx)
void provideTag(const Transaction::Tag &tag)
Information concerning a valid transaction.
std::map< Transaction::Hash, std::shared_ptr< Transaction > > getReadyTransactions() const override
void addTransactionAsWaiting(const std::shared_ptr< Transaction > &tx)
std::shared_ptr< network::TransactionsTransmitter > tx_transmitter_
bool hasSpaceInReady() const
void postponeTransaction(const std::shared_ptr< Transaction > &tx)
Postpone a ready transaction (in case the ready limit has already been reached)
outcome::result< Transaction > removeOne(const Transaction::Hash &tx_hash) override
std::list< std::weak_ptr< Transaction > > postponed_txs_
List of ready transactions over the limit. They will be processed first of all.
bool isInReady(const std::shared_ptr< const Transaction > &tx) const
outcome::result< void > processTransactionAsWaiting(const std::shared_ptr< Transaction > &tx)
std::multimap< Transaction::Tag, std::weak_ptr< Transaction > > tx_provides_tag_
Transactions which provide specific tags.
void unsetReady(const std::shared_ptr< Transaction > &tx)
std::unordered_map< Transaction::Hash, std::shared_ptr< Transaction > > imported_txs_
All imported transactions contained in the pool.
outcome::result< std::vector< Transaction > > removeStale(const primitives::BlockId &at) override
std::multimap< Transaction::Tag, std::weak_ptr< Transaction > > tx_depends_on_tag_
Transactions with a resolved requirement for a specific tag.
outcome::result< void > processTransaction(const std::shared_ptr< Transaction > &tx)
void commitProvidedTags(const std::shared_ptr< Transaction > &tx)
boost::variant< BlockHash, BlockNumber > BlockId
Block id is the variant over BlockHash and BlockNumber.
std::shared_ptr< runtime::TaggedTransactionQueue > ttq_
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > ext_key_repo_
TransactionPoolImpl(std::shared_ptr< runtime::TaggedTransactionQueue > ttq, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< network::TransactionsTransmitter > tx_transmitter, std::unique_ptr< PoolModerator > moderator, std::shared_ptr< blockchain::BlockHeaderRepository > header_repo, std::shared_ptr< primitives::events::ExtrinsicSubscriptionEngine > sub_engine, std::shared_ptr< subscription::ExtrinsicEventKeyRepository > ext_key_repo, Limits limits)
std::shared_ptr< primitives::events::ExtrinsicSubscriptionEngine > sub_engine_
Extrinsic class represents extrinsic.
void rollbackProvidedTags(const std::shared_ptr< Transaction > &tx)
void processPostponedTransactions()
Process postponed transactions (in case space has become available for them)
std::unordered_map< Transaction::Hash, std::weak_ptr< Transaction > > ready_txs_
Collection of transactions with fully satisfied dependencies.
void unprovideTag(const Transaction::Tag &tag)
std::multimap< Transaction::Tag, std::weak_ptr< Transaction > > tx_waits_tag_
Transactions with unresolved requirements for specific tags.