m2etis  0.4
Scheduler.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_SCHEDULER_H__
23 #define __M2ETIS_PUBSUB_SCHEDULER_H__
24 
25 #include <atomic>
26 #include <cstdint>
27 #include <queue>
28 
29 #include "m2etis/util/Clock.h"
30 
31 #include "boost/thread.hpp"
32 
33 namespace m2etis {
34 namespace pubsub {
35 
36  template<class ClockUpdater>
37  class Scheduler {
38  public:
39  typedef struct Job {
40  Job(const boost::function<bool(void)> & f, uint64_t t, int16_t p, uint64_t d, uint64_t i = UINT64_MAX) : func(f), time(t), priority(p), interval(i), id(d) {
41  }
42  boost::function<bool(void)> func;
43  uint64_t time;
44  int16_t priority; // the higher, the better
45  uint64_t interval;
46  uint64_t id;
47 
48  bool operator<(const Job & other) const {
49  if (time != other.time) {
50  return time > other.time;
51  }
52  if (priority != other.priority) {
53  return priority < other.priority;
54  }
55  return interval > other.interval;
56  }
57  } Job;
58 
59  explicit Scheduler(util::Clock<ClockUpdater> & c) : running_(true), clock_(c), queue_(), lock_()
60 #ifndef WITH_SIM
61  , worker_(boost::bind(&Scheduler<ClockUpdater>::worker, this))
62 #else
63  , worker_()
64 #endif
65  , id_(), stopMap_() {
66  }
67 
69  }
70 
78  uint64_t runOnce(uint64_t time, const boost::function<bool(void)> & func, int16_t priority) {
79  // TODO: if first element in queue, adjust waitingtime
80  Job j(func, clock_.getTime() + time, priority, id_++);
81  lock_.lock();
82  queue_.push(j);
83  lock_.unlock();
84  return j.id;
85  }
86 
94  uint64_t runRepeated(uint64_t interval, const boost::function<bool(void)> & func, int16_t priority) {
95  // TODO: if first element in queue, adjust waitingtime
96  Job j(func, clock_.getTime() + interval, priority, id_++, interval);
97  lock_.lock();
98  queue_.push(j);
99  lock_.unlock();
100  return j.id;
101  }
102 
106  uint64_t getTime() const {
107  return clock_.getTime();
108  }
109 
110  void stop(uint64_t id) {
111  lock_.lock();
112  stopMap_[id] = true;
113  lock_.unlock();
114  }
115 
119  void Stop() {
120  running_ = false;
121 #ifndef WITH_SIM
122  worker_.join();
123 #endif
124  lock_.lock();
125  while (!queue_.empty()) {
126  queue_.pop();
127  }
128  lock_.unlock();
129  }
130 
131 #ifdef WITH_SIM
132 
135  // TODO: convert to a threadpool if needed
136  void workerSim() {
137  lock_.lock();
138 
139  while (!queue_.empty() && queue_.top().time <= clock_.getTime()) {
140  Job j = queue_.top();
141  queue_.pop();
142 
143  bool b = stopMap_.find(j.id) != stopMap_.end();
144 
145  if (b) {
146  stopMap_.erase(j.id);
147  continue;
148  }
149 
150  lock_.unlock();
151 
152  b = j.func();
153  lock_.lock();
154  if (b && j.interval != UINT64_MAX) {
155  long cTime = clock_.getTime();
156  j.time = cTime + j.interval;
157  queue_.push(j);
158  }
159  }
160  lock_.unlock();
161  }
162 #endif /* WITH_SIM */
163 
164  private:
165  Scheduler(const Scheduler &) = delete;
166  Scheduler & operator=(const Scheduler &) = delete;
167 
168  volatile bool running_;
169 
170  util::Clock<ClockUpdater> & clock_;
171 
172  std::priority_queue<Job> queue_;
173 
174  boost::mutex lock_;
175 
176  boost::thread worker_;
177 
178  std::atomic<uint64_t> id_;
179 
180  std::map<uint64_t, bool> stopMap_;
181 
182  // TODO: (Michael) convert to a threadpool if needed
183  void worker() {
184  uint64_t tID = clock_.registerTimer();
185  while (running_) {
186  lock_.lock();
187 
188  while (running_ && !queue_.empty() && queue_.top().time <= clock_.getTime()) {
189  Job j = queue_.top();
190  queue_.pop();
191  bool b = stopMap_.find(j.id) != stopMap_.end();
192 
193  if (b) {
194  stopMap_.erase(j.id);
195  }
196 
197  if (b) {
198  continue;
199  }
200 
201  lock_.unlock();
202 
203  b = j.func();
204 
205  lock_.lock();
206  if (b && j.interval != UINT64_MAX) {
207  uint64_t cTime = clock_.getTime();
208  j.time = cTime + j.interval;
209  queue_.push(j);
210  }
211  }
212  uint64_t t = clock_.getTime() + 1000000; // sleep 1 second if no task is there
213  if (!queue_.empty()) {
214  t = queue_.top().time;
215  }
216  lock_.unlock();
217  if (!running_ || !clock_.waitForTime(tID, t)) {
218  break;
219  }
220  }
221  }
222  };
223 
224 } /* namespace pubsub */
225 } /* namespace m2etis */
226 
227 #endif /* __M2ETIS_PUBSUB_SCHEDULER_H__ */
228 
struct m2etis::pubsub::Scheduler::Job Job
uint64_t runOnce(uint64_t time, const boost::function< bool(void)> &func, int16_t priority)
adds new job running only once
Definition: Scheduler.h:78
boost::function< bool(void)> func
Definition: Scheduler.h:42
Scheduler(util::Clock< ClockUpdater > &c)
Definition: Scheduler.h:59
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
void Stop()
Stops whole Scheduler and removes all tasks.
Definition: Scheduler.h:119
bool operator<(const Job &other) const
Definition: Scheduler.h:48
uint64_t getTime() const
Will return the time since the Clock on the rendezvous node has started.
Definition: Clock.h:68
uint64_t getTime() const
returns current time
Definition: Scheduler.h:106
void stop(uint64_t id)
Definition: Scheduler.h:110
Job(const boost::function< bool(void)> &f, uint64_t t, int16_t p, uint64_t d, uint64_t i=UINT64_MAX)
Definition: Scheduler.h:40