diff --git a/CMakeLists.txt b/CMakeLists.txt index 8563055..a1c58ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ FetchContent_Declare( execution # SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG 686685c + GIT_TAG 7451ece ) FetchContent_MakeAvailable(execution) diff --git a/Makefile b/Makefile index 98ca772..d0aa522 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ #-dk: note to self: PATH=/opt/llvm-19.1.6/bin:$PATH LDFLAGS=-fuse-ld=lld -.PHONY: config test default compile clean distclean doc html pdf format clang-format tidy +.PHONY: config test default compile clean distclean doc docs html pdf format clang-format tidy BUILDDIR = build PRESET = gcc-release @@ -12,7 +12,7 @@ BUILD = $(BUILDDIR)/$(PRESET) default: compile -doc: +docs doc: cd docs; $(MAKE) pdf html: diff --git a/docs/Makefile b/docs/Makefile index 833fdbf..9594236 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -4,7 +4,7 @@ default: doc doc: doc-html -doc-html: P3552-task.html P3796-task-issues.html +doc-html: P3552-task.html P3796-task-issues.html P3941-affinity.html doc-pdf: P3552-task.pdf P3796-task-issues.pdf diff --git a/docs/P3941-affinity.md b/docs/P3941-affinity.md new file mode 100644 index 0000000..ea07814 --- /dev/null +++ b/docs/P3941-affinity.md @@ -0,0 +1,500 @@ +--- +title: Scheduler Affinity +document: P3941R0 +date: 2025-12-14 +audience: + - Concurrency Working Group (SG1) + - Library Evolution Working Group (LEWG) + - Library Working Group (LWG) +author: + - name: Dietmar Kühl (Bloomberg) + email: +source: + - https://github.com/bemanproject/task/doc/issues.md +toc: true +--- + +

+One important design of `std::execution::task` is that a coroutine +resumes after a `co_await` on the same scheduler as the one it was +executing on prior to the `co_await`. To achieve this, `task` +transforms the awaited object `@_obj_@` using +`affine_on(@_obj_@, @_sched_@)` where `@_sched_@` is the corresponding +scheduler. There were multiple concerns raised against the specification +of `affine_on` and discussed as part of +[P3796R1](https://wg21.link/P3796R1). This proposal is intended +to specifically address the concerns raised relating to `task`'s +scheduler affinity and in particular `affine_on`. The gist of this +proposal is impose constraints on `affine_on` to guarantee it can +meet its objective at run-time. +

+ +# Change History + +## R0 Initial Revision + +# Overview of Changes + +

+There are a few NB comments raised about the way `affine_on` works: +

+ +

+The discussion on `affine_on` revealed some aspects which were not +quite clear previously and taking these into account points towards +a better design than was previously specified: +

+
    +
  1. + To implement scheduler affinity the algorithm needs to know the + scheduler on which it was started itself. The correct receiver + may actually be hard to determine while building the work graph. + However, this scheduler can be communicated using + `get_scheduler(get_env(@_rcvr_@))` when an algorithm + is `start`ed. This requirement is more general than just + `affine_on` and is introduced by + [P3718R0](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3718r0.html): + with this guarantee in place, `affine_on` only needs one + parameter, i.e., the sender for the work to be executed. +
  2. +
  3. + The scheduler `@_sched_@` on which the work needs + to resume has to guarantee that it is possible to resume on the + correct execution agent. The implication is that scheduling work needs + to be infallible, i.e., the completion signatures of + `scheduler(@_sched_@)` cannot contain a + `set_error_t(E)` completion signature. This requirement should + be checked statically. +
  4. +
  5. + The work needs to be resumed on the correct scheduler even when + the work is stopped, i.e., the scheduling operation shall be + `connect`ed to a receiver whose environment's `get_stop_token` + query yields an `unstoppable_token`. In addition, the + schedule operation shall not have a `set_stopped_t()` completion + signature if the environment's `get_stop_token` query yields + an `unstoppable_token`. This requirement should also be checked + statically. +
  6. +
  7. + When a sender knows that it will complete on the scheduler it + was start on, it should be possible to customise the `affine_on` + algorithm to avoid rescheduling. This customisation can be + achieved by `connect`ing to the result of an `affine_on` member + function called on the child sender, if such a member function + is present, when `connect`ing an `affine_on` sender. +
  8. +
+ +

+None of these changes really contradict any earlier design: the +shape and behaviour of the `affine_on` algorithm wasn't fully fleshed +out. Tightening the behaviour scheduler affinity and the `affine_on` +algorithm has some implications on some other components: +

+
    +
  1. + If `affine_on` requires an infallible scheduler modelled at least + `inline_scheduler`, `task_scheduler`, and `run_loop::scheduler` + should be infallible (i.e., they always complete successfully + with `set_value()`). `parallel_scheduler` can probably not be + made infallible. +
  2. +
  3. + The scheduling semantics when changing a `task`'s scheduler + using `co_await change_coroutine_scheduler(@_sch_@)` + become somewhat unclear and this functionality should be removed. + Similar semantics are better modelled using + `co_await on(@_sch_@, @_nested-task_@)`. +
  4. +
  5. + The name `affine_on` isn't particular good and wasn't designed. + It may be worth renaming the algorithms to something different. +
  6. +
+ +# Discussion of Changes + +## `affine_on` Shape + +

+The original proposal for `task` used `continues_on` to schedule +the work back on the original scheduler. This algorithm takes the +work to be executed and the scheduler on which to continue as +arguments. When SG1 requested that a similar but different algorithms +is to be used to implement scheduler affinity, `continues_on` was +just replaced by `affine_on` with the same shape but the potential +to get customised differently. +

+

+The scheduler used for affinity is the scheduler communicated via +the `get_scheduler` query on the receiver's environment: the scheduler +argument passed to the `affine_on` algorithm would need to match +the scheduler obtained from `get_scheduler` query. In the context +of the `task` coroutine this scheduler can be obtained via the +promise type but in general it is actually not straight forward to +get hold of this scheduler because the receiver and hence its +associated scheduler is only provided by `connect`. It is much more +reasonable to have `affine_on` only take the work, i.e., a sender, +as argument and determine the scheduler to resume on from the +receiver's +environment in `connect`. +

+

+Thus, instead of using +```c++ +affine_on(@_sndr_@, @_sch_@) +``` +the algorithm is used just with the sender: +```c++ +affine_on(@_sndr_@) +``` +

+

+Note that this change implies that an operation state resulting +from `connect`ing `affine_on` to a receiver `@_rcvr_@` +is `start`ed on the execution agent associated with the scheduler obtained +from `get_scheduler(get_env(@_rcvr_@))`. The same +requirement is also assumed to be met when `start`ing the operation +state resulting from `connect`ing a `task`. While it is possible to +statically detect whether the query is valid and provides a scheduler +it cannot be detected if the scheduler matches the execution agent on which +`start` was called. +[P3718r0](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3718r0.html) +proposes to add this exact requirement to +[[exec.get.scheduler]](https://wg21.link/exec.get.scheduler). +

+

+This change addresses [US 234-364](https://github.com/cplusplus/nbballot/issues/939) ([LWG4331](https://cplusplus.github.io/LWG/issue4331)). +

+ +## Infallible Schedulers +

+The objective of `affine_on(@_sndr_@)` is to execute `@_sndr_@` and +to complete on the execution agent on which the operation was +`start`ed. Let `sch` be the scheduler obtained from +`get_scheduler(get_env(@_rcvr_@))` where `@_rcvr_@` is the receiver +used when `connect`ing `affine_on(@_sndr_@)` (the discussion in +this section also applies if the scheduler would be taken as a +parameter, i.e., if the [previous change](#affine_on-shape) isn't +applied this discussion still applies). If `connect`ing the result +of `schedule(@_sch_@)` fails (i.e., `connect(schedule(@_sch_@), +@_rcvr_@)` throws where `@_rcvr_@` is a suitable receiver), `affine_on` +can avoid `start`ing the main work and fail on the execution agent +where it was `start`ed. Otherwise, if it obtained an operation +state `@_os_@` from `connect(scheduler(@_sch_@), @_rcvr_@)`, +`affine_on` would `start` its main work and would `start(@_os_@)` +on the execution agent where the main work completed. If `start(@_os_@)` +is always successful, `affine_on` can achieve its objective. However, +if this scheduling operation fails, i.e., it completes with +`set_error(@_e_@)`, or if it gets cancelled, i.e., it completes +with `set_stopped()`, the execution agent on which the scheduling +operation resumes is unclear and `affine_on` cannot guarantee its +promise. Thus, it seems reasonable to require that a scheduler used +with `affine_on` is infallible, at least when used appropriately +(i.e., when providing a receiver whose associated stop token is an +`unstoppable_token`). + +

+

+The current working draft specifies 4 schedulers: +

+
    +
  1. + [`inline_scheduler`](https://wg21.link/exec.inline.scheduler) which + just completes with `set_value()` when `start()`ed, i.e., this + scheduler is already infallible. +
  2. +
  3. + [`task_scheduler`](https://wg21.link/exec.task.scheduler) is a + type-erased scheduler delegating to another scheduler. If the + underlying scheduler is infallible, the only error case for + `task_scheduler` is potential memory allocation during `connect` + of its `@_ts-sender_@`. If `affine_on` creates an operation state + for the scheduling operation during `connect`, it can guarantee + that any necessary scheduling operation succeeds. Thus, this + scheduler can be made infallible. +
  4. +
  5. + The [`run_loop::@_run-loop-scheduler_@`](https://wg21.link/exec.run.loop) + is used by [`run_loop`](https://wg21.link/exec.run.loop). The + current specification allows the scheduling operation to fail + with `set_error_t(std::exception_ptr)`. This permission allows + an implementation to use [`std::mutex`](https://wg21.link/thread.mutex) + and [`std::condition_variable`](https://wg21.link/thread.condition) + whose operations may throw. It is possible to implement the logic + using atomic operations which can't throw. The `set_stopped()` + completion is only used when the receiver's stop token, i.e. the + result of `get_stop_token(get_env(@_rcvr_@))`, was stopped. This + receiver is controlled by `affine_on`, i.e., it can provide a + [`never_stoptoken`](https://wg21.link/stoptoken.never) and this + scheduler won't complete with `set_stopped()`. If the + [`get_completion_signatures`](https://wg21.link/exec.getcomplsigs) for + the corresponding sender takes the environment into account, this + scheduler can also be made infallible. +
  6. +
  7. + The [`parallel_scheduler`](https://wg21.link/exec.par.scheduler) + provides an interface to a replaceable implementation of a thread + pool. The current interface allows + [`parallel_scheduler`](https://wg21.link/exec.par.scheduler) to + complete with `set_error_t(std::exception_ptr)` as well as with + `set_stopped_t()`. It seems unlikely that this interface can be + constrained to make it infallible. +
  8. +
+

+In general it seems unlikely that all schedulers can be constrained +to be infallible. As a result `affine_on` and, by extension, `task` +won't be usable with all schedulers if `affine_on` insists on using +only infallible schedulers. If there are fallible schedulers, there +aren't any good options for using them with a `task`. Note that +`affine_on` can fail and get cancelled (due to the main work failing +or getting cancelled) but `affine_on` can still guarantee that +execution resumes on the expect execution agent when it uses an +infallible scheduler. +

+

+This change addresses +[US 235-363](https://github.com/cplusplus/nbballot/issues/938) +([LWG4332](https://cplusplus.github.io/LWG/issue4332)). This change +goes beyond the actual issue and clarifies that the scheduling +operation used be `affine_on` needs to be always successful. +

+ +### Require Infallible Schedulers For `affine_on` + +

+If `affine_on` promises in all cases that it resumes on the +original scheduler it can only work with infallible schedulers. +If a users wants to use a fallible scheduler with `affine_on` or +`task` the scheduler will need to be adapted. The adapted scheduler +can define what it means when the underlying scheduler fails. There +are conceptually only two options (the exact details may vary) on how +to deal with a failed scheduling operation: +

+
    +
  1. +The user can transform the scheduling failure into a call to +`std::terminate`. +
  2. +
  3. +The user can consider resuming on an execution agent where the +adapting scheduler can schedule to infallibly (e.g., the execution +agent on which operation completed) but which is different from +execution agent associated with the adapted scheduler to be suitable +to continue running. In that case the scheduling operation would +just succeed without necessarily running on the correct execution +agent. However, there is no indication that scheduling to the adapted +scheduler failed and the scheduler affinity may be impacted in this +failure case. +
  4. +
+ +The standard library doesn't provide a way to adapt schedulers +easily. However, it can certainly be done. + +### Allow Fallible Schedulers For `affine_on` + +

+If the scheduler used with `affine_on` is allowed to fail, `affine_on` +can't guarantee that it completes on the correct scheduler in case of +an error completion. It could be specified that `affine_on` completes +with `set_error(@_rcvr_@, scheduling_error{@_e_@})` when the scheduling +operation completes with `set_error(@_r_@, @_e_@)` to make it detectable +that it didn't complete on the correct scheduler. This situation is +certainly not ideal but, at least, only affects the error completion and +it can be made detectable. +

+

+A use of `affine_on` which always needs to complete on a specific scheduler +is still possible: in that case the user will need to make sure that the +used scheduler is infallible. The main issue here is that there is no +automatic static checking whether that is the case. +

+ +### Considerations On Infallible Schedulers + +In an ideal world, all schedulers would be infallible. It is unclear +if that is achievable. If schedulers need to be allowed to be fallible, +it may be viable to require that all standard library schedulers +are infallible. As outlined above that should be doable for all current +schedulers except, possibly, `parallel_scheduler`. So, the proposed +change is to require schedulers to be infallible when being used with +`affine_on` (and, thus, being used by `task`) and to change as many of +the standard C++ libraries to be infallible as possible. + +If constraining `affine_on` to only infallible schedulers turns out +to be too strong, the constraint can be relaxed in a future revision +of the standard by explicitly opting out of that constraints, e.g., +using an additional argument. For `task` to make use of it, it too +would need an explicit mechanisms to indicate that its `affine_on` +use should opt out of the constraint, e.g., by adding a suitable +`static` member to the environment template argument. + +## `affine_on` Customisation + +Senders which don't cause the execution agent to be changed like +`just` or the various queries should be able to customise `affine_on` +to avoid unnecessary scheduling. Sadly, a proposal +([P3206](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2025/p3206r0.pdf)) +to standardise properties which could be used to determine how a +sender completes didn't make much progress, yet. An implementation +can make use of similar techniques using an implementation-specific +protocol. If a future standard defines a standard approach to +determine the necessary properties the implementation can pick up +on those. + +The idea is to have `affine_on` define a `transform_sender(s)` +member function which determines what sender should be returned. +By default the argument is returned but if the child sender indicates +that it doesn't actually change the execution agent the function +would return the child sender. There are a number of senders for +which this can be done: + +- `just`, `just_error`, and `just_stopped` +- `read_env` and `write_env` +- `then`, `upon_error`, and `upon_stopped` if the child sender + doesn't change the execution agent + +The proposal is to define a `transform_sender` member which uses +an implementation-specific property to determine that a sender +completes on the same execution agent as the one it was started on. +In addition, it is recommended that this property gets defined by +the various standard library senders where it can make a difference. + +This change addresses +[US 232-366](https://github.com/cplusplus/nbballot/issues/941) +([LWG4329](https://cplusplus.github.io/LWG/issue4329)), although +not in a way allowing application code to plug into this mechanism. +Such an approach can be designed in a future revision of the standard. + +## Removing `change_coroutine_scheduler` + +The current working paper specifies `change_coroutine_scheduler` to change +the scheduler used by the coroutine for scheduler affinity. It turns out that +this use is somewhat problematic in two ways: + +1. Changing the scheduler affects the coroutine until the end of + the coroutine or until `change_coroutine_scheduler` is `co_await`ed + again. It doesn't automatically reset. Thus, local variables + constructed before `change_coroutine_scheduler(s)` was + `co_await`ed were constructed on the original scheduler and are + destroyed on the replaced scheduler. +2. The `task`'s execution may finish on a different than the original + scheduler. To allow symmetric transfer between two `task`s each + `task` needs to complete on the correct scheduler. Thus, the + `task` needs to be prepared to change to the original scheduler + before actually completing. To do so, it is necessary to know + the original scheduler and also to have storage for the state + needed to change to a different scheduler. It can't be statically + detected whether `change_coroutine_scheduler(s)` is `co_await`ed + in the body of a coroutine and, thus, the necessary storage and + checks are needed even for `task`s which don't use + `change_coroutine_scheduler`. + +If there were no way to change the scheduler it would still be possible +to execute using a different scheduler, although not as direct: +instead of using `co_await change_coroutine_scheduler(s)` to change +the scheduler used for affinity to `s` a nested `task` executing on `s` +could be `co_await`ed: + +```c++ +co_await ex::starts_on(s, [](@_parameters_@)->task<@_T_@, @_E_@> { @_logic_@ }(@_arguments_@)); +``` + +Using this approach the use of the scheduler `s` is clearly limited +to the nested coroutine. The scheduler affinity is fully taken care +of by the use of `affine_on` when `co_await`ing work. There is no +need to provide storage or checks needed for the potential of +having a `task` return to the original scheduler if the scheduler +isn't actually changed by a `task`. + +The proposal is remove `change_coroutine_scheduler` and the possibility +of changing the scheduler within a `task`. The alternative to +controlling the scheduler used for affinity from within a `task` +is a bit verbose. This need under the control of the coroutine is +likely relatively rare. Replacing the used scheduler for an existing +`task` by nesting it within `on(s, t)` or `starts_on(s, t)` is +fairly straightforward. + +This functionality was originally included because it is present +for, at least, one of the existing libraries, although in a form +which was recommended against. The existing use changes the scheduler +of a coroutine when `co_await`ing the result of `schedule(s)`; this +exact approach was found to be fragile and surprising and the +recommendation was to provide the functionality more explicit. + +This change is not associated with any national body comment. +However, it is still important to do! It isn't adding any new +functionality but removes a problematic way to achieve something +which can be better achieved differently. If this change is not +made the inherent cost of having the possibility of having +`change_routine_scheduler` can't be removed later without breaking +existing code. + +## `affine_on` Default Implementation + +Using the previous discussion leads to a definition of `affine_on` which +is quite different from effectively just using `continues_on`: + +1. The class `affine_on` + should define a `transform_sender` member function which returns the + child sender if this child sender indicates via an implementation + specific way that it doesn't change the execution agent. It + should be recommended that some of the standard library sender + algorithms (see above) to indicate that they don't change the + execution agent. +2. The `affine_on` algorithm should only allow to get `connect`ed to a + receiver `r` whose scheduler `sched` obtained by + `get_scheduler(get_env(r))` is infallible, i.e., + `get_completion_signatures(schedule(sched), e)` with an environment + `e` where `get_stop_token(e)` yields `never_stop_token` returns + `completion_signatures`. +3. When `affine_on` gets `connect`ed, the scheduling operation state needs + to be created by `connect`ing the scheduler's sender to a suitable receiver to guarantee + that the completion can be scheduled on the execution agent. + The stop token `get_stop_token(get_env(r))` for the receiver + `r` used for this `connect` shall be an `unstoppable_token`. + The child sender also needs to be `connect`ed with a receiver + which will capture the respective result upon completion and + start the scheduling operation. +4. When the result operation state gets `start`ed it `start`s the + operation state from the child operation. +5. Upon completion of the child operation the kind of completion and + the parameters, if any, are stored. If this operation throws, + the storage is set up to be as if `set_error(current_exception)` + were called. Once the parameters are stored, the scheduling + operation is started. +6. Upon completion of the scheduling operation, the appropriate + completion function with the respective arguments is invoked. + +This behaviour is similar to `continues_on` but is subtly different +with respect to when the scheduling operation state needs to be +created and that any stop token from the receiver doesn't get +forwarded. In addition `affine_on` is more constrained with respect +to the schedulers it supports and the shape of the algorithm is +different: `affine_on` gets the scheduler to execute on from the +receiver it gets `connect`ed to. + +This change addresses +[US 233-365](https://github.com/cplusplus/nbballot/issues/940) +([LWG4330](https://cplusplus.github.io/LWG/issue4330)) and +[US 236-362](https://github.com/cplusplus/nbballot/issues/937) +([LWG](https://cplusplus.github.io/LWG/issue4344); the proposed +resolution in this issue is incomplete). + +## Name Change + +The name `affine_on` isn't great. It may be worth giving the +algorithm a better name. + +# Wording Changes: TODO + +To be done. diff --git a/docs/affinity.md b/docs/affinity.md deleted file mode 100644 index d5335a0..0000000 --- a/docs/affinity.md +++ /dev/null @@ -1,86 +0,0 @@ ---- -title: Scheduler Affinity -document: D???? -date: 2025-11-23 -audience: - - Concurrency Working Group (SG1) - - Library Evolution Working Group (LEWG) - - Library Working Group (LWG) -author: - - name: Dietmar Kühl (Bloomberg) - email: -source: - - https://github.com/bemanproject/task/doc/issues.md -toc: true ---- - -

-One important design of `std::execution::task` is that a coroutine -resumes after a `co_await` on the same scheduler as the one it was -executing on prior to the `co_await`. To achieve this, `task` -transforms the awaited object obj using -affine_on(obj, sched) where -sched is the corresponding scheduler. There -were multiple concerns raised against the specification of `affine_on` -and discussed as part of [P3796R1](https://wg21.link/P3796R1). This -proposal is intended to specifically address the concerns raised -relating to `task`'s scheduler affinity and in particular `affine_on`. -The gist of this proposal is impose constraints on `affine_on` to -guarantee it can its objective at run-time. -

- -# Change History - -## R0 Initial Revision - -# Discussion - -

-There are a few NB comments raised about the way `affine_on` works: -

-
    -
  • [US 232-366](https://github.com/cplusplus/nbballot/issues/941): specify customization of `affine_on` when the scheduler doesn't change.
  • -
  • [US 233-365](https://github.com/cplusplus/nbballot/issues/940): clarify `affine_on` vs. `continues_on`.
  • -
  • [US 234-364](https://github.com/cplusplus/nbballot/issues/939): remove scheduler parameter from `affine_on`.
  • -
  • [US 235-363](https://github.com/cplusplus/nbballot/issues/938): `affine_on` should not forward the stop token to the scheduling operation.
  • -
  • [US 236-362](https://github.com/cplusplus/nbballot/issues/937): specify default implementation of `affine_on`.
  • -
-

-The discussion on `affine_on` revealed some aspects which were not -quite clear previously and taking these into account points towards -a better design than was previously documented: -

-
    -
  1. - To implement scheduler affinity the algorithm needs to know the - scheduler on which it was started itself. The correct receiver - may actually be hard to determine while build the work graph. - However, this scheduler is communicated using - get_scheduler(get_env(rcvr)) when an algorithm - is `start`ed. This requirement is more general than just - `affine_on` and is introduced by - [P3826R2](https://isocpp.org/files/papers/P3826R2.html) *TODO* - verify the reference: with this guarantee in place, `affine_on` - only needs one parameter, i.e., the sender for the work to be - executed. -
  2. -
  3. - The scheduler sched on which the work needs - to resume has to guarantee that it is possible to resume in the - correct context. The implication is that scheduling work needs - to be infallible, i.e., the completion signatures of - scheduler(sched) cannot contain a - `set_error_t(E)` completion signature. This requirement should - be checked statically. -
  4. -
  5. - The work needs to be resumed on the correct scheduler even when - the work is stopped, i.e., the scheduling operation shall be - `connect`ed to a receiver whose environment's `get_stop_token` - query yields an `unstoppable_token`. In addition, the - schedule operation shall not have a `set_stopped_t()` completion - signature if the environment's `get_stop_token` query yields - an `unstoppable_token`. This requirement should also be checked - statically. -
  6. -
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f2c9f34..f344a1f 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,22 +1,24 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -set(TODO tls-scheduler) +set(TODO tls-scheduler into_optional issue-start-reschedules loop) set(ALL_EXAMPLES - odd-return + task_scheduler bulk + c++now-allocator + c++now-cancel + c++now-errors + alloc + affinity + odd-return dangling-references rvalue-task aggregate-return customize + issue-affine_on issue-symmetric-transfer - affinity - alloc c++now-affinity - c++now-allocator c++now-basic - c++now-cancel - c++now-errors c++now-query c++now-result-types c++now-return @@ -29,15 +31,10 @@ set(ALL_EXAMPLES escaped-exception friendly hello - into_optional - issue-affine_on issue-frame-allocator - issue-start-reschedules - loop query result_example stop - task_scheduler ) set(xALL_EXAMPLES issue-symmetric-transfer) set(xALL_EXAMPLES customize) diff --git a/examples/issue-affine_on.cpp b/examples/issue-affine_on.cpp index f89663d..2b9b709 100644 --- a/examples/issue-affine_on.cpp +++ b/examples/issue-affine_on.cpp @@ -14,6 +14,11 @@ ex::task<> test(Sender&& sender) { int main() { ex::sync_wait(test(ex::just())); + ex::sync_wait(test(ex::read_env(ex::get_scheduler))); +#if 0 + ex::sync_wait(test(ex::read_env(ex::get_scheduler) | + ex::let_value([](auto sched) noexcept { return ex::just(); }))); ex::sync_wait(test(ex::read_env(ex::get_scheduler) | ex::let_value([](auto sched) { return ex::starts_on(sched, ex::just()); }))); +#endif } diff --git a/examples/issue-start-reschedules.cpp b/examples/issue-start-reschedules.cpp index f960d41..89d721a 100644 --- a/examples/issue-start-reschedules.cpp +++ b/examples/issue-start-reschedules.cpp @@ -12,6 +12,9 @@ namespace ex = beman::execution; ex::task<> test(auto sched) { std::cout << "init =" << std::this_thread::get_id() << "\n"; co_await ex::starts_on(sched, ex::just()); + // static_assert(std::same_as); + co_await ex::just(); std::cout << "final=" << std::this_thread::get_id() << "\n"; } @@ -23,8 +26,11 @@ int main() { ex::then([] { std::cout << "loop1=" << std::this_thread::get_id() << "\n"; })); ex::sync_wait(ex::schedule(loop2.get_scheduler()) | ex::then([] { std::cout << "loop2=" << std::this_thread::get_id() << "\n"; })); - std::cout << "--- use 1 ---\n"; - ex::sync_wait(test(loop2.get_scheduler())); - std::cout << "--- use 2 ---\n"; - ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler()))); + try { + std::cout << "--- use 1 ---\n"; + ex::sync_wait(test(loop2.get_scheduler())); + std::cout << "--- use 2 ---\n"; + // ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler()))); + } catch (...) { + } } diff --git a/include/beman/task/detail/affine_on.hpp b/include/beman/task/detail/affine_on.hpp index 0bb5baf..14de6d2 100644 --- a/include/beman/task/detail/affine_on.hpp +++ b/include/beman/task/detail/affine_on.hpp @@ -5,8 +5,12 @@ #define INCLUDED_INCLUDE_BEMAN_TASK_DETAIL_AFFINE_ON #include +#include #include +#include #include +#include +#include // ---------------------------------------------------------------------------- @@ -33,57 +37,138 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t template auto get_completion_signatures(const Env& env) const& noexcept { - if constexpr (elide_schedule()))>) { - return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->template get<2>())), env); - } else { - return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(this->template get<1>(), ::beman::execution::get_scheduler(env)), - env); - } + return ::beman::execution::get_completion_signatures(::std::remove_cvref_t(this->template get<1>()), + env); } template auto get_completion_signatures(const Env& env) && noexcept { - if constexpr (elide_schedule()))>) { - return ::beman::execution::get_completion_signatures( - ::std::remove_cvref_t(::std::move(this->template get<1>())), env); - } else { - return ::beman::execution::get_completion_signatures( - ::beman::execution::continues_on(::std::move(this->template get<1>()), - ::beman::execution::get_scheduler(env)), - env); - } + return ::beman::execution::get_completion_signatures( + ::std::remove_cvref_t(::std::move(this->template get<1>())), env); } + template <::beman::execution::receiver Receiver> + struct state { + template + struct to_tuple_t; + template + struct to_tuple_t { + using type = ::std::tuple...>; + }; + template + struct to_variant_t; + template + struct to_variant_t<::beman::execution::completion_signatures> { + using type = ::beman::execution::detail::meta::unique<::std::variant::type...>>; + }; + + using operation_state_concept = ::beman::execution::operation_state_t; + using completion_signatures = decltype(::beman::execution::get_completion_signatures( + ::std::declval(), ::beman::execution::get_env(::std::declval()))); + using value_type = typename to_variant_t::type; + + struct schedule_receiver { + using receiver_concept = ::beman::execution::receiver_t; + state* s; + auto set_value() noexcept -> void { + static_assert(::beman::execution::receiver); + std::visit( + [this](auto&& v) -> void { + std::apply( + [this](auto tag, auto&&... a) { tag(std::move(this->s->receiver), ::std::move(a)...); }, + v); + }, + s->value); + } + auto get_env() const noexcept -> ::beman::execution::empty_env { /*-dk:TODO */ return {}; } + }; + + struct work_receiver { + using receiver_concept = ::beman::execution::receiver_t; + state* s; + template + auto set_value(T&&... args) noexcept -> void { + static_assert(::beman::execution::receiver); + this->s->value + .template emplace<::std::tuple<::beman::execution::set_value_t, ::std::remove_cvref_t...>>( + ::beman::execution::set_value, ::std::forward(args)...); + this->s->sched_op_.start(); + } + template + auto set_error(E&& error) noexcept -> void { + static_assert(::beman::execution::receiver); + this->s->value + .template emplace<::std::tuple<::beman::execution::set_error_t, ::std::remove_cvref_t>>( + ::beman::execution::set_error, ::std::forward(error)); + this->s->sched_op_.start(); + } + auto set_stopped(auto&&...) noexcept -> void { + static_assert(::beman::execution::receiver); + this->s->value.template emplace<::std::tuple<::beman::execution::set_stopped_t>>( + ::beman::execution::set_stopped); + this->s->sched_op_.start(); + } + auto get_env() const noexcept -> decltype(::beman::execution::get_env(::std::declval())) { + return ::beman::execution::get_env(this->s->receiver); + } + }; + using scheduler_t = + decltype(::beman::execution::get_scheduler(::beman::execution::get_env(::std::declval()))); + using schedule_op = decltype(::beman::execution::connect( + ::beman::execution::schedule(::std::declval()), ::std::declval())); + using work_op = + decltype(::beman::execution::connect(::std::declval(), ::std::declval())); + + ::std::remove_cvref_t receiver; + value_type value; + schedule_op sched_op_; + work_op work_op_; + + template + explicit state(S&& s, R&& r) + : receiver(::std::forward(r)), + sched_op_(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler( + ::beman::execution::get_env(this->receiver))), + schedule_receiver{this})), + work_op_(::beman::execution::connect(::std::forward(s), work_receiver{this})) { + static_assert(::beman::execution::operation_state); + if constexpr (not ::std::same_as< + ::beman::execution::completion_signatures<::beman::execution::set_value_t()>, + decltype(::beman::execution::get_completion_signatures( + ::beman::execution::schedule( + ::beman::execution::get_scheduler(::beman::execution::get_env(this->receiver))), + ::beman::execution::get_env(this->receiver)))>) { + static_assert(std::same_asreceiver)))>); + } + static_assert(::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>, + decltype(::beman::execution::get_completion_signatures( + ::beman::execution::schedule(::beman::execution::get_scheduler( + ::beman::execution::get_env(this->receiver))), + ::beman::execution::get_env(this->receiver)))>); + } + auto start() & noexcept -> void { ::beman::execution::start(this->work_op_); } + }; + template sender(S&& s) : ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Sender>{ {{::beman::task::detail::affine_on_t{}}, {Sender(::std::forward(s))}}} {} - template <::beman::execution::receiver Receiver> - auto connect(Receiver&& receiver) const& { - if constexpr (elide_schedule) { - return ::beman::execution::connect(this->template get<1>(), ::std::forward(receiver)); - } else { - return ::beman::execution::connect( - ::beman::execution::continues_on( - this->template get<1>(), ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))), - ::std::forward(receiver)); - } - } template <::beman::execution::receiver Receiver> auto connect(Receiver&& receiver) && { + static_assert(::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>, + decltype(::beman::execution::get_completion_signatures( + ::beman::execution::schedule( + ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))), + ::beman::execution::get_env(receiver)))>, + "affine_on requires that the receiver's scheduler is infallible"); if constexpr (elide_schedule) { return ::beman::execution::connect(::std::move(this->template get<1>()), ::std::forward(receiver)); } else { - return ::beman::execution::connect( - ::beman::execution::continues_on( - ::std::move(this->template get<1>()), - ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))), - ::std::forward(receiver)); + return state(::std::move(this->template get<1>()), ::std::forward(receiver)); } } }; diff --git a/include/beman/task/detail/task_scheduler.hpp b/include/beman/task/detail/task_scheduler.hpp index 1df2851..cc3f127 100644 --- a/include/beman/task/detail/task_scheduler.hpp +++ b/include/beman/task/detail/task_scheduler.hpp @@ -27,8 +27,6 @@ namespace beman::task::detail { * Completion signatures: * * - `ex::set_value_t()` - * - `ex::set_error_t(std::error_code)` - * - `ex::set_error_t(std::exception_ptr)` * - `ex::set_stopped()` * * Usage: @@ -38,32 +36,16 @@ namespace beman::task::detail { */ class task_scheduler { struct state_base { - virtual ~state_base() = default; - virtual void complete_value() = 0; - virtual void complete_error(::std::error_code) = 0; - virtual void complete_error(::std::exception_ptr) = 0; - virtual void complete_stopped() = 0; - virtual ::beman::execution::inplace_stop_token get_stop_token() = 0; + virtual ~state_base() = default; + virtual void complete_value() = 0; }; struct inner_state { struct receiver; - struct env { - state_base* state; - auto query(::beman::execution::get_stop_token_t) const noexcept { return this->state->get_stop_token(); } - }; struct receiver { using receiver_concept = ::beman::execution::receiver_t; state_base* state; void set_value() && noexcept { this->state->complete_value(); } - void set_error(std::error_code err) && noexcept { this->state->complete_error(err); } - void set_error(std::exception_ptr ptr) && noexcept { this->state->complete_error(std::move(ptr)); } - template - void set_error(E e) && noexcept { - this->state->complete_error(std::make_exception_ptr(std::move(e))); - } - void set_stopped() && noexcept { this->state->complete_stopped(); } - env get_env() const noexcept { return {this->state}; } }; static_assert(::beman::execution::receiver); @@ -88,46 +70,13 @@ class task_scheduler { template <::beman::execution::receiver Receiver> struct state : state_base { using operation_state_concept = ::beman::execution::operation_state_t; - struct stopper { - state* st; - void operator()() noexcept { - state* self = this->st; - self->callback.reset(); - self->source.request_stop(); - } - }; - using token_t = - decltype(::beman::execution::get_stop_token(::beman::execution::get_env(std::declval()))); - using callback_t = ::beman::execution::stop_callback_for_t; - - std::remove_cvref_t receiver; - inner_state s; - ::beman::execution::inplace_stop_source source; - ::std::optional callback; + std::remove_cvref_t receiver; + inner_state s; template <::beman::execution::receiver R, typename PS> state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} void start() & noexcept { this->s.start(); } void complete_value() override { ::beman::execution::set_value(std::move(this->receiver)); } - void complete_error(std::error_code err) override { ::beman::execution::set_error(std::move(receiver), err); } - void complete_error(std::exception_ptr ptr) override { - ::beman::execution::set_error(std::move(receiver), std::move(ptr)); - } - void complete_stopped() override { ::beman::execution::set_stopped(std::move(this->receiver)); } - ::beman::execution::inplace_stop_token get_stop_token() override { - if constexpr (::std::same_as) { - return ::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)); - } else { - if constexpr (not ::std::same_as) { - if (not this->callback) { - this->callback.emplace( - ::beman::execution::get_stop_token(::beman::execution::get_env(this->receiver)), - stopper{this}); - } - } - return this->source.get_token(); - } - } }; class sender; @@ -175,12 +124,8 @@ class task_scheduler { poly inner_sender; public: - using sender_concept = ::beman::execution::sender_t; - using completion_signatures = - ::beman::execution::completion_signatures<::beman::execution::set_value_t(), - ::beman::execution::set_error_t(std::error_code), - ::beman::execution::set_error_t(std::exception_ptr), - ::beman::execution::set_stopped_t()>; + using sender_concept = ::beman::execution::sender_t; + using completion_signatures = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; template <::beman::execution::scheduler S> explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} diff --git a/tests/beman/task/affine_on.test.cpp b/tests/beman/task/affine_on.test.cpp index 665dda1..8161b32 100644 --- a/tests/beman/task/affine_on.test.cpp +++ b/tests/beman/task/affine_on.test.cpp @@ -21,15 +21,73 @@ struct receiver { std::remove_cvref_t scheduler; auto get_env() const noexcept { return ex::detail::make_env(ex::get_scheduler, scheduler); } - - void set_value(auto&&...) && noexcept {} - void set_error(auto&&) && noexcept {} - void set_stopped() && noexcept {} + auto set_value(auto&&...) && noexcept -> void {} }; + template receiver(Scheduler&& sched) -> receiver; +template +struct test_scheduler { + using scheduler_concept = ex::scheduler_t; + struct sender { + using sender_concept = ex::sender_t; + template + auto get_completion_signatures(Env&& env) const noexcept { + if constexpr (ex::unstoppable_token) + return ex::completion_signatures{}; + else + return ex::completion_signatures{}; + } + struct env { + template + auto query(ex::get_completion_scheduler_t) const noexcept -> test_scheduler { + return {}; + } + }; + auto get_env() const noexcept { return env{}; } + + template + struct op_state { + using operation_state_concept = ex::operation_state_t; + std::remove_cvref_t rcvr; + void start() & noexcept { + static_assert(ex::operation_state); + ex::set_value(std::move(this->rcvr)); + } + }; + template + auto connect(Rcvr&& rcvr) noexcept { + static_assert(ex::sender); + return op_state{std::forward(rcvr)}; + } + }; + + auto schedule() noexcept -> sender { + static_assert(ex::scheduler>); + return {}; + } + auto operator==(const test_scheduler&) const -> bool = default; +}; +static_assert(ex::scheduler>); + static_assert(ex::receiver>); +static_assert(ex::receiver>>); + +template +auto test_affine_on_accepted_scheduler(Receiver rcvr) { + if constexpr (requires { ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr)); }) { + auto state(ex::connect(beman::task::affine_on(ex::just()), std::move(rcvr))); + ex::start(state); + } +} + +auto test_affine_on_accepted_scheduler() { + test_affine_on_accepted_scheduler(receiver{beman::task::detail::inline_scheduler{}}); + test_affine_on_accepted_scheduler(receiver{test_scheduler<>{}}); + // test_affine_on_accepted_scheduler(receiver{test_scheduler{}}); +} + } // namespace int main() { @@ -45,13 +103,11 @@ int main() { [[maybe_unused]] auto s(beman::task::affine_on(ex::just(42))); static_assert(ex::sender); [[maybe_unused]] auto st(ex::connect(std::move(s), receiver{context.get_scheduler()})); -#if 0 - ex::sync_wait(beman::task::affine_on(ex::just(42), context.get_scheduler()) -#else - ex::sync_wait(beman::execution::continues_on(ex::just(42), context.get_scheduler()) -#endif - | ex::then([thread_id](int value) { - assert(thread_id == std::this_thread::get_id()); - assert(value == 42); - })); + ex::sync_wait( + beman::task::affine_on(ex::starts_on(context.get_scheduler(), ex::just(42)) | ex::then([thread_id](int value) { + assert(thread_id == std::this_thread::get_id()); + assert(value == 42); + })) | + ex::then([thread_id]() { assert(thread_id != std::this_thread::get_id()); })); + test_affine_on_accepted_scheduler(); } diff --git a/tests/beman/task/task_scheduler.test.cpp b/tests/beman/task/task_scheduler.test.cpp index d2b1ebf..31e4a86 100644 --- a/tests/beman/task/task_scheduler.test.cpp +++ b/tests/beman/task/task_scheduler.test.cpp @@ -111,14 +111,7 @@ struct thread_context { } void complete() override { this->callback.reset(); - if (this->cmpl == thread_context::complete::success) - ex::set_value(std::move(this->receiver)); - else if (this->cmpl == thread_context::complete::failure) - ex::set_error(std::move(this->receiver), std::make_error_code(std::errc::address_in_use)); - else - ex::set_error( - std::move(this->receiver), - std::make_exception_ptr(std::system_error(std::make_error_code(std::errc::address_in_use)))); + ex::set_value(std::move(this->receiver)); } }; struct env { @@ -128,9 +121,8 @@ struct thread_context { } }; struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = - ex::completion_signatures; + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; thread_context* ctxt; thread_context::complete cmpl; @@ -241,38 +233,6 @@ int main() { ex::sync_wait(ex::schedule(ly::detail::task_scheduler(sched2)) | ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); - { - bool success{false}; - bool failed{false}; - bool exception{false}; - ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::failure)) | - ex::then([&success] { success = true; }) | - ex::upon_error([&failed, &exception](const E&) { - if constexpr (std::same_as) - failed = true; - else if constexpr (std::same_as) - exception = true; - })); - assert(not success); - assert(failed); - assert(not exception); - } - { - bool success{false}; - bool failed{false}; - bool exception{false}; - ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::exception)) | - ex::then([&success] { success = true; }) | - ex::upon_error([&failed, &exception](const E&) { - if constexpr (std::same_as) - failed = true; - else if constexpr (std::same_as) - exception = true; - })); - assert(not success); - assert(not failed); - assert(exception); - } { ex::inplace_stop_source source; stop_result result{stop_result::none}; @@ -284,30 +244,6 @@ int main() { source.request_stop(); assert(result == stop_result::stopped); } - { - ex::inplace_stop_source source; - stop_result result{stop_result::none}; - auto state{ex::connect( - ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), - stop_receiver{source.get_token(), result})}; - assert(result == stop_result::none); - ex::start(state); - assert(result == stop_result::none); - source.request_stop(); - assert(result == stop_result::stopped); - } - { - ex::stop_source source; - stop_result result{stop_result::none}; - auto state{ex::connect( - ex::schedule(ly::detail::task_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), - stop_receiver{source.get_token(), result})}; - assert(result == stop_result::none); - ex::start(state); - assert(result == stop_result::none); - source.request_stop(); - assert(result == stop_result::stopped); - } { std::latch completed{1}; stop_result result{stop_result::none};