8 #include <boost/asio.hpp> 13 constexpr
auto openedRpcSessionMetricName =
"kagome_rpc_sessions_opened";
14 constexpr
auto closedRpcSessionMetricName =
"kagome_rpc_sessions_closed";
19 const std::shared_ptr<application::AppStateManager> &app_state_manager,
20 std::shared_ptr<Context> context,
23 : context_{std::move(context)},
24 config_{std::move(listener_config)},
30 BOOST_ASSERT(app_state_manager);
34 openedRpcSessionMetricName,
"Number of persistent RPC sessions opened");
36 registry_->registerCounterMetric(openedRpcSessionMetricName);
38 closedRpcSessionMetricName,
"Number of persistent RPC sessions closed");
40 registry_->registerCounterMetric(closedRpcSessionMetricName);
42 app_state_manager->takeControl(*
this);
49 }
catch (
const boost::wrapexcept<boost::system::system_error> &exception) {
50 SL_CRITICAL(
log_,
"Failed to prepare a listener: {}", exception.what());
52 }
catch (
const std::exception &exception) {
54 log_,
"Exception when preparing a listener: {}", exception.what());
58 boost::system::error_code ec;
59 acceptor_->set_option(boost::asio::socket_base::reuse_address(
true), ec);
61 SL_ERROR(
log_,
"Failed to set `reuse address` option to acceptor");
71 SL_ERROR(
log_,
"An attempt to start on non-opened acceptor");
76 "Listening for new connections on {}:{}",
92 std::make_unique<NewSessionHandler>(std::move(on_new_session));
98 auto session_stopped_handler = [wp = weak_from_this()] {
99 if (
auto self = wp.lock()) {
100 self->closed_session_->inc();
101 --
self->active_connections_;
103 "Session closed. Active connections count is {}",
104 self->active_connections_.load());
108 auto on_accept = [wp = weak_from_this(),
109 session_stopped_handler](boost::system::error_code ec) {
110 if (
auto self = wp.lock()) {
112 self->new_session_->connectOnWsSessionCloseHandler(
113 session_stopped_handler);
114 if (1 + self->active_connections_.fetch_add(1)
115 >
self->max_ws_connections_) {
116 self->new_session_->reject();
118 "Connection limit ({}) reached, new connection rejected. " 119 "Active connections count is {}",
120 self->max_ws_connections_,
121 self->active_connections_.load());
123 self->opened_session_->inc();
124 if (self->on_new_session_) {
125 (*
self->on_new_session_)(self->new_session_);
127 self->new_session_->start();
129 "New session started. Active connections count is {}",
130 self->active_connections_.load());
133 if (self->acceptor_->is_open()) {
std::atomic< uint32_t > active_connections_
const SessionImpl::Configuration session_config_
void acceptOnce() override
Accept an incoming connection.
void setHandlerForNewSession(NewSessionHandler &&on_new_session) override
Set the handler that is invoked for each newly accepted session.
std::unique_ptr< NewSessionHandler > on_new_session_
std::atomic< Session::SessionId > next_session_id_
std::shared_ptr< Context > context_
std::unique_ptr< Acceptor > acceptor_
std::function< void(const std::shared_ptr< Session > &)> NewSessionHandler
std::unique_ptr< Acceptor > acceptOnFreePort(std::shared_ptr< boost::asio::io_context > context, Endpoint endpoint, uint16_t port_tolerance, const log::Logger &logger)
metrics::RegistryPtr registry_
std::shared_ptr< SessionImpl > new_session_
Endpoint endpoint
listening endpoint
metrics::Counter * opened_session_
const uint32_t max_ws_connections_
metrics::Counter * closed_session_
constexpr uint16_t kDefaultPortTolerance
Logger createLogger(const std::string &tag)
WsListenerImpl(const std::shared_ptr< application::AppStateManager > &app_state_manager, std::shared_ptr< Context > context, Configuration listener_config, SessionImpl::Configuration session_config)
uint32_t ws_max_connections
Maximum allowed number of simultaneous WebSocket connections.
const Configuration config_