Kagome
Polkadot Runtime Engine in C++17
service_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_TELEMETRY_SERVICE_IMPL_HPP
7 #define KAGOME_TELEMETRY_SERVICE_IMPL_HPP
8 
9 #include "telemetry/service.hpp"
10 
11 #include <chrono>
12 #include <memory>
13 #include <thread>
14 #include <vector>
15 
// Override rapidjson's size type with std::size_t.
// RAPIDJSON_NO_SIZETYPEDEFINE tells rapidjson not to define its own
// rapidjson::SizeType, so the alias below is the one the library sees.
// Both the macro and the alias must therefore appear before the first
// rapidjson header is included.
#define RAPIDJSON_NO_SIZETYPEDEFINE
namespace rapidjson {
  using SizeType = ::std::size_t;
}  // namespace rapidjson
20 #include <rapidjson/document.h>
21 
22 #include <boost/asio/io_context.hpp>
23 #include <libp2p/basic/scheduler.hpp>
24 #include <libp2p/host/host.hpp>
28 #include "common/spin_lock.hpp"
29 #include "log/logger.hpp"
30 #include "network/peer_manager.hpp"
32 #include "telemetry/connection.hpp"
35 
36 namespace kagome::telemetry {
37 
// Header-level telemetry constants.
// Declared `inline constexpr` (C++17) so every translation unit that
// includes this header refers to one and the same entity, instead of each
// getting its own internal-linkage copy as with `static constexpr`.

/// Implementation name string.
inline constexpr auto kImplementationName = "Kagome Node";

/// One-second interval; by its name, the period of the regular telemetry
/// reporting (the consumer is not visible in this header).
inline constexpr auto kTelemetryReportingInterval = std::chrono::seconds(1);

/// Five-second interval; by its name, the period of the system report
/// (the consumer is not visible in this header).
inline constexpr auto kTelemetrySystemReportInterval =
    std::chrono::seconds(5);

/// 2 KiB; by its name, the upper bound on a single telemetry message length
/// in bytes.
inline constexpr auto kTelemetryMessageMaxLengthBytes = 2 * 1024;

/// 1000; by its name, the capacity of the telemetry message pool.
inline constexpr auto kTelemetryMessagePoolSize = 1000;
45 
// Declaration of TelemetryServiceImpl (the name is taken from the deleted
// assignment operators below): an implementation of the TelemetryService
// interface.
// NOTE(review): this is a numbered source listing in which some original
// lines are absent (the `class` head, the constructor name, and some member
// declarations among them). Gaps in the embedded numbering mark lines missing
// from the listing, not from the real header.
// NOTE(review): enable_shared_from_this is instantiated with the base type
// TelemetryService, so shared_from_this() yields
// std::shared_ptr<TelemetryService> rather than a pointer to this class -
// confirm that this is intentional.
47  : public TelemetryService,
48  public std::enable_shared_from_this<TelemetryService> {
49  public:
// Constructor parameter list (the line naming the constructor is not present
// in this listing). Takes the application state manager, application
// configuration, chain spec, libp2p host, transaction pool, buffer storage
// and peer manager.
51  std::shared_ptr<application::AppStateManager> app_state_manager,
52  const application::AppConfiguration &app_configuration,
53  const application::ChainSpec &chain_spec,
54  const libp2p::Host &host,
55  std::shared_ptr<const transaction_pool::TransactionPool> tx_pool,
56  std::shared_ptr<const storage::BufferStorage> buffer_storage,
57  std::shared_ptr<const network::PeerManager> peer_manager);
// Copy assignment and move assignment are both deleted.
60  TelemetryServiceImpl &operator=(const TelemetryServiceImpl &) = delete;
61  TelemetryServiceImpl &operator=(TelemetryServiceImpl &&) = delete;
62 
// Overrides of the TelemetryService interface follow; definitions live
// outside this header.

// Receives the info of an imported block together with its origin.
63  void notifyBlockImported(const primitives::BlockInfo &info,
64  BlockOrigin origin) override;
65 
// Receives the info of a finalized block.
66  void notifyBlockFinalized(const primitives::BlockInfo &info) override;
67 
// Receives the genesis block hash.
68  void setGenesisBlockHash(const primitives::BlockHash &hash) override;
69 
// Takes no arguments; by its name, signals that synchronization has
// completed (see was_synchronized_ below) - confirm in the implementation.
70  void notifyWasSynchronized() override;
71 
// Reports whether the service is enabled (see the enabled_ field below;
// the exact relation is defined in the implementation).
72  bool isEnabled() const override;
73 
74  private:
75  // handlers for AppStateManager
76  bool prepare();
77  bool start();
78  void stop();
79 
// Returns a list of TelemetryEndpoint values; by its name, taken from the
// chain spec - confirm in the implementation.
81  std::vector<TelemetryEndpoint> chainSpecEndpoints() const;
82 
// Returns a message as a string; by its name, the one sent on connection.
84  std::string connectedMessage();
85 
// Presumably the routine driven by frequent_timer_ - TODO confirm.
87  void frequentNotificationsRoutine();
88 
// Presumably the routine driven by delayed_timer_ - TODO confirm.
90  void delayedNotificationsRoutine();
91 
// Presumably fills greeting_json_ - TODO confirm.
96  void prepareGreetingMessage();
97 
// Builds a string for the given block; `origin` is optional, so the same
// helper can be called with or without a block origin.
105  std::string blockNotification(const primitives::BlockInfo &info,
106  std::optional<BlockOrigin> origin);
107 
// Two string-producing helpers for the periodic system report (cf.
// kTelemetrySystemReportInterval); their contents are defined elsewhere.
109  std::string systemIntervalMessage1();
110 
112  std::string systemIntervalMessage2();
113 
// Returns the current timestamp rendered as a string; the format is defined
// in the implementation.
115  std::string currentTimestamp() const;
116 
117  /*****************************************************************
118  * *
119  * Class Fields *
120  * *
121  *****************************************************************/
122 
123  // constructor arguments
124  std::shared_ptr<application::AppStateManager> app_state_manager_;
128  std::shared_ptr<const transaction_pool::TransactionPool> tx_pool_;
129  std::shared_ptr<const storage::BufferStorage> buffer_storage_;
130  std::shared_ptr<const network::PeerManager> peer_manager_;
// Immutable after construction (const member).
131  const bool enabled_;
132 
133  // connections thread fields
// NOTE(review): `volatile` gives no inter-thread synchronization guarantees
// in C++. If this flag is written by one thread and read by worker_thread_,
// std::atomic<bool> is the appropriate type - confirm against the
// implementation.
134  volatile bool shutdown_requested_ = false;
135  std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
// Work guard type for io_context's executor.
136  using WorkGuardT = boost::asio::executor_work_guard<
137  boost::asio::io_context::executor_type>;
138  std::shared_ptr<WorkGuardT> work_guard_;
139  std::shared_ptr<boost::asio::io_context> io_context_;
140  std::shared_ptr<std::thread> worker_thread_;
141  std::vector<std::shared_ptr<TelemetryConnection>> connections_;
// Scheduler handles; by their names, for the frequent and delayed
// notification routines declared above.
142  libp2p::basic::Scheduler::Handle frequent_timer_;
143  libp2p::basic::Scheduler::Handle delayed_timer_;
144 
145  // data cache before serialization and sending over
// Last imported block. `origin` has no default initializer; presumably it is
// only read once is_set is true - confirm in the implementation.
147  struct {
148  bool is_set = false;
149  primitives::BlockInfo block{0, {}};
150  BlockOrigin origin;
151  } last_imported_;
// Last finalized block, plus the block number recorded in `reported`
// (initially 0).
152  struct {
153  primitives::BlockNumber reported = 0;
154  primitives::BlockInfo block{0, {}};
155  } last_finalized_;
156 
157  // auxiliary fields
// JSON document for the greeting (cf. prepareGreetingMessage()).
159  rapidjson::Document greeting_json_;
// Genesis hash kept as a string (cf. setGenesisBlockHash()).
160  std::string genesis_hash_;
161  std::shared_ptr<MessagePool> message_pool_;
// Starts out false (cf. notifyWasSynchronized()).
162  bool was_synchronized_ = false;
163  };
164 
165 } // namespace kagome::telemetry
166 
167 #endif // KAGOME_TELEMETRY_SERVICE_IMPL_HPP
static constexpr auto kTelemetryReportingInterval
std::shared_ptr< boost::asio::io_context > io_context_
const application::ChainSpec & chain_spec_
static constexpr auto kImplementationName
libp2p::basic::Scheduler::Handle delayed_timer_
static constexpr auto kTelemetryMessageMaxLengthBytes
uint32_t BlockNumber
Definition: common.hpp:18
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
static constexpr auto kTelemetrySystemReportInterval
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
libp2p::basic::Scheduler::Handle frequent_timer_
std::shared_ptr< application::AppStateManager > app_state_manager_
std::shared_ptr< const storage::BufferStorage > buffer_storage_
std::vector< std::shared_ptr< TelemetryConnection > > connections_
std::shared_ptr< std::thread > worker_thread_
std::shared_ptr< const network::PeerManager > peer_manager_
const application::AppConfiguration & app_configuration_
static constexpr auto kTelemetryMessagePoolSize
::std::size_t SizeType
std::shared_ptr< MessagePool > message_pool_
boost::asio::executor_work_guard< boost::asio::io_context::executor_type > WorkGuardT
std::shared_ptr< const transaction_pool::TransactionPool > tx_pool_
std::shared_ptr< WorkGuardT > work_guard_