Kagome
Polkadot Runtime Engine in C++17
service_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <regex>
9 
10 #include <fmt/core.h>
11 
12 #define RAPIDJSON_NO_SIZETYPEDEFINE
13 namespace rapidjson {
14  typedef ::std::size_t SizeType;
15 }
16 #include <rapidjson/document.h>
17 #include <rapidjson/rapidjson.h>
18 #include <rapidjson/stringbuffer.h>
19 #include <rapidjson/writer.h>
20 
21 #include <boost/date_time/posix_time/posix_time.hpp>
22 #include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
23 #include <libp2p/basic/scheduler/scheduler_impl.hpp>
24 #include <libp2p/multi/multiaddress.hpp>
25 #include "common/uri.hpp"
27 
28 namespace {
29  std::string json2string(rapidjson::Document &document) {
30  rapidjson::StringBuffer buffer;
31  rapidjson::Writer writer(buffer);
32  document.Accept(writer);
33  return buffer.GetString();
34  }
35 } // namespace
36 
37 namespace kagome::telemetry {
38 
// Constructs the telemetry service: stores the injected dependencies, checks
// that the shared ones are non-null, builds a private io_context with a
// libp2p scheduler on top of it and, when telemetry is enabled, registers the
// prepare/start/stop handlers with the application state manager.
// NOTE(review): the listing numbers jump from 53 to 55 and from 67 to 69, so
// one member initializer and one line of the MessagePool construction are not
// visible here - consult the real source before editing this constructor.
39  TelemetryServiceImpl::TelemetryServiceImpl(
40  std::shared_ptr<application::AppStateManager> app_state_manager,
41  const application::AppConfiguration &app_configuration,
42  const application::ChainSpec &chain_spec,
43  const libp2p::Host &host,
44  std::shared_ptr<const transaction_pool::TransactionPool> tx_pool,
45  std::shared_ptr<const storage::BufferStorage> buffer_storage,
46  std::shared_ptr<const network::PeerManager> peer_manager)
47  : app_state_manager_{std::move(app_state_manager)},
48  app_configuration_{app_configuration},
49  chain_spec_{chain_spec},
50  host_{host},
51  tx_pool_{std::move(tx_pool)},
52  buffer_storage_{std::move(buffer_storage)},
53  peer_manager_{std::move(peer_manager)},
55  log_{log::createLogger("TelemetryService", "telemetry")} {
// all dependencies passed as shared pointers are mandatory
56  BOOST_ASSERT(app_state_manager_);
57  BOOST_ASSERT(tx_pool_);
58  BOOST_ASSERT(buffer_storage_);
59  BOOST_ASSERT(peer_manager_);
// the service owns its own io_context; the scheduler is backed by it
60  io_context_ = std::make_shared<boost::asio::io_context>();
61  auto scheduler_asio_backend =
62  std::make_shared<libp2p::basic::AsioSchedulerBackend>(io_context_);
63  scheduler_ = std::make_shared<libp2p::basic::SchedulerImpl>(
64  scheduler_asio_backend, libp2p::basic::Scheduler::Config{});
// work guard is created on the io_context executor
// (presumably to keep run() from returning while idle - TODO confirm)
65  work_guard_ = std::make_shared<WorkGuardT>(io_context_->get_executor());
66  if (enabled_) {
67  message_pool_ = std::make_shared<MessagePool>(
// the handlers capture by reference and call members of *this
69  app_state_manager_->registerHandlers([&]() { return prepare(); },
70  [&]() { return start(); },
71  [&]() { stop(); });
72  } else {
73  SL_INFO(log_, "Telemetry disabled");
74  }
75  }
76 
79  auto chain_spec = chainSpecEndpoints();
80  const auto &cli_config = app_configuration_.telemetryEndpoints();
81  const auto &endpoints = cli_config.empty() ? chain_spec : cli_config;
82 
83  for (const auto &endpoint : endpoints) {
84  auto connection = std::make_shared<TelemetryConnectionImpl>(
86  endpoint,
87  // there is no way for connections to live longer than *this
88  [&](std::shared_ptr<TelemetryConnection> conn) {
89  if (not shutdown_requested_) {
90  conn->send(connectedMessage());
91  last_finalized_.reported = 0;
92  }
93  },
95  scheduler_);
96  connections_.emplace_back(std::move(connection));
97  }
98  worker_thread_ = std::make_shared<std::thread>(
99  [io_context{io_context_}] { io_context->run(); });
100  worker_thread_->detach();
101  return true;
102  }
103 
105  for (auto &connection : connections_) {
106  connection->connect();
107  }
108  frequent_timer_ = scheduler_->scheduleWithHandle(
110  delayed_timer_ = scheduler_->scheduleWithHandle(
112  return true;
113  }
114 
116  shutdown_requested_ = true;
117  frequent_timer_.cancel();
118  delayed_timer_.cancel();
119  for (auto &connection : connections_) {
120  connection->shutdown();
121  }
122  io_context_->stop();
123  }
124 
125  std::vector<TelemetryEndpoint> TelemetryServiceImpl::chainSpecEndpoints()
126  const {
127  std::vector<TelemetryEndpoint> endpoints;
128  auto &from_spec = chain_spec_.telemetryEndpoints();
129  endpoints.reserve(from_spec.size());
130  for (const auto &endpoint : from_spec) {
131  // unfortunately, cannot use structured binding due to clang limitations
132  auto uri_candidate = endpoint.first;
133 
134  if (not uri_candidate.empty() and '/' == uri_candidate.at(0)) {
135  // assume endpoint specified as multiaddress
136  auto ma_res = libp2p::multi::Multiaddress::create(uri_candidate);
137  if (ma_res.has_error()) {
138  SL_WARN(log_,
139  "Telemetry endpoint '{}' cannot be interpreted as a valid "
140  "multiaddress and was skipped due to error: {}",
141  uri_candidate,
142  ma_res.error().message());
143  continue;
144  }
145 
146  {
147  // transform multiaddr of telemetry endpoint into uri form
148  auto parts = ma_res.value().getProtocolsWithValues();
149  if (parts.size() != 3) {
150  SL_WARN(
151  log_,
152  "Telemetry endpoint '{}' has unknown format and was skipped",
153  uri_candidate);
154  continue;
155  }
156  auto host = parts[0].second;
157  auto schema = parts[2].first.name.substr(std::strlen("x-parity-"));
158  auto path =
159  std::regex_replace(parts[2].second, std::regex("%2F"), "/");
160  uri_candidate = fmt::format("{}://{}{}", schema, host, path);
161  }
162  }
163  auto parsed_uri = common::Uri::parse(uri_candidate);
164  if (parsed_uri.error().has_value()) {
165  SL_WARN(log_,
166  "Telemetry endpoint '{}' cannot be interpreted as a valid URI "
167  "and was skipped due to error: {}",
168  uri_candidate,
169  parsed_uri.error().value());
170  continue;
171  }
172 
173  auto &verbosity = endpoint.second;
174  if (verbosity > 9) {
175  SL_WARN(log_,
176  "Telemetry endpoint '{}' is not valid, its verbosity level is "
177  "above the maximum possible {} > 9",
178  uri_candidate,
179  verbosity);
180  continue;
181  }
182  endpoints.emplace_back(std::move(parsed_uri), verbosity);
183  }
184  return endpoints;
185  }
186 
188  frequent_timer_.cancel();
189  if (shutdown_requested_) {
190  return;
191  }
192  std::optional<MessageHandle> last_imported_msg, last_finalized_msg;
193  auto refs = connections_.size();
194  {
195  // do quick information retrieval under spin lock
196  std::lock_guard lock(cache_mutex_);
197  // prepare last imported block message
198  if (last_imported_.is_set) {
199  last_imported_.is_set = false;
200  auto msg =
202  last_imported_msg = message_pool_->push(msg, refs);
203  }
204  // prepare last finalized message if there is a need to
205  if (last_finalized_.reported < last_finalized_.block.number) {
206  auto msg = blockNotification(last_finalized_.block, std::nullopt);
207  last_finalized_msg = message_pool_->push(msg, refs);
208  last_finalized_.reported = last_finalized_.block.number;
209  }
210  }
211  for (auto &conn : connections_) {
212  if (last_imported_msg) {
213  conn->send(*last_imported_msg);
214  }
215  if (last_finalized_msg) {
216  conn->send(*last_finalized_msg);
217  }
218  }
219  frequent_timer_ = scheduler_->scheduleWithHandle(
221  }
222 
224  delayed_timer_.cancel();
225  if (shutdown_requested_) {
226  return;
227  }
228  std::optional<MessageHandle> system_msg_1, system_msg_2;
229  auto refs = connections_.size();
230  system_msg_1 = message_pool_->push(systemIntervalMessage1(), refs);
231  system_msg_2 = message_pool_->push(systemIntervalMessage2(), refs);
232 
233  for (auto &conn : connections_) {
234  if (system_msg_1) {
235  conn->send(*system_msg_1);
236  }
237  if (system_msg_2) {
238  conn->send(*system_msg_2);
239  }
240  }
241  delayed_timer_ = scheduler_->scheduleWithHandle(
243  }
244 
246  greeting_json_.SetObject();
247  auto &allocator = greeting_json_.GetAllocator();
248  auto str_val = [&allocator](const std::string &str) -> rapidjson::Value & {
249  static rapidjson::Value val;
250  val.SetString(str.c_str(),
251  static_cast<rapidjson::SizeType>(str.length()),
252  allocator);
253  return val;
254  };
255 
256  auto startup_time =
257  std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
258  std::chrono::system_clock::now().time_since_epoch())
259  .count());
260 
261  rapidjson::Value payload(rapidjson::kObjectType);
262  payload
263  .AddMember(
264  "authority",
265  static_cast<bool>(app_configuration_.roles().flags.authority),
266  allocator)
267  .AddMember("chain", str_val(chain_spec_.name()), allocator)
268  .AddMember("config", str_val(""), allocator)
269  .AddMember("genesis_hash", str_val(genesis_hash_), allocator)
270  .AddMember("implementation", str_val(kImplementationName), allocator)
271  .AddMember("msg", str_val("system.connected"), allocator)
272  .AddMember("name", str_val(app_configuration_.nodeName()), allocator)
273  .AddMember("network_id", str_val(host_.getId().toBase58()), allocator)
274  .AddMember("startup_time", str_val(startup_time), allocator)
275  .AddMember(
276  "version", str_val(app_configuration_.nodeVersion()), allocator);
277 
278  greeting_json_.AddMember("id", 1, allocator)
279  .AddMember("payload", payload, allocator)
280  .AddMember("ts", str_val(""), allocator);
281  }
282 
284  // UTC time works just fine.
285  // The approach allows us just to append zero offset and avoid computation
286  // of actual offset and modifying the offset string and timestamp itself.
287  auto t = boost::posix_time::microsec_clock::universal_time();
288  return boost::posix_time::to_iso_extended_string(t) + "+00:00";
289  }
290 
292  auto &allocator = greeting_json_.GetAllocator();
293  auto timestamp_string = currentTimestamp();
294  greeting_json_["ts"].SetString(
295  timestamp_string.c_str(), timestamp_string.length(), allocator);
296 
297  return json2string(greeting_json_);
298  }
299 
301  const primitives::BlockInfo &info, BlockOrigin origin) {
302  if (not enabled_ or shutdown_requested_) {
303  return;
304  }
305  std::lock_guard lock(cache_mutex_);
306  last_imported_.block = info;
307  last_imported_.origin = origin;
308  last_imported_.is_set = true;
309  }
310 
312  const primitives::BlockInfo &info) {
313  if (not enabled_ or shutdown_requested_) {
314  return;
315  }
316  if (info.number > last_finalized_.block.number) {
317  std::lock_guard lock(cache_mutex_);
318  last_finalized_.block = info;
319  }
320  }
321 
323  const primitives::BlockInfo &info, std::optional<BlockOrigin> origin) {
324  std::string event_name =
325  origin.has_value() ? "block.import" : "notify.finalized";
326 
327  rapidjson::Document document;
328  document.SetObject();
329  auto &allocator = document.GetAllocator();
330  auto str_val = [&allocator](const std::string &str) -> rapidjson::Value & {
331  static rapidjson::Value val;
332  val.SetString(str.c_str(),
333  static_cast<rapidjson::SizeType>(str.length()),
334  allocator);
335  return val;
336  };
337 
338  rapidjson::Value payload(rapidjson::kObjectType), height;
339 
340  payload.AddMember(
341  "best", str_val(fmt::format("{:l}", info.hash)), allocator);
342 
343  if (origin.has_value()) {
344  height.SetInt(info.number);
345  rapidjson::Value origin_val;
346  using o = BlockOrigin;
347  switch (origin.value()) {
348  case o::kGenesis:
349  origin_val = str_val("Genesis");
350  break;
352  origin_val = was_synchronized_ ? str_val("NetworkBroadcast")
353  : str_val("NetworkInitialSync");
354  break;
356  origin_val = str_val("NetworkBroadcast");
357  break;
359  origin_val = str_val("ConsensusBroadcast");
360  break;
361  case o::kOwn:
362  origin_val = str_val("Own");
363  break;
364  case o::kFile:
365  default:
366  origin_val = str_val("File");
367  }
368  payload.AddMember("origin", origin_val, allocator);
369  } else {
370  auto finalized_height_str = std::to_string(info.number);
371  height.SetString(
372  finalized_height_str.c_str(),
373  static_cast<rapidjson::SizeType>(finalized_height_str.length()),
374  allocator);
375  }
376  payload.AddMember("height", height, allocator)
377  .AddMember("msg", str_val(event_name), allocator);
378 
379  document.AddMember("id", 1, allocator)
380  .AddMember("payload", payload, allocator)
381  .AddMember("ts", str_val(currentTimestamp()), allocator);
382 
383  return json2string(document);
384  }
385 
387  rapidjson::Document document;
388  document.SetObject();
389  auto &allocator = document.GetAllocator();
390  auto str_val = [&allocator](const std::string &str) -> rapidjson::Value & {
391  static rapidjson::Value val;
392  val.SetString(str.c_str(),
393  static_cast<rapidjson::SizeType>(str.length()),
394  allocator);
395  return val;
396  };
397 
398  rapidjson::Value payload(rapidjson::kObjectType);
399 
400  rapidjson::Value height, finalized_height, tx_count, state_size, best_hash,
401  finalized_hash;
402  {
403  std::lock_guard lock(cache_mutex_);
404  height.SetInt(last_imported_.block.number);
405  finalized_height.SetInt(last_finalized_.block.number);
406  tx_count.SetInt(tx_pool_->getStatus().ready_num);
407  state_size.SetInt(buffer_storage_->size());
408  best_hash = str_val(fmt::format("{:l}", last_imported_.block.hash));
409  finalized_hash = str_val(fmt::format("{:l}", last_finalized_.block.hash));
410  }
411 
412  // fields order is preserved the same way substrate orders it
413  payload.AddMember("best", best_hash, allocator)
414  .AddMember("finalized_hash", finalized_hash, allocator)
415  .AddMember("finalized_height", finalized_height, allocator)
416  .AddMember("height", height, allocator)
417  .AddMember("msg", str_val("system.interval"), allocator)
418  .AddMember("txcount", tx_count, allocator)
419  .AddMember("used_state_cache_size", state_size, allocator);
420 
421  document.AddMember("id", 1, allocator)
422  .AddMember("payload", payload, allocator)
423  .AddMember("ts", str_val(currentTimestamp()), allocator);
424 
425  return json2string(document);
426  }
427 
429  rapidjson::Document document;
430  document.SetObject();
431  auto &allocator = document.GetAllocator();
432  auto str_val = [&allocator](const std::string &str) -> rapidjson::Value & {
433  static rapidjson::Value val;
434  val.SetString(str.c_str(),
435  static_cast<rapidjson::SizeType>(str.length()),
436  allocator);
437  return val;
438  };
439 
440  rapidjson::Value payload(rapidjson::kObjectType);
441 
442  rapidjson::Value bandwidth_down, bandwidth_up, peers_count;
443  auto active_peers = peer_manager_->activePeersNumber();
444  // we are not actually measuring bandwidth. the following will just let us
445  // see the history of active peers count change in the telemetry UI
446  auto peers_to_bandwidth = active_peers * 1'000'000;
447  bandwidth_down.SetInt(peers_to_bandwidth);
448  bandwidth_up.SetInt(peers_to_bandwidth);
449  peers_count.SetInt(active_peers);
450 
451  // fields order is preserved the same way substrate orders it
452  payload.AddMember("bandwidth_download", bandwidth_down, allocator)
453  .AddMember("bandwidth_upload", bandwidth_up, allocator)
454  .AddMember("msg", str_val("system.interval"), allocator)
455  .AddMember("peers", peers_count, allocator);
456 
457  document.AddMember("id", 1, allocator)
458  .AddMember("payload", payload, allocator)
459  .AddMember("ts", str_val(currentTimestamp()), allocator);
460 
461  return json2string(document);
462  }
463 
465  const primitives::BlockHash &hash) {
466  genesis_hash_ = fmt::format("{:l}", hash);
467  }
468 
470  was_synchronized_ = true;
471  }
472 
474  return enabled_;
475  }
476 } // namespace kagome::telemetry
static constexpr auto kTelemetryReportingInterval
std::shared_ptr< boost::asio::io_context > io_context_
const application::ChainSpec & chain_spec_
struct kagome::network::Roles::@11 flags
virtual bool isTelemetryEnabled() const =0
void frequentNotificationsRoutine()
produces and sends notifications about best and finalized block
void setGenesisBlockHash(const primitives::BlockHash &hash) override
static constexpr auto kImplementationName
libp2p::basic::Scheduler::Handle delayed_timer_
std::vector< TelemetryEndpoint > chainSpecEndpoints() const
parse telemetry endpoints from chain specification
std::string_view to_string(SlotType s)
Definition: slot.hpp:22
Genesis block built into the client.
void notifyBlockFinalized(const primitives::BlockInfo &info) override
virtual const std::string & name() const =0
std::string systemIntervalMessage1()
compose system health notification of the first format
virtual network::Roles roles() const =0
virtual const std::vector< telemetry::TelemetryEndpoint > & telemetryEndpoints() const =0
Block is part of the initial sync with the network.
static constexpr auto kTelemetryMessageMaxLengthBytes
std::string systemIntervalMessage2()
compose system health notification of the second format
struct kagome::telemetry::TelemetryServiceImpl::@13 last_imported_
static Uri parse(std::string_view uri)
Definition: uri.cpp:42
void notifyBlockImported(const primitives::BlockInfo &info, BlockOrigin origin) override
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
static constexpr auto kTelemetrySystemReportInterval
void delayedNotificationsRoutine()
produces and sends system health notifications
std::string blockNotification(const primitives::BlockInfo &info, std::optional< BlockOrigin > origin)
std::string connectedMessage()
produces the greeting message for the (re-)established connections
struct kagome::telemetry::TelemetryServiceImpl::@14 last_finalized_
virtual const std::string & nodeVersion() const =0
Block was broadcasted on the network.
Block that was collated by this node.
libp2p::basic::Scheduler::Handle frequent_timer_
std::shared_ptr< application::AppStateManager > app_state_manager_
Block was imported from a file.
std::shared_ptr< const storage::BufferStorage > buffer_storage_
std::vector< std::shared_ptr< TelemetryConnection > > connections_
std::shared_ptr< std::thread > worker_thread_
std::shared_ptr< const network::PeerManager > peer_manager_
virtual const std::string & nodeName() const =0
const application::AppConfiguration & app_configuration_
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
static constexpr auto kTelemetryMessagePoolSize
virtual const std::vector< std::pair< std::string, size_t > > & telemetryEndpoints() const =0
::std::size_t SizeType
std::shared_ptr< MessagePool > message_pool_
std::shared_ptr< const transaction_pool::TransactionPool > tx_pool_
std::shared_ptr< WorkGuardT > work_guard_