Skip to content

Commit

Permalink
feat(ingestion/superset): ownership info for charts, dashboards and d…
Browse files Browse the repository at this point in the history
…atasets (#12750)
  • Loading branch information
PeteMango authored Mar 4, 2025
1 parent be42e11 commit 9e7f482
Show file tree
Hide file tree
Showing 8 changed files with 843 additions and 115 deletions.
109 changes: 89 additions & 20 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_user_urn,
)
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.ingestion.api.common import PipelineContext
Expand All @@ -46,7 +47,6 @@
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
Status,
TimeStamp,
Expand All @@ -65,12 +65,16 @@
SchemaMetadata,
)
from datahub.metadata.schema_classes import (
AuditStampClass,
ChartInfoClass,
ChartTypeClass,
DashboardInfoClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
Expand Down Expand Up @@ -234,6 +238,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
graph=self.ctx.graph,
)
self.session = self.login()
self.owner_info = self.parse_owner_info()

def login(self) -> requests.Session:
login_response = requests.post(
Expand Down Expand Up @@ -273,7 +278,7 @@ def paginate_entity_api_results(self, entity_type, page_size=100):

while current_page * page_size < total_items:
response = self.session.get(
f"{self.config.connect_uri}/api/v1/{entity_type}/",
f"{self.config.connect_uri}/api/v1/{entity_type}",
params={"q": f"(page:{current_page},page_size:{page_size})"},
)

Expand All @@ -289,6 +294,25 @@ def paginate_entity_api_results(self, entity_type, page_size=100):

current_page += 1

def parse_owner_info(self) -> Dict[str, Any]:
    """Build a lookup from owner id to owner email.

    Iterates the ``<entity>/related/owners`` listing for datasets,
    dashboards and charts through ``paginate_entity_api_results`` and, for
    every entry carrying a truthy ``value``, records the ``email`` found in
    its ``extra`` payload.

    Returns:
        A mapping of owner id to email. The email is ``""`` when the entry
        has no ``extra`` payload or no email in it.
    """
    entity_types = ["dataset", "dashboard", "chart"]
    owners_info: Dict[str, Any] = {}

    for entity in entity_types:
        for owner in self.paginate_entity_api_results(f"{entity}/related/owners"):
            owner_id = owner.get("value")
            if not owner_id:
                continue
            # ``extra`` and ``email`` can be present but null; a plain
            # ``.get(key, default)`` would then hand back None and either
            # raise on the chained lookup or store None as the email.
            extra = owner.get("extra") or {}
            owners_info[owner_id] = extra.get("email") or ""

    return owners_info

def build_owner_urn(self, data: Dict[str, Any]) -> List[str]:
    """Return user URNs for the owners listed on an entity payload.

    Each entry of ``data["owners"]`` is resolved through ``self.owner_info``
    (owner id -> email) and turned into a URN with ``make_user_urn``.

    Owners without an id, or whose id does not resolve to a non-empty
    email, are skipped so that no URN with an empty user id is produced.

    Args:
        data: Entity payload; its ``owners`` entry may be missing or null.

    Returns:
        URNs of the resolvable owners, in payload order.
    """
    owner_urns: List[str] = []
    # ``owners`` may be present but null, so coalesce instead of defaulting.
    for owner in data.get("owners") or []:
        owner_id = owner.get("id")
        if not owner_id:
            continue
        email = self.owner_info.get(owner_id)
        if not email:
            # Unknown owner or empty email: there is no identity to point at.
            continue
        owner_urns.append(make_user_urn(email))
    return owner_urns

@lru_cache(maxsize=None)
def get_dataset_info(self, dataset_id: int) -> dict:
dataset_response = self.session.get(
Expand Down Expand Up @@ -346,15 +370,16 @@ def construct_dashboard_from_api_data(
aspects=[Status(removed=False)],
)

modified_actor = f"urn:li:corpuser:{(dashboard_data.get('changed_by') or {}).get('username', 'unknown')}"
modified_actor = f"urn:li:corpuser:{self.owner_info.get((dashboard_data.get('changed_by') or {}).get('id', -1), 'unknown')}"
modified_ts = int(
dp.parse(dashboard_data.get("changed_on_utc", "now")).timestamp() * 1000
)
title = dashboard_data.get("dashboard_title", "")
# note: the API does not currently supply created_by usernames due to a bug
last_modified = ChangeAuditStamps(
created=None,
lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)

change_audit_stamps = ChangeAuditStamps(
created=None, lastModified=last_modified
)
dashboard_url = f"{self.config.display_uri}{dashboard_data.get('url', '')}"

Expand All @@ -380,7 +405,7 @@ def construct_dashboard_from_api_data(
"IsPublished": str(dashboard_data.get("published", False)).lower(),
"Owners": ", ".join(
map(
lambda owner: owner.get("username", "unknown"),
lambda owner: self.owner_info.get(owner.get("id", -1), "unknown"),
dashboard_data.get("owners", []),
)
),
Expand All @@ -400,15 +425,29 @@ def construct_dashboard_from_api_data(
description="",
title=title,
charts=chart_urns,
lastModified=last_modified,
dashboardUrl=dashboard_url,
customProperties=custom_properties,
lastModified=change_audit_stamps,
)
dashboard_snapshot.aspects.append(dashboard_info)

dashboard_owners_list = self.build_owner_urn(dashboard_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (dashboard_owners_list or [])
],
lastModified=last_modified,
)
dashboard_snapshot.aspects.append(owners_info)

return dashboard_snapshot

def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
for dashboard_data in self.paginate_entity_api_results("dashboard", PAGE_SIZE):
for dashboard_data in self.paginate_entity_api_results("dashboard/", PAGE_SIZE):
try:
dashboard_snapshot = self.construct_dashboard_from_api_data(
dashboard_data
Expand Down Expand Up @@ -437,17 +476,19 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
aspects=[Status(removed=False)],
)

modified_actor = f"urn:li:corpuser:{(chart_data.get('changed_by') or {}).get('username', 'unknown')}"
modified_actor = f"urn:li:corpuser:{self.owner_info.get((chart_data.get('changed_by') or {}).get('id', -1), 'unknown')}"
modified_ts = int(
dp.parse(chart_data.get("changed_on_utc", "now")).timestamp() * 1000
)
title = chart_data.get("slice_name", "")

# note: the API does not currently supply created_by usernames due to a bug
last_modified = ChangeAuditStamps(
created=None,
lastModified=AuditStamp(time=modified_ts, actor=modified_actor),
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)

change_audit_stamps = ChangeAuditStamps(
created=None, lastModified=last_modified
)

chart_type = chart_type_from_viz_type.get(chart_data.get("viz_type", ""))
chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}"

Expand Down Expand Up @@ -504,16 +545,29 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
type=chart_type,
description="",
title=title,
lastModified=last_modified,
chartUrl=chart_url,
inputs=[datasource_urn] if datasource_urn else None,
customProperties=custom_properties,
lastModified=change_audit_stamps,
)
chart_snapshot.aspects.append(chart_info)

chart_owners_list = self.build_owner_urn(chart_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (chart_owners_list or [])
],
lastModified=last_modified,
)
chart_snapshot.aspects.append(owners_info)
return chart_snapshot

def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
for chart_data in self.paginate_entity_api_results("chart", PAGE_SIZE):
for chart_data in self.paginate_entity_api_results("chart/", PAGE_SIZE):
try:
chart_snapshot = self.construct_chart_from_chart_data(chart_data)

Expand Down Expand Up @@ -583,6 +637,12 @@ def construct_dataset_from_dataset_data(
)
dataset_url = f"{self.config.display_uri}{dataset_response.get('result', {}).get('url', '')}"

modified_actor = f"urn:li:corpuser:{self.owner_info.get((dataset_data.get('changed_by') or {}).get('id', -1), 'unknown')}"
modified_ts = int(
dp.parse(dataset_data.get("changed_on_utc", "now")).timestamp() * 1000
)
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)

upstream_warehouse_platform = (
dataset_response.get("result", {}).get("database", {}).get("backend")
)
Expand Down Expand Up @@ -618,10 +678,8 @@ def construct_dataset_from_dataset_data(
dataset_info = DatasetPropertiesClass(
name=dataset.table_name,
description="",
lastModified=(
TimeStamp(time=dataset.modified_ts) if dataset.modified_ts else None
),
externalUrl=dataset_url,
lastModified=TimeStamp(time=modified_ts),
)
global_tags = GlobalTagsClass(tags=[TagAssociationClass(tag=tag_urn)])

Expand All @@ -640,12 +698,23 @@ def construct_dataset_from_dataset_data(
aspects=aspects_items,
)

logger.info(f"Constructed dataset {datasource_urn}")
dataset_owners_list = self.build_owner_urn(dataset_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (dataset_owners_list or [])
],
lastModified=last_modified,
)
aspects_items.append(owners_info)

return dataset_snapshot

def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
for dataset_data in self.paginate_entity_api_results("dataset", PAGE_SIZE):
for dataset_data in self.paginate_entity_api_results("dataset/", PAGE_SIZE):
try:
dataset_snapshot = self.construct_dataset_from_dataset_data(
dataset_data
Expand Down
Loading

0 comments on commit 9e7f482

Please sign in to comment.