From 0fbf74d8be10926f5fc8a29eaa59dee3e8b143ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Chor=C4=85=C5=BCewicz?= Date: Mon, 6 Dec 2021 18:28:08 +0100 Subject: [PATCH] examples: Add example with coroutines How it works: When co_await async_memcpy is called, the compiler calls async_memcpy::awaitable::await_suspend() which puts miniasync future and coroutine handle to a shared containers. It also registers miniasync notifier so that other entities can now when the future completes. There is also an executor loop which takes elements from the container and executes them (calls runtime_wait for miniasync future and and then resumes the coroutine). --- .github/workflows/on_pull_request.yml | 2 +- CMakeLists.txt | 7 +- examples/CMakeLists.txt | 5 + .../coroutine_memcpy/coroutine_helpers.hpp | 144 ++++++++++++++++++ .../coroutine_memcpy/coroutine_memcpy.cpp | 70 +++++++++ examples/coroutine_memcpy/executor.hpp | 94 ++++++++++++ .../coroutine_memcpy/miniasync_operation.cpp | 43 ++++++ .../coroutine_memcpy/miniasync_operation.hpp | 45 ++++++ utils/docker/build.sh | 2 + utils/docker/run-build.sh | 2 + 10 files changed, 412 insertions(+), 2 deletions(-) create mode 100644 examples/coroutine_memcpy/coroutine_helpers.hpp create mode 100644 examples/coroutine_memcpy/coroutine_memcpy.cpp create mode 100644 examples/coroutine_memcpy/executor.hpp create mode 100644 examples/coroutine_memcpy/miniasync_operation.cpp create mode 100644 examples/coroutine_memcpy/miniasync_operation.hpp diff --git a/.github/workflows/on_pull_request.yml b/.github/workflows/on_pull_request.yml index 86dfaed..68d40d0 100644 --- a/.github/workflows/on_pull_request.yml +++ b/.github/workflows/on_pull_request.yml @@ -26,7 +26,7 @@ jobs: matrix: CONFIG: ["N=1 OS=ubuntu OS_VER=21.04 TYPE=normal CC=gcc COVERAGE=1", "N=2 OS=ubuntu OS_VER=21.04 TYPE=normal CC=clang PUSH_IMAGE=1", - "N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1"] + "N=3 OS=fedora OS_VER=35 TYPE=normal CC=gcc PUSH_IMAGE=1 CXX_STANDARD=20 BUILD_CPP_EXAMPLES=1"] steps: - name: Clone the git repo uses: actions/checkout@v1 diff --git a/CMakeLists.txt b/CMakeLists.txt index ef221cd..d0987a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.3) -project(miniasync C) +project(miniasync C CXX) set(MINIASYNC_ROOT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/out CACHE STRING "") @@ -42,6 +42,7 @@ option(BUILD_EXAMPLES "build examples" ON) option(BUILD_TESTS "build tests" ON) option(TESTS_USE_VALGRIND "enable tests with valgrind (if found)" ON) option(COMPILE_DML "compile miniasync dml implementation library" OFF) +option(BUILD_CPP_EXAMPLES "build cpp examples - require c++20 support" OFF) include(FindPerl) include(FindThreads) @@ -50,6 +51,10 @@ include(CheckCCompilerFlag) include(GNUInstallDirs) include(${CMAKE_SOURCE_DIR}/cmake/functions.cmake) +if(BUILD_CPP_EXAMPLES AND "${CMAKE_CXX_STANDARD}" LESS 20) + message(FATAL_ERROR "To build cpp example, CMAKE_CXX_STANDARD must be set to 20 or higher") +endif() + # look for pkg config (use it later for managing valgrind) if(NOT WIN32) find_package(PkgConfig REQUIRED) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f92060f..99ebce4 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -43,3 +43,8 @@ endfunction() # add all the examples with a use of the add_example function defined above add_example(basic basic/basic.c) add_example(basic-async basic-async/basic-async.c) + +if(BUILD_CPP_EXAMPLES) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines") + add_example(coroutine_memcpy coroutine_memcpy/coroutine_memcpy.cpp coroutine_memcpy/miniasync_operation.cpp) +endif() diff --git a/examples/coroutine_memcpy/coroutine_helpers.hpp b/examples/coroutine_memcpy/coroutine_helpers.hpp new file mode 100644 index 0000000..1979d5e --- /dev/null +++ b/examples/coroutine_memcpy/coroutine_helpers.hpp @@ -0,0 +1,144 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ +/* Copyright 2021-2022, Intel Corporation */ +// SPDX-License-Identifier: MIT +/* Copyright (c) Lewis Baker */ + +#include +#include +#include +#include +#include +#include + +#ifndef MINIASYNC_COROUTINE_HELPERS +#define MINIASYNC_COROUTINE_HELPERS + +/* + * Helper structures for coroutines, they are heavily inspired by + * https://github.com/lewissbaker/cppcoro + */ + +/* This is a generic task which supports continuation. */ +struct task { + struct promise_type { + struct final_awaitable { + bool await_ready() const noexcept + { + return false; + } + void await_resume() noexcept + { + } + + void await_suspend(std::coroutine_handle h) noexcept + { + auto &cont = h.promise().cont; + if (cont) + cont.resume(); + } + }; + + task get_return_object() + { + return task{std::coroutine_handle::from_promise(*this)}; + } + std::suspend_always initial_suspend() + { + return {}; + } + auto final_suspend() noexcept + { + return final_awaitable{}; + } + void return_void() + { + } + void unhandled_exception() + { + } + + std::coroutine_handle cont; + }; + + void wait() + { + h.resume(); + } + + std::coroutine_handle release() && + { + return std::exchange(h, nullptr); + } + + bool await_ready() + { + return !h || h.done(); + } + + std::coroutine_handle<> await_suspend(std::coroutine_handle aw) + { + h.promise().cont = aw; + return h; + } + + void await_resume() + { + } + + std::coroutine_handle h; +}; + +namespace detail +{ + +template +task when_all_task(Awaitable awaitable, std::atomic &counter, std::coroutine_handle<> h) +{ + co_await awaitable; + + auto cnt = counter.fetch_sub(1); + if (cnt - 1 == 0) { + h.resume(); + } +} + +template +struct when_all_ready_awaitable { + when_all_ready_awaitable(std::vector &&tasks) : counter(tasks.size()), tasks(std::move(tasks)) + { + } + + bool await_ready() + { + return false; + } + + void await_suspend(std::coroutine_handle<> h) + { + for (auto &&task : tasks) { + when_all_task(std::move(task), counter, h).h.resume(); + } + } + + void await_resume() + { + } + + std::atomic counter = 0; + std::vector tasks; +}; +} // namespace detail + +template +auto when_all(A &&aw, Awaitables &&...awaitables) +{ + std::vector> tasks; + tasks.emplace_back(std::move(aw)); + + for (auto &&a : {awaitables...}) + tasks.emplace_back(std::move(a)); + + return detail::when_all_ready_awaitable(std::move(tasks)); +} + +#endif diff --git a/examples/coroutine_memcpy/coroutine_memcpy.cpp b/examples/coroutine_memcpy/coroutine_memcpy.cpp new file mode 100644 index 0000000..6cf55c8 --- /dev/null +++ b/examples/coroutine_memcpy/coroutine_memcpy.cpp @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021-2022, Intel Corporation */ + +/* + * coroutine_memcpy.cpp -- example showing miniasync integration with coroutines + */ + +#include "libminiasync.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "coroutine_helpers.hpp" +#include "executor.hpp" +#include "miniasync_operation.hpp" + +task run_async_memcpy(executor_type &executor, char *dst, char *src, size_t n) +{ + std::cout << "Before memcpy" << std::endl; + co_await async_memcpy(executor, dst, src, n / 2); + std::cout << "After memcpy " << ((char *)dst) << std::endl; + co_await async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2); + std::cout << "After second memcpy " << ((char *)dst) << std::endl; + + auto a1 = async_memcpy(executor, dst, src, 1); + auto a2 = async_memcpy(executor, dst + 1, src, 1); + auto a3 = async_memcpy(executor, dst + 2, src, 1); + + co_await when_all(a1, a2, a3); + std::cout << "After 3 concurrent memcopies " << ((char *)dst) << std::endl; +} + +task async_memcpy_print(executor_type &executor, char *dst, char *src, size_t n, std::string to_print) +{ + auto a1 = run_async_memcpy(executor, dst, src, n / 2); + auto a2 = run_async_memcpy(executor, dst + n / 2, src + n / 2, n - n / 2); + + co_await when_all(a1, a2); + + std::cout << to_print << std::endl; +} + +int main(int argc, char *argv[]) +{ + static constexpr size_t nthreads = 3; + static constexpr size_t ringbuf_size = 1024; + executor_type executor( + std::unique_ptr(data_mover_threads_new(nthreads, ringbuf_size, FUTURE_NOTIFIER_POLLER), &data_mover_threads_delete)); + + static constexpr auto buffer_size = 10; + static constexpr auto to_copy = "something"; + static constexpr auto to_print = "async print!"; + + char buffer[buffer_size] = {0}; + auto t = async_memcpy_print(executor, buffer, std::string(to_copy).data(), buffer_size, to_print); + executor.submit(std::move(t)); + + std::cout << "inside main" << std::endl; + + executor.run_to_completion(); + + std::cout << buffer << std::endl; + + return 0; +} diff --git a/examples/coroutine_memcpy/executor.hpp b/examples/coroutine_memcpy/executor.hpp new file mode 100644 index 0000000..ec10b43 --- /dev/null +++ b/examples/coroutine_memcpy/executor.hpp @@ -0,0 +1,94 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ +/* Copyright 2021-2022, Intel Corporation */ + +/* + * executor.hpp -- miniasync-aware coroutines executor. + */ + +#include "libminiasync.h" + +#include +#include +#include + +#include "coroutine_helpers.hpp" +#include "miniasync_operation.hpp" + +#ifndef MINIASYNC_EXECUTOR +#define MINIASYNC_EXECUTOR + +/* Executor keeps a queue of coroutines to execute. New coroutine can be + * submitted via submit() function. Run_to_completion will loop until all + * coroutines are executed. */ +struct executor_type { + executor_type(std::unique_ptr &&data_mover) : vdm_(data_mover_threads_get_vdm(data_mover.get())), data_mover(std::move(data_mover)) + { + } + + void submit(task &&t) + { + auto handle = std::move(t).release(); + pending_coro.push_back(handle); + } + + void submit(miniasync_operation *operation) + { + pending_miniasync.push_back(operation); + } + + void run_to_completion() + { + while (true) { + auto next_miniasync_op = pop_next(pending_miniasync); + auto next_coro = pop_next(pending_coro); + if (!next_miniasync_op && !next_coro) + break; + + if (next_miniasync_op) + run_pending(next_miniasync_op.value()); + if (next_coro) + run_pending(next_coro.value()); + } + } + + vdm *get_mover() + { + return vdm_; + } + + private: + void run_pending(std::coroutine_handle<> h) + { + if (!h.done()) + h.resume(); + } + + void run_pending(miniasync_operation *operation) + { + if (operation->ready() && !operation->done()) { + operation->resume(); + } else { + /* Operation not ready, yet, put it back to the queue. */ + pending_miniasync.push_back(operation); + } + } + + template + std::optional pop_next(Deque &deque) + { + if (deque.empty()) + return std::nullopt; + + auto first = deque.front(); + deque.pop_front(); + + return first; + } + + std::deque> pending_coro; + std::deque pending_miniasync; + struct vdm* vdm_; + std::unique_ptr data_mover; +}; + +#endif /* MINIASYNC_EXECUTOR */ diff --git a/examples/coroutine_memcpy/miniasync_operation.cpp b/examples/coroutine_memcpy/miniasync_operation.cpp new file mode 100644 index 0000000..ab40181 --- /dev/null +++ b/examples/coroutine_memcpy/miniasync_operation.cpp @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: BSD-3-Clause +/* Copyright 2021-2022, Intel Corporation */ + +/* + * miniasync_operation.cpp - implementation of miniasync_operation + */ + +#include "miniasync_operation.hpp" + +#include + +#include "executor.hpp" + +void miniasync_operation::await_resume() +{ +} + +bool miniasync_operation::await_ready() +{ + return ready(); +} + +void miniasync_operation::await_suspend(std::coroutine_handle<> h) +{ + this->h = h; + this->executor.submit(this); +} + +bool miniasync_operation::ready() +{ + assert(notifier.notifier_used == FUTURE_NOTIFIER_POLLER); + return *notifier.poller.ptr_to_monitor == 1; +} + +bool miniasync_operation::done() +{ + return this->h.done(); +} + +void miniasync_operation::resume() +{ + this->h.resume(); +} diff --git a/examples/coroutine_memcpy/miniasync_operation.hpp b/examples/coroutine_memcpy/miniasync_operation.hpp new file mode 100644 index 0000000..368dc13 --- /dev/null +++ b/examples/coroutine_memcpy/miniasync_operation.hpp @@ -0,0 +1,45 @@ +/* SPDX-License-Identifier: BSD-3-Clause */ +/* Copyright 2021-2022, Intel Corporation */ + +/* + * miniasync_operation.hpp - awaitable wrapper around miniasync operations + */ + +#include "libminiasync.h" + +#include +#include + +#ifndef MINIASYNC_OPERATION +#define MINIASYNC_OPERATION + +struct executor_type; + +struct miniasync_operation { + template + miniasync_operation(Executor &executor, Operation &&operation, Args &&...args) : executor(executor), future(operation(executor.get_mover(), std::forward(args)...)) + { + future_poll(FUTURE_AS_RUNNABLE(&future), ¬ifier); + } + + void await_resume(); + bool await_ready(); + void await_suspend(std::coroutine_handle<> h); + + bool done(); + bool ready(); + void resume(); + + private: + executor_type &executor; + std::coroutine_handle<> h; + struct future_notifier notifier; + vdm_operation_future future; +}; + +static inline auto async_memcpy(executor_type &executor, void *dst, void *src, size_t n) +{ + return miniasync_operation(executor, vdm_memcpy, dst, src, n, 0); +} + +#endif /* MINIASYNC_OPERATION */ diff --git a/utils/docker/build.sh b/utils/docker/build.sh index 84f86bc..8e627df 100755 --- a/utils/docker/build.sh +++ b/utils/docker/build.sh @@ -114,6 +114,8 @@ docker run --privileged=true --name=$containerName -i $TTY \ --env TEST_PACKAGES=${TEST_PACKAGES:-ON} \ --env CHECK_CSTYLE=${CHECK_CSTYLE:-ON} \ --env FAULT_INJECTION=$FAULT_INJECTION \ + --env BUILD_CPP_EXAMPLES=${BUILD_CPP_EXAMPLES:-OFF} \ + --env CXX_STANDARD=${CXX_STANDARD:-20} \ --env CC=${CC:-gcc} \ --shm-size=4G \ -v $HOST_WORKDIR:$WORKDIR \ diff --git a/utils/docker/run-build.sh b/utils/docker/run-build.sh index 3d34e9c..7815a03 100755 --- a/utils/docker/run-build.sh +++ b/utils/docker/run-build.sh @@ -215,6 +215,8 @@ cmake .. -DCMAKE_BUILD_TYPE=Debug \ -DCOVERAGE=$COVERAGE \ -DCHECK_CSTYLE=${CHECK_CSTYLE} \ -DDEVELOPER_MODE=1 \ + -DBUILD_CPP_EXAMPLES=${BUILD_CPP_EXAMPLES} \ + -DCMAKE_CXX_STANDARD=${CXX_STANDARD} \ -DTEST_DIR=$TEST_DIR \ -DCOMPILE_DML=1 make -j$(nproc)