23 std::size_t capacity, std::chrono::seconds expiration_time)
30 for (std::size_t i = 0; i <
capacity_; ++i) {
41 cache(peer_id, request_fingerprint);
46 auto slot = found->second;
48 auto now = std::chrono::system_clock::now();
50 or now > entry->valid_till
52 cache(peer_id, request_fingerprint, slot);
58 auto &requests = entry->fingerprints;
59 if (std::count(requests.begin(), requests.end(), request_fingerprint)
63 requests.push_back(request_fingerprint);
70 std::optional<CacheRecordIndex> target_slot) {
86 boost::circular_buffer<BlocksRequest::Fingerprint> fingerprints(
88 if (target_slot and
storage_[*target_slot]) {
89 fingerprints = std::move(
storage_[*target_slot]->fingerprints);
91 fingerprints.push_back(request_fingerprint);
96 .fingerprints = std::move(fingerprints),
106 auto slot = it->second;
113 auto &record =
storage_[slot].value();
115 if (std::chrono::system_clock::now() > record.valid_till) {
130 std::shared_ptr<SyncProtocolObserver> sync_observer,
131 std::shared_ptr<ReputationRepository> reputation_repository)
152 BOOST_ASSERT(stream->remotePeerId().has_value());
159 std::function<
void(outcome::result<std::shared_ptr<Stream>>)> &&cb) {
161 "Connect for {} stream with {}",
168 [wp = weak_from_this(), peer_id = peer_info.id, cb = std::move(cb)](
169 auto &&stream_res)
mutable {
171 auto self = wp.lock();
177 if (not stream_res.has_value()) {
179 self->base_.logger(),
180 "Error happened while connection over {} stream with {}: {}",
181 self->protocolName(),
183 stream_res.error().message());
184 cb(stream_res.as_failure());
187 const auto &stream_and_proto = stream_res.value();
189 SL_DEBUG(self->base_.logger(),
190 "Established connection over {} stream with {}",
191 stream_and_proto.protocol,
194 cb(std::move(stream_and_proto.stream));
199 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
202 "Read request from incoming {} stream with {}",
204 stream->remotePeerId().value());
206 read_writer->read<
BlocksRequest>([stream, wp = weak_from_this()](
207 auto &&block_request_res)
mutable {
208 auto self = wp.lock();
214 if (not block_request_res.has_value()) {
215 SL_VERBOSE(self->base_.logger(),
216 "Error at read request from incoming {} stream with {}: {}",
217 self->protocolName(),
218 stream->remotePeerId().value(),
219 block_request_res.error().message());
224 auto &block_request = block_request_res.value();
226 if (self->base_.logger()->level() >= log::Level::VERBOSE) {
227 std::string logmsg = fmt::format(
228 "Block request is received from incoming {} stream with {}",
229 self->protocolName(),
230 stream->remotePeerId().value());
232 logmsg +=
", fields=";
239 visit_in_place(block_request.from, [&](
const auto &from) {
240 logmsg += fmt::format(
", from {}", from);
246 if (block_request.max.has_value()) {
247 logmsg += fmt::format(
", max {}", block_request.max.value());
250 self->base_.logger()->verbose(std::move(logmsg));
253 auto block_response_res =
254 self->sync_observer_->onBlocksRequest(block_request);
256 if (not block_response_res) {
258 self->base_.logger(),
259 "Error at execute request from incoming {} stream with {}: {}",
260 self->protocolName(),
261 stream->remotePeerId().value(),
262 block_response_res.error().message());
267 auto &block_response = block_response_res.value();
269 if ((not block_response.blocks.empty()) and stream->remotePeerId()
270 and
self->response_cache_.isDuplicate(stream->remotePeerId().value(),
272 auto peer_id = stream->remotePeerId().value();
273 SL_DEBUG(self->base_.logger(),
274 "Stream {} to {} reset due to repeating non-polite block " 275 "request with fingerprint {}",
276 self->protocolName(),
278 block_request.fingerprint());
279 self->reputation_repository_->changeForATime(
287 self->writeResponse(std::move(stream), block_response);
293 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
297 [stream = std::move(stream),
298 wp = weak_from_this()](
auto &&write_res)
mutable {
299 auto self = wp.lock();
305 if (not write_res.has_value()) {
307 self->base_.logger(),
308 "Error at writing response to incoming {} stream with {}: {}",
309 self->protocolName(),
310 stream->remotePeerId().value(),
311 write_res.error().message());
316 stream->close([](
auto &&...) {});
321 std::shared_ptr<Stream> stream,
323 std::function<
void(outcome::result<void>)> &&cb) {
324 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
327 "Write request info outgoing {} stream with {}",
329 stream->remotePeerId().value());
333 [stream, wp = weak_from_this(), cb = std::move(cb)](
334 auto &&write_res)
mutable {
335 auto self = wp.lock();
342 if (not write_res.has_value()) {
344 self->base_.logger(),
345 "Error at write request into outgoing {} stream with {}: {}",
346 self->protocolName(),
347 stream->remotePeerId().value(),
348 write_res.error().message());
351 cb(write_res.as_failure());
355 SL_DEBUG(self->base_.logger(),
356 "Request written successful into outgoing {} stream with {}",
357 self->protocolName(),
358 stream->remotePeerId().value());
360 cb(outcome::success());
365 std::shared_ptr<Stream> stream,
366 std::function<
void(outcome::result<BlocksResponse>)> &&response_handler) {
367 auto read_writer = std::make_shared<ProtobufMessageReadWriter>(stream);
370 "Read response from outgoing {} stream with {}",
372 stream->remotePeerId().value());
375 wp = weak_from_this(),
377 std::move(response_handler)](
378 auto &&block_response_res)
mutable {
379 auto self = wp.lock();
386 if (not block_response_res.has_value()) {
387 SL_VERBOSE(self->base_.logger(),
388 "Error at read response from outgoing {} stream with {}: {}",
389 self->protocolName(),
390 stream->remotePeerId().value(),
391 block_response_res.error().message());
394 response_handler(block_response_res.as_failure());
397 auto &blocks_response = block_response_res.value();
399 SL_DEBUG(self->base_.logger(),
400 "Successful response read from outgoing {} stream with {}",
401 self->protocolName(),
402 stream->remotePeerId().value());
405 response_handler(std::move(blocks_response));
412 std::function<
void(outcome::result<BlocksResponse>)> &&response_handler) {
414 base_.
host().getPeerRepository().getAddressRepository().getAddresses(
416 if (not addresses_res.has_value()) {
417 response_handler(addresses_res.as_failure());
421 if (
base_.
logger()->level() >= log::Level::DEBUG) {
422 std::string logmsg =
"Requesting blocks: fields=";
430 visit_in_place(block_request.
from, [&](
const auto &from) {
431 logmsg += fmt::format(
" from {}", from);
437 if (block_request.
max.has_value()) {
438 logmsg += fmt::format(
", max {}", block_request.
max.value());
445 {peer_id, addresses_res.value()},
446 [wp = weak_from_this(),
447 response_handler = std::move(response_handler),
448 block_request = std::move(block_request)](
auto &&stream_res)
mutable {
449 if (not stream_res.has_value()) {
450 response_handler(stream_res.as_failure());
453 auto &stream = stream_res.value();
455 auto self = wp.lock();
462 SL_DEBUG(self->base_.logger(),
463 "Established outgoing {} stream with {}",
464 self->protocolName(),
465 stream->remotePeerId().value());
467 self->writeRequest(stream,
468 std::move(block_request),
471 response_handler = std::move(response_handler)](
472 auto &&write_res)
mutable {
473 auto self = wp.lock();
480 if (not write_res.has_value()) {
482 response_handler(write_res.as_failure());
486 self->readResponse(std::move(stream),
487 std::move(response_handler));
Include block message queue.
Include a justification for the block.
std::unordered_map< PeerId, CacheRecordIndex > lookup_table_
BlocksResponseCache(std::size_t capacity, std::chrono::seconds expiration_time)
void cache(const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint, std::optional< CacheRecordIndex > target_slot=std::nullopt)
void writeRequest(std::shared_ptr< Stream > stream, BlocksRequest block_request, std::function< void(outcome::result< void >)> &&cb)
void readResponse(std::shared_ptr< Stream > stream, std::function< void(outcome::result< BlocksResponse >)> &&response_handler)
void readRequest(std::shared_ptr< Stream > stream)
BlockAttributes fields
Bits showing which parts of BlockData to return.
std::size_t CacheRecordIndex
void newOutgoingStream(const PeerInfo &peer_info, std::function< void(outcome::result< std::shared_ptr< Stream >>)> &&cb) override
log::Logger const & logger() const
const ReputationChange DUPLICATE_BLOCK_REQUEST
void streamReadBuffer(libp2p::StreamAndProtocol &result)
libp2p::peer::PeerInfo PeerInfo
bool isDuplicate(const PeerId &peer_id, BlocksRequest::Fingerprint request_fingerprint)
libp2p::peer::PeerId PeerId
SyncProtocolImpl(libp2p::Host &host, const application::ChainSpec &chain_spec, std::shared_ptr< SyncProtocolObserver > sync_observer, std::shared_ptr< ReputationRepository > reputation_repository)
virtual const std::string & protocolId() const =0
std::vector< std::optional< CacheRecord > > storage_
std::shared_ptr< SyncProtocolObserver > sync_observer_
detail::BlocksResponseCache response_cache_
std::shared_ptr< ReputationRepository > reputation_repository_
const std::chrono::seconds expiration_time_
const std::size_t capacity_
Protocols const & protocolIds() const
primitives::BlockId from
start from this block
std::unordered_set< CacheRecordIndex > free_slots_
static constexpr auto kResponsesCacheExpirationTimeout
void request(const PeerId &peer_id, BlocksRequest block_request, std::function< void(outcome::result< BlocksResponse >)> &&response_handler) override
Make async request to peer and return response in callback.
void purge()
removes all stale records
const std::string & protocolName() const override
bool start(std::weak_ptr< T > wptr)
void writeResponse(std::shared_ptr< Stream > stream, const BlocksResponse &block_response)
static constexpr auto kMaxCacheEntriesPerPeer
static constexpr auto kResponsesCacheCapacity
Fingerprint fingerprint() const
std::optional< uint32_t > max
const libp2p::peer::Protocol kSyncProtocol
void onIncomingStream(std::shared_ptr< Stream > stream) override
Direction direction
sequence direction