Skip to content

Commit

Permalink
Refactor disposbles (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Dec 3, 2023
1 parent 887403a commit 02fea8c
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 134 deletions.
36 changes: 31 additions & 5 deletions src/rpp/rpp/disposables/composite_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ namespace rpp
*
* @ingroup disposables
*/
template<rpp::constraint::decayed_type Container>
template<details::disposables::constraint::disposable_container Container>
class composite_disposable_impl : public interface_composite_disposable
{
public:
composite_disposable_impl()
requires details::disposables::constraint::disposable_container<Container>
= default;

composite_disposable_impl()= default;
composite_disposable_impl(const composite_disposable_impl&) = delete;
composite_disposable_impl(composite_disposable_impl&& other) noexcept = delete;

Expand Down Expand Up @@ -99,6 +96,33 @@ class composite_disposable_impl : public interface_composite_disposable
}
}

void remove(const disposable_wrapper& disposable) override
{
while (true)
{
State expected{State::None};
// need to acquire possible disposables state changing from other `add` or `remove`
if (m_current_state.compare_exchange_strong(expected, State::Edit, std::memory_order::acquire, std::memory_order::relaxed))
{
try
{
m_disposables.remove(disposable);
}
catch(...)
{
m_current_state.store(State::None, std::memory_order::release);
throw;
}
// need to propogate disposables state changing to others
m_current_state.store(State::None, std::memory_order::release);
return;
}

if (expected == State::Disposed)
return;
}
}

protected:
virtual void dispose_impl() noexcept {}

Expand All @@ -113,4 +137,6 @@ class composite_disposable_impl : public interface_composite_disposable
Container m_disposables{};
std::atomic<State> m_current_state{};
};

class composite_disposable : public composite_disposable_impl<rpp::details::disposables::dynamic_disposables_container<0>>{};
} // namespace rpp
20 changes: 18 additions & 2 deletions src/rpp/rpp/disposables/details/container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <rpp/utils/exceptions.hpp>

#include <array>
#include <algorithm>
#include <vector>

namespace rpp::details::disposables
Expand All @@ -35,6 +36,11 @@ class dynamic_disposables_container_base
m_data.push_back(std::move(d));
}

void remove(const rpp::disposable_wrapper& d)
{
m_data.erase(std::remove(m_data.begin(), m_data.end(), d), m_data.end());
}

void dispose() const
{
for (auto& d : m_data) {
Expand Down Expand Up @@ -80,6 +86,15 @@ class static_disposables_container
m_data[m_size++] = std::move(d);
}

void remove(const rpp::disposable_wrapper& d)
{
auto itr = std::remove(m_data.begin(), m_data.end(), d);
while(itr != m_data.end()) {
(*itr++) = disposable_wrapper{};
--m_size;
}
}

void dispose() const
{
for (size_t i =0; i < m_size; ++i) {
Expand All @@ -105,7 +120,8 @@ struct none_disposables_container
throw rpp::utils::more_disposables_than_expected{"none_disposables_container expected none disposables but obtained one"};
}

static void dispose() {};
static void clear() {};
static void remove(const rpp::disposable_wrapper&) {}
static void dispose() {}
static void clear() {}
};
}
12 changes: 12 additions & 0 deletions src/rpp/rpp/disposables/disposable_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class disposable_wrapper_impl
return disposable_wrapper_impl{weak_tag{}, std::move(disposable)};
}

bool operator==(const disposable_wrapper_impl& other) const
{
return get_original() == other.get_original();
}

bool is_disposed() const noexcept
{
if (const auto locked = get_original())
Expand All @@ -88,6 +93,13 @@ class disposable_wrapper_impl
other.dispose();
}

void remove(const disposable_wrapper& other) const
requires std::derived_from<TDisposable, interface_composite_disposable>
{
if (const auto locked = get_original())
locked->remove(other);
}

std::shared_ptr<TDisposable> get_original() const noexcept
{
if (const auto ptr_ptr = std::get_if<std::shared_ptr<TDisposable>>(&m_disposable))
Expand Down
5 changes: 1 addition & 4 deletions src/rpp/rpp/disposables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ namespace constraint

namespace rpp
{
template<rpp::constraint::decayed_type Container>
class composite_disposable_impl;

using composite_disposable = composite_disposable_impl<rpp::details::disposables::dynamic_disposables_container<0>>;
class composite_disposable;

template<rpp::constraint::is_nothrow_invocable Fn>
class callback_disposable;
Expand Down
8 changes: 6 additions & 2 deletions src/rpp/rpp/disposables/interface_composite_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ struct interface_composite_disposable : public interface_disposable
virtual void add(disposable_wrapper disposable) = 0;

template<rpp::constraint::is_nothrow_invocable Fn>
void add(Fn&& invocable)
disposable_wrapper add(Fn&& invocable)
{
add(make_callback_disposable(std::forward<Fn>(invocable)));
auto d = make_callback_disposable(std::forward<Fn>(invocable));
add(d);
return d;
}

virtual void remove(const disposable_wrapper& d) = 0;
};
}
123 changes: 35 additions & 88 deletions src/rpp/rpp/disposables/refcount_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,110 +18,76 @@

#include <atomic>
#include <memory>
#include <limits>

namespace rpp::details
{
struct refocunt_disposable_state_t final : public composite_disposable
class refocunt_disposable_state_t final : public rpp::composite_disposable
{
refocunt_disposable_state_t() = default;
refocunt_disposable_state_t(const refocunt_disposable_state_t&) = delete;
refocunt_disposable_state_t(refocunt_disposable_state_t&&) noexcept = delete;

private:
using composite_disposable::dispose;

void dispose_impl() noexcept override
{
m_refcount.store(0, std::memory_order::relaxed);
}

public:
void try_dispose()
void dispose_impl() noexcept override
{
// just need atomicity, not guarding anything
if (m_refcount.fetch_sub(1, std::memory_order::relaxed) == 1)
dispose();
m_refcount.store(s_disposed, std::memory_order::relaxed);
}

bool is_disposed_by_any() const noexcept
void release()
{
return m_refcount.load(std::memory_order::relaxed) == 0 || is_disposed();
auto current_value = m_refcount.load(std::memory_order::relaxed);
while (true)
{
if (current_value == s_disposed)
return;

const size_t new_value = current_value == 1 ? s_disposed : current_value - 1;
if (!m_refcount.compare_exchange_strong(current_value, new_value, std::memory_order::relaxed, std::memory_order::relaxed))
continue;

if (new_value == s_disposed)
dispose();
return;
}
}

bool add_ref()
{
auto current_value = m_refcount.load(std::memory_order::relaxed);
// just need atomicity, not guarding anything
while (current_value && !m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::relaxed, std::memory_order::relaxed)){};

return current_value;
}

private:
std::atomic_size_t m_refcount{1};
};

class inner_refcount_disposable final : public interface_composite_disposable
{
public:
explicit inner_refcount_disposable(const std::shared_ptr<refocunt_disposable_state_t>& state)
: m_state{state}
{
}

inner_refcount_disposable(const inner_refcount_disposable&) = delete;
inner_refcount_disposable(inner_refcount_disposable&&) noexcept = delete;

void dispose() noexcept override
{
if (m_disposed.exchange(true, std::memory_order::relaxed) == false)
m_state->try_dispose();
}
while (current_value != s_disposed && !m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::relaxed, std::memory_order::relaxed)){};

using interface_composite_disposable::add;

void add(disposable_wrapper disposable) override
{
m_state->add(std::move(disposable));
return current_value != s_disposed;
}

bool is_disposed() const noexcept override
{
return m_disposed.load(std::memory_order::relaxed) || m_state->is_disposed_by_any();
}

private:
std::shared_ptr<refocunt_disposable_state_t> m_state;
std::atomic_bool m_disposed{};
std::atomic<size_t> m_refcount{0};
constexpr static size_t s_disposed = std::numeric_limits<size_t>::max();
};
}

namespace rpp
{
/**
* @brief Disposable with counter inside. Each `add_ref` increments counter, while each `dispose()` call decrements. In case of reaching zero disposes underlying disposables
* @warning Don't use it as disposable of observer due to `is_disposed()` would be false till counter reaches zero, so, observer can be also not `is_disposed()` during this time.
*
* @ingroup disposables
*/
class refcount_disposable : public interface_composite_disposable,
public std::enable_shared_from_this<refcount_disposable> {
class refcount_disposable : public std::enable_shared_from_this<refcount_disposable> {

public:
refcount_disposable() = default;

refcount_disposable(const refcount_disposable&) = delete;
refcount_disposable(refcount_disposable&&) noexcept = delete;

bool is_disposed_underlying() const noexcept
{
return m_state.is_disposed_by_any();
return m_state.is_disposed();
}

composite_disposable_wrapper add_ref()
{
if (m_state.add_ref())
return composite_disposable_wrapper{std::make_shared<details::inner_refcount_disposable>(std::shared_ptr<details::refocunt_disposable_state_t>{this->shared_from_this(), &this->m_state})};
{
auto inner = std::make_shared<rpp::composite_disposable_impl<rpp::details::disposables::dynamic_disposables_container<1>>>();
auto as_weak = rpp::disposable_wrapper::from_weak(inner);
m_state.add(as_weak);
inner->add([s = shared_from_this(), as_weak]() noexcept {
s->m_state.remove(as_weak);
s->m_state.release();
});
return composite_disposable_wrapper{inner};
}

return composite_disposable_wrapper{};
}
Expand All @@ -131,26 +97,7 @@ class refcount_disposable : public interface_composite_disposable,
return composite_disposable_wrapper::from_shared(std::shared_ptr<rpp::composite_disposable>{this->shared_from_this(), &this->m_state});
}

bool is_disposed() const noexcept final
{
return m_disposed.load(std::memory_order::relaxed) || m_state.is_disposed_by_any();
}

void dispose() noexcept final
{
if (m_disposed.exchange(true, std::memory_order::relaxed) == false)
m_state.try_dispose();
}

using interface_composite_disposable::add;

void add(disposable_wrapper disposable) final
{
m_state.add(std::move(disposable));
}

private:
details::refocunt_disposable_state_t m_state{};
std::atomic_bool m_disposed{};
};
} // namespace rpp
2 changes: 1 addition & 1 deletion src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ struct ref_count_on_subscribe_t<rpp::connectable_observable<OriginalObservable,
return {m_state->disposable->add_ref(), std::nullopt};

m_state->disposable = std::make_shared<rpp::refcount_disposable>();
return {m_state->disposable, m_state->disposable->get_underlying()};
return {m_state->disposable->add_ref(), m_state->disposable->get_underlying()};
}
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/details/forwarding_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class forwarding_strategy
explicit forwarding_strategy(std::shared_ptr<rpp::refcount_disposable> refcount)
: m_refcount{std::move(refcount)}
{
m_refcount->add(rpp::disposable_wrapper::from_weak(m_state));
m_refcount->get_underlying().add(rpp::disposable_wrapper::from_weak(m_state));
}

auto get_observer() const
Expand Down
11 changes: 6 additions & 5 deletions src/rpp/rpp/operators/group_by.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ struct group_by_observer_strategy

RPP_CALL_DURING_CONSTRUCTION(
{
observer.set_upstream(rpp::disposable_wrapper{disposable});
observer.set_upstream(disposable->add_ref());
});

void set_upstream(const rpp::disposable_wrapper& d) const
{
disposable->add(d);
disposable->get_underlying().add(d);
}

bool is_disposed() const
Expand Down Expand Up @@ -134,7 +134,7 @@ struct group_by_observer_strategy

if (inserted)
{
disposable->add(rpp::disposable_wrapper::from_weak(itr->second.get_disposable().get_original()));
disposable->get_underlying().add(rpp::disposable_wrapper::from_weak(itr->second.get_disposable().get_original()));
obs.on_next(rpp::grouped_observable_group_by<TKey, Type>{
std::move(key),
group_by_observable_strategy<Type>{itr->second, disposable}
Expand All @@ -158,9 +158,10 @@ struct group_by_observable_strategy
{
if (const auto locked = disposable.lock())
{
obs.set_upstream(locked->add_ref());
auto d = locked->add_ref();
obs.set_upstream(d);
subj.get_observable()
.subscribe(rpp::observer<T, group_by_inner_observer_strategy<observer<T, Strategy>>>{std::move(obs), rpp::composite_disposable_wrapper::from_weak(disposable)});
.subscribe(rpp::observer<T, group_by_inner_observer_strategy<observer<T, Strategy>>>{std::move(obs), std::move(d)});
}
}
};
Expand Down
Loading

1 comment on commit 02fea8c

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 307.15 ns 1.23 ns 1.24 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 305.62 ns 1.23 ns 1.23 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 696.19 ns 0.31 ns 0.62 ns 0.50
from array of 1 - create + subscribe + current_thread 1034.81 ns 5.25 ns 4.63 ns 1.13
concat_as_source of just(1 immediate) create + subscribe 2267.57 ns 5.25 ns 4.94 ns 1.06
defer from array of 1 - defer + create + subscribe + immediate 752.57 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2113.30 ns 58.02 ns 58.00 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3002.76 ns 32.11 ns 32.12 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1098.15 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 834.66 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1014.49 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 847.11 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1247.88 ns 0.31 ns 0.62 ns 0.50
immediate_just(1,2)+last()+subscribe 933.16 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1132.90 ns 17.90 ns 17.28 ns 1.04

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 264.45 ns 1.24 ns 1.23 ns 1.00
current_thread scheduler create worker + schedule 367.36 ns 6.25 ns 5.55 ns 1.13
current_thread scheduler create worker + schedule + recursive schedule 801.35 ns 65.53 ns 65.73 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 823.06 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 907.77 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2345.67 ns 84.90 ns 81.28 ns 1.04
immediate_just+buffer(2)+subscribe 1507.34 ns 13.59 ns 13.58 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2337.87 ns 906.62 ns 539.30 ns 1.68

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 838.51 ns - - 0.00
immediate_just+take_while(true)+subscribe 856.57 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2091.24 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3504.18 ns 98.84 ns 106.39 ns 0.93
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3726.15 ns 90.74 ns 84.63 ns 1.07
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 85.97 ns 85.97 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3527.48 ns 790.62 ns 268.16 ns 2.95

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.53 ns 51.21 ns 24.03 ns 2.13

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1423.33 ns 14.50 ns 15.74 ns 0.92
basic sample with immediate scheduler 1381.94 ns 5.55 ns 5.56 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 926.34 ns 0.31 ns 0.31 ns 1.00

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1127.88 ns 3.51 ns 2.36 ns 1.49
Subscribe empty callbacks to empty observable via pipe operator 1171.21 ns 3.52 ns 2.34 ns 1.50

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2480.38 ns 0.46 ns 0.24 ns 1.89
from array of 1 - create + subscribe + current_thread 2945.27 ns 8.64 ns 25.89 ns 0.33
concat_as_source of just(1 immediate) create + subscribe 6978.99 ns 6.88 ns 6.33 ns 1.09
defer from array of 1 - defer + create + subscribe + immediate 2337.33 ns 0.28 ns 0.24 ns 1.17
interval - interval + take(3) + subscribe + immediate 6503.86 ns 109.93 ns 114.61 ns 0.96
interval - interval + take(3) + subscribe + current_thread 7918.07 ns 60.89 ns 110.45 ns 0.55

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3473.06 ns 0.28 ns 0.23 ns 1.20
immediate_just+filter(true)+subscribe 2703.26 ns 0.28 ns 0.25 ns 1.14
immediate_just(1,2)+skip(1)+subscribe 3276.75 ns 0.28 ns 0.23 ns 1.19
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2517.60 ns 0.56 ns 0.47 ns 1.20
immediate_just(1,2)+first()+subscribe 4090.81 ns 0.28 ns 0.26 ns 1.08
immediate_just(1,2)+last()+subscribe 2874.40 ns 0.28 ns 0.23 ns 1.19
immediate_just+take_last(1)+subscribe 3770.42 ns 78.74 ns 76.42 ns 1.03

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 981.56 ns 3.42 ns 2.28 ns 1.50
current_thread scheduler create worker + schedule 1290.73 ns 13.58 ns 34.91 ns 0.39
current_thread scheduler create worker + schedule + recursive schedule 2289.23 ns 156.76 ns 216.26 ns 0.72

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2704.26 ns 0.28 ns 0.23 ns 1.21
immediate_just+scan(10, std::plus)+subscribe 2780.67 ns 0.56 ns 0.47 ns 1.20
immediate_just+flat_map(immediate_just(v*2))+subscribe 6401.62 ns 323.47 ns 321.39 ns 1.01
immediate_just+buffer(2)+subscribe 3095.89 ns 70.15 ns 68.98 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 6565.38 ns 2727.04 ns 1340.62 ns 2.03

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2636.60 ns - - 0.00
immediate_just+take_while(true)+subscribe 2525.91 ns 0.28 ns 0.25 ns 1.14

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 6320.56 ns 0.28 ns 0.23 ns 1.20

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 9831.78 ns 361.58 ns 345.91 ns 1.05
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 10909.34 ns 356.52 ns 361.31 ns 0.99
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 397.88 ns 400.01 ns 0.99
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 10339.83 ns 2644.15 ns 793.20 ns 3.33

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 101.50 ns 89.48 ns 70.56 ns 1.27

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 3458.77 ns 57.12 ns 111.11 ns 0.51
basic sample with immediate scheduler 3652.31 ns 6.44 ns 15.21 ns 0.42

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2887.99 ns 0.28 ns 0.23 ns 1.20

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 268.70 ns 0.88 ns 0.88 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 276.72 ns 0.89 ns 0.89 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 568.07 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 782.96 ns 5.56 ns 5.55 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 1891.66 ns 0.31 ns 0.31 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 573.67 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 1519.58 ns 57.03 ns 57.07 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2092.87 ns 30.86 ns 30.88 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 933.35 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 642.90 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 840.05 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 688.17 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1046.31 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 756.68 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 956.34 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 204.64 ns 0.89 ns 0.89 ns 1.00
current_thread scheduler create worker + schedule 315.17 ns 5.56 ns 5.56 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 633.99 ns 58.92 ns 58.52 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 680.14 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 736.54 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 1840.67 ns 72.54 ns 68.91 ns 1.05
immediate_just+buffer(2)+subscribe 1355.20 ns 14.91 ns 13.89 ns 1.07
immediate_just+window(2)+subscribe + subscsribe inner 2151.65 ns 768.47 ns 381.16 ns 2.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 653.33 ns - - 0.00
immediate_just+take_while(true)+subscribe 653.66 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1583.48 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 2562.07 ns 81.69 ns 78.41 ns 1.04
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 2995.68 ns 82.30 ns 87.61 ns 0.94
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 81.85 ns 77.81 ns 1.05
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 2692.09 ns 735.84 ns 252.87 ns 2.91

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 23.11 ns 24.56 ns 24.66 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1061.33 ns 13.58 ns 13.89 ns 0.98
basic sample with immediate scheduler 1036.47 ns 6.17 ns 5.86 ns 1.05

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 730.64 ns 0.31 ns 0.31 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 578.76 ns 2.47 ns 3.09 ns 0.80
Subscribe empty callbacks to empty observable via pipe operator 599.43 ns 2.47 ns 3.09 ns 0.80

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1169.83 ns 5.55 ns 4.63 ns 1.20
from array of 1 - create + subscribe + current_thread 1437.08 ns 19.74 ns 20.36 ns 0.97
concat_as_source of just(1 immediate) create + subscribe 4634.54 ns 11.42 ns 11.42 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 1207.73 ns 5.24 ns 4.94 ns 1.06
interval - interval + take(3) + subscribe + immediate 3149.69 ns 129.62 ns 131.64 ns 0.98
interval - interval + take(3) + subscribe + current_thread 3474.59 ns 59.52 ns 60.75 ns 0.98

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1855.11 ns 12.87 ns 12.87 ns 1.00
immediate_just+filter(true)+subscribe 1336.73 ns 12.32 ns 12.33 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1764.04 ns 13.15 ns 13.67 ns 0.96
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1424.67 ns 15.78 ns 15.95 ns 0.99
immediate_just(1,2)+first()+subscribe 2438.63 ns 12.64 ns 12.98 ns 0.97
immediate_just(1,2)+last()+subscribe 1502.21 ns 14.08 ns 14.19 ns 0.99
immediate_just+take_last(1)+subscribe 2043.82 ns 59.35 ns 58.99 ns 1.01

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 496.35 ns 4.32 ns 4.63 ns 0.93
current_thread scheduler create worker + schedule 672.59 ns 16.04 ns 16.98 ns 0.94
current_thread scheduler create worker + schedule + recursive schedule 1101.07 ns 105.45 ns 105.05 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1319.95 ns 12.18 ns 12.26 ns 0.99
immediate_just+scan(10, std::plus)+subscribe 1448.50 ns 22.37 ns 21.27 ns 1.05
immediate_just+flat_map(immediate_just(v*2))+subscribe 3521.99 ns 191.84 ns 183.90 ns 1.04
immediate_just+buffer(2)+subscribe 2624.25 ns 58.29 ns 58.87 ns 0.99
immediate_just+window(2)+subscribe + subscsribe inner 4086.00 ns 1360.99 ns 805.91 ns 1.69

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1643.72 ns 11.45 ns 11.45 ns 1.00
immediate_just+take_while(true)+subscribe 1329.29 ns 12.33 ns 12.34 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3539.45 ns 8.02 ns 6.79 ns 1.18

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5168.63 ns 217.29 ns 207.61 ns 1.05
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 6460.00 ns 208.35 ns 192.91 ns 1.08
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 222.42 ns 222.38 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6227.27 ns 1166.48 ns 409.28 ns 2.85

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.77 ns 36.40 ns 36.72 ns 0.99

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1877.83 ns 60.09 ns 59.90 ns 1.00
basic sample with immediate scheduler 1875.78 ns 36.72 ns 36.75 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1810.89 ns 20.00 ns 19.98 ns 1.00

Please sign in to comment.