diff --git a/include/beman/execution26/detail/basic_sender.hpp b/include/beman/execution26/detail/basic_sender.hpp index af0d0272..ce765f1e 100644 --- a/include/beman/execution26/detail/basic_sender.hpp +++ b/include/beman/execution26/detail/basic_sender.hpp @@ -74,7 +74,7 @@ struct basic_sender : ::beman::execution26::detail::product_type(self), ::std::move(receiver)}; } #endif -#if __cpp_explicit_this_parameter < 202110L +#if __cpp_explicit_this_parameter < 302110L template auto get_completion_signatures(Env&&) && -> ::beman::execution26::detail::completion_signatures_for { @@ -82,7 +82,7 @@ struct basic_sender : ::beman::execution26::detail::product_type auto get_completion_signatures( - Env&&) const&& -> ::beman::execution26::detail::completion_signatures_for { + Env&&) const&& -> ::beman::execution26::detail::completion_signatures_for { return {}; } template @@ -92,7 +92,7 @@ struct basic_sender : ::beman::execution26::detail::product_type auto get_completion_signatures( - Env&&) const& -> ::beman::execution26::detail::completion_signatures_for { + Env&&) const& -> ::beman::execution26::detail::completion_signatures_for { return {}; } #else diff --git a/include/beman/execution26/detail/connect.hpp b/include/beman/execution26/detail/connect.hpp index 0ec5a2c8..f47b3631 100644 --- a/include/beman/execution26/detail/connect.hpp +++ b/include/beman/execution26/detail/connect.hpp @@ -23,7 +23,9 @@ namespace beman::execution26::detail { struct connect_t { private: template - static auto make_new_sender(Sender&& sender, Receiver&& receiver) noexcept(true) -> decltype(auto) { + static auto make_new_sender(Sender&& sender, Receiver&& receiver) + //-dk:TODO this noexcept needs to get confirmed/fixed + noexcept(true) -> decltype(auto) { return ::beman::execution26::transform_sender( decltype(::beman::execution26::detail::get_domain_late(::std::forward(sender), ::beman::execution26::get_env(receiver))){}, diff --git a/include/beman/execution26/detail/on.hpp b/include/beman/execution26/detail/on.hpp new file mode 100644 index 00000000..10d4a443 --- /dev/null +++ b/include/beman/execution26/detail/on.hpp @@ -0,0 +1,169 @@ +// include/beman/execution26/detail/on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { +struct on_t : ::beman::execution26::sender_adaptor_closure { + template <::beman::execution26::detail::sender_for OutSndr, typename Env> + auto transform_env(OutSndr&& out_sndr, Env&& env) const -> decltype(auto) { + auto&& data{out_sndr.template get<1>()}; + + if constexpr (::beman::execution26::scheduler) + return ::beman::execution26::detail::join_env( + ::beman::execution26::detail::sched_env(::beman::execution26::detail::forward_like(data) + + ), + ::beman::execution26::detail::fwd_env(::std::forward(env))); + else + return std::forward(env); + } + + template + struct env_needs_get_scheduler { + using sender_concept = ::beman::execution26::sender_t; + template + auto get_completion_signatures(Env&&) const { + return env_needs_get_scheduler{}; + } + }; + + template <::beman::execution26::detail::sender_for OutSndr, typename Env> + auto transform_sender(OutSndr&& out_sndr, Env&& env) const -> decltype(auto) { + struct not_a_scheduler {}; + auto&& [_, data, child] = out_sndr; + + if constexpr (::beman::execution26::scheduler) { + auto sch{::beman::execution26::detail::query_with_default( + ::beman::execution26::get_scheduler, env, not_a_scheduler{})}; + if constexpr (::std::same_as) { + return env_needs_get_scheduler{}; + } else { + return ::beman::execution26::continues_on( + ::beman::execution26::starts_on(::beman::execution26::detail::forward_like(data), + ::beman::execution26::detail::forward_like(child)), + ::std::move(sch)); + } + } else { + auto& [sch, closure] = data; + auto orig_sch{::beman::execution26::detail::query_with_default( + ::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>, + ::beman::execution26::get_env(child), + ::beman::execution26::detail::query_with_default( + ::beman::execution26::get_scheduler, env, not_a_scheduler{}))}; + + if constexpr (::std::same_as) { + return env_needs_get_scheduler{}; + } else { + return ::beman::execution26::detail::write_env( + ::beman::execution26::continues_on( + ::beman::execution26::detail::forward_like(closure)( + ::beman::execution26::continues_on( + ::beman::execution26::detail::write_env( + ::beman::execution26::detail::forward_like(child), + ::beman::execution26::detail::sched_env(orig_sch)), + sch)), + orig_sch), + ::beman::execution26::detail::sched_env(env)); + } + } + } + + template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr> + requires ::beman::execution26::detail::is_sender_adaptor_closure + auto operator()(Sch&&, Sndr&&) const -> void = + BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure"); + + template <::beman::execution26::scheduler Sch, + ::beman::execution26::sender Sndr, + ::beman::execution26::detail::is_sender_adaptor_closure Closure> + requires ::beman::execution26::detail::is_sender_adaptor_closure + auto operator()(Sndr&&, Sch&&, Closure&&) const -> void = + BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure"); + + template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr> + auto operator()(Sch&& sch, Sndr&& sndr) const { + auto domain{::beman::execution26::detail::query_with_default( + ::beman::execution26::get_domain, sch, ::beman::execution26::default_domain{})}; + return ::beman::execution26::transform_sender( + domain, + ::beman::execution26::detail::make_sender(*this, ::std::forward(sch), ::std::forward(sndr))); + } + template <::beman::execution26::scheduler Sch, + ::beman::execution26::sender Sndr, + ::beman::execution26::detail::is_sender_adaptor_closure Closure> + auto operator()(Sndr&& sndr, Sch&& sch, Closure&& closure) const { + auto domain{::beman::execution26::detail::get_domain_early(sndr)}; + return ::beman::execution26::transform_sender( + domain, + ::beman::execution26::detail::make_sender( + *this, + ::beman::execution26::detail::product_type{::std::forward(sch), ::std::forward(closure)}, + ::std::forward(sndr))); + } + template <::beman::execution26::scheduler Sch, ::beman::execution26::detail::is_sender_adaptor_closure Closure> + auto operator()(Sch&& sch, Closure&& closure) const { + return ::beman::execution26::detail::sender_adaptor{ + *this, ::std::forward(sch), ::std::forward(closure)}; + } +}; + +#if 0 +template +struct completion_signatures_for_impl< + ::beman::execution26::detail::basic_sender<::beman::execution26::detail::on_t, Data, Sender>, + Env> { + //-dk:TODO pick up scheduler errors and merge them in? + using type = +#if 0 + ::beman::execution26::detail::meta::combine< + ::beman::execution26::completion_signatures_of_t, + ::beman::execution26::completion_signatures<::beman::execution26::set_error_t(::std::exception_ptr)> + > +#else + ::beman::execution26::completion_signatures< + ::beman::execution26::set_value_t(), + ::beman::execution26::set_error_t(::std::exception_ptr) + > +#endif + ; +}; +#endif + +} // namespace beman::execution26::detail + +namespace beman::execution26 { +using on_t = ::beman::execution26::detail::on_t; +inline constexpr ::beman::execution26::on_t on{}; +} // namespace beman::execution26 + +// ---------------------------------------------------------------------------- + +#include + +#endif diff --git a/include/beman/execution26/detail/product_type.hpp b/include/beman/execution26/detail/product_type.hpp index 8b1d9e49..fdfaec3b 100644 --- a/include/beman/execution26/detail/product_type.hpp +++ b/include/beman/execution26/detail/product_type.hpp @@ -10,7 +10,8 @@ // ---------------------------------------------------------------------------- namespace beman::execution26::detail { -template <::std::size_t, typename T> + +template <::std::size_t I, typename T> struct product_type_element { T value; auto operator==(const product_type_element&) const -> bool = default; @@ -23,6 +24,7 @@ template <::std::size_t... I, typename... T> struct product_type_base<::std::index_sequence, T...> : ::beman::execution26::detail::product_type_element... { static constexpr ::std::size_t size() { return sizeof...(T); } + static constexpr bool is_product_type{true}; template <::std::size_t J, typename S> static auto element_get(::beman::execution26::detail::product_type_element& self) noexcept -> S& { @@ -64,6 +66,9 @@ struct product_type_base<::std::index_sequence, T...> auto operator==(const product_type_base&) const -> bool = default; }; +template +concept is_product_type_c = requires(const T& t) { T::is_product_type; }; + template struct product_type : ::beman::execution26::detail::product_type_base<::std::index_sequence_for, T...> { template @@ -108,13 +113,14 @@ constexpr auto is_product_type(const ::beman::execution26::detail::product_type< } // namespace beman::execution26::detail namespace std { -template -struct tuple_size<::beman::execution26::detail::product_type> - : ::std::integral_constant {}; -template <::std::size_t I, typename... T> -struct tuple_element> { - using type = - ::std::decay_t>().template get())>; +template + requires ::beman::execution26::detail::is_product_type_c +struct tuple_size : ::std::integral_constant {}; + +template <::std::size_t I, typename T> + requires ::beman::execution26::detail::is_product_type_c +struct tuple_element { + using type = ::std::decay_t().template get())>; }; } // namespace std diff --git a/include/beman/execution26/detail/schedule_from.hpp b/include/beman/execution26/detail/schedule_from.hpp index b0ac6e20..9894e830 100644 --- a/include/beman/execution26/detail/schedule_from.hpp +++ b/include/beman/execution26/detail/schedule_from.hpp @@ -5,11 +5,13 @@ #define INCLUDED_BEMAN_EXECUTION26_DETAIL_SCHEDULE_FROM #include +#include #include #include #include #include #include +#include #include #include #include @@ -144,10 +146,15 @@ struct impls_for<::beman::execution26::detail::schedule_from_t> : ::beman::execu ::std::monostate, ::beman::execution26::detail::meta::transform< ::beman::execution26::detail::as_tuple_t, - ::beman::execution26::detail::meta::to<::std::variant, - ::beman::execution26::completion_signatures_of_t< - ::beman::execution26::detail::child_type, - ::beman::execution26::env_of_t>>>>>; + ::beman::execution26::detail::meta::to< + ::std::variant, + ::beman::execution26::detail::meta::combine< + ::beman::execution26::completion_signatures_of_t< + ::beman::execution26::detail::child_type, + ::beman::execution26::env_of_t>, + //-dk:TODO get proper error completion signatures + ::beman::execution26::completion_signatures<::beman::execution26::set_error_t( + ::std::exception_ptr)>>>>>>; return state_type(sch, receiver); }}; @@ -176,8 +183,15 @@ template struct completion_signatures_for_impl< ::beman::execution26::detail::basic_sender<::beman::execution26::detail::schedule_from_t, Scheduler, Sender>, Env> { - using type = - decltype(::beman::execution26::get_completion_signatures(::std::declval(), ::std::declval())); + using scheduler_sender = decltype(::beman::execution26::schedule(::std::declval())); + template + using as_set_error = ::beman::execution26::completion_signatures<::beman::execution26::set_error_t(E)...>; + using type = ::beman::execution26::detail::meta::combine< + decltype(::beman::execution26::get_completion_signatures(::std::declval(), ::std::declval())), + ::beman::execution26::error_types_of_t, + ::beman::execution26::completion_signatures<::beman::execution26::set_error_t( + ::std::exception_ptr)> //-dk:TODO this one should be deduced + >; }; } // namespace beman::execution26::detail diff --git a/include/beman/execution26/detail/sender_adaptor_closure.hpp b/include/beman/execution26/detail/sender_adaptor_closure.hpp index 53945dc2..23da0705 100644 --- a/include/beman/execution26/detail/sender_adaptor_closure.hpp +++ b/include/beman/execution26/detail/sender_adaptor_closure.hpp @@ -20,8 +20,16 @@ namespace beman::execution26 { template struct sender_adaptor_closure : ::beman::execution26::detail::pipeable::sender_adaptor_closure_base {}; // NOLINTEND(bugprone-crtp-constructor-accessibility) + } // namespace beman::execution26 +namespace beman::execution26::detail { +template +concept is_sender_adaptor_closure = + ::std::derived_from<::std::decay_t, + ::beman::execution26::sender_adaptor_closure<::std::decay_t>>; +} + namespace beman::execution26::detail::pipeable { template <::beman::execution26::sender Sender, typename Adaptor> requires(not::beman::execution26::sender) && diff --git a/include/beman/execution26/detail/transform_sender.hpp b/include/beman/execution26/detail/transform_sender.hpp index 376a3b6b..def72c09 100644 --- a/include/beman/execution26/detail/transform_sender.hpp +++ b/include/beman/execution26/detail/transform_sender.hpp @@ -16,9 +16,9 @@ template requires(Domain dom, Sender&& sender, const Env&... env) { dom.transform_sender(::std::forward(sender), env...); } && - (::std::same_as< - ::std::remove_cvref_t, - std::remove_cvref_t().transform_sender(::std::declval()))>>) + (::std::same_as<::std::remove_cvref_t, + std::remove_cvref_t().transform_sender( + ::std::declval(), ::std::declval()...))>>) constexpr auto transform_sender(Domain, Sender&& sender, const Env&...) noexcept -> ::beman::execution26::sender auto { return ::std::forward(sender); } @@ -28,9 +28,9 @@ template requires(Domain dom, Sender&& sender, const Env&... env) { dom.transform_sender(::std::forward(sender), env...); } && - (not::std::same_as< - ::std::remove_cvref_t, - std::remove_cvref_t().transform_sender(::std::declval()))>>) + (not::std::same_as<::std::remove_cvref_t, + std::remove_cvref_t().transform_sender( + ::std::declval(), ::std::declval()...))>>) constexpr auto transform_sender(Domain dom, Sender&& sender, const Env&... env) noexcept -> ::beman::execution26::sender decltype(auto) { return ::beman::execution26::detail::transform_sender( @@ -60,7 +60,6 @@ template constexpr auto transform_sender(Domain dom, Sender&& sender, const Env&... env) noexcept(noexcept( ::beman::execution26::default_domain{}.transform_sender(::std::declval(), ::std::declval()...))) -> ::beman::execution26::sender decltype(auto) { - (void)dom; return ::beman::execution26::detail::transform_sender( dom, ::beman::execution26::default_domain{}.transform_sender(::std::forward(sender), env...), env...); } @@ -72,9 +71,9 @@ template requires(Domain dom, Sender&& sender, const Env&... env) { dom.transform_sender(::std::forward(sender), env...); } && - (::std::same_as< - ::std::remove_cvref_t, - std::remove_cvref_t().transform_sender(::std::declval()))>>) + (::std::same_as<::std::remove_cvref_t, + std::remove_cvref_t().transform_sender( + ::std::declval(), ::std::declval()...))>>) constexpr auto transform_sender(Domain, Sender&& sender, const Env&...) noexcept -> ::beman::execution26::sender decltype(auto) { return ::std::forward(sender); @@ -85,9 +84,9 @@ template requires(Domain dom, Sender&& sender, const Env&... env) { dom.transform_sender(::std::forward(sender), env...); } && - (not::std::same_as< - ::std::remove_cvref_t, - std::remove_cvref_t().transform_sender(::std::declval()))>>) + (not::std::same_as<::std::remove_cvref_t, + std::remove_cvref_t().transform_sender( + ::std::declval(), ::std::declval()...))>>) constexpr auto transform_sender(Domain dom, Sender&& sender, const Env&... env) noexcept -> ::beman::execution26::sender auto { return ::beman::execution26::detail::transform_sender( diff --git a/include/beman/execution26/execution.hpp b/include/beman/execution26/execution.hpp index 10f33ede..78cfd62e 100644 --- a/include/beman/execution26/execution.hpp +++ b/include/beman/execution26/execution.hpp @@ -42,15 +42,16 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include #include +#include // ---------------------------------------------------------------------------- diff --git a/src/beman/execution26/CMakeLists.txt b/src/beman/execution26/CMakeLists.txt index a7463994..1c02b30a 100644 --- a/src/beman/execution26/CMakeLists.txt +++ b/src/beman/execution26/CMakeLists.txt @@ -123,6 +123,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nostopstate.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nothrow_callable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/notify.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on_stop_request.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state_task.hpp diff --git a/tests/beman/execution26/CMakeLists.txt b/tests/beman/execution26/CMakeLists.txt index 068148df..b9130f6e 100644 --- a/tests/beman/execution26/CMakeLists.txt +++ b/tests/beman/execution26/CMakeLists.txt @@ -11,6 +11,7 @@ endif() list( APPEND execution_tests + exec-on.test notify.test exec-scounting.test exec-awaitable.test diff --git a/tests/beman/execution26/exec-on.test.cpp b/tests/beman/execution26/exec-on.test.cpp new file mode 100644 index 00000000..be930dc9 --- /dev/null +++ b/tests/beman/execution26/exec-on.test.cpp @@ -0,0 +1,134 @@ +// tests/beman/execution26/exec-on.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +// ---------------------------------------------------------------------------- + +namespace { +struct both : test_std::sender_adaptor_closure { + using sender_concept = test_std::sender_t; +}; + +static_assert(test_std::sender); +static_assert(test_detail::is_sender_adaptor_closure); + +template +auto test_interface(Sch sch, Sndr sndr, Closure closure, Both both) -> void { + static_assert(requires { + { test_std::on(sch, sndr) } -> test_std::sender; + }); + static_assert(not requires { test_std::on(sch, both); }); + static_assert(requires { + { test_std::on(sndr, sch, closure) } -> test_std::sender; + }); + static_assert(not requires { test_std::on(both, sch, closure); }); + + auto sndr1{test_std::on(sch, sndr)}; + auto sndr2{test_std::on(sndr, sch, closure)}; + test::use(sndr1, sndr2); +} + +template OutSndr> +auto test_transform_env(OutSndr out_sndr) -> void { + auto e{test_std::on.transform_env(out_sndr, test_std::empty_env{})}; + test::use(e); +} + +template OutSndr> +auto test_transform_sender(OutSndr out_sndr) -> void { + auto s{test_std::on.transform_sender(std::move(out_sndr), test_std::empty_env{})}; + static_assert(test_std::sender); + auto ts{std::move(s) | test_std::then([](auto&&...) {})}; + static_assert(test_std::sender); +} + +struct on_receiver { + using receiver_concept = test_std::receiver_t; + test::thread_pool& pool; + auto set_value(auto&&...) && noexcept {} + auto set_error(auto&&) && noexcept {} + auto set_stopped() && noexcept {} + auto get_env() const noexcept { return test_detail::make_env(test_std::get_scheduler, pool.get_scheduler()); } +}; +static_assert(test_std::receiver); +} // namespace + +TEST(exec_on) { + test::thread_pool pool{}; + + static_assert(std::same_as); + static_assert(test_detail::is_sender_adaptor_closure); + static_assert(not test_detail::is_sender_adaptor_closure); + test_interface(pool.get_scheduler(), test_std::just(), test_std::then([] {}), both{}); + + test_transform_env(test_detail::make_sender(test_std::on, pool.get_scheduler(), test_std::just())); + test_transform_env(test_detail::make_sender( + test_std::on, + ::beman::execution26::detail::product_type{pool.get_scheduler(), test_std::then([] {})}, + test_std::just())); + + test_transform_sender(test_detail::make_sender(test_std::on, pool.get_scheduler(), test_std::just())); + test_transform_sender(test_detail::make_sender( + test_std::on, + ::beman::execution26::detail::product_type{pool.get_scheduler(), test_std::then([] {})}, + test_std::just())); + + std::thread::id on_id{}; + std::thread::id pool_id{}; + std::thread::id cont_id{}; + + test_std::sync_wait(test_std::starts_on(pool.get_scheduler(), test_std::just() | test_std::then([&pool_id] { + pool_id = std::this_thread::get_id(); + }))); + test_std::sync_wait(test_std::on(pool.get_scheduler(), test_std::just() | test_std::then([&on_id] { + on_id = std::this_thread::get_id(); + return 42; + })) | + test_std::then([&cont_id](int val) { + assert(val == 42); + cont_id = std::this_thread::get_id(); + })); + assert(on_id == pool_id); + assert(cont_id == std::this_thread::get_id()); + assert(on_id != std::this_thread::get_id()); + + test_std::sync_wait(test_std::on(test_std::just(17), pool.get_scheduler(), test_std::then([&on_id](int val) { + assert(val == 17); + on_id = std::this_thread::get_id(); + return 42; + })) | + test_std::then([&cont_id](int val) { + assert(val == 42); + cont_id = std::this_thread::get_id(); + })); + assert(on_id == pool_id); + assert(cont_id == std::this_thread::get_id()); + assert(on_id != std::this_thread::get_id()); + + test_std::sync_wait(test_std::just(17) | test_std::on(pool.get_scheduler(), test_std::then([&on_id](int val) { + assert(val == 17); + on_id = std::this_thread::get_id(); + return 42; + })) | + test_std::then([&cont_id](int val) { + assert(val == 42); + cont_id = std::this_thread::get_id(); + })); + assert(on_id == pool_id); + assert(cont_id == std::this_thread::get_id()); + assert(on_id != std::this_thread::get_id()); +} diff --git a/tests/beman/execution26/exec-snd-expos.test.cpp b/tests/beman/execution26/exec-snd-expos.test.cpp index 27c1e12e..1dcfcc28 100644 --- a/tests/beman/execution26/exec-snd-expos.test.cpp +++ b/tests/beman/execution26/exec-snd-expos.test.cpp @@ -858,6 +858,27 @@ auto test_product_type() -> void { ASSERT(p5.get<2>() == nm(3)); ASSERT(p5.get<3>() == nm(4)); ASSERT(p5.get<4>() == nm(5)); + + test_detail::product_type prod{1, true, 'c'}; + static_assert(test_detail::is_product_type_c); + static_assert(3u == std::tuple_size::value); + static_assert(std::same_as::type>); + static_assert(std::same_as::type>); + static_assert(std::same_as::type>); + auto&& [i, b, c] = prod; + test::use(i, b, c); + + struct derived : decltype(prod) {}; + static_assert(3u == std::tuple_size::value); + static_assert(std::same_as::type>); + static_assert(std::same_as::type>); + static_assert(std::same_as::type>); + derived d{1, true, 'c'}; + auto&& [di, db, dc] = d; + assert(di == d.get<0>()); + assert(db == d.get<1>()); + assert(dc == d.get<2>()); + test::use(di, db, dc); } auto test_connect_all() -> void { static_assert(test_std::operation_state>); diff --git a/tests/beman/execution26/include/test/thread_pool.hpp b/tests/beman/execution26/include/test/thread_pool.hpp new file mode 100644 index 00000000..5313c128 --- /dev/null +++ b/tests/beman/execution26/include/test/thread_pool.hpp @@ -0,0 +1,117 @@ +// tests/beman/execution26/include/test/thread_pool.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// ---------------------------------------------------------------------------- + +#ifndef INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL +#define INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL + +#include +#include + +#include +#include +#include +#include +// ---------------------------------------------------------------------------- + +namespace test { +struct thread_pool; +} + +struct test::thread_pool { + struct node { + node* next{}; + virtual void run() = 0; + + node(const node&) = delete; + node(node&&) = delete; + node& operator=(const node&) = delete; + node& operator=(node&&) = delete; + + protected: + node() = default; + ~node() = default; + }; + + std::mutex mutex; + std::condition_variable condition; + node* stack{}; + bool stopped{false}; + std::thread driver{[this] { + while (std::optional n = [this] { + std::unique_lock cerberus(mutex); + condition.wait(cerberus, [this] { return stopped || stack; }); + return this->stack ? std::optional(std::exchange(this->stack, this->stack->next)) + : std::optional(); + }()) { + (*n)->run(); + } + }}; + + thread_pool() = default; + thread_pool(thread_pool&&) = delete; + thread_pool(const thread_pool&) = delete; + ~thread_pool() { + this->stop(); + this->driver.join(); + } + thread_pool& operator=(thread_pool&&) = delete; + thread_pool& operator=(const thread_pool&) = delete; + void stop() { + { + std::lock_guard cerberus(this->mutex); + stopped = true; + } + this->condition.notify_one(); + } + + struct scheduler { + using scheduler_concept = test_std::scheduler_t; + struct env { + test::thread_pool* pool; + + template + scheduler query(const test_std::get_completion_scheduler_t&) const noexcept { + return {this->pool}; + } + }; + template + struct state final : test::thread_pool::node { + using operation_state_concept = test_std::operation_state_t; + std::remove_cvref_t receiver; + test::thread_pool* pool; + + template + state(R&& r, test::thread_pool* p) : node{}, receiver(std::forward(r)), pool(p) {} + void start() & noexcept { + { + std::lock_guard cerberus(this->pool->mutex); + this->next = std::exchange(this->pool->stack, this); + } + this->pool->condition.notify_one(); + } + void run() override { test_std::set_value(std::move(this->receiver)); } + }; + struct sender { + using sender_concept = test_std::sender_t; + using completion_signatures = test_std::completion_signatures; + test::thread_pool* pool; + template + state connect(Receiver&& receiver) { + return state(std::forward(receiver), pool); + } + + env get_env() const noexcept { return {this->pool}; } + }; + test::thread_pool* pool; + sender schedule() { return {this->pool}; } + bool operator==(const scheduler&) const = default; + }; + scheduler get_scheduler() { return {this}; } +}; + +static_assert(test_std::scheduler); + +// ---------------------------------------------------------------------------- + +#endif