Skip to content

Commit

Permalink
Provide way to specify where schedulable should be resheduled from "n…
Browse files Browse the repository at this point in the history
…ow" or from "start" of this schedulable (#478)
  • Loading branch information
victimsnino authored Nov 22, 2023
1 parent 9bc12da commit d9813a3
Show file tree
Hide file tree
Showing 21 changed files with 492 additions and 214 deletions.
8 changes: 4 additions & 4 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
SECTION("immediate scheduler create worker + schedule")
{
TEST_RPP([&]() {
rpp::schedulers::immediate::create_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); return rpp::schedulers::optional_duration{}; }, rpp::make_lambda_observer([](int) {}));
rpp::schedulers::immediate::create_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); return rpp::schedulers::optional_delay_from_now{}; }, rpp::make_lambda_observer([](int) {}));
});
TEST_RXCPP([&]() {
rxcpp::identity_immediate().create_coordinator().get_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); });
Expand All @@ -206,7 +206,7 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
SECTION("current_thread scheduler create worker + schedule")
{
TEST_RPP([&]() {
rpp::schedulers::current_thread::create_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); return rpp::schedulers::optional_duration{}; }, rpp::make_lambda_observer([](int) {}));
rpp::schedulers::current_thread::create_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); return rpp::schedulers::optional_delay_from_now{}; }, rpp::make_lambda_observer([](int) {}));
});
TEST_RXCPP([&]() {
rxcpp::identity_current_thread().create_coordinator().get_worker().schedule([](const auto& v) { ankerl::nanobench::doNotOptimizeAway(v); });
Expand All @@ -223,10 +223,10 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
worker.schedule(
[](const auto& v) {
ankerl::nanobench::doNotOptimizeAway(v);
return rpp::schedulers::optional_duration{};
return rpp::schedulers::optional_delay_from_now{};
},
std::move(v));
return rpp::schedulers::optional_duration{};
return rpp::schedulers::optional_delay_from_now{};
},
rpp::make_lambda_observer([](int) {}));
});
Expand Down
13 changes: 6 additions & 7 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class debounce_disposable final : public rpp::composite_disposable_impl<Containe
{
m_worker.schedule(
m_period,
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_duration {
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to {
auto value_or_duration = handler.disposable->extract_value_or_time();
if (auto* duration = std::get_if<schedulers::duration>(&value_or_duration))
return *duration;
if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration))
return schedulers::optional_delay_to{*timepoint};

if (auto* value = std::get_if<T>(&value_or_duration))
handler.disposable->get_observer_under_lock()->on_next(std::move(*value));
Expand All @@ -89,15 +89,14 @@ class debounce_disposable final : public rpp::composite_disposable_impl<Containe
debounce_disposable_wrapper<Observer, Worker, Container>{this->shared_from_this()});
}

std::variant<std::monostate, T, schedulers::duration> extract_value_or_time()
std::variant<std::monostate, T, schedulers::time_point> extract_value_or_time()
{
std::lock_guard lock{m_mutex};
if (!m_time_when_value_should_be_emitted.has_value() || !m_value_to_be_emitted.has_value())
return std::monostate{};

const auto now = m_worker.now();
if (m_time_when_value_should_be_emitted > now)
return m_time_when_value_should_be_emitted.value() - now;
if (m_time_when_value_should_be_emitted > m_worker.now())
return m_time_when_value_should_be_emitted.value();

m_time_when_value_should_be_emitted.reset();
auto v = std::move(m_value_to_be_emitted).value();
Expand Down
9 changes: 4 additions & 5 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ struct delay_observer_strategy
{
disposable->worker.schedule(
delay.value(),
[](const delay_disposable_wrapper<Observer, Worker, Container>& wrapper) -> schedulers::optional_duration { return drain_queue(wrapper.disposable); },
[](const delay_disposable_wrapper<Observer, Worker, Container>& wrapper) { return drain_queue(wrapper.disposable); },
delay_disposable_wrapper<Observer, Worker, Container>{disposable});
}
}
Expand All @@ -137,7 +137,7 @@ struct delay_observer_strategy
}
}

static schedulers::optional_duration drain_queue(const std::shared_ptr<delay_disposable<Observer, Worker, Container>>& disposable)
static schedulers::optional_delay_to drain_queue(const std::shared_ptr<delay_disposable<Observer, Worker, Container>>& disposable)
{
while (true)
{
Expand All @@ -149,9 +149,8 @@ struct delay_observer_strategy
}

auto& top = disposable->queue.front();
const auto now = disposable->worker.now();
if (top.time_point > now)
return top.time_point - now;
if (top.time_point > disposable->worker.now())
return schedulers::optional_delay_to{top.time_point};

auto item = std::move(top.value);
disposable->queue.pop();
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/operators/subscribe_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ struct subscribe_on_schedulable
using Type = typename TObservableChainStrategy::value_type;

template<rpp::constraint::observer_strategy<Type> ObserverStrategy>
rpp::schedulers::optional_duration operator()(observer<Type, ObserverStrategy>& observer) const
rpp::schedulers::optional_delay_from_now operator()(observer<Type, ObserverStrategy>& observer) const
{
observable.subscribe(std::move(observer));
return rpp::schedulers::optional_duration{};
return rpp::schedulers::optional_delay_from_now{};
}
};

Expand Down
84 changes: 39 additions & 45 deletions src/rpp/rpp/schedulers/current_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ namespace rpp::schedulers
* worker.schedule([](const auto&)
* {
* std::cout << "Task 4" << std::endl;
* return rpp::schedulers::optional_duration{};
* return rpp::schedulers::optional_delay_from_now{};
* }, handler);
* std::cout << "Task 2 ends" << std::endl;
* return rpp::schedulers::optional_duration{};
* return rpp::schedulers::optional_delay_from_now{};
* }, handler);
*
* worker.schedule([](const auto&)
* {
* std::cout << "Task 3" << std::endl;
* return rpp::schedulers::optional_duration{};
* return rpp::schedulers::optional_delay_from_now{};
* }, handler);
*
* std::cout << "Task 1 ends" << std::endl;
* return rpp::schedulers::optional_duration{};
* return rpp::schedulers::optional_delay_from_now{};
* }, handler);
* \endcode
* Would lead to:
Expand Down Expand Up @@ -87,105 +87,99 @@ namespace rpp::schedulers
class current_thread
{
friend class new_thread;
class worker_strategy;

inline static thread_local std::optional<details::schedulables_queue> s_queue{};
inline static thread_local time_point s_last_now_time{};
inline static thread_local std::optional<details::schedulables_queue<worker_strategy>> s_queue{};

struct is_queue_is_empty
{
const details::schedulables_queue& queue;
const details::schedulables_queue<worker_strategy>& queue;

bool operator()() const { return queue.is_empty(); }
};

static void sleep_until(const time_point timepoint)
{
if (timepoint <= s_last_now_time)
return;

const auto now = clock_type::now();
std::this_thread::sleep_for(timepoint - now);
s_last_now_time = std::max(now, timepoint);
}

static void drain_current_queue()
{
drain_queue(s_queue);
}

static void drain_queue(std::optional<details::schedulables_queue>& queue)
static void drain_queue(std::optional<details::schedulables_queue<worker_strategy>>& queue)
{
while (!queue->is_empty())
{
auto top = queue->pop();
if (top->is_disposed())
continue;

sleep_until(top->get_timepoint());

optional_duration duration{0};
std::optional<time_point> timepoint{top->get_timepoint()};
// immediate like scheduling
do
{
if (duration.value() > duration::zero() && !top->is_disposed())
std::this_thread::sleep_for(duration.value());
if (timepoint && !top->is_disposed())
details::sleep_until(top->get_timepoint());

if (top->is_disposed())
duration.reset();
timepoint.reset();
else
duration = (*top)();
timepoint = (*top)();

} while (queue->is_empty() && duration.has_value());
} while (queue->is_empty() && timepoint.has_value());

if (duration.has_value())
queue->emplace(worker_strategy::now() + duration.value(), std::move(top));
if (timepoint.has_value())
queue->emplace(timepoint.value(), std::move(top));
}

queue.reset();
}

public:
static rpp::utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
{
const bool someone_owns_queue = s_queue.has_value();

if (!someone_owns_queue)
s_queue.emplace();

return rpp::utils::finally_action{!someone_owns_queue ? &drain_current_queue : &rpp::utils::empty_function<>};
}

class worker_strategy
{
public:
template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
static void defer_for(duration duration, Fn&& fn, Handler&& handler, Args&&... args)
{
if (handler.is_disposed())
return;

auto& queue = s_queue;
const bool someone_owns_queue = queue.has_value();
std::optional<time_point> timepoint{};
if (!someone_owns_queue)
{
queue.emplace();

const auto optional_duration = details::immediate_scheduling_while_condition(duration, is_queue_is_empty{queue.value()}, fn, handler, args...);
if (!optional_duration || handler.is_disposed())
timepoint = details::immediate_scheduling_while_condition<worker_strategy>(duration, is_queue_is_empty{queue.value()}, fn, handler, args...);
if (!timepoint || handler.is_disposed())
return drain_queue(queue);
duration = optional_duration.value();
}
else if (handler.is_disposed())
return;
else
{
timepoint = now() + duration;
}

queue->emplace(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
queue->emplace(timepoint.value(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);

if (!someone_owns_queue)
drain_queue(queue);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return s_last_now_time = clock_type::now(); }
static rpp::schedulers::time_point now() { return details::now(); }
};

public:
static rpp::utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
{
const bool someone_owns_queue = s_queue.has_value();

if (!someone_owns_queue)
s_queue.emplace();

return rpp::utils::finally_action{!someone_owns_queue ? &drain_current_queue : &rpp::utils::empty_function<>};
}

static rpp::schedulers::worker<worker_strategy> create_worker()
{
return rpp::schedulers::worker<worker_strategy>{};
Expand Down
Loading

1 comment on commit d9813a3

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 303.46 ns 1.24 ns 1.23 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 302.27 ns 1.24 ns 1.23 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 729.56 ns 0.31 ns 0.62 ns 0.50
from array of 1 - create + subscribe + current_thread 1025.89 ns 4.63 ns 4.94 ns 0.94
concat_as_source of just(1 immediate) create + subscribe 2253.45 ns 5.25 ns 5.25 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 746.80 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2109.58 ns 58.03 ns 1.23 ns 47.03
interval - interval + take(3) + subscribe + current_thread 3065.47 ns 32.12 ns 7.10 ns 4.53

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1074.32 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 819.78 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1015.16 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 844.98 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1231.00 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 929.90 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1108.43 ns 18.22 ns 17.28 ns 1.05

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 256.56 ns 1.24 ns 1.23 ns 1.00
current_thread scheduler create worker + schedule 371.78 ns 5.56 ns 5.87 ns 0.95
current_thread scheduler create worker + schedule + recursive schedule 813.41 ns 70.94 ns 54.80 ns 1.29

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 831.03 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 886.49 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2356.17 ns 81.40 ns 80.49 ns 1.01
immediate_just+buffer(2)+subscribe 1550.99 ns 13.90 ns 13.58 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2332.40 ns 569.69 ns 596.16 ns 0.96

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 817.41 ns - - 0.00
immediate_just+take_while(true)+subscribe 830.34 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1997.71 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3437.73 ns 94.83 ns 91.89 ns 1.03
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3628.98 ns 88.30 ns 83.93 ns 1.05
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 84.60 ns 89.53 ns 0.94

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.54 ns 23.80 ns 52.39 ns 0.45

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1436.64 ns 14.82 ns 15.43 ns 0.96
basic sample with immediate scheduler 1321.71 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 926.58 ns 0.31 ns 0.31 ns 1.00

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 986.38 ns 3.03 ns 3.96 ns 0.77
Subscribe empty callbacks to empty observable via pipe operator 983.71 ns 3.03 ns 3.80 ns 0.80

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1929.51 ns 0.23 ns 0.28 ns 0.84
from array of 1 - create + subscribe + current_thread 2431.06 ns 25.58 ns 29.72 ns 0.86
concat_as_source of just(1 immediate) create + subscribe 5445.94 ns 4.90 ns 5.77 ns 0.85
defer from array of 1 - defer + create + subscribe + immediate 2023.33 ns 0.23 ns 0.26 ns 0.89
interval - interval + take(3) + subscribe + immediate 4950.48 ns 114.06 ns 0.77 ns 147.92
interval - interval + take(3) + subscribe + current_thread 6283.99 ns 108.22 ns 39.33 ns 2.75

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2864.79 ns 0.23 ns 0.23 ns 1.00
immediate_just+filter(true)+subscribe 2120.62 ns 0.23 ns 0.23 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 2914.85 ns 0.25 ns 0.24 ns 1.03
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2220.96 ns 0.48 ns 0.49 ns 0.98
immediate_just(1,2)+first()+subscribe 3225.40 ns 0.24 ns 0.24 ns 1.00
immediate_just(1,2)+last()+subscribe 2595.21 ns 0.23 ns 0.24 ns 0.95
immediate_just+take_last(1)+subscribe 3088.48 ns 78.61 ns 70.38 ns 1.12

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 933.38 ns 3.37 ns 2.33 ns 1.44
current_thread scheduler create worker + schedule 1243.62 ns 36.58 ns 35.42 ns 1.03
current_thread scheduler create worker + schedule + recursive schedule 3033.76 ns 344.54 ns 222.96 ns 1.55

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2474.47 ns 0.52 ns 0.23 ns 2.23
immediate_just+scan(10, std::plus)+subscribe 3590.44 ns 0.62 ns 0.47 ns 1.33
immediate_just+flat_map(immediate_just(v*2))+subscribe 5769.83 ns 552.16 ns 318.32 ns 1.73
immediate_just+buffer(2)+subscribe 2907.81 ns 81.81 ns 68.09 ns 1.20
immediate_just+window(2)+subscribe + subscsribe inner 5576.32 ns 1516.84 ns 1351.07 ns 1.12

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2332.26 ns - - 0.00
immediate_just+take_while(true)+subscribe 3016.20 ns 0.39 ns 0.23 ns 1.67

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 5266.11 ns 0.25 ns 0.23 ns 1.08

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 10333.39 ns 444.22 ns 351.10 ns 1.27
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 9805.06 ns 600.81 ns 359.61 ns 1.67
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 429.84 ns 392.71 ns 1.09

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 78.34 ns 77.58 ns 72.01 ns 1.08

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2956.23 ns 124.55 ns 105.22 ns 1.18
basic sample with immediate scheduler 3127.42 ns 18.79 ns 17.72 ns 1.06

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2504.64 ns 0.25 ns 0.23 ns 1.08

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 267.16 ns 0.89 ns 0.90 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 267.36 ns 0.88 ns 0.88 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 554.07 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 785.14 ns 5.56 ns 5.56 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 1919.47 ns 0.31 ns 0.31 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 564.83 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 1512.17 ns 57.07 ns 0.46 ns 123.34
interval - interval + take(3) + subscribe + current_thread 2107.65 ns 30.86 ns 5.87 ns 5.26

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 918.71 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 656.14 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 833.62 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 672.39 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1076.18 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 740.19 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 960.44 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 202.72 ns 0.88 ns 0.89 ns 0.99
current_thread scheduler create worker + schedule 306.92 ns 5.56 ns 5.55 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 629.64 ns 58.91 ns 56.93 ns 1.03

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 640.47 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 712.83 ns 0.31 ns 0.62 ns 0.50
immediate_just+flat_map(immediate_just(v*2))+subscribe 1749.47 ns 70.46 ns 70.67 ns 1.00
immediate_just+buffer(2)+subscribe 1405.63 ns 14.51 ns 13.28 ns 1.09
immediate_just+window(2)+subscribe + subscsribe inner 2162.43 ns 368.78 ns 358.79 ns 1.03

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 649.74 ns - - 0.00
immediate_just+take_while(true)+subscribe 636.86 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1563.46 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 2465.77 ns 80.96 ns 78.94 ns 1.03
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3044.88 ns 86.99 ns 81.38 ns 1.07
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 83.78 ns 78.20 ns 1.07

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 23.28 ns 24.66 ns 24.37 ns 1.01

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1006.63 ns 13.57 ns 11.73 ns 1.16
basic sample with immediate scheduler 1037.07 ns 6.17 ns 5.86 ns 1.05

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 722.67 ns 0.31 ns 0.31 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 565.24 ns 2.47 ns 3.09 ns 0.80
Subscribe empty callbacks to empty observable via pipe operator 582.87 ns 2.47 ns 3.09 ns 0.80

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1179.19 ns 5.24 ns 8.02 ns 0.65
from array of 1 - create + subscribe + current_thread 1433.68 ns 20.02 ns 26.02 ns 0.77
concat_as_source of just(1 immediate) create + subscribe 4680.00 ns 11.11 ns 10.49 ns 1.06
defer from array of 1 - defer + create + subscribe + immediate 1230.66 ns 5.55 ns 8.42 ns 0.66
interval - interval + take(3) + subscribe + immediate 2965.16 ns 129.88 ns 19.77 ns 6.57
interval - interval + take(3) + subscribe + current_thread 3455.48 ns 59.87 ns 25.00 ns 2.40

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1844.58 ns 12.86 ns 12.92 ns 1.00
immediate_just+filter(true)+subscribe 1338.56 ns 12.33 ns 12.38 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1749.85 ns 13.87 ns 13.03 ns 1.06
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1367.64 ns 15.77 ns 15.93 ns 0.99
immediate_just(1,2)+first()+subscribe 2060.97 ns 12.64 ns 12.69 ns 1.00
immediate_just(1,2)+last()+subscribe 1493.02 ns 14.05 ns 14.10 ns 1.00
immediate_just+take_last(1)+subscribe 2368.45 ns 59.21 ns 65.63 ns 0.90

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 490.22 ns 4.94 ns 8.64 ns 0.57
current_thread scheduler create worker + schedule 655.44 ns 16.04 ns 8.95 ns 1.79
current_thread scheduler create worker + schedule + recursive schedule 1080.30 ns 106.96 ns 166.91 ns 0.64

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1339.70 ns 12.18 ns 13.96 ns 0.87
immediate_just+scan(10, std::plus)+subscribe 1424.41 ns 21.27 ns 21.43 ns 0.99
immediate_just+flat_map(immediate_just(v*2))+subscribe 3564.24 ns 185.37 ns 209.33 ns 0.89
immediate_just+buffer(2)+subscribe 2385.74 ns 59.71 ns 58.67 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 4110.43 ns 814.71 ns 866.37 ns 0.94

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1356.63 ns 11.46 ns 11.45 ns 1.00
immediate_just+take_while(true)+subscribe 1338.67 ns 12.33 ns 12.34 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3196.92 ns 7.40 ns 7.09 ns 1.04

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5249.25 ns 220.97 ns 224.50 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 6525.14 ns 215.38 ns 202.50 ns 1.06
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 221.00 ns 213.87 ns 1.03

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.25 ns 36.10 ns 40.60 ns 0.89

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2375.53 ns 59.47 ns 54.22 ns 1.10
basic sample with immediate scheduler 2277.11 ns 37.03 ns 37.03 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1763.23 ns 19.97 ns 19.98 ns 1.00

Please sign in to comment.