m2etis  0.4
NackDeliver.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_DELIVER_NACKDELIVER_H__
23 #define __M2ETIS_PUBSUB_DELIVER_NACKDELIVER_H__
24 
25 #include <string>
26 #include <queue>
27 #include <map>
28 
30 
33 
34 namespace m2etis {
35 namespace pubsub {
36 namespace deliver {
37 
38  template<class NetworkType, int Retries, Amount Multiples>
39  class NackDeliver : public BaseDeliver<NetworkType> {
40  public:
42 
/**
 * \brief sets up the NACK deliver strategy for one node
 *
 * Forwards pssi to BaseDeliver, stores the node's own key in self_ and
 * value-initialises the bookkeeping containers.
 *
 * \param[in] pssi environment whose scheduler_ is used for timing and periodic jobs
 * \param[in] self key of this node; used as first part of the keys stored in buffer_
 */
43  NackDeliver(PubSubSystemEnvironment * pssi, const typename NetworkType::Key & self) : BaseDeliver<NetworkType>(pssi), buffer_(), missing_(), lastMessages_(), lastID_(), self_(self), delivered_() {
// register periodicCheck with interval REQUESTTIME / 2 and priority 1; the returned job id is kept so the destructor can stop the job
44  periodicID_ = pssi->scheduler_.runRepeated(REQUESTTIME / 2, boost::bind(&NackDeliver::periodicCheck, this), 1);
45  }
46 
/**
 * \brief stops the periodic job that was registered in the constructor
 */
47  virtual ~NackDeliver() {
// the job is bound to this object, so it is stopped via the id stored in periodicID_
48  BaseDeliver<NetworkType>::pssi_->scheduler_.stop(periodicID_);
49  }
50 
/**
 * \brief processes the deliver info attached to a publish message
 *
 * Forwards to processPublishAndNotify, which does the sequence number bookkeeping.
 *
 * \param[in] ptr deliver info of the received message
 * \param[in] sender key of the node the message came from
 * \return result of processPublishAndNotify (presumably whether the message should be delivered - confirm with caller)
 */
54  bool processPublishPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
55  return processPublishAndNotify(ptr, sender);
56  }
57 
/**
 * \brief processes the deliver info attached to a notify message
 *
 * Forwards to processPublishAndNotify, which does the sequence number bookkeeping.
 *
 * \param[in] ptr deliver info of the received message
 * \param[in] sender key of the node the message came from
 * \return result of processPublishAndNotify (presumably whether the message should be delivered - confirm with caller)
 */
61  bool processNotifyPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
62  return processPublishAndNotify(ptr, sender);
63  }
64 
/**
 * \brief processes the deliver info attached to a subscribe message
 *
 * Forwards to processPublishAndNotify, i.e. subscribe messages take part in the
 * same per-sender sequence numbering as publish and notify messages.
 *
 * \param[in] ptr deliver info of the received message
 * \param[in] sender key of the node the message came from
 * \return result of processPublishAndNotify (presumably whether the message should be delivered - confirm with caller)
 */
68  bool processSubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
69  return processPublishAndNotify(ptr, sender);
70  }
71 
/**
 * \brief processes the deliver info attached to an unsubscribe message
 *
 * Forwards to processPublishAndNotify, i.e. unsubscribe messages take part in the
 * same per-sender sequence numbering as publish and notify messages.
 *
 * \param[in] ptr deliver info of the received message
 * \param[in] sender key of the node the message came from
 * \return result of processPublishAndNotify (presumably whether the message should be delivered - confirm with caller)
 */
75  bool processUnsubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
76  return processPublishAndNotify(ptr, sender);
77  }
78 
/**
 * \brief processes the deliver info attached to a control message of another strategy
 *
 * Forwards to processPublishAndNotify; it does not create or configure a deliver info.
 *
 * \param[in] ptr deliver info of the received message
 * \param[in] sender key of the node the message came from
 * \return result of processPublishAndNotify (presumably whether the message should be delivered - confirm with caller)
 */
79  bool processOtherControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
80  return processPublishAndNotify(ptr, sender);
81  }
82 
/**
 * \brief fills in the deliver info of an outgoing message
 *
 * In the visible branch the info gets the next sequence number for the receiver and
 * the list of sequence numbers recorded in dropped_ for that receiver; the message id
 * is remembered in buffer_ so that it can be re-sent on request.
 *
 * NOTE(review): lines 87, 93 and 95 of the original header are absent from this listing.
 * The condition that pairs with the else below is presumably on line 87 and
 * presumably tests mtype and/or ct, which are unused in the visible lines - confirm
 * against the original file.
 *
 * \param[in] id id of the message, later handed to process_
 * \param[in] mtype action type of the message (not referenced in the visible lines)
 * \param[in,out] ptr deliver info to fill in; treated as message::NackDeliverInfo
 * \param[in] receiver key of the node the message is sent to
 * \param[in] ct control type of the message (not referenced in the visible lines)
 */
86  void configureDeliverInfo(uint64_t id, const message::ActionType mtype, typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & receiver, message::ControlType ct) {
// static cast without check: the caller has to pass a NackDeliverInfo
88  message::NackDeliverInfo::Ptr info = boost::static_pointer_cast<message::NackDeliverInfo>(ptr);
// sequence numbers are counted per receiver; operator[] starts a new receiver at 0
89  info->nr = lastID_[receiver]++;
// piggyback the sequence numbers this node could not re-send to the receiver, then forget them
90  info->dropped = dropped_[receiver];
91  dropped_[receiver].clear();
// keep the message id under (self_, nr) until now + BUFFERTIME so it can be re-sent on request
92  buffer_.insert(std::make_pair(std::make_pair(self_, info->nr), std::make_pair(id, BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + BUFFERTIME)));
94  } else {
96  }
97  }
98 
/**
 * \brief processes a control message of this strategy, i.e. a request to re-send a message
 *
 * If the requested sequence number is still in buffer_, the buffered message is handed
 * to process_ with MSG_PROCESS and its buffer timeout is renewed. Otherwise the
 * sequence number is appended to dropped_ for the requesting node, from where
 * configureDeliverInfo copies it into the next outgoing deliver info.
 *
 * \param[in] ptr deliver info of the control message; treated as message::NackDeliverInfo, nr is the requested sequence number
 * \param[in] sender key of the node that sent the request
 * \return always false
 */
102  bool processControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
103  message::NackDeliverInfo::Ptr info = boost::static_pointer_cast<message::NackDeliverInfo>(ptr);
// buffer_ is keyed by (self_, nr); the key of the requesting node is not part of the lookup
104  typename std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> >::iterator it = buffer_.find(std::make_pair(self_, info->nr));
105  if (it != buffer_.end()) {
106  it->second.second = BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + BUFFERTIME; // reset timeout for this message
107  BaseDeliver<NetworkType>::process_(it->second.first, msgProcess::MSG_PROCESS); // and resend it
108  } else {
// message is not buffered (any more): remember the number so the requester can be told it was dropped
109  dropped_[sender].push_back(info->nr);
110  }
111  return false;
112  }
113 
114  private:
/**
 * \brief sequence number bookkeeping shared by all process*Payload methods
 *
 * Compares the sequence number of the received message with the highest number seen
 * from this sender so far (lastMessages_) and updates missing_ accordingly:
 * - smaller number: either a late arrival of a missing message (accepted and removed
 *   from missing_) or a duplicate (accepted only for Amount::AT_LEAST_ONCE)
 * - larger number: all numbers in between are recorded as missing, message accepted
 * - same number: duplicate of the newest message (accepted only for Amount::AT_LEAST_ONCE)
 * - first message of a sender: all numbers below the received one are recorded as missing
 * Afterwards the sequence numbers listed in info->dropped are applied.
 *
 * \param[in] ptr deliver info of the received message; treated as message::NackDeliverInfo
 * \param[in] sender key of the node the message came from
 * \return true if the message is accepted according to the rules above, false for a rejected duplicate
 */
115  bool processPublishAndNotify(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key & sender) {
116  message::NackDeliverInfo::Ptr info = boost::static_pointer_cast<message::NackDeliverInfo>(ptr);
117  typename std::map<typename NetworkType::Key, uint64_t>::iterator it = lastMessages_.find(sender);
118  bool ret = false;
119  if (it != lastMessages_.end()) {
120  if (it->second > info->nr) { // message received is either duplicate or older unreceived message
121  if (missing_[sender].find(info->nr) == missing_[sender].end()) { // message was already received
122  if (Multiples == Amount::AT_LEAST_ONCE) {
123  ret = true;
124  } else {
125  ret = false;
126  }
127  } else { // message is received first time
128  missing_[sender].erase(info->nr);
129  ret = true;
130  }
131  } else if (it->second < info->nr) { // a definitely new message arrived
// no end() check on it2: an entry in missing_ is created together with the one in lastMessages_ when a sender is seen first (else branch below)
132  typename std::map<typename NetworkType::Key, std::map<uint64_t, std::pair<uint64_t, int>> >::iterator it2 = missing_.find(sender);
133  for (uint64_t i = it->second + 1; i < info->nr; ++i) {
134  it2->second[i] = std::make_pair(BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + REQUESTTIME / 2, 0); // add all IDs between the last this node received and the one actually received
135  }
136  it->second = info->nr;
137  ret = true;
138  } else { // the current message arrives again
139  if (Multiples == Amount::AT_LEAST_ONCE) {
140  ret = true;
141  } else {
142  ret = false;
143  }
144  }
145  } else {
// first message from this sender: everything below the received number counts as missing
146  lastMessages_[sender] = info->nr;
147  std::map<uint64_t, std::pair<uint64_t, int>> missings;
148  for (uint64_t i = 0; i < info->nr; ++i) {
149  missings[i] = std::make_pair(BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + REQUESTTIME / 2, 0); // add all IDs between the last this node received and the one actually received
150  }
151  missing_[sender] = missings;
152  ret = true;
153  }
// apply the sequence numbers the sender reported as dropped
154  for (uint64_t p : info->dropped) {
155  if (p < lastMessages_[sender]) {
// older than the newest known number: stop waiting for it
156  missing_[sender].erase(p);
157  } else {
// not older than the newest known number: record the numbers in between as missing and move the newest known number to p; p itself is not recorded as missing
158  for (uint64_t i = lastMessages_[sender] + 1; i < p; ++i) {
159  missing_[sender][i] = std::make_pair(BaseDeliver<NetworkType>::pssi_->scheduler_.getTime() + REQUESTTIME / 2, 0); // add all IDs between the last this node received and the one actually received
160  }
161  lastMessages_[sender] = p;
162  }
163  }
164  return ret;
165  }
166 
/**
 * \brief periodic job: handles overdue entries in missing_ and expired entries in buffer_
 *
 * Registered with the scheduler in the constructor (interval REQUESTTIME / 2).
 * First pass: for every missing sequence number whose request time has passed, either
 * prepares another request (as long as fewer than Retries requests were made, or always
 * for Amount::EXACTLY_ONCE) or gives up and removes the entry.
 * Second pass: every buffered message whose timeout has passed is handed to process_
 * with MSG_DELETE and removed from buffer_.
 *
 * NOTE(review): line 178 of the original header is absent from this listing; newInfo is
 * presumably sent there as control message (e.g. via sendCtrlMsg_) - confirm against
 * the original file.
 *
 * \return always true (presumably keeps the job scheduled - confirm with Scheduler)
 */
170  bool periodicCheck() {
171  uint64_t cT = BaseDeliver<NetworkType>::pssi_->scheduler_.getTime();
172  for (typename std::map<typename NetworkType::Key, std::map<uint64_t, std::pair<uint64_t, int>> >::iterator it = missing_.begin(); it != missing_.end(); ++it) {
// no increment in the loop header: the body advances it2 itself because entries may be erased
173  for (typename std::map<uint64_t, std::pair<uint64_t, int>>::iterator it2 = it->second.begin(); it2 != it->second.end();) {
174  if (it2->second.first < cT) {
175  if (it2->second.second < Retries || Multiples == Amount::EXACTLY_ONCE) {
176  typename DeliverInfoType::Ptr newInfo = boost::make_shared<DeliverInfoType>();
177  newInfo->nr = it2->first;
// count this request and wait a full REQUESTTIME before asking again
179  it2->second.second++;
180  it2->second.first = cT + REQUESTTIME;
181  it2++;
182  } else {
// retries used up: advance first, then erase through the saved iterator so it2 stays valid
183  typename std::map<uint64_t, std::pair<uint64_t, int>>::iterator itDelete = it2;
184  it2++;
185  it->second.erase(itDelete);
186  }
187  } else {
188  it2++;
189  }
190  }
191  }
192  for (typename std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> >::iterator it = buffer_.begin(); it != buffer_.end();) {
193  if (it->second.second < cT) {
194  BaseDeliver<NetworkType>::process_(it->second.first, msgProcess::MSG_DELETE); // remove message from buffer
195  typename std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> >::iterator itDelete = it;
196  ++it;
197  buffer_.erase(itDelete);
198  } else {
199  ++it;
200  }
201  }
202  return true;
203  }
204 
// sent messages kept for re-sending: (key of this node, sequence number) -> (message id handed to process_, time at which the entry expires)
208  std::map<std::pair<typename NetworkType::Key, uint64_t>, std::pair<uint64_t, uint64_t> > buffer_;
209 
// per sender: sequence number not received yet -> (time of the next request, number of requests made so far)
210  std::map<typename NetworkType::Key, std::map<uint64_t, std::pair<uint64_t, int>> > missing_;
211 
// per sender: highest sequence number seen so far (from a received message or from a reported drop)
212  std::map<typename NetworkType::Key, uint64_t> lastMessages_;
213 
// per node: sequence numbers that node requested but that were no longer in buffer_; copied into the next deliver info for that node and then cleared
214  std::map<typename NetworkType::Key, std::vector<uint64_t>> dropped_;
215 
// per receiver: sequence number to assign to the next outgoing message
216  std::map<typename NetworkType::Key, uint64_t> lastID_;
217 
// key of this node, set in the constructor
218  typename NetworkType::Key self_;
219 
// id of the periodicCheck job returned by the scheduler, needed to stop the job
220  uint64_t periodicID_;
221 
// NOTE(review): only value-initialised in the constructor, not used elsewhere in the visible code
222  util::SegmentTree<uint64_t> delivered_;
223 
// delay between two requests for the same missing message, in units of scheduler_.getTime() (presumably microseconds - confirm); first request and check interval use half of it
224  const uint64_t REQUESTTIME = 100000;
225 
// time a sent message stays in buffer_ after sending or after the last re-send, in units of scheduler_.getTime() (presumably microseconds - confirm)
226  const uint64_t BUFFERTIME = 10000000;
227  };
228 
229 } /* namespace deliver */
230 } /* namespace pubsub */
231 } /* namespace m2etis */
232 
233 #endif /* __M2ETIS_PUBSUB_DELIVER_NACKDELIVER_H__ */
234 
bool processUnsubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: NackDeliver.h:75
bool processPublishPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: NackDeliver.h:54
boost::shared_ptr< DeliverInfo > Ptr
bool processSubscribePayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: NackDeliver.h:68
NackDeliver(PubSubSystemEnvironment *pssi, const typename NetworkType::Key &self)
Definition: NackDeliver.h:43
boost::shared_ptr< NackDeliverInfo > Ptr
bool processControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
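processes control messages of this strategy (message id got NACKed): re-sends the requested message if it is still buffered, otherwise records it as dropped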
Definition: NackDeliver.h:102
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
Scheduler< util::RealTimeClock > scheduler_
bool processNotifyPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload
Definition: NackDeliver.h:61
void configureDeliverInfo(uint64_t id, const message::ActionType mtype, typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &receiver, message::ControlType ct)
creates the DeliverInfo
Definition: NackDeliver.h:86
boost::function< void(message::DeliverInfo::Ptr, typename NetworkType::Key, ControlTarget)> sendCtrlMsg_
stores the function to send new messages. This function will send a newly created control message cont...
Definition: BaseDeliver.h:117
message::NackDeliverInfo DeliverInfoType
Definition: NackDeliver.h:41
boost::function< void(uint64_t, msgProcess)> process_
stores the function to be called for delivering
Definition: BaseDeliver.h:107
bool processOtherControlPayload(typename message::DeliverInfo::Ptr ptr, const typename NetworkType::Key &sender)
used to process payload of control messages of other strategies
Definition: NackDeliver.h:79