6 #ifndef KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL     7 #define KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL    17   template <
typename Request, 
typename Response, 
typename ReadWriter>
    20         std::enable_shared_from_this<
    21             RequestResponseProtocol<Request, Response, ReadWriter>> {
    46         std::function<
void(outcome::result<ResponseType>)> &&response_handler) {
    48           base_.
host().getPeerRepository().getAddressRepository().getAddresses(
    50       if (!addresses_res.has_value()) {
    51         response_handler(addresses_res.as_failure());
    57           {peer_id, std::move(addresses_res.value())},
    58           [wptr{this->weak_from_this()},
    59            response_handler{std::move(response_handler)},
    60            request{std::move(request)}](
auto &&stream_res) 
mutable {
    61             if (!stream_res.has_value()) {
    62               response_handler(stream_res.as_failure());
    66             auto stream = std::move(stream_res.value());
    67             auto self = wptr.lock();
    69               stream->close([stream](
auto &&) {});
    74             SL_DEBUG(self->base_.logger(),
    75                      "Established outgoing {} stream with {}",
    77                      stream->remotePeerId().value());
    79             self->writeRequest(std::move(stream),
    81                                std::move(response_handler));
    87         RequestType request, std::shared_ptr<Stream> stream) = 0;
    98       BOOST_ASSERT(stream->remotePeerId().has_value());
   104         std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
   107                "Connect for {} stream with {}",
   114           [wptr{this->weak_from_this()},
   115            peer_id{peer_info.id},
   116            cb{std::move(cb)}](
auto &&stream_and_proto) 
mutable {
   117             if (!stream_and_proto.has_value()) {
   118               cb(stream_and_proto.as_failure());
   122             auto stream = std::move(stream_and_proto.value().stream);
   123             auto self = wptr.lock();
   126               stream->close([stream](
auto &&) {});
   130             SL_DEBUG(self->base_.logger(),
   131                      "Established connection over {} stream with {}",
   132                      self->protocolName(),
   134             cb(std::move(stream));
   138     template <
typename M>
   139     void write(std::shared_ptr<Stream> stream,
   141                std::function<
void(outcome::result<void>,
   142                                   std::shared_ptr<Stream>)> &&cb) {
   144           std::is_same_v<M, RequestType> || std::is_same_v<M, ResponseType>);
   146                "Write msg into {} stream with {}",
   148                stream->remotePeerId().value());
   150       auto read_writer = std::make_shared<ReadWriterType>(stream);
   153           [stream{std::move(stream)},
   154            wptr{this->weak_from_this()},
   155            cb{std::move(cb)}](
auto &&write_res) 
mutable {
   156             auto self = wptr.lock();
   159               stream->close([stream](
auto &&) {});
   163             if (!write_res.has_value()) {
   164               SL_VERBOSE(self->base_.logger(),
   165                          "Error at write into {} stream with {}: {}",
   166                          self->protocolName(),
   167                          stream->remotePeerId().value(),
   168                          write_res.error().message());
   170               cb(write_res.as_failure(), 
nullptr);
   171               self->base_.closeStream(std::move(wptr), std::move(stream));
   176                 self->base_.logger(),
   177                 "Request written successful into outgoing {} stream with {}",
   178                 self->protocolName(),
   179                 stream->remotePeerId().value());
   180             cb(outcome::success(), std::move(stream));
   186                       std::function<
void(outcome::result<ResponseType>)> &&cb) {
   190           [wptr{this->weak_from_this()}, cb{std::move(cb)}](
   191               auto &&write_res, std::shared_ptr<Stream> stream) 
mutable {
   192             auto self = wptr.lock();
   197             if (!write_res.has_value()) {
   198               cb(write_res.as_failure());
   201             self->readResponse(std::move(stream), std::move(cb));
   209           [wptr{this->weak_from_this()}](
auto &&result,
   210                                          std::shared_ptr<Stream> stream) {
   212               auto self = wptr.lock();
   214                   self, 
"If self not exists then we can not get result as OK.");
   215               self->base_.closeStream(std::move(wptr), std::move(stream));
   220     template <
typename M>
   222         std::shared_ptr<Stream> stream,
   223         std::function<
void(outcome::result<M>, std::shared_ptr<Stream>)> &&cb) {
   225                "Read from {} stream with {}",
   227                stream->remotePeerId().value());
   229       auto read_writer = std::make_shared<ReadWriterType>(stream);
   230       read_writer->template read<M>(
   231           [stream{std::move(stream)},
   232            wptr{this->weak_from_this()},
   233            cb{std::move(cb)}](
auto &&read_result) 
mutable {
   234             auto self = wptr.lock();
   237               stream->close([stream](
auto &&) {});
   241             if (!read_result.has_value()) {
   242               SL_VERBOSE(self->base_.logger(),
   243                          "Error at read from outgoing {} stream with {}: {}",
   244                          self->protocolName(),
   245                          stream->remotePeerId().value(),
   246                          read_result.error().message());
   248               cb(read_result.as_failure(), 
nullptr);
   249               self->base_.closeStream(std::move(wptr), std::move(stream));
   253             SL_DEBUG(self->base_.logger(),
   254                      "Successful response read from outgoing {} stream with {}",
   255                      self->protocolName(),
   256                      stream->remotePeerId().value());
   257             cb(std::move(read_result.value()), std::move(stream));
   262                       std::function<
void(outcome::result<ResponseType>)> &&cb) {
   263       return read<ResponseType>(
   265           [cb{std::move(cb)}, wptr{this->weak_from_this()}](
   266               auto &&result, std::shared_ptr<Stream> stream) {
   269               auto self = wptr.lock();
   272                   && !!
"If self not exists then we can not get result as OK.");
   273               self->base_.closeStream(std::move(wptr), std::move(stream));
   279       return read<RequestType>(
   281           [wptr{this->weak_from_this()}](outcome::result<RequestType> result,
   282                                          std::shared_ptr<Stream> stream) {
   285             auto self = wptr.lock();
   287                    && !!
"If self not exists then we can not get result as OK.");
   289             auto request = std::move(result.value());
   290             auto response_result = 
self->onRxRequest(request, stream);
   291             if (!response_result) {
   292               SL_VERBOSE(self->base_.logger(),
   293                          "Error at execute request from incoming {} stream "   295                          self->protocolName(),
   296                          stream->remotePeerId().value(),
   297                          response_result.error().message());
   298               self->base_.closeStream(std::move(wptr), std::move(stream));
   301             self->writeResponse(std::move(stream),
   302                                 std::move(response_result.value()));
   312 #endif  // KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL 
virtual ~RequestResponseProtocol()
 
void read(std::shared_ptr< Stream > stream, std::function< void(outcome::result< M >, std::shared_ptr< Stream >)> &&cb)
 
log::Logger const & logger() const 
 
libp2p::peer::PeerInfo PeerInfo
 
libp2p::peer::Protocol Protocol
 
libp2p::peer::PeerId PeerId
 
void writeRequest(std::shared_ptr< Stream > stream, RequestType request, std::function< void(outcome::result< ResponseType >)> &&cb)
 
void onIncomingStream(std::shared_ptr< Stream > stream) override
 
const Protocol & protocolName() const override
 
void write(std::shared_ptr< Stream > stream, M msg, std::function< void(outcome::result< void >, std::shared_ptr< Stream >)> &&cb)
 
void readRequest(std::shared_ptr< Stream > stream)
 
ProtocolBaseImpl & base()
 
RequestResponseProtocol(libp2p::Host &host, Protocol const &protocol, ProtocolName const &name)
 
Protocols const & protocolIds() const 
 
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
 
bool start(std::weak_ptr< T > wptr)
 
virtual void onTxRequest(RequestType const &request)=0
 
virtual outcome::result< ResponseType > onRxRequest(RequestType request, std::shared_ptr< Stream > stream)=0
 
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< ResponseType >)> &&cb)
 
void doRequest(const PeerId &peer_id, RequestType request, std::function< void(outcome::result< ResponseType >)> &&response_handler)
 
void writeResponse(std::shared_ptr< Stream > stream, ResponseType response)