Kagome
Polkadot Runtime Engine in C++17
ws_listener_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <boost/asio.hpp>
11 
// File-local names of the two counters this listener registers:
// one for RPC sessions that were opened and one for sessions that were closed.
12 namespace {
// Registered below with the help text "Number of persistent RPC sessions opened".
13  constexpr auto openedRpcSessionMetricName = "kagome_rpc_sessions_opened";
// Registered below with the help text "Number of persistent RPC sessions closed".
14  constexpr auto closedRpcSessionMetricName = "kagome_rpc_sessions_closed";
15 } // namespace
16 
17 namespace kagome::api {
// Parameter list, member initializers and body of the WsListenerImpl
// constructor. NOTE(review): the line carrying the constructor name and
// listing lines 26, 28, 35 and 39 are absent from this extract, so some
// member initializers and the targets of the two registerCounterMetric()
// results are not visible here.
//
// app_state_manager - must be non-null (asserted); the new object is handed
//                     to it through takeControl().
// context           - moved into context_.
// listener_config   - moved into config_.
// session_config    - copied into session_config_ and later passed to every
//                     SessionImpl created by acceptOnce().
19  const std::shared_ptr<application::AppStateManager> &app_state_manager,
20  std::shared_ptr<Context> context,
21  Configuration listener_config,
22  SessionImpl::Configuration session_config)
23  : context_{std::move(context)},
24  config_{std::move(listener_config)},
25  session_config_{session_config},
// Session identifiers handed out by acceptOnce() start at 1.
27  next_session_id_{1ull},
29  log_{log::createLogger("RpcWsListener", "rpc_transport")} {
30  BOOST_ASSERT(app_state_manager);
31 
32  // Register metrics
// Each counter is registered in two steps: the family (name + help text)
// first, then the metric itself.
33  registry_->registerCounterFamily(
34  openedRpcSessionMetricName, "Number of persistent RPC sessions opened");
// NOTE(review): the result is presumably stored in opened_session_ on the
// missing line 35 - confirm against the real file.
36  registry_->registerCounterMetric(openedRpcSessionMetricName);
37  registry_->registerCounterFamily(
38  closedRpcSessionMetricName, "Number of persistent RPC sessions closed");
// NOTE(review): likewise presumably stored in closed_session_ (missing line 39).
40  registry_->registerCounterMetric(closedRpcSessionMetricName);
41 
// Hands this listener over to the application state manager.
// NOTE(review): presumably this is what later drives the prepare/start/stop
// steps below - confirm against AppStateManager.
42  app_state_manager->takeControl(*this);
43  }
44 
// Body of a bool-returning preparation step (presumably
// WsListenerImpl::prepare(); the signature line is missing from this extract).
// Returns true only if nothing in the try block threw and the
// `reuse address` option was applied to the acceptor; otherwise logs and
// returns false.
// NOTE(review): the statements inside the try block (listing lines 47-48) are
// not shown; presumably they create acceptor_, e.g. via acceptOnFreePort() -
// confirm against the real file.
46  try {
// Boost system errors get their own message; both handlers log at critical
// level and report failure to the caller instead of propagating the exception.
49  } catch (const boost::wrapexcept<boost::system::system_error> &exception) {
50  SL_CRITICAL(log_, "Failed to prepare a listener: {}", exception.what());
51  return false;
52  } catch (const std::exception &exception) {
53  SL_CRITICAL(
54  log_, "Exception when preparing a listener: {}", exception.what());
55  return false;
56  }
57 
// Non-throwing overload: a failure is reported through `ec`.
58  boost::system::error_code ec;
59  acceptor_->set_option(boost::asio::socket_base::reuse_address(true), ec);
60  if (ec) {
// NOTE(review): the error code's message is not included in the log line.
61  SL_ERROR(log_, "Failed to set `reuse address` option to acceptor");
62  return false;
63  }
64  return true;
65  }
66 
// Body of a bool-returning start step (presumably WsListenerImpl::start();
// the signature line is missing from this extract).
// Requires acceptor_ to be set (asserted). Returns false if the acceptor is
// not open; otherwise logs the listening address and issues the first
// acceptOnce(), then returns true.
68  BOOST_ASSERT(acceptor_);
69 
70  if (!acceptor_->is_open()) {
71  SL_ERROR(log_, "An attempt to start on non-opened acceptor");
72  return false;
73  }
74  SL_TRACE(log_, "Connections limit is set to {}", max_ws_connections_);
// The address is taken from the configuration, but the port is read back from
// the acceptor itself.
// NOTE(review): presumably because the bound port may differ from the
// configured one - confirm.
75  SL_INFO(log_,
76  "Listening for new connections on {}:{}",
77  config_.endpoint.address(),
78  acceptor_->local_endpoint().port());
// Schedules the first asynchronous accept; later ones are re-issued from the
// accept completion handler.
79  acceptOnce();
80  return true;
81  }
82 
// Body of the shutdown step (presumably WsListenerImpl::stop(); the signature
// line is missing from this extract).
// Calls cancel() on the acceptor if one exists; does nothing otherwise.
// NOTE(review): assuming Acceptor is a Boost.Asio acceptor, cancel() aborts
// the pending async_accept but leaves the acceptor open - confirm.
84  if (acceptor_) {
85  acceptor_->cancel();
86  }
87  }
88 
// Tail of setHandlerForNewSession(): the given handler is moved into a
// heap-allocated NewSessionHandler.
// NOTE(review): the line naming the function and the assignment target
// (listing lines 89 and 91) are missing from this extract; the target is
// presumably on_new_session_, which acceptOnce() invokes for each accepted
// session - confirm.
90  NewSessionHandler &&on_new_session) {
92  std::make_unique<NewSessionHandler>(std::move(on_new_session));
93  }
94 
// Body of acceptOnce() (the signature line is missing from this extract).
// Creates a fresh session object, then asks the acceptor to asynchronously
// accept one connection into that session's socket. The completion handler
// either starts or rejects the session and, while the acceptor is still open,
// calls acceptOnce() again.
//
// The session that is waiting for a connection is kept in new_session_ and
// gets the next id from the atomic next_session_id_ counter.
96  new_session_ = std::make_shared<SessionImpl>(
97  *context_, session_config_, next_session_id_.fetch_add(1ull));
// Runs when a session reports that it has closed: bumps the "closed" counter
// and releases the connection slot. Only a weak reference to the listener is
// captured, so the handler does nothing if the listener is already gone.
98  auto session_stopped_handler = [wp = weak_from_this()] {
99  if (auto self = wp.lock()) {
100  self->closed_session_->inc();
101  --self->active_connections_;
102  SL_TRACE(self->log_,
103  "Session closed. Active connections count is {}",
104  self->active_connections_.load());
105  }
106  };
107 
// Completion handler of the asynchronous accept. It also holds only a weak
// reference to the listener.
108  auto on_accept = [wp = weak_from_this(),
109  session_stopped_handler](boost::system::error_code ec) {
110  if (auto self = wp.lock()) {
111  if (not ec) {
// The close handler is attached before the limit check, so it is installed
// for rejected sessions as well.
112  self->new_session_->connectOnWsSessionCloseHandler(
113  session_stopped_handler);
// fetch_add returns the previous value, so `1 + ...` is the count including
// this connection. The slot is taken even when the connection is rejected.
// NOTE(review): the matching decrement then depends on the close handler
// being invoked after reject() - confirm in SessionImpl.
114  if (1 + self->active_connections_.fetch_add(1)
115  > self->max_ws_connections_) {
116  self->new_session_->reject();
117  SL_TRACE(self->log_,
118  "Connection limit ({}) reached, new connection rejected. "
119  "Active connections count is {}",
120  self->max_ws_connections_,
121  self->active_connections_.load());
122  } else {
123  self->opened_session_->inc();
// The registered new-session callback (if any) sees the session before it is
// started.
124  if (self->on_new_session_) {
125  (*self->on_new_session_)(self->new_session_);
126  }
127  self->new_session_->start();
128  SL_TRACE(self->log_,
129  "New session started. Active connections count is {}",
130  self->active_connections_.load());
131  }
132  }
// Reached for failed accepts too: accepting is re-issued regardless of `ec`.
133  if (self->acceptor_->is_open()) {
134  // keep accepting for as long as the acceptor remains open
135  self->acceptOnce();
136  }
137  }
138  };
139 
140  acceptor_->async_accept(new_session_->socket(), std::move(on_accept));
141  }
142 } // namespace kagome::api
std::atomic< uint32_t > active_connections_
const SessionImpl::Configuration session_config_
void acceptOnce() override
Accept incoming connection.
void setHandlerForNewSession(NewSessionHandler &&on_new_session) override
Set the handler to be invoked for each new session.
std::unique_ptr< NewSessionHandler > on_new_session_
std::atomic< Session::SessionId > next_session_id_
std::shared_ptr< Context > context_
std::unique_ptr< Acceptor > acceptor_
std::function< void(const std::shared_ptr< Session > &)> NewSessionHandler
Definition: listener.hpp:26
std::unique_ptr< Acceptor > acceptOnFreePort(std::shared_ptr< boost::asio::io_context > context, Endpoint endpoint, uint16_t port_tolerance, const log::Logger &logger)
Definition: tuner.cpp:10
metrics::RegistryPtr registry_
std::shared_ptr< SessionImpl > new_session_
Endpoint endpoint
listening endpoint
Definition: listener.hpp:33
metrics::Counter * opened_session_
metrics::Counter * closed_session_
constexpr uint16_t kDefaultPortTolerance
Definition: tuner.hpp:17
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
WsListenerImpl(const std::shared_ptr< application::AppStateManager > &app_state_manager, std::shared_ptr< Context > context, Configuration listener_config, SessionImpl::Configuration session_config)
uint32_t ws_max_connections
Maximum allowed number of simultaneous WebSocket connections
Definition: listener.hpp:36