Kagome
Polkadot Runtime Engine in C++17
offchain_worker_impl.cpp
Go to the documentation of this file.
1 
7 
8 #include <thread>
9 
10 #include <libp2p/host/host.hpp>
11 
14 #include "crypto/hasher.hpp"
19 
20 namespace kagome::offchain {
21 
23  const application::AppConfiguration &app_config,
24  std::shared_ptr<clock::SystemClock> clock,
25  std::shared_ptr<crypto::Hasher> hasher,
26  std::shared_ptr<storage::BufferStorage> storage,
27  std::shared_ptr<crypto::CSPRNG> random_generator,
28  std::shared_ptr<api::AuthorApi> author_api,
29  const network::OwnPeerInfo &current_peer_info,
30  std::shared_ptr<OffchainPersistentStorage> persistent_storage,
31  std::shared_ptr<runtime::Executor> executor,
32  const primitives::BlockHeader &header,
33  std::shared_ptr<OffchainWorkerPool> ocw_pool)
34  : app_config_(app_config),
35  clock_(std::move(clock)),
36  hasher_(std::move(hasher)),
37  random_generator_(std::move(random_generator)),
38  author_api_(std::move(author_api)),
39  current_peer_info_(current_peer_info),
40  persistent_storage_(std::move(persistent_storage)),
41  executor_(std::move(executor)),
42  header_(header),
43  ocw_pool_(std::move(ocw_pool)),
44  log_(log::createLogger(
45  "OffchainWorker#" + std::to_string(header_.number), "offchain")) {
46  BOOST_ASSERT(clock_);
47  BOOST_ASSERT(hasher_);
48  BOOST_ASSERT(storage);
49  BOOST_ASSERT(random_generator_);
50  BOOST_ASSERT(author_api_);
51  BOOST_ASSERT(persistent_storage_);
52  BOOST_ASSERT(executor_);
53  BOOST_ASSERT(ocw_pool_);
54 
55  auto hash = hasher_->blake2b_256(scale::encode(header_).value());
56  const_cast<primitives::BlockInfo &>(block_) =
58 
60  std::make_shared<OffchainLocalStorageImpl>(std::move(storage));
61  }
62 
63  outcome::result<void> OffchainWorkerImpl::run() {
64  BOOST_ASSERT(not ocw_pool_->getWorker());
65 
66  auto main_thread_func = [ocw = shared_from_this(), ocw_pool = ocw_pool_] {
67  soralog::util::setThreadName("ocw." + std::to_string(ocw->block_.number));
68 
69  ocw_pool->addWorker(ocw);
70 
71  SL_TRACE(
72  ocw->log_, "Offchain worker is started for block {}", ocw->block_);
73 
74  auto res = ocw->executor_->callAt<void>(
75  ocw->block_.hash, "OffchainWorkerApi_offchain_worker", ocw->header_);
76 
77  ocw_pool->removeWorker();
78 
79  if (res.has_failure()) {
80  SL_ERROR(ocw->log_,
81  "Can't execute offchain worker for block {}: {}",
82  ocw->block_,
83  res.error().message());
84  return;
85  }
86 
87  SL_DEBUG(ocw->log_,
88  "Offchain worker is successfully executed for block {}",
89  ocw->block_);
90  };
91 
92  try {
93  std::thread(std::move(main_thread_func)).detach();
94  } catch (const std::system_error &exception) {
95  return outcome::failure(exception.code());
96  } catch (...) {
97  BOOST_UNREACHABLE_RETURN({});
98  }
99 
100  return outcome::success();
101  }
102 
104  bool isValidator = app_config_.roles().flags.authority == 1;
105  return isValidator;
106  }
107 
109  const primitives::Extrinsic &ext) {
110  auto result =
111  author_api_->submitExtrinsic(primitives::TransactionSource::Local, ext);
112  if (result.has_value()) {
113  return Success();
114  }
115  return Failure();
116  }
117 
120  {current_peer_info_.addresses.begin(),
121  current_peer_info_.addresses.end()});
122 
123  return result;
124  }
125 
127  return std::chrono::duration_cast<std::chrono::milliseconds>(
128  clock_->now().time_since_epoch())
129  .count();
130  }
131 
133  auto ts = clock_->zero() + std::chrono::milliseconds(deadline);
134  SL_TRACE(log_,
135  "Falling asleep till {} (for {}ms)",
136  deadline,
137  std::chrono::duration_cast<std::chrono::milliseconds>(
138  ts - clock_->now())
139  .count());
140 
141  std::this_thread::sleep_until(decltype(clock_)::element_type::TimePoint(
142  std::chrono::milliseconds(deadline)));
143  SL_DEBUG(log_, "Woke up after sleeping");
144  }
145 
147  RandomSeed seed_bytes;
148  random_generator_->fillRandomly(seed_bytes);
149  return seed_bytes;
150  }
151 
153  StorageType storage_type) {
154  switch (storage_type) {
156  return *persistent_storage_;
157 
158  case StorageType::Local:
159  // TODO(xDimon):
160  // Need to implemented as soon as it will implemented in Substrate.
161  // Specification in not enough to implement it now.
162  // issue: https://github.com/soramitsu/kagome/issues/997
163  SL_WARN(
164  log_,
165  "Attempt to use off-chain local storage which unavailable yet.");
166  return *local_storage_;
167 
169  default:
170  BOOST_UNREACHABLE_RETURN({});
171  }
172  }
173 
175  const common::BufferView &key,
176  common::Buffer value) {
177  auto &storage = getStorage(storage_type);
178  auto result = storage.set(key, std::move(value));
179  if (result.has_error()) {
180  SL_WARN(log_, "Can't set value in storage: {}", result.error().message());
181  }
182  }
183 
185  const common::BufferView &key) {
186  auto &storage = getStorage(storage_type);
187  auto result = storage.clear(key);
188  if (result.has_error()) {
189  SL_WARN(
190  log_, "Can't clear value in storage: {}", result.error().message());
191  }
192  }
193 
195  StorageType storage_type,
196  const common::BufferView &key,
197  std::optional<common::BufferView> expected,
198  common::Buffer value) {
199  auto &storage = getStorage(storage_type);
200  auto result = storage.compare_and_set(key, expected, std::move(value));
201  if (result.has_error()) {
202  SL_WARN(log_,
203  "Can't compare-and-set value in storage: {}",
204  result.error().message());
205  return false;
206  }
207  return result.value();
208  }
209 
210  outcome::result<common::Buffer> OffchainWorkerImpl::localStorageGet(
211  StorageType storage_type, const common::BufferView &key) {
212  auto &storage = getStorage(storage_type);
213  auto result = storage.get(key);
214  if (result.has_error()
215  and result != outcome::failure(storage::DatabaseError::NOT_FOUND)) {
216  SL_WARN(log_, "Can't get value in storage: {}", result.error().message());
217  }
218  return result;
219  }
220 
222  HttpMethod method, std::string_view uri, common::Buffer meta) {
223  auto request_id = ++request_id_;
224 
225  auto request = std::make_shared<HttpRequest>(request_id);
226 
227  if (not request->init(method, uri, std::move(meta))) {
228  return Failure();
229  }
230 
231  auto is_emplaced =
232  active_http_requests_.emplace(request_id, std::move(request)).second;
233 
234  if (is_emplaced) {
235  return request_id;
236  }
237  return Failure();
238  }
239 
241  RequestId id, std::string_view name, std::string_view value) {
242  auto it = active_http_requests_.find(id);
243  if (it == active_http_requests_.end()) {
244  return Failure();
245  }
246  auto &request = it->second;
247 
248  request->addRequestHeader(name, value);
249 
250  return Success();
251  }
252 
254  RequestId id, common::Buffer chunk, std::optional<Timestamp> deadline) {
255  std::optional<std::chrono::milliseconds> timeout = std::nullopt;
256  if (deadline.has_value()) {
257  timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
258  std::chrono::milliseconds(deadline.value())
259  - clock_->now().time_since_epoch());
260  }
261 
262  auto it = active_http_requests_.find(id);
263  if (it == active_http_requests_.end()) {
264  return HttpError::InvalidId;
265  }
266  auto &request = it->second;
267 
268  auto result = request->writeRequestBody(chunk, timeout);
269 
270  return result;
271  }
272 
273  std::vector<HttpStatus> OffchainWorkerImpl::httpResponseWait(
274  const std::vector<RequestId> &ids, std::optional<Timestamp> deadline) {
275  std::vector<HttpStatus> result;
276  result.reserve(ids.size());
277 
278  for (auto id : ids) {
279  auto it = active_http_requests_.find(id);
280  if (it == active_http_requests_.end()) {
281  result.push_back(InvalidIdentifier);
282  continue;
283  }
284  auto &request = it->second;
285 
286  HttpStatus status;
287  while ((status = request->status()) == 0) {
288  if (deadline.has_value()
289  and (clock_->zero() + std::chrono::milliseconds(deadline.value()))
290  < clock_->now()) {
291  break;
292  }
293  std::this_thread::sleep_for(latency_of_waiting);
294  }
295 
296  result.push_back(status ? status : DeadlineHasReached);
297  }
298 
299  return result;
300  }
301 
302  std::vector<std::pair<std::string, std::string>>
304  std::vector<std::pair<std::string, std::string>> result;
305  auto it = active_http_requests_.find(id);
306  if (it == active_http_requests_.end()) {
307  return result;
308  }
309  auto &request = it->second;
310 
311  result = request->getResponseHeaders();
312 
313  return result;
314  }
315 
317  RequestId id, common::Buffer &chunk, std::optional<Timestamp> deadline) {
318  auto it = active_http_requests_.find(id);
319  if (it == active_http_requests_.end()) {
320  return HttpError::InvalidId;
321  }
322  auto &request = it->second;
323 
324  std::optional<std::chrono::milliseconds> timeout = std::nullopt;
325  if (deadline.has_value()) {
326  timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
327  std::chrono::milliseconds(deadline.value())
328  - clock_->now().time_since_epoch());
329  }
330 
331  auto result = request->readResponseBody(chunk, timeout);
332 
333  return result;
334  }
335 
337  std::vector<libp2p::peer::PeerId> nodes, bool authorized_only) {
338  // TODO(xDimon): Need to implement it
339  // issue: https://github.com/soramitsu/kagome/issues/998
340  throw std::runtime_error(
341  "This method of OffchainWorkerImpl is not implemented yet");
342  return;
343  }
344 
345 } // namespace kagome::offchain
OffchainStorage & getStorage(StorageType storage_type)
Clock::TimePoint TimePoint
Definition: common.hpp:30
Class represents arbitrary (including empty) byte buffer.
Definition: buffer.hpp:29
std::map< RequestId, std::shared_ptr< HttpRequest > > active_http_requests_
struct kagome::network::Roles::@11 flags
std::shared_ptr< api::AuthorApi > author_api_
bool localStorageCompareAndSet(StorageType storage_type, const common::BufferView &key, std::optional< common::BufferView > expected, common::Buffer value) override
Result< Success, HttpError > httpRequestWriteBody(RequestId id, common::Buffer chunk, std::optional< Timestamp > deadline) override
std::string_view to_string(SlotType s)
Definition: slot.hpp:22
outcome::result< void > run() override
void sleepUntil(Timestamp timestamp) override
std::shared_ptr< OffchainWorkerPool > ocw_pool_
STL namespace.
virtual network::Roles roles() const =0
const network::OwnPeerInfo & current_peer_info_
std::vector< HttpStatus > httpResponseWait(const std::vector< RequestId > &ids, std::optional< Timestamp > deadline) override
Result< Success, Failure > httpRequestAddHeader(RequestId id, std::string_view name, std::string_view value) override
OffchainWorkerImpl(const application::AppConfiguration &app_config, std::shared_ptr< clock::SystemClock > clock, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< storage::BufferStorage > storage, std::shared_ptr< crypto::CSPRNG > random_generator, std::shared_ptr< api::AuthorApi > author_api, const network::OwnPeerInfo &current_peer_info, std::shared_ptr< OffchainPersistentStorage > persistent_storage, std::shared_ptr< runtime::Executor > executor, const primitives::BlockHeader &header, std::shared_ptr< OffchainWorkerPool > ocw_pool)
std::vector< std::pair< std::string, std::string > > httpResponseHeaders(RequestId id) override
Result< RequestId, Failure > httpRequestStart(HttpMethod method, std::string_view uri, common::Buffer meta) override
uint16_t HttpStatus
HTTP status codes that can get returned by certain Offchain funcs. 0: the specified request identifie...
Definition: types.hpp:46
const primitives::BlockHeader header_
Result< uint32_t, HttpError > httpResponseReadBody(RequestId id, common::Buffer &chunk, std::optional< Timestamp > deadline) override
constexpr HttpStatus InvalidIdentifier(0)
outcome::result< common::Buffer > localStorageGet(StorageType storage_type, const common::BufferView &key) override
std::shared_ptr< crypto::Hasher > hasher_
Result< Success, Failure > submitTransaction(const primitives::Extrinsic &ext) override
int16_t RequestId
Definition: types.hpp:29
Result< OpaqueNetworkState, Failure > networkState() override
void localStorageSet(StorageType storage_type, const common::BufferView &key, common::Buffer value) override
uint64_t Timestamp
Timestamp is milliseconds since UNIX Epoch.
Definition: types.hpp:21
The ID of the request is invalid.
std::shared_ptr< crypto::CSPRNG > random_generator_
std::shared_ptr< offchain::OffchainLocalStorage > local_storage_
BlockNumber number
index of the block in the chain
std::shared_ptr< clock::SystemClock > clock_
Extrinsic class represents extrinsic.
Definition: extrinsic.hpp:24
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
const application::AppConfiguration & app_config_
void localStorageClear(StorageType storage_type, const common::BufferView &key) override
void setAuthorizedNodes(std::vector< libp2p::peer::PeerId > nodes, bool authorized_only) override
std::shared_ptr< offchain::OffchainPersistentStorage > persistent_storage_
constexpr HttpStatus DeadlineHasReached(10)
std::shared_ptr< runtime::Executor > executor_