m2etis  0.4
GMSOrder.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ORDER_GMSORDER_H__
23 #define __M2ETIS_PUBSUB_ORDER_GMSORDER_H__
24 
25 #include <set>
26 
29 
30 #include "boost/any.hpp"
31 
32 namespace m2etis {
33 namespace pubsub {
34 namespace order {
35 
41  template<class NetworkType, unsigned int Timeout>
42  class GMSOrder : public BaseOrder<NetworkType> {
46  struct MsgInfo {
47  MsgInfo(uint64_t a, uint64_t b, uint64_t c) : seqNr(a), treeId(b), msgId(c) {
48  }
49 
53  uint64_t seqNr;
54 
58  uint64_t treeId;
59 
63  uint64_t msgId;
64 
65  bool operator<(const MsgInfo & b) const {
66  return !(std::tie(seqNr, treeId, msgId) < std::tie(b.seqNr, b.treeId, b.msgId));
67  }
68 
69  bool operator>(const MsgInfo & b) const {
70  return !(*this < b);
71  }
72  };
73 
74  public:
76 
77  GMSOrder(PubSubSystemEnvironment * pssi, bool isRoot) : BaseOrder<NetworkType>(pssi, isRoot), pssi_(pssi), nextFinished(0), nextSend(0) {
78  // if rootnode, set variables that wouldn't be set because we don't get a subscribe msg
79  if (isRoot) {
80  for (size_t i = 0; i < others_.size(); ++i) {
81  others_[i]->setMasterTree(myTree_);
82  }
83  }
84  }
85 
86  virtual ~GMSOrder() {}
87 
88  uint64_t counter = 0;
89 
94 
98  std::vector<GMSOrder *> others_;
99 
104  std::vector<std::set<typename NetworkType::Key>> subs_;
105 
109  std::map<typename NetworkType::Key, uint64_t> nextSend_;
113  std::map<typename NetworkType::Key, uint64_t> nextRec_;
118  std::map<typename NetworkType::Key, uint64_t> mainTree_;
119 
123  std::set<uint64_t> selfSub_;
124 
128  uint64_t nextFinished;
129 
133  uint64_t masterTree = UINT64_MAX;
134 
138  uint64_t myTree_ = UINT64_MAX;
139 
143  uint64_t nextSend;
144 
148  std::map<typename NetworkType::Key, std::set<uint64_t>> subOn_;
149 
150  typedef std::pair<uint64_t, uint64_t> MsgIdent; // <Tree, MsgNr>
151  typedef uint64_t SN;
152  typedef std::pair<uint64_t, uint64_t> BufId; // <Tree, BufferNr>
153 
154  typedef std::tuple<MsgInfo, uint64_t> QueueTuple;
155 
159  std::priority_queue<QueueTuple, std::vector<QueueTuple>, std::greater<QueueTuple>> ctrlQueue_;
160 
164  std::map<MsgIdent, BufId> pubQueue_;
165 
169  bool hasPending() {
170  return !ctrlQueue_.empty();
171  }
172 
176  void setMasterTree(uint64_t t) {
177  masterTree = t;
178  }
179 
183  bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & rec) {
184  // no infos, just send
185  typename OrderInfoType::Ptr info = boost::static_pointer_cast<OrderInfoType>(ptr);
186  info->mT = masterTree;
187  info->sender = BaseOrder<NetworkType>::hn;
188 
189  if (mtype == message::SUBSCRIBE) {
190  info->realTree = myTree_;
191  if (masterTree == UINT64_MAX) { // not yet subscribed on any tree
192  info->mT = myTree_;
193  for (size_t i = 0; i < others_.size(); ++i) {
194  others_[i]->setMasterTree(myTree_);
195  }
197  } else if (masterTree == myTree_) {
199  } else {
201  }
202  // remember this subscription
203  if (selfSub_.find(myTree_) == selfSub_.end()) {
204  selfSub_.insert(myTree_);
205  }
206  } else if (mtype == message::PUBLISH) {
207  info->realTree = myTree_;
209  } else if (mtype == message::NOTIFY) {
211  } else if (mtype == message::CONTROL) {
212  if (info->seqNr == 123456789 && info->type != message::TYPE::UNSUB) { // identify my own control msgs
213  if (nextSend_.find(rec) == nextSend_.end()) { // send with next id for this link
214  nextSend_[rec] = 0;
215  }
216  info->seqNr = nextSend_[rec]++;
217  }
219  } else {
220  selfSub_.erase(myTree_);
221  if (myTree_ == masterTree) {
222  // we need a new masterTree
223  if (selfSub_.empty()) { // no more subscriptions
224  masterTree = UINT64_MAX;
225  } else {
226  masterTree = *(selfSub_.begin());
227  }
228  }
229  // send an unsubscribe control to server
230  typename OrderInfoType::Ptr p = boost::make_shared<OrderInfoType>();
231  p->type = message::TYPE::UNSUB;
232  p->realTree = myTree_;
233  p->mT = masterTree;
234  p->seqNr = 123456789; // random value to identify these messages in configure order info
235  BaseOrder<NetworkType>::sendMsg_(p, typename NetworkType::Key(), ControlTarget::ROOT);
236 
237  info->realTree = myTree_;
239  }
240  return true;
241  }
242 
247  void processSubscribePayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & sender) {
248  typename OrderInfoType::Ptr info = boost::static_pointer_cast<OrderInfoType>(ptr);
250  if (myTree_ == 0) {
251  // process on tree 0
252  info->realTree = info->realTree == UINT64_MAX ? 0 : info->realTree;
253  // sender is subscribed on tree realTree
254  subOn_[sender].insert(info->realTree);
255  // on realTree, sender is subscribed
256  subs_[uint32_t(info->realTree)].insert(sender);
257  // sender's maintree is mT
258  mainTree_[sender] = info->mT;
259  } else {
260  //redirect msg to tree 0
261  info->realTree = myTree_;
262  others_[0]->processSubscribePayload(ptr, sender);
263  }
264  }
265  }
266 
267  void processUnsubscribePayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & sender) {
268  }
269 
273  void processPublishPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &) {
274  typename OrderInfoType::Ptr info = boost::static_pointer_cast<OrderInfoType>(ptr);
275  assert(BaseOrder<NetworkType>::isRoot_); // only the root node receives publish messages
276  info->msgNr = nextSend++; // give it a unique id
277  // Tree 0 shall send the CtrlMsgs used for this message (This tree sends the Msg on all needed trees)
278  others_[0]->sendCtrl(myTree_, info->msgNr);
279  }
280 
284  void processNotifyPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
285  }
286 
290  bool processControlPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & sender) {
291  typename OrderInfoType::Ptr info = boost::static_pointer_cast<OrderInfoType>(ptr);
292  if (info->type == message::TYPE::UNSUB) {
293  // someone unsubscribes
295  if (myTree_ == 0) {
296  // process on tree 0
297  info->realTree = info->realTree == UINT64_MAX ? 0 : info->realTree;
298  // sender is subscribed on tree realTree
299  subOn_[sender].erase(info->realTree);
300  // on realTree, sender is subscribed
301  subs_[uint32_t(info->realTree)].erase(sender);
302  // sender's maintree is mT
303  mainTree_[sender] = info->mT;
304  } else {
305  //redirect msg to tree 0
306  info->realTree = myTree_;
307  others_[0]->processControlPayload(ptr, sender);
308  }
309  }
310  return false;
311  }
312 
313  if (masterTree != myTree_) {
314  // ControlMsg on wrong Tree. Just ignore the message
315  return true;
316  }
317  ctrlQueue_.push(std::make_tuple(MsgInfo(info->seqNr, info->realTree, info->msgNr), BaseOrder<NetworkType>::pssi_->scheduler_.getTime()));
318  updateQueue();
319  return true;
320  }
321 
326  void deliver(uint64_t a) {
328  }
329 
333  void updateQueue() {
334  bool b = true;
335  while (b && !ctrlQueue_.empty()) { // process multiple messages
336  b = false; // stop unless we remove the first element
337  MsgInfo head = std::get<0>(ctrlQueue_.top());
338  if (ctrlQueue_.size() > 0 && head.seqNr == nextFinished) { // next message to be processed
339  if (head.msgId == UINT64_MAX) { // this message doesn't need to be synchronised
340  nextFinished++;
341  ctrlQueue_.pop();
342  b = true;
343  } else {
344  BufId bid = std::make_pair(head.treeId, head.msgId); // identifier for a message in the buffer (tree + id)
345  // check if corresponding NOTIFY already arrived
346  if (pubQueue_.find(bid) != pubQueue_.end()) {
347  // deliver the message with given id on it's realTree
348  others_[uint32_t(pubQueue_[bid].first)]->deliver(pubQueue_[bid].second);
349  pubQueue_.erase(bid);
350  ctrlQueue_.pop();
351  nextFinished++;
352  b = true;
353  }
354  }
355  } else if (std::get<1>(ctrlQueue_.top()) + Timeout > BaseOrder<NetworkType>::pssi_->scheduler_.getTime()) {
356  // Timeout
357  nextFinished = std::get<0>(ctrlQueue_.top()).seqNr;
358  b = true;
359  }
360  }
361  }
362 
368  void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & sender) {
369  typename OrderInfoType::Ptr info = boost::static_pointer_cast<OrderInfoType>(ptr);
371  // no need to buffer messages on the root because they are in order by definition
373  return;
374  }
375  if (masterTree == UINT64_MAX || myTree_ == masterTree) {
376  // save the message until the corresponding controlmessage arrived
377  pubQueue_[std::make_pair(info->realTree, info->msgNr)] = std::make_pair(info->realTree, id);
378  updateQueue();
379  } else {
380  // redirect message to the master Tree
381  others_[uint32_t(masterTree)]->receive(id, ptr, sender);
382  }
383  }
384 
390  void otherOrders(const std::vector<GMSOrder *> & others) {
391  others_ = others; // save other instances
392 
393  // searches instances to find the index of the current tree
394  for (size_t i = 0; i < others_.size(); ++i) {
395  if (others_[i] == this) {
396  myTree_ = i;
397  break;
398  }
399  }
400 
401  // initializes lists of subscribers for all trees
402  subs_ = std::vector<std::set<typename NetworkType::Key>>(others_.size());
403  }
404 
411  void sendCtrl(uint64_t treeId, uint64_t msgId) {
412  std::set<uint64_t> v;
413  for (typename std::set<typename NetworkType::Key>::iterator subsOnTree = subs_[uint32_t(treeId)].begin(); subsOnTree != subs_[uint32_t(treeId)].end(); ++subsOnTree) { // *subsOnTree := string
414  // iterator over all subscribers on the tree and get all used mainTrees
415  v.insert(mainTree_[*subsOnTree]);
416  }
417  // send a CtrlMsg on all these maintrees
418  for (std::set<uint64_t>::iterator it = v.begin(); it != v.end(); ++it) {
419  others_[uint32_t(*it)]->sendCtrl2(treeId, msgId);
420  }
421  }
422 
428  void sendCtrl2(uint64_t treeId, uint64_t msgId) {
429  typename OrderInfoType::Ptr p = boost::make_shared<OrderInfoType>();
430  p->msgNr = msgId;
431  p->realTree = treeId;
432  // TODO: (Daniel) remove magic number!
433  p->seqNr = 123456789; // random value to identify these messages in configure order info
434  // publish this message normally
435  BaseOrder<NetworkType>::sendMsg_(p, typename NetworkType::Key(), ControlTarget::ALL);
436  }
437 
441  void notifyRemovedMessage(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
442  }
443  };
444 
445 } /* namespace order */
446 } /* namespace pubsub */
447 } /* namespace m2etis */
448 
449 #endif /* __M2ETIS_PUBSUB_ORDER_GMSORDER_H__ */
450 
boost::shared_ptr< OrderInfo > Ptr
Definition: OrderInfo.h:31
std::priority_queue< QueueTuple, std::vector< QueueTuple >, std::greater< QueueTuple > > ctrlQueue_
Queue containing all recieved but not yet further processed ctrl messages.
Definition: GMSOrder.h:159
void deliver(uint64_t a)
the Message with id a shall be delivered This is used to force other instances to deliver a message ...
Definition: GMSOrder.h:326
std::tuple< MsgInfo, uint64_t > QueueTuple
Definition: GMSOrder.h:154
uint64_t masterTree
tree on which Control Msgs are expected. Default: None
Definition: GMSOrder.h:133
void sendCtrl2(uint64_t treeId, uint64_t msgId)
send a ctrlMsg on this tree
Definition: GMSOrder.h:428
void notifyRemovedMessage(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
notified when message dropped by filter or validity strategy
Definition: GMSOrder.h:441
bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &rec)
configures the info struct
Definition: GMSOrder.h:183
std::map< typename NetworkType::Key, uint64_t > nextSend_
Receiver -> next Seq. Nr. to use.
Definition: GMSOrder.h:109
uint64_t nextFinished
next ID to be expected
Definition: GMSOrder.h:128
std::set< uint64_t > selfSub_
this node is subscribed on these trees
Definition: GMSOrder.h:123
void updateQueue()
checks the queue for new messages to deliver
Definition: GMSOrder.h:333
bool processControlPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes a control payload (Root)
Definition: GMSOrder.h:290
uint64_t nextSend
next number to be used as the global Sequence number
Definition: GMSOrder.h:143
std::pair< uint64_t, uint64_t > BufId
Definition: GMSOrder.h:152
std::map< typename NetworkType::Key, uint64_t > nextRec_
Sender -> next Seq. Nr. to expect.
Definition: GMSOrder.h:113
std::map< typename NetworkType::Key, std::set< uint64_t > > subOn_
which node is subscribed on which tree
Definition: GMSOrder.h:148
boost::function< void(message::OrderInfo::Ptr, const typename NetworkType::Key &, ControlTarget)> sendMsg_
stores the function to send new messages This function will send a newly created control message cont...
Definition: BaseOrder.h:146
bool hasPending()
returns whether pending messages exist
Definition: GMSOrder.h:169
This is the interface that should be used for all Order Strategies It specifies all functions that sh...
Definition: BaseOrder.h:39
boost::shared_ptr< GMSOrderInfo > Ptr
Definition: GMSOrderInfo.h:40
uint64_t myTree_
index number of this tree
Definition: GMSOrder.h:138
void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
called by the Tree whenever a message is ready to be delivered This function stores the messages (to ...
Definition: GMSOrder.h:368
std::pair< uint64_t, uint64_t > MsgIdent
Definition: GMSOrder.h:150
void processUnsubscribePayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
Definition: GMSOrder.h:267
void processSubscribePayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes a subscribe payload (Root) the message will be processed on tree 0
Definition: GMSOrder.h:247
void setMasterTree(uint64_t t)
this node will only accept controlmessages from this tree
Definition: GMSOrder.h:176
PubSubSystemEnvironment * pssi_
pointer to the PubSubSystemEnvironment
Definition: GMSOrder.h:93
boost::function< void(uint64_t, msgProcess)> function_
stores the function to be called for delivering
Definition: BaseOrder.h:136
GMSOrder(PubSubSystemEnvironment *pssi, bool isRoot)
Definition: GMSOrder.h:77
std::map< MsgIdent, BufId > pubQueue_
stores all NOTIFY msgs that arrived
Definition: GMSOrder.h:164
void processNotifyPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
called for every NotifyMsg that arrived
Definition: GMSOrder.h:284
std::vector< std::set< typename NetworkType::Key > > subs_
for every tree, save all subscribers only neccessary on root node
Definition: GMSOrder.h:104
message::GMSOrderInfo< NetworkType > OrderInfoType
Definition: GMSOrder.h:75
implements the Order Strategy from Garcia-Molina and Spauster The Strategy ensures synchronisation al...
Definition: GMSOrder.h:42
void processPublishPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
processes a publish payload
Definition: GMSOrder.h:273
std::vector< GMSOrder * > others_
list of all Order classes from the other trees
Definition: GMSOrder.h:98
std::map< typename NetworkType::Key, uint64_t > mainTree_
Receiver -> Tree that should be used for the Control msg only used in root node.
Definition: GMSOrder.h:118
void sendCtrl(uint64_t treeId, uint64_t msgId)
the tree shall publish CtrlMsgs on all trees that have subscribers This function is only used on Tree...
Definition: GMSOrder.h:411
void otherOrders(const std::vector< GMSOrder * > &others)
stores the instances of the OrderStrategy from the other Channels Also sets the myTree_ variable and ...
Definition: GMSOrder.h:390