m2etis  0.4
PubSubSystem.h
Go to the documentation of this file.
1 /*
2  Copyright (2016) Michael Baer, Daniel Bonrath, All rights reserved.
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15  */
16 
22 #ifndef __M2ETIS_PUBSUB_PUBSUBSYSTEM_H__
23 #define __M2ETIS_PUBSUB_PUBSUBSYSTEM_H__
24 
27 
29 
34 
35 #include "boost/function.hpp"
36 
37 // Forward Declarations
38 
39 namespace m2etis {
40 namespace pubsub {
41 
42  template<class EventType> class BasicDeliverCallbackInterface;
43  class PubSubSystemEnvironment;
44 
49  UNDEFINED = 3,
51  };
52 
61  public:
62  PubSubSystem(const std::string & listenIP, const uint16_t listenPort, const std::string & connectIP, const uint16_t connectPort, const std::vector<std::string> & rootList);
63 
64  ~PubSubSystem();
65 
72  template<class EventType> BasicChannelInterface<EventType> & subscribe(const ChannelName channel, BasicDeliverCallbackInterface<EventType> & callback);
73 
74  // FIXME create predicate interface
82  template<class EventType> BasicChannelInterface<EventType> & subscribe(const ChannelName channel, BasicDeliverCallbackInterface<EventType> & callback, boost::shared_ptr<filter::FilterExp<EventType> > predicate);
83 
89  template<class EventType> const PubSubSystem & unsubscribe(const ChannelName channel) const;
90 
97  template<class EventType> const PubSubSystem & unsubscribe(const ChannelName channel, const boost::shared_ptr<filter::FilterExp<EventType> > predicate) const;
98 
105  template<class EventType> const PubSubSystem & publish(const ChannelName channel, const typename message::M2Message<EventType>::Ptr publish_message) const;
106 
110  // FIXME: (Daniel) if we drop M2Message, we can also drop these two methods and publish the payload directly
111  template<class EventType> typename message::M2Message<EventType>::Ptr createMessage(const ChannelName channel, const EventType & payload);
112  template<class EventType> typename message::M2Message<EventType>::Ptr createMessage(const ChannelName channel);
113 
119  template<class EventType> std::string getSelf(const ChannelName channel) const;
120 
124  inline bool isInitialized() const { return initialized; }
125 
126  void registerExceptionCallback(exceptionEvents e, boost::function<void(const std::string &)> _ptr);
127 
128  private:
129  PubSubSystem(const PubSubSystem &) = delete;
130  PubSubSystem & operator=(const PubSubSystem &) = delete;
131 
132  bool exceptionLoop();
133 
134  public:
136 
137  private:
138  ChannelConfiguration * channels_;
139  bool initialized;
140 
141  std::vector<std::vector<boost::function<void(const std::string &)>>> _exceptionCallbacks;
142 
143  uint64_t exceptionID_;
144 
145  bool _running;
146 
147 #ifdef WITH_SIM
148  public:
149 #endif
150 
157  template<class EventType> BasicChannelInterface<EventType> * getChannelHandle(const ChannelName channel) const;
158  };
159 
160  template<class EventType> std::string PubSubSystem::getSelf(ChannelName channel) const {
161  return "";
162  }
163 
164  template<class EventType> BasicChannelInterface<EventType> * PubSubSystem::getChannelHandle(ChannelName channel) const {
165  if (!initialized) {
166  M2ETIS_THROW_API("PubSubSystem", "Invalid call, initialize PubSubSystem first.");
167  }
168  if (channel >= channels_->count) {
169  M2ETIS_THROW_API("PubSubSystem", std::string("Invalid channel, enum exceeds CHANNEL_COUNT: ") + std::to_string(channel));
170  }
171  BasicChannelInterface<EventType> * const ret = dynamic_cast<BasicChannelInterface<EventType> * const>(channels_->channels()[channel]);
172  if (ret == NULL) {
173  M2ETIS_THROW_API("PubSubSystem", "Invalid channel type cast. Check your channel configuration.");
174  }
175  return ret;
176  }
177 
178  template<class EventType> BasicChannelInterface<EventType> & PubSubSystem::subscribe(ChannelName channel, BasicDeliverCallbackInterface<EventType> & callback) {
179  const boost::shared_ptr<filter::FilterExp<EventType> > pred = boost::make_shared<filter::TruePredicate<EventType> >();
180 
181  return subscribe(channel, callback, pred);
182  }
183 
184  template<class EventType> BasicChannelInterface<EventType> & PubSubSystem::subscribe(const ChannelName channel, BasicDeliverCallbackInterface<EventType> & callback, const boost::shared_ptr<filter::FilterExp<EventType> > predicate) {
185  BasicChannelInterface<EventType> * const ret = getChannelHandle<EventType>(channel);
186 
187  ret->subscribe(callback, predicate);
188  return *ret;
189  }
190 
191  template<class EventType> const PubSubSystem & PubSubSystem::unsubscribe(ChannelName channel) const {
192  BasicChannelInterface<EventType> * const ret = getChannelHandle<EventType>(channel);
193  ret->unsubscribe();
194  return *this;
195  }
196 
197 
198  // for filter strategies: deregistering single filter:
199  template<class EventType> const PubSubSystem & PubSubSystem::unsubscribe(const ChannelName channel, const boost::shared_ptr<filter::FilterExp<EventType> > predicate) const {
200  BasicChannelInterface<EventType> * const ret = getChannelHandle<EventType>(channel);
201  ret->unsubscribe(predicate);
202  return *this;
203  }
204 
205  template<class EventType> const PubSubSystem & PubSubSystem::publish(const ChannelName channel, const typename message::M2Message<EventType>::Ptr publish_message) const {
206  BasicChannelInterface<EventType> * const ret = getChannelHandle<EventType>(channel);
207  ret->publish(publish_message);
208  return *this;
209  }
210 
211  template<class EventType> typename message::M2Message<EventType>::Ptr PubSubSystem::createMessage(const ChannelName channel, const EventType & payload) {
212  return getChannelHandle<EventType>(channel)->createMessage(payload);
213  }
214 
215  template<class EventType> typename message::M2Message<EventType>::Ptr PubSubSystem::createMessage(const ChannelName channel) {
216  return createMessage<EventType>(channel, EventType());
217  }
218 
219 } /* namespace pubsub */
220 } /* namespace m2etis */
221 
222 #endif /* __M2ETIS_PUBSUB_PUBSUBSYSTEM_H__ */
223 
virtual void subscribe(BasicDeliverCallbackInterface< EventType > &callback, boost::shared_ptr< filter::FilterExp< EventType > >)=0
subscribes to the channel
boost::shared_ptr< M2Message< EventType > > Ptr
Definition: M2Message.h:42
#define M2ETIS_API
const PubSubSystem & unsubscribe(const ChannelName channel) const
User unsubscribes from the requested channel.
Definition: PubSubSystem.h:191
BasicChannelInterface< EventType > & subscribe(const ChannelName channel, BasicDeliverCallbackInterface< EventType > &callback)
User subscribes to the requested channel.
Definition: PubSubSystem.h:178
const PubSubSystem & publish(const ChannelName channel, const typename message::M2Message< EventType >::Ptr publish_message) const
Publishes a message to a channel.
Definition: PubSubSystem.h:205
const std::vector< ChannelEventInterface * > & channels() const
PubSubSystemEnvironment * _pssi
Definition: PubSubSystem.h:135
std::string getSelf(const ChannelName channel) const
Returns a string representation of the requested channel.
Definition: PubSubSystem.h:160
bool isInitialized() const
sets callback for disconnect
Definition: PubSubSystem.h:124
message::M2Message< EventType >::Ptr createMessage(const ChannelName channel, const EventType &payload)
Returns an M2Message for the given EventType and payload.
class for accessing the m2etis pub/sub system. It is the main entry point for the usage of the m2etis...
Definition: PubSubSystem.h:60
virtual void publish(const typename message::M2Message< EventType >::Ptr msg)=0
publishes a message on the channel
#define M2ETIS_THROW_API(module, message)
throws on wrong API usage
Definition: Exceptions.h:40