diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 9fa060266a7ab8..6719b1ae71da2e 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -52,7 +52,13 @@ UpstreamLineageClass, _Aspect as AspectAbstract, ) -from datahub.metadata.urns import DataFlowUrn, DatasetUrn, TagUrn +from datahub.metadata.urns import ( + DataFlowUrn, + DataJobUrn, + DataPlatformUrn, + DatasetUrn, + TagUrn, +) from datahub.utilities.urn_encoder import UrnEncoder logger = logging.getLogger(__name__) @@ -119,7 +125,7 @@ def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]: def make_data_platform_urn(platform: str) -> str: if platform.startswith("urn:li:dataPlatform:"): return platform - return f"urn:li:dataPlatform:{platform}" + return DataPlatformUrn.create_from_id(platform).urn() def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV) -> str: @@ -236,7 +242,7 @@ def make_user_urn(username: str) -> str: Makes a user urn if the input is not a user or group urn already """ return ( - f"urn:li:corpuser:{username}" + f"urn:li:corpuser:{UrnEncoder.encode_string(username)}" if not username.startswith(("urn:li:corpuser:", "urn:li:corpGroup:")) else username ) @@ -249,7 +255,7 @@ def make_group_urn(groupname: str) -> str: if groupname and groupname.startswith(("urn:li:corpGroup:", "urn:li:corpuser:")): return groupname else: - return f"urn:li:corpGroup:{groupname}" + return f"urn:li:corpGroup:{UrnEncoder.encode_string(groupname)}" def make_tag_urn(tag: str) -> str: @@ -301,7 +307,12 @@ def make_data_flow_urn( def make_data_job_urn_with_flow(flow_urn: str, job_id: str) -> str: - return f"urn:li:dataJob:({flow_urn},{job_id})" + data_flow_urn = DataFlowUrn.from_string(flow_urn) + data_job_urn = DataJobUrn.create_from_ids( + data_flow_urn=data_flow_urn, + job_id=job_id, + ) + return data_job_urn.urn() def make_data_process_instance_urn(dataProcessInstanceId: str) -> str: @@ -325,9 +336,9 @@ def make_dashboard_urn( ) -> str: # FIXME: dashboards don't currently include data platform urn prefixes. if platform_instance: - return f"urn:li:dashboard:({platform},{platform_instance}.{name})" + return f"urn:li:dashboard:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(platform_instance)}.{UrnEncoder.encode_string(name)})" else: - return f"urn:li:dashboard:({platform},{name})" + return f"urn:li:dashboard:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(name)})" def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]: @@ -343,9 +354,9 @@ def make_chart_urn( ) -> str: # FIXME: charts don't currently include data platform urn prefixes. if platform_instance: - return f"urn:li:chart:({platform},{platform_instance}.{name})" + return f"urn:li:chart:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(platform_instance)}.{UrnEncoder.encode_string(name)})" else: - return f"urn:li:chart:({platform},{name})" + return f"urn:li:chart:({UrnEncoder.encode_string(platform)},{UrnEncoder.encode_string(name)})" def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]: diff --git a/metadata-ingestion/tests/unit/sdk/test_mce_builder.py b/metadata-ingestion/tests/unit/sdk/test_mce_builder.py index 3bdbf07bf28b7d..45122926de89b9 100644 --- a/metadata-ingestion/tests/unit/sdk/test_mce_builder.py +++ b/metadata-ingestion/tests/unit/sdk/test_mce_builder.py @@ -8,6 +8,9 @@ MetadataChangeEventClass, OwnershipClass, ) +from datahub.utilities.urn_encoder import RESERVED_CHARS_EXTENDED + +_RESERVED_CHARS_STRING = "".join(sorted(list(RESERVED_CHARS_EXTENDED))) def test_can_add_aspect(): @@ -25,15 +28,76 @@ def test_can_add_aspect(): assert not builder.can_add_aspect(dataset_mce, DataFlowInfoClass) -def test_create_dataset_urn_with_reserved_chars() -> None: +def test_create_urns_with_reserved_chars() -> None: + assert ( + builder.make_dataset_urn( + platform=f"platform){_RESERVED_CHARS_STRING}", + name=f"table{_RESERVED_CHARS_STRING}", + env=builder.DEFAULT_ENV, + ) + == "urn:li:dataset:(urn:li:dataPlatform:platform%29%%28%29%2C%E2%90%9F,table%%28%29%2C%E2%90%9F,PROD)" + ) assert ( builder.make_dataset_urn_with_platform_instance( - "platform)", - "table_(name)", - "platform,instance", - builder.DEFAULT_ENV, + platform=f"platform){_RESERVED_CHARS_STRING}", + name=f"table{_RESERVED_CHARS_STRING}", + platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}", + env=builder.DEFAULT_ENV, + ) + == "urn:li:dataset:(urn:li:dataPlatform:platform%29%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.table%%28%29%2C%E2%90%9F,PROD)" + ) + assert ( + builder.make_data_platform_urn( + f"platform{_RESERVED_CHARS_STRING}", + ) + == "urn:li:dataPlatform:platform%%28%29%2C%E2%90%9F" + ) + assert ( + builder.make_data_flow_urn( + orchestrator=f"orchestrator{_RESERVED_CHARS_STRING}", + flow_id=f"flowid{_RESERVED_CHARS_STRING}", + cluster=f"cluster{_RESERVED_CHARS_STRING}", + platform_instance=f"platform{_RESERVED_CHARS_STRING}", + ) + == "urn:li:dataFlow:(orchestrator%%28%29%2C%E2%90%9F,platform%%28%29%2C%E2%90%9F.flowid%%28%29%2C%E2%90%9F,cluster%%28%29%2C%E2%90%9F)" + ) + assert ( + builder.make_data_job_urn( + orchestrator=f"orchestrator{_RESERVED_CHARS_STRING}", + flow_id=f"flowid{_RESERVED_CHARS_STRING}", + cluster=f"cluster{_RESERVED_CHARS_STRING}", + platform_instance=f"platform{_RESERVED_CHARS_STRING}", + job_id=f"job_name{_RESERVED_CHARS_STRING}", + ) + == "urn:li:dataJob:(urn:li:dataFlow:(orchestrator%%28%29%2C%E2%90%9F,platform%%28%29%2C%E2%90%9F.flowid%%28%29%2C%E2%90%9F,cluster%%28%29%2C%E2%90%9F),job_name%%28%29%2C%E2%90%9F)" + ) + assert ( + builder.make_user_urn( + username=f"user{_RESERVED_CHARS_STRING}", + ) + == "urn:li:corpuser:user%%28%29%2C%E2%90%9F" + ) + assert ( + builder.make_group_urn( + groupname=f"group{_RESERVED_CHARS_STRING}", + ) + == "urn:li:corpGroup:group%%28%29%2C%E2%90%9F" + ) + assert ( + builder.make_dashboard_urn( + platform=f"platform{_RESERVED_CHARS_STRING}", + name=f"dashboard{_RESERVED_CHARS_STRING}", + platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}", + ) + == "urn:li:dashboard:(platform%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.dashboard%%28%29%2C%E2%90%9F)" + ) + assert ( + builder.make_chart_urn( + platform=f"platform{_RESERVED_CHARS_STRING}", + name=f"dashboard{_RESERVED_CHARS_STRING}", + platform_instance=f"platform-instance{_RESERVED_CHARS_STRING}", ) - == "urn:li:dataset:(urn:li:dataPlatform:platform%29,platform%2Cinstance.table_%28name%29,PROD)" + == "urn:li:chart:(platform%%28%29%2C%E2%90%9F,platform-instance%%28%29%2C%E2%90%9F.dashboard%%28%29%2C%E2%90%9F)" )