Skip to content

Commit

Permalink
Merge branch 'master' into peter/column-level-lineage
Browse files Browse the repository at this point in the history
  • Loading branch information
PeteMango authored Mar 5, 2025
2 parents 4da3108 + 6998167 commit 285bcb5
Show file tree
Hide file tree
Showing 21 changed files with 882 additions and 219 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools>=63.0.0", "wheel"]
requires = ["setuptools >= 71.1", "wheel"]

[tool.ruff.lint.isort]
section-order = ["future", "patch", "standard-library", "third-party", "first-party", "local-folder"]
Expand Down
30 changes: 30 additions & 0 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,36 @@ def get_notebook_id(self) -> str:
"""
],
"tag": [_create_from_id.format(class_name="TagUrn")],
"chart": [
"""
@classmethod
def create_from_ids(
cls,
platform: str,
name: str,
platform_instance: Optional[str] = None,
) -> "ChartUrn":
return ChartUrn(
dashboard_tool=platform,
chart_id=f"{platform_instance}.{name}" if platform_instance else name,
)
"""
],
"dashboard": [
"""
@classmethod
def create_from_ids(
cls,
platform: str,
name: str,
platform_instance: Optional[str] = None,
) -> "DashboardUrn":
return DashboardUrn(
dashboard_tool=platform,
dashboard_id=f"{platform_instance}.{name}" if platform_instance else name,
)
"""
],
}


Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@

mypy_stubs = {
"types-dataclasses",
"types-setuptools",
"types-six",
"types-python-dateutil",
# We need to avoid 2.31.0.5 and 2.31.0.4 due to
Expand Down
41 changes: 28 additions & 13 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
from datahub.metadata.urns import DataFlowUrn, DatasetUrn, TagUrn
from datahub.metadata.urns import (
ChartUrn,
DashboardUrn,
DataFlowUrn,
DataJobUrn,
DataPlatformUrn,
DatasetUrn,
TagUrn,
)
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,7 +127,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
def make_data_platform_urn(platform: str) -> str:
    """Return the dataPlatform URN for *platform*.

    Inputs that are already dataPlatform URNs are passed through unchanged;
    bare platform names are wrapped via DataPlatformUrn.
    """
    already_a_urn = platform.startswith("urn:li:dataPlatform:")
    if already_a_urn:
        return platform
    return DataPlatformUrn.create_from_id(platform).urn()


def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
Expand Down Expand Up @@ -236,7 +244,7 @@ def make_user_urn(username: str) -> str:
Makes a user urn if the input is not a user or group urn already
"""
return (
f"urn:li:corpuser:{username}"
f"urn:li:corpuser:{UrnEncoder.encode_string(username)}"
if not username.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
else username
)
Expand All @@ -249,7 +257,7 @@ def make_group_urn(groupname: str) -> str:
if groupname and groupname.startswith(("urn:li:corpGroup:", "urn:li:corpuser:")):
return groupname
else:
return f"urn:li:corpGroup:{groupname}"
return f"urn:li:corpGroup:{UrnEncoder.encode_string(groupname)}"


def make_tag_urn(tag: str) -> str:
Expand Down Expand Up @@ -301,7 +309,12 @@ def make_data_flow_urn(


def make_data_job_urn_with_flow(flow_urn: str, job_id: str) -> str:
    """Build a dataJob URN from an existing dataFlow URN plus a job id.

    The flow URN is parsed first so malformed input fails fast with a
    validation error instead of producing a bad dataJob URN.
    """
    parsed_flow = DataFlowUrn.from_string(flow_urn)
    return DataJobUrn.create_from_ids(
        data_flow_urn=parsed_flow.urn(),
        job_id=job_id,
    ).urn()


def make_data_process_instance_urn(dataProcessInstanceId: str) -> str:
Expand All @@ -324,10 +337,11 @@ def make_dashboard_urn(
platform: str, name: str, platform_instance: Optional[str] = None
) -> str:
# FIXME: dashboards don't currently include data platform urn prefixes.
if platform_instance:
return f"urn:li:dashboard:({platform},{platform_instance}.{name})"
else:
return f"urn:li:dashboard:({platform},{name})"
return DashboardUrn.create_from_ids(
platform=platform,
name=name,
platform_instance=platform_instance,
).urn()


def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
Expand All @@ -342,10 +356,11 @@ def make_chart_urn(
platform: str, name: str, platform_instance: Optional[str] = None
) -> str:
# FIXME: charts don't currently include data platform urn prefixes.
if platform_instance:
return f"urn:li:chart:({platform},{platform_instance}.{name})"
else:
return f"urn:li:chart:({platform},{name})"
return ChartUrn.create_from_ids(
platform=platform,
name=name,
platform_instance=platform_instance,
).urn()


def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,15 @@ class BIContainerSubTypes(StrEnum):
MODE_COLLECTION = "Collection"


class FlowContainerSubTypes(StrEnum):
    """Subtype labels applied to MSSQL DataFlow entities."""

    # SQL Server Agent job.
    MSSQL_JOB = "Job"
    # Logical container grouping a database's stored procedures.
    MSSQL_PROCEDURE_CONTAINER = "Procedures Container"


class JobContainerSubTypes(StrEnum):
    """Subtype labels applied to DataJob entities."""

    # NiFi process group.
    NIFI_PROCESS_GROUP = "Process Group"
    # Single step within a SQL Server Agent job.
    MSSQL_JOBSTEP = "Job Step"
    # SQL Server stored procedure.
    MSSQL_STORED_PROCEDURE = "Stored Procedure"


class BIAssetSubTypes(StrEnum):
Expand Down
22 changes: 22 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/identity/okta.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,27 @@ def _map_okta_user_profile_to_username(
self.config.okta_profile_to_username_regex,
)

def _map_okta_user_profile_custom_properties(
    self, profile: UserProfile
) -> Dict[str, str]:
    """Return the remaining Okta profile fields as string custom properties.

    Skips fields already mapped onto the CorpUserInfo aspect, private
    ("_"-prefixed) attributes, and any field with a falsy value.
    """
    # Fields mapped to dedicated CorpUserInfo attributes elsewhere; a set
    # gives O(1) membership tests instead of scanning a list per field.
    mapped_fields = {
        "displayName",
        "firstName",
        "lastName",
        "email",
        "title",
        "countryCode",
        "department",
    }
    return {
        k: str(v)
        for k, v in profile.__dict__.items()
        if v and k not in mapped_fields and not k.startswith("_")
    }

# Converts Okta User Profile into a CorpUserInfo.
def _map_okta_user_profile(self, profile: UserProfile) -> CorpUserInfoClass:
# TODO: Extract user's manager if provided.
Expand All @@ -683,6 +704,7 @@ def _map_okta_user_profile(self, profile: UserProfile) -> CorpUserInfoClass:
title=profile.title,
countryCode=profile.countryCode,
departmentName=profile.department,
customProperties=self._map_okta_user_profile_custom_properties(profile),
)

def _make_corp_group_urn(self, name: str) -> str:
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def construct_dashboard_from_api_data(
return None

dashboard_urn = builder.make_dashboard_urn(
self.platform, dashboard_details.get("id", "")
self.platform, str(dashboard_details.get("id", ""))
)
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
Expand All @@ -337,7 +337,7 @@ def construct_dashboard_from_api_data(
card_id = card_info.get("card").get("id", "")
if not card_id:
continue # most likely a virtual card without an id (text or heading), not relevant.
chart_urn = builder.make_chart_urn(self.platform, card_id)
chart_urn = builder.make_chart_urn(self.platform, str(card_id))
chart_urns.append(chart_urn)

dashboard_info_class = DashboardInfoClass(
Expand Down Expand Up @@ -459,7 +459,7 @@ def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapsho
)
return None

chart_urn = builder.make_chart_urn(self.platform, card_id)
chart_urn = builder.make_chart_urn(self.platform, str(card_id))
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[],
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def _browse_path_chart(
]

def _dashboard_urn(self, report_info: dict) -> str:
    # Mode report ids are coerced to str since make_dashboard_urn expects a
    # string name; a missing id falls back to the empty string.
    return builder.make_dashboard_urn(self.platform, str(report_info.get("id", "")))

def _parse_last_run_at(self, report_info: dict) -> Optional[int]:
# Mode queries are refreshed, and that timestamp is reflected correctly here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
DatabaseKey,
SchemaKey,
)
from datahub.ingestion.source.common.subtypes import (
FlowContainerSubTypes,
JobContainerSubTypes,
)
from datahub.metadata.schema_classes import (
ContainerClass,
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
DataPlatformInstanceClass,
SubTypesClass,
)


Expand Down Expand Up @@ -211,6 +216,18 @@ def as_datajob_info_aspect(self) -> DataJobInfoClass:
status=self.status,
)

@property
def as_subtypes_aspect(self) -> SubTypesClass:
    """Subtype aspect distinguishing SQL Agent job steps from stored procedures."""
    assert isinstance(self.entity, (JobStep, StoredProcedure))
    # `subtype` rather than `type` to avoid shadowing the builtin.
    subtype = (
        JobContainerSubTypes.MSSQL_JOBSTEP
        if isinstance(self.entity, JobStep)
        else JobContainerSubTypes.MSSQL_STORED_PROCEDURE
    )
    return SubTypesClass(
        typeNames=[subtype],
    )

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.flow.platform_instance:
Expand Down Expand Up @@ -276,6 +293,18 @@ def as_dataflow_info_aspect(self) -> DataFlowInfoClass:
externalUrl=self.external_url,
)

@property
def as_subtypes_aspect(self) -> SubTypesClass:
    """Subtype aspect distinguishing SQL Agent jobs from procedure containers."""
    assert isinstance(self.entity, (MSSQLJob, MSSQLProceduresContainer))
    # `subtype` rather than `type` to avoid shadowing the builtin.
    subtype = (
        FlowContainerSubTypes.MSSQL_JOB
        if isinstance(self.entity, MSSQLJob)
        else FlowContainerSubTypes.MSSQL_PROCEDURE_CONTAINER
    )
    return SubTypesClass(
        typeNames=[subtype],
    )

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.platform_instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ def construct_job_workunits(
aspect=data_job.as_datajob_info_aspect,
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=data_job.as_subtypes_aspect,
).as_workunit()

data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
Expand Down Expand Up @@ -676,8 +681,6 @@ def construct_job_workunits(
),
).as_workunit()

# TODO: Add SubType when it appear

def construct_flow_workunits(
self,
data_flow: MSSQLDataFlow,
Expand All @@ -687,6 +690,11 @@ def construct_flow_workunits(
aspect=data_flow.as_dataflow_info_aspect,
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=data_flow.urn,
aspect=data_flow.as_subtypes_aspect,
).as_workunit()

data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
Expand All @@ -700,8 +708,6 @@ def construct_flow_workunits(
aspect=data_flow.as_container_aspect,
).as_workunit()

# TODO: Add SubType when it appear

def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
Expand Down
Loading

0 comments on commit 285bcb5

Please sign in to comment.