diff --git a/docs/overview.md b/docs/overview.md
index e226623f..2fd3606a 100644
--- a/docs/overview.md
+++ b/docs/overview.md
@@ -639,6 +639,13 @@ The expression schedule(scheduler) creates a sender which up
### Sender Adaptors
The sender adaptors take one or more senders and adapt their respective behavior to complete with a corresponding result. The description uses the informal function completions-of(sender) to represent the completion signatures which sender produces. Also, completion signatures are combined using +: the result is the deduplicated set of the combined completion signatures.
+
+affine_on(sender) -> sender-of<completions-of(sender)>
+The expression affine_on(sender) creates
+a sender which completes on the same scheduler it was started on, even if sender changes the scheduler. The scheduler to resume on is determined using get_scheduler(get_env(rcvr)) where rcvr is the receiver the sender is connected to.
+
+The primary use of affine_on is implementing scheduler affinity for task.
+
`bulk`
@@ -698,6 +705,10 @@ The expression into_variant(sender) creates a sender which t
when_all_with_variant(sender...) -> sender
+
+write_env(sender, env) -> sender
+
+
### Sender Consumers
diff --git a/docs/tutorial.mds b/docs/tutorial.mds
index 55f36dbe..90e6ac31 100644
--- a/docs/tutorial.mds
+++ b/docs/tutorial.mds
@@ -469,7 +469,7 @@ pipe notation.
int main() {
int f{3};
- try { *tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); }
+ try { tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); }
catch (int e) {
std::cout << "e=" << e << "\n";
}
diff --git a/include/beman/execution/detail/affine_on.hpp b/include/beman/execution/detail/affine_on.hpp
new file mode 100644
index 00000000..3f3a6ed4
--- /dev/null
+++ b/include/beman/execution/detail/affine_on.hpp
@@ -0,0 +1,138 @@
+// include/beman/execution/detail/affine_on.hpp -*-C++-*-
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON
+#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+// ----------------------------------------------------------------------------
+
+namespace beman::execution::detail {
+
+/**
+ * @brief The affine_on_t struct is a sender adaptor closure that transforms a sender
+ * to complete on the scheduler obtained from the receiver's environment.
+ *
+ * This adaptor implements scheduler affinity to adapt a sender to complete on the
+ * scheduler obtained the receiver's environment. The get_scheduler query is used
+ * to obtain the scheduler on which the sender gets started.
+ */
+struct affine_on_t : ::beman::execution::sender_adaptor_closure {
+ /**
+ * @brief Adapt a sender with affine_on.
+ *
+ * @tparam Sender The deduced type of the sender to be transformed.
+ * @param sender The sender to be transformed.
+ * @return An adapted sender to complete on the scheduler it was started on.
+ */
+ template <::beman::execution::sender Sender>
+ auto operator()(Sender&& sender) const {
+ return ::beman::execution::detail::transform_sender(
+ ::beman::execution::detail::get_domain_early(sender),
+ ::beman::execution::detail::make_sender(
+ *this, ::beman::execution::env<>{}, ::std::forward(sender)));
+ }
+
+ /**
+ * @brief Overload for creating a sender adaptor from affine_on.
+ *
+ * @return A sender adaptor for the affine_on_t.
+ */
+ auto operator()() const { return ::beman::execution::detail::sender_adaptor{*this}; }
+
+ /**
+ * @brief affine_on is implemented by transforming it into a use of schedule_from.
+ *
+ * The constraints ensure that the environment provides a scheduler which is
+ * infallible and, thus, can be used to guarantee completion on the correct
+ * scheduler.
+ *
+ * The implementation first tries to see if the child sender's tag has a custom
+ * affine_on implementation. If it does, that is used. Otherwise, the default
+ * implementation gets a scheduler from the environment and uses schedule_from
+ * to adapt the sender to complete on that scheduler.
+ *
+ * @tparam Sender The type of the sender to be transformed.
+ * @tparam Env The type of the environment providing the scheduler.
+ * @param sender The sender to be transformed.
+ * @param env The environment providing the scheduler.
+ * @return A transformed sender that is affined to the scheduler.
+ */
+ template <::beman::execution::sender Sender, typename Env>
+ requires ::beman::execution::detail::sender_for && requires(const Env& env) {
+ { ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler;
+ { ::beman::execution::schedule(::beman::execution::get_scheduler(env)) } -> ::beman::execution::sender;
+ {
+ ::beman::execution::get_completion_signatures(
+ ::beman::execution::schedule(::beman::execution::get_scheduler(env)),
+ ::beman::execution::detail::join_env(
+ ::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token,
+ ::beman::execution::never_stop_token{}}},
+ env))
+ } -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>;
+ }
+ static auto transform_sender(Sender&& sender, const Env& env) {
+ [[maybe_unused]] auto& [tag, data, child] = sender;
+ using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t>;
+
+#if 0
+ if constexpr (requires(const child_tag_t& t) {
+ {
+ t.affine_on(::beman::execution::detail::forward_like(child), env)
+ } -> ::beman::execution::sender;
+ })
+#else
+ if constexpr (::beman::execution::detail::nested_sender_has_affine_on)
+#endif
+ {
+ return child_tag_t{}.affine_on(::beman::execution::detail::forward_like(child), env);
+ } else {
+ return ::beman::execution::write_env(
+ ::beman::execution::schedule_from(
+ ::beman::execution::get_scheduler(env),
+ ::beman::execution::write_env(::beman::execution::detail::forward_like(child), env)),
+ ::beman::execution::detail::join_env(
+ ::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token,
+ ::beman::execution::never_stop_token{}}},
+ env));
+ }
+ }
+};
+
+} // namespace beman::execution::detail
+
+namespace beman::execution {
+/**
+ * @brief affine_on is a CPO, used to adapt a sender to complete on the scheduler
+ * it got started on which is derived from get_scheduler on the receiver's environment.
+ */
+using beman::execution::detail::affine_on_t;
+inline constexpr affine_on_t affine_on{};
+} // namespace beman::execution
+
+// ----------------------------------------------------------------------------
+
+#endif
diff --git a/include/beman/execution/detail/just.hpp b/include/beman/execution/detail/just.hpp
index e5e2b264..8cd1af59 100644
--- a/include/beman/execution/detail/just.hpp
+++ b/include/beman/execution/detail/just.hpp
@@ -34,6 +34,10 @@ struct just_t {
return ::beman::execution::detail::make_sender(
*this, ::beman::execution::detail::product_type{::std::forward(arg)...});
}
+ template <::beman::execution::sender Sender>
+ static auto affine_on(Sender&& sndr, const auto&) noexcept {
+ return ::std::forward(sndr);
+ }
};
template
diff --git a/include/beman/execution/detail/let.hpp b/include/beman/execution/detail/let.hpp
index 32d30e30..e8102be4 100644
--- a/include/beman/execution/detail/let.hpp
+++ b/include/beman/execution/detail/let.hpp
@@ -162,7 +162,12 @@ struct impls_for<::beman::execution::detail::let_t> : ::beman::execu
{}};
}};
template
- static auto let_bind(auto& state, Receiver& receiver, Args&&... args) {
+ static auto
+ let_bind(auto& state, Receiver& receiver, Args&&... args) noexcept(noexcept(::beman::execution::connect(
+ ::std::apply(::std::move(state.fun),
+ ::std::move(state.args.template emplace<::beman::execution::detail::decayed_tuple>(
+ ::std::forward(args)...))),
+ let_receiver{receiver, state.env}))) {
using args_t = ::beman::execution::detail::decayed_tuple;
auto mkop{[&] {
return ::beman::execution::connect(
@@ -179,7 +184,8 @@ struct impls_for<::beman::execution::detail::let_t> : ::beman::execu
try {
let_bind(state, receiver, ::std::forward(args)...);
} catch (...) {
- ::beman::execution::set_error(::std::move(receiver), ::std::current_exception());
+ if constexpr (not noexcept(let_bind(state, receiver, ::std::forward(args)...)))
+ ::beman::execution::set_error(::std::move(receiver), ::std::current_exception());
}
} else {
Tag()(::std::move(receiver), ::std::forward(args)...);
diff --git a/include/beman/execution/detail/nested_sender_has_affine_on.hpp b/include/beman/execution/detail/nested_sender_has_affine_on.hpp
new file mode 100644
index 00000000..be374edc
--- /dev/null
+++ b/include/beman/execution/detail/nested_sender_has_affine_on.hpp
@@ -0,0 +1,20 @@
+// include/beman/execution/detail/nested_sender_has_affine_on.hpp -*-C++-*-
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON
+#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON
+
+#include
+
+// ----------------------------------------------------------------------------
+
+namespace beman::execution::detail {
+template
+concept nested_sender_has_affine_on = requires(Sender&& sndr, const Env& env) {
+ { sndr.template get<2>() } -> ::beman::execution::detail::sender_has_affine_on;
+};
+} // namespace beman::execution::detail
+
+// ----------------------------------------------------------------------------
+
+#endif
diff --git a/include/beman/execution/detail/read_env.hpp b/include/beman/execution/detail/read_env.hpp
index ed1f3724..5b96137b 100644
--- a/include/beman/execution/detail/read_env.hpp
+++ b/include/beman/execution/detail/read_env.hpp
@@ -21,6 +21,10 @@
namespace beman::execution::detail {
struct read_env_t {
auto operator()(auto&& query) const { return ::beman::execution::detail::make_sender(*this, query); }
+ template <::beman::execution::sender Sender>
+ static auto affine_on(Sender&& sndr, const auto&) noexcept {
+ return ::std::forward(sndr);
+ }
};
template <>
diff --git a/include/beman/execution/detail/run_loop.hpp b/include/beman/execution/detail/run_loop.hpp
index d2c05993..4a1f8796 100644
--- a/include/beman/execution/detail/run_loop.hpp
+++ b/include/beman/execution/detail/run_loop.hpp
@@ -12,9 +12,9 @@
#include
#include
#include
-#include
#include
#include
+#include
#include
#include
@@ -53,27 +53,29 @@ class run_loop {
// NOLINTBEGIN(misc-no-recursion)
template
opstate(run_loop* l, R&& rcvr) : loop(l), receiver(::std::forward(rcvr)) {}
- auto start() & noexcept -> void {
- try {
- this->loop->push_back(this);
- } catch (...) {
- ::beman::execution::set_error(::std::move(this->receiver), ::std::current_exception());
- }
- }
+ auto start() & noexcept -> void { this->loop->push_back(this); }
// NOLINTEND(misc-no-recursion)
auto execute() noexcept -> void override {
- if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested())
- ::beman::execution::set_stopped(::std::move(this->receiver));
- else
+ using token = decltype(::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)));
+ if constexpr (not ::beman::execution::unstoppable_token) {
+ if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested())
+ ::beman::execution::set_stopped(::std::move(this->receiver));
+ else
+ ::beman::execution::set_value(::std::move(this->receiver));
+ } else
::beman::execution::set_value(::std::move(this->receiver));
}
};
struct sender {
using sender_concept = ::beman::execution::sender_t;
- using completion_signatures =
- ::beman::execution::completion_signatures<::beman::execution::set_value_t(),
- ::beman::execution::set_error_t(::std::exception_ptr),
- ::beman::execution::set_stopped_t()>;
+ template >
+ auto get_completion_signatures(Env&& env) const noexcept {
+ if constexpr (::beman::execution::unstoppable_token)
+ return ::beman::execution::completion_signatures<::beman::execution::set_value_t()>{};
+ else
+ return ::beman::execution::completion_signatures<::beman::execution::set_value_t(),
+ ::beman::execution::set_stopped_t()>{};
+ }
run_loop* loop;
@@ -100,7 +102,8 @@ class run_loop {
opstate_base* front{};
opstate_base* back{};
- auto push_back(opstate_base* item) -> void {
+ auto push_back(opstate_base* item) noexcept -> void {
+ //-dk:TODO run_loop::push_back should really be lock-free
::std::lock_guard guard(this->mutex);
if (auto previous_back{::std::exchange(this->back, item)}) {
previous_back->next = item;
@@ -109,7 +112,8 @@ class run_loop {
this->condition.notify_one();
}
}
- auto pop_front() -> opstate_base* {
+ auto pop_front() noexcept -> opstate_base* {
+ //-dk:TODO run_loop::pop_front should really be lock-free
::std::unique_lock guard(this->mutex);
this->condition.wait(guard, [this] { return this->front || this->current_state == state::finishing; });
if (this->front == this->back)
diff --git a/include/beman/execution/detail/sender_has_affine_on.hpp b/include/beman/execution/detail/sender_has_affine_on.hpp
new file mode 100644
index 00000000..742fb026
--- /dev/null
+++ b/include/beman/execution/detail/sender_has_affine_on.hpp
@@ -0,0 +1,24 @@
+// include/beman/execution/detail/sender_has_affine_on.hpp -*-C++-*-
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON
+#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON
+
+#include
+#include
+#include
+
+// ----------------------------------------------------------------------------
+
+namespace beman::execution::detail {
+template
+concept sender_has_affine_on =
+ beman::execution::sender<::std::remove_cvref_t> && requires(Sender&& sndr, const Env& env) {
+ sndr.template get<0>();
+ { sndr.template get<0>().affine_on(std::forward(sndr), env) } -> ::beman::execution::sender;
+ };
+} // namespace beman::execution::detail
+
+// ----------------------------------------------------------------------------
+
+#endif
diff --git a/include/beman/execution/detail/then.hpp b/include/beman/execution/detail/then.hpp
index 1a342201..56078d0a 100644
--- a/include/beman/execution/detail/then.hpp
+++ b/include/beman/execution/detail/then.hpp
@@ -16,6 +16,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -46,6 +47,11 @@ struct then_t : ::beman::execution::sender_adaptor_closure> {
domain,
::beman::execution::detail::make_sender(*this, ::std::forward(fun), ::std::forward(sender)));
}
+ template <::beman::execution::sender Sender, typename Env>
+ requires ::beman::execution::detail::nested_sender_has_affine_on
+ static auto affine_on(Sender&& sndr, const Env&) noexcept {
+ return ::std::forward(sndr);
+ }
};
template
diff --git a/include/beman/execution/detail/write_env.hpp b/include/beman/execution/detail/write_env.hpp
index 6da548a4..6fff51b3 100644
--- a/include/beman/execution/detail/write_env.hpp
+++ b/include/beman/execution/detail/write_env.hpp
@@ -11,6 +11,7 @@
#include
#include
#include
+#include
#include
#include
@@ -24,6 +25,11 @@ struct write_env_t {
*this, ::std::forward(env), ::std::forward(sender));
}
static auto name() { return "write_env_t"; }
+ template <::beman::execution::sender Sender, typename Env>
+ requires ::beman::execution::detail::nested_sender_has_affine_on
+ static auto affine_on(Sender&& sndr, const Env&) noexcept {
+ return ::std::forward(sndr);
+ }
};
template
diff --git a/include/beman/execution/execution.hpp b/include/beman/execution/execution.hpp
index c5bb7748..b68c68c7 100644
--- a/include/beman/execution/execution.hpp
+++ b/include/beman/execution/execution.hpp
@@ -38,6 +38,7 @@
#include
#include
+#include
#include
#include
#include
diff --git a/src/beman/execution/CMakeLists.txt b/src/beman/execution/CMakeLists.txt
index fc15f853..0a3e3e34 100644
--- a/src/beman/execution/CMakeLists.txt
+++ b/src/beman/execution/CMakeLists.txt
@@ -26,6 +26,7 @@ target_sources(
TYPE HEADERS
BASE_DIRS ${PROJECT_SOURCE_DIR}/include
FILES
+ ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/affine_on.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/allocator_aware_move.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/almost_scheduler.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/apply_sender.hpp
@@ -118,6 +119,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/meta_transform.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/meta_unique.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/movable_value.hpp
+ ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/nested_sender_has_affine_on.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/never_stop_token.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/non_assignable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/nostopstate.hpp
@@ -149,6 +151,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_awaitable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_decompose.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_for.hpp
+ ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_has_affine_on.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sender_in.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/sends_stopped.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/set_error.hpp
diff --git a/tests/beman/execution/CMakeLists.txt b/tests/beman/execution/CMakeLists.txt
index 01511154..4f4fefc7 100644
--- a/tests/beman/execution/CMakeLists.txt
+++ b/tests/beman/execution/CMakeLists.txt
@@ -20,6 +20,7 @@ endif()
list(
APPEND execution_tests
+ exec-affine-on.test
issue-174.test
issue-186.test
exec-scope-counting.test
diff --git a/tests/beman/execution/exec-affine-on.test.cpp b/tests/beman/execution/exec-affine-on.test.cpp
new file mode 100644
index 00000000..63af703c
--- /dev/null
+++ b/tests/beman/execution/exec-affine-on.test.cpp
@@ -0,0 +1,202 @@
+// tests/beman/execution/exec-affine-on.test.cpp -*-C++-*-
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+// ----------------------------------------------------------------------------
+
+namespace {
+struct awaiter {
+ bool done{false};
+ std::mutex mtx{};
+ std::condition_variable cv{};
+
+ auto complete() -> bool {
+ std::lock_guard lock{this->mtx};
+ this->done = true;
+ this->cv.notify_all();
+ return true;
+ }
+ auto await() {
+ std::unique_lock lock{this->mtx};
+ this->cv.wait(lock, [&]() noexcept { return this->done; });
+ }
+};
+template
+struct receiver {
+ using receiver_concept = test_std::receiver_t;
+ Sched scheduler_;
+ awaiter* awaiter_{nullptr};
+ auto set_value(auto&&...) && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); }
+ auto set_error(auto&&) && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); }
+ auto set_stopped() && noexcept -> void { this->awaiter_&& this->awaiter_->complete(); }
+
+ auto get_env() const noexcept { return test_std::env{test_std::prop(test_std::get_scheduler, this->scheduler_)}; }
+};
+template
+receiver(Sched, awaiter* = nullptr) -> receiver;
+
+struct test_scheduler {
+ using scheduler_concept = test_std::scheduler_t;
+
+ struct data {
+ std::size_t connected_{};
+ std::size_t started_{};
+ };
+ data* data_;
+
+ template
+ struct state {
+ using operation_state_concept = test_std::operation_state_t;
+ std::remove_cvref_t receiver_;
+ data* data_;
+ auto start() & noexcept -> void {
+ ++this->data_->started_;
+ test_std::set_value(std::move(this->receiver_));
+ }
+ };
+ struct sender {
+ using sender_concept = test_std::sender_t;
+ using completion_signatures = test_std::completion_signatures;
+ data* data_;
+ struct env {
+ data* data_;
+ auto query(test_std::get_completion_scheduler_t) const noexcept {
+ return test_scheduler{};
+ }
+ };
+ auto get_env() const noexcept -> env { return {this->data_}; }
+ template
+ auto connect(Receiver&& rcvr) && noexcept -> state {
+ ++this->data_->connected_;
+ return {std::forward(rcvr), this->data_};
+ }
+ };
+
+ auto schedule() const noexcept { return sender{this->data_}; }
+ friend auto operator==(const test_scheduler&, const test_scheduler&) noexcept -> bool = default;
+};
+
+static_assert(test_std::scheduler);
+
+auto test_order_of_connect() -> void {
+ test_scheduler::data inner_data{};
+ test_scheduler inner_sched{&inner_data};
+
+ test_scheduler::data data{};
+ test_scheduler sched{&data};
+ auto sndr{test_std::affine_on(test_std::starts_on(inner_sched, test_std::just(42)))};
+
+ assert(data.connected_ == 0);
+ assert(data.started_ == 0);
+ assert(inner_data.connected_ == 0);
+ assert(inner_data.started_ == 0);
+ auto st{test_std::connect(std::move(sndr), receiver{sched})};
+ assert(data.connected_ == 1);
+ assert(data.started_ == 0);
+ assert(inner_data.connected_ == 1);
+ assert(inner_data.started_ == 0);
+ test_std::start(st);
+ assert(data.connected_ == 1);
+ assert(data.started_ == 1);
+ assert(inner_data.connected_ == 1);
+ assert(inner_data.started_ == 1);
+}
+
+template
+auto test_affine_on_specializations(Sender&& sender, std::size_t count = 0u) -> void {
+ test_scheduler::data data{};
+ test_scheduler sched{&data};
+ auto sndr{test_std::affine_on(std::forward(sender))};
+ awaiter aw{};
+
+ assert(data.connected_ == 0);
+ assert(data.started_ == 0);
+ auto st{test_std::connect(std::move(sndr), receiver{sched, &aw})};
+ assert(data.connected_ == count);
+ assert(data.started_ == 0);
+ test_std::start(st);
+ aw.await();
+
+ assert(data.connected_ == count);
+ assert(data.started_ == count);
+}
+} // namespace
+
+auto main() -> int {
+ static_assert(test_std::sender);
+ static_assert(test_std::sender);
+
+ static_assert(not test_std::sender_in>);
+
+ test_std::run_loop loop;
+ auto r{receiver(loop.get_scheduler())};
+ static_assert(test_std::receiver);
+ auto s{test_std::get_scheduler(test_std::get_env(r))};
+ assert(s == loop.get_scheduler());
+ auto st{test_std::transform_sender(
+ test_std::default_domain(), test_std::affine_on(test_std::just(42)), test_std::get_env(r))};
+ test_std::connect(std::move(st), std::move(r));
+ auto s0{test_std::connect(test_std::affine_on(test_std::just(42)), receiver(loop.get_scheduler()))};
+
+ std::thread t{[&]() noexcept { loop.run(); }};
+ auto r0 = test_std::sync_wait(test_std::affine_on(test_std::just(42)));
+ assert(r0);
+ auto [v0] = *r0;
+ assert(v0 == 42);
+ auto r1 = test_std::sync_wait(test_std::starts_on(loop.get_scheduler(), test_std::affine_on(test_std::just(42))));
+ assert(r1);
+ auto [v1] = *r1;
+ assert(v1 == 42);
+
+ test_order_of_connect();
+ test_affine_on_specializations(test_std::just(42));
+ test_affine_on_specializations(test_std::just(42, true, 3.14));
+ test_affine_on_specializations(test_std::just_error(42));
+ test_affine_on_specializations(test_std::just_stopped());
+ test_affine_on_specializations(test_std::just_stopped());
+ test_affine_on_specializations(test_std::read_env(test_std::get_stop_token));
+ test_affine_on_specializations(test_std::write_env(
+ test_std::just(42), test_std::env{test_std::prop{test_std::get_stop_token, test_std::never_stop_token{}}}));
+ test_affine_on_specializations(
+ test_std::write_env(test_std::starts_on(loop.get_scheduler(), test_std::just(42)),
+ test_std::env{test_std::prop{test_std::get_stop_token, test_std::never_stop_token{}}}),
+ 1u);
+ test_affine_on_specializations(test_std::then(test_std::just(42), [](int) {}));
+ test_affine_on_specializations(
+ test_std::then(test_std::starts_on(loop.get_scheduler(), test_std::just(42)), [](auto&&...) {}), 1u);
+ test_affine_on_specializations(test_std::upon_error(test_std::just_error(42), [](int) {}));
+ test_affine_on_specializations(
+ test_std::upon_error(test_std::starts_on(loop.get_scheduler(), test_std::just_error(42)), [](auto&&...) {}),
+ 1u);
+ test_affine_on_specializations(test_std::upon_stopped(test_std::just_stopped(), [] {}));
+ test_affine_on_specializations(
+ test_std::upon_stopped(test_std::starts_on(loop.get_scheduler(), test_std::just_stopped()), [] {}), 1u);
+
+ loop.finish();
+ t.join();
+
+ return 0;
+}
diff --git a/tests/beman/execution/exec-run-loop-types.test.cpp b/tests/beman/execution/exec-run-loop-types.test.cpp
index c72515e8..740a8cc8 100644
--- a/tests/beman/execution/exec-run-loop-types.test.cpp
+++ b/tests/beman/execution/exec-run-loop-types.test.cpp
@@ -80,11 +80,16 @@ TEST(exec_run_loop_types) {
// p5:
auto sender{test_std::schedule(scheduler)};
struct env {};
- static_assert(::std::same_as,
+ test_std::inplace_stop_source source{};
+ struct token_env {
+ test_std::inplace_stop_token token;
+ auto query(const test_std::get_stop_token_t&) const noexcept { return this->token; }
+ };
+ static_assert(::std::same_as,
decltype(test_std::get_completion_signatures(sender, env{}))>);
-
+ static_assert(
+ ::std::same_as,
+ decltype(test_std::get_completion_signatures(sender, token_env{source.get_token()}))>);
// p7:
static_assert(test_std::receiver_of);
// p7.1: