6 #ifndef KAGOME_API_SERVICE_IMPL_HPP 7 #define KAGOME_API_SERVICE_IMPL_HPP 13 #include <type_traits> 14 #include <unordered_map> 16 #include <jsonrpc-lean/fault.h> 48 class ExtrinsicEventKeyRepository;
115 using sptr = std::shared_ptr<T>;
130 const std::shared_ptr<application::AppStateManager> &app_state_manager,
131 std::shared_ptr<api::RpcThreadPool> thread_pool,
133 std::shared_ptr<JRpcServer> server,
138 std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
139 extrinsic_event_key_repo,
140 std::shared_ptr<blockchain::BlockTree> block_tree,
141 std::shared_ptr<storage::trie::TrieStorage> trie_storage,
142 std::shared_ptr<runtime::Core> core);
144 ~ApiServiceImpl()
override =
default;
147 bool prepare()
override;
150 bool start()
override;
153 void stop()
override;
155 outcome::result<uint32_t> subscribeSessionToKeys(
156 const std::vector<common::Buffer> &keys)
override;
158 outcome::result<bool> unsubscribeSessionFromIds(
159 const std::vector<PubsubSubscriptionId> &subscription_id)
override;
161 outcome::result<PubsubSubscriptionId> subscribeFinalizedHeads()
override;
162 outcome::result<bool> unsubscribeFinalizedHeads(
165 outcome::result<PubsubSubscriptionId> subscribeNewHeads()
override;
166 outcome::result<bool> unsubscribeNewHeads(
169 outcome::result<PubsubSubscriptionId> subscribeRuntimeVersion()
override;
170 outcome::result<bool> unsubscribeRuntimeVersion(
173 outcome::result<PubsubSubscriptionId> subscribeForExtrinsicLifecycle(
175 outcome::result<bool> unsubscribeFromExtrinsicLifecycle(
179 jsonrpc::Value createStateStorageEvent(
187 std::lock_guard guard(subscribed_sessions_cs_);
188 if (
auto it = subscribed_sessions_.find(
id);
189 subscribed_sessions_.end() != it)
195 std::shared_ptr<SessionSubscriptions> storeSessionWithId(
198 void onSessionRequest(std::string_view request,
199 std::shared_ptr<Session> session);
204 const std::optional<Buffer> &data,
210 void onExtrinsicEvent(
216 template <
typename Func>
218 if (
auto session_context = findSessionById(
id)) {
219 BOOST_ASSERT(*session_context);
220 return std::forward<Func>(f)(**session_context);
223 throw jsonrpc::InternalErrorFault(
224 "Internal error. No session was stored for subscription.");
227 template <
typename T>
230 obj->assign(std::forward<T>(value));
248 std::shared_ptr<runtime::Core>
core_;
252 std::shared_ptr<SessionSubscriptions>>
259 } subscription_engines_;
260 std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
265 #endif // KAGOME_API_SERVICE_IMPL_HPP primitives::events::ChainSubscriptionEnginePtr ChainSubscriptionEnginePtr
uint32_t SubscribedExtrinsicId
Class represents arbitrary (including empty) byte buffer.
primitives::events::StorageEventSubscriberPtr StorageEventSubscriberPtr
std::shared_ptr< blockchain::BlockTree > block_tree_
std::shared_ptr< ExtrinsicEventSubscriber > ExtrinsicEventSubscriberPtr
std::shared_ptr< StorageSubscriptionEngine > StorageSubscriptionEnginePtr
subscription::SubscriptionEngine< SubscribedExtrinsicId, std::shared_ptr< api::Session >, primitives::events::ExtrinsicLifecycleEvent > ExtrinsicSubscriptionEngine
std::unique_ptr< T, void(*)(T *const)> UCachedType
SessionSubscriptions::AdditionMessageType uploadFromCache(T &&value)
#define KAGOME_CACHE_UNIT(type)
std::shared_ptr< runtime::Core > core_
ExtrinsicEventSubscriberPtr ext_sub
primitives::events::ExtrinsicSubscriptionEnginePtr ExtrinsicSubscriptionEnginePtr
std::shared_ptr< JRpcServer > server_
auto withSession(kagome::api::Session::SessionId id, Func &&f)
boost::variant< std::nullopt_t, HeadsEventParams, RuntimeVersionEventParams, NewRuntimeEventParams > ChainEventParams
std::shared_ptr< Session > SessionPtr
std::vector< AdditionMessageType > AdditionMessagesList
std::mutex subscribed_sessions_cs_
std::shared_ptr< ChainEventSubscriber > ChainEventSubscriberPtr
subscription::SubscriptionSetId SubscriptionSetId
subscription set id from subscription::SubscriptionEngine
SessionSubscriptions::CachedAdditionMessagesList uploadMessagesListFromCache()
uint32_t PubsubSubscriptionId
subscription id for pubsub API methods
SLBuffer< std::numeric_limits< size_t >::max()> Buffer
std::shared_ptr< ChainSubscriptionEngine > ChainSubscriptionEnginePtr
std::vector< sptr< Listener > > listeners_
#define KAGOME_EXTRACT_SHARED_CACHE(prefix, type)
std::vector< sptr< Listener > > listeners
std::shared_ptr< soralog::Logger > Logger
primitives::events::ExtrinsicEventSubscriberPtr ExtrinsicEventSubscriberPtr
CachedAdditionMessagesList messages
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > extrinsic_event_key_repo_
decltype(KAGOME_EXTRACT_UNIQUE_CACHE(api_service, std::string)) AdditionMessageType
KAGOME_DECLARE_CACHE(api_service, KAGOME_CACHE_UNIT(std::string), KAGOME_CACHE_UNIT(std::vector< UCachedType< std::string >>)) class JRpcProcessor
StorageSubscriptionEngine::SubscriberType StorageEventSubscriber
std::shared_ptr< ExtrinsicSubscriptionEngine > ExtrinsicSubscriptionEnginePtr
primitives::events::StorageSubscriptionEnginePtr StorageSubscriptionEnginePtr
StorageEventSubscriberPtr storage_sub
std::shared_ptr< T > sptr
#define KAGOME_EXTRACT_UNIQUE_CACHE(prefix, type)
std::shared_ptr< StorageEventSubscriber > StorageEventSubscriberPtr
ExtrinsicSubscriptionEngine::SubscriberType ExtrinsicEventSubscriber
ChainEventSubscriberPtr chain_sub
primitives::events::ChainEventSubscriberPtr ChainEventSubscriberPtr
decltype(KAGOME_EXTRACT_SHARED_CACHE(api_service, AdditionMessagesList)) CachedAdditionMessagesList
ChainSubscriptionEngine::SubscriberType ChainEventSubscriber
std::shared_ptr< storage::trie::TrieStorage > trie_storage_
uint32_t SubscriptionSetId
std::shared_ptr< api::RpcThreadPool > thread_pool_
std::unordered_map< Session::SessionId, std::shared_ptr< SessionSubscriptions > > subscribed_sessions_
std::optional< std::shared_ptr< SessionSubscriptions > > findSessionById(Session::SessionId id)
gsl::span< sptr< JRpcProcessor > > processors