6 #ifndef KAGOME_TELEMETRY_CONNECTION_IMPL_HPP 7 #define KAGOME_TELEMETRY_CONNECTION_IMPL_HPP 15 #include <boost/asio.hpp> 16 #include <boost/asio/ssl.hpp> 17 #include <boost/asio/strand.hpp> 18 #include <boost/beast/core.hpp> 19 #include <boost/beast/ssl.hpp> 20 #include <boost/beast/websocket.hpp> 21 #include <boost/beast/websocket/ssl.hpp> 22 #include <boost/circular_buffer.hpp> 23 #include <boost/variant.hpp> 24 #include <libp2p/basic/scheduler.hpp> 45 public std::enable_shared_from_this<TelemetryConnectionImpl> {
56 std::shared_ptr<boost::asio::io_context> io_context,
59 std::shared_ptr<MessagePool> message_pool,
60 std::shared_ptr<libp2p::basic::Scheduler> scheduler);
74 void send(
const std::string &data)
override;
90 using SslStream = boost::beast::ssl_stream<TcpStream>;
92 using WsStream = boost::beast::websocket::stream<T>;
100 template <
typename WsStreamT>
103 void onResolve(boost::beast::error_code ec,
104 boost::asio::ip::tcp::resolver::results_type results);
107 boost::beast::error_code ec,
108 boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
110 template <
typename WsStreamT>
140 boost::variant<WsTcpStreamPtr, WsSslStreamPtr>
ws_;
144 boost::circular_buffer<MessageHandle>
queue_;
149 #endif // KAGOME_TELEMETRY_CONNECTION_IMPL_HPP
std::unique_ptr< WsTcpStream > WsTcpStreamPtr
void send(const std::string &data) override
boost::beast::ssl_stream< TcpStream > SslStream
WsStream< SslStream > WsSslStream
void onSslHandshake(boost::beast::error_code ec)
TelemetryConnectionImpl(std::shared_ptr< boost::asio::io_context > io_context, const TelemetryEndpoint &endpoint, OnConnectedCallback callback, std::shared_ptr< MessagePool > message_pool, std::shared_ptr< libp2p::basic::Scheduler > scheduler)
static constexpr auto kReconnectTimeoutIncrement
an addition to the reconnect timeout after a failed attempt
void write(WsStreamT &ws, MessageHandle message_handle)
std::shared_ptr< MessagePool > message_pool_
static constexpr auto kConnectionTimeout
timeout for operations during websocket connection establishment
boost::asio::ip::tcp::resolver resolver_
boost::variant< WsTcpStreamPtr, WsSslStreamPtr > ws_
void onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
std::function< void(std::shared_ptr< TelemetryConnection >)> OnConnectedCallback
void shutdown() override
Request graceful connection shutdown.
void connect() override
Initiate connection process.
boost::beast::lowest_layer_type< SslStream > & stream_lowest_layer()
void onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
void setOptionsAndRunWsHandshake(WsStreamT &ws)
static std::size_t instance_
WsStream< TcpStream > WsTcpStream
std::shared_ptr< soralog::Logger > Logger
std::chrono::seconds reconnect_timeout_
boost::beast::websocket::stream< T > WsStream
boost::asio::ssl::context ssl_ctx_
static constexpr auto kMaxReconnectTimeout
maximum reconnect timeout value, regardless of the number of reconnect attempts
boost::circular_buffer< MessageHandle > queue_
std::string ws_handshake_hostname_
const TelemetryEndpoint endpoint_
std::unique_ptr< WsSslStream > WsSslStreamPtr
std::shared_ptr< boost::asio::io_context > io_context_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
bool isConnected() const override
Reports connection status.
static constexpr auto kInitialReconnectTimeout
starting value for the reconnection timeout in case of line failure
boost::beast::tcp_stream TcpStream
const TelemetryEndpoint & endpoint() const override
A reminder of what the endpoint was ;)
std::size_t MessageHandle
OnConnectedCallback callback_
void onHandshake(boost::beast::error_code ec)