Skip to content

Commit

Permalink
Merge branch 'master' into docs/update-mlflow-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
yoonhyejin authored Mar 5, 2025
2 parents b32d697 + a0319af commit e06c485
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 26 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools>=63.0.0", "wheel"]
requires = ["setuptools >= 71.1", "wheel"]

[tool.ruff.lint.isort]
section-order = ["future", "patch", "standard-library", "third-party", "first-party", "local-folder"]
Expand Down
30 changes: 30 additions & 0 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,36 @@ def get_notebook_id(self) -> str:
"""
],
"tag": [_create_from_id.format(class_name="TagUrn")],
"chart": [
"""
@classmethod
def create_from_ids(
cls,
platform: str,
name: str,
platform_instance: Optional[str] = None,
) -> "ChartUrn":
return ChartUrn(
dashboard_tool=platform,
chart_id=f"{platform_instance}.{name}" if platform_instance else name,
)
"""
],
"dashboard": [
"""
@classmethod
def create_from_ids(
cls,
platform: str,
name: str,
platform_instance: Optional[str] = None,
) -> "DashboardUrn":
return DashboardUrn(
dashboard_tool=platform,
dashboard_id=f"{platform_instance}.{name}" if platform_instance else name,
)
"""
],
}


Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,6 @@

mypy_stubs = {
"types-dataclasses",
"types-setuptools",
"types-six",
"types-python-dateutil",
# We need to avoid 2.31.0.5 and 2.31.0.4 due to
Expand Down
41 changes: 28 additions & 13 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
from datahub.metadata.urns import DataFlowUrn, DatasetUrn, TagUrn
from datahub.metadata.urns import (
ChartUrn,
DashboardUrn,
DataFlowUrn,
DataJobUrn,
DataPlatformUrn,
DatasetUrn,
TagUrn,
)
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,7 +127,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
def make_data_platform_urn(platform: str) -> str:
if platform.startswith("urn:li:dataPlatform:"):
return platform
return f"urn:li:dataPlatform:{platform}"
return DataPlatformUrn.create_from_id(platform).urn()


def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
Expand Down Expand Up @@ -236,7 +244,7 @@ def make_user_urn(username: str) -> str:
Makes a user urn if the input is not a user or group urn already
"""
return (
f"urn:li:corpuser:{username}"
f"urn:li:corpuser:{UrnEncoder.encode_string(username)}"
if not username.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
else username
)
Expand All @@ -249,7 +257,7 @@ def make_group_urn(groupname: str) -> str:
if groupname and groupname.startswith(("urn:li:corpGroup:", "urn:li:corpuser:")):
return groupname
else:
return f"urn:li:corpGroup:{groupname}"
return f"urn:li:corpGroup:{UrnEncoder.encode_string(groupname)}"


def make_tag_urn(tag: str) -> str:
Expand Down Expand Up @@ -301,7 +309,12 @@ def make_data_flow_urn(


def make_data_job_urn_with_flow(flow_urn: str, job_id: str) -> str:
return f"urn:li:dataJob:({flow_urn},{job_id})"
data_flow_urn = DataFlowUrn.from_string(flow_urn)
data_job_urn = DataJobUrn.create_from_ids(
data_flow_urn=data_flow_urn.urn(),
job_id=job_id,
)
return data_job_urn.urn()


def make_data_process_instance_urn(dataProcessInstanceId: str) -> str:
Expand All @@ -324,10 +337,11 @@ def make_dashboard_urn(
platform: str, name: str, platform_instance: Optional[str] = None
) -> str:
# FIXME: dashboards don't currently include data platform urn prefixes.
if platform_instance:
return f"urn:li:dashboard:({platform},{platform_instance}.{name})"
else:
return f"urn:li:dashboard:({platform},{name})"
return DashboardUrn.create_from_ids(
platform=platform,
name=name,
platform_instance=platform_instance,
).urn()


def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
Expand All @@ -342,10 +356,11 @@ def make_chart_urn(
platform: str, name: str, platform_instance: Optional[str] = None
) -> str:
# FIXME: charts don't currently include data platform urn prefixes.
if platform_instance:
return f"urn:li:chart:({platform},{platform_instance}.{name})"
else:
return f"urn:li:chart:({platform},{name})"
return ChartUrn.create_from_ids(
platform=platform,
name=name,
platform_instance=platform_instance,
).urn()


def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]:
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def construct_dashboard_from_api_data(
return None

dashboard_urn = builder.make_dashboard_urn(
self.platform, dashboard_details.get("id", "")
self.platform, str(dashboard_details.get("id", ""))
)
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
Expand All @@ -337,7 +337,7 @@ def construct_dashboard_from_api_data(
card_id = card_info.get("card").get("id", "")
if not card_id:
continue # most likely a virtual card without an id (text or heading), not relevant.
chart_urn = builder.make_chart_urn(self.platform, card_id)
chart_urn = builder.make_chart_urn(self.platform, str(card_id))
chart_urns.append(chart_urn)

dashboard_info_class = DashboardInfoClass(
Expand Down Expand Up @@ -459,7 +459,7 @@ def construct_card_from_api_data(self, card_data: dict) -> Optional[ChartSnapsho
)
return None

chart_urn = builder.make_chart_urn(self.platform, card_id)
chart_urn = builder.make_chart_urn(self.platform, str(card_id))
chart_snapshot = ChartSnapshot(
urn=chart_urn,
aspects=[],
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def _browse_path_chart(
]

def _dashboard_urn(self, report_info: dict) -> str:
return builder.make_dashboard_urn(self.platform, report_info.get("id", ""))
return builder.make_dashboard_urn(self.platform, str(report_info.get("id", "")))

def _parse_last_run_at(self, report_info: dict) -> Optional[int]:
# Mode queries are refreshed, and that timestamp is reflected correctly here.
Expand Down
90 changes: 84 additions & 6 deletions metadata-ingestion/tests/unit/sdk/test_mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
MetadataChangeEventClass,
OwnershipClass,
)
from datahub.utilities.urn_encoder import RESERVED_CHARS_EXTENDED

_RESERVED_CHARS_STRING = "".join(sorted(list(RESERVED_CHARS_EXTENDED)))


def test_can_add_aspect():
Expand All @@ -25,15 +28,90 @@ def test_can_add_aspect():
assert not builder.can_add_aspect(dataset_mce, DataFlowInfoClass)


def test_create_dataset_urn_with_reserved_chars() -> None:
def test_create_urns_with_reserved_chars() -> None:
assert (
builder.make_dataset_urn(
platform=f"platform){_RESERVED_CHARS_STRING}",
name=f"table{_RESERVED_CHARS_STRING}",
env=builder.DEFAULT_ENV,
)
== "urn:li:dataset:(urn:li:dataPlatform:platform%29%%28%29%2C%E2%90%9F,table%%28%29%2C%E2%90%9F,PROD)"
)
assert (
builder.make_dataset_urn_with_platform_instance(
"platform)",
"table_(name)",
"platform,instance",
builder.DEFAULT_ENV,
platform=f"platform){_RESERVED_CHARS_STRING}",
name=f"table{_RESERVED_CHARS_STRING}",
platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}",
env=builder.DEFAULT_ENV,
)
== "urn:li:dataset:(urn:li:dataPlatform:platform%29%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.table%%28%29%2C%E2%90%9F,PROD)"
)
assert (
builder.make_data_platform_urn(
f"platform{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataPlatform:platform%%28%29%2C%E2%90%9F"
)
assert (
builder.make_data_flow_urn(
orchestrator=f"orchestrator{_RESERVED_CHARS_STRING}",
flow_id=f"flowid{_RESERVED_CHARS_STRING}",
cluster=f"cluster{_RESERVED_CHARS_STRING}",
platform_instance=f"platform{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataFlow:(orchestrator%%28%29%2C%E2%90%9F,platform%%28%29%2C%E2%90%9F.flowid%%28%29%2C%E2%90%9F,cluster%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_data_job_urn(
orchestrator=f"orchestrator{_RESERVED_CHARS_STRING}",
flow_id=f"flowid{_RESERVED_CHARS_STRING}",
cluster=f"cluster{_RESERVED_CHARS_STRING}",
platform_instance=f"platform{_RESERVED_CHARS_STRING}",
job_id=f"job_name{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataJob:(urn:li:dataFlow:(orchestrator%%28%29%2C%E2%90%9F,platform%%28%29%2C%E2%90%9F.flowid%%28%29%2C%E2%90%9F,cluster%%28%29%2C%E2%90%9F),job_name%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_user_urn(
username=f"user{_RESERVED_CHARS_STRING}",
)
== "urn:li:corpuser:user%%28%29%2C%E2%90%9F"
)
assert (
builder.make_group_urn(
groupname=f"group{_RESERVED_CHARS_STRING}",
)
== "urn:li:corpGroup:group%%28%29%2C%E2%90%9F"
)
assert (
builder.make_dashboard_urn(
platform=f"platform{_RESERVED_CHARS_STRING}",
name=f"dashboard{_RESERVED_CHARS_STRING}",
platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}",
)
== "urn:li:dashboard:(platform%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.dashboard%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_dashboard_urn(
platform=f"platform{_RESERVED_CHARS_STRING}",
name=f"dashboard{_RESERVED_CHARS_STRING}",
)
== "urn:li:dashboard:(platform%%28%29%2C%E2%90%9F,dashboard%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_chart_urn(
platform=f"platform{_RESERVED_CHARS_STRING}",
name=f"dashboard{_RESERVED_CHARS_STRING}",
platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}",
)
== "urn:li:chart:(platform%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.dashboard%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_chart_urn(
platform=f"platform{_RESERVED_CHARS_STRING}",
name=f"dashboard{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataset:(urn:li:dataPlatform:platform%29,platform%2Cinstance.table_%28name%29,PROD)"
== "urn:li:chart:(platform%%28%29%2C%E2%90%9F,dashboard%%28%29%2C%E2%90%9F)"
)


Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_packaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
)
def test_datahub_version():
# Simply importing pkg_resources checks for unsatisfied dependencies.
import pkg_resources
import pkg_resources # type: ignore[import-untyped]

assert pkg_resources.get_distribution(datahub_version.__package_name__).version

0 comments on commit e06c485

Please sign in to comment.