AutoAPMS
Streamlining behaviors in ROS 2
Loading...
Searching...
No Matches
ros_subscriber_node.hpp
1// Copyright 2024 Robin Müller
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#pragma once
16
17#include <algorithm>
18#include <functional>
19#include <memory>
20#include <mutex>
21#include <string>
22#include <unordered_map>
23#include <vector>
24
25#include "auto_apms_behavior_tree_core/exceptions.hpp"
26#include "auto_apms_behavior_tree_core/node/ros_node_context.hpp"
27#include "auto_apms_util/string.hpp"
28#include "behaviortree_cpp/condition_node.h"
29#include "rclcpp/executors.hpp"
30#include "rclcpp/qos.hpp"
31
33{
34
63template <class MessageT>
64class RosSubscriberNode : public BT::ConditionNode
65{
66 using Subscriber = typename rclcpp::Subscription<MessageT>;
67
68 struct SubscriberInstance
69 {
70 SubscriberInstance(
71 rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr group, const std::string & topic_name,
72 const rclcpp::QoS & qos);
73
74 std::shared_ptr<Subscriber> subscriber;
75 std::vector<std::pair<const void *, std::function<void(const std::shared_ptr<MessageT>)>>> callbacks;
76 std::shared_ptr<MessageT> last_msg;
77 std::string name;
78
79 void addCallback(
80 const void * callback_owner, const std::function<void(const std::shared_ptr<MessageT>)> & callback);
81 void removeCallback(const void * callback_owner);
82 void broadcast(const std::shared_ptr<MessageT> & msg);
83 };
84
85 using SubscribersRegistry = std::unordered_map<std::string, std::weak_ptr<SubscriberInstance>>;
86
87public:
88 using MessageType = MessageT;
89 using Config = BT::NodeConfig;
90 using Context = RosNodeContext;
91
102 explicit RosSubscriberNode(
103 const std::string & instance_name, const Config & config, Context context, const rclcpp::QoS & qos = {10});
104
105 virtual ~RosSubscriberNode()
106 {
107 if (sub_instance_) {
108 sub_instance_->removeCallback(this);
109 }
110 }
111
119 static BT::PortsList providedBasicPorts(BT::PortsList addition)
120 {
121 BT::PortsList basic = {BT::InputPort<std::string>("topic", "Name of the ROS 2 topic to subscribe to.")};
122 basic.insert(addition.begin(), addition.end());
123 return basic;
124 }
125
130 static BT::PortsList providedPorts() { return providedBasicPorts({}); }
131
142 virtual BT::NodeStatus onTick(const std::shared_ptr<MessageT> & last_msg_ptr);
143
153 virtual BT::NodeStatus onMessageReceived(const MessageT & msg);
154
160 bool createSubscriber(const std::string & topic_name);
161
166 std::string getTopicName() const;
167
168protected:
169 const Context context_;
170 const rclcpp::Logger logger_;
171
172 BT::NodeStatus tick() override final;
173
174private:
175 static std::mutex & registryMutex();
176
177 // contains the fully-qualified name of the node and the name of the topic
178 static SubscribersRegistry & getRegistry();
179
180 const rclcpp::QoS qos_;
181 std::string topic_name_;
182 bool dynamic_client_instance_ = false;
183 std::shared_ptr<SubscriberInstance> sub_instance_;
184 std::shared_ptr<MessageT> last_msg_;
185 std::string subscriber_key_;
186};
187
188// #####################################################################################################################
189// ################################ DEFINITIONS ##############################################
190// #####################################################################################################################
191
192template <class MessageT>
193inline RosSubscriberNode<MessageT>::SubscriberInstance::SubscriberInstance(
194 rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr group, const std::string & topic_name,
195 const rclcpp::QoS & qos)
196{
197 rclcpp::SubscriptionOptions option;
198 option.callback_group = group;
199
200 // The callback will broadcast to all the instances of RosSubscriberNode<MessageT>
201 auto callback = [this](const std::shared_ptr<MessageT> msg) {
202 this->last_msg = msg;
203 this->broadcast(msg);
204 };
205 subscriber = node->create_subscription<MessageT>(topic_name, qos, callback, option);
206 name = topic_name;
207}
208
209template <class MessageT>
210inline void RosSubscriberNode<MessageT>::SubscriberInstance::addCallback(
211 const void * callback_owner, const std::function<void(const std::shared_ptr<MessageT>)> & callback)
212{
213 callbacks.emplace_back(callback_owner, callback);
214}
215
216template <class MessageT>
217inline void RosSubscriberNode<MessageT>::SubscriberInstance::removeCallback(const void * callback_owner)
218{
219 callbacks.erase(
220 std::remove_if(
221 callbacks.begin(), callbacks.end(), [callback_owner](const auto & pair) { return pair.first == callback_owner; }),
222 callbacks.end());
223}
224
225template <class MessageT>
226inline void RosSubscriberNode<MessageT>::SubscriberInstance::broadcast(const std::shared_ptr<MessageT> & msg)
227{
228 for (auto & callback_pair : callbacks) {
229 callback_pair.second(msg);
230 }
231}
232
233template <class MessageT>
235 const std::string & instance_name, const Config & config, Context context, const rclcpp::QoS & qos)
236: BT::ConditionNode{instance_name, config},
237 context_{context},
238 logger_(context.getChildLogger(auto_apms_util::toSnakeCase(instance_name))),
239 qos_{qos}
240{
241 // Consider aliasing in ports and copy the values from aliased to original ports
242 this->modifyPortsRemapping(context_.copyAliasedPortValuesToOriginalPorts(this));
243
244 if (const BT::Expected<std::string> expected_name = context_.getTopicName(this)) {
245 createSubscriber(expected_name.value());
246 } else {
247 // We assume that determining the topic name requires a blackboard pointer, which cannot be evaluated at
248 // construction time. The expression will be evaluated each time before the node is ticked the first time after
249 // successful execution.
250 dynamic_client_instance_ = true;
251 }
252}
253
254template <class MessageT>
255inline bool RosSubscriberNode<MessageT>::createSubscriber(const std::string & topic_name)
256{
257 if (topic_name.empty()) {
258 throw exceptions::RosNodeError(
259 context_.getFullyQualifiedTreeNodeName(this) + " - Argument topic_name is empty when trying to create a client.");
260 }
261
262 // Check if the subscriber with given name is already set up
263 if (sub_instance_ && topic_name == sub_instance_->name) return true;
264
265 std::unique_lock lk(registryMutex());
266
267 rclcpp::Node::SharedPtr node = context_.nh_.lock();
268 if (!node) {
269 throw exceptions::RosNodeError(
270 context_.getFullyQualifiedTreeNodeName(this) +
271 " - The weak pointer to the ROS 2 node expired. The tree node doesn't "
272 "take ownership of it.");
273 }
274 rclcpp::CallbackGroup::SharedPtr group = context_.cb_group_.lock();
275 if (!group) {
276 throw exceptions::RosNodeError(
277 context_.getFullyQualifiedTreeNodeName(this) +
278 " - The weak pointer to the ROS 2 callback group expired. The tree node doesn't "
279 "take ownership of it.");
280 }
281 subscriber_key_ = std::string(node->get_fully_qualified_name()) + "/" + topic_name;
282
283 auto & registry = getRegistry();
284 auto it = registry.find(subscriber_key_);
285 if (it == registry.end() || it->second.expired()) {
286 sub_instance_ = std::make_shared<SubscriberInstance>(node, group, topic_name, qos_);
287 registry.insert({subscriber_key_, sub_instance_});
288 RCLCPP_DEBUG(
289 logger_, "%s - Created subscriber for topic '%s'.", context_.getFullyQualifiedTreeNodeName(this).c_str(),
290 topic_name.c_str());
291 } else {
292 sub_instance_ = it->second.lock();
293 }
294
295 // Check if there was a message received before the creation of this subscriber action
296 if (sub_instance_->last_msg) {
297 last_msg_ = sub_instance_->last_msg;
298 }
299
300 // add "this" as received of the broadcaster
301 sub_instance_->addCallback(this, [this](const std::shared_ptr<MessageT> msg) { last_msg_ = msg; });
302
303 return true;
304}
305
306template <class MessageT>
307inline BT::NodeStatus RosSubscriberNode<MessageT>::tick()
308{
309 if (!rclcpp::ok()) {
310 halt();
311 return BT::NodeStatus::FAILURE;
312 }
313
314 // If client has been set up in derived constructor, event though this constructor couldn't, we discard the intention
315 // of dynamically creating the client
316 if (dynamic_client_instance_ && sub_instance_) {
317 dynamic_client_instance_ = false;
318 }
319
320 // Try again to create the client on first tick if this was not possible during construction or if client should be
321 // created from a blackboard entry on the start of every iteration
322 if (status() == BT::NodeStatus::IDLE && dynamic_client_instance_) {
323 const BT::Expected<std::string> expected_name = context_.getTopicName(this);
324 if (expected_name) {
325 createSubscriber(expected_name.value());
326 } else {
327 throw exceptions::RosNodeError(
328 context_.getFullyQualifiedTreeNodeName(this) +
329 " - Cannot create the subscriber because the topic name couldn't be resolved using "
330 "the expression specified by the node's registration parameters (" +
331 NodeRegistrationOptions::PARAM_NAME_ROS2TOPIC + ": " + context_.registration_options_.topic +
332 "). Error message: " + expected_name.error());
333 }
334 }
335
336 if (!sub_instance_) {
337 throw exceptions::RosNodeError(context_.getFullyQualifiedTreeNodeName(this) + " - sub_instance_ is nullptr.");
338 }
339
340 auto check_status = [this](BT::NodeStatus status) {
341 if (!isStatusCompleted(status)) {
342 throw exceptions::RosNodeError(
343 context_.getFullyQualifiedTreeNodeName(this) + " - The callback must return either SUCCESS or FAILURE.");
344 }
345 return status;
346 };
347 auto status = check_status(onTick(last_msg_));
348 last_msg_.reset();
349 return status;
350}
351
352template <class MessageT>
353inline BT::NodeStatus RosSubscriberNode<MessageT>::onTick(const std::shared_ptr<MessageT> & last_msg_ptr)
354{
355 if (!last_msg_ptr) return BT::NodeStatus::FAILURE;
356 return onMessageReceived(*last_msg_ptr);
357}
358
359template <class MessageT>
360inline BT::NodeStatus RosSubscriberNode<MessageT>::onMessageReceived(const MessageT & /*msg*/)
361{
362 return BT::NodeStatus::SUCCESS;
363}
364
365template <class MessageT>
367{
368 if (sub_instance_) return sub_instance_->name;
369 return "unkown";
370}
371
372template <class MessageT>
373inline std::mutex & RosSubscriberNode<MessageT>::registryMutex()
374{
375 static std::mutex sub_mutex;
376 return sub_mutex;
377}
378
379template <class MessageT>
380inline typename RosSubscriberNode<MessageT>::SubscribersRegistry & RosSubscriberNode<MessageT>::getRegistry()
381{
382 static SubscribersRegistry subscribers_registry;
383 return subscribers_registry;
384}
385
386} // namespace auto_apms_behavior_tree::core
Additional parameters specific to ROS 2 determined at runtime by TreeBuilder.
Generic behavior tree node wrapper for a ROS 2 subscriber.
RosSubscriberNode(const std::string &instance_name, const Config &config, Context context, const rclcpp::QoS &qos={10})
Constructor.
std::string getTopicName() const
Get the name of the topic name this node subscribes to.
virtual BT::NodeStatus onMessageReceived(const MessageT &msg)
Callback invoked when the node is ticked and a valid message has been received.
static BT::PortsList providedBasicPorts(BT::PortsList addition)
Derived nodes implementing the static method RosSubscriberNode::providedPorts may call this method to...
bool createSubscriber(const std::string &topic_name)
Create the ROS 2 subscriber.
static BT::PortsList providedPorts()
If a behavior tree requires input/output data ports, the developer must define this method accordingl...
virtual BT::NodeStatus onTick(const std::shared_ptr< MessageT > &last_msg_ptr)
Callback invoked when the node is ticked.
Core API for AutoAPMS's behavior tree implementation.
Definition behavior.hpp:32
Fundamental helper classes and utility functions.