Skip to content

Commit

Permalink
asio strand scheduler (#627)
Browse files Browse the repository at this point in the history
* asio strand scheduler

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Address comments

* Handle current_thread queue

* Remove duplication

* Address comment

* Fix build

---------

Co-authored-by: Aleksey Loginov <victimsnino@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 11, 2024
1 parent 8ddb555 commit ed65786
Show file tree
Hide file tree
Showing 18 changed files with 506 additions and 15 deletions.
5 changes: 3 additions & 2 deletions BUILDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ But RPP is header-only library, so, without enabling any extra options is just c
- `RPP_BUILD_TESTS` - (ON/OFF) build unit tests (default OFF)
- `RPP_BUILD_EXAMPLES` - (ON/OFF) build examples of usage of RPP (default OFF)
- `RPP_BUILD_SFML_CODE` - (ON/OFF) build RPP code related to SFML or not (default OFF) - requires SFML to be installed
- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed
- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't requires this one) (default OFF) - requires grpc++/protobuf to be installed
- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't require this one) (default OFF) - requires QT5/6 to be installed
- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't require this one) (default OFF) - requires grpc++/protobuf to be installed
- `RPP_BUILD_ASIO_CODE` - (ON/OFF) build RPPASIO related code (examples/tests)(rppasio module doesn't require this one) (default OFF) - requires asio to be installed

By default, it provides rpp and rppqt INTERFACE modules.

Expand Down
29 changes: 18 additions & 11 deletions CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@
"RPP_BUILD_GRPC_CODE" : "ON"
}
},
{
"name" : "build-asio",
"hidden": true,
"cacheVariables": {
"RPP_BUILD_ASIO_CODE" : "ON"
}
},
{
"name" : "use-conan",
"hidden": true,
Expand All @@ -168,7 +175,7 @@
},
{
"name": "ci-coverage-clang",
"inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-clang"],
"inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"],
"cacheVariables": {
"RPP_ENABLE_COVERAGE": "ON",
"CMAKE_CXX_FLAGS": "-O0 -g -fprofile-instr-generate -fcoverage-mapping --coverage",
Expand All @@ -177,58 +184,58 @@
},
{
"name": "ci-sanitize-tsan",
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"],
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-asio", "ci-unix", "ci-clang"],
"cacheVariables": {
"CMAKE_CXX_FLAGS": "-fsanitize=thread -g -O1"
}
},
{
"name": "ci-sanitize-asan",
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"],
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"],
"cacheVariables": {
"CMAKE_CXX_FLAGS": "-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1"
}
},
{
"name": "ci-sanitize-lsan",
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"],
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"],
"cacheVariables": {
"CMAKE_CXX_FLAGS": "-fsanitize=leak -fno-omit-frame-pointer -g -O1"
}
},
{
"name": "ci-sanitize-msan",
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"],
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"],
"cacheVariables": {
"CMAKE_CXX_FLAGS": "-fsanitize=memory -fno-optimize-sibling-calls -fsanitize-memory-track-origins=2 -fno-omit-frame-pointer -g -O2"
}
},
{
"name": "ci-sanitize-ubsan",
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"],
"inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "build-asio", "ci-unix", "ci-clang"],
"cacheVariables": {
"CMAKE_CXX_FLAGS": "-fsanitize=undefined"
}
},
{
"name": "ci-macos-tests",
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix"]
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix"]
},
{
"name": "ci-ubuntu-clang-tests",
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"]
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"]
},
{
"name": "ci-ubuntu-gcc-tests",
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"]
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"]
},
{
"name": "ci-windows-tests",
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-win64"]
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-win64"]
},
{
"name": "ci-ubuntu-clang-tests-no-checks",
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang" ]
"inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-asio", "build-sfml", "ci-unix", "ci-clang" ]
},


Expand Down
10 changes: 10 additions & 0 deletions cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,13 @@ endif()
if (RPP_BUILD_BENCHMARKS)
rpp_fetch_library(nanobench https://github.com/martinus/nanobench.git master)
endif()

# ==================== ASIO =====================
if (RPP_BUILD_ASIO_CODE)
find_package(asio REQUIRED)

macro(rpp_add_asio_support_to_executable TARGET)
target_link_libraries(${TARGET} PRIVATE asio::asio)
target_compile_definitions(${TARGET} PRIVATE "ASIO_NO_TYPEID")
endmacro()
endif()
6 changes: 6 additions & 0 deletions cmake/install-rules.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ install(
INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppgrpc"
)

install(
TARGETS rppasio
EXPORT RPPTargets
INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppasio"
)

write_basic_package_version_file(
"${package}ConfigVersion.cmake"
COMPATIBILITY SameMajorVersion
Expand Down
4 changes: 4 additions & 0 deletions cmake/variables.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ endfunction()
option(RPP_BUILD_SFML_CODE "Enable SFML support in examples/code." OFF)
option(RPP_BUILD_QT_CODE "Enable QT support in examples/code." OFF)
option(RPP_BUILD_GRPC_CODE "Enable grpc++ support in examples/code." OFF)
option(RPP_BUILD_ASIO_CODE "Enable ASIO support in examples/code." OFF)

if (RPP_DEVELOPER_MODE)
option(RPP_BUILD_TESTS "Build unit tests tree." OFF)
Expand All @@ -104,6 +105,9 @@ if (RPP_DEVELOPER_MODE)
if (RPP_BUILD_GRPC_CODE)
set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_grpc=True")
endif()
if (RPP_BUILD_ASIO_CODE)
set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_asio=True")
endif()
endif()

if(RPP_ENABLE_COVERAGE)
Expand Down
9 changes: 7 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ class RppConan(ConanFile):
"with_sfml" : [False, True],
"with_tests" : [False, True],
"with_cmake" : [False, True],
"with_benchmarks" : [False, True]
"with_benchmarks" : [False, True],
"with_asio" : [False, True]
}
default_options = {
"with_grpc" : False,
"with_sfml" : False,
"with_tests": False,
"with_cmake": False,
"with_benchmarks" : False
"with_benchmarks" : False,
"with_asio" : False
}

def requirements(self):
Expand All @@ -37,5 +39,8 @@ def requirements(self):
self.requires("protobuf/3.21.12")
self.requires("libmount/2.39", override=True)

if self.options.with_asio:
self.requires("asio/1.30.2")

if self.options.with_cmake:
self.tool_requires("cmake/3.29.3")
1 change: 1 addition & 0 deletions src/extensions/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
add_subdirectory(rppqt)
add_subdirectory(rppgrpc)
add_subdirectory(rppasio)
11 changes: 11 additions & 0 deletions src/extensions/rppasio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ReactivePlusPlus library
#
# Copyright Aleksey Loginov 2022 - present.
# Distributed under the Boost Software License, Version 1.0.
# (See accompanying file LICENSE_1_0.txt or copy at
# https://www.boost.org/LICENSE_1_0.txt)
#
# Project home: https://github.com/victimsnino/ReactivePlusPlus
#

rpp_add_library(rppasio)
23 changes: 23 additions & 0 deletions src/extensions/rppasio/rppasio/fwd.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

/**
* @defgroup rppasio RPPASIO
* @brief RppAsio is extension of RPP which enables support of boost-asio library.
*/


/**
* @defgroup asio_schedulers Asio Schedulers
* @ingroup rppasio
*/
#include <rppasio/schedulers/fwd.hpp>
14 changes: 14 additions & 0 deletions src/extensions/rppasio/rppasio/rppasio.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rppasio/fwd.hpp>
#include <rppasio/schedulers.hpp>
20 changes: 20 additions & 0 deletions src/extensions/rppasio/rppasio/schedulers.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

/**
* @defgroup asio_schedulers Asio Schedulers
* @brief Scheduler is the way to introduce multi-threading in your application via RPP
* @see https://reactivex.io/documentation/scheduler.html
* @ingroup rppasio
*/

#include <rppasio/schedulers/thread_pool.hpp>
16 changes: 16 additions & 0 deletions src/extensions/rppasio/rppasio/schedulers/fwd.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

namespace rppasio::schedulers
{
class strand;
} // namespace rppasio::schedulers
Loading

1 comment on commit ed65786

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 302.53 ns 2.19 ns 2.16 ns 1.01
Subscribe empty callbacks to empty observable via pipe operator 313.04 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 740.46 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1037.15 ns 3.70 ns 3.42 ns 1.08
concat_as_source of just(1 immediate) create + subscribe 2260.74 ns 120.59 ns 104.71 ns 1.15
defer from array of 1 - defer + create + subscribe + immediate 745.42 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2113.39 ns 59.19 ns 59.23 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2995.98 ns 32.40 ns 32.46 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 31937.91 ns 31627.25 ns 35779.06 ns 0.88
from array of 1000 - create + as_blocking + subscribe + new_thread 37193.77 ns 52459.94 ns 55631.67 ns 0.94
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3442.62 ns 135.40 ns 127.89 ns 1.06

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1107.08 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 862.69 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 993.87 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 871.65 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1240.51 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 957.69 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1126.08 ns 17.90 ns 18.55 ns 0.96
immediate_just(1,2,3)+element_at(1)+subscribe 878.71 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 274.97 ns 2.17 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 364.43 ns 5.86 ns 5.90 ns 0.99
current_thread scheduler create worker + schedule + recursive schedule 825.76 ns 56.58 ns 55.62 ns 1.02

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 865.33 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 897.53 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2307.20 ns 173.01 ns 162.78 ns 1.06
immediate_just+buffer(2)+subscribe 1556.35 ns 13.58 ns 14.20 ns 0.96
immediate_just+window(2)+subscribe + subscsribe inner 2454.23 ns 1072.71 ns 1062.29 ns 1.01

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 862.25 ns - - 0.00
immediate_just+take_while(true)+subscribe 856.87 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2019.97 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3487.98 ns 188.62 ns 187.54 ns 1.01
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3644.02 ns 183.53 ns 171.48 ns 1.07
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 140.16 ns 136.91 ns 1.02
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3573.79 ns 1101.62 ns 885.27 ns 1.24
immediate_just(1) + zip(immediate_just(2)) + subscribe 2172.21 ns 223.15 ns 225.23 ns 0.99

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.54 ns 14.71 ns 14.61 ns 1.01
subscribe 100 observers to publish_subject 200104.80 ns 15760.26 ns 15421.60 ns 1.02
100 on_next to 100 observers to publish_subject 28076.86 ns 17221.73 ns 20460.62 ns 0.84

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1413.89 ns 12.96 ns 12.65 ns 1.02
basic sample with immediate scheduler 1406.64 ns 5.56 ns 5.24 ns 1.06

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 970.99 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2098.72 ns 988.30 ns 1013.24 ns 0.98
create(on_error())+retry(1)+subscribe 598.32 ns 98.40 ns 107.89 ns 0.91

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 977.49 ns 3.88 ns 3.89 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 981.06 ns 3.87 ns 3.87 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1983.99 ns 0.24 ns 0.23 ns 1.01
from array of 1 - create + subscribe + current_thread 2467.22 ns 33.94 ns 34.68 ns 0.98
concat_as_source of just(1 immediate) create + subscribe 5366.15 ns 370.61 ns 337.75 ns 1.10
defer from array of 1 - defer + create + subscribe + immediate 1962.41 ns 0.23 ns 0.24 ns 0.99
interval - interval + take(3) + subscribe + immediate 4907.89 ns 114.57 ns 113.98 ns 1.01
interval - interval + take(3) + subscribe + current_thread 5996.85 ns 96.48 ns 97.30 ns 0.99
from array of 1 - create + as_blocking + subscribe + new_thread 85461.29 ns 96759.08 ns 94219.18 ns 1.03
from array of 1000 - create + as_blocking + subscribe + new_thread 98037.42 ns 108004.75 ns 99710.60 ns 1.08
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8107.75 ns 377.16 ns 383.78 ns 0.98

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3356.60 ns 0.28 ns 0.23 ns 1.24
immediate_just+filter(true)+subscribe 2116.47 ns 0.23 ns 0.22 ns 1.05
immediate_just(1,2)+skip(1)+subscribe 3312.08 ns 0.26 ns 0.23 ns 1.12
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2440.31 ns 0.47 ns 0.46 ns 1.03
immediate_just(1,2)+first()+subscribe 3169.95 ns 0.23 ns 0.22 ns 1.05
immediate_just(1,2)+last()+subscribe 2636.32 ns 0.26 ns 0.22 ns 1.19
immediate_just+take_last(1)+subscribe 3250.50 ns 0.28 ns 0.22 ns 1.25
immediate_just(1,2,3)+element_at(1)+subscribe 2383.06 ns 0.28 ns 0.23 ns 1.23

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 845.43 ns 4.11 ns 4.14 ns 0.99
current_thread scheduler create worker + schedule 1179.44 ns 38.37 ns 38.52 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1977.91 ns 206.36 ns 209.26 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2388.02 ns 5.05 ns 4.33 ns 1.17
immediate_just+scan(10, std::plus)+subscribe 2800.39 ns 0.54 ns 0.46 ns 1.19
immediate_just+flat_map(immediate_just(v*2))+subscribe 5323.35 ns 404.08 ns 400.88 ns 1.01
immediate_just+buffer(2)+subscribe 2478.49 ns 64.78 ns 62.93 ns 1.03
immediate_just+window(2)+subscribe + subscsribe inner 6278.13 ns 2548.52 ns 2306.79 ns 1.10

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2103.23 ns - - 0.00
immediate_just+take_while(true)+subscribe 2267.91 ns 0.23 ns 0.23 ns 1.03

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 5960.45 ns 5.54 ns 4.78 ns 1.16

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7435.27 ns 439.76 ns 440.60 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8129.23 ns 440.39 ns 439.71 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 498.12 ns 459.90 ns 1.08
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 7927.98 ns 2010.22 ns 1911.64 ns 1.05
immediate_just(1) + zip(immediate_just(2)) + subscribe 5169.89 ns 834.94 ns 828.49 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 81.63 ns 55.47 ns 48.72 ns 1.14
subscribe 100 observers to publish_subject 402908.67 ns 45958.00 ns 40678.50 ns 1.13
100 on_next to 100 observers to publish_subject 58754.05 ns 21626.32 ns 16590.69 ns 1.30

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 4083.71 ns 80.86 ns 69.65 ns 1.16
basic sample with immediate scheduler 3088.19 ns 22.14 ns 18.74 ns 1.18

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2744.50 ns 0.28 ns 0.23 ns 1.20

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 7008.16 ns 4580.18 ns 4110.65 ns 1.11
create(on_error())+retry(1)+subscribe 1983.34 ns 294.48 ns 284.80 ns 1.03

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 277.88 ns 1.57 ns 1.54 ns 1.02
Subscribe empty callbacks to empty observable via pipe operator 266.53 ns 1.56 ns 1.54 ns 1.01

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 555.49 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 786.40 ns 4.33 ns 4.32 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2332.62 ns 131.79 ns 130.58 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 775.14 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2224.07 ns 58.26 ns 58.31 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3172.95 ns 30.86 ns 30.90 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 30853.81 ns 35202.59 ns 31702.06 ns 1.11
from array of 1000 - create + as_blocking + subscribe + new_thread 39839.20 ns 41525.30 ns 36502.28 ns 1.14
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3654.77 ns 149.63 ns 150.14 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1161.89 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 846.91 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1080.62 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 867.65 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1373.83 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 995.42 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1178.35 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 867.08 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 281.33 ns 1.56 ns 1.54 ns 1.01
current_thread scheduler create worker + schedule 389.98 ns 4.94 ns 4.94 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 844.97 ns 55.77 ns 55.73 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 836.57 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 953.30 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2249.80 ns 137.52 ns 138.53 ns 0.99
immediate_just+buffer(2)+subscribe 1500.83 ns 13.58 ns 13.59 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2461.12 ns 955.31 ns 928.39 ns 1.03

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 844.57 ns - - 0.00
immediate_just+take_while(true)+subscribe 842.87 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1991.14 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3226.66 ns 158.27 ns 164.77 ns 0.96
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3734.21 ns 145.99 ns 151.11 ns 0.97
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 143.48 ns 143.84 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3362.53 ns 843.78 ns 844.08 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2211.34 ns 201.13 ns 200.48 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 53.99 ns 17.52 ns 17.55 ns 1.00
subscribe 100 observers to publish_subject 212374.60 ns 16102.09 ns 15970.75 ns 1.01
100 on_next to 100 observers to publish_subject 42739.41 ns 20592.25 ns 20908.68 ns 0.98

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1291.29 ns 11.72 ns 11.73 ns 1.00
basic sample with immediate scheduler 1285.52 ns 6.17 ns 6.17 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1003.88 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2163.29 ns 1240.12 ns 1272.39 ns 0.97
create(on_error())+retry(1)+subscribe 662.47 ns 138.40 ns 138.53 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 805.52 ns 4.01 ns 4.88 ns 0.82
Subscribe empty callbacks to empty observable via pipe operator 796.59 ns 4.01 ns 4.94 ns 0.81

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1425.96 ns 9.63 ns 9.71 ns 0.99
from array of 1 - create + subscribe + current_thread 1424.90 ns 17.91 ns 17.60 ns 1.02
concat_as_source of just(1 immediate) create + subscribe 4086.45 ns 189.21 ns 177.26 ns 1.07
defer from array of 1 - defer + create + subscribe + immediate 1205.62 ns 9.41 ns 9.41 ns 1.00
interval - interval + take(3) + subscribe + immediate 3351.90 ns 145.29 ns 144.40 ns 1.01
interval - interval + take(3) + subscribe + current_thread 3678.80 ns 65.06 ns 65.09 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 118750.00 ns 113920.00 ns 124000.00 ns 0.92
from array of 1000 - create + as_blocking + subscribe + new_thread 127544.44 ns 121444.44 ns 146937.50 ns 0.83
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5301.38 ns 207.64 ns 207.82 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1807.85 ns 25.30 ns 24.99 ns 1.01
immediate_just+filter(true)+subscribe 1555.16 ns 25.50 ns 24.05 ns 1.06
immediate_just(1,2)+skip(1)+subscribe 1715.34 ns 24.07 ns 23.45 ns 1.03
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1874.91 ns 29.01 ns 26.37 ns 1.10
immediate_just(1,2)+first()+subscribe 2063.65 ns 23.38 ns 23.74 ns 0.98
immediate_just(1,2)+last()+subscribe 1805.16 ns 24.07 ns 24.72 ns 0.97
immediate_just+take_last(1)+subscribe 2262.35 ns 128.77 ns 70.33 ns 1.83
immediate_just(1,2,3)+element_at(1)+subscribe 1330.22 ns 27.46 ns 26.58 ns 1.03

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 483.51 ns 6.18 ns 6.79 ns 0.91
current_thread scheduler create worker + schedule 666.20 ns 14.19 ns 14.57 ns 0.97
current_thread scheduler create worker + schedule + recursive schedule 1097.63 ns 104.44 ns 103.29 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1305.11 ns 24.35 ns 24.36 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1414.11 ns 26.84 ns 26.51 ns 1.01
immediate_just+flat_map(immediate_just(v*2))+subscribe 3808.21 ns 204.70 ns 212.91 ns 0.96
immediate_just+buffer(2)+subscribe 2568.82 ns 71.11 ns 69.02 ns 1.03
immediate_just+window(2)+subscribe + subscsribe inner 3997.95 ns 1277.04 ns 1307.55 ns 0.98

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1305.30 ns 23.12 ns 23.14 ns 1.00
immediate_just+take_while(true)+subscribe 1313.01 ns 24.36 ns 24.05 ns 1.01

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3126.65 ns 11.11 ns 11.10 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5200.88 ns 224.47 ns 229.81 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5362.24 ns 211.43 ns 225.48 ns 0.94
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 197.55 ns 204.26 ns 0.97
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5326.70 ns 923.25 ns 961.55 ns 0.96
immediate_just(1) + zip(immediate_just(2)) + subscribe 3518.77 ns 536.47 ns 532.02 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 37.40 ns 20.66 ns 20.68 ns 1.00
subscribe 100 observers to publish_subject 264075.00 ns 30131.58 ns 28417.95 ns 1.06
100 on_next to 100 observers to publish_subject 52186.96 ns 38819.23 ns 38822.58 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1845.60 ns 102.37 ns 102.64 ns 1.00
basic sample with immediate scheduler 2240.16 ns 74.11 ns 74.07 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1427.45 ns 24.97 ns 24.66 ns 1.01

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2795.78 ns 349.11 ns 366.82 ns 0.95
create(on_error())+retry(1)+subscribe 2587.50 ns 185.37 ns 141.04 ns 1.31

Please sign in to comment.