19 std::shared_ptr<StateProtocolObserver> state_observer)
20 : host_(host), state_observer_(
std::move(state_observer)) {
27 host_.setProtocolHandler(
28 {
protocol_}, [wp = weak_from_this()](
auto &&stream_and_proto) {
29 if (
auto self = wp.lock()) {
30 if (
auto peer_id = stream_and_proto.stream->remotePeerId()) {
32 "Handled {} protocol stream from {:l}",
35 self->onIncomingStream(
36 std::forward<decltype(stream_and_proto.stream)>(
37 stream_and_proto.stream));
40 self->log_->warn(
"Handled {} protocol stream from unknown peer",
52 BOOST_ASSERT(stream->remotePeerId().has_value());
59 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
60 SL_DEBUG(
log_,
"Connect for {} stream with {}",
protocol_, peer_info.id);
65 [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
66 auto &&stream_res)
mutable {
67 auto self = wp.lock();
69 cb(ProtocolError::GONE);
73 if (not stream_res.has_value()) {
76 "Error happened while connection over {} stream with {}: {}",
79 stream_res.error().message());
80 cb(stream_res.as_failure());
83 auto &stream_and_proto = stream_res.value();
86 "Established connection over {} stream with {}",
90 cb(std::move(stream_and_proto.stream));
95 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
98 "Read request from incoming {} stream with {}",
100 stream->remotePeerId().value());
102 read_writer->read<
StateRequest>([stream, wp = weak_from_this()](
103 auto &&state_request_res)
mutable {
104 auto self = wp.lock();
110 if (not state_request_res.has_value()) {
111 SL_VERBOSE(self->log_,
112 "Error at read request from incoming {} stream with {}: {}",
114 stream->remotePeerId().value(),
115 state_request_res.error().message());
121 auto &state_request = state_request_res.value();
123 if (self->log_->level() >= log::Level::VERBOSE) {
125 if (!state_request.start.empty()) {
126 keys =
" starting with keys [";
127 for (
const auto &key : state_request.start) {
135 "State request is received from incoming {} stream with {} for " 138 stream->remotePeerId().value(),
143 auto state_response_res =
144 self->state_observer_->onStateRequest(state_request);
146 if (not state_response_res) {
149 "Error at execute request from incoming {} stream with {}: {}",
151 stream->remotePeerId().value(),
152 state_response_res.error().message());
158 self->writeResponse(std::move(stream),
159 std::move(state_response_res.value()));
166 std::function<
void(outcome::result<StateResponse>)> &&response_handler) {
168 host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
169 if (not addresses_res.has_value()) {
170 response_handler(addresses_res.as_failure());
175 {peer_id, std::move(addresses_res.value())},
176 [wp = weak_from_this(),
177 response_handler = std::move(response_handler),
178 state_request = std::move(state_request)](
auto &&stream_res)
mutable {
179 if (not stream_res.has_value()) {
180 response_handler(stream_res.as_failure());
183 auto &stream = stream_res.value();
185 auto self = wp.lock();
193 "Established outgoing {} stream with {}",
195 stream->remotePeerId().value());
197 self->writeRequest(stream,
198 std::move(state_request),
201 response_handler = std::move(response_handler)](
202 auto &&write_res)
mutable {
203 auto self = wp.lock();
210 SL_DEBUG(self->log_,
"State request sent");
212 if (not write_res.has_value()) {
213 SL_WARN(self->log_,
"Error getting response");
215 response_handler(write_res.as_failure());
219 self->readResponse(std::move(stream),
220 std::move(response_handler));
227 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
231 [stream = std::move(stream),
232 wp = weak_from_this()](
auto &&write_res)
mutable {
233 auto self = wp.lock();
239 if (not write_res.has_value()) {
242 "Error at writing response to incoming {} stream with {}: {}",
244 stream->remotePeerId().value(),
245 write_res.error().message());
250 stream->close([](
auto &&...) {});
255 std::shared_ptr<Stream> stream,
257 std::function<
void(outcome::result<void>)> &&cb) {
258 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
261 "Write request info outgoing {} stream with {}",
263 stream->remotePeerId().value());
267 [stream, wp = weak_from_this(), cb = std::move(cb)](
268 auto &&write_res)
mutable {
269 auto self = wp.lock();
276 if (not write_res.has_value()) {
279 "Error at write request into outgoing {} stream with {}: {}",
281 stream->remotePeerId().value(),
282 write_res.error().message());
285 cb(write_res.as_failure());
290 "Request written successful into outgoing {} stream with {}",
292 stream->remotePeerId().value());
294 cb(outcome::success());
299 std::shared_ptr<Stream> stream,
300 std::function<
void(outcome::result<StateResponse>)> &&response_handler) {
301 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
304 "Read response from outgoing {} stream with {}",
306 stream->remotePeerId().value());
309 wp = weak_from_this(),
311 std::move(response_handler)](
312 auto &&state_response_res)
mutable {
313 auto self = wp.lock();
320 if (not state_response_res.has_value()) {
322 "Error at read response from outgoing {} stream with {}: {}",
324 stream->remotePeerId().value(),
325 state_response_res.error().message());
328 response_handler(state_response_res.as_failure());
331 auto &state_response = state_response_res.value();
334 "Successful response read from outgoing {} stream with {}",
336 stream->remotePeerId().value());
339 response_handler(std::move(state_response));
primitives::BlockHash hash
Block header hash.
StateProtocolImpl(libp2p::Host &host, const application::ChainSpec &chain_spec, std::shared_ptr< StateProtocolObserver > state_observer)
void writeResponse(std::shared_ptr< Stream > stream, StateResponse state_response)
const libp2p::peer::Protocol kStateProtocol
void writeRequest(std::shared_ptr< Stream > stream, StateRequest state_request, std::function< void(outcome::result< void >)> &&cb)
const libp2p::peer::Protocol protocol_
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
libp2p::peer::PeerId PeerId
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
void request(const PeerId &peer_id, StateRequest state_request, std::function< void(outcome::result< StateResponse >)> &&response_handler) override
Make async request to peer and return response in callback.
virtual const std::string & protocolId() const =0
std::string toHex() const noexcept
void readRequest(std::shared_ptr< Stream > stream)
void onIncomingStream(std::shared_ptr< Stream > stream) override
std::shared_ptr< StateProtocolObserver > state_observer_
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< StateResponse >)> &&response_handler)