Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/superset): superset column level lineage #12786

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 89 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_schema_field_urn,
make_user_urn,
)
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
Expand Down Expand Up @@ -72,6 +73,9 @@
DashboardInfoClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
Expand All @@ -80,6 +84,10 @@
UpstreamClass,
UpstreamLineageClass,
)
from datahub.sql_parsing.sqlglot_lineage import (
SqlParsingResult,
create_lineage_sql_parsed_result,
)
from datahub.utilities import config_clean
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry
Expand Down Expand Up @@ -682,6 +690,52 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
env=self.config.env,
)

def generate_virtual_dataset_lineage(
self,
parsed_query_object: SqlParsingResult,
datasource_urn: str,
dataset_url: str,
) -> UpstreamLineageClass:
cll = (
parsed_query_object.column_lineage
if parsed_query_object.column_lineage is not None
else []
)

fine_grained_lineages: List[FineGrainedLineageClass] = []

for cll_info in cll:
downstream = (
[make_schema_field_urn(datasource_urn, cll_info.downstream.column)]
if cll_info.downstream and cll_info.downstream.column
else []
)
upstreams = [
make_schema_field_urn(column_ref.table, column_ref.column)
for column_ref in cll_info.upstreams
]
fine_grained_lineages.append(
FineGrainedLineageClass(
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=downstream,
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=upstreams,
)
)

upstream_lineage = UpstreamLineageClass(
upstreams=[
UpstreamClass(
type=DatasetLineageTypeClass.TRANSFORMED,
dataset=input_table_urn,
properties={"externalUrl": dataset_url},
)
for input_table_urn in parsed_query_object.in_tables
],
fineGrainedLineages=fine_grained_lineages,
)
return upstream_lineage

def construct_dataset_from_dataset_data(
self, dataset_data: dict
) -> DatasetSnapshot:
Expand All @@ -702,6 +756,14 @@ def construct_dataset_from_dataset_data(
upstream_warehouse_platform = (
dataset_response.get("result", {}).get("database", {}).get("backend")
)
upstream_warehouse_db_name = (
dataset_response.get("result", {}).get("database", {}).get("database_name")
)

# if we have rendered sql, we always use that and defualt back to regular sql
sql = dataset_response.get("result", {}).get(
"rendered_sql"
) or dataset_response.get("result", {}).get("sql")

# Preset has a way of naming their platforms differently than
# how datahub names them, so map the platform name to the correct naming
Expand All @@ -714,23 +776,35 @@ def construct_dataset_from_dataset_data(
if upstream_warehouse_platform in warehouse_naming:
upstream_warehouse_platform = warehouse_naming[upstream_warehouse_platform]

# TODO: Categorize physical vs virtual upstream dataset
# mark all upstream dataset as physical for now, in the future we would ideally like
# to differentiate physical vs virtual upstream datasets
tag_urn = f"urn:li:tag:{self.platform}:physical"
upstream_dataset = self.get_datasource_urn_from_id(
dataset_response, upstream_warehouse_platform
)
upstream_lineage = UpstreamLineageClass(
upstreams=[
UpstreamClass(
type=DatasetLineageTypeClass.TRANSFORMED,
dataset=upstream_dataset,
properties={"externalUrl": dataset_url},
)
]
parsed_query_object = create_lineage_sql_parsed_result(
query=sql,
default_db=upstream_warehouse_db_name,
platform=upstream_warehouse_platform,
platform_instance=None,
env=self.config.env,
)

# if we sql, we label the datasets as virtual
if sql:
tag_urn = f"urn:li:tag:{self.platform}:virtual"
upstream_lineage = self.generate_virtual_dataset_lineage(
parsed_query_object, datasource_urn, dataset_url
)
else:
tag_urn = f"urn:li:tag:{self.platform}:physical"
upstream_dataset = self.get_datasource_urn_from_id(
dataset_response, upstream_warehouse_platform
)
upstream_lineage = UpstreamLineageClass(
upstreams=[
UpstreamClass(
type=DatasetLineageTypeClass.TRANSFORMED,
dataset=upstream_dataset,
properties={"externalUrl": dataset_url},
)
]
)

dataset_info = DatasetPropertiesClass(
name=dataset.table_name,
description="",
Expand Down
Loading
Loading