Kagome
Polkadot Runtime Engine in C++17
sync_protocol_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_NETWORK_SYNCPROTOCOLIMPL
7 #define KAGOME_NETWORK_SYNCPROTOCOLIMPL
8 
10 
11 #include <chrono>
12 #include <memory>
13 #include <optional>
14 #include <unordered_map>
15 #include <unordered_set>
16 
17 #include <boost/circular_buffer.hpp>
18 #include <libp2p/connection/stream.hpp>
19 #include <libp2p/host/host.hpp>
21 #include "log/logger.hpp"
25 #include "utils/non_copyable.hpp"
26 
27 namespace kagome::network {
28 
29  static constexpr auto kResponsesCacheCapacity = 500;
30  static constexpr auto kResponsesCacheExpirationTimeout =
31  std::chrono::seconds(30);
32  static constexpr auto kMaxCacheEntriesPerPeer = 5;
33 
34  namespace detail {
35 
41  public:
47  BlocksResponseCache(std::size_t capacity,
48  std::chrono::seconds expiration_time);
49 
63  bool isDuplicate(const PeerId &peer_id,
64  BlocksRequest::Fingerprint request_fingerprint);
65 
66  private:
67  using ExpirationTimepoint =
68  std::chrono::time_point<std::chrono::system_clock>;
69  using CacheRecordIndex = std::size_t;
70 
78  void cache(const PeerId &peer_id,
79  BlocksRequest::Fingerprint request_fingerprint,
80  std::optional<CacheRecordIndex> target_slot = std::nullopt);
81 
83  void purge();
84 
85  struct CacheRecord {
88  boost::circular_buffer<BlocksRequest::Fingerprint> fingerprints{
89  kMaxCacheEntriesPerPeer};
90  };
91 
92  const std::size_t capacity_;
93  const std::chrono::seconds expiration_time_;
94  std::unordered_map<PeerId, CacheRecordIndex> lookup_table_;
95  std::vector<std::optional<CacheRecord>> storage_;
96  std::unordered_set<CacheRecordIndex> free_slots_;
97  };
98 
99  } // namespace detail
100 
101  class SyncProtocolImpl final
102  : public SyncProtocol,
103  public std::enable_shared_from_this<SyncProtocolImpl>,
104  NonCopyable,
105  NonMovable {
106  public:
108  libp2p::Host &host,
109  const application::ChainSpec &chain_spec,
110  std::shared_ptr<SyncProtocolObserver> sync_observer,
111  std::shared_ptr<ReputationRepository> reputation_repository);
112 
113  bool start() override;
114  bool stop() override;
115 
116  const std::string &protocolName() const override {
117  return kSyncProtocolName;
118  }
119 
120  void onIncomingStream(std::shared_ptr<Stream> stream) override;
121  void newOutgoingStream(
122  const PeerInfo &peer_info,
123  std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
124  override;
125 
126  void request(const PeerId &peer_id,
127  BlocksRequest block_request,
128  std::function<void(outcome::result<BlocksResponse>)>
129  &&response_handler) override;
130 
131  void readRequest(std::shared_ptr<Stream> stream);
132 
133  void writeResponse(std::shared_ptr<Stream> stream,
134  const BlocksResponse &block_response);
135 
136  void writeRequest(std::shared_ptr<Stream> stream,
137  BlocksRequest block_request,
138  std::function<void(outcome::result<void>)> &&cb);
139 
140  void readResponse(std::shared_ptr<Stream> stream,
141  std::function<void(outcome::result<BlocksResponse>)>
142  &&response_handler);
143 
144  private:
145  const static inline auto kSyncProtocolName = "SyncProtocol"s;
147  std::shared_ptr<SyncProtocolObserver> sync_observer_;
148  std::shared_ptr<ReputationRepository> reputation_repository_;
150  };
151 
152 } // namespace kagome::network
153 
154 #endif // KAGOME_NETWORK_SYNCPROTOCOLIMPL
std::unordered_map< PeerId, CacheRecordIndex > lookup_table_
Class for communication via /{chainType}/sync/2 according to sync protocol specification https://spec...
BlocksResponseCache(std::size_t capacity, std::chrono::seconds expiration_time)
void cache(const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint, std::optional< CacheRecordIndex > target_slot=std::nullopt)
libp2p::peer::PeerInfo PeerInfo
bool isDuplicate(const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint)
libp2p::peer::PeerId PeerId
std::vector< std::optional< CacheRecord > > storage_
std::shared_ptr< SyncProtocolObserver > sync_observer_
detail::BlocksResponseCache response_cache_
std::shared_ptr< ReputationRepository > reputation_repository_
boost::circular_buffer< BlocksRequest::Fingerprint > fingerprints
std::chrono::time_point< std::chrono::system_clock > ExpirationTimepoint
std::unordered_set< CacheRecordIndex > free_slots_
static constexpr auto kResponsesCacheExpirationTimeout
const std::string & protocolName() const override
static constexpr auto kMaxCacheEntriesPerPeer
static constexpr auto kResponsesCacheCapacity