feat(ingestion/iceberg): Several improvements to iceberg connector (#…
skrydal authored Mar 1, 2025
1 parent 862d1ac commit 3e1b20c
Showing 7 changed files with 135 additions and 50 deletions.
58 changes: 46 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -2,8 +2,9 @@
import logging
import threading
import uuid
from typing import Any, Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Tuple

from dateutil import parser as dateutil_parser
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import (
NoSuchIcebergTableError,
Expand Down Expand Up @@ -81,6 +82,7 @@
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
TimeStampClass,
)
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
@@ -183,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
thread_local = threading.local()

def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
LOGGER.debug(f"Processing dataset for path {dataset_path}")
dataset_name = ".".join(dataset_path)
if not self.config.table_pattern.allowed(dataset_name):
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
LOGGER.debug(
f"Skipping table {dataset_name} due to not being allowed by the config pattern"
)
return
def _try_processing_dataset(
dataset_path: Tuple[str, ...], dataset_name: str
) -> Iterable[MetadataWorkUnit]:
try:
if not hasattr(thread_local, "local_catalog"):
LOGGER.debug(
@@ -248,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
LOGGER.warning(
f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
)
except ValueError as e:
if "Could not initialize FileIO" not in str(e):
raise
self.report.warning(
"Could not initialize FileIO",
f"Could not initialize FileIO for {dataset_path} due to: {e}",
)

def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
try:
LOGGER.debug(f"Processing dataset for path {dataset_path}")
dataset_name = ".".join(dataset_path)
if not self.config.table_pattern.allowed(dataset_name):
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
LOGGER.debug(
f"Skipping table {dataset_name} due to not being allowed by the config pattern"
)
return

yield from _try_processing_dataset(dataset_path, dataset_name)
except Exception as e:
self.report.report_failure(
"general",
f"Failed to create workunit for dataset {dataset_name}: {e}",
f"Failed to create workunit for dataset {dataset_path}: {e}",
)
LOGGER.exception(
f"Exception while processing table {dataset_path}, skipping it.",
@@ -288,6 +304,7 @@ def _create_iceberg_workunit(
)

# Dataset properties aspect.
additional_properties = {}
custom_properties = table.metadata.properties.copy()
custom_properties["location"] = table.metadata.location
custom_properties["format-version"] = str(table.metadata.format_version)
@@ -299,10 +316,27 @@
custom_properties["manifest-list"] = (
table.current_snapshot().manifest_list
)
additional_properties["lastModified"] = TimeStampClass(
int(table.current_snapshot().timestamp_ms)
)
if "created-at" in custom_properties:
try:
dt = dateutil_parser.isoparse(custom_properties["created-at"])
additional_properties["created"] = TimeStampClass(
int(dt.timestamp() * 1000)
)
except Exception as ex:
LOGGER.warning(
f"Exception while trying to parse creation date {custom_properties['created-at']}, ignoring: {ex}"
)

dataset_properties = DatasetPropertiesClass(
name=table.name()[-1],
description=table.metadata.properties.get("comment", None),
customProperties=custom_properties,
lastModified=additional_properties.get("lastModified"),
created=additional_properties.get("created"),
qualifiedName=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
# Dataset ownership aspect.
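The changes in this file do two things. First, a `ValueError` raised while initializing FileIO for a table is now caught and reported as a structured warning (only `ValueError`s whose message contains "Could not initialize FileIO" are swallowed; anything else is re-raised), instead of falling through to the generic catch-all in `_process_dataset`. Second, the dataset's `created` and `lastModified` timestamps are populated: `lastModified` from the current snapshot's `timestamp_ms`, and `created` by parsing the table's ISO-8601 `created-at` property. A minimal sketch of those two pieces of logic; the helper names below are illustrative, only the `dateutil` parsing and the millisecond conversion mirror the diff:

```python
# Sketch only: mirrors the error narrowing and timestamp handling added above.
from typing import Optional

from dateutil import parser as dateutil_parser


def fileio_warning_or_raise(e: ValueError) -> str:
    """Turn a FileIO initialization failure into a warning message; re-raise anything else."""
    if "Could not initialize FileIO" not in str(e):
        raise e
    return f"Could not initialize FileIO: {e}"


def created_at_millis(created_at: str) -> Optional[int]:
    """Parse an ISO-8601 'created-at' table property into epoch milliseconds."""
    try:
        dt = dateutil_parser.isoparse(created_at)
        return int(dt.timestamp() * 1000)
    except Exception:
        # As in the diff, a malformed property is logged and ignored rather than treated as fatal.
        return None
```

The snapshot's `timestamp_ms` is already in epoch milliseconds, so the diff wraps it in `TimeStampClass` directly with no parsing step.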
Next changed file (Iceberg source timing/report helpers):
@@ -1,4 +1,5 @@
import logging
import threading
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@@ -156,43 +157,53 @@ class TopTableTimings:
def __init__(self, size: int = 10):
self._size = size
self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0))
self._lock = threading.Lock()

def add(self, entity: Dict[str, Any]) -> None:
if self._VALUE_FIELD not in entity:
return
self.top_entites.add(entity)
if len(self.top_entites) > self._size:
self.top_entites.pop()
with self._lock:
self.top_entites.add(entity)
if len(self.top_entites) > self._size:
self.top_entites.pop()

def __str__(self) -> str:
if len(self.top_entites) == 0:
return "no timings reported"
return str(list(self.top_entites))
with self._lock:
if len(self.top_entites) == 0:
return "no timings reported"
return str(list(self.top_entites))


class TimingClass:
times: SortedList

def __init__(self):
self.times = SortedList()
self._lock = threading.Lock()

def add_timing(self, t: float) -> None:
self.times.add(t)
with self._lock:
self.times.add(t)

def __str__(self) -> str:
if len(self.times) == 0:
return "no timings reported"
total = sum(self.times)
avg = total / len(self.times)
return str(
{
"average_time": format_timespan(avg, detailed=True, max_units=3),
"min_time": format_timespan(self.times[0], detailed=True, max_units=3),
"max_time": format_timespan(self.times[-1], detailed=True, max_units=3),
# total_time does not provide correct information in case we run in more than 1 thread
"total_time": format_timespan(total, detailed=True, max_units=3),
}
)
with self._lock:
if len(self.times) == 0:
return "no timings reported"
total = sum(self.times)
avg = total / len(self.times)
return str(
{
"average_time": format_timespan(avg, detailed=True, max_units=3),
"min_time": format_timespan(
self.times[0], detailed=True, max_units=3
),
"max_time": format_timespan(
self.times[-1], detailed=True, max_units=3
),
# total_time does not provide correct information in case we run in more than 1 thread
"total_time": format_timespan(total, detailed=True, max_units=3),
}
)


@dataclass
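These accumulators are shared across the worker threads that `get_workunits_internal` spins up through `ThreadedIteratorExecutor`, and `sortedcontainers.SortedList` is not documented as thread-safe, so both mutation and reads are now serialized behind a `threading.Lock`. A small self-contained sketch of the same guarded-accumulator pattern (class and method names are illustrative, not the DataHub classes):

```python
# Sketch of the lock-guarded accumulator pattern used by TimingClass/TopTableTimings.
import threading
from concurrent.futures import ThreadPoolExecutor

from sortedcontainers import SortedList


class ThreadSafeTimings:
    def __init__(self) -> None:
        self.times = SortedList()
        self._lock = threading.Lock()

    def add_timing(self, t: float) -> None:
        # Every mutation happens under the lock so concurrent adds cannot
        # corrupt the SortedList's internal structure.
        with self._lock:
            self.times.add(t)

    def summary(self) -> str:
        # Reads are also serialized so the snapshot is internally consistent.
        with self._lock:
            if not self.times:
                return "no timings reported"
            return f"min={self.times[0]:.3f}s max={self.times[-1]:.3f}s n={len(self.times)}"


if __name__ == "__main__":
    timings = ThreadSafeTimings()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for i in range(100):
            pool.submit(timings.add_timing, i / 100)
    print(timings.summary())
```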
Next changed file (golden file for the nyc.another_taxis dataset):
@@ -11,16 +11,23 @@
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"name": "another_taxis",
"customProperties": {
"owner": "root",
"created-at": "2024-06-27T17:29:32.492204247Z",
"created-at": "2025-02-28T22:29:45.128452801Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/another_taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "1131595459662979239",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1131595459662979239-1-0e80739b-774c-4eda-9d96-3a4c70873c32.avro"
"snapshot-id": "2650690634256613262",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-2650690634256613262-1-da2608b8-1ca0-4331-ac49-332e8a7654d3.avro"
},
"name": "another_taxis",
"qualifiedName": "nyc.another_taxis",
"created": {
"time": 1740781785128
},
"lastModified": {
"time": 1740781786629
},
"tags": []
}
@@ -150,7 +157,7 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"runId": "iceberg-2020_04_14-07_00_00-x9zw1z",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
@@ -168,7 +175,7 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"runId": "iceberg-2020_04_14-07_00_00-x9zw1z",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
@@ -185,7 +192,7 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"runId": "iceberg-2020_04_14-07_00_00-x9zw1z",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
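The new `created` and `lastModified` entries are epoch-millisecond renderings of the table metadata shown alongside them; for example, a `created-at` of `2025-02-28T22:29:45.128452801Z` corresponds to `1740781785128` ms once the sub-microsecond digits are dropped. A quick standard-library check (trimming the nanosecond tail, since `strptime`'s `%f` accepts at most six fractional digits):

```python
# Sanity check that the golden-file 'created' value matches the 'created-at' property.
from datetime import datetime, timezone

created_at = "2025-02-28T22:29:45.128452801Z"
dt = datetime.strptime(created_at[:26], "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)
print(int(dt.timestamp() * 1000))  # 1740781785128, as in the golden file above
```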
Next changed file (golden file for the nyc.taxis dataset):
@@ -11,16 +11,23 @@
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"name": "taxis",
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:08:04.001538500Z",
"created-at": "2025-02-28T22:28:42.803284675Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "5259199139271057622",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-5259199139271057622-1-24dca7b8-d437-458e-ae91-df1d3e30bdc8.avro"
"snapshot-id": "7588721351475793452",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-7588721351475793452-1-55e4dfe5-1478-4fda-a866-f03c24e643bb.avro"
},
"name": "taxis",
"qualifiedName": "nyc.taxis",
"created": {
"time": 1740781722803
},
"lastModified": {
"time": 1740781724377
},
"tags": []
}
Next changed file (second golden file for the nyc.taxis dataset, including profiling output):
@@ -11,16 +11,23 @@
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"name": "taxis",
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:10:22.926080700Z",
"created-at": "2025-02-28T22:30:17.629572885Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "564034874306625146",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-564034874306625146-1-562a1705-d774-4e0a-baf0-1988bcc7be72.avro"
"snapshot-id": "7306419080492452372",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-7306419080492452372-1-53f849ab-2791-4a72-888c-fd2b0173712c.avro"
},
"name": "taxis",
"qualifiedName": "nyc.taxis",
"created": {
"time": 1740781817629
},
"lastModified": {
"time": 1740781819146
},
"tags": []
}
@@ -163,8 +170,8 @@
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"rowCount": 5,
"columnCount": 6,
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/iceberg/test_iceberg.py
@@ -22,6 +22,8 @@
# these.
PATHS_IN_GOLDEN_FILE_TO_IGNORE = [
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['created-at'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['created'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['lastModified'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['snapshot-id'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['manifest-list'\]",
]
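The two new patterns keep the freshly added `created` and `lastModified` aspects out of the golden-file comparison, since they are derived from when the test tables were created and therefore change on every run. They are regexes over deepdiff-style paths; a toy illustration of the mechanism (simplified keys, not the DataHub test harness itself):

```python
# Illustration: fields whose path matches an exclusion regex do not show up in the diff.
from deepdiff import DeepDiff

golden = {"proposedSnapshot": {"aspects": {"DatasetProperties": {"created": {"time": 1740781785128}}}}}
output = {"proposedSnapshot": {"aspects": {"DatasetProperties": {"created": {"time": 1999999999999}}}}}

diff = DeepDiff(
    golden,
    output,
    exclude_regex_paths=[r"root.+\['DatasetProperties'\]\['created'\]"],
)
print(diff)  # {} -> the changed timestamp is ignored
```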
19 changes: 18 additions & 1 deletion metadata-ingestion/tests/unit/test_iceberg.py
@@ -968,6 +968,9 @@ def _raise_no_such_table_exception():
def _raise_server_error():
raise ServerError()

def _raise_fileio_error():
raise ValueError("Could not initialize FileIO: abc.dummy.fileio")

mock_catalog = MockCatalog(
{
"namespaceA": {
@@ -1024,6 +1027,7 @@ def _raise_server_error():
"table7": _raise_file_not_found_error,
"table8": _raise_no_such_iceberg_table_exception,
"table9": _raise_server_error,
"table10": _raise_fileio_error,
}
}
)
@@ -1047,7 +1051,7 @@ def _raise_server_error():
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
],
)
assert source.report.warnings.total_elements == 5
assert source.report.warnings.total_elements == 6
assert source.report.failures.total_elements == 0
assert source.report.tables_scanned == 4

@@ -1058,6 +1062,9 @@ def test_handle_unexpected_exceptions() -> None:
def _raise_exception():
raise Exception()

def _raise_other_value_error_exception():
raise ValueError("Other value exception")

mock_catalog = MockCatalog(
{
"namespaceA": {
@@ -1110,6 +1117,7 @@ def _raise_exception():
catalog=None,
),
"table5": _raise_exception,
"table6": _raise_other_value_error_exception,
}
}
)
@@ -1136,3 +1144,12 @@ def _raise_exception():
assert source.report.warnings.total_elements == 0
assert source.report.failures.total_elements == 1
assert source.report.tables_scanned == 4
# Needed to make sure all failures are recognized properly
failures = [f for f in source.report.failures]
TestCase().assertCountEqual(
failures[0].context,
[
"Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception",
"Failed to create workunit for dataset ('namespaceA', 'table5'): ",
],
)
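The new assertion checks that both the generic exception from `table5` and the non-FileIO `ValueError` from `table6` surface as failure contexts. `assertCountEqual` compares the two collections ignoring order, which matters because the tables are processed by worker threads and may fail in either order. For example:

```python
# assertCountEqual passes when both sequences contain the same elements,
# regardless of order, so thread scheduling cannot make the test flaky.
from unittest import TestCase

TestCase().assertCountEqual(
    ["failure for table6", "failure for table5"],
    ["failure for table5", "failure for table6"],
)
```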
