-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Try raw array to fix subject issue #698
Conversation
📝 WalkthroughWalkthroughThe pull request introduces a significant refactoring of the observer management in the Changes
Sequence DiagramsequenceDiagram
participant Subject
participant SharedObservers
participant Observer1
participant Observer2
Subject->>SharedObservers: Create with initial size
Subject->>SharedObservers: Add Observer1
Subject->>SharedObservers: Add Observer2
Subject->>SharedObservers: Get observer span
SharedObservers-->>Subject: Return current observers
Subject->>Observer1: Notify
Subject->>Observer2: Notify
Poem
Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
23-23
: Remove unused include directive<list>
.The header file
<list>
is included but not used in the code. Removing it can improve compilation time and reduce unnecessary dependencies.Apply this diff to remove the unused include:
-#include <list>
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/rpp/rpp/subjects/details/subject_state.hpp
(6 hunks)src/tests/rpp/test_subjects.cpp
(1 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Cache deps on ci-macos for Debug
🔇 Additional comments (3)
src/rpp/rpp/subjects/details/subject_state.hpp (2)
167-175
: Proper cleanup of observers without checkingis_disposed()
.Based on previous learnings, it's important not to check
obs->is_disposed()
in thecleanup_observers
function since observers are not thread-safe. Your implementation correctly avoids this by only comparing pointers.
132-138
:⚠️ Potential issuePotential data race due to releasing lock before securing mutex.
In the
on_next
method, releasingobservers_lock
before acquiringm_serialized_mutex
may lead to data races if other threads modifym_state
between these operations.Consider holding the
observers_lock
until after acquiringm_serialized_mutex
to ensure thread safety.Apply this diff to adjust the locking sequence:
process_state_unsafe(m_state, [&](shared_observers observers) { - observers_lock.unlock(); - std::lock_guard lock{m_serialized_mutex}; + std::lock_guard lock{m_serialized_mutex}; + observers_lock.unlock();src/tests/rpp/test_subjects.cpp (1)
172-190
: 🛠️ Refactor suggestionPotential performance issues due to excessive subscriptions inside
on_next
.In this test case, you are adding 100 new subscriptions inside each
on_next
call. Given that you emit values from 0 to 99, this results in 10,000 subscriptions, which can severely impact performance and test execution time.Consider modifying the test to limit the number of subscriptions or to add subscriptions only once. If the goal is to stress-test the observer management, clarify this intention in the test comments.
Apply this diff to adjust the test:
int value = {}; subject.get_observable().subscribe([&subject, &value](int v) { - for (int i = 0; i < 100; ++i) - subject.get_observable().subscribe([](int) {}); + static bool subscribed = false; + if (!subscribed) { + for (int i = 0; i < 100; ++i) + subject.get_observable().subscribe([](int) {}); + subscribed = true; + } value = v; });This change adds the additional subscriptions only once, improving performance while maintaining the test's purpose.
std::shared_ptr<observer[]> m_observers{}; | ||
size_t m_size{}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ensure proper initialization of m_observers
member variable.
The default-initialized std::shared_ptr<observer[]> m_observers{}
may hold a nullptr. Accessing m_observers.get()
without checking for nullptr could lead to undefined behavior.
Consider initializing m_observers
with a non-null value or adding checks before usage.
shared_observers(size_t size) | ||
: m_observers{std::make_shared<observer[]>(size)} | ||
, m_size{size} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect use of std::make_shared
with array types may lead to undefined behavior.
Using std::make_shared
with array types is not supported in C++. This can result in compilation errors or undefined behavior.
To fix this issue, allocate the array using new
and wrap it with std::shared_ptr
, specifying a custom deleter:
-shared_observers(size_t size)
- : m_observers{std::make_shared<observer[]>(size)}
+shared_observers(size_t size)
+ : m_observers{std::shared_ptr<observer[]>(new observer[size], std::default_delete<observer[]>())}
Alternatively, consider using std::vector
for automatic memory management:
-class shared_observers
+class shared_observers
+{
+public:
+ shared_observers() = default;
+
+ shared_observers(size_t size)
+ : m_observers(size)
+ {
+ }
+
+ std::span<observer> span() const noexcept { return {m_observers.data(), m_observers.size()}; }
+
+ void shrink(size_t new_size)
+ {
+ if (new_size > m_observers.size()) throw std::out_of_range{"shrink: new_size > m_size"};
+ m_observers.resize(new_size);
+ }
+
+private:
+ std::vector<observer> m_observers{};
+};
Committable suggestion skipped: line range outside the PR's diff.
0167d6a
to
e47eeb1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
76-80
:⚠️ Potential issueFix undefined behavior with array type in
std::make_shared
.Using
std::make_shared
with array types is not supported in C++ and may lead to undefined behavior.Use
std::shared_ptr
with explicit array allocation:-shared_observers(size_t size) - : m_observers{std::make_shared<observer[]>(size)} - , m_size{size} -{ -} +shared_observers(size_t size) + : m_observers{std::shared_ptr<observer[]>(new observer[size], std::default_delete<observer[]>())} + , m_size{size} +{ +}
🧹 Nitpick comments (2)
src/rpp/rpp/subjects/details/subject_state.hpp (2)
23-23
: Remove unused include.The
<list>
header is included but not used in the code.-#include <list>
108-114
: Optimize subscription performance.The current implementation creates and copies the entire array on each subscription, leading to O(n) complexity and frequent allocations.
Consider implementing a growth strategy similar to
std::vector
:class shared_observers { static constexpr size_t INITIAL_CAPACITY = 4; static size_t calculate_growth(size_t current) { return current + (current >> 1); // Grow by 1.5x } public: void reserve(size_t new_cap) { if (new_cap <= m_size) return; auto new_observers = std::shared_ptr<observer[]>( new observer[new_cap], std::default_delete<observer[]>()); if (m_observers) { std::copy(span().begin(), span().end(), std::span{new_observers.get(), new_cap}.begin()); } m_observers = std::move(new_observers); m_capacity = new_cap; } void push_back(observer obs) { if (m_size == m_capacity) { reserve(m_capacity ? calculate_growth(m_capacity) : INITIAL_CAPACITY); } m_observers.get()[m_size++] = std::move(obs); } private: size_t m_capacity{}; };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/rpp/rpp/subjects/details/subject_state.hpp
(6 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Cache deps on ci-windows for Debug
🔇 Additional comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
82-82
:⚠️ Potential issueAdd null check in
span()
method.Accessing
m_observers.get()
without checking for nullptr could lead to undefined behavior.-std::span<observer> span() const noexcept { return {m_observers.get(), m_size}; } +std::span<observer> span() const noexcept +{ + if (!m_observers) + return {}; + return {m_observers.get(), m_size}; +}Likely invalid or redundant comment.
// we are getting copy of curent observers and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call | ||
process_state_unsafe(m_state, [&](shared_observers observers) { | ||
observers_lock.unlock(); | ||
|
||
if (!std::holds_alternative<shared_observers>(m_state)) | ||
return; | ||
|
||
// we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call | ||
const auto observers = std::get<shared_observers>(m_state); | ||
if (!observers) | ||
return; | ||
|
||
const auto begin = observers->cbegin(); | ||
const auto end = observers->cend(); | ||
|
||
observers_lock.unlock(); | ||
|
||
std::lock_guard lock{m_serialized_mutex}; | ||
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); | ||
std::lock_guard lock{m_serialized_mutex}; | ||
rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); }); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential race condition in on_next
.
Releasing the lock before iterating over observers could lead to use-after-free if observers are disposed during iteration.
Consider maintaining the lock during iteration or using a thread-safe observer container:
void on_next(const Type& v)
{
std::unique_lock observers_lock{m_mutex};
- // we are getting copy of curent observers and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call
- process_state_unsafe(m_state, [&](shared_observers observers) {
- observers_lock.unlock();
-
- std::lock_guard lock{m_serialized_mutex};
- rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); });
- });
+ process_state_unsafe(m_state, [&](const shared_observers& observers) {
+ std::lock_guard lock{m_serialized_mutex};
+ // Keep observers_lock during iteration to prevent use-after-free
+ rpp::utils::for_each(observers.span(), [&](const observer& obs) {
+ if (obs) // Add null check for safety
+ obs->on_next(v);
+ });
+ });
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// we are getting copy of curent observers and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call | |
process_state_unsafe(m_state, [&](shared_observers observers) { | |
observers_lock.unlock(); | |
if (!std::holds_alternative<shared_observers>(m_state)) | |
return; | |
// we are getting copy of curent deque and obtaining CURRENT begin/end of in case of some new observer would be added during on_next call | |
const auto observers = std::get<shared_observers>(m_state); | |
if (!observers) | |
return; | |
const auto begin = observers->cbegin(); | |
const auto end = observers->cend(); | |
observers_lock.unlock(); | |
std::lock_guard lock{m_serialized_mutex}; | |
std::for_each(begin, end, [&](const observer& obs) { obs->on_next(v); }); | |
std::lock_guard lock{m_serialized_mutex}; | |
rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); }); | |
}); | |
process_state_unsafe(m_state, [&](const shared_observers& observers) { | |
std::lock_guard lock{m_serialized_mutex}; | |
// Keep observers_lock during iteration to prevent use-after-free | |
rpp::utils::for_each(observers.span(), [&](const observer& obs) { | |
if (obs) // Add null check for safety | |
obs->on_next(v); | |
}); | |
}); |
df047d1
to
56de5ea
Compare
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/rpp/rpp/subjects/details/subject_state.hpp (2)
77-81
:⚠️ Potential issueFix array allocation in shared_observers constructor.
The use of
std::make_shared
with arrays is problematic and may lead to undefined behavior.Use one of these alternatives:
- Use
std::shared_ptr
with explicit array allocation:-: m_observers{std::make_shared<observer[]>(size)} +: m_observers{std::shared_ptr<observer[]>(new observer[size], std::default_delete<observer[]>())}
- Consider using
std::vector
for better memory management and safety.
133-139
:⚠️ Potential issueFix potential race condition in on_next.
Releasing the lock before iterating over observers could lead to use-after-free if observers are disposed during iteration.
Maintain the lock during iteration:
-process_state_unsafe(m_state, [&](shared_observers observers) { - observers_lock.unlock(); +process_state_unsafe(m_state, [&](const shared_observers& observers) { std::lock_guard lock{m_serialized_mutex}; rpp::utils::for_each(observers.span(), [&](const observer& obs) { obs->on_next(v); }); });
🧹 Nitpick comments (2)
src/rpp/rpp/subjects/details/subject_state.hpp (2)
109-115
: Consider pre-allocating space for observers.The current implementation creates a new array and copies all observers on each subscription, which could be inefficient for large observer lists.
Consider pre-allocating with some extra capacity to reduce reallocations:
-auto new_observers = shared_observers{old_size + 1}; +constexpr size_t extra_capacity = 4; // Adjust based on typical usage +auto new_observers = shared_observers{old_size + extra_capacity};
168-176
: Optimize observer cleanup.The current implementation always allocates a new array of the same size as the input, even though we know the final size will be smaller.
Consider counting valid observers first to allocate exact size:
static shared_observers cleanup_observers(const shared_observers& current_subs, const rpp::details::observers::observer_vtable<Type>* to_delete) { - auto subs = shared_observers{current_subs.span().size()}; - size_t i = 0; - for (const auto& obs : current_subs.span()) - { - if (obs.get() != to_delete) - subs.span()[i++] = obs; - } - subs.shrink(i); + const auto count = std::count_if(current_subs.span().begin(), current_subs.span().end(), + [to_delete](const auto& obs) { return obs.get() != to_delete; }); + auto subs = shared_observers{count}; + std::copy_if(current_subs.span().begin(), current_subs.span().end(), subs.span().begin(), + [to_delete](const auto& obs) { return obs.get() != to_delete; }); return subs; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/rpp/rpp/subjects/details/subject_state.hpp
(6 hunks)
🧰 Additional context used
📓 Learnings (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
Learnt from: victimsnino
PR: victimsnino/ReactivePlusPlus#667
File: src/rpp/rpp/subjects/details/subject_state.hpp:168-168
Timestamp: 2024-11-10T04:25:44.618Z
Learning: In the `subject_state` class, observers are not thread-safe, so we should not check `obs->is_disposed()` in the `cleanup_observers` function.
🔇 Additional comments (1)
src/rpp/rpp/subjects/details/subject_state.hpp (1)
83-83
:⚠️ Potential issueAdd null check in span() method.
Accessing
m_observers.get()
without checking for nullptr could lead to undefined behavior.-std::span<observer> span() const noexcept { return {m_observers.get(), m_size}; } +std::span<observer> span() const noexcept { + if (!m_observers) return {}; + return {m_observers.get(), m_size}; +}Likely invalid or redundant comment.
void shrink(size_t new_size) | ||
{ | ||
assert(new_size < m_size); | ||
m_size = new_size; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add runtime check in shrink method.
Using only assert is insufficient as it may be disabled in release builds.
void shrink(size_t new_size)
{
- assert(new_size < m_size);
+ if (new_size >= m_size)
+ throw std::out_of_range{"shrink: new_size >= m_size"};
m_size = new_size;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
void shrink(size_t new_size) | |
{ | |
assert(new_size < m_size); | |
m_size = new_size; | |
} | |
void shrink(size_t new_size) | |
{ | |
if (new_size >= m_size) | |
throw std::out_of_range{"shrink: new_size >= m_size"}; | |
m_size = new_size; | |
} |
Summary by CodeRabbit
New Features
shared_observers
class to enhance observer management in the subject implementation.Tests