diff --git a/docs/overview.md b/docs/overview.md index e226623f..2fd3606a 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -639,6 +639,13 @@ The expression schedule(scheduler) creates a sender which up ### Sender Adaptors The sender adaptors take one or more senders and adapt their respective behavior to complete with a corresponding result. The description uses the informal function completions-of(sender) to represent the completion signatures which sender produces. Also, completion signatures are combined using +: the result is the deduplicated set of the combined completion signatures. +
+affine_on(sender) -> sender-of<completions-of(sender)> +The expression affine_on(sender) creates +a sender which completes on the same scheduler it was started on, even if sender changes the scheduler. The scheduler to resume on is determined using get_scheduler(get_env(rcvr)) where rcvr is the receiver the sender is connected to. + +The primary use of affine_on is implementing scheduler affinity for task. +
`bulk`
@@ -698,6 +705,10 @@ The expression into_variant(sender) creates a sender which t
when_all_with_variant(sender...) -> sender
+
+write_env(sender, env) -> sender +
+ ### Sender Consumers diff --git a/docs/tutorial.mds b/docs/tutorial.mds index 55f36dbe..90e6ac31 100644 --- a/docs/tutorial.mds +++ b/docs/tutorial.mds @@ -469,7 +469,7 @@ pipe notation. int main() { int f{3}; - try { *tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); } + try { tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); } catch (int e) { std::cout << "e=" << e << "\n"; } diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp new file mode 100644 index 00000000..3f3a6ed4 --- /dev/null +++ b/include/beman/execution/detail/affine_on.hpp @@ -0,0 +1,138 @@ +// include/beman/execution/detail/affine_on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { + +/** + * @brief The affine_on_t struct is a sender adaptor closure that transforms a sender + * to complete on the scheduler obtained from the receiver's environment. + * + * This adaptor implements scheduler affinity to adapt a sender to complete on the + * scheduler obtained the receiver's environment. The get_scheduler query is used + * to obtain the scheduler on which the sender gets started. + */ +struct affine_on_t : ::beman::execution::sender_adaptor_closure { + /** + * @brief Adapt a sender with affine_on. + * + * @tparam Sender The deduced type of the sender to be transformed. + * @param sender The sender to be transformed. + * @return An adapted sender to complete on the scheduler it was started on. + */ + template <::beman::execution::sender Sender> + auto operator()(Sender&& sender) const { + return ::beman::execution::detail::transform_sender( + ::beman::execution::detail::get_domain_early(sender), + ::beman::execution::detail::make_sender( + *this, ::beman::execution::env<>{}, ::std::forward(sender))); + } + + /** + * @brief Overload for creating a sender adaptor from affine_on. + * + * @return A sender adaptor for the affine_on_t. + */ + auto operator()() const { return ::beman::execution::detail::sender_adaptor{*this}; } + + /** + * @brief affine_on is implemented by transforming it into a use of schedule_from. + * + * The constraints ensure that the environment provides a scheduler which is + * infallible and, thus, can be used to guarantee completion on the correct + * scheduler. + * + * The implementation first tries to see if the child sender's tag has a custom + * affine_on implementation. If it does, that is used. Otherwise, the default + * implementation gets a scheduler from the environment and uses schedule_from + * to adapt the sender to complete on that scheduler. + * + * @tparam Sender The type of the sender to be transformed. + * @tparam Env The type of the environment providing the scheduler. + * @param sender The sender to be transformed. + * @param env The environment providing the scheduler. + * @return A transformed sender that is affined to the scheduler. + */ + template <::beman::execution::sender Sender, typename Env> + requires ::beman::execution::detail::sender_for && requires(const Env& env) { + { ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler; + { ::beman::execution::schedule(::beman::execution::get_scheduler(env)) } -> ::beman::execution::sender; + { + ::beman::execution::get_completion_signatures( + ::beman::execution::schedule(::beman::execution::get_scheduler(env)), + ::beman::execution::detail::join_env( + ::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token, + ::beman::execution::never_stop_token{}}}, + env)) + } -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>; + } + static auto transform_sender(Sender&& sender, const Env& env) { + [[maybe_unused]] auto& [tag, data, child] = sender; + using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t>; + +#if 0 + if constexpr (requires(const child_tag_t& t) { + { + t.affine_on(::beman::execution::detail::forward_like(child), env) + } -> ::beman::execution::sender; + }) +#else + if constexpr (::beman::execution::detail::nested_sender_has_affine_on) +#endif + { + return child_tag_t{}.affine_on(::beman::execution::detail::forward_like(child), env); + } else { + return ::beman::execution::write_env( + ::beman::execution::schedule_from( + ::beman::execution::get_scheduler(env), + ::beman::execution::write_env(::beman::execution::detail::forward_like(child), env)), + ::beman::execution::detail::join_env( + ::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token, + ::beman::execution::never_stop_token{}}}, + env)); + } + } +}; + +} // namespace beman::execution::detail + +namespace beman::execution { +/** + * @brief affine_on is a CPO, used to adapt a sender to complete on the scheduler + * it got started on which is derived from get_scheduler on the receiver's environment. + */ +using beman::execution::detail::affine_on_t; +inline constexpr affine_on_t affine_on{}; +} // namespace beman::execution + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/just.hpp b/include/beman/execution/detail/just.hpp index e5e2b264..8cd1af59 100644 --- a/include/beman/execution/detail/just.hpp +++ b/include/beman/execution/detail/just.hpp @@ -34,6 +34,10 @@ struct just_t { return ::beman::execution::detail::make_sender( *this, ::beman::execution::detail::product_type{::std::forward(arg)...}); } + template <::beman::execution::sender Sender> + static auto affine_on(Sender&& sndr, const auto&) noexcept { + return ::std::forward(sndr); + } }; template diff --git a/include/beman/execution/detail/let.hpp b/include/beman/execution/detail/let.hpp index 32d30e30..e8102be4 100644 --- a/include/beman/execution/detail/let.hpp +++ b/include/beman/execution/detail/let.hpp @@ -162,7 +162,12 @@ struct impls_for<::beman::execution::detail::let_t> : ::beman::execu {}}; }}; template - static auto let_bind(auto& state, Receiver& receiver, Args&&... args) { + static auto + let_bind(auto& state, Receiver& receiver, Args&&... args) noexcept(noexcept(::beman::execution::connect( + ::std::apply(::std::move(state.fun), + ::std::move(state.args.template emplace<::beman::execution::detail::decayed_tuple>( + ::std::forward(args)...))), + let_receiver{receiver, state.env}))) { using args_t = ::beman::execution::detail::decayed_tuple; auto mkop{[&] { return ::beman::execution::connect( @@ -179,7 +184,8 @@ struct impls_for<::beman::execution::detail::let_t> : ::beman::execu try { let_bind(state, receiver, ::std::forward(args)...); } catch (...) { - ::beman::execution::set_error(::std::move(receiver), ::std::current_exception()); + if constexpr (not noexcept(let_bind(state, receiver, ::std::forward(args)...))) + ::beman::execution::set_error(::std::move(receiver), ::std::current_exception()); } } else { Tag()(::std::move(receiver), ::std::forward(args)...); diff --git a/include/beman/execution/detail/nested_sender_has_affine_on.hpp b/include/beman/execution/detail/nested_sender_has_affine_on.hpp new file mode 100644 index 00000000..be374edc --- /dev/null +++ b/include/beman/execution/detail/nested_sender_has_affine_on.hpp @@ -0,0 +1,20 @@ +// include/beman/execution/detail/nested_sender_has_affine_on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON + +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template +concept nested_sender_has_affine_on = requires(Sender&& sndr, const Env& env) { + { sndr.template get<2>() } -> ::beman::execution::detail::sender_has_affine_on; +}; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/read_env.hpp b/include/beman/execution/detail/read_env.hpp index ed1f3724..5b96137b 100644 --- a/include/beman/execution/detail/read_env.hpp +++ b/include/beman/execution/detail/read_env.hpp @@ -21,6 +21,10 @@ namespace beman::execution::detail { struct read_env_t { auto operator()(auto&& query) const { return ::beman::execution::detail::make_sender(*this, query); } + template <::beman::execution::sender Sender> + static auto affine_on(Sender&& sndr, const auto&) noexcept { + return ::std::forward(sndr); + } }; template <> diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp index d2c05993..4a1f8796 100644 --- a/include/beman/execution/detail/run_loop.hpp +++ b/include/beman/execution/detail/run_loop.hpp @@ -12,9 +12,9 @@ #include #include #include -#include #include #include +#include #include #include @@ -53,27 +53,29 @@ class run_loop { // NOLINTBEGIN(misc-no-recursion) template opstate(run_loop* l, R&& rcvr) : loop(l), receiver(::std::forward(rcvr)) {} - auto start() & noexcept -> void { - try { - this->loop->push_back(this); - } catch (...) { - ::beman::execution::set_error(::std::move(this->receiver), ::std::current_exception()); - } - } + auto start() & noexcept -> void { this->loop->push_back(this); } // NOLINTEND(misc-no-recursion) auto execute() noexcept -> void override { - if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested()) - ::beman::execution::set_stopped(::std::move(this->receiver)); - else + using token = decltype(::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver))); + if constexpr (not ::beman::execution::unstoppable_token) { + if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested()) + ::beman::execution::set_stopped(::std::move(this->receiver)); + else + ::beman::execution::set_value(::std::move(this->receiver)); + } else ::beman::execution::set_value(::std::move(this->receiver)); } }; struct sender { using sender_concept = ::beman::execution::sender_t; - using completion_signatures = - ::beman::execution::completion_signatures<::beman::execution::set_value_t(), - ::beman::execution::set_error_t(::std::exception_ptr), - ::beman::execution::set_stopped_t()>; + template > + auto get_completion_signatures(Env&& env) const noexcept { + if constexpr (::beman::execution::unstoppable_token) + return ::beman::execution::completion_signatures<::beman::execution::set_value_t()>{}; + else + return ::beman::execution::completion_signatures<::beman::execution::set_value_t(), + ::beman::execution::set_stopped_t()>{}; + } run_loop* loop; @@ -100,7 +102,8 @@ class run_loop { opstate_base* front{}; opstate_base* back{}; - auto push_back(opstate_base* item) -> void { + auto push_back(opstate_base* item) noexcept -> void { + //-dk:TODO run_loop::push_back should really be lock-free ::std::lock_guard guard(this->mutex); if (auto previous_back{::std::exchange(this->back, item)}) { previous_back->next = item; @@ -109,7 +112,8 @@ class run_loop { this->condition.notify_one(); } } - auto pop_front() -> opstate_base* { + auto pop_front() noexcept -> opstate_base* { + //-dk:TODO run_loop::pop_front should really be lock-free ::std::unique_lock guard(this->mutex); this->condition.wait(guard, [this] { return this->front || this->current_state == state::finishing; }); if (this->front == this->back) diff --git a/include/beman/execution/detail/sender_has_affine_on.hpp b/include/beman/execution/detail/sender_has_affine_on.hpp new file mode 100644 index 00000000..742fb026 --- /dev/null +++ b/include/beman/execution/detail/sender_has_affine_on.hpp @@ -0,0 +1,24 @@ +// include/beman/execution/detail/sender_has_affine_on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON + +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template +concept sender_has_affine_on = + beman::execution::sender<::std::remove_cvref_t> && requires(Sender&& sndr, const Env& env) { + sndr.template get<0>(); + { sndr.template get<0>().affine_on(std::forward(sndr), env) } -> ::beman::execution::sender; + }; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/then.hpp b/include/beman/execution/detail/then.hpp index 1a342201..56078d0a 100644 --- a/include/beman/execution/detail/then.hpp +++ b/include/beman/execution/detail/then.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +47,11 @@ struct then_t : ::beman::execution::sender_adaptor_closure> { domain, ::beman::execution::detail::make_sender(*this, ::std::forward(fun), ::std::forward(sender))); } + template <::beman::execution::sender Sender, typename Env> + requires ::beman::execution::detail::nested_sender_has_affine_on + static auto affine_on(Sender&& sndr, const Env&) noexcept { + return ::std::forward(sndr); + } }; template diff --git a/include/beman/execution/detail/write_env.hpp b/include/beman/execution/detail/write_env.hpp index 6da548a4..6fff51b3 100644 --- a/include/beman/execution/detail/write_env.hpp +++ b/include/beman/execution/detail/write_env.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -24,6 +25,11 @@ struct write_env_t { *this, ::std::forward(env), ::std::forward(sender)); } static auto name() { return "write_env_t"; } + template <::beman::execution::sender Sender, typename Env> + requires ::beman::execution::detail::nested_sender_has_affine_on + static auto affine_on(Sender&& sndr, const Env&) noexcept { + return ::std::forward(sndr); + } }; template diff --git a/include/beman/execution/execution.hpp b/include/beman/execution/execution.hpp index c5bb7748..b68c68c7 100644 --- a/include/beman/execution/execution.hpp +++ b/include/beman/execution/execution.hpp @@ -38,6 +38,7 @@ #include #include +#include #include #include #include diff --git a/src/beman/execution/CMakeLists.txt b/src/beman/execution/CMakeLists.txt index fc15f853..0a3e3e34 100644 --- a/src/beman/execution/CMakeLists.txt +++ b/src/beman/execution/CMakeLists.txt @@ -26,6 +26,7 @@ target_sources( TYPE HEADERS BASE_DIRS ${PROJECT_SOURCE_DIR}/include FILES + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/affine_on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/allocator_aware_move.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/almost_scheduler.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/apply_sender.hpp @@ -118,6 +119,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/meta_transform.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/meta_unique.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/movable_value.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/nested_sender_has_affine_on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/never_stop_token.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/non_assignable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/nostopstate.hpp @@ -149,6 +151,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_awaitable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_decompose.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_for.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_has_affine_on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_in.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sends_stopped.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/set_error.hpp diff --git a/tests/beman/execution/CMakeLists.txt b/tests/beman/execution/CMakeLists.txt index 01511154..4f4fefc7 100644 --- a/tests/beman/execution/CMakeLists.txt +++ b/tests/beman/execution/CMakeLists.txt @@ -20,6 +20,7 @@ endif() list( APPEND execution_tests + exec-affine-on.test issue-174.test issue-186.test exec-scope-counting.test diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp new file mode 100644 index 00000000..63af703c --- /dev/null +++ b/tests/beman/execution/exec-affine-on.test.cpp @@ -0,0 +1,202 @@ +// tests/beman/execution/exec-affine-on.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +// ---------------------------------------------------------------------------- + +namespace { +struct awaiter { + bool done{false}; + std::mutex mtx{}; + std::condition_variable cv{}; + + auto complete() -> bool { + std::lock_guard lock{this->mtx}; + this->done = true; + this->cv.notify_all(); + return true; + } + auto await() { + std::unique_lock lock{this->mtx}; + this->cv.wait(lock, [&]() noexcept { return this->done; }); + } +}; +template +struct receiver { + using receiver_concept = test_std::receiver_t; + Sched scheduler_; + awaiter* awaiter_{nullptr}; + auto set_value(auto&&...) && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); } + auto set_error(auto&&) && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); } + auto set_stopped() && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); } + + auto get_env() const noexcept { return test_std::env{test_std::prop(test_std::get_scheduler, this->scheduler_)}; } +}; +template +receiver(Sched, awaiter* = nullptr) -> receiver; + +struct test_scheduler { + using scheduler_concept = test_std::scheduler_t; + + struct data { + std::size_t connected_{}; + std::size_t started_{}; + }; + data* data_; + + template + struct state { + using operation_state_concept = test_std::operation_state_t; + std::remove_cvref_t receiver_; + data* data_; + auto start() & noexcept -> void { + ++this->data_->started_; + test_std::set_value(std::move(this->receiver_)); + } + }; + struct sender { + using sender_concept = test_std::sender_t; + using completion_signatures = test_std::completion_signatures; + data* data_; + struct env { + data* data_; + auto query(test_std::get_completion_scheduler_t) const noexcept { + return test_scheduler{}; + } + }; + auto get_env() const noexcept -> env { return {this->data_}; } + template + auto connect(Receiver&& rcvr) && noexcept -> state { + ++this->data_->connected_; + return {std::forward(rcvr), this->data_}; + } + }; + + auto schedule() const noexcept { return sender{this->data_}; } + friend auto operator==(const test_scheduler&, const test_scheduler&) noexcept -> bool = default; +}; + +static_assert(test_std::scheduler); + +auto test_order_of_connect() -> void { + test_scheduler::data inner_data{}; + test_scheduler inner_sched{&inner_data}; + + test_scheduler::data data{}; + test_scheduler sched{&data}; + auto sndr{test_std::affine_on(test_std::starts_on(inner_sched, test_std::just(42)))}; + + assert(data.connected_ == 0); + assert(data.started_ == 0); + assert(inner_data.connected_ == 0); + assert(inner_data.started_ == 0); + auto st{test_std::connect(std::move(sndr), receiver{sched})}; + assert(data.connected_ == 1); + assert(data.started_ == 0); + assert(inner_data.connected_ == 1); + assert(inner_data.started_ == 0); + test_std::start(st); + assert(data.connected_ == 1); + assert(data.started_ == 1); + assert(inner_data.connected_ == 1); + assert(inner_data.started_ == 1); +} + +template +auto test_affine_on_specializations(Sender&& sender, std::size_t count = 0u) -> void { + test_scheduler::data data{}; + test_scheduler sched{&data}; + auto sndr{test_std::affine_on(std::forward(sender))}; + awaiter aw{}; + + assert(data.connected_ == 0); + assert(data.started_ == 0); + auto st{test_std::connect(std::move(sndr), receiver{sched, &aw})}; + assert(data.connected_ == count); + assert(data.started_ == 0); + test_std::start(st); + aw.await(); + + assert(data.connected_ == count); + assert(data.started_ == count); +} +} // namespace + +auto main() -> int { + static_assert(test_std::sender); + static_assert(test_std::sender); + + static_assert(not test_std::sender_in>); + + test_std::run_loop loop; + auto r{receiver(loop.get_scheduler())}; + static_assert(test_std::receiver); + auto s{test_std::get_scheduler(test_std::get_env(r))}; + assert(s == loop.get_scheduler()); + auto st{test_std::transform_sender( + test_std::default_domain(), test_std::affine_on(test_std::just(42)), test_std::get_env(r))}; + test_std::connect(std::move(st), std::move(r)); + auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))}; + + std::thread t{[&]() noexcept { loop.run(); }}; + auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42))); + assert(r0); + auto [v0] = *r0; + assert(v0 == 42); + auto r1 = test_std::sync_wait(test_std::starts_on(loop.get_scheduler(), test_std::affine_on(test_std::just(42)))); + assert(r1); + auto [v1] = *r1; + assert(v1 == 42); + + test_order_of_connect(); + test_affine_on_specializations(test_std::just(42)); + test_affine_on_specializations(test_std::just(42, true, 3.14)); + test_affine_on_specializations(test_std::just_error(42)); + test_affine_on_specializations(test_std::just_stopped()); + test_affine_on_specializations(test_std::just_stopped()); + test_affine_on_specializations(test_std::read_env(test_std::get_stop_token)); + test_affine_on_specializations(test_std::write_env( + test_std::just(42), test_std::env{test_std::prop{test_std::get_stop_token, test_std::never_stop_token{}}})); + test_affine_on_specializations( + test_std::write_env(test_std::starts_on(loop.get_scheduler(), test_std::just(42)), + test_std::env{test_std::prop{test_std::get_stop_token, test_std::never_stop_token{}}}), + 1u); + test_affine_on_specializations(test_std::then(test_std::just(42), [](int) {})); + test_affine_on_specializations( + test_std::then(test_std::starts_on(loop.get_scheduler(), test_std::just(42)), [](auto&&...) {}), 1u); + test_affine_on_specializations(test_std::upon_error(test_std::just_error(42), [](int) {})); + test_affine_on_specializations( + test_std::upon_error(test_std::starts_on(loop.get_scheduler(), test_std::just_error(42)), [](auto&&...) {}), + 1u); + test_affine_on_specializations(test_std::upon_stopped(test_std::just_stopped(), [] {})); + test_affine_on_specializations( + test_std::upon_stopped(test_std::starts_on(loop.get_scheduler(), test_std::just_stopped()), [] {}), 1u); + + loop.finish(); + t.join(); + + return 0; +} diff --git a/tests/beman/execution/exec-run-loop-types.test.cpp b/tests/beman/execution/exec-run-loop-types.test.cpp index c72515e8..740a8cc8 100644 --- a/tests/beman/execution/exec-run-loop-types.test.cpp +++ b/tests/beman/execution/exec-run-loop-types.test.cpp @@ -80,11 +80,16 @@ TEST(exec_run_loop_types) { // p5: auto sender{test_std::schedule(scheduler)}; struct env {}; - static_assert(::std::same_as, + test_std::inplace_stop_source source{}; + struct token_env { + test_std::inplace_stop_token token; + auto query(const test_std::get_stop_token_t&) const noexcept { return this->token; } + }; + static_assert(::std::same_as, decltype(test_std::get_completion_signatures(sender, env{}))>); - + static_assert( + ::std::same_as, + decltype(test_std::get_completion_signatures(sender, token_env{source.get_token()}))>); // p7: static_assert(test_std::receiver_of); // p7.1: