Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #include <tbb/task.h>
22 #include <boost/bind.hpp>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <map>
25 #include <list>
26 #include <string>
27 #include <vector>
28 
31 #include "gazebo/util/system.hh"
32 
33 namespace gazebo
34 {
35  namespace transport
36  {
39  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
40  {
44  public: PublishTask(transport::PublisherPtr _pub,
45  const google::protobuf::Message &_message)
46  : pub(_pub)
47  {
48  this->msg = _message.New();
49  this->msg->CopyFrom(_message);
50  }
51 
54  public: tbb::task *execute()
55  {
56  this->pub->WaitForConnection();
57  this->pub->Publish(*this->msg, true);
58  this->pub->SendMessage();
59  delete this->msg;
60  this->pub.reset();
61  return NULL;
62  }
63 
65  private: transport::PublisherPtr pub;
66 
68  private: google::protobuf::Message *msg;
69  };
71 
74 
78  class GZ_TRANSPORT_VISIBLE Node :
79  public boost::enable_shared_from_this<Node>
80  {
82  public: Node();
83 
85  public: virtual ~Node();
86 
95  public: void Init(const std::string &_space ="");
96 
108  public: bool TryInit(
109  const common::Time &_maxWait = common::Time(1, 0));
110 
115  public: bool IsInitialized() const;
116 
118  public: void Fini();
119 
122  public: std::string GetTopicNamespace() const;
123 
127  public: std::string DecodeTopicName(const std::string &_topic);
128 
132  public: std::string EncodeTopicName(const std::string &_topic);
133 
136  public: unsigned int GetId() const;
137 
140  public: void ProcessPublishers();
141 
143  public: void ProcessIncoming();
144 
148  public: bool HasLatchedSubscriber(const std::string &_topic) const;
149 
150 
157  public: template<typename M>
158  void Publish(const std::string &_topic,
159  const google::protobuf::Message &_message)
160  {
161  transport::PublisherPtr pub = this->Advertise<M>(_topic);
162  PublishTask *task = new(tbb::task::allocate_root())
163  PublishTask(pub, _message);
164 
165  tbb::task::enqueue(*task);
166  return;
167  }
168 
176  public: template<typename M>
177  transport::PublisherPtr Advertise(const std::string &_topic,
178  unsigned int _queueLimit = 1000,
179  double _hzRate = 0)
180  {
181  std::string decodedTopic = this->DecodeTopicName(_topic);
182  PublisherPtr publisher =
183  transport::TopicManager::Instance()->Advertise<M>(
184  decodedTopic, _queueLimit, _hzRate);
185 
186  boost::mutex::scoped_lock lock(this->publisherMutex);
187  publisher->SetNode(shared_from_this());
188  this->publishers.push_back(publisher);
189 
190  return publisher;
191  }
192 
200  public: template<typename M, typename T>
201  SubscriberPtr Subscribe(const std::string &_topic,
202  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
203  bool _latching = false)
204  {
205  SubscribeOptions ops;
206  std::string decodedTopic = this->DecodeTopicName(_topic);
207  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
208 
209  {
210  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
211  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
212  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
213  }
214 
215  SubscriberPtr result =
216  transport::TopicManager::Instance()->Subscribe(ops);
217 
218  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
219 
220  return result;
221  }
222 
229  public: template<typename M>
230  SubscriberPtr Subscribe(const std::string &_topic,
231  void(*_fp)(const boost::shared_ptr<M const> &),
232  bool _latching = false)
233  {
234  SubscribeOptions ops;
235  std::string decodedTopic = this->DecodeTopicName(_topic);
236  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
237 
238  {
239  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
240  this->callbacks[decodedTopic].push_back(
241  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
242  }
243 
244  SubscriberPtr result =
245  transport::TopicManager::Instance()->Subscribe(ops);
246 
247  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
248 
249  return result;
250  }
251 
259  template<typename T>
260  SubscriberPtr Subscribe(const std::string &_topic,
261  void(T::*_fp)(const std::string &), T *_obj,
262  bool _latching = false)
263  {
264  SubscribeOptions ops;
265  std::string decodedTopic = this->DecodeTopicName(_topic);
266  ops.Init(decodedTopic, shared_from_this(), _latching);
267 
268  {
269  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
270  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
271  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
272  }
273 
274  SubscriberPtr result =
275  transport::TopicManager::Instance()->Subscribe(ops);
276 
277  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
278 
279  return result;
280  }
281 
282 
289  SubscriberPtr Subscribe(const std::string &_topic,
290  void(*_fp)(const std::string &), bool _latching = false)
291  {
292  SubscribeOptions ops;
293  std::string decodedTopic = this->DecodeTopicName(_topic);
294  ops.Init(decodedTopic, shared_from_this(), _latching);
295 
296  {
297  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
298  this->callbacks[decodedTopic].push_back(
300  }
301 
302  SubscriberPtr result =
303  transport::TopicManager::Instance()->Subscribe(ops);
304 
305  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
306 
307  return result;
308  }
309 
314  public: bool HandleData(const std::string &_topic,
315  const std::string &_msg);
316 
321  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
322 
329  public: void InsertLatchedMsg(const std::string &_topic,
330  const std::string &_msg);
331 
338  public: void InsertLatchedMsg(const std::string &_topic,
339  MessagePtr _msg);
340 
344  public: std::string GetMsgType(const std::string &_topic) const;
345 
351  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
352 
364  private: bool PrivateInit(const std::string &_space,
365  const common::Time &_maxWait,
366  const bool _fallbackToDefault);
367 
368  private: std::string topicNamespace;
369  private: std::vector<PublisherPtr> publishers;
370  private: std::vector<PublisherPtr>::iterator publishersIter;
371  private: static unsigned int idCounter;
372  private: unsigned int id;
373 
374  private: typedef std::list<CallbackHelperPtr> Callback_L;
375  private: typedef std::map<std::string, Callback_L> Callback_M;
376  private: Callback_M callbacks;
377  private: std::map<std::string, std::list<std::string> > incomingMsgs;
378 
380  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
381 
382  private: boost::mutex publisherMutex;
383  private: boost::mutex publisherDeleteMutex;
384  private: boost::recursive_mutex incomingMutex;
385 
388  private: boost::recursive_mutex processIncomingMutex;
389 
390  private: bool initialized;
391  };
393  }
394 }
395 #endif
void ProcessIncoming()
Process incoming messages.
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:230
Node()
Constructor.
Options for a subscription.
Definition: SubscribeOptions.hh:35
virtual ~Node()
Destructor.
Forward declarations for the common classes.
Definition: Animation.hh:26
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A Time class, can be used to hold wall- or sim-time. stored as sec and nano-sec.
Definition: Time.hh:47
#define NULL
Definition: CommonTypes.hh:31
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
Forward declarations for transport.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:158
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:289
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:78
unsigned int GetId() const
Get the unique ID of the node.
GAZEBO_VISIBLE void Init(google::protobuf::Message &_message, const std::string &_id="")
Initialize a message.
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
Callback helper Template.
Definition: CallbackHelper.hh:111
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
void RemoveCallback(const std::string &_topic, unsigned int _id)
void Init(const std::string &_space="")
Init the node.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
transport
Definition: ConnectionManager.hh:35
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Adverise a topic.
Definition: Node.hh:177
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:260
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
std::string GetTopicNamespace() const
Get the topic namespace for this node.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:201
void Fini()
Finalize the node.
bool IsInitialized() const
Check if this Node has been initialized.
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:177