Skip to content

Commit

Permalink
0.3.9: Merge pull request #12 from CODIANZ/development
Browse files Browse the repository at this point in the history
Synchronize `is_subscribed` exactly until `completed`, `error`, `unsubscribed` ends.
  • Loading branch information
terukazu-inoue authored Jun 27, 2023
2 parents 6e77a79 + 9e64858 commit 71caa0d
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 2 deletions.
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"files.associations": {
"*.inc": "cpp",
"atomic": "cpp",
"iostream": "cpp",
"ostream": "cpp",
Expand Down Expand Up @@ -83,6 +84,7 @@
"__functional_base_03": "cpp",
"*.tcc": "cpp",
"__bits": "cpp",
"*.inc": "cpp"
"compare": "cpp",
"concepts": "cpp"
}
}
12 changes: 11 additions & 1 deletion include/another-rxcpp/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <functional>
#include <memory>
#include <mutex>
#include <atomic>
#include "internal/tools/fn.h"
#include "internal/tools/util.h"

Expand Down Expand Up @@ -41,6 +42,8 @@ template <typename T> struct observer {
error_sp error_;
completed_sp completed_;
unsubscribe_sp unsubscribe_;
std::atomic_bool is_subscribed_;
inner() : is_subscribed_(true) {}
};

mutable std::shared_ptr<inner> inner_;
Expand All @@ -63,6 +66,10 @@ template <typename T> struct observer {
return std::make_tuple(e, c, u);
}

void set_unsubscribed() const noexcept {
inner_->is_subscribed_ = false;
}

public:
template<typename N, typename E, typename C> observer(N&& n, E&& e, C&& c) noexcept {
inner_ = std::make_shared<inner>();
Expand Down Expand Up @@ -94,6 +101,7 @@ template <typename T> struct observer {
auto u = std::get<2>(ecu);
if(e && *e) (*e)(err);
if(u && *u) (*u)();
set_unsubscribed();
}

void on_completed() const noexcept {
Expand All @@ -102,16 +110,18 @@ template <typename T> struct observer {
auto u = std::get<2>(ecu);
if(c && *c) (*c)();
if(u && *u) (*u)();
set_unsubscribed();
}

void unsubscribe() const noexcept {
auto ecu = fetch_and_reset_all();
auto u = std::get<2>(ecu);
if(u && *u) (*u)();
set_unsubscribed();
}

bool is_subscribed() const noexcept {
return inner_->next_ && inner_->error_ && inner_->completed_;
return inner_->is_subscribed_;
}
};

Expand Down
88 changes: 88 additions & 0 deletions test/case_7.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include <another-rxcpp/observable.h>
#include <another-rxcpp/operators/flat_map.h>
#include <another-rxcpp/subjects/subject.h>
#include <another-rxcpp/utils/ready_set_go.h>
#include "common.h"
#include <thread>

using namespace another_rxcpp;
using namespace another_rxcpp::operators;

void test_case_7() {
log() << "test_case_7 -- begin" << std::endl;

{
log() << "#1 normal observer -- begin" << std::endl;
auto sbsc = observable<>::create<int>([](subscriber<int> s){
subjects::subject<int> sbj;
observables::range(0, 10)
.observe_on(schedulers::observe_on_new_thread())
.take_until(sbj.as_observable())
.subscribe(
[s, sbj](auto x) {
log() << "#1 inner next: " << x << std::endl;
if(x == 5){
sbj.as_subscriber().on_next(1);
sbj.as_subscriber().on_completed();
}
},
[s](auto err) {
log() << "#1 inner error" << std::endl;
s.on_error(err);
},
[s]{
log() << "#1 inner completed" << std::endl;
s.on_next(999);
s.on_completed();
}
);
})
.subscribe(
[](auto x) { log() << "#1 outer next: " << x << std::endl; },
[](auto) { log() << "#1 outer error" << std::endl; },
[] { log() << "#1 outer completed" << std::endl; }
);
while(sbsc.is_subscribed()) {}
log() << "#1 normal observer -- end" << std::endl;
}

{
log() << "#2 stream_controller -- begin" << std::endl;

auto sbsc = observable<>::create<int>([](subscriber<int> s){
internal::stream_controller<int> sctl(s);
subjects::subject<int> sbj;
observables::range(0, 10)
.observe_on(schedulers::observe_on_new_thread())
.take_until(sbj.as_observable())
.subscribe(sctl.new_observer<int>(
[sctl, sbj](auto, auto x) {
log() << "#2 inner next: " << x << std::endl;
if(x == 5){
sbj.as_subscriber().on_next(1);
sbj.as_subscriber().on_completed();
}
},
[sctl](auto, auto err) {
log() << "#2 inner error" << std::endl;
sctl.sink_error(err);
},
[sctl](auto serial){
log() << "#2 inner completed" << std::endl;
sctl.sink_next(999);
sctl.sink_completed(serial);
}
));
})
.subscribe(
[](auto x) { log() << "#2 outer next: " << x << std::endl; },
[](auto) { log() << "#2 outer error" << std::endl; },
[] { log() << "#2 outer completed" << std::endl; }
);
while(sbsc.is_subscribed()) {}

log() << "#2 stream_controller -- end" << std::endl;
}

log() << "test_case_7 -- end" << std::endl << std::endl;
}
1 change: 1 addition & 0 deletions test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ int main() {
DO(test_case_4)
DO(test_case_5)
DO(test_case_6)
DO(test_case_7)

DO(move_check)

Expand Down

0 comments on commit 71caa0d

Please sign in to comment.