Kagome
Polkadot Runtime Engine in C++17
subscription_engine.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_SUBSCRIPTION_ENGINE_HPP
7 #define KAGOME_SUBSCRIPTION_ENGINE_HPP
8 
9 #include <list>
10 #include <memory>
11 #include <shared_mutex>
12 #include <unordered_map>
13 
14 namespace kagome::subscription {
15 
16  template <typename Event, typename Receiver, typename... Arguments>
17  class Subscriber;
18 
19  using SubscriptionSetId = uint32_t;
20 
29  template <typename EventKey, typename Receiver, typename... EventParams>
30  class SubscriptionEngine final
31  : public std::enable_shared_from_this<
32  SubscriptionEngine<EventKey, Receiver, EventParams...>> {
33  public:
34  using EventKeyType = EventKey;
35  using ReceiverType = Receiver;
36  using SubscriberType =
38  using SubscriberWeakPtr = std::weak_ptr<SubscriberType>;
39 
44  using SubscribersContainer =
45  std::list<std::pair<SubscriptionSetId, SubscriberWeakPtr>>;
46  using IteratorType = typename SubscribersContainer::iterator;
47 
48  public:
49  SubscriptionEngine() = default;
50  ~SubscriptionEngine() = default;
51 
52  SubscriptionEngine(SubscriptionEngine &&) = default; // NOLINT
53  SubscriptionEngine &operator=(SubscriptionEngine &&) = default; // NOLINT
54 
55  SubscriptionEngine(const SubscriptionEngine &) = delete;
57 
58  private:
59  template <typename KeyType, typename ValueType, typename... Args>
60  friend class Subscriber;
61  using KeyValueContainer =
62  std::unordered_map<EventKeyType, SubscribersContainer>;
63 
64  mutable std::shared_mutex subscribers_map_cs_;
66 
68  const EventKeyType &key,
69  SubscriberWeakPtr ptr) {
70  std::unique_lock lock(subscribers_map_cs_);
71  auto &subscribers_list = subscribers_map_[key];
72  return subscribers_list.emplace(subscribers_list.end(),
73  std::make_pair(set_id, std::move(ptr)));
74  }
75 
76  void unsubscribe(const EventKeyType &key, const IteratorType &it_remove) {
77  std::unique_lock lock(subscribers_map_cs_);
78  auto it = subscribers_map_.find(key);
79  if (subscribers_map_.end() != it) {
80  it->second.erase(it_remove);
81  if (it->second.empty()) subscribers_map_.erase(it);
82  }
83  }
84 
85  public:
86  size_t size(const EventKeyType &key) const {
87  std::shared_lock lock(subscribers_map_cs_);
88  if (auto it = subscribers_map_.find(key); it != subscribers_map_.end())
89  return it->second.size();
90 
91  return 0ull;
92  }
93 
94  size_t size() const {
95  std::shared_lock lock(subscribers_map_cs_);
96  size_t count = 0ull;
97  for (auto &it : subscribers_map_) count += it.second.size();
98  return count;
99  }
100 
101  void notify(const EventKeyType &key, const EventParams &...args) {
102  std::shared_lock lock(subscribers_map_cs_);
103  auto it = subscribers_map_.find(key);
104  if (subscribers_map_.end() == it) return;
105 
106  auto &subscribers_container = it->second;
107  for (auto it_sub = subscribers_container.begin();
108  it_sub != subscribers_container.end();) {
109  if (auto sub = it_sub->second.lock()) {
110  sub->on_notify(it_sub->first, key, args...);
111  ++it_sub;
112  } else {
113  it_sub = subscribers_container.erase(it_sub);
114  }
115  }
116  }
117  };
118 
119 } // namespace kagome::subscription
120 
121 #endif // KAGOME_SUBSCRIPTION_ENGINE_HPP
void notify(const EventKeyType &key, const EventParams &...args)
std::list< std::pair< SubscriptionSetId, SubscriberWeakPtr >> SubscribersContainer
std::weak_ptr< SubscriberType > SubscriberWeakPtr
IteratorType subscribe(SubscriptionSetId set_id, const EventKeyType &key, SubscriberWeakPtr ptr)
void unsubscribe(const EventKeyType &key, const IteratorType &it_remove)
size_t size(const EventKeyType &key) const
SubscriptionEngine & operator=(SubscriptionEngine &&)=default
typename SubscribersContainer::iterator IteratorType
std::unordered_map< EventKeyType, SubscribersContainer > KeyValueContainer