m2etis  0.4
NetworkController.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_NET_NETWORKCONTROLLER_H__
23 #define __M2ETIS_NET_NETWORKCONTROLLER_H__
24 
25 #include <map>
26 
28 #include "m2etis/util/Logger.h"
29 
32 
34 
37 
38 #include "boost/function.hpp"
39 #include "boost/shared_ptr.hpp"
40 #include "boost/thread.hpp"
41 
42 namespace m2etis {
43 namespace net {
44 
51  template<class NetworkType>
52  class NetworkController : public NetworkCallbackInterface<NetworkType> {
53  public:
54  typedef boost::function<void(typename message::NetworkMessage<NetworkType>::Ptr message)> net_deliver_func;
55  typedef boost::function<pubsub::FIPtr(typename message::NetworkMessage<NetworkType>::Ptr message)> net_forward_func;
56 
57  // TODO: (Daniel) this can be removed, use NetworkMessage directly instead
58  struct DeliverInfo {
59  explicit DeliverInfo(typename message::NetworkMessage<NetworkType>::Ptr ms) : msg_type(*ms->typePtr), message(ms) {
60  }
63  };
64 
65  // TODO: (Daniel) public access to pointer isn't what we normally do, so refactor to be conform with our normal coding style
67 
72  NetworkController(NetworkInterface<NetworkType> * network, pubsub::PubSubSystemEnvironment * pssi) : network_(network), deliver_map_(), forward_map_(), msgQueue_(), pssi_(pssi), _running(true) {
73  processingID_ = pssi->scheduler_.runRepeated(parameters::PULL_DELIVERQUEUE, boost::bind(&NetworkController::processDeliverQueue, this), 3);
74  network_->setCallback(this);
75  }
76 
81  pssi_->scheduler_.stop(processingID_);
82  _running = false;
83  delete network_;
84  network_ = nullptr;
85  }
86 
91  typename FMapType::const_iterator iter = forward_map_.find(*message->typePtr);
92  if (iter != forward_map_.end()) {
93  pubsub::FIPtr ptr = iter->second(message);
94  if (ptr->stop) {
95  // FIXME STOP THE MESSAGE
96  return true;
97  }
98  }
99  return false;
100  }
101 
108  if (*message->typePtr == 0) {
109  M2ETIS_THROW_FAILURE("NetworkController - deliver", "invalid message type", -1);
110  }
111 
112  msgQueue_.push(DeliverInfo(message));
113  }
114 
118  void update(const typename NetworkType::Key &, const typename NodeHandle<NetworkType>::Ptr_const, bool) {
119  }
120 
127  // don't send message from me to me over network, can be delivered directly
128  if (msg->receiver == msg->sender) {
129  deliver(msg);
130  return;
131  }
132  network_->send(msg, typename NodeHandle<NetworkType>::Ptr_const());
133  }
134 
135  void registerMessageType(message::MessageType type, const bool ack = true) {
136  network_->registerMessageType(type, ack);
137  }
138 
140  forward_map_.insert(std::make_pair(nr, f));
141  }
142 
144  deliver_map_.insert(std::make_pair(nr, f));
145  }
146 
148  forward_map_.erase(nr);
149  }
150 
152  deliver_map_.erase(nr);
153  }
154 
155  inline typename NetworkType::Key getSelf() const {
156  return network_->getSelfNodeHandle()->key_;
157  }
158 
159  private:
160  typedef std::map<message::MessageType, net_deliver_func> DMapType;
161  DMapType deliver_map_;
162  typedef std::map<message::MessageType, net_forward_func> FMapType;
163  FMapType forward_map_;
164  typedef util::DoubleBufferQueue<DeliverInfo> DIQueueType;
165  DIQueueType msgQueue_;
166 
168 
169  bool _running;
170 
171  uint64_t processingID_;
172 
173  bool processDeliverQueue() {
174  /* Was will ich hier:
175  * Später darf sich ein Threadpool um die Einträge in der Queue kümmern.
176  * Beim Rausnehmen sollen die sich natürlich nicht in die Quere kommen.
177  */
178  if (!_running) {
179  return false;
180  }
181  while (!msgQueue_.empty()) {
182  DeliverInfo di = msgQueue_.poll();
183 
184  typename DMapType::iterator it = deliver_map_.find(di.msg_type);
185 
186  if (it != deliver_map_.end()) {
187  it->second(di.message);
188  }
189  }
190  return _running;
191  }
192 
193  // make non-copyable
194  NetworkController & operator=(const NetworkController & rhs) = delete;
195 
196  NetworkController(const NetworkController & rhs) = delete;
197  };
198 
199 } /* namespace net */
200 } /* namespace m2etis */
201 
202 #endif /* __M2ETIS_NET_NETWORKCONTROLLER_H__ */
203 
virtual void send(const typename message::NetworkMessage< NetworkType >::Ptr msg, typename NodeHandle< NetworkType >::Ptr_const hint)=0
Sends a message to the node which is responsible for the given key. Provide a hint for better routing...
void register_deliver(message::MessageType nr, net_deliver_func f)
boost::shared_ptr< const ForwardInfo > FIPtr
Definition: ForwardInfo.h:34
NetworkType::Key sender
sender of the message
boost::function< pubsub::FIPtr(typename message::NetworkMessage< m2etis::net::NetworkType< UDP > >::Ptr message)> net_forward_func
void setCallback(NetworkCallbackInterface< NetworkType > *cb)
Sets the callback-object on which the defined callbacks will be called.
Generic interface for the callback-class used for the network.
NetworkType::Key receiver
receiver of the message
void send(typename message::NetworkMessage< NetworkType >::Ptr msg)
sends a message to the receiver defined within the message if the sender is the receiver, message is put into deliver method directly to remove overhead by serializing and trying to send message otherwise the message is forwarded to the corresponding wrapper
bool forward(typename message::NetworkMessage< NetworkType >::Ptr message, const typename NodeHandle< NetworkType >::Ptr hint)
void push(const T &value)
pushes the given value into the queue
void deregister_deliver(message::MessageType nr)
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors
Definition: Exceptions.h:33
NetworkType::Key getSelf() const
virtual void registerMessageType(const typename NetworkType::MessageType type, const bool ack) const =0
register a messagetype. Only registered types are sent.
boost::shared_ptr< const NodeHandle< NetworkType > > Ptr_const
Definition: NodeHandle.h:61
void update(const typename NetworkType::Key &, const typename NodeHandle< NetworkType >::Ptr_const, bool)
static const uint32_t PULL_DELIVERQUEUE
message::NetworkMessage< NetworkType >::Ptr message
~NetworkController()
stops polling job and cleans up wrapper
NetworkInterface< NetworkType > * network_
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly
Definition: Scheduler.h:94
void register_forward(message::MessageType nr, net_forward_func f)
boost::function< void(typename message::NetworkMessage< m2etis::net::NetworkType< UDP > >::Ptr message)> net_deliver_func
Scheduler< util::RealTimeClock > scheduler_
void deregister_forward(message::MessageType nr)
void registerMessageType(message::MessageType type, const bool ack=true)
boost::shared_ptr< NetworkMessage > Ptr
void stop(uint64_t id)
Definition: Scheduler.h:110
boost::shared_ptr< NodeHandle< NetworkType > > Ptr
Definition: NodeHandle.h:62
uint32_t MessageType
Definition: MessageType.h:35
virtual NodeHandle< NetworkType >::Ptr getSelfNodeHandle() const =0
query your NodeHandle. You may query that for it's key.
DeliverInfo(typename message::NetworkMessage< NetworkType >::Ptr ms)
NetworkController(NetworkInterface< NetworkType > *network, pubsub::PubSubSystemEnvironment *pssi)
creates new interface for communication with a wrapper adds a polling job for incoming messages ...
void deliver(typename message::NetworkMessage< NetworkType >::Ptr message)
called from wrapper for every arriving message because of this method call can be done from different...