Kagome
Polkadot Runtime Engine in C++17
exposer_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <thread>
9 
10 #include "api/transport/tuner.hpp"
12 
13 namespace kagome::metrics {
15  const std::shared_ptr<application::AppStateManager> &app_state_manager,
16  Exposer::Configuration exposer_config,
17  Session::Configuration session_config)
18  : logger_{log::createLogger("OpenMetrics", "metrics")},
19  context_{std::make_shared<Context>()},
20  config_{std::move(exposer_config)},
21  session_config_{session_config} {
22  BOOST_ASSERT(app_state_manager);
23  app_state_manager->takeControl(*this);
24  }
25 
27  try {
28  acceptor_ =
32  logger_);
33  } catch (const boost::wrapexcept<boost::system::system_error> &exception) {
34  SL_CRITICAL(
35  logger_, "Failed to prepare a listener: {}", exception.what());
36  return false;
37  } catch (const std::exception &exception) {
38  SL_CRITICAL(
39  logger_, "Exception when preparing a listener: {}", exception.what());
40  return false;
41  }
42 
43  boost::system::error_code ec;
44  acceptor_->set_option(boost::asio::socket_base::reuse_address(true), ec);
45  if (ec) {
46  logger_->error("Failed to set `reuse address` option to acceptor");
47  return false;
48  }
49  return true;
50  }
51 
53  BOOST_ASSERT(acceptor_);
54 
55  if (!acceptor_->is_open()) {
56  logger_->error("error: trying to start on non opened acceptor");
57  return false;
58  }
59 
60  logger_->info("Listening for new connections on {}:{}",
61  config_.endpoint.address(),
62  acceptor_->local_endpoint().port());
63  acceptOnce();
64 
65  thread_ = std::make_shared<std::thread>(
66  [context = context_]() { context->run(); });
67  thread_->detach();
68 
69  return true;
70  }
71 
73  if (acceptor_) {
74  acceptor_->cancel();
75  }
76  context_->stop();
77  }
78 
80  new_session_ = std::make_shared<SessionImpl>(*context_, session_config_);
81  new_session_->connectOnRequest(std::bind(&Handler::onSessionRequest,
82  handler_.get(),
83  std::placeholders::_1,
84  std::placeholders::_2));
85 
86  auto on_accept = [wp = weak_from_this()](boost::system::error_code ec) {
87  if (auto self = wp.lock()) {
88  if (not ec) {
89  self->new_session_->start();
90  }
91 
92  if (self->acceptor_->is_open()) {
93  // keep accepting new connections for as long as the acceptor is still open
94  self->acceptOnce();
95  }
96  }
97  };
98 
99  acceptor_->async_accept(new_session_->socket(), std::move(on_accept));
100  }
101 } // namespace kagome::metrics
std::shared_ptr< std::thread > thread_
std::unique_ptr< Acceptor > acceptor_
bool start() override
start interface for AppStateManager
std::shared_ptr< Session > new_session_
ExposerImpl(const std::shared_ptr< application::AppStateManager > &app_state_manager, Exposer::Configuration exposer_config, Session::Configuration session_config)
virtual void onSessionRequest(Session::Request request, std::shared_ptr< Session > session)=0
main interface for session request handling
const Session::Configuration session_config_
std::unique_ptr< Acceptor > acceptOnFreePort(std::shared_ptr< boost::asio::io_context > context, Endpoint endpoint, uint16_t port_tolerance, const log::Logger &logger)
Definition: tuner.cpp:10
const Configuration config_
bool prepare() override
prepare interface for AppStateManager
std::shared_ptr< Handler > handler_
Definition: exposer.hpp:58
void stop() override
stop interface for AppStateManager
constexpr uint16_t kDefaultPortTolerance
Definition: tuner.hpp:17
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::shared_ptr< Context > context_