From 0bcdad5f6f75a6e591a0cef9040e9b60f0dedb44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 14:32:31 -0600 Subject: [PATCH 01/14] feat(targets): Added a new built-in setting `activate_version` for targets to optionally disable processing of `ACTIVATE_VERSION` messages --- singer_sdk/helpers/capabilities.py | 8 ++++++++ singer_sdk/sinks/core.py | 9 +++++++++ singer_sdk/target_base.py | 13 +++++++++++++ 3 files changed, 30 insertions(+) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 0f5c42918..b23ae0f00 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -144,6 +144,14 @@ description="The default target database schema name to use for all streams.", ), ).to_dict() +ACTIVATE_VERSION_CONFIG = PropertiesList( + Property( + "activate_version", + BooleanType, + title="Process `ACTIVATE_VERSION` messages", + description="Whether to process `ACTIVATE_VERSION` messages.", + ), +).to_dict() ADD_RECORD_METADATA_CONFIG = PropertiesList( Property( "add_record_metadata", diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 5070655d4..7d9d24a3c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool: """ return self.config.get("add_record_metadata", False) + @property + def process_activate_version_messages(self) -> bool: + """Check if activate version messages should be processed. + + Returns: + True if activate version messages should be processed. + """ + return self.config.get("activate_version", True) + @property def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum: """Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL. diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 8907b6201..b361698ce 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -454,6 +454,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None: for stream_map in self.mapper.stream_maps[stream_name]: sink = self.get_sink(stream_map.stream_alias) + if not sink.process_activate_version_messages: + self.logger.warning( + "Activate version messages are not enabled for '%s'. Ignoring.", + stream_map.stream_alias, + ) + continue + if not sink.include_sdc_metadata_properties: + self.logger.warning( + "ACTIVATE_VERSION requires _sdc_* metadata properties to be " + "included. Set `add_record_metadata` to `True` if you wanna use " + "this feature." + ) + continue sink.activate_version(message_dict["version"]) def _process_batch_message(self, message_dict: dict) -> None: From c38f72ebdbfa40d6fd3aca6535b7e95f3d4e0f93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 14:34:37 -0600 Subject: [PATCH 02/14] Update capabilities --- singer_sdk/target_base.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index b361698ce..0f50211d4 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -16,6 +16,7 @@ from singer_sdk.helpers._batch import BaseBatchFileEncoding from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers.capabilities import ( + ACTIVATE_VERSION_CONFIG, ADD_RECORD_METADATA_CONFIG, BATCH_CONFIG, TARGET_BATCH_SIZE_ROWS_CONFIG, @@ -634,6 +635,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: capabilities = cls.capabilities + if PluginCapabilities.ACTIVATE_VERSION in capabilities: + _merge_missing(ACTIVATE_VERSION_CONFIG, config_jsonschema) + if PluginCapabilities.BATCH in capabilities: _merge_missing(BATCH_CONFIG, config_jsonschema) @@ -673,6 +677,7 @@ def capabilities(self) -> list[CapabilitiesEnum]: sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities sql_target_capabilities.extend( [ + PluginCapabilities.ACTIVATE_VERSION, TargetCapabilities.TARGET_SCHEMA, TargetCapabilities.HARD_DELETE, ] From c1af7c00dc6887ec594491ecf4e79cf8217a7f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 14:46:29 -0600 Subject: [PATCH 03/14] Default `activate_version` to `True` --- singer_sdk/helpers/capabilities.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index b23ae0f00..a679bfd84 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -148,6 +148,7 @@ Property( "activate_version", BooleanType, + default=True, title="Process `ACTIVATE_VERSION` messages", description="Whether to process `ACTIVATE_VERSION` messages.", ), From a8f369a44a3dd44a82d50942088b9ae0c87bdec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 16:43:32 -0600 Subject: [PATCH 04/14] Default to enabling SDC columns --- singer_sdk/sinks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 7d9d24a3c..fceb43394 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -397,7 +397,7 @@ def include_sdc_metadata_properties(self) -> bool: Returns: True if metadata columns should be added. """ - return self.config.get("add_record_metadata", False) + return self.config.get("add_record_metadata", True) @property def process_activate_version_messages(self) -> bool: From 9c768945e2f8bd40d8882a6973108392fae8e9fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 16:50:35 -0600 Subject: [PATCH 05/14] Revert "Default to enabling SDC columns" This reverts commit 12fb01aac25d853d2090106bda708aa15c5e99ac. --- singer_sdk/sinks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index fceb43394..7d9d24a3c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -397,7 +397,7 @@ def include_sdc_metadata_properties(self) -> bool: Returns: True if metadata columns should be added. """ - return self.config.get("add_record_metadata", True) + return self.config.get("add_record_metadata", False) @property def process_activate_version_messages(self) -> bool: From 17fcd92ecb2004eca37d8856d21dbd35b77b892a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:03:09 -0600 Subject: [PATCH 06/14] Enable SDC metadata in SQLite tests --- .pre-commit-config.yaml | 2 +- .../sample_tap_gitlab/gitlab_rest_streams.py | 5 ++- samples/sample_target_sqlite/__init__.py | 42 +++++++++++++++++++ tests/samples/conftest.py | 2 +- tests/samples/test_tap_sqlite.py | 8 ++-- tests/samples/test_target_sqlite.py | 13 ++++-- 6 files changed, 61 insertions(+), 11 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 12ea90545..39b9aeb65 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -44,7 +44,7 @@ repos: - id: check-readthedocs - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.8.0 + rev: v0.8.1 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix, --show-fixes] diff --git a/samples/sample_tap_gitlab/gitlab_rest_streams.py b/samples/sample_tap_gitlab/gitlab_rest_streams.py index 5ed59f666..6fb1e5640 100644 --- a/samples/sample_tap_gitlab/gitlab_rest_streams.py +++ b/samples/sample_tap_gitlab/gitlab_rest_streams.py @@ -74,7 +74,7 @@ def partitions(self) -> list[dict]: if "{project_id}" in self.path: return [ {"project_id": pid} - for pid in t.cast(list, self.config.get("project_ids")) + for pid in t.cast("list", self.config.get("project_ids")) ] if "{group_id}" in self.path: if "group_ids" not in self.config: @@ -84,7 +84,8 @@ def partitions(self) -> list[dict]: ) raise ValueError(msg) return [ - {"group_id": gid} for gid in t.cast(list, self.config.get("group_ids")) + {"group_id": gid} + for gid in t.cast("list", self.config.get("group_ids")) ] msg = ( f"Could not detect partition type for Gitlab stream '{self.name}' " diff --git a/samples/sample_target_sqlite/__init__.py b/samples/sample_target_sqlite/__init__.py index 0e1062642..7274f51f1 100644 --- a/samples/sample_target_sqlite/__init__.py +++ b/samples/sample_target_sqlite/__init__.py @@ -2,6 +2,8 @@ from __future__ import annotations +import datetime +import sqlite3 import typing as t from singer_sdk import SQLConnector, SQLSink, SQLTarget @@ -10,6 +12,46 @@ DB_PATH_CONFIG = "path_to_db" +def adapt_date_iso(val): + """Adapt datetime.date to ISO 8601 date.""" + return val.isoformat() + + +def adapt_datetime_iso(val): + """Adapt datetime.datetime to timezone-naive ISO 8601 date.""" + return val.isoformat() + + +def adapt_datetime_epoch(val): + """Adapt datetime.datetime to Unix timestamp.""" + return int(val.timestamp()) + + +sqlite3.register_adapter(datetime.date, adapt_date_iso) +sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso) +sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch) + + +def convert_date(val): + """Convert ISO 8601 date to datetime.date object.""" + return datetime.date.fromisoformat(val.decode()) + + +def convert_datetime(val): + """Convert ISO 8601 datetime to datetime.datetime object.""" + return datetime.datetime.fromisoformat(val.decode()) + + +def convert_timestamp(val): + """Convert Unix epoch timestamp to datetime.datetime object.""" + return datetime.datetime.fromtimestamp(int(val), tz=datetime.timezone.utc) + + +sqlite3.register_converter("date", convert_date) +sqlite3.register_converter("datetime", convert_datetime) +sqlite3.register_converter("timestamp", convert_timestamp) + + class SQLiteConnector(SQLConnector): """The connector for SQLite. diff --git a/tests/samples/conftest.py b/tests/samples/conftest.py index 6580c0d0c..d1bd024d3 100644 --- a/tests/samples/conftest.py +++ b/tests/samples/conftest.py @@ -15,7 +15,7 @@ @pytest.fixture def csv_config(outdir: str) -> dict: """Get configuration dictionary for target-csv.""" - return {"target_folder": outdir} + return {"target_folder": outdir, "add_record_metadata": False} @pytest.fixture diff --git a/tests/samples/test_tap_sqlite.py b/tests/samples/test_tap_sqlite.py index ff83de338..a59c4a08e 100644 --- a/tests/samples/test_tap_sqlite.py +++ b/tests/samples/test_tap_sqlite.py @@ -10,7 +10,6 @@ from samples.sample_tap_sqlite import SQLiteTap from samples.sample_target_csv.csv_target import SampleTargetCSV -from singer_sdk import SQLStream from singer_sdk._singerlib import MetadataMapping, StreamMetadata from singer_sdk.testing import ( get_standard_tap_tests, @@ -21,6 +20,7 @@ if t.TYPE_CHECKING: from pathlib import Path + from singer_sdk import SQLStream from singer_sdk.tap_base import SQLTap @@ -50,7 +50,7 @@ def test_tap_sqlite_cli(sqlite_sample_db_config: dict[str, t.Any], tmp_path: Pat def test_sql_metadata(sqlite_sample_tap: SQLTap): - stream = t.cast(SQLStream, sqlite_sample_tap.streams["main-t1"]) + stream = t.cast("SQLStream", sqlite_sample_tap.streams["main-t1"]) detected_metadata = stream.catalog_entry["metadata"] detected_root_md = next(md for md in detected_metadata if md["breadcrumb"] == []) detected_root_md = detected_root_md["metadata"] @@ -68,7 +68,7 @@ def test_sql_metadata(sqlite_sample_tap: SQLTap): def test_sqlite_discovery(sqlite_sample_tap: SQLTap): _discover_and_select_all(sqlite_sample_tap) sqlite_sample_tap.sync_all() - stream = t.cast(SQLStream, sqlite_sample_tap.streams["main-t1"]) + stream = t.cast("SQLStream", sqlite_sample_tap.streams["main-t1"]) schema = stream.schema assert len(schema["properties"]) == 3 assert stream.name == stream.tap_stream_id == "main-t1" @@ -89,7 +89,7 @@ def test_sqlite_discovery(sqlite_sample_tap: SQLTap): def test_sqlite_input_catalog(sqlite_sample_tap: SQLTap): sqlite_sample_tap.sync_all() - stream = t.cast(SQLStream, sqlite_sample_tap.streams["main-t1"]) + stream = t.cast("SQLStream", sqlite_sample_tap.streams["main-t1"]) assert len(stream.schema["properties"]) == 3 assert len(stream.stream_maps[0].transformed_schema["properties"]) == 3 diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 4f6d54e60..9f8b138dd 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -38,7 +38,7 @@ def path_to_target_db(tmp_path: Path) -> Path: @pytest.fixture def sqlite_target_test_config(path_to_target_db: Path) -> dict: """Get configuration dictionary for target-csv.""" - return {"path_to_db": str(path_to_target_db)} + return {"path_to_db": str(path_to_target_db), "add_record_metadata": True} @pytest.fixture @@ -505,8 +505,8 @@ def test_record_with_missing_properties( dedent( """\ INSERT INTO test_stream - (id, name, "table") - VALUES (:id, :name, :table)""", + (id, name, "table", _sdc_extracted_at, _sdc_received_at, _sdc_batched_at, _sdc_deleted_at, _sdc_sequence, _sdc_table_version, _sdc_sync_started_at) + VALUES (:id, :name, :table, :_sdc_extracted_at, :_sdc_received_at, :_sdc_batched_at, :_sdc_deleted_at, :_sdc_sequence, :_sdc_table_version, :_sdc_sync_started_at)""", # noqa: E501 ), ), ], @@ -563,6 +563,13 @@ def test_hostile_to_sqlite( ) columns = {res[0] for res in cursor.fetchall()} assert columns == { + "_sdc_batched_at", + "_sdc_deleted_at", + "_sdc_extracted_at", + "_sdc_received_at", + "_sdc_sequence", + "_sdc_sync_started_at", + "_sdc_table_version", "name_with_spaces", "nameiscamelcase", "name_with_dashes", From 786c3d81ddaa1376f4d2cb5c70f7d7d7bd39abe5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 29 Nov 2024 23:03:22 +0000 Subject: [PATCH 07/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- singer_sdk/connectors/sql.py | 4 ++-- singer_sdk/helpers/_state.py | 2 +- singer_sdk/helpers/_typing.py | 4 ++-- singer_sdk/plugin_base.py | 2 +- singer_sdk/tap_base.py | 2 +- singer_sdk/testing/legacy.py | 2 +- singer_sdk/testing/runners.py | 6 +++--- singer_sdk/testing/tap_tests.py | 2 +- singer_sdk/typing.py | 2 +- 9 files changed, 13 insertions(+), 13 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 6ec100023..540de023e 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -641,7 +641,7 @@ def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str: # noqa: PLR6301 msg = "Could not find or create 'sqlalchemy_url' for connection." raise ConfigValidationError(msg) - return t.cast(str, config["sqlalchemy_url"]) + return t.cast("str", config["sqlalchemy_url"]) def to_jsonschema_type( self, @@ -1429,7 +1429,7 @@ def _get_type_sort_key( _len = int(getattr(sql_type, "length", 0) or 0) - _pytype = t.cast(type, sql_type.python_type) + _pytype = t.cast("type", sql_type.python_type) if issubclass(_pytype, (str, bytes)): return 900, _len if issubclass(_pytype, datetime): diff --git a/singer_sdk/helpers/_state.py b/singer_sdk/helpers/_state.py index 565dbd51d..9b0aa7be1 100644 --- a/singer_sdk/helpers/_state.py +++ b/singer_sdk/helpers/_state.py @@ -125,7 +125,7 @@ def get_writeable_state_dict( tap_state["bookmarks"] = {} if tap_stream_id not in tap_state["bookmarks"]: tap_state["bookmarks"][tap_stream_id] = {} - stream_state = t.cast(dict, tap_state["bookmarks"][tap_stream_id]) + stream_state = t.cast("dict", tap_state["bookmarks"][tap_stream_id]) if not state_partition_context: return stream_state diff --git a/singer_sdk/helpers/_typing.py b/singer_sdk/helpers/_typing.py index d3b629c11..b916d886b 100644 --- a/singer_sdk/helpers/_typing.py +++ b/singer_sdk/helpers/_typing.py @@ -187,11 +187,11 @@ def get_datelike_property_type(property_schema: dict) -> str | None: Otherwise return None. """ if _is_string_with_format(property_schema): - return t.cast(str, property_schema["format"]) + return t.cast("str", property_schema["format"]) if "anyOf" in property_schema: for type_dict in property_schema["anyOf"]: if _is_string_with_format(type_dict): - return t.cast(str, type_dict["format"]) + return t.cast("str", type_dict["format"]) return None diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index f12f15227..1b8735f2f 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -384,7 +384,7 @@ def config(self) -> t.Mapping[str, t.Any]: Returns: A frozen (read-only) config dictionary map. """ - return t.cast(dict, MappingProxyType(self._config)) + return t.cast("dict", MappingProxyType(self._config)) @staticmethod def _is_secret_config(config_key: str) -> bool: diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 82900049c..ea6f96beb 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -316,7 +316,7 @@ def catalog_dict(self) -> dict: Returns: The tap's catalog as a dict """ - return t.cast(dict, self._singer_catalog.to_dict()) + return t.cast("dict", self._singer_catalog.to_dict()) @property def catalog_json_text(self) -> str: diff --git a/singer_sdk/testing/legacy.py b/singer_sdk/testing/legacy.py index eae4c5b5d..30849abd1 100644 --- a/singer_sdk/testing/legacy.py +++ b/singer_sdk/testing/legacy.py @@ -166,7 +166,7 @@ def _select_all(catalog_dict: dict) -> dict: for catalog_entry in catalog.streams: catalog_entry.metadata.root.selected = True - return t.cast(dict, catalog.to_dict()) + return t.cast("dict", catalog.to_dict()) def target_sync_test( diff --git a/singer_sdk/testing/runners.py b/singer_sdk/testing/runners.py index 9ef644d02..609eafd99 100644 --- a/singer_sdk/testing/runners.py +++ b/singer_sdk/testing/runners.py @@ -115,7 +115,7 @@ def new_tap(self) -> Tap: Returns: A configured Tap instance. """ - return t.cast(Tap, self.create()) + return t.cast("Tap", self.create()) def run_discovery(self) -> str: """Run tap discovery. @@ -233,7 +233,7 @@ def new_target(self) -> Target: Returns: A configured Target instance. """ - return t.cast(Target, self.create()) + return t.cast("Target", self.create()) @property def target_input(self) -> t.IO[str]: @@ -247,7 +247,7 @@ def target_input(self) -> t.IO[str]: self._input = self.input_io elif self.input_filepath: self._input = self.input_filepath.open(encoding="utf8") - return t.cast(t.IO[str], self._input) + return t.cast("t.IO[str]", self._input) @target_input.setter def target_input(self, value: t.IO[str]) -> None: diff --git a/singer_sdk/testing/tap_tests.py b/singer_sdk/testing/tap_tests.py index 54352c694..b5e936b26 100644 --- a/singer_sdk/testing/tap_tests.py +++ b/singer_sdk/testing/tap_tests.py @@ -43,7 +43,7 @@ def test(self) -> None: catalog = tap1.catalog_dict # Reset and re-initialize with discovered catalog kwargs = {k: v for k, v in self.runner.default_kwargs.items() if k != "catalog"} - tap2: Tap = t.cast(type[Tap], self.runner.singer_class)( + tap2: Tap = t.cast("type[Tap]", self.runner.singer_class)( config=self.runner.config, catalog=catalog, **kwargs, diff --git a/singer_sdk/typing.py b/singer_sdk/typing.py index fb22f4e82..336eb6995 100644 --- a/singer_sdk/typing.py +++ b/singer_sdk/typing.py @@ -703,7 +703,7 @@ def type_dict(self) -> dict: # type: ignore[override] # TODO: this should be a TypeError, but it's a breaking change. raise ValueError(msg) # noqa: TRY004 - return t.cast(dict, wrapped.type_dict) + return t.cast("dict", wrapped.type_dict) def to_dict(self) -> dict: """Return a dict mapping the property name to its definition. From 9d2180f38a52fb375bfdc61e6d6c77c3c7ab2382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:08:24 -0600 Subject: [PATCH 08/14] Make Ruff happy --- singer_sdk/testing/runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/testing/runners.py b/singer_sdk/testing/runners.py index 609eafd99..145f44deb 100644 --- a/singer_sdk/testing/runners.py +++ b/singer_sdk/testing/runners.py @@ -9,12 +9,12 @@ from collections import defaultdict from contextlib import redirect_stderr, redirect_stdout -from singer_sdk import Tap, Target from singer_sdk.testing.config import SuiteConfig if t.TYPE_CHECKING: from pathlib import Path + from singer_sdk import Tap, Target from singer_sdk.helpers._compat import Traversable From d9fded95c27b4cec3c8adfc5f3593e1fb67751f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:21:27 -0600 Subject: [PATCH 09/14] Test specifically for _sdc attributes --- tests/samples/test_target_sqlite.py | 69 ++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 10 deletions(-) diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 9f8b138dd..a104870be 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -38,7 +38,7 @@ def path_to_target_db(tmp_path: Path) -> Path: @pytest.fixture def sqlite_target_test_config(path_to_target_db: Path) -> dict: """Get configuration dictionary for target-csv.""" - return {"path_to_db": str(path_to_target_db), "add_record_metadata": True} + return {"path_to_db": str(path_to_target_db)} @pytest.fixture @@ -53,6 +53,14 @@ def sqlite_sample_target_hard_delete(sqlite_target_test_config): return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True}) +@pytest.fixture +def sqlite_target_add_record_metadata(sqlite_target_test_config): + """Get a sample target object with add_record_metadata enabled.""" + return SQLiteTarget( + config={**sqlite_target_test_config, "add_record_metadata": True} + ) + + @pytest.fixture def sqlite_sample_target_batch(sqlite_target_test_config): """Get a sample target object with hard_delete disabled.""" @@ -269,6 +277,54 @@ def test_sqlite_activate_version( ) +def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget): + """Test handling the activate_version message for the SQLite target. + + Test performs the following actions: + + - Sends an activate_version message for a table that doesn't exist (which should + have no effect) + """ + test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}" + schema_msg = { + "type": "SCHEMA", + "stream": test_tbl, + "schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(), + } + + tap_output = "\n".join( + json.dumps(msg) + for msg in [ + schema_msg, + { + "type": "RECORD", + "stream": test_tbl, + "record": {"col_a": "samplerow1"}, + "version": 12345, + }, + ] + ) + + target_sync_test( + sqlite_target_add_record_metadata, + input=StringIO(tap_output), + finalize=True, + ) + + # Check that the record metadata was added + db_path = sqlite_target_add_record_metadata.config["path_to_db"] + engine = sa.create_engine(f"sqlite:///{db_path}") + meta = sa.MetaData() + meta.reflect(bind=engine) + table = meta.tables[test_tbl] + + assert "_sdc_received_at" in table.columns + assert type(table.columns["_sdc_received_at"].type) is sa.DATETIME + + assert "_sdc_sync_started_at" in table.columns + assert type(table.columns["_sdc_sync_started_at"].type) is sa.INTEGER + + def test_sqlite_column_morph(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target. @@ -505,8 +561,8 @@ def test_record_with_missing_properties( dedent( """\ INSERT INTO test_stream - (id, name, "table", _sdc_extracted_at, _sdc_received_at, _sdc_batched_at, _sdc_deleted_at, _sdc_sequence, _sdc_table_version, _sdc_sync_started_at) - VALUES (:id, :name, :table, :_sdc_extracted_at, :_sdc_received_at, :_sdc_batched_at, :_sdc_deleted_at, :_sdc_sequence, :_sdc_table_version, :_sdc_sync_started_at)""", # noqa: E501 + (id, name, "table") + VALUES (:id, :name, :table)""", ), ), ], @@ -563,13 +619,6 @@ def test_hostile_to_sqlite( ) columns = {res[0] for res in cursor.fetchall()} assert columns == { - "_sdc_batched_at", - "_sdc_deleted_at", - "_sdc_extracted_at", - "_sdc_received_at", - "_sdc_sequence", - "_sdc_sync_started_at", - "_sdc_table_version", "name_with_spaces", "nameiscamelcase", "name_with_dashes", From 997f716e66e8790885a07ee185e892bd74e4734c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:43:49 -0600 Subject: [PATCH 10/14] Test _sdc_table_version --- tests/samples/test_target_sqlite.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index a104870be..69921b5f8 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -296,6 +296,7 @@ def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget json.dumps(msg) for msg in [ schema_msg, + {"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345}, { "type": "RECORD", "stream": test_tbl, @@ -324,6 +325,9 @@ def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget assert "_sdc_sync_started_at" in table.columns assert type(table.columns["_sdc_sync_started_at"].type) is sa.INTEGER + assert "_sdc_table_version" in table.columns + assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER + def test_sqlite_column_morph(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target. From cb62ed2ad53fa44a97268decaf672c8def199998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:47:05 -0600 Subject: [PATCH 11/14] Use `add_record_metadata` with hard delete --- tests/samples/test_target_sqlite.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 69921b5f8..adc7b455b 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -50,7 +50,13 @@ def sqlite_sample_target(sqlite_target_test_config): @pytest.fixture def sqlite_sample_target_hard_delete(sqlite_target_test_config): """Get a sample target object with hard_delete disabled.""" - return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True}) + return SQLiteTarget( + config={ + **sqlite_target_test_config, + "hard_delete": True, + "add_record_metadata": False, + } + ) @pytest.fixture From 72565cd1eeb50a5e747ff08ca508370b0eab494f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 17:57:03 -0600 Subject: [PATCH 12/14] Do not make a breaking change --- singer_sdk/target_base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 0f50211d4..c30c2084a 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -457,17 +457,17 @@ def _process_activate_version_message(self, message_dict: dict) -> None: sink = self.get_sink(stream_map.stream_alias) if not sink.process_activate_version_messages: self.logger.warning( - "Activate version messages are not enabled for '%s'. Ignoring.", + "`ACTIVATE_VERSION` messages are not enabled for '%s'. Ignoring.", stream_map.stream_alias, ) continue if not sink.include_sdc_metadata_properties: self.logger.warning( - "ACTIVATE_VERSION requires _sdc_* metadata properties to be " - "included. Set `add_record_metadata` to `True` if you wanna use " - "this feature." + "The `ACTIVATE_VERSION` feature uses the `_sdc_deleted_at` and " + "`_sdc_deleted_at` metadata properties so they will be added to " + "the schema for '%s' even though `add_record_metadata` is " + "disabled.", ) - continue sink.activate_version(message_dict["version"]) def _process_batch_message(self, message_dict: dict) -> None: From e8adcad9b2cf66b6a3993faf34c96089f1294ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 29 Nov 2024 18:05:42 -0600 Subject: [PATCH 13/14] Test with activate version disabled --- tests/samples/test_target_sqlite.py | 68 +++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index adc7b455b..778c7fef3 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -59,6 +59,12 @@ def sqlite_sample_target_hard_delete(sqlite_target_test_config): ) +@pytest.fixture +def sqlite_sample_target_no_activate_version(sqlite_target_test_config): + """Get a sample target object with hard_delete disabled.""" + return SQLiteTarget(config={**sqlite_target_test_config, "activate_version": False}) + + @pytest.fixture def sqlite_target_add_record_metadata(sqlite_target_test_config): """Get a sample target object with add_record_metadata enabled.""" @@ -282,6 +288,68 @@ def test_sqlite_activate_version( finalize=True, ) + # Check that the record metadata was added + db_path = sqlite_sample_target_hard_delete.config["path_to_db"] + engine = sa.create_engine(f"sqlite:///{db_path}") + meta = sa.MetaData() + meta.reflect(bind=engine) + table = meta.tables[test_tbl] + + assert "_sdc_table_version" in table.columns + assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER + + assert "_sdc_deleted_at" in table.columns + assert type(table.columns["_sdc_deleted_at"].type) is sa.DATETIME + + +def test_sqlite_no_activate_version( + sqlite_sample_target_no_activate_version: SQLTarget, +): + """Test handling the activate_version message for the SQLite target. + + Test performs the following actions: + + - Sends an activate_version message for a table that doesn't exist (which should + have no effect) + """ + test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}" + schema_msg = { + "type": "SCHEMA", + "stream": test_tbl, + "schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(), + } + + tap_output = "\n".join( + json.dumps(msg) + for msg in [ + schema_msg, + {"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345}, + { + "type": "RECORD", + "stream": test_tbl, + "record": {"col_a": "samplerow1"}, + "version": 12345, + }, + ] + ) + + target_sync_test( + sqlite_sample_target_no_activate_version, + input=StringIO(tap_output), + finalize=True, + ) + + # Check that the record metadata was added + db_path = sqlite_sample_target_no_activate_version.config["path_to_db"] + engine = sa.create_engine(f"sqlite:///{db_path}") + meta = sa.MetaData() + meta.reflect(bind=engine) + table = meta.tables[test_tbl] + + assert "col_a" in table.columns + assert "_sdc_table_version" not in table.columns + assert "_sdc_deleted_at" not in table.columns + def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget): """Test handling the activate_version message for the SQLite target. From 6ae38584959327548a0c9da1fe1d65a78f8fd5f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Sat, 30 Nov 2024 14:47:16 -0600 Subject: [PATCH 14/14] DRY --- tests/samples/test_target_sqlite.py | 35 ++++++++++++++++------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 778c7fef3..bb659763d 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -30,6 +30,23 @@ from singer_sdk.target_base import SQLTarget +def get_table(config: dict, table_name: str) -> sa.Table: + """Get SQLAlchemy metadata and table for inspection. + + Args: + config: Target configuration dictionary containing database path + table_name: Name of the table to inspect + + Returns: + Tuple of (metadata, table) + """ + db_path = config["path_to_db"] + engine = sa.create_engine(f"sqlite:///{db_path}") + meta = sa.MetaData() + meta.reflect(bind=engine) + return meta.tables[table_name] + + @pytest.fixture def path_to_target_db(tmp_path: Path) -> Path: return Path(f"{tmp_path}/target_test.db") @@ -289,11 +306,7 @@ def test_sqlite_activate_version( ) # Check that the record metadata was added - db_path = sqlite_sample_target_hard_delete.config["path_to_db"] - engine = sa.create_engine(f"sqlite:///{db_path}") - meta = sa.MetaData() - meta.reflect(bind=engine) - table = meta.tables[test_tbl] + table = get_table(sqlite_sample_target_hard_delete.config, test_tbl) assert "_sdc_table_version" in table.columns assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER @@ -340,11 +353,7 @@ def test_sqlite_no_activate_version( ) # Check that the record metadata was added - db_path = sqlite_sample_target_no_activate_version.config["path_to_db"] - engine = sa.create_engine(f"sqlite:///{db_path}") - meta = sa.MetaData() - meta.reflect(bind=engine) - table = meta.tables[test_tbl] + table = get_table(sqlite_sample_target_no_activate_version.config, test_tbl) assert "col_a" in table.columns assert "_sdc_table_version" not in table.columns @@ -387,11 +396,7 @@ def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget ) # Check that the record metadata was added - db_path = sqlite_target_add_record_metadata.config["path_to_db"] - engine = sa.create_engine(f"sqlite:///{db_path}") - meta = sa.MetaData() - meta.reflect(bind=engine) - table = meta.tables[test_tbl] + table = get_table(sqlite_target_add_record_metadata.config, test_tbl) assert "_sdc_received_at" in table.columns assert type(table.columns["_sdc_received_at"].type) is sa.DATETIME