10 #include <libp2p/host/host.hpp> 24 std::shared_ptr<clock::SystemClock> clock,
25 std::shared_ptr<crypto::Hasher> hasher,
26 std::shared_ptr<storage::BufferStorage> storage,
27 std::shared_ptr<crypto::CSPRNG> random_generator,
28 std::shared_ptr<api::AuthorApi> author_api,
30 std::shared_ptr<OffchainPersistentStorage> persistent_storage,
31 std::shared_ptr<runtime::Executor> executor,
33 std::shared_ptr<OffchainWorkerPool> ocw_pool)
34 : app_config_(app_config),
35 clock_(
std::move(clock)),
36 hasher_(
std::move(hasher)),
37 random_generator_(
std::move(random_generator)),
38 author_api_(
std::move(author_api)),
39 current_peer_info_(current_peer_info),
40 persistent_storage_(
std::move(persistent_storage)),
41 executor_(
std::move(executor)),
43 ocw_pool_(
std::move(ocw_pool)),
45 "OffchainWorker#" +
std::
to_string(header_.number),
"offchain")) {
48 BOOST_ASSERT(storage);
60 std::make_shared<OffchainLocalStorageImpl>(std::move(storage));
66 auto main_thread_func = [ocw = shared_from_this(), ocw_pool =
ocw_pool_] {
67 soralog::util::setThreadName(
"ocw." +
std::to_string(ocw->block_.number));
69 ocw_pool->addWorker(ocw);
72 ocw->log_,
"Offchain worker is started for block {}", ocw->block_);
74 auto res = ocw->executor_->callAt<
void>(
75 ocw->block_.hash,
"OffchainWorkerApi_offchain_worker", ocw->header_);
77 ocw_pool->removeWorker();
79 if (res.has_failure()) {
81 "Can't execute offchain worker for block {}: {}",
83 res.error().message());
88 "Offchain worker is successfully executed for block {}",
93 std::thread(std::move(main_thread_func)).detach();
94 }
catch (
const std::system_error &exception) {
95 return outcome::failure(exception.code());
97 BOOST_UNREACHABLE_RETURN({});
100 return outcome::success();
112 if (result.has_value()) {
120 {current_peer_info_.addresses.begin(),
121 current_peer_info_.addresses.end()});
127 return std::chrono::duration_cast<std::chrono::milliseconds>(
128 clock_->now().time_since_epoch())
133 auto ts =
clock_->zero() + std::chrono::milliseconds(deadline);
135 "Falling asleep till {} (for {}ms)",
137 std::chrono::duration_cast<std::chrono::milliseconds>(
142 std::chrono::milliseconds(deadline)));
143 SL_DEBUG(
log_,
"Woke up after sleeping");
154 switch (storage_type) {
165 "Attempt to use off-chain local storage which unavailable yet.");
170 BOOST_UNREACHABLE_RETURN({});
178 auto result = storage.set(key, std::move(value));
179 if (result.has_error()) {
180 SL_WARN(
log_,
"Can't set value in storage: {}", result.error().message());
187 auto result = storage.clear(key);
188 if (result.has_error()) {
190 log_,
"Can't clear value in storage: {}", result.error().message());
197 std::optional<common::BufferView> expected,
200 auto result = storage.compare_and_set(key, expected, std::move(value));
201 if (result.has_error()) {
203 "Can't compare-and-set value in storage: {}",
204 result.error().message());
207 return result.value();
213 auto result = storage.get(key);
214 if (result.has_error()
216 SL_WARN(
log_,
"Can't get value in storage: {}", result.error().message());
225 auto request = std::make_shared<HttpRequest>(request_id);
227 if (not request->init(method, uri, std::move(meta))) {
241 RequestId id, std::string_view name, std::string_view value) {
246 auto &request = it->second;
248 request->addRequestHeader(name, value);
255 std::optional<std::chrono::milliseconds> timeout = std::nullopt;
256 if (deadline.has_value()) {
257 timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
258 std::chrono::milliseconds(deadline.value())
259 -
clock_->now().time_since_epoch());
266 auto &request = it->second;
268 auto result = request->writeRequestBody(chunk, timeout);
274 const std::vector<RequestId> &ids, std::optional<Timestamp> deadline) {
275 std::vector<HttpStatus> result;
276 result.reserve(ids.size());
278 for (
auto id : ids) {
284 auto &request = it->second;
287 while ((status = request->status()) == 0) {
288 if (deadline.has_value()
289 and (
clock_->zero() + std::chrono::milliseconds(deadline.value()))
302 std::vector<std::pair<std::string, std::string>>
304 std::vector<std::pair<std::string, std::string>> result;
309 auto &request = it->second;
311 result = request->getResponseHeaders();
322 auto &request = it->second;
324 std::optional<std::chrono::milliseconds> timeout = std::nullopt;
325 if (deadline.has_value()) {
326 timeout = std::chrono::duration_cast<std::chrono::milliseconds>(
327 std::chrono::milliseconds(deadline.value())
328 -
clock_->now().time_since_epoch());
331 auto result = request->readResponseBody(chunk, timeout);
337 std::vector<libp2p::peer::PeerId> nodes,
bool authorized_only) {
340 throw std::runtime_error(
341 "This method of OffchainWorkerImpl is not implemented yet");
OffchainStorage & getStorage(StorageType storage_type)
Clock::TimePoint TimePoint
Class represents arbitrary (including empty) byte buffer.
std::map< RequestId, std::shared_ptr< HttpRequest > > active_http_requests_
struct kagome::network::Roles::@11 flags
std::shared_ptr< api::AuthorApi > author_api_
bool localStorageCompareAndSet(StorageType storage_type, const common::BufferView &key, std::optional< common::BufferView > expected, common::Buffer value) override
Result< Success, HttpError > httpRequestWriteBody(RequestId id, common::Buffer chunk, std::optional< Timestamp > deadline) override
std::string_view to_string(SlotType s)
outcome::result< void > run() override
void sleepUntil(Timestamp timestamp) override
std::shared_ptr< OffchainWorkerPool > ocw_pool_
virtual network::Roles roles() const =0
const network::OwnPeerInfo & current_peer_info_
std::vector< HttpStatus > httpResponseWait(const std::vector< RequestId > &ids, std::optional< Timestamp > deadline) override
RandomSeed randomSeed() override
Result< Success, Failure > httpRequestAddHeader(RequestId id, std::string_view name, std::string_view value) override
OffchainWorkerImpl(const application::AppConfiguration &app_config, std::shared_ptr< clock::SystemClock > clock, std::shared_ptr< crypto::Hasher > hasher, std::shared_ptr< storage::BufferStorage > storage, std::shared_ptr< crypto::CSPRNG > random_generator, std::shared_ptr< api::AuthorApi > author_api, const network::OwnPeerInfo ¤t_peer_info, std::shared_ptr< OffchainPersistentStorage > persistent_storage, std::shared_ptr< runtime::Executor > executor, const primitives::BlockHeader &header, std::shared_ptr< OffchainWorkerPool > ocw_pool)
static constexpr auto latency_of_waiting
std::vector< std::pair< std::string, std::string > > httpResponseHeaders(RequestId id) override
Result< RequestId, Failure > httpRequestStart(HttpMethod method, std::string_view uri, common::Buffer meta) override
uint16_t HttpStatus
HTTP status codes that can get returned by certain Offchain funcs. 0: the specified request identifie...
bool isValidator() const override
const primitives::BlockHeader header_
Result< uint32_t, HttpError > httpResponseReadBody(RequestId id, common::Buffer &chunk, std::optional< Timestamp > deadline) override
constexpr HttpStatus InvalidIdentifier(0)
outcome::result< common::Buffer > localStorageGet(StorageType storage_type, const common::BufferView &key) override
Timestamp timestamp() override
std::shared_ptr< crypto::Hasher > hasher_
Result< Success, Failure > submitTransaction(const primitives::Extrinsic &ext) override
Result< OpaqueNetworkState, Failure > networkState() override
void localStorageSet(StorageType storage_type, const common::BufferView &key, common::Buffer value) override
uint64_t Timestamp
Timestamp is milliseconds since UNIX Epoch.
The ID of the request is invalid.
std::shared_ptr< crypto::CSPRNG > random_generator_
std::shared_ptr< offchain::OffchainLocalStorage > local_storage_
std::shared_ptr< clock::SystemClock > clock_
Extrinsic class represents extrinsic.
Logger createLogger(const std::string &tag)
const primitives::BlockInfo block_
const application::AppConfiguration & app_config_
void localStorageClear(StorageType storage_type, const common::BufferView &key) override
void setAuthorizedNodes(std::vector< libp2p::peer::PeerId > nodes, bool authorized_only) override
std::shared_ptr< offchain::OffchainPersistentStorage > persistent_storage_
constexpr HttpStatus DeadlineHasReached(10)
std::shared_ptr< runtime::Executor > executor_