Kagome
Polkadot Runtime Engine in C++17
api_service_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_API_SERVICE_IMPL_HPP
7 #define KAGOME_API_SERVICE_IMPL_HPP
8 
10 
11 #include <functional>
12 #include <mutex>
13 #include <type_traits>
14 #include <unordered_map>
15 
16 #include <jsonrpc-lean/fault.h>
17 
20 #include "common/buffer.hpp"
22 #include "log/logger.hpp"
23 #include "primitives/block_id.hpp"
26 
27 namespace kagome::api {
28  class JRpcProcessor;
29  class JRpcServer;
30  class Listener;
31 } // namespace kagome::api
33  class AppStateManager;
34 }
35 namespace kagome::blockchain {
36  class BlockTree;
37 }
38 namespace kagome::primitives {
39  struct Transaction;
40 }
42  class TrieStorage;
43 }
44 namespace kagome::runtime {
45  class Core;
46 }
47 namespace kagome::subscription {
48  class ExtrinsicEventKeyRepository;
49 }
50 
51 namespace jsonrpc {
52  class Value;
53 }
54 
55 namespace kagome::api {
56 
// Alias for a std::unique_ptr<T> whose deleter is a plain function pointer
// of type `void (*)(T *const)`; the deleter is therefore supplied when the
// pointer is constructed rather than fixed by the type.
// NOTE(review): presumably the deleter returns the object to the cache
// declared by KAGOME_DECLARE_CACHE instead of freeing it — confirm against
// the cache macros.
57  template <typename T>
58  using UCachedType = std::unique_ptr<T, void (*)(T *const)>;
59 
60  KAGOME_DECLARE_CACHE(api_service,
61  KAGOME_CACHE_UNIT(std::string),
63 
64  class JRpcProcessor;
65 
69  class ApiServiceImpl final
70  : public ApiService,
71  public std::enable_shared_from_this<ApiServiceImpl> {
89 
90  using SessionPtr = std::shared_ptr<Session>;
91 
94 
96  using PubsubSubscriptionId = uint32_t;
97 
99 
101  using AdditionMessageType =
102  decltype(KAGOME_EXTRACT_UNIQUE_CACHE(api_service, std::string));
103  using AdditionMessagesList = std::vector<AdditionMessageType>;
105  api_service, AdditionMessagesList));
106 
111  };
112 
113  public:
114  template <class T>
115  using sptr = std::shared_ptr<T>;
116 
// Wrapper struct around the vector of shared pointers to Listener; the
// constructor takes its listeners through this type.
// NOTE(review): presumably a distinct type so a dependency-injection
// container can bind it unambiguously — confirm.
117  struct ListenerList {
118  std::vector<sptr<Listener>> listeners;
119  };
// Wrapper struct around a gsl::span of shared pointers to JRpcProcessor;
// the constructor takes its processors through this type.
// A span is a non-owning view, so the viewed sequence has to outlive every
// use of `processors`.
120  struct ProcessorSpan {
121  gsl::span<sptr<JRpcProcessor>> processors;
122  };
123 
// Constructor: receives all collaborators of the service.
// `app_state_manager` and `processors` are taken by const reference; every
// other argument (thread pool, listeners, server, the three subscription
// engines, the extrinsic event key repository, block tree, trie storage and
// runtime core) is taken by value.
// NOTE(review): the body lives in the implementation file, so what is
// stored, registered or subscribed here cannot be told from this listing.
129  ApiServiceImpl(
130  const std::shared_ptr<application::AppStateManager> &app_state_manager,
131  std::shared_ptr<api::RpcThreadPool> thread_pool,
132  ListenerList listeners,
133  std::shared_ptr<JRpcServer> server,
134  const ProcessorSpan &processors,
135  StorageSubscriptionEnginePtr storage_sub_engine,
136  ChainSubscriptionEnginePtr chain_sub_engine,
137  ExtrinsicSubscriptionEnginePtr ext_sub_engine,
138  std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
139  extrinsic_event_key_repo,
140  std::shared_ptr<blockchain::BlockTree> block_tree,
141  std::shared_ptr<storage::trie::TrieStorage> trie_storage,
142  std::shared_ptr<runtime::Core> core);
143 
144  ~ApiServiceImpl() override = default;
145 
// Override of a base-class virtual; returns bool.
// NOTE(review): presumably the application-state "prepare" stage hook with
// `true` meaning success — confirm in the implementation file.
147  bool prepare() override;
148 
// Override of a base-class virtual; returns bool.
// NOTE(review): presumably the application-state "start" stage hook with
// `true` meaning success — confirm in the implementation file.
150  bool start() override;
151 
// Override of a base-class virtual; returns nothing.
// NOTE(review): presumably the application-state "stop" stage hook —
// confirm in the implementation file.
153  void stop() override;
154 
// Takes a list of keys (common::Buffer) and returns outcome::result<uint32_t>.
// NOTE(review): the uint32_t is presumably a PubsubSubscriptionId (that alias
// is uint32_t) naming a storage subscription for the calling session —
// confirm; if so, spelling the return type as PubsubSubscriptionId would
// match the sibling subscribe* methods.
155  outcome::result<uint32_t> subscribeSessionToKeys(
156  const std::vector<common::Buffer> &keys) override;
157 
// Takes a list of PubsubSubscriptionId values (the parameter is named in the
// singular, `subscription_id`, but is a vector) and returns
// outcome::result<bool>.
// NOTE(review): meaning of the bool (e.g. "something was removed") is not
// visible here — confirm in the implementation file.
158  outcome::result<bool> unsubscribeSessionFromIds(
159  const std::vector<PubsubSubscriptionId> &subscription_id) override;
160 
// Subscribe/unsubscribe pair: the subscribe call takes no arguments and
// yields a PubsubSubscriptionId; the unsubscribe call takes such an id and
// yields a bool. Both results are wrapped in outcome::result.
// NOTE(review): presumably notifies the calling session about finalized
// block headers — confirm in the implementation file.
161  outcome::result<PubsubSubscriptionId> subscribeFinalizedHeads() override;
162  outcome::result<bool> unsubscribeFinalizedHeads(
163  PubsubSubscriptionId subscription_id) override;
164 
// Subscribe/unsubscribe pair: the subscribe call takes no arguments and
// yields a PubsubSubscriptionId; the unsubscribe call takes such an id and
// yields a bool. Both results are wrapped in outcome::result.
// NOTE(review): presumably notifies the calling session about new block
// headers — confirm in the implementation file.
165  outcome::result<PubsubSubscriptionId> subscribeNewHeads() override;
166  outcome::result<bool> unsubscribeNewHeads(
167  PubsubSubscriptionId subscription_id) override;
168 
// Subscribe/unsubscribe pair: the subscribe call takes no arguments and
// yields a PubsubSubscriptionId; the unsubscribe call takes such an id and
// yields a bool. Both results are wrapped in outcome::result.
// NOTE(review): presumably notifies the calling session about runtime
// version changes — confirm in the implementation file.
169  outcome::result<PubsubSubscriptionId> subscribeRuntimeVersion() override;
170  outcome::result<bool> unsubscribeRuntimeVersion(
171  PubsubSubscriptionId subscription_id) override;
172 
// Subscribe/unsubscribe pair keyed by a transaction hash: the subscribe call
// takes `tx_hash` and yields a PubsubSubscriptionId; the unsubscribe call
// takes such an id and yields a bool. Both results are wrapped in
// outcome::result.
// Naming the nested type primitives::Transaction::Hash needs the complete
// definition of primitives::Transaction, not only a forward declaration.
// NOTE(review): presumably one of the includes elided from this listing
// provides it — confirm.
173  outcome::result<PubsubSubscriptionId> subscribeForExtrinsicLifecycle(
174  const primitives::Transaction::Hash &tx_hash) override;
175  outcome::result<bool> unsubscribeFromExtrinsicLifecycle(
176  PubsubSubscriptionId subscription_id) override;
177 
178  private:
// Builds a jsonrpc::Value from a list of (key, optional value) buffer pairs
// and a block hash.
// NOTE(review): an empty optional presumably stands for a removed/absent
// storage value, and the result is presumably the payload of a state-storage
// notification — confirm in the implementation file.
179  jsonrpc::Value createStateStorageEvent(
180  const std::vector<
181  std::pair<common::Buffer, std::optional<common::Buffer>>>
182  &key_value_pairs,
183  const primitives::BlockHash &block);
184 
// Looks up the subscription context stored for session `id`.
//
// The lookup runs while a std::lock_guard holds `subscribed_sessions_cs_`
// (NOTE(review): presumably a mutex; its declaration is not in this listing).
// The guard is released when the function returns, so the caller uses the
// returned pointer outside that lock.
//
// Returns the std::shared_ptr<SessionSubscriptions> mapped to `id` in
// `subscribed_sessions_` when the key is present, otherwise std::nullopt.
// A present key does not by itself guarantee a non-null pointer; withSession
// asserts on the pointer separately.
185  std::optional<std::shared_ptr<SessionSubscriptions>> findSessionById(
186  Session::SessionId id) {
187  std::lock_guard guard(subscribed_sessions_cs_);
188  if (auto it = subscribed_sessions_.find(id);
189  subscribed_sessions_.end() != it)
190  return it->second;
191 
192  return std::nullopt;
193  }
// removeSessionById: takes a session id, returns nothing.
// storeSessionWithId: takes a session id plus the session and returns a
// std::shared_ptr<SessionSubscriptions>.
// NOTE(review): presumably these erase from / insert into
// `subscribed_sessions_` under the same lock findSessionById takes — confirm
// in the implementation file.
194  void removeSessionById(Session::SessionId id);
195  std::shared_ptr<SessionSubscriptions> storeSessionWithId(
196  Session::SessionId id, const std::shared_ptr<Session> &session);
197 
// Private handlers, all returning void.
// onSessionRequest: receives the request text as a std::string_view (a
// non-owning view) together with the session, passed by value as a shared_ptr.
// onSessionClose: receives the session id and an unnamed SessionType argument.
// onStorageEvent: receives the subscription set id, the session (by non-const
// reference), a key, an optional data buffer and a block hash.
// NOTE(review): presumably wired up as session / subscription-engine
// callbacks elsewhere; the bodies are in the implementation file — confirm.
198  void onSessionRequest(std::string_view request,
199  std::shared_ptr<Session> session);
200  void onSessionClose(Session::SessionId id, SessionType);
201  void onStorageEvent(SubscriptionSetId set_id,
202  SessionPtr &session,
203  const Buffer &key,
204  const std::optional<Buffer> &data,
205  const common::Hash256 &block);
206  void onChainEvent(SubscriptionSetId set_id,
207  SessionPtr &session,
210  void onExtrinsicEvent(
211  SubscriptionSetId set_id,
212  SessionPtr &session,
215 
216  template <typename Func>
218  if (auto session_context = findSessionById(id)) {
219  BOOST_ASSERT(*session_context);
220  return std::forward<Func>(f)(**session_context);
221  }
222 
223  throw jsonrpc::InternalErrorFault(
224  "Internal error. No session was stored for subscription.");
225  }
226 
227  template <typename T>
229  auto obj = KAGOME_EXTRACT_UNIQUE_CACHE(api_service, std::string);
230  obj->assign(std::forward<T>(value));
231  return obj;
232  }
233 
236  auto obj = KAGOME_EXTRACT_UNIQUE_CACHE(
238  obj->clear();
239  return obj;
240  }
241 
242  std::shared_ptr<api::RpcThreadPool> thread_pool_;
243  std::vector<sptr<Listener>> listeners_;
244  std::shared_ptr<JRpcServer> server_;
246  std::shared_ptr<blockchain::BlockTree> block_tree_;
247  std::shared_ptr<storage::trie::TrieStorage> trie_storage_;
248  std::shared_ptr<runtime::Core> core_;
249 
251  std::unordered_map<Session::SessionId,
252  std::shared_ptr<SessionSubscriptions>>
254 
255  struct {
259  } subscription_engines_;
260  std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
262  };
263 } // namespace kagome::api
264 
265 #endif // KAGOME_API_SERVICE_IMPL_HPP
primitives::events::ChainSubscriptionEnginePtr ChainSubscriptionEnginePtr
Class representing an arbitrary (possibly empty) byte buffer.
Definition: buffer.hpp:29
primitives::events::StorageEventSubscriberPtr StorageEventSubscriberPtr
std::shared_ptr< blockchain::BlockTree > block_tree_
std::shared_ptr< ExtrinsicEventSubscriber > ExtrinsicEventSubscriberPtr
std::shared_ptr< StorageSubscriptionEngine > StorageSubscriptionEnginePtr
subscription::SubscriptionEngine< SubscribedExtrinsicId, std::shared_ptr< api::Session >, primitives::events::ExtrinsicLifecycleEvent > ExtrinsicSubscriptionEngine
std::unique_ptr< T, void(*)(T *const)> UCachedType
SessionSubscriptions::AdditionMessageType uploadFromCache(T &&value)
#define KAGOME_CACHE_UNIT(type)
STL namespace.
std::shared_ptr< runtime::Core > core_
primitives::events::ExtrinsicSubscriptionEnginePtr ExtrinsicSubscriptionEnginePtr
std::shared_ptr< JRpcServer > server_
auto withSession(kagome::api::Session::SessionId id, Func &&f)
boost::variant< std::nullopt_t, HeadsEventParams, RuntimeVersionEventParams, NewRuntimeEventParams > ChainEventParams
Definition: event_types.hpp:51
std::shared_ptr< Session > SessionPtr
std::vector< AdditionMessageType > AdditionMessagesList
std::shared_ptr< ChainEventSubscriber > ChainEventSubscriberPtr
subscription::SubscriptionSetId SubscriptionSetId
subscription set id from subscription::SubscriptionEngine
SessionSubscriptions::CachedAdditionMessagesList uploadMessagesListFromCache()
uint32_t PubsubSubscriptionId
subscription id for pubsub API methods
Definition: api_service.hpp:23
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
Definition: buffer.hpp:244
std::shared_ptr< ChainSubscriptionEngine > ChainSubscriptionEnginePtr
std::vector< sptr< Listener > > listeners_
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
std::vector< sptr< Listener > > listeners
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
primitives::events::ExtrinsicEventSubscriberPtr ExtrinsicEventSubscriberPtr
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > extrinsic_event_key_repo_
decltype(KAGOME_EXTRACT_UNIQUE_CACHE(api_service, std::string)) AdditionMessageType
KAGOME_DECLARE_CACHE(api_service, KAGOME_CACHE_UNIT(std::string), KAGOME_CACHE_UNIT(std::vector< UCachedType< std::string >>)) class JRpcProcessor
StorageSubscriptionEngine::SubscriberType StorageEventSubscriber
std::shared_ptr< ExtrinsicSubscriptionEngine > ExtrinsicSubscriptionEnginePtr
primitives::events::StorageSubscriptionEnginePtr StorageSubscriptionEnginePtr
#define KAGOME_EXTRACT_UNIQUE_CACHE(prefix, type)
std::shared_ptr< StorageEventSubscriber > StorageEventSubscriberPtr
ExtrinsicSubscriptionEngine::SubscriberType ExtrinsicEventSubscriber
primitives::events::ChainEventSubscriberPtr ChainEventSubscriberPtr
decltype(KAGOME_EXTRACT_SHARED_CACHE(api_service, AdditionMessagesList)) CachedAdditionMessagesList
ChainSubscriptionEngine::SubscriberType ChainEventSubscriber
std::shared_ptr< storage::trie::TrieStorage > trie_storage_
std::shared_ptr< api::RpcThreadPool > thread_pool_
uint64_t SessionId
Definition: session.hpp:43
std::unordered_map< Session::SessionId, std::shared_ptr< SessionSubscriptions > > subscribed_sessions_
std::optional< std::shared_ptr< SessionSubscriptions > > findSessionById(Session::SessionId id)
gsl::span< sptr< JRpcProcessor > > processors