Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ FetchContent_Declare(
execution
# SOURCE_DIR ${CMAKE_SOURCE_DIR}/../execution
GIT_REPOSITORY https://github.com/bemanproject/execution
GIT_TAG 686685c
GIT_TAG 7451ece
)
FetchContent_MakeAvailable(execution)

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#-dk: note to self: PATH=/opt/llvm-19.1.6/bin:$PATH LDFLAGS=-fuse-ld=lld

.PHONY: config test default compile clean distclean doc html pdf format clang-format tidy
.PHONY: config test default compile clean distclean doc docs html pdf format clang-format tidy

BUILDDIR = build
PRESET = gcc-release
Expand All @@ -12,7 +12,7 @@ BUILD = $(BUILDDIR)/$(PRESET)

default: compile

doc:
docs doc:
cd docs; $(MAKE)

pdf html:
Expand Down
2 changes: 1 addition & 1 deletion docs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ default: doc

doc: doc-html

doc-html: P3552-task.html P3796-task-issues.html
doc-html: P3552-task.html P3796-task-issues.html P3941-affinity.html

doc-pdf: P3552-task.pdf P3796-task-issues.pdf

Expand Down
500 changes: 500 additions & 0 deletions docs/P3941-affinity.md

Large diffs are not rendered by default.

86 changes: 0 additions & 86 deletions docs/affinity.md

This file was deleted.

21 changes: 9 additions & 12 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

set(TODO tls-scheduler)
set(TODO tls-scheduler into_optional issue-start-reschedules loop)

set(ALL_EXAMPLES
odd-return
task_scheduler
bulk
c++now-allocator
c++now-cancel
c++now-errors
alloc
affinity
odd-return
dangling-references
rvalue-task
aggregate-return
customize
issue-affine_on
issue-symmetric-transfer
affinity
alloc
c++now-affinity
c++now-allocator
c++now-basic
c++now-cancel
c++now-errors
c++now-query
c++now-result-types
c++now-return
Expand All @@ -29,15 +31,10 @@ set(ALL_EXAMPLES
escaped-exception
friendly
hello
into_optional
issue-affine_on
issue-frame-allocator
issue-start-reschedules
loop
query
result_example
stop
task_scheduler
)
set(xALL_EXAMPLES issue-symmetric-transfer)
set(xALL_EXAMPLES customize)
Expand Down
5 changes: 5 additions & 0 deletions examples/issue-affine_on.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ ex::task<> test(Sender&& sender) {

int main() {
ex::sync_wait(test(ex::just()));
ex::sync_wait(test(ex::read_env(ex::get_scheduler)));
#if 0
ex::sync_wait(test(ex::read_env(ex::get_scheduler) |
ex::let_value([](auto sched) noexcept { return ex::just(); })));
ex::sync_wait(test(ex::read_env(ex::get_scheduler) |
ex::let_value([](auto sched) { return ex::starts_on(sched, ex::just()); })));
#endif
}
14 changes: 10 additions & 4 deletions examples/issue-start-reschedules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ namespace ex = beman::execution;
ex::task<> test(auto sched) {
std::cout << "init =" << std::this_thread::get_id() << "\n";
co_await ex::starts_on(sched, ex::just());
// static_assert(std::same_as<void, decltype(ex::get_completion_signatures(ex::starts_on(sched, ex::just()),
// ex::empty_env{}))>);
co_await ex::just();
std::cout << "final=" << std::this_thread::get_id() << "\n";
}

Expand All @@ -23,8 +26,11 @@ int main() {
ex::then([] { std::cout << "loop1=" << std::this_thread::get_id() << "\n"; }));
ex::sync_wait(ex::schedule(loop2.get_scheduler()) |
ex::then([] { std::cout << "loop2=" << std::this_thread::get_id() << "\n"; }));
std::cout << "--- use 1 ---\n";
ex::sync_wait(test(loop2.get_scheduler()));
std::cout << "--- use 2 ---\n";
ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler())));
try {
std::cout << "--- use 1 ---\n";
ex::sync_wait(test(loop2.get_scheduler()));
std::cout << "--- use 2 ---\n";
// ex::sync_wait(ex::starts_on(loop1.get_scheduler(), test(loop2.get_scheduler())));
} catch (...) {
}
}
153 changes: 119 additions & 34 deletions include/beman/task/detail/affine_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
#define INCLUDED_INCLUDE_BEMAN_TASK_DETAIL_AFFINE_ON

#include <beman/execution/execution.hpp>
#include <beman/execution/detail/meta_unique.hpp>
#include <beman/task/detail/inline_scheduler.hpp>
#include <concepts>
#include <utility>
#include <tuple>
#include <variant>

// ----------------------------------------------------------------------------

Expand All @@ -33,57 +37,138 @@ struct affine_on_t::sender : ::beman::execution::detail::product_type<::beman::t

template <typename Env>
auto get_completion_signatures(const Env& env) const& noexcept {
if constexpr (elide_schedule<decltype(::beman::execution::get_scheduler(::std::declval<Env>()))>) {
return ::beman::execution::get_completion_signatures(
::std::remove_cvref_t<Sender>(::std::move(this->template get<2>())), env);
} else {
return ::beman::execution::get_completion_signatures(
::beman::execution::continues_on(this->template get<1>(), ::beman::execution::get_scheduler(env)),
env);
}
return ::beman::execution::get_completion_signatures(::std::remove_cvref_t<Sender>(this->template get<1>()),
env);
}
template <typename Env>
auto get_completion_signatures(const Env& env) && noexcept {
if constexpr (elide_schedule<decltype(::beman::execution::get_scheduler(::std::declval<Env>()))>) {
return ::beman::execution::get_completion_signatures(
::std::remove_cvref_t<Sender>(::std::move(this->template get<1>())), env);
} else {
return ::beman::execution::get_completion_signatures(
::beman::execution::continues_on(::std::move(this->template get<1>()),
::beman::execution::get_scheduler(env)),
env);
}
return ::beman::execution::get_completion_signatures(
::std::remove_cvref_t<Sender>(::std::move(this->template get<1>())), env);
}

template <::beman::execution::receiver Receiver>
struct state {
template <typename>
struct to_tuple_t;
template <typename R, typename... A>
struct to_tuple_t<R(A...)> {
using type = ::std::tuple<R, std::remove_cvref_t<A>...>;
};
template <typename>
struct to_variant_t;
template <typename... Sig>
struct to_variant_t<::beman::execution::completion_signatures<Sig...>> {
using type = ::beman::execution::detail::meta::unique<::std::variant<typename to_tuple_t<Sig>::type...>>;
};

using operation_state_concept = ::beman::execution::operation_state_t;
using completion_signatures = decltype(::beman::execution::get_completion_signatures(
::std::declval<Sender>(), ::beman::execution::get_env(::std::declval<Receiver>())));
using value_type = typename to_variant_t<completion_signatures>::type;

struct schedule_receiver {
using receiver_concept = ::beman::execution::receiver_t;
state* s;
auto set_value() noexcept -> void {
static_assert(::beman::execution::receiver<schedule_receiver>);
std::visit(
[this](auto&& v) -> void {
std::apply(
[this](auto tag, auto&&... a) { tag(std::move(this->s->receiver), ::std::move(a)...); },
v);
},
s->value);
}
auto get_env() const noexcept -> ::beman::execution::empty_env { /*-dk:TODO */ return {}; }
};

struct work_receiver {
using receiver_concept = ::beman::execution::receiver_t;
state* s;
template <typename... T>
auto set_value(T&&... args) noexcept -> void {
static_assert(::beman::execution::receiver<work_receiver>);
this->s->value
.template emplace<::std::tuple<::beman::execution::set_value_t, ::std::remove_cvref_t<T>...>>(
::beman::execution::set_value, ::std::forward<T>(args)...);
this->s->sched_op_.start();
}
template <typename E>
auto set_error(E&& error) noexcept -> void {
static_assert(::beman::execution::receiver<work_receiver>);
this->s->value
.template emplace<::std::tuple<::beman::execution::set_error_t, ::std::remove_cvref_t<E>>>(
::beman::execution::set_error, ::std::forward<E>(error));
this->s->sched_op_.start();
}
auto set_stopped(auto&&...) noexcept -> void {
static_assert(::beman::execution::receiver<work_receiver>);
this->s->value.template emplace<::std::tuple<::beman::execution::set_stopped_t>>(
::beman::execution::set_stopped);
this->s->sched_op_.start();
}
auto get_env() const noexcept -> decltype(::beman::execution::get_env(::std::declval<Receiver>())) {
return ::beman::execution::get_env(this->s->receiver);
}
};
using scheduler_t =
decltype(::beman::execution::get_scheduler(::beman::execution::get_env(::std::declval<Receiver>())));
using schedule_op = decltype(::beman::execution::connect(
::beman::execution::schedule(::std::declval<scheduler_t>()), ::std::declval<schedule_receiver>()));
using work_op =
decltype(::beman::execution::connect(::std::declval<Sender>(), ::std::declval<work_receiver>()));

::std::remove_cvref_t<Receiver> receiver;
value_type value;
schedule_op sched_op_;
work_op work_op_;

template <typename S, typename R>
explicit state(S&& s, R&& r)
: receiver(::std::forward<R>(r)),
sched_op_(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler(
::beman::execution::get_env(this->receiver))),
schedule_receiver{this})),
work_op_(::beman::execution::connect(::std::forward<S>(s), work_receiver{this})) {
static_assert(::beman::execution::operation_state<state>);
if constexpr (not ::std::same_as<
::beman::execution::completion_signatures<::beman::execution::set_value_t()>,
decltype(::beman::execution::get_completion_signatures(
::beman::execution::schedule(
::beman::execution::get_scheduler(::beman::execution::get_env(this->receiver))),
::beman::execution::get_env(this->receiver)))>) {
static_assert(std::same_as<void,
decltype(::beman::execution::get_scheduler(
::beman::execution::get_env(this->receiver)))>);
}
static_assert(::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>,
decltype(::beman::execution::get_completion_signatures(
::beman::execution::schedule(::beman::execution::get_scheduler(
::beman::execution::get_env(this->receiver))),
::beman::execution::get_env(this->receiver)))>);
}
auto start() & noexcept -> void { ::beman::execution::start(this->work_op_); }
};

template <typename S>
sender(S&& s)
: ::beman::execution::detail::product_type<::beman::task::detail::affine_on_t, Sender>{
{{::beman::task::detail::affine_on_t{}}, {Sender(::std::forward<S>(s))}}} {}

template <::beman::execution::receiver Receiver>
auto connect(Receiver&& receiver) const& {
if constexpr (elide_schedule<decltype(::beman::execution::get_scheduler(
::beman::execution::get_env(receiver)))>) {
return ::beman::execution::connect(this->template get<1>(), ::std::forward<Receiver>(receiver));
} else {
return ::beman::execution::connect(
::beman::execution::continues_on(
this->template get<1>(), ::beman::execution::get_scheduler(::beman::execution::get_env(receiver))),
::std::forward<Receiver>(receiver));
}
}
template <::beman::execution::receiver Receiver>
auto connect(Receiver&& receiver) && {
static_assert(::std::same_as<::beman::execution::completion_signatures<::beman::execution::set_value_t()>,
decltype(::beman::execution::get_completion_signatures(
::beman::execution::schedule(
::beman::execution::get_scheduler(::beman::execution::get_env(receiver))),
::beman::execution::get_env(receiver)))>,
"affine_on requires that the receiver's scheduler is infallible");
if constexpr (elide_schedule<decltype(::beman::execution::get_scheduler(
::beman::execution::get_env(receiver)))>) {
return ::beman::execution::connect(::std::move(this->template get<1>()),
::std::forward<Receiver>(receiver));
} else {
return ::beman::execution::connect(
::beman::execution::continues_on(
::std::move(this->template get<1>()),
::beman::execution::get_scheduler(::beman::execution::get_env(receiver))),
::std::forward<Receiver>(receiver));
return state<Receiver>(::std::move(this->template get<1>()), ::std::forward<Receiver>(receiver));
}
}
};
Expand Down
Loading