Kagome
Polkadot Runtime Engine in C++17
connection_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <openssl/tls1.h>
9 
10 namespace kagome::telemetry {
11 
12  std::size_t TelemetryConnectionImpl::instance_ = 0;
13 
15  std::shared_ptr<boost::asio::io_context> io_context,
16  const TelemetryEndpoint &endpoint,
17  OnConnectedCallback callback,
18  std::shared_ptr<MessagePool> message_pool,
19  std::shared_ptr<libp2p::basic::Scheduler> scheduler)
20  : io_context_{std::move(io_context)},
22  callback_{std::move(callback)},
23  message_pool_{std::move(message_pool)},
24  scheduler_{std::move(scheduler)},
25  ssl_ctx_{boost::asio::ssl::context::sslv23},
26  resolver_{boost::asio::make_strand(*io_context_)} {
27  BOOST_ASSERT(io_context_);
28  BOOST_ASSERT(message_pool_);
29  BOOST_ASSERT(scheduler_);
30  auto instance_number = std::to_string(++instance_);
31  queue_.set_capacity(message_pool_->capacity());
32  log_ = log::createLogger("TelemetryConnection#" + instance_number,
33  "telemetry");
34  }
35 
37  shutdown_requested_ = false;
38  is_connected_ = false;
39 
40  // immediate return in case of empty host value
41  if (endpoint_.uri().Host.empty()) {
42  SL_ERROR(log_,
43  "Host cannot be empty for telemetry endpoint {}",
44  endpoint_.uri().to_string());
45  return;
46  }
47 
48  // setup defaults basing on URI schema
49  auto &schema = endpoint_.uri().Schema;
50  if (schema == "ws") {
51  secure_ = false;
52  port_ = 80;
53  } else if (schema == "wss") {
54  secure_ = true;
55  port_ = 443;
56  } else {
57  SL_ERROR(log_,
58  "Unsupported schema '{}' passed for telemetry endpoint {}",
59  schema,
60  endpoint_.uri().to_string());
61  return;
62  }
63 
64  // parse custom defined port value if any
65  auto &port = endpoint_.uri().Port;
66  if (not port.empty()) {
67  try {
68  auto port_int = std::stoi(port);
69  if (port_int < 0 or port_int > 65536) {
70  throw std::out_of_range("");
71  }
72  port_ = static_cast<uint16_t>(port_int);
73  } catch (...) {
74  // only std::invalid_argument or std::out_of_range are possible
75  SL_ERROR(log_,
76  "Specified port value is not valid for endpoint {}",
77  endpoint_.uri().to_string());
78  return;
79  }
80  }
81 
82  auto &path = endpoint_.uri().Path;
83  path_ = path.empty() ? "/" : path;
84 
85  if (secure_) {
86  ws_ = std::make_unique<WsSslStream>(
87  boost::asio::make_strand(*io_context_), ssl_ctx_);
88  } else {
89  ws_ =
90  std::make_unique<WsTcpStream>(boost::asio::make_strand(*io_context_));
91  }
92 
93  SL_DEBUG(log_, "Connecting to endpoint {}", endpoint_.uri().to_string());
94  resolver_.async_resolve(
95  endpoint_.uri().Host,
97  boost::beast::bind_front_handler(&TelemetryConnectionImpl::onResolve,
98  shared_from_this()));
99  }
100 
102  return endpoint_;
103  }
104 
105  void TelemetryConnectionImpl::send(const std::string &data) {
106  if (not is_connected_) {
107  return;
108  }
109  auto push = message_pool_->push(data, 1);
110  if (not push) {
111  return;
112  }
113  send(*push);
114  }
115 
117  if (not is_connected_) {
118  message_pool_->release(message_handle);
119  return;
120  }
121  if (busy_) {
122  if (queue_.full()) {
123  message_pool_->release(message_handle);
124  return;
125  }
126  queue_.push_back(message_handle);
127  } else {
128  busy_ = true;
129  if (secure_) {
130  write(*boost::relaxed_get<WsSslStreamPtr>(ws_), message_handle);
131  } else {
132  write(*boost::relaxed_get<WsTcpStreamPtr>(ws_), message_handle);
133  }
134  }
135  }
136 
138  return is_connected_;
139  }
140 
142  shutdown_requested_ = true;
143  if (is_connected_) {
144  close();
145  }
146  }
147 
148  boost::beast::lowest_layer_type<TelemetryConnectionImpl::SslStream>
150  return secure_ ? boost::beast::get_lowest_layer(
151  *boost::relaxed_get<WsSslStreamPtr>(ws_))
152  : boost::beast::get_lowest_layer(
153  *boost::relaxed_get<WsTcpStreamPtr>(ws_));
154  }
155 
156  template <typename WsStreamT>
157  void TelemetryConnectionImpl::write(WsStreamT &ws,
158  MessageHandle message_handle) {
159  auto write_handler = [self{shared_from_this()}, message_handle, &ws](
160  boost::beast::error_code ec,
161  std::size_t bytes_transferred) {
162  boost::ignore_unused(bytes_transferred);
163  self->message_pool_->release(message_handle);
164  if (ec) {
165  self->is_connected_ = false;
166  self->busy_ = false;
167  self->releaseQueue();
168  SL_ERROR(self->log_,
169  "Unable to send data through websocket: {}",
170  ec.message());
171  self->reconnect();
172  return;
173  }
174  if (self->queue_.empty()) {
175  self->busy_ = false;
176  } else {
177  auto next = self->queue_.front();
178  self->queue_.pop_front();
179  self->write(ws, next);
180  }
181  };
182 
183  ws.async_write((*message_pool_)[message_handle], write_handler);
184  }
185 
187  boost::beast::error_code ec,
188  boost::asio::ip::tcp::resolver::results_type results) {
189  if (ec) {
190  SL_ERROR(log_, "Unable to resolve host: {}", ec.message());
191  reconnect();
192  return;
193  }
194 
195  stream_lowest_layer().expires_after(kConnectionTimeout);
196  stream_lowest_layer().async_connect(
197  results,
198  boost::beast::bind_front_handler(&TelemetryConnectionImpl::onConnect,
199  shared_from_this()));
200  }
201 
203  boost::beast::error_code ec,
204  boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) {
205  if (ec) {
206  SL_ERROR(log_, "Unable to connect to endpoint: {}", ec.message());
207  reconnect();
208  return;
209  }
210 
211  if (secure_) {
212  auto &ws = boost::relaxed_get<WsSslStreamPtr>(ws_);
213  if (not SSL_set_tlsext_host_name(ws->next_layer().native_handle(),
214  endpoint_.uri().Host.c_str())) {
215  ec = boost::beast::error_code(static_cast<int>(::ERR_get_error()),
216  boost::asio::error::get_ssl_category());
217  reconnect();
218  SL_ERROR(log_, "Unable to set SNI hostname: {}", ec.message());
219  }
220  }
221 
223  endpoint_.uri().Host + ":" + std::to_string(endpoint.port());
224 
225  if (secure_) {
226  auto &ws = *boost::relaxed_get<WsSslStreamPtr>(ws_);
227  ws.next_layer().async_handshake(
228  boost::asio::ssl::stream_base::client,
229  boost::beast::bind_front_handler(
230  &TelemetryConnectionImpl::onSslHandshake, shared_from_this()));
231  } else {
232  setOptionsAndRunWsHandshake(*boost::relaxed_get<WsTcpStreamPtr>(ws_));
233  }
234  }
235 
236  template <typename WsStreamT>
238  stream_lowest_layer().expires_never();
239 
240  ws.set_option(boost::beast::websocket::stream_base::timeout::suggested(
241  boost::beast::role_type::client));
242  // Set a decorator to change the User-Agent of the handshake
243  ws.set_option(boost::beast::websocket::stream_base::decorator(
244  [](boost::beast::websocket::request_type &req) {
245  req.set(boost::beast::http::field::user_agent,
246  std::string(BOOST_BEAST_VERSION_STRING)
247  + " Kagome Telemetry Reporter");
248  }));
249 
250  ws.async_handshake(
252  path_,
253  boost::beast::bind_front_handler(&TelemetryConnectionImpl::onHandshake,
254  shared_from_this()));
255  }
256 
257  void TelemetryConnectionImpl::onSslHandshake(boost::beast::error_code ec) {
258  if (ec) {
259  SL_ERROR(log_, "Unable to perform SSL handshake: {}", ec.message());
260  reconnect();
261  return;
262  }
263  BOOST_VERIFY(secure_);
264  setOptionsAndRunWsHandshake(*boost::relaxed_get<WsSslStreamPtr>(ws_));
265  }
266 
267  void TelemetryConnectionImpl::onHandshake(boost::beast::error_code ec) {
268  if (ec) {
269  SL_ERROR(log_, "Websocket handshake failed: {}", ec.message());
270  reconnect();
271  return;
272  }
273  if (shutdown_requested_) {
274  close();
275  return;
276  }
277  is_connected_ = true;
278  SL_INFO(log_, "Connection established");
280  callback_(shared_from_this());
281  }
282 
284  for (auto handle : queue_) {
285  message_pool_->release(handle);
286  }
287  queue_.clear();
288  }
289 
291  is_connected_ = false;
292  if (secure_) {
293  auto &ws = *boost::relaxed_get<WsSslStreamPtr>(ws_);
294  ws.async_close(boost::beast::websocket::close_code::normal, [](auto) {});
295  } else {
296  auto &ws = *boost::relaxed_get<WsTcpStreamPtr>(ws_);
297  ws.async_close(boost::beast::websocket::close_code::normal, [](auto) {});
298  }
299  }
300 
303  return;
304  }
305  SL_DEBUG(
306  log_, "Trying to reconnect in {} seconds", reconnect_timeout_.count());
307  scheduler_->schedule([self{shared_from_this()}] { self->connect(); },
311  }
312  }
313 
314 } // namespace kagome::telemetry
void send(const std::string &data) override
std::string to_string() const
Definition: uri.cpp:12
std::string_view to_string(SlotType s)
Definition: slot.hpp:22
void onSslHandshake(boost::beast::error_code ec)
TelemetryConnectionImpl(std::shared_ptr< boost::asio::io_context > io_context, const TelemetryEndpoint &endpoint, OnConnectedCallback callback, std::shared_ptr< MessagePool > message_pool, std::shared_ptr< libp2p::basic::Scheduler > scheduler)
static constexpr auto kReconnectTimeoutIncrement
an addition to to reconnect timeout after failed attempt
void write(WsStreamT &ws, MessageHandle message_handle)
std::shared_ptr< MessagePool > message_pool_
static constexpr auto kConnectionTimeout
operations&#39; timeout during websocket connection establishing
boost::asio::ip::tcp::resolver resolver_
boost::variant< WsTcpStreamPtr, WsSslStreamPtr > ws_
std::string Schema
Definition: uri.hpp:17
void onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
void shutdown() override
Request graceful connection shutdown.
void connect() override
Initiate connection process.
boost::beast::lowest_layer_type< SslStream > & stream_lowest_layer()
void onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
std::string Host
Definition: uri.hpp:18
static constexpr auto kMaxReconnectTimeout
maximum reconnect timeout value despite reconnect attempts number
boost::circular_buffer< MessageHandle > queue_
std::shared_ptr< boost::asio::io_context > io_context_
std::shared_ptr< libp2p::basic::Scheduler > scheduler_
const common::Uri & uri() const
Definition: endpoint.hpp:22
bool isConnected() const override
Reports connection status.
static constexpr auto kInitialReconnectTimeout
starting value for reconnection timeout in case line failure
const TelemetryEndpoint & endpoint() const override
To remind what was the endpoint ;)
std::size_t MessageHandle
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
std::string Port
Definition: uri.hpp:19
std::string Path
Definition: uri.hpp:20
void onHandshake(boost::beast::error_code ec)