m2etis  0.4
SpreaditRouting.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ROUTING_SPREADITROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_SPREADITROUTING_H__
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
31 
34 
35 #include "boost/date_time/posix_time/posix_time_types.hpp"
36 
37 namespace m2etis {
38 namespace pubsub {
39 namespace routing {
40 
41  template<class NetworkType>
42  class SpreaditRouting : public BaseRouting<NetworkType> {
43  public:
45  typedef std::pair<uint64_t, typename NetworkType::Key> TimePair;
46  typedef std::vector<typename NetworkType::Key> KeyList;
47  typedef std::vector<TimePair> TimeList;
48 
49  // [EOC] The next variables are more or less generic to all
50  // distribution (algo) policies and describe their behavoir!
51 
52  // whether or not to register for subscribe messages in forward
53  static const bool register_forward_subscribe = false;
54 
55  // whether or not to register for subscribe messages in deliver
56  static const bool register_deliver_subscribe = true;
57 
58  // whether or not to register for unsubscribe messages in forward
59  static const bool register_forward_unsubscribe = false;
60 
61  // whether or not to register for unsubscribe messages in deliver
62  static const bool register_deliver_unsubscribe = true;
63  // [EOC]
64 
65  // whether or not to activate an automatic periodic resent subscribtion
66  static const bool periodicSubscribtion = true;
67 
68  // the time_duration between resent periodic subscribtions
69  const uint64_t periodic_;
70 
71  // the time_duration to decide if a subscriber is purged form the list
72  const uint64_t purge_distance_;
73 
75 
76  // inform listener, whether a subscriber has been purged, e.g. by timeout
77  // currently only needed for filter strategies
78  boost::function<void(const typename NetworkType::Key)> removed_subscriber_eventlistener_;
79 
80  uint64_t purgeID_;
81 
82  SpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment * pssi, const typename NetworkType::Key & root) :
83  BaseRouting<NetworkType>(topic_name, pssi)
84  , periodic_(spreadit::RESUBSCRIPTION_INTERVAL), purge_distance_(spreadit::PURGE_DISTANCE), _pssi(pssi)
85  , removed_subscriber_eventlistener_()
86  , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
87  , purging_(true), _root(root) {
88  purgeID_ = pssi->scheduler_.runRepeated(purge_distance_, boost::bind(&SpreaditRouting::purgeList, this), 0);
89  }
90 
91  virtual ~SpreaditRouting() {
92  purging_ = false;
93  _pssi->scheduler_.stop(purgeID_);
94  }
95 
96  void setSelf(const typename NetworkType::Key & self) {
97  self_ = self;
98  }
99 
100  void setUnsubscriptionListener(const boost::function<void(const typename NetworkType::Key)> & listener) {
101  removed_subscriber_eventlistener_ = listener;
102  }
103 
104  // set the self subscription status
105  void selfSubscribed(const bool b) {
106  subscribed_ = b;
107  }
108 
109  // get the self subscribtion status
110  bool selfSubscribed() const {
111  return subscribed_;
112  }
113 
114  void configureRoutingInfo(message::ActionType & msgType, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key &) {
115  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
116  switch (msgType) {
117  case message::SUBSCRIBE: {
118  msgType = message::SUBSCRIBE;
119  break;
120  }
121  case message::UNSUBSCRIBE: {
122  msgType = message::UNSUBSCRIBE;
123  if (!parent_vector.empty()) {
124  rInfo->node_adress = parent_vector.at(0).second;
125  }
126  break;
127  }
128  case message::PUBLISH: {
129  msgType = message::PUBLISH;
130  break;
131  }
132  case message::NOTIFY: {
133  msgType = message::NOTIFY;
134  break;
135  }
136  case message::CONTROL: {
137  msgType = message::CONTROL;
138  break;
139  }
140  default:
141  M2ETIS_THROW_FAILURE("Spreadit Routing", "configureRoutingInfo called with wrong action type", -1);
142  }
143  return;
144  }
145 
146  KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key & receiver) const {
147  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
148  /*
149  * All messages must be sent to root.
150  * Even if it's a publishmessage and root is subscribed, too.
151  * In deliver::publish, root will take care to deliver the message and spread it to the subscribers.
152  */
153  KeyList m;
154  if (mtype == message::SUBSCRIBE) {
155  // I've been redirected to a new parent
157  m.push_back(rInfo->node_adress);
158  } else if (!parent_vector.empty()) { // if I already have a parent, send subscribe to him - works like a heart beat
159  m.push_back(parent_vector.begin()->second);
160  } else { // subscription process starts from the root
161  if (self_ == _root) {
162  return m;
163  } else {
164  m.push_back(_root);
165  }
166  }
167  } else if (mtype == message::CONTROL) {
168  // receiver has been saved earlier
169  if (receiver == typename NetworkType::Key()) {
170  struct T {
171  static typename NetworkType::Key get_from_pair(const SpreaditRouting::TimePair & p) {
172  return p.second;
173  }
174  };
175 
176  m.resize(subscriber_.size());
177  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
178  } else {
179  m.push_back(receiver);
180  }
181  } else if (mtype == message::PUBLISH) {
182  // here we need to send the message always to root (even if we are the root)
183  // because the processing for publish messages is done in deliver
184  m.push_back(_root);
185  } else if (mtype == message::NOTIFY) {
186  // message must be published, that means from root to subscribers
187  // Simply return a list of subscribed nodes
188 
189  // That's nicer than for_each and push_back, isn't it?
190  // std::transform is really good!
191  struct T {
192  static typename NetworkType::Key get_from_pair(const SpreaditRouting::TimePair & p) {
193  return p.second;
194  }
195  };
196 
197  m.resize(subscriber_.size());
198  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
199  } else if (mtype == message::UNSUBSCRIBE) {
200  // message must be published, that means from root to subscribers
201  // Simply return a list of subscribed nodes
202 
203  // That's nicer than for_each and push_back, isn't it?
204  // std::transform is really good!
205  struct T {
206  static typename NetworkType::Key get_from_pair(const TimePair & p) {
207  return p.second;
208  }
209  };
210 
211  m.resize(subscriber_.size());
212  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
213 
214  if (!parent_vector.empty()) {
215  m.push_back(parent_vector.at(0).second);
216  }
217  } else {
218  M2ETIS_THROW_FAILURE("Spreadit Routing", "unknown action type", -1);
219  }
220 
221  return m;
222  }
223 
224  bool processSubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) override {
225  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
226 
227  bool is_subscribe_successful = true;
228  bool found = false;
229  typename TimeList::iterator iter;
230 
231  for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
232  if (iter->second == sender) {
233  found = true;
234  break;
235  }
236  }
237  if (found) {
238  // update entry
239  iter->first = _pssi->clock_.getTime();
240  // send ack to my subscriber - heart beat
242  } else {
243  // check if I have capacity for more children
244  if (bandwidth_capacity > subscriber_.size()) {
245  // insert entry
246  subscriber_.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
247  M2ETIS_LOG_DEBUG(self_.toStr(), " added " << sender.toStr());
249  msgType = message::CONTROL;
250  } else { // redirect to another node
252  rInfo->node_adress = typename NetworkType::Key(subscriber_.at(round_robin_pointer).second);
253  round_robin_pointer = (round_robin_pointer + 1) % subscriber_.size();
254  msgType = message::CONTROL;
255  is_subscribe_successful = false;
256  }
257  }
258 
259  receiver = sender;
260  return is_subscribe_successful;
261  }
262 
263  void processUnsubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
264  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
265 
266  struct T {
267  static bool test(const typename NetworkType::Key & send, const TimePair & paar) {
268  return paar.second == send;
269  }
270  };
271 
272  subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
273 
274  // if it was my parent, he has sent a redirect address
275  if (!parent_vector.empty()) {
276  if (sender == parent_vector.at(0).second) {
277  msgType = message::SUBSCRIBE;
278  receiver = rInfo->node_adress;
280  }
281  }
283  }
284 
285  void processPublishPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType & msgType) {
286  msgType = message::NOTIFY;
287  }
288 
289  void processNotifyPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &) {
290  if (subscriber_.empty()) {
292  }
293  }
294 
295  void processControlPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
296  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
297  // new parent acknowledges the subscription
299  if (parent_vector.empty()) {
300  parent_vector.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
301  } else {
302  parent_vector.begin()->second = sender;
303  parent_vector.begin()->first = _pssi->clock_.getTime();
304  }
306  } else if (rInfo->action == message::RoutingInfo<NetworkType>::RoutingType::REDIRECT) { // parent forwards my subscription request to another node
307  msgType = message::SUBSCRIBE;
308  receiver = rInfo->node_adress;
310  } else { // nothing to do, stop the workflow
312  }
313  }
314 
315  private:
316  // The topic name
317  const unsigned int topic_name_;
318  // The own key of this actual node
319  typename NetworkType::Key self_;
320  // Is this node subscribed on the topic?
321  bool subscribed_;
322 
323  unsigned int bandwidth_capacity = spreadit::ALLOWED_CHILDS;
324  unsigned int round_robin_pointer = 0;
325 
326  // List of all subscribers (TimeStamp and Key)
327  TimeList subscriber_;
328  // List of all parents (TimeStamp and Key)
329  TimeList parent_vector;
330 
331  // Control variables for the purging thread
332  volatile bool purging_;
333 
334  typename NetworkType::Key _root;
335 
336  typename RoutingInfoType::Ptr cast(typename message::RoutingInfo<NetworkType>::Ptr ptr) const {
337  typename RoutingInfoType::Ptr ret = boost::dynamic_pointer_cast<RoutingInfoType>(ptr);
338 
339  if (!ret) {
340  M2ETIS_LOG_ERROR("Routing Strategy", "Downcast error of routingInfo");
341  }
342 
343  return ret;
344  }
345 
346  bool purgeList() {
347  // Helper-Struct to create a specialized method which can be used to generically erase from a vector
348  struct T {
349  static bool test(const uint64_t & p, const uint64_t & jetzt, const TimePair & paar) {
350  if ((jetzt - paar.first) > p) {
351  return true;
352  }
353  return false;
354  }
355  };
356 
357  if (subscriber_.empty() && parent_vector.empty()) {
358  return purging_;
359  }
360 
361  // lock the list
362  // get the acutal timestamp
363  // and remove every subscriber which hasn't resubscribed within the "purge_distance"
364  uint64_t jetzt = _pssi->clock_.getTime();
365 
366  // also: inform unsubscription listeners:
367  auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
368  // notify listeners about erased subscribers:
369  for (auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
370  removed_subscriber_eventlistener_(iter_subscriber->second);
371  }
372 
373  subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
374 
375  return purging_;
376  }
377 
378  SpreaditRouting & operator=(const SpreaditRouting &);
379  };
380 
381 } /* namespace routing */
382 } /* namespace pubsub */
383 } /* namespace m2etis */
384 
385 #endif /* __M2ETIS_PUBSUB_ROUTING_SPREADITROUTING_H__ */
386 
boost::shared_ptr< SpreadItRoutingInfo< NetworkType > > Ptr
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
Definition: Exceptions.h:33
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType) override
message::SpreadItRoutingInfo< NetworkType > RoutingInfoType
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
#define M2ETIS_LOG_DEBUG(module, message)
Definition: Logger.h:53
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void setSelf(const typename NetworkType::Key &self)
static const uint32_t ALLOWED_CHILDS
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
Definition: RoutingInfo.h:32
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
Definition: Clock.h:68
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
void stop(uint64_t id)
Definition: Scheduler.h:110
std::pair< uint64_t, typename NetworkType::Key > TimePair
static const uint64_t PURGE_DISTANCE
std::vector< typename NetworkType::Key > KeyList
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &msgType)
SpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
static const uint64_t RESUBSCRIPTION_INTERVAL