m2etis  0.4
DirectBroadcastRouting.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
33 
34 namespace m2etis {
35 namespace pubsub {
36 namespace routing {
37 
42  template<class NetworkType>
43  class DirectBroadcastRouting : public BaseRouting<NetworkType> {
44  public:
46  typedef std::vector<typename NetworkType::Key> KeyList;
47  typedef std::set<typename NetworkType::Key> KeySet;
48  typedef std::pair<uint64_t, typename NetworkType::Key> TimePair;
49  typedef std::vector<TimePair> TimeList;
50 
51  // [EOC] The next variables are more or less generic to all
52  // distribution (algo) policies and describe their behavoir!
53 
54  // whether or not to register for subscribe messages in forward
55  static const bool register_forward_subscribe = false;
56 
57  // whether or not to register for subscribe messages in deliver
58  static const bool register_deliver_subscribe = true;
59 
60  // whether or not to register for unsubscribe messages in forward
61  static const bool register_forward_unsubscribe = false;
62 
63  // whether or not to register for unsubscribe messages in deliver
64  static const bool register_deliver_unsubscribe = true;
65  // [EOC]
66 
67  // whether or not to activate an automatic periodic resent subscribtion
68  static const bool periodicSubscribtion = true;
69 
70  // the time_duration between resent periodic subscribtions
71  const uint64_t periodic_;
72 
73  // the time_duration to decide if a subscriber is purged form the list
74  const uint64_t purge_distance_;
75 
77 
78  // inform listener, whether a subscriber has been purged, e.g. by timeout
79  // currently only needed for filter strategies
80  boost::function<void(const typename NetworkType::Key)> _removed_subscribereventlistener;
81 
82  uint64_t purgeID_;
83 
84  uint64_t registerID_;
85 
86  DirectBroadcastRouting(const unsigned short topic_name, PubSubSystemEnvironment * pssi, const typename NetworkType::Key & root) : BaseRouting<NetworkType>(topic_name, pssi), periodic_(directbroadcast::RESUBSCRIPTION_INTERVAL), purge_distance_(directbroadcast::PURGE_DISTANCE), _pssi(pssi), _removed_subscribereventlistener(), purgeID_(), registerID_(), topic_name_(topic_name), self_(), selfSubscribed_(false), _subscriber(), _purging(true), _newSubs(), _nodes(), _root(root) {
87  purgeID_ = pssi->scheduler_.runRepeated(purge_distance_, boost::bind(&DirectBroadcastRouting::purgeList, this), 6);
88  registerID_ = pssi->scheduler_.runOnce(1, [this]() {
90  return false;
91  }, 1);
92  }
93 
95  _purging = false;
96  _pssi->scheduler_.stop(purgeID_);
97  _pssi->scheduler_.stop(registerID_);
98  }
99 
100  void registerOnRoot() {
101  if (self_ != _root) {
102 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
103  RoutingInfoType::Ptr newInfo = boost::make_shared<RoutingInfoType>();
104 #elif I6E_PLATFORM == I6E_PLATFORM_LINUX
105  typename RoutingInfoType::Ptr newInfo = boost::make_shared<RoutingInfoType>();
106 #endif
107  newInfo->action = RoutingInfoType::RoutingType::REDIRECT;
108 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
109  sendCtrlMsg_(newInfo, _root, ControlTarget::ROOT);
110 #elif I6E_PLATFORM == I6E_PLATFORM_LINUX
112 #endif
113  } else {
114  _nodes.insert(self_);
115  }
116  }
117 
118  void setSelf(const typename NetworkType::Key & self) {
119  self_ = self;
120  if (self_ == _root) {
121  _nodes.insert(self_);
122  }
123  }
124 
125  void setUnsubscriptionListener(const boost::function<void(const typename NetworkType::Key)> & listener) {
126  _removed_subscribereventlistener = listener;
127  }
128 
129  // set the self subscription status
130  void selfSubscribed(const bool b) {
131  selfSubscribed_ = b;
132  }
133 
134  // get the self subscribtion status
135  bool selfSubscribed() const {
136  return selfSubscribed_;
137  }
138 
139  void configureRoutingInfo(message::ActionType & msgType, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key &) {
140  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
141  switch (msgType) {
142  case message::SUBSCRIBE: {
143  msgType = message::SUBSCRIBE;
144  break;
145  }
146  case message::UNSUBSCRIBE: {
147  msgType = message::UNSUBSCRIBE;
148  break;
149  }
150  case message::PUBLISH: {
151  msgType = message::NOTIFY;
152  break;
153  }
154  case message::NOTIFY: {
155  msgType = message::NOTIFY;
156  break;
157  }
158  case message::CONTROL: {
159  msgType = message::CONTROL;
160  break;
161  }
162  default:
163  M2ETIS_LOG_ERROR("DirectBroadcastRouting", "configureRoutingInfo called with wrong action type");
164  }
165  return;
166  }
167 
175  KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key &) const {
176  /*
177  * All messages must be sent to root.
178  * Even if it's a publishmessage and root is subscribed, too.
179  * In deliver::publish, root will take care to deliver the message and spread it to the subscribers.
180  */
181  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
182  KeyList m;
183  if (mtype == message::SUBSCRIBE) {
184  // if we got control message with all available nodes, we have to send all of them our subscribe
186  m = _newSubs;
187  // clear list with new nodes because they aren't new any longer
188  const_cast<DirectBroadcastRouting *>(this)->_newSubs.clear();
189  return m;
190  }
191 
192  // if we currently haven't any subscriber, so the first subscribe will be sent
193  if (!_nodes.empty()) {
194  m.resize(_nodes.size());
195  std::copy(_nodes.begin(), _nodes.end(), m.begin());
196  }
197 
198  m.erase(std::remove_if(m.begin(), m.end(), [this](const typename NetworkType::Key & other) {
199  return self_ == other;
200  }), m.end());
201  } else if (mtype == message::UNSUBSCRIBE) {
202  m.resize(_nodes.size());
203  std::copy(_nodes.begin(), _nodes.end(), m.begin());
204 
205  m.erase(std::remove_if(m.begin(), m.end(), [this](const typename NetworkType::Key & other) {
206  return self_ == other;
207  }), m.end());
208  } else if (mtype == message::PUBLISH) {
209  assert(false); // there aren't publish messages sent in DirectBroadcast
210  } else if (mtype == message::CONTROL) {
211  if (self_ == _root) {
212  for (auto n : _nodes) {
213  if (n != _root) {
214  m.push_back(n);
215  }
216  }
217  } else {
218  m.push_back(_root);
219  }
220  } else if (mtype == message::NOTIFY) {
221  // notify messages will be sent to every subscriber being registered
222  struct T {
223  static typename NetworkType::Key get_from_pair(const TimePair & p) {
224  return p.second;
225  }
226  };
227 
228  m.resize(_subscriber.size());
229  std::transform(_subscriber.begin(), _subscriber.end(), m.begin(), T::get_from_pair);
230 
231  // I have to "send" me the message to, but only if I'm subscribed on this Tree
232  if (selfSubscribed() && std::find(m.begin(), m.end(), self_) == m.end()) {
233  m.push_back(self_);
234  }
235  } else {
236  assert(false);
237  }
238 
239  return m;
240  }
241 
249  bool processSubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key &, message::ActionType &) override {
250  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
251  bool found = false;
252  typename TimeList::iterator iter;
253  for (iter = _subscriber.begin(); iter != _subscriber.end(); ++iter) {
254  if (iter->second == sender) {
255  found = true;
256  break;
257  }
258  }
259  if (found) {
260  // update entry
261  iter->first = _pssi->clock_.getTime();
262  } else {
263  // insert entry
264  _subscriber.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
265  }
267  return true;
268  }
269 
277  void processUnsubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key &, message::ActionType &) {
278  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
279  struct T {
280  static bool test(const typename NetworkType::Key & send, const TimePair & paar) {
281  return paar.second == send;
282  }
283  };
284 
285  _subscriber.erase(std::remove_if(_subscriber.begin(), _subscriber.end(), boost::bind(T::test, sender, _1)), _subscriber.end());
287  }
288 
297  void processPublishPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &) {
298  assert(false); // there musn't be publish messages
299  }
300 
309  void processNotifyPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &) {
311  return;
312  }
313 
314  void processControlPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key &, message::ActionType & msgType) {
315  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
316  // message returned from RP
318  if (self_ == _root) {
319  // new node created, notify existing nodes with list of all existing nodes
320  if (_nodes.insert(sender).second) {
321  if (selfSubscribed_) {
322  msgType = message::SUBSCRIBE;
324  _newSubs.push_back(sender);
325  } else {
327  }
328  typename RoutingInfoType::Ptr newInfo = boost::make_shared<RoutingInfoType>();
330  newInfo->_nodes = _nodes;
332  }
333  } else {
334  _nodes = rInfo->_nodes;
335  if (selfSubscribed_) {
336  msgType = message::SUBSCRIBE;
338  for (auto n : _nodes) {
339  bool found = false;
340  for (TimePair p : _subscriber) {
341  if (p.second == n) {
342  found = true;
343  break;
344  }
345  }
346  if (!found) {
347  _newSubs.push_back(n);
348  }
349  }
350  rInfo->_nodes.clear();
351  } else {
353  }
354  }
355 
356  } else { // nothing to do, stop the workflow
358  }
359  return;
360  }
361 
362  private:
363  // The topic name
364  const unsigned short topic_name_;
365  // The own key of this actual node
366  typename NetworkType::Key self_;
367  // Is this node subscribed on the topic?
368  bool selfSubscribed_;
369 
370  // List of all subscribers (TimeStamp and Key)
371  TimeList _subscriber;
372 
373  // Control variables for the purging thread
374  volatile bool _purging;
375  std::vector<typename NetworkType::Key> _newSubs;
376  KeySet _nodes;
377 
378  typename NetworkType::Key _root;
379 
380  typename RoutingInfoType::Ptr cast(typename message::RoutingInfo<NetworkType>::Ptr ptr) const {
381  typename RoutingInfoType::Ptr ret = boost::dynamic_pointer_cast<RoutingInfoType>(ptr);
382  if (!ret) {
383  M2ETIS_LOG_ERROR("DirectBroadcastRouting", "Downcast error of routingInfo");
384  }
385  return ret;
386  }
387 
388  bool purgeList() {
389  // Helper-Struct to create a specialized method which can be used to generically erase from a vector
390  struct T {
391  static bool test(const uint64_t & p, const uint64_t & jetzt, const TimePair & paar) {
392  if ((jetzt - paar.first) > p) {
393  return true;
394  }
395  return false;
396  }
397  };
398 
399  if (_subscriber.empty()) {
400  return _purging;
401  }
402 
403  // lock the list
404  // get the current timestamp
405  // and remove every subscriber which hasn't resubscribed within the "purge_distance"
406  uint64_t jetzt = _pssi->clock_.getTime();
407 
408  auto iter_first_erased_subscriber = std::remove_if(_subscriber.begin(), _subscriber.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
409  // notify listeners about erased subscribers:
410  for (auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != _subscriber.end(); ++iter_subscriber) {
411  _removed_subscribereventlistener(iter_subscriber->second);
412  }
413 
414  _subscriber.erase(iter_first_erased_subscriber, _subscriber.end());
415 
416  return _purging;
417  }
418 
419  DirectBroadcastRouting & operator=(const DirectBroadcastRouting &);
420  };
421 
422 } /* namespace routing */
423 } /* namespace pubsub */
424 } /* namespace m2etis */
425 
426 #endif /* __M2ETIS_PUBSUB_ROUTING_DIRECTBROADCASTROUTING_H__ */
427 
boost::function< void(const typename NetworkType::Key)> _removed_subscribereventlistener
uint64_t runOnce(uint64_t time, const boost::function< bool(void)> &func, int16_t priority)
adds new job running only once
Definition: Scheduler.h:78
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &) const
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &, typename NetworkType::Key &, message::ActionType &)
std::pair< uint64_t, typename NetworkType::Key > TimePair
creates connections between every single node and sends messages directly, so every node is a root no...
boost::function< void(typename message::RoutingInfo< NetworkType >::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
Definition: BaseRouting.h:124
DirectBroadcastRouting(const unsigned short topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &) override
void setSelf(const typename NetworkType::Key &self)
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
std::set< typename NetworkType::Key > KeySet
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
Definition: RoutingInfo.h:32
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
Definition: Clock.h:68
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &)
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &, message::ActionType &msgType)
void stop(uint64_t id)
Definition: Scheduler.h:110
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &)
static const uint64_t PURGE_DISTANCE
std::vector< typename NetworkType::Key > KeyList
message::DirectBroadcastRoutingInfo< NetworkType > RoutingInfoType
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
boost::shared_ptr< DirectBroadcastRoutingInfo< NetworkType > > Ptr
static const uint64_t RESUBSCRIPTION_INTERVAL