m2etis  0.4
MTPOrder.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ORDER_MTPORDER_H__
23 #define __M2ETIS_PUBSUB_ORDER_MTPORDER_H__
24 
25 #include <queue>
26 #include <tuple>
27 
31 
32 #include "boost/thread.hpp"
33 
34 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
35  #pragma warning(push)
36  #pragma warning(disable : 4127)
37 #endif
38 
39 namespace m2etis {
40 namespace pubsub {
41 namespace order {
42 
46  enum class LateDeliver {
47  DELIVER,
48  DROP
49  };
50 
57  template<class NetworkType, uint64_t Timeout, LateDeliver deli>
58  class MTPOrder : public BaseOrder<NetworkType> {
59  public:
61 
62  MTPOrder(PubSubSystemEnvironment * pssi, bool b) : BaseOrder<NetworkType>(pssi, b), queue_(), nextRec_(0), nextSend_(0), delivered_(0), periodicID_(0) {
63  periodicID_ = pssi->scheduler_.runRepeated(50000, boost::bind(&MTPOrder::purge, this), 1);
64  }
65 
66  virtual ~MTPOrder() {
67  BaseOrder<NetworkType>::pssi_->scheduler_.stop(periodicID_);
68  }
69 
73  bool hasPending() {
74  return !queue_.empty();
75  }
76 
82  bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & rec) {
83  message::MTPOrderInfo::Ptr info = boost::static_pointer_cast<message::MTPOrderInfo>(ptr);
84  info->missing_ = remHelp.getDropped(rec);
85  remHelp.clear(rec);
86 
87  if (mtype == message::NOTIFY) { // notify already as sequence number
89  return true;
90  }
91  if (mtype == message::SUBSCRIBE) { // subscribes don't need a sequence number
93  return true;
94  }
95  if (mtype == message::UNSUBSCRIBE) { // unsubscribes don't need a sequence number
97  return true;
98  }
99  if (mtype == message::CONTROL) { // controls don't need a sequence number
101  return true;
102  }
103  sendQueue_.push(std::make_pair(id, info)); // store the current message buffer id along with the header struct
104  OrderInfoType::Ptr p = boost::make_shared<OrderInfoType>();
106  BaseOrder<NetworkType>::sendMsg_(p, typename NetworkType::Key(), ControlTarget::ROOT); // send a new control message to the root node of this tree (sequencer) to get a sequence number
107  return true;
108  }
109 
113  bool processControlPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & sender) {
114  message::MTPOrderInfo::Ptr info = boost::static_pointer_cast<message::MTPOrderInfo>(ptr);
115  if (info->type_ == message::MTPOrderInfo::TYPE_TOKEN_REQUEST) {
116  // (Root) an id was requested
117  OrderInfoType::Ptr p = boost::make_shared<OrderInfoType>();
119  p->seqNr = nextSend_++;
121  } else {
122  // (Sender) received an id. sending...
123  assert(info->type_ == message::MTPOrderInfo::TYPE_TOKEN_GRANT);
124  assert(sendQueue_.size() > 0);
125  sendQueue_.front().second->seqNr = info->seqNr;
127  sendQueue_.pop();
128  }
129  return false;
130  }
131 
135  void processSubscribePayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
136  }
137 
141  void processPublishPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
142  }
143 
147  void processNotifyPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &) {
148  message::MTPOrderInfo::Ptr info = boost::static_pointer_cast<message::MTPOrderInfo>(ptr);
149 
150  for (uint64_t i : info->missing_) {
151  droppedQueue_.push(i);
152  }
153 
154  purge();
155  }
156 
160  void otherOrders(const std::vector<MTPOrder *> &) {
161  }
162 
167  void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &) {
168  message::MTPOrderInfo::Ptr info = boost::static_pointer_cast<message::MTPOrderInfo>(ptr);
169 
170  if (info->seqNr == nextRec_) {
172  delivered_++;
173  nextRec_++;
174  purge();
175  } else if (info->seqNr == UINT64_MAX) {
177  delivered_++;
178  nextRec_++;
179  purge();
180  } else if (info->seqNr < nextRec_) {
181  // message arrived too late
182  if (deli == LateDeliver::DELIVER) {
183  // deliver
185  } else {
186  // just delete from buffer
188  }
189  } else {
190  queue_.push(std::make_tuple(info->seqNr, id, BaseOrder<NetworkType>::pssi_->scheduler_.getTime()));
191  }
192  }
193 
197  void notifyRemovedMessage(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key & receiver) {
198  message::MTPOrderInfo::Ptr info = boost::static_pointer_cast<message::MTPOrderInfo>(ptr);
199  remHelp.dropped(info->seqNr, receiver);
200  }
201 
202  private:
207  std::priority_queue<std::tuple<uint64_t, uint64_t, uint64_t>, std::vector<std::tuple<uint64_t, uint64_t, uint64_t>>, std::greater<std::tuple<uint64_t, uint64_t, uint64_t>>> queue_;
208 
213  std::queue<std::pair<uint64_t, OrderInfoType::Ptr>> sendQueue_;
214 
218  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>> droppedQueue_;
219 
223  uint64_t nextRec_;
224 
228  uint64_t nextSend_;
229 
233  uint64_t delivered_;
234 
239 
240  uint64_t periodicID_;
241 
245  bool purge() {
246  uint64_t cT = BaseOrder<NetworkType>::pssi_->scheduler_.getTime();
247  while (!queue_.empty()) {
248  if (std::get<SEQNR>(queue_.top()) == nextRec_) {
249  // next messages is available
251  delivered_++;
252  queue_.pop();
253  nextRec_++;
254  } else if (std::get<TIME>(queue_.top()) + Timeout < cT) {
255  // Timeout
256  nextRec_ = std::get<SEQNR>(queue_.top());
257  } else if (!droppedQueue_.empty() && nextRec_ == droppedQueue_.top()) {
258  // This messge will never arrive
259  nextRec_++;
260  droppedQueue_.pop();
261  } else {
262  // still waiting
263  break;
264  }
265  }
266  return true;
267  }
268 
272  enum QueueTuple {
273  SEQNR,
274  BUFID,
275  TIME
276  };
277  };
278 
279 } /* namespace order */
280 } /* namespace pubsub */
281 } /* namespace m2etis */
282 
283 #if I6E_PLATFORM == I6E_PLATFORM_WIN32
284  #pragma warning(pop)
285 #endif
286 
287 #endif /* __M2ETIS_PUBSUB_ORDER_MTPORDER_H__ */
288 
boost::shared_ptr< OrderInfo > Ptr
Definition: OrderInfo.h:31
void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
receives a message stores the message in the queue and wait for it's deliver
Definition: MTPOrder.h:167
LateDeliver
what to do when recieving an old message after Timeout
Definition: MTPOrder.h:46
void processNotifyPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
called for every NotifyMsg that arrived
Definition: MTPOrder.h:147
bool hasPending()
pending messages if the buffer is non-empty
Definition: MTPOrder.h:73
MTPOrder(PubSubSystemEnvironment *pssi, bool b)
Definition: MTPOrder.h:62
void notifyRemovedMessage(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &receiver)
notified when message dropped by filter or validity strategy
Definition: MTPOrder.h:197
boost::function< void(message::OrderInfo::Ptr, const typename NetworkType::Key &, ControlTarget)> sendMsg_
stores the function to send new messages This function will send a newly created control message cont...
Definition: BaseOrder.h:146
This is the interface that should be used for all Order Strategies It specifies all functions that sh...
Definition: BaseOrder.h:39
boost::shared_ptr< MTPOrderInfo > Ptr
Definition: MTPOrderInfo.h:34
bool processControlPayload(typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes a control message
Definition: MTPOrder.h:113
void clear(const KEYType &rec)
Definition: RemoveHelper.h:42
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
std::vector< uint64_t > missing_
stores information about dropped messages
Definition: MTPOrderInfo.h:67
bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &rec)
used to manage sending a message All messages but Publish messages can be sent directly without any s...
Definition: MTPOrder.h:82
Scheduler< util::RealTimeClock > scheduler_
std::vector< IDType > getDropped(const KEYType &rec)
Definition: RemoveHelper.h:46
void otherOrders(const std::vector< MTPOrder * > &)
processes a subscribe message
Definition: MTPOrder.h:160
message::MTPOrderInfo OrderInfoType
Definition: MTPOrder.h:60
boost::function< void(uint64_t, msgProcess)> function_
stores the function to be called for delivering
Definition: BaseOrder.h:136
void processPublishPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
processes a subscribe message
Definition: MTPOrder.h:141
void dropped(IDType id, const KEYType &rec)
Definition: RemoveHelper.h:34
void processSubscribePayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
processes a subscribe message
Definition: MTPOrder.h:135
This class implements the MTP Order strategy The root node is used as a fixed sequencer. Every node that wants to send a message, request a sequenzer number (TOKEN) from the sequencer and afterwards sends his message along with this sequence number.
Definition: MTPOrder.h:58