From 2c74290969922d973780927642def910abd8fd47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Chor=C4=85=C5=BCewicz?= Date: Mon, 6 Dec 2021 18:28:08 +0100 Subject: [PATCH] examples: Add example with coroutines How it works: When co_await async_memcpy is called, the compiler calls async_memcpy::awaitable::await_suspend() which puts miniasync future and coroutine handle to a shared containers. It also registers miniasync notifier so that other entities can now when the future completes. There is also an executor loop which takes elements from the container and executes them (calls runtime_wait for miniasync future and and then resumes the coroutine). --- .github/workflows/on_pull_request.yml | 2 +- CMakeLists.txt | 7 +- examples/CMakeLists.txt | 5 + .../coroutine_memcpy/coroutine_helpers.hpp | 118 ++++++++++++++++++ .../coroutine_memcpy/coroutine_memcpy.cpp | 95 ++++++++++++++ .../coroutine_memcpy/miniaync_awaitable.hpp | 77 ++++++++++++ utils/docker/build.sh | 2 + utils/docker/run-build.sh | 2 + 8 files changed, 306 insertions(+), 2 deletions(-) create mode 100644 examples/coroutine_memcpy/coroutine_helpers.hpp create mode 100644 examples/coroutine_memcpy/coroutine_memcpy.cpp create mode 100644 examples/coroutine_memcpy/miniaync_awaitable.hpp diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml index f4d3be0e..4fb33a7f 100644 --- a/.github/workflows/on_pull_request.yml +++ b/.github/workflows/on_pull_request.yml @@ -26,7 +26,7 @@ jobs: matrix: CONFIG: ["N=1 OS=ubuntu OS_VER=21.04 TYPE=normal CC=gcc COVERAGE=1", "N=2 OS=ubuntu OS_VER=21.04 TYPE=normal CC=clang PUSH_IMAGE=1", - "N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1"] + "N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1 CXX_STANDARD=20 BUILD_CPP_EXAMPLES=1"] steps: - name: Clone the git repo uses: actions/checkout@v1 diff --git a/CMakeLists.txt b/CMakeLists.txt index 797c3cc7..ba517fba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.3) -project(miniasync C) +project(miniasync C CXX) set(MINIASYNC_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) set(VERSION_MAJOR 0) @@ -32,6 +32,7 @@ option(BUILD_EXAMPLES "build examples" ON) option(BUILD_TESTS "build tests" ON) option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON) option(COMPILE_DML "compile miniasync dml implementation library" OFF) +option(BUILD_CPP_EXAMPLES "build cpp examples - require c++20 support" OFF) include(FindPerl) include(FindThreads) @@ -40,6 +41,10 @@ include(CheckCCompilerFlag) include(GNUInstallDirs) include(${CMAKE_SOURCE_DIR}/cmake/functions.cmake) +if(BUILD_CPP_EXAMPLES AND "${CMAKE_CXX_STANDARD}" LESS 20) + message(FATAL_ERROR "To build cpp example, CMAKE_CXX_STANDARD must be set to 20 or higher") +endif() + # look for pkg config (use it later for managing valgrind) find_package(PkgConfig REQUIRED) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2a45351f..ec3ee6d6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -42,3 +42,8 @@ endfunction() # add all the examples with a use of the add_example function defined above add_example(basic basic/basic.c) + +if(BUILD_CPP_EXAMPLES) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines") + add_example(coroutine_memcpy coroutine_memcpy/coroutine_memcpy.cpp) +endif() diff --git a/examples/coroutine_memcpy/coroutine_helpers.hpp b/examples/coroutine_memcpy/coroutine_helpers.hpp new file mode 100644 index 00000000..f1568a44 --- /dev/null +++ b/examples/coroutine_memcpy/coroutine_helpers.hpp @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021, Intel Corporation */ +// SPDX-License-Identifier: MIT +/* Copyright (c) Lewis Baker */ + +#include +#include +#include +#include +#include +#include + +#ifndef MINIASYNC_COROUTINE_HELPERS +#define MINIASYNC_COROUTINE_HELPERS + +/* + * Helper structures for coroutines, they are heavily inspired by + * https://github.com/lewissbaker/cppcoro + */ + +/* This is a generic task which supports continuation. */ +struct task { + struct promise_type { + struct final_awaitable + { + bool await_ready() const noexcept { return false; } + void await_resume() noexcept {} + + void await_suspend(std::coroutine_handle h) noexcept { + auto &cont = h.promise().cont; + if (cont) + cont.resume(); + } + }; + + task get_return_object() { return task{std::coroutine_handle::from_promise(*this)}; } + std::suspend_always initial_suspend() { return {}; } + auto final_suspend() noexcept { return final_awaitable{}; } + void return_void() {} + void unhandled_exception() {} + + using future_type = std::pair>; + + std::vector futures; + std::coroutine_handle cont; + }; + + void + wait() { + h.resume(); + } + + bool + await_ready() { return !h || h.done();} + std::coroutine_handle<> await_suspend(std::coroutine_handle aw) { + h.promise().cont = aw; + return h; + } + + void + await_resume() {} + + std::coroutine_handle h; +}; + +namespace detail { + +template +task when_all_task(Awaitable awaitable, std::atomic &counter, std::coroutine_handle<> h) +{ + co_await awaitable; + + auto cnt = counter.fetch_sub(1); + if (cnt - 1 == 0) { + h.resume(); + } +} + +template +struct when_all_ready_awaitable +{ + when_all_ready_awaitable(std::vector&& tasks): counter(tasks.size()), tasks(std::move(tasks)) + { + } + + bool await_ready() + { + return false; + } + + void await_suspend(std::coroutine_handle<> h) + { + for (auto&& task : tasks) + { + when_all_task(std::move(task), counter, h).h.resume(); + } + } + + void await_resume() {} + + std::atomic counter = 0; + std::vector tasks; +}; +} + +template +auto when_all(A&& aw, Awaitables&&... awaitables) +{ + std::vector> tasks; + tasks.emplace_back(std::move(aw)); + + for (auto &&a : {awaitables...}) + tasks.emplace_back(std::move(a)); + + return detail::when_all_ready_awaitable(std::move(tasks)); +} + +#endif diff --git a/examples/coroutine_memcpy/coroutine_memcpy.cpp b/examples/coroutine_memcpy/coroutine_memcpy.cpp new file mode 100644 index 00000000..6c8cbefc --- /dev/null +++ b/examples/coroutine_memcpy/coroutine_memcpy.cpp @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021, Intel Corporation */ + +/* + * coroutine_memcpy.cpp -- example showing miniasync integration with coroutines + */ + +#include "libminiasync.h" + +#include +#include +#include +#include +#include + +#include "coroutine_helpers.hpp" +#include "miniaync_awaitable.hpp" + +/* Executor loop */ +void wait(runner_state &rs, task& t) +{ + t.h.resume(); + + while (!t.h.done()) { + // XXX - optimize this for single future case, + // it's not optimal to allocate new vector each time + auto awaitables_snapshot = std::move(rs.awaitables); + std::vector done(awaitables_snapshot.size(), false); + int done_cnt = 0; + + do { + // XXX: can use umwait + for (int i = 0; i < awaitables_snapshot.size(); i++) { + auto &f = awaitables_snapshot[i]; + if (f->is_done() && !done[i]) { + done_cnt++; + done[i] = true; + f->resume(); + } + } + } while (done_cnt != awaitables_snapshot.size()); + } +} + +task run_async_memcpy(runner_state &rs, char *dst, char *src, size_t n) +{ + std::cout << "Before memcpy" << std::endl; + co_await async_memcpy(rs, dst, src, n/2); + std::cout << "After memcpy " << ((char*) dst) << std::endl; + co_await async_memcpy(rs, dst + n/2, src + n/2, n - n/2); + std::cout << "After second memcpy " << ((char*) dst) << std::endl; + + auto a1 = async_memcpy(rs, dst, src, 1); + auto a2 = async_memcpy(rs, dst + 1, src, 1); + auto a3 = async_memcpy(rs, dst + 2, src, 1); + + co_await when_all(a1, a2, a3); + std::cout << "After 3 concurrent memcopies " << ((char*) dst) << std::endl; +} + +task async_memcpy_print(runner_state &rs, char *dst, char *src, size_t n, std::string to_print) +{ + auto a1 = run_async_memcpy(rs, dst, src, n/2); + auto a2 = run_async_memcpy(rs, dst + n/2, src + n/2, n - n/2); + + co_await when_all(a1, a2); + + std::cout << to_print << std::endl; +} + +int +main(int argc, char *argv[]) +{ + runner_state rs = { + .mover = std::unique_ptr( + vdm_new(vdm_descriptor_pthreads_polled()), &vdm_delete) + }; + + static constexpr auto buffer_size = 10; + static constexpr auto to_copy = "something"; + static constexpr auto to_print = "async print!"; + + char buffer[buffer_size] = {0}; + { + auto future = async_memcpy_print(rs, buffer, std::string(to_copy).data(), buffer_size, to_print); + + std::cout << "inside main" << std::endl; + + wait(rs, future); + + std::cout << buffer << std::endl; + } + + return 0; +} diff --git a/examples/coroutine_memcpy/miniaync_awaitable.hpp b/examples/coroutine_memcpy/miniaync_awaitable.hpp new file mode 100644 index 00000000..a596d3be --- /dev/null +++ b/examples/coroutine_memcpy/miniaync_awaitable.hpp @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021, Intel Corporation */ + +/* + * miniasync_awaitable.hpp - base class for all miniasync based awaitbales + */ + +#include "libminiasync.h" + +#include + +struct miniasync_awaitable_base +{ + bool await_ready(); + void await_resume(); + + bool is_done(); + void resume(); + +protected: + struct future_notifier notifier; + std::coroutine_handle<> cont; +}; + +bool miniasync_awaitable_base::await_ready() +{ + return false; +} + +void miniasync_awaitable_base::await_resume() +{ +} + +bool miniasync_awaitable_base::is_done() +{ + assert(notifier.notifier_used == FUTURE_NOTIFIER_POLLER); + return *notifier.poller.ptr_to_monitor; +} + +void miniasync_awaitable_base::resume() +{ + cont.resume(); +} + +struct runner_state { + std::unique_ptr mover; + std::vector awaitables; +}; + +template +struct miniasync_awaitable : public miniasync_awaitable_base +{ + miniasync_awaitable(runner_state &rs, Future future); + void await_suspend(std::coroutine_handle<> cont); + + runner_state &rs; + Future future; +}; + +template +miniasync_awaitable::miniasync_awaitable(runner_state &rs, Future future): rs(rs), future(future) { +} + +template +void miniasync_awaitable::await_suspend(std::coroutine_handle<> cont) +{ + this->cont = cont; + + future_poll(FUTURE_AS_RUNNABLE(&future), ¬ifier); + + rs.awaitables.push_back(static_cast(this)); +} + +miniasync_awaitable async_memcpy(runner_state &rs, void *dst, void *src, size_t n) +{ + return miniasync_awaitable(rs, vdm_memcpy(rs.mover.get(), dst, src, n, 0)); +} diff --git a/utils/docker/build.sh b/utils/docker/build.sh index 84f86bc0..8e627dfa 100755 --- a/utils/docker/build.sh +++ b/utils/docker/build.sh @@ -114,6 +114,8 @@ docker run --privileged=true --name=$containerName -i $TTY \ --env TEST_PACKAGES=${TEST_PACKAGES:-ON} \ --env CHECK_CSTYLE=${CHECK_CSTYLE:-ON} \ --env FAULT_INJECTION=$FAULT_INJECTION \ + --env BUILD_CPP_EXAMPLES=${BUILD_CPP_EXAMPLES:-OFF} \ + --env CXX_STANDARD=${CXX_STANDARD:-20} \ --env CC=${CC:-gcc} \ --shm-size=4G \ -v $HOST_WORKDIR:$WORKDIR \ diff --git a/utils/docker/run-build.sh b/utils/docker/run-build.sh index 5de511fd..fb654d4e 100755 --- a/utils/docker/run-build.sh +++ b/utils/docker/run-build.sh @@ -156,6 +156,8 @@ cmake .. -DCMAKE_BUILD_TYPE=Debug \ -DCOVERAGE=$COVERAGE \ -DCHECK_CSTYLE=${CHECK_CSTYLE} \ -DDEVELOPER_MODE=1 \ + -DBUILD_CPP_EXAMPLES = ${BUILD_CPP_EXAMPLES} \ + -DCMAKE_CXX_STANDARD = ${CXX_STANDARD} \ -DTEST_DIR=$TEST_DIR \ -DCOMPILE_DML=1