feat(concurrent perpartition cursor): Add parent state updates #343
Conversation
…concurrent-global-cursor
…h `_use_global_cursor`
📝 Walkthrough

This pull request updates the functionality of the `ConcurrentPerPartitionCursor`.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant CP as ConcurrentPerPartitionCursor
    Client->>CP: close_partition(partition)
    Note right of CP: Invoke _check_and_update_parent_state()
    CP->>CP: Update _partition_parent_state_map
    CP->>CP: _emit_state_message(throttle=False)
```

```mermaid
sequenceDiagram
    participant CP as ConcurrentPerPartitionCursor
    participant SliceGen as _generate_slices_from_partition
    CP->>SliceGen: _generate_slices_from_partition(partition, parent_state)
    SliceGen-->>CP: Yield stream slice with parent state
```
Actionable comments posted: 1
🧹 Nitpick comments (14)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (13)
8-8: Consider using monotonic time for scheduling.
Would you like to use `time.monotonic()` instead, to avoid issues caused by system clock adjustments? wdyt?
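To make the suggestion concrete, here is an illustrative sketch (not the CDK implementation; `MonotonicThrottler` is a hypothetical name) of a throttle helper built on `time.monotonic()`, which is immune to system clock adjustments such as NTP corrections, unlike `time.time()`:

```python
import time
from typing import Optional

# Hypothetical sketch of a throttle helper using a monotonic clock, so that
# system clock adjustments cannot shrink or stretch the emission interval.
class MonotonicThrottler:
    def __init__(self, interval_seconds: float = 60.0) -> None:
        self._interval = interval_seconds
        self._last_emission: Optional[float] = None  # None means "never emitted"

    def should_emit(self) -> bool:
        now = time.monotonic()
        if self._last_emission is None or now - self._last_emission > self._interval:
            self._last_emission = now
            return True
        return False

throttler = MonotonicThrottler(interval_seconds=60.0)
print(throttler.should_emit())  # True: first call always emits
print(throttler.should_emit())  # False: still inside the 60s window
```

Using `None` as the initial sentinel (rather than `0.0`) also makes the "no prior emission" case explicit, which ties into the comment on line 111 below.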
62-63: Make partition limits configurable.
Raising the limit to 25,000 and introducing `SWITCH_TO_GLOBAL_LIMIT` at 10,000 helps, but would you consider allowing these to be configured via constructor parameters or environment variables? That might support more diverse use cases. wdyt?
98-101: Validate parent-state OrderedDict usage.
Storing parent state in an ordered dictionary is helpful. Would you like to confirm you won’t need to remove entries in arbitrary orders, beyond what's handled in `_check_and_update_parent_state`? wdyt?
108-108: Expose `_number_of_partitions` usage.
Might it be beneficial to track timestamps or reasons for partition increments to help monitoring and debugging partition growth over time? wdyt?
111-111: Use None for initial time sentinel.
Do you prefer initializing `_last_emission_time` as `None` rather than `0.0` to indicate "no prior emission"? This can make the code more self-descriptive. wdyt?
168-186: Ensure only the last closed parent state is retained.
This method updates `_parent_state` to the most recent popped state if multiple partitions are fully closed. Are you sure you only need the final partition’s state? If you'd like to preserve all, you might store them differently. wdyt?
198-198: Revisiting forced emission configuration.
You call `_emit_state_message(throttle=False)` here. Would you consider making forced emission more explicit via clear naming or docstring, so it's evident we bypass the throttle logic? wdyt?
200-208: Consider making the throttle interval configurable.
A 60-second hardcoded interval might be limiting. If the environment or use case changes, would you consider an approach to set this interval externally (e.g., in the constructor)? wdyt?
209-214: Log throttle behavior for debugging.
You skip emission whenever `_throttle_state_message` returns `None`. Do you want to log a debug message indicating the emission got throttled to help troubleshoot? wdyt?
236-267: Add concurrency docstring to `_generate_slices_from_partition`.
This method carefully locks and manages semaphores, which is great. Would you consider adding docstrings that explain how concurrency is managed here for future maintainers? wdyt?
282-288: Validate forced switch to global.
Once `limit_reached()` is true, the code forcibly switches to a global cursor. Are you okay losing granular partition states from that point onward? If not, you might keep partial coverage for old partitions. wdyt?
290-290: Check integer overflow concerns.
`_number_of_partitions += 1` grows indefinitely. In Python, integers are unbounded, but for extremely large sync jobs, might it be beneficial to reset or track usage differently? wdyt?
464-464: Double-check strict greater sign.
`return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT` uses `>` rather than `>=`. Do you intend to switch exactly after exceeding the limit, or once we reach it? wdyt?
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
1186-1198: Patching `_throttle_state_message` for easier testing.
Great approach to bypass or control throttling logic. Did you consider verifying you call `_throttle_state_message` the expected number of times? That might further ensure correctness. wdyt?
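As an illustration of the suggested call-count assertion, here is a minimal sketch (the `Cursor` class here is a stand-in, not the CDK class) showing how `patch.object` exposes `call_count` on the patched throttle helper:

```python
from unittest.mock import patch

# Stand-in classes to illustrate the assertion pattern; not the CDK API.
class Cursor:
    def _throttle_state_message(self):
        return None  # None means "throttled"

    def _emit_state_message(self, throttle: bool = True) -> None:
        if throttle and self._throttle_state_message() is None:
            return  # emission skipped while throttled
        self._send_state()

    def _send_state(self) -> None:
        raise RuntimeError("should not be reached while throttled")

cursor = Cursor()
with patch.object(Cursor, "_throttle_state_message", return_value=None) as throttle_mock:
    cursor._emit_state_message()
    cursor._emit_state_message()

# The throttle helper was consulted exactly once per emission attempt.
assert throttle_mock.call_count == 2
```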
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py` (11 hunks)
- `unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py` (4 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[warning] 1-1: File would be reformatted. Please format the code according to the style guidelines.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
153-168: Check semaphore ordering in `close_partition`.
Currently, you acquire the semaphore (`.acquire()`) after updating the global cursor. Would it be helpful to acquire the semaphore first to prevent potential concurrency races where another thread might read the updated global cursor prematurely? wdyt?
231-234: Confirm parent state accuracy.
When calling `_generate_slices_from_partition(partition, parent_state)`, can we confirm that each partition's parent state is always valid for that partition? This might help ensure correctness for nested substreams. wdyt?
302-311
: Avoid potential PII in logs.
The logs mention dropping partitions by referencing the partition data. Could that data be sensitive or user-specific? Perhaps mask or hash it if user data can appear. wdyt?
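One way to act on this suggestion is to log a stable fingerprint of the partition payload instead of the raw values. The sketch below is illustrative only (`partition_fingerprint` is a hypothetical helper, not part of the CDK); it keeps dropped-partition log lines correlatable without exposing user-specific data:

```python
import hashlib
import json

# Hypothetical helper: hash the partition payload so logs never carry raw,
# potentially user-identifying values, while identical partitions still map
# to the same fingerprint for correlation.
def partition_fingerprint(partition: dict) -> str:
    canonical = json.dumps(partition, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]

# e.g. logger.warning("Dropping partition %s", partition_fingerprint(partition))
print(len(partition_fingerprint({"account_id": "user-42"})))  # 12
```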
426-430: Ensure consistent type when comparing cursor values.
We compare the old `_new_global_cursor` value to `value` with `<`. Are we certain both are the same type or that they compare reliably? If we expect strings vs. timestamps, that might cause issues. wdyt?
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
6-6: Grateful for the handy mock imports.
No issues here; just acknowledging the addition of `MagicMock` and `patch`. This helps significantly with concurrency tests.
22-22: Nice direct import of `ConcurrentPerPartitionCursor`.
This import clarifies references to the cursor in tests.
2978-3020: Comprehensive test for `test_state_throttling`.
The test ensures no new state is emitted if fewer than 60 seconds have passed. This is excellent scenario coverage. If you anticipate different intervals in the future, might it be worthwhile to add a parameterized test for various intervals? wdyt?
…oncurrent-perpartition-add-parent-state-updates
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
415-423: ⚠️ Potential issue: Handle partition observation failures.
The order of operations in `observe` might lead to inconsistent state. If `cursor_per_partition[...].observe(record)` fails, the global cursor might be ahead incorrectly. This was previously identified in a past review. Would you prefer a try/finally or a combined approach? wdyt?
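To illustrate the ordering concern, here is a minimal sketch (names are hypothetical, not the CDK API): observing on the per-partition cursor first means a failure there can never leave the global cursor advanced past the partition state.

```python
# Stand-in cursors to demonstrate the failure-ordering argument above.
class CountingCursor:
    def __init__(self) -> None:
        self.seen = 0

    def observe(self, record: dict) -> None:
        self.seen += 1

class FailingCursor:
    def observe(self, record: dict) -> None:
        raise ValueError("partition cursor failed")

def observe_safely(partition_cursor, global_cursor, record: dict) -> None:
    # If the per-partition observe raises, the global cursor below is never advanced.
    partition_cursor.observe(record)
    global_cursor.observe(record)

global_cursor = CountingCursor()
try:
    observe_safely(FailingCursor(), global_cursor, {"id": 1})
except ValueError:
    pass
print(global_cursor.seen)  # 0: the global cursor was not advanced
```

A try/finally variant would instead guarantee cleanup actions run; which to prefer depends on whether a failed observation should count toward global progress at all.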
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
62-63: Consider documenting the rationale for partition limits.
The changes introduce two distinct limits: `DEFAULT_MAX_PARTITIONS_NUMBER` (25,000) and `SWITCH_TO_GLOBAL_LIMIT` (10,000). Would you consider adding docstrings to explain:
- Why these specific numbers were chosen?
- The memory implications of increasing the max partitions?
- The tradeoffs of switching to global cursor at 10,000?

wdyt?
199-207: Consider making the throttle interval configurable.
The 60-second throttle interval is currently hardcoded. Would you consider:
- Making it a class constant for easier configuration?
- Adding it as a parameter to the constructor?

This would make it easier to adjust based on different use cases or performance requirements. wdyt?

```diff
+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
+
     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL:
             return None
         return current_time
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
(11 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[warning] 1-1: File would be reformatted. Please format the code according to the style guidelines.
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
167-185: Consider thread safety in parent state updates.
The `_check_and_update_parent_state` method looks good, but there might be a potential race condition. Even though it's called within a lock in `close_partition`, would you consider:
- Adding a note in the docstring about thread safety requirements?
- Documenting that this method should only be called while holding `self._lock`?

wdyt?
235-255
: LGTM! Good use of thread safety and deep copying.The changes to partition generation look good:
- Thread safety is maintained with proper lock usage
- Parent state is properly deep copied to prevent mutations
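The deep-copy point can be shown with a minimal, self-contained example (illustrative only; the dict shapes are hypothetical): without the copy, every stored snapshot aliases the same mutable dict and silently "rewrites history" when the parent cursor advances.

```python
from copy import deepcopy

# Illustrative example of why the deep copy matters for stored parent states.
parent_state = {"parent_stream": {"updated_at": "2024-01-01"}}
snapshots = {}
snapshots["p1"] = parent_state            # alias: shares the inner dict
snapshots["p2"] = deepcopy(parent_state)  # independent snapshot

# The parent cursor advances...
parent_state["parent_stream"]["updated_at"] = "2024-02-01"

print(snapshots["p1"]["parent_stream"]["updated_at"])  # 2024-02-01 (mutated)
print(snapshots["p2"]["parent_stream"]["updated_at"])  # 2024-01-01 (preserved)
```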
281-287
: LGTM! Clear logging and state transition.Good addition of informative logging when switching to global cursor. The transition logic is clear and well-documented.
So let me rephrase the logic of this implementation in words so that you can confirm my understanding.

Order of events

While consuming the parent stream, we keep track of the state after each parent record as part of an ordered list of states described by `self._partition_parent_state_map`. When a partition on the child stream is closed, we can check a couple of things:
- Have we generated all the slices for this partition? For example, let's assume we have parent_record_id; we might have slices `{"partition": <parent_record_id>, "cursor": <january>}` and `{"partition": <parent_record_id>, "cursor": <february>}`. If not, we can't progress the state because it is impossible for <parent_record_id> to be fully processed.
- Have we processed all the slices associated with the parent record id? This is done using `self._semaphore_per_partition[p_key]._value != 0`.

Given the two conditions above, it means that we can progress the state. Here is a timeline of what events can look like:
- T0: Generate `{"partition": <parent_record_id>, "cursor": <january>}`, january is NOT last
- T1: Closing `{"partition": <parent_record_id>, "cursor": <january>}`, the parent state is not progressed because we still have slices to generate
- T2: Generate `{"partition": <parent_record_id>, "cursor": <february>}`, february is last
- T3: Closing `{"partition": <parent_record_id>, "cursor": <february>}`, we can progress the parent state
This works with unordered execution of slices with the same slice.partition but different slice.cursor:
- T0: Generate `{"partition": <parent_record_id>, "cursor": <january>}`, january is NOT last
- T1: Generate `{"partition": <parent_record_id>, "cursor": <february>}`, february is last
- T2: Closing `{"partition": <parent_record_id>, "cursor": <february>}`, the parent state is not progressed because we still have slices to process
- T3: Closing `{"partition": <parent_record_id>, "cursor": <january>}`, we can progress the parent state
This works with unordered execution of slices with different child slice.partition:
- T0: Generate `{"partition": <parent_record_id_1>, "cursor": <whatever>}`, whatever is last for parent_record_id_1
- T1: Generate `{"partition": <parent_record_id_2>, "cursor": <whatever>}`, whatever is last for parent_record_id_2
- T2: Closing `{"partition": <parent_record_id_2>, "cursor": <whatever>}`, the parent state is not progressed because parent_record_id_1 is first in the list and hasn't been cleared out
- T3: Closing `{"partition": <parent_record_id_1>, "cursor": <whatever>}`, we can progress the parent state to the parent state when `parent_record_id_2` was emitted
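The ordering rule in the timelines above can be sketched as a small, self-contained model (class and method names are illustrative, not the CDK implementation): parent state advances only once every earlier partition has all of its slices generated AND closed, popping finished partitions from the front of the ordered map.

```python
from collections import OrderedDict

# Hypothetical minimal model of the ordered parent-state progression.
class ParentStateTracker:
    def __init__(self) -> None:
        self._partition_parent_state_map = OrderedDict()  # key -> parent state snapshot
        self._open_slices = {}                            # key -> slices generated but not yet closed
        self._all_generated = set()                       # keys with no more slices to generate
        self.parent_state = None

    def generate_slice(self, key, parent_state, is_last_slice_for_key: bool) -> None:
        self._partition_parent_state_map.setdefault(key, parent_state)
        self._open_slices[key] = self._open_slices.get(key, 0) + 1
        if is_last_slice_for_key:
            self._all_generated.add(key)

    def close_slice(self, key) -> None:
        self._open_slices[key] -= 1
        # Pop fully finished partitions from the front, in insertion order;
        # stop at the first partition that is still generating or still open.
        while self._partition_parent_state_map:
            earliest = next(iter(self._partition_parent_state_map))
            if earliest in self._all_generated and self._open_slices[earliest] == 0:
                self.parent_state = self._partition_parent_state_map.popitem(last=False)[1]
            else:
                break

# Third timeline above: close the later partition first.
t = ParentStateTracker()
t.generate_slice("p1", {"cursor": "s1"}, is_last_slice_for_key=True)
t.generate_slice("p2", {"cursor": "s2"}, is_last_slice_for_key=True)
t.close_slice("p2")
print(t.parent_state)  # None: p1 is earlier and still open
t.close_slice("p1")
print(t.parent_state)  # {'cursor': 's2'}: both popped in order
```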
Assumptions/concerns
- [I'm fine with this one] The state is updated as part of the partition router. We wanted to move away from DeclarativeStreams tracking their own state because of issues it created (calling `stream_slices` or `read_records` twice caused issues in the past). But we can have `SubstreamPartitionRouter` track the state using a cursor in the future and that would be just fine as well.
- [I think this is already the case but...] Given we don't support checkpointing state on records, the cursors should only progress the state when closing partitions.
- The parent state is deep copied in order to avoid it being updated for all entries in `self._partition_parent_state_map` at the same time.
- [I think we need to evaluate the likeliness of issues related to this] Memory usage is not an issue. Do we need a backup plan if there are a lot of partitions?
- An idea of optimization memory-wise would be to remove the entry in `_partition_parent_state_map` if the state for the next entry is the same. For example, if records are emitted in the order ["parent_record_id_1", "parent_record_id_2", "parent_record_id_3"] and all records are from the same parent slice, it is very likely that the state will be the same because we only progress the state when closing the slice. This would mean that we would have the following sequence of events:
  - T0: Generate slice for `parent_record_id_1` with parent state <parent state 1>, self._partition_parent_state_map = {"parent_record_id_1": <parent state 1>}
  - T1: Generate slice for `parent_record_id_2` with parent state <parent state 1>, self._partition_parent_state_map = {"parent_record_id_2": <parent state 1>} instead of {"parent_record_id_1": <parent state 1>, "parent_record_id_2": <parent state 1>}
  - T2: Generate slice for `parent_record_id_3` with parent state <parent state 2>, self._partition_parent_state_map = {"parent_record_id_2": <parent state 1>, "parent_record_id_3": <parent state 2>} instead of {"parent_record_id_1": <parent state 1>, "parent_record_id_2": <parent state 1>, "parent_record_id_3": <parent state 2>}

  This would not necessarily be enough to prevent any memory creep so we might need to find other guardrails.
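The deduplication idea sketched in that bullet could look roughly like this (illustrative only; `record_parent_state` is a hypothetical helper, and the real code would also need to handle the semaphore bookkeeping discussed below):

```python
from collections import OrderedDict

# Hypothetical sketch: before storing a new (partition -> parent state) entry,
# drop the previous entry if it holds an identical state, since the newer
# entry subsumes it for the purpose of progressing the parent state.
def record_parent_state(state_map: OrderedDict, key, parent_state) -> None:
    if state_map:
        last_key = next(reversed(state_map))
        if state_map[last_key] == parent_state:
            del state_map[last_key]
    state_map[key] = parent_state

m = OrderedDict()
record_parent_state(m, "parent_record_id_1", {"updated_at": "2024-01-01"})
record_parent_state(m, "parent_record_id_2", {"updated_at": "2024-01-01"})  # replaces id_1
record_parent_state(m, "parent_record_id_3", {"updated_at": "2024-02-01"})  # new state, kept
print(list(m))  # ['parent_record_id_2', 'parent_record_id_3']
```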
Benefits/drawbacks
Having the solution as part of ConcurrentPerPartitionCursor is good because:
- We already track most of the information needed (apart from the parent state) in the ConcurrentPerPartitionCursor
- The logic does not bleed (like in some proposal like this one where we would pass the information through the stream_slice)
Therefore, I think it is a smart solution and it feels we should go forward with it. What would be the next steps? Unit tests? Testing on the connection we struggled with for source-amazon-ads?
@maxi297 Yes, the next steps will be to implement the optimization you suggested, followed by adding unit tests and testing with Amazon Seller Partner users who previously experienced issues.

Regarding my optimization, I don't think it is possible as we need to know …

Adding to the previous comment, we need to keep all semaphores in memory anyway because if one does not have its value == 0, it means we can't progress the global state. We assume all those semaphores to be very low in terms of memory usage given that they are only a wrapper over an integer. Hence, we can reduce memory usage by …
Actionable comments posted: 2
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
62-64: Consider documenting the relationship between partition limits.
The relationship between `DEFAULT_MAX_PARTITIONS_NUMBER` (25,000) and `SWITCH_TO_GLOBAL_LIMIT` (10,000) isn't immediately clear. Would you consider adding a docstring explaining why we switch to global cursor at 10K but allow up to 25K partitions? Also, have you validated the memory impact of increasing the max partitions from 10K to 25K? wdyt?
204-204: Consider making the throttle interval configurable.
The 60-second throttle interval is hardcoded. Would you consider making this configurable through a class constant or constructor parameter? This would allow users to adjust the throttling based on their needs. wdyt?

```diff
+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
+
     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL:
```
254-256: Consider reducing lock scope for better concurrency.
The lock is held while doing a deepcopy of the parent state. Would you consider moving the deepcopy outside the lock to minimize lock contention? wdyt?

```diff
+        parent_state_copy = deepcopy(parent_state)
         with self._lock:
-            self._partition_parent_state_map[partition_key] = deepcopy(parent_state)
+            self._partition_parent_state_map[partition_key] = parent_state_copy
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Ruff Format Check
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
62-63: Consider documenting the memory implications of the increased partition limit.
The `DEFAULT_MAX_PARTITIONS_NUMBER` has been increased from 10,000 to 25,000. Would you consider adding a comment explaining the memory implications and why 25,000 was chosen as the new limit? This could help future maintainers understand the tradeoffs. wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py` (4 hunks)
- `unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py` (1 hunk)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
167-201
: LGTM! Well-structured parent state management.The implementation correctly ensures that parent states are updated in order, only when all earlier partitions are fully finished. The code is well-documented and handles edge cases appropriately.
215-223
: LGTM! Effective state emission throttling.The throttling implementation effectively limits state message emissions to once every 60 seconds, which should help reduce overhead.
246-249
: LGTM! Clean integration of parent state in slice generation.The changes effectively integrate parent state tracking into the slice generation process while maintaining the existing functionality.
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
308-361
: LGTM! Robust test helper functions.The test helpers provide a solid foundation for testing state transitions, request mocking, and record verification. The implementation is thorough and well-documented.
Also applies to: 763-867
425-2331
: LGTM! Comprehensive test coverage.The test suite thoroughly covers various scenarios including:
- Parent state updates
- Error handling
- Empty record handling
- State migrations
- List partition router
- Request options
The test cases are well-structured and provide good coverage of edge cases.
2980-3022
: LGTM! Well-structured throttling test.The test effectively verifies the state emission throttling behavior, covering both cases where emission should and should not occur. The mock setup and assertions are clear and comprehensive.
Actionable comments posted: 0
🧹 Nitpick comments (4)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4)
308-361: Consider adding type hints to improve code maintainability?
The `run_mocked_test` function could benefit from type hints for better code maintainability and IDE support. For example:

```diff
-def run_mocked_test(
+def run_mocked_test(
+    mock_requests: List[Tuple[str, Optional[Dict[str, Any]]]],
+    manifest: Dict[str, Any],
+    config: Dict[str, Any],
+    stream_name: str,
+    initial_state: Dict[str, Any],
+    expected_records: List[Dict[str, Any]],
+    expected_state: Dict[str, Any]
+) -> None:
```

Also, consider making the assertion messages more descriptive by including the actual values in the error message, wdyt?
382-396: Consider grouping related constants into classes or dataclasses?
The date-related constants could be organized into a dataclass or class for better encapsulation and readability, wdyt?

```diff
+@dataclass
+class TestDates:
+    START_DATE: str = "2024-01-01T00:00:01Z"
+    POST_1_UPDATED_AT: str = "2024-01-30T00:00:00Z"
+    POST_2_UPDATED_AT: str = "2024-01-29T00:00:00Z"
+    POST_3_UPDATED_AT: str = "2024-01-28T00:00:00Z"
```
737-761: Consider adding test class for better organization?
The test functions could be organized into a test class for better structure and potential shared setup/teardown, wdyt?

```diff
+class TestConcurrentPerPartitionCursor:
+    @pytest.mark.parametrize(...)
+    def test_incremental_parent_state_no_incremental_dependency(
+        self,
+        test_name,
+        manifest,
+        mock_requests,
+        expected_records,
+        initial_state,
+        expected_state
+    ):
```
3267-3309
: Consider adding more edge cases to state throttling tests?The state throttling tests cover basic scenarios but could benefit from additional edge cases, wdyt? For example:
- Test behavior when time goes backwards
- Test with very large time values
- Test with time values close to the throttling threshold
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
1-3309
: Great test coverage and organization!The test suite is well-structured with comprehensive coverage of various scenarios. The use of pytest features, mock requests, and clear test data organization makes the tests maintainable and readable.
Actionable comments posted: 0
🧹 Nitpick comments (4)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4)
3032-3078: Consider enhancing test assertions for better failure messages, wdyt?
The test could be more maintainable with:
- More descriptive assertion messages using pytest's assert messages
- Additional edge cases like None values for state
- Verifying the order of mock calls

```diff
-        assert cursor.state == {
+        assert cursor.state == {
             "use_global_cursor": False,
             "lookback_window": 0,
             "states": [],
-        }
+        }, "Expected empty state with default values"
-        assert len(cursor._cursor_per_partition) == 0
+        assert len(cursor._cursor_per_partition) == 0, "Expected no cursors"
-        assert len(cursor._semaphore_per_partition) == 0
+        assert len(cursor._semaphore_per_partition) == 0, "Expected no semaphores"
-        assert len(cursor._partition_parent_state_map) == 0
+        assert len(cursor._partition_parent_state_map) == 0, "Expected no parent state mappings"
-        assert mock_cursor.stream_slices.call_count == 0
+        assert mock_cursor.stream_slices.call_count == 0, "Expected no calls to stream_slices"
+        # Add edge case for None state
+        cursor._state = None
+        slices = list(cursor.stream_slices())
+        assert cursor.state == {
+            "use_global_cursor": False,
+            "lookback_window": 0,
+            "states": [],
+        }, "Expected default state when input state is None"
```
3080-3153: Consider adding error cases and improving variable naming, thoughts?
The test could be enhanced with:
- More descriptive variable names (e.g., `slice1_data` instead of `data1`)
- Error case handling (e.g., what happens if parent state update fails?)
- Verification of operation order

```diff
-    mock_cursor.stream_slices.side_effect = [
-        iter(
-            [
-                {"slice1": "data1"},
-                {"slice2": "data1"},  # First slice
-            ]
-        ),
+    mock_cursor.stream_slices.side_effect = [
+        iter(
+            [
+                {"partition1": "initial_data"},
+                {"partition2": "initial_data"},
+            ]
+        ),
-        iter(
-            [
-                {"slice2": "data2"},
-                {"slice2": "data2"},  # First slice for new partition
-            ]
-        ),
+        iter(
+            [
+                {"partition2": "updated_data"},
+                {"partition2": "final_data"},
+            ]
+        ),
     ]
+    # Add error case test
+    partition_router.get_stream_state.side_effect = Exception("Failed to get parent state")
+    with pytest.raises(Exception, match="Failed to get parent state"):
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slices[0])
+        )
+    # Verify operation order
+    mock_calls = cursor._message_repository.method_calls
+    assert mock_calls[0][0] == "emit_message", "Expected emit_message to be called first"
```
3155-3227: Consider adding throttling tests and resource cleanup verification, what do you think?
The test could be enhanced with:
- Verification of state emission throttling
- Resource cleanup checks
- Memory leak prevention tests

```diff
+    # Test state emission throttling
+    with patch("time.time") as mock_time:
+        mock_time.return_value = 0
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slices[0])
+        )
+        assert cursor._message_repository.emit_message.call_count == 1
+
+        # Try to emit again immediately
+        mock_time.return_value = 30  # 30 seconds later
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slices[1])
+        )
+        assert cursor._message_repository.emit_message.call_count == 1  # Should not emit
+
+    # Verify resource cleanup
+    assert not cursor._cursor_per_partition, "Expected cursors to be cleaned up"
+    assert not cursor._semaphore_per_partition, "Expected semaphores to be cleaned up"
+    assert not cursor._partition_parent_state_map, "Expected parent state map to be cleaned up"
```
3229-3287: Consider adding boundary condition tests and memory checks, thoughts?
The test could be enhanced with:
- Testing at the exact boundary of partition limits
- Memory usage verification
- Testing the transition back from global cursor

```diff
+    # Test exact boundary condition
+    cursor.DEFAULT_MAX_PARTITIONS_NUMBER = 2
+    cursor.SWITCH_TO_GLOBAL_LIMIT = 2
+
+    # Create exactly max partitions
+    partitions = [
+        StreamSlice(partition={"id": str(i)}, cursor_slice={}, extra_fields={})
+        for i in range(cursor.DEFAULT_MAX_PARTITIONS_NUMBER)
+    ]
+    partition_router.stream_slices.return_value = iter(partitions)
+
+    slices = list(cursor.stream_slices())
+    for slice in slices:
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
+        )
+
+    assert not cursor.state.get("use_global_cursor", False), "Should not switch to global cursor at exact limit"
+
+    # Verify memory cleanup after switch
+    import sys
+    import gc
+
+    initial_size = sys.getsizeof(cursor._cursor_per_partition)
+    # Add one more partition to trigger switch
+    cursor.close_partition(
+        DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), StreamSlice(partition={"id": "extra"}, cursor_slice={}, extra_fields={}))
+    )
+    gc.collect()
+    final_size = sys.getsizeof(cursor._cursor_per_partition)
+
+    assert final_size <= initial_size, "Memory usage should not increase after switch to global cursor"
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
Should we have a test where the parent state is partially progressed because the last partition that was generated has not yet been completed but the first one has?
Apart from those small comments, I think we're good!
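The partially-progressed scenario can be sketched with a toy model (plain dicts and semaphores rather than the CDK classes; all names here are illustrative): a partition's parent state only becomes emittable once that partition, and every partition generated before it, has fully finished.

```python
from collections import OrderedDict
from threading import Semaphore

# Toy model, not the CDK classes: parent state recorded per generated
# partition, in generation order.
partition_parent_state_map = OrderedDict(
    [("p1", {"parent": "state1"}), ("p2", {"parent": "state2"})]
)
semaphore_per_partition = {"p1": Semaphore(0), "p2": Semaphore(1)}  # p2 still in flight
finished_partitions = {"p1"}


def pop_emittable_parent_state():
    """Pop and return the newest parent state whose partition, and every
    partition generated before it, has fully finished; None otherwise."""
    last = None
    for key in list(partition_parent_state_map):
        sem = semaphore_per_partition[key]
        # A semaphore value of 0 means every slice of the partition was closed.
        if key in finished_partitions and sem._value == 0:
            last = partition_parent_state_map.pop(key)
        else:
            break
    return last


# p1 is done but p2 is not: only p1's parent state may be emitted,
# and p2's stays queued until its last slice closes.
assert pop_emittable_parent_state() == {"parent": "state1"}
assert list(partition_parent_state_map) == ["p2"]
```

A test along these lines would assert that closing only the first generated partition advances the emitted parent state to that partition's checkpoint and no further.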
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
167-214: Consider optimizing parent state updates for better performance.

The current implementation iterates through all semaphores up to the earliest key, which could be inefficient for large numbers of partitions. Would you consider maintaining an index of the last cleaned parent state to avoid unnecessary iterations? This could be particularly important since this runs under a lock and blocks other threads from generating slices. wdyt?

```diff
 def _check_and_update_parent_state(self) -> None:
+    # Track the last cleaned parent state index
+    if not hasattr(self, "_last_cleaned_index"):
+        self._last_cleaned_index = 0
+
     last_closed_state = None
     while self._partition_parent_state_map:
         earliest_key = next(iter(self._partition_parent_state_map))
+        # Start from the last cleaned index
+        partition_keys = list(self._semaphore_per_partition.keys())[self._last_cleaned_index:]
         all_left_finished = True
-        for p_key, sem in list(self._semaphore_per_partition.items()):
+        for p_key, sem in [(k, self._semaphore_per_partition[k]) for k in partition_keys]:
             if p_key not in self._finished_partitions or sem._value != 0:
                 all_left_finished = False
                 break
             if p_key == earliest_key:
+                self._last_cleaned_index = list(self._semaphore_per_partition.keys()).index(p_key)
                 break
```
228-236: Consider making the throttle interval configurable.

The state emission throttle interval is hardcoded to 60 seconds. Would you consider making this configurable via a class constant or configuration parameter to allow for different use cases? This would provide more flexibility for different stream requirements. wdyt?

```diff
+    # Default throttle interval in seconds
+    DEFAULT_STATE_THROTTLE_INTERVAL = 60
+
     def _throttle_state_message(self) -> Optional[float]:
         """
         Throttles the state message emission to once every 60 seconds.
         """
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_THROTTLE_INTERVAL:
             return None
         return current_time
```
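The throttling logic is small enough to illustrate standalone. Below is a minimal sketch of a configurable emission throttle; the class and its names are hypothetical, not the CDK's API, but the comparison mirrors the `<=` check in the method above:

```python
import time
from typing import Optional


class StateEmitThrottle:
    """Minimal sketch of a configurable state-emission throttle
    (illustrative class, not part of the CDK)."""

    def __init__(self, interval_seconds: float = 60.0) -> None:
        self.interval = interval_seconds
        self._last_emission_time = 0.0

    def try_acquire(self, now: Optional[float] = None) -> bool:
        """Return True (and record the emission) if the interval has elapsed."""
        current = time.time() if now is None else now
        if current - self._last_emission_time <= self.interval:
            return False
        self._last_emission_time = current
        return True


throttle = StateEmitThrottle(interval_seconds=60.0)
assert throttle.try_acquire(now=100.0)      # first emission always passes
assert not throttle.try_acquire(now=130.0)  # 30s later: suppressed
assert throttle.try_acquire(now=200.0)      # >60s later: emitted again
```

Passing `now` explicitly keeps the behavior testable without patching `time.time`, which is the same concern the suggested test above works around with `patch("time.time")`.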
254-254: Remove debug print statement.

This print statement appears to be for debugging purposes and should be removed or replaced with proper logging. wdyt?

```diff
-        print("stream_slices")
```
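If a trace at that point is still useful, a common alternative is a module-level logger at debug level (sketch only; the logger name is illustrative, not the CDK's):

```python
import logging

# Module-level logger, as typically defined at the top of a module.
logger = logging.getLogger("airbyte.concurrent_partition_cursor")


def trace_stream_slices() -> None:
    # Debug-level trace instead of a bare print(), so the message can be
    # filtered out in production by the logging configuration.
    logger.debug("stream_slices invoked")


trace_stream_slices()
```

Unlike `print`, the message only reaches output when a handler with DEBUG level is configured.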
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
3032-3078: Consider improving test method naming for better clarity.

The test name `test_given_no_partitions_processed_when_close_partition_then_no_state_update` follows a good pattern, but it's quite long. Would you consider using a more concise format like `test_no_state_update_for_unprocessed_partitions`? This maintains clarity while being more readable. wdyt?
3385-3455: Consider splitting the test into smaller focused tests.

This test method is quite long and tests multiple aspects of semaphore cleanup. Would you consider splitting it into smaller, more focused test methods? For example:

- `test_semaphore_cleanup_initial_state`
- `test_semaphore_cleanup_after_closing_partitions`
- `test_semaphore_cleanup_parent_state_updates`

This would make the tests more maintainable and easier to understand. wdyt?
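A sketch of what the split could look like, using a tiny stand-in object so each focused test stays self-contained (`FakeCursor` and its bookkeeping are illustrative, not the suite's real fixtures):

```python
class FakeCursor:
    """Tiny stand-in for the cursor's per-partition semaphore bookkeeping."""

    def __init__(self) -> None:
        # Two open partitions, each with a remaining-slice count.
        self.semaphore_per_partition = {"1": 1, "2": 2}
        self.parent_state = None

    def close_slice(self, partition_key: str) -> None:
        self.semaphore_per_partition[partition_key] -= 1
        if self.semaphore_per_partition[partition_key] == 0:
            del self.semaphore_per_partition[partition_key]
        if not self.semaphore_per_partition:
            # All partitions drained: the parent state becomes emittable.
            self.parent_state = {"parent": "final"}


def test_semaphore_cleanup_initial_state() -> None:
    cursor = FakeCursor()
    assert cursor.semaphore_per_partition == {"1": 1, "2": 2}


def test_semaphore_cleanup_after_closing_partitions() -> None:
    cursor = FakeCursor()
    cursor.close_slice("1")
    assert "1" not in cursor.semaphore_per_partition


def test_semaphore_cleanup_parent_state_updates() -> None:
    cursor = FakeCursor()
    for key, count in [("1", 1), ("2", 2)]:
        for _ in range(count):
            cursor.close_slice(key)
    assert cursor.parent_state == {"parent": "final"}


# Run the focused tests.
test_semaphore_cleanup_initial_state()
test_semaphore_cleanup_after_closing_partitions()
test_semaphore_cleanup_parent_state_updates()
```

Each test then exercises exactly one phase of the cleanup lifecycle, so a failure points directly at the broken phase.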
📒 Files selected for processing (2)

- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
Given the CI passes, I'm good with this change. The comments are just nits/validation. Thanks @tolik0
Actionable comments posted: 0
🧹 Nitpick comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
3032-3078: Great test coverage for the empty partition scenario! Would you consider adding a docstring?

The test thoroughly verifies the behavior when no partitions are processed. To make it even more maintainable, what do you think about adding a docstring that explains:

- The purpose of testing empty partitions
- The expected behavior
- The key assertions being made

```diff
 def test_given_no_partitions_processed_when_close_partition_then_no_state_update():
+    """
+    Test that when no partitions are processed:
+    - The cursor state remains empty
+    - No cursor slices are generated
+    - All internal data structures (cursor_per_partition, semaphore_per_partition,
+      partition_parent_state_map) are empty
+    """
     mock_cursor = MagicMock()
```
3383-3453: Consider extracting magic strings as constants for better maintainability?

The test data setup is thorough, but what do you think about extracting some of the repeated values as constants? For example:

- `"2024-01-02T00:00:00Z"`, `"2024-01-03T00:00:00Z"` as date constants
- `{"parent": {"state": "state1"}}` as state constants

```diff
+# Test Constants
+PARTITION_1_DATE = "2024-01-02T00:00:00Z"
+PARTITION_2_DATE = "2024-01-03T00:00:00Z"
+PARENT_STATE_1 = {"parent": {"state": "state1"}}
+PARENT_STATE_2 = {"parent": {"state": "state2"}}
+
 def test_semaphore_cleanup():
     # Create two mock cursors with different states for each partition
     mock_cursor_1 = MagicMock()
     mock_cursor_1.stream_slices.return_value = iter([
         {"slice1": "data1"}, {"slice2": "data1"},  # First partition slices
     ])
-    mock_cursor_1.state = {"updated_at": "2024-01-02T00:00:00Z"}  # State for partition "1"
+    mock_cursor_1.state = {"updated_at": PARTITION_1_DATE}  # State for partition "1"
```
📒 Files selected for processing (2)

- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
- unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
🔇 Additional comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
316-369: LGTM! Well-structured test helper function.

The `run_mocked_test` helper function is well-documented and handles all the common test setup, making the tests more maintainable and reducing code duplication.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor