Kagome
Polkadot Runtime Engine in C++17
stream_engine.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_STREAM_ENGINE_HPP
7 #define KAGOME_STREAM_ENGINE_HPP
8 
9 #include <deque>
10 #include <numeric>
11 #include <optional>
12 #include <queue>
13 #include <random>
14 #include <unordered_map>
15 
16 #include "libp2p/connection/stream.hpp"
17 #include "libp2p/host/host.hpp"
18 #include "libp2p/peer/peer_info.hpp"
19 #include "libp2p/peer/protocol.hpp"
20 #include "log/logger.hpp"
27 #include "utils/safe_object.hpp"
28 
29 namespace kagome::network {
30 
40  struct StreamEngine final : std::enable_shared_from_this<StreamEngine> {
45  using StreamEnginePtr = std::shared_ptr<StreamEngine>;
46 
/// Duration handed to ReputationRepository::changeForATime() when opening an
/// outgoing stream fails with `not_connected` (see updateStream()).
static constexpr auto kDownVoteByDisconnectionExpirationTimeout =
    std::chrono::seconds(30);
49 
/// Direction(s) a stream is registered for. The enumerators are bit flags:
/// add() tests them with a bitwise AND, so BIDIRECTIONAL sets both bits.
enum class Direction : uint8_t {
  INCOMING = 1 << 0,
  OUTGOING = 1 << 1,
  BIDIRECTIONAL = INCOMING | OUTGOING
};
55 
56  template <typename Rng = std::mt19937>
58  RandomGossipStrategy(const int candidates_num, const int lucky_peers_num)
59  : candidates_num_{candidates_num} {
60  auto lucky_rate = lucky_peers_num > 0
61  ? static_cast<double>(lucky_peers_num)
62  / std::max(candidates_num_, lucky_peers_num)
63  : 1.;
64  threshold_ = gen_.max() * lucky_rate;
65  }
66  bool operator()(const PeerId &) {
67  auto res = candidates_num_ > 0 && gen_() <= threshold_;
68  return res;
69  }
70 
71  private:
72  Rng gen_;
74  typename Rng::result_type threshold_;
75  };
76 
77  private:
78  struct ProtocolDescr {
79  std::shared_ptr<ProtocolBase> protocol;
80 
81  struct {
82  std::shared_ptr<Stream> stream;
83  } incoming;
84 
85  struct {
86  std::shared_ptr<Stream> stream;
87  bool reserved = false;
88  } outgoing;
89 
90  std::deque<std::function<void(std::shared_ptr<Stream>)>>
91  deferred_messages;
92 
93  public:
94  explicit ProtocolDescr(std::shared_ptr<ProtocolBase> proto)
95  : protocol{std::move(proto)} {}
96  ProtocolDescr(std::shared_ptr<ProtocolBase> proto,
97  std::shared_ptr<Stream> incoming_stream,
98  std::shared_ptr<Stream> outgoing_stream)
99  : protocol{std::move(proto)},
100  incoming{std::move(incoming_stream)},
101  outgoing{std::move(outgoing_stream)} {}
102 
106  bool hasActiveOutgoing() const {
107  return outgoing.stream and not outgoing.stream->isClosed();
108  }
109 
114  bool reserve() {
115  if (outgoing.reserved or hasActiveOutgoing()) {
116  return false;
117  }
118 
119  outgoing.reserved = true;
120  return true;
121  }
122 
123  bool isOutgoingReserved() const {
124  return outgoing.reserved;
125  }
126 
130  void dropReserved() {
131  BOOST_ASSERT(outgoing.reserved);
132  outgoing.reserved = false;
133  }
134 
138  [[maybe_unused]] bool hasActiveIncoming() const {
139  return incoming.stream and not incoming.stream->isClosed();
140  }
141  };
142 
143  using ProtocolMap = std::map<std::shared_ptr<ProtocolBase>, ProtocolDescr>;
144  using PeerMap = std::map<PeerId, ProtocolMap>;
145 
146  public:
147  StreamEngine(const StreamEngine &) = delete;
148  StreamEngine &operator=(const StreamEngine &) = delete;
149 
150  StreamEngine(StreamEngine &&) = delete;
151  StreamEngine &operator=(StreamEngine &&) = delete;
152 
153  ~StreamEngine() = default;
154  StreamEngine(std::shared_ptr<ReputationRepository> reputation_repository)
155  : reputation_repository_(std::move(reputation_repository)),
156  logger_{log::createLogger("StreamEngine", "network")} {}
157 
158  template <typename... Args>
159  static StreamEnginePtr create(Args &&...args) {
160  return std::make_shared<StreamEngine>(std::forward<Args>(args)...);
161  }
162 
163  private:
164  outcome::result<void> add(std::shared_ptr<Stream> stream,
165  const std::shared_ptr<ProtocolBase> &protocol,
166  Direction direction) {
167  BOOST_ASSERT(protocol != nullptr);
168  BOOST_ASSERT(stream != nullptr);
169 
170  OUTCOME_TRY(peer_id, stream->remotePeerId());
171  auto dir = static_cast<uint8_t>(direction);
172  const bool is_incoming =
173  (dir & static_cast<uint8_t>(Direction::INCOMING)) != 0;
174  const bool is_outgoing =
175  (dir & static_cast<uint8_t>(Direction::OUTGOING)) != 0;
176 
177  return streams_.exclusiveAccess([&](auto &streams) {
178  bool existing = false;
179  forSubscriber(peer_id, streams, protocol, [&](auto type, auto &descr) {
180  existing = true;
181  if (is_incoming) {
182  uploadStream(
183  descr.incoming.stream, stream, protocol, Direction::INCOMING);
184  }
185  if (is_outgoing) {
186  uploadStream(
187  descr.outgoing.stream, stream, protocol, Direction::OUTGOING);
188  }
189  });
190 
191  if (not existing) {
192  auto &proto_map = streams[peer_id];
193  proto_map.emplace(protocol,
194  ProtocolDescr{protocol,
195  is_incoming ? stream : nullptr,
196  is_outgoing ? stream : nullptr});
197  SL_DEBUG(logger_,
198  "Added {} {} stream with peer {}",
199  direction == Direction::INCOMING ? "incoming"
200  : direction == Direction::OUTGOING ? "outgoing"
201  : "bidirectional",
202  protocol->protocolName(),
203  peer_id);
204  }
205  return outcome::success();
206  });
207  }
208 
209  public:
210  outcome::result<void> addIncoming(
211  std::shared_ptr<Stream> stream,
212  const std::shared_ptr<ProtocolBase> &protocol) {
213  return add(std::move(stream), protocol, Direction::INCOMING);
214  }
215 
216  outcome::result<void> addOutgoing(
217  std::shared_ptr<Stream> stream,
218  const std::shared_ptr<ProtocolBase> &protocol) {
219  return add(std::move(stream), protocol, Direction::OUTGOING);
220  }
221 
222  outcome::result<void> addBidirectional(
223  std::shared_ptr<Stream> stream,
224  const std::shared_ptr<ProtocolBase> &protocol) {
225  return add(std::move(stream), protocol, Direction::BIDIRECTIONAL);
226  }
227 
228  void reserveStreams(const PeerId &peer_id,
229  const std::shared_ptr<ProtocolBase> &protocol) {
230  BOOST_ASSERT(protocol != nullptr);
231  auto const reserved = streams_.exclusiveAccess([&](auto &streams) {
232  return streams[peer_id]
233  .emplace(protocol, ProtocolDescr{protocol})
234  .second;
235  });
236 
237  if (reserved) {
238  SL_DEBUG(logger_,
239  "Reserved {} stream with peer {}",
240  protocol->protocolName(),
241  peer_id);
242  }
243  }
244 
245  void del(const PeerId &peer_id) {
246  streams_.exclusiveAccess([&](auto &streams) {
247  if (auto it = streams.find(peer_id); it != streams.end()) {
248  for (auto &protocol_it : it->second) {
249  auto &descr = protocol_it.second;
250  if (descr.incoming.stream) {
251  descr.incoming.stream->reset();
252  }
253  if (descr.outgoing.stream) {
254  descr.outgoing.stream->reset();
255  }
256  }
257  streams.erase(it);
258  }
259  });
260  }
261 
262  bool reserveOutgoing(PeerId const &peer_id,
263  std::shared_ptr<ProtocolBase> const &protocol) {
264  BOOST_ASSERT(protocol);
265  return streams_.exclusiveAccess([&](PeerMap &streams) {
266  auto &proto_map = streams[peer_id];
267  auto [it, _] = proto_map.emplace(protocol, ProtocolDescr{protocol});
268  return it->second.reserve();
269  });
270  }
271 
272  void dropReserveOutgoing(PeerId const &peer_id,
273  std::shared_ptr<ProtocolBase> const &protocol) {
274  BOOST_ASSERT(protocol);
275  return streams_.exclusiveAccess([&](auto &streams) {
276  forSubscriber(peer_id, streams, protocol, [&](auto, auto &descr) {
277  return descr.dropReserved();
278  });
279  });
280  }
281 
282  bool isAlive(PeerId const &peer_id,
283  std::shared_ptr<ProtocolBase> const &protocol) const {
284  BOOST_ASSERT(protocol);
285  bool alive = false;
286  streams_.sharedAccess([&](auto const &streams) {
287  forSubscriber(peer_id, streams, protocol, [&](auto, auto const &descr) {
288  alive = descr.hasActiveOutgoing() || descr.hasActiveIncoming()
289  || descr.isOutgoingReserved();
290  });
291  });
292  return alive;
293  }
294 
295  template <typename T>
296  void send(const PeerId &peer_id,
297  const std::shared_ptr<ProtocolBase> &protocol,
298  std::shared_ptr<T> msg) {
299  BOOST_ASSERT(msg != nullptr);
300  BOOST_ASSERT(protocol != nullptr);
301 
302  bool was_sent = false;
303  streams_.sharedAccess([&](auto const &streams) {
305  peer_id, streams, protocol, [&](auto type, auto const &descr) {
306  if (descr.hasActiveOutgoing()) {
307  send(peer_id, protocol, descr.outgoing.stream, msg);
308  was_sent = true;
309  }
310  });
311  });
312 
313  if (not was_sent) {
314  updateStream(peer_id, protocol, msg);
315  }
316  }
317 
318  template <typename T>
319  void broadcast(
320  const std::shared_ptr<ProtocolBase> &protocol,
321  const std::shared_ptr<T> &msg,
322  const std::function<bool(const PeerId &peer_id)> &predicate) {
323  BOOST_ASSERT(msg != nullptr);
324  BOOST_ASSERT(protocol != nullptr);
325 
326  forEachPeer([&](const auto &peer_id, auto &proto_map) {
327  if (predicate(peer_id)) {
328  forProtocol(proto_map, protocol, [&](auto &descr) {
329  if (descr.hasActiveOutgoing()) {
330  send(peer_id, protocol, descr.outgoing.stream, msg);
331  } else {
332  updateStream(peer_id, protocol, descr);
333  }
334  });
335  }
336  });
337  }
338 
339  template <typename T>
340  void broadcast(const std::shared_ptr<ProtocolBase> &protocol,
341  const std::shared_ptr<T> &msg) {
342  static const std::function<bool(const PeerId &)> any =
343  [](const PeerId &) { return true; };
344  broadcast(protocol, msg, any);
345  }
346 
347  int outgoingStreamsNumber(const std::shared_ptr<ProtocolBase> &protocol) {
348  int candidates_num{0};
349  streams_.sharedAccess([&](auto const &streams) {
350  candidates_num = std::count_if(
351  streams.begin(), streams.end(), [&protocol](const auto &entry) {
352  auto &[peer_id, protocol_map] = entry;
353  return protocol_map.find(protocol) != protocol_map.end()
354  && protocol_map.at(protocol).hasActiveOutgoing();
355  });
356  });
357  return candidates_num;
358  }
359 
360  template <typename F>
361  size_t count(F &&filter) const {
362  return streams_.sharedAccess([&](auto const &streams) {
363  size_t result = 0;
364  for (auto const &i : streams) {
365  if (filter(i.first)) {
366  result += i.second.size();
367  }
368  }
369 
370  return result;
371  });
372  }
373 
374  template <typename TPeerId,
375  typename = std::enable_if<std::is_same_v<PeerId, TPeerId>>>
376  PeerInfo from(TPeerId &&peer_id) const {
377  return PeerInfo{.id = std::forward<TPeerId>(peer_id), .addresses = {}};
378  }
379 
380  outcome::result<PeerInfo> from(std::shared_ptr<Stream> &stream) const {
381  BOOST_ASSERT(stream);
382  auto peer_id_res = stream->remotePeerId();
383  if (!peer_id_res.has_value()) {
384  logger_->error("Can't get peer_id: {}", peer_id_res.error().message());
385  return peer_id_res.as_failure();
386  }
387  return from(std::move(peer_id_res.value()));
388  }
389 
390  template <typename F>
391  void forEachPeer(F &&f) {
392  streams_.exclusiveAccess([&](auto &streams) {
393  for (auto &[peer_id, protocol_map] : streams) {
394  std::forward<F>(f)(peer_id, protocol_map);
395  }
396  });
397  }
398 
399  template <typename F>
400  void forEachPeer(F &&f) const {
401  streams_.sharedAccess([&](auto const &streams) {
402  for (auto const &[peer_id, protocol_map] : streams) {
403  std::forward<F>(f)(peer_id, protocol_map);
404  }
405  });
406  }
407 
408  private:
409  void uploadStream(std::shared_ptr<Stream> &dst,
410  std::shared_ptr<Stream> const &src,
411  std::shared_ptr<ProtocolBase> const &protocol,
412  Direction direction) {
413  BOOST_ASSERT(src);
414  // Skip the same stream
415  if (dst.get() == src.get()) return;
416 
417  bool replaced = false;
418  // Reset previous stream if any
419  if (dst) {
420  if (direction == Direction::INCOMING) {
421  dst->close([](outcome::result<void>) {});
422  } else {
423  dst->reset();
424  }
425  replaced = true;
426  }
427 
428  dst = src;
429  SL_DEBUG(logger_,
430  "{} {} stream with peer {} was {}",
431  direction == Direction::BIDIRECTIONAL ? "Bidirectional"
432  : direction == Direction::INCOMING ? "Incoming"
433  : "Outgoing",
434  protocol->protocolName(),
435  dst->remotePeerId().has_value()
436  ? fmt::format("{}", dst->remotePeerId().value())
437  : "without PeerId",
438  replaced ? "replaced" : "stored");
439  }
440 
441  template <typename T>
442  void send(PeerId const &peer_id,
443  std::shared_ptr<ProtocolBase> const &protocol,
444  std::shared_ptr<Stream> stream,
445  std::shared_ptr<T> const &msg) {
446  BOOST_ASSERT(stream != nullptr);
447 
448  auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
449  read_writer->write(
450  *msg,
451  [wp(weak_from_this()), peer_id, protocol, msg, stream](auto &&res) {
452  if (auto self = wp.lock()) {
453  if (res.has_value()) {
454  SL_TRACE(self->logger_,
455  "Message sent to {} stream with {}",
456  protocol->protocolName(),
457  peer_id);
458  } else {
459  SL_DEBUG(self->logger_,
460  "Could not send message to {} stream with {}: {}",
461  protocol->protocolName(),
462  peer_id,
463  res.error().message());
464  stream->reset();
465  }
466  }
467  });
468  }
469 
470  template <typename PM, typename F>
471  static void forProtocol(PM &proto_map,
472  const std::shared_ptr<ProtocolBase> &protocol,
473  F &&f) {
474  if (auto it = proto_map.find(protocol); it != proto_map.end()) {
475  auto &descr = it->second;
476  std::forward<F>(f)(descr);
477  }
478  }
479 
480  template <typename PM, typename F>
481  static void forSubscriber(PeerId const &peer_id,
482  PM &streams,
483  std::shared_ptr<ProtocolBase> const &protocol,
484  F &&f) {
485  if (auto it = streams.find(peer_id); it != streams.end()) {
486  forProtocol(it->second, protocol, [&](auto &descr) {
487  std::forward<F>(f)(it->second, descr);
488  });
489  }
490  }
491 
492  [[maybe_unused]] void dump(std::string_view msg) {
493  if (logger_->level() >= log::Level::DEBUG) {
494  logger_->debug("DUMP: vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
495  logger_->debug("DUMP: {}", msg);
496  forEachPeer([&](const auto &peer_id, auto const &proto_map) {
497  logger_->debug("DUMP: Peer {}", peer_id);
498  for (auto const &[protocol, descr] : proto_map) {
499  logger_->debug("DUMP: Protocol {}", protocol);
500  logger_->debug("DUMP: I={} O={} Messages:{}",
501  descr.incoming.stream,
502  descr.outgoing.stream,
503  descr.deferred_messages.size());
504  }
505  });
506  logger_->debug("DUMP: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
507  }
508  }
509 
510  void updateStream(PeerId const &peer_id,
511  std::shared_ptr<ProtocolBase> const &protocol,
512  ProtocolDescr &descr) {
513  if (descr.reserve()) {
514  protocol->newOutgoingStream(
515  PeerInfo{peer_id, {}},
516  [wp(weak_from_this()), protocol, peer_id](
517  auto &&stream_res) mutable {
518  auto self = wp.lock();
519  if (not self) {
520  return;
521  }
522 
523  if (!stream_res) {
524  SL_DEBUG(self->logger_,
525  "Could not send message to new {} stream with {}: {}",
526  protocol->protocolName(),
527  peer_id,
528  stream_res.error().message());
529 
530  if (stream_res
531  == outcome::failure(
532  std::make_error_code(std::errc::not_connected))) {
533  self->reputation_repository_->changeForATime(
534  peer_id,
536  kDownVoteByDisconnectionExpirationTimeout);
537  return;
538  }
539 
540  self->streams_.exclusiveAccess([&](auto &streams) {
541  self->forSubscriber(
542  peer_id, streams, protocol, [&](auto, auto &descr) {
543  descr.deferred_messages.clear();
544  descr.dropReserved();
545  });
546  });
547 
548  return;
549  }
550 
551  auto &stream = stream_res.value();
552  self->streams_.exclusiveAccess([&](auto &streams) {
553  [[maybe_unused]] bool existing = false;
554  self->forSubscriber(
555  peer_id, streams, protocol, [&](auto, auto &descr) {
556  existing = true;
557  self->uploadStream(descr.outgoing.stream,
558  stream,
559  protocol,
561  descr.dropReserved();
562 
563  while (!descr.deferred_messages.empty()) {
564  auto &msg = descr.deferred_messages.front();
565  msg(stream);
566  descr.deferred_messages.pop_front();
567  }
568  });
569  BOOST_ASSERT(existing);
570  });
571  });
572  }
573  }
574 
575  template <typename T>
576  void updateStream(const PeerId &peer_id,
577  const std::shared_ptr<ProtocolBase> &protocol,
578  std::shared_ptr<T> msg) {
579  streams_.exclusiveAccess([&](auto &streams) {
580  forSubscriber(peer_id, streams, protocol, [&](auto, auto &descr) {
581  descr.deferred_messages.push_back(
582  [wp(weak_from_this()), peer_id, protocol, msg(std::move(msg))](
583  std::shared_ptr<Stream> stream) {
584  if (auto self = wp.lock()) {
585  self->send(peer_id, protocol, stream, msg);
586  }
587  });
588  updateStream(peer_id, protocol, descr);
589  });
590  });
591  }
592 
593  std::shared_ptr<ReputationRepository> reputation_repository_;
595 
597  };
598 
599 } // namespace kagome::network
600 
601 #endif // KAGOME_STREAM_ENGINE_HPP
libp2p::peer::PeerId PeerId
void send(const PeerId &peer_id, const std::shared_ptr< ProtocolBase > &protocol, std::shared_ptr< T > msg)
void del(const PeerId &peer_id)
ProtocolDescr(std::shared_ptr< ProtocolBase > proto)
SafeObject< PeerMap > streams_
StreamEngine & operator=(const StreamEngine &)=delete
bool isAlive(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol) const
const ReputationChange UNEXPECTED_DISCONNECT
size_t count(F &&filter) const
void uploadStream(std::shared_ptr< Stream > &dst, std::shared_ptr< Stream > const &src, std::shared_ptr< ProtocolBase > const &protocol, Direction direction)
std::shared_ptr< StreamEngine > StreamEnginePtr
STL namespace.
libp2p::connection::Stream Stream
static constexpr auto kDownVoteByDisconnectionExpirationTimeout
std::map< std::shared_ptr< ProtocolBase >, ProtocolDescr > ProtocolMap
libp2p::peer::PeerInfo PeerInfo
bool reserveOutgoing(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol)
auto sharedAccess(F &&f) const
Definition: safe_object.hpp:49
outcome::result< void > addOutgoing(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol)
int outgoingStreamsNumber(const std::shared_ptr< ProtocolBase > &protocol)
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
libp2p::peer::PeerId PeerId
auto exclusiveAccess(F &&f)
Definition: safe_object.hpp:43
RandomGossipStrategy(const int candidates_num, const int lucky_peers_num)
void send(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol, std::shared_ptr< Stream > stream, std::shared_ptr< T > const &msg)
std::shared_ptr< soralog::Logger > Logger
Definition: logger.hpp:23
void broadcast(const std::shared_ptr< ProtocolBase > &protocol, const std::shared_ptr< T > &msg, const std::function< bool(const PeerId &peer_id)> &predicate)
void reserveStreams(const PeerId &peer_id, const std::shared_ptr< ProtocolBase > &protocol)
std::map< PeerId, ProtocolMap > PeerMap
std::shared_ptr< ReputationRepository > reputation_repository_
outcome::result< PeerInfo > from(std::shared_ptr< Stream > &stream) const
StreamEngine(const StreamEngine &)=delete
libp2p::peer::Protocol Protocol
void broadcast(const std::shared_ptr< ProtocolBase > &protocol, const std::shared_ptr< T > &msg)
StreamEngine(std::shared_ptr< ReputationRepository > reputation_repository)
outcome::result< void > addBidirectional(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol)
outcome::result< void > addIncoming(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol)
outcome::result< void > add(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol, Direction direction)
void updateStream(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol, ProtocolDescr &descr)
static void forSubscriber(PeerId const &peer_id, PM &streams, std::shared_ptr< ProtocolBase > const &protocol, F &&f)
static void forProtocol(PM &proto_map, const std::shared_ptr< ProtocolBase > &protocol, F &&f)
void dump(std::string_view msg)
static StreamEnginePtr create(Args &&...args)
libp2p::connection::Stream Stream
std::deque< std::function< void(std::shared_ptr< Stream >)> > deferred_messages
std::shared_ptr< ProtocolBase > protocol
struct kagome::network::StreamEngine::ProtocolDescr::@10 outgoing
ProtocolDescr(std::shared_ptr< ProtocolBase > proto, std::shared_ptr< Stream > incoming_stream, std::shared_ptr< Stream > outgoing_stream)
Logger createLogger(const std::string &tag)
Definition: logger.cpp:112
void forEachPeer(F &&f) const
void updateStream(const PeerId &peer_id, const std::shared_ptr< ProtocolBase > &protocol, std::shared_ptr< T > msg)
void dropReserveOutgoing(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol)
PeerInfo from(TPeerId &&peer_id) const