Kagome
Polkadot Runtime Engine in C++17
rpc.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_RPC_HPP
7 #define KAGOME_RPC_HPP
8 
9 #include <functional>
10 #include <memory>
11 
12 #include "common/buffer.hpp"
13 #include "libp2p/basic/readwriter.hpp"
14 #include "libp2p/host/host.hpp"
15 #include "libp2p/peer/peer_info.hpp"
16 #include "libp2p/peer/protocol.hpp"
17 #include "log/logger.hpp"
18 #include "outcome/outcome.hpp"
19 
20 namespace kagome::network {
26  template <typename MessageReadWriterT>
27  struct RPC {
38  template <typename Request, typename Response>
39  static void read(std::shared_ptr<libp2p::basic::ReadWriter> read_writer,
40  std::function<outcome::result<Response>(Request)> cb,
41  std::function<void(outcome::result<void>)> error_cb) {
42  auto msg_read_writer =
43  std::make_shared<MessageReadWriterT>(std::move(read_writer));
44  msg_read_writer->template read<Request>(
45  [msg_read_writer, cb = std::move(cb), error_cb = std::move(error_cb)](
46  auto &&request_res) mutable {
47  if (!request_res) {
48  return error_cb(request_res.error());
49  }
50 
51  auto response_res = cb(std::move(request_res.value()));
52  if (!response_res) {
53  return error_cb(response_res.error());
54  }
55 
56  msg_read_writer->template write<Response>(
57  response_res.value(),
58  [error_cb = std::move(error_cb)](auto &&write_res) {
59  if (!write_res) {
60  return error_cb(write_res.error());
61  }
62  });
63  });
64  }
65 
73  template <typename Request>
74  static void read(std::shared_ptr<libp2p::basic::ReadWriter> read_writer,
75  std::function<void(outcome::result<Request>)> cb) {
76  auto msg_read_writer =
77  std::make_shared<MessageReadWriterT>(std::move(read_writer));
78  msg_read_writer->template read<Request>(
79  [msg_read_writer, cb = std::move(cb)](auto &&msg_res) mutable {
80  if (!msg_res) {
81  return cb(msg_res.error());
82  }
83 
84  cb(std::move(msg_res.value()));
85  });
86  }
87 
98  template <typename Request, typename Response>
99  static void write(libp2p::Host &host,
100  const libp2p::peer::PeerInfo &peer_info,
101  const libp2p::peer::Protocol &protocol,
102  Request request,
103  std::function<void(outcome::result<Response>)> cb) {
104  host.newStream(
105  peer_info.id,
106  {protocol},
107  [request = std::move(request),
108  cb = std::move(cb)](auto &&stream_res) mutable {
109  if (!stream_res) {
110  return cb(stream_res.error());
111  }
112 
113  auto stream_and_proto = std::move(stream_res.value());
114  auto &stream = stream_and_proto.stream;
115 
116  auto log = log::createLogger("rpc_writter", "network");
117  SL_DEBUG(log,
118  "Sending blocks request to {}",
119  stream->remotePeerId().value().toBase58());
120 
121  auto read_writer = std::make_shared<MessageReadWriterT>(stream);
122  read_writer->template write<Request>(
123  request,
124  [read_writer, stream = std::move(stream), cb = std::move(cb)](
125  auto &&write_res) mutable {
126  if (!write_res) {
127  stream->reset();
128  return cb(write_res.error());
129  }
130 
131  auto log = log::createLogger("rpc_writter", "network");
132  SL_DEBUG(log,
133  "Request to {} sent successfully",
134  stream->remotePeerId().value().toBase58());
135 
136  read_writer->template read<Response>(
137  [stream = std::move(stream),
138  cb = std::move(cb)](auto &&msg_res) {
139  if (!msg_res) {
140  stream->reset();
141  return cb(msg_res.error());
142  }
143 
144  stream->close([](auto &&) {});
145  return cb(std::move(msg_res.value()));
146  });
147  });
148  });
149  }
150 
160  template <typename Request>
161  static void write(libp2p::Host &host,
162  const libp2p::peer::PeerInfo &peer_info,
163  const libp2p::peer::Protocol &protocol,
164  Request request,
165  std::function<void(outcome::result<void>)> cb) {
166  host.newStream(peer_info.id,
167  {protocol},
168  [request = std::move(request),
169  cb = std::move(cb)](auto &&stream_res) mutable {
170  if (!stream_res) {
171  return cb(stream_res.error());
172  }
173 
174  auto stream_and_proto = std::move(stream_res.value());
175  auto read_writer = std::make_shared<MessageReadWriterT>(
176  stream_and_proto.stream);
177  read_writer->template write<Request>(
178  request,
179  [stream = std::move(stream_and_proto.stream),
180  cb = std::move(cb)](auto &&write_res) {
181  if (!write_res) {
182  stream->reset();
183  return cb(write_res.error());
184  }
185 
186  stream->close([](auto &&) {});
187  return cb(outcome::success());
188  });
189  });
190  }
191  };
192 } // namespace kagome::network
193 
194 #endif // KAGOME_RPC_HPP
static void read(std::shared_ptr< libp2p::basic::ReadWriter > read_writer, std::function< void(outcome::result< Request >)> cb)
Definition: rpc.hpp:74
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
static void read(std::shared_ptr< libp2p::basic::ReadWriter > read_writer, std::function< outcome::result< Response >(Request)> cb, std::function< void(outcome::result< void >)> error_cb)
Definition: rpc.hpp:39
static void write(libp2p::Host &host, const libp2p::peer::PeerInfo &peer_info, const libp2p::peer::Protocol &protocol, Request request, std::function< void(outcome::result< Response >)> cb)
Definition: rpc.hpp:99
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
static void write(libp2p::Host &host, const libp2p::peer::PeerInfo &peer_info, const libp2p::peer::Protocol &protocol, Request request, std::function< void(outcome::result< void >)> cb)
Definition: rpc.hpp:161