diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index a403c20c8b..51b0445324 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -386,6 +387,39 @@ class IntraProcessManager std::vector take_ownership_subscriptions; }; + /// Hash function for rmw_gid_t to enable use in unordered_map + struct rmw_gid_hash + { + std::size_t operator()(const rmw_gid_t & gid) const noexcept + { + // Using the FNV-1a hash algorithm on the gid data + constexpr std::size_t FNV_prime = 1099511628211u; + std::size_t result = 14695981039346656037u; + + for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + result ^= gid.data[i]; + result *= FNV_prime; + } + return result; + } + }; + + /// Equality comparison for rmw_gid_t to enable use in unordered_map + struct rmw_gid_equal + { + bool operator()(const rmw_gid_t & lhs, const rmw_gid_t & rhs) const noexcept + { + // Compare the data bytes only. + // implementation_identifier pointer comparison is not used here because + // intra-process communication is always within the same process and RMW, + // and pointer comparison is fragile across dynamically loaded components. + return std::equal( + std::begin(lhs.data), + std::end(lhs.data), + std::begin(rhs.data)); + } + }; + using SubscriptionMap = std::unordered_map; @@ -398,6 +432,16 @@ class IntraProcessManager using PublisherToSubscriptionIdsMap = std::unordered_map; + /// Structure to store publisher information in GID lookup map + struct PublisherInfo + { + uint64_t pub_id; + rclcpp::PublisherBase::WeakPtr publisher; + }; + + using GidToPublisherInfoMap = + std::unordered_map; + RCLCPP_PUBLIC static uint64_t @@ -642,6 +686,8 @@ class IntraProcessManager PublisherBufferMap publisher_buffers_; mutable std::shared_timed_mutex mutex_; + + GidToPublisherInfoMap gid_to_publisher_info_; }; } // namespace experimental diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 831ffdf2ca..f7bc6fa9b3 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -51,6 +51,9 @@ IntraProcessManager::add_publisher( } } + // Add GID to publisher info mapping for fast lookups (stores both ID and weak_ptr) + gid_to_publisher_info_[publisher->get_gid()] = {pub_id, publisher}; + // Initialize the subscriptions storage for this publisher. pub_to_subs_[pub_id] = SplittedSubscriptions(); @@ -98,6 +101,24 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) { std::unique_lock lock(mutex_); + // Remove GID to publisher info mapping. + // First try via the publisher's own GID (fast path). + auto pub_it = publishers_.find(intra_process_publisher_id); + if (pub_it != publishers_.end()) { + auto publisher = pub_it->second.lock(); + if (publisher) { + gid_to_publisher_info_.erase(publisher->get_gid()); + } else { + // Publisher weak_ptr already expired, fall back to linear scan by pub_id. + for (auto git = gid_to_publisher_info_.begin(); git != gid_to_publisher_info_.end(); ++git) { + if (git->second.pub_id == intra_process_publisher_id) { + gid_to_publisher_info_.erase(git); + break; + } + } + } + } + publishers_.erase(intra_process_publisher_id); publisher_buffers_.erase(intra_process_publisher_id); pub_to_subs_.erase(intra_process_publisher_id); @@ -108,16 +129,15 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const { std::shared_lock lock(mutex_); - for (auto & publisher_pair : publishers_) { - auto publisher = publisher_pair.second.lock(); - if (!publisher) { - continue; - } - if (*publisher.get() == id) { - return true; - } + // Single O(1) hash map lookup - struct contains both ID and weak_ptr + auto it = gid_to_publisher_info_.find(*id); + if (it == gid_to_publisher_info_.end()) { + return false; } - return false; + + // Verify the publisher still exists by checking the weak_ptr + auto publisher = it->second.publisher.lock(); + return publisher != nullptr; } size_t diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 5210f6eac0..838b8298b8 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -162,7 +162,14 @@ class PublisherBase explicit PublisherBase(const std::string & topic, const rclcpp::QoS & qos) : topic_name(topic), qos_profile(qos) - {} + { + // Initialize a mock GID with unique data based on this pointer + gid_.implementation_identifier = "mock_rmw"; + auto ptr_value = reinterpret_cast(this); + for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) { + gid_.data[i] = static_cast((ptr_value >> (i * 8)) & 0xFF); + } + } virtual ~PublisherBase() {} @@ -192,6 +199,12 @@ class PublisherBase return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; } + const rmw_gid_t & + get_gid() const + { + return gid_; + } + bool operator==([[maybe_unused]] const rmw_gid_t & gid) const { @@ -210,6 +223,7 @@ class PublisherBase private: std::string topic_name; rclcpp::QoS qos_profile; + rmw_gid_t gid_; }; template>