8 #include <boost/algorithm/string/replace.hpp> 27 std::optional<kagome::api::Session::SessionId> bound_session_id_ =
32 bound_session_id_ = id;
34 void releaseSessionId() {
35 bound_session_id_ = std::nullopt;
37 std::optional<kagome::api::Session::SessionId> fetchSessionId() {
38 return bound_session_id_;
42 template <
typename Func>
43 auto withThisSession(Func &&f) {
44 if (
auto session_id = threaded_info.fetchSessionId(); session_id)
45 return std::forward<Func>(f)(*session_id);
47 throw jsonrpc::InternalErrorFault(
48 "Internal error. No session was bound to subscription.");
66 inline void forJsonData(std::shared_ptr<JRpcServer> server,
69 std::string_view name,
70 jsonrpc::Value &&value,
74 BOOST_ASSERT(!name.empty());
76 jsonrpc::Value::Struct response;
77 response[
"result"] = std::move(value);
78 response[
"subscription"] =
makeValue(set_id);
80 jsonrpc::Request::Parameters params;
81 params.push_back(std::move(response));
83 server->processJsonData(name.data(), params, [&](
const auto &response) {
84 if (response.has_value())
85 std::forward<F>(f)(response.value());
87 logger->error(
"process Json data failed => {}",
88 response.error().message());
91 inline void sendEvent(std::shared_ptr<JRpcServer> server,
92 std::shared_ptr<Session> session,
95 std::string_view name,
96 jsonrpc::Value &&value) {
97 BOOST_ASSERT(session);
103 [session{std::move(session)}](
const auto &response) {
104 session->respond(response);
120 const std::shared_ptr<application::AppStateManager> &app_state_manager,
121 std::shared_ptr<api::RpcThreadPool> thread_pool,
123 std::shared_ptr<JRpcServer> server,
128 std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
129 extrinsic_event_key_repo,
130 std::shared_ptr<blockchain::BlockTree> block_tree,
131 std::shared_ptr<storage::trie::TrieStorage> trie_storage,
132 std::shared_ptr<runtime::Core> core)
133 : thread_pool_(
std::move(thread_pool)),
134 listeners_(
std::move(listeners.listeners)),
135 server_(
std::move(server)),
139 core_(std::move(core)),
141 .chain = std::move(chain_sub_engine),
142 .ext = std::move(ext_sub_engine)},
150 return listener !=
nullptr;
152 for (
auto &processor : processors.processors) {
153 BOOST_ASSERT(processor !=
nullptr);
154 processor->registerHandlers();
157 BOOST_ASSERT(app_state_manager);
158 app_state_manager->takeControl(*
this);
177 jsonrpc::Value::Array changes;
178 changes.reserve(key_value_pairs.size());
179 for (
auto &[key, value] : key_value_pairs) {
180 changes.emplace_back(jsonrpc::Value{jsonrpc::Value::Array{
186 jsonrpc::Value::Struct result;
187 result[
"changes"] = std::move(changes);
195 auto on_new_session =
196 [wp = weak_from_this()](
const sptr<Session> &session)
mutable {
197 auto self = wp.lock();
202 #define UNWRAP_WEAK_PTR(callback) \ 203 [wp](auto &&...params) mutable { \ 204 if (auto self = wp.lock()) { \ 205 self->callback(params...); \ 210 auto session_context =
211 self->storeSessionWithId(session->id(), session);
212 BOOST_ASSERT(session_context);
213 session_context->storage_sub->setCallback(
215 session_context->chain_sub->setCallback(
217 session_context->ext_sub->setCallback(
224 #undef UNWRAP_WEAK_PTR 226 listener->setHandlerForNewSession(std::move(on_new_session));
233 SL_DEBUG(
logger_,
"API Service started");
239 SL_DEBUG(
logger_,
"API Service stopped");
242 std::shared_ptr<ApiServiceImpl::SessionSubscriptions>
244 const std::shared_ptr<Session> &session) {
248 std::make_shared<ApiServiceImpl::SessionSubscriptions>(
250 .
storage_sub = std::make_shared<StorageEventSubscriber>(
252 .chain_sub = std::make_shared<ChainEventSubscriber>(
254 .ext_sub = std::make_shared<ExtrinsicEventSubscriber>(
258 BOOST_ASSERT(inserted);
267 outcome::result<ApiServiceImpl::PubsubSubscriptionId>
269 const std::vector<common::Buffer> &keys) {
273 const auto id = session->generateSubscriptionSetId();
274 const auto &best_block_hash =
block_tree_->deepestLeaf().hash;
275 const auto &header =
block_tree_->getBlockHeader(best_block_hash);
276 BOOST_ASSERT(header.has_value());
278 trie_storage_->getPersistentBatchAt(header.value().state_root);
279 BOOST_ASSERT(persistent_batch.has_value());
281 auto &pb = persistent_batch.value();
286 std::vector<std::pair<common::Buffer, std::optional<common::Buffer>>>
288 pairs.reserve(keys.size());
290 for (
auto &key : keys) {
291 session->subscribe(
id, key);
293 auto value_opt_res = pb->tryGet(key);
294 if (value_opt_res.has_value()) {
295 pairs.emplace_back(std::move(key),
296 std::move(value_opt_res.value()));
303 kRpcEventSubscribeStorage,
305 [&](
const auto &result) {
306 session_context.
messages->emplace_back(
315 outcome::result<ApiServiceImpl::PubsubSubscriptionId>
319 auto &session = session_context.
chain_sub;
320 const auto id = session->generateSubscriptionSetId();
321 session->subscribe(
id,
326 if (!header.has_error()) {
331 kRpcEventFinalizedHeads,
333 [&](
const auto &result) {
334 session_context.
messages->emplace_back(
339 "Request block header of the last finalized " 340 "failed with error: " 342 header.error().message());
353 auto &session = session_context.
chain_sub;
354 return session->unsubscribe(subscription_id);
359 outcome::result<ApiServiceImpl::PubsubSubscriptionId>
363 auto &session = session_context.
chain_sub;
364 const auto id = session->generateSubscriptionSetId();
369 if (!header.has_error()) {
376 [&](
const auto &result) {
377 session_context.
messages->emplace_back(
382 "Request block header of the deepest leaf failed with error: {}",
383 header.error().message());
394 auto &session = session_context.
chain_sub;
395 return session->unsubscribe(subscription_id);
400 outcome::result<ApiServiceImpl::PubsubSubscriptionId>
404 auto &session = session_context.
chain_sub;
405 const auto id = session->generateSubscriptionSetId();
410 if (version_res.has_value()) {
411 const auto &
version = version_res.value();
416 kRpcEventRuntimeVersion,
418 [&](
const auto &result) {
419 session_context.
messages->emplace_back(
432 auto &session = session_context.
chain_sub;
433 return session->unsubscribe(subscription_id);
438 outcome::result<ApiServiceImpl::PubsubSubscriptionId>
443 auto &session_sub = session_context.
ext_sub;
444 const auto sub_id = session_sub->generateSubscriptionSetId();
446 session_sub->subscribe(sub_id, key);
457 auto &session = session_context.
ext_sub;
458 return session->unsubscribe(subscription_id);
464 const std::vector<PubsubSubscriptionId> &subscription_ids) {
468 for (
auto id : subscription_ids) session->unsubscribe(
id);
475 std::shared_ptr<Session> session) {
476 auto thread_session_auto_release = [](
void *) {
477 threaded_info.releaseSessionId();
480 threaded_info.storeSessionId(session->id());
486 std::unique_ptr<void, decltype(thread_session_auto_release)>
488 thread_session_keeper(reinterpret_cast<void *>(0xff),
489 std::move(thread_session_auto_release));
493 std::string str_request(request);
494 boost::replace_first(str_request,
"\"params\":null",
"\"params\":[null]");
497 server_->processData(str_request, [&](std::string_view response)
mutable {
499 session->respond(response);
504 if (session_context.messages)
505 for (
auto &msg : *session_context.messages) {
507 session->respond(*msg);
510 session_context.messages.reset();
512 }
catch (jsonrpc::InternalErrorFault &) {
523 const std::optional<Buffer> &data,
529 kRpcEventSubscribeStorage,
538 std::string_view name;
539 switch (event_type) {
552 BOOST_ASSERT(!
"Unknown chain event");
556 BOOST_ASSERT(!name.empty());
570 kRpcEventUpdateExtrinsic,
outcome::result< PubsubSubscriptionId > subscribeRuntimeVersion() override
primitives::events::ChainSubscriptionEnginePtr ChainSubscriptionEnginePtr
uint32_t SubscribedExtrinsicId
Class represents arbitrary (including empty) byte buffer.
outcome::result< PubsubSubscriptionId > subscribeNewHeads() override
std::shared_ptr< blockchain::BlockTree > block_tree_
const std::string kRpcEventRuntimeVersion
void removeSessionById(Session::SessionId id)
outcome::result< uint32_t > subscribeSessionToKeys(const std::vector< common::Buffer > &keys) override
void onSessionClose(Session::SessionId id, SessionType)
const std::string kRpcEventUpdateExtrinsic
SessionSubscriptions::AdditionMessageType uploadFromCache(T &&value)
KAGOME_DEFINE_CACHE(api_service)
void onSessionRequest(std::string_view request, std::shared_ptr< Session > session)
outcome::result< std::unique_ptr< PersistentTrieBatch > > persistent_batch(const std::unique_ptr< TrieStorageImpl > &trie, const RootHash &hash)
std::shared_ptr< runtime::Core > core_
ExtrinsicEventSubscriberPtr ext_sub
primitives::events::ExtrinsicSubscriptionEnginePtr ExtrinsicSubscriptionEnginePtr
std::shared_ptr< JRpcServer > server_
auto withSession(kagome::api::Session::SessionId id, Func &&f)
std::shared_ptr< T > sptr
boost::variant< std::nullopt_t, HeadsEventParams, RuntimeVersionEventParams, NewRuntimeEventParams > ChainEventParams
outcome::result< bool > unsubscribeFromExtrinsicLifecycle(PubsubSubscriptionId subscription_id) override
std::shared_ptr< Session > SessionPtr
void onStorageEvent(SubscriptionSetId set_id, SessionPtr &session, const Buffer &key, const std::optional< Buffer > &data, const common::Hash256 &block)
jsonrpc::Value createStateStorageEvent(const std::vector< std::pair< common::Buffer, std::optional< common::Buffer >>> &key_value_pairs, const primitives::BlockHash &block)
std::mutex subscribed_sessions_cs_
subscription::SubscriptionSetId SubscriptionSetId
subscription set id from subscription::SubscriptionEngine
SessionSubscriptions::CachedAdditionMessagesList uploadMessagesListFromCache()
ApiServiceImpl(const std::shared_ptr< application::AppStateManager > &app_state_manager, std::shared_ptr< api::RpcThreadPool > thread_pool, ListenerList listeners, std::shared_ptr< JRpcServer > server, const ProcessorSpan &processors, StorageSubscriptionEnginePtr storage_sub_engine, ChainSubscriptionEnginePtr chain_sub_engine, ExtrinsicSubscriptionEnginePtr ext_sub_engine, std::shared_ptr< subscription::ExtrinsicEventKeyRepository > extrinsic_event_key_repo, std::shared_ptr< blockchain::BlockTree > block_tree, std::shared_ptr< storage::trie::TrieStorage > trie_storage, std::shared_ptr< runtime::Core > core)
uint32_t PubsubSubscriptionId
subscription id for pubsub API methods
std::vector< sptr< Listener > > listeners_
void onChainEvent(SubscriptionSetId set_id, SessionPtr &session, primitives::events::ChainEventType event_type, const primitives::events::ChainEventParams &params)
std::shared_ptr< soralog::Logger > Logger
outcome::result< PubsubSubscriptionId > subscribeForExtrinsicLifecycle(const primitives::Transaction::Hash &tx_hash) override
CachedAdditionMessagesList messages
std::shared_ptr< subscription::ExtrinsicEventKeyRepository > extrinsic_event_key_repo_
std::string hex_lower_0x(gsl::span< const uint8_t > bytes) noexcept
Converts bytes to hex representation with prefix 0x.
primitives::events::StorageSubscriptionEnginePtr StorageSubscriptionEnginePtr
const std::string kRpcEventSubscribeStorage
jsonrpc::Value makeValue(const uint32_t &)
StorageEventSubscriberPtr storage_sub
outcome::result< bool > unsubscribeNewHeads(PubsubSubscriptionId subscription_id) override
ChainEventSubscriberPtr chain_sub
std::shared_ptr< storage::trie::TrieStorage > trie_storage_
outcome::result< PubsubSubscriptionId > subscribeFinalizedHeads() override
std::shared_ptr< api::RpcThreadPool > thread_pool_
Logger createLogger(const std::string &tag)
std::unordered_map< Session::SessionId, std::shared_ptr< SessionSubscriptions > > subscribed_sessions_
struct kagome::api::ApiServiceImpl::@1 subscription_engines_
outcome::result< bool > unsubscribeSessionFromIds(const std::vector< PubsubSubscriptionId > &subscription_id) override
const std::string kRpcEventFinalizedHeads
std::shared_ptr< SessionSubscriptions > storeSessionWithId(Session::SessionId id, const std::shared_ptr< Session > &session)
outcome::result< bool > unsubscribeRuntimeVersion(PubsubSubscriptionId subscription_id) override
outcome::result< bool > unsubscribeFinalizedHeads(PubsubSubscriptionId subscription_id) override
#define UNWRAP_WEAK_PTR(callback)
void onExtrinsicEvent(SubscriptionSetId set_id, SessionPtr &session, primitives::events::SubscribedExtrinsicId id, const primitives::events::ExtrinsicLifecycleEvent &params)
const std::string kRpcEventNewHeads