m2etis  0.4
GeneralBooleanExpressionsFilter.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_FILTER_GENERALBOOLEANEXPRESSIONSFILTER_H__
23 #define __M2ETIS_PUBSUB_FILTER_GENERALBOOLEANEXPRESSIONSFILTER_H__
24 
25 #include <map>
26 #include <set>
27 #include <string>
28 #include <vector>
29 
36 
37 #include "boost/tuple/tuple.hpp"
38 
54 namespace m2etis {
55 namespace pubsub {
56 namespace filter {
57 
58  template<typename EventType, typename NetworkType>
59  class GeneralBooleanExpressionsFilter : public BaseFilter<EventType, NetworkType> {
60  public:
62 
// NOTE(review): only the closing braces of two member functions are visible below in this listing;
// their signatures and bodies are elided (presumably constructor/destructor — confirm in the real header).
63  enum {
// size = 0: presumably the fixed payload size this filter strategy contributes — TODO confirm against BaseFilter
64  size = 0
65  };
66 
68  }
69 
71  }
72 
73  virtual void getSubscribePayload(boost::shared_ptr<FilterExp<EventType>> filter, bool, typename message::FilterInfo::Ptr filterInfo) override {
74  // insert into index data structures:
75  addSubscription(self_, filter);
76 
77  // copy own filters and filters of subscribers into set and forward the set:
78  auto filters = std::set<boost::shared_ptr<FilterExp<EventType>>>();
79 
80  for (auto subscription : subscriptions_) {
81  filters.insert(boost::get<1>(subscription.second));
82  }
83 
84  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
85  fInfo->dynamic_filter_ = filters;
86  fInfo->isUnsubscribe_ = false;
87  }
88 
89  virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override {
90  // clear all indexes:
91  for (auto predicateID_subscription_pair : predicate_subscription_association_table_) {
92  predicate_identifier_factory_.freeID(predicateID_subscription_pair.first);
93  }
94 
95  for (auto subscriptionID_subscription_pair : subscription_subscriber_association_table) {
96  subscription_identifier_factory_.freeID(subscriptionID_subscription_pair.first);
97  }
98 
99  has_new_subscription_ = true;
100 
101  subscription_subscriptionID_association_table.clear();
102  subscription_subscriber_association_table.clear();
103  subscriptions_.clear();
104  predicate_subscription_association_table_.clear();
105  predicate_indexes_.clear();
106 
107  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
108  fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>();
109  }
110 
// NOTE(review): this listing omits this method's signature (per the member index: the
// getUnsubscribePayload(filterInfo, filter) override returning FilterUnsubscribeInformation) and the
// statements that end the branches below; only the visible lines are documented here.
112  bool hasMoreSubscribers;
113  bool hasMoreSubscriptions;
114 
// drop this node's own (self_) registration of the filter; removeSubscription reports through the
// out-parameters whether other subscribers still hold the same filter and whether self_ has any left
115  removeSubscription(self_, filter, &hasMoreSubscribers, &hasMoreSubscriptions);
116 
117  if (!hasMoreSubscriptions) {
118  // if last filter has been deregistered, unsubscribe from the tree
// NOTE(review): cast() returns an empty pointer on a type mismatch, but fInfo is dereferenced unchecked
119  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
120  fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>();
121 
123  }
124 
125  if (hasMoreSubscribers) {
126  // filter has been deregistered, but a subscriber of this node has registered the same filter
128  } else {
// nobody else holds this filter: send exactly this one filter flagged as an unsubscription, which
// processSubscribePayload on the receiving node handles via its isUnsubscribe_ branch
129  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
130  fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>({filter});
131  fInfo->isUnsubscribe_ = true;
133  }
134  }
135 
136  virtual std::string getPublishPayload(const typename BaseFilter<EventType, NetworkType>::PayloadPtr message_text) const override {
137  return "";
138  }
139 
140  virtual std::string processSubscribePayload(const typename NetworkType::Key & sender, typename message::FilterInfo::Ptr filterInfo) override {
141  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
142 
143  bool hasMoreSubscribers;
144  bool hasMoreSubscriptions;
145 
146  if (fInfo->isUnsubscribe_) {
147  // currently only one filter can be contained:
148  removeSubscription(sender, *(fInfo->dynamic_filter_.begin()), &hasMoreSubscribers, &hasMoreSubscriptions);
149 
150  return "";
151  }
152 
153  for (auto filter : fInfo->dynamic_filter_) {
154  addSubscription(sender, filter);
155  }
156 
157  // remove all filters not contained in the filterinfo, but in the indexes:
158 
159  bool is_filter_found = false;
160  auto erasable_subscriptions = std::vector<boost::shared_ptr<FilterExp<EventType>>>();
161 
162  for (auto filter_filterID_pair : subscription_subscriptionID_association_table) {
163  for (auto filter_ptr : fInfo->dynamic_filter_) {
164  if (*(filter_filterID_pair.first) == *filter_ptr) {
165  is_filter_found = true;
166 
167  break;
168  }
169  }
170  if (!is_filter_found) {
171  erasable_subscriptions.push_back(filter_filterID_pair.first);
172  // may not be removed here to avoid invalidating iterators
173  }
174  is_filter_found = false;
175  }
176 
177  for (auto erasable_subscription : erasable_subscriptions) {
178  removeSubscription(sender, erasable_subscription, &hasMoreSubscribers, &hasMoreSubscriptions);
179  }
180 
181  return "";
182  } // ProcessSubscribePayload
183 
184  virtual void processUnsubscribePayload(const typename NetworkType::Key & sender, typename message::FilterInfo::Ptr filterInfo) override {
185  bool hasMoreSubscribers;
186  bool hasMoreSubscriptions;
187 
188  // remove all filters of sender
189 
190  for (auto filter_filterID_pair : subscription_subscriptionID_association_table) {
191  removeSubscription(sender, (filter_filterID_pair.first), &hasMoreSubscribers, &hasMoreSubscriptions);
192  }
193  }
194 
195  // called by routing strategy:
196  virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key & sender) override {
197  bool hasMoreSubscribers;
198  bool hasMoreSubscriptions;
199 
200  for (auto filter_filterID_pair : subscription_subscriptionID_association_table) {
201  removeSubscription(sender, (filter_filterID_pair.first), &hasMoreSubscribers, &hasMoreSubscriptions);
202  }
203  }
204 
205  // function matching against dynamic filters of subscriber before forwarding to subscriber:
206  virtual bool match(const typename NetworkType::Key & to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter<EventType, NetworkType>::PayloadPtr event) override {
207  // Given the filterinfo (from getPublishPayload), the message and the recipient
208  // decide if the recipient needs that message
209 
210  // remember result and evaluate again only if needed
211  static std::set<typename NetworkType::Key> matching_subscribers; // stores matched subscriptions
212 
213  if (!event || current_event_ != event || has_new_subscription_) {
214  matching_subscribers.clear();
215  has_new_subscription_ = false;
216  current_event_ = event;
217 
218  for (auto predicate : fulfilled_predicate_vector_) {
219  predicate = false;
220  }
221 
222  for (auto predicate_index : predicate_indexes_) {
223  predicate_index->determineMatchingPredicates(*event, fulfilled_predicate_vector_);
224  }
225 
226  // determine number of fulfilled predicates for each subscription: (hit vector)
227  std::map<SubscriptionIdentifierFactory::SubscriptionID, int> hit_vector;
228 
229  PredicateIdentifierFactory::PredicateID current_predicate_id = 0;
230  for (auto predicate : fulfilled_predicate_vector_) {
231  if (predicate) {
232  auto hit_subscriptions = predicate_subscription_association_table_[current_predicate_id];
233  for (auto hit_subscription : hit_subscriptions) {
234  // auto previous_hit_number = hit_vector.at(hit_subscription);
235  auto previous_hit_number = 0;
236  auto iter_hit_vector = hit_vector.find(hit_subscription);
237  if (iter_hit_vector != hit_vector.end()) {
238  previous_hit_number = hit_vector.at(hit_subscription);
239  }
240  hit_vector[hit_subscription] = previous_hit_number + 1;
241  M2ETIS_LOG_INFO("GeneralBooleanExpression", "belonging to predicate id " << current_predicate_id << "new hit number " << previous_hit_number + 1);
242  }
243  }
244  ++current_predicate_id;
245  }
246 
247  // determine matching subscription candidates (hit count >= minimum fulfilled predicate count)
248  // and test
249 
250  for (auto & subscription : subscriptions_) {
251  auto iter_hit_vector = hit_vector.find(subscription.first);
252  if (iter_hit_vector != hit_vector.end()) {
253  // store subscribers so indexes do not have to be evaluated again for the same event
254  if (boost::get<0>(subscription.second) <= hit_vector.at(subscription.first)) {
255  MatchVisitor<EventType> dynamic_filter_visitor(*event);
256  (boost::get<1>(subscription.second))->Accept(dynamic_filter_visitor);
257 
258  if (dynamic_filter_visitor.get_result()) {
259  auto subscribers_to_subscriptionID = subscription_subscriber_association_table.find(subscription.first)->second;
260  matching_subscribers.insert(subscribers_to_subscriptionID.begin(), subscribers_to_subscriptionID.end());
261  }
262  }
263  }
264  }
265  } // if event has to be evaluated again
266 
267  return (matching_subscribers.find(to) == matching_subscribers.end() ? false : true);
268  }
269 
270  // function matching against own dynamic_filters before delivering to application:
271  virtual bool match(typename BaseFilter<EventType, NetworkType>::PayloadPtr event) override {
272  return match(self_, typename message::FilterInfo::Ptr(), event);
273  }
274 
275  virtual void setSelf(const typename NetworkType::Key & self) override {
276  self_ = self;
277  }
278 
279  private:
280  // adds the subscription to the data structures
281  // called from getSubscribePayload and processSubscribePayload
282  void addSubscription(const typename NetworkType::Key & subscriber_key, boost::shared_ptr<FilterExp<EventType>> filter) {
283  // test, if subscription is already registered:
284  for (auto filter_filterID_pair : subscription_subscriptionID_association_table) {
285  if (*(filter_filterID_pair.first) == *filter) {
286  subscription_subscriber_association_table[filter_filterID_pair.second].insert(subscriber_key);
287  return;
288  }
289  }
290 
291  has_new_subscription_ = true;
292 
293  auto subscription_id = subscription_identifier_factory_.createSubscriptionIdentifier();
294  subscription_subscriber_association_table[subscription_id] = std::set<typename NetworkType::Key>({subscriber_key});
295 
296  GeneralBooleanExpressionsPreProcessVisitor<EventType, NetworkType> preprocess_visitor(&predicate_indexes_, &predicate_subscription_association_table_, subscription_id, &predicate_identifier_factory_);
297 
298  filter->Accept(preprocess_visitor);
299 
300  // initializing fulfilled_predicate_vector_ with the correct size:
301  if (!predicate_subscription_association_table_.empty()) {
302  auto maxPredicateID = (--predicate_subscription_association_table_.end())->first;
303  fulfilled_predicate_vector_ = std::vector<bool>(maxPredicateID + 1, false);
304  } else {
305  fulfilled_predicate_vector_ = std::vector<bool>();
306  }
307 
308  // necessary data structures for subscriptions, containing:
309  // subscription id(key), minimum predicate count vector, hit vector, shared pointer to subscription tree
310  // filled in getSubscrbie payload, with the exception of hit vector, which is filled in match
311 
312  boost::tuple<int, boost::shared_ptr<FilterExp<EventType>>> subscription_data = {0, filter};
313  // initialize minimum predicate count vector (Algorithm GetMinPredicates in Bittner)
314 
315  GetMinPredicatesVisitor<EventType> get_min_predicates_visitor;
316 
317  filter->Accept(get_min_predicates_visitor);
318 
319  boost::get<0>(subscription_data) = get_min_predicates_visitor.get_result();
320 
321  subscriptions_[subscription_id] = subscription_data;
322 
323  subscription_subscriptionID_association_table.push_back(std::make_pair(filter, subscription_id));
324 
325  return;
326  } // void addSubscription
327 
328  // removes the subscription from the data structures
329  // called from getUnsubscribePayload and processUnsubscribePayload
330  void removeSubscription(const typename NetworkType::Key & subscriber_key, boost::shared_ptr<FilterExp<EventType>> filter, bool * hasMoreSubscribers, bool * hasMoreSubscriptions) {
331  *hasMoreSubscriptions = true; // subscriptions to the node!
332  *hasMoreSubscribers = true; // further subscribers having registered the predicate to be removed
333 
334  // determining subscription id:
336  auto erasable_subscription_iter = subscription_subscriptionID_association_table.end();
337 
338  for (auto filter_id_pair_iter = subscription_subscriptionID_association_table.begin(); filter_id_pair_iter != subscription_subscriptionID_association_table.end(); ++filter_id_pair_iter) {
339  if (*(filter_id_pair_iter->first) == *filter) {
340  subscriptionID = filter_id_pair_iter->second;
341  erasable_subscription_iter = filter_id_pair_iter;
342  }
343  }
344 
345  if (erasable_subscription_iter == subscription_subscriptionID_association_table.end()) {
346  // subscription not registered
347  *hasMoreSubscribers = true;
348 
349  // find out if there are subscriptions to this node left:
350  checkForSubscriptions(subscriber_key, hasMoreSubscriptions);
351 
352  return;
353  }
354 
355  subscription_subscriber_association_table[subscriptionID].erase(subscriber_key);
356 
357  if (subscription_subscriber_association_table[subscriptionID].size() == 0) {
358  // no more subscribers for this subscription
359  *hasMoreSubscribers = false;
360  subscription_subscriber_association_table.erase(subscriptionID); // remove empty set
361 
362  subscriptions_.erase(subscriptionID);
363  subscription_subscriptionID_association_table.erase(erasable_subscription_iter);
364 
365  // using predicate_subscription_association_table_ to determine predicates in removed filter:
366 
367  std::set<PredicateIdentifierFactory::PredicateID> erasable_predicates;
368 
369  for (auto & predicateID_subscriptionSet_pair : predicate_subscription_association_table_) {
370  auto number_of_subscriptions_to_predicate = predicateID_subscriptionSet_pair.second.count(subscriptionID);
371 
372  if (number_of_subscriptions_to_predicate == 0) {
373  continue;
374  // predicate not used in this subscription
375  }
376 
377  if (number_of_subscriptions_to_predicate == 1) {
378  // there are no other subscriptions with this predicate,
379  // remove predicate from indexes
380 
381  // store predicate ids to remove from predicate_subscription_association_table
382  // may not be removed here to avoid invalidating iterators
383 
384  erasable_predicates.insert(predicateID_subscriptionSet_pair.first);
385 
386  for (auto predicate_index : predicate_indexes_) {
387  predicate_index->removePredicate(predicateID_subscriptionSet_pair.first);
388  }
389  }
390 
391  predicateID_subscriptionSet_pair.second.erase(predicateID_subscriptionSet_pair.second.find(subscriptionID));
392  }
393 
394  for (auto erasable_predicate : erasable_predicates) {
395  predicate_subscription_association_table_.erase(erasable_predicate);
396  }
397 
398  subscription_identifier_factory_.freeID(subscriptionID);
399  } // no more subscribers for this subscription
400 
401  // find out if there are subscriptions to this node left:
402 
403  checkForSubscriptions(subscriber_key, hasMoreSubscriptions);
404  } // removeSubscription
405 
406  void checkForSubscriptions(const typename NetworkType::Key & subscriber_key, bool * hasMoreSubscriptions) {
407  // find out if there are subscriptions to this node left:
408  *hasMoreSubscriptions = false;
409 
410  for (auto subscription_subscriber_pair : subscription_subscriber_association_table) {
411  if (subscription_subscriber_pair.second.count(subscriber_key) > 0) {
412  *hasMoreSubscriptions = true;
413  break;
414  }
415  }
416  }
417 
418  // The key of this node
419  typename NetworkType::Key self_;
420 
421  typename FilterInfoType::Ptr cast(typename message::FilterInfo::Ptr ptr) const {
422  typename FilterInfoType::Ptr ret = boost::dynamic_pointer_cast<FilterInfoType>(ptr);
423  if (!ret) {
424  M2ETIS_LOG_ERROR("Filter Strategy", "Downcast error of filterInfo");
425  }
426  return ret;
427  }
428 
429  // list of predicate indexes: (each of which has to be evaluated for every incoming message)
430  std::vector<std::shared_ptr<PredicateIndex<EventType>>> predicate_indexes_;
431 
432  // predicate_id corresponds to position in fulfilled_predicate_vector_
// (sized in addSubscription to the largest predicate ID in use + 1; reset at the start of each evaluation in match)
433  std::vector<bool> fulfilled_predicate_vector_;
434 
435  // if predicate is matched, determine subscriptions containing predicate for candidate subscription checking
436  // via the following data structure
437  // (multi)set because of unsubscriptions
438  std::map<PredicateIdentifierFactory::PredicateID, std::multiset<SubscriptionIdentifierFactory::SubscriptionID>> predicate_subscription_association_table_;
439 
// ID factories; IDs are returned via freeID when subscriptions / predicates are dropped
440  SubscriptionIdentifierFactory subscription_identifier_factory_;
441  PredicateIdentifierFactory predicate_identifier_factory_;
442 
443  // necessary data structures for subscriptions, containing:
444  // subscription id(key), minimum predicate count, shared pointer to subscription tree
445  // filled in addSubscription (called from getSubscribePayload and processSubscribePayload)
446  // the hit vector is not stored here; it is a local computed per event in match
447  std::map<SubscriptionIdentifierFactory::SubscriptionID, boost::tuple<int, boost::shared_ptr<FilterExp<EventType>>>> subscriptions_;
448 
// subscription id -> keys of the nodes (including self_ for own filters) that registered this subscription
449  std::map<SubscriptionIdentifierFactory::SubscriptionID, std::set<typename NetworkType::Key>> subscription_subscriber_association_table;
450 
451  // needed to determine if indexes have to be evaluated again or if previous result can be used:
// NOTE(review): has_new_subscription_ has no in-class initializer; confirm the (elided) constructor initializes it
452  bool has_new_subscription_;
453  boost::shared_ptr<EventType> current_event_; // not via == operator, so EventType does not have to overload ==
454 
455  // mapping filters (which are sent via network) to (local) IDs for deregistrations:
456  // maybe introduce hash (unordered_)map to speed up access to filters with identical hashes and their SubscriptionIDs
457  std::vector<std::pair<boost::shared_ptr<FilterExp<EventType>>, SubscriptionIdentifierFactory::SubscriptionID>> subscription_subscriptionID_association_table;
458  };
459 
460 } /* namespace filter */
461 } /* namespace pubsub */
462 } /* namespace m2etis */
463 
464 #endif /* __M2ETIS_PUBSUB_FILTER_GENERALBOOLEANEXPRESSIONSFILTER_H__ */
465 
virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key &sender) override
removes all filters of the sender with the key given intended for routing strategies to signal purged...
virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override
removes all filters
boost::shared_ptr< EventType > PayloadPtr
Definition: BaseFilter.h:48
boost::shared_ptr< FilterInfo > Ptr
Definition: FilterInfo.h:35
#define M2ETIS_LOG_INFO(module, message)
Definition: Logger.h:55
virtual bool match(const typename NetworkType::Key &to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter< EventType, NetworkType >::PayloadPtr event) override
virtual BaseFilter< EventType, NetworkType >::FilterUnsubscribeInformation getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo, boost::shared_ptr< FilterExp< EventType >> filter) override
virtual void getSubscribePayload(boost::shared_ptr< FilterExp< EventType >> filter, bool, typename message::FilterInfo::Ptr filterInfo) override
boost::shared_ptr< GeneralBooleanExpressionsFilterInfo< EventType > > Ptr
virtual std::string processSubscribePayload(const typename NetworkType::Key &sender, typename message::FilterInfo::Ptr filterInfo) override
processes the set of received filters from sender
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
virtual std::string getPublishPayload(const typename BaseFilter< EventType, NetworkType >::PayloadPtr message_text) const override
message::GeneralBooleanExpressionsFilterInfo< EventType > FilterInfoType
virtual void setSelf(const typename NetworkType::Key &self) override
virtual void processUnsubscribePayload(const typename NetworkType::Key &sender, typename message::FilterInfo::Ptr filterInfo) override
removes all filters of the sender with the key given
virtual bool match(typename BaseFilter< EventType, NetworkType >::PayloadPtr event) override