Kagome
Polkadot Runtime Engine in C++17
api_service_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <boost/algorithm/string/replace.hpp>
9 
11 #include "api/jrpc/jrpc_server.hpp"
16 #include "common/hexutil.hpp"
17 #include "primitives/common.hpp"
23 
24 namespace {
25  thread_local class {
26  private:
27  std::optional<kagome::api::Session::SessionId> bound_session_id_ =
28  std::nullopt;
29 
30  public:
31  void storeSessionId(kagome::api::Session::SessionId id) {
32  bound_session_id_ = id;
33  }
34  void releaseSessionId() {
35  bound_session_id_ = std::nullopt;
36  }
37  std::optional<kagome::api::Session::SessionId> fetchSessionId() {
38  return bound_session_id_;
39  }
40  } threaded_info;
41 
42  template <typename Func>
43  auto withThisSession(Func &&f) {
44  if (auto session_id = threaded_info.fetchSessionId(); session_id)
45  return std::forward<Func>(f)(*session_id);
46 
47  throw jsonrpc::InternalErrorFault(
48  "Internal error. No session was bound to subscription.");
49  }
50 } // namespace
51 
52 namespace {
53  using namespace kagome::api;
54 
// Builds a notification payload and runs it through the JSON-RPC server.
//
// `value` is moved into a struct under the key "result", `set_id` is stored
// under "subscription", and that struct becomes the single request parameter
// handed to server->processJsonData() under the method name `name`.
// When processing yields a value, that value is passed to `f`; otherwise the
// error message is written to the logger.
//
// NOTE(review): `logger` is used in the body, but the line declaring it in the
// parameter list is not visible in this listing (the embedded numbering jumps
// from 66 to 68). sendEvent() passes a kagome::log::Logger in that position —
// confirm against the original file.
65  template <typename F>
66  inline void forJsonData(std::shared_ptr<JRpcServer> server,
68  uint32_t set_id,
69  std::string_view name,
70  jsonrpc::Value &&value,
71  F &&f) {
72  BOOST_ASSERT(server);
73  BOOST_ASSERT(logger);
74  BOOST_ASSERT(!name.empty());
75 
76  jsonrpc::Value::Struct response;
77  response["result"] = std::move(value);
78  response["subscription"] = makeValue(set_id);
79 
80  jsonrpc::Request::Parameters params;
81  params.push_back(std::move(response));
82 
// NOTE(review): std::string_view::data() is not guaranteed to be
// NUL-terminated. If processJsonData() reads its first argument as a C string,
// this relies on every caller passing a view over a std::string or a literal
// — confirm.
// The callback captures by reference, so it is only safe if processJsonData()
// invokes it before returning — TODO confirm.
83  server->processJsonData(name.data(), params, [&](const auto &response) {
84  if (response.has_value())
85  std::forward<F>(f)(response.value());
86  else
87  logger->error("process Json data failed => {}",
88  response.error().message());
89  });
90  }
91  inline void sendEvent(std::shared_ptr<JRpcServer> server,
92  std::shared_ptr<Session> session,
93  kagome::log::Logger logger,
94  uint32_t set_id,
95  std::string_view name,
96  jsonrpc::Value &&value) {
97  BOOST_ASSERT(session);
98  forJsonData(server,
99  logger,
100  set_id,
101  name,
102  std::move(value),
103  [session{std::move(session)}](const auto &response) {
104  session->respond(response);
105  });
106  }
107 } // namespace
108 
109 namespace kagome::api {
110  KAGOME_DEFINE_CACHE(api_service);
111 
112  const std::string kRpcEventRuntimeVersion = "state_runtimeVersion";
113  const std::string kRpcEventNewHeads = "chain_newHead";
114  const std::string kRpcEventFinalizedHeads = "chain_finalizedHead";
115  const std::string kRpcEventSubscribeStorage = "state_storage";
116 
117  const std::string kRpcEventUpdateExtrinsic = "author_extrinsicUpdate";
118 
120  const std::shared_ptr<application::AppStateManager> &app_state_manager,
121  std::shared_ptr<api::RpcThreadPool> thread_pool,
122  ListenerList listeners,
123  std::shared_ptr<JRpcServer> server,
124  const ProcessorSpan &processors,
125  StorageSubscriptionEnginePtr storage_sub_engine,
126  ChainSubscriptionEnginePtr chain_sub_engine,
127  ExtrinsicSubscriptionEnginePtr ext_sub_engine,
128  std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
129  extrinsic_event_key_repo,
130  std::shared_ptr<blockchain::BlockTree> block_tree,
131  std::shared_ptr<storage::trie::TrieStorage> trie_storage,
132  std::shared_ptr<runtime::Core> core)
133  : thread_pool_(std::move(thread_pool)),
134  listeners_(std::move(listeners.listeners)),
135  server_(std::move(server)),
136  logger_{log::createLogger("ApiService", "api")},
137  block_tree_{std::move(block_tree)},
138  trie_storage_{std::move(trie_storage)},
139  core_(std::move(core)),
140  subscription_engines_{.storage = std::move(storage_sub_engine),
141  .chain = std::move(chain_sub_engine),
142  .ext = std::move(ext_sub_engine)},
143  extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)} {
144  BOOST_ASSERT(thread_pool_);
145  BOOST_ASSERT(block_tree_);
146  BOOST_ASSERT(trie_storage_);
147  BOOST_ASSERT(core_);
148  BOOST_ASSERT(
149  std::all_of(listeners_.cbegin(), listeners_.cend(), [](auto &listener) {
150  return listener != nullptr;
151  }));
152  for (auto &processor : processors.processors) {
153  BOOST_ASSERT(processor != nullptr);
154  processor->registerHandlers();
155  }
156 
157  BOOST_ASSERT(app_state_manager);
158  app_state_manager->takeControl(*this);
159 
160  BOOST_ASSERT(subscription_engines_.chain);
161  BOOST_ASSERT(subscription_engines_.storage);
162  BOOST_ASSERT(subscription_engines_.ext);
163  BOOST_ASSERT(extrinsic_event_key_repo_);
164  }
165 
167  const std::vector<
168  std::pair<common::Buffer, std::optional<common::Buffer>>>
169  &key_value_pairs,
170  const primitives::BlockHash &block) {
176 
177  jsonrpc::Value::Array changes;
178  changes.reserve(key_value_pairs.size());
179  for (auto &[key, value] : key_value_pairs) {
180  changes.emplace_back(jsonrpc::Value{jsonrpc::Value::Array{
181  api::makeValue(key),
182  value.has_value() ? api::makeValue(hex_lower_0x(value.value()))
183  : api::makeValue(std::nullopt)}});
184  }
185 
186  jsonrpc::Value::Struct result;
187  result["changes"] = std::move(changes);
188  result["block"] = api::makeValue(hex_lower_0x(block));
189 
190  return result;
191  }
192 
194  for (const auto &listener : listeners_) {
195  auto on_new_session =
196  [wp = weak_from_this()](const sptr<Session> &session) mutable {
197  auto self = wp.lock();
198  if (!self) {
199  return;
200  }
201 
202 #define UNWRAP_WEAK_PTR(callback) \
203  [wp](auto &&...params) mutable { \
204  if (auto self = wp.lock()) { \
205  self->callback(params...); \
206  } \
207  }
208 
209  if (SessionType::kWs == session->type()) {
210  auto session_context =
211  self->storeSessionWithId(session->id(), session);
212  BOOST_ASSERT(session_context);
213  session_context->storage_sub->setCallback(
215  session_context->chain_sub->setCallback(
217  session_context->ext_sub->setCallback(
219  }
220 
221  session->connectOnRequest(UNWRAP_WEAK_PTR(onSessionRequest));
222  session->connectOnCloseHandler(UNWRAP_WEAK_PTR(onSessionClose));
223  };
224 #undef UNWRAP_WEAK_PTR
225 
226  listener->setHandlerForNewSession(std::move(on_new_session));
227  }
228  return true;
229  } // namespace kagome::api
230 
232  thread_pool_->start();
233  SL_DEBUG(logger_, "API Service started");
234  return true;
235  }
236 
238  thread_pool_->stop();
239  SL_DEBUG(logger_, "API Service stopped");
240  }
241 
242  std::shared_ptr<ApiServiceImpl::SessionSubscriptions>
244  const std::shared_ptr<Session> &session) {
245  std::lock_guard guard(subscribed_sessions_cs_);
246  auto &&[it, inserted] = subscribed_sessions_.emplace(
247  id,
248  std::make_shared<ApiServiceImpl::SessionSubscriptions>(
250  .storage_sub = std::make_shared<StorageEventSubscriber>(
251  subscription_engines_.storage, session),
252  .chain_sub = std::make_shared<ChainEventSubscriber>(
253  subscription_engines_.chain, session),
254  .ext_sub = std::make_shared<ExtrinsicEventSubscriber>(
255  subscription_engines_.ext, session),
256  .messages = {}}));
257 
258  BOOST_ASSERT(inserted);
259  return it->second;
260  }
261 
263  std::lock_guard guard(subscribed_sessions_cs_);
264  subscribed_sessions_.erase(id);
265  }
266 
267  outcome::result<ApiServiceImpl::PubsubSubscriptionId>
269  const std::vector<common::Buffer> &keys) {
270  return withThisSession([&](kagome::api::Session::SessionId tid) {
271  return withSession(tid, [&](SessionSubscriptions &session_context) {
272  auto &session = session_context.storage_sub;
273  const auto id = session->generateSubscriptionSetId();
274  const auto &best_block_hash = block_tree_->deepestLeaf().hash;
275  const auto &header = block_tree_->getBlockHeader(best_block_hash);
276  BOOST_ASSERT(header.has_value());
277  auto persistent_batch =
278  trie_storage_->getPersistentBatchAt(header.value().state_root);
279  BOOST_ASSERT(persistent_batch.has_value());
280 
281  auto &pb = persistent_batch.value();
282  BOOST_ASSERT(pb);
283 
284  session_context.messages = uploadMessagesListFromCache();
285 
286  std::vector<std::pair<common::Buffer, std::optional<common::Buffer>>>
287  pairs;
288  pairs.reserve(keys.size());
289 
290  for (auto &key : keys) {
291  session->subscribe(id, key);
292 
293  auto value_opt_res = pb->tryGet(key);
294  if (value_opt_res.has_value()) {
295  pairs.emplace_back(std::move(key),
296  std::move(value_opt_res.value()));
297  }
298  }
299 
300  forJsonData(server_,
301  logger_,
302  id,
303  kRpcEventSubscribeStorage,
304  createStateStorageEvent(std::move(pairs), best_block_hash),
305  [&](const auto &result) {
306  session_context.messages->emplace_back(
307  uploadFromCache(result.data()));
308  });
309 
310  return static_cast<PubsubSubscriptionId>(id);
311  });
312  });
313  }
314 
315  outcome::result<ApiServiceImpl::PubsubSubscriptionId>
317  return withThisSession([&](kagome::api::Session::SessionId tid) {
318  return withSession(tid, [&](SessionSubscriptions &session_context) {
319  auto &session = session_context.chain_sub;
320  const auto id = session->generateSubscriptionSetId();
321  session->subscribe(id,
323 
324  auto header =
325  block_tree_->getBlockHeader(block_tree_->getLastFinalized().hash);
326  if (!header.has_error()) {
327  session_context.messages = uploadMessagesListFromCache();
328  forJsonData(server_,
329  logger_,
330  id,
331  kRpcEventFinalizedHeads,
332  makeValue(header.value()),
333  [&](const auto &result) {
334  session_context.messages->emplace_back(
335  uploadFromCache(result.data()));
336  });
337  } else {
338  logger_->error(
339  "Request block header of the last finalized "
340  "failed with error: "
341  "{}",
342  header.error().message());
343  }
344  return static_cast<PubsubSubscriptionId>(id);
345  });
346  });
347  }
348 
350  PubsubSubscriptionId subscription_id) {
351  return withThisSession([&](kagome::api::Session::SessionId tid) {
352  return withSession(tid, [&](SessionSubscriptions &session_context) {
353  auto &session = session_context.chain_sub;
354  return session->unsubscribe(subscription_id);
355  });
356  });
357  }
358 
359  outcome::result<ApiServiceImpl::PubsubSubscriptionId>
361  return withThisSession([&](kagome::api::Session::SessionId tid) {
362  return withSession(tid, [&](SessionSubscriptions &session_context) {
363  auto &session = session_context.chain_sub;
364  const auto id = session->generateSubscriptionSetId();
365  session->subscribe(id, primitives::events::ChainEventType::kNewHeads);
366 
367  auto header =
368  block_tree_->getBlockHeader(block_tree_->deepestLeaf().hash);
369  if (!header.has_error()) {
370  session_context.messages = uploadMessagesListFromCache();
371  forJsonData(server_,
372  logger_,
373  id,
374  kRpcEventNewHeads,
375  makeValue(header.value()),
376  [&](const auto &result) {
377  session_context.messages->emplace_back(
378  uploadFromCache(result.data()));
379  });
380  } else {
381  logger_->error(
382  "Request block header of the deepest leaf failed with error: {}",
383  header.error().message());
384  }
385  return static_cast<PubsubSubscriptionId>(id);
386  });
387  });
388  }
389 
390  outcome::result<bool> ApiServiceImpl::unsubscribeNewHeads(
391  PubsubSubscriptionId subscription_id) {
392  return withThisSession([&](kagome::api::Session::SessionId tid) {
393  return withSession(tid, [&](SessionSubscriptions &session_context) {
394  auto &session = session_context.chain_sub;
395  return session->unsubscribe(subscription_id);
396  });
397  });
398  }
399 
400  outcome::result<ApiServiceImpl::PubsubSubscriptionId>
402  return withThisSession([&](kagome::api::Session::SessionId tid) {
403  return withSession(tid, [&](SessionSubscriptions &session_context) {
404  auto &session = session_context.chain_sub;
405  const auto id = session->generateSubscriptionSetId();
406  session->subscribe(
408 
409  auto version_res = core_->version(block_tree_->getLastFinalized().hash);
410  if (version_res.has_value()) {
411  const auto &version = version_res.value();
412  session_context.messages = uploadMessagesListFromCache();
413  forJsonData(server_,
414  logger_,
415  id,
416  kRpcEventRuntimeVersion,
418  [&](const auto &result) {
419  session_context.messages->emplace_back(
420  uploadFromCache(result.data()));
421  });
422  }
423  return static_cast<PubsubSubscriptionId>(id);
424  });
425  });
426  }
427 
429  PubsubSubscriptionId subscription_id) {
430  return withThisSession([&](kagome::api::Session::SessionId tid) {
431  return withSession(tid, [&](SessionSubscriptions &session_context) {
432  auto &session = session_context.chain_sub;
433  return session->unsubscribe(subscription_id);
434  });
435  });
436  }
437 
438  outcome::result<ApiServiceImpl::PubsubSubscriptionId>
440  const primitives::Transaction::Hash &tx_hash) {
441  return withThisSession([&](kagome::api::Session::SessionId tid) {
442  return withSession(tid, [&](SessionSubscriptions &session_context) {
443  auto &session_sub = session_context.ext_sub;
444  const auto sub_id = session_sub->generateSubscriptionSetId();
445  const auto key = extrinsic_event_key_repo_->add(tx_hash);
446  session_sub->subscribe(sub_id, key);
447 
448  return static_cast<PubsubSubscriptionId>(sub_id);
449  });
450  });
451  }
452 
454  PubsubSubscriptionId subscription_id) {
455  return withThisSession([&](kagome::api::Session::SessionId tid) {
456  return withSession(tid, [&](SessionSubscriptions &session_context) {
457  auto &session = session_context.ext_sub;
458  return session->unsubscribe(subscription_id);
459  });
460  });
461  }
462 
464  const std::vector<PubsubSubscriptionId> &subscription_ids) {
465  return withThisSession([&](kagome::api::Session::SessionId tid) {
466  return withSession(tid, [&](SessionSubscriptions &session_context) {
467  auto &session = session_context.storage_sub;
468  for (auto id : subscription_ids) session->unsubscribe(id);
469  return true;
470  });
471  });
472  }
473 
474  void ApiServiceImpl::onSessionRequest(std::string_view request,
475  std::shared_ptr<Session> session) {
476  auto thread_session_auto_release = [](void *) {
477  threaded_info.releaseSessionId();
478  };
479 
480  threaded_info.storeSessionId(session->id());
481 
486  std::unique_ptr<void, decltype(thread_session_auto_release)>
487  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
488  thread_session_keeper(reinterpret_cast<void *>(0xff),
489  std::move(thread_session_auto_release));
490 
491  // TODO(kamilsa): remove that string replacement when
492  // https://github.com/soramitsu/kagome/issues/572 resolved
493  std::string str_request(request);
494  boost::replace_first(str_request, "\"params\":null", "\"params\":[null]");
495 
496  // process new request
497  server_->processData(str_request, [&](std::string_view response) mutable {
498  // process response
499  session->respond(response);
500  });
501 
502  try {
503  withSession(session->id(), [&](SessionSubscriptions &session_context) {
504  if (session_context.messages)
505  for (auto &msg : *session_context.messages) {
506  BOOST_ASSERT(msg);
507  session->respond(*msg);
508  }
509 
510  session_context.messages.reset();
511  });
512  } catch (jsonrpc::InternalErrorFault &) {
513  }
514  }
515 
517  removeSessionById(id);
518  }
519 
521  SessionPtr &session,
522  const Buffer &key,
523  const std::optional<Buffer> &data,
524  const common::Hash256 &block) {
525  sendEvent(server_,
526  session,
527  logger_,
528  set_id,
529  kRpcEventSubscribeStorage,
530  createStateStorageEvent({{key, data}}, block));
531  }
532 
534  SubscriptionSetId set_id,
535  SessionPtr &session,
537  const primitives::events::ChainEventParams &event_params) {
538  std::string_view name;
539  switch (event_type) {
541  name = kRpcEventNewHeads;
542  } break;
545  } break;
548  } break;
550  return;
551  default:
552  BOOST_ASSERT(!"Unknown chain event");
553  return;
554  }
555 
556  BOOST_ASSERT(!name.empty());
557  sendEvent(
558  server_, session, logger_, set_id, name, api::makeValue(event_params));
559  }
560 
562  SubscriptionSetId set_id,
563  SessionPtr &session,
566  sendEvent(server_,
567  session,
568  logger_,
569  set_id,
570  kRpcEventUpdateExtrinsic,
571  api::makeValue(params));
572  }
573 
574 } // namespace kagome::api
outcome::result< PubsubSubscriptionId > subscribeRuntimeVersion() override
primitives::events::ChainSubscriptionEnginePtr ChainSubscriptionEnginePtr
Class representing an arbitrary (possibly empty) byte buffer.
Definition: buffer.hpp:29
outcome::result< PubsubSubscriptionId > subscribeNewHeads() override
std::shared_ptr< blockchain::BlockTree > block_tree_
const std::string kRpcEventRuntimeVersion
void removeSessionById(Session::SessionId id)
outcome::result< uint32_t > subscribeSessionToKeys(const std::vector< common::Buffer > &keys) override
void onSessionClose(Session::SessionId id, SessionType)
const std::string kRpcEventUpdateExtrinsic
SessionSubscriptions::AdditionMessageType uploadFromCache(T &&value)
KAGOME_DEFINE_CACHE(api_service)
void onSessionRequest(std::string_view request, std::shared_ptr< Session > session)
outcome::result< std::unique_ptr< PersistentTrieBatch > > persistent_batch(const std::unique_ptr< TrieStorageImpl > &trie, const RootHash &hash)
STL namespace.
std::shared_ptr< runtime::Core > core_
primitives::events::ExtrinsicSubscriptionEnginePtr ExtrinsicSubscriptionEnginePtr
std::shared_ptr< JRpcServer > server_
auto withSession(kagome::api::Session::SessionId id, Func &&f)
std::shared_ptr< T > sptr
Definition: api_service.hpp:26
boost::variant< std::nullopt_t, HeadsEventParams, RuntimeVersionEventParams, NewRuntimeEventParams > ChainEventParams
Definition: event_types.hpp:51
outcome::result< bool > unsubscribeFromExtrinsicLifecycle(PubsubSubscriptionId subscription_id) override
std::shared_ptr< Session > SessionPtr
void onStorageEvent(SubscriptionSetId set_id, SessionPtr &session, const Buffer &key, const std::optional< Buffer > &data, const common::Hash256 &block)
jsonrpc::Value createStateStorageEvent(const std::vector< std::pair< common::Buffer, std::optional< common::Buffer >>> &key_value_pairs, const primitives::BlockHash &block)
string version
Definition: conf.py:16
subscription::SubscriptionSetId SubscriptionSetId
subscription set id from subscription::SubscriptionEngine
SessionSubscriptions::CachedAdditionMessagesList uploadMessagesListFromCache()
ApiServiceImpl(const std::shared_ptr< application::AppStateManager > &app_state_manager, std::shared_ptr< api::RpcThreadPool > thread_pool, ListenerList listeners, std::shared_ptr< JRpcServer > server, const ProcessorSpan &processors, StorageSubscriptionEnginePtr storage_sub_engine, ChainSubscriptionEnginePtr chain_sub_engine, ExtrinsicSubscriptionEnginePtr ext_sub_engine, std::shared_ptr< subscription::ExtrinsicEventKeyRepository > extrinsic_event_key_repo, std::shared_ptr< blockchain::BlockTree > block_tree, std::shared_ptr< storage::trie::TrieStorage > trie_storage, std::shared_ptr< runtime::Core > core)
uint32_t PubsubSubscriptionId
subscription id for pubsub API methods
Definition: api_service.hpp:23
std::vector< sptr< Listener > > listeners_
void onChainEvent(SubscriptionSetId set_id, SessionPtr &session, primitives::events::ChainEventType event_type, const primitives::events::ChainEventParams &params)
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
outcome::result< PubsubSubscriptionId > subscribeForExtrinsicLifecycle(const primitives::Transaction::Hash &tx_hash) override
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > extrinsic_event_key_repo_
std::string hex_lower_0x(gsl::span< const uint8_t > bytes) noexcept
Converts bytes to hex representation with prefix 0x.
Definition: hexutil.cpp:58
primitives::events::StorageSubscriptionEnginePtr StorageSubscriptionEnginePtr
const std::string kRpcEventSubscribeStorage
jsonrpc::Value makeValue(const uint32_t &)
outcome::result< bool > unsubscribeNewHeads(PubsubSubscriptionId subscription_id) override
std::shared_ptr< storage::trie::TrieStorage > trie_storage_
outcome::result< PubsubSubscriptionId > subscribeFinalizedHeads() override
std::shared_ptr< api::RpcThreadPool > thread_pool_
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
uint64_t SessionId
Definition: session.hpp:43
std::unordered_map< Session::SessionId, std::shared_ptr< SessionSubscriptions > > subscribed_sessions_
struct kagome::api::ApiServiceImpl::@1 subscription_engines_
outcome::result< bool > unsubscribeSessionFromIds(const std::vector< PubsubSubscriptionId > &subscription_id) override
const std::string kRpcEventFinalizedHeads
std::shared_ptr< SessionSubscriptions > storeSessionWithId(Session::SessionId id, const std::shared_ptr< Session > &session)
outcome::result< bool > unsubscribeRuntimeVersion(PubsubSubscriptionId subscription_id) override
outcome::result< bool > unsubscribeFinalizedHeads(PubsubSubscriptionId subscription_id) override
#define UNWRAP_WEAK_PTR(callback)
void onExtrinsicEvent(SubscriptionSetId set_id, SessionPtr &session, primitives::events::SubscribedExtrinsicId id, const primitives::events::ExtrinsicLifecycleEvent &params)
const std::string kRpcEventNewHeads