diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
index 7f9fe4279f7e22..0798c5268e8b10 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -2,7 +2,7 @@
 import logging
 import threading
 import uuid
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Tuple
 
 from dateutil import parser as dateutil_parser
 from pyiceberg.catalog import Catalog
@@ -185,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         thread_local = threading.local()
 
-        def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
-            LOGGER.debug(f"Processing dataset for path {dataset_path}")
-            dataset_name = ".".join(dataset_path)
-            if not self.config.table_pattern.allowed(dataset_name):
-                # Dataset name is rejected by pattern, report as dropped.
-                self.report.report_dropped(dataset_name)
-                LOGGER.debug(
-                    f"Skipping table {dataset_name} due to not being allowed by the config pattern"
-                )
-                return
+        def _try_processing_dataset(
+            dataset_path: Tuple[str, ...], dataset_name: str
+        ) -> Iterable[MetadataWorkUnit]:
             try:
                 if not hasattr(thread_local, "local_catalog"):
                     LOGGER.debug(
@@ -250,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
                 LOGGER.warning(
                     f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
                 )
+            except ValueError as e:
+                if "Could not initialize FileIO" not in str(e):
+                    raise
+                self.report.warning(
+                    "Could not initialize FileIO",
+                    f"Could not initialize FileIO for {dataset_path} due to: {e}",
+                )
+
+        def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
+            try:
+                LOGGER.debug(f"Processing dataset for path {dataset_path}")
+                dataset_name = ".".join(dataset_path)
+                if not self.config.table_pattern.allowed(dataset_name):
+                    # Dataset name is rejected by pattern, report as dropped.
+                    self.report.report_dropped(dataset_name)
+                    LOGGER.debug(
+                        f"Skipping table {dataset_name} due to not being allowed by the config pattern"
+                    )
+                    return []
+
+                yield from _try_processing_dataset(dataset_path, dataset_name)
             except Exception as e:
                 self.report.report_failure(
                     "general",
-                    f"Failed to create workunit for dataset {dataset_name}: {e}",
+                    f"Failed to create workunit for dataset {dataset_path}: {e}",
                 )
                 LOGGER.exception(
                     f"Exception while processing table {dataset_path}, skipping it.",
diff --git a/metadata-ingestion/tests/unit/test_iceberg.py b/metadata-ingestion/tests/unit/test_iceberg.py
index 12a7228ab792c2..5f808bdf80da53 100644
--- a/metadata-ingestion/tests/unit/test_iceberg.py
+++ b/metadata-ingestion/tests/unit/test_iceberg.py
@@ -968,6 +968,9 @@ def _raise_no_such_table_exception():
     def _raise_server_error():
         raise ServerError()
 
+    def _raise_fileio_error():
+        raise ValueError("Could not initialize FileIO: abc.dummy.fileio")
+
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
@@ -1024,6 +1027,7 @@ def _raise_server_error():
                 "table7": _raise_file_not_found_error,
                 "table8": _raise_no_such_iceberg_table_exception,
                 "table9": _raise_server_error,
+                "table10": _raise_fileio_error,
             }
         }
     )
@@ -1047,7 +1051,7 @@ def _raise_server_error():
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
         ],
     )
-    assert source.report.warnings.total_elements == 5
+    assert source.report.warnings.total_elements == 6
     assert source.report.failures.total_elements == 0
     assert source.report.tables_scanned == 4
 
@@ -1058,6 +1062,9 @@ def test_handle_unexpected_exceptions() -> None:
     def _raise_exception():
         raise Exception()
 
+    def _raise_other_value_error_exception():
+        raise ValueError("Other value exception")
+
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
@@ -1110,6 +1117,7 @@ def _raise_exception():
                 catalog=None,
             ),
             "table5": _raise_exception,
+            "table6": _raise_other_value_error_exception,
             }
         }
     )
@@ -1136,3 +1144,12 @@ def _raise_exception():
     assert source.report.warnings.total_elements == 0
     assert source.report.failures.total_elements == 1
     assert source.report.tables_scanned == 4
+    # Needed to make sure all failures are recognized properly
+    failures = [entry for entry in source.report.failures[0][1].context]
+    TestCase().assertCountEqual(
+        failures,
+        [
+            "Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception",
+            "Failed to create workunit for dataset ('namespaceA', 'table5'): ",
+        ],
+    )