Added handling of FileIO errors
skrydal committed Feb 27, 2025
1 parent 6f25437 commit 4578c29
Showing 2 changed files with 44 additions and 13 deletions.
38 changes: 26 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -2,7 +2,7 @@
 import logging
 import threading
 import uuid
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Tuple
 
 from dateutil import parser as dateutil_parser
 from pyiceberg.catalog import Catalog
@@ -185,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         thread_local = threading.local()
 
-        def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
-            LOGGER.debug(f"Processing dataset for path {dataset_path}")
-            dataset_name = ".".join(dataset_path)
-            if not self.config.table_pattern.allowed(dataset_name):
-                # Dataset name is rejected by pattern, report as dropped.
-                self.report.report_dropped(dataset_name)
-                LOGGER.debug(
-                    f"Skipping table {dataset_name} due to not being allowed by the config pattern"
-                )
-                return
+        def _try_processing_dataset(
+            dataset_path: Tuple[str, ...], dataset_name: str
+        ) -> Iterable[MetadataWorkUnit]:
             try:
                 if not hasattr(thread_local, "local_catalog"):
                     LOGGER.debug(
@@ -250,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
                 LOGGER.warning(
                     f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
                 )
+            except ValueError as e:
+                if "Could not initialize FileIO" not in str(e):
+                    raise
+                self.report.warning(
+                    "Could not initialize FileIO",
+                    f"Could not initialize FileIO for {dataset_path} due to: {e}",
+                )
+
+        def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
+            try:
+                LOGGER.debug(f"Processing dataset for path {dataset_path}")
+                dataset_name = ".".join(dataset_path)
+                if not self.config.table_pattern.allowed(dataset_name):
+                    # Dataset name is rejected by pattern, report as dropped.
+                    self.report.report_dropped(dataset_name)
+                    LOGGER.debug(
+                        f"Skipping table {dataset_name} due to not being allowed by the config pattern"
+                    )
+                    return []
+
+                yield from _try_processing_dataset(dataset_path, dataset_name)
             except Exception as e:
                 self.report.report_failure(
                     "general",
-                    f"Failed to create workunit for dataset {dataset_name}: {e}",
+                    f"Failed to create workunit for dataset {dataset_path}: {e}",
                 )
                 LOGGER.exception(
                     f"Exception while processing table {dataset_path}, skipping it.",
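Taken together, the change splits per-table processing in two: _try_processing_dataset now owns the catalog access and the narrow exception handling (including the new FileIO ValueError path), while _process_dataset keeps the pattern filter and the catch-all failure reporting. A minimal runnable sketch of that control flow, with print-based stand-ins for DataHub's reporter and work-unit types (names here are illustrative, not the real API):

from typing import Iterable, Tuple

def load_table(path: Tuple[str, ...]) -> str:
    # Stand-in for pyiceberg's table loading; fails the way a missing
    # or misconfigured FileIO implementation does.
    raise ValueError("Could not initialize FileIO: abc.dummy.fileio")

def _try_processing_dataset(path: Tuple[str, ...], name: str) -> Iterable[str]:
    try:
        table = load_table(path)
        yield f"workunit for {name}: {table}"
    except ValueError as e:
        if "Could not initialize FileIO" not in str(e):
            raise  # unrelated ValueErrors still bubble up to the caller
        print(f"warning: Could not initialize FileIO for {path} due to: {e}")

def _process_dataset(path: Tuple[str, ...]) -> Iterable[str]:
    try:
        yield from _try_processing_dataset(path, ".".join(path))
    except Exception as e:
        # Catch-all keeps one bad table from aborting the whole ingestion run.
        print(f"failure: Failed to create workunit for dataset {path}: {e}")

list(_process_dataset(("namespaceA", "table10")))  # -> FileIO warning only, no failure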
19 changes: 18 additions & 1 deletion metadata-ingestion/tests/unit/test_iceberg.py
@@ -968,6 +968,9 @@ def _raise_no_such_table_exception():
     def _raise_server_error():
         raise ServerError()
 
+    def _raise_fileio_error():
+        raise ValueError("Could not initialize FileIO: abc.dummy.fileio")
+
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
@@ -1024,6 +1027,7 @@ def _raise_server_error():
"table7": _raise_file_not_found_error,
"table8": _raise_no_such_iceberg_table_exception,
"table9": _raise_server_error,
"table10": _raise_fileio_error,
}
}
)
@@ -1047,7 +1051,7 @@ def _raise_server_error():
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
],
)
assert source.report.warnings.total_elements == 5
assert source.report.warnings.total_elements == 6
assert source.report.failures.total_elements == 0
assert source.report.tables_scanned == 4

@@ -1058,6 +1062,9 @@ def test_handle_unexpected_exceptions() -> None:
     def _raise_exception():
         raise Exception()
 
+    def _raise_other_value_error_exception():
+        raise ValueError("Other value exception")
+
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
Expand Down Expand Up @@ -1110,6 +1117,7 @@ def _raise_exception():
                     catalog=None,
                 ),
                 "table5": _raise_exception,
+                "table6": _raise_other_value_error_exception,
             }
         }
     )
@@ -1136,3 +1144,12 @@ def _raise_exception():
     assert source.report.warnings.total_elements == 0
     assert source.report.failures.total_elements == 1
     assert source.report.tables_scanned == 4
+    # Needed to make sure all failures are recognized properly
+    failures = [entry for entry in source.report.failures[0][1].context]
+    TestCase().assertCountEqual(
+        failures,
+        [
+            "Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception",
+            "Failed to create workunit for dataset ('namespaceA', 'table5'): ",
+        ],
+    )
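The updated tests pin both sides of the new behavior: a ValueError whose message names "Could not initialize FileIO" is downgraded to a warning (table10 above), while any other ValueError still lands in failures (table6). A self-contained sketch of that distinction, using plain lists in place of the MockCatalog harness and DataHub's report object (helper names here are hypothetical):

from typing import List, Tuple

warnings: List[str] = []
failures: List[str] = []

def ingest_table(path: Tuple[str, ...], error: Exception) -> None:
    # Simulates one table whose processing raises `error`.
    try:
        try:
            raise error
        except ValueError as e:
            if "Could not initialize FileIO" not in str(e):
                raise  # any other ValueError is still a failure
            warnings.append(f"Could not initialize FileIO for {path} due to: {e}")
    except Exception as e:
        failures.append(f"Failed to create workunit for dataset {path}: {e}")

ingest_table(("namespaceA", "table10"),
             ValueError("Could not initialize FileIO: abc.dummy.fileio"))
ingest_table(("namespaceA", "table6"), ValueError("Other value exception"))

assert len(warnings) == 1 and len(failures) == 1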
