Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try to catch issue with subjects #697

Merged
merged 10 commits into from
Jan 11, 2025
Merged

try to catch issue with subjects #697

merged 10 commits into from
Jan 11, 2025

Conversation

victimsnino
Copy link
Owner

@victimsnino victimsnino commented Jan 10, 2025

Summary by CodeRabbit

  • Tests
    • Added a new test case to verify the behavior of dynamic subscriptions within publish_subject.
    • Improved test coverage for subject handling of internal subscriptions.

Copy link
Contributor

coderabbitai bot commented Jan 10, 2025

Warning

Rate limit exceeded

@victimsnino has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 11 minutes and 15 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between f180ef3 and 622be88.

📒 Files selected for processing (1)
  • src/rpp/rpp/subjects/details/subject_state.hpp (4 hunks)
📝 Walkthrough

Walkthrough

The pull request introduces a new test case in the src/tests/rpp/test_subjects.cpp file to validate the behavior of a publish_subject during dynamic subscriptions. The test specifically checks how a subject handles adding a new observer within its on_next method, ensuring that the subject can correctly process and emit values under such conditions. Additionally, modifications to the subject_state class in src/rpp/rpp/subjects/details/subject_state.hpp involve changing the container type for storing observers from std::deque to std::list and updating the iteration method in the on_next function.

Changes

File Change Summary
src/tests/rpp/test_subjects.cpp Added a new test case titled "subject handles addition from inside on_next properly" to verify publish_subject behavior when subscribing inside on_next.
src/rpp/rpp/subjects/details/subject_state.hpp Changed observer storage from std::deque to std::list and modified iteration in on_next from std::for_each to a traditional for loop. Updated return type of process_state_unsafe from void to auto.

Poem

🐰 In a world of subjects, we play and weave,
New tests arise, as we believe.
Observers hopping, subscriptions grow,
In the dance of code, watch the logic flow!
With every change, our spirits soar,
A rabbit's joy, forevermore! 🧪


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/tests/rpp/test_subjects.cpp (2)

172-172: Test name could be more specific.

Consider renaming the test case to better describe what specific aspect of the subject's behavior is being tested, e.g., "publish_subject handles nested subscription in on_next callback without missing values".


176-188: Test case could be more comprehensive.

The current test only verifies the last value received. Consider enhancing the test to:

  1. Verify that the nested subscription receives values correctly
  2. Verify that no values are missed in the original subscription

Here's a suggested improvement:

     SUBCASE("subscribe inside on_next")
     {
-        int value = {};
+        std::vector<int> values;
+        std::vector<int> nested_values;
         subject.get_observable().subscribe([&subject, &values, &nested_values](int v) {
-            subject.get_observable().subscribe([](int) {});
-            value = v;
+            values.push_back(v);
+            subject.get_observable().subscribe([&nested_values](int v) {
+                nested_values.push_back(v);
+            });
         });

         for (size_t i = 0; i < 100; ++i)
             subject.get_observer().on_next(i);

-        REQUIRE(value == 100);
+        // Verify original subscription received all values
+        REQUIRE(values.size() == 100);
+        for (size_t i = 0; i < 100; ++i)
+            REQUIRE(values[i] == i);
+
+        // Verify nested subscriptions receive subsequent values
+        REQUIRE(nested_values.size() == 99); // First value missed as subscription happens after
+        for (size_t i = 0; i < 99; ++i)
+            REQUIRE(nested_values[i] == i + 1);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b212dbc and 1529116.

📒 Files selected for processing (1)
  • src/tests/rpp/test_subjects.cpp (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Cache deps on ci-macos for Release

for (size_t i = 0; i < 100; ++i)
subject.get_observer().on_next(i);

REQUIRE(value == 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect assertion.

The test emits values from 0 to 99, so the last value would be 99, not 100.

Apply this diff to fix the assertion:

-        REQUIRE(value == 100);
+        REQUIRE(value == 99);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
REQUIRE(value == 100);
REQUIRE(value == 99);

Copy link
Contributor

github-actions bot commented Jan 10, 2025

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio rpp no optimization
Subscribe empty callbacks to empty observable 300.32 ns 1.55 ns 1.54 ns 1.00 1.85 ns
Subscribe empty callbacks to empty observable via pipe operator 302.84 ns 1.54 ns 1.54 ns 1.00 1.85 ns

Sources

name rxcpp rpp prev rpp ratio rpp no optimization
from array of 1 - create + subscribe + immediate 687.24 ns 0.31 ns 0.31 ns 1.00 0.31 ns
from array of 1 - create + subscribe + current_thread 1059.06 ns 3.71 ns 3.42 ns 1.08 3.71 ns
concat_as_source of just(1 immediate) create + subscribe 2223.67 ns 112.15 ns 119.40 ns 0.94 118.33 ns
defer from array of 1 - defer + create + subscribe + immediate 721.59 ns 0.31 ns 0.31 ns 1.00 0.31 ns
interval - interval + take(3) + subscribe + immediate 2108.48 ns 59.23 ns 59.23 ns 1.00 59.38 ns
interval - interval + take(3) + subscribe + current_thread 2987.56 ns 32.46 ns 32.46 ns 1.00 33.99 ns
from array of 1 - create + as_blocking + subscribe + new_thread 29126.94 ns 27363.81 ns 27513.55 ns 0.99 28154.85 ns
from array of 1000 - create + as_blocking + subscribe + new_thread 36971.63 ns 51711.59 ns 50418.20 ns 1.03 51542.23 ns
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3458.64 ns 141.09 ns 134.69 ns 1.05 149.77 ns

Filtering Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take(1)+subscribe 1125.10 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+filter(true)+subscribe 831.39 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,2)+skip(1)+subscribe 999.77 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,1,2)+distinct_until_changed()+subscribe 891.63 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,2)+first()+subscribe 1260.57 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,2)+last()+subscribe 915.28 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+take_last(1)+subscribe 1190.11 ns 17.91 ns 19.67 ns 0.91 19.45 ns
immediate_just(1,2,3)+element_at(1)+subscribe 831.75 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Schedulers

name rxcpp rpp prev rpp ratio rpp no optimization
immediate scheduler create worker + schedule 264.86 ns 0.46 ns 0.62 ns 0.75 1.54 ns
current_thread scheduler create worker + schedule 367.02 ns 4.64 ns 4.94 ns 0.94 4.63 ns
current_thread scheduler create worker + schedule + recursive schedule 831.24 ns 60.93 ns 61.44 ns 0.99 60.78 ns

Transforming Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+map(v*2)+subscribe 859.60 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+scan(10, std::plus)+subscribe 897.63 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+flat_map(immediate_just(v*2))+subscribe 2340.27 ns 141.57 ns 139.02 ns 1.02 167.38 ns
immediate_just+buffer(2)+subscribe 1541.59 ns 13.59 ns 13.90 ns 0.98 17.77 ns
immediate_just+window(2)+subscribe + subscsribe inner 2409.89 ns 1315.67 ns 1305.51 ns 1.01 1428.72 ns

Conditional Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take_while(false)+subscribe 839.20 ns - - 0.00 -
immediate_just+take_while(true)+subscribe 863.37 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Utility Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(1)+subscribe_on(immediate)+subscribe 1997.85 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Combining Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3389.10 ns 139.74 ns 141.09 ns 0.99 174.11 ns
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3634.05 ns 156.47 ns 154.18 ns 1.01 161.88 ns
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 129.38 ns 136.63 ns 0.95 159.96 ns
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3597.01 ns 392.72 ns 422.58 ns 0.93 368.75 ns
immediate_just(1) + zip(immediate_just(2)) + subscribe 2131.17 ns 209.60 ns 217.91 ns 0.96 212.02 ns
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3107.53 ns 218.46 ns 223.43 ns 0.98 253.84 ns

Subjects

name rxcpp rpp prev rpp ratio rpp no optimization
publish_subject with 1 observer - on_next 34.58 ns 14.71 ns 23.08 ns 0.64 14.87 ns
subscribe 100 observers to publish_subject 202404.00 ns 18930.91 ns 16021.04 ns 1.18 17549.30 ns
100 on_next to 100 observers to publish_subject 27156.93 ns 16771.81 ns 17139.20 ns 0.98 19844.79 ns

Scenarios

name rxcpp rpp prev rpp ratio rpp no optimization
basic sample 1500.16 ns 13.90 ns 13.28 ns 1.05 22.85 ns
basic sample with immediate scheduler 1404.59 ns 5.55 ns 5.55 ns 1.00 16.35 ns
mix operators with disposables and without disposables 6347.26 ns 1426.73 ns 1406.92 ns 1.01 1875.61 ns
single disposable and looooooong indentity chain 25445.98 ns 1022.15 ns 1082.54 ns 0.94 5173.30 ns

Aggregating Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+reduce(10, std::plus)+subscribe 985.33 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Error Handling Operators

name rxcpp rpp prev rpp ratio rpp no optimization
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2081.39 ns 995.99 ns 986.65 ns 1.01 1031.39 ns
create(on_error())+retry(1)+subscribe 596.49 ns 124.61 ns 110.10 ns 1.13 114.41 ns

ci-macos

General

name rxcpp rpp prev rpp ratio rpp no optimization
Subscribe empty callbacks to empty observable 370.69 ns 0.93 ns 0.47 ns 1.98 1.01 ns
Subscribe empty callbacks to empty observable via pipe operator 361.77 ns 0.96 ns 0.47 ns 2.04 1.01 ns

Sources

name rxcpp rpp prev rpp ratio rpp no optimization
from array of 1 - create + subscribe + immediate 695.11 ns 0.32 ns 0.31 ns 1.04 0.36 ns
from array of 1 - create + subscribe + current_thread 912.68 ns 4.14 ns 4.15 ns 1.00 4.31 ns
concat_as_source of just(1 immediate) create + subscribe 2073.30 ns 165.94 ns 177.96 ns 0.93 171.77 ns
defer from array of 1 - defer + create + subscribe + immediate 764.87 ns 0.31 ns 0.31 ns 1.01 0.34 ns
interval - interval + take(3) + subscribe + immediate 1922.58 ns 50.82 ns 49.72 ns 1.02 55.93 ns
interval - interval + take(3) + subscribe + current_thread 2349.03 ns 29.46 ns 29.36 ns 1.00 32.04 ns
from array of 1 - create + as_blocking + subscribe + new_thread 23728.72 ns 23144.49 ns 16740.23 ns 1.38 19817.72 ns
from array of 1000 - create + as_blocking + subscribe + new_thread 29967.95 ns 26045.83 ns 23141.67 ns 1.13 28180.53 ns
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3052.28 ns 176.06 ns 195.50 ns 0.90 191.67 ns

Filtering Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take(1)+subscribe 1070.47 ns 0.31 ns 0.32 ns 0.97 0.32 ns
immediate_just+filter(true)+subscribe 801.11 ns 0.31 ns 0.32 ns 0.96 0.31 ns
immediate_just(1,2)+skip(1)+subscribe 1036.21 ns 0.31 ns 0.34 ns 0.93 0.34 ns
immediate_just(1,1,2)+distinct_until_changed()+subscribe 806.64 ns 0.32 ns 0.34 ns 0.95 0.34 ns
immediate_just(1,2)+first()+subscribe 1288.94 ns 0.31 ns 0.36 ns 0.87 0.32 ns
immediate_just(1,2)+last()+subscribe 941.17 ns 0.53 ns 0.91 ns 0.58 0.91 ns
immediate_just+take_last(1)+subscribe 1119.09 ns 0.31 ns 0.33 ns 0.96 0.32 ns
immediate_just(1,2,3)+element_at(1)+subscribe 795.31 ns 0.31 ns 0.38 ns 0.82 0.34 ns

Schedulers

name rxcpp rpp prev rpp ratio rpp no optimization
immediate scheduler create worker + schedule 293.37 ns 0.93 ns 0.47 ns 1.98 0.99 ns
current_thread scheduler create worker + schedule 417.46 ns 4.37 ns 4.06 ns 1.08 4.43 ns
current_thread scheduler create worker + schedule + recursive schedule 695.95 ns 63.43 ns 61.69 ns 1.03 66.76 ns

Transforming Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+map(v*2)+subscribe 801.81 ns 2.47 ns 2.76 ns 0.89 2.50 ns
immediate_just+scan(10, std::plus)+subscribe 917.98 ns 0.31 ns 0.34 ns 0.93 0.34 ns
immediate_just+flat_map(immediate_just(v*2))+subscribe 1981.08 ns 182.69 ns 191.86 ns 0.95 200.36 ns
immediate_just+buffer(2)+subscribe 945.87 ns 15.34 ns 19.49 ns 0.79 17.84 ns
immediate_just+window(2)+subscribe + subscsribe inner 1866.54 ns 968.67 ns 1072.01 ns 0.90 1038.61 ns

Conditional Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take_while(false)+subscribe 786.67 ns - - 0.00 -
immediate_just+take_while(true)+subscribe 805.80 ns 0.31 ns 0.34 ns 0.91 0.31 ns

Utility Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(1)+subscribe_on(immediate)+subscribe 1800.82 ns 2.08 ns 1.87 ns 1.11 1.94 ns

Combining Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 2785.96 ns 201.25 ns 211.83 ns 0.95 219.73 ns
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3017.92 ns 199.08 ns 223.47 ns 0.89 218.90 ns
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 202.92 ns 220.23 ns 0.92 215.12 ns
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 2933.29 ns 498.49 ns 535.99 ns 0.93 534.91 ns
immediate_just(1) + zip(immediate_just(2)) + subscribe 1956.21 ns 325.09 ns 347.09 ns 0.94 342.85 ns
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 2815.00 ns 321.36 ns 352.22 ns 0.91 361.13 ns

Subjects

name rxcpp rpp prev rpp ratio rpp no optimization
publish_subject with 1 observer - on_next 41.28 ns 21.46 ns 23.15 ns 0.93 23.78 ns
subscribe 100 observers to publish_subject 128833.33 ns 16565.83 ns 16153.75 ns 1.03 18806.02 ns
100 on_next to 100 observers to publish_subject 32925.78 ns 14958.33 ns 14241.78 ns 1.05 16335.18 ns

Scenarios

name rxcpp rpp prev rpp ratio rpp no optimization
basic sample 1263.24 ns 12.05 ns 11.33 ns 1.06 24.89 ns
basic sample with immediate scheduler 1223.02 ns 5.40 ns 5.00 ns 1.08 12.28 ns
mix operators with disposables and without disposables 5582.73 ns 1372.83 ns 1459.39 ns 0.94 1659.51 ns
single disposable and looooooong indentity chain 16323.41 ns 1663.86 ns 1787.56 ns 0.93 3681.94 ns

Aggregating Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+reduce(10, std::plus)+subscribe 945.31 ns 0.31 ns 0.44 ns 0.71 0.33 ns

Error Handling Operators

name rxcpp rpp prev rpp ratio rpp no optimization
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 3300.14 ns 2524.83 ns 2985.82 ns 0.85 2745.50 ns
create(on_error())+retry(1)+subscribe 674.33 ns 167.18 ns 185.47 ns 0.90 184.16 ns

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio rpp no optimization
Subscribe empty callbacks to empty observable 267.95 ns 1.54 ns 1.54 ns 1.00 0.64 ns
Subscribe empty callbacks to empty observable via pipe operator 271.03 ns 1.54 ns 1.54 ns 1.00 0.63 ns

Sources

name rxcpp rpp prev rpp ratio rpp no optimization
from array of 1 - create + subscribe + immediate 553.33 ns 0.31 ns 0.31 ns 1.00 0.31 ns
from array of 1 - create + subscribe + current_thread 786.74 ns 4.01 ns 4.01 ns 1.00 4.01 ns
concat_as_source of just(1 immediate) create + subscribe 2373.63 ns 130.03 ns 129.44 ns 1.00 130.56 ns
defer from array of 1 - defer + create + subscribe + immediate 802.34 ns 0.31 ns 0.31 ns 1.00 0.31 ns
interval - interval + take(3) + subscribe + immediate 2249.59 ns 58.31 ns 58.30 ns 1.00 58.31 ns
interval - interval + take(3) + subscribe + current_thread 3178.97 ns 30.88 ns 30.86 ns 1.00 31.50 ns
from array of 1 - create + as_blocking + subscribe + new_thread 29425.54 ns 28038.35 ns 27626.36 ns 1.01 28538.72 ns
from array of 1000 - create + as_blocking + subscribe + new_thread 41186.50 ns 38477.74 ns 36648.62 ns 1.05 37600.11 ns
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3685.71 ns 148.18 ns 148.97 ns 0.99 150.38 ns

Filtering Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take(1)+subscribe 1173.50 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+filter(true)+subscribe 864.32 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,2)+skip(1)+subscribe 1118.57 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,1,2)+distinct_until_changed()+subscribe 910.19 ns 0.31 ns 0.62 ns 0.50 0.31 ns
immediate_just(1,2)+first()+subscribe 1423.47 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,2)+last()+subscribe 1027.44 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+take_last(1)+subscribe 1213.93 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just(1,2,3)+element_at(1)+subscribe 894.79 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Schedulers

name rxcpp rpp prev rpp ratio rpp no optimization
immediate scheduler create worker + schedule 279.93 ns 0.63 ns 0.63 ns 1.00 1.54 ns
current_thread scheduler create worker + schedule 420.80 ns 4.32 ns 4.01 ns 1.08 4.01 ns
current_thread scheduler create worker + schedule + recursive schedule 874.57 ns 54.81 ns 55.45 ns 0.99 61.92 ns

Transforming Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+map(v*2)+subscribe 875.43 ns 0.31 ns 0.31 ns 1.00 0.31 ns
immediate_just+scan(10, std::plus)+subscribe 1011.57 ns 0.62 ns 0.31 ns 2.00 0.31 ns
immediate_just+flat_map(immediate_just(v*2))+subscribe 2283.70 ns 139.26 ns 140.25 ns 0.99 137.07 ns
immediate_just+buffer(2)+subscribe 1561.33 ns 14.19 ns 13.58 ns 1.05 14.83 ns
immediate_just+window(2)+subscribe + subscsribe inner 2485.73 ns 895.28 ns 911.55 ns 0.98 894.53 ns

Conditional Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take_while(false)+subscribe 866.64 ns - - 0.00 -
immediate_just+take_while(true)+subscribe 875.59 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Utility Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(1)+subscribe_on(immediate)+subscribe 2032.21 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Combining Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3317.16 ns 161.40 ns 159.33 ns 1.01 156.86 ns
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3765.98 ns 139.19 ns 139.67 ns 1.00 140.68 ns
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 140.12 ns 141.58 ns 0.99 138.64 ns
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3404.17 ns 378.18 ns 396.37 ns 0.95 378.58 ns
immediate_just(1) + zip(immediate_just(2)) + subscribe 2322.83 ns 200.86 ns 195.38 ns 1.03 198.54 ns
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3261.20 ns 221.68 ns 223.16 ns 0.99 227.31 ns

Subjects

name rxcpp rpp prev rpp ratio rpp no optimization
publish_subject with 1 observer - on_next 54.08 ns 19.15 ns 17.72 ns 1.08 20.17 ns
subscribe 100 observers to publish_subject 207790.17 ns 17356.09 ns 16194.69 ns 1.07 17296.60 ns
100 on_next to 100 observers to publish_subject 37392.52 ns 20127.81 ns 23516.11 ns 0.86 20131.32 ns

Scenarios

name rxcpp rpp prev rpp ratio rpp no optimization
basic sample 1335.79 ns 11.42 ns 11.11 ns 1.03 20.72 ns
basic sample with immediate scheduler 1340.79 ns 5.86 ns 5.86 ns 1.00 6.49 ns
mix operators with disposables and without disposables 6562.21 ns 1182.67 ns 1167.99 ns 1.01 1619.72 ns
single disposable and looooooong indentity chain 27403.61 ns 1236.79 ns 1244.48 ns 0.99 4484.37 ns

Aggregating Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+reduce(10, std::plus)+subscribe 1026.98 ns 0.31 ns 0.31 ns 1.00 0.31 ns

Error Handling Operators

name rxcpp rpp prev rpp ratio rpp no optimization
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2196.55 ns 1160.14 ns 1155.68 ns 1.00 1182.82 ns
create(on_error())+retry(1)+subscribe 678.05 ns 137.83 ns 138.78 ns 0.99 139.57 ns

ci-windows

General

name rxcpp rpp prev rpp ratio rpp no optimization
Subscribe empty callbacks to empty observable 574.93 ns 1.85 ns 1.85 ns 1.00 1.85 ns
Subscribe empty callbacks to empty observable via pipe operator 584.46 ns 1.85 ns 1.85 ns 1.00 1.85 ns

Sources

name rxcpp rpp prev rpp ratio rpp no optimization
from array of 1 - create + subscribe + immediate 1160.52 ns 5.24 ns 5.86 ns 0.89 5.86 ns
from array of 1 - create + subscribe + current_thread 1427.62 ns 15.46 ns 15.75 ns 0.98 15.75 ns
concat_as_source of just(1 immediate) create + subscribe 3766.99 ns 167.17 ns 164.24 ns 1.02 180.64 ns
defer from array of 1 - defer + create + subscribe + immediate 1191.89 ns 5.24 ns 5.55 ns 0.94 5.86 ns
interval - interval + take(3) + subscribe + immediate 3237.69 ns 140.80 ns 140.92 ns 1.00 142.10 ns
interval - interval + take(3) + subscribe + current_thread 3723.55 ns 61.72 ns 60.09 ns 1.03 61.85 ns
from array of 1 - create + as_blocking + subscribe + new_thread 139387.50 ns 129550.00 ns 112310.00 ns 1.15 121733.33 ns
from array of 1000 - create + as_blocking + subscribe + new_thread 140442.86 ns 148628.57 ns 132266.67 ns 1.12 139000.00 ns
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5388.54 ns 207.96 ns 200.24 ns 1.04 210.23 ns

Filtering Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take(1)+subscribe 1847.16 ns 19.73 ns 19.74 ns 1.00 21.29 ns
immediate_just+filter(true)+subscribe 1626.84 ns 18.80 ns 18.82 ns 1.00 21.59 ns
immediate_just(1,2)+skip(1)+subscribe 1702.10 ns 18.50 ns 18.51 ns 1.00 21.60 ns
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1360.00 ns 23.44 ns 23.50 ns 1.00 26.84 ns
immediate_just(1,2)+first()+subscribe 2384.38 ns 17.28 ns 17.29 ns 1.00 19.45 ns
immediate_just(1,2)+last()+subscribe 1464.59 ns 18.51 ns 18.51 ns 1.00 22.83 ns
immediate_just+take_last(1)+subscribe 2007.67 ns 64.24 ns 64.79 ns 0.99 69.69 ns
immediate_just(1,2,3)+element_at(1)+subscribe 1646.05 ns 21.90 ns 21.93 ns 1.00 23.46 ns

Schedulers

name rxcpp rpp prev rpp ratio rpp no optimization
immediate scheduler create worker + schedule 485.90 ns 4.63 ns 4.94 ns 0.94 4.01 ns
current_thread scheduler create worker + schedule 651.03 ns 11.71 ns 11.16 ns 1.05 11.19 ns
current_thread scheduler create worker + schedule + recursive schedule 1342.39 ns 101.85 ns 99.92 ns 1.02 97.41 ns

Transforming Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+map(v*2)+subscribe 1327.24 ns 18.81 ns 18.81 ns 1.00 21.60 ns
immediate_just+scan(10, std::plus)+subscribe 1445.76 ns 20.96 ns 21.29 ns 0.98 23.75 ns
immediate_just+flat_map(immediate_just(v*2))+subscribe 3870.59 ns 182.18 ns 184.14 ns 0.99 203.50 ns
immediate_just+buffer(2)+subscribe 2321.54 ns 64.96 ns 65.55 ns 0.99 67.82 ns
immediate_just+window(2)+subscribe + subscsribe inner 4051.22 ns 1200.84 ns 1310.67 ns 0.92 1218.19 ns

Conditional Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+take_while(false)+subscribe 1380.83 ns 17.57 ns 17.57 ns 1.00 19.12 ns
immediate_just+take_while(true)+subscribe 1312.99 ns 18.82 ns 18.81 ns 1.00 21.59 ns

Utility Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(1)+subscribe_on(immediate)+subscribe 3314.83 ns 11.11 ns 11.10 ns 1.00 11.11 ns

Combining Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5214.14 ns 195.86 ns 203.87 ns 0.96 216.99 ns
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5895.57 ns 185.29 ns 194.07 ns 0.95 205.36 ns
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 186.60 ns 195.08 ns 0.96 193.13 ns
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6211.29 ns 434.78 ns 438.11 ns 0.99 455.22 ns
immediate_just(1) + zip(immediate_just(2)) + subscribe 3850.90 ns 504.71 ns 522.22 ns 0.97 517.95 ns
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 4957.08 ns 322.71 ns 328.46 ns 0.98 337.17 ns

Subjects

name rxcpp rpp prev rpp ratio rpp no optimization
publish_subject with 1 observer - on_next 36.90 ns 29.29 ns 19.94 ns 1.47 29.62 ns
subscribe 100 observers to publish_subject 261000.00 ns 26292.50 ns 27814.29 ns 0.95 26834.09 ns
100 on_next to 100 observers to publish_subject 54838.89 ns 32840.00 ns 33112.90 ns 0.99 36386.21 ns

Scenarios

name rxcpp rpp prev rpp ratio rpp no optimization
basic sample 1906.90 ns 95.66 ns 96.33 ns 0.99 112.09 ns
basic sample with immediate scheduler 1931.99 ns 68.20 ns 68.64 ns 0.99 82.08 ns
mix operators with disposables and without disposables 10195.05 ns 1902.58 ns 1892.18 ns 1.01 2560.14 ns
single disposable and looooooong indentity chain 26923.81 ns 1686.09 ns 1699.22 ns 0.99 6479.78 ns

Aggregating Operators

name rxcpp rpp prev rpp ratio rpp no optimization
immediate_just+reduce(10, std::plus)+subscribe 1468.62 ns 19.42 ns 19.43 ns 1.00 22.81 ns

Error Handling Operators

name rxcpp rpp prev rpp ratio rpp no optimization
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1916.63 ns 353.46 ns 349.92 ns 1.01 364.91 ns
create(on_error())+retry(1)+subscribe 1553.78 ns 137.89 ns 142.97 ns 0.96 139.91 ns

Copy link

codecov bot commented Jan 10, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.62%. Comparing base (b212dbc) to head (622be88).
Report is 1 commits behind head on v2.

Additional details and impacted files
@@            Coverage Diff             @@
##               v2     #697      +/-   ##
==========================================
+ Coverage   98.61%   98.62%   +0.01%     
==========================================
  Files         156      156              
  Lines        9786     9800      +14     
==========================================
+ Hits         9650     9665      +15     
+ Misses        136      135       -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/tests/rpp/test_subjects.cpp (1)

172-190: Consider memory and performance implications of the test.

While the test correctly verifies the subject's behavior with dynamic subscriptions, it creates 10,000 subscriptions (100 per on_next * 100 emissions) without cleanup. Consider:

  1. Adding explicit cleanup verification
  2. Reducing the number of iterations for faster test execution
  3. Testing unsubscribe scenarios

Apply this diff to improve the test:

 TEST_CASE("subject handles addition from inside on_next properly")
 {
     rpp::subjects::publish_subject<int> subject{};
 
     SUBCASE("subscribe inside on_next")
     {
         int value = {};
+        auto disposable = rpp::composite_disposable_wrapper::make();
         subject.get_observable().subscribe([&subject, &value](int v) {
-            for (int i = 0; i < 100; ++i)
-                subject.get_observable().subscribe([](int) {});
+            for (int i = 0; i < 10; ++i)
+                subject.get_observable().subscribe(disposable, [](int) {});
             value = v;
         });
 
-        for (int i = 0; i < 100; ++i)
+        for (int i = 0; i < 10; ++i)
             subject.get_observer().on_next(i);
 
-        REQUIRE(value == 99);
+        REQUIRE(value == 9);
+        
+        // Verify cleanup
+        disposable.dispose();
     }
+    
+    SUBCASE("unsubscribe inside on_next")
+    {
+        int value = {};
+        auto disposable = rpp::composite_disposable_wrapper::make();
+        
+        subject.get_observable().subscribe(disposable, [&subject, &value](int v) {
+            value = v;
+            disposable.dispose();  // Unsubscribe during on_next
+        });
+        
+        subject.get_observer().on_next(0);
+        subject.get_observer().on_next(1);
+        
+        REQUIRE(value == 0);  // Only first value should be received
+    }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ae5cbc7 and 7b0bc3c.

📒 Files selected for processing (1)
  • src/tests/rpp/test_subjects.cpp (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (18)
  • GitHub Check: benchmarks ci-windows Debug (Optimizations disabled)
  • GitHub Check: benchmarks ci-windows Debug
  • GitHub Check: benchmarks ci-windows Release (Optimizations disabled)
  • GitHub Check: benchmarks ci-windows Release
  • GitHub Check: tests ci-windows Debug (Optimizations disabled)
  • GitHub Check: tests ci-windows Debug
  • GitHub Check: tests ci-windows Release (Optimizations disabled)
  • GitHub Check: tests ci-ubuntu-clang Debug (Optimizations disabled)
  • GitHub Check: tests ci-ubuntu-clang Debug
  • GitHub Check: tests ci-ubuntu-clang Release (Optimizations disabled)
  • GitHub Check: tests ci-ubuntu-clang Release
  • GitHub Check: tests ci-ubuntu-gcc Debug (Optimizations disabled)
  • GitHub Check: tests ci-ubuntu-gcc Debug
  • GitHub Check: Sanitize - ubsan
  • GitHub Check: Sanitize - lsan
  • GitHub Check: tests ci-ubuntu-gcc Release (Optimizations disabled)
  • GitHub Check: Sanitize - asan
  • GitHub Check: tests ci-ubuntu-gcc Release

@victimsnino
Copy link
Owner Author

@coderabbitai full review

Copy link
Contributor

coderabbitai bot commented Jan 10, 2025

✅ Actions performed

Full review triggered.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)

124-133: Consider implementing a more robust subscription modification pattern

The current implementation attempts to handle dynamic subscriptions during on_next, but this creates complex thread-safety concerns. Consider these architectural alternatives:

  1. Queue-based approach: Buffer subscription modifications during emission
  2. Copy-on-Write pattern: Use a snapshot of observers for the entire emission
  3. Double-buffering: Maintain separate lists for current and pending modifications

Example implementation of the Copy-on-Write pattern:

void on_next(const Type& v)
{
    std::shared_ptr<observers> current_observers;
    {
        std::lock_guard observers_lock{m_mutex};
        if (!std::holds_alternative<shared_observers>(m_state))
            return;
        current_observers = std::get<shared_observers>(m_state);
        if (!current_observers)
            return;
    }

    std::lock_guard serialized_lock{m_serialized_mutex};
    for (const auto& observer : *current_observers)
    {
        observer->on_next(v);
    }
}

This approach:

  • Provides cleaner separation between subscription management and emission
  • Reduces lock contention
  • Makes the code's intent clearer
  • Maintains consistency during emission
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7b0bc3c and 73c834e.

📒 Files selected for processing (1)
  • src/rpp/rpp/subjects/details/subject_state.hpp (1 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.

Comment on lines 124 to 133
auto itr = observers->cbegin();
const auto size = observers->size();

observers_lock.unlock();

std::lock_guard lock{m_serialized_mutex};
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
for (size_t i = 0; i < size; ++i)
{
(itr++)->on_next(v);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Fix potential race conditions and iterator invalidation

The current implementation has several thread-safety issues:

  1. Race Condition: Capturing size and releasing the lock creates a time window where the deque could be modified, making the size inconsistent with the actual number of observers.
  2. Iterator Invalidation: The iterator might be invalidated if the deque is modified after unlocking.
  3. Memory Safety: The shared_ptr is released while keeping its iterator, which could lead to undefined behavior.

Consider this safer implementation:

-            auto       itr  = observers->cbegin();
-            const auto size = observers->size();
-
-            observers_lock.unlock();
-
-            std::lock_guard lock{m_serialized_mutex};
-            for (size_t i = 0; i < size; ++i)
-            {
-                (itr++)->on_next(v);
-            }
+            std::lock_guard serialized_lock{m_serialized_mutex};
+            // Keep shared_ptr alive during iteration
+            for (const auto& observer : *observers)
+            {
+                observer->on_next(v);
+            }

This solution:

  • Maintains the shared_ptr throughout iteration
  • Prevents iterator invalidation
  • Eliminates the race condition window
  • Simplifies the code
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
auto itr = observers->cbegin();
const auto size = observers->size();
observers_lock.unlock();
std::lock_guard lock{m_serialized_mutex};
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
for (size_t i = 0; i < size; ++i)
{
(itr++)->on_next(v);
}
std::lock_guard serialized_lock{m_serialized_mutex};
// Keep shared_ptr alive during iteration
for (const auto& observer : *observers)
{
observer->on_next(v);
}

@victimsnino victimsnino marked this pull request as ready for review January 10, 2025 19:48
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/tests/rpp/test_subjects.cpp (1)

172-190: Enhance test coverage with edge cases

While the test verifies basic functionality, consider adding test cases for:

  1. Concurrent subscribe/unsubscribe operations
  2. Error handling during subscription
  3. Memory pressure scenarios

Would you like me to generate additional test cases to cover these scenarios?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b212dbc and 73c834e.

📒 Files selected for processing (2)
  • src/rpp/rpp/subjects/details/subject_state.hpp (1 hunks)
  • src/tests/rpp/test_subjects.cpp (1 hunks)
🧰 Additional context used
🪛 GitHub Check: benchmarks ci-macos Release (Optimizations disabled)
src/rpp/rpp/subjects/details/subject_state.hpp

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: benchmarks ci-ubuntu-clang Release
src/rpp/rpp/subjects/details/subject_state.hpp

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: Sanitize - ubsan
src/rpp/rpp/subjects/details/subject_state.hpp

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: Sanitize - lsan
src/rpp/rpp/subjects/details/subject_state.hpp

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: Sanitize - asan
src/rpp/rpp/subjects/details/subject_state.hpp

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Actions: CI v2
src/rpp/rpp/subjects/details/subject_state.hpp

[error] 132-132: no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🔇 Additional comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)

124-133: ⚠️ Potential issue

Critical: Fix multiple thread-safety and memory safety issues

The current implementation has several critical issues:

  1. Race Condition: Unlocking after capturing size and iterator creates a window where the deque can be modified
  2. Iterator Invalidation: The iterator might be invalidated after unlocking
  3. Memory Safety: The shared_ptr (observers) might be released while keeping its iterator
  4. Method Access: Pipeline failures indicate issues with accessing 'on_next'

Apply this safer implementation:

-            auto       itr  = observers->cbegin();
-            const auto size = observers->size();
-
-            observers_lock.unlock();
-
-            std::lock_guard lock{m_serialized_mutex};
-            for (size_t i = 0; i < size; ++i)
-            {
-                (itr++)->on_next(v);
-            }
+            std::lock_guard serialized_lock{m_serialized_mutex};
+            for (const auto& observer : *observers)
+            {
+                observer->on_next(v);
+            }

This solution:

  • Maintains the shared_ptr throughout iteration
  • Prevents iterator invalidation
  • Eliminates the race condition window
  • Uses proper member access syntax

Likely invalid or redundant comment.

🧰 Tools
🪛 GitHub Check: benchmarks ci-macos Release (Optimizations disabled)

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: benchmarks ci-ubuntu-clang Release

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: Sanitize - ubsan

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: Sanitize - lsan

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Check: Sanitize - asan

[failure] 132-132:
no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

🪛 GitHub Actions: CI v2

[error] 132-132: no member named 'on_next' in 'std::shared_ptr<rpp::details::observers::observer_vtable>'

Comment on lines +179 to +182
subject.get_observable().subscribe([&subject, &value](int v) {
for (int i = 0; i < 100; ++i)
subject.get_observable().subscribe([](int) {});
value = v;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve subscription management in test

The test creates 100 subscriptions without proper cleanup, which could lead to resource leaks.

Apply this diff to properly manage subscriptions:

-            for (int i = 0; i < 100; ++i)
-                subject.get_observable().subscribe([](int) {});
+            auto d = rpp::composite_disposable_wrapper::make();
+            for (int i = 0; i < 100; ++i)
+                subject.get_observable().subscribe(d, [](int) {});

Committable suggestion skipped: line range outside the PR's diff.

@victimsnino victimsnino merged commit 9ab9e07 into v2 Jan 11, 2025
55 checks passed
@victimsnino victimsnino deleted the fix_subject branch January 11, 2025 11:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant