Kagome
Polkadot Runtime Engine in C++17
ws_session.cpp
Go to the documentation of this file.
1 
7 
8 #include <thread>
9 
10 #include <boost/asio/dispatch.hpp>
11 #include <boost/config.hpp>
12 
13 namespace kagome::api {
14 
16  : strand_(boost::asio::make_strand(context)),
17  socket_(strand_),
18  config_{config},
20  id_(id) {}
21 
23  boost::asio::dispatch(stream_.get_executor(),
24  boost::beast::bind_front_handler(&WsSession::onRun,
25  shared_from_this()));
26  SL_DEBUG(logger_, "Session#{} BEGIN", id_);
27  }
28 
30  stop(boost::beast::websocket::close_code::try_again_later);
31  SL_DEBUG(logger_, "Session#{} END", id_);
32  }
33 
34  void WsSession::stop() {
35  // none is the default value for close_reason
36  stop(boost::beast::websocket::close_code::none);
37  }
38 
39  void WsSession::stop(boost::beast::websocket::close_code code) {
40  bool already_stopped = false;
41  if (stopped_.compare_exchange_strong(already_stopped, true)) {
42  boost::system::error_code ec;
43  stream_.close(boost::beast::websocket::close_reason(code), ec);
44  boost::ignore_unused(ec);
46  if (on_ws_close_) {
47  on_ws_close_();
48  }
49  SL_TRACE(logger_, "Session id = {} terminated, reason = {} ", id_, code);
50  } else {
51  SL_TRACE(logger_,
52  "Session id = {} was already terminated. Doing nothing. Called "
53  "for reason = {}",
54  id_,
55  code);
56  }
57  }
58 
61  on_ws_close_ = std::move(handler);
62  }
63 
64  void WsSession::handleRequest(std::string_view data) {
65  SL_DEBUG(logger_, "Session#{} IN: {}", id_, data);
66  processRequest(data, shared_from_this());
67  }
68 
70  stream_.async_read(rbuffer_,
71  boost::beast::bind_front_handler(&WsSession::onRead,
72  shared_from_this()));
73  }
74 
76  return id_;
77  }
78 
79  void WsSession::respond(std::string_view response) {
80  SL_DEBUG(logger_, "Session#{} OUT: {}", id_, response);
81  {
82  std::lock_guard lg{cs_};
83  pending_responses_.emplace(response);
84  }
85  asyncWrite();
86  }
87 
89  bool val = false;
90  if (writing_in_progress_.compare_exchange_strong(val, true)) {
91  std::lock_guard lg{cs_};
92  if (wbuffer_.size() == 0 and not pending_responses_.empty()) {
93  boost::asio::buffer_copy(
94  wbuffer_.prepare(pending_responses_.front().size()),
95  boost::asio::const_buffer(pending_responses_.front().data(),
96  pending_responses_.front().size()));
97  wbuffer_.commit(pending_responses_.front().size());
98  stream_.text(true);
99  pending_responses_.pop();
100 
101  stream_.async_write(wbuffer_.data(),
102  boost::beast::bind_front_handler(
103  &WsSession::onWrite, shared_from_this()));
104  }
105  }
106  }
107 
109  // Set suggested timeout settings for the websocket
110  stream_.set_option(boost::beast::websocket::stream_base::timeout::suggested(
111  boost::beast::role_type::server));
112 
113  // Set a decorator to change the Server of the handshake
114  stream_.set_option(boost::beast::websocket::stream_base::decorator(
115  [](boost::beast::websocket::response_type &res) {
116  res.set(boost::beast::http::field::server,
117  std::string(BOOST_BEAST_VERSION_STRING)
118  + " websocket-server-async");
119  }));
120  // Accept the websocket handshake
121  stream_.async_accept(boost::beast::bind_front_handler(&WsSession::onAccept,
122  shared_from_this()));
123  }
124 
125  void WsSession::onAccept(boost::system::error_code ec) {
126  if (ec) {
127  auto error_message =
128  (ec == WsError::closed) ? "connection was closed" : ec.message();
129  reportError(ec, error_message);
130  stop();
131  return;
132  }
133 
134  asyncRead();
135  };
136 
137  void WsSession::onRead(boost::system::error_code ec,
138  std::size_t bytes_transferred) {
139  if (ec) {
140  auto error_message =
141  (ec == WsError::closed) ? "connection was closed" : ec.message();
142  reportError(ec, error_message);
143  stop();
144  return;
145  }
146 
148  {static_cast<char *>(rbuffer_.data().data()), bytes_transferred});
149 
150  rbuffer_.consume(bytes_transferred);
151 
152  asyncRead();
153  }
154 
155  void WsSession::onWrite(boost::system::error_code ec,
156  std::size_t bytes_transferred) {
157  if (ec) {
158  reportError(ec, "failed to write message");
159  return stop();
160  }
161 
162  wbuffer_.consume(bytes_transferred);
163 
164  if (wbuffer_.size() > 0) {
165  stream_.async_write(wbuffer_.data(),
166  boost::beast::bind_front_handler(&WsSession::onWrite,
167  shared_from_this()));
168  } else {
169  std::lock_guard lg{cs_};
170  if (not pending_responses_.empty()) {
171  boost::asio::buffer_copy(
172  wbuffer_.prepare(pending_responses_.front().size()),
173  boost::asio::const_buffer(pending_responses_.front().data(),
174  pending_responses_.front().size()));
175  wbuffer_.commit(pending_responses_.front().size());
176  stream_.text(true);
177  pending_responses_.pop();
178 
179  stream_.async_write(wbuffer_.data(),
180  boost::beast::bind_front_handler(
181  &WsSession::onWrite, shared_from_this()));
182  } else {
183  writing_in_progress_ = false;
184  }
185  }
186  }
187 
188  void WsSession::reportError(boost::system::error_code ec,
189  std::string_view message) {
190  SL_ERROR(logger_,
191  "error occurred: {}, code: {}, message: {}",
192  message,
193  ec,
194  ec.message());
195  }
196 
197 } // namespace kagome::api
std::queue< std::string > pending_responses_
Definition: ws_session.hpp:157
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > stream_
Definition: ws_session.hpp:152
std::atomic_bool writing_in_progress_
Definition: ws_session.hpp:159
boost::beast::flat_buffer wbuffer_
write buffer
Definition: ws_session.hpp:154
OnWsSessionCloseHandler on_ws_close_
Definition: ws_session.hpp:163
void respond(std::string_view response) override
sends response wrapped by websocket frame
Definition: ws_session.cpp:79
void onRead(boost::system::error_code ec, std::size_t size)
read completion callback
Definition: ws_session.cpp:137
std::atomic_bool stopped_
Definition: ws_session.hpp:160
Session::SessionId id() const override
method to get id of the session
Definition: ws_session.cpp:75
SessionType type() const override
method to get type of the session
Definition: ws_session.hpp:67
WsSession(Context &context, Configuration config, SessionId id)
constructor
Definition: ws_session.cpp:15
boost::asio::ip::tcp::socket socket_
Socket for the connection.
Definition: ws_session.hpp:149
void processRequest(std::string_view request, std::shared_ptr< Session > session)
process request message
Definition: session.hpp:77
void connectOnWsSessionCloseHandler(OnWsSessionCloseHandler &&handler)
connects on websocket close callback. Used to maintain the maximum number of simultaneous sessions ...
Definition: ws_session.cpp:59
void onWrite(boost::system::error_code ec, std::size_t bytes_transferred)
write completion callback
Definition: ws_session.cpp:155
void onAccept(boost::system::error_code ec)
handshake completion callback
Definition: ws_session.cpp:125
void onRun()
connected callback
Definition: ws_session.cpp:108
void asyncRead()
asynchronously read
Definition: ws_session.cpp:69
void start() override
starts session
Definition: ws_session.cpp:22
SessionId const id_
Definition: ws_session.hpp:162
boost::beast::flat_buffer rbuffer_
read buffer
Definition: ws_session.hpp:153
void asyncWrite()
asynchronously write
Definition: ws_session.cpp:88
uint64_t SessionId
Definition: session.hpp:43
void handleRequest(std::string_view data)
process received websocket frame, compose and execute response
Definition: ws_session.cpp:64
void reportError(boost::system::error_code ec, std::string_view message)
reports error code and message
Definition: ws_session.cpp:188
std::function< void()> OnWsSessionCloseHandler
Definition: ws_session.hpp:27
void stop()
stops session
Definition: ws_session.cpp:34
void notifyOnClose(SessionId id, SessionType type)
makes on close notification to listener
Definition: session.hpp:93
void reject()
Closes the incoming connection with "try again later" response.
Definition: ws_session.cpp:29