Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ FetchContent_Declare(
execution
# SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution
GIT_REPOSITORY https://github.com/bemanproject/execution
GIT_TAG 7451ece
GIT_TAG 7d2a2b0
)
FetchContent_MakeAvailable(execution)

Expand Down
310 changes: 306 additions & 4 deletions docs/P3941-affinity.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: Scheduler Affinity
document: P3941R0
date: 2025-12-14
document: P3941R1
date: 2026-01-14
audience:
- Concurrency Working Group (SG1)
- Library Evolution Working Group (LEWG)
Expand Down Expand Up @@ -31,6 +31,10 @@ meet its objective at run-time.

# Change History

## R1

- added wording

## R0 Initial Revision

# Overview of Changes
Expand Down Expand Up @@ -495,6 +499,304 @@ resolution in this issue is incomplete).
The name `affine_on` isn't great. It may be worth giving the
algorithm a better name.

# Wording Changes: TODO
# Wording Changes

::: ednote
Change [exec.affine.on] to use only one parameter, require an
infallible scheduler from the receiver, and add a default implementation
which allows customization of `affine_on` for child senders:
:::

[1]{.pnum}
`affine_on` adapts a sender into one that completes on a [specified
scheduler]{.rm}[receiver's scheduler]{.add}. If the algorithm
determines that the adapted sender already completes on the correct
scheduler it can avoid any scheduling operation.

[2]{.pnum}
The name `affine_on` denotes a pipeable sender adaptor object. For
[a ]{.add} subexpression[s sch and]{.rm} `sndr`, if [`decltype((sch))`
does not satisfy scheduler, or]{.rm} `decltype((sndr))` does not
satisfy sender, <code>affine_on(sndr[, sch]{.rm})</code> is ill-formed.

[3]{.pnum}
Otherwise, the expression <code>affine_on(sndr[, sch]{.rm})</code>
is expression-equivalent to:
<code>transform_sender(_get-domain-early_(sndr), _make-sender_(affine_on,
[sch]{.rm}[env&lt;&gt;()]{.add}, sndr))</code> except that `sndr`
is evaluated only once.

[4]{.pnum}
The exposition-only class template <code>_impls-for_</code>
([exec.snd.expos]) is specialized for `affine_on_t` as follows:

```c++
namespace std::execution {
template<>
struct impls-for<affine_on_t> : default-impls {
static constexpr auto get-attrs =
[](const auto&@[ data]{.rm}]@, const auto& child) noexcept -> decltype(auto) {
return @[_JOIN-ENV_(_SCHED-ATTRS_(data),_FWD-ENV_(]{.rm}@get_env(child)@[))]{.rm}@;
};
};
}
```

:::{.add}
[?]{.pnum}
Let `sndr` and `ev` be subexpressions such that `Sndr` is
`decltype((sndr))`. If <code><i>sender-for</i>&lt;Sndr,
affine_on_t&gt;</code> is `false`, then the expression
`affine_on.transform_sender(sndr, ev)` is ill-formed; otherwise,
if otherwise, it is equal to:
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant phrase 'if otherwise' should be 'otherwise'. The text currently reads 'otherwise, if otherwise' which is duplicative.

Suggested change
if otherwise, it is equal to:
it is equal to:

Copilot uses AI. Check for mistakes.

```
auto&[_, _, child] = sndr;
using child_tag_t = tag_of_t<remove_cvref_t<decltype(child)>>;
if constexpr (requires(const child_tag_t& t){ t.affine_on(child, env); })
return t.affine_on(child, env);
Comment on lines +556 to +557
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name inconsistency: the parameter is named 'ev' (line 547) but used as 'env' in this code snippet. Should use 'ev' consistently throughout.

Suggested change
if constexpr (requires(const child_tag_t& t){ t.affine_on(child, env); })
return t.affine_on(child, env);
if constexpr (requires(const child_tag_t& t){ t.affine_on(child, ev); })
return t.affine_on(child, ev);

Copilot uses AI. Check for mistakes.
else
return write_env(
schedule_from(get_scheduler(get_env(ev)), write_env(std::move(child), ev)),
JOIN-ENV(env{prop{get_stop_token, never_stop_token()}}, ev)
);
```

[Note 1: This causes the `affine_on(sndr)` sender to become
`schedule_from(sch, sndr)` when it is connected with a receiver
`rcvr` whose execution domain does not customize `affine_on`,
for which `get_scheduler(get_env(rcvr))` is `sch`, and `affine_on`
isn't specialized for the child sender.
end note]

[?]{.pnum}
_Recommended Practice_: Implementations should provide `affine_on`
member functions for senders which are known to resume on the
scheduler where they were started. Example senders for which that
is the case are `just`, `just_error`, `just_stopped`, `read_env`,
and `write_env`.

:::

[5]{.pnum}
Let <code>_out_sndr_</code> be a subexpression denoting a sender
returned from <code>affine_on(sndr[, sch]{.rm})</code> or one equal
to such, and let <code>_OutSndr_</code> be the type
<code>decltype((_out_sndr_))</code>. Let <code>_out_rcvr_</code>
be a subexpression denoting a receiver that has an environment of
type `Env` such that <code>sender_in&lt;_OutSndr_, Env&gt;</code>
is `true`. [Let <code>_sch_</code> be the result of the expression
<code>get_scheduler(get_env(_out_rcvr_))</code>. If the completion
signatures of <code>schedule(_sch_)</code> contain a different
completion signature than `set_value_t()` when using an environment
where `get_stop_token()` returns an `unstoppable_token`, the
expression <code>connect(<i>out_sndr</i>, <i>out_rcvr</i>)</code> is
ill-formed.]{.add} Let `op` be an lvalue referring to the operation
state that results from connecting <code>_out_sndr_</code> to
<code>_out_rcvr_</code>. Calling <code>start(_op_)</code> will
start `sndr` on the current execution agent and execute completion
operations on <code>_out_rcvr_</code> on an execution agent of the
execution resource associated with [`sch`]{.rm}[<code>_sch_</code>]{.add}.
If the current execution resource is the same as the execution
resource associated with [`sch`]{.rm}[<code>_sch_</code>]{.add},
the completion operation on <code>_out_rcvr_</code> may be called
before <code>start(_op_)</code> completes. [If scheduling onto `sch`
fails, an error completion on <code>_out_rcvr_</code> shall be
executed on an unspecified execution agent.]{.rm}

::: ednote
Remove `change_coroutine_scheduler` from [execution.syn]:
:::

```
namespace std::execution {
...
// [exec.task.scheduler], task scheduler
class task_scheduler;

template<class E>
struct with_error {
using type = remove_cvref_t<E>;
type error;
};
template<class E>
with_error(E) -> with_error<E>;
```
::: rm
```
template<scheduler Sch>
struct change_coroutine_scheduler {
using type = remove_cvref_t<Sch>;
type scheduler;
};
template<scheduler Sch>
change_coroutine_scheduler(Sch) -> change_coroutine_scheduler<Sch>;
```
:::
```
// [exec.task], class template task
template<class T, class Environment>
class task;
...
}
```

::: ednote
Adjust the use of `affine_on` and remove `change_coroutine_scheduler` from [task.promise]:
:::

```
namespace std::execution {
template<class T, class Environment>
class task<T, Environment>::promise_type {
public:
...

template<class A>
auto await_transform(A&& a);
```
::: rm
```
template<class Sch>
auto await_transform(change_coroutine_scheduler<Sch> sch);
```
:::
```

@_unspecified_@ get_env() const noexcept;

...
}
};
```
...

```
template<sender Sender>
auto await_transform(Sender&& sndr) noexcept;
```
[9]{.pnum}
_Returns_: If `same_as<inline_scheduler, scheduler_type>` is `true` returns `as_awaitable(​std​::​​forward<Sender>(sndr), *this);` otherwise returns `as_awaitable(affine_on(​std​::​​forward<Sender>(sndr)@[, SCHED(*this)]{.rm}@), *this)`.

::: rm
```
template<class Sch>
auto await_transform(change_coroutine_scheduler<Sch> sch) noexcept;
```
[10]{.pnum}
_Effects_: Equivalent to:
```
return await_transform(just(exchange(SCHED(*this), scheduler_type(sch.scheduler))), *this);
```
:::

```
void unhandled_exception();
```
[11]{.pnum}
_Effects_: If the signature `set_error_t(exception_ptr)` is not an element of `error_types`, calls `terminate()` ([except.terminate]). Otherwise, stores `current_exception()` into <code>_errors_</code>.

...

::: ednote
In [exec.task.scheduler] change the constructor of `task_scheduler` to require that the scheduler passed
is infallible
:::

```
template<class Sch, class Allocator = allocator<void>>
requires(!same_as<task_scheduler, remove_cvref_t<Sch>>) && scheduler<Sch>
explicit task_scheduler(Sch&& sch, Allocator alloc = {});
```

::: add
[?]{.pnum}
_Mandates_: Let `e` be an environment and let `E` be `decltype(e)`.
If `unstoppable_token<decltype(get_stop_token(e))>` is `true`, then
the type `completion_signatures_of_t<decltype(schedule(sch)), E>`
only includes `set_value_t()`, otherwise it may additionally include
`set_stopped_t()`.
:::

[2]{.pnum}
_Effects_: Initialize <code><i>sch_</i></code> with `allocate_shared<remove_cvref_t<Sch>>(alloc,​ std​::​forward<Sch>​(sch))`.

[3]{.pnum}
_Recommended practice_: Implementations should avoid the use of
dynamically allocated memory for small scheduler objects.

[4]{.pnum}
_Remarks_: Any allocations performed by construction of
<code>_ts-sender_</code> or <code>_state_</code> objects resulting
from calls on `*this` are performed using a copy of `alloc`.

::: ednote
In [exec.task.scheduler] change the <code><i>ts-sender</i></code> completion signatures
to indicate that `task_scheduler` is infallible:
:::

[8]{.pnum}
```
namespace std::execution {
class task_scheduler::@_ts-sender_@ { // @_exposition only_@
public:
using sender_concept = sender_t;

template<receiver Rcvr>
@_state_@<Rcvr> connect(Rcvr&& rcvr) &&;
};
}
```

<code><i>ts-sender</i></code> is an exposition-only class that
models `sender` ([exec.snd]) and for which
<code>completion_signatures_of_t&lt;<i>ts-sender</i>[, E]{.add}&gt;</code>
denotes[:]{.rm}[ `completion_signatures<set_value_t()>` if `unstoppable_token<decltype(get_stop_token(declval<E>()))>` is `true`, and
otherwise `completion_signatures<set_value_t(), set_stopped_t()>`.]{.add}

::: rm
```
completion_signatures<
set_value_t(),
set_error_t(error_code),
set_error_t(exception_ptr),
set_stopped_t()>
```
:::

::: ednote
In [exec.run.loop.types] change the paragraph defining the completion signatures:
:::

...

```
class run-loop-sender;
```

[5]{.pnum}
<code><i>run-loop-sender</i></code> is an exposition-only type that satisfies `sender`.
[Let `E` be the type of an environment. If `unstoppable_token<decltype(get_stop_token(declval<E>()))>` is `true`,
then ]{.add} <code>completion_signatures_of_t&lt;<i>run-loop-sender</i>[, E]{.add}&gt;</code> is

::: rm
```
completion_signatures<set_value_t(), set_error_t(exception_ptr), set_stopped_t()>`
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mismatched backtick: there's a closing backtick at the end of the line that should be removed. The code block is properly closed on line 785.

Suggested change
completion_signatures<set_value_t(), set_error_t(exception_ptr), set_stopped_t()>`
completion_signatures<set_value_t(), set_error_t(exception_ptr), set_stopped_t()>

Copilot uses AI. Check for mistakes.
```
:::

::: add
```
completion_signatures<set_value_t()>
```
Otherwise it is
```
completion_signatures<set_value_t(), set_stopped_t()>
```
:::

[6]{.pnum} An instance of <code><i>run-loop-sender</i></code> remains
valid until the end of the lifetime of its associated `run_loop`
instance.

To be done.
...
2 changes: 1 addition & 1 deletion examples/bulk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ int main() {
ex::sync_wait(ex::write_env(ex::bulk(ex::just(), 16u, work{}), env{}));

ex::sync_wait(
ex::write_env([]() -> ex::task<void, ex::empty_env> { co_await ex::bulk(ex::just(), 16u, work{}); }(), env{}));
ex::write_env([]() -> ex::task<void, ex::env<>> { co_await ex::bulk(ex::just(), 16u, work{}); }(), env{}));
}
4 changes: 2 additions & 2 deletions examples/c++now-errors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ using identity_or_none_t = typename identity_or_none<T...>::type;
#if 202202 <= __cpp_lib_expected
template <ex::sender Sender>
auto as_expected(Sender&& sndr) {
using value_type = ex::value_types_of_t<Sender, ex::empty_env, std::tuple, identity_or_none_t>;
using error_type = ex::error_types_of_t<Sender, ex::empty_env, identity_or_none_t>;
using value_type = ex::value_types_of_t<Sender, ex::env<>, std::tuple, identity_or_none_t>;
using error_type = ex::error_types_of_t<Sender, ex::env<>, identity_or_none_t>;
using result_type = std::expected<value_type, error_type>;

return std::forward<Sender>(sndr) |
Expand Down
8 changes: 4 additions & 4 deletions examples/dangling-references.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace ex = beman::execution;
// ----------------------------------------------------------------------------

namespace {
ex::task<int, ex::empty_env> do_work(std::string) { /* work */ co_return 0; };
ex::task<void, ex::empty_env> execute_all() {
ex::task<int, ex::env<>> do_work(std::string) { /* work */ co_return 0; };
ex::task<void, ex::env<>> execute_all() {
co_await ex::when_all(do_work("arguments 1"), do_work("arguments 2"));
co_return;
}
Expand All @@ -27,8 +27,8 @@ int main() {
ex::sync_wait([]() -> ex::task<ex::with_error<int>, error_env> { co_return ex::with_error<int>{42}; }());

ex::sync_wait(execute_all());
ex::sync_wait([]() -> ex::task<void, ex::empty_env> {
auto t = [](const int /* this would be added: &*/ v) -> ex::task<int, ex::empty_env> { co_return v; }(42);
ex::sync_wait([]() -> ex::task<void, ex::env<>> {
auto t = [](const int /* this would be added: &*/ v) -> ex::task<int, ex::env<>> { co_return v; }(42);
[[maybe_unused]] auto v = co_await std::move(t);
}());
}
2 changes: 1 addition & 1 deletion examples/issue-start-reschedules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ex::task<> test(auto sched) {
std::cout << "init =" << std::this_thread::get_id() << "\n";
co_await ex::starts_on(sched, ex::just());
// static_assert(std::same_as<void, decltype(ex::get_completion_signatures(ex::starts_on(sched, ex::just()),
// ex::empty_env{}))>);
// ex::env<>{}))>);
co_await ex::just();
std::cout << "final=" << std::this_thread::get_id() << "\n";
}
Expand Down
2 changes: 1 addition & 1 deletion examples/rvalue-task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void test(T&& task) {
} // namespace

int main() {
auto task = []() -> ex::task<void, ex::empty_env> { co_return; }();
auto task = []() -> ex::task<void, ex::env<>> { co_return; }();
test(std::move(task));
// test(task);
}
Loading
Loading