S3 and GCS external urls (#12762)
acrylJonny authored Mar 3, 2025
1 parent 8d7af35 commit a76d8eb
Showing 19 changed files with 2,588 additions and 2,488 deletions.
55 changes: 52 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py
@@ -1,7 +1,8 @@
import logging
from typing import Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional
from urllib.parse import unquote

from pandas import DataFrame
from pydantic import Field, SecretStr, validator

from datahub.configuration.common import ConfigModel
@@ -20,9 +21,14 @@
from datahub.ingestion.source.data_lake_common.config import PathSpecsConfigMixin
from datahub.ingestion.source.data_lake_common.data_lake_utils import PLATFORM_GCS
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec, is_gcs_uri
from datahub.ingestion.source.gcs.gcs_utils import (
get_gcs_bucket_name,
get_gcs_bucket_relative_path,
)
from datahub.ingestion.source.s3.config import DataLakeSourceConfig
from datahub.ingestion.source.s3.datalake_profiler_config import DataLakeProfilerConfig
from datahub.ingestion.source.s3.report import DataLakeSourceReport
from datahub.ingestion.source.s3.source import S3Source
from datahub.ingestion.source.s3.source import S3Source, TableData
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StatefulStaleMetadataRemovalConfig,
@@ -57,6 +63,19 @@ class GCSSourceConfig(
description="Number of files to list to sample for schema inference. This will be ignored if sample_files is set to False in the pathspec.",
)

profiling: Optional[DataLakeProfilerConfig] = Field(
default=DataLakeProfilerConfig(), description="Data profiling configuration"
)

spark_driver_memory: str = Field(
default="4g", description="Max amount of memory to grant Spark."
)

spark_config: Dict[str, Any] = Field(
description="Spark configuration properties",
default={},
)

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@validator("path_specs", always=True)
@@ -72,6 +91,9 @@ def check_path_specs_and_infer_platform(

return path_specs

def is_profiling_enabled(self) -> bool:
return self.profiling is not None and self.profiling.enabled


class GCSSourceReport(DataLakeSourceReport):
pass
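
The profiling, spark_driver_memory, and spark_config fields added to GCSSourceConfig mirror the S3 source's profiling knobs, so Spark-based profiling for GCS can now be switched on through the source config, gated by is_profiling_enabled(). A hedged sketch of how the three new fields might be supplied (field names come from this commit; the values are illustrative):

# Hedged sketch, not part of the commit's diff: example settings for the three
# new GCSSourceConfig fields. Values are assumptions chosen for illustration.
gcs_profiling_settings = {
    "profiling": {"enabled": True},  # would make is_profiling_enabled() return True
    "spark_driver_memory": "4g",  # matches the default declared in the commit
    "spark_config": {"spark.sql.shuffle.partitions": "8"},  # arbitrary Spark properties
}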
@@ -82,7 +104,7 @@ class GCSSourceReport(DataLakeSourceReport):
@support_status(SupportStatus.INCUBATING)
@capability(SourceCapability.CONTAINERS, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@capability(SourceCapability.DATA_PROFILING, "Not supported", supported=False)
@capability(SourceCapability.DATA_PROFILING, "Enabled via configuration")
class GCSSource(StatefulIngestionSourceBase):
def __init__(self, config: GCSSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
@@ -110,6 +132,11 @@ def create_equivalent_s3_config(self):
env=self.config.env,
max_rows=self.config.max_rows,
number_of_files_to_sample=self.config.number_of_files_to_sample,
profiling=self.config.profiling,
spark_driver_memory=self.config.spark_driver_memory,
spark_config=self.config.spark_config,
use_s3_bucket_tags=False,
use_s3_object_tags=False,
)
return s3_config

@@ -145,6 +172,28 @@ def s3_source_overrides(self, source: S3Source) -> S3Source:
source.create_s3_path = lambda bucket_name, key: unquote( # type: ignore
f"s3://{bucket_name}/{key}"
)

if self.config.is_profiling_enabled():
original_read_file_spark = source.read_file_spark

from types import MethodType

def read_file_spark_with_gcs(
self_source: S3Source, file: str, ext: str
) -> Optional[DataFrame]:
# Convert s3:// path back to gs:// for Spark
if file.startswith("s3://"):
file = f"gs://{file[5:]}"
return original_read_file_spark(file, ext)

source.read_file_spark = MethodType(read_file_spark_with_gcs, source) # type: ignore

def get_external_url_override(table_data: TableData) -> Optional[str]:
bucket_name = get_gcs_bucket_name(table_data.table_path)
key_prefix = get_gcs_bucket_relative_path(table_data.table_path)
return f"https://console.cloud.google.com/storage/browser/{bucket_name}/{key_prefix}"

source.get_external_url = get_external_url_override # type: ignore
return source

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
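
The GCS source delegates to the S3 source and stores object paths with an s3:// scheme internally; the overrides above convert those paths back to gs:// before handing them to Spark for profiling, and build a Google Cloud console link for the dataset's external URL. A minimal standalone sketch of the two transformations (bucket and prefix values are made up, not taken from the commit):

# Illustrative sketch of the two GCS overrides, using a hypothetical table path.

def to_spark_path(internal_path: str) -> str:
    # The GCS source keeps paths as s3://... internally; Spark needs gs://.
    if internal_path.startswith("s3://"):
        return "gs://" + internal_path[len("s3://"):]
    return internal_path

def gcs_console_url(internal_path: str) -> str:
    # Mirrors get_external_url_override: bucket plus key prefix on the GCS console.
    without_scheme = internal_path.split("://", 1)[1]
    bucket, _, key_prefix = without_scheme.partition("/")
    return f"https://console.cloud.google.com/storage/browser/{bucket}/{key_prefix}"

print(to_spark_path("s3://my-bucket/events/2025/"))
# gs://my-bucket/events/2025/
print(gcs_console_url("s3://my-bucket/events/2025/"))
# https://console.cloud.google.com/storage/browser/my-bucket/events/2025/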
16 changes: 12 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_utils.py
@@ -22,10 +22,18 @@ def strip_gcs_prefix(gcs_uri: str) -> str:
return gcs_uri[len(GCS_PREFIX) :]


def get_gcs_bucket_name(path):
if not is_gcs_uri(path):
raise ValueError(f"Not a GCS URI. Must start with prefixe: {GCS_PREFIX}")
return strip_gcs_prefix(path).split("/")[0]
def get_gcs_bucket_name(path: str) -> str:
"""Get the bucket name from either a GCS (gs://) or S3-style (s3://) URI."""
# Handle both gs:// and s3:// prefixes since we use S3-style URIs internally
if is_gcs_uri(path):
return strip_gcs_prefix(path).split("/")[0]
elif path.startswith("s3://"):
# For internal S3-style paths used by the source
return path[5:].split("/")[0]
else:
raise ValueError(
f"Not a valid GCS or S3 URI. Must start with prefixes: {GCS_PREFIX} or s3://"
)


def get_gcs_bucket_relative_path(gcs_uri: str) -> str:
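
With this change the helper accepts both gs:// URIs and the s3://-style paths the GCS source uses internally, instead of rejecting the latter. A standalone sketch of the updated behaviour (assuming GCS_PREFIX is "gs://"; the bucket names are hypothetical):

# Standalone sketch of the updated helper, not the module itself.
GCS_PREFIX = "gs://"  # assumed value of the constant defined in gcs_utils.py

def get_gcs_bucket_name(path: str) -> str:
    # Accept either a native GCS URI or the s3://-style path used internally.
    if path.startswith(GCS_PREFIX):
        return path[len(GCS_PREFIX):].split("/")[0]
    elif path.startswith("s3://"):
        return path[5:].split("/")[0]
    raise ValueError(f"Not a valid GCS or S3 URI. Must start with prefixes: {GCS_PREFIX} or s3://")

assert get_gcs_bucket_name("gs://analytics-bucket/tables/users/") == "analytics-bucket"
assert get_gcs_bucket_name("s3://analytics-bucket/tables/users/") == "analytics-bucket"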
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -605,6 +605,16 @@ def __create_partition_summary_aspect(
maxPartition=max_partition_summary, minPartition=min_partition_summary
)

def get_external_url(self, table_data: TableData) -> Optional[str]:
if self.is_s3_platform() and self.source_config.aws_config:
# Get region from AWS config, default to us-east-1 if not specified
region = self.source_config.aws_config.aws_region or "us-east-1"
bucket_name = get_bucket_name(table_data.table_path)
key_prefix = get_bucket_relative_path(table_data.table_path)
external_url = f"https://{region}.console.aws.amazon.com/s3/buckets/{bucket_name}?prefix={key_prefix}"
return external_url
return None

def ingest_table(
self, table_data: TableData, path_spec: PathSpec
) -> Iterable[MetadataWorkUnit]:
@@ -674,6 +684,7 @@ def ingest_table(
if max_partition
else None
),
externalUrl=self.get_external_url(table_data),
)
aspects.append(dataset_properties)
if table_data.size_in_bytes > 0:
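
The new get_external_url builds a deep link into the AWS S3 console from the configured region (defaulting to us-east-1), the bucket name, and the key prefix of the table path, and ingest_table now attaches it as the dataset's externalUrl. A hedged sketch of the resulting URL shape, using a hypothetical bucket and prefix:

# Illustrative sketch of the S3 console URL produced by get_external_url.
# Bucket, prefix, and region values are examples, not taken from the commit.
def s3_console_url(table_path: str, region: str = "us-east-1") -> str:
    without_scheme = table_path.split("://", 1)[1]
    bucket, _, key_prefix = without_scheme.partition("/")
    return f"https://{region}.console.aws.amazon.com/s3/buckets/{bucket}?prefix={key_prefix}"

print(s3_console_url("s3://data-lake/raw/events/2025/03/"))
# https://us-east-1.console.aws.amazon.com/s3/buckets/data-lake?prefix=raw/events/2025/03/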