22#include <unordered_map>
25#include "auto_apms_behavior_tree_core/exceptions.hpp"
26#include "auto_apms_behavior_tree_core/node/ros_node_context.hpp"
27#include "auto_apms_util/string.hpp"
28#include "behaviortree_cpp/condition_node.h"
29#include "rclcpp/executors.hpp"
30#include "rclcpp/qos.hpp"
63template <
class MessageT>
66 using Subscriber =
typename rclcpp::Subscription<MessageT>;
68 struct SubscriberInstance
71 rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr group,
const std::string & topic_name,
72 const rclcpp::QoS & qos);
74 std::shared_ptr<Subscriber> subscriber;
75 std::vector<std::pair<
const void *, std::function<void(
const std::shared_ptr<MessageT>)>>> callbacks;
76 std::shared_ptr<MessageT> last_msg;
80 const void * callback_owner,
const std::function<
void(
const std::shared_ptr<MessageT>)> & callback);
81 void removeCallback(
const void * callback_owner);
82 void broadcast(
const std::shared_ptr<MessageT> & msg);
85 using SubscribersRegistry = std::unordered_map<std::string, std::weak_ptr<SubscriberInstance>>;
88 using MessageType = MessageT;
89 using Config = BT::NodeConfig;
103 const std::string & instance_name,
const Config & config, Context context,
const rclcpp::QoS & qos = {10});
108 sub_instance_->removeCallback(
this);
121 BT::PortsList basic = {BT::InputPort<std::string>(
"topic",
"Name of the ROS 2 topic to subscribe to.")};
122 basic.insert(addition.begin(), addition.end());
142 virtual BT::NodeStatus
onTick(
const std::shared_ptr<MessageT> & last_msg_ptr);
169 const Context context_;
170 const rclcpp::Logger logger_;
172 BT::NodeStatus tick() override final;
175 static std::mutex & registryMutex();
178 static SubscribersRegistry & getRegistry();
180 const rclcpp::QoS qos_;
181 std::
string topic_name_;
182 bool dynamic_client_instance_ = false;
183 std::shared_ptr<SubscriberInstance> sub_instance_;
184 std::shared_ptr<MessageT> last_msg_;
185 std::
string subscriber_key_;
192template <class MessageT>
194 rclcpp::Node::SharedPtr node, rclcpp::CallbackGroup::SharedPtr group, const std::
string & topic_name,
195 const rclcpp::QoS & qos)
197 rclcpp::SubscriptionOptions option;
198 option.callback_group = group;
201 auto callback = [
this](
const std::shared_ptr<MessageT> msg) {
202 this->last_msg = msg;
203 this->broadcast(msg);
205 subscriber = node->create_subscription<MessageT>(topic_name, qos, callback, option);
209template <
class MessageT>
210inline void RosSubscriberNode<MessageT>::SubscriberInstance::addCallback(
211 const void * callback_owner,
const std::function<
void(
const std::shared_ptr<MessageT>)> & callback)
213 callbacks.emplace_back(callback_owner, callback);
216template <
class MessageT>
221 callbacks.begin(), callbacks.end(), [callback_owner](
const auto & pair) { return pair.first == callback_owner; }),
225template <
class MessageT>
228 for (
auto & callback_pair : callbacks) {
229 callback_pair.second(msg);
233template <
class MessageT>
235 const std::string & instance_name,
const Config & config, Context context,
const rclcpp::QoS & qos)
236: BT::ConditionNode{instance_name, config},
238 logger_(context.getChildLogger(
auto_apms_util::toSnakeCase(instance_name))),
242 this->modifyPortsRemapping(context_.copyAliasedPortValuesToOriginalPorts(
this));
244 if (
const BT::Expected<std::string> expected_name = context_.getTopicName(
this)) {
250 dynamic_client_instance_ =
true;
254template <
class MessageT>
257 if (topic_name.empty()) {
258 throw exceptions::RosNodeError(
259 context_.getFullyQualifiedTreeNodeName(
this) +
" - Argument topic_name is empty when trying to create a client.");
263 if (sub_instance_ && topic_name == sub_instance_->name)
return true;
265 std::unique_lock lk(registryMutex());
267 rclcpp::Node::SharedPtr node = context_.nh_.lock();
269 throw exceptions::RosNodeError(
270 context_.getFullyQualifiedTreeNodeName(
this) +
271 " - The weak pointer to the ROS 2 node expired. The tree node doesn't "
272 "take ownership of it.");
274 rclcpp::CallbackGroup::SharedPtr group = context_.cb_group_.lock();
276 throw exceptions::RosNodeError(
277 context_.getFullyQualifiedTreeNodeName(
this) +
278 " - The weak pointer to the ROS 2 callback group expired. The tree node doesn't "
279 "take ownership of it.");
281 subscriber_key_ = std::string(node->get_fully_qualified_name()) +
"/" + topic_name;
283 auto & registry = getRegistry();
284 auto it = registry.find(subscriber_key_);
285 if (it == registry.end() || it->second.expired()) {
286 sub_instance_ = std::make_shared<SubscriberInstance>(node, group, topic_name, qos_);
287 registry.insert({subscriber_key_, sub_instance_});
289 logger_,
"%s - Created subscriber for topic '%s'.", context_.getFullyQualifiedTreeNodeName(
this).c_str(),
292 sub_instance_ = it->second.lock();
296 if (sub_instance_->last_msg) {
297 last_msg_ = sub_instance_->last_msg;
301 sub_instance_->addCallback(
this, [
this](
const std::shared_ptr<MessageT> msg) { last_msg_ = msg; });
306template <
class MessageT>
307inline BT::NodeStatus RosSubscriberNode<MessageT>::tick()
311 return BT::NodeStatus::FAILURE;
316 if (dynamic_client_instance_ && sub_instance_) {
317 dynamic_client_instance_ =
false;
322 if (status() == BT::NodeStatus::IDLE && dynamic_client_instance_) {
323 const BT::Expected<std::string> expected_name = context_.getTopicName(
this);
325 createSubscriber(expected_name.value());
327 throw exceptions::RosNodeError(
328 context_.getFullyQualifiedTreeNodeName(
this) +
329 " - Cannot create the subscriber because the topic name couldn't be resolved using "
330 "the expression specified by the node's registration parameters (" +
331 NodeRegistrationOptions::PARAM_NAME_ROS2TOPIC +
": " + context_.registration_options_.topic +
332 "). Error message: " + expected_name.error());
336 if (!sub_instance_) {
337 throw exceptions::RosNodeError(context_.getFullyQualifiedTreeNodeName(
this) +
" - sub_instance_ is nullptr.");
340 auto check_status = [
this](BT::NodeStatus status) {
341 if (!isStatusCompleted(status)) {
342 throw exceptions::RosNodeError(
343 context_.getFullyQualifiedTreeNodeName(
this) +
" - The callback must return either SUCCESS or FAILURE.");
347 auto status = check_status(onTick(last_msg_));
352template <
class MessageT>
355 if (!last_msg_ptr)
return BT::NodeStatus::FAILURE;
359template <
class MessageT>
362 return BT::NodeStatus::SUCCESS;
365template <
class MessageT>
368 if (sub_instance_)
return sub_instance_->name;
372template <
class MessageT>
373inline std::mutex & RosSubscriberNode<MessageT>::registryMutex()
375 static std::mutex sub_mutex;
379template <
class MessageT>
380inline typename RosSubscriberNode<MessageT>::SubscribersRegistry & RosSubscriberNode<MessageT>::getRegistry()
382 static SubscribersRegistry subscribers_registry;
383 return subscribers_registry;
Additional parameters specific to ROS 2 determined at runtime by TreeBuilder.
Generic behavior tree node wrapper for a ROS 2 subscriber.
RosSubscriberNode(const std::string &instance_name, const Config &config, Context context, const rclcpp::QoS &qos={10})
Constructor.
std::string getTopicName() const
Get the name of the topic name this node subscribes to.
virtual BT::NodeStatus onMessageReceived(const MessageT &msg)
Callback invoked when the node is ticked and a valid message has been received.
static BT::PortsList providedBasicPorts(BT::PortsList addition)
Derived nodes implementing the static method RosSubscriberNode::providedPorts may call this method to...
bool createSubscriber(const std::string &topic_name)
Create the ROS 2 subscriber.
static BT::PortsList providedPorts()
If a behavior tree requires input/output data ports, the developer must define this method accordingl...
virtual BT::NodeStatus onTick(const std::shared_ptr< MessageT > &last_msg_ptr)
Callback invoked when the node is ticked.
Core API for AutoAPMS's behavior tree implementation.
Fundamental helper classes and utility functions.