m2etis  0.4
Channel.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_CHANNEL_H__
23 #define __M2ETIS_PUBSUB_CHANNEL_H__
24 
25 #include "m2etis/util/Logger.h"
26 
27 #include "m2etis/TemplateHelper.h"
41 
42 #include "m2etis/sim/i6eRandom.h"
43 
44 #include "boost/bind.hpp"
45 #include "boost/date_time/posix_time/posix_time_types.hpp"
46 #include "boost/foreach.hpp"
47 #include "boost/make_shared.hpp"
48 #include "boost/shared_ptr.hpp"
49 #include "boost/thread/locks.hpp"
50 
51 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
52  #pragma warning(push)
53  #pragma warning(disable : 4127)
54 #endif
55 
56 namespace m2etis {
57 namespace pubsub {
58 
65  template<class ChannelType, class NetworkType, class EventType>
69 
70  typename NetworkType::Key _self;
71  typename NetworkType::Key _rendezvous;
73  const ChannelName topic_;
75  std::vector<TreeType *> trees_;
77  util::DoubleBufferQueue<std::pair<BasicDeliverCallbackInterface<EventType> *, boost::shared_ptr<filter::FilterExp<EventType>>>> subscribeQueue_;
78  util::DoubleBufferQueue<boost::shared_ptr<filter::FilterExp<EventType>>> unsubscribeQueue_; // for filter strategies
79  bool unsubscribe_;
81  std::map<typename NetworkType::Key, uint64_t> _nodeList;
82  bool joined;
83  bool _hasSubscribe;
84  std::pair<BasicDeliverCallbackInterface<EventType> *, boost::shared_ptr<filter::FilterExp<EventType>>> _lastSubscribe;
85  uint64_t parseID_;
86  uint64_t dynamicPeriodID_;
87 
88  public:
92  Channel(const ChannelName topic_name, const std::string & ip, uint16_t port, const std::string & rendezvousIP, uint16_t known_hostport, PubSubSystemEnvironment * pssi, const std::vector<std::string> & rootList) : ChannelType::PartitionStrategy(), ChannelType::RendezvousStrategy(rootList), _self(ip + std::string(":") + std::to_string(port)), _rendezvous(rendezvousIP + std::string(":") + std::to_string(known_hostport)), factory_(message::MessageFactory<ChannelType, NetworkType>()), topic_(topic_name), controller_(pssi->_factory->createNetworkController(NetworkType())), msgQueue_(), subscribeQueue_(), unsubscribeQueue_(), unsubscribe_(false), pssi_(pssi), _nodeList({std::make_pair(_self, 0)}), joined(false), _hasSubscribe(false), _lastSubscribe(), dynamicPeriodID_(UINT64_MAX) {
93  if (!ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
94  for (auto name : ChannelType::PartitionStrategy::getTreeNames()) {
95  trees_.push_back(pssi->_tree_factory->template createTree<ChannelType, NetworkType, EventType>(_self, _rendezvous, typename NetworkType::Key(ChannelType::RendezvousStrategy::getRoot() + ":" + std::to_string(known_hostport)), pssi, topic_));
96  if (name) {} // Just to prevent unused warning
97  }
98  } else {
99  ChannelType::PartitionStrategy::createRendezvousPartition(_rendezvous);
100 
101  for (auto name : ChannelType::PartitionStrategy::getTreeNames()) {
102  trees_.push_back(pssi->_tree_factory->template createTree<ChannelType, NetworkType, EventType>(_self, _rendezvous, _rendezvous, pssi, topic_));
103  if (name) {} // Just to prevent unused warning
104  }
105  }
106 
107  updateOrderTrees();
108 
109  parseID_ = pssi->scheduler_.runRepeated(parameters::PULL_PARSEMESSAGE, boost::bind(&Channel::parseMessages, this), 2);
110  controller_->register_deliver(message::JOIN | topic_, boost::bind(&Channel::deliver, this, _1));
111  controller_->register_deliver(message::STATE | topic_, boost::bind(&Channel::deliver, this, _1));
112  controller_->register_deliver(message::LEAVE | topic_, boost::bind(&Channel::deliver, this, _1));
113 
114  if (_self != _rendezvous) {
115  pssi->scheduler_.runOnce(1000, boost::bind(&Channel::startJoin, this), 1);
116  } else {
117  dynamicPeriodID_ = pssi->scheduler_.runRepeated(6000000, boost::bind(&Channel::purgeNodes, this), 1);
118  joined = true;
119  }
120  }
121 
122  virtual ~Channel() {
123  pssi_->scheduler_.stop(parseID_);
124  if (dynamicPeriodID_ != UINT64_MAX) {
125  pssi_->scheduler_.stop(dynamicPeriodID_);
126  }
127  msgQueue_.clear();
128  for (TreeType * t : trees_) {
129  delete t;
130  }
131  trees_.clear();
132 
133  if (_self != _rendezvous) {
134 #ifdef WITH_SIM
135  SimulationEventType v;
136  v._simID = -1;
137  v._simChannel = topic_;
138  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(createMessage(v));
139 #else
140  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(createMessage());
141 #endif /* WITH_SIM */
142 
143  msg->sender = _self;
144  msg->receiver = _rendezvous;
145  msg->type = message::LEAVE | topic_;
146  controller_->send(msg);
147  }
148  }
149 
150  inline std::string getSelf() const {
151  // all trees belonging to the same topic have a reference to the same NetworkController
152  // arbitrariliy choosing the first tree:
153  return trees_[0]->getSelf().toStr();
154  }
155 
156  inline ChannelName getChannel() const {
157  return topic_;
158  }
159 
160  void subscribe(BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
161  subscribeQueue_.push(std::make_pair(&callback, predicate));
162  }
163 
164  // for filter strategies: deregistering a single filter: (unsubscribes from the tree, if last filter has been deregistered)
165  void unsubscribe(boost::shared_ptr<filter::FilterExp<EventType>> predicate) override {
166  unsubscribeQueue_.push(predicate);
167  }
168 
169  void unsubscribe() {
170  unsubscribe_ = true;
171  }
172 
173  void publish(const typename message::M2Message<EventType>::Ptr msg) {
174  msgQueue_.push(msg);
175  }
176 
178  return factory_.template createMessage<EventType>();
179  }
180 
181  typename message::M2Message<EventType>::Ptr createMessage(const EventType & payload) const {
182  return factory_.createMessage(payload);
183  }
184 
185 #ifdef WITH_SIM
186  typename message::NetworkMessage<net::NetworkType<net::OMNET> >::Ptr transformToNetworkMessage(typename message::M2Message<EventType>::Ptr msg) const {
187  return factory_.template transformToNetworkMessage<EventType>(msg);
188  }
189 
190  typename message::M2Message<EventType>::Ptr transformToM2Message(typename message::NetworkMessage<net::NetworkType<net::OMNET>>::Ptr msg) const {
191  return factory_.template transformToM2Message<EventType>(msg);
192  }
193 #endif
194 
195  bool parseMessages() {
196  while (!subscribeQueue_.empty()) {
197  _lastSubscribe = subscribeQueue_.poll();
198  _hasSubscribe = true;
199 
200  for (auto tree_index : ChannelType::PartitionStrategy::getSubscribeTrees(_lastSubscribe.second)) {
201  trees_[tree_index]->subscribe(*_lastSubscribe.first, _lastSubscribe.second);
202  }
203  }
204 
205  if (unsubscribe_) {
206  for (auto tree : trees_) {
207  tree->unsubscribe();
208  }
209 
210  unsubscribe_ = false;
211  }
212 
213  // for filter strategies: process unsubscribeQueue
214  while (!unsubscribeQueue_.empty()) {
215  boost::shared_ptr<filter::FilterExp<EventType> > deregisteredFilter = unsubscribeQueue_.poll();
216 
217  for (auto tree_index : ChannelType::PartitionStrategy::getSubscribeTrees(deregisteredFilter)) {
218  trees_[tree_index]->unsubscribe(deregisteredFilter);
219  }
220  }
221 
222  uint32_t msgPublished = 0; // pause publishing after some messages to allow other task running
223  while (msgPublished < parameters::PUBLISH_MESSAGECOUNT_MAX && !msgQueue_.empty() && !trees_.empty()) {
224  typename message::M2Message<EventType>::Ptr msg = msgQueue_.front();
225  std::vector<int>::size_type i = ChannelType::PartitionStrategy::getPublishTree(msg->payload, _self);
226  if (i < trees_.size()) {
227  msgQueue_.pop();
228  trees_[ChannelType::PartitionStrategy::getPublishTree(msg->payload, _self)]->publish(msg);
229  ++msgPublished;
230  } else {
231  break;
232  }
233  }
234  return true;
235  }
236 
241  typename IMessage::Ptr msg2 = boost::static_pointer_cast<IMessage>(msg);
242 
244 
245  if (tmp == message::JOIN) {
246  if (_self == _rendezvous) {
247  if (_nodeList.find(msg2->sender) == _nodeList.end()) { // current state hasn't to be sent if only a node updated its state
248  _nodeList[msg2->sender] = pssi_->clock_.getTime(); // add node to list and set time for this node to be able to purge it
249 
250  if (ChannelType::PartitionStrategy::createPartition(msg2->sender)) {
251  trees_.push_back(pssi_->_tree_factory->template createTree<ChannelType, NetworkType, EventType>(_self, _rendezvous, msg2->sender, pssi_, topic_));
252 
253  updateOrderTrees();
254 
255  if (_hasSubscribe) {
256  unsubscribePartitions();
257  }
258  }
259 
260  // RP answers with current node list AND its own time
261  for (std::pair<typename NetworkType::Key, uint64_t> node : _nodeList) {
262  msg2->_nodeList.push_back(node.first);
263  }
264 
265  msg2->sender = _self;
266  msg2->type = message::STATE | topic_;
267  msg2->_time = pssi_->clock_.getTime();
268 
269  if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
270  for (size_t i = 0; i < trees_.size(); ++i) {
271  msg2->_trees.push_back(typename IMessage::TreeHelper(trees_[i]->getTopic(), ChannelType::PartitionStrategy::getPredicate(i), trees_[i]->getRoot()));
272  }
273  }
274 
275  for (typename NetworkType::Key node : msg2->_nodeList) {
276  if (node != _self) { // RP mustn't send message to itself
277  msg2->receiver = node;
278  controller_->send(boost::make_shared<IMessage>(*msg2));
279  }
280  }
281 
282  if (_hasSubscribe) {
283  subscribe(*_lastSubscribe.first, _lastSubscribe.second);
284  }
285  } else {
286  _nodeList[msg2->sender] = pssi_->clock_.getTime();
287  }
288  }
289  } else if (tmp == message::STATE) {
290  joined = true;
291  for (typename NetworkType::Key node : msg2->_nodeList) {
292  _nodeList[node] = 0;
293  }
294 
295  uint64_t rT = pssi_->clock_.getRealTime();
296 // assert(msg2->_time + parameters::EXPECTED_LATENCY >= rT);
297  uint64_t offset = msg2->_time - rT + parameters::EXPECTED_LATENCY;
298  pssi_->clock_.setOffset(offset); // difference between the two clocks + network delay
299 
300  if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
301  bool newTree = false;
302  for (size_t i = 0; i < msg2->_trees.size(); ++i) {
303  bool found = false;
304  for (size_t j = 0; j < trees_.size(); ++j) {
305  if (trees_[j]->getTopic() == msg2->_trees[i].topic) { // if tree with this topic already exists skip it
306  ChannelType::PartitionStrategy::changePredicate(j, msg2->_trees[i].predicates);
307  ChannelType::PartitionStrategy::changeRoot(j, msg2->_trees[i].root);
308  found = true;
309  break;
310  }
311  }
312  if (!found) {
313  ChannelType::PartitionStrategy::addPartition(msg2->_trees[i].predicates, msg2->_trees[i].root); // insert partition to partition strategy
314  trees_.push_back(new Tree<ChannelType, NetworkType, EventType>(msg2->_trees[i].topic, _self, _rendezvous, msg2->_trees[i].root, pssi_, topic_)); // create responding tree
315 
316  newTree = true;
317  }
318  }
319 
320  if (newTree && _hasSubscribe) {
321  updateOrderTrees();
322 
323  unsubscribePartitions();
324 
325  subscribe(*_lastSubscribe.first, _lastSubscribe.second);
326  }
327  }
328  } else if (tmp == message::LEAVE) {
329  _nodeList.erase(msg2->sender);
330 
331  if (_self == _rendezvous) {
332  if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
333  for (size_t i = 0; i < trees_.size(); ++i) {
334  if (trees_[i]->getRoot() == msg2->sender) {
335  msg2->_topics.insert(trees_[i]->getTopic());
336  }
337  }
338 
339  removeTopics(msg2->_topics);
340 
341  updateOrderTrees();
342 
343  if (_hasSubscribe) {
344  unsubscribePartitions();
345 
346  subscribe(*_lastSubscribe.first, _lastSubscribe.second);
347  }
348  }
349 
350  for (std::pair<typename NetworkType::Key, uint64_t> node : _nodeList) {
351  if (node.first != _self) { // RP mustn't send message to itself
352  msg2->receiver = node.first;
353  controller_->send(boost::make_shared<IMessage>(*msg2));
354  }
355  }
356  } else {
357  if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
358  removeTopics(msg2->_topics);
359 
360  updateOrderTrees();
361 
362  if (_hasSubscribe) {
363  unsubscribePartitions();
364 
365  subscribe(*_lastSubscribe.first, _lastSubscribe.second);
366  }
367  }
368  }
369  }
370  }
371 
372  private:
376  bool startJoin() {
377  updateState();
378 
379  dynamicPeriodID_ = pssi_->scheduler_.runRepeated(5000000, boost::bind(&Channel::updateState, this), 1);
380 
381  return false;
382  }
383 
387  bool purgeNodes() {
388  assert(_self == _rendezvous);
389  std::vector<typename NetworkType::Key> purges;
390 
391  for (std::pair<typename NetworkType::Key, uint64_t> p : _nodeList) {
392  if (p.first != _rendezvous && pssi_->clock_.getTime() - p.second >= 6000000) {
393  purges.push_back(p.first);
394  }
395  }
396 
397  for (typename NetworkType::Key node : purges) {
398  _nodeList.erase(node);
399  }
400 
401  for (typename NetworkType::Key node : purges) {
402 #ifdef WITH_SIM
403  SimulationEventType v;
404  v._simID = -1;
405  v._simChannel = topic_;
406  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(createMessage(v));
407 #else
408  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(createMessage());
409 #endif /* WITH_SIM */
410 
411  msg->sender = node;
412  msg->type = message::LEAVE | topic_;
413 
414  if (ChannelType::PartitionStrategy::DYNAMIC_PARTITION) {
415  for (size_t i = 0; i < trees_.size(); ++i) {
416  if (trees_[i]->getRoot() == node) {
417  msg->_topics.insert(trees_[i]->getTopic());
418  }
419  }
420 
421  removeTopics(msg->_topics);
422 
423  updateOrderTrees();
424 
425  if (_hasSubscribe) {
426  unsubscribePartitions();
427 
428  subscribe(*_lastSubscribe.first, _lastSubscribe.second);
429  }
430  }
431 
432  for (std::pair<typename NetworkType::Key, uint64_t> nodes : _nodeList) {
433  if (nodes.first != _self) { // RP mustn't send message to itself
434  msg->receiver = nodes.first;
435  controller_->send(boost::make_shared<IMessage>(*msg));
436  }
437  }
438  }
439 
440  return true;
441  }
442 
446  bool updateState() {
447 #ifdef WITH_SIM
448  SimulationEventType v;
449  v._simID = -1;
450  v._simChannel = topic_;
451  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(createMessage(v));
452 #else
453  typename IMessage::Ptr msg = boost::static_pointer_cast<IMessage>(createMessage());
454 #endif /* WITH_SIM */
455 
456  msg->sender = _self;
457  msg->receiver = _rendezvous;
458  msg->type = message::JOIN | topic_;
459  controller_->send(msg);
460 
461  return true;
462  }
463 
467  void updateOrderTrees() {
468  std::vector<typename ChannelType::OrderStrategy *> v;
469  for (size_t i = 0; i < trees_.size(); ++i) {
470  v.push_back(static_cast<typename ChannelType::OrderStrategy *>(trees_[i]));
471  }
472  for (size_t i = 0; i < trees_.size(); i++) {
473  trees_[i]->otherOrders(v);
474  }
475  }
476 
480  void removeTopics(const std::set<uint16_t> & topics) {
481  for (uint16_t ttr : topics) {
482  for (size_t j = 0; j < trees_.size(); ++j) {
483  if (trees_[j]->getTopic() == ttr) {
484  ChannelType::PartitionStrategy::removePartition(j);
485  delete trees_[j];
486  trees_.erase(trees_.begin() + int(j));
487  break;
488  }
489  }
490  }
491  }
492 
496  void unsubscribePartitions() {
497  std::vector<unsigned int> subs = ChannelType::PartitionStrategy::getSubscribeTrees(_lastSubscribe.second);
498 
499  size_t index = 0;
500 
501  for (size_t i = 0; i < trees_.size(); ++i) {
502  if (i == subs[index]) {
503  index++;
504  } else {
505  trees_[i]->unsubscribe(_lastSubscribe.second);
506  }
507  }
508  }
509  };
510 
511 } /* namespace pubsub */
512 } /* namespace m2etis */
513 
514 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
515  #pragma warning(pop)
516 #endif
517 
518 #endif /* __M2ETIS_PUBSUB_CHANNEL_H__ */
519 
boost::shared_ptr< M2Message< EventType > > Ptr
Definition: M2Message.h:42
void setOffset(uint64_t offset)
sets offset of this Clock to adjust times over network
Definition: Clock.h:141
void register_deliver(message::MessageType nr, net_deliver_func f)
ChannelName getChannel() const
Gets a identification of the channel.
Definition: Channel.h:156
PayloadPtr payload
payload
Definition: M2Message.h:51
bool empty() const
returns true if the queue is empty, otherwise false
void unsubscribe(boost::shared_ptr< filter::FilterExp< EventType >> predicate) override
Definition: Channel.h:165
Channel(const ChannelName topic_name, const std::string &ip, uint16_t port, const std::string &rendezvousIP, uint16_t known_hostport, PubSubSystemEnvironment *pssi, const std::vector< std::string > &rootList)
Constructor.
Definition: Channel.h:92
T front()
returns first entry of the queue
void send(typename message::NetworkMessage< NetworkType >::Ptr msg)
sends a message to the receiver defined within the message if the sender is the receiver, message is put into deliver method directly to remove overhead by serializing and trying to send message otherwise the message is forwarded to the corresponding wrapper
void push(const T &value)
pushes the given value into the queue
STL namespace.
static const uint32_t PULL_PARSEMESSAGE
RendezvousT RendezvousStrategy
Definition: ChannelType.h:49
static const uint32_t EXPECTED_LATENCY
void subscribe(BasicDeliverCallbackInterface< EventType > &callback, boost::shared_ptr< filter::FilterExp< EventType > > predicate)
subscribes to the channel
Definition: Channel.h:160
static const uint32_t PUBLISH_MESSAGECOUNT_MAX
void publish(const typename message::M2Message< EventType >::Ptr msg)
publishes a message on the channel
Definition: Channel.h:173
uint64_t getRealTime() const
Will return the real time since the Clock has been started.
Definition: Clock.h:77
void deliver(typename message::NetworkMessage< NetworkType >::Ptr msg)
Channel receives messages from network in here, used for dynamic partitions.
Definition: Channel.h:240
message::M2Message< EventType >::Ptr createMessage() const
Definition: Channel.h:177
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
Scheduler< util::RealTimeClock > scheduler_
void clear()
removes all elements in the queue
util::Clock< util::RealTimeClock > clock_
uint64_t getTime() const
Will return the time since the Clock on the rendezvouz node has started.
Definition: Clock.h:68
T poll()
remoes first entry of the queue and returns its value
M2Message< EventType >::Ptr createMessage(const EventType &w) const
creates a Message
boost::shared_ptr< NetworkMessage > Ptr
void stop(uint64_t id)
Definition: Scheduler.h:110
std::string getSelf() const
Definition: Channel.h:150
Message Factory to create messages.
boost::shared_ptr< InternalMessage > Ptr
message::M2Message< EventType >::Ptr createMessage(const EventType &payload) const
Definition: Channel.h:181
static const uint32_t ACTION_TYPE_MASK
Definition: MessageType.h:32
void pop()
removes first entry of the queue