skip system schemas, tests, docs
mayurinehate committed Mar 5, 2025
Commit f48b5f1 (parent: eaa855c)
Showing 4 changed files with 79 additions and 12 deletions.
File 1 of 4:

@@ -1,6 +1,6 @@
 import json
 import logging
-from typing import Dict, Optional, Type, Union
+from typing import Dict, Optional, Type, TypeVar, Union
 
 from avrogen.dict_wrapper import DictWrapper
 from pydantic import BaseModel
@@ -13,6 +13,7 @@
 _REMAPPED_SCHEMA_TYPES = {
     k.replace("pegasus2avro.", ""): v for k, v in SCHEMA_TYPES.items()
 }
+T = TypeVar("T", bound=BaseModel)
 
 
 class SerializedResourceValue(BaseModel):
@@ -83,8 +84,8 @@ def as_pegasus_object(self) -> DictWrapper:
         )
 
     def as_pydantic_object(
-        self, model_type: Type[BaseModel], validate_schema_ref: bool = False
-    ) -> BaseModel:
+        self, model_type: Type[T], validate_schema_ref: bool = False
+    ) -> T:
         """
         Parse the blob into a Pydantic-defined Python object based on the schema type and schema
         ref.
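The change above narrows as_pydantic_object from returning BaseModel to returning the concrete model class the caller passes in, which is what lets the Redshift datashare code in the next file use the parsed value without a cast. A minimal sketch of the same generic-deserialization pattern, kept independent of the DataHub classes (SharePointer and parse_blob are illustrative names, not part of the codebase):

from typing import Type, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class SharePointer(BaseModel):
    # Illustrative stand-in for a model such as OutboundSharePlatformResource.
    namespace: str
    share_name: str


def parse_blob(model_type: Type[T], blob: str) -> T:
    # Because the return type is T rather than BaseModel, type checkers know the
    # caller gets back exactly the model class it asked for.
    return model_type.parse_raw(blob)


share = parse_blob(SharePointer, '{"namespace": "producer_ns", "share_name": "sales"}')
print(share.share_name)  # checked as SharePointer, no cast needed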
File 2 of 4:

@@ -137,16 +137,12 @@ def generate_lineage(
     def find_upstream_share(
         self, share: Union[InboundDatashare | PartialInboundDatashare]
     ) -> Optional[OutboundSharePlatformResource]:
-        # TODO: handle case of partial inbound share
-        upstream_share: Optional[OutboundSharePlatformResource] = None
-
         if not self.graph:
             self.report.warning(
                 title="Upstream lineage of inbound datashare will be missing",
                 message="Missing datahub graph. Either use the datahub-rest sink or "
                 "set the top-level datahub_api config in the recipe",
             )
-
         else:
             resources = self.get_platform_resources(self.graph, share)
 
@@ -172,15 +168,13 @@ def find_upstream_share(
             # and type is "OUTBOUND_DATASHARE"
             for resource in resources:
                 try:
-                    # TODO: should this be part of platform resource helper ?
                     assert (
                         resource.resource_info is not None
                         and resource.resource_info.value is not None
                     )
-                    upstream_share = OutboundSharePlatformResource.parse_raw(
-                        resource.resource_info.value.blob
+                    return resource.resource_info.value.as_pydantic_object(
+                        OutboundSharePlatformResource, True
                     )
-                    break
                 except Exception as e:
                     self.report.warning(
                         title="Upstream lineage of inbound datashare will be missing",
@@ -189,13 +183,17 @@ def find_upstream_share(
                         exc=e,
                     )
 
-        return upstream_share
+        return None
 
     def get_platform_resources(
         self,
         graph: DataHubGraph,
         share: Union[InboundDatashare, PartialInboundDatashare],
     ) -> List[PlatformResource]:
+        # NOTE: ideally we receive InboundDatashare and not PartialInboundDatashare.
+        # however due to varchar(128) type of database table that captures datashare options
+        # we may receive only partial information about inbound share
+        # Alternate option to get InboundDatashare using svv_datashares requires superuser
        if isinstance(share, PartialInboundDatashare):
             return list(
                 PlatformResource.search_by_filters(
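The NOTE added to get_platform_resources explains the motivation: the system table that records datashare options uses varchar(128), so the producer namespace can arrive truncated and only a PartialInboundDatashare (a namespace prefix plus the share name) is available, which is why the lookup goes through search_by_filters instead of an exact key. A rough sketch of the matching idea on plain data, not the PlatformResource API (candidates and match_outbound_share are illustrative assumptions):

from typing import List, Optional, Tuple

# (producer_namespace, share_name) pairs as they might be recorded for outbound shares.
candidates: List[Tuple[str, str]] = [
    ("producer_namespace", "test_share"),
    ("other_namespace", "test_share"),
]


def match_outbound_share(
    namespace_prefix: str, share_name: str, known: List[Tuple[str, str]]
) -> Optional[Tuple[str, str]]:
    # Return the first outbound share whose namespace starts with the (possibly
    # truncated) prefix and whose share name matches; None if nothing matches.
    for ns, name in known:
        if ns.startswith(namespace_prefix) and name == share_name:
            return ns, name
    return None


print(match_outbound_share("producer_na", "test_share", candidates))
# ('producer_namespace', 'test_share')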
File 3 of 4:

@@ -49,6 +49,7 @@ def list_schemas(database: str) -> str:
 cast(null as varchar(256)) as external_database
 FROM svv_redshift_schemas
 WHERE database_name = '{database}'
+AND schema_name != 'pg_catalog' and schema_name != 'information_schema'
 UNION ALL
 SELECT
 schemaname as schema_name,
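The added predicate implements the commit's "skip system schemas" intent: pg_catalog and information_schema are system schemas and should not be reported alongside user schemas. The same filtering expressed in Python for clarity (SYSTEM_SCHEMAS and the sample schema list are illustrative):

SYSTEM_SCHEMAS = {"pg_catalog", "information_schema"}

schemas = ["public", "information_schema", "analytics", "pg_catalog"]

# Keep only user-defined schemas, mirroring the added WHERE predicate.
user_schemas = [s for s in schemas if s not in SYSTEM_SCHEMAS]
print(user_schemas)  # ['public', 'analytics']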
File 4 of 4: metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py (67 additions, 0 deletions)
@@ -16,6 +16,7 @@
     RedshiftTable,
     RedshiftView,
 )
+from datahub.ingestion.source.redshift.redshift_schema import PartialInboundDatashare
 from datahub.ingestion.source.redshift.report import RedshiftReport
 from datahub.metadata.schema_classes import (
     PlatformResourceInfoClass,
@@ -110,6 +111,72 @@ def mock_search_by_key(*args, **kwargs):
         ]
         assert result == expected_mappings
 
+    def test_generate_lineage_success_partial_inbound_share(self):
+        """
+        Test generate_lineage method when share and graph exist, resources are found,
+        and upstream namespace and database are successfully identified.
+        """
+        # Setup
+        config = get_redshift_config()
+        report = RedshiftReport()
+        graph = get_datahub_graph()
+        helper = RedshiftDatasharesHelper(config, report, graph)
+
+        # Mock input data
+        share = PartialInboundDatashare(
+            producer_namespace_prefix="producer_na",
+            share_name="test_share",
+            consumer_database="consumer_db",
+        )
+        tables: dict[str, list[RedshiftTable | RedshiftView]] = {
+            "schema1": [
+                RedshiftTable(name="table1", comment=None, created=None),
+                RedshiftTable(name="table2", comment=None, created=None),
+            ],
+            "schema2": [RedshiftTable(name="table3", comment=None, created=None)],
+        }
+
+        # Mock PlatformResource.search_by_filters
+        def mock_search_by_filters(*args, **kwargs):
+            resource = PlatformResource.create(
+                key=PlatformResourceKey(
+                    platform="redshift",
+                    platform_instance="producer_instance",
+                    resource_type="OUTBOUND_DATASHARE",
+                    primary_key="producer_namespace.some_share",
+                ),
+                value=OutboundSharePlatformResource(
+                    namespace="producer_namespace",
+                    platform_instance="producer_instance",
+                    env="PROD",
+                    source_database="producer_db",
+                    share_name="test_share",
+                ),
+            )
+
+            return [resource]
+
+        with patch.object(PlatformResource, "search_by_filters") as mocked_method:
+            mocked_method.side_effect = mock_search_by_filters
+            result = list(helper.generate_lineage(share, tables))
+        # Assert
+        assert len(result) == 3
+        expected_mappings = [
+            KnownLineageMapping(
+                upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table1,PROD)",
+                downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema1.table1,PROD)",
+            ),
+            KnownLineageMapping(
+                upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table2,PROD)",
+                downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema1.table2,PROD)",
+            ),
+            KnownLineageMapping(
+                upstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema2.table3,PROD)",
+                downstream_urn="urn:li:dataset:(urn:li:dataPlatform:redshift,consumer_instance.consumer_db.schema2.table3,PROD)",
+            ),
+        ]
+        assert result == expected_mappings
+
     def test_generate_lineage_missing_graph_reports_warning(self):
         """
         Test generate_lineage when share is provided but graph is not available.
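The assertions in the new test show how the lineage URNs are assembled: platform instance, database, schema, and table are dot-joined into the dataset name of a urn:li:dataset URN on the redshift platform, once for the producer side and once for the consumer side. A small sketch of that formatting (make_redshift_dataset_urn is an illustrative helper, not a DataHub API):

def make_redshift_dataset_urn(
    platform_instance: str, database: str, schema: str, table: str, env: str = "PROD"
) -> str:
    # Mirrors the URNs asserted in the test above.
    name = f"{platform_instance}.{database}.{schema}.{table}"
    return f"urn:li:dataset:(urn:li:dataPlatform:redshift,{name},{env})"


print(make_redshift_dataset_urn("producer_instance", "producer_db", "schema1", "table1"))
# urn:li:dataset:(urn:li:dataPlatform:redshift,producer_instance.producer_db.schema1.table1,PROD)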