From 978be1b4eed6b0225a4882102a5fd36ebf5cd8ea Mon Sep 17 00:00:00 2001 From: Patrick Nilan Date: Wed, 19 Feb 2025 19:15:37 -0800 Subject: [PATCH 01/11] feat: expose `str_to_datetime` jinja macro (#351) --- .../declarative_component_schema.yaml | 10 ++++ .../declarative/interpolation/__init__.py | 2 +- .../declarative/interpolation/filters.py | 3 +- .../interpolation/interpolated_boolean.py | 2 +- .../interpolation/interpolated_mapping.py | 2 +- .../interpolated_nested_mapping.py | 2 +- .../interpolation/interpolated_string.py | 2 +- .../interpolation/interpolation.py | 3 +- .../declarative/interpolation/jinja.py | 2 +- .../declarative/interpolation/macros.py | 23 ++++++++-- .../declarative/interpolation/test_macros.py | 46 ++++++++++++++++++- 11 files changed, 84 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b4eef5f03..7bf5c4fa3 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3744,6 +3744,16 @@ interpolation: - "{{ format_datetime(config['start_time'], '%Y-%m-%d') }}" - "{{ format_datetime(config['start_date'], '%Y-%m-%dT%H:%M:%S.%fZ') }}" - "{{ format_datetime(config['start_date'], '%Y-%m-%dT%H:%M:%S.%fZ', '%a, %d %b %Y %H:%M:%S %z') }}" + - title: str_to_datetime + description: Converts a string to a datetime object with UTC timezone. + arguments: + s: The string to convert. + return_type: datetime.datetime + examples: + - "{{ str_to_datetime('2022-01-14') }}" + - "{{ str_to_datetime('2022-01-01 13:45:30') }}" + - "{{ str_to_datetime('2022-01-01T13:45:30+00:00') }}" + - "{{ str_to_datetime('2022-01-01T13:45:30.123456Z') }}" filters: - title: hash description: Convert the specified value to a hashed string. diff --git a/airbyte_cdk/sources/declarative/interpolation/__init__.py b/airbyte_cdk/sources/declarative/interpolation/__init__.py index d721b99f1..84362e9ab 100644 --- a/airbyte_cdk/sources/declarative/interpolation/__init__.py +++ b/airbyte_cdk/sources/declarative/interpolation/__init__.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean diff --git a/airbyte_cdk/sources/declarative/interpolation/filters.py b/airbyte_cdk/sources/declarative/interpolation/filters.py index 52d76cab6..ffebe73da 100644 --- a/airbyte_cdk/sources/declarative/interpolation/filters.py +++ b/airbyte_cdk/sources/declarative/interpolation/filters.py @@ -1,6 +1,7 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # + import base64 import hashlib import json diff --git a/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py b/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py index 78569b350..04cc7e694 100644 --- a/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py +++ b/airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
# from dataclasses import InitVar, dataclass diff --git a/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py b/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py index 11b2dac97..b96a2a6b7 100644 --- a/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py +++ b/airbyte_cdk/sources/declarative/interpolation/interpolated_mapping.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # diff --git a/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py b/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py index 82454919e..f441ba918 100644 --- a/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py +++ b/airbyte_cdk/sources/declarative/interpolation/interpolated_nested_mapping.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # diff --git a/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py b/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py index 542fa8068..ef20a436f 100644 --- a/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py +++ b/airbyte_cdk/sources/declarative/interpolation/interpolated_string.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # from dataclasses import InitVar, dataclass diff --git a/airbyte_cdk/sources/declarative/interpolation/interpolation.py b/airbyte_cdk/sources/declarative/interpolation/interpolation.py index 5af61905e..021f96df6 100644 --- a/airbyte_cdk/sources/declarative/interpolation/interpolation.py +++ b/airbyte_cdk/sources/declarative/interpolation/interpolation.py @@ -1,7 +1,8 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # + from abc import ABC, abstractmethod from typing import Any, Optional diff --git a/airbyte_cdk/sources/declarative/interpolation/jinja.py b/airbyte_cdk/sources/declarative/interpolation/jinja.py index 8f8548aee..543fe9b46 100644 --- a/airbyte_cdk/sources/declarative/interpolation/jinja.py +++ b/airbyte_cdk/sources/declarative/interpolation/jinja.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import ast diff --git a/airbyte_cdk/sources/declarative/interpolation/macros.py b/airbyte_cdk/sources/declarative/interpolation/macros.py index 1ca5b31f0..62b6904c8 100644 --- a/airbyte_cdk/sources/declarative/interpolation/macros.py +++ b/airbyte_cdk/sources/declarative/interpolation/macros.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import builtins @@ -63,10 +63,24 @@ def timestamp(dt: Union[float, str]) -> Union[int, float]: if isinstance(dt, (int, float)): return int(dt) else: - return _str_to_datetime(dt).astimezone(pytz.utc).timestamp() + return str_to_datetime(dt).astimezone(pytz.utc).timestamp() -def _str_to_datetime(s: str) -> datetime.datetime: +def str_to_datetime(s: str) -> datetime.datetime: + """ + Converts a string to a datetime object with UTC timezone + + If the input string does not contain timezone information, UTC is assumed. 
+ Supports both basic date strings like "2022-01-14" and datetime strings with optional timezone + like "2022-01-01T13:45:30+00:00". + + Usage: + `"{{ str_to_datetime('2022-01-14') }}"` + + :param s: string to parse as datetime + :return: datetime object in UTC timezone + """ + parsed_date = parser.isoparse(s) if not parsed_date.tzinfo: # Assume UTC if the input does not contain a timezone @@ -155,7 +169,7 @@ def format_datetime( if isinstance(dt, datetime.datetime): return dt.strftime(format) dt_datetime = ( - datetime.datetime.strptime(dt, input_format) if input_format else _str_to_datetime(dt) + datetime.datetime.strptime(dt, input_format) if input_format else str_to_datetime(dt) ) if format == "%s": return str(int(dt_datetime.timestamp())) @@ -172,5 +186,6 @@ def format_datetime( duration, format_datetime, today_with_timezone, + str_to_datetime, ] macros = {f.__name__: f for f in _macros_list} diff --git a/unit_tests/sources/declarative/interpolation/test_macros.py b/unit_tests/sources/declarative/interpolation/test_macros.py index 3fcad5d15..c531a9811 100644 --- a/unit_tests/sources/declarative/interpolation/test_macros.py +++ b/unit_tests/sources/declarative/interpolation/test_macros.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import datetime @@ -120,3 +120,47 @@ def test_utc_datetime_to_local_timestamp_conversion(): This test ensures correct timezone handling independent of the timezone of the system on which the sync is running. """ assert macros["format_datetime"](dt="2020-10-01T00:00:00Z", format="%s") == "1601510400" + + +@pytest.mark.parametrize( + "test_name, input_value, expected_output", + [ + ( + "test_basic_date", + "2022-01-14", + datetime.datetime(2022, 1, 14, tzinfo=datetime.timezone.utc), + ), + ( + "test_datetime_with_time", + "2022-01-01 13:45:30", + datetime.datetime(2022, 1, 1, 13, 45, 30, tzinfo=datetime.timezone.utc), + ), + ( + "test_datetime_with_timezone", + "2022-01-01T13:45:30+00:00", + datetime.datetime(2022, 1, 1, 13, 45, 30, tzinfo=datetime.timezone.utc), + ), + ( + "test_datetime_with_timezone_offset", + "2022-01-01T13:45:30+05:30", + datetime.datetime(2022, 1, 1, 8, 15, 30, tzinfo=datetime.timezone.utc), + ), + ( + "test_datetime_with_microseconds", + "2022-01-01T13:45:30.123456Z", + datetime.datetime(2022, 1, 1, 13, 45, 30, 123456, tzinfo=datetime.timezone.utc), + ), + ], +) +def test_give_valid_date_str_to_datetime_returns_datetime_object( + test_name, input_value, expected_output +): + str_to_datetime_fn = macros["str_to_datetime"] + actual_output = str_to_datetime_fn(input_value) + assert actual_output == expected_output + + +def test_given_invalid_date_str_to_datetime_raises_value_error(): + str_to_datetime_fn = macros["str_to_datetime"] + with pytest.raises(ValueError): + str_to_datetime_fn("invalid-date") From 6ef3153b5398ab444da46ff3a955ba38ee37e468 Mon Sep 17 00:00:00 2001 From: Baz Date: Thu, 20 Feb 2025 19:32:46 +0200 Subject: [PATCH 02/11] fix: (CDK) (CsvParser) - Fix the `\\` escaping when passing the `delimiter` from Builder's UI (#358) --- .../declarative/decoders/composite_raw_decoder.py | 12 +++++++++++- .../declarative/decoders/test_composite_decoder.py | 4 +++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 389679406..2cb618175 100644 --- 
a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -107,6 +107,16 @@ class CsvParser(Parser): encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," + def _get_delimiter(self) -> Optional[str]: + """ + Get delimiter from the configuration. Check for the escape character and decode it. + """ + if self.delimiter is not None: + if self.delimiter.startswith("\\"): + self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape") + + return self.delimiter + def parse( self, data: BufferedIOBase, @@ -115,7 +125,7 @@ def parse( Parse CSV data from decompressed bytes. """ text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore - reader = csv.DictReader(text_data, delimiter=self.delimiter or ",") + reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") yield from reader diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 524593b56..745113925 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -62,7 +62,9 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): ) response = requests.get("https://airbyte.io/", stream=True) - parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\t")) + # the delimiter is set to `\\t` intentionally to test the parsing logic here + parser = GzipParser(inner_parser=CsvParser(encoding=encoding, delimiter="\\t")) + composite_raw_decoder = CompositeRawDecoder(parser=parser) counter = 0 for _ in composite_raw_decoder.decode(response): From ef0ca58f08e314d5088234382562d6964e56d4e2 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Thu, 20 Feb 2025 19:17:15 +0100 Subject: [PATCH 03/11] feat(low-code cdk): add AbstractStreamFacade processing as concurrent streams in declarative source (#347) Co-authored-by: octavia-squidington-iii --- .../declarative/concurrent_declarative_source.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7419b9dda..1dd5b962c 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -44,6 +44,7 @@ from airbyte_cdk.sources.source import TState from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( AlwaysAvailableAvailabilityStrategy, ) @@ -118,6 +119,12 @@ def __init__( message_repository=self.message_repository, ) + # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state. + @property + def is_partially_declarative(self) -> bool: + """This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams.""" + return False + def read( self, logger: logging.Logger, @@ -369,6 +376,14 @@ def _group_streams( ) else: synchronous_streams.append(declarative_stream) + # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state. 
+ # Condition below needs to ensure that concurrent support is not lost for sources that already support + # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe). + elif ( + isinstance(declarative_stream, AbstractStreamFacade) + and self.is_partially_declarative + ): + concurrent_streams.append(declarative_stream.get_underlying_stream()) else: synchronous_streams.append(declarative_stream) From 665dc1ffe1c2ade7ab3fe46bf1f90d50ceca3ec6 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Thu, 20 Feb 2025 19:19:04 +0100 Subject: [PATCH 04/11] feat(low-code cdk): add interpolation for limit field in Rate (#353) Co-authored-by: octavia-squidington-iii --- .../sources/declarative/declarative_component_schema.yaml | 6 +++++- .../declarative/models/declarative_component_schema.py | 2 +- .../declarative/parsers/model_to_component_factory.py | 3 ++- .../sources/declarative/requesters/test_http_requester.py | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 7bf5c4fa3..5bd110c4e 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1490,7 +1490,11 @@ definitions: limit: title: Limit description: The maximum number of calls allowed within the interval. - type: integer + anyOf: + - type: integer + - type: string + interpolation_context: + - config interval: title: Interval description: The time interval for the rate limit. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8f5be6867..7ff18fa1d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -646,7 +646,7 @@ class Rate(BaseModel): class Config: extra = Extra.allow - limit: int = Field( + limit: Union[int, str] = Field( ..., description="The maximum number of calls allowed within the interval.", title="Limit", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 739d15825..d9cc3e3ef 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3024,8 +3024,9 @@ def create_unlimited_call_rate_policy( ) def create_rate(self, model: RateModel, config: Config, **kwargs: Any) -> Rate: + interpolated_limit = InterpolatedString.create(str(model.limit), parameters={}) return Rate( - limit=model.limit, + limit=int(interpolated_limit.eval(config=config)), interval=parse_duration(model.interval), ) diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index c5d5c218d..dfe78011a 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -946,7 +946,7 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any ) -def test_http_requester_with_mock_apibudget(http_requester_factory, monkeypatch): +def test_http_requester_with_mock_api_budget(http_requester_factory, monkeypatch): mock_budget 
= MagicMock(spec=HttpAPIBudget) requester = http_requester_factory( From c3efa4c5efdee693cd72391f70aaae5a50df48fd Mon Sep 17 00:00:00 2001 From: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com> Date: Fri, 21 Feb 2025 13:59:12 -0500 Subject: [PATCH 05/11] fix: update csv parser for builder compatibility (#364) --- .../declarative/decoders/composite_raw_decoder.py | 3 ++- .../parsers/model_to_component_factory.py | 14 ++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 2cb618175..b8e8e3315 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -126,7 +126,8 @@ def parse( """ text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") - yield from reader + for row in reader: + yield row @dataclass diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d9cc3e3ef..0ad66e1b0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2091,10 +2091,10 @@ def create_dynamic_schema_loader( def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> Decoder: return JsonDecoder(parameters={}) - @staticmethod - def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) @staticmethod @@ -2103,10 +2103,12 @@ def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any parser=ModelToComponentFactory._get_parser(model, config), stream_response=True ) - @staticmethod - def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_gzip_decoder( + self, model: GzipDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) @staticmethod From 4991e07fb6c46f56d7774231c88f3d4a9681f633 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk <35109939+tolik0@users.noreply.github.com> Date: Mon, 24 Feb 2025 14:08:52 +0200 Subject: [PATCH 06/11] feat(concurrent perpartition cursor): Add parent state updates (#343) --- .../concurrent_partition_cursor.py | 100 +++- .../test_concurrent_perpartitioncursor.py | 433 ++++++++++++++++++ 2 files changed, 516 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index ddcba0470..715589026 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -95,6 +95,10 @@ def 
__init__( # the oldest partitions can be efficiently removed, maintaining the most recent partitions. self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict() self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict() + + # Parent-state tracking: store each partition’s parent state in creation order + self._partition_parent_state_map: OrderedDict[str, Mapping[str, Any]] = OrderedDict() + self._finished_partitions: set[str] = set() self._lock = threading.Lock() self._timer = Timer() @@ -155,11 +159,62 @@ def close_partition(self, partition: Partition) -> None: and self._semaphore_per_partition[partition_key]._value == 0 ): self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) - self._emit_state_message() + + self._check_and_update_parent_state() + + self._emit_state_message() + + def _check_and_update_parent_state(self) -> None: + """ + Pop the leftmost partition state from _partition_parent_state_map only if + *all partitions* up to (and including) that partition key in _semaphore_per_partition + are fully finished (i.e. in _finished_partitions and semaphore._value == 0). + Additionally, delete finished semaphores with a value of 0 to free up memory, + as they are only needed to track errors and completion status. + """ + last_closed_state = None + + while self._partition_parent_state_map: + # Look at the earliest partition key in creation order + earliest_key = next(iter(self._partition_parent_state_map)) + + # Verify ALL partitions from the left up to earliest_key are finished + all_left_finished = True + for p_key, sem in list( + self._semaphore_per_partition.items() + ): # Use list to allow modification during iteration + # If any earlier partition is still not finished, we must stop + if p_key not in self._finished_partitions or sem._value != 0: + all_left_finished = False + break + # Once we've reached earliest_key in the semaphore order, we can stop checking + if p_key == earliest_key: + break + + # If the partitions up to earliest_key are not all finished, break the while-loop + if not all_left_finished: + break + + # Pop the leftmost entry from parent-state map + _, closed_parent_state = self._partition_parent_state_map.popitem(last=False) + last_closed_state = closed_parent_state + + # Clean up finished semaphores with value 0 up to and including earliest_key + for p_key in list(self._semaphore_per_partition.keys()): + sem = self._semaphore_per_partition[p_key] + if p_key in self._finished_partitions and sem._value == 0: + del self._semaphore_per_partition[p_key] + logger.debug(f"Deleted finished semaphore for partition {p_key} with value 0") + if p_key == earliest_key: + break + + # Update _parent_state if we popped at least one partition + if last_closed_state is not None: + self._parent_state = last_closed_state def ensure_at_least_one_state_emitted(self) -> None: """ - The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be + The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called. 
""" if not any( @@ -201,13 +256,19 @@ def stream_slices(self) -> Iterable[StreamSlice]: slices = self._partition_router.stream_slices() self._timer.start() - for partition in slices: - yield from self._generate_slices_from_partition(partition) + for partition, last, parent_state in iterate_with_last_flag_and_state( + slices, self._partition_router.get_stream_state + ): + yield from self._generate_slices_from_partition(partition, parent_state) - def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: + def _generate_slices_from_partition( + self, partition: StreamSlice, parent_state: Mapping[str, Any] + ) -> Iterable[StreamSlice]: # Ensure the maximum number of partitions is not exceeded self._ensure_partition_limit() + partition_key = self._to_partition_key(partition.partition) + cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) if not cursor: cursor = self._create_cursor( @@ -216,18 +277,26 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St ) with self._lock: self._number_of_partitions += 1 - self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor - self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( - threading.Semaphore(0) - ) + self._cursor_per_partition[partition_key] = cursor + self._semaphore_per_partition[partition_key] = threading.Semaphore(0) + + with self._lock: + if ( + len(self._partition_parent_state_map) == 0 + or self._partition_parent_state_map[ + next(reversed(self._partition_parent_state_map)) + ] + != parent_state + ): + self._partition_parent_state_map[partition_key] = deepcopy(parent_state) for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state( cursor.stream_slices(), lambda: None, ): - self._semaphore_per_partition[self._to_partition_key(partition.partition)].release() + self._semaphore_per_partition[partition_key].release() if is_last_slice: - self._finished_partitions.add(self._to_partition_key(partition.partition)) + self._finished_partitions.add(partition_key) yield StreamSlice( partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields ) @@ -257,9 +326,9 @@ def _ensure_partition_limit(self) -> None: while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: # Try removing finished partitions first for partition_key in list(self._cursor_per_partition.keys()): - if ( - partition_key in self._finished_partitions - and self._semaphore_per_partition[partition_key]._value == 0 + if partition_key in self._finished_partitions and ( + partition_key not in self._semaphore_per_partition + or self._semaphore_per_partition[partition_key]._value == 0 ): oldest_partition = self._cursor_per_partition.pop( partition_key @@ -338,9 +407,6 @@ def _set_initial_state(self, stream_state: StreamState) -> None: self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( self._create_cursor(state["cursor"]) ) - self._semaphore_per_partition[self._to_partition_key(state["partition"])] = ( - threading.Semaphore(0) - ) # set default state for missing partitions if it is per partition with fallback to global if self._GLOBAL_STATE_KEY in stream_state: diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 86a7e4c46..042a430aa 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ 
b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -20,6 +20,14 @@ ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor +from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( + DeclarativePartition, +) +from airbyte_cdk.sources.streams.concurrent.cursor import CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + CustomFormatConcurrentStreamStateConverter, +) +from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read @@ -2027,6 +2035,8 @@ def test_incremental_parent_state_no_records( "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, } ], + "state": {}, + "use_global_cursor": False, "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, } }, @@ -3017,3 +3027,426 @@ def test_state_throttling(mocker): cursor._emit_state_message() mock_connector_manager.update_state_for_stream.assert_called_once() mock_repo.emit_message.assert_called_once() + + +def test_given_no_partitions_processed_when_close_partition_then_no_state_update(): + mock_cursor = MagicMock() + # No slices for no partitions + mock_cursor.stream_slices.side_effect = [iter([])] + mock_cursor.state = {} # Empty state for no partitions + + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.return_value = mock_cursor + + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ) + + cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={}, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=connector_state_converter, + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + partition_router = cursor._partition_router + partition_router.stream_slices.return_value = iter([]) + partition_router.get_stream_state.return_value = {} + + slices = list(cursor.stream_slices()) # Call once + for slice in slices: + cursor.close_partition( + DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice) + ) + + assert cursor.state == { + "use_global_cursor": False, + "lookback_window": 0, + "states": [], + } + assert len(cursor._cursor_per_partition) == 0 + assert len(cursor._semaphore_per_partition) == 0 + assert len(cursor._partition_parent_state_map) == 0 + assert mock_cursor.stream_slices.call_count == 0 # No calls since no partitions + + +def test_given_unfinished_first_parent_partition_no_parent_state_update(): + # Create two mock cursors with different states for each partition + mock_cursor_1 = MagicMock() + mock_cursor_1.stream_slices.return_value = iter( + [ + {"slice1": "data1"}, + {"slice2": "data1"}, # First partition slices + ] + ) + mock_cursor_1.state = {"updated_at": "2024-01-01T00:00:00Z"} # State for partition "1" + + mock_cursor_2 = MagicMock() + mock_cursor_2.stream_slices.return_value = iter( + [ + {"slice2": "data2"}, + {"slice2": "data2"}, # Second partition slices + ] + ) + mock_cursor_2.state = {"updated_at": "2024-01-02T00:00:00Z"} # State for partition "2" + + # Configure cursor factory to 
return different mock cursors based on partition + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2] + + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ) + + cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={ + "states": [ + {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}} + ], + "state": {"updated_at": "2024-01-01T00:00:00Z"}, + "lookback_window": 86400, + "parent_state": {"posts": {"updated_at": "2024-01-01T00:00:00Z"}}, + }, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=connector_state_converter, + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + partition_router = cursor._partition_router + all_partitions = [ + StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}), + StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}), # New partition + ] + partition_router.stream_slices.return_value = iter(all_partitions) + partition_router.get_stream_state.side_effect = [ + {"posts": {"updated_at": "2024-01-04T00:00:00Z"}}, # Initial parent state + {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, # Updated parent state for new partition + ] + + slices = list(cursor.stream_slices()) + # Close all partitions except from the first one + for slice in slices[1:]: + cursor.close_partition( + DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice) + ) + cursor.ensure_at_least_one_state_emitted() + + state = cursor.state + assert state == { + "use_global_cursor": False, + "states": [ + {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}}, + {"partition": {"id": "2"}, "cursor": {"updated_at": "2024-01-02T00:00:00Z"}}, + ], + "state": {"updated_at": "2024-01-01T00:00:00Z"}, + "lookback_window": 86400, + "parent_state": {"posts": {"updated_at": "2024-01-01T00:00:00Z"}}, + } + assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition + assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition + assert len(cursor._semaphore_per_partition) == 2 + + +def test_given_unfinished_last_parent_partition_with_partial_parent_state_update(): + # Create two mock cursors with different states for each partition + mock_cursor_1 = MagicMock() + mock_cursor_1.stream_slices.return_value = iter( + [ + {"slice1": "data1"}, + {"slice2": "data1"}, # First partition slices + ] + ) + mock_cursor_1.state = {"updated_at": "2024-01-02T00:00:00Z"} # State for partition "1" + + mock_cursor_2 = MagicMock() + mock_cursor_2.stream_slices.return_value = iter( + [ + {"slice2": "data2"}, + {"slice2": "data2"}, # Second partition slices + ] + ) + mock_cursor_2.state = {"updated_at": "2024-01-01T00:00:00Z"} # State for partition "2" + + # Configure cursor factory to return different mock cursors based on partition + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2] + + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ) + + cursor = 
ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={ + "states": [ + {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}} + ], + "state": {"updated_at": "2024-01-01T00:00:00Z"}, + "lookback_window": 86400, + "parent_state": {"posts": {"updated_at": "2024-01-01T00:00:00Z"}}, + }, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=connector_state_converter, + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + partition_router = cursor._partition_router + all_partitions = [ + StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}), + StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}), # New partition + ] + partition_router.stream_slices.return_value = iter(all_partitions) + partition_router.get_stream_state.side_effect = [ + {"posts": {"updated_at": "2024-01-04T00:00:00Z"}}, # Initial parent state + {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, # Updated parent state for new partition + ] + + slices = list(cursor.stream_slices()) + # Close all partitions except from the first one + for slice in slices[:-1]: + cursor.close_partition( + DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice) + ) + cursor.ensure_at_least_one_state_emitted() + + state = cursor.state + assert state == { + "use_global_cursor": False, + "states": [ + {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-02T00:00:00Z"}}, + {"partition": {"id": "2"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}}, + ], + "state": {"updated_at": "2024-01-01T00:00:00Z"}, + "lookback_window": 86400, + "parent_state": {"posts": {"updated_at": "2024-01-04T00:00:00Z"}}, + } + assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition + assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition + assert len(cursor._semaphore_per_partition) == 1 + + +def test_given_all_partitions_finished_when_close_partition_then_final_state_emitted(): + mock_cursor = MagicMock() + # Simulate one slice per cursor + mock_cursor.stream_slices.side_effect = [ + iter( + [ + {"slice1": "data"}, # First slice for partition 1 + ] + ), + iter( + [ + {"slice2": "data"}, # First slice for partition 2 + ] + ), + ] + mock_cursor.state = {"updated_at": "2024-01-02T00:00:00Z"} # Set cursor state (latest) + + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.return_value = mock_cursor + + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ) + + cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={ + "states": [ + {"partition": {"id": "1"}, "cursor": {"updated_at": "2024-01-01T00:00:00Z"}}, + {"partition": {"id": "2"}, "cursor": {"updated_at": "2024-01-02T00:00:00Z"}}, + ], + "state": {"updated_at": "2024-01-02T00:00:00Z"}, + "lookback_window": 86400, + "parent_state": {"posts": {"updated_at": "2024-01-03T00:00:00Z"}}, + }, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=connector_state_converter, + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + partition_router = 
cursor._partition_router + partitions = [ + StreamSlice(partition={"id": "1"}, cursor_slice={}, extra_fields={}), + StreamSlice(partition={"id": "2"}, cursor_slice={}, extra_fields={}), + ] + partition_router.stream_slices.return_value = iter(partitions) + partition_router.get_stream_state.return_value = { + "posts": {"updated_at": "2024-01-06T00:00:00Z"} + } + + slices = list(cursor.stream_slices()) + for slice in slices: + cursor.close_partition( + DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice) + ) + + cursor.ensure_at_least_one_state_emitted() + + final_state = cursor.state + assert final_state["use_global_cursor"] is False + assert len(final_state["states"]) == 2 + assert final_state["state"]["updated_at"] == "2024-01-02T00:00:00Z" + assert final_state["parent_state"] == {"posts": {"updated_at": "2024-01-06T00:00:00Z"}} + assert final_state["lookback_window"] == 1 + assert cursor._message_repository.emit_message.call_count == 2 + assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition + assert len(cursor._semaphore_per_partition) == 1 + + +def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_global_cursor(): + mock_cursor = MagicMock() + # Simulate one slice per cursor + mock_cursor.stream_slices.side_effect = [iter([{"slice" + str(i): "data"}]) for i in range(3)] + mock_cursor.state = {"updated_at": "2024-01-01T00:00:00Z"} # Set cursor state + + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.return_value = mock_cursor + + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ) + + cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={}, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=connector_state_converter, + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + # Override default limit for testing + cursor.DEFAULT_MAX_PARTITIONS_NUMBER = 2 + cursor.SWITCH_TO_GLOBAL_LIMIT = 1 + + partition_router = cursor._partition_router + partitions = [ + StreamSlice(partition={"id": str(i)}, cursor_slice={}, extra_fields={}) for i in range(3) + ] # 3 partitions + partition_router.stream_slices.return_value = iter(partitions) + partition_router.get_stream_state.side_effect = [ + {"updated_at": "2024-01-02T00:00:00Z"}, + {"updated_at": "2024-01-03T00:00:00Z"}, + {"updated_at": "2024-01-04T00:00:00Z"}, + {"updated_at": "2024-01-04T00:00:00Z"}, + ] + + slices = list(cursor.stream_slices()) + for slice in slices: + cursor.close_partition( + DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), slice) + ) + cursor.ensure_at_least_one_state_emitted() + + final_state = cursor.state + assert len(slices) == 3 + assert final_state["use_global_cursor"] is True + assert len(final_state.get("states", [])) == 0 # No per-partition states + assert final_state["parent_state"] == {"updated_at": "2024-01-04T00:00:00Z"} + assert "lookback_window" in final_state + assert len(cursor._cursor_per_partition) <= cursor.DEFAULT_MAX_PARTITIONS_NUMBER + assert mock_cursor.stream_slices.call_count == 3 # Called once for each partition + + +def test_semaphore_cleanup(): + # Create two mock cursors with different states for each partition + mock_cursor_1 = MagicMock() + 
mock_cursor_1.stream_slices.return_value = iter( + [ + {"slice1": "data1"}, + {"slice2": "data1"}, # First partition slices + ] + ) + mock_cursor_1.state = {"updated_at": "2024-01-02T00:00:00Z"} # State for partition "1" + + mock_cursor_2 = MagicMock() + mock_cursor_2.stream_slices.return_value = iter( + [ + {"slice2": "data2"}, + {"slice2": "data2"}, # Second partition slices + ] + ) + mock_cursor_2.state = {"updated_at": "2024-01-03T00:00:00Z"} # State for partition "2" + + # Configure cursor factory to return different mock cursors based on partition + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2] + + cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory_mock, + partition_router=MagicMock(), + stream_name="test_stream", + stream_namespace=None, + stream_state={}, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=MagicMock(), + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + + # Simulate partitions with unique parent states + slices = [ + StreamSlice(partition={"id": "1"}, cursor_slice={}), + StreamSlice(partition={"id": "2"}, cursor_slice={}), + ] + cursor._partition_router.stream_slices.return_value = iter(slices) + # Simulate unique parent states for each partition + cursor._partition_router.get_stream_state.side_effect = [ + {"parent": {"state": "state1"}}, # Parent state for partition "1" + {"parent": {"state": "state2"}}, # Parent state for partition "2" + ] + + # Generate slices to populate semaphores and parent states + generated_slices = list( + cursor.stream_slices() + ) # Populate _semaphore_per_partition and _partition_parent_state_map + + # Verify initial state + assert len(cursor._semaphore_per_partition) == 2 + assert len(cursor._partition_parent_state_map) == 2 + assert cursor._partition_parent_state_map['{"id":"1"}'] == {"parent": {"state": "state1"}} + assert cursor._partition_parent_state_map['{"id":"2"}'] == {"parent": {"state": "state2"}} + + # Close partitions to acquire semaphores (value back to 0) + for s in generated_slices: + cursor.close_partition(DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), s)) + + # Check state after closing partitions + assert len(cursor._finished_partitions) == 2 + assert len(cursor._semaphore_per_partition) == 0 + assert '{"id":"1"}' not in cursor._semaphore_per_partition + assert '{"id":"2"}' not in cursor._semaphore_per_partition + assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped + assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state From 1ad437e6e49cef4ad55e293ef8df5a8a79892cd3 Mon Sep 17 00:00:00 2001 From: Baz Date: Mon, 24 Feb 2025 16:10:10 +0200 Subject: [PATCH 07/11] fix: (CDK) (ConnectorBuilder) - Add `auxiliary requests` to slice; support `TestRead` for AsyncRetriever (part 1/2) (#355) --- airbyte_cdk/connector_builder/models.py | 30 ++-- .../connector_builder/test_reader/helpers.py | 142 +++++++++++++++--- .../test_reader/message_grouper.py | 19 ++- .../connector_builder/test_reader/types.py | 10 +- .../declarative/auth/token_provider.py | 1 + .../parsers/model_to_component_factory.py | 65 +++++--- airbyte_cdk/sources/http_logger.py | 3 + .../requests_native_auth/abstract_oauth.py | 1 + .../test_connector_builder_handler.py | 1 + unit_tests/sources/test_http_logger.py | 7 + 10 files changed, 216 insertions(+), 63 deletions(-) diff --git a/airbyte_cdk/connector_builder/models.py 
b/airbyte_cdk/connector_builder/models.py index 50eb8eb95..561c159fc 100644 --- a/airbyte_cdk/connector_builder/models.py +++ b/airbyte_cdk/connector_builder/models.py @@ -21,20 +21,6 @@ class HttpRequest: body: Optional[str] = None -@dataclass -class StreamReadPages: - records: List[object] - request: Optional[HttpRequest] = None - response: Optional[HttpResponse] = None - - -@dataclass -class StreamReadSlices: - pages: List[StreamReadPages] - slice_descriptor: Optional[Dict[str, Any]] - state: Optional[List[Dict[str, Any]]] = None - - @dataclass class LogMessage: message: str @@ -46,11 +32,27 @@ class LogMessage: @dataclass class AuxiliaryRequest: title: str + type: str description: str request: HttpRequest response: HttpResponse +@dataclass +class StreamReadPages: + records: List[object] + request: Optional[HttpRequest] = None + response: Optional[HttpResponse] = None + + +@dataclass +class StreamReadSlices: + pages: List[StreamReadPages] + slice_descriptor: Optional[Dict[str, Any]] + state: Optional[List[Dict[str, Any]]] = None + auxiliary_requests: Optional[List[AuxiliaryRequest]] = None + + @dataclass class StreamRead(object): logs: List[LogMessage] diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index 21b75c134..fcd36189f 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -28,7 +28,7 @@ SchemaInferrer, ) -from .types import LOG_MESSAGES_OUTPUT_TYPE +from .types import ASYNC_AUXILIARY_REQUEST_TYPES, LOG_MESSAGES_OUTPUT_TYPE # ------- # Parsers @@ -226,7 +226,8 @@ def should_close_page( at_least_one_page_in_group and is_log_message(message) and ( - is_page_http_request(json_message) or message.log.message.startswith("slice:") # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message + is_page_http_request(json_message) + or message.log.message.startswith(SliceLogger.SLICE_LOG_PREFIX) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message ) ) @@ -330,6 +331,10 @@ def is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool: return is_http_log(message) and message.get("http", {}).get("is_auxiliary", False) +def is_async_auxiliary_request(message: AuxiliaryRequest) -> bool: + return message.type in ASYNC_AUXILIARY_REQUEST_TYPES + + def is_log_message(message: AirbyteMessage) -> bool: """ Determines whether the provided message is of type LOG. @@ -413,6 +418,7 @@ def handle_current_slice( current_slice_pages: List[StreamReadPages], current_slice_descriptor: Optional[Dict[str, Any]] = None, latest_state_message: Optional[Dict[str, Any]] = None, + auxiliary_requests: Optional[List[AuxiliaryRequest]] = None, ) -> StreamReadSlices: """ Handles the current slice by packaging its pages, descriptor, and state into a StreamReadSlices instance. @@ -421,6 +427,7 @@ def handle_current_slice( current_slice_pages (List[StreamReadPages]): The pages to be included in the slice. current_slice_descriptor (Optional[Dict[str, Any]]): Descriptor for the current slice, optional. latest_state_message (Optional[Dict[str, Any]]): The latest state message, optional. + auxiliary_requests (Optional[List[AuxiliaryRequest]]): The auxiliary requests to include, optional. Returns: StreamReadSlices: An object containing the current slice's pages, descriptor, and state. 
@@ -429,6 +436,7 @@ def handle_current_slice( pages=current_slice_pages, slice_descriptor=current_slice_descriptor, state=[latest_state_message] if latest_state_message else [], + auxiliary_requests=auxiliary_requests if auxiliary_requests else [], ) @@ -486,29 +494,24 @@ def handle_auxiliary_request(json_message: Dict[str, JsonType]) -> AuxiliaryRequ Raises: ValueError: If any of the "airbyte_cdk", "stream", or "http" fields is not a dictionary. """ - airbyte_cdk = json_message.get("airbyte_cdk", {}) - - if not isinstance(airbyte_cdk, dict): - raise ValueError( - f"Expected airbyte_cdk to be a dict, got {airbyte_cdk} of type {type(airbyte_cdk)}" - ) - - stream = airbyte_cdk.get("stream", {}) - if not isinstance(stream, dict): - raise ValueError(f"Expected stream to be a dict, got {stream} of type {type(stream)}") + airbyte_cdk = get_airbyte_cdk_from_message(json_message) + stream = get_stream_from_airbyte_cdk(airbyte_cdk) + title_prefix = get_auxiliary_request_title_prefix(stream) + http = get_http_property_from_message(json_message) + request_type = get_auxiliary_request_type(stream, http) - title_prefix = "Parent stream: " if stream.get("is_substream", False) else "" - http = json_message.get("http", {}) - - if not isinstance(http, dict): - raise ValueError(f"Expected http to be a dict, got {http} of type {type(http)}") + title = title_prefix + str(http.get("title", None)) + description = str(http.get("description", None)) + request = create_request_from_log_message(json_message) + response = create_response_from_log_message(json_message) return AuxiliaryRequest( - title=title_prefix + str(http.get("title", None)), - description=str(http.get("description", None)), - request=create_request_from_log_message(json_message), - response=create_response_from_log_message(json_message), + title=title, + type=request_type, + description=description, + request=request, + response=response, ) @@ -558,7 +561,8 @@ def handle_log_message( at_least_one_page_in_group, current_page_request, current_page_response, - auxiliary_request or log_message, + auxiliary_request, + log_message, ) @@ -589,3 +593,97 @@ def handle_record_message( datetime_format_inferrer.accumulate(message.record) # type: ignore return records_count + + +# ------- +# Reusable Getters +# ------- + + +def get_airbyte_cdk_from_message(json_message: Dict[str, JsonType]) -> dict: # type: ignore + """ + Retrieves the "airbyte_cdk" dictionary from the provided JSON message. + + This function validates that the extracted "airbyte_cdk" is of type dict, + raising a ValueError if the validation fails. + + Parameters: + json_message (Dict[str, JsonType]): A dictionary representing the JSON message. + + Returns: + dict: The "airbyte_cdk" dictionary extracted from the JSON message. + + Raises: + ValueError: If the "airbyte_cdk" field is not a dictionary. + """ + airbyte_cdk = json_message.get("airbyte_cdk", {}) + + if not isinstance(airbyte_cdk, dict): + raise ValueError( + f"Expected airbyte_cdk to be a dict, got {airbyte_cdk} of type {type(airbyte_cdk)}" + ) + + return airbyte_cdk + + +def get_stream_from_airbyte_cdk(airbyte_cdk: dict) -> dict: # type: ignore + """ + Retrieves the "stream" dictionary from the provided "airbyte_cdk" dictionary. + + This function ensures that the extracted "stream" is of type dict, + raising a ValueError if the validation fails. + + Parameters: + airbyte_cdk (dict): The dictionary representing the Airbyte CDK data. + + Returns: + dict: The "stream" dictionary extracted from the Airbyte CDK data. 
+ + Raises: + ValueError: If the "stream" field is not a dictionary. + """ + + stream = airbyte_cdk.get("stream", {}) + + if not isinstance(stream, dict): + raise ValueError(f"Expected stream to be a dict, got {stream} of type {type(stream)}") + + return stream + + +def get_auxiliary_request_title_prefix(stream: dict) -> str: # type: ignore + """ + Generates a title prefix based on the stream type. + """ + return "Parent stream: " if stream.get("is_substream", False) else "" + + +def get_http_property_from_message(json_message: Dict[str, JsonType]) -> dict: # type: ignore + """ + Retrieves the "http" dictionary from the provided JSON message. + + This function validates that the extracted "http" is of type dict, + raising a ValueError if the validation fails. + + Parameters: + json_message (Dict[str, JsonType]): A dictionary representing the JSON message. + + Returns: + dict: The "http" dictionary extracted from the JSON message. + + Raises: + ValueError: If the "http" field is not a dictionary. + """ + http = json_message.get("http", {}) + + if not isinstance(http, dict): + raise ValueError(f"Expected http to be a dict, got {http} of type {type(http)}") + + return http + + +def get_auxiliary_request_type(stream: dict, http: dict) -> str: # type: ignore + """ + Determines the type of the auxiliary request based on the stream and HTTP properties. + """ + return "PARENT_STREAM" if stream.get("is_substream", False) else str(http.get("type", None)) diff --git a/airbyte_cdk/connector_builder/test_reader/message_grouper.py b/airbyte_cdk/connector_builder/test_reader/message_grouper.py index 56082e202..e4478a0ad 100644 --- a/airbyte_cdk/connector_builder/test_reader/message_grouper.py +++ b/airbyte_cdk/connector_builder/test_reader/message_grouper.py @@ -6,6 +6,7 @@ from typing import Any, Dict, Iterator, List, Mapping, Optional from airbyte_cdk.connector_builder.models import ( + AuxiliaryRequest, HttpRequest, HttpResponse, StreamReadPages, @@ -24,6 +25,7 @@ handle_current_slice, handle_log_message, handle_record_message, + is_async_auxiliary_request, is_config_update_message, is_log_message, is_record_message, @@ -89,6 +91,7 @@ def get_message_groups( current_page_request: Optional[HttpRequest] = None current_page_response: Optional[HttpResponse] = None latest_state_message: Optional[Dict[str, Any]] = None + slice_auxiliary_requests: List[AuxiliaryRequest] = [] while records_count < limit and (message := next(messages, None)): json_message = airbyte_message_to_json(message) @@ -106,6 +109,7 @@ def get_message_groups( current_slice_pages, current_slice_descriptor, latest_state_message, + slice_auxiliary_requests, ) current_slice_descriptor = parse_slice_description(message.log.message) # type: ignore current_slice_pages = [] @@ -118,7 +122,8 @@ def get_message_groups( at_least_one_page_in_group, current_page_request, current_page_response, - log_or_auxiliary_request, + auxiliary_request, + log_message, ) = handle_log_message( message, json_message, @@ -126,8 +131,15 @@ def get_message_groups( current_page_request, current_page_response, ) - if log_or_auxiliary_request: - yield log_or_auxiliary_request + + if auxiliary_request: + if is_async_auxiliary_request(auxiliary_request): + slice_auxiliary_requests.append(auxiliary_request) + else: + yield auxiliary_request + + if log_message: + yield log_message elif is_trace_with_error(message): if message.trace is not None: yield message.trace @@ -157,4 +169,5 @@ def get_message_groups( current_slice_pages, current_slice_descriptor, 
latest_state_message, + slice_auxiliary_requests, ) diff --git a/airbyte_cdk/connector_builder/test_reader/types.py b/airbyte_cdk/connector_builder/test_reader/types.py index b20a009af..0bb95d8a6 100644 --- a/airbyte_cdk/connector_builder/test_reader/types.py +++ b/airbyte_cdk/connector_builder/test_reader/types.py @@ -71,5 +71,13 @@ bool, HttpRequest | None, HttpResponse | None, - AuxiliaryRequest | AirbyteLogMessage | None, + AuxiliaryRequest | None, + AirbyteLogMessage | None, +] + +ASYNC_AUXILIARY_REQUEST_TYPES = [ + "ASYNC_CREATE", + "ASYNC_POLL", + "ASYNC_ABORT", + "ASYNC_DELETE", ] diff --git a/airbyte_cdk/sources/declarative/auth/token_provider.py b/airbyte_cdk/sources/declarative/auth/token_provider.py index 4db16ca03..c4bae02f1 100644 --- a/airbyte_cdk/sources/declarative/auth/token_provider.py +++ b/airbyte_cdk/sources/declarative/auth/token_provider.py @@ -58,6 +58,7 @@ def _refresh(self) -> None: "Obtains session token", None, is_auxiliary=True, + type="AUTH", ), ) if response is None: diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 0ad66e1b0..452c4e84a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2629,6 +2629,47 @@ def create_async_retriever( transformations: List[RecordTransformation], **kwargs: Any, ) -> AsyncRetriever: + def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetriever: + record_selector = RecordSelector( + extractor=download_extractor, + name=name, + record_filter=None, + transformations=transformations, + schema_normalization=TypeTransformer(TransformConfig.NoTransform), + config=config, + parameters={}, + ) + paginator = ( + self._create_component_from_model( + model=model.download_paginator, decoder=decoder, config=config, url_base="" + ) + if model.download_paginator + else NoPagination(parameters={}) + ) + maximum_number_of_slices = self._limit_slices_fetched or 5 + + if self._limit_slices_fetched or self._emit_connector_builder_messages: + return SimpleRetrieverTestReadDecorator( + requester=download_requester, + record_selector=record_selector, + primary_key=None, + name=job_download_components_name, + paginator=paginator, + config=config, + parameters={}, + maximum_number_of_slices=maximum_number_of_slices, + ) + + return SimpleRetriever( + requester=download_requester, + record_selector=record_selector, + primary_key=None, + name=job_download_components_name, + paginator=paginator, + config=config, + parameters={}, + ) + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder @@ -2682,29 +2723,7 @@ def create_async_retriever( config=config, name=job_download_components_name, ) - download_retriever = SimpleRetriever( - requester=download_requester, - record_selector=RecordSelector( - extractor=download_extractor, - name=name, - record_filter=None, - transformations=transformations, - schema_normalization=TypeTransformer(TransformConfig.NoTransform), - config=config, - parameters={}, - ), - primary_key=None, - name=job_download_components_name, - paginator=( - self._create_component_from_model( - model=model.download_paginator, decoder=decoder, config=config, url_base="" - ) - if model.download_paginator - else NoPagination(parameters={}) - ), - config=config, - parameters={}, - ) + download_retriever = _get_download_retriever() abort_requester = ( 
self._create_component_from_model( model=model.abort_requester, diff --git a/airbyte_cdk/sources/http_logger.py b/airbyte_cdk/sources/http_logger.py index 832f17f1c..6d6785c86 100644 --- a/airbyte_cdk/sources/http_logger.py +++ b/airbyte_cdk/sources/http_logger.py @@ -15,11 +15,14 @@ def format_http_message( description: str, stream_name: Optional[str], is_auxiliary: bool | None = None, + type: Optional[str] = None, ) -> LogMessage: + request_type: str = type if type else "HTTP" request = response.request log_message = { "http": { "title": title, + "type": request_type, "description": description, "request": { "method": request.method, diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index df936f8b6..ac0b996d4 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -396,6 +396,7 @@ def _log_response(self, response: requests.Response) -> None: "Obtains access token", self._NO_STREAM_NAME, is_auxiliary=True, + type="AUTH", ), ) diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index e6e69bd1d..af5968faa 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -537,6 +537,7 @@ def test_read(): "pages": [{"records": [real_record], "request": None, "response": None}], "slice_descriptor": None, "state": None, + "auxiliary_requests": None, } ], "test_read_limit_reached": False, diff --git a/unit_tests/sources/test_http_logger.py b/unit_tests/sources/test_http_logger.py index d2491f710..2374b1940 100644 --- a/unit_tests/sources/test_http_logger.py +++ b/unit_tests/sources/test_http_logger.py @@ -65,6 +65,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": {"method": "GET", "body": {"content": None}, "headers": {}}, "response": EMPTY_RESPONSE, @@ -85,6 +86,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": { "method": "GET", @@ -109,6 +111,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": {"method": "GET", "body": {"content": None}, "headers": {}}, "response": EMPTY_RESPONSE, @@ -129,6 +132,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": { "method": "GET", @@ -153,6 +157,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": { "method": "GET", @@ -181,6 +186,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": { "method": "GET", @@ -208,6 +214,7 @@ def build(self): "airbyte_cdk": {"stream": {"name": A_STREAM_NAME}}, "http": { "title": A_TITLE, + "type": "HTTP", "description": A_DESCRIPTION, "request": { "method": "POST", From 4dbb6feb7f21fed4dc246285ffce2e3f0c837e9e Mon Sep 17 00:00:00 2001 From: Baz Date: Mon, 24 Feb 2025 17:31:06 +0200 Subject: [PATCH 08/11] 
fix: (CDK) (AsyncRetriever) - Add the `request` and `response` to each `async` operations (#356) --- .../extractors/response_to_file_extractor.py | 1 + .../requesters/http_job_repository.py | 46 +++++++++++++++++-- .../declarative/retrievers/async_retriever.py | 13 ++++-- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 0215ddb45..76631ee6b 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -136,6 +136,7 @@ def _read_with_chunks( """ try: + # TODO: Add support for other file types, like `json`, with `pd.read_json()` with open(path, "r", encoding=file_encoding) as data: chunks = pd.read_csv( data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index fce146fd8..da335b2b7 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -23,6 +23,7 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.http_logger import format_http_message from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.utils import AirbyteTracedException @@ -71,7 +72,15 @@ def _get_validated_polling_response(self, stream_slice: StreamSlice) -> requests """ polling_response: Optional[requests.Response] = self.polling_requester.send_request( - stream_slice=stream_slice + stream_slice=stream_slice, + log_formatter=lambda polling_response: format_http_message( + response=polling_response, + title="Async Job -- Polling", + description="Poll the status of the server-side async job.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_POLL", + ), ) if polling_response is None: raise AirbyteTracedException( @@ -118,8 +127,17 @@ def _start_job_and_validate_response(self, stream_slice: StreamSlice) -> request """ response: Optional[requests.Response] = self.creation_requester.send_request( - stream_slice=stream_slice + stream_slice=stream_slice, + log_formatter=lambda response: format_http_message( + response=response, + title="Async Job -- Create", + description="Create the server-side async job.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_CREATE", + ), ) + if not response: raise AirbyteTracedException( internal_message="Always expect a response or an exception from creation_requester", @@ -217,13 +235,33 @@ def abort(self, job: AsyncJob) -> None: if not self.abort_requester: return - self.abort_requester.send_request(stream_slice=self._get_create_job_stream_slice(job)) + abort_response = self.abort_requester.send_request( + stream_slice=self._get_create_job_stream_slice(job), + log_formatter=lambda abort_response: format_http_message( + response=abort_response, + title="Async Job -- Abort", + description="Abort the running server-side async job.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_ABORT", + ), + ) def delete(self, job: AsyncJob) -> None: if not self.delete_requester: return - self.delete_requester.send_request(stream_slice=self._get_create_job_stream_slice(job)) + delete_job_reponse = 
self.delete_requester.send_request( + stream_slice=self._get_create_job_stream_slice(job), + log_formatter=lambda delete_job_reponse: format_http_message( + response=delete_job_reponse, + title="Async Job -- Delete", + description="Delete the specified job from the list of Jobs.", + stream_name=None, + is_auxiliary=True, + type="ASYNC_DELETE", + ), + ) self._clean_up_job(job.api_job_id()) def _clean_up_job(self, job_id: str) -> None: diff --git a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py index 24f52cfd3..7dad06a54 100644 --- a/airbyte_cdk/sources/declarative/retrievers/async_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/async_retriever.py @@ -1,13 +1,12 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. -from dataclasses import InitVar, dataclass +from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, Mapping, Optional from typing_extensions import deprecated from airbyte_cdk.sources.declarative.async_job.job import AsyncJob -from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( AsyncJobPartitionRouter, @@ -16,6 +15,7 @@ from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Config, StreamSlice, StreamState +from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger @deprecated( @@ -28,6 +28,10 @@ class AsyncRetriever(Retriever): parameters: InitVar[Mapping[str, Any]] record_selector: RecordSelector stream_slicer: AsyncJobPartitionRouter + slice_logger: AlwaysLogSliceLogger = field( + init=False, + default_factory=lambda: AlwaysLogSliceLogger(), + ) def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters @@ -75,13 +79,16 @@ def _validate_and_get_stream_slice_jobs( return stream_slice.extra_fields.get("jobs", []) if stream_slice else [] def stream_slices(self) -> Iterable[Optional[StreamSlice]]: - return self.stream_slicer.stream_slices() + yield from self.stream_slicer.stream_slices() def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, ) -> Iterable[StreamData]: + # emit the slice_descriptor log message, for connector builder TestRead + yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore + stream_state: StreamState = self._get_stream_state() jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice) records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs) From a4022888377543bfb96aeb00e5da140f185b10f4 Mon Sep 17 00:00:00 2001 From: Baz Date: Wed, 26 Feb 2025 15:06:40 +0200 Subject: [PATCH 09/11] fix: (CDK) (AsyncRetriever) - Improve UX on variable naming and interpolation (#368) --- .../declarative_component_schema.yaml | 24 ++++++++++++-- .../models/declarative_component_schema.py | 4 +-- .../parsers/model_to_component_factory.py | 14 ++++----- .../sources/declarative/requesters/README.md | 10 +++--- .../requesters/http_job_repository.py | 31 +++++++++++-------- .../declarative/requesters/http_requester.py | 8 ++++- .../test_model_to_component_factory.py | 12 +++---- .../requesters/test_http_job_repository.py | 6 ++-- .../test_concurrent_declarative_source.py | 4 +-- 
9 files changed, 71 insertions(+), 42 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 5bd110c4e..c664e237a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1779,6 +1779,9 @@ definitions: - stream_interval - stream_partition - stream_slice + - creation_response + - polling_response + - download_target examples: - "/products" - "/quotes/{{ stream_partition['id'] }}/quote_line_groups" @@ -3223,7 +3226,7 @@ definitions: - polling_requester - download_requester - status_extractor - - urls_extractor + - download_target_extractor properties: type: type: string @@ -3240,7 +3243,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" - urls_extractor: + download_target_extractor: description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job. anyOf: - "$ref": "#/definitions/CustomRecordExtractor" @@ -3261,7 +3264,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRequester" - "$ref": "#/definitions/HttpRequester" - url_requester: + download_target_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job. anyOf: - "$ref": "#/definitions/CustomRequester" @@ -3667,6 +3670,21 @@ interpolation: self: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token= next: https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2 count: 82 + - title: creation_response + description: The response received from the creation_requester in the AsyncRetriever component. + type: object + examples: + - id: "1234" + - title: polling_response + description: The response received from the polling_requester in the AsyncRetriever component. + type: object + examples: + - id: "1234" + - title: download_target + description: The `URL` received from the polling_requester in the AsyncRetriever with jobStatus as `COMPLETED`. + type: string + examples: + - "https://api.sendgrid.com/v3/marketing/lists?page_size=1&page_token=0236d6d2&filename=xxx_yyy_zzz.csv" - title: stream_interval description: The current stream interval being processed. The keys are defined by the incremental sync component. Default keys are `start_time` and `end_time`. type: object diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 7ff18fa1d..abe4d89cf 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -2263,7 +2263,7 @@ class AsyncRetriever(BaseModel): status_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." 
) - urls_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( + download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field( ..., description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) @@ -2278,7 +2278,7 @@ class AsyncRetriever(BaseModel): ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to fetch the status of the running async job.", ) - url_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( + download_target_requester: Optional[Union[CustomRequester, HttpRequester]] = Field( None, description="Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 452c4e84a..f6e1cc0d6 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2744,32 +2744,32 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie if model.delete_requester else None ) - url_requester = ( + download_target_requester = ( self._create_component_from_model( - model=model.url_requester, + model=model.download_target_requester, decoder=decoder, config=config, name=f"job extract_url - {name}", ) - if model.url_requester + if model.download_target_requester else None ) status_extractor = self._create_component_from_model( model=model.status_extractor, decoder=decoder, config=config, name=name ) - urls_extractor = self._create_component_from_model( - model=model.urls_extractor, decoder=decoder, config=config, name=name + download_target_extractor = self._create_component_from_model( + model=model.download_target_extractor, decoder=decoder, config=config, name=name ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, polling_requester=polling_requester, download_retriever=download_retriever, - url_requester=url_requester, + download_target_requester=download_target_requester, abort_requester=abort_requester, delete_requester=delete_requester, status_extractor=status_extractor, status_mapping=self._create_async_job_status_mapping(model.status_mapping, config), - urls_extractor=urls_extractor, + download_target_extractor=download_target_extractor, ) async_job_partition_router = AsyncJobPartitionRouter( diff --git a/airbyte_cdk/sources/declarative/requesters/README.md b/airbyte_cdk/sources/declarative/requesters/README.md index de8b3380c..cfeaf7e76 100644 --- a/airbyte_cdk/sources/declarative/requesters/README.md +++ b/airbyte_cdk/sources/declarative/requesters/README.md @@ -1,8 +1,8 @@ # AsyncHttpJobRepository sequence diagram - Components marked as optional are not required and can be ignored. -- if `url_requester` is not provided, `urls_extractor` will get urls from the `polling_job_response` -- interpolation_context, e.g. `create_job_response` or `polling_job_response` can be obtained from stream_slice +- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response` +- interpolation_context, e.g. 
`creation_response` or `polling_response` can be obtained from stream_slice ```mermaid --- @@ -12,7 +12,7 @@ sequenceDiagram participant AsyncHttpJobRepository as AsyncOrchestrator participant CreationRequester as creation_requester participant PollingRequester as polling_requester - participant UrlRequester as url_requester (Optional) + participant UrlRequester as download_target_requester (Optional) participant DownloadRetriever as download_retriever participant AbortRequester as abort_requester (Optional) participant DeleteRequester as delete_requester (Optional) @@ -25,14 +25,14 @@ sequenceDiagram loop Poll for job status AsyncHttpJobRepository ->> PollingRequester: Check job status - PollingRequester ->> Reporting Server: Status request (interpolation_context: `create_job_response`) + PollingRequester ->> Reporting Server: Status request (interpolation_context: `creation_response`) Reporting Server -->> PollingRequester: Status response PollingRequester -->> AsyncHttpJobRepository: Job status end alt Status: Ready AsyncHttpJobRepository ->> UrlRequester: Request download URLs (if applicable) - UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_job_response`) + UrlRequester ->> Reporting Server: URL request (interpolation_context: `polling_response`) Reporting Server -->> UrlRequester: Download URLs UrlRequester -->> AsyncHttpJobRepository: Download URLs diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index da335b2b7..28e9528ea 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -43,13 +43,13 @@ class AsyncHttpJobRepository(AsyncJobRepository): delete_requester: Optional[Requester] status_extractor: DpathExtractor status_mapping: Mapping[str, AsyncJobStatus] - urls_extractor: DpathExtractor + download_target_extractor: DpathExtractor job_timeout: Optional[timedelta] = None record_extractor: RecordExtractor = field( init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({}) ) - url_requester: Optional[Requester] = ( + download_target_requester: Optional[Requester] = ( None # use it in case polling_requester provides some and extra request is needed to obtain list of urls to download from ) @@ -211,12 +211,15 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: """ - for url in self._get_download_url(job): + for target_url in self._get_download_targets(job): job_slice = job.job_parameters() stream_slice = StreamSlice( partition=job_slice.partition, cursor_slice=job_slice.cursor_slice, - extra_fields={**job_slice.extra_fields, "url": url}, + extra_fields={ + **job_slice.extra_fields, + "download_target": target_url, + }, ) for message in self.download_retriever.read_records({}, stream_slice): if isinstance(message, Record): @@ -269,27 +272,29 @@ def _clean_up_job(self, job_id: str) -> None: del self._polling_job_response_by_id[job_id] def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice: + creation_response = self._create_job_response_by_id[job.api_job_id()].json() stream_slice = StreamSlice( - partition={"create_job_response": self._create_job_response_by_id[job.api_job_id()]}, + partition={}, cursor_slice={}, + extra_fields={"creation_response": creation_response}, ) return stream_slice - def _get_download_url(self, job: AsyncJob) -> Iterable[str]: - if not self.url_requester: + def 
_get_download_targets(self, job: AsyncJob) -> Iterable[str]: + if not self.download_target_requester: url_response = self._polling_job_response_by_id[job.api_job_id()] else: + polling_response = self._polling_job_response_by_id[job.api_job_id()].json() stream_slice: StreamSlice = StreamSlice( - partition={ - "polling_job_response": self._polling_job_response_by_id[job.api_job_id()] - }, + partition={}, cursor_slice={}, + extra_fields={"polling_response": polling_response}, ) - url_response = self.url_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect url_requester to always be presented, otherwise raise an exception as we cannot proceed with the report + url_response = self.download_target_requester.send_request(stream_slice=stream_slice) # type: ignore # we expect download_target_requester to always be presented, otherwise raise an exception as we cannot proceed with the report if not url_response: raise AirbyteTracedException( - internal_message="Always expect a response or an exception from url_requester", + internal_message="Always expect a response or an exception from download_target_requester", failure_type=FailureType.system_error, ) - yield from self.urls_extractor.extract_records(url_response) # type: ignore # we expect urls_extractor to always return list of strings + yield from self.download_target_extractor.extract_records(url_response) # type: ignore # we expect download_target_extractor to always return list of strings diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index e8c446503..8a7b6aba0 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -85,7 +85,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters if self.error_handler is not None and hasattr(self.error_handler, "backoff_strategies"): - backoff_strategies = self.error_handler.backoff_strategies + backoff_strategies = self.error_handler.backoff_strategies # type: ignore else: backoff_strategies = None @@ -125,6 +125,12 @@ def get_path( kwargs = { "stream_slice": stream_slice, "next_page_token": next_page_token, + # update the interpolation context with extra fields, if passed. 
+ **( + stream_slice.extra_fields + if stream_slice is not None and hasattr(stream_slice, "extra_fields") + else {} + ), } path = str(self._path.eval(self.config, **kwargs)) return path.lstrip("/") diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index faab999cb..be9177638 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3607,7 +3607,7 @@ def test_create_async_retriever(): "timeout": ["timeout"], "completed": ["ready"], }, - "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, "record_selector": { "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": ["data"]}, @@ -3615,7 +3615,7 @@ def test_create_async_retriever(): "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, "polling_requester": { "type": "HttpRequester", - "path": "/v3/marketing/contacts/exports/{{stream_slice['create_job_response'].json()['id'] }}", + "path": "/v3/marketing/contacts/exports/{{creation_response['id'] }}", "url_base": "https://api.sendgrid.com", "http_method": "GET", "authenticator": { @@ -3635,19 +3635,19 @@ def test_create_async_retriever(): }, "download_requester": { "type": "HttpRequester", - "path": "{{stream_slice['url']}}", + "path": "{{download_target}}", "url_base": "", "http_method": "GET", }, "abort_requester": { "type": "HttpRequester", - "path": "{{stream_slice['url']}}/abort", + "path": "{{download_target}}/abort", "url_base": "", "http_method": "POST", }, "delete_requester": { "type": "HttpRequester", - "path": "{{stream_slice['url']}}", + "path": "{{download_target}}", "url_base": "", "http_method": "POST", }, @@ -3681,7 +3681,7 @@ def test_create_async_retriever(): assert job_repository.abort_requester assert job_repository.delete_requester assert job_repository.status_extractor - assert job_repository.urls_extractor + assert job_repository.download_target_extractor selector = component.record_selector extractor = selector.extractor diff --git a/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/unit_tests/sources/declarative/requesters/test_http_job_repository.py index cdc14e600..4be3ecb11 100644 --- a/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -69,7 +69,7 @@ def setUp(self) -> None: self._polling_job_requester = HttpRequester( name="stream : polling", url_base=_URL_BASE, - path=_EXPORT_PATH + "/{{stream_slice['create_job_response'].json()['id']}}", + path=_EXPORT_PATH + "/{{creation_response['id']}}", error_handler=error_handler, http_method=HttpMethod.GET, config=_ANY_CONFIG, @@ -84,7 +84,7 @@ def setUp(self) -> None: requester=HttpRequester( name="stream : fetch_result", url_base="", - path="{{stream_slice.extra_fields['url']}}", + path="{{download_target}}", error_handler=error_handler, http_method=HttpMethod.GET, config=_ANY_CONFIG, @@ -143,7 +143,7 @@ def setUp(self) -> None: "failure": AsyncJobStatus.FAILED, "pending": AsyncJobStatus.RUNNING, }, - urls_extractor=DpathExtractor( + download_target_extractor=DpathExtractor( decoder=JsonDecoder(parameters={}), field_path=["urls"], config={}, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py 
b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 188256b10..8d9e6c675 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -299,7 +299,7 @@ "timeout": ["timeout"], "completed": ["ready"], }, - "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "download_target_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, "record_selector": { "type": "RecordSelector", "extractor": {"type": "DpathExtractor", "field_path": []}, @@ -307,7 +307,7 @@ "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, "polling_requester": { "type": "HttpRequester", - "path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}", + "path": "/async_job/{{creation_response['id'] }}", "http_method": "GET", "authenticator": { "type": "BearerAuthenticator", From 406542d10bdc843096c9da727756094be50352f5 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com> Date: Thu, 27 Feb 2025 17:26:10 +0200 Subject: [PATCH 10/11] feat: (low-code cdk) datetime format with milliseconds (#369) --- .../sources/declarative/datetime/datetime_parser.py | 8 +++++++- .../declarative/declarative_component_schema.yaml | 2 ++ .../declarative/datetime/test_datetime_parser.py | 12 ++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/datetime/datetime_parser.py b/airbyte_cdk/sources/declarative/datetime/datetime_parser.py index 93122e29c..130406fcc 100644 --- a/airbyte_cdk/sources/declarative/datetime/datetime_parser.py +++ b/airbyte_cdk/sources/declarative/datetime/datetime_parser.py @@ -31,7 +31,8 @@ def parse(self, date: Union[str, int], format: str) -> datetime.datetime: return datetime.datetime.fromtimestamp(float(date), tz=datetime.timezone.utc) elif format == "%ms": return self._UNIX_EPOCH + datetime.timedelta(milliseconds=int(date)) - + elif "%_ms" in format: + format = format.replace("%_ms", "%f") parsed_datetime = datetime.datetime.strptime(str(date), format) if self._is_naive(parsed_datetime): return parsed_datetime.replace(tzinfo=datetime.timezone.utc) @@ -48,6 +49,11 @@ def format(self, dt: datetime.datetime, format: str) -> str: if format == "%ms": # timstamp() returns a float representing the number of seconds since the unix epoch return str(int(dt.timestamp() * 1000)) + if "%_ms" in format: + _format = format.replace("%_ms", "%f") + milliseconds = int(dt.microsecond / 1000) + formatted_dt = dt.strftime(_format).replace(dt.strftime("%f"), "%03d" % milliseconds) + return formatted_dt else: return dt.strftime(format) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index c664e237a..35244aedc 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -844,6 +844,7 @@ definitions: * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59` * **%S**: Second (zero-padded) - `00`, `01`, ..., `59` * **%f**: Microsecond (zero-padded to 6 digits) - `000000` + * **%_ms**: Millisecond (zero-padded to 3 digits) - `000` * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00` * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT` * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366` @@ -2401,6 +2402,7 @@ definitions: * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59` 
* **%S**: Second (zero-padded) - `00`, `01`, ..., `59` * **%f**: Microsecond (zero-padded to 6 digits) - `000000`, `000001`, ..., `999999` + * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`, `001`, ..., `999` * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00` * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT` * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366` diff --git a/unit_tests/sources/declarative/datetime/test_datetime_parser.py b/unit_tests/sources/declarative/datetime/test_datetime_parser.py index 7a1ba951b..640abd6c2 100644 --- a/unit_tests/sources/declarative/datetime/test_datetime_parser.py +++ b/unit_tests/sources/declarative/datetime/test_datetime_parser.py @@ -50,6 +50,12 @@ "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), ), + ( + "test_parse_format_datetime_with__ms", + "2021-11-22T08:41:55.640Z", + "%Y-%m-%dT%H:%M:%S.%_msZ", + datetime.datetime(2021, 11, 22, 8, 41, 55, 640000, tzinfo=datetime.timezone.utc), + ), ], ) def test_parse_date(test_name, input_date, date_format, expected_output_date): @@ -91,6 +97,12 @@ def test_parse_date(test_name, input_date, date_format, expected_output_date): "%Y%m%d", "20210101", ), + ( + "test_parse_format_datetime_with__ms", + datetime.datetime(2021, 11, 22, 8, 41, 55, 640000, tzinfo=datetime.timezone.utc), + "%Y-%m-%dT%H:%M:%S.%_msZ", + "2021-11-22T08:41:55.640Z", + ), ], ) def test_format_datetime(test_name, input_dt, datetimeformat, expected_output): From 533b70adf97a61855672327756b5cca954ca839a Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Fri, 28 Feb 2025 18:49:41 +0100 Subject: [PATCH 11/11] feat: add IncrementingCountCursor (#346) Signed-off-by: Artem Inzhyyants Co-authored-by: brianjlai --- .../concurrent_declarative_source.py | 35 ++++-- .../declarative_component_schema.yaml | 39 ++++++ .../models/declarative_component_schema.py | 26 +++- .../parsers/model_to_component_factory.py | 112 ++++++++++++++++++ .../abstract_stream_state_converter.py | 3 +- ...crementing_count_stream_state_converter.py | 92 ++++++++++++++ .../test_concurrent_declarative_source.py | 73 +++++++++++- ...test_incrementing_count_state_converter.py | 29 +++++ 8 files changed, 398 insertions(+), 11 deletions(-) create mode 100644 airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py create mode 100644 unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 1dd5b962c..c92ffb150 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -31,6 +31,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DatetimeBasedCursor as DatetimeBasedCursorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + IncrementingCountCursor as IncrementingCountCursorModel, +) from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) @@ -222,7 +225,7 @@ def _group_streams( and not incremental_sync_component_definition ) - if self._is_datetime_incremental_without_partition_routing( + if self._is_concurrent_cursor_incremental_without_partition_routing( declarative_stream, incremental_sync_component_definition ): stream_state = 
self._connector_state_manager.get_stream_state( @@ -254,15 +257,26 @@ def _group_streams( stream_slicer=declarative_stream.retriever.stream_slicer, ) else: - cursor = ( - self._constructor.create_concurrent_cursor_from_datetime_based_cursor( + if ( + incremental_sync_component_definition + and incremental_sync_component_definition.get("type") + == IncrementingCountCursorModel.__name__ + ): + cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor( + model_type=IncrementingCountCursorModel, + component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + config=config or {}, + ) + else: + cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( model_type=DatetimeBasedCursorModel, component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, config=config or {}, ) - ) partition_generator = StreamSlicerPartitionGenerator( partition_factory=DeclarativePartitionFactory( declarative_stream.name, @@ -389,7 +403,7 @@ def _group_streams( return concurrent_streams, synchronous_streams - def _is_datetime_incremental_without_partition_routing( + def _is_concurrent_cursor_incremental_without_partition_routing( self, declarative_stream: DeclarativeStream, incremental_sync_component_definition: Mapping[str, Any] | None, @@ -397,11 +411,18 @@ def _is_datetime_incremental_without_partition_routing( return ( incremental_sync_component_definition is not None and bool(incremental_sync_component_definition) - and incremental_sync_component_definition.get("type", "") - == DatetimeBasedCursorModel.__name__ + and ( + incremental_sync_component_definition.get("type", "") + in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__) + ) and hasattr(declarative_stream.retriever, "stream_slicer") and ( isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) + # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor + # add isintance check here if we want to create a Declarative IncrementingCountCursor + # or isinstance( + # declarative_stream.retriever.stream_slicer, IncrementingCountCursor + # ) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter) ) ) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 35244aedc..ac57ca992 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -777,6 +777,44 @@ definitions: type: type: string enum: [LegacyToPerPartitionStateMigration] + IncrementingCountCursor: + title: Incrementing Count Cursor + description: Cursor that allows for incremental sync according to a continuously increasing integer. + type: object + required: + - type + - cursor_field + properties: + type: + type: string + enum: [IncrementingCountCursor] + cursor_field: + title: Cursor Field + description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. 
You can use a combination of Add Field and Remove Field transformations to move the nested field to the top. + type: string + interpolation_context: + - config + examples: + - "created_at" + - "{{ config['record_cursor'] }}" + start_value: + title: Start Value + description: The value that determines the earliest record that should be synced. + anyOf: + - type: string + - type: integer + interpolation_context: + - config + examples: + - 0 + - "{{ config['start_value'] }}" + start_value_option: + title: Inject Start Value Into Outgoing HTTP Request + description: Optionally configures how the start value will be sent in requests to the source API. + "$ref": "#/definitions/RequestOption" + $parameters: + type: object + additionalProperties: true DatetimeBasedCursor: title: Datetime Based Cursor description: Cursor to provide incremental capabilities over datetime. @@ -1319,6 +1357,7 @@ definitions: anyOf: - "$ref": "#/definitions/CustomIncrementalSync" - "$ref": "#/definitions/DatetimeBasedCursor" + - "$ref": "#/definitions/IncrementingCountCursor" name: title: Name description: The stream name. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index abe4d89cf..a23a10070 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1508,6 +1508,28 @@ class AuthFlow(BaseModel): oauth_config_specification: Optional[OAuthConfigSpecification] = None +class IncrementingCountCursor(BaseModel): + type: Literal["IncrementingCountCursor"] + cursor_field: str = Field( + ..., + description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. 
You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.", + examples=["created_at", "{{ config['record_cursor'] }}"], + title="Cursor Field", + ) + start_value: Optional[Union[str, int]] = Field( + None, + description="The value that determines the earliest record that should be synced.", + examples=[0, "{{ config['start_value'] }}"], + title="Start Value", + ) + start_value_option: Optional[RequestOption] = Field( + None, + description="Optionally configures how the start value will be sent in requests to the source API.", + title="Inject Start Value Into Outgoing HTTP Request", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class DatetimeBasedCursor(BaseModel): type: Literal["DatetimeBasedCursor"] clamping: Optional[Clamping] = Field( @@ -1948,7 +1970,9 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + incremental_sync: Optional[ + Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor] + ] = Field( None, description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index f6e1cc0d6..39058f834 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -245,6 +245,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpResponseFilter as HttpResponseFilterModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + IncrementingCountCursor as IncrementingCountCursorModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( InlineSchemaLoader as InlineSchemaLoaderModel, ) @@ -496,6 +499,9 @@ CustomFormatConcurrentStreamStateConverter, DateTimeStreamStateConverter, ) +from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import ( + IncrementingCountStreamStateConverter, +) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -584,6 +590,7 @@ def _init_mappings(self) -> None: FlattenFieldsModel: self.create_flatten_fields, DpathFlattenFieldsModel: self.create_dpath_flatten_fields, IterableDecoderModel: self.create_iterable_decoder, + IncrementingCountCursorModel: self.create_incrementing_count_cursor, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, DynamicSchemaLoaderModel: self.create_dynamic_schema_loader, @@ -1189,6 +1196,70 @@ def create_concurrent_cursor_from_datetime_based_cursor( clamping_strategy=clamping_strategy, ) + def create_concurrent_cursor_from_incrementing_count_cursor( + self, + model_type: Type[BaseModel], + component_definition: ComponentDefinition, + stream_name: str, + stream_namespace: Optional[str], + config: Config, + message_repository: Optional[MessageRepository] = None, + **kwargs: Any, + ) -> ConcurrentCursor: + # Per-partition incremental streams can dynamically create child cursors which will pass 
their current + # state via the stream_state keyword argument. Incremental syncs without parent streams use the + # incoming state and connector_state_manager that is initialized when the component factory is created + stream_state = ( + self._connector_state_manager.get_stream_state(stream_name, stream_namespace) + if "stream_state" not in kwargs + else kwargs["stream_state"] + ) + + component_type = component_definition.get("type") + if component_definition.get("type") != model_type.__name__: + raise ValueError( + f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" + ) + + incrementing_count_cursor_model = model_type.parse_obj(component_definition) + + if not isinstance(incrementing_count_cursor_model, IncrementingCountCursorModel): + raise ValueError( + f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}" + ) + + interpolated_start_value = ( + InterpolatedString.create( + incrementing_count_cursor_model.start_value, # type: ignore + parameters=incrementing_count_cursor_model.parameters or {}, + ) + if incrementing_count_cursor_model.start_value + else 0 + ) + + interpolated_cursor_field = InterpolatedString.create( + incrementing_count_cursor_model.cursor_field, + parameters=incrementing_count_cursor_model.parameters or {}, + ) + cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + + connector_state_converter = IncrementingCountStreamStateConverter( + is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state + ) + + return ConcurrentCursor( + stream_name=stream_name, + stream_namespace=stream_namespace, + stream_state=stream_state, + message_repository=message_repository or self._message_repository, + connector_state_manager=self._connector_state_manager, + connector_state_converter=connector_state_converter, + cursor_field=cursor_field, + slice_boundary_fields=None, + start=interpolated_start_value, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=connector_state_converter.get_end_provider(), # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. 
Confirmed functionality is working in practice + ) + def _assemble_weekday(self, weekday: str) -> Weekday: match weekday: case "MONDAY": @@ -1622,6 +1693,31 @@ def create_declarative_stream( config=config, parameters=model.parameters or {}, ) + elif model.incremental_sync and isinstance( + model.incremental_sync, IncrementingCountCursorModel + ): + cursor_model: IncrementingCountCursorModel = model.incremental_sync # type: ignore + + start_time_option = ( + self._create_component_from_model( + cursor_model.start_value_option, # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor + config, + parameters=cursor_model.parameters or {}, + ) + if cursor_model.start_value_option # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor + else None + ) + + # The concurrent engine defaults the start/end fields on the slice to "start" and "end", but + # the default DatetimeBasedRequestOptionsProvider() sets them to start_time/end_time + partition_field_start = "start" + + request_options_provider = DatetimeBasedRequestOptionsProvider( + start_time_option=start_time_option, + partition_field_start=partition_field_start, + config=config, + parameters=model.parameters or {}, + ) else: request_options_provider = None @@ -2111,6 +2207,22 @@ def create_gzip_decoder( stream_response=False if self._emit_connector_builder_messages else True, ) + @staticmethod + def create_incrementing_count_cursor( + model: IncrementingCountCursorModel, config: Config, **kwargs: Any + ) -> DatetimeBasedCursor: + # This should not actually get used anywhere at runtime, but needed to add this to pass checks since + # we still parse models into components. The issue is that there's no runtime implementation of a + # IncrementingCountCursor. + # A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor. 
+ return DatetimeBasedCursor( + cursor_field=model.cursor_field, + datetime_format="%Y-%m-%d", + start_datetime="2024-12-12", + config=config, + parameters={}, + ) + @staticmethod def create_iterable_decoder( model: IterableDecoderModel, config: Config, **kwargs: Any diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py index 987915317..ccff41ba7 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py @@ -4,7 +4,7 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple if TYPE_CHECKING: from airbyte_cdk.sources.streams.concurrent.cursor import CursorField @@ -12,6 +12,7 @@ class ConcurrencyCompatibleStateType(Enum): date_range = "date-range" + integer = "integer" class AbstractStreamStateConverter(ABC): diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py new file mode 100644 index 000000000..fecc984bc --- /dev/null +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/incrementing_count_stream_state_converter.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Callable, MutableMapping, Optional, Tuple + +from airbyte_cdk.sources.streams.concurrent.cursor import CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ( + AbstractStreamStateConverter, + ConcurrencyCompatibleStateType, +) + + +class IncrementingCountStreamStateConverter(AbstractStreamStateConverter): + def _from_state_message(self, value: Any) -> Any: + return value + + def _to_state_message(self, value: Any) -> Any: + return value + + @classmethod + def get_end_provider(cls) -> Callable[[], float]: + return lambda: float("inf") + + def convert_from_sequential_state( + self, + cursor_field: "CursorField", # to deprecate as it is only needed for sequential state + stream_state: MutableMapping[str, Any], + start: Optional[Any], + ) -> Tuple[Any, MutableMapping[str, Any]]: + """ + Convert the state message to the format required by the ConcurrentCursor. + + e.g. + { + "state_type": ConcurrencyCompatibleStateType.date_range.value, + "metadata": { … }, + "slices": [ + {"start": "10", "end": "2021-01-18T21:18:20.000+00:00"}, + ] + } + """ + sync_start = self._get_sync_start(cursor_field, stream_state, start) + if self.is_state_message_compatible(stream_state): + return sync_start, stream_state + + # Create a slice to represent the records synced during prior syncs. 
+ # The start and end are the same to avoid confusion as to whether the records for this slice + # were actually synced + slices = [ + { + self.START_KEY: start if start is not None else sync_start, + self.END_KEY: sync_start, # this may not be relevant anymore + self.MOST_RECENT_RECORD_KEY: sync_start, + } + ] + + return sync_start, { + "state_type": ConcurrencyCompatibleStateType.integer.value, + "slices": slices, + "legacy": stream_state, + } + + def parse_value(self, value: int) -> int: + return value + + @property + def zero_value(self) -> int: + return 0 + + def increment(self, value: int) -> int: + return value + 1 + + def output_format(self, value: int) -> int: + return value + + def _get_sync_start( + self, + cursor_field: CursorField, + stream_state: MutableMapping[str, Any], + start: Optional[int], + ) -> int: + sync_start = start if start is not None else self.zero_value + prev_sync_low_water_mark: Optional[int] = ( + stream_state[cursor_field.cursor_field_key] + if cursor_field.cursor_field_key in stream_state + else None + ) + if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start: + return prev_sync_low_water_mark + else: + return sync_start diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 8d9e6c675..4a043ac82 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -4,6 +4,7 @@ import copy import json +import math from datetime import datetime, timedelta, timezone from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union from unittest.mock import patch @@ -43,6 +44,9 @@ from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream +from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import ( + IncrementingCountStreamStateConverter, +) from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -230,6 +234,16 @@ "inject_into": "request_parameter", }, }, + "incremental_counting_cursor": { + "type": "IncrementingCountCursor", + "cursor_field": "id", + "start_value": 0, + "start_time_option": { + "type": "RequestOption", + "field_name": "since_id", + "inject_into": "request_parameter", + }, + }, "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, "base_incremental_stream": { "retriever": { @@ -238,6 +252,13 @@ }, "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, }, + "base_incremental_counting_stream": { + "retriever": { + "$ref": "#/definitions/retriever", + "requester": {"$ref": "#/definitions/requester"}, + }, + "incremental_sync": {"$ref": "#/definitions/incremental_counting_cursor"}, + }, "party_members_stream": { "$ref": "#/definitions/base_incremental_stream", "retriever": { @@ -527,6 +548,35 @@ }, }, }, + "incremental_counting_stream": { + "$ref": "#/definitions/base_incremental_counting_stream", + "retriever": { + "$ref": "#/definitions/base_incremental_counting_stream/retriever", + "record_selector": {"$ref": "#/definitions/selector"}, + }, + "$parameters": { + "name": "incremental_counting_stream", + "primary_key": "id", + "path": "/party_members", + }, + 
"schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the party member", + "type": ["null", "string"], + }, + }, + }, + }, + }, }, "streams": [ "#/definitions/party_members_stream", @@ -536,6 +586,7 @@ "#/definitions/arcana_personas_stream", "#/definitions/palace_enemies_stream", "#/definitions/async_job_stream", + "#/definitions/incremental_counting_stream", ], "check": {"stream_names": ["party_members", "locations"]}, "concurrency_level": { @@ -658,9 +709,9 @@ def test_group_streams(): ) concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - # 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental + # 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental # 1 async job stream, 1 substream w/ incremental - assert len(concurrent_streams) == 7 + assert len(concurrent_streams) == 8 ( concurrent_stream_0, concurrent_stream_1, @@ -669,6 +720,7 @@ def test_group_streams(): concurrent_stream_4, concurrent_stream_5, concurrent_stream_6, + concurrent_stream_7, ) = concurrent_streams assert isinstance(concurrent_stream_0, DefaultStream) assert concurrent_stream_0.name == "party_members" @@ -684,6 +736,8 @@ def test_group_streams(): assert concurrent_stream_5.name == "palace_enemies" assert isinstance(concurrent_stream_6, DefaultStream) assert concurrent_stream_6.name == "async_job_stream" + assert isinstance(concurrent_stream_7, DefaultStream) + assert concurrent_stream_7.name == "incremental_counting_stream" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @@ -756,6 +810,20 @@ def test_create_concurrent_cursor(): "state_type": "date-range", } + incremental_counting_stream = concurrent_streams[7] + assert isinstance(incremental_counting_stream, DefaultStream) + incremental_counting_cursor = incremental_counting_stream.cursor + + assert isinstance(incremental_counting_cursor, ConcurrentCursor) + assert isinstance( + incremental_counting_cursor._connector_state_converter, + IncrementingCountStreamStateConverter, + ) + assert incremental_counting_cursor._stream_name == "incremental_counting_stream" + assert incremental_counting_cursor._cursor_field.cursor_field_key == "id" + assert incremental_counting_cursor._start == 0 + assert incremental_counting_cursor._end_provider() == math.inf + def test_check(): """ @@ -808,6 +876,7 @@ def test_discover(): "arcana_personas", "palace_enemies", "async_job_stream", + "incremental_counting_stream", } source = ConcurrentDeclarativeSource( diff --git a/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py b/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py new file mode 100644 index 000000000..460d094be --- /dev/null +++ b/unit_tests/sources/streams/concurrent/test_incrementing_count_state_converter.py @@ -0,0 +1,29 @@ +from airbyte_cdk.sources.streams.concurrent.cursor import CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import ( + IncrementingCountStreamStateConverter, +) + + +def test_convert_from_sequential_state(): + converter = IncrementingCountStreamStateConverter( + is_sequential_state=True, + ) + + _, conversion = 
converter.convert_from_sequential_state(CursorField("id"), {"id": 12345}, 0) + + assert conversion["state_type"] == "integer" + assert conversion["legacy"] == {"id": 12345} + assert len(conversion["slices"]) == 1 + assert conversion["slices"][0] == {"end": 12345, "most_recent_cursor_value": 12345, "start": 0} + + +def test_convert_to_sequential_state(): + converter = IncrementingCountStreamStateConverter( + is_sequential_state=True, + ) + concurrent_state = { + "legacy": {"id": 12345}, + "slices": [{"end": 12345, "most_recent_cursor_value": 12345, "start": 0}], + "state_type": "integer", + } + assert converter.convert_to_state_message(CursorField("id"), concurrent_state) == {"id": 12345}
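
A short, self-contained sketch may help readers see what the new state converter does end to end. It is illustrative only (not part of the patch series) and assumes an installed CDK exposing exactly the classes added and exercised above; the concrete values mirror the unit tests.

```python
# Minimal sketch of the IncrementingCountStreamStateConverter round trip,
# using only classes that appear in this patch series.
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
    IncrementingCountStreamStateConverter,
)

converter = IncrementingCountStreamStateConverter(is_sequential_state=True)
cursor_field = CursorField("id")

# Legacy, sequential state left behind by a prior sync.
legacy_state = {"id": 12345}

# Convert to the concurrency-compatible "integer" shape: a single slice
# covering ids 0..12345, with the legacy state preserved alongside it.
sync_start, concurrent_state = converter.convert_from_sequential_state(
    cursor_field, legacy_state, start=0
)
assert sync_start == 12345
assert concurrent_state["state_type"] == "integer"
assert concurrent_state["slices"] == [
    {"start": 0, "end": 12345, "most_recent_cursor_value": 12345}
]

# Convert back to the sequential shape emitted in state messages.
assert converter.convert_to_state_message(cursor_field, concurrent_state) == {"id": 12345}
```

The expected values are the ones asserted in `test_incrementing_count_state_converter.py`, so running the sketch should succeed against the code introduced in PATCH 11/11.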