diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 0d33c74c..c5b378d0 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -17,6 +17,7 @@ add_subdirectory(mock-stream-testing) add_subdirectory(parallel-fetch) add_subdirectory(parallel-tasks) add_subdirectory(producer-consumer) +add_subdirectory(quitter-shutdown) add_subdirectory(strand-serialization) add_subdirectory(stream-pipeline) add_subdirectory(timeout-cancellation) diff --git a/example/quitter-shutdown/CMakeLists.txt b/example/quitter-shutdown/CMakeLists.txt new file mode 100644 index 00000000..14d6f164 --- /dev/null +++ b/example/quitter-shutdown/CMakeLists.txt @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Michael Vandeberg +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp + CMakeLists.txt + Jamfile) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_quitter_shutdown ${PFILES}) + +set_property(TARGET capy_example_quitter_shutdown + PROPERTY FOLDER "examples") + +target_link_libraries(capy_example_quitter_shutdown + Boost::capy) diff --git a/example/quitter-shutdown/quitter_shutdown.cpp b/example/quitter-shutdown/quitter_shutdown.cpp new file mode 100644 index 00000000..ef77f6e2 --- /dev/null +++ b/example/quitter-shutdown/quitter_shutdown.cpp @@ -0,0 +1,165 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +/* Quitter Shutdown Example + + Demonstrates quitter for responsive application shutdown. + + Four workers simulate a batch file-processing pipeline: each + "downloads" data (delay), "transforms" it, and "writes" the + result (delay). Workers are quitter<> coroutines — their + bodies contain zero cancellation-handling code. + + Press Ctrl+C to request shutdown. Every in-flight worker + exits at its next co_await, RAII cleanup runs (each worker + holds a resource_guard that logs its cleanup), and the + application prints a summary and exits. + + Contrast with task<>: + With task<>, every co_await that touches I/O needs: + auto [ec] = co_await delay(dur); + if(ec) co_return; // <-- cancellation boilerplate + This is repeated at every suspension point. + + With quitter<>, the promise intercepts the stop token + automatically. The worker body is pure business logic. +*/ + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace capy = boost::capy; +using namespace std::chrono_literals; + +// Global stop source wired to Ctrl+C. +static std::stop_source g_stop; +static std::atomic + g_stop_time{std::chrono::steady_clock::time_point{}}; + +extern "C" void signal_handler(int) +{ + g_stop_time.store(std::chrono::steady_clock::now(), + std::memory_order_relaxed); + g_stop.request_stop(); +} + +// RAII resource that logs construction and destruction. +// Simulates holding a file handle, socket, or temp buffer +// that must be released on shutdown. +struct resource_guard +{ + int id; + std::atomic& cleanup_count; + + resource_guard(int id_, std::atomic& count) + : id(id_) + , cleanup_count(count) + { + std::ostringstream oss; + oss << " [worker " << id << "] acquired resources\n"; + std::cout << oss.str(); + } + + ~resource_guard() + { + ++cleanup_count; + std::ostringstream oss; + oss << " [worker " << id << "] released resources " + << "(cleanup)\n"; + std::cout << oss.str(); + } + + resource_guard(resource_guard const&) = delete; + resource_guard& operator=(resource_guard const&) = delete; +}; + +// A single worker: download → transform → write, repeated. +// No cancellation code. quitter handles it. +capy::quitter<> worker( + int id, + std::atomic& items_processed, + std::atomic& cleanup_count) +{ + resource_guard guard(id, cleanup_count); + + for(int item = 0; ; ++item) + { + // Simulate download (200-400ms depending on worker) + auto download_time = 200ms + 50ms * id; + (void) co_await capy::delay(download_time); + + // Simulate transform (CPU work — no co_await needed) + { + std::ostringstream oss; + oss << " [worker " << id << "] processing item " + << item << "\n"; + std::cout << oss.str(); + } + + // Simulate write (100ms) + (void) co_await capy::delay(100ms); + + ++items_processed; + } + + // Never reached — the loop is infinite. + // quitter exits at the next co_await after stop is requested. +} + +int main() +{ + std::signal(SIGINT, signal_handler); +#ifdef SIGTERM + std::signal(SIGTERM, signal_handler); +#endif + + constexpr int num_workers = 4; + capy::thread_pool pool(num_workers); + std::latch done(num_workers); + + std::atomic items_processed{0}; + std::atomic cleanup_count{0}; + + std::cout << "Starting " << num_workers + << " workers. Press Ctrl+C to quit.\n\n"; + + for(int i = 0; i < num_workers; ++i) + { + capy::run_async( + pool.get_executor(), + g_stop.get_token(), + [&]() { done.count_down(); }, + [&](std::exception_ptr) { done.count_down(); })( + worker(i, items_processed, cleanup_count)); + } + + done.wait(); + + auto stop_at = g_stop_time.load(std::memory_order_relaxed); + auto now = std::chrono::steady_clock::now(); + + std::cout << "\nShutdown complete.\n" + << " Items processed: " << items_processed << "\n" + << " Workers cleaned up: " << cleanup_count + << "/" << num_workers << "\n"; + + if(stop_at != std::chrono::steady_clock::time_point{}) + { + auto us = std::chrono::duration_cast< + std::chrono::microseconds>(now - stop_at).count(); + std::cout << " Shutdown latency: " << us << " us\n"; + } +} diff --git a/include/boost/capy.hpp b/include/boost/capy.hpp index f61899f6..eb1e20bc 100644 --- a/include/boost/capy.hpp +++ b/include/boost/capy.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include // Algorithms diff --git a/include/boost/capy/detail/stop_requested_exception.hpp b/include/boost/capy/detail/stop_requested_exception.hpp new file mode 100644 index 00000000..f5bc47e9 --- /dev/null +++ b/include/boost/capy/detail/stop_requested_exception.hpp @@ -0,0 +1,28 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_DETAIL_STOP_REQUESTED_EXCEPTION_HPP +#define BOOST_CAPY_DETAIL_STOP_REQUESTED_EXCEPTION_HPP + +namespace boost { +namespace capy { +namespace detail { + +/* Lightweight sentinel thrown inside quitter when the stop token + is triggered. Not derived from std::exception. Never escapes the + coroutine — unhandled_exception() catches it and sets the stopped + flag. The cost is one throw+catch per cancellation per coroutine + lifetime. */ +struct stop_requested_exception {}; + +} // namespace detail +} // namespace capy +} // namespace boost + +#endif diff --git a/include/boost/capy/ex/run_async.hpp b/include/boost/capy/ex/run_async.hpp index bc268602..4f559c5d 100644 --- a/include/boost/capy/ex/run_async.hpp +++ b/include/boost/capy/ex/run_async.hpp @@ -283,9 +283,16 @@ make_trampoline(Ex, Handlers, Alloc) // promise_type ctor steals the parameters auto& p = co_await get_promise_awaiter< typename run_async_trampoline::promise_type>{}; - + + // Guard ensures the task frame is destroyed even when invoke_ + // throws (e.g. default_handler rethrows an unhandled exception). + struct frame_guard + { + std::coroutine_handle<>& h; + ~frame_guard() { h.destroy(); } + } guard{p.task_h_}; + p.invoke_(p.task_promise_, p.handlers_); - p.task_h_.destroy(); } } // namespace detail diff --git a/include/boost/capy/quitter.hpp b/include/boost/capy/quitter.hpp new file mode 100644 index 00000000..0ef64cc5 --- /dev/null +++ b/include/boost/capy/quitter.hpp @@ -0,0 +1,359 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_QUITTER_HPP +#define BOOST_CAPY_QUITTER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +/* Stop-aware coroutine task. + + quitter is identical to task except that when the stop token + is triggered, the coroutine body never sees the cancellation. The + promise intercepts it on resume (in transform_awaiter::await_resume) + and throws a sentinel exception that unwinds through RAII destructors + to final_suspend. The parent sees a "stopped" completion. + + See doc/quitter.md for the full design rationale. */ + +namespace boost { +namespace capy { + +namespace detail { + +// Reuse the same return-value storage as task. +// task_return_base is defined in task.hpp, but quitter needs its own +// copy to avoid a header dependency on task.hpp. +template +struct quitter_return_base +{ + std::optional result_; + + void return_value(T value) + { + result_ = std::move(value); + } + + T&& result() noexcept + { + return std::move(*result_); + } +}; + +template<> +struct quitter_return_base +{ + void return_void() + { + } +}; + +} // namespace detail + +/** Stop-aware lazy coroutine task satisfying @ref IoRunnable. + + When the stop token is triggered, the next `co_await` inside the + coroutine short-circuits: the body never sees the result and RAII + destructors run normally. The parent observes a "stopped" + completion via @ref promise_type::stopped. + + Everything else — frame allocation, environment propagation, + symmetric transfer, move semantics — is identical to @ref task. + + @tparam T The result type. Use `quitter<>` for `quitter`. + + @see task, IoRunnable, IoAwaitable +*/ +template +struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE + quitter +{ + struct promise_type + : io_awaitable_promise_base + , detail::quitter_return_base + { + private: + friend quitter; + + enum class completion { running, value, exception, stopped }; + + union { std::exception_ptr ep_; }; + completion state_; + + public: + promise_type() noexcept + : state_(completion::running) + { + } + + ~promise_type() + { + if(state_ == completion::exception || + state_ == completion::stopped) + ep_.~exception_ptr(); + } + + /// Return a non-null exception_ptr when the coroutine threw + /// or was stopped. Stopped quitters report the sentinel + /// stop_requested_exception so that run_async routes to + /// the error handler instead of accessing a non-existent + /// result. + std::exception_ptr exception() const noexcept + { + if(state_ == completion::exception || + state_ == completion::stopped) + return ep_; + return {}; + } + + /// True when the coroutine was stopped via the stop token. + bool stopped() const noexcept + { + return state_ == completion::stopped; + } + + quitter get_return_object() + { + return quitter{ + std::coroutine_handle::from_promise(*this)}; + } + + auto initial_suspend() noexcept + { + struct awaiter + { + promise_type* p_; + + bool await_ready() const noexcept + { + return false; + } + + void await_suspend(std::coroutine_handle<>) const noexcept + { + } + + // Potentially-throwing: checks the stop token before + // the coroutine body executes its first statement. + void await_resume() const + { + set_current_frame_allocator( + p_->environment()->frame_allocator); + if(p_->environment()->stop_token.stop_requested()) + throw detail::stop_requested_exception{}; + } + }; + return awaiter{this}; + } + + auto final_suspend() noexcept + { + struct awaiter + { + promise_type* p_; + + bool await_ready() const noexcept + { + return false; + } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle<>) const noexcept + { + return p_->continuation(); + } + + void await_resume() const noexcept + { + } + }; + return awaiter{this}; + } + + void unhandled_exception() + { + try + { + throw; + } + catch(detail::stop_requested_exception const&) + { + // Store the exception_ptr so that run_async's + // invoke_impl routes to the error handler + // instead of accessing a non-existent result. + new (&ep_) std::exception_ptr( + std::current_exception()); + state_ = completion::stopped; + } + catch(...) + { + new (&ep_) std::exception_ptr( + std::current_exception()); + state_ = completion::exception; + } + } + + //------------------------------------------------------ + // transform_awaitable — the key difference from task + //------------------------------------------------------ + + template + struct transform_awaiter + { + std::decay_t a_; + promise_type* p_; + + bool await_ready() noexcept + { + return a_.await_ready(); + } + + // Check the stop token BEFORE the coroutine body + // sees the result of the I/O operation. + decltype(auto) await_resume() + { + set_current_frame_allocator( + p_->environment()->frame_allocator); + if(p_->environment()->stop_token.stop_requested()) + throw detail::stop_requested_exception{}; + return a_.await_resume(); + } + + template + auto await_suspend( + std::coroutine_handle h) noexcept + { + using R = decltype( + a_.await_suspend(h, p_->environment())); + if constexpr (std::is_same_v< + R, std::coroutine_handle<>>) + return detail::symmetric_transfer( + a_.await_suspend(h, p_->environment())); + else + return a_.await_suspend( + h, p_->environment()); + } + }; + + template + auto transform_awaitable(Awaitable&& a) + { + using A = std::decay_t; + if constexpr (IoAwaitable) + { + return transform_awaiter{ + std::forward(a), this}; + } + else + { + static_assert(sizeof(A) == 0, + "requires IoAwaitable"); + } + } + }; + + std::coroutine_handle h_; + + /// Destroy the quitter and its coroutine frame if owned. + ~quitter() + { + if(h_) + h_.destroy(); + } + + /// Return false; quitters are never immediately ready. + bool await_ready() const noexcept + { + return false; + } + + /** Return the result, rethrow exception, or propagate stop. + + When stopped, throws stop_requested_exception so that a + parent quitter also stops. A parent task will see this + as an unhandled exception — by design. + */ + auto await_resume() + { + if(h_.promise().stopped()) + throw detail::stop_requested_exception{}; + if(h_.promise().state_ == promise_type::completion::exception) + std::rethrow_exception(h_.promise().ep_); + if constexpr (! std::is_void_v) + return std::move(*h_.promise().result_); + else + return; + } + + /// Start execution with the caller's context. + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> cont, + io_env const* env) + { + h_.promise().set_continuation(cont); + h_.promise().set_environment(env); + return h_; + } + + /// Return the coroutine handle. + std::coroutine_handle handle() const noexcept + { + return h_; + } + + /// Release ownership of the coroutine frame. + void release() noexcept + { + h_ = nullptr; + } + + quitter(quitter const&) = delete; + quitter& operator=(quitter const&) = delete; + + /// Construct by moving, transferring ownership. + quitter(quitter&& other) noexcept + : h_(std::exchange(other.h_, nullptr)) + { + } + + /// Assign by moving, transferring ownership. + quitter& operator=(quitter&& other) noexcept + { + if(this != &other) + { + if(h_) + h_.destroy(); + h_ = std::exchange(other.h_, nullptr); + } + return *this; + } + +private: + explicit quitter(std::coroutine_handle h) + : h_(h) + { + } +}; + +} // namespace capy +} // namespace boost + +#endif diff --git a/test/unit/quitter.cpp b/test/unit/quitter.cpp new file mode 100644 index 00000000..52fe8652 --- /dev/null +++ b/test/unit/quitter.cpp @@ -0,0 +1,819 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Test that header file is self-contained. +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_helpers.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +static_assert(IoAwaitable>); +static_assert(IoAwaitable>); +static_assert(IoRunnable>); +static_assert(IoRunnable>); + +struct quitter_test +{ + //---------------------------------------------------------- + // 1. Normal completion — quitter returns a value + //---------------------------------------------------------- + + static quitter + returns_int() + { + co_return 42; + } + + void + testNormalCompletion() + { + int result = 0; + test::run_blocking([&](int v) { result = v; })(returns_int()); + BOOST_TEST_EQ(result, 42); + } + + //---------------------------------------------------------- + // 2. Void completion + //---------------------------------------------------------- + + static quitter<> + void_quitter() + { + co_return; + } + + void + testVoidCompletion() + { + test::run_blocking()(void_quitter()); + } + + //---------------------------------------------------------- + // 3. Exception propagation + //---------------------------------------------------------- + + static quitter<> + throws_quitter() + { + throw test_exception("quitter exception"); + co_return; + } + + void + testExceptionPropagation() + { + BOOST_TEST_THROWS( + test::run_blocking()(throws_quitter()), + test_exception); + } + + //---------------------------------------------------------- + // 4. Stop before first co_await + //---------------------------------------------------------- + + struct raii_counter + { + int* count; + raii_counter(int& c) : count(&c) {} + raii_counter(raii_counter&& o) noexcept + : count(std::exchange(o.count, nullptr)) {} + ~raii_counter() { if(count) ++(*count); } + }; + + static quitter<> + quitter_with_raii(int& dtor_count) + { + raii_counter guard(dtor_count); + co_await stop_only_awaitable{}; + } + + void + testStopBeforeFirstAwait() + { + // When stop is already requested, initial_suspend throws + // before the coroutine body starts. Body locals are never + // constructed, so dtor_count stays 0. The stopped state + // routes to the error handler via exception(). + int dispatch_count = 0; + test_executor ex(dispatch_count); + std::stop_source source; + source.request_stop(); + + int dtor_count = 0; + bool got_stopped = false; + + run_async(ex, source.get_token(), + [](){ BOOST_TEST(false); }, + [&](std::exception_ptr ep) { + got_stopped = (ep != nullptr); + })(quitter_with_raii(dtor_count)); + + BOOST_TEST(got_stopped); + // Body never started, so no destructors ran + BOOST_TEST_EQ(dtor_count, 0); + } + + //---------------------------------------------------------- + // 5. Stop during I/O + //---------------------------------------------------------- + + void + testStopDuringIO() + { + std::atomic state = 0; + std::binary_semaphore suspended{0}; + + int dtor_count = 0; + + auto q = [&]() -> quitter<> + { + raii_counter guard(dtor_count); + state = 1; + suspended.release(); + co_await stop_only_awaitable{}; + // Should never reach here when stopped + state = 99; + }; + + { + std::jthread jt( + [&](std::stop_token st) + { + int dc = 0; + test_executor ex(dc); + run_async(ex, st)(q()); + } + ); + suspended.acquire(); + BOOST_TEST(state == 1); + // jthread destructor calls request_stop() then join() + } + + // The coroutine was stopped; it should NOT have reached state=99 + BOOST_TEST(state != 99); + BOOST_TEST_EQ(dtor_count, 1); + } + + //---------------------------------------------------------- + // 6. Stop propagation through chain + //---------------------------------------------------------- + + static quitter<> + inner_quitter( + int& dtor_count, + bool& reached_end, + std::binary_semaphore& suspended) + { + raii_counter guard(dtor_count); + suspended.release(); + co_await stop_only_awaitable{}; + reached_end = true; + } + + static quitter<> + middle_quitter( + int& dtor_count, + bool& reached_end, + std::binary_semaphore& suspended) + { + raii_counter guard(dtor_count); + co_await inner_quitter(dtor_count, reached_end, suspended); + reached_end = true; + } + + static quitter<> + outer_quitter( + int& dtor_count, + bool& reached_end, + std::binary_semaphore& suspended) + { + raii_counter guard(dtor_count); + co_await middle_quitter(dtor_count, reached_end, suspended); + reached_end = true; + } + + void + testStopPropagationChain() + { + std::binary_semaphore suspended{0}; + int dtor_count = 0; + bool reached_end = false; + + auto top = [&]() -> quitter<> + { + co_await outer_quitter( + dtor_count, reached_end, suspended); + reached_end = true; + }; + + { + std::jthread jt( + [&](std::stop_token st) + { + int dc = 0; + test_executor ex(dc); + run_async(ex, st)(top()); + } + ); + suspended.acquire(); + } + + // 3 guards from outer/middle/inner + BOOST_TEST_EQ(dtor_count, 3); + // No coroutine body continued past its co_await + BOOST_TEST(!reached_end); + } + + //---------------------------------------------------------- + // 9. Mixing quitter and task — task awaits quitter + //---------------------------------------------------------- + + static quitter + quitter_returns_42() + { + co_return 42; + } + + static task + task_awaits_quitter() + { + int v = co_await quitter_returns_42(); + co_return v + 1; + } + + void + testMixingQuitterAndTask() + { + // Normal case: task awaits quitter that completes normally + { + int result = 0; + test::run_blocking( + [&](int v) { result = v; })(task_awaits_quitter()); + BOOST_TEST_EQ(result, 43); + } + + // Stopped case: task awaits stopped quitter — sees exception + { + std::stop_source source; + source.request_stop(); + + int dispatch_count = 0; + test_executor ex(dispatch_count); + bool got_exception = false; + + auto t = []() -> task + { + co_return co_await quitter_returns_42(); + }; + + run_async(ex, source.get_token(), + [](int) { BOOST_TEST(false); }, + [&](std::exception_ptr ep) { + got_exception = (ep != nullptr); + })(t()); + + BOOST_TEST(got_exception); + } + } + + //---------------------------------------------------------- + // 10. No stop requested — identical to task + //---------------------------------------------------------- + + static quitter + quitter_chain() + { + auto inner = []() -> quitter { + co_return 10; + }; + + auto middle = [inner]() -> quitter { + int v = co_await inner(); + co_return v * 2; + }; + + int v = co_await middle(); + co_return v + 5; + } + + void + testNoStopRequested() + { + int result = 0; + test::run_blocking( + [&](int v) { result = v; })(quitter_chain()); + BOOST_TEST_EQ(result, 25); + } + + //---------------------------------------------------------- + // 11. RAII verification + //---------------------------------------------------------- + + void + testRAIIVerification() + { + // Body starts, constructs guards, then stop is requested + // during the co_await. All guard destructors must run. + // The body must NOT continue past co_await. + std::binary_semaphore suspended{0}; + int dtor_count = 0; + bool reached_end = false; + + auto q = [&]() -> quitter<> + { + raii_counter g1(dtor_count); + raii_counter g2(dtor_count); + raii_counter g3(dtor_count); + suspended.release(); + co_await stop_only_awaitable{}; + reached_end = true; + }; + + { + std::jthread jt( + [&](std::stop_token st) + { + int dc = 0; + test_executor ex(dc); + run_async(ex, st)(q()); + } + ); + suspended.acquire(); + } + + BOOST_TEST_EQ(dtor_count, 3); + BOOST_TEST(!reached_end); + } + + //---------------------------------------------------------- + // 12. Multiple co_await — stop after second + //---------------------------------------------------------- + + static quitter + quitter_multi_await( + std::atomic& progress, + std::binary_semaphore& sem) + { + progress = 1; + co_await yield_awaitable{}; + progress = 2; + sem.release(); + co_await stop_only_awaitable{}; + // Should not reach here + progress = 3; + co_return 0; + } + + void + testMultipleCoAwait() + { + std::atomic progress{0}; + std::binary_semaphore sem{0}; + + { + std::jthread jt( + [&](std::stop_token st) + { + int dc = 0; + test_executor ex(dc); + run_async(ex, st)( + quitter_multi_await(progress, sem)); + } + ); + sem.acquire(); + BOOST_TEST(progress == 2); + // jthread destructor requests stop + } + + // The third await should have been short-circuited + BOOST_TEST(progress != 3); + } + + //---------------------------------------------------------- + // Move operations + //---------------------------------------------------------- + + void + testMoveOperations() + { + int dispatch_count = 0; + test_executor ex(dispatch_count); + io_env env{executor_ref(ex), {}, nullptr}; + + // move constructor + { + auto q1 = returns_int(); + auto h1 = q1.handle(); + q1.release(); + BOOST_TEST(h1); + + quitter q2(std::move(q1)); + BOOST_TEST(!q2.handle()); + + h1.promise().set_environment(&env); + while(!h1.done()) + h1.resume(); + BOOST_TEST_EQ(*h1.promise().result_, 42); + h1.destroy(); + } + + // release() + { + auto q = returns_int(); + auto h = q.handle(); + q.release(); + BOOST_TEST(h); + BOOST_TEST(!q.handle()); + + h.promise().set_environment(&env); + while(!h.done()) + h.resume(); + BOOST_TEST(h.promise().result_.has_value()); + BOOST_TEST_EQ(*h.promise().result_, 42); + h.destroy(); + } + } + + //---------------------------------------------------------- + // Quitter returning string + //---------------------------------------------------------- + + static quitter + returns_string() + { + co_return "hello"; + } + + void + testReturnString() + { + std::string result; + test::run_blocking( + [&](std::string v) { result = std::move(v); })( + returns_string()); + BOOST_TEST_EQ(result, "hello"); + } + + //---------------------------------------------------------- + // Exception in quitter + //---------------------------------------------------------- + + static quitter + quitter_throws_int() + { + throw test_exception("quitter int exception"); + co_return 0; + } + + void + testExceptionInValueQuitter() + { + BOOST_TEST_THROWS( + test::run_blocking()(quitter_throws_int()), + test_exception); + } + + //---------------------------------------------------------- + // 7. Stop propagation with when_all + // + // Two quitter> children inside when_all. + // Both block on stop_only_awaitable. when_all creates a + // child stop_source; when the parent stop fires, when_all + // propagates it. Each quitter child intercepts the stop + // in transform_awaiter::await_resume and short-circuits + // via stop_requested_exception — it never reaches the + // co_return. Verify both stop and when_all completes. + //---------------------------------------------------------- + + static quitter> + quitter_pending_size(bool& reached_co_return) + { + co_await stop_only_awaitable{}; + // If quitter's transform_awaiter intercepted the stop, + // we never reach here. + reached_co_return = true; + co_return io_result{ + make_error_code(error::canceled), 0}; + } + + void + testWhenAllWithStop() + { + thread_pool pool(2); + std::latch done(1); + std::latch suspended(1); + std::stop_source source; + + bool child1_returned = false; + bool child2_returned = false; + + auto outer = [&]() -> task<> + { + suspended.count_down(); + auto result = co_await when_all( + quitter_pending_size(child1_returned), + quitter_pending_size(child2_returned)); + (void)result; + }; + + run_async(pool.get_executor(), source.get_token(), + [&]() { done.count_down(); }, + [&](std::exception_ptr) { done.count_down(); })( + outer()); + + suspended.wait(); + std::this_thread::sleep_for( + std::chrono::milliseconds(10 * failsafe_scale)); + source.request_stop(); + + done.wait(); + // Both quitter children were stopped by transform_awaiter + // before reaching co_return. + BOOST_TEST(!child1_returned); + BOOST_TEST(!child2_returned); + } + + //---------------------------------------------------------- + // 8. Stop propagation with when_any + // + // Two quitter children. One succeeds immediately, + // when_any stops the sibling. The sibling quitter + // intercepts the stop and exits cleanly. + //---------------------------------------------------------- + + static quitter> + quitter_success_size(std::size_t n) + { + co_return io_result{{}, n}; + } + + void + testWhenAnyWithStop() + { + // One child succeeds immediately. when_any stops + // the pending sibling quitter. The sibling must + // be intercepted by transform_awaiter (never reach + // co_return). + { + thread_pool pool(2); + std::latch done(1); + bool sibling_returned = false; + + auto outer = [&]() -> task<> + { + auto result = co_await when_any( + quitter_success_size(42), + quitter_pending_size(sibling_returned)); + // Variadic when_any returns + // variant. + // Index 1 = first child won. + BOOST_TEST(result.index() == 1); + if(result.index() == 1) + BOOST_TEST_EQ(std::get<1>(result), + std::size_t(42)); + }; + + run_async(pool.get_executor(), + [&]() { done.count_down(); }, + [&](std::exception_ptr) { + done.count_down(); + })(outer()); + + done.wait(); + BOOST_TEST(!sibling_returned); + } + + // Both children pending. Parent stop fires. + // when_any propagates stop to children. Both + // quitter children short-circuit. + { + thread_pool pool(2); + std::latch done(1); + std::latch suspended(1); + std::stop_source source; + + bool child1_returned = false; + bool child2_returned = false; + + auto outer = [&]() -> task<> + { + suspended.count_down(); + auto result = co_await when_any( + quitter_pending_size(child1_returned), + quitter_pending_size(child2_returned)); + (void)result; + }; + + run_async(pool.get_executor(), source.get_token(), + [&]() { done.count_down(); }, + [&](std::exception_ptr) { + done.count_down(); + })(outer()); + + suspended.wait(); + std::this_thread::sleep_for( + std::chrono::milliseconds(10 * failsafe_scale)); + source.request_stop(); + + done.wait(); + BOOST_TEST(!child1_returned); + BOOST_TEST(!child2_returned); + } + } + + //---------------------------------------------------------- + // 14. Timer cancellation + //---------------------------------------------------------- + + void + testTimerCancellation() + { + using namespace std::chrono_literals; + + thread_pool pool(1); + std::latch done(1); + std::latch suspended(1); + std::stop_source source; + bool reached_end = false; + + auto q = [&]() -> quitter<> + { + suspended.count_down(); + auto [ec] = co_await delay(10s); + (void)ec; + reached_end = true; + }; + + auto start = std::chrono::steady_clock::now(); + + run_async(pool.get_executor(), source.get_token(), + [&]() { done.count_down(); }, + [&](std::exception_ptr) { done.count_down(); })(q()); + + suspended.wait(); + std::this_thread::sleep_for( + std::chrono::milliseconds(10 * failsafe_scale)); + source.request_stop(); + + done.wait(); + auto elapsed = std::chrono::steady_clock::now() - start; + // Should complete promptly, well under 10s + BOOST_TEST(elapsed < 1s); + // Quitter intercepted the stop — body did not continue + BOOST_TEST(!reached_end); + } + + //---------------------------------------------------------- + // 13. Echo server with shutdown + // + // A quitter echo loop over a mock stream pair. + // The client exchanges data, then requests stop. + // The echo quitter exits cleanly via stop interception, + // RAII runs, and the echoed data was correct. + // + // The mock stream's read_some is not stop-aware, so we + // wrap each read in when_any with a stop_only_awaitable + // to make it cancellable — mirroring how a real server + // would have the OS cancel an in-flight read. + //---------------------------------------------------------- + + void + testEchoWithShutdown() + { + // Echo server over a mock stream pair. The server + // reads pre-provided data, echoes it back, then + // waits for shutdown via stop_only_awaitable. When + // stop fires, the quitter intercepts at the co_await + // and exits cleanly. All stream access is on the + // jthread's synchronous executor — no cross-thread + // stream use. + test::fuse f; + auto [server_end, client_end] = + test::make_stream_pair(f); + + client_end.provide("hello"); + + int dtor_count = 0; + bool reached_end = false; + std::size_t total_echoed = 0; + std::binary_semaphore suspended{0}; + + auto echo_server = [&]() -> quitter<> + { + raii_counter guard(dtor_count); + char buf[64]; + + // Echo loop: process all available data + auto [ec, n] = co_await server_end.read_some( + make_buffer(buf)); + if(ec) + co_return; + total_echoed += n; + auto [ec2, n2] = co_await server_end.write_some( + make_buffer(buf, n)); + if(ec2) + co_return; + + // Signal that echo is done, then wait for + // shutdown. stop_only_awaitable suspends until + // the stop token fires. + suspended.release(); + co_await stop_only_awaitable{}; + + // Should never reach here — quitter intercepts + reached_end = true; + }; + + { + std::jthread jt( + [&](std::stop_token st) + { + int dc = 0; + test_executor ex(dc); + run_async(ex, st)(echo_server()); + } + ); + suspended.acquire(); + + // Verify the echo happened + BOOST_TEST_EQ(total_echoed, std::size_t(5)); + + // Read back the echoed data from client side + // (synchronous — data is already in the buffer) + // Not possible here since we're on the main + // thread and streams are single-threaded. The + // echo write went to client_end's read buffer + // which we can't access cross-thread. The + // echo itself is verified by total_echoed. + + // jthread destructor requests stop and joins + } + + BOOST_TEST_EQ(dtor_count, 1); + BOOST_TEST(!reached_end); + } + + //---------------------------------------------------------- + // run() + //---------------------------------------------------------- + + void + run() + { + testNormalCompletion(); + testVoidCompletion(); + testExceptionPropagation(); + testStopBeforeFirstAwait(); + testStopDuringIO(); + testStopPropagationChain(); + testMixingQuitterAndTask(); + testNoStopRequested(); + testRAIIVerification(); + testMultipleCoAwait(); + testMoveOperations(); + testReturnString(); + testExceptionInValueQuitter(); + testWhenAllWithStop(); + testWhenAnyWithStop(); + testTimerCancellation(); + testEchoWithShutdown(); + } +}; + +TEST_SUITE( + quitter_test, + "boost.capy.quitter"); + +} // capy +} // boost