Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,13 @@ The expression <code>schedule(<i>scheduler</i>)</code> creates a sender which up
### Sender Adaptors
The sender adaptors take one or more senders and adapt their respective behavior to complete with a corresponding result. The description uses the informal function <code><i>completions-of</i>(<i>sender</i>)</code> to represent the completion signatures which <code><i>sender</i></code> produces. Also, completion signatures are combined using <code>+</code>: the result is the deduplicated set of the combined completion signatures.

<details>
<summary><code>affine_on(<i>sender</i>) -> <i>sender-of</i><<i>completions-of</i>(<i>sender</i>)></code></summary>
The expression <code>affine_on(<i>sender</i>)</code> creates
a sender which completes on the same scheduler it was started on, even if <code><i>sender</i></code> changes the scheduler. The scheduler to resume on is determined using <code>get_scheduler(get_env(<i>rcvr</i>))</code> where <code><i>rcvr</i></code> is the receiver the sender is <code>connect</code>ed to.

The primary use of <code>affine_on</code> is implementing scheduler affinity for <code>task</code>.
</details>
<details>
<summary>`bulk`</summary>
</details>
Expand Down Expand Up @@ -698,6 +705,10 @@ The expression <code>into_variant(<i>sender</i>)</code> creates a sender which t
<details>
<summary><code>when_all_with_variant(<i>sender</i>...) -> <i>sender</i></code></summary>
</details>
<details>
<summary><code>write_env(<i>sender</i>, <i>env</i>) -> <i>sender</i></code></summary>
</details>


### Sender Consumers

Expand Down
2 changes: 1 addition & 1 deletion docs/tutorial.mds
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ pipe notation.

int main() {
int f{3};
try { *tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); }
try { tst::sync_wait(ex::just(17) | ex::let_value([f](int i){ throw f * i; return ex::just(); })); }
catch (int e) {
std::cout << "e=" << e << "\n";
}
Expand Down
138 changes: 138 additions & 0 deletions include/beman/execution/detail/affine_on.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// include/beman/execution/detail/affine_on.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AFFINE_ON

#include <beman/execution/detail/env.hpp>
#include <beman/execution/detail/forward_like.hpp>
#include <beman/execution/detail/fwd_env.hpp>
#include <beman/execution/detail/get_domain_early.hpp>
#include <beman/execution/detail/get_scheduler.hpp>
#include <beman/execution/detail/get_stop_token.hpp>
#include <beman/execution/detail/join_env.hpp>
#include <beman/execution/detail/make_sender.hpp>
#include <beman/execution/detail/never_stop_token.hpp>
#include <beman/execution/detail/prop.hpp>
#include <beman/execution/detail/schedule_from.hpp>
#include <beman/execution/detail/scheduler.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/sender_adaptor.hpp>
#include <beman/execution/detail/sender_adaptor_closure.hpp>
#include <beman/execution/detail/sender_for.hpp>
#include <beman/execution/detail/sender_has_affine_on.hpp>
#include <beman/execution/detail/tag_of_t.hpp>
#include <beman/execution/detail/transform_sender.hpp>
#include <beman/execution/detail/write_env.hpp>

#include <concepts>
#include <type_traits>

// ----------------------------------------------------------------------------

namespace beman::execution::detail {

/**
* @brief The affine_on_t struct is a sender adaptor closure that transforms a sender
* to complete on the scheduler obtained from the receiver's environment.
*
* This adaptor implements scheduler affinity to adapt a sender to complete on the
* scheduler obtained the receiver's environment. The get_scheduler query is used
* to obtain the scheduler on which the sender gets started.
*/
struct affine_on_t : ::beman::execution::sender_adaptor_closure<affine_on_t> {
/**
* @brief Adapt a sender with affine_on.
*
* @tparam Sender The deduced type of the sender to be transformed.
* @param sender The sender to be transformed.
* @return An adapted sender to complete on the scheduler it was started on.
*/
template <::beman::execution::sender Sender>
auto operator()(Sender&& sender) const {
return ::beman::execution::detail::transform_sender(
::beman::execution::detail::get_domain_early(sender),
::beman::execution::detail::make_sender(
*this, ::beman::execution::env<>{}, ::std::forward<Sender>(sender)));
}

/**
* @brief Overload for creating a sender adaptor from affine_on.
*
* @return A sender adaptor for the affine_on_t.
*/
auto operator()() const { return ::beman::execution::detail::sender_adaptor{*this}; }

/**
* @brief affine_on is implemented by transforming it into a use of schedule_from.
*
* The constraints ensure that the environment provides a scheduler which is
* infallible and, thus, can be used to guarantee completion on the correct
* scheduler.
*
* The implementation first tries to see if the child sender's tag has a custom
* affine_on implementation. If it does, that is used. Otherwise, the default
* implementation gets a scheduler from the environment and uses schedule_from
* to adapt the sender to complete on that scheduler.
*
* @tparam Sender The type of the sender to be transformed.
* @tparam Env The type of the environment providing the scheduler.
* @param sender The sender to be transformed.
* @param env The environment providing the scheduler.
* @return A transformed sender that is affined to the scheduler.
*/
template <::beman::execution::sender Sender, typename Env>
requires ::beman::execution::detail::sender_for<Sender, affine_on_t> && requires(const Env& env) {
{ ::beman::execution::get_scheduler(env) } -> ::beman::execution::scheduler;
{ ::beman::execution::schedule(::beman::execution::get_scheduler(env)) } -> ::beman::execution::sender;
{
::beman::execution::get_completion_signatures(
::beman::execution::schedule(::beman::execution::get_scheduler(env)),
::beman::execution::detail::join_env(
::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token,
::beman::execution::never_stop_token{}}},
env))
} -> ::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>>;
}
static auto transform_sender(Sender&& sender, const Env& env) {
[[maybe_unused]] auto& [tag, data, child] = sender;
using child_tag_t = ::beman::execution::tag_of_t<::std::remove_cvref_t<decltype(child)>>;

#if 0
if constexpr (requires(const child_tag_t& t) {
{
t.affine_on(::beman::execution::detail::forward_like<Sender>(child), env)
} -> ::beman::execution::sender;
})
#else
if constexpr (::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>)
#endif
Comment on lines +101 to +109
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented-out code should be removed. If the alternative implementation using #if 0 is no longer needed, remove lines 102-108 and 110 entirely, keeping only the active implementation.

Suggested change
#if 0
if constexpr (requires(const child_tag_t& t) {
{
t.affine_on(::beman::execution::detail::forward_like<Sender>(child), env)
} -> ::beman::execution::sender;
})
#else
if constexpr (::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>)
#endif
if constexpr (::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>)

Copilot uses AI. Check for mistakes.
{
return child_tag_t{}.affine_on(::beman::execution::detail::forward_like<Sender>(child), env);
} else {
return ::beman::execution::write_env(
::beman::execution::schedule_from(
::beman::execution::get_scheduler(env),
::beman::execution::write_env(::beman::execution::detail::forward_like<Sender>(child), env)),
::beman::execution::detail::join_env(
::beman::execution::env{::beman::execution::prop{::beman::execution::get_stop_token,
::beman::execution::never_stop_token{}}},
env));
}
}
};

} // namespace beman::execution::detail

namespace beman::execution {
/**
* @brief affine_on is a CPO, used to adapt a sender to complete on the scheduler
* it got started on which is derived from get_scheduler on the receiver's environment.
*/
using beman::execution::detail::affine_on_t;
inline constexpr affine_on_t affine_on{};
} // namespace beman::execution

// ----------------------------------------------------------------------------

#endif
4 changes: 4 additions & 0 deletions include/beman/execution/detail/just.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ struct just_t {
return ::beman::execution::detail::make_sender(
*this, ::beman::execution::detail::product_type{::std::forward<T>(arg)...});
}
template <::beman::execution::sender Sender>
static auto affine_on(Sender&& sndr, const auto&) noexcept {
return ::std::forward<Sender>(sndr);
}
};

template <typename Completion, typename... T, typename Env>
Expand Down
10 changes: 8 additions & 2 deletions include/beman/execution/detail/let.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ struct impls_for<::beman::execution::detail::let_t<Completion>> : ::beman::execu
{}};
}};
template <typename Receiver, typename... Args>
static auto let_bind(auto& state, Receiver& receiver, Args&&... args) {
static auto
let_bind(auto& state, Receiver& receiver, Args&&... args) noexcept(noexcept(::beman::execution::connect(
::std::apply(::std::move(state.fun),
::std::move(state.args.template emplace<::beman::execution::detail::decayed_tuple<Args...>>(
::std::forward<Args>(args)...))),
let_receiver<Receiver, decltype(state.env)>{receiver, state.env}))) {
using args_t = ::beman::execution::detail::decayed_tuple<Args...>;
auto mkop{[&] {
return ::beman::execution::connect(
Expand All @@ -179,7 +184,8 @@ struct impls_for<::beman::execution::detail::let_t<Completion>> : ::beman::execu
try {
let_bind(state, receiver, ::std::forward<Args>(args)...);
} catch (...) {
::beman::execution::set_error(::std::move(receiver), ::std::current_exception());
if constexpr (not noexcept(let_bind(state, receiver, ::std::forward<Args>(args)...)))
::beman::execution::set_error(::std::move(receiver), ::std::current_exception());
}
} else {
Tag()(::std::move(receiver), ::std::forward<Args>(args)...);
Expand Down
20 changes: 20 additions & 0 deletions include/beman/execution/detail/nested_sender_has_affine_on.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// include/beman/execution/detail/nested_sender_has_affine_on.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_NESTED_SENDER_HAS_AFFINE_ON

#include <beman/execution/detail/sender_has_affine_on.hpp>

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
template <typename Sender, typename Env>
concept nested_sender_has_affine_on = requires(Sender&& sndr, const Env& env) {
{ sndr.template get<2>() } -> ::beman::execution::detail::sender_has_affine_on<Env>;
};
} // namespace beman::execution::detail

// ----------------------------------------------------------------------------

#endif
4 changes: 4 additions & 0 deletions include/beman/execution/detail/read_env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
namespace beman::execution::detail {
struct read_env_t {
auto operator()(auto&& query) const { return ::beman::execution::detail::make_sender(*this, query); }
template <::beman::execution::sender Sender>
static auto affine_on(Sender&& sndr, const auto&) noexcept {
return ::std::forward<Sender>(sndr);
}
};

template <>
Expand Down
38 changes: 21 additions & 17 deletions include/beman/execution/detail/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
#include <beman/execution/detail/operation_state.hpp>
#include <beman/execution/detail/scheduler.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/set_error.hpp>
#include <beman/execution/detail/set_stopped.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/unstoppable_token.hpp>

#include <exception>
#include <condition_variable>
Expand Down Expand Up @@ -53,27 +53,29 @@ class run_loop {
// NOLINTBEGIN(misc-no-recursion)
template <typename R>
opstate(run_loop* l, R&& rcvr) : loop(l), receiver(::std::forward<Receiver>(rcvr)) {}
auto start() & noexcept -> void {
try {
this->loop->push_back(this);
} catch (...) {
::beman::execution::set_error(::std::move(this->receiver), ::std::current_exception());
}
}
auto start() & noexcept -> void { this->loop->push_back(this); }
// NOLINTEND(misc-no-recursion)
auto execute() noexcept -> void override {
if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested())
::beman::execution::set_stopped(::std::move(this->receiver));
else
using token = decltype(::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)));
if constexpr (not ::beman::execution::unstoppable_token<token>) {
if (::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)).stop_requested())
::beman::execution::set_stopped(::std::move(this->receiver));
else
::beman::execution::set_value(::std::move(this->receiver));
} else
::beman::execution::set_value(::std::move(this->receiver));
}
};
struct sender {
using sender_concept = ::beman::execution::sender_t;
using completion_signatures =
::beman::execution::completion_signatures<::beman::execution::set_value_t(),
::beman::execution::set_error_t(::std::exception_ptr),
::beman::execution::set_stopped_t()>;
template <typename Env = ::beman::execution::env<>>
auto get_completion_signatures(Env&& env) const noexcept {
if constexpr (::beman::execution::unstoppable_token<decltype(::beman::execution::get_stop_token(env))>)
return ::beman::execution::completion_signatures<::beman::execution::set_value_t()>{};
else
return ::beman::execution::completion_signatures<::beman::execution::set_value_t(),
::beman::execution::set_stopped_t()>{};
}

run_loop* loop;

Expand All @@ -100,7 +102,8 @@ class run_loop {
opstate_base* front{};
opstate_base* back{};

auto push_back(opstate_base* item) -> void {
auto push_back(opstate_base* item) noexcept -> void {
//-dk:TODO run_loop::push_back should really be lock-free
::std::lock_guard guard(this->mutex);
if (auto previous_back{::std::exchange(this->back, item)}) {
previous_back->next = item;
Expand All @@ -109,7 +112,8 @@ class run_loop {
this->condition.notify_one();
}
}
auto pop_front() -> opstate_base* {
auto pop_front() noexcept -> opstate_base* {
//-dk:TODO run_loop::pop_front should really be lock-free
::std::unique_lock guard(this->mutex);
this->condition.wait(guard, [this] { return this->front || this->current_state == state::finishing; });
if (this->front == this->back)
Expand Down
24 changes: 24 additions & 0 deletions include/beman/execution/detail/sender_has_affine_on.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// include/beman/execution/detail/sender_has_affine_on.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON
#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_SENDER_HAS_AFFINE_ON

#include <beman/execution/detail/sender.hpp>
#include <utility>
#include <type_traits>

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
template <typename Sender, typename Env>
concept sender_has_affine_on =
beman::execution::sender<::std::remove_cvref_t<Sender>> && requires(Sender&& sndr, const Env& env) {
sndr.template get<0>();
{ sndr.template get<0>().affine_on(std::forward<Sender>(sndr), env) } -> ::beman::execution::sender;
};
} // namespace beman::execution::detail

// ----------------------------------------------------------------------------

#endif
6 changes: 6 additions & 0 deletions include/beman/execution/detail/then.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <beman/execution/detail/meta_combine.hpp>
#include <beman/execution/detail/meta_unique.hpp>
#include <beman/execution/detail/movable_value.hpp>
#include <beman/execution/detail/nested_sender_has_affine_on.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/sender_adaptor.hpp>
#include <beman/execution/detail/sender_adaptor_closure.hpp>
Expand Down Expand Up @@ -46,6 +47,11 @@ struct then_t : ::beman::execution::sender_adaptor_closure<then_t<Completion>> {
domain,
::beman::execution::detail::make_sender(*this, ::std::forward<Fun>(fun), ::std::forward<Sender>(sender)));
}
template <::beman::execution::sender Sender, typename Env>
requires ::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>
static auto affine_on(Sender&& sndr, const Env&) noexcept {
return ::std::forward<Sender>(sndr);
}
};

template <typename Completion>
Expand Down
6 changes: 6 additions & 0 deletions include/beman/execution/detail/write_env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <beman/execution/detail/make_sender.hpp>
#include <beman/execution/detail/queryable.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/nested_sender_has_affine_on.hpp>
#include <type_traits>
#include <utility>

Expand All @@ -24,6 +25,11 @@ struct write_env_t {
*this, ::std::forward<Env>(env), ::std::forward<Sender>(sender));
}
static auto name() { return "write_env_t"; }
template <::beman::execution::sender Sender, typename Env>
requires ::beman::execution::detail::nested_sender_has_affine_on<Sender, Env>
static auto affine_on(Sender&& sndr, const Env&) noexcept {
return ::std::forward<Sender>(sndr);
}
};

template <typename NewEnv, typename Child, typename Env>
Expand Down
1 change: 1 addition & 0 deletions include/beman/execution/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <beman/execution/detail/connect.hpp>
#include <beman/execution/detail/schedule.hpp>

#include <beman/execution/detail/affine_on.hpp>
#include <beman/execution/detail/bulk.hpp>
#include <beman/execution/detail/continues_on.hpp>
#include <beman/execution/detail/into_variant.hpp>
Expand Down
Loading