22 #ifndef __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__ 
   23 #define __M2ETIS_PUBSUB_ROUTING_HIERARCHICALSPREADITROUTING_H__ 
   35 #include "boost/date_time/posix_time/posix_time_types.hpp" 
   41         template<
class NetworkType, 
unsigned int ChildAmount>
 
   45                 typedef std::pair<long, typename NetworkType::Key> 
TimePair;
 
   46                 typedef std::vector<typename NetworkType::Key> 
KeyList;
 
   85                         , removed_subscriber_eventlistener_()
 
   86                         , topic_name_(topic_name), self_(), subscribed_(false), subscriber_()
 
   87                         , purging_(true), _root(root) {
 
   88                         purgeID_ = pssi->
scheduler_.
runRepeated(purge_distance_, boost::bind(&HierarchicalSpreaditRouting::purgeList, 
this), 0);
 
   96                 void setSelf(
const typename NetworkType::Key & 
self) {
 
  101             removed_subscriber_eventlistener_ = listener;
 
  123                                 if (!parent_vector.empty()) {
 
  124                                         rInfo->node_adress = parent_vector.at(0).second;
 
  130                                 rInfo->selfSend = 
true;
 
  131                                 rInfo->sender = self_;
 
  136                                 rInfo->sender = self_;
 
  144                                 M2ETIS_THROW_FAILURE(
"HierarchicalSpreaditRouting", 
"configureRoutingInfo called with wrong action type", -1);
 
  160                                         m.push_back(rInfo->node_adress);
 
  161                                 } 
else if (!parent_vector.empty()) { 
 
  162                                         m.push_back(parent_vector.begin()->second);
 
  164                                         if (self_ == _root) {
 
  172                                 if (receiver == 
typename NetworkType::Key()) {
 
  179                                         m.resize(subscriber_.size());
 
  180                                         std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
 
  182                                         m.push_back(receiver);
 
  189                                 if (!parent_vector.empty()) { 
 
  190                                         if (rInfo->sender != parent_vector.begin()->second) { 
 
  191                                                 m.push_back(parent_vector.begin()->second); 
 
  194                                         if (self_ != _root) { 
 
  199                                 for (TimePair p : subscriber_) {
 
  200                                         if (p.second != rInfo->sender) {
 
  201                                                 m.push_back(p.second); 
 
  204                                 if (rInfo->selfSend) {
 
  207                                 rInfo->sender = self_;
 
  215                                          static typename NetworkType::Key get_from_pair(
const TimePair & p) {
 
  220                                 m.resize(subscriber_.size());
 
  221                                 std::transform(subscriber_.begin(), subscriber_.end(), m.begin(), T::get_from_pair);
 
  223                                 if (!parent_vector.empty()) {
 
  224                                         m.push_back(parent_vector.at(0).second);
 
  236                         bool is_subscribe_successful = 
true;
 
  238                         typename TimeList::iterator iter;
 
  240                         for (iter = subscriber_.begin(); iter != subscriber_.end(); ++iter) {
 
  241                                 if (iter->second == sender) {
 
  253                                 if (bandwidth_capacity > subscriber_.size()) {
 
  255                                         subscriber_.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
 
  261                                         rInfo->node_adress = 
typename NetworkType::Key(subscriber_.at(round_robin_pointer).second);
 
  262                                         round_robin_pointer = (round_robin_pointer + 1) % subscriber_.size();
 
  264                                         is_subscribe_successful = 
false;
 
  269                         return is_subscribe_successful;
 
  276                                 static bool test(
const typename NetworkType::Key & send, 
const TimePair & paar) {
 
  277                                         return paar.second == send;
 
  281                         subscriber_.erase(std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, sender, _1)), subscriber_.end());
 
  284                         if (!parent_vector.empty()) {
 
  285                                 if (sender == parent_vector.at(0).second) {
 
  287                                         receiver = rInfo->node_adress;
 
  299                         if ((subscriber_.empty() && (parent_vector.empty() && self_ == _root)) || sender == self_) {
 
  308                                 if (parent_vector.empty()) {
 
  309                                         parent_vector.push_back(std::make_pair(_pssi->
clock_.
getTime(), sender));
 
  311                                         parent_vector.begin()->second = sender;
 
  317                                 receiver = rInfo->node_adress;
 
  326                 const unsigned int topic_name_;
 
  328                 typename NetworkType::Key self_;
 
  332                 unsigned int bandwidth_capacity = ChildAmount;
 
  333                 unsigned int round_robin_pointer = 0;
 
  336                 TimeList subscriber_;
 
  338                 TimeList parent_vector;
 
  341                 volatile bool purging_;
 
  343                 typename NetworkType::Key _root;
 
  358                                 static bool test(
const long & p, 
const long & jetzt, 
const TimePair & paar) {
 
  359                                         if ((jetzt - paar.first) > p) {
 
  366                         if (subscriber_.empty() && parent_vector.empty()) {
 
  376                         auto iter_first_erased_subscriber = std::remove_if(subscriber_.begin(), subscriber_.end(), boost::bind(T::test, purge_distance_, jetzt, _1));
 
  378                         for (
auto iter_subscriber = iter_first_erased_subscriber; iter_subscriber != subscriber_.end(); ++iter_subscriber) {
 
  382                         subscriber_.erase(iter_first_erased_subscriber, subscriber_.end());
 
const long purge_distance_
 
bool processSubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType) override
 
std::vector< TimePair > TimeList
 
static const bool register_deliver_subscribe
 
static const bool register_deliver_unsubscribe
 
#define M2ETIS_THROW_FAILURE(module, message, errorcode)
throws on internal errors 
 
bool selfSubscribed() const 
 
void setUnsubscriptionListener(const boost::function< void(const typename NetworkType::Key)> &listener)
 
#define M2ETIS_LOG_DEBUG(module, message)
 
HierarchicalSpreaditRouting(unsigned int topic_name, PubSubSystemEnvironment *pssi, const typename NetworkType::Key &root)
 
void configureRoutingInfo(message::ActionType &msgType, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver)
 
PubSubSystemEnvironment * _pssi
 
void setSelf(const typename NetworkType::Key &self)
 
static const bool register_forward_unsubscribe
 
uint64_t runRepeated(uint64_t interval, const boost::function< bool(void)> &func, int16_t priority)
adds new job running repeatedly 
 
boost::function< void(const typename NetworkType::Key)> removed_subscriber_eventlistener_
 
virtual ~HierarchicalSpreaditRouting()
 
std::pair< long, typename NetworkType::Key > TimePair
 
Scheduler< util::RealTimeClock > scheduler_
 
util::Clock< util::RealTimeClock > clock_
 
#define M2ETIS_LOG_ERROR(module, message)
 
static const bool periodicSubscribtion
 
boost::shared_ptr< RoutingInfo< NetworkType > > Ptr
 
uint64_t getTime() const 
Will return the time since the Clock on the rendezvouz node has started. 
 
void processPublishPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
 
void processUnsubscribePayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
 
void processControlPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
 
void processNotifyPayload(typename message::RoutingInfo< NetworkType >::Ptr routingInfo, const typename NetworkType::Key &sender, typename NetworkType::Key &receiver, message::ActionType &msgType)
 
static const bool register_forward_subscribe
 
void selfSubscribed(const bool b)
 
boost::shared_ptr< HierarchicalSpreadItRoutingInfo< NetworkType > > Ptr
 
static const uint64_t PURGE_DISTANCE
 
std::vector< typename NetworkType::Key > KeyList
 
message::HierarchicalSpreadItRoutingInfo< NetworkType > RoutingInfoType
 
KeyList getTargetNodes(const message::ActionType mtype, typename message::RoutingInfo< NetworkType >::Ptr routingInfo, typename NetworkType::Key &receiver) const 
 
static const uint64_t RESUBSCRIPTION_INTERVAL