From e95566bb095a49ba0ade8dfbd5a6444ead07a872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 2 Dec 2025 23:00:30 +0000 Subject: [PATCH 01/11] rewrote affine_on to use infallible scheduling [mostly] --- examples/CMakeLists.txt | 6 +- include/beman/task/detail/affine_on.hpp | 112 ++++++++++++++++++++---- 2 files changed, 99 insertions(+), 19 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f2c9f34..9db1299 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,6 +1,9 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -set(TODO tls-scheduler) +set(TODO + tls-scheduler + into_optional +) set(ALL_EXAMPLES odd-return @@ -29,7 +32,6 @@ set(ALL_EXAMPLES escaped-exception friendly hello - into_optional issue-affine_on issue-frame-allocator issue-start-reschedules diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 0bb5baf..0c6d56f 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -7,6 +7,8 @@ #include #include #include +#include +#include // ---------------------------------------------------------------------------- @@ -55,23 +57,103 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t } } + template <::beman::execution::receiver Receiver> + struct state { + template + struct to_tuple_t; + template + struct to_tuple_t { + using type = ::std::tuple...>; + }; + template + struct to_variant_t; + template + struct to_variant_t<::beman::execution::completion_signatures> { + using type = ::std::variant::type...>; + }; + + using operation_state_concept = ::beman::execution::operation_state_t; + using completion_signatures = decltype(::beman::execution::get_completion_signatures( + ::std::declval(), ::beman::execution::get_env(::std::declval()))); + using value_type = typename to_variant_t::type; + // static_assert(std::same_as); + + struct schedule_receiver { + using receiver_concept = ::beman::execution::receiver_t; + state* s; + auto set_value() noexcept -> void { + static_assert(::beman::execution::receiver); + std::visit( + [this](auto&& v) { + std::apply( + [this](auto tag, auto&&... a) { tag(std::move(this->s->receiver), ::std::move(a)...); }, + v); + }, + s->value); + } + auto get_env() const noexcept -> ::beman::execution::empty_env { /*-dk:TODO */ return {}; } + auto set_error(auto&&) noexcept -> void { /*-dk:TODO remove */ } + auto set_stopped() noexcept -> void { /*-dk:TODO remove */ } + }; + + struct work_receiver { + using receiver_concept = ::beman::execution::receiver_t; + state* s; + template + auto set_value(T&&... args) noexcept -> void { + static_assert(::beman::execution::receiver); + this->s->value + .template emplace<::std::tuple<::beman::execution::set_value_t, ::std::remove_cvref_t...>>( + ::beman::execution::set_value, ::std::forward(args)...); + this->s->sched_op_.start(); + } + template + auto set_error(E&& error) noexcept -> void { + static_assert(::beman::execution::receiver); + this->s->value + .template emplace<::std::tuple<::beman::execution::set_error_t, ::std::remove_cvref_t>>( + ::beman::execution::set_error, ::std::forward(error)); + this->s->sched_op_.start(); + } + auto set_stopped(auto&&...) noexcept -> void { + static_assert(::beman::execution::receiver); + this->s->value.template emplace<::std::tuple<::beman::execution::set_stopped_t>>( + ::beman::execution::set_stopped); + this->s->sched_op_.start(); + } + auto get_env() const noexcept -> decltype(::beman::execution::get_env(::std::declval())) { + return ::beman::execution::get_env(this->s->receiver); + } + }; + using scheduler_t = + decltype(::beman::execution::get_scheduler(::beman::execution::get_env(::std::declval()))); + using schedule_op = decltype(::beman::execution::connect( + ::beman::execution::schedule(::std::declval()), ::std::declval())); + using work_op = + decltype(::beman::execution::connect(::std::declval(), ::std::declval())); + + ::std::remove_cvref_t receiver; + value_type value; + schedule_op sched_op_; + work_op work_op_; + + template + explicit state(S&& s, R&& r) + : receiver(::std::forward(r)), + sched_op_(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler( + ::beman::execution::get_env(this->receiver))), + schedule_receiver{this})), + work_op_(::beman::execution::connect(::std::forward(s), work_receiver{this})) { + static_assert(::beman::execution::operation_state); + } + auto start() & noexcept -> void { ::beman::execution::start(this->work_op_); } + }; + template sender(S&& s) : ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Sender>{ {{::beman::task::detail::affine_on_t{}}, {Sender(::std::forward(s))}}} {} - template <::beman::execution::receiver Receiver> - auto connect(Receiver&& receiver) const& { - if constexpr (elide_schedule) { - return ::beman::execution::connect(this->template get<1>(), ::std::forward(receiver)); - } else { - return ::beman::execution::connect( - ::beman::execution::continues_on( - this->template get<1>(), ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))), - ::std::forward(receiver)); - } - } template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) && { if constexpr (elide_scheduletemplate get<1>()), ::std::forward(receiver)); } else { - return ::beman::execution::connect( - ::beman::execution::continues_on( - ::std::move(this->template get<1>()), - ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))), - ::std::forward(receiver)); + return state(::std::move(this->template get<1>()), ::std::forward(receiver)); } } }; From 14909d0b6454ce37a935b4060ae2c592e2df204e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 2 Dec 2025 23:48:52 +0000 Subject: [PATCH 02/11] changed the code to require unstoppable schedulers --- CMakeLists.txt | 2 +- examples/CMakeLists.txt | 2 +- include/beman/task/detail/affine_on.hpp | 2 +- include/beman/task/detail/task_scheduler.hpp | 4 +++- tests/beman/task/task_scheduler.test.cpp | 4 ++-- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8563055..88fb931 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ FetchContent_Declare( execution # SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG 686685c + GIT_TAG 945aa7e ) FetchContent_MakeAvailable(execution) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 9db1299..cc6e9c5 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -3,6 +3,7 @@ set(TODO tls-scheduler into_optional + issue-start-reschedules ) set(ALL_EXAMPLES @@ -34,7 +35,6 @@ set(ALL_EXAMPLES hello issue-affine_on issue-frame-allocator - issue-start-reschedules loop query result_example diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 0c6d56f..e89ba9c 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -93,7 +93,7 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t } auto get_env() const noexcept -> ::beman::execution::empty_env { /*-dk:TODO */ return {}; } auto set_error(auto&&) noexcept -> void { /*-dk:TODO remove */ } - auto set_stopped() noexcept -> void { /*-dk:TODO remove */ } + // auto set_stopped() noexcept -> void { /*-dk:TODO remove */ } }; struct work_receiver { diff --git a/include/beman/task/detail/task_scheduler.hpp b/include/beman/task/detail/task_scheduler.hpp index 1df2851..5aa50b1 100644 --- a/include/beman/task/detail/task_scheduler.hpp +++ b/include/beman/task/detail/task_scheduler.hpp @@ -113,7 +113,9 @@ class task_scheduler { void complete_error(std::exception_ptr ptr) override { ::beman::execution::set_error(std::move(receiver), std::move(ptr)); } - void complete_stopped() override { ::beman::execution::set_stopped(std::move(this->receiver)); } + void complete_stopped() override { + //::beman::execution::set_stopped(std::move(this->receiver)); + } ::beman::execution::inplace_stop_token get_stop_token() override { if constexpr (::std::same_as) { return ::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)); diff --git a/tests/beman/task/task_scheduler.test.cpp b/tests/beman/task/task_scheduler.test.cpp index d2b1ebf..d8d081e 100644 --- a/tests/beman/task/task_scheduler.test.cpp +++ b/tests/beman/task/task_scheduler.test.cpp @@ -284,7 +284,7 @@ int main() { source.request_stop(); assert(result == stop_result::stopped); } - { + if constexpr (false) { //-dk:TODO ex::inplace_stop_source source; stop_result result{stop_result::none}; auto state{ex::connect( @@ -296,7 +296,7 @@ int main() { source.request_stop(); assert(result == stop_result::stopped); } - { + if constexpr (false) { //-dk:TODO ex::stop_source source; stop_result result{stop_result::none}; auto state{ex::connect( From ead51ae0d0fc1facfa066fb5fe349cf9c719ea62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Wed, 3 Dec 2025 21:24:51 +0000 Subject: [PATCH 03/11] removed set_error from task_scheduler --- CMakeLists.txt | 2 +- examples/CMakeLists.txt | 4 +- include/beman/task/detail/affine_on.hpp | 3 +- include/beman/task/detail/task_scheduler.hpp | 53 +-------------- tests/beman/task/task_scheduler.test.cpp | 68 +------------------- 5 files changed, 7 insertions(+), 123 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 88fb931..6714d37 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ FetchContent_Declare( execution # SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG 945aa7e + GIT_TAG b903ceb ) FetchContent_MakeAvailable(execution) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index cc6e9c5..c49ecf2 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,6 +4,8 @@ set(TODO tls-scheduler into_optional issue-start-reschedules + task_scheduler + loop ) set(ALL_EXAMPLES @@ -35,11 +37,9 @@ set(ALL_EXAMPLES hello issue-affine_on issue-frame-allocator - loop query result_example stop - task_scheduler ) set(xALL_EXAMPLES issue-symmetric-transfer) set(xALL_EXAMPLES customize) diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index e89ba9c..2b14775 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -92,8 +92,7 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t s->value); } auto get_env() const noexcept -> ::beman::execution::empty_env { /*-dk:TODO */ return {}; } - auto set_error(auto&&) noexcept -> void { /*-dk:TODO remove */ } - // auto set_stopped() noexcept -> void { /*-dk:TODO remove */ } + // auto set_error(auto&&) noexcept -> void { /*-dk:TODO remove */ } }; struct work_receiver { diff --git a/include/beman/task/detail/task_scheduler.hpp b/include/beman/task/detail/task_scheduler.hpp index 5aa50b1..5090c3a 100644 --- a/include/beman/task/detail/task_scheduler.hpp +++ b/include/beman/task/detail/task_scheduler.hpp @@ -39,31 +39,15 @@ namespace beman::task::detail { class task_scheduler { struct state_base { virtual ~state_base() = default; - virtual void complete_value() = 0; - virtual void complete_error(::std::error_code) = 0; - virtual void complete_error(::std::exception_ptr) = 0; - virtual void complete_stopped() = 0; - virtual ::beman::execution::inplace_stop_token get_stop_token() = 0; + virtual void complete_value() = 0; }; struct inner_state { struct receiver; - struct env { - state_base* state; - auto query(::beman::execution::get_stop_token_t) const noexcept { return this->state->get_stop_token(); } - }; struct receiver { using receiver_concept = ::beman::execution::receiver_t; state_base* state; void set_value() && noexcept { this->state->complete_value(); } - void set_error(std::error_code err) && noexcept { this->state->complete_error(err); } - void set_error(std::exception_ptr ptr) && noexcept { this->state->complete_error(std::move(ptr)); } - template - void set_error(E e) && noexcept { - this->state->complete_error(std::make_exception_ptr(std::move(e))); - } - void set_stopped() && noexcept { this->state->complete_stopped(); } - env get_env() const noexcept { return {this->state}; } }; static_assert(::beman::execution::receiver); @@ -88,48 +72,13 @@ class task_scheduler { template <::beman::execution::receiver Receiver> struct state : state_base { using operation_state_concept = ::beman::execution::operation_state_t; - struct stopper { - state* st; - void operator()() noexcept { - state* self = this->st; - self->callback.reset(); - self->source.request_stop(); - } - }; - using token_t = - decltype(::beman::execution::get_stop_token(::beman::execution::get_env(std::declval()))); - using callback_t = ::beman::execution::stop_callback_for_t; - std::remove_cvref_t receiver; inner_state s; - ::beman::execution::inplace_stop_source source; - ::std::optional callback; template <::beman::execution::receiver R, typename PS> state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} void start() & noexcept { this->s.start(); } void complete_value() override { ::beman::execution::set_value(std::move(this->receiver)); } - void complete_error(std::error_code err) override { ::beman::execution::set_error(std::move(receiver), err); } - void complete_error(std::exception_ptr ptr) override { - ::beman::execution::set_error(std::move(receiver), std::move(ptr)); - } - void complete_stopped() override { - //::beman::execution::set_stopped(std::move(this->receiver)); - } - ::beman::execution::inplace_stop_token get_stop_token() override { - if constexpr (::std::same_as) { - return ::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)); - } else { - if constexpr (not ::std::same_as) { - if (not this->callback) { - this->callback.emplace( - ::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)), - stopper{this}); - } - } - return this->source.get_token(); - } - } }; class sender; diff --git a/tests/beman/task/task_scheduler.test.cpp b/tests/beman/task/task_scheduler.test.cpp index d8d081e..20914e1 100644 --- a/tests/beman/task/task_scheduler.test.cpp +++ b/tests/beman/task/task_scheduler.test.cpp @@ -111,14 +111,7 @@ struct thread_context { } void complete() override { this->callback.reset(); - if (this->cmpl == thread_context::complete::success) - ex::set_value(std::move(this->receiver)); - else if (this->cmpl == thread_context::complete::failure) - ex::set_error(std::move(this->receiver), std::make_error_code(std::errc::address_in_use)); - else - ex::set_error( - std::move(this->receiver), - std::make_exception_ptr(std::system_error(std::make_error_code(std::errc::address_in_use)))); + ex::set_value(std::move(this->receiver)); } }; struct env { @@ -129,8 +122,7 @@ struct thread_context { }; struct sender { using sender_concept = ex::sender_t; - using completion_signatures = - ex::completion_signatures; + using completion_signatures = ex::completion_signatures; thread_context* ctxt; thread_context::complete cmpl; @@ -241,38 +233,6 @@ int main() { ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched2)) | ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); - { - bool success{false}; - bool failed{false}; - bool exception{false}; - ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::failure)) | - ex::then([&success] { success = true; }) | - ex::upon_error([&failed, &exception](const E&) { - if constexpr (std::same_as) - failed = true; - else if constexpr (std::same_as) - exception = true; - })); - assert(not success); - assert(failed); - assert(not exception); - } - { - bool success{false}; - bool failed{false}; - bool exception{false}; - ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::exception)) | - ex::then([&success] { success = true; }) | - ex::upon_error([&failed, &exception](const E&) { - if constexpr (std::same_as) - failed = true; - else if constexpr (std::same_as) - exception = true; - })); - assert(not success); - assert(not failed); - assert(exception); - } { ex::inplace_stop_source source; stop_result result{stop_result::none}; @@ -284,30 +244,6 @@ int main() { source.request_stop(); assert(result == stop_result::stopped); } - if constexpr (false) { //-dk:TODO - ex::inplace_stop_source source; - stop_result result{stop_result::none}; - auto state{ex::connect( - ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), - stop_receiver{source.get_token(), result})}; - assert(result == stop_result::none); - ex::start(state); - assert(result == stop_result::none); - source.request_stop(); - assert(result == stop_result::stopped); - } - if constexpr (false) { //-dk:TODO - ex::stop_source source; - stop_result result{stop_result::none}; - auto state{ex::connect( - ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), - stop_receiver{source.get_token(), result})}; - assert(result == stop_result::none); - ex::start(state); - assert(result == stop_result::none); - source.request_stop(); - assert(result == stop_result::stopped); - } { std::latch completed{1}; stop_result result{stop_result::none}; From 21e5a7a332b5736cfb433f93be8d41032df9f281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Wed, 3 Dec 2025 22:14:14 +0000 Subject: [PATCH 04/11] a few more fixes and restored tests --- CMakeLists.txt | 2 +- examples/CMakeLists.txt | 4 ++-- examples/issue-start-reschedules.cpp | 14 ++++++++++---- include/beman/task/detail/affine_on.hpp | 4 +++- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6714d37..a1c58ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ FetchContent_Declare( execution # SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG b903ceb + GIT_TAG 7451ece ) FetchContent_MakeAvailable(execution) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index c49ecf2..96ad11a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,11 +4,11 @@ set(TODO tls-scheduler into_optional issue-start-reschedules - task_scheduler - loop ) set(ALL_EXAMPLES + task_scheduler + loop odd-return bulk dangling-references diff --git a/examples/issue-start-reschedules.cpp b/examples/issue-start-reschedules.cpp index f960d41..89d721a 100644 --- a/examples/issue-start-reschedules.cpp +++ b/examples/issue-start-reschedules.cpp @@ -12,6 +12,9 @@ namespace ex = beman::execution; ex::task<> test(auto sched) { std::cout << "init =" << std::this_thread::get_id() << "\n"; co_await ex::starts_on(sched, ex::just()); + // static_assert(std::same_as); + co_await ex::just(); std::cout << "final=" << std::this_thread::get_id() << "\n"; } @@ -23,8 +26,11 @@ int main() { ex::then([] { std::cout << "loop1=" << std::this_thread::get_id() << "\n"; })); ex::sync_wait(ex::schedule(loop2.get_scheduler()) | ex::then([] { std::cout << "loop2=" << std::this_thread::get_id() << "\n"; })); - std::cout << "--- use 1 ---\n"; - ex::sync_wait(test(loop2.get_scheduler())); - std::cout << "--- use 2 ---\n"; - ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler()))); + try { + std::cout << "--- use 1 ---\n"; + ex::sync_wait(test(loop2.get_scheduler())); + std::cout << "--- use 2 ---\n"; + // ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler()))); + } catch (...) { + } } diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 2b14775..86f7761 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -5,6 +5,7 @@ #define INCLUDED_INCLUDE_BEMAN_TASK_DETAIL_AFFINE_ON #include +#include #include #include #include @@ -69,7 +70,7 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t struct to_variant_t; template struct to_variant_t<::beman::execution::completion_signatures> { - using type = ::std::variant::type...>; + using type = ::beman::execution::detail::meta::unique<::std::variant::type...>>; }; using operation_state_concept = ::beman::execution::operation_state_t; @@ -109,6 +110,7 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t template auto set_error(E&& error) noexcept -> void { static_assert(::beman::execution::receiver); + // static_assert(std::same_ass->value .template emplace<::std::tuple<::beman::execution::set_error_t, ::std::remove_cvref_t>>( ::beman::execution::set_error, ::std::forward(error)); From f87488d3ea679890a245ef3a98603fe584b92488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sun, 7 Dec 2025 22:46:38 +0000 Subject: [PATCH 05/11] made some progress on the affinity paper --- docs/Makefile | 2 +- docs/affinity.md | 200 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 189 insertions(+), 13 deletions(-) diff --git a/docs/Makefile b/docs/Makefile index 833fdbf..a91a359 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -4,7 +4,7 @@ default: doc doc: doc-html -doc-html: P3552-task.html P3796-task-issues.html +doc-html: P3552-task.html P3796-task-issues.html affinity.html doc-pdf: P3552-task.pdf P3796-task-issues.pdf diff --git a/docs/affinity.md b/docs/affinity.md index d5335a0..edbc1da 100644 --- a/docs/affinity.md +++ b/docs/affinity.md @@ -1,7 +1,7 @@ --- title: Scheduler Affinity -document: D???? -date: 2025-11-23 +document: D0000R0 +date: 2025-12-07 audience: - Concurrency Working Group (SG1) - Library Evolution Working Group (LEWG) @@ -26,14 +26,14 @@ and discussed as part of [P3796R1](https://wg21.link/P3796R1). This proposal is intended to specifically address the concerns raised relating to `task`'s scheduler affinity and in particular `affine_on`. The gist of this proposal is impose constraints on `affine_on` to -guarantee it can its objective at run-time. +guarantee it can meet its objective at run-time.

# Change History ## R0 Initial Revision -# Discussion +# Overview of Changes

There are a few NB comments raised about the way `affine_on` works: @@ -48,26 +48,25 @@ There are a few NB comments raised about the way `affine_on` works:

The discussion on `affine_on` revealed some aspects which were not quite clear previously and taking these into account points towards -a better design than was previously documented: +a better design than was previously specified:

  1. To implement scheduler affinity the algorithm needs to know the scheduler on which it was started itself. The correct receiver - may actually be hard to determine while build the work graph. - However, this scheduler is communicated using + may actually be hard to determine while building the work graph. + However, this scheduler can be communicated using get_scheduler(get_env(rcvr)) when an algorithm is `start`ed. This requirement is more general than just `affine_on` and is introduced by - [P3826R2](https://isocpp.org/files/papers/P3826R2.html) *TODO* - verify the reference: with this guarantee in place, `affine_on` - only needs one parameter, i.e., the sender for the work to be - executed. + [P3718R0](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3718r0.html): + with this guarantee in place, `affine_on` only needs one + parameter, i.e., the sender for the work to be executed.
  2. The scheduler sched on which the work needs to resume has to guarantee that it is possible to resume in the - correct context. The implication is that scheduling work needs + correct execution agent. The implication is that scheduling work needs to be infallible, i.e., the completion signatures of scheduler(sched) cannot contain a `set_error_t(E)` completion signature. This requirement should @@ -83,4 +82,181 @@ a better design than was previously documented: an `unstoppable_token`. This requirement should also be checked statically.
  3. +
  4. + When a sender knows that it will complete on the scheduler it + was start on, it should be possible to customize the `affine_on` + algorithm to avoid rescheduling. This customization can be + achieved by `connect`ing to the result of an `affine_on` member + function called on the child sender, if such a member function + is present, when `connect`ing an `affine_on` sendering +
  5. +
+ +

+None of these changes really contradict any earlier design: the +shape and behavior of the `affine_on` algorithm wasn't fully fleshed +out. Tightening the behavior scheduler affinity and the `affine_on` +algorithm has some implications on some other components: +

+
    +
  1. + If `affine_on` requires an infallible scheluder at least + `inline_scheduler`, `task_scheduler`, and `run_loop::scheduler` + should be infallible (i.e., they always complete successfully + with `set_value()`). `parallel_scheduler` can probably not be + made infallible. +
  2. +
  3. + The scheduling semantics when changing a `task`'s scheduler + using co_await change_coroutine_scheduler(sch) + become somewhat unclear and this functionality should be removed. + Similar semantics are better modeled using co_await + on(sch, nested-task). +
  4. +
  5. + The name `affine_on` isn't particular good and wasn't designed. + It may be worth renaming the algorithms to something different. +
+ +# Discussion of Changes + +## `affine_on` Shape + +

+The original proposal for `task` used `continues_on` to schedule +the work back on the original scheduler. This algorithm takes the +work to be executed and the scheduler on which to continue as +arguments. When SG1 requested that a similar but different algorithms +is to be used to implement scheduler affinity, `continues_on` was +just replaced by `affine_on` with the same shape but the potential +to get customized differently. +

+

+For scheduler affinity the scheduler to resume on can, however, +also be communicated via the `get_scheduler` query on the receiver's +environment. The result from `get_scheduler` is also the scheduler +any use of `affine_on` would use when invoking the algorihtm. In +the context of the `task` coroutine this scheduler can be obtained +via the promise type but in general it is actually not straight +forward to get hold of this scheduler because it is only provided +by `connect`. It is much more reasonable to have `affine_on` only +take the work, i.e., a sender, as argument and determine the scheduler +to resume on from the receiver's environment in `connect`. +

+

+Thus, instead of using +```c++ +affine_on(@_sndr_@, @_sch_@) +``` +the algorithm is used just with the sender: +```c++ +affine_on(@_sndr_@) +``` +

+

+Note that this change implies that an operation state resulting +from `connect`ing `affine_on` to a receiver rcvr +is `start`ed on the execution agent associated with the scheduler obtained +from get_scheduler(get_env(rcvr)). The same +requirement is also assumed to be meet when `start`ing the operation +state resulting from `connect`ing a `task`. While it is possible to +statically detect whether the query is valid and provides a scheduler +it cannot be detected if the scheduler matches the execution agent on which +`start` was called. +[P3718r0](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3718r0.html) +proposes to add this exact requirement to +[[exec.get.scheduler]](https://wg21.link/exec.get.scheduler). + +

+ +## Infallible Schedulers +

+The objective of `affine_on(@_sndr_@)` is to execute `@_sndr_@` and +to complete on the execution agent on which the operation was +`start`ed. Let `sch` be the scheduler obtained from +`get_scheduler(get_env(@_rcvr_@))` where `@_rcvr_@` is the receiver +used when `connect`ing `affine_on(@_sndr_@)`. If `connect`ing and +`start`ing the result of `schedule(@_sch_@)` is successful, `affine_on` +can achieve its objective. However, if this scheduling operation +fails, i.e., it completes with `set_error(@_e_@)`, or if it gets +cancelled, i.e., it completes with `set_stopped()`, the execution +agent on which the scheduling operation resumes is unclear and +`affine_on` cannot guarantee its promise. Thus, it seems reasonable +to require that a scheduler used with `affine_on` is infallible, at +least when used appropriately. +

+

+The current working draft specifies 4 schedulers: +

+
    +
  1. + [`inline_scheduler`](https://wg21.link/exec.inline.scheduler) which + just completes with `set_value()` when `start()`ed, i.e., this + scheduler is already infallibe. +
  2. +
  3. + [`task_scheduler`](https://wg21.link/exec.task.scheduler) is a + type-erased scheduler delegating to another scheduler. If the + underlying scheduler is infallible, the only error case for + `task_scheduler` is potential memory allocation during `connect` + of its `@_ts-sender_@`. If `affine_on` creates an operation state + for the scheduling operation during `connect`, it can guarantee + that any necessary scheduling operation succeeds. Thus, this + scheduler can be made infallible. +
  4. +
  5. + The [`run_loop::@_run-loop-scheduler_@`](https://wg21.link/exec.run.loop) + is used by [`run_loop`](https://wg21.link/exec.run.loop). The + current specification allows the scheduling operation to fail + with `set_error_t(std::exception_ptr)`. This permission allows + an implementation to use [`std::mutex`](https://wg21.link/thread.mutex) + and [`std::condition_variable`](https://wg21.link/thread.condition) + whose operations may throw. It is possible to implement the logic + using atomic operations which can't throw. The `set_stopped()` + completion is only used when the receiver's stop token, i.e. the + result of `get_stop_token(get_env(@_rcvr_@))`, was stopped. This + receiver is controlled by `affine_on`, i.e., it can provide a + [`never_stoptoken`](https://wg21.link/stoptoken.never) and this + scheduler won't complete with `set_stopped()`. If the + [`get_completion_signatures`](https://wg21.link/exec.getcomplsigs) for + the corresponding sender takes the environment into account, this + scheduler can also be made infallible. +
  6. +
  7. + The [`parallel_scheduler`](https://wg21.link/exec.par.scheduler) + provides an interface to a replacable implementation of a thread + pool. The current interface allows + [`parallel_scheduler`](https://wg21.link/exec.par.scheduler) to + complete with `set_error_t(std::exception_ptr)` as well as with + `set_stopped_t()`. It seems unlikely that this interface can be + constrained to make it infallible. +
  8. +
+

+In general it seems unlikely that all schedulers can be constrained +to be infallible. As a result `affine_on` and, by extension, `task` +won't be usable with all schedulers if `affine_on` insists on using +only infallible schedulers. Note that `affine_on` can fail and get +cancelled but in all cases its promise is that it resumes on the +original scheduler. Thus, a `set_error(@_e_@)` completion can't be +used to indicate scheduling failure, either. +

+

+If a users wants to use a fallible scheduler with `affine_on` or +`task` the scheduler will need to be adapted. The adapted scheduler +can define what it means when the underlying scheduler fails. For +example, the user can cause this failure to terminate the program +or consider the execution agent on which the underlying scheduler +completed to be suitable to continue running. +

+ +## `affine_on` Customization + +TODO + +## Removing `change_coroutine_scheduler` + +TODO + +# Wording Changes \ No newline at end of file From fe1fbcf4b18ccdf1c4d63c5cbef65bc6c8064f60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 9 Dec 2025 23:40:10 +0000 Subject: [PATCH 06/11] added a static assertion for affine_on's scheduler being infallible --- docs/Makefile | 2 +- docs/{affinity.md => P3941-affinity.md} | 6 +- examples/CMakeLists.txt | 16 ++-- examples/issue-affine_on.cpp | 5 ++ include/beman/task/detail/affine_on.hpp | 48 +++++++----- include/beman/task/detail/task_scheduler.hpp | 8 +- tests/beman/task/affine_on.test.cpp | 80 ++++++++++++++++---- 7 files changed, 114 insertions(+), 51 deletions(-) rename docs/{affinity.md => P3941-affinity.md} (99%) diff --git a/docs/Makefile b/docs/Makefile index a91a359..9594236 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -4,7 +4,7 @@ default: doc doc: doc-html -doc-html: P3552-task.html P3796-task-issues.html affinity.html +doc-html: P3552-task.html P3796-task-issues.html P3941-affinity.html doc-pdf: P3552-task.pdf P3796-task-issues.pdf diff --git a/docs/affinity.md b/docs/P3941-affinity.md similarity index 99% rename from docs/affinity.md rename to docs/P3941-affinity.md index edbc1da..fbd85ce 100644 --- a/docs/affinity.md +++ b/docs/P3941-affinity.md @@ -1,6 +1,6 @@ --- title: Scheduler Affinity -document: D0000R0 +document: D3941R0 date: 2025-12-07 audience: - Concurrency Working Group (SG1) @@ -259,4 +259,8 @@ TODO TODO +## `affine_on` Default Implementation + +TODO + # Wording Changes \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 96ad11a..e54d478 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,25 +4,26 @@ set(TODO tls-scheduler into_optional issue-start-reschedules + loop ) set(ALL_EXAMPLES task_scheduler - loop - odd-return bulk + c++now-allocator + c++now-cancel + c++now-errors + alloc + affinity + odd-return dangling-references rvalue-task aggregate-return customize + issue-affine_on issue-symmetric-transfer - affinity - alloc c++now-affinity - c++now-allocator c++now-basic - c++now-cancel - c++now-errors c++now-query c++now-result-types c++now-return @@ -35,7 +36,6 @@ set(ALL_EXAMPLES escaped-exception friendly hello - issue-affine_on issue-frame-allocator query result_example diff --git a/examples/issue-affine_on.cpp b/examples/issue-affine_on.cpp index f89663d..2b9b709 100644 --- a/examples/issue-affine_on.cpp +++ b/examples/issue-affine_on.cpp @@ -14,6 +14,11 @@ ex::task<> test(Sender&& sender) { int main() { ex::sync_wait(test(ex::just())); + ex::sync_wait(test(ex::read_env(ex::get_scheduler))); +#if 0 + ex::sync_wait(test(ex::read_env(ex::get_scheduler) | + ex::let_value([](auto sched) noexcept { return ex::just(); }))); ex::sync_wait(test(ex::read_env(ex::get_scheduler) | ex::let_value([](auto sched) { return ex::starts_on(sched, ex::just()); }))); +#endif } diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 86f7761..14de6d2 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -36,26 +37,13 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t template auto get_completion_signatures(const Env& env) const& noexcept { - if constexpr (elide_schedule()))>) { - return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->template get<2>())), env); - } else { - return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(this->template get<1>(), ::beman::execution::get_scheduler(env)), - env); - } + return ::beman::execution::get_completion_signatures(::std::remove_cvref_t(this->template get<1>()), + env); } template auto get_completion_signatures(const Env& env) && noexcept { - if constexpr (elide_schedule()))>) { - return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->template get<1>())), env); - } else { - return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(::std::move(this->template get<1>()), - ::beman::execution::get_scheduler(env)), - env); - } + return ::beman::execution::get_completion_signatures( + ::std::remove_cvref_t(::std::move(this->template get<1>())), env); } template <::beman::execution::receiver Receiver> @@ -77,7 +65,6 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t using completion_signatures = decltype(::beman::execution::get_completion_signatures( ::std::declval(), ::beman::execution::get_env(::std::declval()))); using value_type = typename to_variant_t::type; - // static_assert(std::same_as); struct schedule_receiver { using receiver_concept = ::beman::execution::receiver_t; @@ -85,7 +72,7 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t auto set_value() noexcept -> void { static_assert(::beman::execution::receiver); std::visit( - [this](auto&& v) { + [this](auto&& v) -> void { std::apply( [this](auto tag, auto&&... a) { tag(std::move(this->s->receiver), ::std::move(a)...); }, v); @@ -93,7 +80,6 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t s->value); } auto get_env() const noexcept -> ::beman::execution::empty_env { /*-dk:TODO */ return {}; } - // auto set_error(auto&&) noexcept -> void { /*-dk:TODO remove */ } }; struct work_receiver { @@ -110,7 +96,6 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t template auto set_error(E&& error) noexcept -> void { static_assert(::beman::execution::receiver); - // static_assert(std::same_ass->value .template emplace<::std::tuple<::beman::execution::set_error_t, ::std::remove_cvref_t>>( ::beman::execution::set_error, ::std::forward(error)); @@ -146,6 +131,21 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t schedule_receiver{this})), work_op_(::beman::execution::connect(::std::forward(s), work_receiver{this})) { static_assert(::beman::execution::operation_state); + if constexpr (not ::std::same_as< + ::beman::execution::completion_signatures<::beman::execution::set_value_t()>, + decltype(::beman::execution::get_completion_signatures( + ::beman::execution::schedule( + ::beman::execution::get_scheduler(::beman::execution::get_env(this->receiver))), + ::beman::execution::get_env(this->receiver)))>) { + static_assert(std::same_asreceiver)))>); + } + static_assert(::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>, + decltype(::beman::execution::get_completion_signatures( + ::beman::execution::schedule(::beman::execution::get_scheduler( + ::beman::execution::get_env(this->receiver))), + ::beman::execution::get_env(this->receiver)))>); } auto start() & noexcept -> void { ::beman::execution::start(this->work_op_); } }; @@ -157,6 +157,12 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) && { + static_assert(::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>, + decltype(::beman::execution::get_completion_signatures( + ::beman::execution::schedule( + ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))), + ::beman::execution::get_env(receiver)))>, + "affine_on requires that the receiver's scheduler is infallible"); if constexpr (elide_schedule) { return ::beman::execution::connect(::std::move(this->template get<1>()), diff --git a/include/beman/task/detail/task_scheduler.hpp b/include/beman/task/detail/task_scheduler.hpp index 5090c3a..25886b7 100644 --- a/include/beman/task/detail/task_scheduler.hpp +++ b/include/beman/task/detail/task_scheduler.hpp @@ -27,8 +27,6 @@ namespace beman::task::detail { * Completion signatures: * * - `ex::set_value_t()` - * - `ex::set_error_t(std::error_code)` - * - `ex::set_error_t(std::exception_ptr)` * - `ex::set_stopped()` * * Usage: @@ -127,11 +125,7 @@ class task_scheduler { public: using sender_concept = ::beman::execution::sender_t; - using completion_signatures = - ::beman::execution::completion_signatures<::beman::execution::set_value_t(), - ::beman::execution::set_error_t(std::error_code), - ::beman::execution::set_error_t(std::exception_ptr), - ::beman::execution::set_stopped_t()>; + using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; template <::beman::execution::scheduler S> explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} diff --git a/tests/beman/task/affine_on.test.cpp b/tests/beman/task/affine_on.test.cpp index 665dda1..da57335 100644 --- a/tests/beman/task/affine_on.test.cpp +++ b/tests/beman/task/affine_on.test.cpp @@ -21,15 +21,72 @@ struct receiver { std::remove_cvref_t scheduler; auto get_env() const noexcept { return ex::detail::make_env(ex::get_scheduler, scheduler); } - - void set_value(auto&&...) && noexcept {} - void set_error(auto&&) && noexcept {} - void set_stopped() && noexcept {} + auto set_value(auto&&...) && noexcept -> void {} }; template receiver(Scheduler&& sched) -> receiver; +template +struct test_scheduler { + using scheduler_concept = ex::scheduler_t; + struct sender { + using sender_concept = ex::sender_t; + template + auto get_completion_signatures(Env&& env) const noexcept { + if constexpr (ex::unstoppable_token) + return ex::completion_signatures{}; + else + return ex::completion_signatures{}; + } + struct env { + template + auto query(ex::get_completion_scheduler_t) const noexcept -> test_scheduler { + return {}; + } + }; + auto get_env() const noexcept { return env{}; } + + template + struct op_state { + using operation_state_concept = ex::operation_state_t; + std::remove_cvref_t rcvr; + void start() & noexcept { + static_assert(ex::operation_state); + ex::set_value(std::move(this->rcvr)); + } + }; + template + auto connect(Rcvr&& rcvr) noexcept { + static_assert(ex::sender); + return op_state{std::forward(rcvr)}; + } + }; + + auto schedule() noexcept -> sender { + static_assert(ex::scheduler>); + return {}; + } + auto operator==(const test_scheduler&) const -> bool = default; +}; +static_assert(ex::scheduler>); + static_assert(ex::receiver>); +static_assert(ex::receiver>>); + +template +auto test_affine_on_accepted_scheduler(Receiver rcvr) { + if constexpr (requires { ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr)); }) { + auto state(ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr))); + ex::start(state); + } +} + +auto test_affine_on_accepted_scheduler() { + test_affine_on_accepted_scheduler(receiver{beman::task::detail::inline_scheduler{}}); + test_affine_on_accepted_scheduler(receiver{test_scheduler<>{}}); + // test_affine_on_accepted_scheduler(receiver{test_scheduler{}}); +} + } // namespace int main() { @@ -45,13 +102,10 @@ int main() { [[maybe_unused]] auto s(beman::task::affine_on(ex::just(42))); static_assert(ex::sender); [[maybe_unused]] auto st(ex::connect(std::move(s), receiver{context.get_scheduler()})); -#if 0 - ex::sync_wait(beman::task::affine_on(ex::just(42), context.get_scheduler()) -#else - ex::sync_wait(beman::execution::continues_on(ex::just(42), context.get_scheduler()) -#endif - | ex::then([thread_id](int value) { - assert(thread_id == std::this_thread::get_id()); - assert(value == 42); - })); + ex::sync_wait( + beman::task::affine_on(ex::starts_on(context.get_scheduler(), ex::just(42)) | ex::then([thread_id](int value) { + assert(thread_id == std::this_thread::get_id()); + assert(value == 42); + })) | + ex::then([thread_id]() { assert(thread_id != std::this_thread::get_id()); })); } From 9368f17346bd3bcf8c8e8beefeca81057990dd41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Mon, 15 Dec 2025 00:07:10 +0000 Subject: [PATCH 07/11] added more wording to the affinity proposal --- Makefile | 4 +- docs/P3941-affinity.md | 340 ++++++++++++++++++++++++++++++++++------- 2 files changed, 289 insertions(+), 55 deletions(-) diff --git a/Makefile b/Makefile index 98ca772..d0aa522 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ #-dk: note to self: PATH=/opt/llvm-19.1.6/bin:$PATH LDFLAGS=-fuse-ld=lld -.PHONY: config test default compile clean distclean doc html pdf format clang-format tidy +.PHONY: config test default compile clean distclean doc docs html pdf format clang-format tidy BUILDDIR = build PRESET = gcc-release @@ -12,7 +12,7 @@ BUILD = $(BUILDDIR)/$(PRESET) default: compile -doc: +docs doc: cd docs; $(MAKE) pdf html: diff --git a/docs/P3941-affinity.md b/docs/P3941-affinity.md index fbd85ce..afac553 100644 --- a/docs/P3941-affinity.md +++ b/docs/P3941-affinity.md @@ -1,7 +1,7 @@ --- title: Scheduler Affinity -document: D3941R0 -date: 2025-12-07 +document: P3941R0 +date: 2025-12-14 audience: - Concurrency Working Group (SG1) - Library Evolution Working Group (LEWG) @@ -18,15 +18,15 @@ toc: true One important design of `std::execution::task` is that a coroutine resumes after a `co_await` on the same scheduler as the one it was executing on prior to the `co_await`. To achieve this, `task` -transforms the awaited object obj using -affine_on(obj, sched) where -sched is the corresponding scheduler. There -were multiple concerns raised against the specification of `affine_on` -and discussed as part of [P3796R1](https://wg21.link/P3796R1). This -proposal is intended to specifically address the concerns raised -relating to `task`'s scheduler affinity and in particular `affine_on`. -The gist of this proposal is impose constraints on `affine_on` to -guarantee it can meet its objective at run-time. +transforms the awaited object `@_obj_@` using +`affine_on(@_obj_@, @_sched_@)` where `@_sched_@` is the corresponding +scheduler. There were multiple concerns raised against the specification +of `affine_on` and discussed as part of +[P3796R1](https://wg21.link/P3796R1). This proposal is intended +to specifically address the concerns raised relating to `task`'s +scheduler affinity and in particular `affine_on`. The gist of this +proposal is impose constraints on `affine_on` to guarantee it can +meet its objective at run-time.

# Change History @@ -56,7 +56,7 @@ a better design than was previously specified: scheduler on which it was started itself. The correct receiver may actually be hard to determine while building the work graph. However, this scheduler can be communicated using - get_scheduler(get_env(rcvr)) when an algorithm + `get_scheduler(get_env(@_rcvr_@))` when an algorithm is `start`ed. This requirement is more general than just `affine_on` and is introduced by [P3718R0](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3718r0.html): @@ -64,11 +64,11 @@ a better design than was previously specified: parameter, i.e., the sender for the work to be executed.
  • - The scheduler sched on which the work needs - to resume has to guarantee that it is possible to resume in the + The scheduler `@_sched_@` on which the work needs + to resume has to guarantee that it is possible to resume on the correct execution agent. The implication is that scheduling work needs to be infallible, i.e., the completion signatures of - scheduler(sched) cannot contain a + `scheduler(@_sched_@)` cannot contain a `set_error_t(E)` completion signature. This requirement should be checked statically.
  • @@ -108,10 +108,10 @@ algorithm has some implications on some other components:
  • The scheduling semantics when changing a `task`'s scheduler - using co_await change_coroutine_scheduler(sch) + using `co_await change_coroutine_scheduler(@_sch_@)` become somewhat unclear and this functionality should be removed. - Similar semantics are better modeled using co_await - on(sch, nested-task). + Similar semantics are better modeled using + `co_await on(@_sch_@, @_nested-task_@)`.
  • The name `affine_on` isn't particular good and wasn't designed. @@ -133,16 +133,18 @@ just replaced by `affine_on` with the same shape but the potential to get customized differently.

    -For scheduler affinity the scheduler to resume on can, however, -also be communicated via the `get_scheduler` query on the receiver's -environment. The result from `get_scheduler` is also the scheduler -any use of `affine_on` would use when invoking the algorihtm. In -the context of the `task` coroutine this scheduler can be obtained -via the promise type but in general it is actually not straight -forward to get hold of this scheduler because it is only provided -by `connect`. It is much more reasonable to have `affine_on` only -take the work, i.e., a sender, as argument and determine the scheduler -to resume on from the receiver's environment in `connect`. +The scheduler used for affinity is the scheduler communicated via +the `get_scheduler` query on the receiver's environment: the scheduler +argument passed to the `affine_on` algorithm would need to match +the scheduler obtained from `get_scheduler` query. In the context +of the `task` coroutine this scheduler can be obtained via the +promise type but in general it is actually not straight forward to +get hold of this scheduler because the receiver and hence its +associated scheduler is only provided by `connect`. It is much more +reasonable to have `affine_on` only take the work, i.e., a sender, +as argument and determine the scheduler to resume on from the +receiver's +environment in `connect`.

    Thus, instead of using @@ -156,10 +158,10 @@ affine_on(@_sndr_@)

    Note that this change implies that an operation state resulting -from `connect`ing `affine_on` to a receiver rcvr +from `connect`ing `affine_on` to a receiver `@_rcvr_@` is `start`ed on the execution agent associated with the scheduler obtained -from get_scheduler(get_env(rcvr)). The same -requirement is also assumed to be meet when `start`ing the operation +from `get_scheduler(get_env(@_rcvr_@))`. The same +requirement is also assumed to be met when `start`ing the operation state resulting from `connect`ing a `task`. While it is possible to statically detect whether the query is valid and provides a scheduler it cannot be detected if the scheduler matches the execution agent on which @@ -167,7 +169,9 @@ it cannot be detected if the scheduler matches the execution agent on which [P3718r0](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3718r0.html) proposes to add this exact requirement to [[exec.get.scheduler]](https://wg21.link/exec.get.scheduler). - +

    +

    +This change addresses [US 234-364](https://github.com/cplusplus/nbballot/issues/939) ([LWG4331](https://cplusplus.github.io/LWG/issue4331)).

    ## Infallible Schedulers @@ -176,15 +180,27 @@ The objective of `affine_on(@_sndr_@)` is to execute `@_sndr_@` and to complete on the execution agent on which the operation was `start`ed. Let `sch` be the scheduler obtained from `get_scheduler(get_env(@_rcvr_@))` where `@_rcvr_@` is the receiver -used when `connect`ing `affine_on(@_sndr_@)`. If `connect`ing and -`start`ing the result of `schedule(@_sch_@)` is successful, `affine_on` -can achieve its objective. However, if this scheduling operation -fails, i.e., it completes with `set_error(@_e_@)`, or if it gets -cancelled, i.e., it completes with `set_stopped()`, the execution -agent on which the scheduling operation resumes is unclear and -`affine_on` cannot guarantee its promise. Thus, it seems reasonable -to require that a scheduler used with `affine_on` is infallible, at -least when used appropriately. +used when `connect`ing `affine_on(@_sndr_@)` (the discussion in +this section also applies if the scheduler would be taken as a +parameter, i.e., if the [previous change](#affine_on-shape) isn't +applied this discussion still applies). If `connect`ing the result +of `schedule(@_sch_@)` fails (i.e., `connect(schedule(@_sch_@), +@_rcvr_@)` throws where `@_rcvr_@` is a suitable receiver), `affine_on` +can avoid `start`ing the main work and fail on the execution agent +where it was `start`ed. Otherwise, if it obtained an operation +state `@_os_@` from `connect(scheduler(@_sch_@), @_rcvr_@)`, +`affine_on` would `start` its main work and would `start(@_os_@)` +on the execution agent where the main work completed. If `start(@_os_@)` +is always successful, `affine_on` can achieve its objective. However, +if this scheduling operation fails, i.e., it completes with +`set_error(@_e_@)`, or if it gets cancelled, i.e., it completes +with `set_stopped()`, the execution agent on which the scheduling +operation resumes is unclear and `affine_on` cannot guarantee its +promise. Thus, it seems reasonable to require that a scheduler used +with `affine_on` is infallible, at least when used appropriately +(i.e., when providing a receiver whose associated stop token is an +`unstoppable_token`). +

    The current working draft specifies 4 schedulers: @@ -237,30 +253,248 @@ The current working draft specifies 4 schedulers: In general it seems unlikely that all schedulers can be constrained to be infallible. As a result `affine_on` and, by extension, `task` won't be usable with all schedulers if `affine_on` insists on using -only infallible schedulers. Note that `affine_on` can fail and get -cancelled but in all cases its promise is that it resumes on the -original scheduler. Thus, a `set_error(@_e_@)` completion can't be -used to indicate scheduling failure, either. +only infallible schedulers. If there are fallible schedulers, there +aren't any good options for using them with a `task`. Note that +`affine_on` can fail and get cancelled (due to the main work failing +or getting cancelled) but `affine_on` can still guarantee that +execution resumes on the expect exuection agent when it uses an +infallible scheduler. +

    +

    +This change addresses +[US 235-363](https://github.com/cplusplus/nbballot/issues/938) +([LWG4332](https://cplusplus.github.io/LWG/issue4332)). This change +goes beyond the actual issue and clarifies that the scheduling +operation used be `affine_on` needs to be always successful.

    + +### Require Infallible Schedulers For `affine_on` +

    +If `affine_on` promises in all cases that it resumes on the +original scheduler it can only work with infallible schedulers. If a users wants to use a fallible scheduler with `affine_on` or `task` the scheduler will need to be adapted. The adapted scheduler -can define what it means when the underlying scheduler fails. For -example, the user can cause this failure to terminate the program -or consider the execution agent on which the underlying scheduler -completed to be suitable to continue running. +can define what it means when the underlying scheduler fails. There +are conceptually only two options (the exact details may vary) on how +to deal with a failed scheduling operation: +

    +
      +
    1. +The user can transform the scheduling failure into a call to +`std::terminate`. +
    2. +
    3. +The user can consider resuming on an execution agent where the +adapting scheduluer can schedule to infallibly (e.g., the execution +agent on which operation completed) but which is different from +execution agent associated with the adapted scheduler to be suitable +to continue running. In that case the scheduling operation would +just succeed without necessarily running on the correct execution +agent. However, there is no indication that scheduling to the adapted +scheduler failed and the scheduler affinity may be impacted in this +failure case. +
    4. +
    + +The standard library doesn't provide a way to adapt schedulers +easily. However, it can certainly be done. + +### Allow Fallible Schedulers For `affine_on` + +

    +If the scheduler used with `affine_on` is allowed to fail, `affine_on` +can't guarantee that it completes on the correct scheduler in case of +an error completion. It could be specified that `affine_on` completes +with `set_error(@_rcvr_@, scheduling_error{@_e_@})` when the scheduling +operation completes with `set_error(@_r_@, @_e_@)` to make it detectable +that it didn't complete on the correct scheduler. This situation is +certainly not ideal but, at least, only affects the error completion and +it can be made detectable.

    +

    +A use of `affine_on` which always needs to complete on a specific scheduler +is still possible: in that case the user will need to make sure that the +used scheduler is infallible. The main issue here is that there is no +automatic static checking whether that is the case. +

    + +### Considerations On Infallibe Schedulers + +In an ideal world, all schedulers would be infallible. It is unclear +if that is achievable. If schedulers need to be allowed to be fallible, +it may be viable to require that all standard library schedulers +are infallible. As outlined above that should be doable for all current +schedulers except, possibly, `parallel_scheduler`. So, the proposed +change is to require schedulers to be infallible when being used with +`affine_on` (and, thus, being used by `task`) and to change as many of +the standard C++ libraries to be infallible as possible. + +If constraining `affine_on` to only infallible schedulers turns out +to be too strong, the constraint can be relaxed in a future revision +of the standard by explicitly opting out of that constraints, e.g., +using an additional argument. For `task` to make use of it, it too +would need an explicit mechnasism to indicate that its `affine_on` +use should opt out of the constraint, e.g., by adding a suitable +`static` member to the environment template argument. ## `affine_on` Customization -TODO +Senders which don't cause the execution agent to be changed like +`just` or the various queries should be able to customize `affine_on` +to avoid unnecessary scheduding. Sadly, a proposal +([P3206](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3206r0.pdf)) +to standardize properties which could be used to determine how a +sender completes didn't make much progress, yet. An implementation +can make use of similar techniques using an implementation-specific +protocol. If a future standard defines a standard approach to +determine the necessary properties the implementation can pick up +on those. + +The idea is to have `affine_on` define a `transform_sender(s)` +member function which determines what sender should be returned. +By default the argument is returned but if the child sender indicates +that it doesn't actually change the execution agent the function +would return the child sender. There are a number of senders for +which this can be done: + +- `just`, `just_error`, and `just_stopped` +- `read_env` and `write_env` +- `then`, `upon_error`, and `upon_stopped` if the child sender + doesn't change the execution agent + +The proposal is to define a `transform_sender` member which uses +an implementation-specific property to determine that a sender +completes on the same execution agent as the one it was started on. +In addition, it is recommended that this property gets defined by +the various standard library senders where it can make a difference. + +This change addresses +[US 232-366](https://github.com/cplusplus/nbballot/issues/941) +([LWG4329](https://cplusplus.github.io/LWG/issue4329)), although +not in a way allowing application code to plug into this mechanism. +Such an approach can be designed in a future revision of the standard. ## Removing `change_coroutine_scheduler` -TODO +The current working paper specifies `change_coroutine_scheduler` to change +the scheduler used by the coroutine for scheduler affinity. It turns out that +this use is somewhat problematic in two ways: + +1. Changing the scheduler affects the coroutine until the end of + the coroutine or until `change_coroutine_scheduler` is `co_await`ed + again. It doesn't automatically reset. Thus, local variables + constructed before `change_coroutine_scheduler(s)` was + `co_await`ed were constructed on the original scheduler and are + destroyed on the replaced scheduler. +2. The `task`'s execution may finish on a different than the original + scheduler. To allow symmetric transfer between two `task`s each + `task` needs to complete on the correct scheduler. Thus, the + `task` needs to be prepared to change to the orginal scheduler + before actually completing. To do so, it is necessary to know + the original scheduler and also to have storage for the state + needed to change to a different scheduler. It can't be statically + detected whether `change_coroutine_scheduler(s)` is `co_await`ed + in the body of a coroutine and, thus, the necessary storage and + checks are needed even for `task`s which don't use + `change_coroutine_scheduler`. + +If there were no way to change the scheduler it would still be possible +to execute using a different scheduler, although not as direct: +instead of using `co_await change_coroutine_scheduler(s)` to change +the scheduler used for affinity to `s` a nested `task` executing on `s` +could be `co_await`ed: + +```c++ +co_await ex::starts_on(s, [](@_parameters_@)->task<@_T_@, @_E_@> { @_logic_@ }(@_arguments_@)); +``` + +Using this approach the use of the scheduler `s` is clearly limited +to the nested coroutine. The scheduler affinity is fully taken care +of by the use of `affine_on` when `co_await`ing work. There is no +need to provide storage or checks needed for the potiential of +having a `task` return to the original scheduler if the scheduler +isn't actually changed by a `task`. + +The proposal is remove `change_coroutine_scheduler` and the possibility +of changing the scheduler within a `task`. The alternative to +controlling the scheduler used for affinity from within a `task` +is a bit verbose. This need under the control of the coroutine is +likely relatively rare. Replacing the used scheduler for an existing +`task` by nesting it within `on(s, t)` or `starts_on(s, t)` is +fairly straightforward. + +This functionality was originally included because it is present +for, at least, one of the existing libraries, although in a form +which was recommended against. The existing use changes the scheduler +of a coroutine when `co_await`ing the result of `schedule(s)`; this +exact approach was found to be fragile and surprising and the +recommendation was to provide the functionality more explicit. + +This change is not associated with any national body comment. +However, it is still important to do! It isn't adding any new +functionality but removes a problematic way to achieve something +which can be better achieved differently. If this change is not +made the inherent cost of having the possibility of having +`change_routine_scheduler` can't be removed later without breaking +existing code. ## `affine_on` Default Implementation -TODO +Using the previous discussion leads to a definition of `affine_on` which +is quite different from effectively just using `continues_on`: + +1. The class `affine_on` + should define a `transform_sender` member function which returns the + child sender if this child sender indicates via an implementation + specific way that it doesn't change the execution agent. It + should be recommended that some of the standard library sender + algorithms (see above) to indicate that they don't change the + execution agent. +2. The `affine_on` algorithm should only allow to get `connect`ed to a + receiver `r` whose scheduler `sched` obtained by + `get_scheduler(get_env(r))` is infallible, i.e., + `get_completion_signatures(schedule(sched), e)` with an envionment + `e` where `get_stop_token(e)` yields `never_stop_token` returns + `completion_signatures`. +3. When `affine_on` gets `connect`ed, the scheduling operation state needs + to be created by `connect`ing the scheduler's sender to a suitable receiver to guarantee + that the completion can be scheduled on the execution agent. + The stop token `get_stop_token(get_env(r))` for the receiver + `r` used for this `connect` shall be an `unstoppable_token`. + The child sender also needs to be `connect`ed with a receiver + which will capture the respective result upon completion and + start the scheduling operation. +4. When the result operation state gets `start`ed it `start`s the + operation state from the child operation. +5. Upon completion of the child operation the kind of completion and + the parameters, if any, are stored. If this operation throws, + the storage is set up to be as if `set_error(current_exception)` + were called. Once the parameters are stored, the scheduling + operation is started. +6. Upon completion of the scheduling operation, the appropriate + completion function with the respective arguments is invoked. + +This behaviour is similar to `continues_on` but is subtly different +with respect to when the scheduling operation state needs to be +created and that any stop token from the receiver doesn't get +forwarded. In addition `affine_on` is more constrained with respect +to the schedulers it supports and the shape of the algorithm is +different: `affine_on` gets the scheduler to execute on from the +receiver it gets `connect`ed to. + +This change addresses +[US 233-365](https://github.com/cplusplus/nbballot/issues/940) +([LWG4330](https://cplusplus.github.io/LWG/issue4330)) and +[US 236-362](https://github.com/cplusplus/nbballot/issues/937) +([LWG](https://cplusplus.github.io/LWG/issue4344); the proposed +resolution in this issue is incomplete). + +## Name Change + +The name `affine_on` isn't great. It may be worth giving the +algorithm a better name. + +# Wording Changes: TODO -# Wording Changes \ No newline at end of file +To be done. \ No newline at end of file From d7744a1087ebced147605240a20c304afb798618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Mon, 15 Dec 2025 00:49:59 +0000 Subject: [PATCH 08/11] spell check --- docs/P3941-affinity.md | 44 +++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/P3941-affinity.md b/docs/P3941-affinity.md index afac553..922ee72 100644 --- a/docs/P3941-affinity.md +++ b/docs/P3941-affinity.md @@ -39,7 +39,7 @@ meet its objective at run-time. There are a few NB comments raised about the way `affine_on` works:

      -
    • [US 232-366](https://github.com/cplusplus/nbballot/issues/941): specify customization of `affine_on` when the scheduler doesn't change.
    • +
    • [US 232-366](https://github.com/cplusplus/nbballot/issues/941): specify customisation of `affine_on` when the scheduler doesn't change.
    • [US 233-365](https://github.com/cplusplus/nbballot/issues/940): clarify `affine_on` vs. `continues_on`.
    • [US 234-364](https://github.com/cplusplus/nbballot/issues/939): remove scheduler parameter from `affine_on`.
    • [US 235-363](https://github.com/cplusplus/nbballot/issues/938): `affine_on` should not forward the stop token to the scheduling operation.
    • @@ -84,23 +84,23 @@ a better design than was previously specified:
    • When a sender knows that it will complete on the scheduler it - was start on, it should be possible to customize the `affine_on` - algorithm to avoid rescheduling. This customization can be + was start on, it should be possible to customise the `affine_on` + algorithm to avoid rescheduling. This customisation can be achieved by `connect`ing to the result of an `affine_on` member function called on the child sender, if such a member function - is present, when `connect`ing an `affine_on` sendering + is present, when `connect`ing an `affine_on` sender.
    • None of these changes really contradict any earlier design: the -shape and behavior of the `affine_on` algorithm wasn't fully fleshed -out. Tightening the behavior scheduler affinity and the `affine_on` +shape and behaviour of the `affine_on` algorithm wasn't fully fleshed +out. Tightening the behaviour scheduler affinity and the `affine_on` algorithm has some implications on some other components:

      1. - If `affine_on` requires an infallible scheluder at least + If `affine_on` requires an infallible scheduler modelled at least `inline_scheduler`, `task_scheduler`, and `run_loop::scheduler` should be infallible (i.e., they always complete successfully with `set_value()`). `parallel_scheduler` can probably not be @@ -110,7 +110,7 @@ algorithm has some implications on some other components: The scheduling semantics when changing a `task`'s scheduler using `co_await change_coroutine_scheduler(@_sch_@)` become somewhat unclear and this functionality should be removed. - Similar semantics are better modeled using + Similar semantics are better modelled using `co_await on(@_sch_@, @_nested-task_@)`.
      2. @@ -130,7 +130,7 @@ work to be executed and the scheduler on which to continue as arguments. When SG1 requested that a similar but different algorithms is to be used to implement scheduler affinity, `continues_on` was just replaced by `affine_on` with the same shape but the potential -to get customized differently. +to get customised differently.

        The scheduler used for affinity is the scheduler communicated via @@ -209,7 +209,7 @@ The current working draft specifies 4 schedulers:

      3. [`inline_scheduler`](https://wg21.link/exec.inline.scheduler) which just completes with `set_value()` when `start()`ed, i.e., this - scheduler is already infallibe. + scheduler is already infallible.
      4. [`task_scheduler`](https://wg21.link/exec.task.scheduler) is a @@ -241,7 +241,7 @@ The current working draft specifies 4 schedulers:
      5. The [`parallel_scheduler`](https://wg21.link/exec.par.scheduler) - provides an interface to a replacable implementation of a thread + provides an interface to a replaceable implementation of a thread pool. The current interface allows [`parallel_scheduler`](https://wg21.link/exec.par.scheduler) to complete with `set_error_t(std::exception_ptr)` as well as with @@ -257,7 +257,7 @@ only infallible schedulers. If there are fallible schedulers, there aren't any good options for using them with a `task`. Note that `affine_on` can fail and get cancelled (due to the main work failing or getting cancelled) but `affine_on` can still guarantee that -execution resumes on the expect exuection agent when it uses an +execution resumes on the expect execution agent when it uses an infallible scheduler.

        @@ -286,7 +286,7 @@ The user can transform the scheduling failure into a call to

      6. The user can consider resuming on an execution agent where the -adapting scheduluer can schedule to infallibly (e.g., the execution +adapting scheduler can schedule to infallibly (e.g., the execution agent on which operation completed) but which is different from execution agent associated with the adapted scheduler to be suitable to continue running. In that case the scheduling operation would @@ -319,7 +319,7 @@ used scheduler is infallible. The main issue here is that there is no automatic static checking whether that is the case.

        -### Considerations On Infallibe Schedulers +### Considerations On Infallible Schedulers In an ideal world, all schedulers would be infallible. It is unclear if that is achievable. If schedulers need to be allowed to be fallible, @@ -334,17 +334,17 @@ If constraining `affine_on` to only infallible schedulers turns out to be too strong, the constraint can be relaxed in a future revision of the standard by explicitly opting out of that constraints, e.g., using an additional argument. For `task` to make use of it, it too -would need an explicit mechnasism to indicate that its `affine_on` +would need an explicit mechanisms to indicate that its `affine_on` use should opt out of the constraint, e.g., by adding a suitable `static` member to the environment template argument. -## `affine_on` Customization +## `affine_on` Customisation Senders which don't cause the execution agent to be changed like -`just` or the various queries should be able to customize `affine_on` -to avoid unnecessary scheduding. Sadly, a proposal +`just` or the various queries should be able to customise `affine_on` +to avoid unnecessary scheduling. Sadly, a proposal ([P3206](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3206r0.pdf)) -to standardize properties which could be used to determine how a +to standardise properties which could be used to determine how a sender completes didn't make much progress, yet. An implementation can make use of similar techniques using an implementation-specific protocol. If a future standard defines a standard approach to @@ -390,7 +390,7 @@ this use is somewhat problematic in two ways: 2. The `task`'s execution may finish on a different than the original scheduler. To allow symmetric transfer between two `task`s each `task` needs to complete on the correct scheduler. Thus, the - `task` needs to be prepared to change to the orginal scheduler + `task` needs to be prepared to change to the original scheduler before actually completing. To do so, it is necessary to know the original scheduler and also to have storage for the state needed to change to a different scheduler. It can't be statically @@ -412,7 +412,7 @@ co_await ex::starts_on(s, [](@_parameters_@)->task<@_T_@, @_E_@> { @_logic_@ }(@ Using this approach the use of the scheduler `s` is clearly limited to the nested coroutine. The scheduler affinity is fully taken care of by the use of `affine_on` when `co_await`ing work. There is no -need to provide storage or checks needed for the potiential of +need to provide storage or checks needed for the potential of having a `task` return to the original scheduler if the scheduler isn't actually changed by a `task`. @@ -454,7 +454,7 @@ is quite different from effectively just using `continues_on`: 2. The `affine_on` algorithm should only allow to get `connect`ed to a receiver `r` whose scheduler `sched` obtained by `get_scheduler(get_env(r))` is infallible, i.e., - `get_completion_signatures(schedule(sched), e)` with an envionment + `get_completion_signatures(schedule(sched), e)` with an environment `e` where `get_stop_token(e)` yields `never_stop_token` returns `completion_signatures`. 3. When `affine_on` gets `connect`ed, the scheduling operation state needs From 82047fcdb4e35e56e09ec5604fef134c5a628dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 10 Jan 2026 20:02:23 +0000 Subject: [PATCH 09/11] fix formatting issues --- docs/P3941-affinity.md | 4 +- examples/CMakeLists.txt | 7 +- include/beman/task/detail/task_scheduler.hpp | 394 +++++++------- tests/beman/task/affine_on.test.cpp | 222 ++++---- tests/beman/task/task_scheduler.test.cpp | 522 +++++++++---------- 5 files changed, 572 insertions(+), 577 deletions(-) diff --git a/docs/P3941-affinity.md b/docs/P3941-affinity.md index 922ee72..ea07814 100644 --- a/docs/P3941-affinity.md +++ b/docs/P3941-affinity.md @@ -384,7 +384,7 @@ this use is somewhat problematic in two ways: 1. Changing the scheduler affects the coroutine until the end of the coroutine or until `change_coroutine_scheduler` is `co_await`ed again. It doesn't automatically reset. Thus, local variables - constructed before `change_coroutine_scheduler(s)` was + constructed before `change_coroutine_scheduler(s)` was `co_await`ed were constructed on the original scheduler and are destroyed on the replaced scheduler. 2. The `task`'s execution may finish on a different than the original @@ -497,4 +497,4 @@ algorithm a better name. # Wording Changes: TODO -To be done. \ No newline at end of file +To be done. diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index e54d478..f344a1f 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,11 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -set(TODO - tls-scheduler - into_optional - issue-start-reschedules - loop -) +set(TODO tls-scheduler into_optional issue-start-reschedules loop) set(ALL_EXAMPLES task_scheduler diff --git a/include/beman/task/detail/task_scheduler.hpp b/include/beman/task/detail/task_scheduler.hpp index 25886b7..4c53919 100644 --- a/include/beman/task/detail/task_scheduler.hpp +++ b/include/beman/task/detail/task_scheduler.hpp @@ -1,197 +1,197 @@ -// include/beman/task/detail/task_scheduler.hpp -*-C++-*- -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - -#ifndef INCLUDED_BEMAN_TASK_DETAIL_task_scheduler -#define INCLUDED_BEMAN_TASK_DETAIL_task_scheduler - -#include -#include -#include -#include -#include - -// ---------------------------------------------------------------------------- - -namespace beman::task::detail { - -/*! - * \brief Type-erasing scheduler - * \headerfile beman/task/task.hpp - * - * The class `task_scheduler` is used to type-erase any scheduler class. - * Any error produced by the underlying scheduler except `std::error_code` is turned into - * an `std::exception_ptr`. `std::error_code` is forwarded as is. The `task_scheduler` - * forwards stop requests reported by the stop token obtained from the `connect`ed - * receiver to the sender used by the underlying scheduler. - * - * Completion signatures: - * - * - `ex::set_value_t()` - * - `ex::set_stopped()` - * - * Usage: - * - * task_scheduler sched(other_scheduler); - * auto sender{ex::schedule(sched) | some_sender}; - */ -class task_scheduler { - struct state_base { - virtual ~state_base() = default; - virtual void complete_value() = 0; - }; - - struct inner_state { - struct receiver; - struct receiver { - using receiver_concept = ::beman::execution::receiver_t; - state_base* state; - void set_value() && noexcept { this->state->complete_value(); } - }; - static_assert(::beman::execution::receiver); - - struct base { - virtual ~base() = default; - virtual void start() = 0; - }; - template <::beman::execution::sender Sender> - struct concrete : base { - using state_t = decltype(::beman::execution::connect(std::declval(), std::declval())); - state_t state; - template <::beman::execution::sender S> - concrete(S&& s, state_base* b) : state(::beman::execution::connect(std::forward(s), receiver{b})) {} - void start() override { ::beman::execution::start(state); } - }; - ::beman::task::detail::poly state; - template <::beman::execution::sender S> - inner_state(S&& s, state_base* b) : state(static_cast*>(nullptr), std::forward(s), b) {} - void start() { this->state->start(); } - }; - - template <::beman::execution::receiver Receiver> - struct state : state_base { - using operation_state_concept = ::beman::execution::operation_state_t; - std::remove_cvref_t receiver; - inner_state s; - - template <::beman::execution::receiver R, typename PS> - state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} - void start() & noexcept { this->s.start(); } - void complete_value() override { ::beman::execution::set_value(std::move(this->receiver)); } - }; - - class sender; - class env { - friend class sender; - - private: - const sender* sndr; - env(const sender* s) : sndr(s) {} - - public: - task_scheduler - query(const ::beman::execution::get_completion_scheduler_t<::beman::execution::set_value_t>&) const noexcept { - return this->sndr->inner_sender->get_completion_scheduler(); - } - }; - - // sender implementation - class sender { - friend class env; - - private: - struct base { - virtual ~base() = default; - virtual base* move(void*) = 0; - virtual base* clone(void*) const = 0; - virtual inner_state connect(state_base*) = 0; - virtual task_scheduler get_completion_scheduler() const = 0; - }; - template <::beman::execution::scheduler Scheduler> - struct concrete : base { - using sender_t = decltype(::beman::execution::schedule(std::declval())); - sender_t sender; - - template <::beman::execution::scheduler S> - concrete(S&& s) : sender(::beman::execution::schedule(std::forward(s))) {} - base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } - base* clone(void* buffer) const override { return new (buffer) concrete(*this); } - inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } - task_scheduler get_completion_scheduler() const override { - return task_scheduler(::beman::execution::get_completion_scheduler<::beman::execution::set_value_t>( - ::beman::execution::get_env(this->sender))); - } - }; - poly inner_sender; - - public: - using sender_concept = ::beman::execution::sender_t; - using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; - - template <::beman::execution::scheduler S> - explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} - sender(sender&&) = default; - sender(const sender&) = default; - - template <::beman::execution::receiver R> - state connect(R&& r) { - return state(std::forward(r), this->inner_sender); - } - - env get_env() const noexcept { return env(this); } - }; - - // scheduler implementation - struct base { - virtual ~base() = default; - virtual sender schedule() = 0; - virtual base* move(void* buffer) = 0; - virtual base* clone(void*) const = 0; - virtual bool equals(const base*) const = 0; - }; - template <::beman::execution::scheduler Scheduler> - struct concrete : base { - Scheduler scheduler; - template - requires ::beman::execution::scheduler<::std::remove_cvref_t> - explicit concrete(S&& s) : scheduler(std::forward(s)) {} - sender schedule() override { return sender(this->scheduler); } - base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } - base* clone(void* buffer) const override { return new (buffer) concrete(*this); } - bool equals(const base* o) const override { - auto other{dynamic_cast(o)}; - return other ? this->scheduler == other->scheduler : false; - } - }; - - poly scheduler; - - public: - using scheduler_concept = ::beman::execution::scheduler_t; - - template > - requires(not std::same_as>) && - ::beman::execution::scheduler<::std::remove_cvref_t> - explicit task_scheduler(S&& s, Allocator = {}) - : scheduler(static_cast>*>(nullptr), std::forward(s)) {} - task_scheduler(task_scheduler&&) = default; - task_scheduler(const task_scheduler&) = default; - template - task_scheduler(const task_scheduler& other, Allocator) : scheduler(other.scheduler) {} - task_scheduler& operator=(const task_scheduler&) = default; - ~task_scheduler() = default; - - sender schedule() { return this->scheduler->schedule(); } - bool operator==(const task_scheduler&) const = default; - template - requires(not ::std::same_as) && ::beman::execution::scheduler - bool operator==(const Sched& other [[maybe_unused]]) const { - return *this == task_scheduler(other); - } -}; -static_assert(::beman::execution::scheduler); - -} // namespace beman::task::detail - -// ---------------------------------------------------------------------------- - -#endif + // include/beman/task/detail/task_scheduler.hpp -*-C++-*- + // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + + #ifndef INCLUDED_BEMAN_TASK_DETAIL_task_scheduler + #define INCLUDED_BEMAN_TASK_DETAIL_task_scheduler + + #include + #include + #include + #include + #include + + // ---------------------------------------------------------------------------- + + namespace beman::task::detail { + + /*! + * \brief Type-erasing scheduler + * \headerfile beman/task/task.hpp + * + * The class `task_scheduler` is used to type-erase any scheduler class. + * Any error produced by the underlying scheduler except `std::error_code` is turned into + * an `std::exception_ptr`. `std::error_code` is forwarded as is. The `task_scheduler` + * forwards stop requests reported by the stop token obtained from the `connect`ed + * receiver to the sender used by the underlying scheduler. + * + * Completion signatures: + * + * - `ex::set_value_t()` + * - `ex::set_stopped()` + * + * Usage: + * + * task_scheduler sched(other_scheduler); + * auto sender{ex::schedule(sched) | some_sender}; + */ + class task_scheduler { + struct state_base { + virtual ~state_base() = default; + virtual void complete_value() = 0; + }; + + struct inner_state { + struct receiver; + struct receiver { + using receiver_concept = ::beman::execution::receiver_t; + state_base* state; + void set_value() && noexcept { this->state->complete_value(); } + }; + static_assert(::beman::execution::receiver); + + struct base { + virtual ~base() = default; + virtual void start() = 0; + }; + template <::beman::execution::sender Sender> + struct concrete : base { + using state_t = decltype(::beman::execution::connect(std::declval(), std::declval())); + state_t state; + template <::beman::execution::sender S> + concrete(S&& s, state_base* b) : state(::beman::execution::connect(std::forward(s), receiver{b})) {} + void start() override { ::beman::execution::start(state); } + }; + ::beman::task::detail::poly state; + template <::beman::execution::sender S> + inner_state(S&& s, state_base* b) : state(static_cast*>(nullptr), std::forward(s), b) {} + void start() { this->state->start(); } + }; + + template <::beman::execution::receiver Receiver> + struct state : state_base { + using operation_state_concept = ::beman::execution::operation_state_t; + std::remove_cvref_t receiver; + inner_state s; + + template <::beman::execution::receiver R, typename PS> + state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} + void start() & noexcept { this->s.start(); } + void complete_value() override { ::beman::execution::set_value(std::move(this->receiver)); } + }; + + class sender; + class env { + friend class sender; + + private: + const sender* sndr; + env(const sender* s) : sndr(s) {} + + public: + task_scheduler + query(const ::beman::execution::get_completion_scheduler_t<::beman::execution::set_value_t>&) const noexcept { + return this->sndr->inner_sender->get_completion_scheduler(); + } + }; + + // sender implementation + class sender { + friend class env; + + private: + struct base { + virtual ~base() = default; + virtual base* move(void*) = 0; + virtual base* clone(void*) const = 0; + virtual inner_state connect(state_base*) = 0; + virtual task_scheduler get_completion_scheduler() const = 0; + }; + template <::beman::execution::scheduler Scheduler> + struct concrete : base { + using sender_t = decltype(::beman::execution::schedule(std::declval())); + sender_t sender; + + template <::beman::execution::scheduler S> + concrete(S&& s) : sender(::beman::execution::schedule(std::forward(s))) {} + base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } + base* clone(void* buffer) const override { return new (buffer) concrete(*this); } + inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } + task_scheduler get_completion_scheduler() const override { + return task_scheduler(::beman::execution::get_completion_scheduler<::beman::execution::set_value_t>( + ::beman::execution::get_env(this->sender))); + } + }; + poly inner_sender; + + public: + using sender_concept = ::beman::execution::sender_t; + using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; + + template <::beman::execution::scheduler S> + explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} + sender(sender&&) = default; + sender(const sender&) = default; + + template <::beman::execution::receiver R> + state connect(R&& r) { + return state(std::forward(r), this->inner_sender); + } + + env get_env() const noexcept { return env(this); } + }; + + // scheduler implementation + struct base { + virtual ~base() = default; + virtual sender schedule() = 0; + virtual base* move(void* buffer) = 0; + virtual base* clone(void*) const = 0; + virtual bool equals(const base*) const = 0; + }; + template <::beman::execution::scheduler Scheduler> + struct concrete : base { + Scheduler scheduler; + template + requires ::beman::execution::scheduler<::std::remove_cvref_t> + explicit concrete(S&& s) : scheduler(std::forward(s)) {} + sender schedule() override { return sender(this->scheduler); } + base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } + base* clone(void* buffer) const override { return new (buffer) concrete(*this); } + bool equals(const base* o) const override { + auto other{dynamic_cast(o)}; + return other ? this->scheduler == other->scheduler : false; + } + }; + + poly scheduler; + + public: + using scheduler_concept = ::beman::execution::scheduler_t; + + template > + requires(not std::same_as>) && + ::beman::execution::scheduler<::std::remove_cvref_t> + explicit task_scheduler(S&& s, Allocator = {}) + : scheduler(static_cast>*>(nullptr), std::forward(s)) {} + task_scheduler(task_scheduler&&) = default; + task_scheduler(const task_scheduler&) = default; + template + task_scheduler(const task_scheduler& other, Allocator) : scheduler(other.scheduler) {} + task_scheduler& operator=(const task_scheduler&) = default; + ~task_scheduler() = default; + + sender schedule() { return this->scheduler->schedule(); } + bool operator==(const task_scheduler&) const = default; + template + requires(not ::std::same_as) && ::beman::execution::scheduler + bool operator==(const Sched& other [[maybe_unused]]) const { + return *this == task_scheduler(other); + } + }; + static_assert(::beman::execution::scheduler); + + } // namespace beman::task::detail + + // ---------------------------------------------------------------------------- + + #endif diff --git a/tests/beman/task/affine_on.test.cpp b/tests/beman/task/affine_on.test.cpp index da57335..d39716e 100644 --- a/tests/beman/task/affine_on.test.cpp +++ b/tests/beman/task/affine_on.test.cpp @@ -1,111 +1,111 @@ -// tests/beman/task/affine_on.test.cpp -*-C++-*- -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - -#include -#include -#include -#include -#ifdef NDEBUG -#undef NDEBUG -#endif -#include - -namespace ex = beman::execution; - -// ---------------------------------------------------------------------------- - -namespace { -template -struct receiver { - using receiver_concept = ex::receiver_t; - - std::remove_cvref_t scheduler; - auto get_env() const noexcept { return ex::detail::make_env(ex::get_scheduler, scheduler); } - auto set_value(auto&&...) && noexcept -> void {} -}; -template -receiver(Scheduler&& sched) -> receiver; - -template -struct test_scheduler { - using scheduler_concept = ex::scheduler_t; - struct sender { - using sender_concept = ex::sender_t; - template - auto get_completion_signatures(Env&& env) const noexcept { - if constexpr (ex::unstoppable_token) - return ex::completion_signatures{}; - else - return ex::completion_signatures{}; - } - struct env { - template - auto query(ex::get_completion_scheduler_t) const noexcept -> test_scheduler { - return {}; - } - }; - auto get_env() const noexcept { return env{}; } - - template - struct op_state { - using operation_state_concept = ex::operation_state_t; - std::remove_cvref_t rcvr; - void start() & noexcept { - static_assert(ex::operation_state); - ex::set_value(std::move(this->rcvr)); - } - }; - template - auto connect(Rcvr&& rcvr) noexcept { - static_assert(ex::sender); - return op_state{std::forward(rcvr)}; - } - }; - - auto schedule() noexcept -> sender { - static_assert(ex::scheduler>); - return {}; - } - auto operator==(const test_scheduler&) const -> bool = default; -}; -static_assert(ex::scheduler>); - -static_assert(ex::receiver>); -static_assert(ex::receiver>>); - -template -auto test_affine_on_accepted_scheduler(Receiver rcvr) { - if constexpr (requires { ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr)); }) { - auto state(ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr))); - ex::start(state); - } -} - -auto test_affine_on_accepted_scheduler() { - test_affine_on_accepted_scheduler(receiver{beman::task::detail::inline_scheduler{}}); - test_affine_on_accepted_scheduler(receiver{test_scheduler<>{}}); - // test_affine_on_accepted_scheduler(receiver{test_scheduler{}}); -} - -} // namespace - -int main() { - beman::task::detail::single_thread_context context; - - auto main_id{std::this_thread::get_id()}; - auto [thread_id]{ - ex::sync_wait(ex::schedule(context.get_scheduler()) | ex::then([] { return std::this_thread::get_id(); })) - .value_or(std::tuple{std::thread::id{}})}; - - assert(main_id != thread_id); - - [[maybe_unused]] auto s(beman::task::affine_on(ex::just(42))); - static_assert(ex::sender); - [[maybe_unused]] auto st(ex::connect(std::move(s), receiver{context.get_scheduler()})); - ex::sync_wait( - beman::task::affine_on(ex::starts_on(context.get_scheduler(), ex::just(42)) | ex::then([thread_id](int value) { - assert(thread_id == std::this_thread::get_id()); - assert(value == 42); - })) | - ex::then([thread_id]() { assert(thread_id != std::this_thread::get_id()); })); -} + // tests/beman/task/affine_on.test.cpp -*-C++-*- + // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + + #include + #include + #include + #include + #ifdef NDEBUG + #undef NDEBUG + #endif + #include + + namespace ex = beman::execution; + + // ---------------------------------------------------------------------------- + + namespace { + template + struct receiver { + using receiver_concept = ex::receiver_t; + + std::remove_cvref_t scheduler; + auto get_env() const noexcept { return ex::detail::make_env(ex::get_scheduler, scheduler); } + auto set_value(auto&&...) && noexcept -> void {} + }; + template + receiver(Scheduler&& sched) -> receiver; + + template + struct test_scheduler { + using scheduler_concept = ex::scheduler_t; + struct sender { + using sender_concept = ex::sender_t; + template + auto get_completion_signatures(Env&& env) const noexcept { + if constexpr (ex::unstoppable_token) + return ex::completion_signatures{}; + else + return ex::completion_signatures{}; + } + struct env { + template + auto query(ex::get_completion_scheduler_t) const noexcept -> test_scheduler { + return {}; + } + }; + auto get_env() const noexcept { return env{}; } + + template + struct op_state { + using operation_state_concept = ex::operation_state_t; + std::remove_cvref_t rcvr; + void start() & noexcept { + static_assert(ex::operation_state); + ex::set_value(std::move(this->rcvr)); + } + }; + template + auto connect(Rcvr&& rcvr) noexcept { + static_assert(ex::sender); + return op_state{std::forward(rcvr)}; + } + }; + + auto schedule() noexcept -> sender { + static_assert(ex::scheduler>); + return {}; + } + auto operator==(const test_scheduler&) const -> bool = default; + }; + static_assert(ex::scheduler>); + + static_assert(ex::receiver>); + static_assert(ex::receiver>>); + + template + auto test_affine_on_accepted_scheduler(Receiver rcvr) { + if constexpr (requires { ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr)); }) { + auto state(ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr))); + ex::start(state); + } + } + + auto test_affine_on_accepted_scheduler() { + test_affine_on_accepted_scheduler(receiver{beman::task::detail::inline_scheduler{}}); + test_affine_on_accepted_scheduler(receiver{test_scheduler<>{}}); + // test_affine_on_accepted_scheduler(receiver{test_scheduler{}}); + } + + } // namespace + + int main() { + beman::task::detail::single_thread_context context; + + auto main_id{std::this_thread::get_id()}; + auto [thread_id]{ + ex::sync_wait(ex::schedule(context.get_scheduler()) | ex::then([] { return std::this_thread::get_id(); })) + .value_or(std::tuple{std::thread::id{}})}; + + assert(main_id != thread_id); + + [[maybe_unused]] auto s(beman::task::affine_on(ex::just(42))); + static_assert(ex::sender); + [[maybe_unused]] auto st(ex::connect(std::move(s), receiver{context.get_scheduler()})); + ex::sync_wait( + beman::task::affine_on(ex::starts_on(context.get_scheduler(), ex::just(42)) | ex::then([thread_id](int value) { + assert(thread_id == std::this_thread::get_id()); + assert(value == 42); + })) | + ex::then([thread_id]() { assert(thread_id != std::this_thread::get_id()); })); + } diff --git a/tests/beman/task/task_scheduler.test.cpp b/tests/beman/task/task_scheduler.test.cpp index 20914e1..ca06d36 100644 --- a/tests/beman/task/task_scheduler.test.cpp +++ b/tests/beman/task/task_scheduler.test.cpp @@ -1,261 +1,261 @@ -// tests/beman/task/task_scheduler.test.cpp -*-C++-*- -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#ifdef NDEBUG -#undef NDEBUG -#endif -#include - -namespace ex = beman::execution; -namespace ly = beman::task; - -// ---------------------------------------------------------------------------- - -namespace { - -void unexpected_call_assert(const char* msg) { assert(nullptr == msg); } - -struct thread_context { - enum class complete : char { success, failure, exception, never }; - struct base { - base* next{}; - base() = default; - base(base&&) = delete; - base(const base&) = delete; - virtual ~base() = default; - base& operator=(base&&) = delete; - base& operator=(const base&) = delete; - virtual void complete() = 0; - }; - - std::mutex mutex; - std::condition_variable condition; - bool done{false}; - base* work{}; - std::thread thread; - - base* get_work() { - std::unique_lock cerberus(this->mutex); - condition.wait(cerberus, [this] { return this->done || this->work; }); - base* rc{this->work}; - if (rc) { - this->work = rc->next; - } - return rc; - } - void enqueue(base* w) { - { - std::lock_guard cerberus(this->mutex); - w->next = std::exchange(this->work, w); - } - this->condition.notify_one(); - } - - thread_context() - : thread([this] { - while (auto w{this->get_work()}) { - w->complete(); - } - }) {} - thread_context(thread_context&&) = delete; - thread_context(const thread_context&) = delete; - ~thread_context() { - this->stop(); - this->thread.join(); - } - thread_context& operator=(thread_context&&) = delete; - thread_context& operator=(const thread_context&) = delete; - - struct scheduler { - using scheduler_concept = ex::scheduler_t; - thread_context* context; - complete cmpl{complete::success}; - bool operator==(const scheduler&) const = default; - - template - struct state : base { - struct stopper { - state* st; - void operator()() noexcept { - auto self{this->st}; - self->callback.reset(); - ex::set_stopped(std::move(self->receiver)); - } - }; - using operation_state_concept = ex::operation_state_t; - using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval()))); - using callback_t = ex::stop_callback_for_t; - - thread_context* ctxt; - std::remove_cvref_t receiver; - thread_context::complete cmpl; - std::optional callback; - - template - state(auto c, R&& r, thread_context::complete cm) : ctxt(c), receiver(std::forward(r)), cmpl(cm) {} - void start() & noexcept { - callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stopper{this}); - if (cmpl != thread_context::complete::never) - this->ctxt->enqueue(this); - } - void complete() override { - this->callback.reset(); - ex::set_value(std::move(this->receiver)); - } - }; - struct env { - thread_context* ctxt; - scheduler query(const ex::get_completion_scheduler_t&) const noexcept { - return scheduler{ctxt}; - } - }; - struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = ex::completion_signatures; - - thread_context* ctxt; - thread_context::complete cmpl; - - template - auto connect(Receiver&& receiver) { - static_assert(ex::operation_state>); - return state(this->ctxt, std::forward(receiver), this->cmpl); - } - env get_env() const noexcept { return {this->ctxt}; } - }; - static_assert(ex::sender); - - sender schedule() noexcept { return sender{this->context, this->cmpl}; } - }; - static_assert(ex::scheduler); - - scheduler get_scheduler(complete cmpl = complete::success) { return scheduler{this, cmpl}; } - void stop() { - { - std::lock_guard cerberus(this->mutex); - this->done = true; - } - this->condition.notify_one(); - } -}; - -enum class stop_result : char { none, success, failure, stopped }; -template -struct stop_env { - Token token; - auto query(ex::get_stop_token_t) const noexcept { return this->token; } -}; -template -stop_env(Token&&) -> stop_env>; - -template -struct stop_receiver { - using receiver_concept = ex::receiver_t; - Token token; - stop_result& result; - std::latch* completed{}; - auto get_env() const noexcept { return stop_env{this->token}; } - - void set_value(auto&&...) && noexcept { - this->result = stop_result::success; - if (this->completed) - this->completed->count_down(); - } - void set_error(auto&&) && noexcept { - this->result = stop_result::failure; - if (this->completed) - this->completed->count_down(); - } - void set_stopped() && noexcept { - this->result = stop_result::stopped; - if (this->completed) - this->completed->count_down(); - } -}; -template -stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; -static_assert(ex::receiver>); - -} // namespace - -// ---------------------------------------------------------------------------- - -int main() { - try { - static_assert(ex::scheduler); - - thread_context ctxt1; - thread_context ctxt2; - - assert(ctxt1.get_scheduler() == ctxt1.get_scheduler()); - assert(ctxt2.get_scheduler() == ctxt2.get_scheduler()); - assert(ctxt1.get_scheduler() != ctxt2.get_scheduler()); - - ly::detail::task_scheduler sched1(ctxt1.get_scheduler()); - ly::detail::task_scheduler sched2(ctxt2.get_scheduler()); - assert(sched1 == sched1); - assert(sched2 == sched2); - assert(sched1 != sched2); - - ly::detail::task_scheduler copy(sched1); - assert(copy == sched1); - assert(copy != sched2); - ly::detail::task_scheduler move(std::move(copy)); - assert(move == sched1); - assert(move != sched2); - - copy = sched2; - assert(copy == sched2); - assert(copy != sched1); - - move = std::move(copy); - assert(move == sched2); - assert(move != sched1); - - std::atomic id1{}; - std::atomic id2{}; - ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); - ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); - assert(id1 != id2); - ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched1)) | - ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); - ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched2)) | - ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); - - { - ex::inplace_stop_source source; - stop_result result{stop_result::none}; - auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), - stop_receiver{source.get_token(), result})}; - assert(result == stop_result::none); - ex::start(state); - assert(result == stop_result::none); - source.request_stop(); - assert(result == stop_result::stopped); - } - { - std::latch completed{1}; - stop_result result{stop_result::none}; - auto state{ex::connect( - ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), - stop_receiver{ex::never_stop_token(), result, &completed})}; - assert(result == stop_result::none); - ex::start(state); - completed.wait(); - assert(result == stop_result::success); - } - } catch (...) { - unexpected_call_assert("no exception should escape to main"); - } -} + // tests/beman/task/task_scheduler.test.cpp -*-C++-*- + // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + #ifdef NDEBUG + #undef NDEBUG + #endif + #include + + namespace ex = beman::execution; + namespace ly = beman::task; + + // ---------------------------------------------------------------------------- + + namespace { + + void unexpected_call_assert(const char* msg) { assert(nullptr == msg); } + + struct thread_context { + enum class complete : char { success, failure, exception, never }; + struct base { + base* next{}; + base() = default; + base(base&&) = delete; + base(const base&) = delete; + virtual ~base() = default; + base& operator=(base&&) = delete; + base& operator=(const base&) = delete; + virtual void complete() = 0; + }; + + std::mutex mutex; + std::condition_variable condition; + bool done{false}; + base* work{}; + std::thread thread; + + base* get_work() { + std::unique_lock cerberus(this->mutex); + condition.wait(cerberus, [this] { return this->done || this->work; }); + base* rc{this->work}; + if (rc) { + this->work = rc->next; + } + return rc; + } + void enqueue(base* w) { + { + std::lock_guard cerberus(this->mutex); + w->next = std::exchange(this->work, w); + } + this->condition.notify_one(); + } + + thread_context() + : thread([this] { + while (auto w{this->get_work()}) { + w->complete(); + } + }) {} + thread_context(thread_context&&) = delete; + thread_context(const thread_context&) = delete; + ~thread_context() { + this->stop(); + this->thread.join(); + } + thread_context& operator=(thread_context&&) = delete; + thread_context& operator=(const thread_context&) = delete; + + struct scheduler { + using scheduler_concept = ex::scheduler_t; + thread_context* context; + complete cmpl{complete::success}; + bool operator==(const scheduler&) const = default; + + template + struct state : base { + struct stopper { + state* st; + void operator()() noexcept { + auto self{this->st}; + self->callback.reset(); + ex::set_stopped(std::move(self->receiver)); + } + }; + using operation_state_concept = ex::operation_state_t; + using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval()))); + using callback_t = ex::stop_callback_for_t; + + thread_context* ctxt; + std::remove_cvref_t receiver; + thread_context::complete cmpl; + std::optional callback; + + template + state(auto c, R&& r, thread_context::complete cm) : ctxt(c), receiver(std::forward(r)), cmpl(cm) {} + void start() & noexcept { + callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stopper{this}); + if (cmpl != thread_context::complete::never) + this->ctxt->enqueue(this); + } + void complete() override { + this->callback.reset(); + ex::set_value(std::move(this->receiver)); + } + }; + struct env { + thread_context* ctxt; + scheduler query(const ex::get_completion_scheduler_t&) const noexcept { + return scheduler{ctxt}; + } + }; + struct sender { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; + + thread_context* ctxt; + thread_context::complete cmpl; + + template + auto connect(Receiver&& receiver) { + static_assert(ex::operation_state>); + return state(this->ctxt, std::forward(receiver), this->cmpl); + } + env get_env() const noexcept { return {this->ctxt}; } + }; + static_assert(ex::sender); + + sender schedule() noexcept { return sender{this->context, this->cmpl}; } + }; + static_assert(ex::scheduler); + + scheduler get_scheduler(complete cmpl = complete::success) { return scheduler{this, cmpl}; } + void stop() { + { + std::lock_guard cerberus(this->mutex); + this->done = true; + } + this->condition.notify_one(); + } + }; + + enum class stop_result : char { none, success, failure, stopped }; + template + struct stop_env { + Token token; + auto query(ex::get_stop_token_t) const noexcept { return this->token; } + }; + template + stop_env(Token&&) -> stop_env>; + + template + struct stop_receiver { + using receiver_concept = ex::receiver_t; + Token token; + stop_result& result; + std::latch* completed{}; + auto get_env() const noexcept { return stop_env{this->token}; } + + void set_value(auto&&...) && noexcept { + this->result = stop_result::success; + if (this->completed) + this->completed->count_down(); + } + void set_error(auto&&) && noexcept { + this->result = stop_result::failure; + if (this->completed) + this->completed->count_down(); + } + void set_stopped() && noexcept { + this->result = stop_result::stopped; + if (this->completed) + this->completed->count_down(); + } + }; + template + stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; + static_assert(ex::receiver>); + + } // namespace + + // ---------------------------------------------------------------------------- + + int main() { + try { + static_assert(ex::scheduler); + + thread_context ctxt1; + thread_context ctxt2; + + assert(ctxt1.get_scheduler() == ctxt1.get_scheduler()); + assert(ctxt2.get_scheduler() == ctxt2.get_scheduler()); + assert(ctxt1.get_scheduler() != ctxt2.get_scheduler()); + + ly::detail::task_scheduler sched1(ctxt1.get_scheduler()); + ly::detail::task_scheduler sched2(ctxt2.get_scheduler()); + assert(sched1 == sched1); + assert(sched2 == sched2); + assert(sched1 != sched2); + + ly::detail::task_scheduler copy(sched1); + assert(copy == sched1); + assert(copy != sched2); + ly::detail::task_scheduler move(std::move(copy)); + assert(move == sched1); + assert(move != sched2); + + copy = sched2; + assert(copy == sched2); + assert(copy != sched1); + + move = std::move(copy); + assert(move == sched2); + assert(move != sched1); + + std::atomic id1{}; + std::atomic id2{}; + ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); + ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); + assert(id1 != id2); + ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched1)) | + ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); + ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched2)) | + ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); + + { + ex::inplace_stop_source source; + stop_result result{stop_result::none}; + auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + std::latch completed{1}; + stop_result result{stop_result::none}; + auto state{ex::connect( + ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), + stop_receiver{ex::never_stop_token(), result, &completed})}; + assert(result == stop_result::none); + ex::start(state); + completed.wait(); + assert(result == stop_result::success); + } + } catch (...) { + unexpected_call_assert("no exception should escape to main"); + } + } From a6b94036410986342591238f546ff632eee40248 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 10 Jan 2026 20:14:28 +0000 Subject: [PATCH 10/11] fix more CI errors --- include/beman/task/detail/task_scheduler.hpp | 394 +++++++------- tests/beman/task/affine_on.test.cpp | 224 ++++---- tests/beman/task/task_scheduler.test.cpp | 522 +++++++++---------- 3 files changed, 571 insertions(+), 569 deletions(-) diff --git a/include/beman/task/detail/task_scheduler.hpp b/include/beman/task/detail/task_scheduler.hpp index 4c53919..cc3f127 100644 --- a/include/beman/task/detail/task_scheduler.hpp +++ b/include/beman/task/detail/task_scheduler.hpp @@ -1,197 +1,197 @@ - // include/beman/task/detail/task_scheduler.hpp -*-C++-*- - // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - - #ifndef INCLUDED_BEMAN_TASK_DETAIL_task_scheduler - #define INCLUDED_BEMAN_TASK_DETAIL_task_scheduler - - #include - #include - #include - #include - #include - - // ---------------------------------------------------------------------------- - - namespace beman::task::detail { - - /*! - * \brief Type-erasing scheduler - * \headerfile beman/task/task.hpp - * - * The class `task_scheduler` is used to type-erase any scheduler class. - * Any error produced by the underlying scheduler except `std::error_code` is turned into - * an `std::exception_ptr`. `std::error_code` is forwarded as is. The `task_scheduler` - * forwards stop requests reported by the stop token obtained from the `connect`ed - * receiver to the sender used by the underlying scheduler. - * - * Completion signatures: - * - * - `ex::set_value_t()` - * - `ex::set_stopped()` - * - * Usage: - * - * task_scheduler sched(other_scheduler); - * auto sender{ex::schedule(sched) | some_sender}; - */ - class task_scheduler { - struct state_base { - virtual ~state_base() = default; - virtual void complete_value() = 0; - }; - - struct inner_state { - struct receiver; - struct receiver { - using receiver_concept = ::beman::execution::receiver_t; - state_base* state; - void set_value() && noexcept { this->state->complete_value(); } - }; - static_assert(::beman::execution::receiver); - - struct base { - virtual ~base() = default; - virtual void start() = 0; - }; - template <::beman::execution::sender Sender> - struct concrete : base { - using state_t = decltype(::beman::execution::connect(std::declval(), std::declval())); - state_t state; - template <::beman::execution::sender S> - concrete(S&& s, state_base* b) : state(::beman::execution::connect(std::forward(s), receiver{b})) {} - void start() override { ::beman::execution::start(state); } - }; - ::beman::task::detail::poly state; - template <::beman::execution::sender S> - inner_state(S&& s, state_base* b) : state(static_cast*>(nullptr), std::forward(s), b) {} - void start() { this->state->start(); } - }; - - template <::beman::execution::receiver Receiver> - struct state : state_base { - using operation_state_concept = ::beman::execution::operation_state_t; - std::remove_cvref_t receiver; - inner_state s; - - template <::beman::execution::receiver R, typename PS> - state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} - void start() & noexcept { this->s.start(); } - void complete_value() override { ::beman::execution::set_value(std::move(this->receiver)); } - }; - - class sender; - class env { - friend class sender; - - private: - const sender* sndr; - env(const sender* s) : sndr(s) {} - - public: - task_scheduler - query(const ::beman::execution::get_completion_scheduler_t<::beman::execution::set_value_t>&) const noexcept { - return this->sndr->inner_sender->get_completion_scheduler(); - } - }; - - // sender implementation - class sender { - friend class env; - - private: - struct base { - virtual ~base() = default; - virtual base* move(void*) = 0; - virtual base* clone(void*) const = 0; - virtual inner_state connect(state_base*) = 0; - virtual task_scheduler get_completion_scheduler() const = 0; - }; - template <::beman::execution::scheduler Scheduler> - struct concrete : base { - using sender_t = decltype(::beman::execution::schedule(std::declval())); - sender_t sender; - - template <::beman::execution::scheduler S> - concrete(S&& s) : sender(::beman::execution::schedule(std::forward(s))) {} - base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } - base* clone(void* buffer) const override { return new (buffer) concrete(*this); } - inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } - task_scheduler get_completion_scheduler() const override { - return task_scheduler(::beman::execution::get_completion_scheduler<::beman::execution::set_value_t>( - ::beman::execution::get_env(this->sender))); - } - }; - poly inner_sender; - - public: - using sender_concept = ::beman::execution::sender_t; - using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; - - template <::beman::execution::scheduler S> - explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} - sender(sender&&) = default; - sender(const sender&) = default; - - template <::beman::execution::receiver R> - state connect(R&& r) { - return state(std::forward(r), this->inner_sender); - } - - env get_env() const noexcept { return env(this); } - }; - - // scheduler implementation - struct base { - virtual ~base() = default; - virtual sender schedule() = 0; - virtual base* move(void* buffer) = 0; - virtual base* clone(void*) const = 0; - virtual bool equals(const base*) const = 0; - }; - template <::beman::execution::scheduler Scheduler> - struct concrete : base { - Scheduler scheduler; - template - requires ::beman::execution::scheduler<::std::remove_cvref_t> - explicit concrete(S&& s) : scheduler(std::forward(s)) {} - sender schedule() override { return sender(this->scheduler); } - base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } - base* clone(void* buffer) const override { return new (buffer) concrete(*this); } - bool equals(const base* o) const override { - auto other{dynamic_cast(o)}; - return other ? this->scheduler == other->scheduler : false; - } - }; - - poly scheduler; - - public: - using scheduler_concept = ::beman::execution::scheduler_t; - - template > - requires(not std::same_as>) && - ::beman::execution::scheduler<::std::remove_cvref_t> - explicit task_scheduler(S&& s, Allocator = {}) - : scheduler(static_cast>*>(nullptr), std::forward(s)) {} - task_scheduler(task_scheduler&&) = default; - task_scheduler(const task_scheduler&) = default; - template - task_scheduler(const task_scheduler& other, Allocator) : scheduler(other.scheduler) {} - task_scheduler& operator=(const task_scheduler&) = default; - ~task_scheduler() = default; - - sender schedule() { return this->scheduler->schedule(); } - bool operator==(const task_scheduler&) const = default; - template - requires(not ::std::same_as) && ::beman::execution::scheduler - bool operator==(const Sched& other [[maybe_unused]]) const { - return *this == task_scheduler(other); - } - }; - static_assert(::beman::execution::scheduler); - - } // namespace beman::task::detail - - // ---------------------------------------------------------------------------- - - #endif +// include/beman/task/detail/task_scheduler.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_TASK_DETAIL_task_scheduler +#define INCLUDED_BEMAN_TASK_DETAIL_task_scheduler + +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::task::detail { + +/*! + * \brief Type-erasing scheduler + * \headerfile beman/task/task.hpp + * + * The class `task_scheduler` is used to type-erase any scheduler class. + * Any error produced by the underlying scheduler except `std::error_code` is turned into + * an `std::exception_ptr`. `std::error_code` is forwarded as is. The `task_scheduler` + * forwards stop requests reported by the stop token obtained from the `connect`ed + * receiver to the sender used by the underlying scheduler. + * + * Completion signatures: + * + * - `ex::set_value_t()` + * - `ex::set_stopped()` + * + * Usage: + * + * task_scheduler sched(other_scheduler); + * auto sender{ex::schedule(sched) | some_sender}; + */ +class task_scheduler { + struct state_base { + virtual ~state_base() = default; + virtual void complete_value() = 0; + }; + + struct inner_state { + struct receiver; + struct receiver { + using receiver_concept = ::beman::execution::receiver_t; + state_base* state; + void set_value() && noexcept { this->state->complete_value(); } + }; + static_assert(::beman::execution::receiver); + + struct base { + virtual ~base() = default; + virtual void start() = 0; + }; + template <::beman::execution::sender Sender> + struct concrete : base { + using state_t = decltype(::beman::execution::connect(std::declval(), std::declval())); + state_t state; + template <::beman::execution::sender S> + concrete(S&& s, state_base* b) : state(::beman::execution::connect(std::forward(s), receiver{b})) {} + void start() override { ::beman::execution::start(state); } + }; + ::beman::task::detail::poly state; + template <::beman::execution::sender S> + inner_state(S&& s, state_base* b) : state(static_cast*>(nullptr), std::forward(s), b) {} + void start() { this->state->start(); } + }; + + template <::beman::execution::receiver Receiver> + struct state : state_base { + using operation_state_concept = ::beman::execution::operation_state_t; + std::remove_cvref_t receiver; + inner_state s; + + template <::beman::execution::receiver R, typename PS> + state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} + void start() & noexcept { this->s.start(); } + void complete_value() override { ::beman::execution::set_value(std::move(this->receiver)); } + }; + + class sender; + class env { + friend class sender; + + private: + const sender* sndr; + env(const sender* s) : sndr(s) {} + + public: + task_scheduler + query(const ::beman::execution::get_completion_scheduler_t<::beman::execution::set_value_t>&) const noexcept { + return this->sndr->inner_sender->get_completion_scheduler(); + } + }; + + // sender implementation + class sender { + friend class env; + + private: + struct base { + virtual ~base() = default; + virtual base* move(void*) = 0; + virtual base* clone(void*) const = 0; + virtual inner_state connect(state_base*) = 0; + virtual task_scheduler get_completion_scheduler() const = 0; + }; + template <::beman::execution::scheduler Scheduler> + struct concrete : base { + using sender_t = decltype(::beman::execution::schedule(std::declval())); + sender_t sender; + + template <::beman::execution::scheduler S> + concrete(S&& s) : sender(::beman::execution::schedule(std::forward(s))) {} + base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } + base* clone(void* buffer) const override { return new (buffer) concrete(*this); } + inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } + task_scheduler get_completion_scheduler() const override { + return task_scheduler(::beman::execution::get_completion_scheduler<::beman::execution::set_value_t>( + ::beman::execution::get_env(this->sender))); + } + }; + poly inner_sender; + + public: + using sender_concept = ::beman::execution::sender_t; + using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; + + template <::beman::execution::scheduler S> + explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} + sender(sender&&) = default; + sender(const sender&) = default; + + template <::beman::execution::receiver R> + state connect(R&& r) { + return state(std::forward(r), this->inner_sender); + } + + env get_env() const noexcept { return env(this); } + }; + + // scheduler implementation + struct base { + virtual ~base() = default; + virtual sender schedule() = 0; + virtual base* move(void* buffer) = 0; + virtual base* clone(void*) const = 0; + virtual bool equals(const base*) const = 0; + }; + template <::beman::execution::scheduler Scheduler> + struct concrete : base { + Scheduler scheduler; + template + requires ::beman::execution::scheduler<::std::remove_cvref_t> + explicit concrete(S&& s) : scheduler(std::forward(s)) {} + sender schedule() override { return sender(this->scheduler); } + base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } + base* clone(void* buffer) const override { return new (buffer) concrete(*this); } + bool equals(const base* o) const override { + auto other{dynamic_cast(o)}; + return other ? this->scheduler == other->scheduler : false; + } + }; + + poly scheduler; + + public: + using scheduler_concept = ::beman::execution::scheduler_t; + + template > + requires(not std::same_as>) && + ::beman::execution::scheduler<::std::remove_cvref_t> + explicit task_scheduler(S&& s, Allocator = {}) + : scheduler(static_cast>*>(nullptr), std::forward(s)) {} + task_scheduler(task_scheduler&&) = default; + task_scheduler(const task_scheduler&) = default; + template + task_scheduler(const task_scheduler& other, Allocator) : scheduler(other.scheduler) {} + task_scheduler& operator=(const task_scheduler&) = default; + ~task_scheduler() = default; + + sender schedule() { return this->scheduler->schedule(); } + bool operator==(const task_scheduler&) const = default; + template + requires(not ::std::same_as) && ::beman::execution::scheduler + bool operator==(const Sched& other [[maybe_unused]]) const { + return *this == task_scheduler(other); + } +}; +static_assert(::beman::execution::scheduler); + +} // namespace beman::task::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/tests/beman/task/affine_on.test.cpp b/tests/beman/task/affine_on.test.cpp index d39716e..8161b32 100644 --- a/tests/beman/task/affine_on.test.cpp +++ b/tests/beman/task/affine_on.test.cpp @@ -1,111 +1,113 @@ - // tests/beman/task/affine_on.test.cpp -*-C++-*- - // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - - #include - #include - #include - #include - #ifdef NDEBUG - #undef NDEBUG - #endif - #include - - namespace ex = beman::execution; - - // ---------------------------------------------------------------------------- - - namespace { - template - struct receiver { - using receiver_concept = ex::receiver_t; - - std::remove_cvref_t scheduler; - auto get_env() const noexcept { return ex::detail::make_env(ex::get_scheduler, scheduler); } - auto set_value(auto&&...) && noexcept -> void {} - }; - template - receiver(Scheduler&& sched) -> receiver; - - template - struct test_scheduler { - using scheduler_concept = ex::scheduler_t; - struct sender { - using sender_concept = ex::sender_t; - template - auto get_completion_signatures(Env&& env) const noexcept { - if constexpr (ex::unstoppable_token) - return ex::completion_signatures{}; - else - return ex::completion_signatures{}; - } - struct env { - template - auto query(ex::get_completion_scheduler_t) const noexcept -> test_scheduler { - return {}; - } - }; - auto get_env() const noexcept { return env{}; } - - template - struct op_state { - using operation_state_concept = ex::operation_state_t; - std::remove_cvref_t rcvr; - void start() & noexcept { - static_assert(ex::operation_state); - ex::set_value(std::move(this->rcvr)); - } - }; - template - auto connect(Rcvr&& rcvr) noexcept { - static_assert(ex::sender); - return op_state{std::forward(rcvr)}; - } - }; - - auto schedule() noexcept -> sender { - static_assert(ex::scheduler>); - return {}; - } - auto operator==(const test_scheduler&) const -> bool = default; - }; - static_assert(ex::scheduler>); - - static_assert(ex::receiver>); - static_assert(ex::receiver>>); - - template - auto test_affine_on_accepted_scheduler(Receiver rcvr) { - if constexpr (requires { ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr)); }) { - auto state(ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr))); - ex::start(state); - } - } - - auto test_affine_on_accepted_scheduler() { - test_affine_on_accepted_scheduler(receiver{beman::task::detail::inline_scheduler{}}); - test_affine_on_accepted_scheduler(receiver{test_scheduler<>{}}); - // test_affine_on_accepted_scheduler(receiver{test_scheduler{}}); - } - - } // namespace - - int main() { - beman::task::detail::single_thread_context context; - - auto main_id{std::this_thread::get_id()}; - auto [thread_id]{ - ex::sync_wait(ex::schedule(context.get_scheduler()) | ex::then([] { return std::this_thread::get_id(); })) - .value_or(std::tuple{std::thread::id{}})}; - - assert(main_id != thread_id); - - [[maybe_unused]] auto s(beman::task::affine_on(ex::just(42))); - static_assert(ex::sender); - [[maybe_unused]] auto st(ex::connect(std::move(s), receiver{context.get_scheduler()})); - ex::sync_wait( - beman::task::affine_on(ex::starts_on(context.get_scheduler(), ex::just(42)) | ex::then([thread_id](int value) { - assert(thread_id == std::this_thread::get_id()); - assert(value == 42); - })) | - ex::then([thread_id]() { assert(thread_id != std::this_thread::get_id()); })); - } +// tests/beman/task/affine_on.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#ifdef NDEBUG +#undef NDEBUG +#endif +#include + +namespace ex = beman::execution; + +// ---------------------------------------------------------------------------- + +namespace { +template +struct receiver { + using receiver_concept = ex::receiver_t; + + std::remove_cvref_t scheduler; + auto get_env() const noexcept { return ex::detail::make_env(ex::get_scheduler, scheduler); } + auto set_value(auto&&...) && noexcept -> void {} +}; + +template +receiver(Scheduler&& sched) -> receiver; + +template +struct test_scheduler { + using scheduler_concept = ex::scheduler_t; + struct sender { + using sender_concept = ex::sender_t; + template + auto get_completion_signatures(Env&& env) const noexcept { + if constexpr (ex::unstoppable_token) + return ex::completion_signatures{}; + else + return ex::completion_signatures{}; + } + struct env { + template + auto query(ex::get_completion_scheduler_t) const noexcept -> test_scheduler { + return {}; + } + }; + auto get_env() const noexcept { return env{}; } + + template + struct op_state { + using operation_state_concept = ex::operation_state_t; + std::remove_cvref_t rcvr; + void start() & noexcept { + static_assert(ex::operation_state); + ex::set_value(std::move(this->rcvr)); + } + }; + template + auto connect(Rcvr&& rcvr) noexcept { + static_assert(ex::sender); + return op_state{std::forward(rcvr)}; + } + }; + + auto schedule() noexcept -> sender { + static_assert(ex::scheduler>); + return {}; + } + auto operator==(const test_scheduler&) const -> bool = default; +}; +static_assert(ex::scheduler>); + +static_assert(ex::receiver>); +static_assert(ex::receiver>>); + +template +auto test_affine_on_accepted_scheduler(Receiver rcvr) { + if constexpr (requires { ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr)); }) { + auto state(ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr))); + ex::start(state); + } +} + +auto test_affine_on_accepted_scheduler() { + test_affine_on_accepted_scheduler(receiver{beman::task::detail::inline_scheduler{}}); + test_affine_on_accepted_scheduler(receiver{test_scheduler<>{}}); + // test_affine_on_accepted_scheduler(receiver{test_scheduler{}}); +} + +} // namespace + +int main() { + beman::task::detail::single_thread_context context; + + auto main_id{std::this_thread::get_id()}; + auto [thread_id]{ + ex::sync_wait(ex::schedule(context.get_scheduler()) | ex::then([] { return std::this_thread::get_id(); })) + .value_or(std::tuple{std::thread::id{}})}; + + assert(main_id != thread_id); + + [[maybe_unused]] auto s(beman::task::affine_on(ex::just(42))); + static_assert(ex::sender); + [[maybe_unused]] auto st(ex::connect(std::move(s), receiver{context.get_scheduler()})); + ex::sync_wait( + beman::task::affine_on(ex::starts_on(context.get_scheduler(), ex::just(42)) | ex::then([thread_id](int value) { + assert(thread_id == std::this_thread::get_id()); + assert(value == 42); + })) | + ex::then([thread_id]() { assert(thread_id != std::this_thread::get_id()); })); + test_affine_on_accepted_scheduler(); +} diff --git a/tests/beman/task/task_scheduler.test.cpp b/tests/beman/task/task_scheduler.test.cpp index ca06d36..20914e1 100644 --- a/tests/beman/task/task_scheduler.test.cpp +++ b/tests/beman/task/task_scheduler.test.cpp @@ -1,261 +1,261 @@ - // tests/beman/task/task_scheduler.test.cpp -*-C++-*- - // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception - - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #include - #ifdef NDEBUG - #undef NDEBUG - #endif - #include - - namespace ex = beman::execution; - namespace ly = beman::task; - - // ---------------------------------------------------------------------------- - - namespace { - - void unexpected_call_assert(const char* msg) { assert(nullptr == msg); } - - struct thread_context { - enum class complete : char { success, failure, exception, never }; - struct base { - base* next{}; - base() = default; - base(base&&) = delete; - base(const base&) = delete; - virtual ~base() = default; - base& operator=(base&&) = delete; - base& operator=(const base&) = delete; - virtual void complete() = 0; - }; - - std::mutex mutex; - std::condition_variable condition; - bool done{false}; - base* work{}; - std::thread thread; - - base* get_work() { - std::unique_lock cerberus(this->mutex); - condition.wait(cerberus, [this] { return this->done || this->work; }); - base* rc{this->work}; - if (rc) { - this->work = rc->next; - } - return rc; - } - void enqueue(base* w) { - { - std::lock_guard cerberus(this->mutex); - w->next = std::exchange(this->work, w); - } - this->condition.notify_one(); - } - - thread_context() - : thread([this] { - while (auto w{this->get_work()}) { - w->complete(); - } - }) {} - thread_context(thread_context&&) = delete; - thread_context(const thread_context&) = delete; - ~thread_context() { - this->stop(); - this->thread.join(); - } - thread_context& operator=(thread_context&&) = delete; - thread_context& operator=(const thread_context&) = delete; - - struct scheduler { - using scheduler_concept = ex::scheduler_t; - thread_context* context; - complete cmpl{complete::success}; - bool operator==(const scheduler&) const = default; - - template - struct state : base { - struct stopper { - state* st; - void operator()() noexcept { - auto self{this->st}; - self->callback.reset(); - ex::set_stopped(std::move(self->receiver)); - } - }; - using operation_state_concept = ex::operation_state_t; - using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval()))); - using callback_t = ex::stop_callback_for_t; - - thread_context* ctxt; - std::remove_cvref_t receiver; - thread_context::complete cmpl; - std::optional callback; - - template - state(auto c, R&& r, thread_context::complete cm) : ctxt(c), receiver(std::forward(r)), cmpl(cm) {} - void start() & noexcept { - callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stopper{this}); - if (cmpl != thread_context::complete::never) - this->ctxt->enqueue(this); - } - void complete() override { - this->callback.reset(); - ex::set_value(std::move(this->receiver)); - } - }; - struct env { - thread_context* ctxt; - scheduler query(const ex::get_completion_scheduler_t&) const noexcept { - return scheduler{ctxt}; - } - }; - struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = ex::completion_signatures; - - thread_context* ctxt; - thread_context::complete cmpl; - - template - auto connect(Receiver&& receiver) { - static_assert(ex::operation_state>); - return state(this->ctxt, std::forward(receiver), this->cmpl); - } - env get_env() const noexcept { return {this->ctxt}; } - }; - static_assert(ex::sender); - - sender schedule() noexcept { return sender{this->context, this->cmpl}; } - }; - static_assert(ex::scheduler); - - scheduler get_scheduler(complete cmpl = complete::success) { return scheduler{this, cmpl}; } - void stop() { - { - std::lock_guard cerberus(this->mutex); - this->done = true; - } - this->condition.notify_one(); - } - }; - - enum class stop_result : char { none, success, failure, stopped }; - template - struct stop_env { - Token token; - auto query(ex::get_stop_token_t) const noexcept { return this->token; } - }; - template - stop_env(Token&&) -> stop_env>; - - template - struct stop_receiver { - using receiver_concept = ex::receiver_t; - Token token; - stop_result& result; - std::latch* completed{}; - auto get_env() const noexcept { return stop_env{this->token}; } - - void set_value(auto&&...) && noexcept { - this->result = stop_result::success; - if (this->completed) - this->completed->count_down(); - } - void set_error(auto&&) && noexcept { - this->result = stop_result::failure; - if (this->completed) - this->completed->count_down(); - } - void set_stopped() && noexcept { - this->result = stop_result::stopped; - if (this->completed) - this->completed->count_down(); - } - }; - template - stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; - static_assert(ex::receiver>); - - } // namespace - - // ---------------------------------------------------------------------------- - - int main() { - try { - static_assert(ex::scheduler); - - thread_context ctxt1; - thread_context ctxt2; - - assert(ctxt1.get_scheduler() == ctxt1.get_scheduler()); - assert(ctxt2.get_scheduler() == ctxt2.get_scheduler()); - assert(ctxt1.get_scheduler() != ctxt2.get_scheduler()); - - ly::detail::task_scheduler sched1(ctxt1.get_scheduler()); - ly::detail::task_scheduler sched2(ctxt2.get_scheduler()); - assert(sched1 == sched1); - assert(sched2 == sched2); - assert(sched1 != sched2); - - ly::detail::task_scheduler copy(sched1); - assert(copy == sched1); - assert(copy != sched2); - ly::detail::task_scheduler move(std::move(copy)); - assert(move == sched1); - assert(move != sched2); - - copy = sched2; - assert(copy == sched2); - assert(copy != sched1); - - move = std::move(copy); - assert(move == sched2); - assert(move != sched1); - - std::atomic id1{}; - std::atomic id2{}; - ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); - ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); - assert(id1 != id2); - ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched1)) | - ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); - ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched2)) | - ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); - - { - ex::inplace_stop_source source; - stop_result result{stop_result::none}; - auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), - stop_receiver{source.get_token(), result})}; - assert(result == stop_result::none); - ex::start(state); - assert(result == stop_result::none); - source.request_stop(); - assert(result == stop_result::stopped); - } - { - std::latch completed{1}; - stop_result result{stop_result::none}; - auto state{ex::connect( - ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), - stop_receiver{ex::never_stop_token(), result, &completed})}; - assert(result == stop_result::none); - ex::start(state); - completed.wait(); - assert(result == stop_result::success); - } - } catch (...) { - unexpected_call_assert("no exception should escape to main"); - } - } +// tests/beman/task/task_scheduler.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef NDEBUG +#undef NDEBUG +#endif +#include + +namespace ex = beman::execution; +namespace ly = beman::task; + +// ---------------------------------------------------------------------------- + +namespace { + +void unexpected_call_assert(const char* msg) { assert(nullptr == msg); } + +struct thread_context { + enum class complete : char { success, failure, exception, never }; + struct base { + base* next{}; + base() = default; + base(base&&) = delete; + base(const base&) = delete; + virtual ~base() = default; + base& operator=(base&&) = delete; + base& operator=(const base&) = delete; + virtual void complete() = 0; + }; + + std::mutex mutex; + std::condition_variable condition; + bool done{false}; + base* work{}; + std::thread thread; + + base* get_work() { + std::unique_lock cerberus(this->mutex); + condition.wait(cerberus, [this] { return this->done || this->work; }); + base* rc{this->work}; + if (rc) { + this->work = rc->next; + } + return rc; + } + void enqueue(base* w) { + { + std::lock_guard cerberus(this->mutex); + w->next = std::exchange(this->work, w); + } + this->condition.notify_one(); + } + + thread_context() + : thread([this] { + while (auto w{this->get_work()}) { + w->complete(); + } + }) {} + thread_context(thread_context&&) = delete; + thread_context(const thread_context&) = delete; + ~thread_context() { + this->stop(); + this->thread.join(); + } + thread_context& operator=(thread_context&&) = delete; + thread_context& operator=(const thread_context&) = delete; + + struct scheduler { + using scheduler_concept = ex::scheduler_t; + thread_context* context; + complete cmpl{complete::success}; + bool operator==(const scheduler&) const = default; + + template + struct state : base { + struct stopper { + state* st; + void operator()() noexcept { + auto self{this->st}; + self->callback.reset(); + ex::set_stopped(std::move(self->receiver)); + } + }; + using operation_state_concept = ex::operation_state_t; + using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval()))); + using callback_t = ex::stop_callback_for_t; + + thread_context* ctxt; + std::remove_cvref_t receiver; + thread_context::complete cmpl; + std::optional callback; + + template + state(auto c, R&& r, thread_context::complete cm) : ctxt(c), receiver(std::forward(r)), cmpl(cm) {} + void start() & noexcept { + callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stopper{this}); + if (cmpl != thread_context::complete::never) + this->ctxt->enqueue(this); + } + void complete() override { + this->callback.reset(); + ex::set_value(std::move(this->receiver)); + } + }; + struct env { + thread_context* ctxt; + scheduler query(const ex::get_completion_scheduler_t&) const noexcept { + return scheduler{ctxt}; + } + }; + struct sender { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; + + thread_context* ctxt; + thread_context::complete cmpl; + + template + auto connect(Receiver&& receiver) { + static_assert(ex::operation_state>); + return state(this->ctxt, std::forward(receiver), this->cmpl); + } + env get_env() const noexcept { return {this->ctxt}; } + }; + static_assert(ex::sender); + + sender schedule() noexcept { return sender{this->context, this->cmpl}; } + }; + static_assert(ex::scheduler); + + scheduler get_scheduler(complete cmpl = complete::success) { return scheduler{this, cmpl}; } + void stop() { + { + std::lock_guard cerberus(this->mutex); + this->done = true; + } + this->condition.notify_one(); + } +}; + +enum class stop_result : char { none, success, failure, stopped }; +template +struct stop_env { + Token token; + auto query(ex::get_stop_token_t) const noexcept { return this->token; } +}; +template +stop_env(Token&&) -> stop_env>; + +template +struct stop_receiver { + using receiver_concept = ex::receiver_t; + Token token; + stop_result& result; + std::latch* completed{}; + auto get_env() const noexcept { return stop_env{this->token}; } + + void set_value(auto&&...) && noexcept { + this->result = stop_result::success; + if (this->completed) + this->completed->count_down(); + } + void set_error(auto&&) && noexcept { + this->result = stop_result::failure; + if (this->completed) + this->completed->count_down(); + } + void set_stopped() && noexcept { + this->result = stop_result::stopped; + if (this->completed) + this->completed->count_down(); + } +}; +template +stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; +static_assert(ex::receiver>); + +} // namespace + +// ---------------------------------------------------------------------------- + +int main() { + try { + static_assert(ex::scheduler); + + thread_context ctxt1; + thread_context ctxt2; + + assert(ctxt1.get_scheduler() == ctxt1.get_scheduler()); + assert(ctxt2.get_scheduler() == ctxt2.get_scheduler()); + assert(ctxt1.get_scheduler() != ctxt2.get_scheduler()); + + ly::detail::task_scheduler sched1(ctxt1.get_scheduler()); + ly::detail::task_scheduler sched2(ctxt2.get_scheduler()); + assert(sched1 == sched1); + assert(sched2 == sched2); + assert(sched1 != sched2); + + ly::detail::task_scheduler copy(sched1); + assert(copy == sched1); + assert(copy != sched2); + ly::detail::task_scheduler move(std::move(copy)); + assert(move == sched1); + assert(move != sched2); + + copy = sched2; + assert(copy == sched2); + assert(copy != sched1); + + move = std::move(copy); + assert(move == sched2); + assert(move != sched1); + + std::atomic id1{}; + std::atomic id2{}; + ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); + ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); + assert(id1 != id2); + ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched1)) | + ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); + ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched2)) | + ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); + + { + ex::inplace_stop_source source; + stop_result result{stop_result::none}; + auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + std::latch completed{1}; + stop_result result{stop_result::none}; + auto state{ex::connect( + ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), + stop_receiver{ex::never_stop_token(), result, &completed})}; + assert(result == stop_result::none); + ex::start(state); + completed.wait(); + assert(result == stop_result::success); + } + } catch (...) { + unexpected_call_assert("no exception should escape to main"); + } +} From 3064acf279949594800004544b8f76e3eb781e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 10 Jan 2026 20:17:07 +0000 Subject: [PATCH 11/11] add another formatting fix --- tests/beman/task/task_scheduler.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/beman/task/task_scheduler.test.cpp b/tests/beman/task/task_scheduler.test.cpp index 20914e1..31e4a86 100644 --- a/tests/beman/task/task_scheduler.test.cpp +++ b/tests/beman/task/task_scheduler.test.cpp @@ -121,7 +121,7 @@ struct thread_context { } }; struct sender { - using sender_concept = ex::sender_t; + using sender_concept = ex::sender_t; using completion_signatures = ex::completion_signatures; thread_context* ctxt;