m2etis  0.4
DetMergeOrder.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_ORDER_DETMERGEORDER_H__
23 #define __M2ETIS_PUBSUB_ORDER_DETMERGEORDER_H__
24 
25 #include <queue>
26 
28 
30 
31 #undef max
32 
33 namespace m2etis {
34 namespace pubsub {
35 namespace order {
36 
40  template<class NetworkType, class Config>
41  class DetMergeOrder : public BaseOrder<NetworkType>, public Config {
45  class Compare {
46  public:
47  bool operator()(const std::pair<typename message::DetMergeOrderInfo<Config>::Ptr, uint64_t> & a, const std::pair<typename message::DetMergeOrderInfo<Config>::Ptr, uint64_t> & b) const {
48  // reveresed order to get the smallest pair first (priority_queue sorts maximum first)
49  return !(*(a.first.get()) < *(b.first.get()));
50  }
51  };
52 
53  public:
55 
59  DetMergeOrder(PubSubSystemEnvironment * pssi, bool b) : BaseOrder<NetworkType>(pssi, b), rt(0), lastEv(new typename OrderInfoType::Timestamp()), queue_(), pssi_(pssi) {
60  lastEv->kn[0] = 1;
61  periodicID_ = pssi->scheduler_.runRepeated(5000, boost::bind(&DetMergeOrder::updatePhyClock, this), 1);
62  }
63 
67  virtual ~DetMergeOrder() {
68  pssi_->scheduler_.stop(periodicID_);
69  }
70 
74  bool hasPending() {
75  return !queue_.empty();
76  }
77 
82  bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &) {
83  if (mtype != message::PUBLISH) {
85  return true;
86  }
87  typename message::DetMergeOrderInfo<Config>::Ptr info = boost::static_pointer_cast<message::DetMergeOrderInfo<Config>>(ptr);
88  // update local time
89  rt = pssi_->scheduler_.getTime() / Config::raster;
90  lastEv->c = std::max(int64_t(0), int64_t(lastEv->r + lastEv->c - rt));
91  // shifts kn with (rt - lastEv->r) -> this time elapsed since last update
92  for (uint32_t t = 0; t < Config::eps * 2; ++t) {
93  if (t + rt - lastEv->r < 2 * Config::eps) {
94  lastEv->kn[t] = lastEv->kn[t + size_t(rt - lastEv->r)];
95  } else {
96  lastEv->kn[t] = 0;
97  }
98  }
99  lastEv->kn[0]++;
100  lastEv->r = rt;
101  info->ts->r = lastEv->r;
102  info->ts->c = lastEv->c;
103  info->ts->kn = lastEv->kn;
105  return true;
106  }
107 
112  bool processControlPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
113  assert(false);
114  return false;
115  }
116 
120  void processSubscribePayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
121  }
122 
126  void processPublishPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
127  }
128 
132  void processNotifyPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
133  }
134 
138  void otherOrders(const std::vector<DetMergeOrder *> &) {
139  }
140 
145  void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &) {
146  typename OrderInfoType::Ptr info = boost::static_pointer_cast<OrderInfoType>(ptr);
147  // Update time
148  rt = pssi_->scheduler_.getTime() / Config::raster;
149  lastEv->c = std::max(int64_t(0), std::max(int64_t(lastEv->r + lastEv->c - rt), int64_t(int64_t(info->ts->r) + info->ts->c - rt)));
150  // shift array
151  for(uint32_t t = 0; t < Config::eps * 2; ++t) {
152  int32_t v1 = 0;
153  int32_t v2 = 0;
154  int32_t v3 = 0;
155  // values outside the array are 0
156  if (t + rt - lastEv->r < 2 * Config::eps && t + rt - lastEv->r >= 0) {
157  v2 = lastEv->kn[t + size_t(rt - lastEv->r)];
158  }
159  if (t + rt - info->ts->r < 2 * Config::eps && t + rt - info->ts->r >= 0) {
160  v3 = info->ts->kn[t + size_t(rt - info->ts->r)];
161  }
162  lastEv->kn[t] = max(v1, max(v2, v3));
163  }
164  lastEv->kn[0]++;
165  lastEv->r = rt;
166  queue_.push(std::make_pair(info, id));
167  }
168 
169  // update clock and check for delivery
170  bool updatePhyClock() {
171  uint64_t a = pssi_->scheduler_.getTime();
172  rt = a / Config::raster;
173  // deliver all waiting msgs
174  while (!queue_.empty() && queue_.top().first->ts->r + queue_.top().first->ts->c + Config::delta + Config::eps <= rt) {
175  // delivers message
177  queue_.pop();
178  }
179  // reschedule this task
180  return true;
181  }
182 
186  void notifyRemovedMessage(typename message::OrderInfo::Ptr, const typename NetworkType::Key &) {
187  }
188 
189  private:
190  template<class T>
191  T max(const T & a, const T & b) {
192  if (a > b) {
193  return a;
194  } else {
195  return b;
196  }
197  }
198 
204  volatile uint64_t rt;
205 
206  typename OrderInfoType::timestamp_p lastEv; // <r.j, c.j, kn.j>
207 
211  std::priority_queue<std::pair<typename OrderInfoType::Ptr, uint64_t>, std::vector<std::pair<typename OrderInfoType::Ptr, uint64_t>>, Compare> queue_;
212 
216  PubSubSystemEnvironment * pssi_;
217 
218  uint64_t periodicID_;
219  };
220 
221 } /* namespace order */
222 } /* namespace pubsub */
223 } /* namespace m2etis */
224 
225 #endif /* __M2ETIS_PUBSUB_ORDER_DETMERGEORDER_H__ */
226 
boost::shared_ptr< OrderInfo > Ptr
Definition: OrderInfo.h:31
boost::shared_ptr< DetMergeOrderInfo > Ptr
void notifyRemovedMessage(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
notified when message dropped by filter or validity strategy
void processSubscribePayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
this strategie doesn't care about subscribes
bool hasPending()
returns whether there are messages waiting
Definition: DetMergeOrder.h:74
timestamp_p ts
timestamp for this OrderInfo
Implements the Deterministic Merge Order Strategy.
Definition: DetMergeOrder.h:41
DetMergeOrder(PubSubSystemEnvironment *pssi, bool b)
Constructor.
Definition: DetMergeOrder.h:59
void receive(uint64_t id, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
called when a message is received This function updates the internal state of this strategie and push...
This is the interface that should be used for all Order Strategies It specifies all functions that sh...
Definition: BaseOrder.h:39
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
Scheduler< util::RealTimeClock > scheduler_
uint64_t getTime() const
returns current time
Definition: Scheduler.h:106
boost::function< void(uint64_t, msgProcess)> function_
stores the function to be called for delivering
Definition: BaseOrder.h:136
bool configureOrderInfo(uint64_t id, const message::ActionType mtype, typename message::OrderInfo::Ptr ptr, const typename NetworkType::Key &)
configure Infos This functions sets all necessary information for this message
Definition: DetMergeOrder.h:82
void stop(uint64_t id)
Definition: Scheduler.h:110
void processPublishPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
this strategie doesn't care about publish payloads here
void otherOrders(const std::vector< DetMergeOrder * > &)
this strategie doesn't need the other trees
void processNotifyPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
called for every NotifyMsg that arrived
boost::shared_ptr< Timestamp > timestamp_p
virtual ~DetMergeOrder()
Destructor.
Definition: DetMergeOrder.h:67
message::DetMergeOrderInfo< Config > OrderInfoType
Definition: DetMergeOrder.h:54
bool processControlPayload(typename message::OrderInfo::Ptr, const typename NetworkType::Key &)
process Control Messages They are only used to sync