From 5d5bf85c358c9b336560b40f9a2ccde21500acf8 Mon Sep 17 00:00:00 2001 From: Patrick Roncagliolo Date: Mon, 23 Feb 2026 21:44:19 +0100 Subject: [PATCH] Initial implementation for GenericSubscriber support in IntraProcessManager Signed-off-by: Patrick Roncagliolo --- .../experimental/intra_process_manager.hpp | 459 ++++++++++++++++-- .../include/rclcpp/generic_subscription.hpp | 51 ++ rclcpp/src/rclcpp/generic_subscription.cpp | 6 + rclcpp/src/rclcpp/intra_process_manager.cpp | 140 +++++- rclcpp/test/rclcpp/test_generic_pubsub.cpp | 82 +++- 5 files changed, 680 insertions(+), 58 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index a403c20c8b..3cd7df9fa5 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -148,6 +148,44 @@ class IntraProcessManager return sub_id; } + template< + typename ROSMessageType, + typename Alloc = std::allocator + > + uint64_t + add_generic_subscription( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + std::unique_lock lock(mutex_); + + uint64_t sub_id = IntraProcessManager::get_next_unique_id(); + + generic_subscriptions_[sub_id] = subscription; + + // adds the subscription id to all the matchable publishers + for (auto & pair : publishers_) { + auto publisher = pair.second.lock(); + if (!publisher) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t pub_id = pair.first; + insert_generic_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + if (publisher->is_durability_transient_local() && + subscription->is_durability_transient_local()) + { + // FIXME PATRICK this shall be investigated + do_transient_local_publish( + pub_id, sub_id, + subscription->use_take_shared_method()); + } + } + } + + return sub_id; + } + + /// Unregister a subscription using the subscription's unique id. /** * This method does not allocate memory. @@ -230,51 +268,148 @@ class IntraProcessManager std::shared_lock lock(mutex_); - auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); - if (publisher_it == pub_to_subs_.end()) { - // Publisher is either invalid or no longer exists. - RCLCPP_WARN( - rclcpp::get_logger("rclcpp"), - "Calling do_intra_process_publish for invalid or no longer existing publisher id"); - return; + bool a = false; + bool b = false; + + auto publisher_it = pub_to_generic_subs_.find(intra_process_publisher_id); + if (publisher_it != pub_to_generic_subs_.end()) { + const auto & sub_ids = publisher_it->second; + b = true; + using AllocatorT = std::allocator; + using GMessageT = rclcpp::SerializedMessage; + using GSubscribedTypeAllocatorTraits = allocator::AllocRebind; + using GAlloc = typename GSubscribedTypeAllocatorTraits::allocator_type; + using GDeleter = allocator::Deleter; + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + GAlloc gallocator; + + if (sub_ids.take_ownership_subscriptions.empty()) { + rclcpp::Serialization serializer; + auto ser_msg = std::make_shared(); + if constexpr (rclcpp::TypeAdapter::is_specialized::value) { + ROSMessageTypeAllocator ros_message_alloc(allocator); + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr); + ROSMessageTypeDeleter deleter; + allocator::set_allocator_for_deleter(&deleter, &allocator); + rclcpp::TypeAdapter::convert_to_ros_message(*message, *ptr); + serializer.serialize_message(ptr, ser_msg.get()); + } else { + serializer.serialize_message(message.get(), ser_msg.get()); + } + this->template add_shared_msg_to_generic_buffers( + ser_msg, sub_ids.take_shared_subscriptions); + } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT + sub_ids.take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So this case is equivalent to all the buffers requiring ownership + + rclcpp::Serialization serializer; + auto ser_msg = std::make_unique(); + if constexpr (rclcpp::TypeAdapter::is_specialized::value) { + ROSMessageTypeAllocator ros_message_alloc(allocator); + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr); + ROSMessageTypeDeleter deleter; + allocator::set_allocator_for_deleter(&deleter, &allocator); + rclcpp::TypeAdapter::convert_to_ros_message(*message, *ptr); + serializer.serialize_message(ptr, ser_msg.get()); + } else { + serializer.serialize_message(message.get(), ser_msg.get()); + } + + // Merge the two vector of ids into a unique one + std::vector concatenated_vector( + sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end()); + concatenated_vector.insert( + concatenated_vector.end(), + sub_ids.take_ownership_subscriptions.begin(), + sub_ids.take_ownership_subscriptions.end()); + this->template add_owned_msg_to_generic_buffers( + std::move(ser_msg), + concatenated_vector, + gallocator); + } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT + sub_ids.take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + rclcpp::Serialization serializer; + auto ser_msg = std::make_shared(); + auto ser_msg_u = std::make_unique(); + if constexpr (rclcpp::TypeAdapter::is_specialized::value) { + ROSMessageTypeAllocator ros_message_alloc(allocator); + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr); + ROSMessageTypeDeleter deleter; + allocator::set_allocator_for_deleter(&deleter, &allocator); + rclcpp::TypeAdapter::convert_to_ros_message(*message, *ptr); + serializer.serialize_message(ptr, ser_msg.get()); + serializer.serialize_message(ptr, ser_msg_u.get()); + } else { + serializer.serialize_message(message.get(), ser_msg.get()); + serializer.serialize_message(message.get(), ser_msg_u.get()); + } + + this->template add_shared_msg_to_generic_buffers( + ser_msg, sub_ids.take_shared_subscriptions); + this->template add_owned_msg_to_generic_buffers( + std::move(ser_msg_u), sub_ids.take_ownership_subscriptions, gallocator); + } } - const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // None of the buffers require ownership, so we promote the pointer - std::shared_ptr msg = std::move(message); - - this->template add_shared_msg_to_buffers( - msg, sub_ids.take_shared_subscriptions); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() <= 1) - { - // There is at maximum 1 buffer that does not require ownership. - // So this case is equivalent to all the buffers requiring ownership - - // Merge the two vector of ids into a unique one - std::vector concatenated_vector( - sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end()); - concatenated_vector.insert( - concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); - this->template add_owned_msg_to_buffers( - std::move(message), - concatenated_vector, - allocator); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() > 1) - { - // Construct a new shared pointer from the message - // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared(allocator, *message); + publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it != pub_to_subs_.end()) { + a = true; + const auto & sub_ids = publisher_it->second; + + if (sub_ids.take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + std::shared_ptr msg = std::move(message); + this->template add_shared_msg_to_buffers( + msg, sub_ids.take_shared_subscriptions); + } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT + sub_ids.take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So this case is equivalent to all the buffers requiring ownership + + // Merge the two vector of ids into a unique one + std::vector concatenated_vector( + sub_ids.take_shared_subscriptions.begin(), sub_ids.take_shared_subscriptions.end()); + concatenated_vector.insert( + concatenated_vector.end(), + sub_ids.take_ownership_subscriptions.begin(), + sub_ids.take_ownership_subscriptions.end()); + this->template add_owned_msg_to_buffers( + std::move(message), + concatenated_vector, + allocator); + } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT + sub_ids.take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = std::allocate_shared(allocator, *message); + + this->template add_shared_msg_to_buffers( + shared_msg, sub_ids.take_shared_subscriptions); + this->template add_owned_msg_to_buffers( + std::move(message), sub_ids.take_ownership_subscriptions, allocator); + } + } - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( - std::move(message), sub_ids.take_ownership_subscriptions, allocator); + if (a || b) { + return; } + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling do_intra_process_publish for invalid or no longer existing publisher id"); } template< @@ -294,7 +429,71 @@ class IntraProcessManager std::shared_lock lock(mutex_); - auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + auto publisher_it = pub_to_generic_subs_.find(intra_process_publisher_id); + if (publisher_it != pub_to_generic_subs_.end()) { + const auto & sub_ids = publisher_it->second; + using AllocatorT = std::allocator; + using GMessageT = rclcpp::SerializedMessage; + using GSubscribedTypeAllocatorTraits = allocator::AllocRebind; + using GAlloc = typename GSubscribedTypeAllocatorTraits::allocator_type; + using GDeleter = allocator::Deleter; + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + GAlloc gallocator; + + if (sub_ids.take_ownership_subscriptions.empty()) { + rclcpp::Serialization serializer; + auto ser_msg = std::make_shared(); + + if constexpr (rclcpp::TypeAdapter::is_specialized::value) { + ROSMessageTypeAllocator ros_message_alloc(allocator); + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr); + ROSMessageTypeDeleter deleter; + allocator::set_allocator_for_deleter(&deleter, &allocator); + rclcpp::TypeAdapter::convert_to_ros_message(message.get(), + *ptr); + serializer.serialize_message(ptr, ser_msg.get()); + } else { + serializer.serialize_message(message.get(), ser_msg.get()); + } + + if (!sub_ids.take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_generic_buffers( + ser_msg, sub_ids.take_shared_subscriptions); + } + } else { + rclcpp::Serialization serializer; + auto ser_msg = std::make_shared(); + auto ser_msg_u = std::make_unique(); + if constexpr (rclcpp::TypeAdapter::is_specialized::value) { + ROSMessageTypeAllocator ros_message_alloc(allocator); + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_alloc, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_alloc, ptr); + ROSMessageTypeDeleter deleter; + allocator::set_allocator_for_deleter(&deleter, &allocator); + rclcpp::TypeAdapter::convert_to_ros_message(message.get(), + *ptr); + serializer.serialize_message(ptr, ser_msg.get()); + serializer.serialize_message(ptr, ser_msg_u.get()); + } else { + serializer.serialize_message(message.get(), ser_msg.get()); + serializer.serialize_message(message.get(), ser_msg_u.get()); + } + + if (!sub_ids.take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_generic_buffers( + ser_msg, sub_ids.take_shared_subscriptions); + } + if (!sub_ids.take_ownership_subscriptions.empty()) { + this->template add_owned_msg_to_generic_buffers( + std::move(ser_msg_u), sub_ids.take_ownership_subscriptions, gallocator); + } + } + } + + publisher_it = pub_to_subs_.find(intra_process_publisher_id); if (publisher_it == pub_to_subs_.end()) { // Publisher is either invalid or no longer exists. RCLCPP_WARN( @@ -407,6 +606,10 @@ class IntraProcessManager void insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + RCLCPP_PUBLIC + void + insert_generic_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + RCLCPP_PUBLIC bool can_communicate( @@ -636,8 +839,180 @@ class IntraProcessManager } } + + template< + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_shared_msg_to_generic_buffers( + std::shared_ptr message, + std::vector subscription_ids) + { + /* + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + */ + using MessageT = rclcpp::SerializedMessage; + using AllocatorT = std::allocator; + using SubscribedTypeAllocatorTraits = allocator::AllocRebind; + using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; + using SubscribedTypeDeleter = allocator::Deleter; + + for (auto id : subscription_ids) { + auto subscription_it = generic_subscriptions_.find(id); + if (subscription_it == generic_subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.lock(); + if (subscription_base == nullptr) { + generic_subscriptions_.erase(id); + continue; + } + + auto subscription = std::dynamic_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcessBuffer< + MessageT, + SubscribedTypeAllocator, + SubscribedTypeDeleter, + MessageT> + >(subscription_base); + if (subscription != nullptr) { + subscription->provide_intra_process_data(message); + continue; + } else { + throw std::runtime_error( + "failed to dynamic cast SubscriptionIntraProcessBase to " + "SubscriptionIntraProcessBuffer " + "which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + + /* + auto ros_message_subscription = std::dynamic_pointer_cast< + rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer + >(subscription_base); + if (nullptr == ros_message_subscription) { + throw std::runtime_error( + "failed to dynamic cast SubscriptionIntraProcessBase to " + "SubscriptionIntraProcessBuffer, or to " + "SubscriptionROSMsgIntraProcessBuffer which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + + ros_message_subscription->provide_intra_process_message(message); + */ + } + } + + template< + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_owned_msg_to_generic_buffers( + std::unique_ptr message, + std::vector subscription_ids, + typename allocator::AllocRebind::allocator_type & allocator) + { + using MessageT = rclcpp::SerializedMessage; + using MessageAllocTraits = allocator::AllocRebind; + using MessageUniquePtr = std::unique_ptr; + /* + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + */ + using AllocatorT = std::allocator; + using SubscribedTypeAllocatorTraits = allocator::AllocRebind; + using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; + using SubscribedTypeDeleter = allocator::Deleter; + + for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { + auto subscription_it = generic_subscriptions_.find(*it); + if (subscription_it == generic_subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.lock(); + if (subscription_base == nullptr) { + generic_subscriptions_.erase(subscription_it); + continue; + } + + auto subscription = std::dynamic_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcessBuffer + >(subscription_base); + if (subscription != nullptr) { + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription->provide_intra_process_data(std::move(message)); + // Last message delivered, break from for loop + break; + } else { + // Copy the message since we have additional subscriptions to serve + Deleter deleter = message.get_deleter(); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); + subscription->provide_intra_process_data(MessageUniquePtr(ptr, deleter)); + } + + continue; + } else { + throw std::runtime_error( + "failed to dynamic cast SubscriptionIntraProcessBase to " + "SubscriptionIntraProcessBuffer " + "which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + + /* + auto ros_message_subscription = std::dynamic_pointer_cast< + rclcpp::experimental::SubscriptionROSMsgIntraProcessBuffer + >(subscription_base); + if (nullptr == ros_message_subscription) { + throw std::runtime_error( + "failed to dynamic cast SubscriptionIntraProcessBase to " + "SubscriptionIntraProcessBuffer, or to " + "SubscriptionROSMsgIntraProcessBuffer which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + ros_message_subscription->provide_intra_process_message(std::move(message)); + // Last message delivered, break from for loop + break; + } else { + // Copy the message since we have additional subscriptions to serve + Deleter deleter = message.get_deleter(); + allocator::set_allocator_for_deleter(&deleter, &allocator); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); + + ros_message_subscription->provide_intra_process_message( + MessageUniquePtr(ptr, deleter)); + } + */ + } + } + + PublisherToSubscriptionIdsMap pub_to_subs_; + PublisherToSubscriptionIdsMap pub_to_generic_subs_; SubscriptionMap subscriptions_; + SubscriptionMap generic_subscriptions_; PublisherMap publishers_; PublisherBufferMap publisher_buffers_; diff --git a/rclcpp/include/rclcpp/generic_subscription.hpp b/rclcpp/include/rclcpp/generic_subscription.hpp index ef3669ae96..c575ee4386 100644 --- a/rclcpp/include/rclcpp/generic_subscription.hpp +++ b/rclcpp/include/rclcpp/generic_subscription.hpp @@ -87,6 +87,57 @@ class GenericSubscription : public rclcpp::SubscriptionBase any_callback_(callback), ts_lib_(ts_lib) { + // Setup intra process publishing if requested. + if (rclcpp::detail::resolve_use_intra_process(options, *node_base)) { + using rclcpp::detail::resolve_intra_process_buffer_type; + + // Check if the QoS is compatible with intra-process. + auto qos_profile = get_actual_qos(); + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { + throw std::invalid_argument( + "intraprocess communication on topic '" + topic_name + + "' allowed only with keep last history qos policy"); + } + if (qos_profile.depth() == 0) { + throw std::invalid_argument( + "intraprocess communication on topic '" + topic_name + + "' is not allowed with 0 depth qos policy"); + } + using MessageT = rclcpp::SerializedMessage; + using SubscribedTypeAllocatorTraits = allocator::AllocRebind; + using SubscribedTypeAllocator = typename SubscribedTypeAllocatorTraits::allocator_type; + using SubscribedTypeDeleter = allocator::Deleter; + + using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< + MessageT, + MessageT, + SubscribedTypeAllocator, + SubscribedTypeDeleter, + MessageT, + AllocatorT>; + + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. + auto context = node_base->get_context(); + subscription_intra_process_ = std::make_shared( + callback, + options.get_allocator(), + context, + this->get_topic_name(), // important to get like this, as it has the fully-qualified name + qos_profile, + resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback)); + TRACETOOLS_TRACEPOINT( + rclcpp_subscription_init, + static_cast(get_subscription_handle().get()), + static_cast(subscription_intra_process_.get())); + + // Add it to the intra process manager. + using rclcpp::experimental::IntraProcessManager; + auto ipm = context->get_sub_context(); + uint64_t intra_process_subscription_id = ipm->template add_generic_subscription< + MessageT, SubscribedTypeAllocator>(subscription_intra_process_); + this->setup_intra_process(intra_process_subscription_id, ipm); + } + TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), diff --git a/rclcpp/src/rclcpp/generic_subscription.cpp b/rclcpp/src/rclcpp/generic_subscription.cpp index 1c52942edf..ced6e4682f 100644 --- a/rclcpp/src/rclcpp/generic_subscription.cpp +++ b/rclcpp/src/rclcpp/generic_subscription.cpp @@ -75,6 +75,12 @@ GenericSubscription::handle_serialized_message( const std::shared_ptr & message, const rclcpp::MessageInfo & message_info) { + if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { + // In this case, the message will be delivered via intra process and + // we should ignore this copy of the message. + return; + } + any_callback_.dispatch(message, message_info); } diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 831ffdf2ca..0ea1fedd06 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -53,6 +53,7 @@ IntraProcessManager::add_publisher( // Initialize the subscriptions storage for this publisher. pub_to_subs_[pub_id] = SplittedSubscriptions(); + pub_to_generic_subs_[pub_id] = SplittedSubscriptions(); // create an entry for the publisher id and populate with already existing subscriptions for (auto & pair : subscriptions_) { @@ -66,6 +67,19 @@ IntraProcessManager::add_publisher( } } + // create an entry for the publisher id and populate with already existing subscriptions + for (auto & pair : generic_subscriptions_) { + auto subscription = pair.second.lock(); + if (!subscription) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t sub_id = pair.first; + insert_generic_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + } + } + + return pub_id; } @@ -75,6 +89,7 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) std::unique_lock lock(mutex_); subscriptions_.erase(intra_process_subscription_id); + generic_subscriptions_.erase(intra_process_subscription_id); for (auto & pair : pub_to_subs_) { pair.second.take_shared_subscriptions.erase( @@ -91,6 +106,22 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) intra_process_subscription_id), pair.second.take_ownership_subscriptions.end()); } + + for (auto & pair : pub_to_generic_subs_) { + pair.second.take_shared_subscriptions.erase( + std::remove( + pair.second.take_shared_subscriptions.begin(), + pair.second.take_shared_subscriptions.end(), + intra_process_subscription_id), + pair.second.take_shared_subscriptions.end()); + + pair.second.take_ownership_subscriptions.erase( + std::remove( + pair.second.take_ownership_subscriptions.begin(), + pair.second.take_ownership_subscriptions.end(), + intra_process_subscription_id), + pair.second.take_ownership_subscriptions.end()); + } } void @@ -101,6 +132,7 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) publishers_.erase(intra_process_publisher_id); publisher_buffers_.erase(intra_process_publisher_id); pub_to_subs_.erase(intra_process_publisher_id); + pub_to_generic_subs_.erase(intra_process_publisher_id); } bool @@ -113,6 +145,7 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const if (!publisher) { continue; } + // publisher is of type "rclcpp::PublisherBase::WeakPtr" if (*publisher.get() == id) { return true; } @@ -125,8 +158,11 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) { std::shared_lock lock(mutex_); + size_t count = 0; + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); - if (publisher_it == pub_to_subs_.end()) { + auto publisher_g_it = pub_to_generic_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end() && publisher_g_it == pub_to_generic_subs_.end() ) { // Publisher is either invalid or no longer exists. RCLCPP_WARN( rclcpp::get_logger("rclcpp"), @@ -134,10 +170,15 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) return 0; } - auto count = - publisher_it->second.take_shared_subscriptions.size() + - publisher_it->second.take_ownership_subscriptions.size(); + if (publisher_it != pub_to_subs_.end()) { + count += publisher_it->second.take_shared_subscriptions.size(); + count += publisher_it->second.take_ownership_subscriptions.size(); + } + if (publisher_g_it != pub_to_generic_subs_.end()) { + count += publisher_g_it->second.take_shared_subscriptions.size(); + count += publisher_g_it->second.take_ownership_subscriptions.size(); + } return count; } @@ -147,9 +188,14 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc std::shared_lock lock(mutex_); auto subscription_it = subscriptions_.find(intra_process_subscription_id); - if (subscription_it == subscriptions_.end()) { + auto subscription_g_it = generic_subscriptions_.find(intra_process_subscription_id); + if (subscription_it == subscriptions_.end() && + subscription_g_it == generic_subscriptions_.end()) + { return nullptr; - } else { + } + + if (subscription_it != subscriptions_.end()) { auto subscription = subscription_it->second.lock(); if (subscription) { return subscription; @@ -158,6 +204,18 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc return nullptr; } } + + if (subscription_g_it != generic_subscriptions_.end()) { + auto subscription = subscription_g_it->second.lock(); + if (subscription) { + return subscription; + } else { + generic_subscriptions_.erase(subscription_g_it); + return nullptr; + } + } + + return nullptr; } uint64_t @@ -195,6 +253,19 @@ IntraProcessManager::insert_sub_id_for_pub( } } +void +IntraProcessManager::insert_generic_sub_id_for_pub( + uint64_t sub_id, + uint64_t pub_id, + bool use_take_shared_method) +{ + if (use_take_shared_method) { + pub_to_generic_subs_[pub_id].take_shared_subscriptions.push_back(sub_id); + } else { + pub_to_generic_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id); + } +} + bool IntraProcessManager::can_communicate( rclcpp::PublisherBase::SharedPtr pub, @@ -219,7 +290,8 @@ IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publ size_t capacity = std::numeric_limits::max(); auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); - if (publisher_it == pub_to_subs_.end()) { + auto publisher_g_it = pub_to_generic_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end() && publisher_g_it == pub_to_generic_subs_.end()) { // Publisher is either invalid or no longer exists. RCLCPP_WARN( rclcpp::get_logger("rclcpp"), @@ -227,9 +299,19 @@ IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publ return 0u; } - if (publisher_it->second.take_shared_subscriptions.empty() && - publisher_it->second.take_ownership_subscriptions.empty()) - { + bool a = false; + bool b = false; + if (publisher_it != pub_to_subs_.end()) { + a = publisher_it->second.take_shared_subscriptions.empty() && + publisher_it->second.take_ownership_subscriptions.empty(); + } + + if (publisher_g_it != pub_to_generic_subs_.end()) { + b = publisher_g_it->second.take_shared_subscriptions.empty() && + publisher_g_it->second.take_ownership_subscriptions.empty(); + } + + if (!(a || b)) { // no subscriptions available return 0u; } @@ -250,12 +332,42 @@ IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publ } }; - for (const auto sub_id : publisher_it->second.take_shared_subscriptions) { - available_capacity(sub_id); + auto available_g_capacity = [this, &capacity](const uint64_t intra_process_subscription_id) + { + auto subscription_it = generic_subscriptions_.find(intra_process_subscription_id); + if (subscription_it != generic_subscriptions_.end()) { + auto subscription = subscription_it->second.lock(); + if (subscription) { + capacity = std::min(capacity, subscription->available_capacity()); + } + } else { + // Subscription is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling available_capacity for invalid or no longer existing subscription id"); + } + }; + + + if (publisher_it != pub_to_subs_.end()) { + for (const auto sub_id : publisher_it->second.take_shared_subscriptions) { + available_capacity(sub_id); + } + + for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) { + available_capacity(sub_id); + } } - for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) { - available_capacity(sub_id); + if (publisher_g_it != pub_to_generic_subs_.end()) { + + for (const auto sub_id : publisher_g_it->second.take_shared_subscriptions) { + available_g_capacity(sub_id); + } + + for (const auto sub_id : publisher_g_it->second.take_ownership_subscriptions) { + available_g_capacity(sub_id); + } } return capacity; diff --git a/rclcpp/test/rclcpp/test_generic_pubsub.cpp b/rclcpp/test/rclcpp/test_generic_pubsub.cpp index 498322334a..edbe38f37c 100644 --- a/rclcpp/test/rclcpp/test_generic_pubsub.cpp +++ b/rclcpp/test/rclcpp/test_generic_pubsub.cpp @@ -44,10 +44,12 @@ class RclcppGenericNodeFixture : public Test public: RclcppGenericNodeFixture() { - node_ = std::make_shared("pubsub"); + node_ = std::make_shared( + "pubsub", + rclcpp::NodeOptions().start_parameter_event_publisher(false).use_intra_process_comms(true)); publisher_node_ = std::make_shared( "publisher_node", - rclcpp::NodeOptions().start_parameter_event_publisher(false)); + rclcpp::NodeOptions().start_parameter_event_publisher(false).use_intra_process_comms(true)); } static void SetUpTestCase() @@ -188,6 +190,82 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work) EXPECT_THAT(subscribed_messages[0], StrEq("Hello World")); } +TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_intra_unique) +{ + // We currently publish more messages because they can get lost + std::vector test_messages = {"Hello World", "Hello World"}; + std::string topic_name = "/string_topic_intra"; + std::string type = "test_msgs/msg/Strings"; + + auto publisher = node_->create_publisher(topic_name, rclcpp::QoS(1)); + + auto subscriber_future_ = std::async( + std::launch::async, [this, topic_name, type] { + return subscribe_raw_messages(1, topic_name, type); + }); + + // TODO(karsten1987): Port 'wait_for_sub' to rclcpp + auto allocator = node_->get_node_options().allocator(); + auto success = false; + auto ret = rcl_wait_for_subscribers( + node_->get_node_base_interface()->get_rcl_node_handle(), + &allocator, + topic_name.c_str(), + 1u, + static_cast(1e9), // timeout in ns + &success); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(success); + + for (const auto & message : test_messages) { + auto msg1 = std::make_unique(); + msg1->string_value = message; + publisher->publish(std::move(msg1)); + } + + auto subscribed_messages = subscriber_future_.get(); + EXPECT_THAT(subscribed_messages, SizeIs(Not(0))); + EXPECT_THAT(subscribed_messages[0], StrEq("Hello World")); +} + +TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_intra_shared) +{ + // We currently publish more messages because they can get lost + std::vector test_messages = {"Hello World", "Hello World"}; + std::string topic_name = "/string_topic_intra"; + std::string type = "test_msgs/msg/Strings"; + + auto publisher = node_->create_publisher(topic_name, rclcpp::QoS(1)); + + auto subscriber_future_ = std::async( + std::launch::async, [this, topic_name, type] { + return subscribe_raw_messages(1, topic_name, type); + }); + + // TODO(karsten1987): Port 'wait_for_sub' to rclcpp + auto allocator = node_->get_node_options().allocator(); + auto success = false; + auto ret = rcl_wait_for_subscribers( + node_->get_node_base_interface()->get_rcl_node_handle(), + &allocator, + topic_name.c_str(), + 1u, + static_cast(1e9), // timeout in ns + &success); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(success); + + for (const auto & message : test_messages) { + test_msgs::msg::Strings msg2; + msg2.string_value = message; + publisher->publish(msg2); + } + + auto subscribed_messages = subscriber_future_.get(); + EXPECT_THAT(subscribed_messages, SizeIs(Not(0))); + EXPECT_THAT(subscribed_messages[0], StrEq("Hello World")); +} + TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work) { // We currently publish more messages because they can get lost