6 #ifndef KAGOME_SUBSCRIPTION_SUBSCRIBER_HPP 7 #define KAGOME_SUBSCRIPTION_SUBSCRIBER_HPP 19 static std::atomic<SubscriptionSetId>
id{0ll};
34 template <
typename EventKey,
typename Receiver,
typename... Arguments>
35 class Subscriber final :
public std::enable_shared_from_this<
36 Subscriber<EventKey, Receiver, Arguments...>> {
49 const Arguments &...)>;
56 std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;
66 template <
typename... SubscriberConstructorArgs>
68 SubscriberConstructorArgs &&...args)
70 object_(
std::forward<SubscriberConstructorArgs>(args)...) {}
74 for (
auto &[_, subscriptions] : subscriptions_sets_)
75 for (
auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
85 on_notify_callback_ = std::move(f);
93 std::lock_guard lock(subscriptions_cs_);
94 auto &&[it, inserted] = subscriptions_sets_[id].emplace(
100 it->second = engine_->subscribe(
id, key, this->weak_from_this());
109 std::lock_guard<std::mutex> lock(subscriptions_cs_);
110 if (
auto set_it = subscriptions_sets_.find(
id);
111 set_it != subscriptions_sets_.end()) {
112 auto &subscriptions = set_it->second;
113 auto it = subscriptions.find(key);
114 if (subscriptions.end() != it) {
115 engine_->unsubscribe(key, it->second);
116 subscriptions.erase(it);
128 std::lock_guard<std::mutex> lock(subscriptions_cs_);
129 if (
auto set_it = subscriptions_sets_.find(
id);
130 set_it != subscriptions_sets_.end()) {
131 auto &subscriptions = set_it->second;
132 for (
auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
134 subscriptions_sets_.erase(set_it);
141 std::lock_guard<std::mutex> lock(subscriptions_cs_);
142 for (
auto &[_, subscriptions] : subscriptions_sets_)
143 for (
auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
145 subscriptions_sets_.clear();
149 const EventType &key,
150 const Arguments &...args) {
151 if (
nullptr != on_notify_callback_)
155 ReceiverType &
get() {
162 #endif // KAGOME_SUBSCRIPTION_SUBSCRIBER_HPP
SubscriptionEnginePtr engine_
std::unordered_map< SubscriptionSetId, SubscriptionsContainer > SubscriptionsSets
CallbackFnType on_notify_callback_
bool unsubscribe(SubscriptionSetId id, const EventType &key)
void on_notify(SubscriptionSetId set_id, const EventType &key, const Arguments &...args)
Subscriber(SubscriptionEnginePtr &ptr, SubscriberConstructorArgs &&...args)
SubscriptionsSets subscriptions_sets_
std::unordered_map< EventType, typename SubscriptionEngineType::IteratorType > SubscriptionsContainer
void subscribe(SubscriptionSetId id, const EventType &key)
void setCallback(CallbackFnType &&f)
Subscriber & operator=(const Subscriber &)=delete
SubscriptionSetId generateNextId()
SubscriptionSetId generateSubscriptionSetId()
std::mutex subscriptions_cs_
typename SubscribersContainer::iterator IteratorType
uint32_t SubscriptionSetId
bool unsubscribe(SubscriptionSetId id)
std::function< void(SubscriptionSetId, ReceiverType &, const EventType &, const Arguments &...)> CallbackFnType
std::shared_ptr< SubscriptionEngineType > SubscriptionEnginePtr