Kagome
Polkadot Runtime Engine in C++17
subscriber.hpp
Go to the documentation of this file.
1 
6 #ifndef KAGOME_SUBSCRIPTION_SUBSCRIBER_HPP
7 #define KAGOME_SUBSCRIPTION_SUBSCRIBER_HPP
8 
9 #include <atomic>
10 #include <functional>
11 #include <memory>
12 #include <mutex>
13 
15 
16 namespace kagome::subscription {
17 
19  static std::atomic<SubscriptionSetId> id{0ll};
20  SubscriptionSetId result = ++id;
21  return result;
22  }
23 
34  template <typename EventKey, typename Receiver, typename... Arguments>
35  class Subscriber final : public std::enable_shared_from_this<
36  Subscriber<EventKey, Receiver, Arguments...>> {
37  public:
38  using EventType = EventKey;
39  using ReceiverType = Receiver;
40  using Hash = size_t;
41 
44  using SubscriptionEnginePtr = std::shared_ptr<SubscriptionEngineType>;
45 
46  using CallbackFnType = std::function<void(SubscriptionSetId,
47  ReceiverType &,
48  const EventType &,
49  const Arguments &...)>;
50 
51  private:
53  std::unordered_map<EventType,
55  using SubscriptionsSets =
56  std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;
58  ReceiverType object_;
59 
60  std::mutex subscriptions_cs_;
62 
64 
65  public:
66  template <typename... SubscriberConstructorArgs>
68  SubscriberConstructorArgs &&...args)
69  : engine_(ptr),
70  object_(std::forward<SubscriberConstructorArgs>(args)...) {}
71 
73  // Unsubscribe all
74  for (auto &[_, subscriptions] : subscriptions_sets_)
75  for (auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
76  }
77 
78  Subscriber(const Subscriber &) = delete;
79  Subscriber &operator=(const Subscriber &) = delete;
80 
81  Subscriber(Subscriber &&) = default; // NOLINT
82  Subscriber &operator=(Subscriber &&) = default; // NOLINT
83 
85  on_notify_callback_ = std::move(f);
86  }
87 
89  return generateNextId();
90  }
91 
92  void subscribe(SubscriptionSetId id, const EventType &key) {
93  std::lock_guard lock(subscriptions_cs_);
94  auto &&[it, inserted] = subscriptions_sets_[id].emplace(
95  key, typename SubscriptionEngineType::IteratorType{});
96 
99  if (inserted)
100  it->second = engine_->subscribe(id, key, this->weak_from_this());
101  }
102 
108  bool unsubscribe(SubscriptionSetId id, const EventType &key) {
109  std::lock_guard<std::mutex> lock(subscriptions_cs_);
110  if (auto set_it = subscriptions_sets_.find(id);
111  set_it != subscriptions_sets_.end()) {
112  auto &subscriptions = set_it->second;
113  auto it = subscriptions.find(key);
114  if (subscriptions.end() != it) {
115  engine_->unsubscribe(key, it->second);
116  subscriptions.erase(it);
117  return true;
118  }
119  }
120  return false;
121  }
122 
128  std::lock_guard<std::mutex> lock(subscriptions_cs_);
129  if (auto set_it = subscriptions_sets_.find(id);
130  set_it != subscriptions_sets_.end()) {
131  auto &subscriptions = set_it->second;
132  for (auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
133 
134  subscriptions_sets_.erase(set_it);
135  return true;
136  }
137  return false;
138  }
139 
140  void unsubscribe() {
141  std::lock_guard<std::mutex> lock(subscriptions_cs_);
142  for (auto &[_, subscriptions] : subscriptions_sets_)
143  for (auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
144 
145  subscriptions_sets_.clear();
146  }
147 
149  const EventType &key,
150  const Arguments &...args) {
151  if (nullptr != on_notify_callback_)
152  on_notify_callback_(set_id, object_, key, args...);
153  }
154 
155  ReceiverType &get() {
156  return object_;
157  }
158  };
159 
160 } // namespace kagome::subscription
161 
162 #endif // KAGOME_SUBSCRIPTION_SUBSCRIBER_HPP
SubscriptionEnginePtr engine_
Definition: subscriber.hpp:57
std::unordered_map< SubscriptionSetId, SubscriptionsContainer > SubscriptionsSets
Definition: subscriber.hpp:56
bool unsubscribe(SubscriptionSetId id, const EventType &key)
Definition: subscriber.hpp:108
void on_notify(SubscriptionSetId set_id, const EventType &key, const Arguments &...args)
Definition: subscriber.hpp:148
STL namespace.
Subscriber(SubscriptionEnginePtr &ptr, SubscriberConstructorArgs &&...args)
Definition: subscriber.hpp:67
SubscriptionsSets subscriptions_sets_
Definition: subscriber.hpp:61
std::unordered_map< EventType, typename SubscriptionEngineType::IteratorType > SubscriptionsContainer
Definition: subscriber.hpp:54
void subscribe(SubscriptionSetId id, const EventType &key)
Definition: subscriber.hpp:92
void setCallback(CallbackFnType &&f)
Definition: subscriber.hpp:84
Subscriber & operator=(const Subscriber &)=delete
SubscriptionSetId generateNextId()
Definition: subscriber.hpp:18
SubscriptionSetId generateSubscriptionSetId()
Definition: subscriber.hpp:88
typename SubscribersContainer::iterator IteratorType
bool unsubscribe(SubscriptionSetId id)
Definition: subscriber.hpp:127
std::function< void(SubscriptionSetId, ReceiverType &, const EventType &, const Arguments &...)> CallbackFnType
Definition: subscriber.hpp:49
std::shared_ptr< SubscriptionEngineType > SubscriptionEnginePtr
Definition: subscriber.hpp:44