From ace25f21dcaf5e4a692eb02c0128804c54f84113 Mon Sep 17 00:00:00 2001 From: skrydal Date: Sat, 1 Mar 2025 01:30:07 +0100 Subject: [PATCH] feat(ingestion/iceberg): Several improvements to iceberg connector (#12744) --- .../ingestion/source/iceberg/iceberg.py | 58 +++++++++++++++---- .../source/iceberg/iceberg_common.py | 51 +++++++++------- .../iceberg_deleted_table_mces_golden.json | 21 ++++--- .../iceberg/iceberg_ingest_mces_golden.json | 15 +++-- .../iceberg/iceberg_profile_mces_golden.json | 19 ++++-- .../tests/integration/iceberg/test_iceberg.py | 2 + metadata-ingestion/tests/unit/test_iceberg.py | 19 +++++- 7 files changed, 135 insertions(+), 50 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 9a62ee2dab52f4..4648fc9411799b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -2,8 +2,9 @@ import logging import threading import uuid -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Tuple +from dateutil import parser as dateutil_parser from pyiceberg.catalog import Catalog from pyiceberg.exceptions import ( NoSuchIcebergTableError, @@ -81,6 +82,7 @@ OwnerClass, OwnershipClass, OwnershipTypeClass, + TimeStampClass, ) from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor @@ -183,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: thread_local = threading.local() - def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: - LOGGER.debug(f"Processing dataset for path {dataset_path}") - dataset_name = ".".join(dataset_path) - if not self.config.table_pattern.allowed(dataset_name): - # Dataset name is rejected by pattern, report as dropped. - self.report.report_dropped(dataset_name) - LOGGER.debug( - f"Skipping table {dataset_name} due to not being allowed by the config pattern" - ) - return + def _try_processing_dataset( + dataset_path: Tuple[str, ...], dataset_name: str + ) -> Iterable[MetadataWorkUnit]: try: if not hasattr(thread_local, "local_catalog"): LOGGER.debug( @@ -248,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: LOGGER.warning( f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it." ) + except ValueError as e: + if "Could not initialize FileIO" not in str(e): + raise + self.report.warning( + "Could not initialize FileIO", + f"Could not initialize FileIO for {dataset_path} due to: {e}", + ) + + def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: + try: + LOGGER.debug(f"Processing dataset for path {dataset_path}") + dataset_name = ".".join(dataset_path) + if not self.config.table_pattern.allowed(dataset_name): + # Dataset name is rejected by pattern, report as dropped. + self.report.report_dropped(dataset_name) + LOGGER.debug( + f"Skipping table {dataset_name} due to not being allowed by the config pattern" + ) + return + + yield from _try_processing_dataset(dataset_path, dataset_name) except Exception as e: self.report.report_failure( "general", - f"Failed to create workunit for dataset {dataset_name}: {e}", + f"Failed to create workunit for dataset {dataset_path}: {e}", ) LOGGER.exception( f"Exception while processing table {dataset_path}, skipping it.", @@ -288,6 +304,7 @@ def _create_iceberg_workunit( ) # Dataset properties aspect. + additional_properties = {} custom_properties = table.metadata.properties.copy() custom_properties["location"] = table.metadata.location custom_properties["format-version"] = str(table.metadata.format_version) @@ -299,10 +316,27 @@ def _create_iceberg_workunit( custom_properties["manifest-list"] = ( table.current_snapshot().manifest_list ) + additional_properties["lastModified"] = TimeStampClass( + int(table.current_snapshot().timestamp_ms) + ) + if "created-at" in custom_properties: + try: + dt = dateutil_parser.isoparse(custom_properties["created-at"]) + additional_properties["created"] = TimeStampClass( + int(dt.timestamp() * 1000) + ) + except Exception as ex: + LOGGER.warning( + f"Exception while trying to parse creation date {custom_properties['created-at']}, ignoring: {ex}" + ) + dataset_properties = DatasetPropertiesClass( name=table.name()[-1], description=table.metadata.properties.get("comment", None), customProperties=custom_properties, + lastModified=additional_properties.get("lastModified"), + created=additional_properties.get("created"), + qualifiedName=dataset_name, ) dataset_snapshot.aspects.append(dataset_properties) # Dataset ownership aspect. diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py index 831b3b1a7e9132..a1ac7f67ae6cfd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py @@ -1,4 +1,5 @@ import logging +import threading from dataclasses import dataclass, field from typing import Any, Dict, Optional @@ -156,18 +157,21 @@ class TopTableTimings: def __init__(self, size: int = 10): self._size = size self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0)) + self._lock = threading.Lock() def add(self, entity: Dict[str, Any]) -> None: if self._VALUE_FIELD not in entity: return - self.top_entites.add(entity) - if len(self.top_entites) > self._size: - self.top_entites.pop() + with self._lock: + self.top_entites.add(entity) + if len(self.top_entites) > self._size: + self.top_entites.pop() def __str__(self) -> str: - if len(self.top_entites) == 0: - return "no timings reported" - return str(list(self.top_entites)) + with self._lock: + if len(self.top_entites) == 0: + return "no timings reported" + return str(list(self.top_entites)) class TimingClass: @@ -175,24 +179,31 @@ class TimingClass: def __init__(self): self.times = SortedList() + self._lock = threading.Lock() def add_timing(self, t: float) -> None: - self.times.add(t) + with self._lock: + self.times.add(t) def __str__(self) -> str: - if len(self.times) == 0: - return "no timings reported" - total = sum(self.times) - avg = total / len(self.times) - return str( - { - "average_time": format_timespan(avg, detailed=True, max_units=3), - "min_time": format_timespan(self.times[0], detailed=True, max_units=3), - "max_time": format_timespan(self.times[-1], detailed=True, max_units=3), - # total_time does not provide correct information in case we run in more than 1 thread - "total_time": format_timespan(total, detailed=True, max_units=3), - } - ) + with self._lock: + if len(self.times) == 0: + return "no timings reported" + total = sum(self.times) + avg = total / len(self.times) + return str( + { + "average_time": format_timespan(avg, detailed=True, max_units=3), + "min_time": format_timespan( + self.times[0], detailed=True, max_units=3 + ), + "max_time": format_timespan( + self.times[-1], detailed=True, max_units=3 + ), + # total_time does not provide correct information in case we run in more than 1 thread + "total_time": format_timespan(total, detailed=True, max_units=3), + } + ) @dataclass diff --git a/metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json b/metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json index 4b2afb29ddda8a..9384db249e1474 100644 --- a/metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json +++ b/metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json @@ -11,16 +11,23 @@ }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "name": "another_taxis", "customProperties": { "owner": "root", - "created-at": "2024-06-27T17:29:32.492204247Z", + "created-at": "2025-02-28T22:29:45.128452801Z", "write.format.default": "parquet", "location": "s3a://warehouse/wh/nyc/another_taxis", "format-version": "1", "partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]", - "snapshot-id": "1131595459662979239", - "manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1131595459662979239-1-0e80739b-774c-4eda-9d96-3a4c70873c32.avro" + "snapshot-id": "2650690634256613262", + "manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-2650690634256613262-1-da2608b8-1ca0-4331-ac49-332e8a7654d3.avro" + }, + "name": "another_taxis", + "qualifiedName": "nyc.another_taxis", + "created": { + "time": 1740781785128 + }, + "lastModified": { + "time": 1740781786629 }, "tags": [] } @@ -150,7 +157,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "iceberg-2020_04_14-07_00_00", + "runId": "iceberg-2020_04_14-07_00_00-x9zw1z", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -168,7 +175,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "iceberg-2020_04_14-07_00_00", + "runId": "iceberg-2020_04_14-07_00_00-x9zw1z", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -185,7 +192,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "iceberg-2020_04_14-07_00_00", + "runId": "iceberg-2020_04_14-07_00_00-x9zw1z", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } diff --git a/metadata-ingestion/tests/integration/iceberg/iceberg_ingest_mces_golden.json b/metadata-ingestion/tests/integration/iceberg/iceberg_ingest_mces_golden.json index 477f719ef93178..d2c56e59c4dfee 100644 --- a/metadata-ingestion/tests/integration/iceberg/iceberg_ingest_mces_golden.json +++ b/metadata-ingestion/tests/integration/iceberg/iceberg_ingest_mces_golden.json @@ -11,16 +11,23 @@ }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "name": "taxis", "customProperties": { "owner": "root", - "created-at": "2024-05-22T14:08:04.001538500Z", + "created-at": "2025-02-28T22:28:42.803284675Z", "write.format.default": "parquet", "location": "s3a://warehouse/wh/nyc/taxis", "format-version": "1", "partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]", - "snapshot-id": "5259199139271057622", - "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-5259199139271057622-1-24dca7b8-d437-458e-ae91-df1d3e30bdc8.avro" + "snapshot-id": "7588721351475793452", + "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-7588721351475793452-1-55e4dfe5-1478-4fda-a866-f03c24e643bb.avro" + }, + "name": "taxis", + "qualifiedName": "nyc.taxis", + "created": { + "time": 1740781722803 + }, + "lastModified": { + "time": 1740781724377 }, "tags": [] } diff --git a/metadata-ingestion/tests/integration/iceberg/iceberg_profile_mces_golden.json b/metadata-ingestion/tests/integration/iceberg/iceberg_profile_mces_golden.json index 6d2ca013d81d01..1594c23a4729f2 100644 --- a/metadata-ingestion/tests/integration/iceberg/iceberg_profile_mces_golden.json +++ b/metadata-ingestion/tests/integration/iceberg/iceberg_profile_mces_golden.json @@ -11,16 +11,23 @@ }, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "name": "taxis", "customProperties": { "owner": "root", - "created-at": "2024-05-22T14:10:22.926080700Z", + "created-at": "2025-02-28T22:30:17.629572885Z", "write.format.default": "parquet", "location": "s3a://warehouse/wh/nyc/taxis", "format-version": "1", "partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]", - "snapshot-id": "564034874306625146", - "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-564034874306625146-1-562a1705-d774-4e0a-baf0-1988bcc7be72.avro" + "snapshot-id": "7306419080492452372", + "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-7306419080492452372-1-53f849ab-2791-4a72-888c-fd2b0173712c.avro" + }, + "name": "taxis", + "qualifiedName": "nyc.taxis", + "created": { + "time": 1740781817629 + }, + "lastModified": { + "time": 1740781819146 }, "tags": [] } @@ -163,8 +170,8 @@ "json": { "timestampMillis": 1586847600000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "rowCount": 5, "columnCount": 6, diff --git a/metadata-ingestion/tests/integration/iceberg/test_iceberg.py b/metadata-ingestion/tests/integration/iceberg/test_iceberg.py index 85809e557dd8d3..c5d006b241203a 100644 --- a/metadata-ingestion/tests/integration/iceberg/test_iceberg.py +++ b/metadata-ingestion/tests/integration/iceberg/test_iceberg.py @@ -22,6 +22,8 @@ # these. PATHS_IN_GOLDEN_FILE_TO_IGNORE = [ r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['created-at'\]", + r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['created'\]", + r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['lastModified'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['snapshot-id'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['manifest-list'\]", ] diff --git a/metadata-ingestion/tests/unit/test_iceberg.py b/metadata-ingestion/tests/unit/test_iceberg.py index 12a7228ab792c2..c714ad1e568e2d 100644 --- a/metadata-ingestion/tests/unit/test_iceberg.py +++ b/metadata-ingestion/tests/unit/test_iceberg.py @@ -968,6 +968,9 @@ def _raise_no_such_table_exception(): def _raise_server_error(): raise ServerError() + def _raise_fileio_error(): + raise ValueError("Could not initialize FileIO: abc.dummy.fileio") + mock_catalog = MockCatalog( { "namespaceA": { @@ -1024,6 +1027,7 @@ def _raise_server_error(): "table7": _raise_file_not_found_error, "table8": _raise_no_such_iceberg_table_exception, "table9": _raise_server_error, + "table10": _raise_fileio_error, } } ) @@ -1047,7 +1051,7 @@ def _raise_server_error(): "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)", ], ) - assert source.report.warnings.total_elements == 5 + assert source.report.warnings.total_elements == 6 assert source.report.failures.total_elements == 0 assert source.report.tables_scanned == 4 @@ -1058,6 +1062,9 @@ def test_handle_unexpected_exceptions() -> None: def _raise_exception(): raise Exception() + def _raise_other_value_error_exception(): + raise ValueError("Other value exception") + mock_catalog = MockCatalog( { "namespaceA": { @@ -1110,6 +1117,7 @@ def _raise_exception(): catalog=None, ), "table5": _raise_exception, + "table6": _raise_other_value_error_exception, } } ) @@ -1136,3 +1144,12 @@ def _raise_exception(): assert source.report.warnings.total_elements == 0 assert source.report.failures.total_elements == 1 assert source.report.tables_scanned == 4 + # Needed to make sure all failures are recognized properly + failures = [f for f in source.report.failures] + TestCase().assertCountEqual( + failures[0].context, + [ + "Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception", + "Failed to create workunit for dataset ('namespaceA', 'table5'): ", + ], + )