feat(ingestion/iceberg): Several improvements to iceberg connector #12744

Merged · 12 commits · Mar 1, 2025
57 changes: 45 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -2,8 +2,9 @@
import logging
import threading
import uuid
from typing import Any, Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Tuple

from dateutil import parser as dateutil_parser
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import (
NoSuchIcebergTableError,
@@ -81,6 +82,7 @@
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
TimeStampClass,
)
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
@@ -183,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
thread_local = threading.local()

def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
LOGGER.debug(f"Processing dataset for path {dataset_path}")
dataset_name = ".".join(dataset_path)
if not self.config.table_pattern.allowed(dataset_name):
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
LOGGER.debug(
f"Skipping table {dataset_name} due to not being allowed by the config pattern"
)
return
def _try_processing_dataset(
dataset_path: Tuple[str, ...], dataset_name: str

Contributor:
Tuple[str, ...] seems equivalent to List[str]

I guess the only difference is that the tuple ensures a size >= 1, and that's what's relevant here, right?

Collaborator Author:
There are more differences between a tuple and a list, and this annotation reflects that. For one, a tuple is immutable, so it effectively behaves as if passed by value. Moreover, this annotation matches what we get from pyiceberg, so overall I would like to keep it like this.
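
For illustration, a minimal runnable sketch of why the tuple annotation lines up with what pyiceberg hands back (the Identifier alias is assumed to match pyiceberg's Tuple[str, ...] identifier type; dataset_name_from is a hypothetical helper, not part of the connector):

from typing import Tuple

# pyiceberg represents table identifiers as variable-length string tuples,
# e.g. ("namespaceA", "table1"); this alias is assumed to mirror pyiceberg's Identifier.
Identifier = Tuple[str, ...]

def dataset_name_from(dataset_path: Identifier) -> str:
    # The tuple is immutable, so a callee cannot mutate the path in place;
    # joining the parts yields the dotted dataset name used for pattern matching.
    return ".".join(dataset_path)

print(dataset_name_from(("namespaceA", "table1")))  # namespaceA.table1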

) -> Iterable[MetadataWorkUnit]:
try:
if not hasattr(thread_local, "local_catalog"):
LOGGER.debug(
@@ -248,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
LOGGER.warning(
f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
)
except ValueError as e:
if "Could not initialize FileIO" not in str(e):
raise
self.report.warning(

Contributor:
Might you have forgotten the redundant LOGGER.warning for this except case?

Collaborator Author:
The warning method logs automatically, unlike report_warning. I think I should change all lines to use warning; see:

def report_warning(
    self,
    message: LiteralString,
    context: Optional[str] = None,
    title: Optional[LiteralString] = None,
    exc: Optional[BaseException] = None,
) -> None:
    self._structured_logs.report_log(
        StructuredLogLevel.WARN, message, title, context, exc, log=False
    )

def warning(
    self,
    message: LiteralString,
    context: Optional[str] = None,
    title: Optional[LiteralString] = None,
    exc: Optional[BaseException] = None,
) -> None:
    self._structured_logs.report_log(
        StructuredLogLevel.WARN, message, title, context, exc, log=True
    )

"Could not initialize FileIO",
f"Could not initialize FileIO for {dataset_path} due to: {e}",
)

def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
try:
LOGGER.debug(f"Processing dataset for path {dataset_path}")
dataset_name = ".".join(dataset_path)
if not self.config.table_pattern.allowed(dataset_name):
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
LOGGER.debug(
f"Skipping table {dataset_name} due to not being allowed by the config pattern"
)
return

yield from _try_processing_dataset(dataset_path, dataset_name)
except Exception as e:
self.report.report_failure(
"general",
f"Failed to create workunit for dataset {dataset_name}: {e}",
f"Failed to create workunit for dataset {dataset_path}: {e}",
)
LOGGER.exception(
f"Exception while processing table {dataset_path}, skipping it.",
@@ -288,6 +304,7 @@ def _create_iceberg_workunit(
)

# Dataset properties aspect.
additional_properties = {}
custom_properties = table.metadata.properties.copy()
custom_properties["location"] = table.metadata.location
custom_properties["format-version"] = str(table.metadata.format_version)
@@ -299,10 +316,26 @@
custom_properties["manifest-list"] = (
table.current_snapshot().manifest_list
)
additional_properties["lastModified"] = TimeStampClass(
int(table.current_snapshot().timestamp_ms)
)
if "created-at" in custom_properties:
try:
dt = dateutil_parser.isoparse(custom_properties["created-at"])
additional_properties["created"] = TimeStampClass(
int(dt.timestamp() * 1000)
)
except Exception as ex:
LOGGER.warning(
f"Exception while trying to parse creation date {custom_properties['created-at']}, ignoring: {ex}"
)

dataset_properties = DatasetPropertiesClass(
name=table.name()[-1],
description=table.metadata.properties.get("comment", None),
customProperties=custom_properties,
lastModified=additional_properties.get("lastModified"),
created=additional_properties.get("created"),
)
dataset_snapshot.aspects.append(dataset_properties)
# Dataset ownership aspect.
@@ -1,4 +1,5 @@
import logging
import threading
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@@ -156,43 +157,53 @@ class TopTableTimings:
def __init__(self, size: int = 10):
self._size = size
self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0))
self._lock = threading.Lock()

def add(self, entity: Dict[str, Any]) -> None:
if self._VALUE_FIELD not in entity:
return
self.top_entites.add(entity)
if len(self.top_entites) > self._size:
self.top_entites.pop()
with self._lock:
self.top_entites.add(entity)
if len(self.top_entites) > self._size:
self.top_entites.pop()

def __str__(self) -> str:
if len(self.top_entites) == 0:
return "no timings reported"
return str(list(self.top_entites))
with self._lock:
if len(self.top_entites) == 0:
return "no timings reported"
return str(list(self.top_entites))


class TimingClass:
times: SortedList

def __init__(self):
self.times = SortedList()
self._lock = threading.Lock()

def add_timing(self, t: float) -> None:
self.times.add(t)
with self._lock:
self.times.add(t)

def __str__(self) -> str:
if len(self.times) == 0:
return "no timings reported"
total = sum(self.times)
avg = total / len(self.times)
return str(
{
"average_time": format_timespan(avg, detailed=True, max_units=3),
"min_time": format_timespan(self.times[0], detailed=True, max_units=3),
"max_time": format_timespan(self.times[-1], detailed=True, max_units=3),
# total_time does not provide correct information in case we run in more than 1 thread
"total_time": format_timespan(total, detailed=True, max_units=3),
}
)
with self._lock:
if len(self.times) == 0:
return "no timings reported"
total = sum(self.times)
avg = total / len(self.times)
return str(
{
"average_time": format_timespan(avg, detailed=True, max_units=3),
"min_time": format_timespan(
self.times[0], detailed=True, max_units=3
),
"max_time": format_timespan(
self.times[-1], detailed=True, max_units=3
),
# total_time does not provide correct information in case we run in more than 1 thread
"total_time": format_timespan(total, detailed=True, max_units=3),
}
)


@dataclass
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/iceberg/test_iceberg.py
@@ -22,6 +22,8 @@
# these.
PATHS_IN_GOLDEN_FILE_TO_IGNORE = [
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['created-at'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['created'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['lastModified'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['snapshot-id'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['manifest-list'\]",
]
19 changes: 18 additions & 1 deletion metadata-ingestion/tests/unit/test_iceberg.py
@@ -968,6 +968,9 @@ def _raise_no_such_table_exception():
def _raise_server_error():
raise ServerError()

def _raise_fileio_error():
raise ValueError("Could not initialize FileIO: abc.dummy.fileio")

mock_catalog = MockCatalog(
{
"namespaceA": {
@@ -1024,6 +1027,7 @@ def _raise_server_error():
"table7": _raise_file_not_found_error,
"table8": _raise_no_such_iceberg_table_exception,
"table9": _raise_server_error,
"table10": _raise_fileio_error,
}
}
)
@@ -1047,7 +1051,7 @@ def _raise_server_error():
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
],
)
assert source.report.warnings.total_elements == 5
assert source.report.warnings.total_elements == 6
assert source.report.failures.total_elements == 0
assert source.report.tables_scanned == 4

@@ -1058,6 +1062,9 @@ def test_handle_unexpected_exceptions() -> None:
def _raise_exception():
raise Exception()

def _raise_other_value_error_exception():
raise ValueError("Other value exception")

mock_catalog = MockCatalog(
{
"namespaceA": {
@@ -1110,6 +1117,7 @@ def _raise_exception():
catalog=None,
),
"table5": _raise_exception,
"table6": _raise_other_value_error_exception,
}
}
)
@@ -1136,3 +1144,12 @@ def _raise_exception():
assert source.report.warnings.total_elements == 0
assert source.report.failures.total_elements == 1
assert source.report.tables_scanned == 4
# Needed to make sure all failures are recognized properly
failures = [f for f in source.report.failures]
TestCase().assertCountEqual(
failures[0].context,
[
"Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception",
"Failed to create workunit for dataset ('namespaceA', 'table5'): ",
],
)