m2etis  0.4
BruteForceFilter.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_FILTER_BRUTEFORCEFILTER_H__
23 #define __M2ETIS_PUBSUB_FILTER_BRUTEFORCEFILTER_H__
24 
25 #include <map>
26 #include <string>
27 
32 
33 #include "boost/make_shared.hpp"
34 #include "boost/shared_ptr.hpp"
35 
43 namespace m2etis {
44 namespace pubsub {
45 namespace filter {
46 
47  template<typename EventType, typename NetworkType>
48  class BruteForceFilter: public BaseFilter<EventType, NetworkType> {
49  public:
51 
52  enum {
53  size = 0
54  };
55 
60  }
61 
62  virtual ~BruteForceFilter() {
63  }
64 
65  virtual void getSubscribePayload(boost::shared_ptr<FilterExp<EventType>> filter, bool, typename message::FilterInfo::Ptr filterInfo) override {
66  // save filter if not yet stored:
67  dynamic_filters_[self_].insert(filter);
68 
69  // copy own filters and filters of subscribers into set and return the set:
70  auto filters = std::set<boost::shared_ptr<FilterExp<EventType>>>();
71 
72  for (auto key_filters_pair : dynamic_filters_) {
73  for (auto current_filter : key_filters_pair.second) {
74  filters.insert(current_filter);
75  }
76  }
77 
78  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
79  fInfo->dynamic_filter_ = filters;
80  fInfo->isUnsubscribe_ = false;
81  }
82 
83  virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override {
84  dynamic_filters_.clear();
85 
86  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
87  fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>();
88  }
89 
91  removeFilter(self_, filter);
92 
93  if (dynamic_filters_[self_].empty()) {
94  // if last filter has been deregistered, unsubscribe from the tree
95  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
96  fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType>>>();
97 
99  }
100 
101  bool isFilterFound = false;
102 
103  // find out, if a subscriber has registered the same filter:
104  for (auto keySubscriptionPair : dynamic_filters_) {
105  for (auto registered_filter : keySubscriptionPair.second) {
106  if (*registered_filter == *filter) {
107  isFilterFound = true;
108  break;
109  }
110  }
111  }
112 
113  if (isFilterFound) {
114  // filter has been deregistered, but a subscriber of this node has registered the same filter
116  } else {
117  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
118  fInfo->dynamic_filter_ = std::set<boost::shared_ptr<FilterExp<EventType> > >({filter});
119  fInfo->isUnsubscribe_ = true;
121  }
122  }
123 
124  virtual std::string getPublishPayload(const typename BaseFilter<EventType, NetworkType>::PayloadPtr message_text) const override {
125  return "";
126  }
127 
128  virtual std::string processSubscribePayload(const typename NetworkType::Key & sender_key, typename message::FilterInfo::Ptr filterInfo) override {
129  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
130 
131  if (fInfo->isUnsubscribe_) {
132  // the subscribe message contains the deregistering of a single filter
133  // see function getUnsubscribePayload
134  // currently only one filter can be contained:
135  removeFilter(sender_key, *(fInfo->dynamic_filter_.begin()));
136  return "";
137  }
138 
139  dynamic_filters_[sender_key] = fInfo->dynamic_filter_;
140 
141  return "";
142  }
143 
144  virtual void processUnsubscribePayload(const typename NetworkType::Key & sender_key, typename message::FilterInfo::Ptr filterInfo) override {
145  dynamic_filters_.erase(sender_key);
146  }
147 
148  virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key & sender_key) override {
149  dynamic_filters_.erase(sender_key);
150  }
151 
152  virtual bool match(const typename NetworkType::Key & to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter<EventType, NetworkType>::PayloadPtr event) override {
153  for (auto filter_of_recipient : (dynamic_filters_[to])) {
154  MatchVisitor<EventType> dynamic_filter_visitor(*event);
155 
156  filter_of_recipient->Accept(dynamic_filter_visitor);
157  if (dynamic_filter_visitor.get_result()) {
158  return true;
159  }
160  }
161  // no matching filter found
162  return false;
163  }
164 
165  virtual bool match(typename BaseFilter<EventType, NetworkType>::PayloadPtr event) override {
166  // matching message against all stored predicates
167  return match(self_, typename message::FilterInfo::Ptr(), event);
168  }
169 
170  virtual void setSelf(const typename NetworkType::Key & self) override {
171  self_ = self;
172  }
173 
174  private:
175  void removeFilter(const typename NetworkType::Key & key, boost::shared_ptr<FilterExp<EventType> > filter) {
176  // comparing objects, not shared_pointers:
177  for (auto filter_iter = dynamic_filters_[key].begin(); filter_iter != dynamic_filters_[key].end(); ++filter_iter) {
178  if (**filter_iter == *filter) {
179  dynamic_filters_[key].erase(filter_iter);
180  break;
181  }
182  }
183  }
184 
185  typename FilterInfoType::Ptr cast(typename message::FilterInfo::Ptr ptr) const {
186  typename FilterInfoType::Ptr ret = boost::dynamic_pointer_cast<FilterInfoType>(ptr);
187  if (!ret) {
188  M2ETIS_LOG_ERROR("Filter Strategy", "Downcast error of filterInfo");
189  }
190  return ret;
191  }
192 
193  std::map<typename NetworkType::Key, std::set<boost::shared_ptr<FilterExp<EventType> > > > dynamic_filters_;
194 
195  protected:
196  // The own key of this node
197  typename NetworkType::Key self_;
198  };
199 
200 } /* namespace filter */
201 } /* namespace pubsub */
202 } /* namespace m2etis */
203 
204 #endif /* __M2ETIS_PUBSUB_FILTER_BRUTEFORCEFILTER_H__ */
205 
virtual std::string processSubscribePayload(const typename NetworkType::Key &sender_key, typename message::FilterInfo::Ptr filterInfo) override
processes the set of received filters from sender
virtual void processUnsubscribePayload(const typename NetworkType::Key &sender_key, typename message::FilterInfo::Ptr filterInfo) override
removes all filters of the sender with the key given
virtual void setSelf(const typename NetworkType::Key &self) override
virtual BaseFilter< EventType, NetworkType >::FilterUnsubscribeInformation getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo, boost::shared_ptr< FilterExp< EventType >> filter) override
boost::shared_ptr< BruteForceFilterInfo< EventType > > Ptr
boost::shared_ptr< EventType > PayloadPtr
Definition: BaseFilter.h:48
boost::shared_ptr< FilterInfo > Ptr
Definition: FilterInfo.h:35
virtual std::string getPublishPayload(const typename BaseFilter< EventType, NetworkType >::PayloadPtr message_text) const override
virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override
removes all filters
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
virtual void getSubscribePayload(boost::shared_ptr< FilterExp< EventType >> filter, bool, typename message::FilterInfo::Ptr filterInfo) override
message::BruteForceFilterInfo< EventType > FilterInfoType
virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key &sender_key) override
removes all filters of the sender with the key given intended for routing strategies to signal purged...
virtual bool match(const typename NetworkType::Key &to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter< EventType, NetworkType >::PayloadPtr event) override
virtual bool match(typename BaseFilter< EventType, NetworkType >::PayloadPtr event) override