Skip to content

Commit

Permalink
Merge pull request #16 from bemanproject/move-to-execution
Browse files Browse the repository at this point in the history
  • Loading branch information
dietmarkuehl authored Jan 29, 2025
2 parents b153aa2 + f746b24 commit 6972ef9
Show file tree
Hide file tree
Showing 23 changed files with 311 additions and 332 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
sanitizer: [none, asan, usan, tsan]
sanitizer: [none, asan, usan]
compiler: [g++]
steps:
- uses: actions/checkout@v4
Expand Down
14 changes: 7 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@ set(TARGET_NAME net)
set(TARGET_PREFIX beman.${TARGET_NAME})
set(TARGET_LIBRARY beman_${TARGET_NAME})
set(TARGET_ALIAS beman::${TARGET_NAME})
set(TARGETS_EXPORT_NAME ${CMAKE_PROJECT_NAME}Targets)
set(TARGETS_EXPORT_NAME ${CMAKE_PROJECT_NAME})

set(CMAKE_CXX_STANDARD 23)

include(FetchContent)
FetchContent_Declare(
execution26
# for local development, use SOURCE_DIR <path-to>/execution26
GIT_REPOSITORY https://github.com/bemanproject/execution26
GIT_TAG 752882e
execution
# for local development, use SOURCE_DIR <path-to>/execution
GIT_REPOSITORY https://github.com/bemanproject/execution
GIT_TAG e9c3032
)
FetchContent_MakeAvailable(execution26)
FetchContent_MakeAvailable(execution)

include(CTest)
if(BUILD_TESTING)
add_subdirectory(tests/beman/${TARGET_NAME})
enable_testing()
endif()

add_subdirectory(src/beman/net)
add_subdirectory(src/beman/${TARGET_NAME})
add_subdirectory(examples)

include(GNUInstallDirs)
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ foreach(EXAMPLE ${EXAMPLES})
add_executable(${EXAMPLE_TARGET})
target_sources(${EXAMPLE_TARGET} PRIVATE ${EXAMPLE}.cpp)
target_link_libraries(${EXAMPLE_TARGET} PRIVATE ${TARGET_LIBRARY})
target_link_libraries(${EXAMPLE_TARGET} PRIVATE beman::execution)
endforeach()
100 changes: 50 additions & 50 deletions examples/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "demo_task.hpp"
#include "demo_scope.hpp"

namespace ex = ::beman::execution26;
namespace ex = ::beman::execution;
namespace net = ::beman::net;

// ----------------------------------------------------------------------------
Expand All @@ -23,14 +23,15 @@ auto main() -> int
demo::scope scope;

scope.spawn(std::invoke(
[](auto& context) -> demo::task<> {
[](auto& ctxt) -> demo::task<> {
on_exit msg("5s timer - done");
co_await net::resume_after(context.get_scheduler(), 5s);
co_await net::resume_after(ctxt.get_scheduler(), 5s);
std::cout << "5s timer expired\n";
} , context)
//net::resume_after(context.get_scheduler(), 5s)
//| ex::then([]{ std::cout << "5s timer expired\n"; })
//| ex::upon_stopped([]{ std::cout << "5s timer got cancelled\n"; })
},
context)
// net::resume_after(context.get_scheduler(), 5s)
//| ex::then([]{ std::cout << "5s timer expired\n"; })
//| ex::upon_stopped([]{ std::cout << "5s timer got cancelled\n"; })
);

auto stop = [&scope, &context]{
Expand All @@ -43,55 +44,54 @@ auto main() -> int
};

scope.spawn(std::invoke(
[](auto& context, auto stop)->demo::task<> {
[](auto& ctxt, auto stp) -> demo::task<> {
on_exit msg("timer task (enqueing stop task) - done");
co_await net::resume_after(context.get_scheduler(), 1s);
stop();
}, context, stop));
co_await net::resume_after(ctxt.get_scheduler(), 1s);
stp();
},
context,
stop));

scope.spawn(std::invoke([](auto& context)->demo::task<> {
on_exit msg("connecting client - done");
net::ip::tcp::endpoint ep(net::ip::address_v4::loopback(), 12345);
net::ip::tcp::socket client(context, ep);

if (false) for (int i{}; i < 5; ++i)
{
std::cout << "i=" << i << "\n";
co_await net::resume_after(context.get_scheduler(), 1s);
}
scope.spawn(std::invoke(
[](auto& ctxt) -> demo::task<> {
on_exit msg("connecting client - done");
net::ip::tcp::endpoint ep(net::ip::address_v4::loopback(), 12345);
net::ip::tcp::socket client(ctxt, ep);

if (not co_await (net::async_connect(client)
| ex::then([](auto&&...){ return true; })
| ex::upon_error([](auto e){
if constexpr (std::same_as<std::error_code, decltype(e)>)
{
std::error_code f = e;
std::cout << "error_code=" << f.message() << "\n";
if (false)
for (int i{}; i < 5; ++i) {
std::cout << "i=" << i << "\n";
co_await net::resume_after(ctxt.get_scheduler(), 1s);
}
else if constexpr (std::same_as<std::exception_ptr, decltype(e)>)
;
else
static_assert(std::same_as<std::error_code, decltype(e)>);
return false;
})))
{
co_return;
}

std::cout << "connected\n";
char message[] = "hello, world\n";
auto b = net::buffer(message);
if (0 < co_await net::async_send(client, b)) {
char buffer[20];
while (auto size = co_await net::async_receive(client, net::buffer(buffer)))
{
std::string_view response(buffer, size);
std::cout << "received='" << response << "'\n";
//if (response.find('\n') != response.npos)
// break;
if (not co_await (net::async_connect(client) | ex::then([](auto&&...) { return true; }) |
ex::upon_error([](auto e) {
if constexpr (std::same_as<std::error_code, decltype(e)>) {
std::error_code f = e;
std::cout << "error_code=" << f.message() << "\n";
} else if constexpr (std::same_as<std::exception_ptr, decltype(e)>)
;
else
static_assert(std::same_as<std::error_code, decltype(e)>);
return false;
}))) {
co_return;
}
}
}, context)

std::cout << "connected\n";
char message[] = "hello, world\n";
auto b = net::buffer(message);
if (0 < co_await net::async_send(client, b)) {
char buffer[20];
while (auto size = co_await net::async_receive(client, net::buffer(buffer))) {
std::string_view response(buffer, size);
std::cout << "received='" << response << "'\n";
// if (response.find('\n') != response.npos)
// break;
}
}
},
context)
#if 0
| ex::upon_error([](std::exception_ptr ex) {
try {
Expand Down
23 changes: 13 additions & 10 deletions examples/cppcon-2024.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/net/net.hpp>
#include <beman/execution26/execution.hpp>
#include <beman/execution/execution.hpp>
#include "demo_algorithm.hpp"
#include "demo_error.hpp"
#include "demo_scope.hpp"
Expand All @@ -14,7 +14,7 @@
#include <string_view>
#include <unordered_map>

namespace ex = beman::execution26;
namespace ex = beman::execution;
namespace net = beman::net;
using namespace std::chrono_literals;

Expand Down Expand Up @@ -95,14 +95,17 @@ auto main() -> int
net::ip::tcp::endpoint endpoint(net::ip::address_v4::any(), 12345);
net::ip::tcp::acceptor acceptor(context, endpoint);

scope.spawn(std::invoke([](auto scheduler, auto& scope, auto& acceptor)->demo::task<> {
while (true)
{
auto[stream, address] = co_await net::async_accept(acceptor);
std::cout << "received client: " << address << "\n";
scope.spawn(make_client_handler(scheduler, std::move(stream)));
}
}, context.get_scheduler(), scope, acceptor));
scope.spawn(std::invoke(
[](auto scheduler, auto& scp, auto& acc) -> demo::task<> {
while (true) {
auto [stream, address] = co_await net::async_accept(acc);
std::cout << "received client: " << address << "\n";
scp.spawn(make_client_handler(scheduler, std::move(stream)));
}
},
context.get_scheduler(),
scope,
acceptor));

context.run();
}
69 changes: 23 additions & 46 deletions examples/demo_algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,9 @@ struct demo::into_error_t::sender
using sender_concept = ex::sender_t;
template <typename Env>
auto get_completion_signatures(Env const& env) const {
return ::beman::execution26::detail::meta::transform<
demo::detail::into_error_transform<Fun>::template type,
decltype(ex::get_completion_signatures(::std::declval<Sender>(),
env))
>();
return ::beman::execution::detail::meta::transform<demo::detail::into_error_transform<Fun>::template type,
decltype(ex::get_completion_signatures(
::std::declval<Sender>(), env))>();
}

template <ex::receiver Receiver>
Expand All @@ -208,10 +206,9 @@ struct demo::into_error_t::sender
};

template <demo::ex::sender Sender, typename Fun>
inline auto demo::into_error_t::operator()(Sender&& sender, Fun&& fun) const
-> demo::into_error_t::sender<::std::remove_cvref_t<Sender>, ::std::remove_cvref_t<Fun>>
{
return {::std::forward<Sender>(sender), ::std::forward<Fun>(fun)};
inline auto demo::into_error_t::operator()(Sender&& sndr, Fun&& fun) const
-> demo::into_error_t::sender<::std::remove_cvref_t<Sender>, ::std::remove_cvref_t<Fun>> {
return {::std::forward<Sender>(sndr), ::std::forward<Fun>(fun)};
}

template <typename Fun>
Expand Down Expand Up @@ -264,11 +261,8 @@ struct demo::when_any_t::state_base
::demo::ex::inplace_stop_source source{};

template <typename R>
state_base(std::size_t total, R&& receiver)
: total(total)
, receiver(::std::forward<R>(receiver))
{
}
state_base(std::size_t tot, R&& rcvr) : total(tot), receiver(::std::forward<R>(rcvr)) {}
virtual ~state_base() = default;
auto complete() -> bool
{
if (0u == this->done_count++)
Expand Down Expand Up @@ -296,10 +290,7 @@ struct demo::when_any_t::state_value
::std::optional<Value> value{};

template <typename R>
state_value(::std::size_t total, R&& receiver)
: state_base<Receiver>{total, ::std::forward<R>(receiver)}
{
}
state_value(::std::size_t tot, R&& rcvr) : state_base<Receiver>{tot, ::std::forward<R>(rcvr)} {}

auto notify_done() -> void override
{
Expand Down Expand Up @@ -386,22 +377,15 @@ struct demo::when_any_t::state<::std::index_sequence<I...>, Receiver, Value, Err
template <::std::size_t J>
using receiver_type = when_any_t::receiver<J, Receiver, value_type, error_type>;
using operation_state_concept = ex::operation_state_t;
using states_type = ::beman::execution26::detail::product_type<
decltype(
demo::ex::connect(::std::declval<Sender>(),
::std::declval<receiver_type<I>>())
)...>;
using states_type = ::beman::execution::detail::product_type<decltype(demo::ex::connect(
::std::declval<Sender>(), ::std::declval<receiver_type<I>>()))...>;
states_type states;

template <typename R, typename P>
state(R&& receiver, P&& s)
: state_value<Receiver, value_type, error_type>(sizeof...(Sender), ::std::forward<R>(receiver))
, states{demo::ex::connect(
::beman::net::detail::ex::detail::forward_like<P>(s.template get<I>()),
receiver_type<I>{this}
)...}
{
}
state(R&& rcvr, P&& s)
: state_value<Receiver, value_type, error_type>(sizeof...(Sender), ::std::forward<R>(rcvr)),
states{demo::ex::connect(::beman::net::detail::ex::detail::forward_like<P>(s.template get<I>()),
receiver_type<I>{this})...} {}
state(state&&) = delete;
auto start() & noexcept -> void
{
Expand All @@ -414,16 +398,11 @@ struct demo::when_any_t::state<::std::index_sequence<I...>, Receiver, Value, Err
template <demo::ex::sender... Sender>
struct demo::when_any_t::sender
{
::beman::execution26::detail::product_type<::std::remove_cvref_t<Sender>...> sender;
::beman::execution::detail::product_type<::std::remove_cvref_t<Sender>...> sender;
using sender_concept = ex::sender_t;
using completion_signatures =
::beman::execution26::detail::meta::unique<
::beman::execution26::detail::meta::combine<
decltype(ex::get_completion_signatures(::std::declval<Sender&&>(),
ex::empty_env{}))...
>
>;

using completion_signatures = ::beman::execution::detail::meta::unique<::beman::execution::detail::meta::combine<
decltype(ex::get_completion_signatures(::std::declval<Sender&&>(), ex::empty_env{}))...>>;

template <demo::ex::receiver Receiver>
auto connect(Receiver&& receiver) &&
-> state<::std::index_sequence_for<Sender...>,
Expand All @@ -449,11 +428,9 @@ struct demo::when_any_t::sender
};

template <demo::ex::sender... Sender>
requires (0u < sizeof...(Sender))
inline auto demo::when_any_t::operator()(Sender&&...sender) const
-> ::demo::when_any_t::sender<Sender...>
{
return {::std::forward<Sender>(sender)...};
requires(0u < sizeof...(Sender))
inline auto demo::when_any_t::operator()(Sender&&... sndr) const -> ::demo::when_any_t::sender<Sender...> {
return {::std::forward<Sender>(sndr)...};
}

// ----------------------------------------------------------------------------
Expand Down
7 changes: 3 additions & 4 deletions examples/demo_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ namespace demo
}
auto complete() -> void
{
scope* self{this->self};
scope* slf{this->self};
delete this->state;
if (0u == --self->count)
{
self->complete();
if (0u == --slf->count) {
slf->complete();
}
}
auto get_env() const noexcept -> env { return {this->self}; }
Expand Down
Loading

0 comments on commit 6972ef9

Please sign in to comment.