
Commit

chore(ingest): remove deprecated calls to Urn.create_from_string
Also does some light refactoring in the patch builders.
hsheth2 committed Nov 27, 2024
1 parent d9d6255 commit ceb8f5b
Showing 22 changed files with 76 additions and 178 deletions.
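
The change repeated across the hunks below swaps the deprecated Urn.create_from_string (and the matching calls on subclasses such as DatasetUrn, DataPlatformUrn, and TagUrn) for from_string. A minimal before/after sketch, using a made-up dataset URN and only methods that appear in this diff:

from datahub.metadata.urn import Urn

# Illustrative URN only; not taken from the commit.
example = "urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.mydataset.mytable,PROD)"

# Before (deprecated):
# parsed = Urn.create_from_string(example)

# After:
parsed = Urn.from_string(example)
assert parsed.get_type() == "dataset"
assert parsed.get_entity_id()[0] == "urn:li:dataPlatform:bigquery"
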
@@ -1,6 +1,5 @@
from __future__ import annotations

import time
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

@@ -13,7 +12,6 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
AuditStampClass,
DataProductAssociationClass,
DataProductPropertiesClass,
DomainsClass,
@@ -139,13 +137,6 @@ def __init__(self, **data):
else:
self._resolved_domain_urn = None

def _mint_auditstamp(self, message: str) -> AuditStampClass:
return AuditStampClass(
time=int(time.time() * 1000.0),
actor="urn:li:corpuser:datahub",
message=message,
)

def _mint_owner(self, owner: Union[str, Ownership]) -> OwnerClass:
if isinstance(owner, str):
return OwnerClass(
@@ -1,6 +1,5 @@
import json
import logging
import time
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple, Union

@@ -22,7 +21,6 @@
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetPropertiesClass,
GlobalTagsClass,
GlossaryTermAssociationClass,
@@ -199,13 +197,6 @@ def platform_must_not_be_urn(cls, v):
return v[len("urn:li:dataPlatform:") :]
return v

def _mint_auditstamp(self, message: str) -> AuditStampClass:
return AuditStampClass(
time=int(time.time() * 1000.0),
actor="urn:li:corpuser:datahub",
message=message,
)

def _mint_owner(self, owner: Union[str, Ownership]) -> OwnerClass:
if isinstance(owner, str):
return OwnerClass(
@@ -121,7 +121,7 @@ def fqn(self) -> str:
return (
self.qualified_name
or self.id
or Urn.create_from_string(self.urn).get_entity_id()[0]
or Urn.from_string(self.urn).get_entity_id()[0]
)

@validator("urn", pre=True, always=True)
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/put_cli.py
@@ -105,7 +105,7 @@ def platform(
"""

if name.startswith(f"urn:li:{DataPlatformUrn.ENTITY_TYPE}"):
platform_urn = DataPlatformUrn.create_from_string(name)
platform_urn = DataPlatformUrn.from_string(name)
platform_name = platform_urn.get_entity_id_as_string()
else:
platform_name = name.lower()
@@ -45,7 +45,7 @@ def _get_owner_urn(maybe_urn: str) -> str:

def _abort_if_non_existent_urn(graph: DataHubGraph, urn: str, operation: str) -> None:
try:
parsed_urn: Urn = Urn.create_from_string(urn)
parsed_urn: Urn = Urn.from_string(urn)
entity_type = parsed_urn.get_type()
except Exception:
click.secho(f"Provided urn {urn} does not seem valid", fg="red")
43 changes: 43 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
@@ -1,17 +1,21 @@
import json
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
EdgeClass,
GenericAspectClass,
KafkaAuditHeaderClass,
MetadataChangeProposalClass,
SystemMetadataClass,
)
from datahub.metadata.urn import Urn
from datahub.utilities.urns.urn import guess_entity_type


@@ -89,3 +93,42 @@ def build(self) -> Iterable[MetadataChangeProposalClass]:
)
for aspect_name, patches in self.patches.items()
]

    @classmethod
    def _mint_auditstamp(cls, message: Optional[str] = None) -> AuditStampClass:
        """
        Creates an AuditStampClass instance with the current timestamp and other default values.
        Args:
            message: The message associated with the audit stamp (optional).
        Returns:
            An instance of AuditStampClass.
        """
        return AuditStampClass(
            time=int(time.time() * 1000.0),
            actor="urn:li:corpuser:datahub",
            message=message,
        )

    @classmethod
    def _ensure_urn_type(
        cls, entity_type: str, edges: List[EdgeClass], context: str
    ) -> None:
        """
        Ensures that the destination URNs in the given edges have the specified entity type.
        Args:
            entity_type: The entity type to check against.
            edges: A list of Edge objects.
            context: The context or description of the operation.
        Raises:
            ValueError: If any of the destination URNs is not of the specified entity type.
        """
        for e in edges:
            urn = Urn.from_string(e.destinationUrn)
            if not urn.entity_type == entity_type:
                raise ValueError(
                    f"{context}: {e.destinationUrn} is not of type {entity_type}"
                )
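
With _mint_auditstamp and _ensure_urn_type now defined once on MetadataPatchProposal, the entity-specific patch builders (chart.py below, among others) can drop their local copies and inherit them. A minimal sketch of how a subclass might use the shared helpers; the class and method names are hypothetical, and the assumption that EdgeClass exposes a created audit-stamp field follows the DataHub metadata model rather than anything shown in this diff:

from typing import List

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import EdgeClass


class MyChartEdgePatchBuilder(MetadataPatchProposal):  # hypothetical subclass
    def add_chart_edges(self, edges: List[EdgeClass]) -> "MyChartEdgePatchBuilder":
        # Inherited helper: raises ValueError if any destinationUrn is not a chart URN.
        self._ensure_urn_type("chart", edges, "add_chart_edges")
        for edge in edges:
            # Inherited helper: stamps the edge with the current time and the
            # default "urn:li:corpuser:datahub" actor.
            edge.created = self._mint_auditstamp("added via patch builder")
        return self
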
@@ -190,7 +190,7 @@ def from_string_name(cls, ref: str) -> "BigQueryTableRef":
@classmethod
def from_urn(cls, urn: str) -> "BigQueryTableRef":
"""Raises: ValueError if urn is not a valid BigQuery table URN."""
dataset_urn = DatasetUrn.create_from_string(urn)
dataset_urn = DatasetUrn.from_string(urn)
split = dataset_urn.name.rsplit(".", 3)
if len(split) == 3:
project, dataset, table = split
@@ -653,7 +653,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

is_resource_row: bool = not row["subresource"]
entity_urn = row["resource"]
entity_type = Urn.create_from_string(row["resource"]).get_type()
entity_type = Urn.from_string(row["resource"]).get_type()

term_associations: List[
GlossaryTermAssociationClass
@@ -227,7 +227,7 @@ def collapse_name(name: str, collapse_urns: CollapseUrns) -> str:
def collapse_urn(urn: str, collapse_urns: CollapseUrns) -> str:
if len(collapse_urns.urns_suffix_regex) == 0:
return urn
urn_obj = DatasetUrn.create_from_string(urn)
urn_obj = DatasetUrn.from_string(urn)
name = collapse_name(name=urn_obj.get_dataset_name(), collapse_urns=collapse_urns)
data_platform_urn = urn_obj.get_data_platform_urn()
return str(
@@ -104,7 +104,7 @@ def __init__(
def delete_entity(self, urn: str) -> None:
assert self.ctx.graph

entity_urn = Urn.create_from_string(urn)
entity_urn = Urn.from_string(urn)
self.report.num_soft_deleted_entity_removed += 1
self.report.num_soft_deleted_entity_removed_by_type[entity_urn.entity_type] = (
self.report.num_soft_deleted_entity_removed_by_type.get(
@@ -74,7 +74,7 @@ def handle_end_of_stream(
logger.debug("Generating tags")

for tag_association in self.processed_tags.values():
tag_urn = TagUrn.create_from_string(tag_association.tag)
tag_urn = TagUrn.from_string(tag_association.tag)
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=tag_urn.urn(),
@@ -100,7 +100,7 @@ def transform(
)
if transformed_aspect:
# for end of stream records, we modify the workunit-id
structured_urn = Urn.create_from_string(urn)
structured_urn = Urn.from_string(urn)
simple_name = "-".join(structured_urn.get_entity_id())
record_metadata = envelope.metadata.copy()
record_metadata.update(
@@ -42,7 +42,7 @@ def get_entity_name(assertion: BaseEntityAssertion) -> Tuple[str, str, str]:
if qualified_name is not None:
parts = qualified_name.split(".")
else:
urn_id = Urn.create_from_string(assertion.entity).entity_ids[1]
urn_id = Urn.from_string(assertion.entity).entity_ids[1]
parts = urn_id.split(".")
if len(parts) > 3:
parts = parts[-3:]
29 changes: 12 additions & 17 deletions metadata-ingestion/src/datahub/lite/duckdb_lite.py
@@ -609,15 +609,15 @@ def get_typed_aspect(
aspect_map, DataPlatformInstanceClass
) # type: ignore

needs_platform = Urn.create_from_string(entity_urn).get_type() in [
needs_platform = Urn.from_string(entity_urn).get_type() in [
"dataset",
"container",
"chart",
"dashboard",
"dataFlow",
"dataJob",
]
entity_urn_parsed = Urn.create_from_string(entity_urn)
entity_urn_parsed = Urn.from_string(entity_urn)
if entity_urn_parsed.get_type() in ["dataFlow", "dataJob"]:
self.add_edge(
entity_urn,
@@ -630,15 +630,12 @@
# this is a top-level entity
if not dpi:
logger.debug(f"No data platform instance for {entity_urn}")
maybe_parent_urn = Urn.create_from_string(entity_urn).get_entity_id()[0]
maybe_parent_urn = Urn.from_string(entity_urn).get_entity_id()[0]
needs_dpi = False
if maybe_parent_urn.startswith(Urn.URN_PREFIX):
parent_urn = maybe_parent_urn
if (
Urn.create_from_string(maybe_parent_urn).get_type()
== "dataPlatform"
):
data_platform_urn = DataPlatformUrn.create_from_string(
if Urn.from_string(maybe_parent_urn).get_type() == "dataPlatform":
data_platform_urn = DataPlatformUrn.from_string(
maybe_parent_urn
)
needs_dpi = True
@@ -660,7 +657,7 @@
logger.error(f"Failed to generate edges entity {entity_urn}", e)
parent_urn = str(data_platform_instance_urn)
else:
data_platform_urn = DataPlatformUrn.create_from_string(dpi.platform)
data_platform_urn = DataPlatformUrn.from_string(dpi.platform)
data_platform_instance = dpi.instance or "default"
data_platform_instance_urn = Urn(
entity_type="dataPlatformInstance",
Expand All @@ -673,9 +670,7 @@ def get_typed_aspect(
parent_urn = "__root__"

types = (
subtypes.typeNames
if subtypes
else [Urn.create_from_string(entity_urn).get_type()]
subtypes.typeNames if subtypes else [Urn.from_string(entity_urn).get_type()]
)
for t in types:
type_urn = Urn(entity_type="systemNode", entity_id=[parent_urn, t])
@@ -686,7 +681,7 @@
def _create_edges_from_data_platform_instance(
self, data_platform_instance_urn: Urn
) -> None:
data_platform_urn = DataPlatformUrn.create_from_string(
data_platform_urn = DataPlatformUrn.from_string(
data_platform_instance_urn.get_entity_id()[0]
)
data_platform_instances_urn = Urn(
@@ -735,7 +730,7 @@ def post_update_hook(
if isinstance(aspect, DatasetPropertiesClass):
dp: DatasetPropertiesClass = aspect
if dp.name:
specific_urn = DatasetUrn.create_from_string(entity_urn)
specific_urn = DatasetUrn.from_string(entity_urn)
if (
specific_urn.get_data_platform_urn().get_entity_id_as_string()
== "looker"
@@ -755,23 +750,23 @@
self.add_edge(entity_urn, "name", cp.name, remove_existing=True)
elif isinstance(aspect, DataPlatformInstanceClass):
dpi: DataPlatformInstanceClass = aspect
data_platform_urn = DataPlatformUrn.create_from_string(dpi.platform)
data_platform_urn = DataPlatformUrn.from_string(dpi.platform)
data_platform_instance = dpi.instance or "default"
data_platform_instance_urn = Urn(
entity_type="dataPlatformInstance",
entity_id=[str(data_platform_urn), data_platform_instance],
)
self._create_edges_from_data_platform_instance(data_platform_instance_urn)
elif isinstance(aspect, ChartInfoClass):
urn = Urn.create_from_string(entity_urn)
urn = Urn.from_string(entity_urn)
self.add_edge(
entity_urn,
"name",
aspect.title + f" ({urn.get_entity_id()[-1]})",
remove_existing=True,
)
elif isinstance(aspect, DashboardInfoClass):
urn = Urn.create_from_string(entity_urn)
urn = Urn.from_string(entity_urn)
self.add_edge(
entity_urn,
"name",
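
The duckdb_lite changes above use both ways of obtaining a Urn: parsing an existing string with from_string and constructing one from an entity_type plus an entity_id list. A small round-trip sketch with illustrative values, assuming the Urn class imported in mcp_patch_builder.py above is the same one duckdb_lite uses, and mirroring its dpi.instance or "default" fallback:

from datahub.metadata.urn import Urn

# Construct a dataPlatformInstance URN from a platform URN plus an instance
# name, the way duckdb_lite does (values are illustrative).
dpi_urn = Urn(
    entity_type="dataPlatformInstance",
    entity_id=["urn:li:dataPlatform:looker", "default"],
)

# Parse it back with the non-deprecated API and inspect it.
parsed = Urn.from_string(str(dpi_urn))
assert parsed.get_type() == "dataPlatformInstance"
assert parsed.get_entity_id()[0] == "urn:li:dataPlatform:looker"
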
39 changes: 0 additions & 39 deletions metadata-ingestion/src/datahub/specific/chart.py
@@ -1,10 +1,8 @@
import time
from typing import Dict, List, Optional, Union

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
ChartInfoClass as ChartInfo,
ChartTypeClass,
@@ -47,43 +45,6 @@ def __init__(
)
self.ownership_patch_helper = OwnershipPatchHelper(self)

def _mint_auditstamp(self, message: Optional[str] = None) -> AuditStampClass:
"""
Creates an AuditStampClass instance with the current timestamp and other default values.
Args:
message: The message associated with the audit stamp (optional).
Returns:
An instance of AuditStampClass.
"""
return AuditStampClass(
time=int(time.time() * 1000.0),
actor="urn:li:corpuser:datahub",
message=message,
)

def _ensure_urn_type(
self, entity_type: str, edges: List[Edge], context: str
) -> None:
"""
Ensures that the destination URNs in the given edges have the specified entity type.
Args:
entity_type: The entity type to check against.
edges: A list of Edge objects.
context: The context or description of the operation.
Raises:
ValueError: If any of the destination URNs is not of the specified entity type.
"""
for e in edges:
urn = Urn.create_from_string(e.destinationUrn)
if not urn.get_type() == entity_type:
raise ValueError(
f"{context}: {e.destinationUrn} is not of type {entity_type}"
)

def add_owner(self, owner: Owner) -> "ChartPatchBuilder":
"""
Adds an owner to the ChartPatchBuilder.