Kagome
Polkadot Runtime Engine in C++17
stream_read_buffer.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_NETWORK_HELPERS_STREAM_READ_BUFFER_HPP
7 #define KAGOME_NETWORK_HELPERS_STREAM_READ_BUFFER_HPP
8 
10 
11 namespace libp2p::connection {
20  std::enable_shared_from_this<StreamReadBuffer> {
21  std::shared_ptr<std::vector<uint8_t>> buffer;
22  size_t begin{};
23  size_t end{};
24 
25  StreamReadBuffer(std::shared_ptr<Stream> stream, size_t capacity)
26  : StreamProxyBase{std::move(stream)},
27  buffer{std::make_shared<std::vector<uint8_t>>(capacity)} {}
28 
32  size_t size() const {
33  return end - begin;
34  }
35 
40  void readFull(gsl::span<uint8_t> out, size_t total, ReadCallbackFunc cb) {
41  // read some bytes
42  readSome(out,
43  out.size(),
44  [weak{weak_from_this()}, out, total, cb{std::move(cb)}](
45  outcome::result<size_t> _r) mutable {
46  if (auto self{weak.lock()}) {
47  if (_r.has_error()) {
48  return cb(_r.error());
49  }
50  const auto &r = _r.value();
51  const auto _r{gsl::narrow<ptrdiff_t>(r)};
52  BOOST_ASSERT(_r <= out.size());
53  if (_r == out.size()) {
54  // successfully read last bytes
55  return cb(total);
56  }
57  // read remaining bytes
58  self->readFull(out.subspan(r), total, std::move(cb));
59  }
60  });
61  }
62 
63  void read(gsl::span<uint8_t> out, size_t n, ReadCallbackFunc cb) override {
64  BOOST_ASSERT(out.size() >= gsl::narrow<ptrdiff_t>(n));
65  out = out.first(n);
66  readFull(out, n, std::move(cb));
67  }
68 
72  size_t readFromBuffer(gsl::span<uint8_t> out) {
73  // can't read more bytes than available
74  auto n = std::min(gsl::narrow<size_t>(out.size()), size());
75  BOOST_ASSERT(n != 0);
76  // copy bytes from buffer
77  std::copy_n(buffer->begin() + begin, n, out.begin());
78  // consume buffer bytes
79  begin += n;
80  return n;
81  }
82 
83  void readSome(gsl::span<uint8_t> out,
84  size_t n,
85  ReadCallbackFunc cb) override {
86  BOOST_ASSERT(out.size() >= gsl::narrow<ptrdiff_t>(n));
87  out = out.first(n);
88  if (out.empty()) {
89  return cb(out.size());
90  }
91  if (size() != 0) {
92  // read available buffer bytes
93  return cb(readFromBuffer(out));
94  }
95  // read to fill buffer
96  stream->readSome(
97  *buffer,
98  buffer->size(),
99  [weak{weak_from_this()}, out, cb{std::move(cb)}, buffer{buffer}](
100  outcome::result<size_t> _r) mutable {
101  if (auto self{weak.lock()}) {
102  if (_r.has_error()) {
103  return cb(_r.error());
104  }
105  const auto &r = _r.value();
106  // fill buffer
107  self->begin = 0;
108  self->end = r;
109  // read available buffer bytes
110  cb(self->readFromBuffer(out));
111  }
112  });
113  }
114  };
115 } // namespace libp2p::connection
116 
117 namespace kagome::network {
122  inline void streamReadBuffer(libp2p::StreamAndProtocol &result) {
123  constexpr size_t kBuffer{1 << 16};
124  result.stream = std::make_shared<libp2p::connection::StreamReadBuffer>(
125  std::move(result.stream), kBuffer);
126  }
127 
132  inline void streamReadBuffer(libp2p::StreamAndProtocolOrError &result) {
133  if (result.has_value()) {
134  streamReadBuffer(result.value());
135  }
136  }
137 } // namespace kagome::network
138 
139 #endif // KAGOME_NETWORK_HELPERS_STREAM_READ_BUFFER_HPP
void streamReadBuffer(libp2p::StreamAndProtocolOrError &result)
void readSome(gsl::span< uint8_t > out, size_t n, ReadCallbackFunc cb) override
std::shared_ptr< std::vector< uint8_t > > buffer
void read(gsl::span< uint8_t > out, size_t n, ReadCallbackFunc cb) override
StreamReadBuffer(std::shared_ptr< Stream > stream, size_t capacity)
void readFull(gsl::span< uint8_t > out, size_t total, ReadCallbackFunc cb)
size_t readFromBuffer(gsl::span< uint8_t > out)