m2etis  0.4
ScribeRouting.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__
23 #define __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
31 
33 
34 #include "boost/thread.hpp"
35 
36 namespace m2etis {
37 namespace pubsub {
38 namespace routing {
39 
40  template<class NetworkType>
41  class ScribeRouting : public BaseRouting<NetworkType> {
42  public:
44  typedef std::vector<typename NetworkType::Key> KeyList;
45  typedef std::pair<uint64_t, typename NetworkType::Key> TimePair;
46  typedef std::vector<TimePair> TimeList;
47 
48  // [EOC] The next variables are more or less generic to all
49  // routing policies and describe their behavior!
50 
51  // whether or not to register for subscribe messages in forward
52  static const bool register_forward_subscribe = true;
53 
54  // whether or not to register for subscribe messages in deliver
55  static const bool register_deliver_subscribe = false;
56 
57  // whether or not to register for unsubscribe messages in forward
58  static const bool register_forward_unsubscribe = true;
59 
60  // whether or not to register for unsubscribe messages in deliver
61  static const bool register_deliver_unsubscribe = false;
62  // [EOC]
63 
64  // whether or not to activate an automatic periodic resent subscribtion
65  static const bool periodicSubscribtion = true;
66 
67  // the time_duration between resent periodic subscribtions
68  const uint64_t periodic_;
69 
70  // the time_duration to decide if a subscriber is purged form the list
71  const uint64_t purge_distance_;
72 
74 
75  // inform listener, whether a subscriber has been purged, e.g. by timeout
76  // currently only needed for filter strategies
77  boost::function<void(const typename NetworkType::Key)> removed_subscriber_eventlistener_;
78 
79  uint64_t purgeID_;
80 
81  ScribeRouting(const uint16_t topic_name, PubSubSystemEnvironment * pssi, const typename NetworkType::Key & root) : BaseRouting<NetworkType>(topic_name, pssi)
82  , periodic_(scribe::RESUBSCRIPTION_INTERVAL), purge_distance_(scribe::PURGE_DISTANCE), _pssi(pssi)
83  , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
84  , purging_(true), _root(root) {
85  purgeID_ = pssi->scheduler_.runRepeated(purge_distance_, boost::bind(&ScribeRouting::purgeList, this), 0);
86  }
87 
88  virtual ~ScribeRouting() {
89  purging_ = false;
90  _pssi->scheduler_.stop(purgeID_);
91  }
92 
93  void setSelf(const typename NetworkType::Key & self) {
94  self_ = self;
95  }
96 
97  void setUnsubscriptionListener(const boost::function<void(const typename NetworkType::Key)> & listener) {
98  removed_subscriber_eventlistener_ = listener;
99  }
100 
101  // set the self subscription status
102  void selfSubscribed(const bool b) {
103  subscribed_ = b;
104  }
105 
106  // get the self subscribtion status
107  bool selfSubscribed() const {
108  return subscribed_;
109  }
110 
111  void configureRoutingInfo(message::ActionType & msgType, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key & receiver) {
112  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
113  switch (msgType) {
114  case message::SUBSCRIBE: {
115  msgType = message::SUBSCRIBE;
116  break;
117  }
118  case message::UNSUBSCRIBE: {
119  msgType = message::UNSUBSCRIBE;
120  break;
121  }
122  case message::PUBLISH: {
123  msgType = message::PUBLISH;
124  break;
125  }
126  default: {
127  M2ETIS_LOG_ERROR("Scribe Routing", "configureRoutingInfo called with wrong action type");
128  }
129  }
130  }
131 
132  KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo<NetworkType>::Ptr routingInfo, typename NetworkType::Key & receiver) const {
133  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
134 
135  /*
136  * All messages must be sent to root.
137  * Even if it's a publishmessage and root is subscribed, too.
138  * In deliver::publish, root will take care to deliver the message and spread it to the subscribers.
139  */
140  KeyList m;
141  if (mtype == message::SUBSCRIBE || mtype == message::UNSUBSCRIBE) {
142  // If the current node is root, then we don't need to send a message to ourself!
143  if (self_ == _root) {
144  return m;
145  }
146  m.push_back(_root);
147  } else if (mtype == message::PUBLISH) {
148  // TODO: (???) use the IP address if it is known, publish messages are sent to the root who starts the distribution
149  // if root is not known, use pastry routing to localize it
150  m.push_back(_root);
151  } else {
152  // message must be published, that means from root to subscribers
153  // Simply return a list of subscribed nodes
154 
155  // That's nicer than for_each and push_back, isn't it?
156  // std::transform is really good!
157  struct T {
158  static typename NetworkType::Key get_from_pair(const ScribeRouting::TimePair & p) {
159  return p.second;
160  }
161  };
162  m.resize(subscriber_.size());
163  std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
164  }
165  return m;
166  }
167 
168  bool processSubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
169  if (sender == self_) { // TODO: (Daniel) hack, not sure that's right, but otherwise nodes subscribe on itself
170  return false;
171  }
172  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
173  bool found = false;
174  typename TimeList::iterator iter;
175  for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
176  if (iter->second == sender) {
177  found = true;
178  break;
179  }
180  }
181  if (found) {
182  // update entry
183  iter->first = _pssi->clock_.getTime();
184  } else {
185  // insert entry
186  subscriber_.push_back(std::make_pair(_pssi->clock_.getTime(), sender));
187  }
188  // stop the subscribe message
189  if (selfSubscribed()) {
191  }
192  return true;
193  }
194 
195  void processUnsubscribePayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
196  // TODO: (???) use subscribed_ to check if I am a subscriber or just forwarder
197  struct T {
198  static bool test(const typename NetworkType::Key & send, const TimePair & paar) {
199  return paar.second == send;
200  }
201  };
202 
203  subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
204  }
205 
209  void processPublishPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
210  M2ETIS_LOG_INFO("Received Publish msg", "");
211  typename RoutingInfoType::Ptr rInfo = cast(routingInfo);
212 
213  msgType = message::NOTIFY;
214  }
215 
216  void processNotifyPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
217  }
218 
219  void processControlPayload(typename message::RoutingInfo<NetworkType>::Ptr routingInfo, const typename NetworkType::Key & sender, typename NetworkType::Key & receiver, message::ActionType & msgType) {
220  return;
221  }
222 
223  private:
224  // The topic name
225  const uint16_t topic_name_;
226  // The own key of this actual node
227  typename NetworkType::Key self_;
228  // Is this node subscribed on the topic?
229  bool subscribed_;
230 
231  // List of all subscribers (TimeStamp and Key)
232  TimeList subscriber_;
233 
234  // Control variables for the purging thread
235  boost::thread purger_;
236  volatile bool purging_;
237 
238  typename NetworkType::Key _root;
239 
240  typename RoutingInfoType::Ptr cast(typename message::RoutingInfo<NetworkType>::Ptr ptr) const {
241  typename RoutingInfoType::Ptr ret = boost::dynamic_pointer_cast<RoutingInfoType>(ptr);
242 
243  if (!ret) {
244  M2ETIS_LOG_ERROR("Routing Strategy", "Downcast error of routingInfo");
245  }
246 
247  return ret;
248  }
249 
250 
251  bool purgeList() {
252  // Helper-Struct to create a specialized method which can be used to generically erase from a vector
253  struct T {
254  static bool test(const uint64_t & p, const uint64_t & jetzt, const ScribeRouting::TimePair & paar) {
255  if ((jetzt - paar.first) > p) {
256  return true;
257  }
258  return false;
259  }
260  };
261 
262  if (subscriber_.empty()) {
263  return true;
264  }
265 
266  // lock the list
267  // get the acutal timestamp
268  // and remove every subscriber which hasn't resubscribed within the "purge_distance"
269  uint64_t jetzt = _pssi->clock_.getTime();
270 
271  auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
272  // notify listeners about erased subscribers:
273  for (auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
274  removed_subscriber_eventlistener_(iter_subscriber->second);
275  }
276 
277  subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
278 
279  return true;
280  }
281  };
282 
283 } /* namespace routing */
284 } /* namespace pubsub */
285 } /* namespace m2etis */
286 
287 #endif /* __M2ETIS_PUBSUB_ROUTING_SCRIBEROUTING_H__ */
288 
message::ScribeRoutingInfo< NetworkType > RoutingInfoType
Definition: ScribeRouting.h:43
static const bool register_forward_unsubscribe
Definition: ScribeRouting.h:58
static const bool register_forward_subscribe
Definition: ScribeRouting.h:52
boost::shared_ptr< ScribeRoutingInfo< NetworkType > > Ptr
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
Definition: ScribeRouting.h:77
std::pair< uint64_t, typename NetworkType::Key > TimePair
Definition: ScribeRouting.h:45
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
#define M2ETIS_LOG_INFO(module, message)
Definition: Logger.h:55
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
ScribeRouting(const uint16_t topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
Definition: ScribeRouting.h:81
std::vector< typename NetworkType::Key > KeyList
Definition: ScribeRouting.h:44
Scheduler< util::RealTimeClock > scheduler_
util::Clock< util::RealTimeClock > clock_
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver)
#define M2ETIS_LOG_ERROR(module, message)
Definition: Logger.h:59
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
Definition: RoutingInfo.h:32
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
Definition: Clock.h:68
static const bool register_deliver_subscribe
Definition: ScribeRouting.h:55
void setSelf(const typename NetworkType::Key &self)
Definition: ScribeRouting.h:93
void stop(uint64_t id)
Definition: Scheduler.h:110
static const bool register_deliver_unsubscribe
Definition: ScribeRouting.h:61
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
static const uint64_t PURGE_DISTANCE
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const
PubSubSystemEnvironment * _pssi
Definition: ScribeRouting.h:73
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
Definition: ScribeRouting.h:97
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
std::vector< TimePair > TimeList
Definition: ScribeRouting.h:46
static const uint64_t RESUBSCRIPTION_INTERVAL