From 545f4abe445d70363f7afbf97f85d2dc23531335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 2 Dec 2025 23:24:11 +0000 Subject: [PATCH 01/16] enhanced run_loop to have an infallible scheduler --- include/beman/execution/detail/run_loop.hpp | 16 ++++++++++++++++ .../beman/execution/exec-run-loop-types.test.cpp | 12 ++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp index d2c05993..28c1f73d 100644 --- a/include/beman/execution/detail/run_loop.hpp +++ b/include/beman/execution/detail/run_loop.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -70,10 +71,25 @@ class run_loop { }; struct sender { using sender_concept = ::beman::execution::sender_t; +#if 0 using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t(), ::beman::execution::set_error_t(::std::exception_ptr), ::beman::execution::set_stopped_t()>; +#else + template + auto get_completion_signatures(Env&& env) const noexcept { + if constexpr (::beman::execution::unstoppable_token) + return ::beman::execution::completion_signatures< + ::beman::execution::set_value_t() + >{}; + else + return ::beman::execution::completion_signatures< + ::beman::execution::set_value_t(), + ::beman::execution::set_stopped_t() + >{}; + } +#endif run_loop* loop; diff --git a/tests/beman/execution/exec-run-loop-types.test.cpp b/tests/beman/execution/exec-run-loop-types.test.cpp index c72515e8..d5c44242 100644 --- a/tests/beman/execution/exec-run-loop-types.test.cpp +++ b/tests/beman/execution/exec-run-loop-types.test.cpp @@ -80,11 +80,15 @@ TEST(exec_run_loop_types) { // p5: auto sender{test_std::schedule(scheduler)}; struct env {}; - static_assert(::std::same_as, + test_std::inplace_stop_source source{}; + struct token_env { + test_std::inplace_stop_token token; + auto query(const test_std::get_stop_token_t&) const noexcept { return this->token; } + }; + static_assert(::std::same_as, decltype(test_std::get_completion_signatures(sender, env{}))>); - + static_assert(::std::same_as, + decltype(test_std::get_completion_signatures(sender, token_env{source.get_token()}))>); // p7: static_assert(test_std::receiver_of); // p7.1: From c3a40ba891a0c302e9b0d5aa30f34dcdfb837eaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 2 Dec 2025 23:24:49 +0000 Subject: [PATCH 02/16] clang format --- include/beman/execution/detail/run_loop.hpp | 10 +++------- tests/beman/execution/exec-run-loop-types.test.cpp | 7 ++++--- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp index 28c1f73d..1267f512 100644 --- a/include/beman/execution/detail/run_loop.hpp +++ b/include/beman/execution/detail/run_loop.hpp @@ -80,14 +80,10 @@ class run_loop { template auto get_completion_signatures(Env&& env) const noexcept { if constexpr (::beman::execution::unstoppable_token) - return ::beman::execution::completion_signatures< - ::beman::execution::set_value_t() - >{}; + return ::beman::execution::completion_signatures<::beman::execution::set_value_t()>{}; else - return ::beman::execution::completion_signatures< - ::beman::execution::set_value_t(), - ::beman::execution::set_stopped_t() - >{}; + return ::beman::execution::completion_signatures<::beman::execution::set_value_t(), + ::beman::execution::set_stopped_t()>{}; } #endif diff --git a/tests/beman/execution/exec-run-loop-types.test.cpp b/tests/beman/execution/exec-run-loop-types.test.cpp index d5c44242..740a8cc8 100644 --- a/tests/beman/execution/exec-run-loop-types.test.cpp +++ b/tests/beman/execution/exec-run-loop-types.test.cpp @@ -83,12 +83,13 @@ TEST(exec_run_loop_types) { test_std::inplace_stop_source source{}; struct token_env { test_std::inplace_stop_token token; - auto query(const test_std::get_stop_token_t&) const noexcept { return this->token; } + auto query(const test_std::get_stop_token_t&) const noexcept { return this->token; } }; static_assert(::std::same_as, decltype(test_std::get_completion_signatures(sender, env{}))>); - static_assert(::std::same_as, - decltype(test_std::get_completion_signatures(sender, token_env{source.get_token()}))>); + static_assert( + ::std::same_as, + decltype(test_std::get_completion_signatures(sender, token_env{source.get_token()}))>); // p7: static_assert(test_std::receiver_of); // p7.1: From 838d5da322448bfd57266e3e2e189074022a6105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 2 Dec 2025 23:36:45 +0000 Subject: [PATCH 03/16] don't use set_stopped for unstoppable tokens --- include/beman/execution/detail/run_loop.hpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp index 1267f512..ac26ecc2 100644 --- a/include/beman/execution/detail/run_loop.hpp +++ b/include/beman/execution/detail/run_loop.hpp @@ -63,10 +63,16 @@ class run_loop { } // NOLINTEND(misc-no-recursion) auto execute() noexcept -> void override { + using token = decltype(::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver))); + if constexpr (not ::beman::execution::unstoppable_token) + { if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested()) ::beman::execution::set_stopped(::std::move(this->receiver)); else ::beman::execution::set_value(::std::move(this->receiver)); + } + else + ::beman::execution::set_value(::std::move(this->receiver)); } }; struct sender { From 6ee955aea76245c95aab9fa26f643185b92422a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Wed, 3 Dec 2025 21:12:35 +0000 Subject: [PATCH 04/16] removed remaining uses of set_error in run_loop --- include/beman/execution/detail/run_loop.hpp | 36 +++++++-------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp index ac26ecc2..eb960148 100644 --- a/include/beman/execution/detail/run_loop.hpp +++ b/include/beman/execution/detail/run_loop.hpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -54,35 +53,21 @@ class run_loop { // NOLINTBEGIN(misc-no-recursion) template opstate(run_loop* l, R&& rcvr) : loop(l), receiver(::std::forward(rcvr)) {} - auto start() & noexcept -> void { - try { - this->loop->push_back(this); - } catch (...) { - ::beman::execution::set_error(::std::move(this->receiver), ::std::current_exception()); - } - } + auto start() & noexcept -> void { this->loop->push_back(this); } // NOLINTEND(misc-no-recursion) auto execute() noexcept -> void override { using token = decltype(::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver))); - if constexpr (not ::beman::execution::unstoppable_token) - { - if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested()) - ::beman::execution::set_stopped(::std::move(this->receiver)); - else - ::beman::execution::set_value(::std::move(this->receiver)); - } - else + if constexpr (not ::beman::execution::unstoppable_token) { + if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested()) + ::beman::execution::set_stopped(::std::move(this->receiver)); + else + ::beman::execution::set_value(::std::move(this->receiver)); + } else ::beman::execution::set_value(::std::move(this->receiver)); } }; struct sender { using sender_concept = ::beman::execution::sender_t; -#if 0 - using completion_signatures = - ::beman::execution::completion_signatures<::beman::execution::set_value_t(), - ::beman::execution::set_error_t(::std::exception_ptr), - ::beman::execution::set_stopped_t()>; -#else template auto get_completion_signatures(Env&& env) const noexcept { if constexpr (::beman::execution::unstoppable_token) @@ -91,7 +76,6 @@ class run_loop { return ::beman::execution::completion_signatures<::beman::execution::set_value_t(), ::beman::execution::set_stopped_t()>{}; } -#endif run_loop* loop; @@ -118,7 +102,8 @@ class run_loop { opstate_base* front{}; opstate_base* back{}; - auto push_back(opstate_base* item) -> void { + auto push_back(opstate_base* item) noexcept -> void { + //-dk:TODO run_loop::push_back should really be lock-free ::std::lock_guard guard(this->mutex); if (auto previous_back{::std::exchange(this->back, item)}) { previous_back->next = item; @@ -127,7 +112,8 @@ class run_loop { this->condition.notify_one(); } } - auto pop_front() -> opstate_base* { + auto pop_front() noexcept -> opstate_base* { + //-dk:TODO run_loop::pop_front should really be lock-free ::std::unique_lock guard(this->mutex); this->condition.wait(guard, [this] { return this->front || this->current_state == state::finishing; }); if (this->front == this->back) From f499931a6582a26bff0f6f9fa3ac778ca21e14c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Wed, 3 Dec 2025 22:03:31 +0000 Subject: [PATCH 05/16] try to avoid set_error in let* based on noexcept --- include/beman/execution/detail/let.hpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/include/beman/execution/detail/let.hpp b/include/beman/execution/detail/let.hpp index 32d30e30..e8102be4 100644 --- a/include/beman/execution/detail/let.hpp +++ b/include/beman/execution/detail/let.hpp @@ -162,7 +162,12 @@ struct impls_for<::beman::execution::detail::let_t> : ::beman::execu {}}; }}; template - static auto let_bind(auto& state, Receiver& receiver, Args&&... args) { + static auto + let_bind(auto& state, Receiver& receiver, Args&&... args) noexcept(noexcept(::beman::execution::connect( + ::std::apply(::std::move(state.fun), + ::std::move(state.args.template emplace<::beman::execution::detail::decayed_tuple>( + ::std::forward(args)...))), + let_receiver{receiver, state.env}))) { using args_t = ::beman::execution::detail::decayed_tuple; auto mkop{[&] { return ::beman::execution::connect( @@ -179,7 +184,8 @@ struct impls_for<::beman::execution::detail::let_t> : ::beman::execu try { let_bind(state, receiver, ::std::forward(args)...); } catch (...) { - ::beman::execution::set_error(::std::move(receiver), ::std::current_exception()); + if constexpr (not noexcept(let_bind(state, receiver, ::std::forward(args)...))) + ::beman::execution::set_error(::std::move(receiver), ::std::current_exception()); } } else { Tag()(::std::move(receiver), ::std::forward(args)...); From e6ec4848566b05703949af8ef7245efdedfc5760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Mon, 12 Jan 2026 19:00:47 +0000 Subject: [PATCH 06/16] fix run_loop's use of empty_env --- include/beman/execution/detail/run_loop.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp index eb960148..4a1f8796 100644 --- a/include/beman/execution/detail/run_loop.hpp +++ b/include/beman/execution/detail/run_loop.hpp @@ -68,7 +68,7 @@ class run_loop { }; struct sender { using sender_concept = ::beman::execution::sender_t; - template + template > auto get_completion_signatures(Env&& env) const noexcept { if constexpr (::beman::execution::unstoppable_token) return ::beman::execution::completion_signatures<::beman::execution::set_value_t()>{}; From 34ccc9943472b474ae24b51c1533a014d5e8e375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Mon, 12 Jan 2026 20:24:12 +0000 Subject: [PATCH 07/16] implemented a basic affine_on --- include/beman/execution/detail/affine_on.hpp | 55 +++++++++++++++++++ tests/beman/execution/CMakeLists.txt | 1 + tests/beman/execution/exec-affine-on.test.cpp | 48 ++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 include/beman/execution/detail/affine_on.hpp create mode 100644 tests/beman/execution/exec-affine-on.test.cpp diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp new file mode 100644 index 00000000..ee1a4d09 --- /dev/null +++ b/include/beman/execution/detail/affine_on.hpp @@ -0,0 +1,55 @@ +// include/beman/execution/detail/affine_on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { + struct affine_on_t: ::beman::execution::sender_adaptor_closure { + template <::beman::execution::sender Sender> + auto operator()(Sender&& sender) const { + return ::beman::execution::detail::transform_sender( + ::beman::execution::detail::get_domain_early(sender), + ::beman::execution::detail::make_sender( + *this, ::beman::execution::env<>{}, ::std::forward(sender))); + } + auto operator()() const { + return ::beman::execution::detail::sender_adaptor{*this}; + } + + template <::beman::execution::sender Sender, typename Env> + static auto transform_sender(Sender&& sender, Env const& env) { + auto&[tag, data, child] = sender; + return beman::execution::schedule_from( + ::beman::execution::get_scheduler(env), + ::beman::execution::detail::forward_like(child) + ); + } + }; +} + +namespace beman::execution { + using beman::execution::detail::affine_on_t; + inline constexpr affine_on_t affine_on{}; +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/tests/beman/execution/CMakeLists.txt b/tests/beman/execution/CMakeLists.txt index 01511154..4f4fefc7 100644 --- a/tests/beman/execution/CMakeLists.txt +++ b/tests/beman/execution/CMakeLists.txt @@ -20,6 +20,7 @@ endif() list( APPEND execution_tests + exec-affine-on.test issue-174.test issue-186.test exec-scope-counting.test diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp new file mode 100644 index 00000000..7b7d70fa --- /dev/null +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -0,0 +1,48 @@ +// tests/beman/execution/exec-affine-on.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace { + template + struct receiver { + using receiver_concept = test_std::receiver_t; + Sched scheduler_; + auto set_value(auto&&...) && noexcept -> void {} + auto set_error(auto&&) && noexcept -> void {} + auto set_stopped() && noexcept -> void {} + + auto get_env() const noexcept { return test_std::env{test_std::prop(test_std::get_scheduler, this->scheduler_)}; } + }; + template + receiver(Sched) -> receiver; +} + +auto main() -> int { + static_assert(test_std::sender); + static_assert(test_std::sender); + + //static_assert(test_std::sender_in>); + + test_std::run_loop loop; + auto r{receiver(loop.get_scheduler())}; + static_assert(test_std::receiver); + auto s{test_std::get_scheduler(test_std::get_env(r))}; + assert(s == loop.get_scheduler()); + auto st{test_std::transform_sender(test_std::default_domain(), test_std::affine_on(test_std::just(42)), test_std::get_env(r))}; + test_std::connect(std::move(st), std::move(r)); + auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; + return 0; +} \ No newline at end of file From b4db2bb1442b3c2465e984fff14e37974acb2187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Mon, 12 Jan 2026 21:55:27 +0000 Subject: [PATCH 08/16] added the various constraints and customization to affine_on --- docs/overview.md | 11 +++ include/beman/execution/detail/affine_on.hpp | 96 ++++++++++++------- include/beman/execution/execution.hpp | 1 + tests/beman/execution/exec-affine-on.test.cpp | 33 +++---- 4 files changed, 93 insertions(+), 48 deletions(-) diff --git a/docs/overview.md b/docs/overview.md index e226623f..ffef8475 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -639,6 +639,13 @@ The expression schedule(scheduler) creates a sender which up ### Sender Adaptors The sender adaptors take one or more senders and adapt their respective behavior to complete with a corresponding result. The description uses the informal function completions-of(sender) to represent the completion signatures which sender produces. Also, completion signatures are combined using +: the result is the deduplicated set of the combined completion signatures. +
+affine_on(sender) -> sender-of<completions-of(sender)> +The expression affine_on(sender) creates +a sender which completes on the same scheduler it was started on, even if sender changes the scheduler. The scheduler to resume on is determined using get_scheduler(get_env(rcvr)) where rcvr is the reciver the sender is connected to. + +The primary use of affine_on is implementing scheduler affinity for task. +
`bulk`
@@ -698,6 +705,10 @@ The expression into_variant(sender) creates a sender which t
when_all_with_variant(sender...) -> sender
+
+write_env(sender, env) -> sender +
+ ### Sender Consumers diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp index ee1a4d09..2a8f9c71 100644 --- a/include/beman/execution/detail/affine_on.hpp +++ b/include/beman/execution/detail/affine_on.hpp @@ -4,51 +4,83 @@ #ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON #define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON -#include -#include -#include -#include #include +#include +#include +#include #include +#include +#include +#include #include -#include +#include +#include +#include +#include +#include #include -#include -#include -#include -#include -#include +#include +#include +#include +#include + +#include +#include // ---------------------------------------------------------------------------- namespace beman::execution::detail { - struct affine_on_t: ::beman::execution::sender_adaptor_closure { - template <::beman::execution::sender Sender> - auto operator()(Sender&& sender) const { - return ::beman::execution::detail::transform_sender( - ::beman::execution::detail::get_domain_early(sender), - ::beman::execution::detail::make_sender( - *this, ::beman::execution::env<>{}, ::std::forward(sender))); - } - auto operator()() const { - return ::beman::execution::detail::sender_adaptor{*this}; +struct affine_on_t : ::beman::execution::sender_adaptor_closure { + template <::beman::execution::sender Sender> + auto operator()(Sender&& sender) const { + return ::beman::execution::detail::transform_sender( + ::beman::execution::detail::get_domain_early(sender), + ::beman::execution::detail::make_sender( + *this, ::beman::execution::env<>{}, ::std::forward(sender))); + } + auto operator()() const { return ::beman::execution::detail::sender_adaptor{*this}; } + + template <::beman::execution::sender Sender, typename Env> + requires requires(const Env& env) { + { ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler; + { ::beman::execution::schedule(::beman::execution::get_scheduler(env)) } -> ::beman::execution::sender; + { + ::beman::execution::get_completion_signatures( + ::beman::execution::schedule(::beman::execution::get_scheduler(env)), + ::beman::execution::detail::join_env( + ::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token, + ::beman::execution::never_stop_token{}}}, + env)) + } -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>; } + static auto transform_sender(Sender&& sender, const Env& env) { + auto& [tag, data, child] = sender; + using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t>; - template <::beman::execution::sender Sender, typename Env> - static auto transform_sender(Sender&& sender, Env const& env) { - auto&[tag, data, child] = sender; - return beman::execution::schedule_from( - ::beman::execution::get_scheduler(env), - ::beman::execution::detail::forward_like(child) - ); + if constexpr (requires(const child_tag_t& t) { + { + t.affine_on(::beman::execution::detail::forward_like(child), env) + } -> ::beman::execution::sender; + }) { + return child_tag_t{}.affine_on(::beman::execution::detail::forward_like(child), env); + } else { + return ::beman::execution::write_env( + ::beman::execution::schedule_from( + ::beman::execution::get_scheduler(env), + ::beman::execution::write_env(::beman::execution::detail::forward_like(child), env)), + ::beman::execution::detail::join_env( + ::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token, + ::beman::execution::never_stop_token{}}}, + env)); } - }; -} + } +}; +} // namespace beman::execution::detail namespace beman::execution { - using beman::execution::detail::affine_on_t; - inline constexpr affine_on_t affine_on{}; -} +using beman::execution::detail::affine_on_t; +inline constexpr affine_on_t affine_on{}; +} // namespace beman::execution // ---------------------------------------------------------------------------- diff --git a/include/beman/execution/execution.hpp b/include/beman/execution/execution.hpp index c5bb7748..b68c68c7 100644 --- a/include/beman/execution/execution.hpp +++ b/include/beman/execution/execution.hpp @@ -38,6 +38,7 @@ #include #include +#include #include #include #include diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp index 7b7d70fa..7ad737ed 100644 --- a/tests/beman/execution/exec-affine-on.test.cpp +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -16,33 +16,34 @@ // ---------------------------------------------------------------------------- namespace { - template - struct receiver { - using receiver_concept = test_std::receiver_t; - Sched scheduler_; - auto set_value(auto&&...) && noexcept -> void {} - auto set_error(auto&&) && noexcept -> void {} - auto set_stopped() && noexcept -> void {} +template +struct receiver { + using receiver_concept = test_std::receiver_t; + Sched scheduler_; + auto set_value(auto&&...) && noexcept -> void {} + auto set_error(auto&&) && noexcept -> void {} + auto set_stopped() && noexcept -> void {} - auto get_env() const noexcept { return test_std::env{test_std::prop(test_std::get_scheduler, this->scheduler_)}; } - }; - template - receiver(Sched) -> receiver; -} + auto get_env() const noexcept { return test_std::env{test_std::prop(test_std::get_scheduler, this->scheduler_)}; } +}; +template +receiver(Sched) -> receiver; +} // namespace auto main() -> int { static_assert(test_std::sender); static_assert(test_std::sender); - //static_assert(test_std::sender_in>); + static_assert(not test_std::sender_in>); test_std::run_loop loop; - auto r{receiver(loop.get_scheduler())}; + auto r{receiver(loop.get_scheduler())}; static_assert(test_std::receiver); auto s{test_std::get_scheduler(test_std::get_env(r))}; assert(s == loop.get_scheduler()); - auto st{test_std::transform_sender(test_std::default_domain(), test_std::affine_on(test_std::just(42)), test_std::get_env(r))}; + auto st{test_std::transform_sender( + test_std::default_domain(), test_std::affine_on(test_std::just(42)), test_std::get_env(r))}; test_std::connect(std::move(st), std::move(r)); auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; return 0; -} \ No newline at end of file +} From 0aab83044ad22a8426221e2d483919ff234fb24d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 13 Jan 2026 20:04:11 +0000 Subject: [PATCH 09/16] added some documentation to affine_on --- include/beman/execution/detail/affine_on.hpp | 45 ++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp index 2a8f9c71..4f2be0b1 100644 --- a/include/beman/execution/detail/affine_on.hpp +++ b/include/beman/execution/detail/affine_on.hpp @@ -30,7 +30,23 @@ // ---------------------------------------------------------------------------- namespace beman::execution::detail { + +/** + * @brief The affine_on_t struct is a sender adaptor closure that transforms a sender + * to complete on the scheduler obtained from the receiver's environment. + * + * This adaptor implements scheduler affinity to adapt a sender to complete on the + * scheduler obtained the receiver's environment. The get_scheduler query is used + * to obtain the scheduler on which the sender gets started. + */ struct affine_on_t : ::beman::execution::sender_adaptor_closure { + /** + * @brief Adapt a sender with affine_on. + * + * @tparam Sender The deduced type of the sender to be transformed. + * @param sender The sender to be transformed. + * @return An adapted sender to complete on the scheduler it was started on. + */ template <::beman::execution::sender Sender> auto operator()(Sender&& sender) const { return ::beman::execution::detail::transform_sender( @@ -38,8 +54,32 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { ::beman::execution::detail::make_sender( *this, ::beman::execution::env<>{}, ::std::forward(sender))); } + + /** + * @brief Overload for creating a sender adaptor from affine_on. + * + * @return A sender adaptor for the affine_on_t. + */ auto operator()() const { return ::beman::execution::detail::sender_adaptor{*this}; } + /** + * @brief affine_on is implemented by transforming it into a use of schedule_from. + * + * The constraints ensure that the environment provides a scheduler which is + * infallible and, thus, can be used to guarantee completion on the correct + * scheduler. + * + * The implementation first tries to see if the child sender's tag has a custom + * affine_on implementation. If it does, that is used. Otherwise, the default + * implementation gets a scheduler from the environment and uses schedule_from + * to adapt the sender to complete on that scheduler. + * + * @tparam Sender The type of the sender to be transformed. + * @tparam Env The type of the environment providing the scheduler. + * @param sender The sender to be transformed. + * @param env The environment providing the scheduler. + * @return A transformed sender that is affined to the scheduler. + */ template <::beman::execution::sender Sender, typename Env> requires requires(const Env& env) { { ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler; @@ -75,9 +115,14 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { } } }; + } // namespace beman::execution::detail namespace beman::execution { +/** + * @brief affine_on is a CPO, used to adapt a sender to complete on the scheduler + * it got started on which is derived from get_scheduler on the receiver's environment. + */ using beman::execution::detail::affine_on_t; inline constexpr affine_on_t affine_on{}; } // namespace beman::execution From 5279bf1f86181b8f121fd7a9a00a996354803fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Tue, 13 Jan 2026 23:24:33 +0000 Subject: [PATCH 10/16] minor fixes to the affine_on implementation --- include/beman/execution/detail/affine_on.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp index 4f2be0b1..43a4f3d5 100644 --- a/include/beman/execution/detail/affine_on.hpp +++ b/include/beman/execution/detail/affine_on.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -81,7 +82,7 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { * @return A transformed sender that is affined to the scheduler. */ template <::beman::execution::sender Sender, typename Env> - requires requires(const Env& env) { + requires ::beman::execution::detail::sender_for && requires(const Env& env) { { ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler; { ::beman::execution::schedule(::beman::execution::get_scheduler(env)) } -> ::beman::execution::sender; { @@ -94,8 +95,8 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { } -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>; } static auto transform_sender(Sender&& sender, const Env& env) { - auto& [tag, data, child] = sender; - using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t>; + [[maybe_unused]] auto& [tag, data, child] = sender; + using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t>; if constexpr (requires(const child_tag_t& t) { { From 90130970b58770ab19ef9be1be28b3396abca192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Wed, 14 Jan 2026 21:46:12 +0000 Subject: [PATCH 11/16] added some tests and an example affine_on customization --- docs/code/CMakeLists.txt | 22 +++- include/beman/execution/detail/just.hpp | 4 + tests/beman/execution/exec-affine-on.test.cpp | 115 +++++++++++++++++- 3 files changed, 135 insertions(+), 6 deletions(-) diff --git a/docs/code/CMakeLists.txt b/docs/code/CMakeLists.txt index 2d2752d3..330d5906 100644 --- a/docs/code/CMakeLists.txt +++ b/docs/code/CMakeLists.txt @@ -3,7 +3,27 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception # gersemi: on -list(APPEND EXAMPLES) +list(APPEND EXAMPLES +sync_wait-just +sync_wait-just_error +sync_wait-just_stopped +just-then +just-then-throw +just_error-then +just_stopped-then +upon_error +expected +upon_stopped +just-let_value +just-let_value-just_error +just_error-let_value +just-let_value-throw +just_error-let_error +just_stopped-let_stopped +when_all +when_all-error +when_all-stopped +) if(BEMAN_USE_MODULES) list(APPEND EXAMPLES modules) # modules.cpp diff --git a/include/beman/execution/detail/just.hpp b/include/beman/execution/detail/just.hpp index e5e2b264..8cd1af59 100644 --- a/include/beman/execution/detail/just.hpp +++ b/include/beman/execution/detail/just.hpp @@ -34,6 +34,10 @@ struct just_t { return ::beman::execution::detail::make_sender( *this, ::beman::execution::detail::product_type{::std::forward(arg)...}); } + template <::beman::execution::sender Sender> + static auto affine_on(Sender&& sndr, const auto&) noexcept { + return ::std::forward(sndr); + } }; template diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp index 7ad737ed..1ac023a2 100644 --- a/tests/beman/execution/exec-affine-on.test.cpp +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -2,15 +2,21 @@ // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception #include -#include -#include -#include #include -#include #include -#include #include +#include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include + #include // ---------------------------------------------------------------------------- @@ -28,6 +34,90 @@ struct receiver { }; template receiver(Sched) -> receiver; + +struct test_scheduler { + using scheduler_concept = test_std::scheduler_t; + + struct data { + std::size_t connected_{}; + std::size_t started_{}; + }; + data* data_; + + template + struct state { + using operation_state_concept = test_std::operation_state_t; + std::remove_cvref_t receiver_; + data* data_; + auto start() & noexcept -> void { + ++this->data_->started_; + test_std::set_value(std::move(this->receiver_)); + } + }; + struct sender { + using sender_concept = test_std::sender_t; + using completion_signatures = test_std::completion_signatures; + data* data_; + struct env { + data* data_; + auto query(test_std::get_completion_scheduler_t) const noexcept { + return test_scheduler{}; + } + }; + auto get_env() const noexcept -> env { return {this->data_}; } + template + auto connect(Receiver&& rcvr) && noexcept -> state { + ++this->data_->connected_; + return {std::forward(rcvr), this->data_}; + } + }; + + auto schedule() const noexcept { + return sender{this->data_}; + } + friend auto operator==(const test_scheduler&, const test_scheduler&) noexcept -> bool = default; +}; + +static_assert(test_std::scheduler); + + auto test_order_of_connect() -> void { + test_scheduler::data inner_data{}; + test_scheduler inner_sched{&inner_data}; + + test_scheduler::data data{}; + test_scheduler sched{&data}; + auto sndr{test_std::affine_on(test_std::starts_on(inner_sched, test_std::just(42)))}; + + assert(data.connected_ == 0); + assert(data.started_ == 0); + assert(inner_data.connected_ == 0); + assert(inner_data.started_ == 0); + auto st{test_std::connect(std::move(sndr), receiver{sched})}; + assert(data.connected_ == 1); + assert(data.started_ == 0); + assert(inner_data.connected_ == 1); + assert(inner_data.started_ == 0); + test_std::start(st); + assert(data.connected_ == 1); + assert(data.started_ == 1); + assert(inner_data.connected_ == 1); + assert(inner_data.started_ == 1); + } + + auto test_affine_on_specialization() -> void { + test_scheduler::data data{}; + test_scheduler sched{&data}; + auto sndr{test_std::affine_on(test_std::just(42))}; + + assert(data.connected_ == 0); + assert(data.started_ == 0); + auto st{test_std::connect(std::move(sndr), receiver{sched})}; + assert(data.connected_ == 0); + assert(data.started_ == 0); + test_std::start(st); + assert(data.connected_ == 0); + assert(data.started_ == 0); + } } // namespace auto main() -> int { @@ -45,5 +135,20 @@ auto main() -> int { test_std::default_domain(), test_std::affine_on(test_std::just(42)), test_std::get_env(r))}; test_std::connect(std::move(st), std::move(r)); auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; + + std::jthread t{[&]() noexcept { loop.run(); }}; + auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); + assert(r0); + auto[v0] = *r0; + assert(v0 == 42); + auto r1 = test_std::sync_wait(test_std::starts_on(loop.get_scheduler(), test_std::affine_on(test_std::just(42)))); + assert(r1); + auto[v1] = *r1; + assert(v1 == 42); + + loop.finish(); + + test_order_of_connect(); + test_affine_on_specialization(); return 0; } From 0ae43846f526e52cf67bfcc9f05865221725114e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 15 Jan 2026 22:48:18 +0000 Subject: [PATCH 12/16] added a few affine_on customizations --- docs/tutorial.mds | 2 +- include/beman/execution/detail/affine_on.hpp | 8 +- .../detail/nested_sender_has_affine_on.hpp | 20 +++ include/beman/execution/detail/read_env.hpp | 4 + .../execution/detail/sender_has_affine_on.hpp | 24 +++ include/beman/execution/detail/then.hpp | 6 + include/beman/execution/detail/write_env.hpp | 6 + src/beman/execution/CMakeLists.txt | 3 + tests/beman/execution/exec-affine-on.test.cpp | 158 ++++++++++++------ 9 files changed, 174 insertions(+), 57 deletions(-) create mode 100644 include/beman/execution/detail/nested_sender_has_affine_on.hpp create mode 100644 include/beman/execution/detail/sender_has_affine_on.hpp diff --git a/docs/tutorial.mds b/docs/tutorial.mds index 55f36dbe..90e6ac31 100644 --- a/docs/tutorial.mds +++ b/docs/tutorial.mds @@ -469,7 +469,7 @@ pipe notation. int main() { int f{3}; - try { *tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); } + try { tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); } catch (int e) { std::cout << "e=" << e << "\n"; } diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp index 43a4f3d5..d5e91195 100644 --- a/include/beman/execution/detail/affine_on.hpp +++ b/include/beman/execution/detail/affine_on.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -98,11 +99,16 @@ struct affine_on_t : ::beman::execution::sender_adaptor_closure { [[maybe_unused]] auto& [tag, data, child] = sender; using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t>; +#if 0 if constexpr (requires(const child_tag_t& t) { { t.affine_on(::beman::execution::detail::forward_like(child), env) } -> ::beman::execution::sender; - }) { + }) +#else + if constexpr (::beman::execution::detail::nested_sender_has_affine_on) +#endif + { return child_tag_t{}.affine_on(::beman::execution::detail::forward_like(child), env); } else { return ::beman::execution::write_env( diff --git a/include/beman/execution/detail/nested_sender_has_affine_on.hpp b/include/beman/execution/detail/nested_sender_has_affine_on.hpp new file mode 100644 index 00000000..be374edc --- /dev/null +++ b/include/beman/execution/detail/nested_sender_has_affine_on.hpp @@ -0,0 +1,20 @@ +// include/beman/execution/detail/nested_sender_has_affine_on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON + +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template +concept nested_sender_has_affine_on = requires(Sender&& sndr, const Env& env) { + { sndr.template get<2>() } -> ::beman::execution::detail::sender_has_affine_on; +}; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/read_env.hpp b/include/beman/execution/detail/read_env.hpp index ed1f3724..5b96137b 100644 --- a/include/beman/execution/detail/read_env.hpp +++ b/include/beman/execution/detail/read_env.hpp @@ -21,6 +21,10 @@ namespace beman::execution::detail { struct read_env_t { auto operator()(auto&& query) const { return ::beman::execution::detail::make_sender(*this, query); } + template <::beman::execution::sender Sender> + static auto affine_on(Sender&& sndr, const auto&) noexcept { + return ::std::forward(sndr); + } }; template <> diff --git a/include/beman/execution/detail/sender_has_affine_on.hpp b/include/beman/execution/detail/sender_has_affine_on.hpp new file mode 100644 index 00000000..742fb026 --- /dev/null +++ b/include/beman/execution/detail/sender_has_affine_on.hpp @@ -0,0 +1,24 @@ +// include/beman/execution/detail/sender_has_affine_on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON + +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template +concept sender_has_affine_on = + beman::execution::sender<::std::remove_cvref_t> && requires(Sender&& sndr, const Env& env) { + sndr.template get<0>(); + { sndr.template get<0>().affine_on(std::forward(sndr), env) } -> ::beman::execution::sender; + }; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/then.hpp b/include/beman/execution/detail/then.hpp index 1a342201..56078d0a 100644 --- a/include/beman/execution/detail/then.hpp +++ b/include/beman/execution/detail/then.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +47,11 @@ struct then_t : ::beman::execution::sender_adaptor_closure> { domain, ::beman::execution::detail::make_sender(*this, ::std::forward(fun), ::std::forward(sender))); } + template <::beman::execution::sender Sender, typename Env> + requires ::beman::execution::detail::nested_sender_has_affine_on + static auto affine_on(Sender&& sndr, const Env&) noexcept { + return ::std::forward(sndr); + } }; template diff --git a/include/beman/execution/detail/write_env.hpp b/include/beman/execution/detail/write_env.hpp index 6da548a4..6fff51b3 100644 --- a/include/beman/execution/detail/write_env.hpp +++ b/include/beman/execution/detail/write_env.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -24,6 +25,11 @@ struct write_env_t { *this, ::std::forward(env), ::std::forward(sender)); } static auto name() { return "write_env_t"; } + template <::beman::execution::sender Sender, typename Env> + requires ::beman::execution::detail::nested_sender_has_affine_on + static auto affine_on(Sender&& sndr, const Env&) noexcept { + return ::std::forward(sndr); + } }; template diff --git a/src/beman/execution/CMakeLists.txt b/src/beman/execution/CMakeLists.txt index fc15f853..0a3e3e34 100644 --- a/src/beman/execution/CMakeLists.txt +++ b/src/beman/execution/CMakeLists.txt @@ -26,6 +26,7 @@ target_sources( TYPE HEADERS BASE_DIRS ${PROJECT_SOURCE_DIR}/include FILES + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/affine_on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/allocator_aware_move.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/almost_scheduler.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/apply_sender.hpp @@ -118,6 +119,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/meta_transform.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/meta_unique.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/movable_value.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/nested_sender_has_affine_on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/never_stop_token.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/non_assignable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/nostopstate.hpp @@ -149,6 +151,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_awaitable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_decompose.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_for.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_has_affine_on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_in.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sends_stopped.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/set_error.hpp diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp index 1ac023a2..c72e4fdf 100644 --- a/tests/beman/execution/exec-affine-on.test.cpp +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include #include @@ -15,6 +17,11 @@ #include #include #include +#include +#include +#include +#include +#include #include #include @@ -22,18 +29,35 @@ // ---------------------------------------------------------------------------- namespace { +struct awaiter { + bool done{false}; + std::mutex mtx{}; + std::condition_variable cv{}; + + auto complete() -> bool { + std::lock_guard lock{this->mtx}; + this->done = true; + this->cv.notify_all(); + return true; + } + auto await() { + std::unique_lock lock{this->mtx}; + this->cv.wait(lock, [&]() noexcept { return this->done; }); + } +}; template struct receiver { using receiver_concept = test_std::receiver_t; - Sched scheduler_; - auto set_value(auto&&...) && noexcept -> void {} - auto set_error(auto&&) && noexcept -> void {} - auto set_stopped() && noexcept -> void {} + Sched scheduler_; + awaiter* awaiter_{nullptr}; + auto set_value(auto&&...) && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); } + auto set_error(auto&&) && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); } + auto set_stopped() && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); } auto get_env() const noexcept { return test_std::env{test_std::prop(test_std::get_scheduler, this->scheduler_)}; } }; template -receiver(Sched) -> receiver; +receiver(Sched, awaiter* = nullptr) -> receiver; struct test_scheduler { using scheduler_concept = test_std::scheduler_t; @@ -48,23 +72,23 @@ struct test_scheduler { struct state { using operation_state_concept = test_std::operation_state_t; std::remove_cvref_t receiver_; - data* data_; - auto start() & noexcept -> void { + data* data_; + auto start() & noexcept -> void { ++this->data_->started_; test_std::set_value(std::move(this->receiver_)); } }; struct sender { - using sender_concept = test_std::sender_t; + using sender_concept = test_std::sender_t; using completion_signatures = test_std::completion_signatures; data* data_; struct env { data* data_; - auto query(test_std::get_completion_scheduler_t) const noexcept { + auto query(test_std::get_completion_scheduler_t) const noexcept { return test_scheduler{}; } }; - auto get_env() const noexcept -> env { return {this->data_}; } + auto get_env() const noexcept -> env { return {this->data_}; } template auto connect(Receiver&& rcvr) && noexcept -> state { ++this->data_->connected_; @@ -72,52 +96,54 @@ struct test_scheduler { } }; - auto schedule() const noexcept { - return sender{this->data_}; - } + auto schedule() const noexcept { return sender{this->data_}; } friend auto operator==(const test_scheduler&, const test_scheduler&) noexcept -> bool = default; }; static_assert(test_std::scheduler); - auto test_order_of_connect() -> void { - test_scheduler::data inner_data{}; - test_scheduler inner_sched{&inner_data}; - - test_scheduler::data data{}; - test_scheduler sched{&data}; - auto sndr{test_std::affine_on(test_std::starts_on(inner_sched, test_std::just(42)))}; - - assert(data.connected_ == 0); - assert(data.started_ == 0); - assert(inner_data.connected_ == 0); - assert(inner_data.started_ == 0); - auto st{test_std::connect(std::move(sndr), receiver{sched})}; - assert(data.connected_ == 1); - assert(data.started_ == 0); - assert(inner_data.connected_ == 1); - assert(inner_data.started_ == 0); - test_std::start(st); - assert(data.connected_ == 1); - assert(data.started_ == 1); - assert(inner_data.connected_ == 1); - assert(inner_data.started_ == 1); - } +auto test_order_of_connect() -> void { + test_scheduler::data inner_data{}; + test_scheduler inner_sched{&inner_data}; + + test_scheduler::data data{}; + test_scheduler sched{&data}; + auto sndr{test_std::affine_on(test_std::starts_on(inner_sched, test_std::just(42)))}; + + assert(data.connected_ == 0); + assert(data.started_ == 0); + assert(inner_data.connected_ == 0); + assert(inner_data.started_ == 0); + auto st{test_std::connect(std::move(sndr), receiver{sched})}; + assert(data.connected_ == 1); + assert(data.started_ == 0); + assert(inner_data.connected_ == 1); + assert(inner_data.started_ == 0); + test_std::start(st); + assert(data.connected_ == 1); + assert(data.started_ == 1); + assert(inner_data.connected_ == 1); + assert(inner_data.started_ == 1); +} - auto test_affine_on_specialization() -> void { - test_scheduler::data data{}; - test_scheduler sched{&data}; - auto sndr{test_std::affine_on(test_std::just(42))}; - - assert(data.connected_ == 0); - assert(data.started_ == 0); - auto st{test_std::connect(std::move(sndr), receiver{sched})}; - assert(data.connected_ == 0); - assert(data.started_ == 0); - test_std::start(st); - assert(data.connected_ == 0); - assert(data.started_ == 0); - } +template +auto test_affine_on_specializations(Sender&& sender, std::size_t count = 0u) -> void { + test_scheduler::data data{}; + test_scheduler sched{&data}; + auto sndr{test_std::affine_on(std::forward(sender))}; + awaiter aw{}; + + assert(data.connected_ == 0); + assert(data.started_ == 0); + auto st{test_std::connect(std::move(sndr), receiver{sched, &aw})}; + assert(data.connected_ == count); + assert(data.started_ == 0); + test_std::start(st); + aw.await(); + + assert(data.connected_ == count); + assert(data.started_ == count); +} } // namespace auto main() -> int { @@ -137,18 +163,40 @@ auto main() -> int { auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; std::jthread t{[&]() noexcept { loop.run(); }}; - auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); + auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); assert(r0); - auto[v0] = *r0; + auto [v0] = *r0; assert(v0 == 42); auto r1 = test_std::sync_wait(test_std::starts_on(loop.get_scheduler(), test_std::affine_on(test_std::just(42)))); assert(r1); - auto[v1] = *r1; + auto [v1] = *r1; assert(v1 == 42); + test_order_of_connect(); + test_affine_on_specializations(test_std::just(42)); + test_affine_on_specializations(test_std::just(42, true, 3.14)); + test_affine_on_specializations(test_std::just_error(42)); + test_affine_on_specializations(test_std::just_stopped()); + test_affine_on_specializations(test_std::just_stopped()); + test_affine_on_specializations(test_std::read_env(test_std::get_stop_token)); + test_affine_on_specializations(test_std::write_env( + test_std::just(42), test_std::env{test_std::prop{test_std::get_stop_token, test_std::never_stop_token{}}})); + test_affine_on_specializations( + test_std::write_env(test_std::starts_on(loop.get_scheduler(), test_std::just(42)), + test_std::env{test_std::prop{test_std::get_stop_token, test_std::never_stop_token{}}}), + 1u); + test_affine_on_specializations(test_std::then(test_std::just(42), [](int) {})); + test_affine_on_specializations( + test_std::then(test_std::starts_on(loop.get_scheduler(), test_std::just(42)), [](auto&&...) {}), 1u); + test_affine_on_specializations(test_std::upon_error(test_std::just_error(42), [](int) {})); + test_affine_on_specializations( + test_std::upon_error(test_std::starts_on(loop.get_scheduler(), test_std::just_error(42)), [](auto&&...) {}), + 1u); + test_affine_on_specializations(test_std::upon_stopped(test_std::just_stopped(), [] {})); + test_affine_on_specializations( + test_std::upon_stopped(test_std::starts_on(loop.get_scheduler(), test_std::just_stopped()), [] {}), 1u); + loop.finish(); - test_order_of_connect(); - test_affine_on_specialization(); return 0; } From e306f20837832ecb1d58c00773e694a900fa0cfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 15 Jan 2026 22:51:10 +0000 Subject: [PATCH 13/16] restore empty CMakeLists.txt for code examples --- docs/code/CMakeLists.txt | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/docs/code/CMakeLists.txt b/docs/code/CMakeLists.txt index 330d5906..2d2752d3 100644 --- a/docs/code/CMakeLists.txt +++ b/docs/code/CMakeLists.txt @@ -3,27 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception # gersemi: on -list(APPEND EXAMPLES -sync_wait-just -sync_wait-just_error -sync_wait-just_stopped -just-then -just-then-throw -just_error-then -just_stopped-then -upon_error -expected -upon_stopped -just-let_value -just-let_value-just_error -just_error-let_value -just-let_value-throw -just_error-let_error -just_stopped-let_stopped -when_all -when_all-error -when_all-stopped -) +list(APPEND EXAMPLES) if(BEMAN_USE_MODULES) list(APPEND EXAMPLES modules) # modules.cpp From 8650193b6e446c21e83fb11b05b24752cae22945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 15 Jan 2026 22:59:35 +0000 Subject: [PATCH 14/16] fix two minor issues discovered by CI --- docs/overview.md | 2 +- tests/beman/execution/exec-affine-on.test.cpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/overview.md b/docs/overview.md index ffef8475..2fd3606a 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -642,7 +642,7 @@ The sender adaptors take one or more senders and adapt their respective behavior
affine_on(sender) -> sender-of<completions-of(sender)> The expression affine_on(sender) creates -a sender which completes on the same scheduler it was started on, even if sender changes the scheduler. The scheduler to resume on is determined using get_scheduler(get_env(rcvr)) where rcvr is the reciver the sender is connected to. +a sender which completes on the same scheduler it was started on, even if sender changes the scheduler. The scheduler to resume on is determined using get_scheduler(get_env(rcvr)) where rcvr is the receiver the sender is connected to. The primary use of affine_on is implementing scheduler affinity for task.
diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp index c72e4fdf..c5e95b9e 100644 --- a/tests/beman/execution/exec-affine-on.test.cpp +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -162,7 +162,7 @@ auto main() -> int { test_std::connect(std::move(st), std::move(r)); auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; - std::jthread t{[&]() noexcept { loop.run(); }}; + std::thread t{[&]() noexcept { loop.run(); }}; auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); assert(r0); auto [v0] = *r0; @@ -197,6 +197,7 @@ auto main() -> int { test_std::upon_stopped(test_std::starts_on(loop.get_scheduler(), test_std::just_stopped()), [] {}), 1u); loop.finish(); + t.join(); return 0; } From e4be2b84153eb1596162511bd28743f061b449ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 15 Jan 2026 23:00:52 +0000 Subject: [PATCH 15/16] fix a formatting issue --- tests/beman/execution/exec-affine-on.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp index c5e95b9e..506df77d 100644 --- a/tests/beman/execution/exec-affine-on.test.cpp +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -163,7 +163,7 @@ auto main() -> int { auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; std::thread t{[&]() noexcept { loop.run(); }}; - auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); + auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); assert(r0); auto [v0] = *r0; assert(v0 == 42); From 475fc410165b3e2e5071ab11a5431b238827fae2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Thu, 15 Jan 2026 23:12:17 +0000 Subject: [PATCH 16/16] remove duplicate headers --- include/beman/execution/detail/affine_on.hpp | 1 - tests/beman/execution/exec-affine-on.test.cpp | 13 ++++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp index d5e91195..3f3a6ed4 100644 --- a/include/beman/execution/detail/affine_on.hpp +++ b/include/beman/execution/detail/affine_on.hpp @@ -4,7 +4,6 @@ #ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON #define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON -#include #include #include #include diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp index 506df77d..63af703c 100644 --- a/tests/beman/execution/exec-affine-on.test.cpp +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -4,22 +4,21 @@ #include #include #include +#include #include +#include #include -#include -#include #include +#include #include #include #include #include #include -#include -#include -#include -#include -#include #include +#include +#include + #include #include #include