6 #ifndef KAGOME_STREAM_ENGINE_HPP 7 #define KAGOME_STREAM_ENGINE_HPP 14 #include <unordered_map> 16 #include "libp2p/connection/stream.hpp" 17 #include "libp2p/host/host.hpp" 18 #include "libp2p/peer/peer_info.hpp" 19 #include "libp2p/peer/protocol.hpp" 40 struct StreamEngine final : std::enable_shared_from_this<StreamEngine> {
48 std::chrono::seconds(30);
// Random gossip filter, parameterised by the random engine type (default
// std::mt19937). NOTE(review): this is a listing excerpt - the struct header,
// the constructor signature and the operator() signature are not visible.
56 template <
typename Rng = std::mt19937>
// Constructor initialiser: stores the number of candidate peers.
59 : candidates_num_{candidates_num} {
// Selection rate: lucky_peers_num divided by the larger of candidates_num_ and
// lucky_peers_num (so the quotient never exceeds 1) when lucky_peers_num is
// positive. The else-branch of this conditional is not visible in this excerpt.
60 auto lucky_rate = lucky_peers_num > 0
61 ?
static_cast<double>(lucky_peers_num)
62 / std::max(candidates_num_, lucky_peers_num)
// Scale the rate to the generator's maximum output to obtain the cut-off.
64 threshold_ = gen_.max() * lucky_rate;
// A draw succeeds only when there is at least one candidate and a fresh random
// number is at or below the cut-off computed above.
67 auto res = candidates_num_ > 0 && gen_() <= threshold_;
// Per-protocol stream descriptor (listing excerpt: the struct header and the
// member-function signature lines are not visible).
// Stream handle of a nested member - presumably `incoming`, which the
// constructor and hasActiveIncoming-style check below access; confirm against
// the full header.
82 std::shared_ptr<Stream> stream;
// Stream handle plus reservation flag; the code below reads and writes these
// as `outgoing.stream` and `outgoing.reserved`. The flag defaults to false.
86 std::shared_ptr<Stream> stream;
87 bool reserved =
false;
// Queue of callbacks, each taking a stream (the member name is on a line that
// is not visible here; other code in this file uses `deferred_messages`).
90 std::deque<std::function<void(std::shared_ptr<Stream>)>>
// Constructor initialiser: only the protocol is set; no streams are attached.
95 : protocol{std::move(proto)} {}
// Constructor tail: takes ownership of the protocol and of both streams.
97 std::shared_ptr<Stream> incoming_stream,
98 std::shared_ptr<Stream> outgoing_stream)
99 : protocol{std::move(proto)},
100 incoming{std::move(incoming_stream)},
101 outgoing{std::move(outgoing_stream)} {}
// True when an outgoing stream is set and it is not closed.
107 return outgoing.stream and not outgoing.stream->isClosed();
// Reservation attempt: the guarded branch is taken when the outgoing slot is
// already reserved or already has an active stream (the branch body is not
// visible); otherwise the slot is marked reserved.
115 if (outgoing.reserved or hasActiveOutgoing()) {
119 outgoing.reserved =
true;
// Reports the current reservation flag.
124 return outgoing.reserved;
// Releases a reservation; asserts that one was actually held.
131 BOOST_ASSERT(outgoing.reserved);
132 outgoing.reserved =
false;
// True when an incoming stream is set and it is not closed.
139 return incoming.stream and not incoming.stream->isClosed();
144 using PeerMap = std::map<PeerId, ProtocolMap>;
154 StreamEngine(std::shared_ptr<ReputationRepository> reputation_repository)
// Factory: perfectly forwards its arguments to the StreamEngine constructor
// and returns the new object wrapped by std::make_shared. (The signature line
// is not visible in this excerpt.)
158 template <
typename... Args>
160 return std::make_shared<StreamEngine>(std::forward<Args>(args)...);
// Registers `stream` for `protocol` under the stream's remote peer.
// Returns the error from stream->remotePeerId() (propagated by OUTCOME_TRY) or
// success. NOTE(review): several lines of this function are missing from the
// listing (the direction parameter, the is_incoming / is_outgoing expressions,
// the body of the forSubscriber callback and whatever guards the emplace
// below), so the comments cover only the visible statements.
164 outcome::result<void>
add(std::shared_ptr<Stream> stream,
165 const std::shared_ptr<ProtocolBase> &protocol,
// Both arguments are required to be non-null.
167 BOOST_ASSERT(protocol !=
nullptr);
168 BOOST_ASSERT(stream !=
nullptr);
// Bail out with the underlying error if the remote peer id is unavailable.
170 OUTCOME_TRY(peer_id, stream->remotePeerId());
// Direction as a raw byte; the flag initialisers that follow are not visible.
171 auto dir =
static_cast<uint8_t
>(direction);
172 const bool is_incoming =
174 const bool is_outgoing =
// Set by the callback below, presumably when a descriptor already exists for
// this peer/protocol pair - callback body not visible, confirm.
178 bool existing =
false;
// forSubscriber passes the peer's protocol map as the first callback argument
// (named `type` here) and the matching descriptor as the second.
179 forSubscriber(peer_id, streams, protocol, [&](
auto type,
auto &descr) {
// Create a descriptor holding the stream in each applicable direction and a
// null pointer in the other. operator[] creates the peer entry if needed.
192 auto &proto_map = streams[peer_id];
193 proto_map.emplace(protocol,
194 ProtocolDescr{protocol,
195 is_incoming ? stream :
nullptr,
196 is_outgoing ? stream :
nullptr});
198 "Added {} {} stream with peer {}",
202 protocol->protocolName(),
205 return outcome::success();
211 std::shared_ptr<Stream> stream,
212 const std::shared_ptr<ProtocolBase> &protocol) {
217 std::shared_ptr<Stream> stream,
218 const std::shared_ptr<ProtocolBase> &protocol) {
223 std::shared_ptr<Stream> stream,
224 const std::shared_ptr<ProtocolBase> &protocol) {
// Ensures a descriptor exists for `protocol` under `peer_id` and logs the
// reservation. NOTE(review): the signature start, the container access wrapper
// and the tail of the emplace expression are not visible in this excerpt.
229 const std::shared_ptr<ProtocolBase> &protocol) {
230 BOOST_ASSERT(protocol !=
nullptr);
// operator[] creates the peer entry if absent; emplace leaves an existing
// descriptor for this protocol untouched.
232 return streams[peer_id]
233 .emplace(protocol, ProtocolDescr{protocol})
239 "Reserved {} stream with peer {}",
240 protocol->protocolName(),
// Resets every stored stream of one peer: for each protocol descriptor of
// `peer_id`, the incoming and the outgoing stream are reset if present.
// NOTE(review): the signature and the statements after the loop are not
// visible in this excerpt, so whether the peer entry is erased afterwards
// cannot be confirmed from here.
247 if (
auto it = streams.find(peer_id); it != streams.end()) {
248 for (
auto &protocol_it : it->second) {
249 auto &descr = protocol_it.second;
250 if (descr.incoming.stream) {
251 descr.incoming.stream->reset();
253 if (descr.outgoing.stream) {
254 descr.outgoing.stream->reset();
// Tries to reserve the outgoing slot for (peer_id, protocol) and returns the
// result of the descriptor's reserve(). The descriptor (and the peer entry)
// is created on demand. NOTE(review): the first signature line and the
// container access wrapper are not visible in this excerpt.
263 std::shared_ptr<ProtocolBase>
const &protocol) {
264 BOOST_ASSERT(protocol);
// operator[] inserts an empty protocol map for an unknown peer.
266 auto &proto_map = streams[peer_id];
// emplace returns the existing descriptor when one is already present.
267 auto [it, _] = proto_map.emplace(protocol, ProtocolDescr{protocol});
268 return it->second.reserve();
// Releases a previously taken outgoing reservation for (peer_id, protocol) by
// calling dropReserved() on the matching descriptor; does nothing when the
// peer or protocol is unknown (forSubscriber only calls back on a match).
// NOTE(review): the first signature line is not visible in this excerpt.
273 std::shared_ptr<ProtocolBase>
const &protocol) {
274 BOOST_ASSERT(protocol);
276 forSubscriber(peer_id, streams, protocol, [&](
auto,
auto &descr) {
277 return descr.dropReserved();
// Liveness check for (peer_id, protocol): the flag is set from the matching
// descriptor and is true when it has an active outgoing stream, an active
// incoming stream, or a reserved outgoing slot. NOTE(review): the declaration
// of `alive`, its initial value and the return statement are not visible here.
283 std::shared_ptr<ProtocolBase>
const &protocol)
const {
284 BOOST_ASSERT(protocol);
287 forSubscriber(peer_id, streams, protocol, [&](
auto,
auto const &descr) {
288 alive = descr.hasActiveOutgoing() || descr.hasActiveIncoming()
289 || descr.isOutgoingReserved();
// Sends `msg` to one peer over `protocol`: when the matching descriptor has an
// active outgoing stream, the message is written to it via the stream-level
// send overload. NOTE(review): the signature start and the remainder of the
// callback (the path taken when no outgoing stream is active, and how
// `was_sent` is used) are not visible in this excerpt.
295 template <
typename T>
297 const std::shared_ptr<ProtocolBase> &protocol,
298 std::shared_ptr<T> msg) {
299 BOOST_ASSERT(msg !=
nullptr);
300 BOOST_ASSERT(protocol !=
nullptr);
302 bool was_sent =
false;
305 peer_id, streams, protocol, [&](
auto type,
auto const &descr) {
306 if (descr.hasActiveOutgoing()) {
307 send(peer_id, protocol, descr.outgoing.stream, msg);
// Sends `msg` over `protocol` to every known peer accepted by `predicate`.
// For each accepted peer the descriptor for `protocol` is looked up and, if it
// has an active outgoing stream, the message is written to that stream.
// NOTE(review): the line carrying the function name and the closing lines are
// not visible in this excerpt.
318 template <
typename T>
320 const std::shared_ptr<ProtocolBase> &protocol,
321 const std::shared_ptr<T> &msg,
322 const std::function<
bool(
const PeerId &peer_id)> &predicate) {
323 BOOST_ASSERT(msg !=
nullptr);
324 BOOST_ASSERT(protocol !=
nullptr);
326 forEachPeer([&](
const auto &peer_id,
auto &proto_map) {
// The predicate is evaluated once per peer, before any protocol lookup.
327 if (predicate(peer_id)) {
328 forProtocol(proto_map, protocol, [&](
auto &descr) {
329 if (descr.hasActiveOutgoing()) {
330 send(peer_id, protocol, descr.outgoing.stream, msg);
// Convenience overload of broadcast without a filter. It defines a shared,
// function-local accept-everything predicate; presumably it then forwards to
// the predicate overload - the forwarding call is not visible in this excerpt.
339 template <
typename T>
340 void broadcast(
const std::shared_ptr<ProtocolBase> &protocol,
341 const std::shared_ptr<T> &msg) {
// Built once (function-local static) and reused by every call.
342 static const std::function<bool(const PeerId &)> any =
343 [](
const PeerId &) {
return true; };
// Counts the peers whose descriptor for `protocol` has an active outgoing
// stream. NOTE(review): the signature and the container access wrapper are not
// visible in this excerpt.
348 int candidates_num{0};
// NOTE(review): std::count_if returns the iterator difference type, which is
// narrowed to int here; the lambda also looks the protocol up twice (find,
// then at).
350 candidates_num = std::count_if(
351 streams.begin(), streams.end(), [&protocol](
const auto &entry) {
352 auto &[peer_id, protocol_map] = entry;
353 return protocol_map.find(protocol) != protocol_map.end()
354 && protocol_map.at(protocol).hasActiveOutgoing();
357 return candidates_num;
// Sums the number of protocol-map entries over all peers whose id is accepted
// by `filter`. Entries are counted regardless of whether their streams are
// active. NOTE(review): the signature, the declaration of `result` and the
// return statement are not visible in this excerpt.
360 template <
typename F>
364 for (
auto const &i : streams) {
365 if (filter(i.first)) {
366 result += i.second.size();
// Builds a PeerInfo from a peer id, leaving the address list empty.
// NOTE(review): the second template parameter defaults to the type
// std::enable_if<...> itself, not to its nested ::type (no `_t`). That
// specialisation is always a valid type, so this parameter does not remove
// the template from overload resolution for other argument types. Also, for
// lvalue arguments the forwarding reference deduces TPeerId as a reference
// type, which is not the same type as PeerId. The signature line is not
// visible in this excerpt.
374 template <
typename TPeerId,
375 typename = std::enable_if<std::is_same_v<PeerId, TPeerId>>>
377 return PeerInfo{.id = std::forward<TPeerId>(peer_id), .addresses = {}};
380 outcome::result<PeerInfo>
from(std::shared_ptr<Stream> &stream)
const {
381 BOOST_ASSERT(stream);
382 auto peer_id_res = stream->remotePeerId();
383 if (!peer_id_res.has_value()) {
384 logger_->error(
"Can't get peer_id: {}", peer_id_res.error().message());
385 return peer_id_res.as_failure();
387 return from(std::move(peer_id_res.value()));
// Calls `f(peer_id, protocol_map)` for every stored peer, giving the callback
// mutable access to the protocol map. NOTE(review): the signature and the
// container access wrapper are not visible in this excerpt; also note that
// the same callable is passed through std::forward on every iteration.
390 template <
typename F>
393 for (
auto &[peer_id, protocol_map] : streams) {
394 std::forward<F>(f)(peer_id, protocol_map);
// Read-only counterpart of the peer iteration: calls `f(peer_id,
// protocol_map)` for every stored peer with const references.
// NOTE(review): the signature and the container access wrapper are not
// visible in this excerpt; the same callable is passed through std::forward
// on every iteration.
399 template <
typename F>
402 for (
auto const &[peer_id, protocol_map] : streams) {
403 std::forward<F>(f)(peer_id, protocol_map);
// Stores `src` into the slot `dst` and logs whether an existing stream was
// replaced or the stream was newly stored. NOTE(review): the first signature
// line, the direction parameter, the condition around the close() call and
// the assignment itself are not visible in this excerpt.
410 std::shared_ptr<Stream>
const &src,
411 std::shared_ptr<ProtocolBase>
const &protocol,
// Same underlying stream object: nothing to do.
415 if (dst.get() == src.get())
return;
417 bool replaced =
false;
// The stream currently held in the slot is closed; the completion result of
// close() is deliberately ignored (empty callback).
421 dst->close([](outcome::result<void>) {});
430 "{} {} stream with peer {} was {}",
434 protocol->protocolName(),
// The remote peer id is formatted only when it is available; the fallback
// text is on a line not visible here.
435 dst->remotePeerId().has_value()
436 ? fmt::format(
"{}", dst->remotePeerId().value())
438 replaced ?
"replaced" :
"stored");
// Stream-level send: wraps `stream` in a ScaleMessageReadWriter and issues a
// write whose completion callback logs the outcome. NOTE(review): the first
// signature line and the write call itself are not visible in this excerpt.
441 template <
typename T>
443 std::shared_ptr<ProtocolBase>
const &protocol,
444 std::shared_ptr<Stream> stream,
445 std::shared_ptr<T>
const &msg) {
446 BOOST_ASSERT(stream !=
nullptr);
448 auto read_writer = std::make_shared<ScaleMessageReadWriter>(stream);
// The callback holds only a weak reference to the engine, so a pending write
// does not keep it alive; `msg` and `stream` are captured by value (shared
// ownership) for the duration of the operation.
451 [wp(weak_from_this()), peer_id, protocol, msg, stream](
auto &&res) {
// Log only if the engine still exists when the write completes.
452 if (
auto self = wp.lock()) {
453 if (res.has_value()) {
454 SL_TRACE(self->logger_,
455 "Message sent to {} stream with {}",
456 protocol->protocolName(),
459 SL_DEBUG(self->logger_,
460 "Could not send message to {} stream with {}: {}",
461 protocol->protocolName(),
463 res.error().message());
// Looks `protocol` up in `proto_map` and, if found, calls `f(descr)` with the
// matching descriptor; does nothing when the protocol is absent.
// NOTE(review): the lines with the function name and the callable parameter
// are not visible in this excerpt.
470 template <
typename PM,
typename F>
472 const std::shared_ptr<ProtocolBase> &protocol,
474 if (
auto it = proto_map.find(protocol); it != proto_map.end()) {
475 auto &descr = it->second;
476 std::forward<F>(f)(descr);
// Looks up `peer_id` in `streams` and then `protocol` in that peer's protocol
// map; on a match calls `f(protocol_map, descr)`. Does nothing when either
// lookup fails. Note that the first callback argument is the peer's whole
// protocol map (it->second), not a direction or type tag.
// NOTE(review): the lines with the function name and the remaining parameters
// are not visible in this excerpt.
480 template <
typename PM,
typename F>
483 std::shared_ptr<ProtocolBase>
const &protocol,
485 if (
auto it = streams.find(peer_id); it != streams.end()) {
486 forProtocol(it->second, protocol, [&](
auto &descr) {
487 std::forward<F>(f)(it->second, descr);
492 [[maybe_unused]]
void dump(std::string_view msg) {
493 if (
logger_->level() >= log::Level::DEBUG) {
494 logger_->debug(
"DUMP: vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
495 logger_->debug(
"DUMP: {}", msg);
496 forEachPeer([&](
const auto &peer_id,
auto const &proto_map) {
497 logger_->debug(
"DUMP: Peer {}", peer_id);
498 for (
auto const &[protocol, descr] : proto_map) {
499 logger_->debug(
"DUMP: Protocol {}", protocol);
500 logger_->debug(
"DUMP: I={} O={} Messages:{}",
501 descr.incoming.stream,
502 descr.outgoing.stream,
503 descr.deferred_messages.size());
506 logger_->debug(
"DUMP: ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
// Opens a new outgoing stream for (peer_id, protocol) via
// protocol->newOutgoingStream and handles the result in a callback.
// NOTE(review): many lines are missing from this listing (first signature
// line, arguments of newOutgoingStream, the null/failed checks, and the bodies
// of both forSubscriber callbacks), so the comments describe only the visible
// statements.
511 std::shared_ptr<ProtocolBase>
const &protocol,
512 ProtocolDescr &descr) {
514 protocol->newOutgoingStream(
// The callback holds only a weak reference to the engine.
516 [wp(weak_from_this()), protocol, peer_id](
517 auto &&stream_res)
mutable {
518 auto self = wp.lock();
// Failure path: the error is logged at debug level.
524 SL_DEBUG(self->logger_,
525 "Could not send message to new {} stream with {}: {}",
526 protocol->protocolName(),
528 stream_res.error().message());
// When the error matches std::errc::not_connected, the peer's reputation is
// changed for a limited time, using
// kDownVoteByDisconnectionExpirationTimeout as the duration argument.
532 std::make_error_code(std::errc::not_connected))) {
533 self->reputation_repository_->changeForATime(
536 kDownVoteByDisconnectionExpirationTimeout);
// The stream table is then visited under exclusive access (callback body not
// visible in this excerpt).
540 self->streams_.exclusiveAccess([&](
auto &streams) {
542 peer_id, streams, protocol, [&](
auto,
auto &descr) {
// Success path: the new stream is stored into the descriptor's outgoing slot
// under exclusive access.
551 auto &stream = stream_res.value();
552 self->streams_.exclusiveAccess([&](
auto &streams) {
553 [[maybe_unused]]
bool existing =
false;
555 peer_id, streams, protocol, [&](
auto,
auto &descr) {
557 self->uploadStream(descr.
outgoing.stream,
// A descriptor for this peer/protocol is expected to exist at this point.
569 BOOST_ASSERT(existing);
// Queues `msg` on the descriptor for (peer_id, protocol): a callback is
// appended to descr.deferred_messages that, when later invoked with a stream,
// sends the message on that stream. NOTE(review): the first signature line and
// everything after the queued callback are not visible in this excerpt.
575 template <
typename T>
577 const std::shared_ptr<ProtocolBase> &protocol,
578 std::shared_ptr<T> msg) {
580 forSubscriber(peer_id, streams, protocol, [&](
auto,
auto &descr) {
581 descr.deferred_messages.push_back(
// The queued callback owns the message (moved in) and holds only a weak
// reference to the engine; it does nothing if the engine no longer exists
// when it runs.
582 [wp(weak_from_this()), peer_id, protocol, msg(std::move(msg))](
583 std::shared_ptr<Stream> stream) {
584 if (
auto self = wp.lock()) {
585 self->send(peer_id, protocol, stream, msg);
601 #endif // KAGOME_STREAM_ENGINE_HPP libp2p::peer::PeerId PeerId
bool hasActiveOutgoing() const
void send(const PeerId &peer_id, const std::shared_ptr< ProtocolBase > &protocol, std::shared_ptr< T > msg)
void del(const PeerId &peer_id)
ProtocolDescr(std::shared_ptr< ProtocolBase > proto)
SafeObject< PeerMap > streams_
StreamEngine & operator=(const StreamEngine &)=delete
bool operator()(const PeerId &)
bool isAlive(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol) const
const ReputationChange UNEXPECTED_DISCONNECT
size_t count(F &&filter) const
void uploadStream(std::shared_ptr< Stream > &dst, std::shared_ptr< Stream > const &src, std::shared_ptr< ProtocolBase > const &protocol, Direction direction)
std::shared_ptr< StreamEngine > StreamEnginePtr
libp2p::connection::Stream Stream
static constexpr auto kDownVoteByDisconnectionExpirationTimeout
std::map< std::shared_ptr< ProtocolBase >, ProtocolDescr > ProtocolMap
bool hasActiveIncoming() const
libp2p::peer::PeerInfo PeerInfo
bool reserveOutgoing(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol)
auto sharedAccess(F &&f) const
outcome::result< void > addOutgoing(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol)
int outgoingStreamsNumber(const std::shared_ptr< ProtocolBase > &protocol)
libp2p::peer::PeerInfo PeerInfo
libp2p::peer::Protocol Protocol
libp2p::peer::PeerId PeerId
auto exclusiveAccess(F &&f)
RandomGossipStrategy(const int candidates_num, const int lucky_peers_num)
void send(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol, std::shared_ptr< Stream > stream, std::shared_ptr< T > const &msg)
Rng::result_type threshold_
std::shared_ptr< soralog::Logger > Logger
void broadcast(const std::shared_ptr< ProtocolBase > &protocol, const std::shared_ptr< T > &msg, const std::function< bool(const PeerId &peer_id)> &predicate)
void reserveStreams(const PeerId &peer_id, const std::shared_ptr< ProtocolBase > &protocol)
std::map< PeerId, ProtocolMap > PeerMap
std::shared_ptr< ReputationRepository > reputation_repository_
outcome::result< PeerInfo > from(std::shared_ptr< Stream > &stream) const
StreamEngine(const StreamEngine &)=delete
libp2p::peer::Protocol Protocol
void broadcast(const std::shared_ptr< ProtocolBase > &protocol, const std::shared_ptr< T > &msg)
StreamEngine(std::shared_ptr< ReputationRepository > reputation_repository)
outcome::result< void > addBidirectional(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol)
outcome::result< void > addIncoming(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol)
outcome::result< void > add(std::shared_ptr< Stream > stream, const std::shared_ptr< ProtocolBase > &protocol, Direction direction)
void updateStream(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol, ProtocolDescr &descr)
static void forSubscriber(PeerId const &peer_id, PM &streams, std::shared_ptr< ProtocolBase > const &protocol, F &&f)
static void forProtocol(PM &proto_map, const std::shared_ptr< ProtocolBase > &protocol, F &&f)
void dump(std::string_view msg)
static StreamEnginePtr create(Args &&...args)
libp2p::connection::Stream Stream
bool isOutgoingReserved() const
std::deque< std::function< void(std::shared_ptr< Stream >)> > deferred_messages
std::shared_ptr< ProtocolBase > protocol
struct kagome::network::StreamEngine::ProtocolDescr::@10 outgoing
ProtocolDescr(std::shared_ptr< ProtocolBase > proto, std::shared_ptr< Stream > incoming_stream, std::shared_ptr< Stream > outgoing_stream)
Logger createLogger(const std::string &tag)
void forEachPeer(F &&f) const
void updateStream(const PeerId &peer_id, const std::shared_ptr< ProtocolBase > &protocol, std::shared_ptr< T > msg)
void dropReserveOutgoing(PeerId const &peer_id, std::shared_ptr< ProtocolBase > const &protocol)
PeerInfo from(TPeerId &&peer_id) const