Kagome
Polkadot Runtime Engine in C++17
ws_session.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_CORE_API_TRANSPORT_IMPL_WS_SESSION_HPP
7 #define KAGOME_CORE_API_TRANSPORT_IMPL_WS_SESSION_HPP
8 
9 #include <chrono>
10 #include <cstdlib>
11 #include <memory>
12 #include <queue>
13 
14 #include <boost/asio/strand.hpp>
15 #include <boost/beast/core/multi_buffer.hpp>
16 #include <boost/beast/core/tcp_stream.hpp>
17 #include <boost/beast/websocket.hpp>
18 
20 #include "log/logger.hpp"
21 
22 namespace kagome::api {
23 
/**
 * @brief WebSocket flavour of the rpc Session: type() reports
 * SessionType::kWs and the object is shareable through
 * std::enable_shared_from_this.
 *
 * NOTE(review): this listing is missing original lines 19, 82-87, 151 and
 * 163. The page index attributes `Configuration config_` to line 151,
 * `OnWsSessionCloseHandler on_ws_close_` to line 163, and also documents
 * `connectOnWsSessionCloseHandler(OnWsSessionCloseHandler &&handler)`;
 * confirm those declarations against the real header.
 * NOTE(review): std::mutex, std::atomic_bool, std::function, std::string and
 * std::string_view are used here without a direct <mutex>/<atomic>/
 * <functional>/<string>/<string_view> include in the visible listing --
 * presumably they arrive transitively; confirm.
 */
24  class WsSession : public Session,
25  public std::enable_shared_from_this<WsSession> {
    /// shorthand for the Beast websocket error enumeration
26  using WsError = boost::beast::websocket::error;
    /// callback type taking no arguments and returning nothing; per the page
    /// index it is the parameter type of connectOnWsSessionCloseHandler
27  using OnWsSessionCloseHandler = std::function<void()>;
28 
29  public:
    /**
     * @brief per-session settings with their defaults
     */
30  struct Configuration {
    /// default for max_request_size (presumably bytes -- TODO confirm)
31  static constexpr size_t kDefaultRequestSize = 10000u;
    /// default for operation_timeout: 30 seconds
32  static constexpr Duration kDefaultTimeout = std::chrono::seconds(30);
33 
    /// request size limit, initialised to kDefaultRequestSize
34  size_t max_request_size{kDefaultRequestSize};
    /// operation timeout, initialised to kDefaultTimeout
35  Duration operation_timeout{kDefaultTimeout};
36  };
37 
38  ~WsSession() override = default;
39 
    /**
     * @brief constructor
     * @param context execution context (presumably the io_context that the
     * strand and socket are built on -- TODO confirm in ws_session.cpp)
     * @param config session configuration
     * @param id id of the session
     */
46  WsSession(Context &context, Configuration config, SessionId id);
47 
    /**
     * @return reference to the socket for the connection (socket_)
     */
48  Socket &socket() override {
49  return socket_;
50  }
51 
    /**
     * @brief starts session
     */
55  void start() override;
56 
    /**
     * @brief method to get id of the session
     * @return id of the session
     */
61  Session::SessionId id() const override;
62 
    /**
     * @brief method to get type of the session
     * @return always SessionType::kWs
     */
67  SessionType type() const override {
68  return SessionType::kWs;
69  }
70 
    /**
     * @brief sends response wrapped by websocket frame
     * @param response message to send
     */
75  void respond(std::string_view response) override;
76 
    /**
     * @brief Closes the incoming connection with "try again later" response.
     */
80  void reject();
81 
    // NOTE(review): original lines 82-87 are absent from this listing; the
    // page index documents connectOnWsSessionCloseHandler here-abouts.
88 
89  private:
    /**
     * @brief stops session
     */
93  void stop();
94 
    /**
     * @brief overload of stop() taking a websocket close code
     * (presumably the code sent to the peer on close -- TODO confirm)
     * @param code websocket close code
     */
99  void stop(boost::beast::websocket::close_code code);
100 
    /**
     * @brief process received websocket frame, compose and execute response
     * @param data received data
     */
106  void handleRequest(std::string_view data);
107 
    /**
     * @brief asynchronously read
     */
111  void asyncRead();
112 
    /**
     * @brief asynchronously write
     */
116  void asyncWrite();
117 
    /**
     * @brief connected callback
     */
121  void onRun();
122 
    /**
     * @brief handshake completion callback
     * @param ec error code of the operation
     */
126  void onAccept(boost::system::error_code ec);
127 
    /**
     * @brief read completion callback
     * @param ec error code of the operation
     * @param size size value handed to the callback (presumably the number
     * of bytes read -- TODO confirm)
     */
131  void onRead(boost::system::error_code ec, std::size_t size);
132 
    /**
     * @brief write completion callback
     * @param ec error code of the operation
     * @param bytes_transferred number of bytes transferred
     */
136  void onWrite(boost::system::error_code ec, std::size_t bytes_transferred);
137 
    /**
     * @brief reports error code and message
     * @param ec error code
     * @param message error message
     */
143  void reportError(boost::system::error_code ec, std::string_view message);
144 
    /// Strand to ensure the connection's handlers are not called concurrently.
146  boost::asio::strand<boost::asio::io_context::executor_type> strand_;
147 
    /// Socket for the connection.
149  boost::asio::ip::tcp::socket socket_;
150 
    /// websocket stream parameterised on a socket reference (presumably bound
    /// to socket_ in the constructor -- TODO confirm)
152  boost::beast::websocket::stream<boost::asio::ip::tcp::socket &> stream_;
    /// read buffer
153  boost::beast::flat_buffer rbuffer_;
    /// write buffer
154  boost::beast::flat_buffer wbuffer_;
155 
    /// mutex (presumably guards pending_responses_ -- TODO confirm)
156  std::mutex cs_;
    /// queue of response strings not yet written
    /// (presumably drained by asyncWrite -- TODO confirm)
157  std::queue<std::string> pending_responses_;
158 
    /// atomic flag, initially false (presumably set while a write is in
    /// flight -- TODO confirm)
159  std::atomic_bool writing_in_progress_ = false;
    /// atomic flag, initially false (presumably set once stop() has run --
    /// TODO confirm)
160  std::atomic_bool stopped_ = false;
161 
    /// id of the session, immutable after construction
162  SessionId const id_;
    /// logger obtained from log::createLogger("WsSession", "rpc_transport")
164  log::Logger logger_ = log::createLogger("WsSession", "rpc_transport");
165  };
166 
167 } // namespace kagome::api
168 
169 #endif // KAGOME_CORE_API_TRANSPORT_IMPL_WS_SESSION_HPP
std::queue< std::string > pending_responses_
Definition: ws_session.hpp:157
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > stream_
Definition: ws_session.hpp:152
std::atomic_bool writing_in_progress_
Definition: ws_session.hpp:159
boost::beast::flat_buffer wbuffer_
write buffer
Definition: ws_session.hpp:154
Timer::duration Duration
Definition: session.hpp:42
Configuration config_
session configuration
Definition: ws_session.hpp:151
OnWsSessionCloseHandler on_ws_close_
Definition: ws_session.hpp:163
void respond(std::string_view response) override
sends response wrapped by websocket frame
Definition: ws_session.cpp:79
void onRead(boost::system::error_code ec, std::size_t size)
read completion callback
Definition: ws_session.cpp:137
std::atomic_bool stopped_
Definition: ws_session.hpp:160
Socket & socket() override
Definition: ws_session.hpp:48
boost::asio::ip::tcp::socket Socket
Definition: session.hpp:37
Session::SessionId id() const override
method to get id of the session
Definition: ws_session.cpp:75
SessionType type() const override
method to get type of the session
Definition: ws_session.hpp:67
WsSession(Context &context, Configuration config, SessionId id)
constructor
Definition: ws_session.cpp:15
boost::asio::ip::tcp::socket socket_
Socket for the connection.
Definition: ws_session.hpp:149
~WsSession() override=default
void connectOnWsSessionCloseHandler(OnWsSessionCloseHandler &&handler)
connects on websocket close callback. Used to maintain the maximum number of simultaneous sessions ...
Definition: ws_session.cpp:59
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
void onWrite(boost::system::error_code ec, std::size_t bytes_transferred)
write completion callback
Definition: ws_session.cpp:155
void onAccept(boost::system::error_code ec)
handshake completion callback
Definition: ws_session.cpp:125
void onRun()
connected callback
Definition: ws_session.cpp:108
rpc session
Definition: session.hpp:27
void asyncRead()
asynchronously read
Definition: ws_session.cpp:69
void start() override
starts session
Definition: ws_session.cpp:22
static constexpr Duration kDefaultTimeout
Definition: ws_session.hpp:32
boost::beast::websocket::error WsError
Definition: ws_session.hpp:26
static constexpr size_t kDefaultRequestSize
Definition: ws_session.hpp:31
SessionId const id_
Definition: ws_session.hpp:162
boost::beast::flat_buffer rbuffer_
read buffer
Definition: ws_session.hpp:153
void asyncWrite()
asynchronously write
Definition: ws_session.cpp:88
boost::asio::strand< boost::asio::io_context::executor_type > strand_
Strand to ensure the connection's handlers are not called concurrently.
Definition: ws_session.hpp:146
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
uint64_t SessionId
Definition: session.hpp:43
void handleRequest(std::string_view data)
process received websocket frame, compose and execute response
Definition: ws_session.cpp:64
void reportError(boost::system::error_code ec, std::string_view message)
reports error code and message
Definition: ws_session.cpp:188
std::function< void()> OnWsSessionCloseHandler
Definition: ws_session.hpp:27
void stop()
stops session
Definition: ws_session.cpp:34
void reject()
Closes the incoming connection with "try again later" response.
Definition: ws_session.cpp:29