-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/iceberg): Several improvements to iceberg connector #12744
Changes from all commits
3d46ab4
6f25437
4578c29
f3d1acf
c463df7
29eba50
7e9fd3d
46cabf0
52dc9aa
dba1a6f
57daf0f
02031c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2,8 +2,9 @@ | |||||||||||||||||||||||||||||||||||||||||||
import logging | ||||||||||||||||||||||||||||||||||||||||||||
import threading | ||||||||||||||||||||||||||||||||||||||||||||
import uuid | ||||||||||||||||||||||||||||||||||||||||||||
from typing import Any, Dict, Iterable, List, Optional | ||||||||||||||||||||||||||||||||||||||||||||
from typing import Any, Dict, Iterable, List, Optional, Tuple | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
from dateutil import parser as dateutil_parser | ||||||||||||||||||||||||||||||||||||||||||||
from pyiceberg.catalog import Catalog | ||||||||||||||||||||||||||||||||||||||||||||
from pyiceberg.exceptions import ( | ||||||||||||||||||||||||||||||||||||||||||||
NoSuchIcebergTableError, | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -81,6 +82,7 @@ | |||||||||||||||||||||||||||||||||||||||||||
OwnerClass, | ||||||||||||||||||||||||||||||||||||||||||||
OwnershipClass, | ||||||||||||||||||||||||||||||||||||||||||||
OwnershipTypeClass, | ||||||||||||||||||||||||||||||||||||||||||||
TimeStampClass, | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
from datahub.utilities.perf_timer import PerfTimer | ||||||||||||||||||||||||||||||||||||||||||||
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -183,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]: | |||||||||||||||||||||||||||||||||||||||||||
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: | ||||||||||||||||||||||||||||||||||||||||||||
thread_local = threading.local() | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.debug(f"Processing dataset for path {dataset_path}") | ||||||||||||||||||||||||||||||||||||||||||||
dataset_name = ".".join(dataset_path) | ||||||||||||||||||||||||||||||||||||||||||||
if not self.config.table_pattern.allowed(dataset_name): | ||||||||||||||||||||||||||||||||||||||||||||
# Dataset name is rejected by pattern, report as dropped. | ||||||||||||||||||||||||||||||||||||||||||||
self.report.report_dropped(dataset_name) | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.debug( | ||||||||||||||||||||||||||||||||||||||||||||
f"Skipping table {dataset_name} due to not being allowed by the config pattern" | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||
def _try_processing_dataset( | ||||||||||||||||||||||||||||||||||||||||||||
dataset_path: Tuple[str, ...], dataset_name: str | ||||||||||||||||||||||||||||||||||||||||||||
) -> Iterable[MetadataWorkUnit]: | ||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||
if not hasattr(thread_local, "local_catalog"): | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.debug( | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -248,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: | |||||||||||||||||||||||||||||||||||||||||||
LOGGER.warning( | ||||||||||||||||||||||||||||||||||||||||||||
f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it." | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
except ValueError as e: | ||||||||||||||||||||||||||||||||||||||||||||
if "Could not initialize FileIO" not in str(e): | ||||||||||||||||||||||||||||||||||||||||||||
raise | ||||||||||||||||||||||||||||||||||||||||||||
self.report.warning( | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may you have forgot the redundant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
datahub/metadata-ingestion/src/datahub/ingestion/api/source.py Lines 247 to 267 in 5309ae0
|
||||||||||||||||||||||||||||||||||||||||||||
"Could not initialize FileIO", | ||||||||||||||||||||||||||||||||||||||||||||
f"Could not initialize FileIO for {dataset_path} due to: {e}", | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: | ||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.debug(f"Processing dataset for path {dataset_path}") | ||||||||||||||||||||||||||||||||||||||||||||
dataset_name = ".".join(dataset_path) | ||||||||||||||||||||||||||||||||||||||||||||
if not self.config.table_pattern.allowed(dataset_name): | ||||||||||||||||||||||||||||||||||||||||||||
# Dataset name is rejected by pattern, report as dropped. | ||||||||||||||||||||||||||||||||||||||||||||
self.report.report_dropped(dataset_name) | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.debug( | ||||||||||||||||||||||||||||||||||||||||||||
f"Skipping table {dataset_name} due to not being allowed by the config pattern" | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
yield from _try_processing_dataset(dataset_path, dataset_name) | ||||||||||||||||||||||||||||||||||||||||||||
except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||
self.report.report_failure( | ||||||||||||||||||||||||||||||||||||||||||||
"general", | ||||||||||||||||||||||||||||||||||||||||||||
f"Failed to create workunit for dataset {dataset_name}: {e}", | ||||||||||||||||||||||||||||||||||||||||||||
f"Failed to create workunit for dataset {dataset_path}: {e}", | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.exception( | ||||||||||||||||||||||||||||||||||||||||||||
f"Exception while processing table {dataset_path}, skipping it.", | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -288,6 +304,7 @@ def _create_iceberg_workunit( | |||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
# Dataset properties aspect. | ||||||||||||||||||||||||||||||||||||||||||||
additional_properties = {} | ||||||||||||||||||||||||||||||||||||||||||||
custom_properties = table.metadata.properties.copy() | ||||||||||||||||||||||||||||||||||||||||||||
custom_properties["location"] = table.metadata.location | ||||||||||||||||||||||||||||||||||||||||||||
custom_properties["format-version"] = str(table.metadata.format_version) | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -299,10 +316,27 @@ def _create_iceberg_workunit( | |||||||||||||||||||||||||||||||||||||||||||
custom_properties["manifest-list"] = ( | ||||||||||||||||||||||||||||||||||||||||||||
table.current_snapshot().manifest_list | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
additional_properties["lastModified"] = TimeStampClass( | ||||||||||||||||||||||||||||||||||||||||||||
int(table.current_snapshot().timestamp_ms) | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
if "created-at" in custom_properties: | ||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||
dt = dateutil_parser.isoparse(custom_properties["created-at"]) | ||||||||||||||||||||||||||||||||||||||||||||
additional_properties["created"] = TimeStampClass( | ||||||||||||||||||||||||||||||||||||||||||||
int(dt.timestamp() * 1000) | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
except Exception as ex: | ||||||||||||||||||||||||||||||||||||||||||||
LOGGER.warning( | ||||||||||||||||||||||||||||||||||||||||||||
f"Exception while trying to parse creation date {custom_properties['created-at']}, ignoring: {ex}" | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
dataset_properties = DatasetPropertiesClass( | ||||||||||||||||||||||||||||||||||||||||||||
name=table.name()[-1], | ||||||||||||||||||||||||||||||||||||||||||||
description=table.metadata.properties.get("comment", None), | ||||||||||||||||||||||||||||||||||||||||||||
customProperties=custom_properties, | ||||||||||||||||||||||||||||||||||||||||||||
lastModified=additional_properties.get("lastModified"), | ||||||||||||||||||||||||||||||||||||||||||||
created=additional_properties.get("created"), | ||||||||||||||||||||||||||||||||||||||||||||
qualifiedName=dataset_name, | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
dataset_snapshot.aspects.append(dataset_properties) | ||||||||||||||||||||||||||||||||||||||||||||
# Dataset ownership aspect. | ||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tuple[str, ...]
seems equivalent toList[str]
I guess only difference is the tuple ensures a size >= 1 and that's relevant here, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are more differences between a tuple and a list and this annotation reflects that. For one, tuple is immutable and passed by value. Moreover this annotation matches what we are getting from
pyiceberg
, so overall I would like to keep it like this.