Skip to content

Commit

Permalink
fix(ingestion): fixes producing some URNs with reserved characters
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomezvillamor committed Mar 4, 2025
1 parent 5576b3c commit 0294f1f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
29 changes: 20 additions & 9 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
from datahub.metadata.urns import DataFlowUrn, DatasetUrn, TagUrn
from datahub.metadata.urns import (
DataFlowUrn,
DataJobUrn,
DataPlatformUrn,
DatasetUrn,
TagUrn,
)
from datahub.utilities.urn_encoder import UrnEncoder

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,7 +125,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
def make_data_platform_urn(platform: str) -> str:
if platform.startswith("urn:li:dataPlatform:"):
return platform
return f"urn:li:dataPlatform:{platform}"
return DataPlatformUrn.create_from_id(platform).urn()


def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str:
Expand Down Expand Up @@ -236,7 +242,7 @@ def make_user_urn(username: str) -> str:
Makes a user urn if the input is not a user or group urn already
"""
return (
f"urn:li:corpuser:{username}"
f"urn:li:corpuser:{UrnEncoder.encode_string(username)}"
if not username.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
else username
)
Expand All @@ -249,7 +255,7 @@ def make_group_urn(groupname: str) -> str:
if groupname and groupname.startswith(("urn:li:corpGroup:", "urn:li:corpuser:")):
return groupname
else:
return f"urn:li:corpGroup:{groupname}"
return f"urn:li:corpGroup:{UrnEncoder.encode_string(groupname)}"


def make_tag_urn(tag: str) -> str:
Expand Down Expand Up @@ -301,7 +307,12 @@ def make_data_flow_urn(


def make_data_job_urn_with_flow(flow_urn: str, job_id: str) -> str:
return f"urn:li:dataJob:({flow_urn},{job_id})"
data_flow_urn = DataFlowUrn.from_string(flow_urn)
data_job_urn = DataJobUrn.create_from_ids(
data_flow_urn=data_flow_urn,
job_id=job_id,
)
return data_job_urn.urn()


def make_data_process_instance_urn(dataProcessInstanceId: str) -> str:
Expand All @@ -325,9 +336,9 @@ def make_dashboard_urn(
) -> str:
# FIXME: dashboards don't currently include data platform urn prefixes.
if platform_instance:
return f"urn:li:dashboard:({platform},{platform_instance}.{name})"
return f"urn:li:dashboard:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(platform_instance)}.{UrnEncoder.encode_string(name)})"
else:
return f"urn:li:dashboard:({platform},{name})"
return f"urn:li:dashboard:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(name)})"


def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
Expand All @@ -343,9 +354,9 @@ def make_chart_urn(
) -> str:
# FIXME: charts don't currently include data platform urn prefixes.
if platform_instance:
return f"urn:li:chart:({platform},{platform_instance}.{name})"
return f"urn:li:chart:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(platform_instance)}.{UrnEncoder.encode_string(name)})"
else:
return f"urn:li:chart:({platform},{name})"
return f"urn:li:chart:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(name)})"


def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]:
Expand Down
76 changes: 70 additions & 6 deletions metadata-ingestion/tests/unit/sdk/test_mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
MetadataChangeEventClass,
OwnershipClass,
)
from datahub.utilities.urn_encoder import RESERVED_CHARS_EXTENDED

_RESERVED_CHARS_STRING = "".join(sorted(list(RESERVED_CHARS_EXTENDED)))


def test_can_add_aspect():
Expand All @@ -25,15 +28,76 @@ def test_can_add_aspect():
assert not builder.can_add_aspect(dataset_mce, DataFlowInfoClass)


def test_create_dataset_urn_with_reserved_chars() -> None:
def test_create_urns_with_reserved_chars() -> None:
assert (
builder.make_dataset_urn(
platform=f"platform){_RESERVED_CHARS_STRING}",
name=f"table{_RESERVED_CHARS_STRING}",
env=builder.DEFAULT_ENV,
)
== "urn:li:dataset:(urn:li:dataPlatform:platform%29%%28%29%2C%E2%90%9F,table%%28%29%2C%E2%90%9F,PROD)"
)
assert (
builder.make_dataset_urn_with_platform_instance(
"platform)",
"table_(name)",
"platform,instance",
builder.DEFAULT_ENV,
platform=f"platform){_RESERVED_CHARS_STRING}",
name=f"table{_RESERVED_CHARS_STRING}",
platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}",
env=builder.DEFAULT_ENV,
)
== "urn:li:dataset:(urn:li:dataPlatform:platform%29%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.table%%28%29%2C%E2%90%9F,PROD)"
)
assert (
builder.make_data_platform_urn(
f"platform{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataPlatform:platform%%28%29%2C%E2%90%9F"
)
assert (
builder.make_data_flow_urn(
orchestrator=f"orchestrator{_RESERVED_CHARS_STRING}",
flow_id=f"flowid{_RESERVED_CHARS_STRING}",
cluster=f"cluster{_RESERVED_CHARS_STRING}",
platform_instance=f"platform{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataFlow:(orchestrator%%28%29%2C%E2%90%9F,platform%%28%29%2C%E2%90%9F.flowid%%28%29%2C%E2%90%9F,cluster%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_data_job_urn(
orchestrator=f"orchestrator{_RESERVED_CHARS_STRING}",
flow_id=f"flowid{_RESERVED_CHARS_STRING}",
cluster=f"cluster{_RESERVED_CHARS_STRING}",
platform_instance=f"platform{_RESERVED_CHARS_STRING}",
job_id=f"job_name{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataJob:(urn:li:dataFlow:(orchestrator%%28%29%2C%E2%90%9F,platform%%28%29%2C%E2%90%9F.flowid%%28%29%2C%E2%90%9F,cluster%%28%29%2C%E2%90%9F),job_name%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_user_urn(
username=f"user{_RESERVED_CHARS_STRING}",
)
== "urn:li:corpuser:user%%28%29%2C%E2%90%9F"
)
assert (
builder.make_group_urn(
groupname=f"group{_RESERVED_CHARS_STRING}",
)
== "urn:li:corpGroup:group%%28%29%2C%E2%90%9F"
)
assert (
builder.make_dashboard_urn(
platform=f"platform{_RESERVED_CHARS_STRING}",
name=f"dashboard{_RESERVED_CHARS_STRING}",
platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}",
)
== "urn:li:dashboard:(platform%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.dashboard%%28%29%2C%E2%90%9F)"
)
assert (
builder.make_chart_urn(
platform=f"platform{_RESERVED_CHARS_STRING}",
name=f"dashboard{_RESERVED_CHARS_STRING}",
platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}",
)
== "urn:li:dataset:(urn:li:dataPlatform:platform%29,platform%2Cinstance.table_%28name%29,PROD)"
== "urn:li:chart:(platform%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.dashboard%%28%29%2C%E2%90%9F)"
)


Expand Down

0 comments on commit 0294f1f

Please sign in to comment.