Kagome
Polkadot Runtime Engine in C++17
http_request.cpp
Go to the documentation of this file.
1 
7 
8 #include <algorithm>
9 #include <string>
10 #include <string_view>
11 #include <thread>
12 
13 #include <boost/asio/connect.hpp>
14 #include <boost/asio/ip/tcp.hpp>
15 #include <boost/asio/ssl/context.hpp>
16 #include <boost/asio/ssl/error.hpp>
17 #include <boost/asio/ssl/stream.hpp>
18 #include <boost/beast/http.hpp>
19 
20 namespace kagome::offchain {
21 
23  : id_(id),
24  resolver_(io_context_),
25  ssl_ctx_(boost::asio::ssl::context::sslv23),
26  deadline_timer_(io_context_),
27  log_(log::createLogger("HttpRequest#" + std::to_string(id_),
28  "offchain")) {
29  ssl_ctx_.set_default_verify_paths();
30  ssl_ctx_.set_verify_mode(boost::asio::ssl::verify_peer);
31  ssl_ctx_.set_verify_callback(
32  [log = log_, wp = weak_from_this()](
33  bool preverified, boost::asio::ssl::verify_context &ctx) {
34  // We will simply print the certificate's subject name here
35  char subject_name[256];
36  X509 *cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
37  X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
38  SL_WARN(log,
39  "Verifying [{}] was {}",
40  subject_name,
41  preverified ? "Successful" : "Failed");
42 
43  return preverified;
44  });
45  }
46 
48  std::string_view uri_arg,
49  common::Buffer meta) {
50  uri_ = common::Uri::parse(uri_arg);
51  if (uri_.error().has_value()) {
53  fmt::format("URI parsing was failed: {}", uri_.error().value());
54  SL_ERROR(log_, "{}", error_message_);
55  return false;
56  }
57  if (uri_.Schema != "https" and uri_.Schema != "http") {
58  error_message_ = fmt::format("URI has invalid schema: `{}`", uri_.Schema);
59  SL_ERROR(log_, "{}", error_message_);
60  return false;
61  }
62  if (not uri_.Port.empty()) {
63  if (int port = std::stoi(std::string(uri_.Port));
64  port <= 0 or port > 65536 or uri_.Port != std::to_string(port)) {
65  error_message_ = fmt::format("URI has invalid port: `{}`", uri_.Port);
66  SL_ERROR(log_, "{}", error_message_);
67  return false;
68  }
69  } else if (uri_.Schema == "https") {
70  uri_.Port = "443";
71  } else if (uri_.Schema == "http") {
72  uri_.Port = "80";
73  }
74  if (uri_.Host.empty()) {
75  error_message_ = "URI has empty host";
76  SL_ERROR(log_, "{}", error_message_);
77  return false;
78  }
79  if (uri_.Path.empty()) {
80  error_message_ = "URI has empty path";
81  SL_ERROR(log_, "{}", error_message_);
82  return false;
83  }
84 
85  if (uri_.Schema == "https") {
86  secure_ = true;
87  stream_ = std::make_unique<SslStream>(io_context_, ssl_ctx_);
88  } else if (uri_.Schema == "http") {
89  secure_ = false;
90  stream_ = std::make_unique<TcpStream>(io_context_);
91  }
92 
93  SL_DEBUG(log_, "Initialized for URL: {}", uri_.to_string());
94 
95  if (method == HttpMethod::Post) {
96  request_.method(boost::beast::http::verb::post);
97  } else if (method == HttpMethod::Get) {
98  request_.method(boost::beast::http::verb::get);
99  }
100  request_.target(std::string(uri_.Path));
101  request_.version(11); // HTTP/1.1
102  request_.set(boost::beast::http::field::host, uri_.Host);
103  request_.set(boost::beast::http::field::user_agent, "KagomeOffchainWorker");
104  request_.set(boost::beast::http::field::connection, "Close");
105 
106  resolve();
107  return true;
108  }
109 
111  if (status_ != 0) {
112  return;
113  }
114 
115  SL_TRACE(log_, "Resolve hostname {}", uri_.Host);
116 
117  if (secure_) {
118  auto &stream = *boost::relaxed_get<SslStreamPtr>(stream_);
119 
120  // Set SNI Hostname (many hosts need this to handshake successfully)
121  if (!SSL_set_tlsext_host_name(stream.native_handle(),
122  std::string(uri_.Host).c_str())) {
123  boost::beast::error_code ec{static_cast<int>(::ERR_get_error()),
124  boost::asio::error::get_ssl_category()};
125  error_message_ = fmt::format(
126  "Can't resolve hostname {}: {}", uri_.Host, ec.message());
127  SL_ERROR(log_, "{}", error_message_);
129  return;
130  }
131  }
132 
133  auto resolve_handler = [wp = weak_from_this()](const auto &ec, auto it) {
134  if (auto self = wp.lock()) {
135  if (self->status_ != 0) {
136  SL_TRACE(
137  self->log_, "Result of resolving is ignored: {}", self->status_);
138  return;
139  }
140 
141  if (!ec) {
142  SL_TRACE(self->log_, "Resolved hostname {}", self->uri_.Host);
143  self->resolver_iterator_ = it;
144  self->connect();
145  return;
146  }
147 
148  self->error_message_ = fmt::format(
149  "Can't resolve hostname {}: {}", self->uri_.Host, ec.message());
150  SL_ERROR(self->log_, "{}", self->error_message_);
151  self->status_ = ErrorHasOccurred;
152  }
153  };
154 
155  resolver_.async_resolve(uri_.Host, uri_.Port, std::move(resolve_handler));
156  }
157 
159  if (status_ != 0) {
160  return;
161  }
162 
163  SL_TRACE(log_,
164  "Connect to `{}` (addr={}:{})",
165  uri_.Host,
166  resolver_iterator_->endpoint().address().to_string(),
167  resolver_iterator_->endpoint().port());
168 
169  auto &stream = secure_ ? boost::beast::get_lowest_layer(
170  *boost::relaxed_get<SslStreamPtr>(stream_))
171  : boost::beast::get_lowest_layer(
172  *boost::relaxed_get<TcpStreamPtr>(stream_));
173 
174  auto connect_handler = [wp = weak_from_this()](const auto &ec, auto it) {
175  if (auto self = wp.lock()) {
176  if (self->status_ != 0) {
177  SL_TRACE(
178  self->log_, "Result of connecting is ignored: {}", self->status_);
179  return;
180  }
181 
182  if (!ec) {
183  SL_TRACE(self->log_, "Connection established");
184  if (self->secure_) {
185  self->handshake();
186  } else {
187  self->connected_ = true;
188  self->sendRequest();
189  }
190  return;
191  }
192 
193  SL_ERROR(self->log_, "Connection failed: {}", ec.message());
194 
195  // Try to connect next endpoint if any
196  if (++self->resolver_iterator_
197  != boost::asio::ip::tcp::resolver::iterator{}) {
198  SL_TRACE(self->log_, "Trying next endpoint...");
199  self->connect();
200  } else {
201  self->error_message_ =
202  fmt::format("Connection failed: {}", ec.message());
203  self->status_ = ErrorHasOccurred;
204  }
205  }
206  };
207 
208  stream.async_connect(resolver_iterator_, {}, std::move(connect_handler));
209  }
210 
212  if (status_ != 0) {
213  return;
214  }
215 
216  BOOST_ASSERT(secure_);
217 
218  auto &stream = *boost::relaxed_get<SslStreamPtr>(stream_);
219 
220  auto handshake_handler = [wp = weak_from_this()](const auto &ec) {
221  if (auto self = wp.lock()) {
222  if (self->status_ != 0) {
223  SL_TRACE(
224  self->log_, "Result of handshake is ignored: {}", self->status_);
225  return;
226  }
227 
228  if (!ec) {
229  SL_TRACE(self->log_, "Handshake successful");
230  self->connected_ = true;
231  self->sendRequest();
232  return;
233  }
234 
235  self->error_message_ =
236  fmt::format("Handshake failed: {}", ec.message());
237  SL_ERROR(self->log_, "{}", self->error_message_);
238  self->status_ = ErrorHasOccurred;
239  }
240  };
241 
242  stream.async_handshake(boost::asio::ssl::stream_base::client,
243  std::move(handshake_handler));
244  }
245 
247  if (status_ != 0) {
248  return;
249  }
250 
251  if (not request_is_ready_) {
252  SL_TRACE(log_, "Request not ready (body is not finalized)");
253  return;
254  }
255  if (not connected_) {
256  SL_TRACE(log_, "Request not ready (connection is not established)");
257  return;
258  }
259 
260  if (request_.method() == boost::beast::http::verb::post) {
261  request_.set(boost::beast::http::field::content_length,
262  std::to_string(request_.body().size()));
263  }
264 
265  auto serializer = std::make_shared<boost::beast::http::request_serializer<
266  boost::beast::http::string_body>>(request_);
267 
268  auto write_handler = [wp = weak_from_this(), serializer](const auto &ec,
269  auto written) {
270  if (auto self = wp.lock()) {
271  if (self->status_ != 0) {
272  SL_TRACE(self->log_,
273  "Result of request sending is ignored: {}",
274  self->status_);
275  return;
276  }
277 
278  if (!ec) {
279  SL_TRACE(self->log_, "Request has sent successful");
280  self->recvResponse();
281  return;
282  }
283 
284  self->error_message_ =
285  fmt::format("Request send was fail: {}", ec.message());
286  SL_ERROR(self->log_, "{}", self->error_message_);
287  self->status_ = ErrorHasOccurred;
288  }
289  };
290 
291  // Send the HTTP request to the remote host
292  if (secure_) {
293  auto &stream = *boost::relaxed_get<SslStreamPtr>(stream_);
294  boost::beast::http::async_write(
295  stream, *serializer, std::move(write_handler));
296  } else {
297  auto &stream = *boost::relaxed_get<TcpStreamPtr>(stream_);
298  boost::beast::http::async_write(
299  stream, *serializer, std::move(write_handler));
300  }
301  }
302 
304  if (status_ != 0) {
305  return;
306  }
307 
308  SL_TRACE(log_, "Read response");
309 
310  auto read_handler = [wp = weak_from_this()](const auto &ec, auto received) {
311  if (auto self = wp.lock()) {
312  if (self->status_ != 0) {
313  SL_TRACE(self->log_,
314  "Result of response receiving is ignored: {}",
315  self->status_);
316  return;
317  }
318 
319  if (!ec) {
320  SL_TRACE(self->log_, "Response has received successful", received);
321  self->done();
322  return;
323  }
324 
325  self->error_message_ =
326  fmt::format("Response reception has failed: {}", ec.message());
327  SL_ERROR(self->log_, "{}", self->error_message_);
328  self->status_ = ErrorHasOccurred;
329  }
330  };
331 
332  if (secure_) {
333  auto &stream = *boost::relaxed_get<SslStreamPtr>(stream_);
334  boost::system::error_code ec;
335  boost::beast::get_lowest_layer(stream).socket().shutdown(
336  boost::asio::ip::tcp::socket::shutdown_send, ec);
337  boost::beast::http::async_read(
338  stream, buffer_, parser_, std::move(read_handler));
339  } else {
340  auto &stream = *boost::relaxed_get<TcpStreamPtr>(stream_);
341  boost::system::error_code ec;
342  boost::beast::get_lowest_layer(stream).socket().shutdown(
343  boost::asio::ip::tcp::socket::shutdown_send, ec);
344  boost::beast::http::async_read(
345  stream, buffer_, parser_, std::move(read_handler));
346  }
347  }
348 
350  if (status_ != 0) {
351  return;
352  }
353 
354  if (secure_) {
355  auto &stream = *boost::relaxed_get<SslStreamPtr>(stream_);
356  boost::system::error_code ec;
357  boost::beast::get_lowest_layer(stream).socket().shutdown(
358  boost::asio::ip::tcp::socket::shutdown_both, ec);
359  } else {
360  auto &stream = *boost::relaxed_get<TcpStreamPtr>(stream_);
361  boost::system::error_code ec;
362  boost::beast::get_lowest_layer(stream).socket().shutdown(
363  boost::asio::ip::tcp::socket::shutdown_both, ec);
364  }
365 
366  response_ = parser_.release();
367 
368  status_ = response_.result_int();
369  }
370 
372  return id_;
373  }
374 
376  return status_;
377  }
378 
380  std::string_view name, std::string_view value) {
381  if (not adding_headers_is_allowed_) {
382  error_message_ = "Trying to add header into ready request";
383  SL_ERROR(log_, "{}", error_message_);
384  return Failure();
385  }
386 
387  request_.insert(boost::string_view(name.begin(), name.size()), value);
388 
389  return Success();
390  }
391 
393  const common::Buffer &chunk,
394  std::optional<std::chrono::milliseconds> deadline_opt) {
395  if (request_has_sent_) {
396  error_message_ = "Trying to write body into ready request";
397  SL_ERROR(log_, "{}", error_message_);
398  return HttpError::IoError;
399  }
400 
403  }
404 
405  if (chunk.empty()) {
406  request_has_sent_ = true;
407  request_is_ready_ = true;
408  sendRequest();
409  if (deadline_opt.has_value()) {
410  auto &deadline = deadline_opt.value();
411  io_context_.run_for(deadline);
412  if (status_ == 0) {
413  error_message_ = "Deadline has reached";
415  }
416  } else {
417  io_context_.run();
418  }
419 
420  } else {
421  request_.body().append(reinterpret_cast<const char *>(chunk.data()),
422  chunk.size());
423  }
424 
426  }
427 
428  std::vector<std::pair<std::string, std::string>>
430  std::vector<std::pair<std::string, std::string>> result;
431  for (auto &header : response_) {
432  result.emplace_back(std::pair(header.name_string(), header.value()));
433  }
434 
435  return result;
436  }
437 
439  common::Buffer &chunk,
440  std::optional<std::chrono::milliseconds> deadline) {
441  switch (status_) {
442  case InvalidIdentifier:
443  error_message_ = "Invalid identifier";
444  return HttpError::InvalidId;
445  case DeadlineHasReached:
446  error_message_ = "Deadline has reached";
447  return HttpError::Timeout;
448  case ErrorHasOccurred:
449  error_message_ = "IO error happened";
450  return HttpError::IoError;
451  }
452 
453  auto amount = std::min(response_.body().size(), chunk.size());
454 
455  std::copy_n(response_.body().begin(), amount, chunk.begin());
456 
457  return amount;
458  }
459 } // namespace kagome::offchain
Class represents arbitrary (including empty) byte buffer.
Definition: buffer.hpp:29
boost::beast::http::response< boost::beast::http::string_body > response_
std::string to_string() const
Definition: uri.cpp:12
std::string_view to_string(SlotType s)
Definition: slot.hpp:22
The deadline was reached.
boost::beast::http::response_parser< boost::beast::http::string_body > parser_
STL namespace.
HttpRequest(HttpRequest &&) noexcept=delete
HttpStatus status() const
bool init(HttpMethod method, std::string_view uri, common::Buffer meta)
std::string Schema
Definition: uri.hpp:17
boost::asio::io_context io_context_
uint16_t HttpStatus
HTTP status codes that can get returned by certain Offchain funcs. 0: the specified request identifie...
Definition: types.hpp:46
There was an IO error while processing the request.
static Uri parse(std::string_view uri)
Definition: uri.cpp:42
std::vector< std::pair< std::string, std::string > > getResponseHeaders() const
constexpr HttpStatus InvalidIdentifier(0)
boost::beast::flat_buffer buffer_
Result< Success, Failure > addRequestHeader(std::string_view name, std::string_view value)
Result< uint32_t, HttpError > readResponseBody(common::Buffer &chunk, std::optional< std::chrono::milliseconds > deadline)
std::string Host
Definition: uri.hpp:18
int16_t RequestId
Definition: types.hpp:29
constexpr HttpStatus ErrorHasOccurred(20)
boost::asio::ip::tcp::resolver resolver_
boost::asio::ip::tcp::resolver::iterator resolver_iterator_
The ID of the request is invalid.
Result< Success, HttpError > writeRequestBody(const common::Buffer &chunk, std::optional< std::chrono::milliseconds > deadline_opt)
const std::optional< std::string_view > & error() const
Definition: uri.hpp:28
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
boost::asio::ssl::context ssl_ctx_
boost::variant< TcpStreamPtr, SslStreamPtr > stream_
std::string Port
Definition: uri.hpp:19
boost::beast::http::request< boost::beast::http::string_body > request_
constexpr HttpStatus DeadlineHasReached(10)
std::string Path
Definition: uri.hpp:20