Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try raw array to fix subject issue #698

Closed
wants to merge 12 commits into from
Closed

Try raw array to fix subject issue #698

wants to merge 12 commits into from

Conversation

victimsnino
Copy link
Owner

@victimsnino victimsnino commented Jan 11, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a new shared_observers class to enhance observer management in the subject implementation.
    • Improved dynamic observer list handling with more robust resizing and access mechanisms.
  • Tests

    • Added a test case to verify subject behavior when adding subscriptions during value emissions.

Copy link
Contributor

coderabbitai bot commented Jan 11, 2025

📝 Walkthrough

Walkthrough

The pull request introduces a significant refactoring of the observer management in the subject_state class. The primary change is replacing the std::deque container with a custom shared_observers class that uses std::shared_ptr for managing observers. This new implementation provides methods for initialization, resizing, and safe access to the observer array. Additionally, a new test case has been added to verify the dynamic subscription behavior of the publish_subject.

Changes

File Change Summary
src/rpp/rpp/subjects/details/subject_state.hpp - Added shared_observers class with methods for observer management
- Replaced std::deque with custom observer management class
- Updated type aliases and state handling
src/tests/rpp/test_subjects.cpp - Added new test case for dynamic subject subscriptions
- Verifies multiple observer additions during emission

Sequence Diagram

sequenceDiagram
    participant Subject
    participant SharedObservers
    participant Observer1
    participant Observer2

    Subject->>SharedObservers: Create with initial size
    Subject->>SharedObservers: Add Observer1
    Subject->>SharedObservers: Add Observer2
    Subject->>SharedObservers: Get observer span
    SharedObservers-->>Subject: Return current observers
    Subject->>Observer1: Notify
    Subject->>Observer2: Notify
Loading

Poem

🐰 Observers dance, a dynamic array's delight,
Shared pointers twirl, making subscriptions bright.
From deque to custom class, we leap and bound,
Flexibility and speed, a rabbit's playground found!
Hop on, subscribe, the subject's magic unfurled! 🎉

Finishing Touches

  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@victimsnino victimsnino changed the title Try raw array Try raw array to fix subject issue Jan 11, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)

23-23: Remove unused include directive <list>.

The header file <list> is included but not used in the code. Removing it can improve compilation time and reduce unnecessary dependencies.

Apply this diff to remove the unused include:

-#include <list>
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b212dbc and 0167d6a.

📒 Files selected for processing (2)
  • src/rpp/rpp/subjects/details/subject_state.hpp (6 hunks)
  • src/tests/rpp/test_subjects.cpp (1 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Cache deps on ci-macos for Debug
🔇 Additional comments (3)
src/rpp/rpp/subjects/details/subject_state.hpp (2)

167-175: Proper cleanup of observers without checking is_disposed().

Based on previous learnings, it's important not to check obs->is_disposed() in the cleanup_observers function since observers are not thread-safe. Your implementation correctly avoids this by only comparing pointers.


132-138: ⚠️ Potential issue

Potential data race due to releasing lock before securing mutex.

In the on_next method, releasing observers_lock before acquiring m_serialized_mutex may lead to data races if other threads modify m_state between these operations.

Consider holding the observers_lock until after acquiring m_serialized_mutex to ensure thread safety.

Apply this diff to adjust the locking sequence:

 process_state_unsafe(m_state, [&](shared_observers observers) {
-    observers_lock.unlock();

-    std::lock_guard lock{m_serialized_mutex};
+    std::lock_guard lock{m_serialized_mutex};
+    observers_lock.unlock();
src/tests/rpp/test_subjects.cpp (1)

172-190: 🛠️ Refactor suggestion

Potential performance issues due to excessive subscriptions inside on_next.

In this test case, you are adding 100 new subscriptions inside each on_next call. Given that you emit values from 0 to 99, this results in 10,000 subscriptions, which can severely impact performance and test execution time.

Consider modifying the test to limit the number of subscriptions or to add subscriptions only once. If the goal is to stress-test the observer management, clarify this intention in the test comments.

Apply this diff to adjust the test:

 int value = {};
 subject.get_observable().subscribe([&subject, &value](int v) {
-    for (int i = 0; i < 100; ++i)
-        subject.get_observable().subscribe([](int) {});
+    static bool subscribed = false;
+    if (!subscribed) {
+        for (int i = 0; i < 100; ++i)
+            subject.get_observable().subscribe([](int) {});
+        subscribed = true;
+    }
     value = v;
 });

This change adds the additional subscriptions only once, improving performance while maintaining the test's purpose.

Comment on lines +91 to +92
std::shared_ptr<observer[]> m_observers{};
size_t m_size{};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure proper initialization of m_observers member variable.

The default-initialized std::shared_ptr<observer[]> m_observers{} may hold a nullptr. Accessing m_observers.get() without checking for nullptr could lead to undefined behavior.

Consider initializing m_observers with a non-null value or adding checks before usage.

Comment on lines +76 to +78
shared_observers(size_t size)
: m_observers{std::make_shared<observer[]>(size)}
, m_size{size}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect use of std::make_shared with array types may lead to undefined behavior.

Using std::make_shared with array types is not supported in C++. This can result in compilation errors or undefined behavior.

To fix this issue, allocate the array using new and wrap it with std::shared_ptr, specifying a custom deleter:

-shared_observers(size_t size)
-    : m_observers{std::make_shared<observer[]>(size)}
+shared_observers(size_t size)
+    : m_observers{std::shared_ptr<observer[]>(new observer[size], std::default_delete<observer[]>())}

Alternatively, consider using std::vector for automatic memory management:

-class shared_observers
+class shared_observers
+{
+public:
+    shared_observers() = default;
+
+    shared_observers(size_t size)
+        : m_observers(size)
+    {
+    }
+
+    std::span<observer> span() const noexcept { return {m_observers.data(), m_observers.size()}; }
+
+    void shrink(size_t new_size)
+    {
+        if (new_size > m_observers.size()) throw std::out_of_range{"shrink: new_size > m_size"};
+        m_observers.resize(new_size);
+    }
+
+private:
+    std::vector<observer> m_observers{};
+};

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)

76-80: ⚠️ Potential issue

Fix undefined behavior with array type in std::make_shared.

Using std::make_shared with array types is not supported in C++ and may lead to undefined behavior.

Use std::shared_ptr with explicit array allocation:

-shared_observers(size_t size)
-    : m_observers{std::make_shared<observer[]>(size)}
-    , m_size{size}
-{
-}
+shared_observers(size_t size)
+    : m_observers{std::shared_ptr<observer[]>(new observer[size], std::default_delete<observer[]>())}
+    , m_size{size}
+{
+}
🧹 Nitpick comments (2)
src/rpp/rpp/subjects/details/subject_state.hpp (2)

23-23: Remove unused include.

The <list> header is included but not used in the code.

-#include <list>

108-114: Optimize subscription performance.

The current implementation creates and copies the entire array on each subscription, leading to O(n) complexity and frequent allocations.

Consider implementing a growth strategy similar to std::vector:

class shared_observers {
    static constexpr size_t INITIAL_CAPACITY = 4;
    static size_t calculate_growth(size_t current) {
        return current + (current >> 1); // Grow by 1.5x
    }
public:
    void reserve(size_t new_cap) {
        if (new_cap <= m_size) return;
        auto new_observers = std::shared_ptr<observer[]>(
            new observer[new_cap],
            std::default_delete<observer[]>());
        if (m_observers) {
            std::copy(span().begin(), span().end(),
                     std::span{new_observers.get(), new_cap}.begin());
        }
        m_observers = std::move(new_observers);
        m_capacity = new_cap;
    }
    
    void push_back(observer obs) {
        if (m_size == m_capacity) {
            reserve(m_capacity ? calculate_growth(m_capacity) : INITIAL_CAPACITY);
        }
        m_observers.get()[m_size++] = std::move(obs);
    }
private:
    size_t m_capacity{};
};
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0167d6a and df047d1.

📒 Files selected for processing (1)
  • src/rpp/rpp/subjects/details/subject_state.hpp (6 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Cache deps on ci-windows for Debug
🔇 Additional comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)

82-82: ⚠️ Potential issue

Add null check in span() method.

Accessing m_observers.get() without checking for nullptr could lead to undefined behavior.

-std::span<observer> span() const noexcept { return {m_observers.get(), m_size}; }
+std::span<observer> span() const noexcept
+{
+    if (!m_observers)
+        return {};
+    return {m_observers.get(), m_size};
+}

Likely invalid or redundant comment.

Comment on lines +132 to +139
// we are getting copy of curent observers and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
process_state_unsafe(m_state, [&](shared_observers observers) {
observers_lock.unlock();

if (!std::holds_alternative<shared_observers>(m_state))
return;

// we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
const auto observers = std::get<shared_observers>(m_state);
if (!observers)
return;

const auto begin = observers->cbegin();
const auto end = observers->cend();

observers_lock.unlock();

std::lock_guard lock{m_serialized_mutex};
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
std::lock_guard lock{m_serialized_mutex};
rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); });
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential race condition in on_next.

Releasing the lock before iterating over observers could lead to use-after-free if observers are disposed during iteration.

Consider maintaining the lock during iteration or using a thread-safe observer container:

 void on_next(const Type& v)
 {
     std::unique_lock observers_lock{m_mutex};
-    // we are getting copy of curent observers and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
-    process_state_unsafe(m_state, [&](shared_observers observers) {
-        observers_lock.unlock();
-
-        std::lock_guard lock{m_serialized_mutex};
-        rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); });
-    });
+    process_state_unsafe(m_state, [&](const shared_observers& observers) {
+        std::lock_guard lock{m_serialized_mutex};
+        // Keep observers_lock during iteration to prevent use-after-free
+        rpp::utils::for_each(observers.span(), [&](const observer& obs) {
+            if (obs) // Add null check for safety
+                obs->on_next(v);
+        });
+    });
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// we are getting copy of curent observers and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
process_state_unsafe(m_state, [&](shared_observers observers) {
observers_lock.unlock();
if (!std::holds_alternative<shared_observers>(m_state))
return;
// we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
const auto observers = std::get<shared_observers>(m_state);
if (!observers)
return;
const auto begin = observers->cbegin();
const auto end = observers->cend();
observers_lock.unlock();
std::lock_guard lock{m_serialized_mutex};
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); });
std::lock_guard lock{m_serialized_mutex};
rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); });
});
process_state_unsafe(m_state, [&](const shared_observers& observers) {
std::lock_guard lock{m_serialized_mutex};
// Keep observers_lock during iteration to prevent use-after-free
rpp::utils::for_each(observers.span(), [&](const observer& obs) {
if (obs) // Add null check for safety
obs->on_next(v);
});
});

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/rpp/rpp/subjects/details/subject_state.hpp (2)

77-81: ⚠️ Potential issue

Fix array allocation in shared_observers constructor.

The use of std::make_shared with arrays is problematic and may lead to undefined behavior.

Use one of these alternatives:

  1. Use std::shared_ptr with explicit array allocation:
-: m_observers{std::make_shared<observer[]>(size)}
+: m_observers{std::shared_ptr<observer[]>(new observer[size], std::default_delete<observer[]>())}
  1. Consider using std::vector for better memory management and safety.

133-139: ⚠️ Potential issue

Fix potential race condition in on_next.

Releasing the lock before iterating over observers could lead to use-after-free if observers are disposed during iteration.

Maintain the lock during iteration:

-process_state_unsafe(m_state, [&](shared_observers observers) {
-    observers_lock.unlock();
+process_state_unsafe(m_state, [&](const shared_observers& observers) {
    std::lock_guard lock{m_serialized_mutex};
    rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); });
});
🧹 Nitpick comments (2)
src/rpp/rpp/subjects/details/subject_state.hpp (2)

109-115: Consider pre-allocating space for observers.

The current implementation creates a new array and copies all observers on each subscription, which could be inefficient for large observer lists.

Consider pre-allocating with some extra capacity to reduce reallocations:

-auto       new_observers = shared_observers{old_size + 1};
+constexpr size_t extra_capacity = 4;  // Adjust based on typical usage
+auto       new_observers = shared_observers{old_size + extra_capacity};

168-176: Optimize observer cleanup.

The current implementation always allocates a new array of the same size as the input, even though we know the final size will be smaller.

Consider counting valid observers first to allocate exact size:

static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable<Type>* to_delete)
{
-    auto   subs = shared_observers{current_subs.span().size()};
-    size_t i    = 0;
-    for (const auto& obs : current_subs.span())
-    {
-        if (obs.get() != to_delete)
-            subs.span()[i++] = obs;
-    }
-    subs.shrink(i);
+    const auto count = std::count_if(current_subs.span().begin(), current_subs.span().end(),
+        [to_delete](const auto& obs) { return obs.get() != to_delete; });
+    auto subs = shared_observers{count};
+    std::copy_if(current_subs.span().begin(), current_subs.span().end(), subs.span().begin(),
+        [to_delete](const auto& obs) { return obs.get() != to_delete; });
    return subs;
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df047d1 and 499a6d9.

📒 Files selected for processing (1)
  • src/rpp/rpp/subjects/details/subject_state.hpp (6 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
🔇 Additional comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)

83-83: ⚠️ Potential issue

Add null check in span() method.

Accessing m_observers.get() without checking for nullptr could lead to undefined behavior.

-std::span<observer> span() const noexcept { return {m_observers.get(), m_size}; }
+std::span<observer> span() const noexcept {
+    if (!m_observers) return {};
+    return {m_observers.get(), m_size};
+}

Likely invalid or redundant comment.

Comment on lines +85 to +89
void shrink(size_t new_size)
{
assert(new_size < m_size);
m_size = new_size;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add runtime check in shrink method.

Using only assert is insufficient as it may be disabled in release builds.

void shrink(size_t new_size)
{
-    assert(new_size < m_size);
+    if (new_size >= m_size)
+        throw std::out_of_range{"shrink: new_size >= m_size"};
    m_size = new_size;
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void shrink(size_t new_size)
{
assert(new_size < m_size);
m_size = new_size;
}
void shrink(size_t new_size)
{
if (new_size >= m_size)
throw std::out_of_range{"shrink: new_size >= m_size"};
m_size = new_size;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant