diff --git a/doc/modules/ROOT/pages/coroutines/composition.adoc b/doc/modules/ROOT/pages/coroutines/composition.adoc index 563fcc72..72f1dc2a 100644 --- a/doc/modules/ROOT/pages/coroutines/composition.adoc +++ b/doc/modules/ROOT/pages/coroutines/composition.adoc @@ -137,27 +137,27 @@ task<> long_running() == when_any: First-to-Finish Wins -`when_any` is not yet implemented in Capy, but its design is planned: - -* Launch multiple tasks concurrently -* Return when the *first* task completes -* Cancel remaining tasks via stop token -* Return the winning task's result - -The pattern would look like: +`when_any` launches multiple tasks concurrently and returns when the *first* one completes: [source,cpp] ---- -// Planned API (not yet available) -task> example() +#include + +task<> example() { - co_return co_await when_any( + auto [index, result] = co_await when_any( fetch_int(), // task fetch_string() // task ); + // index indicates which task won (0 or 1) + // result is std::variant } ---- +The result is a pair containing the winner's index and a deduplicated variant of possible result types. When a winner is determined, stop is requested for all siblings. All tasks complete before `when_any` returns. + +For detailed coverage including error handling, cancellation, and the vector overload, see xref:when-any.adoc[Racing Tasks]. + == Practical Patterns === Parallel Fetch @@ -207,20 +207,26 @@ task process_all(std::vector const& items) === Timeout with Fallback -Use when_any (when available) to implement timeout with fallback: +Use `when_any` to implement timeout with fallback: [source,cpp] ---- -// Planned pattern -task fetch_with_fallback() +task fetch_with_timeout(Request req) { - co_return co_await when_any( - fetch_from_primary(), - delay_then(std::chrono::seconds(5), fetch_from_backup()) + auto [index, result] = co_await when_any( + fetch_data(req), + timeout_after(100ms) ); + + if (index == 1) + throw timeout_error{"Request timed out"}; + + co_return std::get(result); } ---- +The `timeout_after` helper waits for the specified duration then throws. If `fetch_data` completes first, its result is returned. If the timer wins, the timeout exception propagates. + == Implementation Notes === Task Storage @@ -250,6 +256,9 @@ This design ensures proper context propagation to all children. | `` | Concurrent composition with when_all + +| `` +| First-completion racing with when_any |=== -You have now learned how to compose tasks concurrently with `when_all`. In the next section, you will learn about frame allocators for customizing coroutine memory allocation. +You have now learned how to compose tasks concurrently with `when_all` and `when_any`. In the next section, you will learn about frame allocators for customizing coroutine memory allocation. diff --git a/doc/modules/ROOT/pages/coroutines/when-all.adoc b/doc/modules/ROOT/pages/coroutines/when-all.adoc index 1ce34883..dc7cd131 100644 --- a/doc/modules/ROOT/pages/coroutines/when-all.adoc +++ b/doc/modules/ROOT/pages/coroutines/when-all.adoc @@ -242,7 +242,7 @@ Use `when_all` when: Do NOT use `when_all` when: * Operations depend on each other — use sequential `co_await` -* You need results as they complete — consider `when_any` (not yet available) +* You need only the first result — use xref:when-any.adoc[when_any] * Memory is constrained — concurrent tasks consume more memory == Summary @@ -269,6 +269,7 @@ Do NOT use `when_all` when: == Next Steps +* xref:when-any.adoc[Racing Tasks] — Return first completion with `when_any` * xref:cancellation.adoc[Cancellation] — Stop token propagation * xref:../execution/thread-pool.adoc[Thread Pool] — Multi-threaded execution * xref:affinity.adoc[Executor Affinity] — Control where tasks run diff --git a/doc/modules/ROOT/pages/coroutines/when-any.adoc b/doc/modules/ROOT/pages/coroutines/when-any.adoc new file mode 100644 index 00000000..45348b0a --- /dev/null +++ b/doc/modules/ROOT/pages/coroutines/when-any.adoc @@ -0,0 +1,387 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + += Racing Tasks + +In this tutorial, you will learn how to race multiple concurrent tasks using `when_any`, returning as soon as the first task completes. This pattern is essential for implementing timeouts, redundant requests, and speculative execution. + +By the end of this page, you will understand how to use both the variadic and range-based overloads of `when_any`, handle the result types correctly, and manage cancellation of sibling tasks. + +== Prerequisites + +* Completed xref:tasks.adoc[The task Type] +* Completed xref:cancellation.adoc[Stop Tokens and Cancellation] +* Familiarity with `std::variant` and structured bindings + +NOTE: Code snippets assume `using namespace boost::capy;` is in effect. + +== The Problem + +Sometimes you need the result from whichever task finishes first, not all of them. Common scenarios include: + +* Racing requests to multiple servers, using the first response +* Implementing timeouts by racing against a timer +* Speculative execution of multiple algorithms +* Waiting for first available resource from a pool + +== when_any + +The `when_any` function launches multiple tasks concurrently and returns when +the first one completes: + +[source,cpp] +---- +#include + +task race() +{ + auto [index, result] = co_await when_any( + fetch_from_primary(), + fetch_from_backup() + ); +} +---- + +The return value is a `std::pair` containing two elements: `index` indicates which task completed first (0 for the first argument, 1 for the second), and `result` holds the winning task's return value in a variant. + +The winning task's result is returned immediately. All sibling tasks receive +a stop request and are allowed to complete before `when_any` returns. + +== Return Value + +`when_any` returns a `std::pair` containing the winner's index and result. + +=== Heterogeneous Tasks (Variadic) + +When racing tasks with different return types, the result is a variant: + +[source,cpp] +---- +auto [index, result] = co_await when_any( + task_returning_int(), // task + task_returning_string() // task +); + +if (index == 0) + std::cout << "Got int: " << std::get(result) << "\n"; +else + std::cout << "Got string: " << std::get(result) << "\n"; +---- + +The `result` variable is a `std::variant`. Use `index` to determine which alternative is active, then extract the value with `std::get`. + +=== Void Tasks + +Void tasks contribute `std::monostate` to the variant: + +[source,cpp] +---- +auto [index, result] = co_await when_any( + task_returning_int(), // task + task_void() // task +); + +if (index == 0) + std::cout << "Got int: " << std::get(result) << "\n"; +else + std::cout << "Void task completed\n"; +---- + +Tasks returning `void` contribute `std::monostate` to the variant. In this example, `result` has type `std::variant`. When the void task wins, check for `std::monostate` or use the index to detect it. + +=== Duplicate Types + +The variant is deduplicated. When racing tasks with the same return type, +use the index to identify which task won: + +[source,cpp] +---- +auto [index, result] = co_await when_any( + fetch_from_server_a(), // task + fetch_from_server_b(), // task + fetch_from_server_c() // task +); + +auto response = std::get(result); +std::cout << "Server " << index << " responded first\n"; +---- + +When multiple tasks share the same return type, the variant is deduplicated to contain only unique types. Here, `result` is `std::variant` with a single alternative. The `index` value (0, 1, or 2) tells you which server responded first. + +=== Homogeneous Tasks (Vector) + +For a dynamic number of tasks with the same type, use the vector overload: + +[source,cpp] +---- +std::vector> requests; +for (auto& server : servers) + requests.push_back(fetch_from(server)); + +auto [index, response] = co_await when_any(std::move(requests)); +std::cout << "Server " << index << " responded: " << response << "\n"; +---- + +The vector overload accepts any sized input range of awaitables with the same result type. Since all tasks return `Response`, the result is `std::pair` directly—no variant wrapper is needed. + +For void tasks in a vector, only the winner's index is returned: + +[source,cpp] +---- +std::vector> tasks; +// ... populate tasks + +std::size_t winner = co_await when_any(std::move(tasks)); +std::cout << "Task " << winner << " completed first\n"; +---- + +Since void tasks produce no result value, the return type is `std::size_t` rather than a pair. + +== Error Handling + +Exceptions are treated as valid completions. If the winning task throws, +that exception is rethrown from `when_any`: + +[source,cpp] +---- +task handle_errors() +{ + try { + auto [index, result] = co_await when_any( + might_fail(), + might_succeed() + ); + // If we get here, the winner succeeded + } catch (std::exception const& e) { + // The winning task threw this exception + std::cerr << "Winner failed: " << e.what() << "\n"; + } +} +---- + +=== First-Completion Semantics + +Unlike `when_all` (which captures the first _error_), `when_any` returns +whichever task completes first, whether it succeeds or fails. Exceptions +from non-winning tasks are discarded. + +=== Stop Propagation + +When a winner is determined, `when_any` requests stop for all sibling tasks. +Tasks that support cancellation can exit early: + +[source,cpp] +---- +task fetch_with_cancel_support() +{ + auto token = co_await get_stop_token(); + + for (auto& chunk : data_source) + { + if (token.stop_requested()) + co_return partial_response(); // Exit early + co_await send_chunk(chunk); + } + co_return complete_response(); +} + +task example() +{ + // When one fetch wins, the other sees stop_requested + auto [index, response] = co_await when_any( + fetch_with_cancel_support(), + fetch_with_cancel_support() + ); +} +---- + +Tasks that ignore the stop token will run to completion. `when_any` always +waits for all tasks to finish before returning, ensuring proper cleanup. + +== Parent Stop Token + +`when_any` forwards the parent's stop token to children. If the parent is +cancelled, all children see the request: + +[source,cpp] +---- +task parent() +{ + auto [index, result] = co_await when_any( + child_a(), // Sees parent's stop token + child_b() // Sees parent's stop token + ); +} + +std::stop_source source; +run_async(ex, source.get_token())(parent()); + +// Later: cancel everything +source.request_stop(); +---- + +== Execution Model + +All child tasks inherit the parent's executor affinity: + +[source,cpp] +---- +task parent() // Running on executor ex +{ + auto [index, result] = co_await when_any( + child_a(), // Runs on ex + child_b() // Runs on ex + ); +} +---- + +Children are launched via `dispatch()` on the executor, which may run them +inline or queue them depending on the executor implementation. + +=== True Concurrency + +With a multi-threaded executor, tasks race in parallel: + +[source,cpp] +---- +thread_pool pool(4); +run_async(pool.get_executor())(parent()); + +// Tasks may complete in any order based on actual execution time +---- + +With a single-threaded executor, tasks interleave at suspension points but +execute sequentially. + +== Example: Redundant Requests + +Race requests to multiple servers for reliability: + +[source,cpp] +---- +task fetch_with_redundancy(Request req) +{ + auto [index, response] = co_await when_any( + fetch_from(primary_server, req), + fetch_from(backup_server, req) + ); + + std::cout << (index == 0 ? "Primary" : "Backup") + << " server responded\n"; + co_return std::get(response); +} +---- + +== Example: Timeout Pattern + +Race an operation against a timer: + +[source,cpp] +---- +task fetch_with_timeout(Request req) +{ + auto [index, result] = co_await when_any( + fetch_data(req), + timeout_after(100ms) + ); + + if (index == 1) + throw timeout_error{"Request timed out"}; + + co_return std::get(result); +} + +// Helper that waits then throws +template +task timeout_after(std::chrono::milliseconds ms) +{ + co_await sleep(ms); + throw timeout_error{"Timeout"}; +} +---- + +The `timeout_after` helper waits for the specified duration then throws an exception. If `fetch_data` completes before the timer, its result is returned. If the timer wins, the timeout exception propagates from `when_any`. + +== Example: First Available Resource + +Wait for the first available connection from a pool: + +[source,cpp] +---- +task get_connection(std::vector& pools) +{ + std::vector> attempts; + for (auto& pool : pools) + attempts.push_back(pool.acquire()); + + auto [index, conn] = co_await when_any(std::move(attempts)); + + std::cout << "Got connection from pool " << index << "\n"; + co_return conn; +} +---- + +This function creates an acquire task for each pool, then races them. Whichever pool provides a connection first wins, and the remaining acquire attempts are cancelled. The `index` indicates which pool provided the connection. + +== Comparison with when_all + +[cols="1,2,2"] +|=== +| Aspect | `when_all` | `when_any` + +| Completion +| Waits for all tasks +| Returns on first completion + +| Return type +| Tuple of results +| Pair of (index, variant/value) + +| Error handling +| First exception wins, siblings get stop +| Exceptions are valid completions + +| Use case +| Need all results +| Need fastest result +|=== + +== Summary + +[cols="1,3"] +|=== +| Feature | Description + +| `when_any(tasks...)` +| Race tasks, return first completion + +| `when_any(vector>)` +| Race homogeneous tasks from a vector + +| Return type (variadic) +| `pair>` with deduplicated types + +| Return type (vector) +| `pair` or `size_t` for void + +| Error handling +| Winner's exception propagated, others discarded + +| Stop propagation +| Siblings receive stop request on winner + +| Cleanup +| All tasks complete before returning +|=== + +== Next Steps + +* xref:when-all.adoc[Concurrent Composition] — Wait for all tasks +* xref:cancellation.adoc[Cancellation] — Stop token propagation +* xref:../execution/thread-pool.adoc[Thread Pool] — Multi-threaded execution diff --git a/include/boost/capy.hpp b/include/boost/capy.hpp index 7e56d418..10f83990 100644 --- a/include/boost/capy.hpp +++ b/include/boost/capy.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include // Buffers diff --git a/include/boost/capy/when_all.hpp b/include/boost/capy/when_all.hpp index a1a2129f..6ba4b920 100644 --- a/include/boost/capy/when_all.hpp +++ b/include/boost/capy/when_all.hpp @@ -324,7 +324,11 @@ class when_all_launcher state_->stop_source_.request_stop(); } - // Launch all tasks concurrently + // CRITICAL: If the last task finishes synchronously then the parent + // coroutine resumes, destroying its frame, and destroying this object + // prior to the completion of await_suspend. Therefore, await_suspend + // must ensure `this` cannot be referenced after calling `launch_one` + // for the last time. auto token = state_->stop_source_.get_token(); [&](std::index_sequence) { (..., launch_one(caller_ex, token)); diff --git a/include/boost/capy/when_any.hpp b/include/boost/capy/when_any.hpp new file mode 100644 index 00000000..ce2e8edd --- /dev/null +++ b/include/boost/capy/when_any.hpp @@ -0,0 +1,1017 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_WHEN_ANY_HPP +#define BOOST_CAPY_WHEN_ANY_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + when_any - Race multiple tasks, return first completion + ======================================================== + + OVERVIEW: + --------- + when_any launches N tasks concurrently and completes when the FIRST task + finishes (success or failure). It then requests stop for all siblings and + waits for them to acknowledge before returning. + + ARCHITECTURE: + ------------- + The design mirrors when_all but with inverted completion semantics: + + when_all: complete when remaining_count reaches 0 (all done) + when_any: complete when has_winner becomes true (first done) + BUT still wait for remaining_count to reach 0 for cleanup + + Key components: + - when_any_state: Shared state tracking winner and completion + - when_any_runner: Wrapper coroutine for each child task + - when_any_launcher: Awaitable that starts all runners concurrently + + CRITICAL INVARIANTS: + -------------------- + 1. Exactly one task becomes the winner (via atomic compare_exchange) + 2. All tasks must complete before parent resumes (cleanup safety) + 3. Stop is requested immediately when winner is determined + 4. Only the winner's result/exception is stored + + TYPE DEDUPLICATION: + ------------------- + std::variant requires unique alternative types. Since when_any can race + tasks with identical return types (e.g., three task), we must + deduplicate types before constructing the variant. + + Example: when_any(task, task, task) + - Raw types after void->monostate: int, string, int + - Deduplicated variant: std::variant + - Return: pair> + + The winner_index tells you which task won (0, 1, or 2), while the variant + holds the result. Use the index to determine how to interpret the variant. + + VOID HANDLING: + -------------- + void tasks contribute std::monostate to the variant (then deduplicated). + All-void tasks result in: pair> + + MEMORY MODEL: + ------------- + Synchronization chain from winner's write to parent's read: + + 1. Winner thread writes result_/winner_exception_ (non-atomic) + 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_ + 3. Last task thread (may be winner or non-winner) calls signal_completion() + → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0 + 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer + 5. Parent coroutine resumes and reads result_/winner_exception_ + + Synchronization analysis: + - All fetch_sub operations on remaining_count_ form a release sequence + - Winner's fetch_sub releases; subsequent fetch_sub operations participate + in the modification order of remaining_count_ + - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the + modification order, establishing happens-before from winner's writes + - Executor dispatch() is expected to provide queue-based synchronization + (release-on-post, acquire-on-execute) completing the chain to parent + - Even inline executors work (same thread = sequenced-before) + + Alternative considered: Adding winner_ready_ atomic (set with release after + storing winner data, acquired before reading) would make synchronization + self-contained and not rely on executor implementation details. Current + approach is correct but requires careful reasoning about release sequences + and executor behavior. + + EXCEPTION SEMANTICS: + -------------------- + Unlike when_all (which captures first exception, discards others), when_any + treats exceptions as valid completions. If the winning task threw, that + exception is rethrown. Exceptions from non-winners are silently discarded. +*/ + +namespace boost { +namespace capy { + +namespace detail { + +/** Convert void to monostate for variant storage. + + std::variant is ill-formed, so void tasks contribute + std::monostate to the result variant instead. Non-void types + pass through unchanged. + + @tparam T The type to potentially convert (void becomes monostate). +*/ +template +using void_to_monostate_t = std::conditional_t, std::monostate, T>; + +// Type deduplication: std::variant requires unique alternative types. +// Fold left over the type list, appending each type only if not already present. +template +struct variant_append_if_unique; + +template +struct variant_append_if_unique, T> +{ + using type = std::conditional_t< + (std::is_same_v || ...), + std::variant, + std::variant>; +}; + +template +struct deduplicate_impl; + +template +struct deduplicate_impl +{ + using type = Accumulated; +}; + +template +struct deduplicate_impl +{ + using next = typename variant_append_if_unique::type; + using type = typename deduplicate_impl::type; +}; + +// Deduplicated variant; void types become monostate before deduplication +template +using unique_variant_t = typename deduplicate_impl< + std::variant>, + void_to_monostate_t...>::type; + +// Result: (winner_index, deduplicated_variant). Use index to disambiguate +// when multiple tasks share the same return type. +template +using when_any_result_t = std::pair>; + +// Extract result type from any awaitable via await_resume() +template +using awaitable_result_t = decltype(std::declval&>().await_resume()); + +/** Core shared state for when_any operations. + + Contains all members and methods common to both heterogeneous (variadic) + and homogeneous (range) when_any implementations. State classes embed + this via composition to avoid CRTP destructor ordering issues. + + @par Thread Safety + Atomic operations protect winner selection and completion count. +*/ +struct when_any_core +{ + std::atomic remaining_count_; + std::size_t winner_index_{0}; + std::exception_ptr winner_exception_; + std::stop_source stop_source_; + + // Bridges parent's stop token to our stop_source + struct stop_callback_fn + { + std::stop_source* source_; + void operator()() const noexcept { source_->request_stop(); } + }; + using stop_callback_t = std::stop_callback; + std::optional parent_stop_callback_; + + coro continuation_; + executor_ref caller_ex_; + + // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members) + std::atomic has_winner_{false}; + + explicit when_any_core(std::size_t count) noexcept + : remaining_count_(count) + { + } + + /** Atomically claim winner status; exactly one task succeeds. */ + bool try_win(std::size_t index) noexcept + { + bool expected = false; + if(has_winner_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + { + winner_index_ = index; + stop_source_.request_stop(); + return true; + } + return false; + } + + /** @pre try_win() returned true. */ + void set_winner_exception(std::exception_ptr ep) noexcept + { + winner_exception_ = ep; + } + + /** Last task to complete resumes the parent via symmetric transfer. */ + coro signal_completion() + { + auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel); + if(remaining == 1) + caller_ex_.dispatch(continuation_); + return std::noop_coroutine(); + } +}; + +/** Shared state for heterogeneous when_any operation. + + Coordinates winner selection, result storage, and completion tracking + for all child tasks in a when_any operation. Uses composition with + when_any_core for shared functionality. + + @par Lifetime + Allocated on the parent coroutine's frame, outlives all runners. + + @tparam T0 First task's result type. + @tparam Ts Remaining tasks' result types. +*/ +template +struct when_any_state +{ + static constexpr std::size_t task_count = 1 + sizeof...(Ts); + using variant_type = unique_variant_t; + + when_any_core core_; + std::optional result_; + std::array runner_handles_{}; + + when_any_state() + : core_(task_count) + { + } + + ~when_any_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** @pre core_.try_win() returned true. + @note Uses in_place_type (not index) because variant is deduplicated. + */ + template + void set_winner_result(T value) + noexcept(std::is_nothrow_move_constructible_v) + { + result_.emplace(std::in_place_type, std::move(value)); + } + + /** @pre core_.try_win() returned true. */ + void set_winner_void() noexcept + { + result_.emplace(std::in_place_type, std::monostate{}); + } +}; + +/** Wrapper coroutine that runs a single child task for when_any. + + Propagates executor/stop_token to the child, attempts to claim winner + status on completion, and signals completion for cleanup coordination. + + @tparam StateType The state type (when_any_state or when_any_homogeneous_state). +*/ +template +struct when_any_runner +{ + struct promise_type // : frame_allocating_base // DISABLED FOR TESTING + { + StateType* state_ = nullptr; + std::size_t index_ = 0; + executor_ref ex_; + std::stop_token stop_token_; + + when_any_runner get_return_object() noexcept + { + return when_any_runner(std::coroutine_handle::from_promise(*this)); + } + + // Starts suspended; launcher sets up state/ex/token then resumes + std::suspend_always initial_suspend() noexcept + { + return {}; + } + + auto final_suspend() noexcept + { + struct awaiter + { + promise_type* p_; + bool await_ready() const noexcept { return false; } + coro await_suspend(coro) noexcept { return p_->state_->core_.signal_completion(); } + void await_resume() const noexcept {} + }; + return awaiter{this}; + } + + void return_void() noexcept {} + + // Exceptions are valid completions in when_any (unlike when_all) + void unhandled_exception() + { + if(state_->core_.try_win(index_)) + state_->core_.set_winner_exception(std::current_exception()); + } + + /** Injects executor and stop token into child awaitables. */ + template + struct transform_awaiter + { + std::decay_t a_; + promise_type* p_; + + bool await_ready() { return a_.await_ready(); } + auto await_resume() { return a_.await_resume(); } + + template + auto await_suspend(std::coroutine_handle h) + { + return a_.await_suspend(h, p_->ex_, p_->stop_token_); + } + }; + + template + auto await_transform(Awaitable&& a) + { + using A = std::decay_t; + if constexpr (IoAwaitable) + { + return transform_awaiter{ + std::forward(a), this}; + } + else + { + static_assert(sizeof(A) == 0, "requires IoAwaitable"); + } + } + }; + + std::coroutine_handle h_; + + explicit when_any_runner(std::coroutine_handle h) noexcept + : h_(h) + { + } + + // Enable move for all clang versions - some versions need it + when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {} + + // Non-copyable + when_any_runner(when_any_runner const&) = delete; + when_any_runner& operator=(when_any_runner const&) = delete; + when_any_runner& operator=(when_any_runner&&) = delete; + + auto release() noexcept + { + return std::exchange(h_, nullptr); + } +}; + +/** Wraps a child awaitable, attempts to claim winner on completion. + + Uses requires-expressions to detect state capabilities: + - set_winner_void(): for heterogeneous void tasks (stores monostate) + - set_winner_result(): for non-void tasks + - Neither: for homogeneous void tasks (no result storage) +*/ +template +when_any_runner +make_when_any_runner(Awaitable inner, StateType* state, std::size_t index) +{ + using T = awaitable_result_t; + if constexpr (std::is_void_v) + { + co_await std::move(inner); + if(state->core_.try_win(index)) + { + // Heterogeneous void tasks store monostate in the variant + if constexpr (requires { state->set_winner_void(); }) + state->set_winner_void(); + // Homogeneous void tasks have no result to store + } + } + else + { + auto result = co_await std::move(inner); + if(state->core_.try_win(index)) + { + // Defensive: move should not throw (already moved once), but we + // catch just in case since an uncaught exception would be devastating. + try + { + state->set_winner_result(std::move(result)); + } + catch(...) + { + state->core_.set_winner_exception(std::current_exception()); + } + } + } +} + +/** Launches all runners concurrently; see await_suspend for lifetime concerns. */ +template +class when_any_launcher +{ + using state_type = when_any_state...>; + + std::tuple* tasks_; + state_type* state_; + +public: + when_any_launcher( + std::tuple* tasks, + state_type* state) + : tasks_(tasks) + , state_(state) + { + } + + bool await_ready() const noexcept + { + return sizeof...(Awaitables) == 0; + } + + /** CRITICAL: If the last task finishes synchronously, parent resumes and + destroys this object before await_suspend returns. Must not reference + `this` after the final launch_one call. + */ + template + coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) + { + state_->core_.continuation_ = continuation; + state_->core_.caller_ex_ = caller_ex; + + if(parent_token.stop_possible()) + { + state_->core_.parent_stop_callback_.emplace( + parent_token, + when_any_core::stop_callback_fn{&state_->core_.stop_source_}); + + if(parent_token.stop_requested()) + state_->core_.stop_source_.request_stop(); + } + + auto token = state_->core_.stop_source_.get_token(); + [&](std::index_sequence) { + (..., launch_one(caller_ex, token)); + }(std::index_sequence_for{}); + + return std::noop_coroutine(); + } + + void await_resume() const noexcept + { + } + +private: + /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */ + template + void launch_one(Ex const& caller_ex, std::stop_token token) + { + auto runner = make_when_any_runner( + std::move(std::get(*tasks_)), state_, I); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = I; + h.promise().ex_ = caller_ex; + h.promise().stop_token_ = token; + + coro ch{h}; + state_->runner_handles_[I] = ch; + caller_ex.dispatch(ch); + } +}; + +} // namespace detail + +/** Wait for the first awaitable to complete. + + Races multiple heterogeneous awaitables concurrently and returns when the + first one completes. The result includes the winner's index and a + deduplicated variant containing the result value. + + @par Suspends + The calling coroutine suspends when co_await is invoked. All awaitables + are launched concurrently and execute in parallel. The coroutine resumes + only after all awaitables have completed, even though the winner is + determined by the first to finish. + + @par Completion Conditions + @li Winner is determined when the first awaitable completes (success or exception) + @li Only one task can claim winner status via atomic compare-exchange + @li Once a winner exists, stop is requested for all remaining siblings + @li Parent coroutine resumes only after all siblings acknowledge completion + @li The winner's result is returned; if the winner threw, the exception is rethrown + + @par Cancellation Semantics + Cancellation is supported via stop_token propagated through the + IoAwaitable protocol: + @li Each child awaitable receives a stop_token derived from a shared stop_source + @li When the parent's stop token is activated, the stop is forwarded to all children + @li When a winner is determined, stop_source_.request_stop() is called immediately + @li Siblings must handle cancellation gracefully and complete before parent resumes + @li Stop requests are cooperative; tasks must check and respond to them + + @par Concurrency/Overlap + All awaitables are launched concurrently before any can complete. + The launcher iterates through the arguments, starting each task on the + caller's executor. Tasks may execute in parallel on multi-threaded + executors or interleave on single-threaded executors. There is no + guaranteed ordering of task completion. + + @par Notable Error Conditions + @li Winner exception: if the winning task threw, that exception is rethrown + @li Non-winner exceptions: silently discarded (only winner's result matters) + @li Cancellation: tasks may complete via cancellation without throwing + + @par Example + @code + task example() { + auto [index, result] = co_await when_any( + fetch_from_primary(), // task + fetch_from_backup() // task + ); + // index is 0 or 1, result holds the winner's Response + auto response = std::get(result); + } + @endcode + + @par Example with Heterogeneous Types + @code + task mixed_types() { + auto [index, result] = co_await when_any( + fetch_int(), // task + fetch_string() // task + ); + if (index == 0) + std::cout << "Got int: " << std::get(result) << "\n"; + else + std::cout << "Got string: " << std::get(result) << "\n"; + } + @endcode + + @tparam A0 First awaitable type (must satisfy IoAwaitable). + @tparam As Remaining awaitable types (must satisfy IoAwaitable). + @param a0 The first awaitable to race. + @param as Additional awaitables to race concurrently. + @return A task yielding a pair of (winner_index, result_variant). + + @throws Rethrows the winner's exception if the winning task threw an exception. + + @par Remarks + Awaitables are moved into the coroutine frame; original objects become + empty after the call. When multiple awaitables share the same return type, + the variant is deduplicated to contain only unique types. Use the winner + index to determine which awaitable completed first. Void awaitables + contribute std::monostate to the variant. + + @see when_all, IoAwaitable +*/ +template +[[nodiscard]] auto when_any(A0 a0, As... as) + -> task, + detail::awaitable_result_t...>> +{ + using result_type = detail::when_any_result_t< + detail::awaitable_result_t, + detail::awaitable_result_t...>; + + detail::when_any_state< + detail::awaitable_result_t, + detail::awaitable_result_t...> state; + std::tuple awaitable_tuple(std::move(a0), std::move(as)...); + + co_await detail::when_any_launcher(&awaitable_tuple, &state); + + if(state.core_.winner_exception_) + std::rethrow_exception(state.core_.winner_exception_); + + co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; +} + +/** Concept for ranges of full I/O awaitables. + + A range satisfies `IoAwaitableRange` if it is a sized input range + whose value type satisfies @ref IoAwaitable. This enables when_any + to accept any container or view of awaitables, not just std::vector. + + @tparam R The range type. + + @par Requirements + @li `R` must satisfy `std::ranges::input_range` + @li `R` must satisfy `std::ranges::sized_range` + @li `std::ranges::range_value_t` must satisfy @ref IoAwaitable + + @par Syntactic Requirements + Given `r` of type `R`: + @li `std::ranges::begin(r)` is valid + @li `std::ranges::end(r)` is valid + @li `std::ranges::size(r)` returns `std::ranges::range_size_t` + @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable + + @par Example + @code + template + task race_all(R&& awaitables) { + auto winner = co_await when_any(std::forward(awaitables)); + // Process winner... + } + @endcode + + @see when_any, IoAwaitable +*/ +template +concept IoAwaitableRange = + std::ranges::input_range && + std::ranges::sized_range && + IoAwaitable>; + +namespace detail { + +/** Shared state for homogeneous when_any (range overload). + + Uses composition with when_any_core for shared functionality. + Simpler than heterogeneous: optional instead of variant, vector + instead of array for runner handles. +*/ +template +struct when_any_homogeneous_state +{ + when_any_core core_; + std::optional result_; + std::vector runner_handles_; + + explicit when_any_homogeneous_state(std::size_t count) + : core_(count) + , runner_handles_(count) + { + } + + ~when_any_homogeneous_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + /** @pre core_.try_win() returned true. */ + void set_winner_result(T value) + noexcept(std::is_nothrow_move_constructible_v) + { + result_.emplace(std::move(value)); + } +}; + +/** Specialization for void tasks (no result storage needed). */ +template<> +struct when_any_homogeneous_state +{ + when_any_core core_; + std::vector runner_handles_; + + explicit when_any_homogeneous_state(std::size_t count) + : core_(count) + , runner_handles_(count) + { + } + + ~when_any_homogeneous_state() + { + for(auto h : runner_handles_) + if(h) + h.destroy(); + } + + // No set_winner_result - void tasks have no result to store +}; + +/** Launches all runners concurrently; see await_suspend for lifetime concerns. */ +template +class when_any_homogeneous_launcher +{ + using Awaitable = std::ranges::range_value_t; + using T = awaitable_result_t; + + Range* range_; + when_any_homogeneous_state* state_; + +public: + when_any_homogeneous_launcher( + Range* range, + when_any_homogeneous_state* state) + : range_(range) + , state_(state) + { + } + + bool await_ready() const noexcept + { + return std::ranges::empty(*range_); + } + + /** CRITICAL: If the last task finishes synchronously, parent resumes and + destroys this object before await_suspend returns. Must not reference + `this` after the final launch_one call. + */ + template + coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {}) + { + state_->core_.continuation_ = continuation; + state_->core_.caller_ex_ = caller_ex; + + if(parent_token.stop_possible()) + { + state_->core_.parent_stop_callback_.emplace( + parent_token, + when_any_core::stop_callback_fn{&state_->core_.stop_source_}); + + if(parent_token.stop_requested()) + state_->core_.stop_source_.request_stop(); + } + + auto token = state_->core_.stop_source_.get_token(); + std::size_t index = 0; + for(auto&& a : *range_) + { + launch_one(index, std::move(a), caller_ex, token); + ++index; + } + + return std::noop_coroutine(); + } + + void await_resume() const noexcept + { + } + +private: + /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */ + template + void launch_one(std::size_t index, Awaitable&& awaitable, Ex const& caller_ex, std::stop_token token) + { + auto runner = make_when_any_runner( + std::move(awaitable), state_, index); + + auto h = runner.release(); + h.promise().state_ = state_; + h.promise().index_ = index; + h.promise().ex_ = caller_ex; + h.promise().stop_token_ = token; + + coro ch{h}; + state_->runner_handles_[index] = ch; + caller_ex.dispatch(ch); + } +}; + +} // namespace detail + +/** Wait for the first awaitable to complete (range overload). + + Races a range of awaitables with the same result type. Accepts any + sized input range of IoAwaitable types, enabling use with arrays, + spans, or custom containers. + + @par Suspends + The calling coroutine suspends when co_await is invoked. All awaitables + in the range are launched concurrently and execute in parallel. The + coroutine resumes only after all awaitables have completed, even though + the winner is determined by the first to finish. + + @par Completion Conditions + @li Winner is determined when the first awaitable completes (success or exception) + @li Only one task can claim winner status via atomic compare-exchange + @li Once a winner exists, stop is requested for all remaining siblings + @li Parent coroutine resumes only after all siblings acknowledge completion + @li The winner's index and result are returned; if the winner threw, the exception is rethrown + + @par Cancellation Semantics + Cancellation is supported via stop_token propagated through the + IoAwaitable protocol: + @li Each child awaitable receives a stop_token derived from a shared stop_source + @li When the parent's stop token is activated, the stop is forwarded to all children + @li When a winner is determined, stop_source_.request_stop() is called immediately + @li Siblings must handle cancellation gracefully and complete before parent resumes + @li Stop requests are cooperative; tasks must check and respond to them + + @par Concurrency/Overlap + All awaitables are launched concurrently before any can complete. + The launcher iterates through the range, starting each task on the + caller's executor. Tasks may execute in parallel on multi-threaded + executors or interleave on single-threaded executors. There is no + guaranteed ordering of task completion. + + @par Notable Error Conditions + @li Empty range: throws std::invalid_argument immediately (not via co_return) + @li Winner exception: if the winning task threw, that exception is rethrown + @li Non-winner exceptions: silently discarded (only winner's result matters) + @li Cancellation: tasks may complete via cancellation without throwing + + @par Example + @code + task example() { + std::array, 3> requests = { + fetch_from_server(0), + fetch_from_server(1), + fetch_from_server(2) + }; + + auto [index, response] = co_await when_any(std::move(requests)); + } + @endcode + + @par Example with Vector + @code + task fetch_fastest(std::vector const& servers) { + std::vector> requests; + for (auto const& server : servers) + requests.push_back(fetch_from(server)); + + auto [index, response] = co_await when_any(std::move(requests)); + co_return response; + } + @endcode + + @tparam R Range type satisfying IoAwaitableRange. + @param awaitables Range of awaitables to race concurrently (must not be empty). + @return A task yielding a pair of (winner_index, result). + + @throws std::invalid_argument if range is empty (thrown before coroutine suspends). + @throws Rethrows the winner's exception if the winning task threw an exception. + + @par Remarks + Elements are moved from the range; for lvalue ranges, the original + container will have moved-from elements after this call. The range + is moved onto the coroutine frame to ensure lifetime safety. Unlike + the variadic overload, no variant wrapper is needed since all tasks + share the same return type. + + @see when_any, IoAwaitableRange +*/ +template + requires (!std::is_void_v>>) +[[nodiscard]] auto when_any(R&& awaitables) + -> task>>> +{ + using Awaitable = std::ranges::range_value_t; + using T = detail::awaitable_result_t; + using result_type = std::pair; + using OwnedRange = std::remove_cvref_t; + + auto count = std::ranges::size(awaitables); + if(count == 0) + throw std::invalid_argument("when_any requires at least one awaitable"); + + // Move/copy range onto coroutine frame to ensure lifetime + OwnedRange owned_awaitables = std::forward(awaitables); + + detail::when_any_homogeneous_state state(count); + + co_await detail::when_any_homogeneous_launcher(&owned_awaitables, &state); + + if(state.core_.winner_exception_) + std::rethrow_exception(state.core_.winner_exception_); + + co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; +} + +/** Wait for the first awaitable to complete (void range overload). + + Races a range of void-returning awaitables. Since void awaitables have + no result value, only the winner's index is returned. + + @par Suspends + The calling coroutine suspends when co_await is invoked. All awaitables + in the range are launched concurrently and execute in parallel. The + coroutine resumes only after all awaitables have completed, even though + the winner is determined by the first to finish. + + @par Completion Conditions + @li Winner is determined when the first awaitable completes (success or exception) + @li Only one task can claim winner status via atomic compare-exchange + @li Once a winner exists, stop is requested for all remaining siblings + @li Parent coroutine resumes only after all siblings acknowledge completion + @li The winner's index is returned; if the winner threw, the exception is rethrown + + @par Cancellation Semantics + Cancellation is supported via stop_token propagated through the + IoAwaitable protocol: + @li Each child awaitable receives a stop_token derived from a shared stop_source + @li When the parent's stop token is activated, the stop is forwarded to all children + @li When a winner is determined, stop_source_.request_stop() is called immediately + @li Siblings must handle cancellation gracefully and complete before parent resumes + @li Stop requests are cooperative; tasks must check and respond to them + + @par Concurrency/Overlap + All awaitables are launched concurrently before any can complete. + The launcher iterates through the range, starting each task on the + caller's executor. Tasks may execute in parallel on multi-threaded + executors or interleave on single-threaded executors. There is no + guaranteed ordering of task completion. + + @par Notable Error Conditions + @li Empty range: throws std::invalid_argument immediately (not via co_return) + @li Winner exception: if the winning task threw, that exception is rethrown + @li Non-winner exceptions: silently discarded (only winner's result matters) + @li Cancellation: tasks may complete via cancellation without throwing + + @par Example + @code + task example() { + std::vector> tasks; + for (int i = 0; i < 5; ++i) + tasks.push_back(background_work(i)); + + std::size_t winner = co_await when_any(std::move(tasks)); + // winner is the index of the first task to complete + } + @endcode + + @par Example with Timeout + @code + task with_timeout() { + std::vector> tasks; + tasks.push_back(long_running_operation()); + tasks.push_back(delay(std::chrono::seconds(5))); + + std::size_t winner = co_await when_any(std::move(tasks)); + if (winner == 1) { + // Timeout occurred + } + } + @endcode + + @tparam R Range type satisfying IoAwaitableRange with void result. + @param awaitables Range of void awaitables to race concurrently (must not be empty). + @return A task yielding the winner's index (zero-based). + + @throws std::invalid_argument if range is empty (thrown before coroutine suspends). + @throws Rethrows the winner's exception if the winning task threw an exception. + + @par Remarks + Elements are moved from the range; for lvalue ranges, the original + container will have moved-from elements after this call. The range + is moved onto the coroutine frame to ensure lifetime safety. Unlike + the non-void overload, no result storage is needed since void tasks + produce no value. + + @see when_any, IoAwaitableRange +*/ +template + requires std::is_void_v>> +[[nodiscard]] auto when_any(R&& awaitables) -> task +{ + using OwnedRange = std::remove_cvref_t; + + auto count = std::ranges::size(awaitables); + if(count == 0) + throw std::invalid_argument("when_any requires at least one awaitable"); + + // Move/copy range onto coroutine frame to ensure lifetime + OwnedRange owned_awaitables = std::forward(awaitables); + + detail::when_any_homogeneous_state state(count); + + co_await detail::when_any_homogeneous_launcher(&owned_awaitables, &state); + + if(state.core_.winner_exception_) + std::rethrow_exception(state.core_.winner_exception_); + + co_return state.core_.winner_index_; +} + +} // namespace capy +} // namespace boost + +#endif diff --git a/test/unit/ex/async_event.cpp b/test/unit/ex/async_event.cpp index 242aad3d..cf608280 100644 --- a/test/unit/ex/async_event.cpp +++ b/test/unit/ex/async_event.cpp @@ -26,44 +26,6 @@ static_assert(IoAwaitable); namespace { -/** Queuing executor that queues coroutines for manual execution control. -*/ -struct queuing_executor -{ - std::queue* queue_; - test_io_context* ctx_ = nullptr; - - explicit queuing_executor(std::queue& q) - : queue_(&q) - { - } - - bool operator==(queuing_executor const& other) const noexcept - { - return queue_ == other.queue_; - } - - execution_context& context() const noexcept - { - return ctx_ ? *ctx_ : default_test_io_context(); - } - - void on_work_started() const noexcept {} - void on_work_finished() const noexcept {} - - void dispatch(coro h) const - { - queue_->push(h); - } - - void post(coro h) const - { - queue_->push(h); - } -}; - -static_assert(Executor); - /** Run a task to completion by manually stepping through it. */ template diff --git a/test/unit/ex/run_async.cpp b/test/unit/ex/run_async.cpp index 5252380b..145fe0a3 100644 --- a/test/unit/ex/run_async.cpp +++ b/test/unit/ex/run_async.cpp @@ -135,15 +135,6 @@ test_io_context queue_executor::default_ctx_; static_assert(Executor); -/// Test exception type. -struct test_exception : std::runtime_error -{ - explicit test_exception(char const* msg) - : std::runtime_error(msg) - { - } -}; - } // namespace capy } // namespace boost diff --git a/test/unit/task.cpp b/test/unit/task.cpp index aa5496aa..71df18b8 100644 --- a/test/unit/task.cpp +++ b/test/unit/task.cpp @@ -77,45 +77,6 @@ struct tracking_executor static_assert(Executor); -/** Queuing executor that queues coroutines for manual execution control. - Returns noop_coroutine so the caller doesn't resume immediately. -*/ -struct queuing_executor -{ - std::queue* queue_; - test_io_context* ctx_ = nullptr; - - explicit queuing_executor(std::queue& q) - : queue_(&q) - { - } - - bool operator==(queuing_executor const& other) const noexcept - { - return queue_ == other.queue_; - } - - execution_context& context() const noexcept - { - return ctx_ ? *ctx_ : default_test_io_context(); - } - - void on_work_started() const noexcept {} - void on_work_finished() const noexcept {} - - void dispatch(coro h) const - { - queue_->push(h); - } - - void post(coro h) const - { - queue_->push(h); - } -}; - -static_assert(Executor); - /** Run a task to completion by manually stepping through it. Takes ownership of the task via release() and runs until done. @@ -154,20 +115,6 @@ inline void run_void_task(task t) run_task(std::move(t)); } -struct test_exception : std::runtime_error -{ - explicit test_exception(const char* msg) - : std::runtime_error(msg) - { - } -}; - -[[noreturn]] inline void -throw_test_exception(char const* msg) -{ - throw test_exception(msg); -} - struct task_test { static task @@ -1088,15 +1035,15 @@ struct task_test BOOST_TEST(all_same); } - struct tear_down_t + struct tear_down_t { bool *torn_down; tear_down_t(bool &torn_down) : torn_down(&torn_down) {} tear_down_t(tear_down_t && rhs) noexcept : torn_down(std::exchange(rhs.torn_down, nullptr)) {} - ~tear_down_t() + ~tear_down_t() { if (torn_down) - *torn_down = true; + *torn_down = true; } }; @@ -1119,7 +1066,7 @@ struct task_test { bool td1 = false, td2 = false; { - auto l = []() -> capy::task + auto l = []() -> capy::task { co_await self_destroy_awaitable{}; }; @@ -1141,15 +1088,15 @@ struct task_test { std::atomic state = 0; - - auto l = [&]() -> capy::task + + auto l = [&]() -> capy::task { struct scope_check { std::atomic &state; ~scope_check() { BOOST_TEST(state == 2);} } sc{state}; - + state = 1; co_await stop_only_awaitable{}; state = 2; diff --git a/test/unit/test_helpers.hpp b/test/unit/test_helpers.hpp index b6941baf..f6df87ba 100644 --- a/test/unit/test_helpers.hpp +++ b/test/unit/test_helpers.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,9 @@ #include #include +#include +#include +#include #include #if defined(__linux__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__APPLE__) @@ -264,6 +268,143 @@ struct stop_only_awaitable void await_resume() {} }; +//---------------------------------------------------------- +// Test Exception Types +//---------------------------------------------------------- + +/** Standard test exception type used across test files. */ +struct test_exception : std::runtime_error +{ + explicit test_exception(const char* msg) + : std::runtime_error(msg) + { + } +}; + +/** Helper to throw test_exception with a message. */ +[[noreturn]] inline void +throw_test_exception(char const* msg) +{ + throw test_exception(msg); +} + +//---------------------------------------------------------- +// Common Test Task Helpers +//---------------------------------------------------------- + +/** Returns a task that completes with an int value. */ +inline task +returns_int(int value) +{ + co_return value; +} + +/** Returns a task that completes with a string value. */ +inline task +returns_string(std::string value) +{ + co_return value; +} + +/** Returns a task that completes with void. */ +inline task +void_task() +{ + co_return; +} + +/** Returns a task that throws an exception. */ +inline task +throws_exception(char const* msg) +{ + throw_test_exception(msg); + co_return 0; +} + +/** Returns a void task that throws an exception. */ +inline task +void_throws_exception(char const* msg) +{ + throw_test_exception(msg); + co_return; +} + +//---------------------------------------------------------- +// Queuing Executor +//---------------------------------------------------------- + +/** Queuing executor that allows controlled interleaving of tasks. + + Unlike test_executor which runs tasks synchronously, this executor + queues work and runs it in FIFO order when manually resumed. + This allows tasks to observe stop requests between suspension points. +*/ +struct queuing_executor +{ + std::queue* queue_; + test_io_context* ctx_ = nullptr; + + explicit queuing_executor(std::queue& q) + : queue_(&q) + { + } + + bool operator==(queuing_executor const& other) const noexcept + { + return queue_ == other.queue_; + } + + test_io_context& context() const noexcept + { + return ctx_ ? *ctx_ : default_test_io_context(); + } + + void on_work_started() const noexcept {} + void on_work_finished() const noexcept {} + + coro dispatch(coro h) const + { + queue_->push(h); + return std::noop_coroutine(); + } + + void post(coro h) const + { + queue_->push(h); + } +}; + +static_assert(Executor); + +//---------------------------------------------------------- +// Yield Awaitable +//---------------------------------------------------------- + +/** Awaitable that yields to the executor, allowing other tasks to run. + + When awaited, this suspends the current coroutine and posts it back + to the executor's queue. This creates a yield point where the task + can be interleaved with other tasks. +*/ +struct yield_awaitable +{ + bool await_ready() const noexcept + { + return false; + } + + template + coro await_suspend(coro h, Ex const& ex, std::stop_token) + { + // Post ourselves back to the queue + ex.post(h); + return std::noop_coroutine(); + } + + void await_resume() const noexcept + { + } +}; } // capy } // boost diff --git a/test/unit/when_all.cpp b/test/unit/when_all.cpp index b7251b5d..d7668fc3 100644 --- a/test/unit/when_all.cpp +++ b/test/unit/when_all.cpp @@ -63,54 +63,8 @@ static_assert(std::is_void_v< // Verify when_all returns task which satisfies awaitable protocols static_assert(IoAwaitableTask>>); -struct test_exception : std::runtime_error -{ - explicit test_exception(const char* msg) - : std::runtime_error(msg) - { - } -}; - -[[noreturn]] inline void -throw_test_exception(char const* msg) -{ - throw test_exception(msg); -} - struct when_all_test { - // Helper tasks - static task - returns_int(int value) - { - co_return value; - } - - static task - returns_string(std::string value) - { - co_return value; - } - - static task - void_task() - { - co_return; - } - - static task - throws_exception(char const* msg) - { - throw_test_exception(msg); - co_return 0; - } - - static task - void_throws_exception(char const* msg) - { - throw_test_exception(msg); - co_return; - } // Test: Single task with when_all succeeds void diff --git a/test/unit/when_any.cpp b/test/unit/when_any.cpp new file mode 100644 index 00000000..56ede9b4 --- /dev/null +++ b/test/unit/when_any.cpp @@ -0,0 +1,1738 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test that header file is self-contained. +#include + +#include +#include +#include +#include + +#include "test_helpers.hpp" +#include "test_suite.hpp" + +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +struct when_any_test +{ + //---------------------------------------------------------- + // Basic functionality tests + //---------------------------------------------------------- + + // Test: Single task returns immediately + void + testSingleTask() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + std::size_t winner_index = 999; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result = std::get<0>(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(42))); + + BOOST_TEST(completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result, 42); + } + + // Test: Two tasks - first completes wins + void + testTwoTasksFirstWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + // Variant is deduplicated to single int type + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(10), returns_int(20))); + + BOOST_TEST(completed); + // One of them should win, with correct index-to-value mapping + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 10); + else + BOOST_TEST_EQ(result_value, 20); + } + + // Test: Three tasks with different types + void + testMixedTypes() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(returns_int(1), returns_string("hello"), returns_int(3))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1 || winner_index == 2); + if (winner_index == 0) + BOOST_TEST_EQ(std::get(result_value), 1); + else if (winner_index == 1) + BOOST_TEST_EQ(std::get(result_value), "hello"); + else + BOOST_TEST_EQ(std::get(result_value), 3); + } + + // Test: Void task can win + void + testVoidTaskWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(void_task(), returns_int(42))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST(std::holds_alternative(result_value)); + else + BOOST_TEST_EQ(std::get(result_value), 42); + } + + // Test: All void tasks + void + testAllVoidTasks() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(void_task(), void_task(), void_task())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1 || winner_index == 2); + // All void tasks produce monostate regardless of index + BOOST_TEST(std::holds_alternative(result_value)); + } + + //---------------------------------------------------------- + // Exception handling tests + //---------------------------------------------------------- + + // Test: Exception from single task propagates + void + testSingleTaskException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [&](auto&&) { completed = true; }, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(throws_exception("test error"))); + + BOOST_TEST(!completed); + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "test error"); + } + + // Test: Exception wins the race (exception is a valid completion) + void + testExceptionWinsRace() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](auto&&) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(throws_exception("winner error"), returns_int(42))); + + // With synchronous executor, first task (the thrower) wins + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "winner error"); + } + + // Test: Void task exception + void + testVoidTaskException() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](auto&&) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(void_throws_exception("void error"), returns_int(42))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "void error"); + } + + // Test: Multiple exceptions - first wins + void + testMultipleExceptionsFirstWins() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + run_async(ex, + [](auto&&) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any( + throws_exception("error_1"), + throws_exception("error_2"), + throws_exception("error_3"))); + + BOOST_TEST(caught_exception); + // One of them wins + BOOST_TEST( + error_msg == "error_1" || + error_msg == "error_2" || + error_msg == "error_3"); + } + + //---------------------------------------------------------- + // Stop token propagation tests + //---------------------------------------------------------- + + // Test: Stop is requested when winner completes + void + testStopRequestedOnCompletion() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&]() -> task { + ++completion_count; + co_return completion_count.load(); + }; + + run_async(ex, + [&](auto&&) { + completed = true; + }, + [](std::exception_ptr) {})( + when_any(counting_task(), counting_task(), counting_task())); + + BOOST_TEST(completed); + // All three tasks should run to completion + // (stop is requested, but synchronous tasks complete anyway) + BOOST_TEST_EQ(completion_count.load(), 3); + } + + // Test: All tasks complete even after winner (cleanup) + void + testAllTasksCompleteForCleanup() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&](int value) -> task { + ++completion_count; + co_return value; + }; + + run_async(ex, + [&](auto&& r) { + completed = true; + // Winner should be first task (synchronous executor) + BOOST_TEST_EQ(r.first, 0u); + }, + [](std::exception_ptr) {})( + when_any( + counting_task(1), + counting_task(2), + counting_task(3), + counting_task(4))); + + BOOST_TEST(completed); + // All four tasks must complete for proper cleanup + BOOST_TEST_EQ(completion_count.load(), 4); + } + + //---------------------------------------------------------- + // Long-lived task cancellation tests + //---------------------------------------------------------- + + // Test: Long-lived tasks exit early when stop is requested + void + testLongLivedTasksCancelledOnWinner() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + // A task that completes immediately + auto fast_task = [&]() -> task { + ++completed_normally_count; + co_return 42; + }; + + // A task that does multiple steps, checking stop token between each + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; // Cancelled + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + run_async(ex, + [&](auto&& r) { + when_any_completed = true; + winner_index = r.first; + winner_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(fast_task(), slow_task(100, 10), slow_task(200, 10))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); // fast_task wins + BOOST_TEST_EQ(winner_value, 42); + + // The fast task completed normally + BOOST_TEST_EQ(completed_normally_count.load(), 1); + + // Both slow tasks should have been cancelled + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + // Test: Slow task can win if it finishes first + void + testSlowTaskCanWin() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + // A task that does a few steps then completes + auto medium_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + // Task 0: 3 steps, Task 1: 1 step (wins), Task 2: 4 steps + // With FIFO scheduling, task1 completes after 1 yield while others + // are still in progress and will observe the stop request. + run_async(ex, + [&](auto&& r) { + when_any_completed = true; + winner_index = r.first; + winner_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(medium_task(10, 3), medium_task(20, 1), medium_task(30, 4))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 1u); // Task with 1 step wins + BOOST_TEST_EQ(winner_value, 20); + + // Only the winner completed normally + BOOST_TEST_EQ(completed_normally_count.load(), 1); + + // Other two tasks were cancelled + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + // Test: Tasks that don't check stop token still complete (cleanup) + void + testNonCooperativeTasksStillComplete() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic completion_count{0}; + bool when_any_completed = false; + + // A task that completes immediately + auto fast_task = [&]() -> task { + ++completion_count; + co_return 42; + }; + + // A task that ignores stop token (non-cooperative) + auto non_cooperative_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + // Deliberately NOT checking stop token + co_await yield_awaitable{}; + } + ++completion_count; + co_return id; + }; + + run_async(ex, + [&](auto&& r) { + when_any_completed = true; + BOOST_TEST_EQ(r.first, 0u); // fast_task wins + }, + [](std::exception_ptr) {})( + when_any(fast_task(), non_cooperative_task(100, 3), non_cooperative_task(200, 3))); + + // Process work queue until empty + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + + // All three tasks complete (non-cooperative tasks run to completion) + BOOST_TEST_EQ(completion_count.load(), 3); + } + + // Test: Mixed cooperative and non-cooperative tasks + void + testMixedCooperativeAndNonCooperativeTasks() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cooperative_cancelled{0}; + std::atomic non_cooperative_finished{0}; + std::atomic winner_finished{0}; + bool when_any_completed = false; + + auto fast_task = [&]() -> task { + ++winner_finished; + co_return 1; + }; + + auto cooperative_slow = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++cooperative_cancelled; + co_return -1; + } + co_await yield_awaitable{}; + } + co_return 2; + }; + + auto non_cooperative_slow = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + co_await yield_awaitable{}; + } + ++non_cooperative_finished; + co_return 3; + }; + + run_async(ex, + [&](auto&& r) { + when_any_completed = true; + BOOST_TEST_EQ(r.first, 0u); + }, + [](std::exception_ptr) {})( + when_any(fast_task(), cooperative_slow(5), non_cooperative_slow(5))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_finished.load(), 1); + BOOST_TEST_EQ(cooperative_cancelled.load(), 1); + BOOST_TEST_EQ(non_cooperative_finished.load(), 1); + } + + //---------------------------------------------------------- + // Nested when_any tests + //---------------------------------------------------------- + + // Test: Nested when_any + void + testNestedWhenAny() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + + auto inner1 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(10), returns_int(20)); + co_return std::get(res); + }; + + auto inner2 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(30), returns_int(40)); + co_return std::get(res); + }; + + std::size_t winner_index = 999; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(inner1(), inner2())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // inner1 returns 10 or 20, inner2 returns 30 or 40 + if (winner_index == 0) + BOOST_TEST(result == 10 || result == 20); + else + BOOST_TEST(result == 30 || result == 40); + } + + // Test: when_any inside when_all + void + testWhenAnyInsideWhenAll() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto race1 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(1), returns_int(2)); + co_return std::get(res); + }; + + auto race2 = []() -> task { + auto [idx, res] = co_await when_any(returns_int(3), returns_int(4)); + co_return std::get(res); + }; + + run_async(ex, + [&](std::tuple t) { + auto [a, b] = t; + completed = true; + BOOST_TEST((a == 1 || a == 2)); + BOOST_TEST((b == 3 || b == 4)); + }, + [](std::exception_ptr) {})( + when_all(race1(), race2())); + + BOOST_TEST(completed); + } + + // Test: when_all inside when_any + void + testWhenAllInsideWhenAny() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto concurrent1 = []() -> task { + auto [a, b] = co_await when_all(returns_int(1), returns_int(2)); + co_return a + b; + }; + + auto concurrent2 = []() -> task { + auto [a, b] = co_await when_all(returns_int(3), returns_int(4)); + co_return a + b; + }; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(concurrent1(), concurrent2())); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // concurrent1 returns 1+2=3, concurrent2 returns 3+4=7 + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 3); + else + BOOST_TEST_EQ(result_value, 7); + } + + //---------------------------------------------------------- + // Edge case tests + //---------------------------------------------------------- + + // Test: Large number of tasks + void + testManyTasks() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](auto r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(when_any( + returns_int(1), returns_int(2), returns_int(3), returns_int(4), + returns_int(5), returns_int(6), returns_int(7), returns_int(8))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 8); + // Verify correct index-to-value mapping (index 0 -> value 1, etc.) + BOOST_TEST_EQ(result_value, static_cast(winner_index + 1)); + } + + // Test: Task that does multiple internal operations + static task + multi_step_task(int start) + { + int value = start; + value += co_await returns_int(1); + value += co_await returns_int(2); + co_return value; + } + + void + testTasksWithMultipleSteps() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(multi_step_task(10), multi_step_task(20))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + // Index 0: 10+1+2=13, Index 1: 20+1+2=23 + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 13); + else + BOOST_TEST_EQ(result_value, 23); + } + + //---------------------------------------------------------- + // Awaitable lifecycle tests + //---------------------------------------------------------- + + // Test: when_any result is move constructible + void + testAwaitableMoveConstruction() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto awaitable1 = when_any(returns_int(1), returns_int(2)); + auto awaitable2 = std::move(awaitable1); + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(std::move(awaitable2)); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 1); + else + BOOST_TEST_EQ(result_value, 2); + } + + // Test: when_any can be stored and awaited later + void + testDeferredAwait() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + auto deferred = when_any(returns_int(10), returns_int(20)); + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})(std::move(deferred)); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(result_value, 10); + else + BOOST_TEST_EQ(result_value, 20); + } + + //---------------------------------------------------------- + // Variant access tests + //---------------------------------------------------------- + + // Test: Correct variant alternative is populated + void + testVariantAlternativePopulated() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + // Note: deduplicates to variant + run_async(ex, + [&](auto&& r) { + completed = true; + // With synchronous executor, first task wins + BOOST_TEST_EQ(r.first, 0u); + BOOST_TEST(std::holds_alternative(r.second)); + BOOST_TEST_EQ(std::get(r.second), 42); + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"), returns_int(99))); + + BOOST_TEST(completed); + } + + // Test: Can use std::visit on result variant + void + testVariantVisit() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + std::variant result_value; + + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index == 0 || winner_index == 1); + if (winner_index == 0) + BOOST_TEST_EQ(std::get(result_value), 42); + else + BOOST_TEST_EQ(std::get(result_value), "hello"); + } + + //---------------------------------------------------------- + // Parent stop token propagation tests + //---------------------------------------------------------- + + // Test: Parent stop token already requested before when_any starts + void + testParentStopAlreadyRequested() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic saw_stop_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + // A task that checks stop token on first suspension + auto check_stop_task = [&](int id) -> task { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++saw_stop_count; + } + co_return id; + }; + + // Use a stop_source to simulate parent cancellation + std::stop_source parent_stop; + parent_stop.request_stop(); + + // Use run_async with stop_token parameter to test propagation + run_async(ex, parent_stop.get_token(), + [&](auto&& r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(check_stop_task(1), check_stop_task(2), check_stop_task(3))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // All tasks should have seen the stop token as requested + // (inherited from parent) + BOOST_TEST_EQ(saw_stop_count.load(), 3); + } + + // Test: Parent stop requested after tasks start but before winner + void + testParentStopDuringExecution() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + bool when_any_completed = false; + + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + co_return id; + }; + + std::stop_source parent_stop; + + // Use run_async with stop_token parameter + run_async(ex, parent_stop.get_token(), + [&](auto&&) { + when_any_completed = true; + }, + [](std::exception_ptr) {})( + when_any(slow_task(1, 10), slow_task(2, 10))); + + // Run a few iterations, then request parent stop + for (int i = 0; i < 3 && !work_queue.empty(); ++i) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + // Request stop from parent + parent_stop.request_stop(); + + // Finish processing + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // Both tasks should have been cancelled by parent stop + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + //---------------------------------------------------------- + // Interleaved exception tests + //---------------------------------------------------------- + + // Test: Multiple exceptions thrown with interleaved execution + void + testInterleavedExceptions() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + bool caught_exception = false; + std::string error_msg; + + // Tasks that yield before throwing + auto delayed_throw = [](int id, int yields) -> task { + for (int i = 0; i < yields; ++i) { + co_await yield_awaitable{}; + } + throw test_exception(("error_" + std::to_string(id)).c_str()); + co_return id; + }; + + run_async(ex, + [](auto&&) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(delayed_throw(1, 2), delayed_throw(2, 1), delayed_throw(3, 3))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(caught_exception); + // Task 2 throws first (after 1 yield) + BOOST_TEST_EQ(error_msg, "error_2"); + } + + //---------------------------------------------------------- + // Nested stop propagation tests + //---------------------------------------------------------- + + // Test: Stop propagates through nested when_any - outer task cancelled before inner starts + void + testNestedStopPropagationOuterCancelled() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic outer_cancelled{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + auto fast_task = [&]() -> task { + co_return 42; + }; + + // A task that checks stop before launching inner when_any + auto nested_when_any_task = [&]() -> task { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++outer_cancelled; + co_return -1; + } + // Won't reach here if stopped + co_return 100; + }; + + run_async(ex, + [&](auto&& r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(fast_task(), nested_when_any_task())); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); // fast_task wins + // The nested task should see stop and exit early + BOOST_TEST_EQ(outer_cancelled.load(), 1); + } + + // Test: Stop propagates to inner when_any's children + void + testNestedStopPropagationInnerCancelled() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic inner_cancelled{0}; + std::atomic inner_completed{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + + // Fast task that yields first to let nested when_any start + auto yielding_fast_task = [&]() -> task { + co_await yield_awaitable{}; + co_return 42; + }; + + auto slow_inner_task = [&](int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++inner_cancelled; + co_return -1; + } + co_await yield_awaitable{}; + } + ++inner_completed; + co_return 100; + }; + + // A task containing a nested when_any - doesn't check stop first + auto nested_when_any_task = [&]() -> task { + // Start inner when_any immediately (no stop check first) + auto [idx, res] = co_await when_any( + slow_inner_task(10), + slow_inner_task(10)); + co_return std::get(res); + }; + + run_async(ex, + [&](auto&& r) { + when_any_completed = true; + winner_index = r.first; + }, + [](std::exception_ptr) {})( + when_any(yielding_fast_task(), nested_when_any_task())); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + // One of them should win + BOOST_TEST(winner_index == 0 || winner_index == 1); + + if (winner_index == 0) { + // If yielding_fast_task won, the inner tasks should be cancelled + BOOST_TEST_EQ(inner_cancelled.load(), 2); + BOOST_TEST_EQ(inner_completed.load(), 0); + } else { + // If nested_when_any_task won (one of its inner tasks completed) + // one inner task completes, other gets cancelled + BOOST_TEST_EQ(inner_completed.load(), 1); + BOOST_TEST_EQ(inner_cancelled.load(), 1); + } + } + + //---------------------------------------------------------- + // Variant usage pattern tests + //---------------------------------------------------------- + + // Test: Document correct pattern for variant access based on index + void + testVariantAccessByIndex() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + bool correct_access = false; + + run_async(ex, + [&](auto&& r) { + completed = true; + // The correct pattern: use index to determine which type to access + switch (r.first) { + case 0: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), 42); + break; + case 1: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), "hello"); + break; + case 2: + correct_access = std::holds_alternative(r.second); + BOOST_TEST_EQ(std::get(r.second), 3.14); + break; + } + }, + [](std::exception_ptr) {})( + when_any(returns_int(42), returns_string("hello"), []() -> task { co_return 3.14; }())); + + BOOST_TEST(completed); + BOOST_TEST(correct_access); + } + + // Test: Variant with duplicate types - index disambiguation + void + testVariantDuplicateTypesIndexDisambiguation() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + // when_any(int, int, int) deduplicates to variant + // but winner_index tells us WHICH task won + run_async(ex, + [&](auto&& r) { + completed = true; + winner_index = r.first; + result_value = std::get(r.second); + }, + [](std::exception_ptr) {})( + when_any(returns_int(100), returns_int(200), returns_int(300))); + + BOOST_TEST(completed); + // With synchronous executor, first task wins + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result_value, 100); + } + + void + run() + { + // Basic functionality + testSingleTask(); + testTwoTasksFirstWins(); + testMixedTypes(); + testVoidTaskWins(); + testAllVoidTasks(); + + // Exception handling + testSingleTaskException(); + testExceptionWinsRace(); + testVoidTaskException(); + testMultipleExceptionsFirstWins(); + + // Stop token propagation + testStopRequestedOnCompletion(); + testAllTasksCompleteForCleanup(); + + // Parent stop token propagation + testParentStopAlreadyRequested(); + testParentStopDuringExecution(); + + // Long-lived task cancellation + testLongLivedTasksCancelledOnWinner(); + testSlowTaskCanWin(); + testNonCooperativeTasksStillComplete(); + testMixedCooperativeAndNonCooperativeTasks(); + + // Interleaved exceptions + testInterleavedExceptions(); + + // Nested combinators + testNestedWhenAny(); + testWhenAnyInsideWhenAll(); + testWhenAllInsideWhenAny(); + + // Nested stop propagation + testNestedStopPropagationOuterCancelled(); + testNestedStopPropagationInnerCancelled(); + + // Edge cases + testManyTasks(); + testTasksWithMultipleSteps(); + + // Awaitable lifecycle + testAwaitableMoveConstruction(); + testDeferredAwait(); + + // Variant access + testVariantAlternativePopulated(); + testVariantVisit(); + testVariantAccessByIndex(); + testVariantDuplicateTypesIndexDisambiguation(); + } +}; + +TEST_SUITE( + when_any_test, + "boost.capy.when_any"); + +//---------------------------------------------------------- +// Homogeneous when_any tests (vector overload) +//---------------------------------------------------------- + +struct when_any_vector_test +{ + //---------------------------------------------------------- + // Basic functionality tests + //---------------------------------------------------------- + + // Test: Single task in vector + void + testSingleTaskVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + std::size_t winner_index = 999; + + std::vector> tasks; + tasks.push_back(returns_int(42)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(result, 42); + } + + // Test: Multiple tasks in vector + void + testMultipleTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + tasks.push_back(returns_int(30)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 3); + // Verify correct index-to-value mapping + BOOST_TEST_EQ(result_value, static_cast((winner_index + 1) * 10)); + } + + // Test: Empty vector throws + void + testEmptyVectorThrows() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + + std::vector> tasks; + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (std::invalid_argument const&) { + caught_exception = true; + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + } + + // Test: Void tasks in vector + void + testVoidTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + + std::vector> tasks; + tasks.push_back(void_task()); + tasks.push_back(void_task()); + tasks.push_back(void_task()); + + run_async(ex, + [&](std::size_t idx) { + completed = true; + winner_index = idx; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 3); + } + + //---------------------------------------------------------- + // Exception handling tests + //---------------------------------------------------------- + + // Test: Exception from task in vector + void + testExceptionInVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(throws_exception("vector error")); + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "vector error"); + } + + // Test: Exception wins race in vector + void + testExceptionWinsRaceVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(throws_exception("winner")); + tasks.push_back(returns_int(42)); + tasks.push_back(returns_int(99)); + + run_async(ex, + [](std::pair) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "winner"); + } + + // Test: Void task exception in vector + void + testVoidExceptionInVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool caught_exception = false; + std::string error_msg; + + std::vector> tasks; + tasks.push_back(void_throws_exception("void vector error")); + tasks.push_back(void_task()); + + run_async(ex, + [](std::size_t) {}, + [&](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (test_exception const& e) { + caught_exception = true; + error_msg = e.what(); + } + })(when_any(std::move(tasks))); + + BOOST_TEST(caught_exception); + BOOST_TEST_EQ(error_msg, "void vector error"); + } + + //---------------------------------------------------------- + // Stop token propagation tests + //---------------------------------------------------------- + + // Test: All tasks complete for cleanup (vector) + void + testAllTasksCompleteForCleanupVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::atomic completion_count{0}; + bool completed = false; + + auto counting_task = [&](int value) -> task { + ++completion_count; + co_return value; + }; + + std::vector> tasks; + tasks.push_back(counting_task(1)); + tasks.push_back(counting_task(2)); + tasks.push_back(counting_task(3)); + tasks.push_back(counting_task(4)); + + run_async(ex, + [&](std::pair) { + completed = true; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + // All four tasks must complete for proper cleanup + BOOST_TEST_EQ(completion_count.load(), 4); + } + + //---------------------------------------------------------- + // Long-lived task cancellation tests (vector) + //---------------------------------------------------------- + + // Test: Long-lived tasks cancelled on winner (vector) + void + testLongLivedTasksCancelledVector() + { + std::queue work_queue; + queuing_executor ex(work_queue); + + std::atomic cancelled_count{0}; + std::atomic completed_normally_count{0}; + bool when_any_completed = false; + std::size_t winner_index = 999; + int winner_value = 0; + + auto fast_task = [&]() -> task { + ++completed_normally_count; + co_return 42; + }; + + auto slow_task = [&](int id, int steps) -> task { + for (int i = 0; i < steps; ++i) { + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { + ++cancelled_count; + co_return -1; + } + co_await yield_awaitable{}; + } + ++completed_normally_count; + co_return id; + }; + + std::vector> tasks; + tasks.push_back(fast_task()); + tasks.push_back(slow_task(100, 10)); + tasks.push_back(slow_task(200, 10)); + + run_async(ex, + [&](std::pair r) { + when_any_completed = true; + winner_index = r.first; + winner_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + while (!work_queue.empty()) { + auto h = work_queue.front(); + work_queue.pop(); + h.resume(); + } + + BOOST_TEST(when_any_completed); + BOOST_TEST_EQ(winner_index, 0u); + BOOST_TEST_EQ(winner_value, 42); + BOOST_TEST_EQ(completed_normally_count.load(), 1); + BOOST_TEST_EQ(cancelled_count.load(), 2); + } + + //---------------------------------------------------------- + // Large vector tests + //---------------------------------------------------------- + + // Test: Many tasks in vector + void + testManyTasksVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t winner_index = 999; + int result_value = 0; + + std::vector> tasks; + for (int i = 1; i <= 20; ++i) + tasks.push_back(returns_int(i)); + + run_async(ex, + [&](std::pair r) { + completed = true; + winner_index = r.first; + result_value = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(tasks))); + + BOOST_TEST(completed); + BOOST_TEST(winner_index < 20); + // Verify correct index-to-value mapping (index 0 -> value 1, etc.) + BOOST_TEST_EQ(result_value, static_cast(winner_index + 1)); + } + + //---------------------------------------------------------- + // Nested combinator tests + //---------------------------------------------------------- + + // Test: Nested when_any with vectors + void + testNestedWhenAnyVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + int result = 0; + + auto inner = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(10)); + tasks.push_back(returns_int(20)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + std::vector> outer_tasks; + outer_tasks.push_back(inner()); + outer_tasks.push_back(inner()); + + run_async(ex, + [&](std::pair r) { + completed = true; + result = r.second; + }, + [](std::exception_ptr) {})( + when_any(std::move(outer_tasks))); + + BOOST_TEST(completed); + BOOST_TEST(result == 10 || result == 20); + } + + // Test: when_any vector inside when_all + void + testWhenAnyVectorInsideWhenAll() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + + auto race = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(1)); + tasks.push_back(returns_int(2)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + run_async(ex, + [&](std::tuple t) { + auto [a, b] = t; + completed = true; + BOOST_TEST((a == 1 || a == 2)); + BOOST_TEST((b == 1 || b == 2)); + }, + [](std::exception_ptr) {})( + when_all(race(), race())); + + BOOST_TEST(completed); + } + + //---------------------------------------------------------- + // Mixed variadic and vector tests + //---------------------------------------------------------- + + // Test: Mix variadic and vector when_any + void + testMixedVariadicAndVector() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool completed = false; + std::size_t outer_winner = 999; + + auto variadic_race = []() -> task { + auto [idx, res] = co_await when_any(returns_int(1), returns_int(2)); + co_return std::get(res); + }; + + auto vector_race = []() -> task { + std::vector> tasks; + tasks.push_back(returns_int(3)); + tasks.push_back(returns_int(4)); + auto [idx, res] = co_await when_any(std::move(tasks)); + co_return res; + }; + + run_async(ex, + [&](auto r) { + completed = true; + outer_winner = r.first; + auto result = std::get(r.second); + if (outer_winner == 0) + BOOST_TEST((result == 1 || result == 2)); + else + BOOST_TEST((result == 3 || result == 4)); + }, + [](std::exception_ptr) {})( + when_any(variadic_race(), vector_race())); + + BOOST_TEST(completed); + } + + void + run() + { + // Basic functionality + testSingleTaskVector(); + testMultipleTasksVector(); + testEmptyVectorThrows(); + testVoidTasksVector(); + + // Exception handling + testExceptionInVector(); + testExceptionWinsRaceVector(); + testVoidExceptionInVector(); + + // Stop token propagation + testAllTasksCompleteForCleanupVector(); + + // Long-lived task cancellation + testLongLivedTasksCancelledVector(); + + // Large vectors + testManyTasksVector(); + + // Nested combinators + testNestedWhenAnyVector(); + testWhenAnyVectorInsideWhenAll(); + + // Mixed variadic and vector + testMixedVariadicAndVector(); + } +}; + +TEST_SUITE( + when_any_vector_test, + "boost.capy.when_any_vector"); + +} // capy +} // boost