// json2string(document): renders the given rapidjson document as a
// std::string. The document is only visited through Accept(); the output is
// accumulated in a local rapidjson::StringBuffer and returned by value.
// NOTE(review): this line also carries the file's #define/#include directives,
// and the function's closing brace is not visible in this excerpt.
12 #define RAPIDJSON_NO_SIZETYPEDEFINE 16 #include <rapidjson/document.h> 17 #include <rapidjson/rapidjson.h> 18 #include <rapidjson/stringbuffer.h> 19 #include <rapidjson/writer.h> 21 #include <boost/date_time/posix_time/posix_time.hpp> 22 #include <libp2p/basic/scheduler/asio_scheduler_backend.hpp> 23 #include <libp2p/basic/scheduler/scheduler_impl.hpp> 24 #include <libp2p/multi/multiaddress.hpp> 29 std::string json2string(rapidjson::Document &document) {
// In-memory sink that receives the writer's output.
30 rapidjson::StringBuffer buffer;
// Writer bound to the buffer; its template arguments are deduced from `buffer`.
31 rapidjson::Writer writer(buffer);
// Have the document feed its contents to the writer.
32 document.Accept(writer);
// Build the returned std::string from the buffer's contents before `buffer`
// goes out of scope.
33 return buffer.GetString();
39 TelemetryServiceImpl::TelemetryServiceImpl(
40 std::shared_ptr<application::AppStateManager> app_state_manager,
44 std::shared_ptr<const transaction_pool::TransactionPool> tx_pool,
45 std::shared_ptr<const storage::BufferStorage> buffer_storage,
46 std::shared_ptr<const network::PeerManager> peer_manager)
47 : app_state_manager_{std::move(app_state_manager)},
60 io_context_ = std::make_shared<boost::asio::io_context>();
61 auto scheduler_asio_backend =
62 std::make_shared<libp2p::basic::AsioSchedulerBackend>(
io_context_);
63 scheduler_ = std::make_shared<libp2p::basic::SchedulerImpl>(
64 scheduler_asio_backend, libp2p::basic::Scheduler::Config{});
70 [&]() {
return start(); },
73 SL_INFO(
log_,
"Telemetry disabled");
81 const auto &endpoints = cli_config.empty() ? chain_spec : cli_config;
83 for (
const auto &endpoint : endpoints) {
84 auto connection = std::make_shared<TelemetryConnectionImpl>(
88 [&](std::shared_ptr<TelemetryConnection> conn) {
106 connection->connect();
120 connection->shutdown();
127 std::vector<TelemetryEndpoint> endpoints;
129 endpoints.reserve(from_spec.size());
130 for (
const auto &endpoint : from_spec) {
132 auto uri_candidate = endpoint.first;
134 if (not uri_candidate.empty() and
'/' == uri_candidate.at(0)) {
136 auto ma_res = libp2p::multi::Multiaddress::create(uri_candidate);
137 if (ma_res.has_error()) {
139 "Telemetry endpoint '{}' cannot be interpreted as a valid " 140 "multiaddress and was skipped due to error: {}",
142 ma_res.error().message());
148 auto parts = ma_res.value().getProtocolsWithValues();
149 if (parts.size() != 3) {
152 "Telemetry endpoint '{}' has unknown format and was skipped",
156 auto host = parts[0].second;
157 auto schema = parts[2].first.name.substr(std::strlen(
"x-parity-"));
159 std::regex_replace(parts[2].second, std::regex(
"%2F"),
"/");
160 uri_candidate = fmt::format(
"{}://{}{}", schema, host, path);
164 if (parsed_uri.error().has_value()) {
166 "Telemetry endpoint '{}' cannot be interpreted as a valid URI " 167 "and was skipped due to error: {}",
169 parsed_uri.error().value());
173 auto &verbosity = endpoint.second;
176 "Telemetry endpoint '{}' is not valid, its verbosity level is " 177 "above the maximum possible {} > 9",
182 endpoints.emplace_back(std::move(parsed_uri), verbosity);
192 std::optional<MessageHandle> last_imported_msg, last_finalized_msg;
212 if (last_imported_msg) {
213 conn->send(*last_imported_msg);
215 if (last_finalized_msg) {
216 conn->send(*last_finalized_msg);
228 std::optional<MessageHandle> system_msg_1, system_msg_2;
235 conn->send(*system_msg_1);
238 conn->send(*system_msg_2);
248 auto str_val = [&allocator](
const std::string &str) -> rapidjson::Value & {
249 static rapidjson::Value val;
250 val.SetString(str.c_str(),
257 std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
258 std::chrono::system_clock::now().time_since_epoch())
261 rapidjson::Value payload(rapidjson::kObjectType);
268 .AddMember(
"config", str_val(
""), allocator)
269 .AddMember(
"genesis_hash", str_val(
genesis_hash_), allocator)
271 .AddMember(
"msg", str_val(
"system.connected"), allocator)
273 .AddMember(
"network_id", str_val(
host_.getId().toBase58()), allocator)
274 .AddMember(
"startup_time", str_val(startup_time), allocator)
279 .AddMember(
"payload", payload, allocator)
280 .AddMember(
"ts", str_val(
""), allocator);
287 auto t = boost::posix_time::microsec_clock::universal_time();
288 return boost::posix_time::to_iso_extended_string(t) +
"+00:00";
295 timestamp_string.c_str(), timestamp_string.length(), allocator);
324 std::string event_name =
325 origin.has_value() ?
"block.import" :
"notify.finalized";
327 rapidjson::Document document;
328 document.SetObject();
329 auto &allocator = document.GetAllocator();
330 auto str_val = [&allocator](
const std::string &str) -> rapidjson::Value & {
331 static rapidjson::Value val;
332 val.SetString(str.c_str(),
338 rapidjson::Value payload(rapidjson::kObjectType), height;
341 "best", str_val(fmt::format(
"{:l}", info.
hash)), allocator);
343 if (origin.has_value()) {
344 height.SetInt(info.
number);
345 rapidjson::Value origin_val;
347 switch (origin.value()) {
349 origin_val = str_val(
"Genesis");
353 : str_val(
"NetworkInitialSync");
356 origin_val = str_val(
"NetworkBroadcast");
359 origin_val = str_val(
"ConsensusBroadcast");
362 origin_val = str_val(
"Own");
366 origin_val = str_val(
"File");
368 payload.AddMember(
"origin", origin_val, allocator);
372 finalized_height_str.c_str(),
376 payload.AddMember(
"height", height, allocator)
377 .AddMember(
"msg", str_val(event_name), allocator);
379 document.AddMember(
"id", 1, allocator)
380 .AddMember(
"payload", payload, allocator)
383 return json2string(document);
387 rapidjson::Document document;
388 document.SetObject();
389 auto &allocator = document.GetAllocator();
390 auto str_val = [&allocator](
const std::string &str) -> rapidjson::Value & {
391 static rapidjson::Value val;
392 val.SetString(str.c_str(),
398 rapidjson::Value payload(rapidjson::kObjectType);
400 rapidjson::Value height, finalized_height, tx_count, state_size, best_hash,
406 tx_count.SetInt(
tx_pool_->getStatus().ready_num);
408 best_hash = str_val(fmt::format(
"{:l}",
last_imported_.block.hash));
409 finalized_hash = str_val(fmt::format(
"{:l}",
last_finalized_.block.hash));
413 payload.AddMember(
"best", best_hash, allocator)
414 .AddMember(
"finalized_hash", finalized_hash, allocator)
415 .AddMember(
"finalized_height", finalized_height, allocator)
416 .AddMember(
"height", height, allocator)
417 .AddMember(
"msg", str_val(
"system.interval"), allocator)
418 .AddMember(
"txcount", tx_count, allocator)
419 .AddMember(
"used_state_cache_size", state_size, allocator);
421 document.AddMember(
"id", 1, allocator)
422 .AddMember(
"payload", payload, allocator)
425 return json2string(document);
429 rapidjson::Document document;
430 document.SetObject();
431 auto &allocator = document.GetAllocator();
432 auto str_val = [&allocator](
const std::string &str) -> rapidjson::Value & {
433 static rapidjson::Value val;
434 val.SetString(str.c_str(),
440 rapidjson::Value payload(rapidjson::kObjectType);
442 rapidjson::Value bandwidth_down, bandwidth_up, peers_count;
446 auto peers_to_bandwidth = active_peers * 1
'000'000;
447 bandwidth_down.SetInt(peers_to_bandwidth);
448 bandwidth_up.SetInt(peers_to_bandwidth);
449 peers_count.SetInt(active_peers);
452 payload.AddMember(
"bandwidth_download", bandwidth_down, allocator)
453 .AddMember(
"bandwidth_upload", bandwidth_up, allocator)
454 .AddMember(
"msg", str_val(
"system.interval"), allocator)
455 .AddMember(
"peers", peers_count, allocator);
457 document.AddMember(
"id", 1, allocator)
458 .AddMember(
"payload", payload, allocator)
461 return json2string(document);
static constexpr auto kTelemetryReportingInterval
std::shared_ptr< boost::asio::io_context > io_context_
const application::ChainSpec & chain_spec_
struct kagome::network::Roles::@11 flags
virtual bool isTelemetryEnabled() const =0
void frequentNotificationsRoutine()
produces and sends notifications about best and finalized blocks
void setGenesisBlockHash(const primitives::BlockHash &hash) override
static constexpr auto kImplementationName
libp2p::basic::Scheduler::Handle delayed_timer_
std::vector< TelemetryEndpoint > chainSpecEndpoints() const
parse telemetry endpoints from chain specification
std::string_view to_string(SlotType s)
Genesis block built into the client.
void notifyBlockFinalized(const primitives::BlockInfo &info) override
virtual const std::string & name() const =0
std::string systemIntervalMessage1()
compose system health notification of the first format
virtual network::Roles roles() const =0
virtual const std::vector< telemetry::TelemetryEndpoint > & telemetryEndpoints() const =0
Block is part of the initial sync with the network.
static constexpr auto kTelemetryMessageMaxLengthBytes
std::string systemIntervalMessage2()
compose system health notification of the second format
volatile bool shutdown_requested_
struct kagome::telemetry::TelemetryServiceImpl::@13 last_imported_
static Uri parse(std::string_view uri)
void notifyBlockImported(const primitives::BlockInfo &info, BlockOrigin origin) override
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
static constexpr auto kTelemetrySystemReportInterval
void delayedNotificationsRoutine()
produces and sends system health notifications
std::string blockNotification(const primitives::BlockInfo &info, std::optional< BlockOrigin > origin)
std::string connectedMessage()
produces the greeting message for the (re-)established connections
struct kagome::telemetry::TelemetryServiceImpl::@14 last_finalized_
virtual const std::string & nodeVersion() const =0
Block was broadcast on the network.
Block that was collated by this node.
libp2p::basic::Scheduler::Handle frequent_timer_
std::shared_ptr< application::AppStateManager > app_state_manager_
Block was imported from a file.
std::shared_ptr< const storage::BufferStorage > buffer_storage_
std::vector< std::shared_ptr< TelemetryConnection > > connections_
std::shared_ptr< std::thread > worker_thread_
std::string genesis_hash_
std::string currentTimestamp() const
std::shared_ptr< const network::PeerManager > peer_manager_
virtual const std::string & nodeName() const =0
void prepareGreetingMessage()
const application::AppConfiguration & app_configuration_
const libp2p::Host & host_
bool isEnabled() const override
Logger createLogger(const std::string &tag)
common::spin_lock cache_mutex_
static constexpr auto kTelemetryMessagePoolSize
virtual const std::vector< std::pair< std::string, size_t > > & telemetryEndpoints() const =0
std::shared_ptr< MessagePool > message_pool_
void notifyWasSynchronized() override
std::shared_ptr< const transaction_pool::TransactionPool > tx_pool_
std::shared_ptr< WorkGuardT > work_guard_
rapidjson::Document greeting_json_