Kagome
Polkadot Runtime Engine in C++17
transaction_pool_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include "crypto/hasher.hpp"
10 #include "primitives/block_id.hpp"
13 
16 
namespace {
  /// Name used when registering the ready-transactions gauge family and
  /// gauge metric.
  constexpr auto readyTransactionsMetricName =
      "kagome_ready_transactions_number";
}  // namespace
21 
22 namespace kagome::transaction_pool {
23 
25  using primitives::events::ExtrinsicLifecycleEvent;
26 
// Constructor parameter list, member initialisers and body.
// NOTE(review): the line that opens this definition (qualified name and
// opening parenthesis) is absent from this listing.
28  std::shared_ptr<runtime::TaggedTransactionQueue> ttq,
29  std::shared_ptr<crypto::Hasher> hasher,
30  std::shared_ptr<network::TransactionsTransmitter> tx_transmitter,
31  std::unique_ptr<PoolModerator> moderator,
32  std::shared_ptr<blockchain::BlockHeaderRepository> header_repo,
33  std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
34  sub_engine,
35  std::shared_ptr<subscription::ExtrinsicEventKeyRepository> ext_key_repo,
36  Limits limits)
// Every pointer argument is moved into the member of the same name;
// limits is copied.
37  : header_repo_{std::move(header_repo)},
38  sub_engine_{std::move(sub_engine)},
39  ext_key_repo_{std::move(ext_key_repo)},
40  ttq_{std::move(ttq)},
41  hasher_{std::move(hasher)},
42  tx_transmitter_{std::move(tx_transmitter)},
43  moderator_{std::move(moderator)},
44  limits_{limits} {
// All seven injected pointers are mandatory: each is asserted non-null.
45  BOOST_ASSERT_MSG(header_repo_ != nullptr, "header repo is nullptr");
46  BOOST_ASSERT_MSG(ttq_ != nullptr, "tagged-transaction queue is nullptr");
47  BOOST_ASSERT_MSG(hasher_ != nullptr, "hasher is nullptr");
48  BOOST_ASSERT_MSG(tx_transmitter_ != nullptr, "tx_transmitter is nullptr");
49  BOOST_ASSERT_MSG(moderator_ != nullptr, "moderator is nullptr");
50  BOOST_ASSERT_MSG(sub_engine_ != nullptr, "sub engine is nullptr");
51  BOOST_ASSERT_MSG(ext_key_repo_ != nullptr,
52  "extrinsic event key repository is nullptr");
53 
54  // Register metrics
// The gauge family and the gauge metric share the name
// readyTransactionsMetricName.
// NOTE(review): metrics_registry_ is not initialised from a constructor
// argument here, and listing lines 58 and 60 are absent, so what is done
// with the value returned by registerGaugeMetric cannot be seen.
55  metrics_registry_->registerGaugeFamily(
56  readyTransactionsMetricName,
57  "Number of transactions in the ready queue");
59  metrics_registry_->registerGaugeMetric(readyTransactionsMetricName);
61  }
62 
// Validates an extrinsic through ttq_ and, when it is valid, wraps it into
// a primitives::Transaction.
// NOTE(review): listing lines 64-65 (qualified name and first parameter)
// and 71 (opening of the first visitor lambda) are absent from this view.
63  outcome::result<primitives::Transaction>
66  primitives::Extrinsic extrinsic) const {
// A failure of validate_transaction itself is returned to the caller by
// OUTCOME_TRY.
67  OUTCOME_TRY(res, ttq_->validate_transaction(source, extrinsic));
68 
// Dispatch on the validation outcome held in res.
69  return visit_in_place(
70  res,
// Error alternative: the nested visit returns the contained validity error
// as the result.
72  return visit_in_place(
73  e,
74  // return either invalid or unknown validity error
75  [](const auto &validity_error)
76  -> outcome::result<primitives::Transaction> {
77  return validity_error;
78  });
79  },
// Valid alternative: the hash is blake2b-256 over the extrinsic bytes and
// the length is the byte count; the remaining fields are copied from v.
80  [&](const primitives::ValidTransaction &v)
81  -> outcome::result<primitives::Transaction> {
82  common::Hash256 hash = hasher_->blake2b_256(extrinsic.data);
83  size_t length = extrinsic.data.size();
84 
85  return primitives::Transaction{extrinsic,
86  length,
87  hash,
88  v.priority,
89  v.longevity,
90  v.requires,
91  v.provides,
92  v.propagate};
93  });
94  }
95 
// Builds a transaction from an extrinsic, optionally propagates it, submits
// it to the pool and returns its hash.
// NOTE(review): listing line 97 (the parameter list and opening brace) is
// absent from this view.
96  outcome::result<Transaction::Hash> TransactionPoolImpl::submitExtrinsic(
// Any error from constructTransaction is returned to the caller.
98  OUTCOME_TRY(tx, constructTransaction(source, extrinsic));
99 
// Propagate only transactions flagged for propagation whose hash is not
// already in imported_txs_.
// NOTE(review): propagation happens before submitOne, so a transaction the
// pool then rejects has already been handed to the transmitter.
100  if (tx.should_propagate && !imported_txs_.count(tx.hash)) {
101  tx_transmitter_->propagateTransactions(gsl::make_span(std::vector{tx}));
102  }
// Copy the hash now: tx is moved from on the next statement.
103  auto hash = tx.hash;
104  // send to pool
105  OUTCOME_TRY(submitOne(std::move(tx)));
106 
107  return hash;
108  }
109 
110  outcome::result<void> TransactionPoolImpl::submitOne(Transaction &&tx) {
111  return submitOne(std::make_shared<Transaction>(std::move(tx)));
112  }
113 
// Registers a transaction in imported_txs_ and runs it through
// processTransaction; returns the result of that processing.
114  outcome::result<void> TransactionPoolImpl::submitOne(
115  const std::shared_ptr<Transaction> &tx) {
// emplace reports ok == false when the hash is already present.
// NOTE(review): listing line 117 (the body of this branch) is absent from
// this view, so what happens for an already-imported hash is not visible.
116  if (auto [_, ok] = imported_txs_.emplace(tx->hash, tx); !ok) {
118  }
119 
120  auto processResult = processTransaction(tx);
// On POOL_IS_FULL: notify subscribers (when an event key exists) that the
// extrinsic was dropped, and undo the insertion made above.
121  if (processResult.has_error()
122  && processResult.error() == TransactionPoolError::POOL_IS_FULL) {
123  if (auto key = ext_key_repo_->get(tx->hash); key.has_value()) {
124  sub_engine_->notify(key.value(),
125  ExtrinsicLifecycleEvent::Dropped(key.value()));
126  }
127  imported_txs_.erase(tx->hash);
128  } else {
// NOTE(review): this branch is also taken when processTransaction failed
// with an error other than POOL_IS_FULL; the message still says "added" and
// the entry stays in imported_txs_ - confirm this is intended.
129  SL_DEBUG(logger_,
130  "Extrinsic {} with hash {} was added to the pool",
131  tx->ext.data.toHex(),
132  tx->hash.toHex());
133  }
134 
135  return processResult;
136  }
137 
139  const std::shared_ptr<Transaction> &tx) {
140  OUTCOME_TRY(ensureSpace());
141  if (checkForReady(tx)) {
142  OUTCOME_TRY(processTransactionAsReady(tx));
143  } else {
144  OUTCOME_TRY(processTransactionAsWaiting(tx));
145  }
146  return outcome::success();
147  }
148 
// Handles a transaction whose requirements are satisfied; always returns
// success.
// NOTE(review): the line that opens this definition (listing line 149) is
// absent from this view.
150  const std::shared_ptr<Transaction> &tx) {
// With room in the ready set the transaction becomes ready immediately.
151  if (hasSpaceInReady()) {
152  setReady(tx);
153  } else {
// NOTE(review): listing line 154 (the body of this else-branch) is absent;
// presumably the transaction is postponed here - confirm against the
// original file.
155  }
156 
157  return outcome::success();
158  }
159 
161  return ready_txs_.size() < limits_.max_ready_num;
162  }
163 
165  const std::shared_ptr<Transaction> &tx) {
166  postponed_txs_.push_back(tx);
167  if (auto key = ext_key_repo_->get(tx->hash); key.has_value()) {
168  sub_engine_->notify(key.value(),
169  ExtrinsicLifecycleEvent::Future(key.value()));
170  }
171  }
172 
// Handles a transaction whose requirements are not yet satisfied.
// NOTE(review): the line that opens this definition (listing line 173) is
// absent from this view.
174  const std::shared_ptr<Transaction> &tx) {
// Any error from ensureSpace() is returned to the caller.
175  OUTCOME_TRY(ensureSpace());
176 
// NOTE(review): listing line 177 is absent; presumably it registers tx as
// waiting - confirm against the original file.
178 
179  return outcome::success();
180  }
181 
// Checks the number of imported transactions against limits_.capacity;
// returns success when the limit is not exceeded.
182  outcome::result<void> TransactionPoolImpl::ensureSpace() const {
// The comparison is strict: a pool holding exactly capacity entries passes.
// NOTE(review): listing line 184 (the body of this branch) is absent;
// presumably it returns TransactionPoolError::POOL_IS_FULL - confirm.
183  if (imported_txs_.size() > limits_.capacity) {
185  }
186 
187  return outcome::success();
188  }
189 
191  const std::shared_ptr<Transaction> &tx) {
192  for (auto &tag : tx->requires) {
193  tx_waits_tag_.emplace(tag, tx);
194  }
195  if (auto key = ext_key_repo_->get(tx->hash); key.has_value()) {
196  sub_engine_->notify(key.value(),
197  ExtrinsicLifecycleEvent::Future(key.value()));
198  }
199  }
200 
// Removes the transaction with the given hash from imported_txs_ and
// returns it by value.
201  outcome::result<Transaction> TransactionPoolImpl::removeOne(
202  const Transaction::Hash &tx_hash) {
// extract() detaches the map node, so the transaction stays alive in
// tx_node until this function returns.
203  auto tx_node = imported_txs_.extract(tx_hash);
204  if (tx_node.empty()) {
205  SL_TRACE(logger_,
206  "Extrinsic with hash {} was not found in the pool during remove",
207  tx_hash);
// NOTE(review): listing line 208 is absent; presumably an error is returned
// here, otherwise the mapped() access below would act on an empty node.
209  }
210  const auto &tx = tx_node.mapped();
211 
// Take the transaction out of the ready set, if it is there.
212  unsetReady(tx);
// NOTE(review): listing lines 213 and 215 are absent from this view, so any
// further clean-up done here cannot be seen.
214 
216 
217  SL_DEBUG(logger_,
218  "Extrinsic {} with hash {} was removed from the pool",
219  tx->ext.data.toHex(),
220  tx->hash.toHex());
// The Transaction object itself is moved out of the shared pointer.
221  return std::move(*tx);
222  }
223 
225  // Move to local for avoid endless cycle at possible coming back tx
226  auto postponed_txs = std::move(postponed_txs_);
227  while (!postponed_txs.empty()) {
228  auto tx = postponed_txs.front().lock();
229  postponed_txs.pop_front();
230 
231  auto result = processTransaction(tx);
232  if (result.has_error()
233  && result.error() == TransactionPoolError::POOL_IS_FULL) {
234  postponed_txs_.insert(postponed_txs_.end(),
235  std::make_move_iterator(postponed_txs.begin()),
236  std::make_move_iterator(postponed_txs.end()));
237  return;
238  }
239  }
240  }
241 
243  const std::shared_ptr<Transaction> &tx) {
244  for (auto &tag : tx->requires) {
245  auto range = tx_waits_tag_.equal_range(tag);
246  for (auto i = range.first; i != range.second;) {
247  if (i->second.lock() == tx) {
248  tx_waits_tag_.erase(i);
249  break;
250  }
251  }
252  }
253  }
254 
// Returns a map from hash to transaction built from the entries of
// ready_txs_ that are still alive.
// NOTE(review): listing line 256 (qualified name and opening brace) is
// absent from this view.
255  std::map<Transaction::Hash, std::shared_ptr<Transaction>>
257  std::map<Transaction::Hash, std::shared_ptr<Transaction>> ready;
// ready_txs_ stores weak pointers; entries that no longer lock are left out.
// NOTE(review): the lambda takes each map entry by value (auto it), which
// copies the pair; a const reference would avoid that copy.
258  std::for_each(ready_txs_.begin(), ready_txs_.end(), [&ready](auto it) {
259  if (auto tx = it.second.lock()) {
260  ready.emplace(it.first, std::move(tx));
261  }
262  });
263  return ready;
264  }
265 
// Returns imported_txs_ itself (all imported transactions).
// NOTE(review): listing line 267 (the rest of the declarator and the
// opening brace) is absent from this view.
266  const std::unordered_map<Transaction::Hash, std::shared_ptr<Transaction>>
268  return imported_txs_;
269  }
270 
271  outcome::result<std::vector<Transaction>> TransactionPoolImpl::removeStale(
272  const primitives::BlockId &at) {
273  OUTCOME_TRY(number, header_repo_->getNumberById(at));
274 
275  std::vector<Transaction::Hash> remove_to;
276 
277  for (auto &[txHash, tx] : imported_txs_) {
278  if (moderator_->banIfStale(number, *tx)) {
279  remove_to.emplace_back(txHash);
280  }
281  }
282 
283  for (auto &tx_hash : remove_to) {
284  OUTCOME_TRY(tx, removeOne(tx_hash));
285  if (auto key = ext_key_repo_->get(tx.hash); key.has_value()) {
286  sub_engine_->notify(key.value(),
287  ExtrinsicLifecycleEvent::Dropped(key.value()));
288  ext_key_repo_->remove(tx.hash);
289  }
290  }
291 
292  moderator_->updateBan();
293 
294  return outcome::success();
295  }
296 
298  const std::shared_ptr<const Transaction> &tx) const {
299  auto i = ready_txs_.find(tx->hash);
300  return i != ready_txs_.end() && !i->second.expired();
301  }
302 
304  const std::shared_ptr<const Transaction> &tx) const {
305  return std::all_of(
306  tx->requires.begin(), tx->requires.end(), [this](auto &&tag) {
307  auto range = tx_provides_tag_.equal_range(tag);
308  return range.first != range.second;
309  });
310  }
311 
// Puts a transaction into ready_txs_; does nothing when its hash is
// already there.
312  void TransactionPoolImpl::setReady(const std::shared_ptr<Transaction> &tx) {
// ok is true only when the hash was newly inserted.
313  if (auto [_, ok] = ready_txs_.emplace(tx->hash, tx); ok) {
// Notify subscribers, when an event key exists, that the extrinsic is
// Ready.
314  if (auto key = ext_key_repo_->get(tx->hash); key.has_value()) {
315  sub_engine_->notify(key.value(),
316  ExtrinsicLifecycleEvent::Ready(key.value()));
317  }
// Update the tag indexes: required tags first, then provided tags.
318  commitRequiredTags(tx);
319  commitProvidedTags(tx);
// NOTE(review): listing line 320 is absent from this view; presumably the
// ready-transactions gauge is updated here - confirm.
321  }
322  }
323 
325  const std::shared_ptr<Transaction> &tx) {
326  for (auto &tag : tx->requires) {
327  auto range = tx_waits_tag_.equal_range(tag);
328  for (auto i = range.first; i != range.second;) {
329  auto ci = i++;
330  if (ci->second.lock() == tx) {
331  auto node = tx_waits_tag_.extract(ci);
332  tx_depends_on_tag_.emplace(std::move(node.key()),
333  std::move(node.mapped()));
334  }
335  }
336  }
337  }
338 
340  const std::shared_ptr<Transaction> &tx) {
341  for (auto &tag : tx->provides) {
342  tx_provides_tag_.emplace(tag, tx);
343 
344  provideTag(tag);
345  }
346  }
347 
// Re-checks every transaction waiting for `tag` and makes it ready when
// all of its required tags are provided and the ready set has room.
348  void TransactionPoolImpl::provideTag(const Transaction::Tag &tag) {
349  auto range = tx_waits_tag_.equal_range(tag);
350  for (auto it = range.first; it != range.second;) {
// The iterator is advanced before tx is used, because setReady() below can
// remove this transaction's entries from tx_waits_tag_.
// NOTE(review): lock() yields a null pointer for an expired entry, and
// checkForReady() dereferences its argument - confirm expired entries
// cannot occur here.
351  auto tx = (it++)->second.lock();
352  if (checkForReady(tx)) {
353  if (hasSpaceInReady()) {
354  setReady(tx);
355  } else {
// NOTE(review): listing line 356 (the body of this else-branch) is absent;
// presumably the transaction is postponed - confirm.
357  }
358  }
359  }
360  }
361 
// Takes a transaction out of ready_txs_; does nothing when its hash is not
// there.
362  void TransactionPoolImpl::unsetReady(const std::shared_ptr<Transaction> &tx) {
// extract() returns an empty node when the hash was not in the ready set.
363  if (auto tx_node = ready_txs_.extract(tx->hash); !tx_node.empty()) {
// NOTE(review): listing lines 364-366 are absent from this view; presumably
// the tag indexes are rolled back here - confirm.
// Notify subscribers, when an event key exists, that the extrinsic is back
// in the Future state.
367  if (auto key = ext_key_repo_->get(tx->hash); key.has_value()) {
368  sub_engine_->notify(key.value(),
369  ExtrinsicLifecycleEvent::Future(key.value()));
370  }
371  }
372  }
373 
375  const std::shared_ptr<Transaction> &tx) {
376  for (auto &tag : tx->requires) {
377  tx_waits_tag_.emplace(tag, tx);
378  }
379  }
380 
382  const std::shared_ptr<Transaction> &tx) {
383  for (auto &tag : tx->provides) {
384  auto range = tx_provides_tag_.equal_range(tag);
385  for (auto i = range.first; i != range.second; ++i) {
386  if (i->second.lock() == tx) {
387  tx_provides_tag_.erase(i);
388  break;
389  }
390  }
391 
392  unprovideTag(tag);
393  }
394  }
395 
396  void TransactionPoolImpl::unprovideTag(const Transaction::Tag &tag) {
397  if (tx_provides_tag_.find(tag) == tx_provides_tag_.end()) {
398  for (auto it = tx_depends_on_tag_.find(tag);
399  it != tx_depends_on_tag_.end();
400  it = tx_depends_on_tag_.find(tag)) {
401  if (auto tx = it->second.lock()) {
402  unsetReady(tx);
403  }
404  tx_depends_on_tag_.erase(it);
405  }
406  }
407  }
408 
410  return Status{ready_txs_.size(), imported_txs_.size() - ready_txs_.size()};
411  }
412 
413 } // namespace kagome::transaction_pool
std::shared_ptr< blockchain::BlockHeaderRepository > header_repo_
outcome::result< primitives::Transaction > constructTransaction(primitives::TransactionSource source, primitives::Extrinsic extrinsic) const override
outcome::result< Transaction::Hash > submitExtrinsic(primitives::TransactionSource source, primitives::Extrinsic extrinsic) override
virtual void set(double val)=0
Set the gauge to the given value.
const std::unordered_map< Transaction::Hash, std::shared_ptr< Transaction > > & getPendingTransactions() const override
void commitRequiredTags(const std::shared_ptr< Transaction > &tx)
outcome::result< void > processTransactionAsReady(const std::shared_ptr< Transaction > &tx)
std::unique_ptr< PoolModerator > moderator_
bans stale and invalid transactions for some amount of time
boost::variant< InvalidTransaction, UnknownTransaction > TransactionValidityError
common::Hash256 Hash
bool checkForReady(const std::shared_ptr< const Transaction > &tx) const
void setReady(const std::shared_ptr< Transaction > &tx)
void delTransactionAsWaiting(const std::shared_ptr< Transaction > &tx)
common::Buffer data
extrinsic content as byte array
Definition: extrinsic.hpp:27
outcome::result< void > submitOne(Transaction &&tx) override
gsl::span< const uint8_t > make_span(const rocksdb::Slice &s)
uint32_t BlockNumber
Definition: common.hpp:18
void rollbackRequiredTags(const std::shared_ptr< Transaction > &tx)
Information concerning a valid transaction.
std::map< Transaction::Hash, std::shared_ptr< Transaction > > getReadyTransactions() const override
void addTransactionAsWaiting(const std::shared_ptr< Transaction > &tx)
std::shared_ptr< network::TransactionsTransmitter > tx_transmitter_
void postponeTransaction(const std::shared_ptr< Transaction > &tx)
Postpone a ready transaction (used when the ready limit has already been reached)
outcome::result< Transaction > removeOne(const Transaction::Hash &tx_hash) override
std::list< std::weak_ptr< Transaction > > postponed_txs_
List of ready transactions that are over the limit. They are processed first.
bool isInReady(const std::shared_ptr< const Transaction > &tx) const
outcome::result< void > processTransactionAsWaiting(const std::shared_ptr< Transaction > &tx)
std::multimap< Transaction::Tag, std::weak_ptr< Transaction > > tx_provides_tag_
Transactions which provide specific tags.
void unsetReady(const std::shared_ptr< Transaction > &tx)
std::unordered_map< Transaction::Hash, std::shared_ptr< Transaction > > imported_txs_
All imported transactions contained in the pool.
outcome::result< std::vector< Transaction > > removeStale(const primitives::BlockId &at) override
std::multimap< Transaction::Tag, std::weak_ptr< Transaction > > tx_depends_on_tag_
Transactions with resolved requirement of a specific tag.
outcome::result< void > processTransaction(const std::shared_ptr< Transaction > &tx)
void commitProvidedTags(const std::shared_ptr< Transaction > &tx)
boost::variant< BlockHash, BlockNumber > BlockId
Block id is the variant over BlockHash and BlockNumber.
Definition: block_id.hpp:18
std::shared_ptr< runtime::TaggedTransactionQueue > ttq_
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > ext_key_repo_
TransactionPoolImpl(std::shared_ptr< runtime::TaggedTransactionQueue > ttq, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< network::TransactionsTransmitter > tx_transmitter, std::unique_ptr< PoolModerator > moderator, std::shared_ptr< blockchain::BlockHeaderRepository > header_repo, std::shared_ptr< primitives::events::ExtrinsicSubscriptionEngine > sub_engine, std::shared_ptr< subscription::ExtrinsicEventKeyRepository > ext_key_repo, Limits limits)
std::shared_ptr< primitives::events::ExtrinsicSubscriptionEngine > sub_engine_
Extrinsic class represents extrinsic.
Definition: extrinsic.hpp:24
void rollbackProvidedTags(const std::shared_ptr< Transaction > &tx)
void processPostponedTransactions()
Process postponed transactions (when space becomes available for them)
std::unordered_map< Transaction::Hash, std::weak_ptr< Transaction > > ready_txs_
Collection of transactions with fully satisfied dependencies.
std::multimap< Transaction::Tag, std::weak_ptr< Transaction > > tx_waits_tag_
Transactions with unresolved requirements for specific tags.