Skip to content

Commit

Permalink
fix roundtripping of field types when using file-based schema
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Mar 3, 2025
1 parent 777b86f commit a293129
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
io.acryl.dataManagement.deprecationDate: '2023-01-01'
io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com
io.acryl.dataManagement.replicationSLA: 90.0
type: string
- id: url
structured_properties:
io.acryl.dataManagement.deprecationDate: '2023-01-01'
type: string
- id: locator.latitude
structured_properties:
io.acryl.dataManagement.deprecationDate: '2023-01-01'
type: float
structured_properties: # dataset level structured properties go here
io.acryl.privacy.retentionTime: 365
projectNames:
Expand Down
68 changes: 15 additions & 53 deletions metadata-ingestion/src/datahub/api/entities/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,20 @@ class SchemaFieldSpecification(StrictModel):
isPartitioningKey: Optional[bool] = None
jsonProps: Optional[dict] = None

def remove_type_metadata(self) -> "SchemaFieldSpecification":
    """
    Clear the type-related attributes of this field specification in place.

    Meant for syncing field metadata back to yaml when the schema file
    already carries the type information, so it must not be repeated here.
    The attributes reset to None are: type, nativeDataType, jsonPath,
    isPartitioningKey, isPartOfKey and jsonProps.

    Returns this same instance so the call can be used inside an expression.
    """
    for attribute_name in (
        "type",
        "nativeDataType",
        "jsonPath",
        "isPartitioningKey",
        "isPartOfKey",
        "jsonProps",
    ):
        setattr(self, attribute_name, None)
    return self

def with_structured_properties(
self,
structured_properties: Optional[
Expand Down Expand Up @@ -803,7 +817,7 @@ def _schema_from_schema_metadata(
if schema_fields and schema_specification.fields:
# Source was an avro schema, so we only include fields with structured properties, tags or glossary terms
schema_specification.fields = [
field
field.remove_type_metadata()
for field in schema_specification.fields
if field.structured_properties
or field.globalTags
Expand Down Expand Up @@ -873,7 +887,6 @@ def from_datahub(
direction=DataHubGraph.RelationshipDirection.INCOMING,
)
downstreams = [r.urn for r in related_downstreams]
breakpoint()

return Dataset( # type: ignore[arg-type]
id=dataset_urn.name,
Expand Down Expand Up @@ -942,57 +955,6 @@ def dict(self, **kwargs):

return result

def model_dump(self, **kwargs):
    """Serialize the dataset to a plain dict, tidied up for YAML output.

    On top of the parent serializer this method:

    * leaves out ``name`` when it is identical to a non-None ``id``;
    * when ``subtypes`` holds exactly one entry, copies that entry to
      ``self.subtype`` and leaves out ``subtypes``;
    * when ``schema_metadata`` is set and the output has a ``"schema"`` dict
      with a list under ``"fields"``, rebuilds that list by serializing each
      truthy entry of ``self.schema_metadata.fields`` with the field's own
      ``model_dump`` / ``dict`` method.

    Args:
        **kwargs: Forwarded to the parent serializer and to each field's
            serializer. ``exclude`` is handled here and is not forwarded to
            the fields; it may be omitted, ``None``, an iterable of field
            names, or a mapping. The caller's object is copied, never
            modified.

    Returns:
        The serialized dict.
    """
    # Work on a private copy so the caller's `exclude` object is not mutated,
    # and so an explicit `exclude=None` or a mapping-style exclude works.
    requested_exclude = kwargs.pop("exclude", None)
    if requested_exclude is None:
        exclude = set()
    elif isinstance(requested_exclude, dict):
        exclude = dict(requested_exclude)
    else:
        exclude = set(requested_exclude)

    def _also_exclude(field_name):
        """Add one top-level field name to the private exclude spec."""
        if isinstance(exclude, dict):
            exclude[field_name] = True
        else:
            exclude.add(field_name)

    def _dump_field(field):
        """Serialize one schema field with whichever method it offers."""
        if hasattr(field, "model_dump"):
            return field.model_dump(**kwargs)
        if hasattr(field, "dict"):
            return field.dict(**kwargs)
        return dict(field.__dict__)

    # If id and name are identical, exclude name from the output
    if self.id is not None and self.id == self.name:
        _also_exclude("name")

    # A singleton subtypes list is emitted as `subtype` instead.
    # NOTE: this assigns to self.subtype, i.e. serializing mutates the instance.
    if self.subtypes and len(self.subtypes) == 1:
        self.subtype = self.subtypes[0]
        _also_exclude("subtypes")

    # Check which method exists in the parent class
    if hasattr(super(), "model_dump"):
        # For Pydantic v2
        result = super().model_dump(exclude=exclude, **kwargs)  # type: ignore[misc]
    elif hasattr(super(), "dict"):
        # For Pydantic v1
        result = super().dict(exclude=exclude, **kwargs)  # type: ignore[misc]
    else:
        # Fallback to __dict__ if neither method exists
        result = {k: v for k, v in self.__dict__.items() if k not in exclude}

    # Custom handling for schema_metadata/schema: only touch the output when
    # it really contains a schema dict with a list of fields.
    schema_data = result.get("schema") if self.schema_metadata else None
    if isinstance(schema_data, dict) and isinstance(
        schema_data.get("fields"), list
    ):
        # Replace the fields in the result with the individually processed ones
        schema_data["fields"] = [
            _dump_field(field)
            for field in (self.schema_metadata.fields or [])
            if field
        ]

    return result

def to_yaml(
self,
file: Path,
Expand Down

0 comments on commit a293129

Please sign in to comment.