-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(low code): Add GroupingPartitionRouter #354
base: main
Are you sure you want to change the base?
Conversation
📝 WalkthroughWalkthroughThis pull request introduces a new component called GroupingPartitionRouter for declarative data extraction. It allows batches of partition keys to be grouped into slices with a configurable size and optional deduplication. The changes span YAML schema updates, new and modified Python classes in models, a factory method for component instantiation, and export adjustments to expose the new router. Existing retriever components are updated to support the new grouping functionality. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Factory as ModelToComponentFactory
participant Underlying as PartitionRouter
participant Grouping as GroupingPartitionRouter
Client->>Factory: create_grouping_partition_router(model, config)
Factory->>Factory: _create_component_from_model(model)
Factory-->>Underlying: Instantiate underlying partition router
Factory->>Factory: Validate instance is a PartitionRouter
alt Valid underlying router
Factory->>Grouping: Initialize GroupingPartitionRouter with parameters
Grouping-->>Factory: GroupingPartitionRouter instance created
else Invalid underlying router
Factory-->>Client: Raise ValueError
end
Factory-->>Client: Return GroupingPartitionRouter instance
sequenceDiagram
participant Grouping as GroupingPartitionRouter
participant Underlying as UnderlyingPartitionRouter
participant Client as DataConsumer
Client->>Grouping: Request stream slices
Grouping->>Underlying: Call stream_slices()
loop Processing partitions
Underlying-->>Grouping: Return partition slice
Grouping->>Grouping: Buffer slice (apply deduplication if enabled)
alt Batch size reached or stream end
Grouping-->>Client: Yield grouped slice batch
end
end
Possibly Related PRs
Suggested Reviewers
Tip CodeRabbit's docstrings feature is now available as part of our Pro Plan! Simply use the command ✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
12-24
: Nitpick: Clarify mention of SinglePartitionRouter in the docstring.In the docstring (line 20), the router is referred to as a
SinglePartitionRouter
, but the code usesPartitionRouter
. Might be worth updating to maintain consistency and reduce confusion, wdyt?airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
3133-3141
: Integration in SimpleRetriever: Inclusion of GroupingPartitionRouter
It looks like you added references to the newGroupingPartitionRouter
inside thepartition_router
allowed types forSimpleRetriever
. This should enable users to choose the grouping strategy for partitioning. Have you validated that the ordering and overall schema constraints are still correct here? wdyt?
3295-3302
: Integration in AsyncRetriever: Inclusion of GroupingPartitionRouter
Similar to the SimpleRetriever, theAsyncRetriever
now acceptsGroupingPartitionRouter
in itspartition_router
field. This is a great step toward consistency. Would you mind confirming that the asynchronous flow correctly processes grouped partitions as expected in practice? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(3 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(4 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(3 hunks)airbyte_cdk/sources/declarative/partition_routers/__init__.py
(2 hunks)airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
(1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
[error] 43-43: Need type annotation for 'seen_keys'
[error] 51-51: Unsupported right operand type for in ('set[Any] | None')
[error] 57-57: Item 'None' of 'set[Any] | None' has no attribute 'add'
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 3056-3056: Argument 'model' to '_create_component_from_model' of 'ModelToComponentFactory' has incompatible type 'CustomPartitionRouter | ListPartitionRouter | SubstreamPartitionRouter | None'; expected 'BaseModel'
[error] 3064-3064: Unexpected keyword argument 'parameters' for 'GroupingPartitionRouter'
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 2376-2376: Invalid type comment or annotation. Suggestion: use conint[...] instead of conint(...)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
68-117
: LGTM for request/response methods.The request/response components and their delegations appear solid and maintainable. wdyt?
airbyte_cdk/sources/declarative/partition_routers/__init__.py (1)
11-13
: Looks good!Exporting
GroupingPartitionRouter
here cleanly integrates it into your module’s public API, wdyt?airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
230-232
: LGTM! Import follows the established pattern.
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3052-3070
:⚠️ Potential issueFix parameter handling in GroupingPartitionRouter instantiation.
The pipeline failure indicates that
parameters
is not a valid argument forGroupingPartitionRouter
. Looking at the implementation, we should remove this argument.What do you think about this fix?
return GroupingPartitionRouter( group_size=model.group_size, underlying_partition_router=underlying_router, deduplicate=model.deduplicate if model.deduplicate is not None else True, - parameters=model.parameters or {}, config=config, )
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2390-2411
: 🛠️ Refactor suggestionThe GroupingPartitionRouter schema looks good, but should we update the type annotation?
The implementation looks solid with well-documented fields. However, there's a potential issue with the
conint
type annotation that might cause problems. Would you like to update it to use bracket notation instead?- group_size: conint(ge=1) = Field( + group_size: conint[ge=1] = Field(
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3055-3057
: Add type safety check for underlying_partition_router.To prevent potential runtime errors, we should add a type check for the underlying_partition_router model.
What do you think about adding this type check?
+ if not isinstance(model.underlying_partition_router, BaseModel): + raise ValueError( + f"underlying_partition_router must be a BaseModel instance, got {type(model.underlying_partition_router)}" + ) + underlying_router = self._create_component_from_model( model=model.underlying_partition_router, config=config )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(3 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(11 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(3 hunks)airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
(1 hunks)unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 3086-3086: Unexpected keyword argument 'parameters' for 'GroupingPartitionRouter'
🪛 GitHub Actions: Pytest (Fast)
unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
[error] 9-9: ModuleNotFoundError: No module named 'test_substream_partition_router'.
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Analyze (python)
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
3419-3458
: Friendly Check on the New GroupingPartitionRouter ComponentYou've introduced the new
GroupingPartitionRouter
with clear required fields such astype
,group_size
, andunderlying_partition_router
, and added an optionaldeduplicate
flag with a sensible default of true. I really appreciate how the description transparently explains that grouping may affect per-partition incremental syncs—this thoughtful note will help users understand potential trade-offs.A couple of questions to ensure we’re covering all bases:
- Would you consider expanding the description of
deduplicate
to detail how duplicate partitions are identified or handled? This might further assist users configuring the component.- Is the naming of
underlying_partition_router
consistent with related schemas elsewhere and aligned with our overall naming conventions? It looks correct here, but it might be good to verify this across the codebase.Overall, this component appears well thought out. wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
230-232
: LGTM!The import of GroupingPartitionRouterModel is correctly placed in alphabetical order with other model imports.
385-385
: LGTM!The import of GroupingPartitionRouter is correctly placed in alphabetical order with other router imports.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2242-2250
: LGTM! The partition router integration is well-structured.The GroupingPartitionRouter is properly integrated into the list of available partition routers for both SimpleRetriever and AsyncRetriever.
unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py (4)
89-171
: LGTM! Comprehensive test coverage for grouping behavior.The parametrized test cases effectively cover different group sizes and deduplication scenarios. The expected results are well-defined and the assertions are clear.
203-299
: LGTM! Excellent lazy iteration testing.The test effectively verifies that the GroupingPartitionRouter processes partitions lazily using a controlled iterator. The assertions at different stages provide good confidence in the implementation.
330-375
: LGTM! Good edge case coverage for extra fields.The test properly handles scenarios with missing fields, ensuring the router can handle varied data structures gracefully.
419-446
: LGTM! Good test for non-empty parent slices.The test ensures that parent slice information is properly preserved during grouping operations.
unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (2)
42-44
: 🛠️ Refactor suggestionFix type safety in deduplication logic.
Based on past review comments, there are type safety issues in the deduplication logic. Would you consider this approach to improve type safety, wdyt?
- batch = [] - seen_keys = set() + from typing import List, Set, Any + batch: List[StreamSlice] = [] + seen_keys: Set[Any] = set()
42-44
:⚠️ Potential issueFix type safety issues in deduplication logic.
The
seen_keys
variable needs type annotation to prevent type checker issues. Additionally, consider handling the case where partitions might be empty.What do you think about these changes?
- batch = [] - seen_keys = set() + from typing import Set, Any + batch: list[StreamSlice] = [] + seen_keys: Set[Any] = set()
🧹 Nitpick comments (8)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3849-3903
: LGTM! The test is well-structured and comprehensive.The test effectively validates:
- Creation of GroupingPartitionRouter with correct group size
- Proper configuration of the underlying SubstreamPartitionRouter
- Parent stream configuration and field mappings
One suggestion: Would you consider adding a test case for deduplication behavior since GroupingPartitionRouter supports it? wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
3053-3072
: Add type safety checks for the underlying partition router.The current implementation could be improved with better type safety. Would you consider adding type checks for
model.underlying_partition_router
before passing it to_create_component_from_model
? This would help catch type-related issues earlier. For example:def create_grouping_partition_router( self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any ) -> GroupingPartitionRouter: + if not isinstance(model.underlying_partition_router, BaseModel): + raise ValueError( + f"underlying_partition_router must be a BaseModel instance, got {type(model.underlying_partition_router)}" + ) + underlying_router = self._create_component_from_model( model=model.underlying_partition_router, config=config )wdyt?
3053-3072
: Consider adding type hints for better type safety.The implementation could benefit from explicit type hints. Would you consider these changes to improve type safety, wdyt?
def create_grouping_partition_router( self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any ) -> GroupingPartitionRouter: + from typing import cast underlying_router = self._create_component_from_model( model=model.underlying_partition_router, config=config ) if model.group_size < 1: raise ValueError(f"Group size must be greater than 0, got {model.group_size}") if not isinstance(underlying_router, PartitionRouter): raise ValueError( f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}" ) return GroupingPartitionRouter( group_size=model.group_size, underlying_partition_router=cast(PartitionRouter, underlying_router), deduplicate=model.deduplicate if model.deduplicate is not None else True, config=config, )
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (4)
31-67
: Consider adding error handling for empty partitions.The current implementation assumes that partitions will always have values. Would you consider adding error handling for empty partitions? For example:
def stream_slices(self) -> Iterable[StreamSlice]: batch = [] seen_keys = set() for partition in self.underlying_partition_router.stream_slices(): + if not partition.partition: + continue + key = next(iter(partition.partition.values()), None)Also, consider adding a docstring note about how empty partitions are handled, wdyt?
68-96
: Consider handling edge cases in _create_grouped_slice.The method assumes that all partitions in a batch have the same keys. Would you consider adding validation or handling for cases where partitions might have different keys? For example:
def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice: + if not batch: + raise ValueError("Cannot create a grouped slice from an empty batch") + grouped_partition = { - key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys() + key: [p.partition.get(key) for p in batch] + for key in set().union(*(p.partition.keys() for p in batch)) }wdyt?
68-97
: Consider improving type hints in _create_grouped_slice method.The method could benefit from more explicit type hints. Would you consider these changes, wdyt?
- def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice: + def _create_grouped_slice(self, batch: List[StreamSlice]) -> StreamSlice: """ Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values. Args: - batch (list[StreamSlice]): A list of StreamSlice objects to group. + batch (List[StreamSlice]): A list of StreamSlice objects to group. Returns: StreamSlice: A single StreamSlice with combined partition and extra field values. """ # Combine partition values into a single dict with lists + from typing import Dict + grouped_partition: Dict[str, list] = { - grouped_partition = { key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys() } # Aggregate extra fields into a dict with list values - extra_fields_dict = ( + extra_fields_dict: Dict[str, list] = (
47-52
: Consider adding error handling for empty partitions.The code assumes that partition values exist. Would you consider adding validation to handle empty partitions gracefully, wdyt?
# Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value}) + if not partition.partition: + continue key = next(iter(partition.partition.values()), None) + if key is None: + continueairbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
3420-3457
: Review the GroupingPartitionRouter schema definition:
- The new
GroupingPartitionRouter
component is clearly defined with its key properties:type
,group_size
, andunderlying_partition_router
. The renaming tounderlying_partition_router
nicely addresses the previous naming consistency comment.- The description is comprehensive, explaining both the use case (grouping partitions for APIs supporting multiple keys in one request) and a potential pitfall with incremental syncs; this extra context is really helpful.
- The default value for the
deduplicate
flag is set totrue
, which seems sensible for ensuring unique partitions. Is this the intended behavior in most cases? Perhaps adding a brief note on why duplications are removed by default could further guide users.- Would you consider providing a small example configuration snippet in the documentation? It might help users better understand how to integrate this new component into their stream settings.
wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(3 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(3 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(4 hunks)airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
(1 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(3 hunks)unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
👮 Files not reviewed due to content moderation or server errors (2)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
- airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (3)
1-30
: LGTM! Well-documented class with clear attributes.The class is well-structured with comprehensive docstrings explaining its purpose and attributes. The use of dataclass and type hints is appropriate.
98-137
: LGTM! Clean implementation of request methods and state management.The request methods return empty mappings as expected, and state management is properly delegated to the underlying router.
130-136
: LGTM! Clean state management delegation.The implementation correctly delegates state management to the underlying partition router.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
2374-2394
: Consider adding validation for group_size.The
group_size
field should be positive to ensure valid grouping. Based on the pipeline error, would you like to update it using bracket notation like this?- group_size: int = Field( + group_size: conint[ge=1] = Field(Also, consider adding more descriptive examples in the docstring to show how the router can be used with different underlying routers, wdyt?
2228-2237
: LGTM! The partition_router field updates look good.The changes maintain backward compatibility and consistency between SimpleRetriever and AsyncRetriever classes.
Also applies to: 2314-2323
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3053-3079
: Consider adding type hints for the underlying_partition_router parameter.The implementation looks solid with proper validation checks. However, we could improve type safety by adding type hints for the underlying_partition_router parameter in the model. What do you think about this change?
- underlying_router = self._create_component_from_model( - model=model.underlying_partition_router, config=config + underlying_router: PartitionRouter = self._create_component_from_model( + model=model.underlying_partition_router, config=config )Verify the behavior of deduplicate parameter.
The deduplicate parameter defaults to True if not specified. Could you confirm if this is the intended default behavior? Also, should we document this default value in the model's schema?
Consider adding validation for parent stream configurations.
The code checks for request options in parent stream configs but doesn't validate other potential issues. Should we add more comprehensive validation? For example:
- Validate that parent streams don't have their own GroupingPartitionRouter to prevent nesting
- Check for any other incompatible configurations
wdyt?unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
3849-3999
: Consider adding test cases for edge scenarios?The current test coverage looks good but we could potentially add tests for:
- Deduplication behavior
- Different group sizes
- Multiple parent streams
- Empty parent stream results
WDYT about adding these test cases to make the test coverage more comprehensive?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(4 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
230-232
: LGTM! Clean import of the new GroupingPartitionRouter model.The import is correctly placed in alphabetical order with other model imports.
631-631
: LGTM! Factory mapping registration looks good.The new component is properly registered in the PYDANTIC_MODEL_TO_CONSTRUCTOR mapping.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
68-70
: LGTM! Clean import addition.The GroupingPartitionRouter import is properly grouped with other similar imports.
3849-3896
: LGTM! Well-structured test for GroupingPartitionRouter creation.The test thoroughly validates:
- Successful creation of GroupingPartitionRouter with underlying SubstreamPartitionRouter
- Correct group_size configuration
- Proper setup of parent stream configs
3905-3949
: LGTM! Good negative test case.The test properly validates that an invalid group_size raises a ValueError with a descriptive message.
3951-3999
: LGTM! Good validation test.The test ensures that using request_option with GroupingPartitionRouter raises an appropriate error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a couple of questions just to clarify my understanding
if model.group_size < 1: | ||
raise ValueError(f"Group size must be greater than 0, got {model.group_size}") | ||
|
||
if not isinstance(underlying_router, PartitionRouter): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which case can this happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, we explicitly define the underlying partition router class in the schema, so this check is unnecessary. I'll remove it.
|
||
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: | ||
"""Delegate state retrieval to the underlying partition router.""" | ||
return self.underlying_partition_router.get_stream_state() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is interesting to me because it feels like we have a gap between the state of the parent and what is actually emitted and I'm not sure if this is fine. Let me explain:
. For example, given we have group_size = 2, we could have the following situation:
- T0: Parent emit parent_1, state is parent_state_1, no slices created for the child stream yet
- T1: Parent emit parent_2, state is parent_state_2, we emit a slice to the child
If we were to request the state between T0 and T1, the state would actually be wrong because we haven't consumed parent_1 from the child's perspective. I don't know if we have a process that does that. I could easily see that if we fail on the parent stream between T0 and T1, the sync would stop and maybe at that point we would update the state. However, I don't know if it is actually the case. Do we have something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I'll update the GroupingPartitionRouter to return the state for the last emitted parent record to ensure state consistency.
|
||
# Iterate over partitions lazily from the underlying router | ||
for partition in self.underlying_partition_router.stream_slices(): | ||
# Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we assume this? Couldn't we group objects like {"key1": "value1", "key2": "value2"}
?
To be clear, I'm fine with not supporting that as part of the first iteration but I think we would need to:
- Have clear error message when we have many values
- Check quickly that when we will want to support objects there will be a not too annoying way of making this not be a breaking change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both ListPartitionRouter and SubstreamPartitionRouter return partitions as dictionaries with a single key. If a partition router were to return multiple keys, we couldn't guarantee that all partition keys would be consistently present in every partition. Some keys might be missing in the first batch and appear in the next one, making grouping unreliable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm good with that. Does that means that there should be an error if the partition router has multiple values? I would prefer to fail loudly especially since we allow for CustomPartitionRouter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3053-3082
: Implementation looks good, but could we enhance the error messages?The implementation properly validates group size and checks for unsupported configurations. I particularly like the comment explaining why request options aren't supported for GroupingPartitionRouter.
Just one suggestion - the error messages on lines 3071 and 3075 could be more descriptive to help users understand why request options don't work with grouped partitions, wdyt?
- raise ValueError("Request options are not supported for GroupingPartitionRouter.") + raise ValueError("Request options are not supported for GroupingPartitionRouter because they are specific to individual partitions and can't be properly applied when partitions are grouped.")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
230-232
: Good addition of the required import for the new router model.The import of
GroupingPartitionRouter
model follows the established pattern in this file, which helps maintain consistency.
385-385
: LGTM on adding GroupingPartitionRouter to the imports.This follows the pattern of importing all partition router implementations in this section.
631-631
: Correctly added the model to constructor mapping.This is required for the factory pattern to work with the new router type.
3059-3060
: The group_size validation looks good.As discussed in the previous comments, keeping the min value at 1 makes sense because even a group_size of 1 can be useful for deduplication purposes.
3063-3076
: Well thought-out validation of underlying router configurations.The validation logic checks for request options in both SubstreamPartitionRouter and ListPartitionRouter, which prevents users from creating invalid configurations. This is a proactive approach to ensure proper component usage.
3077-3082
: Constructor call looks good.The creation of the GroupingPartitionRouter instance includes all the necessary parameters, and the default for deduplicate (True) makes sense as a sensible default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚢
Implements: https://github.com/airbytehq/airbyte-internal-issues/issues/11622
Summary by CodeRabbit
New Features
GroupingPartitionRouter
for enhanced partition grouping functionality, allowing for batching of partitions based on a specified size.SimpleRetriever
andAsyncRetriever
components to support the new grouping functionality, enabling simultaneous processing of multiple partitions in API requests.Tests
GroupingPartitionRouter
, covering various scenarios including grouping behavior, lazy iteration, and handling of extra fields.GroupingPartitionRouter
with an underlying router and validation of group size.