m2etis  0.4
HierarchicalSpreaditRouting.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
31 
34 
35 #include "boost/date_time/posix_time/posix_time_types.hpp"
36 
37 namespace m2etis {
38 namespace pubsub {
39 namespace routing {
40 
41  template<class NetworkType, unsigned int ChildAmount>
42  class HierarchicalSpreaditRouting : public BaseRouting<NetworkType> {
43  public:
45  typedef std::pair<long, typename NetworkType::Key> TimePair;
46  typedef std::vector<typename NetworkType::Key> KeyList;
47  typedef std::vector<TimePair> TimeList;
48 
49  // [EOC] The next variables are more or less generic to all
50  // distribution (algo) policies and describe their behavoir!
51 
52  // whether or not to register for subscribe messages in forward
53  static const bool register_forward_subscribe = false;
54 
55  // whether or not to register for subscribe messages in deliver
56  static const bool register_deliver_subscribe = true;
57 
58  // whether or not to register for unsubscribe messages in forward
59  static const bool register_forward_unsubscribe = false;
60 
61  // whether or not to register for unsubscribe messages in deliver
62  static const bool register_deliver_unsubscribe = true;
63  // [EOC]
64 
65  // whether or not to activate an automatic periodic resent subscribtion
66  static const bool periodicSubscribtion = true;
67 
68  // the time_duration between resent periodic subscribtions
69  const long periodic_;
70 
71  // the time_duration to decide if a subscriber is purged form the list
72  const long purge_distance_;
73 
75 
76  // inform listener, whether a subscriber has been purged, e.g. by timeout
77  // currently only needed for filter strategies
78  boost::function<void(const typename NetworkType::Key)> removed_subscriber_eventlistener_;
79 
80  unsigned int purgeID_;
81 
82  HierarchicalSpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment * pssi, const typename NetworkType::Key & root) :
83  BaseRouting<NetworkType>(topic_name, pssi)
84  , periodic_(hierarchicalspreadit::RESUBSCRIPTION_INTERVAL), purge_distance_(hierarchicalspreadit::PURGE_DISTANCE), _pssi(pssi)
85  , removed_subscriber_eventlistener_()
86  , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
87  , purging_(true), _root(root) {
88  purgeID_ = pssi->scheduler_.runRepeated(purge_distance_, boost::bind(&HierarchicalSpreaditRouting::purgeList, this), 0);
89  }
90 
92  purging_ = false;
93  _pssi->scheduler_.stop(purgeID_);
94  }
95 
96  void setSelf(const typename NetworkType::Key & self) {
97  self_ = self;
98  }
99 
100  void setUnsubscriptionListener(const boost::function<void(const typename NetworkType::Key)> & listener) {
101  removed_subscriber_eventlistener_ = listener;
102  }
103 
104  // set the self subscription status
105  void selfSubscribed(const bool b) {
106  subscribed_ = b;
107  }
108 
109  // get the self subscribtion status
110  bool selfSubscribed() const {
111  return subscribed_;
112  }
113 
114  void configureRoutingInfo(message::ActionType & msgType, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key & receiver) {
115  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
116  switch (msgType) {
117  case message::SUBSCRIBE: {
118  msgType = message::SUBSCRIBE;
119  break;
120  }
121  case message::UNSUBSCRIBE: {
122  msgType = message::UNSUBSCRIBE;
123  if (!parent_vector.empty()) {
124  rInfo->node_adress = parent_vector.at(0).second;
125  }
126  break;
127  }
128  case message::PUBLISH: {
129  msgType = message::NOTIFY;
130  rInfo->selfSend = true;
131  rInfo->sender = self_;
132  break;
133  }
134  case message::NOTIFY: {
135  msgType = message::NOTIFY;
136  rInfo->sender = self_;
137  break;
138  }
139  case message::CONTROL: {
140  msgType = message::CONTROL;
141  break;
142  }
143  default:
144  M2ETIS_THROW_FAILURE("HierarchicalSpreaditRouting", "configureRoutingInfo called with wrong action type", -1);
145  }
146  return;
147  }
148 
149  KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key & receiver) const {
150  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
151  /*
152  * All messages must be sent to root.
153  * Even if it's a publishmessage and root is subscribed, too.
154  * In deliver::publish, root will take care to deliver the message and spread it to the subscribers.
155  */
156  KeyList m;
157  if (mtype == message::SUBSCRIBE) {
158  // I've been redirected to a new parent
160  m.push_back(rInfo->node_adress);
161  } else if (!parent_vector.empty()) { // if I already have a parent, send subscribe to him - works like a heart beat
162  m.push_back(parent_vector.begin()->second);
163  } else { // subscription process starts from the root
164  if (self_ == _root) {
165  return m;
166  } else {
167  m.push_back(_root);
168  }
169  }
170  } else if (mtype == message::CONTROL) {
171  // receiver has been saved earlier
172  if (receiver == typename NetworkType::Key()) {
173  struct T {
174  static typename NetworkType::Key get_from_pair(const HierarchicalSpreaditRouting::TimePair & p) {
175  return p.second;
176  }
177  };
178 
179  m.resize(subscriber_.size());
180  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
181  } else {
182  m.push_back(receiver);
183  }
184  } else if (mtype == message::PUBLISH) {
185  // here we need to send the message always to root (even if we are the root)
186  // because the processing for publish messages is done in deliver
187  assert(false);
188  } else if (mtype == message::NOTIFY) {
189  if (!parent_vector.empty()) { // if node has parent
190  if (rInfo->sender != parent_vector.begin()->second) { // and the messages doesn't come from the parent
191  m.push_back(parent_vector.begin()->second); // the message has to be sent to the parent
192  }
193  } else { // the node doesn't have a parent
194  if (self_ != _root) { // this node isn't root node
195  m.push_back(_root); // the message has to be sent to root
196  }
197  }
198 
199  for (TimePair p : subscriber_) {
200  if (p.second != rInfo->sender) {
201  m.push_back(p.second); // add every subscriber not being the sender of the message as receiver
202  }
203  }
204  if (rInfo->selfSend) {
205  m.push_back(self_);
206  }
207  rInfo->sender = self_;
208  } else if (mtype == message::UNSUBSCRIBE) {
209  // message must be published, that means from root to subscribers
210  // Simply return a list of subscribed nodes
211 
212  // That's nicer than for_each and push_back, isn't it?
213  // std::transform is really good!
214  struct T {
215  static typename NetworkType::Key get_from_pair(const TimePair & p) {
216  return p.second;
217  }
218  };
219 
220  m.resize(subscriber_.size());
221  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
222 
223  if (!parent_vector.empty()) {
224  m.push_back(parent_vector.at(0).second);
225  }
226  } else {
227  M2ETIS_THROW_FAILURE("Spreadit Routing", "unknown action type", -1);
228  }
229 
230  return m;
231  }
232 
233  bool processSubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) override {
234  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
235 
236  bool is_subscribe_successful = true;
237  bool found = false;
238  typename TimeList::iterator iter;
239 
240  for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
241  if (iter->second == sender) {
242  found = true;
243  break;
244  }
245  }
246  if (found) {
247  // update entry
248  iter->first = _pssi->clock_.getTime();
249  // send ack to my subscriber - heart beat
251  } else {
252  // check if I have capacity for more children
253  if (bandwidth_capacity > subscriber_.size()) {
254  // insert entry
255  subscriber_.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
256  M2ETIS_LOG_DEBUG(self_.toStr(), " added " << sender.toStr());
258  msgType = message::CONTROL;
259  } else { // redirect to another node
261  rInfo->node_adress = typename NetworkType::Key(subscriber_.at(round_robin_pointer).second);
262  round_robin_pointer = (round_robin_pointer + 1) % subscriber_.size();
263  msgType = message::CONTROL;
264  is_subscribe_successful = false;
265  }
266  }
267 
268  receiver = sender;
269  return is_subscribe_successful;
270  }
271 
272  void processUnsubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
273  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
274 
275  struct T {
276  static bool test(const typename NetworkType::Key & send, const TimePair & paar) {
277  return paar.second == send;
278  }
279  };
280 
281  subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
282 
283  // if it was my parent, he has sent a redirect address
284  if (!parent_vector.empty()) {
285  if (sender == parent_vector.at(0).second) {
286  msgType = message::SUBSCRIBE;
287  receiver = rInfo->node_adress;
289  }
290  }
292  }
293 
294  void processPublishPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
295  assert(false);
296  }
297 
298  void processNotifyPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
299  if ((subscriber_.empty() && (parent_vector.empty() && self_ == _root)) || sender == self_) {
301  }
302  }
303 
304  void processControlPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
305  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
306  // new parent acknowledges the subscription
308  if (parent_vector.empty()) {
309  parent_vector.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
310  } else {
311  parent_vector.begin()->second = sender;
312  parent_vector.begin()->first = _pssi->clock_.getTime();
313  }
315  } else if (rInfo->action == message::RoutingInfo<NetworkType>::RoutingType::REDIRECT) { // parent forwards my subscription request to another node
316  msgType = message::SUBSCRIBE;
317  receiver = rInfo->node_adress;
319  } else { // nothing to do, stop the workflow
321  }
322  }
323 
324  private:
325  // The topic name
326  const unsigned int topic_name_;
327  // The own key of this actual node
328  typename NetworkType::Key self_;
329  // Is this node subscribed on the topic?
330  bool subscribed_;
331 
332  unsigned int bandwidth_capacity = ChildAmount;
333  unsigned int round_robin_pointer = 0;
334 
335  // List of all subscribers (TimeStamp and Key)
336  TimeList subscriber_;
337  // List of all parents (TimeStamp and Key)
338  TimeList parent_vector;
339 
340  // Control variables for the purging thread
341  volatile bool purging_;
342 
343  typename NetworkType::Key _root;
344 
345  typename RoutingInfoType::Ptr cast(typename message::RoutingInfo<NetworkType>::Ptr ptr) const {
346  typename RoutingInfoType::Ptr ret = boost::dynamic_pointer_cast<RoutingInfoType>(ptr);
347 
348  if (!ret) {
349  M2ETIS_LOG_ERROR("Routing Strategy", "Downcast error of routingInfo");
350  }
351 
352  return ret;
353  }
354 
355  bool purgeList() {
356  // Helper-Struct to create a specialized method which can be used to generically erase from a vector
357  struct T {
358  static bool test(const long & p, const long & jetzt, const TimePair & paar) {
359  if ((jetzt - paar.first) > p) {
360  return true;
361  }
362  return false;
363  }
364  };
365 
366  if (subscriber_.empty() && parent_vector.empty()) {
367  return purging_;
368  }
369 
370  // lock the list
371  // get the acutal timestamp
372  // and remove every subscriber which hasn't resubscribed within the "purge_distance"
373  long jetzt = _pssi->clock_.getTime();
374 
375  // also: inform unsubscription listeners:
376  auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
377  // notify listeners about erased subscribers:
378  for (auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
379  removed_subscriber_eventlistener_(iter_subscriber->second);
380  }
381 
382  subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
383 
384  return purging_;
385  }
386  };
387 
388 } /* namespace routing */
389 } /* namespace pubsub */
390 } /* namespace m2etis */
391 
392 #endif /* __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__ */
393 
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType) override
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
Definition: Exceptions.h:33
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
#define M2ETIS_LOG_DEBUG(module, message)
Definition: Logger.h:53
HierarchicalSpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver)
void setSelf(const typename NetworkType::Key &self)
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
std::pair< long, typename NetworkType::Key > TimePair
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
Definition: RoutingInfo.h:32
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
Definition: Clock.h:68
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void stop(uint64_t id)
Definition: Scheduler.h:110
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
boost::shared_ptr< HierarchicalSpreadItRoutingInfo< NetworkType > > Ptr
static const uint64_t PURGE_DISTANCE
message::HierarchicalSpreadItRoutingInfo< NetworkType > RoutingInfoType
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const
static const uint64_t RESUBSCRIPTION_INTERVAL