m2etis  0.4
AckDeliver.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_DELIVER_ACKDELIVER_H__
23 #define __M2ETIS_PUBSUB_DELIVER_ACKDELIVER_H__
24 
25 #include <string>
26 #include <queue>
27 #include <map>
28 
30 
33 
35 
36 namespace m2etis {
37 namespace pubsub {
38 namespace deliver {
39 
40  struct Comp {
41  bool operator()(const std::pair<uint64_t, std::pair<uint64_t, int>> & a, const std::pair<uint64_t, std::pair<uint64_t, int>> & b) {
42  return !(a < b);
43  }
44  };
45 
46  template<class NetworkType, int Retries, Amount Multiples>
47  class AckDeliver : public BaseDeliver<NetworkType> {
48  public:
50 
51  AckDeliver(PubSubSystemEnvironment * pssi, const typename NetworkType::Key & self) : BaseDeliver<NetworkType>(pssi), queue_(), acked_(), delivered_() {
52  periodicID_ = pssi->scheduler_.runRepeated(5000, boost::bind(&AckDeliver::periodicCheck, this), 1);
53  }
54 
55  virtual ~AckDeliver() {
56  BaseDeliver<NetworkType>::pssi_->scheduler_.stop(periodicID_);
57  }
58 
62  bool processPublishPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
63  return process(ptr, sender);
64  }
65 
69  bool processNotifyPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
70  return process(ptr, sender);
71  }
72 
76  bool processSubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
77  return process(ptr, sender);
78  }
79 
83  bool processUnsubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
84  return process(ptr, sender);
85  }
86 
87  bool process(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
88  message::AckDeliverInfo::Ptr info = boost::static_pointer_cast<message::AckDeliverInfo>(ptr);
89  DeliverInfoType::Ptr newInfo = boost::make_shared<DeliverInfoType>();
90  newInfo->nr = info->nr;
92  if (delivered_[sender].contains(info->nr)) {
93  if (Multiples == Amount::AT_LEAST_ONCE) {
94  return true;
95  } else {
96  return false;
97  }
98  } else {
99  delivered_[sender].insert(info->nr);
100  return true;
101  }
102  }
103 
104  // same as other message, but always with strategy EXACTLY_ONCE
105  bool processOtherControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
106  message::AckDeliverInfo::Ptr info = boost::static_pointer_cast<message::AckDeliverInfo>(ptr);
107  DeliverInfoType::Ptr newInfo = boost::make_shared<DeliverInfoType>();
108  newInfo->nr = info->nr;
110  if (delivered_[sender].contains(info->nr)) {
111  return false;
112  } else {
113  delivered_[sender].insert(info->nr);
114  return true;
115  }
116  }
117 
121  void configureDeliverInfo(uint64_t id, const message::ActionType mtype, typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & receiver, message::ControlType ct) {
122  if (ct != message::ControlType::DELIVER) {
123  message::AckDeliverInfo::Ptr info = boost::static_pointer_cast<message::AckDeliverInfo>(ptr);
124  info->nr = id;
125  uint64_t cT = BaseDeliver<NetworkType>::pssi_->scheduler_.getTime();
126  queue_.push(std::make_pair(cT + 100000, std::make_pair(id, 0)));
128  } else {
130  }
131  }
132 
136  bool processControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
137  message::AckDeliverInfo::Ptr info = boost::static_pointer_cast<message::AckDeliverInfo>(ptr);
138  acked_.insert(info->nr);
139  return false;
140  }
141 
145  bool periodicCheck() {
146  uint64_t cT = BaseDeliver<NetworkType>::pssi_->scheduler_.getTime();
147  while (!queue_.empty() && queue_.top().first < cT) {
148  // message wasn't acked
149  std::set<uint64_t>::const_iterator it = acked_.find(queue_.top().second.first);
150  if (it != acked_.end()) {
151  acked_.erase(it);
153  } else {
154  if (queue_.top().second.second < Retries) {
156  queue_.push(std::make_pair(cT + 100000, std::make_pair(queue_.top().second.first, queue_.top().second.second + 1)));
157  } else {
159  }
160  }
161  queue_.pop();
162  }
163  return true;
164  }
165 
166  private:
170  std::priority_queue<std::pair<uint64_t, std::pair<uint64_t, int>>, std::vector<std::pair<uint64_t, std::pair<uint64_t, int>>>, Comp> queue_;
171 
175  std::set<uint64_t> acked_;
176 
177  std::map<typename NetworkType::Key, util::SegmentTree<uint64_t>> delivered_;
178 
179  uint64_t periodicID_;
180  };
181 
182 } /* namespace deliver */
183 } /* namespace pubsub */
184 } /* namespace m2etis */
185 
186 #endif /* __M2ETIS_PUBSUB_DELIVER_ACKDELIVER_H__ */
187 
bool periodicCheck()
checks the queue for un-acked messages
Definition: AckDeliver.h:145
bool process(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
Definition: AckDeliver.h:87
message::AckDeliverInfo DeliverInfoType
Definition: AckDeliver.h:49
boost::shared_ptr< DeliverInfo > Ptr
bool processNotifyPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: AckDeliver.h:69
void configureDeliverInfo(uint64_t id, const message::ActionType mtype, typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &receiver, message::ControlType ct)
creates the DeliverInfo
Definition: AckDeliver.h:121
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
AckDeliver(PubSubSystemEnvironment *pssi, const typename NetworkType::Key &self)
Definition: AckDeliver.h:51
bool processPublishPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: AckDeliver.h:62
Scheduler< util::RealTimeClock > scheduler_
bool processSubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: AckDeliver.h:76
bool processOtherControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
creates the DeliverInfo
Definition: AckDeliver.h:105
bool processControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
processes Control Messages (id got acked)
Definition: AckDeliver.h:136
boost::function< void(message::DeliverInfo::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
stores the function to send new messages This function will send a newly created control message cont...
Definition: BaseDeliver.h:117
bool processUnsubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: AckDeliver.h:83
boost::shared_ptr< AckDeliverInfo > Ptr
boost::function< void(uint64_t, msgProcess)> process_
stores the function to be called for delivering
Definition: BaseDeliver.h:107
bool operator()(const std::pair< uint64_t, std::pair< uint64_t, int >> &a, const std::pair< uint64_t, std::pair< uint64_t, int >> &b)
Definition: AckDeliver.h:41