2 changes: 1 addition & 1 deletion rclcpp/src/rclcpp/clock.cpp
@@ -108,7 +108,7 @@ Clock::sleep_until(
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
[context, &shutdown_cb_handle]() {
[&context, &shutdown_cb_handle]() {
context->remove_on_shutdown_callback(shutdown_cb_handle);
});

144 changes: 144 additions & 0 deletions rclcpp_action/src/server.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <inttypes.h>
#include <memory>
#include <mutex>
#include <string>
@@ -86,6 +87,146 @@ class ServerBaseImpl
{
}

~ServerBaseImpl()
{
stop_expire_thread();
}

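// Guards starting and stopping of the goal expiration thread and access to its ready callback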
std::recursive_mutex expire_thread_reentrant_mutex_;
std::thread expire_thread;

// must be a class member so that it stays in scope while registered as the timer's on-reset callback
std::function<void(size_t)> expire_reset_callback;

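// Condition variable driven by the action server's clock; the expiration thread
// waits on it until the next goal expiration, a timer reset, or termination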
rclcpp::ClockConditionalVariable::SharedPtr expire_cv;
std::atomic<bool> expire_time_changed = false;
std::atomic<bool> terminate_expire_thread = true;

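// Signals the expiration thread to terminate, joins it, and releases the
// condition variable and reset callback. Does nothing if the thread was never started.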
void
stop_expire_thread()
{
std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);
if(expire_cv) {
{
// Note: even though terminate_expire_thread is atomic, we need to hold the
// mutex here to avoid a race between the wait_until in the expire thread
// and the setting of terminate_expire_thread
std::lock_guard<std::mutex> l(expire_cv->mutex());
terminate_expire_thread = true;
}
expire_cv->notify_one();
expire_thread.join();
expire_reset_callback = std::function<void(size_t)>();
expire_cv.reset();
}
}

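// Starts a background thread that watches the action server's goal expiration
// timer: the timer is obtained via a temporary wait set, an on-reset callback
// is registered so the thread recomputes its wakeup time, and expire_ready_cb
// is invoked with EntityType::Expired whenever the timer becomes ready.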
void
start_expire_thread(std::function<void(size_t, int)> expire_ready_cb)
{
{
std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);
if(expire_cv) {
return;
}

terminate_expire_thread = false;
expire_cv = std::make_shared<rclcpp::ClockConditionalVariable>(clock_);
}

rclcpp::Context::SharedPtr context = rclcpp::contexts::get_global_default_context();
std::shared_ptr<rcl_context_t> rcl_context = context->get_rcl_context();

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
rcl_ret_t ret =
rcl_wait_set_init(&wait_set, num_subscriptions_, num_guard_conditions_, num_timers_,
num_clients_, num_services_, 0, rcl_context.get(), rcl_get_default_allocator());
if(ret != RCL_RET_OK) {
throw std::runtime_error("Internal error starting timer thread");
}

ret = rcl_action_wait_set_add_action_server(
&wait_set, action_server_.get(), NULL);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "ServerBase::add_to_wait_set() failed");
}

// extract the goal expiration timer from the wait set (it is the only timer the action server adds)
const rcl_timer_t *expire_timer = wait_set.timers[0];

// don't need the wait set any more
ret = rcl_wait_set_fini(&wait_set);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to finalize wait set");
}

// the maybe_unused is needed here to satisfy cpplint
expire_reset_callback = [this] ([[maybe_unused]] size_t reset_calls)
{
{
std::lock_guard<std::mutex> l(expire_cv->mutex());
expire_time_changed = true;
}
expire_cv->notify_one();
};

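// Bridge the C-level on-reset notification to the C++ lambda above: the
// trampoline casts the user data back to the std::function and invokes it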
rcl_event_callback_t rcl_reset_callback = rclcpp::detail::cpp_callback_trampoline<
decltype(expire_reset_callback), const void *, size_t>;

ret = rcl_timer_set_on_reset_callback(expire_timer, rcl_reset_callback,
static_cast<const void *>(&expire_reset_callback));
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to set timer on reset callback");
}

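// The watcher thread sleeps until the expiration timer is due (or until it is
// reset or termination is requested) and then reports the Expired entity as ready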
expire_thread = std::thread([this, expire_timer, expire_ready_callback = expire_ready_cb]() {
rcl_clock_type_t clock_type = clock_->get_clock_type();

while(!terminate_expire_thread) {
{
std::unique_lock<std::mutex> l(expire_cv->mutex());

// reset to false; we are handling the time change right now
expire_time_changed = false;
int64_t time_until_call = 0;

auto ret2 = rcl_timer_get_time_until_next_call(expire_timer, &time_until_call);
if (ret2 == RCL_RET_TIMER_CANCELED) {
// the timer is canceled: wait (practically) indefinitely until a timer
// reset or a termination request wakes us up
rclcpp::Duration endless(std::chrono::hours(100));
expire_cv->wait_until(l, clock_->now() + endless, [this] () -> bool {
return expire_time_changed || terminate_expire_thread;
});
continue;
}

if(time_until_call > 0) {
rclcpp::Time next_wakeup(clock_->now().nanoseconds() + time_until_call, clock_type);
expire_cv->wait_until(l, next_wakeup, [this] () -> bool {
return expire_time_changed || terminate_expire_thread;
});
}
}

// don't call expire callback if we are terminating
if(terminate_expire_thread) {
return;
}

bool is_ready = false;
if(rcl_timer_is_ready(expire_timer, &is_ready) == RCL_RET_OK && is_ready) {
// we need to cancel the timer here to avoid an endless loop;
// if a new goal expires later, the timer will be reset.
auto ret2 = rcl_timer_cancel(const_cast<rcl_timer_t *>(expire_timer));
if(ret2 != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret2, "Failed to cancel timer");
}

std::lock_guard<std::recursive_mutex> lock(expire_thread_reentrant_mutex_);
if(expire_ready_callback) {
expire_ready_callback(1, static_cast<int>(ServerBase::EntityType::Expired));
}
}
}
});
}

// Lock for action_server_
std::recursive_mutex action_server_reentrant_mutex_;

@@ -808,6 +949,8 @@ ServerBase::set_on_ready_callback(std::function<void(size_t, int)> callback)
"is not callable.");
}

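// also start the expiration watcher so the callback fires when goals expire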
pimpl_->start_expire_thread(callback);

set_callback_to_entity(EntityType::GoalService, callback);
set_callback_to_entity(EntityType::ResultService, callback);
set_callback_to_entity(EntityType::CancelService, callback);
@@ -928,6 +1071,7 @@ ServerBase::clear_on_ready_callback()
set_on_ready_callback(EntityType::GoalService, nullptr, nullptr);
set_on_ready_callback(EntityType::ResultService, nullptr, nullptr);
set_on_ready_callback(EntityType::CancelService, nullptr, nullptr);
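// stop the expiration watcher together with the other ready callbacks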
pimpl_->stop_expire_thread();
on_ready_callback_set_ = false;
}
