Kagome
Polkadot Runtime Engine in C++17
connection_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_TELEMETRY_CONNECTION_IMPL_HPP
7 #define KAGOME_TELEMETRY_CONNECTION_IMPL_HPP
8 
10 
11 #include <chrono>
12 #include <memory>
13 #include <queue>
14 
15 #include <boost/asio.hpp>
16 #include <boost/asio/ssl.hpp>
17 #include <boost/asio/strand.hpp>
18 #include <boost/beast/core.hpp>
19 #include <boost/beast/ssl.hpp>
20 #include <boost/beast/websocket.hpp>
21 #include <boost/beast/websocket/ssl.hpp>
22 #include <boost/circular_buffer.hpp>
23 #include <boost/variant.hpp>
24 #include <libp2p/basic/scheduler.hpp>
25 #include "log/logger.hpp"
27 
28 namespace kagome::telemetry {
29 
// Timeouts used by the telemetry connection.
// NOTE(review): these are namespace-scope `static constexpr` objects in a
// header, so each translation unit gets its own copy; in C++17
// `inline constexpr` would give one shared definition - consider switching.

// Timeout applied to operations while the websocket connection is being
// established.
31  static constexpr auto kConnectionTimeout = std::chrono::seconds(30);
32 
// Starting value of the reconnection timeout, used in case of line failure.
34  static constexpr auto kInitialReconnectTimeout = std::chrono::seconds(5);
// Amount added to the reconnect timeout after a failed attempt.
36  static constexpr auto kReconnectTimeoutIncrement = std::chrono::seconds(5);
// Upper bound of the reconnect timeout, regardless of how many reconnect
// attempts have been made.
38  static constexpr auto kMaxReconnectTimeout = std::chrono::seconds(60);
39 
// TelemetryConnectionImpl: implementation of the TelemetryConnection
// interface. The line carrying the class head is not present in this listing;
// the base-class list below follows it.
44  : public TelemetryConnection,
45  public std::enable_shared_from_this<TelemetryConnectionImpl> {
46  public:
// Constructor parameter list. The constructor name and the
// `const TelemetryEndpoint &endpoint` parameter (between io_context and
// callback) sit on listing lines that are not shown here.
56  std::shared_ptr<boost::asio::io_context> io_context,
58  OnConnectedCallback callback,
59  std::shared_ptr<MessagePool> message_pool,
60  std::shared_ptr<libp2p::basic::Scheduler> scheduler);
63 
// Initiate connection process.
65  void connect() override;
66 
// Returns the endpoint of this connection ("to remind what was the endpoint").
68  const TelemetryEndpoint &endpoint() const override;
69 
// Overload taking the payload as a string.
// NOTE(review): queuing/delivery semantics are defined in the .cpp - confirm.
74  void send(const std::string &data) override;
75 
// Overload taking a MessageHandle (an alias of std::size_t).
// NOTE(review): presumably refers to a message held by MessagePool - confirm.
80  void send(MessageHandle message_handle) override;
81 
// Reports connection status.
83  bool isConnected() const override;
84 
// Request graceful connection shutdown.
86  void shutdown() override;
87 
88  private:
// Stream type aliases: a Beast TCP stream, an SSL stream layered on it, and a
// websocket stream template over either. The WsTcpStream / WsSslStream
// aliases used below are declared on listing lines not shown here.
89  using TcpStream = boost::beast::tcp_stream;
90  using SslStream = boost::beast::ssl_stream<TcpStream>;
91  template <typename T>
92  using WsStream = boost::beast::websocket::stream<T>;
95  using WsTcpStreamPtr = std::unique_ptr<WsTcpStream>;
96  using WsSslStreamPtr = std::unique_ptr<WsSslStream>;
97 
// Returns a reference to the lowest layer type of SslStream.
// NOTE(review): how the plain-TCP case is served is not visible here.
98  boost::beast::lowest_layer_type<SslStream> &stream_lowest_layer();
99 
// Writes the message identified by message_handle to the given websocket
// stream; templated so it accepts either websocket stream flavour.
100  template <typename WsStreamT>
101  void write(WsStreamT &ws, MessageHandle message_handle);
102 
// Handlers receiving a Beast error_code plus, where applicable, the resolver
// results / the connected endpoint.
// NOTE(review): presumably completion callbacks of the asynchronous
// resolve -> connect -> (SSL handshake) -> websocket handshake chain - confirm
// against the .cpp.
103  void onResolve(boost::beast::error_code ec,
104  boost::asio::ip::tcp::resolver::results_type results);
105 
106  void onConnect(
107  boost::beast::error_code ec,
108  boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
109 
110  template <typename WsStreamT>
111  void setOptionsAndRunWsHandshake(WsStreamT &ws);
112 
113  void onSslHandshake(boost::beast::error_code ec);
114 
115  void onHandshake(boost::beast::error_code ec);
116 
// NOTE(review): presumably releases the messages held in queue_ - confirm.
117  void releaseQueue();
118 
119  void close();
120 
121  void reconnect();
122 
// Collaborators handed in through the constructor (names match its
// parameters). Other members sit on listing lines not shown here.
123  std::shared_ptr<boost::asio::io_context> io_context_;
126  std::shared_ptr<MessagePool> message_pool_;
127  std::shared_ptr<libp2p::basic::Scheduler> scheduler_;
// State flags; both start as false.
128  bool is_connected_ = false;
129  bool shutdown_requested_ = false;
131 
// Connection parameters with their defaults: port 80, non-secure.
// NOTE(review): presumably derived from the endpoint URL - confirm.
133  uint16_t port_ = 80;
134  bool secure_ = false;
135  std::string path_;
137 
138  boost::asio::ssl::context ssl_ctx_;
139  boost::asio::ip::tcp::resolver resolver_;
// Holds either the plain-TCP or the SSL websocket stream pointer.
140  boost::variant<WsTcpStreamPtr, WsSslStreamPtr> ws_;
141 
// NOTE(review): presumably a per-class instance counter - confirm its use.
142  static std::size_t instance_;
// NOTE(review): `volatile` does not provide inter-thread synchronization; if
// busy_ is touched from more than one thread it should be std::atomic<bool> -
// confirm the threading model before changing.
143  volatile bool busy_ = false;
// Circular buffer of message handles.
144  boost::circular_buffer<MessageHandle> queue_;
145  };
146 
147 } // namespace kagome::telemetry
148 
149 #endif // KAGOME_TELEMETRY_CONNECTION_IMPL_HPP
std::unique_ptr< WsTcpStream > WsTcpStreamPtr
void send(const std::string &data) override
boost::beast::ssl_stream< TcpStream > SslStream
void onSslHandshake(boost::beast::error_code ec)
TelemetryConnectionImpl(std::shared_ptr< boost::asio::io_context > io_context, const TelemetryEndpoint &endpoint, OnConnectedCallback callback, std::shared_ptr< MessagePool > message_pool, std::shared_ptr< libp2p::basic::Scheduler > scheduler)
static constexpr auto kReconnectTimeoutIncrement
an addition to the reconnect timeout after a failed attempt
void write(WsStreamT &ws, MessageHandle message_handle)
std::shared_ptr< MessagePool > message_pool_
static constexpr auto kConnectionTimeout
operations' timeout while the websocket connection is being established
boost::asio::ip::tcp::resolver resolver_
boost::variant< WsTcpStreamPtr, WsSslStreamPtr > ws_
void onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
std::function< void(std::shared_ptr< TelemetryConnection >)> OnConnectedCallback
Definition: connection.hpp:35
void shutdown() override
Request graceful connection shutdown.
void connect() override
Initiate connection process.
boost::beast::lowest_layer_type< SslStream > & stream_lowest_layer()
void onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
boost::beast::websocket::stream< T > WsStream
static constexpr auto kMaxReconnectTimeout
maximum reconnect timeout value, regardless of the number of reconnect attempts
boost::circular_buffer< MessageHandle > queue_
std::unique_ptr< WsSslStream > WsSslStreamPtr
std::shared_ptr< boost::asio::io_context > io_context_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
bool isConnected() const override
Reports connection status.
static constexpr auto kInitialReconnectTimeout
starting value for the reconnection timeout in case of line failure
const TelemetryEndpoint & endpoint() const override
To remind what was the endpoint ;)
std::size_t MessageHandle
void onHandshake(boost::beast::error_code ec)