m2etis  0.4
DirectRouting.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ROUTING_DIRECTROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_DIRECTROUTING_H__
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
33 
34 #include "boost/date_time/posix_time/posix_time_types.hpp"
35 
36 namespace m2etis {
37 namespace pubsub {
38 namespace routing {
39 
40  template<class NetworkType>
41  class DirectRouting : public BaseRouting<NetworkType> {
42  public:
44  typedef std::vector<typename NetworkType::Key> KeyList;
45  typedef std::pair<uint64_t, typename NetworkType::Key> TimePair;
46  typedef std::vector<TimePair> TimeList;
47 
48  // [EOC] The next variables are more or less generic to all
49  // distribution (algo) policies and describe their behavoir!
50 
51  // whether or not to register for subscribe messages in forward
52  static const bool register_forward_subscribe = false;
53 
54  // whether or not to register for subscribe messages in deliver
55  static const bool register_deliver_subscribe = true;
56 
57  // whether or not to register for unsubscribe messages in forward
58  static const bool register_forward_unsubscribe = false;
59 
60  // whether or not to register for unsubscribe messages in deliver
61  static const bool register_deliver_unsubscribe = true;
62  // [EOC]
63 
64  // whether or not to activate an automatic periodic resent subscribtion
65  static const bool periodicSubscribtion = true;
66 
67  // the time_duration between resent periodic subscribtions
68  const uint64_t periodic_;
69 
70  // the time_duration to decide if a subscriber is purged form the list
71  const uint64_t purge_distance_;
72 
74 
75  // inform listener, whether a subscriber has been purged, e.g. by timeout
76  // currently only needed for filter strategies
77  boost::function<void(const typename NetworkType::Key)> removed_subscriber_eventlistener_;
78 
79  uint64_t purgeID_;
80 
81  DirectRouting(const unsigned short topic_name, PubSubSystemEnvironment * pssi, const typename NetworkType::Key & root) : BaseRouting<NetworkType>(topic_name, pssi), periodic_(direct::RESUBSCRIPTION_INTERVAL), purge_distance_(direct::PURGE_DISTANCE), _pssi(pssi), removed_subscriber_eventlistener_(), topic_name_(topic_name), self_(), selfsubscribed_(false), subscriber_(), purging_(true), _root(root) {
82  purgeID_ = pssi->scheduler_.runRepeated(purge_distance_, boost::bind(&DirectRouting::purgeList, this), 6);
83  }
84 
85  virtual ~DirectRouting() {
86  purging_ = false;
87  _pssi->scheduler_.stop(purgeID_);
88  }
89 
90  void setSelf(const typename NetworkType::Key & self) {
91  self_ = self;
92  }
93 
94  void setUnsubscriptionListener(const boost::function<void(const typename NetworkType::Key)> & listener) {
95  removed_subscriber_eventlistener_ = listener;
96  }
97 
98  // set the self subscription status
99  void selfSubscribed(const bool b) {
100  selfsubscribed_ = b;
101  }
102 
103  // get the self subscribtion status
104  bool selfSubscribed() const {
105  return selfsubscribed_;
106  }
107 
108  void configureRoutingInfo(message::ActionType & msgType, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key &) {
109  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
110  switch (msgType) {
111  case message::SUBSCRIBE: {
112  msgType = message::SUBSCRIBE;
113  break;
114  }
115  case message::UNSUBSCRIBE: {
116  msgType = message::UNSUBSCRIBE;
117  break;
118  }
119  case message::PUBLISH: {
120  msgType = message::PUBLISH;
121  break;
122  }
123  case message::NOTIFY: {
124  msgType = message::NOTIFY;
125  break;
126  }
127  case message::CONTROL: {
128  msgType = message::CONTROL;
129  break;
130  }
131  default:
132  M2ETIS_LOG_ERROR("Direct Routing", "configureRoutingInfo called with wrong action type");
133  }
134  return;
135  }
136 
144  KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key &) const {
145  /*
146  * All messages must be sent to root.
147  * Even if it's a publishmessage and root is subscribed, too.
148  * In deliver::publish, root will take care to deliver the message and spread it to the subscribers.
149  */
150  KeyList m;
151  if (mtype == message::SUBSCRIBE || mtype == message::UNSUBSCRIBE) {
152  // If the current node is root, then we don't need to send a message to ourself!
153  if (self_ == _root) {
154  return m;
155  }
156  m.push_back(_root);
157  } else if (mtype == message::PUBLISH) {
158  m.push_back(_root);
159  } else if (mtype == message::NOTIFY || mtype == message::CONTROL) {
160  struct T {
161  static typename NetworkType::Key get_from_pair(const TimePair & p) {
162  return p.second;
163  }
164  };
165 
166  m.resize(subscriber_.size());
167  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
168  } else {
169  assert(false);
170  }
171 
172  return m;
173  }
174 
182  bool processSubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key &, message::ActionType &) override {
183  if (sender != self_) { // I don't send a message to me if I'm root. But somehow the variable inside R changed!
184  bool found = false;
185  typename TimeList::iterator iter;
186  for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
187  if (iter->second == sender) {
188  found = true;
189  break;
190  }
191  }
192  if (found) {
193  // update entry
194  iter->first = _pssi->clock_.getTime();
195  } else {
196  // insert entry
197  subscriber_.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
198  }
199  }
201  return true;
202  }
203 
211  void processUnsubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key &, message::ActionType &) {
212  struct T {
213  static bool test(const typename NetworkType::Key & send, const TimePair & paar) {
214  return paar.second == send;
215  }
216  };
217 
218  subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
219 
220  return;
221  }
222 
231  void processPublishPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType & msgType) {
232  assert(self_ == _root);
233  if (self_ == _root) {
234  if (!subscriber_.empty()) {
235  msgType = message::NOTIFY;
236  return;
237  }
238  }
239 
241  return;
242  }
243 
252  void processNotifyPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &) {
254  return;
255  }
256 
257  void processControlPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &) {
258  return;
259  }
260 
261  private:
262  // The topic name
263  const unsigned short topic_name_;
264  // The own key of this actual node
265  typename NetworkType::Key self_;
266  // Is this node subscribed on the topic?
267  bool selfsubscribed_;
268 
269  // List of all subscribers (TimeStamp and Key)
270  TimeList subscriber_;
271 
272  // Control variables for the purging thread
273  mutable boost::mutex subscriber_mutex_;
274  volatile bool purging_;
275 
276  typename NetworkType::Key _root;
277 
278  typename RoutingInfoType::Ptr cast(typename message::RoutingInfo<NetworkType>::Ptr ptr) const {
279  typename RoutingInfoType::Ptr ret = boost::dynamic_pointer_cast<RoutingInfoType>(ptr);
280  if (!ret) {
281  M2ETIS_LOG_ERROR("Routing Strategy", "Downcast error of routingInfo");
282  }
283  return ret;
284  }
285 
286  bool purgeList() {
287  // Helper-Struct to create a specialized method which can be used to generically erase from a vector
288  struct T {
289  static bool test(const uint64_t & p, const uint64_t & jetzt, const TimePair & paar) {
290  if ((jetzt - paar.first) > p) {
291  return true;
292  }
293  return false;
294  }
295  };
296 
297  if (subscriber_.empty()) {
298  return purging_;
299  }
300 
301  // lock the list
302  // get the current timestamp
303  // and remove every subscriber which hasn't resubscribed within the "purge_distance"
304  uint64_t jetzt = _pssi->clock_.getTime();
305 
306  auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
307  // notify listeners about erased subscribers:
308  for (auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
309  removed_subscriber_eventlistener_(iter_subscriber->second);
310  }
311 
312  subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
313 
314  return purging_;
315  }
316  };
317 
318 } /* namespace routing */
319 } /* namespace pubsub */
320 } /* namespace m2etis */
321 
322 #endif /* __M2ETIS_PUBSUB_ROUTING_DIRECTROUTING_H__ */
323 
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
Definition: DirectRouting.h:94
PubSubSystemEnvironment * _pssi
Definition: DirectRouting.h:73
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &)
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &) override
static const bool register_deliver_unsubscribe
Definition: DirectRouting.h:61
static const bool register_forward_subscribe
Definition: DirectRouting.h:52
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
Definition: DirectRouting.h:77
message::DirectRoutingInfo< NetworkType > RoutingInfoType
Definition: DirectRouting.h:43
std::pair< uint64_t, typename NetworkType::Key > TimePair
Definition: DirectRouting.h:45
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
DirectRouting(const unsigned short topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
Definition: DirectRouting.h:81
boost::shared_ptr< DirectRoutingInfo< NetworkType > > Ptr
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
std::vector< TimePair > TimeList
Definition: DirectRouting.h:46
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
Definition: RoutingInfo.h:32
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
Definition: Clock.h:68
void stop(uint64_t id)
Definition: Scheduler.h:110
void setSelf(const typename NetworkType::Key &self)
Definition: DirectRouting.h:90
static const bool register_deliver_subscribe
Definition: DirectRouting.h:55
static const uint64_t PURGE_DISTANCE
std::vector< typename NetworkType::Key > KeyList
Definition: DirectRouting.h:44
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &) const
static const bool register_forward_unsubscribe
Definition: DirectRouting.h:58
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &msgType)
static const uint64_t RESUBSCRIPTION_INTERVAL