From 3de19d40897279e379aa8f6e041461c66f1edc3d Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 21 Feb 2025 16:33:31 +0100 Subject: [PATCH 1/2] fix: Fixed expiring of goals if events executor is used (#2674) * feat: Add ClockWaiter and ClockConditionalVariable Added two new synchronization primitives to perform waits using an rclcpp::Clock. Signed-off-by: Janosch Machowinski * fix: Fixed expiring of goals if events executor is used This commit is only relevant if the events executor is used. This commit starts a background thread, to create events for the expire timer of the action. Without this the action server will not expire old goal results leading to memory and performance issues. Signed-off-by: Janosch Machowinski --------- Signed-off-by: Janosch Machowinski Co-authored-by: Janosch Machowinski --- rclcpp/src/rclcpp/clock.cpp | 2 +- rclcpp_action/src/server.cpp | 145 +++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index 46c1f09981..bf048fa338 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -108,7 +108,7 @@ Clock::sleep_until( }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( - [context, &shutdown_cb_handle]() { + [&context, &shutdown_cb_handle]() { context->remove_on_shutdown_callback(shutdown_cb_handle); }); diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index d3812f18ba..1422f2e65e 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -86,6 +87,147 @@ class ServerBaseImpl { } + ~ServerBaseImpl() + { + stop_expire_thread(); + } + + std::recursive_mutex expire_thread_reentrant_mutex_; + std::thread expire_thread; + + // must be a class member, so keep it in scope + std::function expire_reset_callback; + + rclcpp::ClockConditionalVariable::SharedPtr expire_cv; + std::atomic expire_time_changed = false; + std::atomic terminate_expire_thread = true; + + void + stop_expire_thread() + { + std::lock_guard lock(expire_thread_reentrant_mutex_); + if(expire_cv) { + { + // Note, even though terminate_expire_thread is an atomic, we + // need to acquire the mutex, to avoid a race between the wait_until + // and the setting of this terminate_expire_thread + std::lock_guard l(expire_cv->mutex()); + terminate_expire_thread = true; + } + expire_cv->notify_one(); + expire_thread.join(); + expire_reset_callback = std::function(); + expire_cv.reset(); + } + } + + void + start_expire_thread(std::function expire_ready_cb) + { + { + std::lock_guard lock(expire_thread_reentrant_mutex_); + if(expire_cv) { + return; + } + + terminate_expire_thread = false; + expire_cv = std::make_shared(clock_); + } + + rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context(); + std::shared_ptr rcl_context = context->get_rcl_context(); + + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + rcl_ret_t ret = + rcl_wait_set_init(&wait_set, num_subscriptions_, num_guard_conditions_, num_timers_, + num_clients_, num_services_, 0, rcl_context.get(), rcl_get_default_allocator()); + if(ret != RCL_RET_OK) { + throw std::runtime_error("Internal error starting timer thread"); + } + + ret = rcl_action_wait_set_add_action_server( + &wait_set, action_server_.get(), NULL); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "ServerBase::add_to_wait_set() failed"); + } + + // extract the timer from the wait set + const rcl_timer_t *expire_timer = wait_set.timers[0]; + + // don't need the wait set any more + ret = rcl_wait_set_fini(&wait_set); + + // the maybe_unused is needed here to satisfy cpplint + expire_reset_callback = [this] ([[maybe_unused]] size_t reset_calls) + { + { + std::lock_guard l(expire_cv->mutex()); + expire_time_changed = true; + } + expire_cv->notify_one(); + }; + + rcl_event_callback_t rcl_reset_callback = rclcpp::detail::cpp_callback_trampoline< + decltype(expire_reset_callback), const void *, size_t>; + + ret = rcl_timer_set_on_reset_callback(expire_timer, rcl_reset_callback, + static_cast(&expire_reset_callback)); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to set timer on reset callback"); + } + + expire_thread = std::thread([this, expire_timer, expire_ready_callback = expire_ready_cb]() { + rcl_clock_type_t clock_type = clock_->get_clock_type(); + + while(!terminate_expire_thread) { + { + std::unique_lock l(expire_cv->mutex()); + + // reset to false, we are handling the time change right now + expire_time_changed = false; + int64_t time_until_call = 0; + + auto ret2 = rcl_timer_get_time_until_next_call(expire_timer, &time_until_call); + if (ret2 == RCL_RET_TIMER_CANCELED) { + rclcpp::Time endless(std::chrono::duration_cast( + std::chrono::hours(100)).count(), clock_type); + expire_cv->wait_until(l, endless, [this] () -> bool { + return expire_time_changed || terminate_expire_thread; + }); + continue; + } + + if(time_until_call > 0) { + rclcpp::Time next_wakeup(clock_->now().nanoseconds() + time_until_call, clock_type); + expire_cv->wait_until(l, next_wakeup, [this] () -> bool { + return expire_time_changed || terminate_expire_thread; + }); + } + } + + // don't call expire callback if we are terminating + if(terminate_expire_thread) { + return; + } + + bool is_ready = false; + if(rcl_timer_is_ready(expire_timer, &is_ready) == RCL_RET_OK && is_ready) { + // we need to cancel the timer here, to avoid a endless loop + // in case a new goal expires, the timer will be reset. + auto ret2 = rcl_timer_cancel(const_cast(expire_timer)); + if(ret2 != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret2, "Failed to cancel timer"); + } + + std::lock_guard lock(expire_thread_reentrant_mutex_); + if(expire_ready_callback) { + expire_ready_callback(1, static_cast(ServerBase::EntityType::Expired)); + } + } + } + }); + } + // Lock for action_server_ std::recursive_mutex action_server_reentrant_mutex_; @@ -808,6 +950,8 @@ ServerBase::set_on_ready_callback(std::function callback) "is not callable."); } + pimpl_->start_expire_thread(callback); + set_callback_to_entity(EntityType::GoalService, callback); set_callback_to_entity(EntityType::ResultService, callback); set_callback_to_entity(EntityType::CancelService, callback); @@ -928,6 +1072,7 @@ ServerBase::clear_on_ready_callback() set_on_ready_callback(EntityType::GoalService, nullptr, nullptr); set_on_ready_callback(EntityType::ResultService, nullptr, nullptr); set_on_ready_callback(EntityType::CancelService, nullptr, nullptr); + pimpl_->stop_expire_thread(); on_ready_callback_set_ = false; } From 75000f3c7e25d781d182cef071aba50c1968c55f Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Fri, 11 Apr 2025 11:38:59 +0200 Subject: [PATCH 2/2] fix(rclcpp_action): Fix sleep of expire thread in case of canceled timer (#2800) This fixes a bug, that the expire action thread would not sleep as, the sleep duration was not computed correctly. Signed-off-by: Janosch Machowinski Co-authored-by: Janosch Machowinski --- rclcpp_action/src/server.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index 1422f2e65e..f1fbf0fc35 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -189,9 +189,8 @@ class ServerBaseImpl auto ret2 = rcl_timer_get_time_until_next_call(expire_timer, &time_until_call); if (ret2 == RCL_RET_TIMER_CANCELED) { - rclcpp::Time endless(std::chrono::duration_cast( - std::chrono::hours(100)).count(), clock_type); - expire_cv->wait_until(l, endless, [this] () -> bool { + rclcpp::Duration endless(std::chrono::hours(100)); + expire_cv->wait_until(l, clock_->now() + endless, [this] () -> bool { return expire_time_changed || terminate_expire_thread; }); continue;