
feat(concurrent perpartition cursor): Add parent state updates #343

Merged — 31 commits, Feb 24, 2025

Conversation


@tolik0 tolik0 commented Feb 17, 2025

Summary by CodeRabbit

  • New Features

    • Upgraded partition management improves the handling of incremental data processing with increased partition limits and enhanced state synchronization.
    • New test scenarios validate cursor behavior in various edge cases, ensuring accurate state management during partition processing.
  • Bug Fixes

    • Improved state management ensures the cursor correctly handles updates and emissions based on partition status.
  • Refactor

    • Enhanced internal logic refines the switching mechanism between different processing modes, ensuring a smoother and more resilient operational flow.

@tolik0 tolik0 changed the title [concurrent perpartition cursor]: Add parent state updates feat(concurrent perpartition cursor): Add parent state updates Feb 17, 2025

coderabbitai bot commented Feb 17, 2025

📝 Walkthrough

This pull request updates the functionality of the ConcurrentPerPartitionCursor class in the Airbyte CDK. It revises the partition management and state-handling logic by updating constants, introducing a new constant, and adding an attribute to track partition parent states. In addition, new methods and modified signatures refine the flow for generating stream slices, closing partitions, and emitting state messages with throttling.
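The throttled state-emission pattern summarized here can be sketched as a minimal, hypothetical class (method and attribute names follow the change summary; the real `ConcurrentPerPartitionCursor` is far larger and actually emits Airbyte state messages rather than appending to a list):

```python
import time
from typing import List, Optional


class ThrottledStateEmitter:
    """Simplified sketch of state-message throttling: emit at most once per interval,
    with an escape hatch (throttle=False) for forced emission on close."""

    STATE_EMISSION_INTERVAL = 60  # seconds, matching the hardcoded value in this PR

    def __init__(self) -> None:
        self._last_emission_time = 0.0
        self.emitted: List[str] = []  # stands in for emitted state messages

    def _throttle_state_message(self) -> Optional[float]:
        # Return the current time if enough time has passed since the last
        # emission, otherwise None to signal "skip this emission".
        current_time = time.time()
        if current_time - self._last_emission_time <= self.STATE_EMISSION_INTERVAL:
            return None
        return current_time

    def _emit_state_message(self, throttle: bool = True) -> None:
        if throttle:
            current_time = self._throttle_state_message()
            if current_time is None:
                return  # throttled: a state message was emitted recently
            self._last_emission_time = current_time
        self.emitted.append("state")
```

A forced call such as `_emit_state_message(throttle=False)` bypasses the interval check, which matches the walkthrough's note that `ensure_at_least_one_state_emitted` and partition closing can emit unconditionally.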

Changes

  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py — Constants: updated DEFAULT_MAX_PARTITIONS_NUMBER (10000 → 25000) and added SWITCH_TO_GLOBAL_LIMIT. Attributes: added _partition_parent_state_map to track parent state per partition. Methods: introduced _check_and_update_parent_state and _throttle_state_message; modified _emit_state_message, close_partition, and _generate_slices_from_partition (added parent_state parameter); updated ensure_at_least_one_state_emitted.
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py — Tests: added new test functions to validate cursor behavior under various scenarios, including no partitions processed, unfinished partitions, and exceeding partition limits.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant CP as ConcurrentPerPartitionCursor
    Client->>CP: close_partition(partition)
    Note right of CP: Invoke _check_and_update_parent_state()
    CP->>CP: Update _partition_parent_state_map
    CP->>CP: _emit_state_message(throttle=False)
```

```mermaid
sequenceDiagram
    participant CP as ConcurrentPerPartitionCursor
    participant SliceGen as _generate_slices_from_partition
    CP->>SliceGen: _generate_slices_from_partition(partition, parent_state)
    SliceGen-->>CP: Yield stream slice with parent state
```

Suggested reviewers

  • maxi297: Would you like to have maxi297 review these changes? wdyt?
  • brianjlai: How about asking brianjlai for their insights on this update?


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (14)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (13)

8-8: Consider using monotonic time for scheduling.
Would you like to use time.monotonic() instead, to avoid issues caused by system clock adjustments? wdyt?


62-63: Make partition limits configurable.
Raising the limit to 25,000 and introducing SWITCH_TO_GLOBAL_LIMIT at 10,000 helps, but would you consider allowing these to be configured via constructor parameters or environment variables? That might support more diverse use cases. wdyt?
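What the suggested constructor-level configuration might look like, as a hypothetical sketch (parameter names are invented here; the real class takes many more arguments):

```python
class PartitionLimitConfig:
    """Hypothetical sketch: partition limits as constructor parameters with the
    PR's values as defaults, instead of hardcoded class constants."""

    DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
    DEFAULT_SWITCH_TO_GLOBAL_LIMIT = 10_000

    def __init__(
        self,
        max_partitions: int = DEFAULT_MAX_PARTITIONS_NUMBER,
        switch_to_global_limit: int = DEFAULT_SWITCH_TO_GLOBAL_LIMIT,
    ) -> None:
        self._max_partitions = max_partitions
        self._switch_to_global_limit = switch_to_global_limit
        self._number_of_partitions = 0

    def limit_reached(self) -> bool:
        # Strict '>' mirrors the PR under review: switch to the global cursor
        # only once the limit has been exceeded, not when it is reached.
        return self._number_of_partitions > self._switch_to_global_limit
```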


98-101: Validate parent-state OrderedDict usage.
Storing parent state in an ordered dictionary is helpful. Would you like to confirm you won’t need to remove entries in arbitrary orders, beyond what's handled in _check_and_update_parent_state? wdyt?


108-108: Expose _number_of_partitions usage.
Might it be beneficial to track timestamps or reasons for partition increments to help monitoring and debugging partition growth over time? wdyt?


111-111: Use None for initial time sentinel.
Do you prefer initializing _last_emission_time as None rather than 0.0 to indicate "no prior emission"? This can make the code more self-descriptive. wdyt?


168-186: Ensure only the last closed parent state is retained.
This method updates _parent_state to the most recent popped state if multiple partitions are fully closed. Are you sure you only need the final partition’s state? If you'd like to preserve all, you might store them differently. wdyt?


198-198: Revisiting forced emission configuration.
You call _emit_state_message(throttle=False) here. Would you consider making forced emission more explicit via clear naming or docstring, so it's evident we bypass the throttle logic? wdyt?


200-208: Consider making the throttle interval configurable.
A 60-second hardcoded interval might be limiting. If the environment or use case changes, would you consider an approach to set this interval externally (e.g., in the constructor)? wdyt?


209-214: Log throttle behavior for debugging.
You skip emission whenever _throttle_state_message returns None. Do you want to log a debug message indicating the emission got throttled to help troubleshoot? wdyt?


236-267: Add concurrency docstring to _generate_slices_from_partition.
This method carefully locks and manages semaphores, which is great. Would you consider adding docstrings that explain how concurrency is managed here for future maintainers? wdyt?


282-288: Validate forced switch to global.
Once limit_reached() is true, the code forcibly switches to a global cursor. Are you okay losing granular partition states from that point onward? If not, you might keep partial coverage for old partitions. wdyt?


290-290: Check integer overflow concerns.
_number_of_partitions += 1 grows indefinitely. In Python, integers are unbounded, but for extremely large sync jobs, might it be beneficial to reset or track usage differently? wdyt?


464-464: Double-check strict greater sign.
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT uses > rather than >=. Do you intend to switch exactly after exceeding the limit, or once we reach it? wdyt?

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

1186-1198: Patching _throttle_state_message for easier testing.
Great approach to bypass or control throttling logic. Did you consider verifying you call _throttle_state_message the expected number of times? That might further ensure correctness. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 263343a and d3e7fe2.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (11 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] 1-1: File would be reformatted. Please format the code according to the style guidelines.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)

153-168: Check semaphore ordering in close_partition.
Currently, you acquire the semaphore (.acquire()) after updating the global cursor. Would it be helpful to acquire the semaphore first to prevent potential concurrency races where another thread might read the updated global cursor prematurely? wdyt?


231-234: Confirm parent state accuracy.
When calling _generate_slices_from_partition(partition, parent_state), can we confirm that each partition's parent state is always valid for that partition? This might help ensure correctness for nested substreams. wdyt?


302-311: Avoid potential PII in logs.
The logs mention dropping partitions by referencing the partition data. Could that data be sensitive or user-specific? Perhaps mask or hash it if user data can appear. wdyt?


426-430: Ensure consistent type when comparing cursor values.
We compare the old _new_global_cursor value to value with <. Are we certain both are the same type or that they compare reliably? If we expect strings vs. timestamps, that might cause issues. wdyt?

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)

6-6: Grateful for the handy mock imports.
No issues here—just acknowledging the addition of MagicMock and patch. This helps significantly with concurrency tests.


22-22: Nice direct import of ConcurrentPerPartitionCursor.
This import clarifies references to the cursor in tests.


2978-3020: Comprehensive test for test_state_throttling.
The test ensures no new state is emitted if fewer than 60 seconds have passed. This is an excellent scenario coverage. If you anticipate different intervals in the future, might it be worthwhile to add a parameterized test for various intervals? wdyt?
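If the interval were made configurable, a parameterized test could cover values around the threshold. A hypothetical sketch, where `should_emit` is a pure-function extraction of the throttle check rather than the real cursor API:

```python
import pytest

THROTTLE_INTERVAL = 60  # seconds, matching the hardcoded value in this PR


def should_emit(last_emission_time: float, current_time: float) -> bool:
    # Pure-function version of the throttle check, extracted for testability.
    return current_time - last_emission_time > THROTTLE_INTERVAL


@pytest.mark.parametrize(
    "elapsed, expected",
    [(0.0, False), (59.0, False), (60.0, False), (61.0, True), (3600.0, True)],
)
def test_throttle_interval(elapsed: float, expected: bool) -> None:
    assert should_emit(1000.0, 1000.0 + elapsed) is expected
```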

@github-actions github-actions bot added the enhancement New feature or request label Feb 17, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

415-423: ⚠️ Potential issue

Handle partition observation failures.

The order of operations in observe might lead to inconsistent state. If cursor_per_partition[...].observe(record) fails, the global cursor might be ahead incorrectly.

This was previously identified in a past review. Would you prefer a try/finally or a combined approach? wdyt?

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

62-63: Consider documenting the rationale for partition limits.

The changes introduce two distinct limits: DEFAULT_MAX_PARTITIONS_NUMBER (25,000) and SWITCH_TO_GLOBAL_LIMIT (10,000). Would you consider adding docstrings to explain:

  1. Why these specific numbers were chosen?
  2. The memory implications of increasing the max partitions?
  3. The tradeoffs of switching to global cursor at 10,000?

wdyt?


199-207: Consider making the throttle interval configurable.

The 60-second throttle interval is currently hardcoded. Would you consider:

  1. Making it a class constant for easier configuration?
  2. Adding it as a parameter to the constructor?

This would make it easier to adjust based on different use cases or performance requirements. wdyt?

```diff
+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL:
             return None
         return current_time
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d3e7fe2 and 8617cc8.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (11 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

[warning] 1-1: File would be reformatted. Please format the code according to the style guidelines.

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

167-185: Consider thread safety in parent state updates.

The _check_and_update_parent_state method looks good, but there might be a potential race condition. Even though it's called within a lock in close_partition, would you consider:

  1. Adding a note in the docstring about thread safety requirements?
  2. Documenting that this method should only be called while holding self._lock?

wdyt?


235-255: LGTM! Good use of thread safety and deep copying.

The changes to partition generation look good:

  1. Thread safety is maintained with proper lock usage
  2. Parent state is properly deep copied to prevent mutations

281-287: LGTM! Clear logging and state transition.

Good addition of informative logging when switching to global cursor. The transition logic is clear and well-documented.

@maxi297 maxi297 changed the base branch from main to tolik0/refactor-concurrent-global-cursor February 17, 2025 16:53

@maxi297 maxi297 left a comment


So let me rephrase the logic of this implementation in words so that you can confirm my understanding.

Order of events

While consuming the parent stream, we keep track of the state after each parent record as part of an ordered list of states described by self._partition_parent_state_map. When a partition on the child stream is closed, we can check a couple of things:

  • Have we generated all the slices for this partition (for example, let's assume we have parent_record_id, we might have slices {"partition": <parent_record_id>, "cursor": <january>} and {"partition": <parent_record_id>, "cursor": <february>})? If not, we can't progress the state because it is impossible for <parent_record_id> to be fully processed
  • Have we processed all the slices associated with the parent record id? This is done using self._semaphore_per_partition[p_key]._value != 0

Given the two conditions above, it means that we can progress the state. Here is a timeline of what events can look like:

  • T0: Generate {"partition": <parent_record_id>, "cursor": <january>}, january is NOT last
  • T1: Closing {"partition": <parent_record_id>, "cursor": <january>}, the parent state is not progressed because we still have slices to generate
  • T2: Generate {"partition": <parent_record_id>, "cursor": <february>}, february is last
  • T3: Closing {"partition": <parent_record_id>, "cursor": <february>}, we can progress the parent state

This works with unordered execution of slices with the same slice.partition but different slice.cursor:

  • T0: Generate {"partition": <parent_record_id>, "cursor": <january>}, january is NOT last
  • T1: Generate {"partition": <parent_record_id>, "cursor": <february>}, february is last
  • T2: Closing {"partition": <parent_record_id>, "cursor": <february>}, the parent state is not progressed because we still have slices to process
  • T3: Closing {"partition": <parent_record_id>, "cursor": <january>}, we can progress the parent state

This works with unordered execution of slices with different child slice.partition:

  • T0: Generate {"partition": <parent_record_id_1>, "cursor": <whatever>}, whatever is last for parent_record_id_1
  • T1: Generate {"partition": <parent_record_id_2>, "cursor": <whatever>}, whatever is last for parent_record_id_2
  • T2: Closing {"partition": <parent_record_id_2>, "cursor": <whatever>}, the parent state is not progressed because parent_record_id_1 is first in the list and hasn't been cleared out
  • T3: Closing {"partition": <parent_record_id_1>, "cursor": <whatever>}, we can progress the parent state to the parent state when parent_record_id_2 was emitted
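The ordered progression shown in these timelines can be modeled in a few dozen lines. This is a simplified, hypothetical sketch: `register_slice`, `finish_generating`, and `close_slice` stand in for the real slice-generation and `close_partition` paths, and each semaphore's counter plays the role of `self._semaphore_per_partition[p_key]._value`:

```python
import threading
from collections import OrderedDict
from typing import Any, Dict, Optional, Set


class ParentStateTracker:
    """Sketch of ordered parent-state progression: the parent state only advances
    past a partition once (a) all its slices were generated, (b) all its slices
    were closed, and (c) every earlier partition in the ordered map is done too."""

    def __init__(self) -> None:
        self._partition_parent_state_map: "OrderedDict[str, Any]" = OrderedDict()
        self._semaphore_per_partition: Dict[str, threading.Semaphore] = {}
        self._finished_generating: Set[str] = set()
        self._parent_state: Optional[Any] = None

    def register_slice(self, partition_key: str, parent_state: Any) -> None:
        # One semaphore increment per generated slice for this partition.
        sem = self._semaphore_per_partition.setdefault(partition_key, threading.Semaphore(0))
        sem.release()
        self._partition_parent_state_map.setdefault(partition_key, parent_state)

    def finish_generating(self, partition_key: str) -> None:
        # All slices for this partition have now been generated.
        self._finished_generating.add(partition_key)

    def close_slice(self, partition_key: str) -> None:
        self._semaphore_per_partition[partition_key].acquire()
        self._check_and_update_parent_state()

    def _check_and_update_parent_state(self) -> None:
        last_closed_state = None
        while self._partition_parent_state_map:
            earliest_key = next(iter(self._partition_parent_state_map))
            sem = self._semaphore_per_partition[earliest_key]
            # Stop at the first partition that still has slices to generate
            # or slices in flight (semaphore counter not back to zero).
            if earliest_key not in self._finished_generating or sem._value != 0:
                break
            _, last_closed_state = self._partition_parent_state_map.popitem(last=False)
        if last_closed_state is not None:
            self._parent_state = last_closed_state
```

Replaying the first timeline: closing the january slice does not advance the state (february is still to be generated), while closing february after generation is done advances it. The third timeline also holds: closing `parent_record_id_2` before `parent_record_id_1` leaves the state untouched until the earlier partition is cleared.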

Assumptions/concerns

  • [I'm fine with this one] The state is updated as part of the partition router. We wanted to move away from DeclarativeStreams tracking their own state because of issues it created (calling stream_slices or read_records twice caused issues in the past). But we can have SubstreamPartitionRouter track the state using a cursor in the future and that would be just fine as well.
  • [I think this is already the case but...] Given we don't support checkpointing state on records, the cursors should only progress the state when closing partitions.
  • The parent state is deep copied in order to avoid it being updated for all entries in self._partition_parent_state_map at the same time.
  • [I think we need to evaluate the likeliness of issues related to this] Memory usage is not an issue. Do we need a backup plan if there are a lot of partitions?
    • A memory-wise optimization would be to remove the entry in _partition_parent_state_map if the state for the next entry is the same. For example, if records are emitted in the order ["parent_record_id_1", "parent_record_id_2", "parent_record_id_3"] and all records are from the same parent slice, it is very likely that the state will be the same because we only progress the state when closing the slice. This would mean the following sequence of events:
      • T0: Generate slice for parent_record_id_1 with parent state <parent state 1>, self._partition_parent_state_map = {"parent_record_id_1": <parent state 1>}
      • T1: Generate slice for parent_record_id_2 with parent state <parent state 1>, self._partition_parent_state_map = {"parent_record_id_2": <parent state 1>} instead of {"parent_record_id_1": <parent state 1>, "parent_record_id_2": <parent state 1>}
      • T2: Generate slice for parent_record_id_3 with parent state <parent state 2>, self._partition_parent_state_map = {"parent_record_id_2": <parent state 1>, "parent_record_id_3": <parent state 2>} instead of {"parent_record_id_1": <parent state 1>, "parent_record_id_2": <parent state 1>, "parent_record_id_3": <parent state 2>}
        This would not necessarily be enough to prevent any memory creep so we might need to find other guardrails

Benefits/drawbacks

Having the solution as part of ConcurrentPerPartitionCursor is good because:

  • We already track most of the information needed (apart from the parent state) in the ConcurrentPerPartitionCursor
  • The logic does not bleed into other components (unlike some proposals, like this one, where we would pass the information through the stream_slice)

Therefore, I think it is a smart solution and it feels like we should move forward with it. What would be the next steps? Unit tests? Testing on the connection we struggled with for source-amazon-ads?


tolik0 commented Feb 17, 2025

@maxi297 Yes, the next steps will be to implement the optimization you suggested, followed by adding unit tests and testing with Amazon Seller Partner users who previously experienced issues.


maxi297 commented Feb 17, 2025

Regarding my optimization, I don't think it is possible, as we need to know that parent_record_id_1, parent_record_id_2, and parent_record_id_3 have all been processed in order to move the cursor to <parent state 2>. Hence, we need to keep track of parent_record_id_1 and parent_record_id_2 somewhere (without necessarily having to store the parent state multiple times, though). This highlights that the order of all parent records is important and we will need to keep track of it. This constraint makes it more complex to optimize memory... Maybe there is something I'm missing?


maxi297 commented Feb 17, 2025

Adding to the previous comment, we need to keep all semaphores in memory anyway, because if one does not have its value == 0, it means we can't progress the global state. We assume all those semaphores are very cheap in terms of memory given that they are only a wrapper over an integer. Hence, we can reduce memory usage by, instead of checking self._semaphore_per_partition[p_key]._value == 0, validating that all semaphores before p_key have a value of 0.
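The suggested prefix check could look like the following sketch (a hypothetical free function; the real code would walk the cursor's internal ordered structures rather than a plain mapping of counter values):

```python
from collections import OrderedDict


def all_partitions_done_through(
    semaphore_values: "OrderedDict[str, int]", p_key: str
) -> bool:
    """Return True only if every partition up to and including p_key has no
    outstanding slices. `semaphore_values` maps partition key -> remaining
    slice count (the semaphore's internal counter), in creation order."""
    for key, value in semaphore_values.items():
        if value != 0:
            return False  # an earlier (or this) partition still has open slices
        if key == p_key:
            return True
    return False  # p_key was not found in the map
```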

Base automatically changed from tolik0/refactor-concurrent-global-cursor to main February 18, 2025 10:36

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

62-64: Consider documenting the relationship between partition limits.

The relationship between DEFAULT_MAX_PARTITIONS_NUMBER (25,000) and SWITCH_TO_GLOBAL_LIMIT (10,000) isn't immediately clear. Would you consider adding a docstring explaining why we switch to global cursor at 10K but allow up to 25K partitions? Also, have you validated the memory impact of increasing the max partitions from 10K to 25K? wdyt?


204-204: Consider making the throttle interval configurable.

The 60-second throttle interval is hardcoded. Would you consider making this configurable through a class constant or constructor parameter? This would allow users to adjust the throttling based on their needs. wdyt?

```diff
+    DEFAULT_STATE_EMISSION_INTERVAL = 60  # seconds
     def _throttle_state_message(self) -> Optional[float]:
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_EMISSION_INTERVAL:
```

254-256: Consider reducing lock scope for better concurrency.

The lock is held while doing a deepcopy of the parent state. Would you consider moving the deepcopy outside the lock to minimize lock contention? wdyt?

```diff
+        parent_state_copy = deepcopy(parent_state)
         with self._lock:
-            self._partition_parent_state_map[partition_key] = deepcopy(parent_state)
+            self._partition_parent_state_map[partition_key] = parent_state_copy
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8617cc8 and a8db6b6.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Ruff Format Check
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

62-63: Consider documenting the memory implications of the increased partition limit.

The DEFAULT_MAX_PARTITIONS_NUMBER has been increased from 10,000 to 25,000. Would you consider adding a comment explaining the memory implications and why 25,000 was chosen as the new limit? This could help future maintainers understand the tradeoffs. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8db6b6 and 203c131.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

167-201: LGTM! Well-structured parent state management.

The implementation correctly ensures that parent states are updated in order, only when all earlier partitions are fully finished. The code is well-documented and handles edge cases appropriately.


215-223: LGTM! Effective state emission throttling.

The throttling implementation effectively limits state message emissions to once every 60 seconds, which should help reduce overhead.


246-249: LGTM! Clean integration of parent state in slice generation.

The changes effectively integrate parent state tracking into the slice generation process while maintaining the existing functionality.

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)

308-361: LGTM! Robust test helper functions.

The test helpers provide a solid foundation for testing state transitions, request mocking, and record verification. The implementation is thorough and well-documented.

Also applies to: 763-867


425-2331: LGTM! Comprehensive test coverage.

The test suite thoroughly covers various scenarios including:

  • Parent state updates
  • Error handling
  • Empty record handling
  • State migrations
  • List partition router
  • Request options

The test cases are well-structured and provide good coverage of edge cases.


2980-3022: LGTM! Well-structured throttling test.

The test effectively verifies the state emission throttling behavior, covering both cases where emission should and should not occur. The mock setup and assertions are clear and comprehensive.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4)

308-361: Consider adding type hints to improve code maintainability?

The run_mocked_test function could benefit from type hints for better code maintainability and IDE support. For example:

-def run_mocked_test(
+def run_mocked_test(
+    mock_requests: List[Tuple[str, Optional[Dict[str, Any]]]],
+    manifest: Dict[str, Any],
+    config: Dict[str, Any],
+    stream_name: str,
+    initial_state: Dict[str, Any],
+    expected_records: List[Dict[str, Any]],
+    expected_state: Dict[str, Any]
+) -> None:

Also, consider making the assertion messages more descriptive by including the actual values in the error message, wdyt?


382-396: Consider grouping related constants into classes or dataclasses?

The date-related constants could be organized into a dataclass or class for better encapsulation and readability, wdyt?

+@dataclass
+class TestDates:
+    START_DATE: str = "2024-01-01T00:00:01Z"
+    POST_1_UPDATED_AT: str = "2024-01-30T00:00:00Z"
+    POST_2_UPDATED_AT: str = "2024-01-29T00:00:00Z"
+    POST_3_UPDATED_AT: str = "2024-01-28T00:00:00Z"

737-761: Consider adding test class for better organization?

The test functions could be organized into a test class for better structure and potential shared setup/teardown, wdyt?

+class TestConcurrentPerPartitionCursor:
+    @pytest.mark.parametrize(...)
+    def test_incremental_parent_state_no_incremental_dependency(
+        self,
+        test_name,
+        manifest,
+        mock_requests,
+        expected_records,
+        initial_state,
+        expected_state
+    ):

3267-3309: Consider adding more edge cases to state throttling tests?

The state throttling tests cover basic scenarios but could benefit from additional edge cases, wdyt? For example:

  • Test behavior when time goes backwards
  • Test with very large time values
  • Test with time values close to the throttling threshold
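The last two cases are worth pinning down numerically. With the `<=` comparison used in the implementation diff elsewhere in this review, an elapsed time exactly equal to the interval is still suppressed, and a backwards clock jump simply keeps suppressing (hypothetical helper, not the CDK code):

```python
def make_throttle(interval: float = 60.0):
    # Closure-based stand-in for the emission throttle under test.
    state = {"last_emission": 0.0}

    def try_emit(now: float):
        if now - state["last_emission"] <= interval:
            return None  # suppressed
        state["last_emission"] = now
        return now  # emitted at `now`

    return try_emit


try_emit = make_throttle()
assert try_emit(60.0) is None   # exactly at the threshold: still suppressed
assert try_emit(60.1) == 60.1   # just past the threshold: emitted
assert try_emit(30.0) is None   # clock went backwards: suppressed, no crash
```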
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 203c131 and a1d98fb.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

1-3309: Great test coverage and organization!

The test suite is well-structured with comprehensive coverage of various scenarios. The use of pytest features, mock requests, and clear test data organization makes the tests maintainable and readable.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (4)

3032-3078: Consider enhancing test assertions for better failure messages, wdyt?

The test could be more maintainable with:

  1. More descriptive assertion messages using pytest's assert messages
  2. Additional edge cases like None values for state
  3. Verifying the order of mock calls
-    assert cursor.state == {
+    assert cursor.state == {
         "use_global_cursor": False,
         "lookback_window": 0,
         "states": [],
-    }
+    }, "Expected empty state with default values"

-    assert len(cursor._cursor_per_partition) == 0
+    assert len(cursor._cursor_per_partition) == 0, "Expected no cursors"
-    assert len(cursor._semaphore_per_partition) == 0
+    assert len(cursor._semaphore_per_partition) == 0, "Expected no semaphores"
-    assert len(cursor._partition_parent_state_map) == 0
+    assert len(cursor._partition_parent_state_map) == 0, "Expected no parent state mappings"
-    assert mock_cursor.stream_slices.call_count == 0
+    assert mock_cursor.stream_slices.call_count == 0, "Expected no calls to stream_slices"

+    # Add edge case for None state
+    cursor._state = None
+    slices = list(cursor.stream_slices())
+    assert cursor.state == {
+        "use_global_cursor": False,
+        "lookback_window": 0,
+        "states": [],
+    }, "Expected default state when input state is None"

3080-3153: Consider adding error cases and improving variable naming, thoughts?

The test could be enhanced with:

  1. More descriptive variable names (e.g., slice1_data instead of data1)
  2. Error case handling (e.g., what happens if parent state update fails?)
  3. Verification of operation order
-    mock_cursor.stream_slices.side_effect = [
-        iter(
-            [
-                {"slice1": "data1"},
-                {"slice2": "data1"},  # First slice
-            ]
-        ),
+    mock_cursor.stream_slices.side_effect = [
+        iter(
+            [
+                {"partition1": "initial_data"},
+                {"partition2": "initial_data"},
+            ]
+        ),
-        iter(
-            [
-                {"slice2": "data2"},
-                {"slice2": "data2"},  # First slice for new partition
-            ]
-        ),
+        iter(
+            [
+                {"partition2": "updated_data"},
+                {"partition2": "final_data"},
+            ]
+        ),
     ]

+    # Add error case test
+    partition_router.get_stream_state.side_effect = Exception("Failed to get parent state")
+    with pytest.raises(Exception, match="Failed to get parent state"):
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slices[0])
+        )

+    # Verify operation order
+    mock_calls = cursor._message_repository.method_calls
+    assert mock_calls[0][0] == "emit_message", "Expected emit_message to be called first"

3155-3227: Consider adding throttling tests and resource cleanup verification, what do you think?

The test could be enhanced with:

  1. Verification of state emission throttling
  2. Resource cleanup checks
  3. Memory leak prevention tests
+    # Test state emission throttling
+    with patch("time.time") as mock_time:
+        mock_time.return_value = 0
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slices[0])
+        )
+        assert cursor._message_repository.emit_message.call_count == 1
+
+        # Try to emit again immediately
+        mock_time.return_value = 30  # 30 seconds later
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slices[1])
+        )
+        assert cursor._message_repository.emit_message.call_count == 1  # Should not emit
+
+    # Verify resource cleanup
+    assert not cursor._cursor_per_partition, "Expected cursors to be cleaned up"
+    assert not cursor._semaphore_per_partition, "Expected semaphores to be cleaned up"
+    assert not cursor._partition_parent_state_map, "Expected parent state map to be cleaned up"

3229-3287: Consider adding boundary condition tests and memory checks, thoughts?

The test could be enhanced with:

  1. Testing at the exact boundary of partition limits
  2. Memory usage verification
  3. Testing the transition back from global cursor
+    # Test exact boundary condition
+    cursor.DEFAULT_MAX_PARTITIONS_NUMBER = 2
+    cursor.SWITCH_TO_GLOBAL_LIMIT = 2
+    
+    # Create exactly max partitions
+    partitions = [
+        StreamSlice(partition={"id": str(i)}, cursor_slice={}, extra_fields={})
+        for i in range(cursor.DEFAULT_MAX_PARTITIONS_NUMBER)
+    ]
+    partition_router.stream_slices.return_value = iter(partitions)
+    
+    slices = list(cursor.stream_slices())
+    for slice in slices:
+        cursor.close_partition(
+            DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice)
+        )
+    
+    assert not cursor.state.get("use_global_cursor", False), "Should not switch to global cursor at exact limit"
+
+    # Verify memory cleanup after switch
+    import sys
+    import gc
+    
+    initial_size = sys.getsizeof(cursor._cursor_per_partition)
+    # Add one more partition to trigger switch
+    cursor.close_partition(
+        DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), StreamSlice(partition={"id": "extra"}, cursor_slice={}, extra_fields={}))
+    )
+    gc.collect()
+    final_size = sys.getsizeof(cursor._cursor_per_partition)
+    
+    assert final_size <= initial_size, "Memory usage should not increase after switch to global cursor"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a1d98fb and eff25ec.

📒 Files selected for processing (1)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)

@tolik0 tolik0 requested a review from maxi297 February 19, 2025 16:52
Contributor

@maxi297 maxi297 left a comment


Should we have a test where the parent state has only partially progressed because the last partition that was generated has not yet completed, while the first one has?

Apart from those small comments, I think we're good!

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

167-214: Consider optimizing parent state updates for better performance.

The current implementation iterates through all semaphores up to the earliest key, which could be inefficient for large numbers of partitions. Would you consider maintaining an index of the last cleaned parent state to avoid unnecessary iterations? This could be particularly important since this runs under a lock and blocks other threads from generating slices. wdyt?

 def _check_and_update_parent_state(self) -> None:
+    # Track the last cleaned parent state index
+    if not hasattr(self, '_last_cleaned_index'):
+        self._last_cleaned_index = 0
+
     last_closed_state = None
     while self._partition_parent_state_map:
         earliest_key = next(iter(self._partition_parent_state_map))
+        # Start from the last cleaned index
+        partition_keys = list(self._semaphore_per_partition.keys())[self._last_cleaned_index:]
         all_left_finished = True
-        for p_key, sem in list(self._semaphore_per_partition.items()):
+        for p_key, sem in [(k, self._semaphore_per_partition[k]) for k in partition_keys]:
             if p_key not in self._finished_partitions or sem._value != 0:
                 all_left_finished = False
                 break
             if p_key == earliest_key:
+                self._last_cleaned_index = list(self._semaphore_per_partition.keys()).index(p_key)
                 break
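Stripped of the surrounding class, the gate this optimization targets can be sketched as a pure function (names follow the diff above; reading `Semaphore._value` mirrors the original code and is a CPython implementation detail, not public API):

```python
from collections import OrderedDict
from threading import Semaphore


def earliest_parent_state_ready(
    partition_parent_state_map: OrderedDict,
    semaphore_per_partition: OrderedDict,
    finished_partitions: set,
) -> bool:
    # The earliest recorded parent state may only be emitted once every
    # partition generated up to (and including) that key has finished
    # and fully drained its semaphore.
    if not partition_parent_state_map:
        return False
    earliest_key = next(iter(partition_parent_state_map))
    for p_key, sem in semaphore_per_partition.items():
        if p_key not in finished_partitions or sem._value != 0:
            return False
        if p_key == earliest_key:
            return True
    return False


sems = OrderedDict([("1", Semaphore(0)), ("2", Semaphore(0))])
states = OrderedDict([("1", {"parent": {"state": "s1"}})])
assert earliest_parent_state_ready(states, sems, {"1", "2"}) is True
assert earliest_parent_state_ready(states, sems, {"2"}) is False  # "1" still running
```

The proposed index only skips the prefix of partitions already proven finished; the loop body itself is unchanged.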

228-236: Consider making the throttle interval configurable.

The state emission throttle interval is hardcoded to 60 seconds. Would you consider making this configurable via a class constant or configuration parameter to allow for different use cases? This would provide more flexibility for different stream requirements. wdyt?

+    # Default throttle interval in seconds
+    DEFAULT_STATE_THROTTLE_INTERVAL = 60
+
     def _throttle_state_message(self) -> Optional[float]:
         """
-        Throttles the state message emission to once every 60 seconds.
+        Throttles the state message emission to at most once per throttle interval.
         """
         current_time = time.time()
-        if current_time - self._last_emission_time <= 60:
+        if current_time - self._last_emission_time <= self.DEFAULT_STATE_THROTTLE_INTERVAL:
             return None
         return current_time

254-254: Remove debug print statement.

This print statement appears to be for debugging purposes and should be removed or replaced with proper logging. wdyt?

-        print("stream_slices")
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)

3032-3078: Consider improving test method naming for better clarity.

The test name test_given_no_partitions_processed_when_close_partition_then_no_state_update follows a good pattern, but it's quite long. Would you consider using a more concise format like test_no_state_update_for_unprocessed_partitions? This maintains clarity while being more readable. wdyt?


3385-3455: Consider splitting the test into smaller focused tests.

This test method is quite long and tests multiple aspects of semaphore cleanup. Would you consider splitting it into smaller, more focused test methods? For example:

  • test_semaphore_cleanup_initial_state
  • test_semaphore_cleanup_after_closing_partitions
  • test_semaphore_cleanup_parent_state_updates

This would make the tests more maintainable and easier to understand. wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eff25ec and 4a18954.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)

Contributor

@maxi297 maxi297 left a comment


Given the CI passes, I'm good with this change. The comments are just nits/validation. Thanks @tolik0

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)

3032-3078: Great test coverage for the empty partition scenario! Would you consider adding a docstring?

The test thoroughly verifies the behavior when no partitions are processed. To make it even more maintainable, what do you think about adding a docstring that explains:

  • The purpose of testing empty partitions
  • The expected behavior
  • The key assertions being made
 def test_given_no_partitions_processed_when_close_partition_then_no_state_update():
+    """
+    Test that when no partitions are processed:
+    - The cursor state remains empty
+    - No cursor slices are generated
+    - All internal data structures (cursor_per_partition, semaphore_per_partition, partition_parent_state_map) are empty
+    """
     mock_cursor = MagicMock()

3383-3453: Consider extracting magic strings as constants for better maintainability?

The test data setup is thorough, but what do you think about extracting some of the repeated values as constants? For example:

  • "2024-01-02T00:00:00Z", "2024-01-03T00:00:00Z" as date constants
  • {"parent": {"state": "state1"}} as state constants
+# Test Constants
+PARTITION_1_DATE = "2024-01-02T00:00:00Z"
+PARTITION_2_DATE = "2024-01-03T00:00:00Z"
+PARENT_STATE_1 = {"parent": {"state": "state1"}}
+PARENT_STATE_2 = {"parent": {"state": "state2"}}

 def test_semaphore_cleanup():
     # Create two mock cursors with different states for each partition
     mock_cursor_1 = MagicMock()
     mock_cursor_1.stream_slices.return_value = iter([
         {"slice1": "data1"},
         {"slice2": "data1"},  # First partition slices
     ])
-    mock_cursor_1.state = {"updated_at": "2024-01-02T00:00:00Z"}  # State for partition "1"
+    mock_cursor_1.state = {"updated_at": PARTITION_1_DATE}  # State for partition "1"
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a18954 and a7ece97.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

316-369: LGTM! Well-structured test helper function.

The run_mocked_test helper function is well-documented and handles all the common test setup, making the tests more maintainable and reducing code duplication.

@tolik0 tolik0 merged commit 4991e07 into main Feb 24, 2025
21 of 23 checks passed
@tolik0 tolik0 deleted the tolik0/concurrent-perpartition-add-parent-state-updates branch February 24, 2025 12:08
Labels
enhancement New feature or request