6 #ifndef KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL 7 #define KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL 17 template <
typename Request,
typename Response,
typename ReadWriter>
20 std::enable_shared_from_this<
21 RequestResponseProtocol<Request, Response, ReadWriter>> {
46 std::function<
void(outcome::result<ResponseType>)> &&response_handler) {
48 base_.
host().getPeerRepository().getAddressRepository().getAddresses(
50 if (!addresses_res.has_value()) {
51 response_handler(addresses_res.as_failure());
57 {peer_id, std::move(addresses_res.value())},
58 [wptr{this->weak_from_this()},
59 response_handler{std::move(response_handler)},
60 request{std::move(request)}](
auto &&stream_res)
mutable {
61 if (!stream_res.has_value()) {
62 response_handler(stream_res.as_failure());
66 auto stream = std::move(stream_res.value());
67 auto self = wptr.lock();
69 stream->close([stream](
auto &&) {});
74 SL_DEBUG(self->base_.logger(),
75 "Established outgoing {} stream with {}",
77 stream->remotePeerId().value());
79 self->writeRequest(std::move(stream),
81 std::move(response_handler));
87 RequestType request, std::shared_ptr<Stream> stream) = 0;
98 BOOST_ASSERT(stream->remotePeerId().has_value());
104 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
107 "Connect for {} stream with {}",
114 [wptr{this->weak_from_this()},
115 peer_id{peer_info.id},
116 cb{std::move(cb)}](
auto &&stream_and_proto)
mutable {
117 if (!stream_and_proto.has_value()) {
118 cb(stream_and_proto.as_failure());
122 auto stream = std::move(stream_and_proto.value().stream);
123 auto self = wptr.lock();
126 stream->close([stream](
auto &&) {});
130 SL_DEBUG(self->base_.logger(),
131 "Established connection over {} stream with {}",
132 self->protocolName(),
134 cb(std::move(stream));
138 template <
typename M>
139 void write(std::shared_ptr<Stream> stream,
141 std::function<
void(outcome::result<void>,
142 std::shared_ptr<Stream>)> &&cb) {
144 std::is_same_v<M, RequestType> || std::is_same_v<M, ResponseType>);
146 "Write msg into {} stream with {}",
148 stream->remotePeerId().value());
150 auto read_writer = std::make_shared<ReadWriterType>(stream);
153 [stream{std::move(stream)},
154 wptr{this->weak_from_this()},
155 cb{std::move(cb)}](
auto &&write_res)
mutable {
156 auto self = wptr.lock();
159 stream->close([stream](
auto &&) {});
163 if (!write_res.has_value()) {
164 SL_VERBOSE(self->base_.logger(),
165 "Error at write into {} stream with {}: {}",
166 self->protocolName(),
167 stream->remotePeerId().value(),
168 write_res.error().message());
170 cb(write_res.as_failure(),
nullptr);
171 self->base_.closeStream(std::move(wptr), std::move(stream));
176 self->base_.logger(),
177 "Request written successful into outgoing {} stream with {}",
178 self->protocolName(),
179 stream->remotePeerId().value());
180 cb(outcome::success(), std::move(stream));
186 std::function<
void(outcome::result<ResponseType>)> &&cb) {
190 [wptr{this->weak_from_this()}, cb{std::move(cb)}](
191 auto &&write_res, std::shared_ptr<Stream> stream)
mutable {
192 auto self = wptr.lock();
197 if (!write_res.has_value()) {
198 cb(write_res.as_failure());
201 self->readResponse(std::move(stream), std::move(cb));
209 [wptr{this->weak_from_this()}](
auto &&result,
210 std::shared_ptr<Stream> stream) {
212 auto self = wptr.lock();
214 self,
"If self not exists then we can not get result as OK.");
215 self->base_.closeStream(std::move(wptr), std::move(stream));
220 template <
typename M>
222 std::shared_ptr<Stream> stream,
223 std::function<
void(outcome::result<M>, std::shared_ptr<Stream>)> &&cb) {
225 "Read from {} stream with {}",
227 stream->remotePeerId().value());
229 auto read_writer = std::make_shared<ReadWriterType>(stream);
230 read_writer->template read<M>(
231 [stream{std::move(stream)},
232 wptr{this->weak_from_this()},
233 cb{std::move(cb)}](
auto &&read_result)
mutable {
234 auto self = wptr.lock();
237 stream->close([stream](
auto &&) {});
241 if (!read_result.has_value()) {
242 SL_VERBOSE(self->base_.logger(),
243 "Error at read from outgoing {} stream with {}: {}",
244 self->protocolName(),
245 stream->remotePeerId().value(),
246 read_result.error().message());
248 cb(read_result.as_failure(),
nullptr);
249 self->base_.closeStream(std::move(wptr), std::move(stream));
253 SL_DEBUG(self->base_.logger(),
254 "Successful response read from outgoing {} stream with {}",
255 self->protocolName(),
256 stream->remotePeerId().value());
257 cb(std::move(read_result.value()), std::move(stream));
262 std::function<
void(outcome::result<ResponseType>)> &&cb) {
263 return read<ResponseType>(
265 [cb{std::move(cb)}, wptr{this->weak_from_this()}](
266 auto &&result, std::shared_ptr<Stream> stream) {
269 auto self = wptr.lock();
272 && !!
"If self not exists then we can not get result as OK.");
273 self->base_.closeStream(std::move(wptr), std::move(stream));
279 return read<RequestType>(
281 [wptr{this->weak_from_this()}](outcome::result<RequestType> result,
282 std::shared_ptr<Stream> stream) {
285 auto self = wptr.lock();
287 && !!
"If self not exists then we can not get result as OK.");
289 auto request = std::move(result.value());
290 auto response_result =
self->onRxRequest(request, stream);
291 if (!response_result) {
292 SL_VERBOSE(self->base_.logger(),
293 "Error at execute request from incoming {} stream " 295 self->protocolName(),
296 stream->remotePeerId().value(),
297 response_result.error().message());
298 self->base_.closeStream(std::move(wptr), std::move(stream));
301 self->writeResponse(std::move(stream),
302 std::move(response_result.value()));
312 #endif // KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL
virtual ~RequestResponseProtocol()
void read(std::shared_ptr< Stream > stream, std::function< void(outcome::result< M >, std::shared_ptr< Stream >)> &&cb)
log::Logger const & logger() const
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
libp2p::peer::PeerId PeerId
void writeRequest(std::shared_ptr< Stream > stream, RequestType request, std::function< void(outcome::result< ResponseType >)> &&cb)
void onIncomingStream(std::shared_ptr< Stream > stream) override
const Protocol & protocolName() const override
void write(std::shared_ptr< Stream > stream, M msg, std::function< void(outcome::result< void >, std::shared_ptr< Stream >)> &&cb)
void readRequest(std::shared_ptr< Stream > stream)
ProtocolBaseImpl & base()
RequestResponseProtocol(libp2p::Host &host, Protocol const &protocol, ProtocolName const &name)
Protocols const & protocolIds() const
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
bool start(std::weak_ptr< T > wptr)
virtual void onTxRequest(RequestType const &request)=0
virtual outcome::result< ResponseType > onRxRequest(RequestType request, std::shared_ptr< Stream > stream)=0
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< ResponseType >)> &&cb)
void doRequest(const PeerId &peer_id, RequestType request, std::function< void(outcome::result< ResponseType >)> &&response_handler)
void writeResponse(std::shared_ptr< Stream > stream, ResponseType response)