Kagome
Polkadot Runtime Engine in C++17
message_pool.cpp
Go to the documentation of this file.
1 
7 
8 #include <mutex> // for std::lock_guard
9 
10 #include <boost/assert.hpp>
11 
12 namespace kagome::telemetry {
13  MessagePool::MessagePool(std::size_t entry_size_bytes,
14  std::size_t entries_count)
15  : entry_size_{entry_size_bytes}, entries_count_{entries_count} {
16  // preallocate all the buffers
17  pool_.resize(entries_count);
18  for (auto &entry : pool_) {
19  entry.data.resize(entry_size_bytes, '\0');
20  }
21  free_slots_.reserve(entries_count);
22  for (size_t i = 0; i < entries_count; ++i) {
23  free_slots_.emplace(i);
24  }
25  }
26 
27  std::optional<MessageHandle> MessagePool::push(const std::string &message,
28  int16_t ref_count) {
29  bool message_exceeds_max_size = message.length() > entry_size_;
30  if (message_exceeds_max_size) {
31  return std::nullopt;
32  }
33  if (ref_count <= 0) {
34  return std::nullopt;
35  }
36  auto has_free_slot = nextFreeSlot(); // quick blocking lookup
37  if (not has_free_slot) {
38  return std::nullopt;
39  }
40 
41  auto slot = has_free_slot.value();
42  auto &entry = pool_[slot];
43  entry.ref_count = ref_count;
44  entry.data_size = message.length();
45  memcpy(entry.data.data(), message.c_str(), message.length());
46  // presence of zero-terminating char is preserved by the way of memory
47  // (re-)initialization in constructor and release method, and also a
48  // boundary check here - at the first line of push method
49  return slot;
50  }
51 
53  bool handle_is_valid =
54  (handle < pool_.size()) and (free_slots_.count(handle) == 0);
55  if (not handle_is_valid) {
56  return 0; // zero references for bad handle
57  }
58  std::lock_guard lock(mutex_);
59  // allowed to call only over already occupied slots
60  BOOST_ASSERT(free_slots_.count(handle) == 0);
61  auto &entry = pool_[handle];
62  return ++entry.ref_count;
63  }
64 
66  bool handle_is_valid =
67  (handle < pool_.size()) and (free_slots_.count(handle) == 0);
68  if (not handle_is_valid) {
69  return 0; // zero references for bad handle
70  }
71  std::lock_guard lock(mutex_);
72  auto &entry = pool_[handle];
73  if (entry.ref_count > 0 and --entry.ref_count == 0) {
74  memset(entry.data.data(), '\0', entry.data.size());
75  entry.data_size = 0;
76  free_slots_.emplace(handle);
77  }
78  return entry.ref_count;
79  }
80 
81  boost::asio::mutable_buffer MessagePool::operator[](
82  MessageHandle handle) const {
83  bool handle_is_valid =
84  (handle < pool_.size()) and (free_slots_.count(handle) == 0);
85  if (not handle_is_valid) {
86  throw std::runtime_error("Bad access through invalid handle");
87  }
88  // No synchronization required due to the design of its use way.
89  // Read access might be requested only when a handle is already acquired.
90  // => There would be no race during initialization and read.
91  // The buffer will remain valid till all holders request its release.
92  // The handle cannot be reassigned prior to complete release.
93  // => There is no chance to get dangling pointers inside boost buffers.
94  auto &entry = pool_[handle];
95  return boost::asio::buffer(const_cast<uint8_t *>(entry.data.data()),
96  entry.data_size);
97  }
98 
99  std::size_t MessagePool::capacity() const {
100  return entries_count_;
101  }
102 
103  std::optional<MessageHandle> MessagePool::nextFreeSlot() {
104  std::lock_guard lock(mutex_);
105  if (free_slots_.empty()) {
106  return std::nullopt;
107  }
108  auto free_slot = free_slots_.begin();
109  auto slot = *free_slot;
110  free_slots_.erase(free_slot);
111  return slot;
112  }
113 } // namespace kagome::telemetry
std::unordered_set< std::size_t > free_slots_
RefCount release(MessageHandle handle)
std::size_t capacity() const
std::optional< MessageHandle > push(const std::string &message, int16_t ref_count)
std::optional< MessageHandle > nextFreeSlot()
performs quick lookup for a free slot
const std::size_t entries_count_
boost::asio::mutable_buffer operator[](MessageHandle handle) const
std::vector< Record > pool_
std::size_t MessageHandle
RefCount add_ref(MessageHandle handle)
MessagePool(std::size_t entry_size_bytes, std::size_t entries_count)