m2etis  0.4
DecisionTreeFilter.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_FILTER_DECISIONTREEFILTER_H__
23 #define __M2ETIS_PUBSUB_FILTER_DECISIONTREEFILTER_H__
24 
30 
34 
35 #include "boost/shared_ptr.hpp"
36 
60 namespace m2etis {
61 namespace pubsub {
62 namespace filter {
63 
64  template<typename EventType, typename NetworkType>
65  class DecisionTreeFilter: public BaseFilter<EventType, NetworkType> {
66  public:
68  enum {
69  size = 0
70  };
71 
72  DecisionTreeFilter() : decision_tree_(), has_new_subscription_(false), current_event_() {
73  }
74 
75  virtual ~DecisionTreeFilter() {
76  }
77 
78  virtual void getSubscribePayload(boost::shared_ptr<FilterExp<EventType> > filter, bool is_periodic_resubscribe, typename message::FilterInfo::Ptr filterInfo) override {
79  // implementation of Aguilera´s pre_process algorithm to create subscription decision tree:
80  DecisionTreePreProcessVisitor<EventType, NetworkType> decision_tree_preprocess_visitor(&decision_tree_);
81 
82  filter->Accept(decision_tree_preprocess_visitor); // add inner nodes to decision tree
83 
84  // add subscriber to decision tree
85  if (*(decision_tree_preprocess_visitor.get_current_decision_tree_node_ptr())) {
86  // subscription node already in tree
87  (*decision_tree_preprocess_visitor.get_current_decision_tree_node_ptr())->addSubscriber(self_);
88  } else {
89  // create node for subscription:
90  *(decision_tree_preprocess_visitor.get_current_decision_tree_node_ptr()) = boost::make_shared<DecisionTreeLeaf<EventType, NetworkType> >(self_);
91  }
92 
93  has_new_subscription_ = true;
94  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
95  fInfo->dynamic_filter_ = decision_tree_;
96 
97  fInfo->isUnsubscribe_ = false;
98  }
99 
100  virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override {
101  // when unsubscribing a filter only the sender key in the leaf node is removed for efficiency reasons
102  // the actual nodes are not removed and maybe do not lead to any subscribers any more
103  decision_tree_ = boost::shared_ptr<DecisionTreeNode<EventType, NetworkType> >();
104  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
105 
106  fInfo->dynamic_filter_ = boost::shared_ptr<DecisionTreeNode<EventType, NetworkType> >();
107  }
108 
110  DecisionTreeWalkerVisitor<EventType, NetworkType> decision_tree_walker_visitor(&decision_tree_);
111 
112  filter->Accept(decision_tree_walker_visitor);
113 
114  if (*(decision_tree_walker_visitor.get_current_decision_tree_node_ptr())) {
115  // filter is registered
116  (*(decision_tree_walker_visitor.get_current_decision_tree_node_ptr()))->removeSubscriber(self_);
117  }
118 
119  if (!(decision_tree_->hasSubscription(self_))) {
120  // if last filter has been deregistered, unsubscribe from the tree
121  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
122  fInfo->dynamic_filter_ = boost::shared_ptr<DecisionTreeNode<EventType, NetworkType> >();
123 
125  }
126 
127  // find out, if a subscriber has reigstered the same filter:
128  if (*(decision_tree_walker_visitor.get_current_decision_tree_node_ptr())) {
129  // filter is registered
130  if ((*(decision_tree_walker_visitor.get_current_decision_tree_node_ptr()))->hasSubscriber()) {
131  // filter has been deregsitered, but a subscriber of this node has registered the same filter
133  }
134  }
135 
136  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
137  fInfo->unsubscribe_filter_ = filter;
138  fInfo->isUnsubscribe_ = true;
139 
141  }
142 
143  virtual std::string getPublishPayload(const typename BaseFilter<EventType, NetworkType>::PayloadPtr message_text) const override {
144  return ""; // prefilter or preprocess the message and give other nodes a hint
145  }
146 
147  virtual std::string processSubscribePayload(const typename NetworkType::Key & sender, typename message::FilterInfo::Ptr filterInfo) override {
148  typename FilterInfoType::Ptr fInfo = cast(filterInfo);
149  if (fInfo->isUnsubscribe_) {
150  // currently only one filter can be contained:
151  DecisionTreeWalkerVisitor<EventType, NetworkType> decision_tree_walker_visitor(&decision_tree_);
152 
153  fInfo->unsubscribe_filter_->Accept(decision_tree_walker_visitor);
154 
155  if (*(decision_tree_walker_visitor.get_current_decision_tree_node_ptr())) {
156  // filter is registered
157  (*(decision_tree_walker_visitor.get_current_decision_tree_node_ptr()))->removeSubscriber(sender);
158  }
159 
160  return "";
161  }
162 
163  // decision trees are sent in filterinfo. merge into own decision tree:
164 
165  if (!decision_tree_) {
166  decision_tree_ = fInfo->dynamic_filter_;
167  } else {
168  decision_tree_->merge(fInfo->dynamic_filter_, sender);
169  }
170 
171  has_new_subscription_ = true;
172  return "";
173  } // processSubscribePayload
174 
175  // filterinfo from sender (maybe a predicate)
176  virtual void processUnsubscribePayload(const typename NetworkType::Key & sender, typename message::FilterInfo::Ptr filterInfo) override {
177  decision_tree_->removeSubscriber(sender);
178  }
179 
180  // called by routing strategy:
181  virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key & sender) override {
182  decision_tree_->removeSubscriber(sender);
183  }
184 
185  // function matching against dynamic filters of subscriber before forwarding to subscriber:
186  bool match(const typename NetworkType::Key & to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter<EventType, NetworkType>::PayloadPtr event) {
187  static std::set<typename NetworkType::Key> matching_subscribers; // stores matched subscriptions
188 
189  if (!event || current_event_ != event || has_new_subscription_) {
190  // subscription not already determined
191  matching_subscribers.clear();
192  has_new_subscription_ = false;
193  current_event_ = event;
194  decision_tree_->visit(*event, matching_subscribers);
195  }
196 
197  return (matching_subscribers.find(to) == matching_subscribers.end() ? false : true);
198  }
199 
200  // function matching against own dynamic_filters before delivering to application:
202  return match(self_, typename message::FilterInfo::Ptr(), event);
203  }
204 
205  void setSelf(const typename NetworkType::Key & self) {
206  self_ = self;
207  }
208 
209  private:
210  // The key of this node
211  typename NetworkType::Key self_;
212 
213  typename FilterInfoType::Ptr cast(typename message::FilterInfo::Ptr ptr) const {
214  typename FilterInfoType::Ptr ret = boost::dynamic_pointer_cast<FilterInfoType>(ptr);
215  if (!ret) {
216  M2ETIS_LOG_ERROR("Filter Strategy", "Downcast error of filterInfo");
217  }
218  return ret;
219  }
220 
221  boost::shared_ptr<DecisionTreeNode<EventType, NetworkType> > decision_tree_;
222 
223  // needed to determine if decision tree has to be evaluated again or if previous result can be used:
224  bool has_new_subscription_;
225  boost::shared_ptr<EventType> current_event_; // not via == operator, so EventType does not have to overload ==
226  };
227 
228 } /* namespace filter */
229 } /* namespace pubsub */
230 } /* namespace m2etis */
231 
232 #endif /* __M2ETIS_PUBSUB_FILTER_DECISIONTREEFILTER_H__ */
233 
virtual std::string getPublishPayload(const typename BaseFilter< EventType, NetworkType >::PayloadPtr message_text) const override
BaseFilter< EventType, NetworkType >::FilterUnsubscribeInformation getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo, boost::shared_ptr< FilterExp< EventType > > filter)
deregisters individual filter returns information whether to
void setSelf(const typename NetworkType::Key &self)
bool match(typename BaseFilter< EventType, NetworkType >::PayloadPtr event)
virtual void processUnsubscribePayload(const typename NetworkType::Key &sender, typename message::FilterInfo::Ptr filterInfo) override
removes all filters of the sender with the key given
bool match(const typename NetworkType::Key &to, typename message::FilterInfo::Ptr filterInfo, typename BaseFilter< EventType, NetworkType >::PayloadPtr event)
boost::shared_ptr< DecisionTreeFilterInfo< EventType, NetworkType > > Ptr
boost::shared_ptr< DecisionTreeNode< EventType, NetworkType > > * get_current_decision_tree_node_ptr()
boost::shared_ptr< EventType > PayloadPtr
Definition: BaseFilter.h:48
boost::shared_ptr< FilterInfo > Ptr
Definition: FilterInfo.h:35
virtual void processRoutingStrategyUnsubscribeNotification(const typename NetworkType::Key &sender) override
removes all filters of the sender with the key given intended for routing strategies to signal purged...
boost::shared_ptr< DecisionTreeNode< EventType, NetworkType > > * get_current_decision_tree_node_ptr()
message::DecisionTreeFilterInfo< EventType, NetworkType > FilterInfoType
virtual std::string processSubscribePayload(const typename NetworkType::Key &sender, typename message::FilterInfo::Ptr filterInfo) override
processes the set of received filters from sender
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
virtual void getUnsubscribePayload(typename message::FilterInfo::Ptr filterInfo) override
removes all filters
virtual void getSubscribePayload(boost::shared_ptr< FilterExp< EventType > > filter, bool is_periodic_resubscribe, typename message::FilterInfo::Ptr filterInfo) override
processes the new filter