8 #include <openssl/tls1.h> 15 std::shared_ptr<boost::asio::io_context> io_context,
17 OnConnectedCallback callback,
18 std::shared_ptr<MessagePool> message_pool,
19 std::shared_ptr<libp2p::basic::Scheduler> scheduler)
20 : io_context_{std::move(io_context)},
25 ssl_ctx_{boost::asio::ssl::context::sslv23},
43 "Host cannot be empty for telemetry endpoint {}",
53 }
else if (schema ==
"wss") {
58 "Unsupported schema '{}' passed for telemetry endpoint {}",
66 if (not port.empty()) {
68 auto port_int = std::stoi(port);
69 if (port_int < 0 or port_int > 65536) {
70 throw std::out_of_range(
"");
72 port_ =
static_cast<uint16_t
>(port_int);
76 "Specified port value is not valid for endpoint {}",
83 path_ = path.empty() ?
"/" : path;
86 ws_ = std::make_unique<WsSslStream>(
90 std::make_unique<WsTcpStream>(boost::asio::make_strand(*
io_context_));
126 queue_.push_back(message_handle);
130 write(*boost::relaxed_get<WsSslStreamPtr>(
ws_), message_handle);
132 write(*boost::relaxed_get<WsTcpStreamPtr>(
ws_), message_handle);
148 boost::beast::lowest_layer_type<TelemetryConnectionImpl::SslStream>
150 return secure_ ? boost::beast::get_lowest_layer(
151 *boost::relaxed_get<WsSslStreamPtr>(
ws_))
152 : boost::beast::get_lowest_layer(
153 *boost::relaxed_get<WsTcpStreamPtr>(
ws_));
156 template <
typename WsStreamT>
159 auto write_handler = [
self{shared_from_this()}, message_handle, &ws](
160 boost::beast::error_code ec,
161 std::size_t bytes_transferred) {
162 boost::ignore_unused(bytes_transferred);
163 self->message_pool_->release(message_handle);
165 self->is_connected_ =
false;
167 self->releaseQueue();
169 "Unable to send data through websocket: {}",
174 if (self->queue_.empty()) {
177 auto next =
self->queue_.front();
178 self->queue_.pop_front();
179 self->write(ws, next);
183 ws.async_write((*
message_pool_)[message_handle], write_handler);
187 boost::beast::error_code ec,
188 boost::asio::ip::tcp::resolver::results_type results) {
190 SL_ERROR(
log_,
"Unable to resolve host: {}", ec.message());
199 shared_from_this()));
203 boost::beast::error_code ec,
204 boost::asio::ip::tcp::resolver::results_type::endpoint_type
endpoint) {
206 SL_ERROR(
log_,
"Unable to connect to endpoint: {}", ec.message());
212 auto &ws = boost::relaxed_get<WsSslStreamPtr>(
ws_);
213 if (not SSL_set_tlsext_host_name(ws->next_layer().native_handle(),
215 ec = boost::beast::error_code(static_cast<int>(::ERR_get_error()),
216 boost::asio::error::get_ssl_category());
218 SL_ERROR(
log_,
"Unable to set SNI hostname: {}", ec.message());
226 auto &ws = *boost::relaxed_get<WsSslStreamPtr>(
ws_);
227 ws.next_layer().async_handshake(
228 boost::asio::ssl::stream_base::client,
229 boost::beast::bind_front_handler(
236 template <
typename WsStreamT>
240 ws.set_option(boost::beast::websocket::stream_base::timeout::suggested(
241 boost::beast::role_type::client));
243 ws.set_option(boost::beast::websocket::stream_base::decorator(
244 [](boost::beast::websocket::request_type &req) {
245 req.set(boost::beast::http::field::user_agent,
246 std::string(BOOST_BEAST_VERSION_STRING)
247 +
" Kagome Telemetry Reporter");
254 shared_from_this()));
259 SL_ERROR(
log_,
"Unable to perform SSL handshake: {}", ec.message());
269 SL_ERROR(
log_,
"Websocket handshake failed: {}", ec.message());
278 SL_INFO(
log_,
"Connection established");
284 for (
auto handle :
queue_) {
293 auto &ws = *boost::relaxed_get<WsSslStreamPtr>(
ws_);
294 ws.async_close(boost::beast::websocket::close_code::normal, [](
auto) {});
296 auto &ws = *boost::relaxed_get<WsTcpStreamPtr>(
ws_);
297 ws.async_close(boost::beast::websocket::close_code::normal, [](
auto) {});
307 scheduler_->schedule([
self{shared_from_this()}] {
self->connect(); },
void send(const std::string &data) override
std::string to_string() const
std::string_view to_string(SlotType s)
void onSslHandshake(boost::beast::error_code ec)
TelemetryConnectionImpl(std::shared_ptr< boost::asio::io_context > io_context, const TelemetryEndpoint &endpoint, OnConnectedCallback callback, std::shared_ptr< MessagePool > message_pool, std::shared_ptr< libp2p::basic::Scheduler > scheduler)
static constexpr auto kReconnectTimeoutIncrement
an addition to to reconnect timeout after failed attempt
void write(WsStreamT &ws, MessageHandle message_handle)
std::shared_ptr< MessagePool > message_pool_
static constexpr auto kConnectionTimeout
operations' timeout during websocket connection establishing
boost::asio::ip::tcp::resolver resolver_
boost::variant< WsTcpStreamPtr, WsSslStreamPtr > ws_
void onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
void shutdown() override
Request graceful connection shutdown.
void connect() override
Initiate connection process.
boost::beast::lowest_layer_type< SslStream > & stream_lowest_layer()
void onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
void setOptionsAndRunWsHandshake(WsStreamT &ws)
static std::size_t instance_
std::chrono::seconds reconnect_timeout_
boost::asio::ssl::context ssl_ctx_
static constexpr auto kMaxReconnectTimeout
maximum reconnect timeout value despite reconnect attempts number
boost::circular_buffer< MessageHandle > queue_
std::string ws_handshake_hostname_
const TelemetryEndpoint endpoint_
std::shared_ptr< boost::asio::io_context > io_context_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
const common::Uri & uri() const
bool isConnected() const override
Reports connection status.
static constexpr auto kInitialReconnectTimeout
starting value for reconnection timeout in case line failure
const TelemetryEndpoint & endpoint() const override
To remind what was the endpoint ;)
std::size_t MessageHandle
Logger createLogger(const std::string &tag)
OnConnectedCallback callback_
void onHandshake(boost::beast::error_code ec)