diff --git a/.github/workflows/test-windows.ps1 b/.github/workflows/test-windows.ps1 index 704893426..809542815 100644 --- a/.github/workflows/test-windows.ps1 +++ b/.github/workflows/test-windows.ps1 @@ -24,6 +24,7 @@ Invoke-NativeCommand cmake -B $BuildDirectory -G Ninja ` "-DCMAKE_MSVC_DEBUG_INFORMATION_FORMAT:STRING=Embedded" ` "-DSTDEXEC_ENABLE_ASIO:BOOL=TRUE" ` "-DSTDEXEC_ASIO_IMPLEMENTATION:STRING=boost" ` + "-DCMAKE_VERBOSE_MAKEFILE:BOOL=ON" ` "-DSTDEXEC_BUILD_TESTS:BOOL=TRUE" . Invoke-NativeCommand cmake --build $BuildDirectory -Invoke-NativeCommand ctest --test-dir $BuildDirectory +Invoke-NativeCommand ctest --test-dir $BuildDirectory --output-on-failure --verbose --timeout 60 diff --git a/include/stdexec/__detail/__connect_awaitable.hpp b/include/stdexec/__detail/__connect_awaitable.hpp index 2fa75cda8..ef373dd86 100644 --- a/include/stdexec/__detail/__connect_awaitable.hpp +++ b/include/stdexec/__detail/__connect_awaitable.hpp @@ -38,15 +38,6 @@ namespace STDEXEC // __connect_await namespace __connect_await { - STDEXEC_PRAGMA_OPTIMIZE_BEGIN() - -# if STDEXEC_MSVC() - static constexpr std::size_t __storage_size = 256; -# else - static constexpr std::size_t __storage_size = 8 * sizeof(void*); -# endif - static constexpr std::size_t __storage_align = __STDCPP_DEFAULT_NEW_ALIGNMENT__; - // clang-format off template concept __has_as_awaitable_member = requires(_Tp&& __t, _Promise& __promise) @@ -80,215 +71,476 @@ namespace STDEXEC __with_await_transform() = default; }; - struct __awaiter_base + struct __synthetic_coro_frame + { + void (*__resume_)(void*) noexcept; + // we never invoke __destroy_ so a no-op implementation is fine; we've chosen + // the address of a no-op function rather than nullptr in case some rogue awaitable + // *does* invoke destroy on the synthesized handle that it receives in its + // await_suspend function + void (*__destroy_)(void*) noexcept = &__noop_destroy; + + private: + static void __noop_destroy(void*) noexcept {} + }; + + static constexpr std::ptrdiff_t __promise_offset = sizeof(__synthetic_coro_frame); + + template + struct __opstate; + + template + struct __promise : __with_await_transform<__promise<_Awaiter, _Receiver>> { - static constexpr auto await_ready() noexcept -> bool + constexpr auto unhandled_stopped() noexcept -> __std::coroutine_handle<> { - return false; + __get_opstate().__on_stopped(); + // Returning noop_coroutine here causes the __connect_awaitable + // coroutine to never resume past its initial_suspend point + return __std::noop_coroutine(); } - [[noreturn]] - inline void await_resume() noexcept + [[nodiscard]] + constexpr auto get_env() const noexcept -> env_of_t<_Receiver> { - __std::unreachable(); + return STDEXEC::get_env(__get_opstate().__rcvr_); + } + + private: + using __opstate_t = __opstate<_Awaiter, _Receiver>; + friend __opstate_t; + + static void __resume(void* __frame) noexcept + { + auto* __op = static_cast<__opstate_t*>(__frame); + __op->__on_resume(); + } + + __opstate_t& __get_opstate() noexcept + { + return *reinterpret_cast<__opstate_t*>(reinterpret_cast(this) + - __promise_offset); + } + + __opstate_t const & __get_opstate() const noexcept + { + return *reinterpret_cast<__opstate_t const *>(reinterpret_cast(this) + - __promise_offset); } }; - inline void __destroy_coro(__std::coroutine_handle<> __coro) noexcept + template + struct __awaitable_wrapper { -# if STDEXEC_MSVC() - // MSVCBUG https://developercommunity.visualstudio.com/t/Double-destroy-of-a-local-in-coroutine-d/10456428 - // Reassign __coro before calling destroy to make the mutation - // observable and to hopefully ensure that the compiler does not eliminate it. - std::exchange(__coro, {}).destroy(); -# else - __coro.destroy(); -# endif - } + constexpr auto& __awaiter() noexcept + { + return static_cast<_Derived*>(this)->__awaiter_; + } - template - struct __opstate; + constexpr auto await_ready() noexcept(noexcept(__awaiter().await_ready())) -> bool + { + return __awaiter().await_ready(); + } - template - struct __promise : __with_await_transform<__promise<_Awaitable, _Receiver>> - { - using __opstate_t = __opstate<_Awaitable, _Receiver>; + template + constexpr auto await_suspend(__std::coroutine_handle<_Promise> __h) + noexcept(noexcept(__awaiter().await_suspend(__h))) + { + return __awaiter().await_suspend(__h); + } - struct __task + constexpr decltype(auto) await_resume() noexcept(noexcept(__awaiter().await_resume())) { - using promise_type = __promise; + return __awaiter().await_resume(); + } + }; - constexpr explicit __task(__std::coroutine_handle<__promise> __coro) noexcept - : __coro_(__coro) - {} + template + concept __has_distinct_awaitable = __has_as_awaitable_member<_Tp, _Promise>; - STDEXEC_IMMOVABLE(__task); + template + concept __has_distinct_awaiter = requires(_Awaitable&& __awaitable) { + { static_cast<_Awaitable&&>(__awaitable).operator co_await() }; + } || requires(_Awaitable&& __awaitable) { + { operator co_await(static_cast<_Awaitable&&>(__awaitable)) }; + }; - ~__task() - { - __connect_await::__destroy_coro(__coro_); - } + template + struct __awaitable_state : __awaitable_wrapper<__awaitable_state<_Awaitable, _Promise>> + { + using __awaitable_t = __result_of<__get_awaitable, _Awaitable, _Promise&>; + using __awaiter_t = __awaiter_of_t<__awaitable_t>; - __std::coroutine_handle<__promise> __coro_{}; - }; + static constexpr bool __is_nothrow = __noexcept_of<__get_awaitable, _Awaitable, _Promise&> + && __noexcept_of<__get_awaiter, __awaitable_t>; - struct __final_awaiter : __awaiter_base + struct __state : __awaitable_wrapper<__state> { - void await_suspend(__std::coroutine_handle<>) noexcept + __state(_Awaitable&& __source, __std::coroutine_handle<_Promise> __coro) + noexcept(__is_nothrow) { - using __awaitable_t = __result_of<__get_awaitable, _Awaitable, __promise&>; - using __awaiter_t = __awaiter_of_t<__awaitable_t>; - using __result_t = decltype(__declval<__awaiter_t>().await_resume()); + // GCC doesn't like initializing __awaitable_ or __awaiter_ in the member initializer + // clause when the result of __get_awaitable or __get_awaiter is immovable; it *seems* + // like direct initialization of a member with the result of a function ought to trigger + // C++17's mandatory copy elision, and both Clang and MSVC accept that code, but using + // a union with in-place new works around the issue. + new (static_cast(std::addressof(__awaitable_))) + __awaitable_t(__get_awaitable(static_cast<_Awaitable&&>(__source), __coro.promise())); + new (static_cast(std::addressof(__awaiter_))) + __awaiter_t(__get_awaiter(static_cast<__awaitable_t&&>(__awaitable_))); + } - if (__opstate_.__eptr_) - { - STDEXEC::set_error(static_cast<_Receiver&&>(__opstate_.__rcvr_), - std::move(__opstate_.__eptr_)); - } - else if constexpr (__same_as<__result_t, void>) - { - STDEXEC_ASSERT(__opstate_.__result_.has_value()); - STDEXEC::set_value(static_cast<_Receiver&&>(__opstate_.__rcvr_)); - } - else - { - STDEXEC_ASSERT(__opstate_.__result_.has_value()); - STDEXEC::set_value(static_cast<_Receiver&&>(__opstate_.__rcvr_), - static_cast<__result_t&&>(*__opstate_.__result_)); - } - // This coroutine is never resumed; its work is done. + ~__state() + { + // make sure to destroy in the reverse order of construction + std::destroy_at(std::addressof(__awaiter_)); + std::destroy_at(std::addressof(__awaitable_)); } - __opstate<_Awaitable, _Receiver>& __opstate_; + union + { + [[no_unique_address]] + __awaitable_t __awaitable_; + }; + + union + { + [[no_unique_address]] + __awaiter_t __awaiter_; + }; }; - constexpr explicit(!STDEXEC_EDG()) __promise(__opstate_t& __opstate) noexcept - : __opstate_(__opstate) + [[no_unique_address]] + _Awaitable __source_awaitable_; + union + { + [[no_unique_address]] + __state __awaiter_; + }; + + template <__not_decays_to<__awaitable_state> _Awaitable2> + explicit __awaitable_state(_Awaitable2&& __awaitable) + noexcept(__nothrow_constructible_from<_Awaitable, _Awaitable2>) + : __source_awaitable_(static_cast<_Awaitable2&&>(__awaitable)) {} -# if !STDEXEC_GCC() || STDEXEC_GCC_VERSION >= 12'00 - static constexpr auto - operator new([[maybe_unused]] std::size_t __bytes, __opstate_t& __opstate) noexcept -> void* + ~__awaitable_state() {} + + constexpr void construct(__std::coroutine_handle<_Promise> __coro) noexcept(__is_nothrow) { - STDEXEC_ASSERT(__bytes <= sizeof(__opstate.__storage_)); - return __opstate.__storage_; + std::construct_at(&__awaiter_, static_cast<_Awaitable&&>(__source_awaitable_), __coro); } - static constexpr void operator delete([[maybe_unused]] void* __ptr) noexcept + constexpr void destroy() noexcept { - // no-op + std::destroy_at(&__awaiter_); } -# endif + }; + + template + requires __awaitable<_Awaitable, _Promise> + && (!__has_distinct_awaitable<_Awaitable, _Promise>) + && __has_distinct_awaiter<_Awaitable> + struct __awaitable_state<_Awaitable, _Promise> + : __awaitable_wrapper<__awaitable_state<_Awaitable, _Promise>> + { + // _Awaitable has a distinct awaiter, but no distinct as_awaitable() + // so we don't need separate storage for it + using __awaiter_t = __awaiter_of_t<_Awaitable&&>; - constexpr auto get_return_object() noexcept -> __task + static constexpr bool __is_nothrow = __noexcept_of<__get_awaiter, _Awaitable&&>; + + struct __state : __awaitable_wrapper<__state> { - return __task{__std::coroutine_handle<__promise>::from_promise(*this)}; - } + __state(_Awaitable&& __source, __std::coroutine_handle<_Promise> __coro) + noexcept(__is_nothrow) + { + // GCC doesn't like initializing __awaiter_ in the member initializer clause when the + // result of __get_awaiter is immovable; it *seems* like direct initialization of a + // member with the result of a function ought to trigger C++17's mandatory copy elision, + // and both Clang and MSVC accept that code, but using a union with in-place new works + // around the issue. + new (static_cast(std::addressof(__awaiter_))) + __awaiter_t(__get_awaiter(static_cast<_Awaitable&&>(__source))); + + [[maybe_unused]] + auto&& __awaitable = __get_awaitable(static_cast<_Awaitable&&>(__source), + __coro.promise()); + + STDEXEC_ASSERT(std::addressof(__awaitable) == std::addressof(__source)); + } - [[noreturn]] - static auto get_return_object_on_allocation_failure() noexcept -> __task + ~__state() + { + std::destroy_at(std::addressof(__awaiter_)); + } + + union + { + [[no_unique_address]] + __awaiter_t __awaiter_; + }; + }; + + [[no_unique_address]] + _Awaitable __source_awaitable_; + union { - __std::unreachable(); - } + [[no_unique_address]] + __state __awaiter_; + }; + + template <__not_decays_to<__awaitable_state> _Awaitable2> + explicit __awaitable_state(_Awaitable2&& __awaitable) + noexcept(__nothrow_constructible_from<_Awaitable, _Awaitable2>) + : __source_awaitable_(static_cast<_Awaitable2&&>(__awaitable)) + {} + + ~__awaitable_state() {} - static constexpr auto initial_suspend() noexcept -> __std::suspend_always + constexpr void construct(__std::coroutine_handle<_Promise> __coro) noexcept(__is_nothrow) { - return {}; + std::construct_at(&__awaiter_, static_cast<_Awaitable&&>(__source_awaitable_), __coro); } - void unhandled_exception() noexcept + constexpr void destroy() noexcept { - __opstate_.__eptr_ = std::current_exception(); + std::destroy_at(&__awaiter_); } + }; - constexpr auto unhandled_stopped() noexcept -> __std::coroutine_handle<> + template + requires __awaitable<_Awaitable, _Promise> // + && __has_distinct_awaitable<_Awaitable, _Promise> + && (!__has_distinct_awaiter<__result_of<__get_awaitable, _Awaitable, _Promise&>>) + struct __awaitable_state<_Awaitable, _Promise> + : __awaitable_wrapper<__awaitable_state<_Awaitable, _Promise>> + { + // _Awaitable has a distinct awaitable, but no distinct awaiter + // so we don't need separate storage for it + using __awaiter_t = __result_of<__get_awaitable, _Awaitable, _Promise&>; + + static constexpr bool __is_nothrow = __noexcept_of<__get_awaitable, _Awaitable, _Promise&>; + + struct __state : __awaitable_wrapper<__state> { - STDEXEC::set_stopped(static_cast<_Receiver&&>(__opstate_.__rcvr_)); - // Returning noop_coroutine here causes the __connect_awaitable - // coroutine to never resume past the point where it co_await's - // the awaitable. - return __std::noop_coroutine(); + __state(_Awaitable&& __source, __std::coroutine_handle<_Promise> __coro) + noexcept(__is_nothrow) + { + // GCC doesn't like initializing __awaiter_ in the member initializer clause when the + // result of __get_awaitable is immovable; it *seems* like direct initialization of a + // member with the result of a function ought to trigger C++17's mandatory copy elision, + // and both Clang and MSVC accept that code, but using a union with in-place new works + // around the issue. + new (static_cast(std::addressof(__awaiter_))) + __awaiter_t(__get_awaitable(static_cast<_Awaitable&&>(__source), __coro.promise())); + + [[maybe_unused]] + auto&& __awaiter = __get_awaiter(static_cast<__awaiter_t&&>(__awaiter_)); + + STDEXEC_ASSERT(std::addressof(__awaiter) == std::addressof(__awaiter_)); + } + + ~__state() + { + std::destroy_at(std::addressof(__awaiter_)); + } + + union + { + [[no_unique_address]] + __awaiter_t __awaiter_; + }; + }; + + [[no_unique_address]] + _Awaitable __source_awaitable_; + union + { + [[no_unique_address]] + __state __awaiter_; + }; + + template <__not_decays_to<__awaitable_state> _Awaitable2> + explicit __awaitable_state(_Awaitable2&& __awaitable) + noexcept(__nothrow_constructible_from<_Awaitable, _Awaitable2>) + : __source_awaitable_(static_cast<_Awaitable2&&>(__awaitable)) + {} + + ~__awaitable_state() {} + + constexpr void construct(__std::coroutine_handle<_Promise> __coro) noexcept(__is_nothrow) + { + std::construct_at(&__awaiter_, static_cast<_Awaitable&&>(__source_awaitable_), __coro); } - constexpr auto final_suspend() noexcept -> __final_awaiter + constexpr void destroy() noexcept { - return __final_awaiter{{}, __opstate_}; + std::destroy_at(&__awaiter_); } + }; + + template + requires __awaitable<_Awaitable, _Promise> + && (!__has_distinct_awaitable<_Awaitable, _Promise>) + && (!__has_distinct_awaiter<_Awaitable>) + struct __awaitable_state<_Awaitable, _Promise> + : __awaitable_wrapper<__awaitable_state<_Awaitable, _Promise>> + { + // _Awaitable has neither a distinct awaiter, nor a distinct awaitable + // so we don't need separate storage for either + [[no_unique_address]] + _Awaitable __awaiter_; + + template <__not_decays_to<__awaitable_state> _Awaitable2> + explicit __awaitable_state(_Awaitable2&& __awaitable) + noexcept(__nothrow_constructible_from<_Awaitable, _Awaitable2>) + : __awaiter_(static_cast<_Awaitable2&&>(__awaitable)) + {} - static void return_void() noexcept + static constexpr void construct(__std::coroutine_handle<_Promise>) noexcept { // no-op } - [[nodiscard]] - constexpr auto get_env() const noexcept -> env_of_t<_Receiver> + static constexpr void destroy() noexcept { - return STDEXEC::get_env(__opstate_.__rcvr_); + // no-op } - - __opstate<_Awaitable, _Receiver>& __opstate_; }; template struct __opstate { constexpr explicit __opstate(_Awaitable&& __awaitable, _Receiver&& __rcvr) - noexcept(__is_nothrow) + noexcept(__nothrow_move_constructible<_Awaitable>) : __rcvr_(static_cast<_Receiver&&>(__rcvr)) - , __task_(__co_impl(*this)) - , __awaitable1_(static_cast<_Awaitable&&>(__awaitable)) - , __awaitable2_( - __get_awaitable(static_cast<_Awaitable&&>(__awaitable1_), __task_.__coro_.promise())) - , __awaiter_(__get_awaiter(static_cast<__awaitable_t&&>(__awaitable2_))) + , __awaiter_(static_cast<_Awaitable&&>(__awaitable)) {} + __opstate(__opstate&&) = delete; + + ~__opstate() + { + if (__started_) + { + __awaiter_.destroy(); + } + } + void start() & noexcept { - __task_.__coro_.resume(); + auto __coro = __co_impl(*this); + + STDEXEC_TRY + { + __awaiter_.construct(__coro); + __started_ = true; + + if (!__awaiter_.await_ready()) + { + using __suspend_result_t = decltype(__awaiter_.await_suspend(__coro)); + + // suspended + if constexpr (std::is_void_v<__suspend_result_t>) + { + // void-returning await_suspend means "always suspend" + __awaiter_.await_suspend(__coro); + return; + } + else if constexpr (std::same_as) + { + if (__awaiter_.await_suspend(__coro)) + { + // returning true from a bool-returning await_suspend means suspend + return; + } + else + { + // returning false means immediately resume + } + } + else + { + static_assert(__std::convertible_to<__suspend_result_t, __std::coroutine_handle<>>); + auto __resume_target = __awaiter_.await_suspend(__coro); + STDEXEC_TRY + { + __resume_target.resume(); + } + STDEXEC_CATCH_ALL + { + STDEXEC_ASSERT(false + && "about to deliberately commit UB in response to a misbehaving " + "awaitable"); + __std::unreachable(); + } + return; + } + } + + // immediate resumption + __on_resume(); + } + STDEXEC_CATCH_ALL + { + if constexpr (!noexcept(__awaiter_.construct(__coro)) + || !noexcept(__awaiter_.await_ready()) + || !noexcept(__awaiter_.await_suspend(__coro))) + { + STDEXEC::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception()); + } + } } private: using __promise_t = __promise<_Awaitable, _Receiver>; - using __task_t = __promise_t::__task; using __awaitable_t = __result_of<__get_awaitable, _Awaitable, __promise_t&>; using __awaiter_t = __awaiter_of_t<__awaitable_t>; - using __result_t = decltype(__declval<__awaiter_t>().await_resume()); friend __promise_t; - static constexpr bool __is_nothrow = __nothrow_move_constructible<_Awaitable> - && __noexcept_of<__get_awaitable, _Awaitable, __promise_t&> - && __noexcept_of<__get_awaiter, __awaitable_t>; - - static constexpr std::size_t __storage_size = __connect_await::__storage_size - + sizeof(__manual_lifetime<__result_t>) - - __same_as<__result_t, void>; + static auto __co_impl(__opstate& __op) noexcept -> __std::coroutine_handle<__promise_t> + { + return __std::coroutine_handle<__promise_t>::from_address(&__op.__synthetic_frame_); + } - static auto __co_impl(__opstate& __op) noexcept -> __task_t + constexpr void __on_resume() noexcept { - using __op_awaiter_t = decltype(__op.__awaiter_); - if constexpr (__same_as) + STDEXEC_TRY { - co_await static_cast<__op_awaiter_t&&>(__op.__awaiter_); - __op.__result_.emplace(); + if constexpr (std::is_void_v) + { + __awaiter_.await_resume(); + STDEXEC::set_value(static_cast<_Receiver&&>(__rcvr_)); + } + else + { + STDEXEC::set_value(static_cast<_Receiver&&>(__rcvr_), __awaiter_.await_resume()); + } } - else + STDEXEC_CATCH_ALL { - __op.__result_.emplace(co_await static_cast<__op_awaiter_t&&>(__op.__awaiter_)); + if constexpr (!noexcept(__awaiter_.await_resume())) + { + STDEXEC::set_error(static_cast<_Receiver&&>(__rcvr_), std::current_exception()); + } } } - alignas(__storage_align) std::byte __storage_[__storage_size]; - _Receiver __rcvr_; - __promise_t::__task __task_; - _Awaitable __awaitable1_; - __awaitable_t __awaitable2_; - __awaiter_t __awaiter_; - std::exception_ptr __eptr_{}; - __optional<__result_t> __result_{}; - }; + constexpr void __on_stopped() noexcept + { + STDEXEC::set_stopped(static_cast<_Receiver&&>(__rcvr_)); + } - STDEXEC_PRAGMA_OPTIMIZE_END() + __synthetic_coro_frame __synthetic_frame_{&__promise_t::__resume}; + [[no_unique_address]] + _Receiver __rcvr_; + [[neo_unique_addres]] + __awaitable_state<_Awaitable, __promise_t> __awaiter_; + [[no_unique_address]] + bool __started_{false}; + }; } // namespace __connect_await struct __connect_awaitable_t diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 24d8dbe11..3566d85eb 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -20,6 +20,7 @@ set(stdexec_test_sources stdexec/cpos/test_cpo_receiver.cpp stdexec/cpos/test_cpo_start.cpp stdexec/cpos/test_cpo_connect.cpp + stdexec/cpos/test_cpo_connect_awaitable.cpp stdexec/cpos/test_cpo_schedule.cpp stdexec/cpos/test_cpo_upon_error.cpp stdexec/cpos/test_cpo_upon_stopped.cpp diff --git a/test/stdexec/cpos/test_cpo_connect_awaitable.cpp b/test/stdexec/cpos/test_cpo_connect_awaitable.cpp new file mode 100644 index 000000000..ce9e838b4 --- /dev/null +++ b/test/stdexec/cpos/test_cpo_connect_awaitable.cpp @@ -0,0 +1,770 @@ +/* + * Copyright (c) 2026 Ian Petersen + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#if !STDEXEC_NO_STDCPP_COROUTINES() + +# include "test_common/receivers.hpp" +# include "test_common/type_helpers.hpp" +# include +# include + +# include +# include + +namespace ex = STDEXEC; + +namespace +{ + template + struct ready_awaitable + { + public: + explicit constexpr ready_awaitable(T&& t) noexcept + : t_(std::move(t)) + {} + + explicit constexpr ready_awaitable(T const & t) + : t_(t) + {} + + static constexpr bool await_ready() noexcept + { + return true; + } + + static void await_suspend(std::coroutine_handle<>) noexcept + { + FAIL_CHECK("this awaitable should never suspend"); + } + + constexpr T await_resume() noexcept + { + return std::move(t_); + } + + ready_awaitable& base() noexcept + { + return *this; + } + + private: + T t_; + }; + + template <> + struct ready_awaitable + { + static constexpr bool await_ready() noexcept + { + return true; + } + + static void await_suspend(std::coroutine_handle<>) noexcept + { + FAIL_CHECK("this awaitable should never suspend"); + } + + static constexpr void await_resume() noexcept {} + + ready_awaitable& base() noexcept + { + return *this; + } + }; + + template + struct awaitable_ref + { + Awaitable* awaitable_; + + explicit constexpr awaitable_ref(Awaitable& awaitable) noexcept + : awaitable_(&awaitable) + {} + + constexpr auto await_ready() const noexcept(noexcept(awaitable_->await_ready())) + requires requires(Awaitable& a) { + { a.await_ready() }; + } + { + return awaitable_->await_ready(); + } + + template + constexpr auto await_suspend(std::coroutine_handle coro) const + noexcept(noexcept(awaitable_->await_suspend(coro))) + requires requires(Awaitable& a) { + { a.await_suspend(coro) }; + } + { + return awaitable_->await_suspend(coro); + } + + constexpr decltype(auto) await_resume() const noexcept(noexcept(awaitable_->await_resume())) + requires requires(Awaitable& a) { + { a.await_resume() }; + } + { + return awaitable_->await_resume(); + } + + template + requires requires(Awaitable& a, Promise& p) { + { a.as_awaitable(p) } -> ex::__awaitable; + } + constexpr auto as_awaitable(Promise& p) const noexcept(noexcept(awaitable_->as_awaitable(p))) + { + return awaitable_->as_awaitable(p); + } + + constexpr auto operator co_await() const noexcept(noexcept(awaitable_->operator co_await())) + requires requires(Awaitable& a) { + { a.operator co_await() }; + } + { + return awaitable_->operator co_await(); + } + }; + + template + awaitable_ref(Awaitable&) -> awaitable_ref; + + template + requires requires(Awaitable&& a) { + { operator co_await(std::move(a)) }; + } + constexpr auto operator co_await(awaitable_ref ref) + noexcept(noexcept(operator co_await(std::move(*ref.awaitable_)))) + { + return operator co_await(std::move(*ref.awaitable_)); + } + + template + struct suspending_awaitable : ready_awaitable + { + using ready_awaitable::ready_awaitable; + + suspending_awaitable(suspending_awaitable&&) = delete; + + ~suspending_awaitable() = default; + + constexpr void resume_parent() const noexcept + { + parent_.resume(); + } + + static constexpr bool await_ready() noexcept + { + return false; + } + + constexpr void await_suspend(std::coroutine_handle<> coro) noexcept + { + parent_ = coro; + } + + suspending_awaitable& base() noexcept + { + return *this; + } + + private: + std::coroutine_handle<> parent_; + }; + + template + struct conditionally_suspending_awaitable : suspending_awaitable + { + template + requires(sizeof...(U) == 0 && std::same_as) + || (sizeof...(U) == 1 && !std::same_as) + explicit constexpr conditionally_suspending_awaitable(bool suspend, U&&... u) noexcept + : suspending_awaitable(std::forward(u)...) + , suspend_(suspend) + {} + + constexpr bool await_suspend(std::coroutine_handle<> coro) noexcept + { + if (suspend_) + { + suspending_awaitable::await_suspend(coro); + } + + return suspend_; + } + + conditionally_suspending_awaitable& base() noexcept + { + return *this; + } + + private: + bool suspend_; + }; + + template + struct symmetrically_suspending_awaitable : conditionally_suspending_awaitable + { + using conditionally_suspending_awaitable::conditionally_suspending_awaitable; + + constexpr std::coroutine_handle<> await_suspend(std::coroutine_handle<> coro) noexcept + { + if (conditionally_suspending_awaitable::await_suspend(coro)) + { + return std::noop_coroutine(); + } + else + { + return coro; + } + } + + symmetrically_suspending_awaitable& base() noexcept + { + return *this; + } + }; + + template class Wrapper = std::type_identity_t> + struct with_as_awaitable + { + template + requires std::constructible_from + explicit(sizeof...(T) == 1) with_as_awaitable(T&&... t) + noexcept(std::is_nothrow_constructible_v) + : awaitable_(std::forward(t)...) + {} + + template + Wrapper> as_awaitable(Promise&) noexcept + { + return Wrapper>(awaitable_); + } + + auto& base() noexcept + { + return awaitable_.base(); + } + + private: + Awaitable awaitable_; + }; + + template class Wrapper = std::type_identity_t> + struct with_member_co_await + { + template + requires std::constructible_from + explicit(sizeof...(T) == 1) with_member_co_await(T&&... t) + noexcept(std::is_nothrow_constructible_v) + : awaitable_(std::forward(t)...) + {} + + constexpr Wrapper> operator co_await() noexcept + { + return Wrapper>(awaitable_); + } + + auto& base() noexcept + { + return awaitable_.base(); + } + + private: + Awaitable awaitable_; + }; + + template class Wrapper = std::type_identity_t> + struct with_friend_co_await + { + template + requires std::constructible_from + explicit(sizeof...(T) == 1) with_friend_co_await(T&&... t) + noexcept(std::is_nothrow_constructible_v) + : awaitable_(std::forward(t)...) + {} + + auto& base() noexcept + { + return awaitable_.base(); + } + + private: + Awaitable awaitable_; + + template + requires std::same_as, with_friend_co_await> + friend constexpr Wrapper> operator co_await(Self&& self) noexcept + { + return Wrapper>(self.awaitable_); + } + }; + + TEST_CASE("can connect and start a ready_awaitable", "[cpo][cpo_connect_awaitable]") + { + auto test = [](auto awaitable) noexcept + { + auto op = ex::connect(std::move(awaitable), expect_value_receiver{42}); + op.start(); + }; + + test(ready_awaitable{42}); + test(with_as_awaitable>{42}); + test(with_member_co_await>{42}); + test(with_friend_co_await>{42}); + test(with_as_awaitable>>{42}); + test(with_as_awaitable>>{42}); + } + + TEST_CASE("can connect and start a ready_awaitable", "[cpo][cpo_connect_awaitable]") + { + auto test = [](auto awaitable) noexcept + { + auto op = ex::connect(std::move(awaitable), expect_void_receiver{}); + op.start(); + }; + + test(ready_awaitable{}); + test(with_as_awaitable>{}); + test(with_member_co_await>{}); + test(with_friend_co_await>{}); + test(with_as_awaitable>>{}); + test(with_as_awaitable>>{}); + } + + TEST_CASE("can connect and start a suspending_awaitable", "[cpo][cpo_connect_awaitable]") + { + auto test = [](auto awaitable, auto... values) noexcept + { + auto op = ex::connect(awaitable_ref(awaitable), expect_value_receiver{std::move(values)...}); + op.start(); + awaitable.base().resume_parent(); + }; + + test(suspending_awaitable{42}, 42); + test(with_as_awaitable>{42}, 42); + test(with_member_co_await>{42}, 42); + test(with_friend_co_await>{42}, 42); + test(with_as_awaitable>>{42}, 42); + test(with_as_awaitable>>{42}, 42); + + test(suspending_awaitable{}); + test(with_as_awaitable>{}); + test(with_member_co_await>{}); + test(with_friend_co_await>{}); + test(with_as_awaitable>>{}); + test(with_as_awaitable>>{}); + } + + TEST_CASE("can connect and start a conditionally_suspending_awaitable", + "[cpo][cpo_connect_awaitable]") + { + { + auto test = [](auto awaitable, auto... values) noexcept + { + auto op = ex::connect(awaitable_ref(awaitable), + expect_value_receiver{std::move(values)...}); + op.start(); + awaitable.base().resume_parent(); + }; + + test(conditionally_suspending_awaitable(true, 42), 42); + test(with_as_awaitable>(true, 42), 42); + test(with_member_co_await>(true, 42), 42); + test(with_friend_co_await>(true, 42), 42); + test(with_as_awaitable>>(true, + 42), + 42); + test(with_as_awaitable>>(true, + 42), + 42); + + test(conditionally_suspending_awaitable(true)); + test(with_as_awaitable>(true)); + test(with_member_co_await>(true)); + test(with_friend_co_await>(true)); + test(with_as_awaitable>>(true)); + test(with_as_awaitable>>(true)); + } + + { + auto test = [](auto awaitable, auto... values) noexcept + { + auto op = ex::connect(awaitable_ref(awaitable), + expect_value_receiver{std::move(values)...}); + op.start(); + }; + + test(conditionally_suspending_awaitable(false, 42), 42); + test(with_as_awaitable>(false, 42), 42); + test(with_member_co_await>(false, 42), 42); + test(with_friend_co_await>(false, 42), 42); + test(with_as_awaitable>>(false, + 42), + 42); + test(with_as_awaitable>>(false, + 42), + 42); + + test(conditionally_suspending_awaitable(false)); + test(with_as_awaitable>(false)); + test(with_member_co_await>(false)); + test(with_friend_co_await>(false)); + test( + with_as_awaitable>>(false)); + test( + with_as_awaitable>>(false)); + } + } + + TEST_CASE("can connect and start a symmetrically_suspending_awaitable", + "[cpo][cpo_connect_awaitable]") + { + { + auto test = [](auto awaitable, auto... values) noexcept + { + auto op = ex::connect(awaitable_ref(awaitable), + expect_value_receiver{std::move(values)...}); + op.start(); + awaitable.base().resume_parent(); + }; + + test(symmetrically_suspending_awaitable(true, 42), 42); + test(with_as_awaitable>(true, 42), 42); + test(with_member_co_await>(true, 42), 42); + test(with_friend_co_await>(true, 42), 42); + test(with_as_awaitable>>(true, + 42), + 42); + test(with_as_awaitable>>(true, + 42), + 42); + + test(symmetrically_suspending_awaitable(true)); + test(with_as_awaitable>(true)); + test(with_member_co_await>(true)); + test(with_friend_co_await>(true)); + test(with_as_awaitable>>(true)); + test(with_as_awaitable>>(true)); + } + + { + auto test = [](auto awaitable, auto... values) noexcept + { + auto op = ex::connect(awaitable_ref(awaitable), + expect_value_receiver{std::move(values)...}); + op.start(); + }; + + test(symmetrically_suspending_awaitable(false, 42), 42); + test(with_as_awaitable>(false, 42), 42); + test(with_member_co_await>(false, 42), 42); + test(with_friend_co_await>(false, 42), 42); + test(with_as_awaitable>>(false, + 42), + 42); + test(with_as_awaitable>>(false, + 42), + 42); + + test(symmetrically_suspending_awaitable(false)); + test(with_as_awaitable>(false)); + test(with_member_co_await>(false)); + test(with_friend_co_await>(false)); + test( + with_as_awaitable>>(false)); + test( + with_as_awaitable>>(false)); + } + } + + TEST_CASE("exceptions thrown from await_ready are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + struct throw_on_ready : ready_awaitable + { + static bool await_ready() + { + throw std::runtime_error("not ready!"); + } + }; + + auto op = ex::connect(throw_on_ready{}, expect_error_receiver{}); + op.start(); + } + + TEST_CASE("exceptions thrown from void-returning await_suspend are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + struct throw_on_suspend : suspending_awaitable + { + static void await_suspend(std::coroutine_handle<>) + { + throw std::runtime_error("do not suspend!"); + } + } awaiter; + + auto op = ex::connect(awaitable_ref{awaiter}, expect_error_receiver{}); + op.start(); + } + + TEST_CASE("exceptions thrown from bool-returning await_suspend are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + struct throw_on_suspend : suspending_awaitable + { + static bool await_suspend(std::coroutine_handle<>) + { + throw std::runtime_error("do not suspend!"); + } + } awaiter; + + auto op = ex::connect(awaitable_ref{awaiter}, expect_error_receiver{}); + op.start(); + } + + TEST_CASE("exceptions thrown from handle-returning await_suspend are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + struct throw_on_suspend : suspending_awaitable + { + static std::coroutine_handle<> await_suspend(std::coroutine_handle<>) + { + throw std::runtime_error("do not suspend!"); + } + } awaiter; + + auto op = ex::connect(awaitable_ref{awaiter}, expect_error_receiver{}); + op.start(); + } + + TEST_CASE("exceptions thrown from immediately-invoked await_resume are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + { + struct throw_on_void_resume : ready_awaitable + { + static void await_resume() + { + throw std::runtime_error("no result for you!"); + } + }; + + auto op = ex::connect(throw_on_void_resume{}, expect_error_receiver{}); + op.start(); + } + { + struct throw_on_int_resume : ready_awaitable + { + static int await_resume() + { + throw std::runtime_error("no result for you!"); + } + }; + + auto op = ex::connect(throw_on_int_resume{}, expect_error_receiver{}); + op.start(); + } + } + + TEST_CASE("exceptions thrown from deferred-invoked await_resume are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + { + { + struct throw_on_void_resume : suspending_awaitable + { + static void await_resume() + { + throw std::runtime_error("no result for you!"); + } + } awaitable; + + auto op = ex::connect(awaitable_ref{awaitable}, expect_error_receiver{}); + op.start(); + awaitable.resume_parent(); + } + { + struct throw_on_int_resume : suspending_awaitable + { + static int await_resume() + { + throw std::runtime_error("no result for you!"); + } + } awaitable; + + auto op = ex::connect(awaitable_ref{awaitable}, expect_error_receiver{}); + op.start(); + awaitable.resume_parent(); + } + } + } + + template + struct throw_on_get_awaitable + { + template + Awaitable as_awaitable(Promise&) + { + throw std::runtime_error("no awaitable for you!"); + } + }; + + TEST_CASE("exceptions thrown from __get_awaitable are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + { + auto op = ex::connect(throw_on_get_awaitable>{}, + expect_error_receiver{}); + op.start(); + } + + { + auto op = ex::connect(throw_on_get_awaitable>>{}, + expect_error_receiver{}); + op.start(); + } + + { + auto op = ex::connect(throw_on_get_awaitable>>{}, + expect_error_receiver{}); + op.start(); + } + } + + TEST_CASE("exceptions thrown from member operator co_await are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + struct throw_on_co_await + { + ready_awaitable operator co_await() + { + throw std::runtime_error("no awaitable for you!"); + } + }; + + { + auto op = ex::connect(throw_on_co_await{}, expect_error_receiver{}); + op.start(); + } + + { + auto op = ex::connect(with_as_awaitable{}, expect_error_receiver{}); + op.start(); + } + } + + struct throw_on_co_await + { + friend ready_awaitable operator co_await(throw_on_co_await&&) + { + throw std::runtime_error("no awaitable for you!"); + } + }; + + TEST_CASE("exceptions thrown from friend operator co_await are reported to set_error", + "[cpo][cpo_connect_awaitable]") + { + { + auto op = ex::connect(throw_on_co_await{}, expect_error_receiver{}); + op.start(); + } + + { + auto op = ex::connect(with_as_awaitable{}, expect_error_receiver{}); + op.start(); + } + } + + struct stop_on_suspend + { + static constexpr bool await_ready() noexcept + { + return false; + } + + template + static constexpr auto + await_suspend(std::coroutine_handle coro) noexcept -> std::coroutine_handle<> + { + return coro.promise().unhandled_stopped(); + } + + static constexpr void await_resume() noexcept {} + }; + + TEST_CASE("promise().unhandled_stopped() invokes set_stopped", "[cpo][cpo_connect_awaitable]") + { + auto op = ex::connect(stop_on_suspend{}, expect_stopped_receiver{}); + op.start(); + } + + template + struct as_immovable : Awaitable + { + using Awaitable::Awaitable; + + as_immovable(as_immovable&&) = delete; + + as_immovable& base() noexcept + { + return *this; + } + }; + + TEST_CASE("can connect and start immovable awaiters", "[cpo][cpo_connect_awaitable]") + { + { + // .as_awaitable(promise) returns an immovable value + auto op = ex::connect(with_as_awaitable, as_immovable>{}, + expect_void_receiver{}); + op.start(); + } + { + // .operator co_await() returns an immovable value + auto op = ex::connect(with_member_co_await, as_immovable>{}, + expect_void_receiver{}); + op.start(); + } + { + // operator co_await(awaitable) returns an immovable value + auto op = ex::connect(with_friend_co_await, as_immovable>{}, + expect_void_receiver{}); + op.start(); + } + { + // both .as_awaitable(promise) and .as_awaitable(promise).operator co_await() return + // immovable values + auto op = + ex::connect(with_as_awaitable, as_immovable>, + as_immovable>{}, + expect_void_receiver{}); + op.start(); + } + { + // both .as_awaitable(promise) and operator co_await(as_awaitable(promise)) return + // immovable values + auto op = + ex::connect(with_as_awaitable, as_immovable>, + as_immovable>{}, + expect_void_receiver{}); + op.start(); + } + } +} // namespace + +#endif