Kagome
Polkadot Runtime Engine in C++17
request_response_protocol.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL
7 #define KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL
8 
10 
12 #include "protocol_error.hpp"
13 #include "utils/box.hpp"
14 
15 namespace kagome::network {
16 
17  template <typename Request, typename Response, typename ReadWriter>
19  : ProtocolBase,
20  std::enable_shared_from_this<
21  RequestResponseProtocol<Request, Response, ReadWriter>> {
22  using RequestType = Request;
23  using ResponseType = Response;
24  using ReadWriterType = ReadWriter;
25 
27  Protocol const &protocol,
28  ProtocolName const &name)
29  : base_(host, {protocol}, name), protocol_{protocol} {}
31 
32  bool start() override {
33  return base_.start(this->weak_from_this());
34  }
35  bool stop() override {
36  return base_.stop();
37  }
38 
39  const Protocol &protocolName() const override {
40  return protocol_;
41  }
42 
43  void doRequest(
44  const PeerId &peer_id,
45  RequestType request,
46  std::function<void(outcome::result<ResponseType>)> &&response_handler) {
47  auto addresses_res =
48  base_.host().getPeerRepository().getAddressRepository().getAddresses(
49  peer_id);
50  if (!addresses_res.has_value()) {
51  response_handler(addresses_res.as_failure());
52  return;
53  }
54 
55  onTxRequest(request);
57  {peer_id, std::move(addresses_res.value())},
58  [wptr{this->weak_from_this()},
59  response_handler{std::move(response_handler)},
60  request{std::move(request)}](auto &&stream_res) mutable {
61  if (!stream_res.has_value()) {
62  response_handler(stream_res.as_failure());
63  return;
64  }
65 
66  auto stream = std::move(stream_res.value());
67  auto self = wptr.lock();
68  if (!self) {
69  stream->close([stream](auto &&) {});
70  response_handler(ProtocolError::GONE);
71  return;
72  }
73 
74  SL_DEBUG(self->base_.logger(),
75  "Established outgoing {} stream with {}",
76  self->protocolName(),
77  stream->remotePeerId().value());
78 
79  self->writeRequest(std::move(stream),
80  std::move(request),
81  std::move(response_handler));
82  });
83  }
84 
85  protected:
86  virtual outcome::result<ResponseType> onRxRequest(
87  RequestType request, std::shared_ptr<Stream> stream) = 0;
88  virtual void onTxRequest(RequestType const &request) = 0;
89 
91  return base_;
92  }
93 
94  private:
95  friend class ProtocolBaseImpl;
96 
97  void onIncomingStream(std::shared_ptr<Stream> stream) override {
98  BOOST_ASSERT(stream->remotePeerId().has_value());
99  readRequest(std::move(stream));
100  }
101 
103  const PeerInfo &peer_info,
104  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
105  override {
106  SL_DEBUG(base_.logger(),
107  "Connect for {} stream with {}",
108  protocolName(),
109  peer_info.id);
110 
111  base_.host().newStream(
112  peer_info.id,
113  base_.protocolIds(),
114  [wptr{this->weak_from_this()},
115  peer_id{peer_info.id},
116  cb{std::move(cb)}](auto &&stream_and_proto) mutable {
117  if (!stream_and_proto.has_value()) {
118  cb(stream_and_proto.as_failure());
119  return;
120  }
121 
122  auto stream = std::move(stream_and_proto.value().stream);
123  auto self = wptr.lock();
124  if (!self) {
126  stream->close([stream](auto &&) {});
127  return;
128  }
129 
130  SL_DEBUG(self->base_.logger(),
131  "Established connection over {} stream with {}",
132  self->protocolName(),
133  peer_id);
134  cb(std::move(stream));
135  });
136  }
137 
138  template <typename M>
139  void write(std::shared_ptr<Stream> stream,
140  M msg,
141  std::function<void(outcome::result<void>,
142  std::shared_ptr<Stream>)> &&cb) {
143  static_assert(
144  std::is_same_v<M, RequestType> || std::is_same_v<M, ResponseType>);
145  SL_DEBUG(base_.logger(),
146  "Write msg into {} stream with {}",
147  protocolName(),
148  stream->remotePeerId().value());
149 
150  auto read_writer = std::make_shared<ReadWriterType>(stream);
151  read_writer->write(
152  msg,
153  [stream{std::move(stream)},
154  wptr{this->weak_from_this()},
155  cb{std::move(cb)}](auto &&write_res) mutable {
156  auto self = wptr.lock();
157  if (not self) {
158  cb(ProtocolError::GONE, nullptr);
159  stream->close([stream](auto &&) {});
160  return;
161  }
162 
163  if (!write_res.has_value()) {
164  SL_VERBOSE(self->base_.logger(),
165  "Error at write into {} stream with {}: {}",
166  self->protocolName(),
167  stream->remotePeerId().value(),
168  write_res.error().message());
169 
170  cb(write_res.as_failure(), nullptr);
171  self->base_.closeStream(std::move(wptr), std::move(stream));
172  return;
173  }
174 
175  SL_DEBUG(
176  self->base_.logger(),
177  "Request written successful into outgoing {} stream with {}",
178  self->protocolName(),
179  stream->remotePeerId().value());
180  cb(outcome::success(), std::move(stream));
181  });
182  }
183 
184  void writeRequest(std::shared_ptr<Stream> stream,
185  RequestType request,
186  std::function<void(outcome::result<ResponseType>)> &&cb) {
187  return write(
188  std::move(stream),
189  std::move(request),
190  [wptr{this->weak_from_this()}, cb{std::move(cb)}](
191  auto &&write_res, std::shared_ptr<Stream> stream) mutable {
192  auto self = wptr.lock();
193  if (!self) {
195  return;
196  }
197  if (!write_res.has_value()) {
198  cb(write_res.as_failure());
199  return;
200  }
201  self->readResponse(std::move(stream), std::move(cb));
202  });
203  }
204 
205  void writeResponse(std::shared_ptr<Stream> stream, ResponseType response) {
206  return write(
207  std::move(stream),
208  std::move(response),
209  [wptr{this->weak_from_this()}](auto &&result,
210  std::shared_ptr<Stream> stream) {
211  if (result) {
212  auto self = wptr.lock();
213  BOOST_ASSERT_MSG(
214  self, "If self not exists then we can not get result as OK.");
215  self->base_.closeStream(std::move(wptr), std::move(stream));
216  }
217  });
218  }
219 
220  template <typename M>
221  void read(
222  std::shared_ptr<Stream> stream,
223  std::function<void(outcome::result<M>, std::shared_ptr<Stream>)> &&cb) {
224  SL_DEBUG(base_.logger(),
225  "Read from {} stream with {}",
226  protocolName(),
227  stream->remotePeerId().value());
228 
229  auto read_writer = std::make_shared<ReadWriterType>(stream);
230  read_writer->template read<M>(
231  [stream{std::move(stream)},
232  wptr{this->weak_from_this()},
233  cb{std::move(cb)}](auto &&read_result) mutable {
234  auto self = wptr.lock();
235  if (!self) {
236  cb(ProtocolError::GONE, nullptr);
237  stream->close([stream](auto &&) {});
238  return;
239  }
240 
241  if (!read_result.has_value()) {
242  SL_VERBOSE(self->base_.logger(),
243  "Error at read from outgoing {} stream with {}: {}",
244  self->protocolName(),
245  stream->remotePeerId().value(),
246  read_result.error().message());
247 
248  cb(read_result.as_failure(), nullptr);
249  self->base_.closeStream(std::move(wptr), std::move(stream));
250  return;
251  }
252 
253  SL_DEBUG(self->base_.logger(),
254  "Successful response read from outgoing {} stream with {}",
255  self->protocolName(),
256  stream->remotePeerId().value());
257  cb(std::move(read_result.value()), std::move(stream));
258  });
259  }
260 
261  void readResponse(std::shared_ptr<Stream> stream,
262  std::function<void(outcome::result<ResponseType>)> &&cb) {
263  return read<ResponseType>(
264  std::move(stream),
265  [cb{std::move(cb)}, wptr{this->weak_from_this()}](
266  auto &&result, std::shared_ptr<Stream> stream) {
267  cb(result);
268  if (result) {
269  auto self = wptr.lock();
270  assert(
271  self
272  && !!"If self not exists then we can not get result as OK.");
273  self->base_.closeStream(std::move(wptr), std::move(stream));
274  }
275  });
276  }
277 
278  void readRequest(std::shared_ptr<Stream> stream) {
279  return read<RequestType>(
280  std::move(stream),
281  [wptr{this->weak_from_this()}](outcome::result<RequestType> result,
282  std::shared_ptr<Stream> stream) {
283  if (!result) return;
284 
285  auto self = wptr.lock();
286  assert(self
287  && !!"If self not exists then we can not get result as OK.");
288 
289  auto request = std::move(result.value());
290  auto response_result = self->onRxRequest(request, stream);
291  if (!response_result) {
292  SL_VERBOSE(self->base_.logger(),
293  "Error at execute request from incoming {} stream "
294  "with {}: {}",
295  self->protocolName(),
296  stream->remotePeerId().value(),
297  response_result.error().message());
298  self->base_.closeStream(std::move(wptr), std::move(stream));
299  return;
300  }
301  self->writeResponse(std::move(stream),
302  std::move(response_result.value()));
303  });
304  }
305 
308  };
309 
310 } // namespace kagome::network
311 
312 #endif // KAGOME_NETWORK_REQUESTRESPONSEPROTOCOL
void read(std::shared_ptr< Stream > stream, std::function< void(outcome::result< M >, std::shared_ptr< Stream >)> &&cb)
log::Logger const & logger() const
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
libp2p::peer::PeerId PeerId
void writeRequest(std::shared_ptr< Stream > stream, RequestType request, std::function< void(outcome::result< ResponseType >)> &&cb)
void onIncomingStream(std::shared_ptr< Stream > stream) override
const Protocol & protocolName() const override
void write(std::shared_ptr< Stream > stream, M msg, std::function< void(outcome::result< void >, std::shared_ptr< Stream >)> &&cb)
void readRequest(std::shared_ptr< Stream > stream)
RequestResponseProtocol(libp2p::Host &host, Protocol const &protocol, ProtocolName const &name)
Protocols const & protocolIds() const
std::string ProtocolName
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
bool start(std::weak_ptr< T > wptr)
virtual void onTxRequest(RequestType const &request)=0
virtual outcome::result< ResponseType > onRxRequest(RequestType request, std::shared_ptr< Stream > stream)=0
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< ResponseType >)> &&cb)
void doRequest(const PeerId &peer_id, RequestType request, std::function< void(outcome::result< ResponseType >)> &&response_handler)
void writeResponse(std::shared_ptr< Stream > stream, ResponseType response)