From f7b971967dd9e64e0c1fcd5759ab60a838c4b71a Mon Sep 17 00:00:00 2001
From: Steve Gerbino
Date: Thu, 12 Feb 2026 02:02:02 +0100
Subject: [PATCH] Add multi-waiter support, cancel_one, and scheduler_impl
 intermediary

Refactor timer internals to support multiple concurrent waiters on a
single timer via per-waiter waiter_node linked with intrusive_list.
Add constructors with initial expiry, cancel_one() for FIFO single
cancellation, and return cancelled waiter counts from cancel(),
expires_at(), and expires_after(). Add a waiter node cache for the
wait-then-complete hot path.

Introduce scheduler_impl as a private intermediary between the public
scheduler interface and concrete backend schedulers (epoll, select,
kqueue, iocp). Move timer_svc_ from each backend into scheduler_impl
and remove the timer_svc() pure virtual from the public scheduler
interface. Timer service lookup in timer_service_create now goes
through timer_service_access to get the scheduler_impl and its cached
timer_svc_ pointer.

Update timer guide documentation with cancel_one, multi-waiter usage,
and return values. Increase timer durations in cancel_one and
stop-token-cancels-one tests from 50ms to 500ms to avoid CI flakiness
on slow machines.
---
 doc/modules/ROOT/pages/4.guide/4h.timers.adoc |  47 +-
 include/boost/corosio/basic_io_context.hpp    |  15 +-
 include/boost/corosio/detail/scheduler.hpp    |   1 +
 include/boost/corosio/timer.hpp               |  76 ++-
 src/corosio/src/detail/dispatch_coro.hpp      |   3 +-
 src/corosio/src/detail/epoll/scheduler.cpp    |   1 +
 src/corosio/src/detail/epoll/scheduler.hpp    |   6 +-
 src/corosio/src/detail/iocp/scheduler.hpp     |   7 +-
 src/corosio/src/detail/kqueue/scheduler.cpp   |   1 +
 src/corosio/src/detail/kqueue/scheduler.hpp   |   6 +-
 src/corosio/src/detail/scheduler_impl.hpp     |  28 +
 src/corosio/src/detail/select/scheduler.cpp   |   1 +
 src/corosio/src/detail/select/scheduler.hpp   |   6 +-
 src/corosio/src/detail/timer_service.cpp      | 498 +++++++++++++-----
 src/corosio/src/timer.cpp                     |  34 +-
 test/unit/timer.cpp                           | 466 ++++++++++++++++
 16 files changed, 1002 insertions(+), 194 deletions(-)
 create mode 100644 src/corosio/src/detail/scheduler_impl.hpp

diff --git a/doc/modules/ROOT/pages/4.guide/4h.timers.adoc b/doc/modules/ROOT/pages/4.guide/4h.timers.adoc
index 4260361b..ecfe63f6 100644
--- a/doc/modules/ROOT/pages/4.guide/4h.timers.adoc
+++ b/doc/modules/ROOT/pages/4.guide/4h.timers.adoc
@@ -1,5 +1,6 @@
 //
 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
+// Copyright (c) 2026 Steve Gerbino
 //
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -83,12 +84,17 @@ if (!ec)
 
 === Cancellation
 
+`cancel()` cancels all pending waits. `cancel_one()` cancels only the
+oldest pending wait (FIFO order). Both return the number of operations
+cancelled:
+
 [source,cpp]
 ----
-t.cancel(); // Pending wait completes with capy::error::canceled
+std::size_t n = t.cancel();     // Cancel all pending waits
+std::size_t m = t.cancel_one(); // Cancel oldest pending wait (0 or 1)
 ----
 
-The wait completes immediately with an error:
+The cancelled wait completes with an error:
 
 [source,cpp]
 ----
@@ -111,14 +117,47 @@ clock adjustments.
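+As a sketch of how the count-returning API composes (using only the
+calls documented above), draining waiters one at a time with
+`cancel_one()` until it returns zero is equivalent to a single
+`cancel()`:
+
+[source,cpp]
+----
+// Cancel pending waits oldest-first; each call cancels at most one.
+std::size_t total = 0;
+while (t.cancel_one() == 1)
+    ++total;
+// total now equals what a single t.cancel() would have returned
+----
+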
 == Resetting Timers
 
-Setting a new expiry cancels any pending wait:
+Setting a new expiry cancels any pending waits and returns the number
+cancelled:
 
 [source,cpp]
 ----
 t.expires_after(10s);
 // Later, before 10s elapses:
-t.expires_after(5s); // Resets to 5s, cancels previous wait
+std::size_t n = t.expires_after(5s); // Resets to 5s, cancels previous waits
+----
+
+== Multiple Waiters
+
+Multiple coroutines can wait on the same timer concurrently. When the
+timer expires, all waiters complete with success. When cancelled, all
+waiters complete with `capy::error::canceled`:
+
+[source,cpp]
 ----
+capy::task<> waiter(corosio::timer& t, int id)
+{
+    auto [ec] = co_await t.wait();
+    if (!ec)
+        std::cout << "Waiter " << id << " expired\n";
+}
+
+capy::task<> multi_wait(corosio::io_context& ioc)
+{
+    corosio::timer t(ioc);
+    t.expires_after(1s);
+
+    // All three coroutines wait on the same timer
+    co_await capy::when_all(
+        waiter(t, 1),
+        waiter(t, 2),
+        waiter(t, 3));
+}
+----
+
+Each waiter has independent stop token cancellation. Cancelling one
+waiter's stop token does not affect the others. `cancel_one()` cancels
+the oldest waiter only.
 
 == Use Cases
 
diff --git a/include/boost/corosio/basic_io_context.hpp b/include/boost/corosio/basic_io_context.hpp
index 56431f8b..46c3a2f3 100644
--- a/include/boost/corosio/basic_io_context.hpp
+++ b/include/boost/corosio/basic_io_context.hpp
@@ -12,15 +12,19 @@
 
 #include
 #include
-#include
 #include
 #include
+#include
 #include
 #include
 
 namespace boost::corosio {
 
+namespace detail {
+struct timer_service_access;
+} // namespace detail
+
 /** Base class for I/O context implementations.
 
     This class provides the common API for all I/O context types.
@@ -33,6 +37,8 @@ namespace boost::corosio {
  */
 class BOOST_COROSIO_DECL basic_io_context : public capy::execution_context
 {
+    friend struct detail::timer_service_access;
+
 public:
     /** The executor type for this context. */
     class executor_type;
@@ -254,15 +260,14 @@
         Derived classes must set sched_ in their constructor body.
     */
     basic_io_context()
-        : sched_(nullptr)
+        : capy::execution_context(this)
+        , sched_(nullptr)
     {
     }
 
     detail::scheduler* sched_;
 };
 
-//------------------------------------------------------------------------------
-
 /** An executor for dispatching work to an I/O context.
 
     The executor provides the interface for posting work items and
@@ -392,8 +397,6 @@ class basic_io_context::executor_type
     }
 };
 
-//------------------------------------------------------------------------------
-
 inline
 basic_io_context::executor_type
 basic_io_context::
diff --git a/include/boost/corosio/detail/scheduler.hpp b/include/boost/corosio/detail/scheduler.hpp
index fc9635cd..f87ef8af 100644
--- a/include/boost/corosio/detail/scheduler.hpp
+++ b/include/boost/corosio/detail/scheduler.hpp
@@ -1,5 +1,6 @@
 //
 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
+// Copyright (c) 2026 Steve Gerbino
 //
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
diff --git a/include/boost/corosio/timer.hpp b/include/boost/corosio/timer.hpp
index 25e816c5..d9952b1c 100644
--- a/include/boost/corosio/timer.hpp
+++ b/include/boost/corosio/timer.hpp
@@ -1,5 +1,6 @@
 //
 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
+// Copyright (c) 2026 Steve Gerbino
 //
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -22,10 +23,9 @@
 
 #include
 #include
-#include
 #include
+#include
 #include
-#include
 
 namespace boost::corosio {
 
@@ -35,13 +35,17 @@ namespace boost::corosio {
     awaitable types. The timer can be used to schedule operations
     to occur after a specified duration or at a specific time point.
 
+    Multiple coroutines may wait concurrently on the same timer.
+    When the timer expires, all waiters complete with success. When
+    the timer is cancelled, all waiters complete with an error that
+    compares equal to `capy::cond::canceled`.
+
     Each timer operation participates in the affine awaitable
     protocol, ensuring coroutines resume on the correct executor.
 
     @par Thread Safety
     Distinct objects: Safe.@n
-    Shared objects: Unsafe. A timer must not have concurrent wait
-    operations.
+    Shared objects: Unsafe.
 
     @par Semantics
     Wraps platform timer facilities via the io_context reactor.
@@ -111,6 +115,27 @@ class BOOST_COROSIO_DECL timer : public io_object
     */
     explicit timer(capy::execution_context& ctx);
 
+    /** Construct a timer with an initial absolute expiry time.
+
+        @param ctx The execution context that will own this timer.
+        @param t The initial expiry time point.
+    */
+    timer(capy::execution_context& ctx, time_point t);
+
+    /** Construct a timer with an initial relative expiry time.
+
+        @param ctx The execution context that will own this timer.
+        @param d The initial expiry duration relative to now.
+    */
+    template<class Rep, class Period>
+    timer(
+        capy::execution_context& ctx,
+        std::chrono::duration<Rep, Period> d)
+        : timer(ctx)
+    {
+        expires_after(d);
+    }
+
     /** Move constructor.
 
         Transfers ownership of the timer resources.
@@ -135,14 +160,26 @@
     timer(timer const&) = delete;
     timer& operator=(timer const&) = delete;
 
-    /** Cancel any pending asynchronous operations.
+    /** Cancel all pending asynchronous wait operations.
 
         All outstanding operations complete with an error
         code that compares equal to `capy::cond::canceled`.
+
+        @return The number of operations that were cancelled.
+    */
+    std::size_t cancel();
+
+    /** Cancel one pending asynchronous wait operation.
+
+        The oldest pending wait is cancelled (FIFO order). It
+        completes with an error code that compares equal to
+        `capy::cond::canceled`.
+
+        @return The number of operations that were cancelled (0 or 1).
     */
-    void cancel();
+    std::size_t cancel_one();
 
-    /** Get the timer's expiry time as an absolute time.
+    /** Return the timer's expiry time as an absolute time.
 
         @return The expiry time point. If no expiry has been set,
        returns a default-constructed time_point.
@@ -154,36 +191,47 @@
        Any pending asynchronous wait operations will be cancelled.
 
        @param t The expiry time to be used for the timer.
+
+       @return The number of pending operations that were cancelled.
    */
-    void expires_at(time_point t);
+    std::size_t expires_at(time_point t);
 
    /** Set the timer's expiry time relative to now.
 
        Any pending asynchronous wait operations will be cancelled.
 
        @param d The expiry time relative to now.
+
+       @return The number of pending operations that were cancelled.
    */
-    void expires_after(duration d);
+    std::size_t expires_after(duration d);
 
    /** Set the timer's expiry time relative to now.
 
        This is a convenience overload that accepts any duration type
-       and converts it to the timer's native duration type.
+       and converts it to the timer's native duration type. Any
+       pending asynchronous wait operations will be cancelled.
 
        @param d The expiry time relative to now.
+
+       @return The number of pending operations that were cancelled.
    */
    template<class Rep, class Period>
-    void expires_after(std::chrono::duration<Rep, Period> d)
+    std::size_t expires_after(std::chrono::duration<Rep, Period> d)
    {
-        expires_after(std::chrono::duration_cast<duration>(d));
+        return expires_after(std::chrono::duration_cast<duration>(d));
    }
 
    /** Wait for the timer to expire.
 
+        Multiple coroutines may wait on the same timer concurrently.
+        When the timer expires, all waiters complete with success.
+
        The operation supports cancellation via `std::stop_token` through
        the affine awaitable protocol. If the associated stop token is
-        triggered, the operation completes immediately with an error
-        that compares equal to `capy::cond::canceled`.
+        triggered, only that waiter completes with an error that
+        compares equal to `capy::cond::canceled`; other waiters are
+        unaffected.
 
        @par Example
        @code
diff --git a/src/corosio/src/detail/dispatch_coro.hpp b/src/corosio/src/detail/dispatch_coro.hpp
index 84140327..efa99bb8 100644
--- a/src/corosio/src/detail/dispatch_coro.hpp
+++ b/src/corosio/src/detail/dispatch_coro.hpp
@@ -1,5 +1,6 @@
 //
 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
+// Copyright (c) 2026 Steve Gerbino
 //
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -37,7 +38,7 @@ dispatch_coro(
     capy::executor_ref ex,
     std::coroutine_handle<> h)
 {
-    if (&ex.type_id() == &capy::detail::type_id<basic_io_context::executor_type>())
+    if ( ex.target< basic_io_context::executor_type >() )
         return h;
     return ex.dispatch(h);
 }
diff --git a/src/corosio/src/detail/epoll/scheduler.cpp b/src/corosio/src/detail/epoll/scheduler.cpp
index d2ad481c..abca8fd8 100644
--- a/src/corosio/src/detail/epoll/scheduler.cpp
+++ b/src/corosio/src/detail/epoll/scheduler.cpp
@@ -13,6 +13,7 @@
 
 #include "src/detail/epoll/scheduler.hpp"
 #include "src/detail/epoll/op.hpp"
+#include "src/detail/timer_service.hpp"
 #include "src/detail/make_err.hpp"
 #include "src/detail/posix/resolver_service.hpp"
 #include "src/detail/posix/signals.hpp"
diff --git a/src/corosio/src/detail/epoll/scheduler.hpp b/src/corosio/src/detail/epoll/scheduler.hpp
index e51833a3..454d633f 100644
--- a/src/corosio/src/detail/epoll/scheduler.hpp
+++ b/src/corosio/src/detail/epoll/scheduler.hpp
@@ -15,11 +15,10 @@
 #if BOOST_COROSIO_HAS_EPOLL
 
 #include
-#include
 #include
 
+#include "src/detail/scheduler_impl.hpp"
 #include "src/detail/scheduler_op.hpp"
-#include "src/detail/timer_service.hpp"
 
 #include
 #include
@@ -53,7 +52,7 @@ struct scheduler_context;
     All public member functions are thread-safe.
 */
 class epoll_scheduler
-    : public scheduler
+    : public scheduler_impl
     , public capy::execution_context::service
 {
 public:
@@ -255,7 +254,6 @@
     mutable std::atomic outstanding_work_;
     bool stopped_;
     bool shutdown_;
-    timer_service* timer_svc_ = nullptr;
 
     // True while a thread is blocked in epoll_wait. Used by
     // wake_one_thread_and_unlock and work_finished to know when
diff --git a/src/corosio/src/detail/iocp/scheduler.hpp b/src/corosio/src/detail/iocp/scheduler.hpp
index ae44320f..cc784874 100644
--- a/src/corosio/src/detail/iocp/scheduler.hpp
+++ b/src/corosio/src/detail/iocp/scheduler.hpp
@@ -15,8 +15,9 @@
 #if BOOST_COROSIO_HAS_IOCP
 
 #include
-#include
 #include
+
+#include "src/detail/scheduler_impl.hpp"
 #include
 
 #include "src/detail/scheduler_op.hpp"
@@ -34,10 +35,9 @@ namespace boost::corosio::detail {
 
 // Forward declarations
 struct overlapped_op;
 class win_timers;
-class timer_service;
 
 class win_scheduler
-    : public scheduler
+    : public scheduler_impl
     , public capy::execution_context::service
 {
 public:
@@ -90,7 +90,6 @@
     mutable win_mutex dispatch_mutex_;
     mutable op_queue completed_ops_;
     std::unique_ptr<win_timers> timers_;
-    timer_service* timer_svc_ = nullptr;
 };
 
 } // namespace boost::corosio::detail
diff --git a/src/corosio/src/detail/kqueue/scheduler.cpp b/src/corosio/src/detail/kqueue/scheduler.cpp
index 6be398dc..915aee9d 100644
--- a/src/corosio/src/detail/kqueue/scheduler.cpp
+++ b/src/corosio/src/detail/kqueue/scheduler.cpp
@@ -13,6 +13,7 @@
 
 #include "src/detail/kqueue/scheduler.hpp"
 #include "src/detail/kqueue/op.hpp"
+#include "src/detail/timer_service.hpp"
 #include "src/detail/make_err.hpp"
 #include "src/detail/posix/resolver_service.hpp"
 #include "src/detail/posix/signals.hpp"
diff --git a/src/corosio/src/detail/kqueue/scheduler.hpp b/src/corosio/src/detail/kqueue/scheduler.hpp
index 67cd6344..b2a7b29a 100644
--- a/src/corosio/src/detail/kqueue/scheduler.hpp
+++ b/src/corosio/src/detail/kqueue/scheduler.hpp
@@ -15,11 +15,10 @@
 #if BOOST_COROSIO_HAS_KQUEUE
 
 #include
-#include
 #include
 
+#include "src/detail/scheduler_impl.hpp"
 #include "src/detail/scheduler_op.hpp"
-#include "src/detail/timer_service.hpp"
 
 #include
 #include
@@ -57,7 +56,7 @@ struct scheduler_context;
     All public member functions are thread-safe.
 */
 class kqueue_scheduler
-    : public scheduler
+    : public scheduler_impl
     , public capy::execution_context::service
 {
 public:
@@ -273,7 +272,6 @@
     mutable std::atomic outstanding_work_{0};
     std::atomic<bool> stopped_{false};
     bool shutdown_ = false;
-    timer_service* timer_svc_ = nullptr;
 
     // True while a thread is blocked in kevent(). Used by
     // wake_one_thread_and_unlock and work_finished to know when
diff --git a/src/corosio/src/detail/scheduler_impl.hpp b/src/corosio/src/detail/scheduler_impl.hpp
new file mode 100644
index 00000000..2d301f04
--- /dev/null
+++ b/src/corosio/src/detail/scheduler_impl.hpp
@@ -0,0 +1,28 @@
+//
+// Copyright (c) 2026 Steve Gerbino
+//
+// Distributed under the Boost Software License, Version 1.0.
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_SRC_DETAIL_SCHEDULER_IMPL_HPP +#define BOOST_COROSIO_SRC_DETAIL_SCHEDULER_IMPL_HPP + +#include + +namespace boost::corosio::detail { + +class timer_service; + +// Intermediary between public scheduler and concrete backends, +// holds cached service pointers behind the compilation firewall +struct scheduler_impl : scheduler +{ + timer_service* timer_svc_ = nullptr; +}; + +} // namespace boost::corosio::detail + +#endif diff --git a/src/corosio/src/detail/select/scheduler.cpp b/src/corosio/src/detail/select/scheduler.cpp index 96f7ccaf..6542daa3 100644 --- a/src/corosio/src/detail/select/scheduler.cpp +++ b/src/corosio/src/detail/select/scheduler.cpp @@ -13,6 +13,7 @@ #include "src/detail/select/scheduler.hpp" #include "src/detail/select/op.hpp" +#include "src/detail/timer_service.hpp" #include "src/detail/make_err.hpp" #include "src/detail/posix/resolver_service.hpp" #include "src/detail/posix/signals.hpp" diff --git a/src/corosio/src/detail/select/scheduler.hpp b/src/corosio/src/detail/select/scheduler.hpp index 0c003daf..ad68b018 100644 --- a/src/corosio/src/detail/select/scheduler.hpp +++ b/src/corosio/src/detail/select/scheduler.hpp @@ -15,11 +15,10 @@ #if BOOST_COROSIO_HAS_SELECT #include -#include #include +#include "src/detail/scheduler_impl.hpp" #include "src/detail/scheduler_op.hpp" -#include "src/detail/timer_service.hpp" #include @@ -58,7 +57,7 @@ struct select_op; All public member functions are thread-safe. */ class select_scheduler - : public scheduler + : public scheduler_impl , public capy::execution_context::service { public: @@ -146,7 +145,6 @@ class select_scheduler mutable std::atomic outstanding_work_; std::atomic stopped_; bool shutdown_; - timer_service* timer_svc_ = nullptr; // Per-fd state for tracking registered operations struct fd_state diff --git a/src/corosio/src/detail/timer_service.cpp b/src/corosio/src/detail/timer_service.cpp index fdc545c8..a79f1309 100644 --- a/src/corosio/src/detail/timer_service.cpp +++ b/src/corosio/src/detail/timer_service.cpp @@ -8,17 +8,21 @@ // #include "src/detail/timer_service.hpp" +#include "src/detail/scheduler_impl.hpp" -#include +#include +#include #include "src/detail/scheduler_op.hpp" +#include "src/detail/intrusive.hpp" #include #include #include +#include #include #include #include -#include +#include #include #include @@ -32,13 +36,18 @@ Data Structures --------------- - timer_impl holds per-timer state: expiry, coroutine handle, - executor, embedded completion_op, heap index, and free-list link. + waiter_node holds per-waiter state: coroutine handle, executor, + error output, stop_token, embedded completion_op. Each concurrent + co_await t.wait() allocates one waiter_node. - timer_service_impl owns a min-heap of active timers and a free - list of recycled impls. The heap is ordered by expiry time; the - scheduler queries nearest_expiry() to set the epoll/timerfd - timeout. + timer_impl holds per-timer state: expiry, heap index, and an + intrusive_list of waiter_nodes. Multiple coroutines can wait on + the same timer simultaneously. + + timer_service_impl owns a min-heap of active timers, a free list + of recycled impls, and a free list of recycled waiter_nodes. The + heap is ordered by expiry time; the scheduler queries + nearest_expiry() to set the epoll/timerfd timeout. 
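+
+    A rough sketch of the relationships described above (an
+    illustration only; the authoritative definitions are the struct
+    declarations further down in this file):
+
+        heap_ (min-heap of {expiry, timer_impl*}, ordered by expiry)
+          [0] -> timer_impl { expiry_, heap_index_, waiters_ }
+                   waiters_: waiter_node -> waiter_node -> ... (FIFO)
+          [1] -> timer_impl { ... }
+
+        nearest_expiry() reads heap_[0]'s expiry via a cached atomic.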
Optimization Strategy --------------------- @@ -59,46 +68,60 @@ 2. Thread-local impl cache — A single-slot per-thread cache of timer_impl avoids the mutex on create/destroy for the common - create-then-destroy-on-same-thread pattern. The RAII wrapper - tl_impl_cache deletes the cached impl when the thread exits. - - 3. Thread-local service cache — Caches the {context, service} - pair per-thread to skip find_service() on every timer - construction. + create-then-destroy-on-same-thread pattern. On pop, if the + cached impl's svc_ doesn't match the current service, the + stale impl is deleted eagerly rather than reused. - 4. Embedded completion_op — timer_impl embeds a scheduler_op - subclass, eliminating heap allocation per fire/cancel. Its - destroy() is a no-op since the timer_impl owns the lifetime. + 3. Embedded completion_op — Each waiter_node embeds a + scheduler_op subclass, eliminating heap allocation per + fire/cancel. Its destroy() is a no-op since the waiter_node + owns the lifetime. - 5. Cached nearest expiry — An atomic mirrors the heap + 4. Cached nearest expiry — An atomic mirrors the heap root's time, updated under the lock. nearest_expiry() and empty() read the atomic without locking. - 6. might_have_pending_waits_ flag — Set on wait(), cleared on + 5. might_have_pending_waits_ flag — Set on wait(), cleared on cancel. Lets cancel_timer() return without locking when no wait was ever issued. + 6. Thread-local waiter cache — Single-slot per-thread cache of + waiter_node avoids the free-list mutex for the common + wait-then-complete-on-same-thread pattern. + With all fast paths hit (idle timer, same thread), the schedule/cancel cycle takes zero mutex locks. + + Concurrency + ----------- + stop_token callbacks can fire from any thread. The impl_ + pointer on waiter_node is used as a "still in list" marker: + set to nullptr under the mutex when a waiter is removed by + cancel_timer() or process_expired(). cancel_waiter() checks + this under the mutex to avoid double-removal races. + + Multiple io_contexts in the same program are safe. The + service pointer is obtained directly from the scheduler, + and TL-cached impls are validated by comparing svc_ against + the current service pointer. Waiter nodes have no service + affinity and can safely migrate between contexts. 
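+
+    Sketch of the removal-marker handshake, condensed from
+    cancel_waiter() below (names match the real code):
+
+        lock(mutex_);
+        if (w->impl_ == nullptr)      // already removed; completion
+            return;                   // has been or will be posted
+        w->impl_->waiters_.remove(w);
+        w->impl_ = nullptr;           // claim w under the lock
+        unlock;
+        post completion with capy::error::canceled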
 */
 
namespace boost::corosio::detail {
 
class timer_service_impl;
+struct timer_impl;
+struct waiter_node;
 
void timer_service_invalidate_cache() noexcept;
 
-struct timer_impl
-    : timer::timer_impl
+struct waiter_node
+    : intrusive_list<waiter_node>::node
 {
-    using clock_type = std::chrono::steady_clock;
-    using time_point = clock_type::time_point;
-    using duration = clock_type::duration;
-
     // Embedded completion op — avoids heap allocation per fire/cancel
     struct completion_op final : scheduler_op
     {
-        timer_impl* impl_ = nullptr;
+        waiter_node* waiter_ = nullptr;
 
         static void do_complete(
             void* owner,
@@ -112,34 +135,54 @@
         }
 
         void operator()() override;
 
-        // No-op — lifetime owned by timer_impl, not the scheduler queue
+        // No-op — lifetime owned by waiter_node, not the scheduler queue
         void destroy() override {}
     };
 
-    timer_service_impl* svc_ = nullptr;
-    time_point expiry_;
-    std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
-    // Lets cancel_timer() skip the lock when no wait() was ever issued
-    bool might_have_pending_waits_ = false;
+    // Per-waiter stop_token cancellation
+    struct canceller
+    {
+        waiter_node* waiter_;
+        void operator()() const;
+    };
 
-    // Wait operation state
+    // nullptr once removed from timer's waiter list (concurrency marker)
+    timer_impl* impl_ = nullptr;
+    timer_service_impl* svc_ = nullptr;
     std::coroutine_handle<> h_;
     capy::executor_ref d_;
     std::error_code* ec_out_ = nullptr;
     std::stop_token token_;
-    bool waiting_ = false;
-
+    std::optional<std::stop_callback<canceller>> stop_cb_;
     completion_op op_;
     std::error_code ec_value_;
+    waiter_node* next_free_ = nullptr;
+
+    waiter_node() noexcept
+    {
+        op_.waiter_ = this;
+    }
+};
+
+struct timer_impl
+    : timer::timer_impl
+{
+    using clock_type = std::chrono::steady_clock;
+    using time_point = clock_type::time_point;
+    using duration = clock_type::duration;
+
+    timer_service_impl* svc_ = nullptr;
+    time_point expiry_;
+    std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
+    // Lets cancel_timer() skip the lock when no wait() was ever issued
+    bool might_have_pending_waits_ = false;
+    intrusive_list<waiter_node> waiters_;
 
     // Free list linkage (reused when impl is on free_list)
     timer_impl* next_free_ = nullptr;
 
-    explicit timer_impl(timer_service_impl& svc) noexcept
-        : svc_(&svc)
-    {
-        op_.impl_ = this;
-    }
+    explicit timer_impl(timer_service_impl& svc) noexcept;
+
     void release() override;
@@ -152,6 +195,8 @@ struct timer_impl
 
timer_impl* try_pop_tl_cache(timer_service_impl*) noexcept;
bool try_push_tl_cache(timer_impl*) noexcept;
+waiter_node* try_pop_waiter_tl_cache() noexcept;
+bool try_push_waiter_tl_cache(waiter_node*) noexcept;
 
class timer_service_impl : public timer_service
{
@@ -171,8 +216,7 @@ class timer_service_impl : public timer_service
    mutable std::mutex mutex_;
    std::vector heap_;
    timer_impl* free_list_ = nullptr;
-    // Tracks impls not on free-list, for shutdown correctness
-    std::size_t live_count_ = 0;
+    waiter_node* waiter_free_list_ = nullptr;
    callback on_earliest_changed_;
    // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic cached_nearest_ns_{
@@ -205,15 +249,15 @@ class timer_service_impl : public timer_service
        for (auto& entry : heap_)
        {
            auto* impl = entry.timer_;
-            if (impl->waiting_)
+            while (auto* w = impl->waiters_.pop_front())
            {
-                impl->waiting_ = false;
-                impl->h_.destroy();
+                w->stop_cb_.reset();
+                w->h_.destroy();
                sched_->on_work_finished();
+                delete w;
            }
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            delete impl;
-            --live_count_;
        }
        heap_.clear();
        cached_nearest_ns_.store(
@@ -228,9 +272,13 @@ class timer_service_impl : public timer_service
            free_list_ = next;
        }
 
-        // Any live timers not in heap and not on free list are still
-        // referenced by timer objects — they'll call destroy_impl()
-        // which will delete them (live_count_ tracks this).
+        // Delete free-listed waiters
+        while (waiter_free_list_)
+        {
+            auto* next = waiter_free_list_->next_free_;
+            delete waiter_free_list_;
+            waiter_free_list_ = next;
+        }
    }
 
    timer::timer_impl* create_impl() override
    {
@@ -250,6 +298,7 @@
            impl = free_list_;
            free_list_ = impl->next_free_;
            impl->next_free_ = nullptr;
+            impl->svc_ = this;
            impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
            impl->might_have_pending_waits_ = false;
        }
@@ -257,12 +306,13 @@
        {
            impl = new timer_impl(*this);
        }
-        ++live_count_;
        return impl;
    }
 
    void destroy_impl(timer_impl& impl)
    {
+        cancel_timer(impl);
+
        if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
        {
            std::lock_guard lock(mutex_);
@@ -276,27 +326,53 @@
        std::lock_guard lock(mutex_);
        impl.next_free_ = free_list_;
        free_list_ = &impl;
-        --live_count_;
+    }
+
+    waiter_node* create_waiter()
+    {
+        if (auto* w = try_pop_waiter_tl_cache())
+            return w;
+
+        std::lock_guard lock(mutex_);
+        if (waiter_free_list_)
+        {
+            auto* w = waiter_free_list_;
+            waiter_free_list_ = w->next_free_;
+            w->next_free_ = nullptr;
+            return w;
+        }
+
+        return new waiter_node();
+    }
+
+    void destroy_waiter(waiter_node* w)
+    {
+        if (try_push_waiter_tl_cache(w))
+            return;
+
+        std::lock_guard lock(mutex_);
+        w->next_free_ = waiter_free_list_;
+        waiter_free_list_ = w;
    }
 
    // Heap insertion deferred to wait() — avoids lock when timer is idle
-    void update_timer(timer_impl& impl, time_point new_time)
+    std::size_t update_timer(timer_impl& impl, time_point new_time)
    {
        bool in_heap = (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
-        if (!in_heap && !impl.waiting_)
-            return;
+        if (!in_heap && impl.waiters_.empty())
+            return 0;
 
        bool notify = false;
-        bool was_waiting = false;
+        intrusive_list<waiter_node> canceled;
        {
            std::lock_guard lock(mutex_);
 
-            if (impl.waiting_)
+            while (auto* w = impl.waiters_.pop_front())
            {
-                was_waiting = true;
-                impl.waiting_ = false;
+                w->impl_ = nullptr;
+                canceled.push_back(w);
            }
 
            if (impl.heap_index_ < heap_.size())
@@ -315,65 +391,128 @@
            refresh_cached_nearest();
        }
 
-        if (was_waiting)
+        std::size_t count = 0;
+        while (auto* w = canceled.pop_front())
        {
-            impl.ec_value_ = make_error_code(capy::error::canceled);
-            sched_->post(&impl.op_);
+            w->ec_value_ = make_error_code(capy::error::canceled);
+            sched_->post(&w->op_);
+            ++count;
        }
 
        if (notify)
            on_earliest_changed_();
+
+        return count;
    }
 
-    // Called from wait() when timer hasn't been inserted into the heap yet
-    void insert_timer(timer_impl& impl)
+    // Inserts timer into heap if needed and pushes waiter, all under
+    // one lock to prevent races with cancel_waiter/process_expired
+    void insert_waiter(timer_impl& impl, waiter_node* w)
    {
        bool notify = false;
        {
            std::lock_guard lock(mutex_);
-            impl.heap_index_ = heap_.size();
-            heap_.push_back({impl.expiry_, &impl});
-            up_heap(heap_.size() - 1);
-            notify = (impl.heap_index_ == 0);
-            refresh_cached_nearest();
+            if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
+            {
+                impl.heap_index_ = heap_.size();
+                heap_.push_back({impl.expiry_, &impl});
+                up_heap(heap_.size() - 1);
+                notify = (impl.heap_index_ == 0);
+                refresh_cached_nearest();
+            }
+            impl.waiters_.push_back(w);
        }
        if (notify)
            on_earliest_changed_();
    }
 
-    void cancel_timer(timer_impl& impl)
+    std::size_t cancel_timer(timer_impl& impl)
    {
        if (!impl.might_have_pending_waits_)
-            return;
+            return 0;
 
-        // Not in heap and not waiting — just clear the flag
+        // Not in heap and no waiters — just clear the flag
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()
-            && !impl.waiting_)
+            && impl.waiters_.empty())
        {
            impl.might_have_pending_waits_ = false;
-            return;
+            return 0;
        }
 
-        bool was_waiting = false;
+        intrusive_list<waiter_node> canceled;
        {
            std::lock_guard lock(mutex_);
            remove_timer_impl(impl);
-            if (impl.waiting_)
+            while (auto* w = impl.waiters_.pop_front())
            {
-                was_waiting = true;
-                impl.waiting_ = false;
+                w->impl_ = nullptr;
+                canceled.push_back(w);
            }
            refresh_cached_nearest();
        }
 
        impl.might_have_pending_waits_ = false;
 
-        if (was_waiting)
+        std::size_t count = 0;
+        while (auto* w = canceled.pop_front())
        {
-            impl.ec_value_ = make_error_code(capy::error::canceled);
-            sched_->post(&impl.op_);
+            w->ec_value_ = make_error_code(capy::error::canceled);
+            sched_->post(&w->op_);
+            ++count;
        }
+
+        return count;
+    }
+
+    // Cancel a single waiter (called from stop_token callback, any thread)
+    void cancel_waiter(waiter_node* w)
+    {
+        {
+            std::lock_guard lock(mutex_);
+            // Already removed by cancel_timer or process_expired
+            if (!w->impl_)
+                return;
+            auto* impl = w->impl_;
+            w->impl_ = nullptr;
+            impl->waiters_.remove(w);
+            if (impl->waiters_.empty())
+            {
+                remove_timer_impl(*impl);
+                impl->might_have_pending_waits_ = false;
+            }
+            refresh_cached_nearest();
+        }
+
+        w->ec_value_ = make_error_code(capy::error::canceled);
+        sched_->post(&w->op_);
+    }
+
+    // Cancel front waiter only (FIFO), return 0 or 1
+    std::size_t cancel_one_waiter(timer_impl& impl)
+    {
+        if (!impl.might_have_pending_waits_)
+            return 0;
+
+        waiter_node* w = nullptr;
+
+        {
+            std::lock_guard lock(mutex_);
+            w = impl.waiters_.pop_front();
+            if (!w)
+                return 0;
+            w->impl_ = nullptr;
+            if (impl.waiters_.empty())
+            {
+                remove_timer_impl(impl);
+                impl.might_have_pending_waits_ = false;
+            }
+            refresh_cached_nearest();
+        }
+
+        w->ec_value_ = make_error_code(capy::error::canceled);
+        sched_->post(&w->op_);
+        return 1;
    }
 
    bool empty() const noexcept override
    {
@@ -390,7 +529,7 @@
 
    std::size_t process_expired() override
    {
-        std::vector<timer_impl*> expired;
+        intrusive_list<waiter_node> expired;
 
        {
            std::lock_guard lock(mutex_);
@@ -400,22 +539,26 @@
            {
                timer_impl* t = heap_[0].timer_;
                remove_timer_impl(*t);
-
-                if (t->waiting_)
+                while (auto* w = t->waiters_.pop_front())
                {
-                    t->waiting_ = false;
-                    t->ec_value_ = {};
-                    expired.push_back(t);
+                    w->impl_ = nullptr;
+                    w->ec_value_ = {};
+                    expired.push_back(w);
                }
+                t->might_have_pending_waits_ = false;
            }
 
            refresh_cached_nearest();
        }
 
-        for (auto* t : expired)
-            sched_->post(&t->op_);
+        std::size_t count = 0;
+        while (auto* w = expired.pop_front())
+        {
+            sched_->post(&w->op_);
+            ++count;
+        }
 
-        return expired.size();
+        return count;
    }
 
private:
@@ -493,8 +636,21 @@
    }
};
 
+timer_impl::
+timer_impl(timer_service_impl& svc) noexcept
+    : svc_(&svc)
+{
+}
+
+void
+waiter_node::canceller::
+operator()() const
+{
+    waiter_->svc_->cancel_waiter(waiter_);
+}
+
 void
-timer_impl::completion_op::
+waiter_node::completion_op::
 do_complete(
     void* owner,
     scheduler_op* base,
@@ -507,15 +663,22 @@ do_complete(
 }
 
 void
-timer_impl::completion_op::
+waiter_node::completion_op::
 operator()()
 {
-    auto* impl = impl_;
-    if (impl->ec_out_)
-        *impl->ec_out_ = impl->ec_value_;
+    auto* w = waiter_;
+    w->stop_cb_.reset();
+    if (w->ec_out_)
+        *w->ec_out_ = w->ec_value_;
+
+    auto h = w->h_;
+    auto d = w->d_;
+    auto* svc = w->svc_;
+    auto& sched = svc->get_scheduler();
+
+    svc->destroy_waiter(w);
 
-    auto& sched = impl->svc_->get_scheduler();
-    impl->d_.post(impl->h_);
+    d.post(h);
     sched.on_work_finished();
 }
 
@@ -534,54 +697,63 @@ wait(
     std::stop_token token,
     std::error_code* ec)
 {
+    // Already-expired fast path — no waiter_node, no mutex
     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     {
         if (expiry_ <= clock_type::now())
         {
             if (ec)
                 *ec = {};
-            d.post(h);
-            return std::noop_coroutine();
+            return d.dispatch(h);
         }
-
-        svc_->insert_timer(*this);
     }
 
-    h_ = h;
-    d_ = std::move(d);
-    token_ = std::move(token);
-    ec_out_ = ec;
-    waiting_ = true;
+    auto* w = svc_->create_waiter();
+    w->impl_ = this;
+    w->svc_ = svc_;
+    w->h_ = h;
+    w->d_ = std::move(d);
+    w->token_ = std::move(token);
+    w->ec_out_ = ec;
+
+    svc_->insert_waiter(*this, w);
     might_have_pending_waits_ = true;
     svc_->get_scheduler().on_work_started();
+
+    if (w->token_.stop_possible())
+        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
+
     return std::noop_coroutine();
 }
 
 // Extern free functions called from timer.cpp
 //
-// Thread-local caches invalidated by timer_service_invalidate_cache()
-// during shutdown. The service cache avoids find_service overhead per
-// timer construction. The impl cache avoids the free-list mutex for
-// the common create-then-destroy-on-same-thread pattern.
-static thread_local capy::execution_context* cached_ctx = nullptr;
-static thread_local timer_service_impl* cached_svc = nullptr;
-
-// RAII wrapper deletes the cached impl when the thread exits
-struct tl_impl_cache
-{
-    timer_impl* ptr = nullptr;
-    ~tl_impl_cache() { delete ptr; }
-};
-static thread_local tl_impl_cache tl_cached_impl;
+// Two thread-local caches avoid hot-path mutex acquisitions:
+//
+// 1. Impl cache — single-slot, validated by comparing svc_ on the
+//    impl against the current service pointer.
+//
+// 2. Waiter cache — single-slot, no service affinity.
+//
+// The service pointer is obtained from the scheduler_impl's
+// timer_svc_ member, avoiding find_service() on the hot path.
+// All caches are cleared by timer_service_invalidate_cache()
+// during shutdown.
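+//
+// Cache validation at a glance (a condensed view of the functions
+// defined below; see try_pop_tl_cache / try_push_tl_cache):
+//
+//   pop(svc): slot empty        -> miss, fall back to the free list
+//             slot->svc_ == svc -> hit, hand out the cached impl
+//             slot->svc_ != svc -> stale service, delete eagerly
+//   push(p):  slot empty        -> park p for this thread
+//             slot occupied     -> caller uses the mutexed free list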
+
+thread_local_ptr<timer_impl> tl_cached_impl;
+thread_local_ptr<waiter_node> tl_cached_waiter;
 
 timer_impl*
 try_pop_tl_cache(timer_service_impl* svc) noexcept
 {
-    if (tl_cached_impl.ptr && tl_cached_impl.ptr->svc_ == svc)
+    auto* impl = tl_cached_impl.get();
+    if (impl)
     {
-        auto* impl = tl_cached_impl.ptr;
-        tl_cached_impl.ptr = nullptr;
-        return impl;
+        tl_cached_impl.set(nullptr);
+        if (impl->svc_ == svc)
+            return impl;
+        // Stale impl from a destroyed service
+        delete impl;
     }
     return nullptr;
 }
@@ -589,9 +761,32 @@ try_pop_tl_cache(timer_service_impl* svc) noexcept
 bool
 try_push_tl_cache(timer_impl* impl) noexcept
 {
-    if (!tl_cached_impl.ptr)
+    if (!tl_cached_impl.get())
     {
-        tl_cached_impl.ptr = impl;
+        tl_cached_impl.set(impl);
+        return true;
+    }
+    return false;
+}
+
+waiter_node*
+try_pop_waiter_tl_cache() noexcept
+{
+    auto* w = tl_cached_waiter.get();
+    if (w)
+    {
+        tl_cached_waiter.set(nullptr);
+        return w;
+    }
+    return nullptr;
+}
+
+bool
+try_push_waiter_tl_cache(waiter_node* w) noexcept
+{
+    if (!tl_cached_waiter.get())
+    {
+        tl_cached_waiter.set(w);
         return true;
     }
     return false;
@@ -600,24 +795,32 @@ try_push_tl_cache(timer_impl* impl) noexcept
 void
 timer_service_invalidate_cache() noexcept
 {
-    cached_ctx = nullptr;
-    cached_svc = nullptr;
-    delete tl_cached_impl.ptr;
-    tl_cached_impl.ptr = nullptr;
+    delete tl_cached_impl.get();
+    tl_cached_impl.set(nullptr);
+
+    delete tl_cached_waiter.get();
+    tl_cached_waiter.set(nullptr);
 }
 
-timer::timer_impl*
-timer_service_create(capy::execution_context& ctx)
+struct timer_service_access
 {
-    if (cached_ctx != &ctx)
+    static scheduler_impl& get_scheduler(basic_io_context& ctx) noexcept
     {
-        cached_svc = static_cast<timer_service_impl*>(
-            ctx.find_service<timer_service>());
-        if (!cached_svc)
-            throw std::runtime_error("timer_service not found");
-        cached_ctx = &ctx;
+        return static_cast<scheduler_impl&>(*ctx.sched_);
     }
-    return cached_svc->create_impl();
+};
+
+timer::timer_impl*
+timer_service_create(capy::execution_context& ctx)
+{
+    if (!ctx.target<basic_io_context>())
+        detail::throw_logic_error();
+    auto& ioctx = static_cast<basic_io_context&>(ctx);
+    auto* svc = static_cast<timer_service_impl*>(
+        timer_service_access::get_scheduler(ioctx).timer_svc_);
+    if (!svc)
+        detail::throw_logic_error();
+    return svc->create_impl();
 }
 
 void
@@ -632,27 +835,34 @@ timer_service_expiry(timer::timer_impl& base) noexcept
     return static_cast<timer_impl&>(base).expiry_;
 }
 
-void
+std::size_t
 timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
 {
     auto& impl = static_cast<timer_impl&>(base);
     impl.expiry_ = t;
-    impl.svc_->update_timer(impl, t);
+    return impl.svc_->update_timer(impl, t);
 }
 
-void
+std::size_t
 timer_service_expires_after(timer::timer_impl& base, timer::duration d)
 {
     auto& impl = static_cast<timer_impl&>(base);
     impl.expiry_ = timer::clock_type::now() + d;
-    impl.svc_->update_timer(impl, impl.expiry_);
+    return impl.svc_->update_timer(impl, impl.expiry_);
 }
 
-void
+std::size_t
 timer_service_cancel(timer::timer_impl& base) noexcept
 {
     auto& impl = static_cast<timer_impl&>(base);
-    impl.svc_->cancel_timer(impl);
+    return impl.svc_->cancel_timer(impl);
+}
+
+std::size_t
+timer_service_cancel_one(timer::timer_impl& base) noexcept
+{
+    auto& impl = static_cast<timer_impl&>(base);
+    return impl.svc_->cancel_one_waiter(impl);
 }
 
 timer_service&
diff --git a/src/corosio/src/timer.cpp b/src/corosio/src/timer.cpp
index 600ef758..68bd51a5 100644
--- a/src/corosio/src/timer.cpp
+++ b/src/corosio/src/timer.cpp
@@ -1,5 +1,6 @@
 //
 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
+// Copyright (c) 2026 Steve Gerbino
 //
 // Distributed under the Boost Software License, Version 1.0.
(See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -19,9 +20,10 @@ namespace detail { extern timer::timer_impl* timer_service_create(capy::execution_context&); extern void timer_service_destroy(timer::timer_impl&) noexcept; extern timer::time_point timer_service_expiry(timer::timer_impl&) noexcept; -extern void timer_service_expires_at(timer::timer_impl&, timer::time_point); -extern void timer_service_expires_after(timer::timer_impl&, timer::duration); -extern void timer_service_cancel(timer::timer_impl&) noexcept; +extern std::size_t timer_service_expires_at(timer::timer_impl&, timer::time_point); +extern std::size_t timer_service_expires_after(timer::timer_impl&, timer::duration); +extern std::size_t timer_service_cancel(timer::timer_impl&) noexcept; +extern std::size_t timer_service_cancel_one(timer::timer_impl&) noexcept; } // namespace detail @@ -39,6 +41,13 @@ timer(capy::execution_context& ctx) impl_ = detail::timer_service_create(ctx); } +timer:: +timer(capy::execution_context& ctx, time_point t) + : timer(ctx) +{ + expires_at(t); +} + timer:: timer(timer&& other) noexcept : io_object(other.context()) @@ -64,11 +73,18 @@ operator=(timer&& other) return *this; } -void +std::size_t timer:: cancel() { - detail::timer_service_cancel(get()); + return detail::timer_service_cancel(get()); +} + +std::size_t +timer:: +cancel_one() +{ + return detail::timer_service_cancel_one(get()); } timer::time_point @@ -78,18 +94,18 @@ expiry() const return detail::timer_service_expiry(get()); } -void +std::size_t timer:: expires_at(time_point t) { - detail::timer_service_expires_at(get(), t); + return detail::timer_service_expires_at(get(), t); } -void +std::size_t timer:: expires_after(duration d) { - detail::timer_service_expires_after(get(), d); + return detail::timer_service_expires_after(get(), d); } } // namespace boost::corosio diff --git a/test/unit/timer.cpp b/test/unit/timer.cpp index 384b288c..83e3fdb5 100644 --- a/test/unit/timer.cpp +++ b/test/unit/timer.cpp @@ -1,5 +1,6 @@ // // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Steve Gerbino // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) @@ -15,6 +16,8 @@ #include #include +#include +#include #include "context.hpp" #include "test_suite.hpp" @@ -44,6 +47,28 @@ struct timer_test BOOST_TEST_PASS(); } + void + testConstructionWithTimePoint() + { + Context ioc; + auto tp = timer::clock_type::now() + std::chrono::seconds(10); + timer t(ioc, tp); + + BOOST_TEST(t.expiry() == tp); + } + + void + testConstructionWithDuration() + { + Context ioc; + auto before = timer::clock_type::now(); + timer t(ioc, std::chrono::milliseconds(500)); + auto after = timer::clock_type::now(); + + BOOST_TEST(t.expiry() >= before + std::chrono::milliseconds(500)); + BOOST_TEST(t.expiry() <= after + std::chrono::milliseconds(500)); + } + void testMoveConstruct() { @@ -368,6 +393,66 @@ struct timer_test BOOST_TEST(result_ec == capy::cond::canceled); } + void + testStopTokenCancellation() + { + // A pending timer wait should be cancelled when its stop_token + // is signaled after the wait has already suspended. 
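+        //
+        // Shape of the test: wait_task suspends on t (60s expiry),
+        // canceller_task signals the stop_source after a short delay,
+        // and failsafe_task falls back to a manual cancel() so a
+        // regression fails the assertions below instead of hanging.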
+ Context ioc; + timer t(ioc); + timer delay(ioc); + + std::stop_source stop_src; + bool wait_done = false; + bool failsafe_hit = false; + std::error_code wait_ec; + + t.expires_after(std::chrono::seconds(60)); + + // Waiter task — bound to stop_token + auto wait_task = [&]() -> capy::task<> + { + auto [ec] = co_await t.wait(); + wait_ec = ec; + wait_done = true; + }; + + // Canceller — short delay then signal stop + auto canceller_task = [&]() -> capy::task<> + { + delay.expires_after(std::chrono::milliseconds(10)); + (void)co_await delay.wait(); + stop_src.request_stop(); + }; + + // Failsafe — if stop_token didn't cancel the timer, + // fall back to manual cancel so the test doesn't hang + auto failsafe_task = [&]() -> capy::task<> + { + timer ft(ioc); + ft.expires_after(std::chrono::milliseconds(1000)); + auto [ec] = co_await ft.wait(); + if (!ec && !wait_done) + { + failsafe_hit = true; + t.cancel(); + } + }; + + capy::run_async(ioc.get_executor(), stop_src.get_token())(wait_task()); + capy::run_async(ioc.get_executor())(canceller_task()); + capy::run_async(ioc.get_executor())(failsafe_task()); + + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(wait_ec == capy::cond::canceled); + + // If the failsafe was hit, stop_token cancellation didn't work — + // only the manual t.cancel() fallback rescued the test. + BOOST_TEST(!failsafe_hit); + } + //-------------------------------------------- // Multiple timer tests //-------------------------------------------- @@ -432,6 +517,366 @@ struct timer_test BOOST_TEST(t2_done); } + //-------------------------------------------- + // Multiple waiters on one timer + //-------------------------------------------- + + void + testMultipleWaiters() + { + Context ioc; + timer t(ioc); + + bool w1 = false, w2 = false, w3 = false; + std::error_code ec1, ec2, ec3; + + t.expires_after(std::chrono::milliseconds(10)); + + auto task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> + { + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done = true; + }; + + capy::run_async(ioc.get_executor())(task(t, ec1, w1)); + capy::run_async(ioc.get_executor())(task(t, ec2, w2)); + capy::run_async(ioc.get_executor())(task(t, ec3, w3)); + + ioc.run(); + + BOOST_TEST(w1); + BOOST_TEST(w2); + BOOST_TEST(w3); + BOOST_TEST(!ec1); + BOOST_TEST(!ec2); + BOOST_TEST(!ec3); + } + + void + testMultipleWaitersCancelAll() + { + Context ioc; + timer t(ioc); + timer delay(ioc); + + bool w1 = false, w2 = false, w3 = false; + std::error_code ec1, ec2, ec3; + + t.expires_after(std::chrono::seconds(60)); + delay.expires_after(std::chrono::milliseconds(10)); + + auto task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> + { + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done = true; + }; + + auto cancel_task = [](timer& delay_ref, timer& t_ref) -> capy::task<> + { + (void)co_await delay_ref.wait(); + t_ref.cancel(); + }; + + capy::run_async(ioc.get_executor())(task(t, ec1, w1)); + capy::run_async(ioc.get_executor())(task(t, ec2, w2)); + capy::run_async(ioc.get_executor())(task(t, ec3, w3)); + capy::run_async(ioc.get_executor())(cancel_task(delay, t)); + + ioc.run(); + + BOOST_TEST(w1); + BOOST_TEST(w2); + BOOST_TEST(w3); + BOOST_TEST(ec1 == capy::cond::canceled); + BOOST_TEST(ec2 == capy::cond::canceled); + BOOST_TEST(ec3 == capy::cond::canceled); + } + + void + testMultipleWaitersStopTokenCancelsOne() + { + Context ioc; + timer t(ioc); + timer delay(ioc); + + std::stop_source stop_src; + bool w1 = false, w2 = false; + std::error_code 
ec1, ec2;
+
+        t.expires_after(std::chrono::milliseconds(500));
+        delay.expires_after(std::chrono::milliseconds(10));
+
+        // w1 has a stop_token — will be cancelled individually
+        auto wait_task = [&]() -> capy::task<>
+        {
+            auto [ec] = co_await t.wait();
+            ec1 = ec;
+            w1 = true;
+        };
+
+        // w2 has no stop_token — completes when timer fires
+        auto wait_task2 = [&]() -> capy::task<>
+        {
+            auto [ec] = co_await t.wait();
+            ec2 = ec;
+            w2 = true;
+        };
+
+        auto cancel_one = [&]() -> capy::task<>
+        {
+            (void)co_await delay.wait();
+            stop_src.request_stop();
+        };
+
+        capy::run_async(ioc.get_executor(), stop_src.get_token())(wait_task());
+        capy::run_async(ioc.get_executor())(wait_task2());
+        capy::run_async(ioc.get_executor())(cancel_one());
+
+        ioc.run();
+
+        BOOST_TEST(w1);
+        BOOST_TEST(w2);
+        BOOST_TEST(ec1 == capy::cond::canceled);
+        BOOST_TEST(!ec2);
+    }
+
+    //--------------------------------------------
+    // Destruction cancels pending waiters
+    //--------------------------------------------
+
+    void
+    testDestructionCancelsPendingWaiters()
+    {
+        Context ioc;
+        timer delay(ioc);
+
+        bool w1 = false, w2 = false;
+        std::error_code ec1, ec2;
+
+        auto t = std::make_unique<timer>(ioc);
+        t->expires_after(std::chrono::seconds(60));
+
+        delay.expires_after(std::chrono::milliseconds(10));
+
+        auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<>
+        {
+            auto [ec] = co_await t_ref.wait();
+            ec_out = ec;
+            done = true;
+        };
+
+        auto destroy_task = [&]() -> capy::task<>
+        {
+            (void)co_await delay.wait();
+            t.reset();
+        };
+
+        capy::run_async(ioc.get_executor())(wait_task(*t, ec1, w1));
+        capy::run_async(ioc.get_executor())(wait_task(*t, ec2, w2));
+        capy::run_async(ioc.get_executor())(destroy_task());
+
+        ioc.run();
+
+        BOOST_TEST(w1);
+        BOOST_TEST(w2);
+        BOOST_TEST(ec1 == capy::cond::canceled);
+        BOOST_TEST(ec2 == capy::cond::canceled);
+    }
+
+    //--------------------------------------------
+    // cancel_one() tests
+    //--------------------------------------------
+
+    void
+    testCancelOne()
+    {
+        Context ioc;
+        timer t(ioc);
+        timer delay(ioc);
+
+        bool w1 = false, w2 = false;
+        std::error_code ec1, ec2;
+
+        t.expires_after(std::chrono::milliseconds(500));
+        delay.expires_after(std::chrono::milliseconds(10));
+
+        auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<>
+        {
+            auto [ec] = co_await t_ref.wait();
+            ec_out = ec;
+            done = true;
+        };
+
+        auto cancel_one_task = [](timer& delay_ref, timer& t_ref) -> capy::task<>
+        {
+            (void)co_await delay_ref.wait();
+            auto n = t_ref.cancel_one();
+            BOOST_TEST_EQ(n, 1u);
+        };
+
+        capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1));
+        capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2));
+        capy::run_async(ioc.get_executor())(cancel_one_task(delay, t));
+
+        ioc.run();
+
+        BOOST_TEST(w1);
+        BOOST_TEST(w2);
+        // First waiter (FIFO) is cancelled, second fires normally
+        BOOST_TEST(ec1 == capy::cond::canceled);
+        BOOST_TEST(!ec2);
+    }
+
+    void
+    testCancelOneNoWaiters()
+    {
+        Context ioc;
+        timer t(ioc);
+
+        t.expires_after(std::chrono::seconds(60));
+
+        auto n = t.cancel_one();
+        BOOST_TEST_EQ(n, 0u);
+    }
+
+    //--------------------------------------------
+    // Return value tests
+    //--------------------------------------------
+
+    void
+    testCancelReturnsCount()
+    {
+        Context ioc;
+        timer t(ioc);
+        timer delay(ioc);
+
+        bool w1 = false, w2 = false, w3 = false;
+        std::error_code ec1, ec2, ec3;
+
+        t.expires_after(std::chrono::seconds(60));
+        delay.expires_after(std::chrono::milliseconds(10));
+
+        auto
wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> + { + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done = true; + }; + + std::size_t cancel_count = 0; + auto cancel_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> + { + (void)co_await delay_ref.wait(); + cancel_count = t_ref.cancel(); + }; + + capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); + capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); + capy::run_async(ioc.get_executor())(wait_task(t, ec3, w3)); + capy::run_async(ioc.get_executor())(cancel_task(delay, t)); + + ioc.run(); + + BOOST_TEST_EQ(cancel_count, 3u); + BOOST_TEST(w1); + BOOST_TEST(w2); + BOOST_TEST(w3); + } + + void + testCancelReturnsZeroNoWaiters() + { + Context ioc; + timer t(ioc); + + t.expires_after(std::chrono::seconds(60)); + auto n = t.cancel(); + BOOST_TEST_EQ(n, 0u); + } + + void + testExpiresAtReturnsCount() + { + Context ioc; + timer t(ioc); + timer delay(ioc); + + bool w1 = false, w2 = false; + std::error_code ec1, ec2; + + t.expires_after(std::chrono::seconds(60)); + delay.expires_after(std::chrono::milliseconds(10)); + + auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> + { + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done = true; + }; + + std::size_t expires_count = 0; + auto reset_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> + { + (void)co_await delay_ref.wait(); + expires_count = t_ref.expires_at( + timer::clock_type::now() + std::chrono::seconds(30)); + }; + + capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); + capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); + capy::run_async(ioc.get_executor())(reset_task(delay, t)); + + ioc.run_for(std::chrono::milliseconds(100)); + + BOOST_TEST_EQ(expires_count, 2u); + BOOST_TEST(w1); + BOOST_TEST(w2); + BOOST_TEST(ec1 == capy::cond::canceled); + BOOST_TEST(ec2 == capy::cond::canceled); + } + + void + testExpiresAfterReturnsCount() + { + Context ioc; + timer t(ioc); + timer delay(ioc); + + bool w1 = false, w2 = false; + std::error_code ec1, ec2; + + t.expires_after(std::chrono::seconds(60)); + delay.expires_after(std::chrono::milliseconds(10)); + + auto wait_task = [](timer& t_ref, std::error_code& ec_out, bool& done) -> capy::task<> + { + auto [ec] = co_await t_ref.wait(); + ec_out = ec; + done = true; + }; + + std::size_t expires_count = 0; + auto reset_task = [&](timer& delay_ref, timer& t_ref) -> capy::task<> + { + (void)co_await delay_ref.wait(); + expires_count = t_ref.expires_after(std::chrono::seconds(30)); + }; + + capy::run_async(ioc.get_executor())(wait_task(t, ec1, w1)); + capy::run_async(ioc.get_executor())(wait_task(t, ec2, w2)); + capy::run_async(ioc.get_executor())(reset_task(delay, t)); + + ioc.run_for(std::chrono::milliseconds(100)); + + BOOST_TEST_EQ(expires_count, 2u); + BOOST_TEST(w1); + BOOST_TEST(w2); + BOOST_TEST(ec1 == capy::cond::canceled); + BOOST_TEST(ec2 == capy::cond::canceled); + } + //-------------------------------------------- // Sequential wait tests //-------------------------------------------- @@ -614,6 +1059,8 @@ struct timer_test { // Construction and move semantics testConstruction(); + testConstructionWithTimePoint(); + testConstructionWithDuration(); testMoveConstruct(); testMoveAssign(); testMoveAssignCrossContextThrows(); @@ -637,11 +1084,30 @@ struct timer_test testCancelNoWaiters(); testCancelMultipleTimes(); testExpiresAtCancelsWaiter(); + testStopTokenCancellation(); // Multiple timer tests 
testMultipleTimersDifferentExpiry(); testMultipleTimersSameExpiry(); + // Multiple waiters on one timer + testMultipleWaiters(); + testMultipleWaitersCancelAll(); + testMultipleWaitersStopTokenCancelsOne(); + + // Destruction cancels pending waiters + testDestructionCancelsPendingWaiters(); + + // cancel_one() tests + testCancelOne(); + testCancelOneNoWaiters(); + + // Return value tests + testCancelReturnsCount(); + testCancelReturnsZeroNoWaiters(); + testExpiresAtReturnsCount(); + testExpiresAfterReturnsCount(); + // Sequential wait tests testSequentialWaits();