feat(low code): Add GroupingPartitionRouter #354

Open · wants to merge 11 commits into base: main

42 changes: 42 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -3130,12 +3130,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
@@ -3290,12 +3292,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
@@ -3412,6 +3416,44 @@ definitions:
$parameters:
type: object
additionalProperties: true
GroupingPartitionRouter:
title: Grouping Partition Router
description: >
A decorator on top of a partition router that groups partitions into batches of a specified size.
This is useful for APIs that support filtering by multiple partition keys in a single request.
Note that per-partition incremental syncs may not work as expected because the grouping
of partitions might change between syncs, potentially leading to inconsistent state tracking.
type: object
required:
- type
- group_size
- underlying_partition_router
properties:
type:
type: string
enum: [GroupingPartitionRouter]
group_size:
title: Group Size
description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
type: integer
examples:
- 10
- 50
underlying_partition_router:
title: Underlying Partition Router
description: The partition router whose output will be grouped. This can be any valid partition router component.
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
deduplicate:
title: Deduplicate Partitions
description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
type: boolean
default: true
$parameters:
type: object
additionalProperties: true
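
For illustration, a hedged sketch of how the definition above might be used: it shows the component as the Python mapping a manifest would parse into, the underlying ListPartitionRouter fields follow its existing schema, and the cursor field and values are hypothetical.

# Illustrative only, not part of this PR: a partition_router entry once the
# manifest YAML has been parsed into a Python mapping.
example_partition_router = {
    "type": "GroupingPartitionRouter",
    "group_size": 50,          # batch up to 50 partitions into one slice
    "deduplicate": True,       # drop repeated partition keys within a group
    "underlying_partition_router": {
        "type": "ListPartitionRouter",
        "cursor_field": "board_ids",   # hypothetical partition field
        "values": ["1", "2", "3", "4"],
    },
}

With group_size set to 50, up to 50 of the underlying partition values end up in a single slice, which suits APIs that accept several filter values per request.
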
WaitUntilTimeFromHeader:
title: Wait Until Time Defined In Response Header
description: Extract time at which we can retry the request from response header and wait for the difference between now and that time.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -2225,7 +2225,15 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
@@ -2303,7 +2311,15 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
@@ -2355,6 +2371,29 @@ class SubstreamPartitionRouter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GroupingPartitionRouter(BaseModel):
type: Literal["GroupingPartitionRouter"]
group_size: int = Field(
...,
description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
examples=[10, 50],
title="Group Size",
)
underlying_partition_router: Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
] = Field(
...,
description="The partition router whose output will be grouped. This can be any valid partition router component.",
title="Underlying Partition Router",
)
deduplicate: Optional[bool] = Field(
True,
description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
title="Deduplicate Partitions",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
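
As a quick check of the generated model above, a hedged sketch that builds it directly; it assumes the generated ListPartitionRouter model keeps its existing required cursor_field and values fields, and the values shown are hypothetical.

from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    GroupingPartitionRouter,
    ListPartitionRouter,
)

# Hypothetical underlying router definition.
underlying = ListPartitionRouter(
    type="ListPartitionRouter",
    cursor_field="board_ids",
    values=["1", "2", "3"],
)

model = GroupingPartitionRouter(
    type="GroupingPartitionRouter",
    group_size=10,
    underlying_partition_router=underlying,
)

# deduplicate was omitted, so the schema default applies.
assert model.deduplicate is True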


class HttpComponentsResolver(BaseModel):
type: Literal["HttpComponentsResolver"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
@@ -227,6 +227,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GroupingPartitionRouter as GroupingPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipDecoder as GzipDecoderModel,
)
@@ -379,6 +382,7 @@
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
GroupingPartitionRouter,
ListPartitionRouter,
PartitionRouter,
SinglePartitionRouter,
@@ -624,6 +628,7 @@ def _init_mappings(self) -> None:
UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy,
RateModel: self.create_rate,
HttpRequestRegexMatcherModel: self.create_http_request_matcher,
GroupingPartitionRouterModel: self.create_grouping_partition_router,
}

# Needed for the case where we need to perform a second parse on the fields of a custom component
@@ -3044,3 +3049,34 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
self._api_budget = self.create_component(
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
)

def create_grouping_partition_router(
self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
) -> GroupingPartitionRouter:
underlying_router = self._create_component_from_model(
model=model.underlying_partition_router, config=config
)
if model.group_size < 1:
raise ValueError(f"Group size must be greater than 0, got {model.group_size}")

# Request options in underlying partition routers are not supported for GroupingPartitionRouter
# because they are specific to individual partitions and cannot be aggregated or handled
# when grouping, potentially leading to incorrect API calls. Any request customization
# should be managed at the stream level through the requester's configuration.
if isinstance(underlying_router, SubstreamPartitionRouter):
if any(
parent_config.request_option
for parent_config in underlying_router.parent_stream_configs
):
raise ValueError("Request options are not supported for GroupingPartitionRouter.")

if isinstance(underlying_router, ListPartitionRouter):
if underlying_router.request_option:
raise ValueError("Request options are not supported for GroupingPartitionRouter.")

return GroupingPartitionRouter(
group_size=model.group_size,
underlying_partition_router=underlying_router,
deduplicate=model.deduplicate if model.deduplicate is not None else True,
config=config,
)
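
To make the request-option restriction concrete, a hedged sketch of a definition this factory method would reject; all field values are hypothetical.

# Hypothetical manifest fragment as a parsed mapping. Because the underlying
# ListPartitionRouter carries a request_option, create_grouping_partition_router
# raises ValueError("Request options are not supported for GroupingPartitionRouter.").
rejected_partition_router = {
    "type": "GroupingPartitionRouter",
    "group_size": 10,
    "underlying_partition_router": {
        "type": "ListPartitionRouter",
        "cursor_field": "board_ids",
        "values": ["1", "2", "3"],
        "request_option": {
            "type": "RequestOption",
            "inject_into": "request_parameter",
            "field_name": "board_ids",
        },
    },
}

Per the comment in the factory method, any per-partition request customization has to be expressed on the stream's requester instead.
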
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/partition_routers/__init__.py
@@ -8,6 +8,9 @@
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
CartesianProductStreamSlicer,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
ListPartitionRouter,
)
@@ -22,6 +25,7 @@
__all__ = [
"AsyncJobPartitionRouter",
"CartesianProductStreamSlicer",
"GroupingPartitionRouter",
"ListPartitionRouter",
"SinglePartitionRouter",
"SubstreamPartitionRouter",
136 changes: 136 additions & 0 deletions airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
@@ -0,0 +1,136 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class GroupingPartitionRouter(PartitionRouter):
"""
A partition router that groups partitions from an underlying partition router into batches of a specified size.
This is useful for APIs that support filtering by multiple partition keys in a single request.

Attributes:
group_size (int): The number of partitions to include in each group.
underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
config (Config): The connector configuration.
parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
"""

group_size: int
underlying_partition_router: PartitionRouter
config: Config
deduplicate: bool = True

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Lazily groups partitions from the underlying partition router into batches of size `group_size`.

This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

Yields:
Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
"""
batch = []
seen_keys = set()

# Iterate over partitions lazily from the underlying router
for partition in self.underlying_partition_router.stream_slices():
# Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
Review thread on this line:

Contributor: Why do we assume this? Couldn't we group objects like {"key1": "value1", "key2": "value2"}? To be clear, I'm fine with not supporting that as part of the first iteration, but I think we would need to:
  • have a clear error message when there are many values
  • check quickly that, when we do want to support objects, there will be a not-too-annoying way of making this a non-breaking change

Contributor (Author): Both ListPartitionRouter and SubstreamPartitionRouter return partitions as dictionaries with a single key. If a partition router were to return multiple keys, we couldn't guarantee that all partition keys would be consistently present in every partition. Some keys might be missing in the first batch and appear in the next one, making grouping unreliable.

Contributor: I'm good with that. Does that mean there should be an error if the partition router has multiple values? I would prefer to fail loudly, especially since we allow for CustomPartitionRouter.

key = next(iter(partition.partition.values()), None)

# Skip duplicates if deduplication is enabled
if self.deduplicate and key in seen_keys:
continue

# Add partition to the batch
batch.append(partition)
if self.deduplicate:
seen_keys.add(key)

# Yield the batch when it reaches the group_size
if len(batch) == self.group_size:
yield self._create_grouped_slice(batch)
batch = [] # Reset the batch

# Yield any remaining partitions if the batch isn't empty
if batch:
yield self._create_grouped_slice(batch)

def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
"""
Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.

Args:
batch (list[StreamSlice]): A list of StreamSlice objects to group.

Returns:
StreamSlice: A single StreamSlice with combined partition and extra field values.
"""
# Combine partition values into a single dict with lists
grouped_partition = {
key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
}

# Aggregate extra fields into a dict with list values
extra_fields_dict = (
{
key: [p.extra_fields.get(key) for p in batch]
for key in set().union(*(p.extra_fields.keys() for p in batch if p.extra_fields))
}
if any(p.extra_fields for p in batch)
else {}
)
return StreamSlice(
partition=grouped_partition,
cursor_slice={}, # Cursor is managed by the underlying router or incremental sync
extra_fields=extra_fields_dict,
)

def get_request_params(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_headers(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_data(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_json(
self,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def set_initial_state(self, stream_state: StreamState) -> None:
"""Delegate state initialization to the underlying partition router."""
self.underlying_partition_router.set_initial_state(stream_state)

def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
"""Delegate state retrieval to the underlying partition router."""
return self.underlying_partition_router.get_stream_state()
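
Putting the pieces together, a minimal runnable sketch of the grouping behavior; the stub below only duck-types stream_slices() rather than subclassing PartitionRouter, and the partition key name is hypothetical.

from airbyte_cdk.sources.declarative.partition_routers import GroupingPartitionRouter
from airbyte_cdk.sources.types import StreamSlice


class StubRouter:
    """Stand-in for an underlying partition router; only stream_slices() is exercised."""

    def stream_slices(self):
        # Emit one partition per board id, including a duplicate "2".
        for board_id in ["1", "2", "2", "3"]:
            yield StreamSlice(partition={"board_ids": board_id}, cursor_slice={})


router = GroupingPartitionRouter(
    group_size=2,
    underlying_partition_router=StubRouter(),  # illustration only
    deduplicate=True,
    config={},
)

for grouped in router.stream_slices():
    print(grouped.partition)
# Expected output with deduplication enabled:
#   {'board_ids': ['1', '2']}
#   {'board_ids': ['3']}

The duplicate "2" is skipped, the first two unique partitions form one slice, and the leftover partition is flushed as a final, smaller slice.
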
Review thread on get_stream_state:

Contributor: This is interesting to me because it feels like we have a gap between the state of the parent and what is actually emitted, and I'm not sure if this is fine. Let me explain. For example, given we have group_size = 2, we could have the following situation:
  • T0: The parent emits parent_1, state is parent_state_1, no slices created for the child stream yet
  • T1: The parent emits parent_2, state is parent_state_2, we emit a slice to the child
If we were to request the state between T0 and T1, the state would actually be wrong because we haven't consumed parent_1 from the child's perspective. I don't know if we have a process that does that. I could easily see that if we fail on the parent stream between T0 and T1, the sync would stop and maybe at that point we would update the state. However, I don't know if that is actually the case. Do we have something like that?

Contributor (Author): You're right. I'll update the GroupingPartitionRouter to return the state for the last emitted parent record to ensure state consistency.