m2etis  0.4
BruteForcePartition.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_PARTITION_BRUTEFORCEPARTITION_H__
23 #define __M2ETIS_PUBSUB_PARTITION_BRUTEFORCEPARTITION_H__
24 
25 #include <map>
26 
30 
31 namespace m2etis {
32 namespace pubsub {
33 namespace partition {
34 
35  template<typename NetworkType, typename EventType>
36  class BruteForcePartition : public BasePartition<NetworkType, EventType> {
37  public:
38  typedef boost::shared_ptr<EventType> PayloadPtr;
39 
40  static const bool DYNAMIC_PARTITION = false;
41 
42  BruteForcePartition() : partition_filter_vector_(std::vector<boost::shared_ptr<filter::FilterExp<EventType>> >()) {
43  }
44 
45  explicit BruteForcePartition(const std::vector<boost::shared_ptr<filter::FilterExp<EventType>> > & partition_filter_vector) : partition_filter_vector_(partition_filter_vector) {
46  }
47 
48  virtual ~BruteForcePartition() {}
49 
50  void createRendezvousPartition(const typename NetworkType::Key &) {
51  }
52 
53  // returns vector of all tree indexes of channel
54  std::vector<int> getTreeNames() {
55  std::vector<int> ret;
56  if (partition_filter_vector_.empty()) {
57  ret.push_back(0);
58  return ret;
59  }
60  for (std::vector<int>::size_type i = 0; i != partition_filter_vector_.size(); ++i) {
61  ret.push_back(int(i));
62  }
63  return ret;
64  }
65 
66  // returns tree on which the given message should be published (matching algorithm)
67  std::vector<int>::size_type getPublishTree(const PayloadPtr message, const typename NetworkType::Key &) {
68  if (partition_filter_vector_.empty()) {
69  return 0;
70  }
71 
72  std::vector<int>::size_type i = 0;
73  for (auto current_filter : partition_filter_vector_) {
74  filter::MatchVisitor<EventType> match_filter_visitor(*message);
75 
76  current_filter->Accept(match_filter_visitor);
77  if (match_filter_visitor.get_result()) {
78  M2ETIS_LOG_INFO("BruteForcePartition", "publishing message on tree number " << i);
79  return i;
80  }
81  ++i;
82  }
83  M2ETIS_THROW_API("BruteForcePartition", "No matching tree for publish message found.");
84  }
85 
86  // returns trees which have a static filter matching the given dynamic filter (overlap algorithm)
87  std::vector<unsigned int> getSubscribeTrees(boost::shared_ptr<filter::FilterExp<EventType> > dynamic_filter) {
88  std::vector<unsigned int> overlapping_trees;
89  if (partition_filter_vector_.empty()) {
90  overlapping_trees.push_back(0);
91  return overlapping_trees;
92  }
93  unsigned int tree_index = 0;
94 
95  for (auto partition_filter : partition_filter_vector_) {
96  if (isOverlap(partition_filter, dynamic_filter)) {
97  overlapping_trees.push_back(tree_index);
98  }
99  ++tree_index;
100  }
101  return overlapping_trees;
102  } // getSubscribeTrees
103 
104  boost::shared_ptr<filter::FilterExp<EventType>> getPredicate(size_t id) {
105  assert(id < partition_filter_vector_.size());
106  return partition_filter_vector_[id];
107  }
108 
109  bool createPartition(const typename NetworkType::Key &) {
110  return false;
111  }
112 
113  void addPartition(boost::shared_ptr<filter::FilterExp<EventType>>, const typename NetworkType::Key &) {
114  }
115 
116  void removePartition(size_t) {
117  }
118 
119  void changePredicate(size_t, boost::shared_ptr<filter::FilterExp<EventType>>) {
120  }
121 
122  void changeRoot(size_t, typename NetworkType::Key &) {
123  }
124 
125  private:
126  bool isOverlap(const boost::shared_ptr<filter::FilterExp<EventType>> static_filter, const boost::shared_ptr<filter::FilterExp<EventType>> dynamic_filter) {
127  // step 1: test expression for satisfiability and determine predicate assignment candidates:
128  auto intersection_filter = std::make_shared<filter::AndExp<EventType>>(static_filter, dynamic_filter);
129 
130  filter::VariableAssignmentVisitor<EventType> variable_assignment_filter_visitor;
131  intersection_filter->Accept(variable_assignment_filter_visitor); // determining number of predicates
132 
133  unsigned long number_of_assignment_possibilities = static_cast<unsigned long>(pow(2.0, variable_assignment_filter_visitor.get_predicate_number()));
134  std::vector<unsigned long> fulfilling_predicate_assignments;
135 
136  for (unsigned long predicate_assignment = 0; predicate_assignment != number_of_assignment_possibilities; ++predicate_assignment) {
137  variable_assignment_filter_visitor.reset();
138  variable_assignment_filter_visitor.set_predicate_assignment(predicate_assignment);
139  intersection_filter->Accept(variable_assignment_filter_visitor);
140  if (variable_assignment_filter_visitor.get_result()) {
141  fulfilling_predicate_assignments.push_back(predicate_assignment);
142  }
143  }
144 
145  // step 2: test all candidate predicate assignments from step 1
146  for (auto fulfilling_predicate_assignment_iterator = fulfilling_predicate_assignments.begin(); fulfilling_predicate_assignment_iterator != fulfilling_predicate_assignments.end(); ++fulfilling_predicate_assignment_iterator) {
147  bool disjoint_predicates_found = false;
148  auto fulfilling_predicate_assignment = *fulfilling_predicate_assignment_iterator;
149  // testing every predicate against every other predicate for overlap
150 
151  // loop over bits in predicate assignment, check for 1 bits (predicates (indexes), that have to be true):
152  for (int current_predicate_number = 0; current_predicate_number != variable_assignment_filter_visitor.get_predicate_number() && !disjoint_predicates_found; ++current_predicate_number) {
153  if ((fulfilling_predicate_assignment & (1UL << current_predicate_number)) != 0) {
154  // predicate has to be true in order to fulfill variable assignment, compare with all following predicates
155  for (int other_predicate_number = current_predicate_number + 1; other_predicate_number != variable_assignment_filter_visitor.get_predicate_number() && !disjoint_predicates_found; ++other_predicate_number) {
156  if ((fulfilling_predicate_assignment & (1UL << other_predicate_number)) != 0) {
157  // predicate has to be true in order to fulfill variable assignment, compare with current predicate
158  if (!((variable_assignment_filter_visitor.get_predicate_index())[current_predicate_number])->overlaps((variable_assignment_filter_visitor.get_predicate_index())[other_predicate_number])) {
159  disjoint_predicates_found = true; // try next predicate assignment (exit loops for performance reasons in for conditions)
160  }
161  }
162  }
163  }
164  }
165  if (!disjoint_predicates_found) {
166  return true; // predicate assignment found, which is a true overlap
167  }
168  } // loop over predicate assignments, which are possible overlaps
169 
170  return false;
171  } // function isOverlap
172 
173  std::vector<boost::shared_ptr<filter::FilterExp<EventType>> > partition_filter_vector_;
174  // one entry for each tree of channel
175  // TODO: (Roland) make const including elements
176  }; // class BruteForcePartition
177 
178 } /* namespace partition */
179 } /* namespace pubsub */
180 } /* namespace m2etis */
181 
182 #endif /* __M2ETIS_PUBSUB_PARTITION_BRUTEFORCEPARTITION_H__ */
183 
std::vector< unsigned int > getSubscribeTrees(boost::shared_ptr< filter::FilterExp< EventType > > dynamic_filter)
BruteForcePartition(const std::vector< boost::shared_ptr< filter::FilterExp< EventType >> > &partition_filter_vector)
STL namespace.
void createRendezvousPartition(const typename NetworkType::Key &)
creates a base partition for the RP (dynamic)
bool createPartition(const typename NetworkType::Key &)
can create a new partition for this channel if a new node (root) joins, return true, if a new partition was created, otherwise false (dynamic)
#define M2ETIS_LOG_INFO(module, message)
Definition: Logger.h:55
void addPartition(boost::shared_ptr< filter::FilterExp< EventType >>, const typename NetworkType::Key &)
void changeRoot(size_t, typename NetworkType::Key &)
void changePredicate(size_t, boost::shared_ptr< filter::FilterExp< EventType >>)
changes the filter predicate for the partition with the given id (dynamic)
std::vector< int >::size_type getPublishTree(const PayloadPtr message, const typename NetworkType::Key &)
returns the tree index the given node with the given payload should publish on (static & dynamic) ...
boost::shared_ptr< filter::FilterExp< EventType > > getPredicate(size_t id)
returns the predicate for the given partition id (dynamic)
std::vector< int > getTreeNames()
returns amount of partitions this strategy contains (static & dynamic)
#define M2ETIS_THROW_API(module, message)
throws on wrong API usage
Definition: Exceptions.h:40
void removePartition(size_t)
removes the partition with the given id (dynamic)