From 2ea52b4df58f09be357bea88e8afd0516a2b90fa Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Sun, 2 Mar 2025 15:51:23 -0800 Subject: [PATCH 01/16] dataset cli - add support for schema, round-tripping to yaml --- docs-website/sidebars.js | 2 +- docs/cli-commands/dataset.md | 263 +++++++++ docs/cli.md | 14 +- .../datahub/api/entities/dataset/dataset.py | 516 +++++++++++++++++- .../structuredproperties.py | 4 +- .../src/datahub/cli/specific/dataset_cli.py | 133 ++++- .../unit/cli/dataset/test_dataset_cmd.py | 219 ++++++++ .../cli/dataset/test_resources/dataset.yaml | 33 ++ .../cli/dataset_cmd/test_dataset_command.py | 264 +++++++++ .../structured_properties/test_dataset.yaml | 15 +- .../test_structured_properties.yaml | 19 + 11 files changed, 1422 insertions(+), 60 deletions(-) create mode 100644 docs/cli-commands/dataset.md create mode 100644 metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py create mode 100644 metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml create mode 100644 smoke-test/tests/cli/dataset_cmd/test_dataset_command.py diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index b0289ec56b090..404f6f376ded8 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -742,7 +742,7 @@ module.exports = { type: "category", label: "DataHub CLI", link: { type: "doc", id: "docs/cli" }, - items: ["docs/datahub_lite"], + items: ["docs/cli-commands/dataset", "docs/datahub_lite"], }, { type: "category", diff --git a/docs/cli-commands/dataset.md b/docs/cli-commands/dataset.md new file mode 100644 index 0000000000000..31e39f47d5dda --- /dev/null +++ b/docs/cli-commands/dataset.md @@ -0,0 +1,263 @@ +# DataHub Dataset Command + +The `dataset` command allows you to interact with Dataset entities in DataHub. This includes creating, updating, retrieving, validating, and synchronizing Dataset metadata. + +## Commands + +### upsert + +Create or update Dataset metadata in DataHub. + +```shell +datahub dataset upsert -f PATH_TO_YAML_FILE +``` + +**Options:** +- `-f, --file` - Path to the YAML file containing Dataset metadata (required) + +**Example:** +```shell +datahub dataset upsert -f dataset.yaml +``` + +This command will parse the YAML file, validate that any entity references exist in DataHub, and then emit the corresponding metadata change proposals to update or create the Dataset. + +### sync + +Synchronize Dataset metadata between YAML files and DataHub. + +```shell +datahub dataset sync -f PATH_TO_YAML_FILE --to-datahub|--from-datahub +``` + +**Options:** +- `-f, --file` - Path to the YAML file (required) +- `--to-datahub` - Push metadata from YAML file to DataHub +- `--from-datahub` - Pull metadata from DataHub to YAML file + +**Example:** +```shell +# Push to DataHub +datahub dataset sync -f dataset.yaml --to-datahub + +# Pull from DataHub +datahub dataset sync -f dataset.yaml --from-datahub +``` + +The `sync` command offers bidirectional synchronization, allowing you to keep your local YAML files in sync with the DataHub platform. The `upsert` command actually uses `sync` with the `--to-datahub` flag internally. + +### get + +Retrieve Dataset metadata from DataHub and optionally write it to a file. 
+ +```shell +datahub dataset get --urn DATASET_URN [--to-file OUTPUT_FILE] +``` + +**Options:** +- `--urn` - The Dataset URN to retrieve (required) +- `--to-file` - Path to write the Dataset metadata as YAML (optional) + +**Example:** +```shell +datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file my_dataset.yaml +``` + +If the URN does not start with `urn:li:dataset:`, it will be automatically prefixed. + +### file + +Operate on a Dataset YAML file for validation or linting. + +```shell +datahub dataset file [--lintCheck] [--lintFix] PATH_TO_YAML_FILE +``` + +**Options:** +- `--lintCheck` - Check the YAML file for formatting issues (optional) +- `--lintFix` - Fix formatting issues in the YAML file (optional) + +**Example:** +```shell +# Check for linting issues +datahub dataset file --lintCheck dataset.yaml + +# Fix linting issues +datahub dataset file --lintFix dataset.yaml +``` + +This command helps maintain consistent formatting of your Dataset YAML files. + +### add_sibling + +Add sibling relationships between Datasets. + +```shell +datahub dataset add_sibling --urn PRIMARY_URN --sibling-urns SECONDARY_URN [--sibling-urns ANOTHER_URN ...] +``` + +**Options:** +- `--urn` - URN of the primary Dataset (required) +- `--sibling-urns` - URNs of secondary sibling Datasets (required, multiple allowed) + +**Example:** +```shell +datahub dataset add_sibling --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --sibling-urns "urn:li:dataset:(urn:li:dataPlatform:snowflake,example_table,PROD)" +``` + +Siblings are semantically equivalent datasets, typically representing the same data across different platforms or environments. + +## Dataset YAML Format + +The Dataset YAML file follows a structured format with various supported fields: + +```yaml +# Basic identification (required) +id: "example_table" # Dataset identifier +platform: "hive" # Platform name +env: "PROD" # Environment (PROD by default) + +# Metadata (optional) +name: "Example Table" # Display name (defaults to id if not specified) +description: "This is an example table" + +# Schema definition (optional) +schema: + fields: + - id: "field1" # Field identifier + type: "string" # Data type + description: "First field" # Field description + doc: "First field" # Alias for description + nativeDataType: "VARCHAR" # Native platform type (defaults to type if not specified) + nullable: false # Whether field can be null (default: false) + jsonPath: "$.field1" # JSON path for the field + label: "Field One" # Display label + recursive: false # Whether field is recursive (default: false) + isPartOfKey: true # Whether field is part of primary key + isPartitioningKey: false # Whether field is a partitioning key + jsonProps: {"customProp": "value"} # Custom JSON properties + + - id: "field2" + type: "number" + description: "Second field" + nullable: true + globalTags: ["PII", "Sensitive"] + glossaryTerms: ["urn:li:glossaryTerm:Revenue"] + structured_properties: + property1: "value1" + property2: 42 + +# Additional metadata (all optional) +properties: # Custom properties as key-value pairs + origin: "external" + pipeline: "etl_daily" + +subtype: "View" # Dataset subtype +subtypes: ["View", "Materialized"] # Multiple subtypes (if only one, use subtype field instead) + +downstreams: # Downstream Dataset URNs + - "urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)" + +tags: # Tags + - "Tier1" + - "Verified" + +glossary_terms: # Associated glossary terms + - 
"urn:li:glossaryTerm:Revenue" + +owners: # Dataset owners + - "jdoe" # Simple format (defaults to TECHNICAL_OWNER) + - id: "alice" # Extended format with ownership type + type: "BUSINESS_OWNER" + +structured_properties: # Structured properties + priority: "P1" + cost_center: 123 + +external_url: "https://example.com/datasets/example_table" +``` + +You can also define multiple datasets in a single YAML file by using a list format: + +```yaml +- id: "dataset1" + platform: "hive" + description: "First dataset" + # other properties... + +- id: "dataset2" + platform: "snowflake" + description: "Second dataset" + # other properties... +``` + +### Schema Definition + +You can define Dataset schema in two ways: + +1. **Direct field definitions** as shown above + > **Important limitation**: When using inline schema field definitions, only non-nested (flat) fields are currently supported. For nested or complex schemas, you must use the Avro file approach described below. + +2. **Reference to an Avro schema file**: + ```yaml + schema: + file: "path/to/schema.avsc" + ``` + +Even when using the Avro file approach for the basic schema structure, you can still use the `fields` section to provide additional metadata like structured properties, tags, and glossary terms for your schema fields. + +#### Schema Field Properties + +The Schema Field object supports the following properties: + +| Property | Type | Description | +|----------|------|-------------| +| `id` | string | Field identifier/path (required if `urn` not provided) | +| `urn` | string | URN of the schema field (required if `id` not provided) | +| `type` | string | Data type (one of the supported field types) | +| `nativeDataType` | string | Native data type in the source platform (defaults to `type` if not specified) | +| `description` | string | Field description | +| `doc` | string | Alias for description | +| `nullable` | boolean | Whether the field can be null (default: false) | +| `jsonPath` | string | JSON path for the field | +| `label` | string | Display label for the field | +| `recursive` | boolean | Whether the field is recursive (default: false) | +| `isPartOfKey` | boolean | Whether the field is part of the primary key | +| `isPartitioningKey` | boolean | Whether the field is a partitioning key | +| `jsonProps` | object | Custom JSON properties | +| `globalTags` | array | List of tags associated with the field | +| `glossaryTerms` | array | List of glossary terms associated with the field | +| `structured_properties` | object | Structured properties for the field | + +### Ownership Types + +When specifying owners, the following ownership types are supported: +- `TECHNICAL_OWNER` (default) +- `BUSINESS_OWNER` +- `DATA_STEWARD` + +Custom ownership types can be specified using the URN format. 
+ +### Field Types + +When defining schema fields, the following primitive types are supported: +- `string` +- `number` +- `int` +- `long` +- `float` +- `double` +- `boolean` +- `bytes` +- `fixed` + +## Implementation Notes + +- URNs are generated automatically if not provided, based on the platform, id, and env values +- The command performs validation to ensure referenced entities (like structured properties) exist +- When updating schema fields, changes are propagated correctly to maintain consistent metadata +- The Dataset object will check for existence of entity references and will skip datasets with missing references +- When using the `sync` command with `--from-datahub`, existing YAML files will be updated with metadata from DataHub while preserving comments and structure +- For structured properties, single values are simplified (not wrapped in lists) when appropriate +- Field paths are simplified for better readability +- When specifying field types, all fields must have type information or none of them should \ No newline at end of file diff --git a/docs/cli.md b/docs/cli.md index f332f77d9d21a..6bedb1f682512 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -404,21 +404,17 @@ datahub timeline --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccou ### dataset (Dataset Entity) -The `dataset` command allows you to interact with the dataset entity. - -The `get` operation can be used to read in a dataset into a yaml file. +The `dataset` command allows you to interact with Dataset entities in DataHub, including creating, updating, retrieving, and validating Dataset metadata. ```shell -datahub dataset get --urn "$URN" --to-file "$FILE_NAME" -``` +# Get a dataset and write to YAML file +datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file dataset.yaml -The `upsert` operation can be used to create a new user or update an existing one. - -```shell +# Create or update dataset from YAML file datahub dataset upsert -f dataset.yaml ``` -An example of `dataset.yaml` would look like as in [dataset.yaml](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/cli_usage/dataset/dataset.yaml). 
+➡️ [Learn more about the dataset command](./cli-commands/dataset.md) ### user (User Entity) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index bf824a11a77b5..37db755ec17a3 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -2,11 +2,13 @@ import logging import time from pathlib import Path -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union, get_args -from pydantic import BaseModel, Field, validator +import yaml +from pydantic import BaseModel, Field, root_validator, validator from ruamel.yaml import YAML +import datahub.metadata.schema_classes as models from datahub.api.entities.structuredproperties.structuredproperties import AllowedTypes from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import ( @@ -40,6 +42,7 @@ TagAssociationClass, UpstreamClass, ) +from datahub.metadata.urns import DataPlatformUrn, StructuredPropertyUrn, TagUrn, GlossaryTermUrn from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -47,36 +50,67 @@ logger = logging.getLogger(__name__) -class SchemaFieldSpecification(BaseModel): +class StrictModel(BaseModel): + class Config: + validate_assignment = True + extra = "forbid" + + +class SchemaFieldSpecification(StrictModel): id: Optional[str] = None urn: Optional[str] = None structured_properties: Optional[ - Dict[str, Union[str, float, List[Union[str, float]]]] + Dict[str, Union[float, str, List[Union[float, str]]]] ] = None type: Optional[str] = None nativeDataType: Optional[str] = None jsonPath: Union[None, str] = None - nullable: Optional[bool] = None + nullable: bool = False description: Union[None, str] = None + doc: Union[None, str] = None # doc is an alias for description label: Optional[str] = None created: Optional[dict] = None lastModified: Optional[dict] = None - recursive: Optional[bool] = None + recursive: bool = False globalTags: Optional[List[str]] = None glossaryTerms: Optional[List[str]] = None isPartOfKey: Optional[bool] = None isPartitioningKey: Optional[bool] = None jsonProps: Optional[dict] = None + def simplify_structured_properties(self) -> None: + if self.structured_properties: + # convert lists to single values if possible + for k, v in self.structured_properties.items(): + if isinstance(v, list): + v = [ + int(x) if isinstance(x, float) and x.is_integer() else x + for x in v + ] + if len(v) == 1: + self.structured_properties[k] = v[0] + else: + self.structured_properties[k] = v + else: + self.structured_properties[k] = ( + int(v) if v and isinstance(v, float) and v.is_integer() else v + ) + def with_structured_properties( self, - structured_properties: Optional[Dict[str, List[Union[str, float]]]], + structured_properties: Optional[Dict[str, List[Union[str, int, float]]]], ) -> "SchemaFieldSpecification": + def urn_strip(urn: str) -> str: + if urn.startswith("urn:li:structuredProperty:"): + return urn[len("urn:li:structuredProperty:") :] + return urn + self.structured_properties = ( - {k: v for k, v in structured_properties.items()} + {urn_strip(k): v for k, v in structured_properties.items()} if structured_properties else None ) + self.simplify_structured_properties() return self @classmethod @@ -85,10 +119,10 @@ def from_schema_field( ) -> "SchemaFieldSpecification": 
return SchemaFieldSpecification( id=Dataset._simplify_field_path(schema_field.fieldPath), - urn=make_schema_field_urn( - parent_urn, Dataset._simplify_field_path(schema_field.fieldPath) + urn=make_schema_field_urn(parent_urn, schema_field.fieldPath), + type=SchemaFieldSpecification._from_datahub_type( + schema_field.type, schema_field.nativeDataType ), - type=str(schema_field.type), nativeDataType=schema_field.nativeDataType, nullable=schema_field.nullable, description=schema_field.description, @@ -100,14 +134,13 @@ def from_schema_field( else None ), recursive=schema_field.recursive, - globalTags=( - schema_field.globalTags.__dict__ if schema_field.globalTags else None - ), - glossaryTerms=( - schema_field.glossaryTerms.__dict__ - if schema_field.glossaryTerms - else None - ), + globalTags=[TagUrn(tag.tag).name for tag in schema_field.globalTags.tags] + if schema_field.globalTags + else None, + glossaryTerms=[GlossaryTermUrn(term.urn).name for term in schema_field.glossaryTerms.terms] + if + schema_field.glossaryTerms + else None, isPartitioningKey=schema_field.isPartitioningKey, jsonProps=( json.loads(schema_field.jsonProps) if schema_field.jsonProps else None @@ -120,6 +153,114 @@ def either_id_or_urn_must_be_filled_out(cls, v, values): raise ValueError("Either id or urn must be present") return v + @root_validator(pre=True) + def sync_description_and_doc(cls, values) -> dict: + """Synchronize doc and description fields if one is provided but not the other.""" + description = values.get("description") + doc = values.get("doc") + + if description is not None and doc is None: + values["doc"] = description + elif doc is not None and description is None: + values["description"] = doc + + return values + + def get_datahub_type(self) -> models.SchemaFieldDataTypeClass: + PrimitiveType = Literal[ + "string", + "number", + "int", + "long", + "float", + "double", + "boolean", + "bytes", + "fixed", + ] + type = self.type.lower() + if type not in set(get_args(PrimitiveType)): + raise ValueError(f"Type {self.type} is not a valid primitive type") + + if type == "string": + return models.SchemaFieldDataTypeClass(type=models.StringTypeClass()) + elif type in ["number", "long", "float", "double", "int"]: + return models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()) + elif type == "fixed": + return models.SchemaFieldDataTypeClass(type=models.FixedTypeClass()) + elif type == "bytes": + return models.SchemaFieldDataTypeClass(type=models.BytesTypeClass()) + elif type == "boolean": + return models.SchemaFieldDataTypeClass(type=models.BooleanTypeClass()) + + raise ValueError(f"Type {self.type} is not a valid primitive type") + + @staticmethod + def _from_datahub_type( + input_type: models.SchemaFieldDataTypeClass, native_data_type: str + ) -> str: + if isinstance(input_type.type, models.StringTypeClass): + return "string" + elif isinstance(input_type.type, models.NumberTypeClass): + if native_data_type in ["long", "float", "double", "int"]: + return native_data_type + return "number" + elif isinstance(input_type.type, models.FixedTypeClass): + return "fixed" + elif isinstance(input_type.type, models.BytesTypeClass): + return "bytes" + elif isinstance(input_type.type, models.BooleanTypeClass): + return "boolean" + raise ValueError(f"Type {input_type} is not a valid primitive type") + + def dict(self, **kwargs): + """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", None) or set() + + # If description and doc are identical, exclude doc from 
the output + if self.description == self.doc and self.description is not None: + exclude.add("doc") + + # if nativeDataType and type are identical, exclude nativeDataType from the output + if self.nativeDataType == self.type and self.nativeDataType is not None: + exclude.add("nativeDataType") + + # if the id is the same as the urn's fieldPath, exclude id from the output + from datahub.metadata.urns import SchemaFieldUrn + + if self.urn: + field_urn = SchemaFieldUrn.from_string(self.urn) + if field_urn.field_path == self.id: + exclude.add("urn") + + kwargs.pop("exclude_defaults", None) + + self.simplify_structured_properties() + + return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) + + def model_dump(self, **kwargs): + """Custom model_dump to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) + + # If description and doc are identical, exclude doc from the output + if self.description == self.doc and self.description is not None: + exclude.add("doc") + + # if nativeDataType and type are identical, exclude nativeDataType from the output + if self.nativeDataType == self.type and self.nativeDataType is not None: + exclude.add("nativeDataType") + + # if the id is the same as the urn's fieldPath, exclude id from the output + from datahub.metadata.urns import SchemaFieldUrn + + if self.urn: + field_urn = SchemaFieldUrn.from_string(self.urn) + if field_urn.field_path == self.id: + exclude.add("urn") + + return super().model_dump(exclude=exclude, exclude_defaults=True, **kwargs) + class SchemaSpecification(BaseModel): file: Optional[str] = None @@ -148,7 +289,7 @@ class StructuredPropertyValue(ConfigModel): lastModified: Optional[str] = None -class Dataset(BaseModel): +class Dataset(StrictModel): id: Optional[str] = None platform: Optional[str] = None env: str = "PROD" @@ -221,6 +362,13 @@ def _mint_owner(self, owner: Union[str, Ownership]) -> OwnerClass: typeUrn=ownership_type_urn, ) + @staticmethod + def get_patch_builder(urn: str) -> DatasetPatchBuilder: + return DatasetPatchBuilder(urn) + + def patch_builder(self) -> DatasetPatchBuilder: + return DatasetPatchBuilder(self.urn) + @classmethod def from_yaml(cls, file: str) -> Iterable["Dataset"]: with open(file) as fp: @@ -230,8 +378,25 @@ def from_yaml(cls, file: str) -> Iterable["Dataset"]: datasets = [datasets] for dataset_raw in datasets: dataset = Dataset.parse_obj(dataset_raw) + # dataset = Dataset.model_validate(dataset_raw, strict=True) yield dataset + def entity_references(self) -> List[str]: + urn_prefix = f"{StructuredPropertyUrn.URN_PREFIX}:{StructuredPropertyUrn.LI_DOMAIN}:{StructuredPropertyUrn.ENTITY_TYPE}" + references = [] + if self.schema_metadata: + for field in self.schema_metadata.fields: + if field.structured_properties: + references.extend( + [ + f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key + for prop_key in field.structured_properties.keys() + ] + ) + return references + def generate_mcp( self, ) -> Iterable[Union[MetadataChangeProposalClass, MetadataChangeProposalWrapper]]: @@ -264,6 +429,60 @@ def generate_mcp( yield mcp if self.schema_metadata.fields: + field_type_info_present = any( + field.type for field in self.schema_metadata.fields + ) + all_fields_type_info_present = all( + field.type for field in self.schema_metadata.fields + ) + if field_type_info_present and not all_fields_type_info_present: + raise ValueError( + "Either all fields must have type information or none of them should" + ) + if all_fields_type_info_present: + 
update_technical_schema = True + else: + update_technical_schema = False + if update_technical_schema and not self.schema_metadata.file: + # We produce a schema metadata aspect only if we have type information + # and a schema file is not provided. + schema_metadata = SchemaMetadataClass( + schemaName=self.name or self.id or self.urn or "", + platform=self.platform_urn, + version=0, + hash="", + fields=[ + SchemaFieldClass( + fieldPath=field.id, + type=field.get_datahub_type(), + nativeDataType=field.nativeDataType or field.type, + nullable=field.nullable, + description=field.description, + label=field.label, + created=field.created, + lastModified=field.lastModified, + recursive=field.recursive, + globalTags=field.globalTags, + glossaryTerms=field.glossaryTerms, + isPartOfKey=field.isPartOfKey, + isPartitioningKey=field.isPartitioningKey, + jsonProps=field.jsonProps, + ) + for field in self.schema_metadata.fields + ], + platformSchema=OtherSchemaClass( + rawSchema=yaml.dump( + self.schema_metadata.dict( + exclude_none=True, exclude_unset=True + ) + ) + ), + ) + mcp = MetadataChangeProposalWrapper( + entityUrn=self.urn, aspect=schema_metadata + ) + yield mcp + for field in self.schema_metadata.fields: field_urn = field.urn or make_schema_field_urn( self.urn, # type: ignore[arg-type] @@ -299,12 +518,15 @@ def generate_mcp( yield mcp if field.structured_properties: + urn_prefix = f"{StructuredPropertyUrn.URN_PREFIX}:{StructuredPropertyUrn.LI_DOMAIN}:{StructuredPropertyUrn.ENTITY_TYPE}" mcp = MetadataChangeProposalWrapper( entityUrn=field_urn, aspect=StructuredPropertiesClass( properties=[ StructuredPropertyValueAssignmentClass( - propertyUrn=f"urn:li:structuredProperty:{prop_key}", + propertyUrn=f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key, values=( prop_value if isinstance(prop_value, list) @@ -486,6 +708,8 @@ def extract_owners_if_exists( @classmethod def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": + dataset_urn = DatasetUrn.from_string(urn) + platform_urn = DataPlatformUrn.from_string(dataset_urn.platform) dataset_properties: Optional[DatasetPropertiesClass] = graph.get_aspect( urn, DatasetPropertiesClass ) @@ -508,7 +732,10 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": else: structured_properties_map[sp.propertyUrn] = sp.values - return Dataset( # type: ignore[call-arg] + from datahub.metadata.urns import TagUrn, GlossaryTermUrn + return Dataset( # type: ignore[arg-type] + id=dataset_urn.name, + platform=platform_urn.platform_name, urn=urn, description=( dataset_properties.description @@ -521,9 +748,9 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": else None ), schema=Dataset._schema_from_schema_metadata(graph, urn), - tags=[tag.tag for tag in tags.tags] if tags else None, + tags=[TagUrn(tag.tag).name for tag in tags.tags] if tags else None, glossary_terms=( - [term.urn for term in glossary_terms.terms] if glossary_terms else None + [GlossaryTermUrn(term.urn).name for term in glossary_terms.terms] if glossary_terms else None ), owners=yaml_owners, properties=( @@ -535,12 +762,243 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": ), ) + def dict(self, **kwargs): + """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) + + # If id and name are identical, exclude name from the output + if self.id == self.name and self.id is not None: + exclude.add("name") + + # if subtype and subtypes are identical or 
subtypes is a singleton list, exclude subtypes from the output + if self.subtypes and len(self.subtypes) == 1: + self.subtype = self.subtypes[0] + exclude.add("subtypes") + + result = super().dict(exclude=exclude, **kwargs) + + # Custom handling for schema_metadata/schema + if self.schema_metadata and "schema" in result: + schema_data = result["schema"] + + # Handle fields if they exist + if "fields" in schema_data and isinstance(schema_data["fields"], list): + # Process each field using its custom dict method + processed_fields = [] + for field in self.schema_metadata.fields: + if field: + # Use dict method for Pydantic v1 + processed_field = field.dict(**kwargs) + processed_fields.append(processed_field) + + # Replace the fields in the result with the processed ones + schema_data["fields"] = processed_fields + + return result + + def model_dump(self, **kwargs): + """Custom model_dump to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) + + # If id and name are identical, exclude name from the output + if self.id == self.name and self.id is not None: + exclude.add("name") + + # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output + if self.subtypes and len(self.subtypes) == 1: + self.subtype = self.subtypes[0] + exclude.add("subtypes") + + # Check which method exists in the parent class + if hasattr(super(), "model_dump"): + # For Pydantic v2 + result = super().model_dump(exclude=exclude, **kwargs) + elif hasattr(super(), "dict"): + # For Pydantic v1 + result = super().dict(exclude=exclude, **kwargs) + else: + # Fallback to __dict__ if neither method exists + result = {k: v for k, v in self.__dict__.items() if k not in exclude} + + # Custom handling for schema_metadata/schema + if self.schema_metadata and "schema" in result: + schema_data = result["schema"] + + # Handle fields if they exist + if "fields" in schema_data and isinstance(schema_data["fields"], list): + # Process each field using its custom model_dump + processed_fields = [] + for field in self.schema_metadata.fields: + if field: + # Call the appropriate serialization method on each field + if hasattr(field, "model_dump"): + processed_field = field.model_dump(**kwargs) + elif hasattr(field, "dict"): + processed_field = field.dict(**kwargs) + else: + processed_field = {k: v for k, v in field.__dict__.items()} + processed_fields.append(processed_field) + + # Replace the fields in the result with the processed ones + schema_data["fields"] = processed_fields + + return result + def to_yaml( self, file: Path, - ) -> None: + ) -> bool: + """ + Write model to YAML file only if content has changed. + Preserves comments and structure of the existing YAML file. + Returns True if file was written, False if no changes were detected. 
+ """ + # Create new model data + # Create new model data with dict() for Pydantic v1 + new_data = self.dict(exclude_none=True, exclude_unset=True, by_alias=True) + + # Set up ruamel.yaml for preserving comments + yaml_handler = YAML(typ="rt") # round-trip mode + yaml_handler.default_flow_style = False + yaml_handler.preserve_quotes = True + yaml_handler.indent(mapping=2, sequence=2, offset=0) + + if file.exists(): + try: + # Load existing data with comments preserved + with open(file, "r") as fp: + existing_data = yaml_handler.load(fp) + + # Determine if the file contains a list or a single document + if isinstance(existing_data, list): + # Handle list case + updated = False + identifier = "urn" + model_id = self.urn + + if model_id is not None: + # Try to find and update existing item + for item in existing_data: + item_identifier = item.get(identifier, Dataset(**item).urn) + if item_identifier == model_id: + # Found the item to update - preserve structure while updating values + updated = True + _update_dict_preserving_comments( + item, new_data, ["urn", "properties"] + ) + break + + if not updated: + # Item not found, append to the list + existing_data.append(new_data) + updated = True + + # If no update was needed, return early + if not updated: + return False + + # Write the updated data back + with open(file, "w") as fp: + yaml_handler.dump(existing_data, fp) + + else: + # Handle single document case + if _dict_equal(existing_data, new_data, ["urn"]): + return False # No changes needed + + # Update the existing document while preserving comments + _update_dict_preserving_comments(existing_data, new_data, ["urn"]) + + # Write the updated data back + with open(file, "w") as fp: + yaml_handler.dump(existing_data, fp) + + return True + + except Exception as e: + # If there's any error, we'll create a new file + print( + f"Error processing existing file {file}: {e}. Will create a new one." + ) + else: + # File doesn't exist or had errors - create a new one with default settings + yaml_handler.indent(mapping=2, sequence=2, offset=0) + + file.parent.mkdir(parents=True, exist_ok=True) + with open(file, "w") as fp: - yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip) - yaml.indent(mapping=2, sequence=4, offset=2) - yaml.default_flow_style = False - yaml.dump(self.dict(exclude_none=True, exclude_unset=True), fp) + yaml_handler.dump(new_data, fp) + + return True + + +def _update_dict_preserving_comments( + target: Dict, source: Dict, optional_fields: List[str] = ["urn"] +) -> None: + """ + Updates a target dictionary with values from source, preserving comments and structure. + This modifies the target dictionary in-place. 
+ """ + # For each key in the source dict + for key, value in source.items(): + if key in target: + if isinstance(value, dict) and isinstance(target[key], dict): + # Recursively update nested dictionaries + _update_dict_preserving_comments(target[key], value) + else: + # Update scalar or list values + # If target value is an int, and source value is a float that is equal to the int, convert to int + if isinstance(value, float) and int(value) == value: + target[key] = int(value) + else: + target[key] = value + elif key not in optional_fields: + # Add new keys + target[key] = value + + # Remove keys that are in target but not in source + keys_to_remove = [k for k in target if k not in source] + for key in keys_to_remove: + del target[key] + + +def _dict_equal(dict1: Dict, dict2: Dict, optional_keys: List[str]) -> bool: + """ + Compare two dictionaries for equality, ignoring ruamel.yaml's metadata. + """ + + if len(dict1) != len(dict2): + # Check if the difference is only in optional keys + if len(dict1) > len(dict2): + for key in optional_keys: + if key in dict1 and key not in dict2: + del dict1[key] + elif len(dict2) > len(dict1): + for key in optional_keys: + if key in dict2 and key not in dict1: + del dict2[key] + if len(dict1) != len(dict2): + return False + + for key, value in dict1.items(): + if key not in dict2: + return False + + if isinstance(value, dict) and isinstance(dict2[key], dict): + if not _dict_equal(value, dict2[key], optional_keys): + return False + elif isinstance(value, list) and isinstance(dict2[key], list): + if len(value) != len(dict2[key]): + return False + + # Check list items (simplified for brevity) + for i in range(len(value)): + if isinstance(value[i], dict) and isinstance(dict2[key][i], dict): + if not _dict_equal(value[i], dict2[key][i], optional_keys): + return False + elif value[i] != dict2[key][i]: + return False + elif value != dict2[key]: + return False + + return True diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index b0b434751ad2c..25bd13ba07732 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -1,7 +1,7 @@ import logging from enum import Enum from pathlib import Path -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Union import yaml from pydantic import validator @@ -38,7 +38,7 @@ def values(): class AllowedValue(ConfigModel): - value: str + value: Union[int, float, str] description: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py index 5601d7e716c79..b54669498ee3b 100644 --- a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py @@ -1,5 +1,7 @@ import json import logging +import os +import shutil from pathlib import Path from typing import Set, Tuple @@ -30,18 +32,9 @@ def dataset() -> None: @telemetry.with_telemetry() def upsert(file: Path) -> None: """Upsert attributes to a Dataset in DataHub.""" - - with get_default_graph() as graph: - for dataset in Dataset.from_yaml(str(file)): - try: - for mcp in dataset.generate_mcp(): - graph.emit(mcp) - click.secho(f"Update succeeded for urn {dataset.urn}.", fg="green") - except Exception as e: - 
click.secho( - f"Update failed for id {id}. due to {e}", - fg="red", - ) + # Call the sync command with to_datahub=True to perform the upsert operation + ctx = click.get_current_context() + ctx.invoke(sync, file=str(file), to_datahub=True) @dataset.command( @@ -111,3 +104,119 @@ def _get_existing_siblings(graph: DataHubGraph, urn: str) -> Set[str]: return set(existing.siblings) else: return set() + + +@dataset.command( + name="file", +) +@click.option("--lintCheck", required=False, is_flag=True) +@click.option("--lintFix", required=False, is_flag=True) +@click.argument("file", type=click.Path(exists=True)) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def file(lintcheck: bool, lintfix: bool, file: str) -> None: + """Operate on a Dataset file""" + + if lintcheck or lintfix: + import tempfile + from pathlib import Path + + # Create a temporary file in a secure way + # The file will be automatically deleted when the context manager exits + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as temp: + temp_path = Path(temp.name) + try: + # Copy content to the temporary file + shutil.copyfile(file, temp_path) + + # Run the linting + datasets = Dataset.from_yaml(temp_path) + for dataset in datasets: + dataset.to_yaml(temp_path) + + # Compare the files + import filecmp + + files_match = filecmp.cmp(file, temp_path) + + if files_match: + click.secho("No differences found", fg="green") + else: + # Show diff for visibility + os.system(f"diff {file} {temp_path}") + + if lintfix: + shutil.copyfile(temp_path, file) + click.secho(f"Fixed linting issues in {file}", fg="green") + else: + click.secho( + f"To fix these differences, run 'datahub dataset file --lintFix {file}'", + fg="yellow", + ) + finally: + # Ensure the temporary file is removed + if temp_path.exists(): + temp_path.unlink() + else: + click.secho( + "No operation specified. Choose from --lintCheck or --lintFix", fg="yellow" + ) + + +@dataset.command( + name="sync", +) +@click.option("-f", "--file", required=True, type=click.Path(exists=True)) +@click.option("--to-datahub/--from-datahub", required=True, is_flag=True) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def sync(file: str, to_datahub: bool) -> None: + """Sync a Dataset file to/from DataHub""" + + failures = [] + with get_default_graph() as graph: + datasets = Dataset.from_yaml(file) + for dataset in datasets: + if to_datahub: + missing_entity_references = [ + entity_reference + for entity_reference in dataset.entity_references() + if not graph.exists(entity_reference) + ] + if missing_entity_references: + click.secho( + "\n\t- ".join( + [ + f"Skipping Dataset {dataset.urn} due to missing entity references: " + ] + + missing_entity_references + ), + fg="red", + ) + failures.append(dataset.urn) + continue + try: + for mcp in dataset.generate_mcp(): + graph.emit(mcp) + click.secho(f"Update succeeded for urn {dataset.urn}.", fg="green") + except Exception as e: + click.secho( + f"Update failed for id {id}. 
due to {e}", + fg="red", + ) + else: + # Sync from DataHub + if graph.exists(dataset.urn): + existing_dataset: Dataset = Dataset.from_datahub( + graph=graph, urn=dataset.urn + ) + existing_dataset.to_yaml(Path(file)) + else: + click.secho(f"Dataset {dataset.urn} does not exist") + failures.append(dataset.urn) + if failures: + click.secho( + f"\nFailed to sync the following Datasets: {', '.join(failures)}", + fg="red", + ) + raise click.Abort() diff --git a/metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py b/metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py new file mode 100644 index 0000000000000..f29caac531664 --- /dev/null +++ b/metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py @@ -0,0 +1,219 @@ +import shutil +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from click.testing import CliRunner + +from datahub.cli.specific.dataset_cli import dataset + +TEST_RESOURCES_DIR = Path(__file__).parent / "test_resources" + + +@pytest.fixture +def test_yaml_file(): + """Creates a temporary test yaml file for testing.""" + # Define test data path + test_file = TEST_RESOURCES_DIR / "dataset.yaml" + + # Create a temporary copy to work with + temp_file = Path(f"{test_file}.tmp") + shutil.copyfile(test_file, temp_file) + + yield temp_file + + # Clean up + if temp_file.exists(): + temp_file.unlink() + + +@pytest.fixture +def malformed_yaml_file(): + """Creates a temporary malformed yaml file for testing.""" + malformed_content = """ +## This file is intentionally malformed +- id: user.badformat + platform: hive + schema: + fields: + - id: ip + type string # Missing colon here + description: The IP address + """ + + # Create a temporary file + temp_file = TEST_RESOURCES_DIR / "malformed_dataset.yaml.tmp" + with open(temp_file, "w") as f: + f.write(malformed_content) + + yield temp_file + + # Clean up + if temp_file.exists(): + temp_file.unlink() + + +@pytest.fixture +def fixable_yaml_file(): + """Creates a temporary yaml file with fixable formatting issues.""" + fixable_content = """ +## This file has fixable formatting issues +- id: user.fixable + platform: hive + schema: + fields: + - id: ip + type: string + description: The IP address + - id: user_id # Extra spaces + type: string + description: The user ID # Extra spaces + """ + + temp_file = TEST_RESOURCES_DIR / "fixable_dataset.yaml.tmp" + with open(temp_file, "w") as f: + f.write(fixable_content) + + yield temp_file + + # Clean up + if temp_file.exists(): + temp_file.unlink() + + +class TestDatasetCli: + def test_dataset_file_command_exists(self): + """Test that the dataset file command exists.""" + runner = CliRunner() + result = runner.invoke(dataset, ["--help"]) + assert result.exit_code == 0 + assert "file" in result.output + + @patch("datahub.cli.specific.dataset_cli.Dataset") + def test_lint_check_no_issues(self, mock_dataset, test_yaml_file): + """Test the lintCheck option when no issues are found.""" + # Setup mocks + mock_dataset_instance = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset_instance] + mock_dataset_instance.to_yaml.return_value = None + + # Mock filecmp.cmp to return True (files match) + with patch("filecmp.cmp", return_value=True): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(test_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "No differences found" in result.output + mock_dataset.from_yaml.assert_called_once() + mock_dataset_instance.to_yaml.assert_called_once() + + 
@patch("datahub.cli.specific.dataset_cli.Dataset") + @patch("os.system") + def test_lint_check_with_issues(self, mock_system, mock_dataset, fixable_yaml_file): + """Test the lintCheck option when issues are found.""" + # Setup mocks + mock_dataset_instance = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset_instance] + + # Mock filecmp.cmp to return False (files don't match) + with patch("filecmp.cmp", return_value=False): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(fixable_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "To fix these differences" in result.output + mock_dataset.from_yaml.assert_called_once() + mock_dataset_instance.to_yaml.assert_called_once() + mock_system.assert_called_once() # Should call diff + + @patch("datahub.cli.specific.dataset_cli.Dataset") + @patch("os.system") + @patch("shutil.copyfile") + def test_lint_fix( + self, mock_copyfile, mock_system, mock_dataset, fixable_yaml_file + ): + """Test the lintFix option.""" + # Setup mocks + mock_dataset_instance = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset_instance] + + # Mock filecmp.cmp to return False (files don't match) + with patch("filecmp.cmp", return_value=False): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", "--lintFix", str(fixable_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "Fixed linting issues" in result.output + + # Check that copyfile was called twice: + # 1. To copy the original file to the temp file + # 2. To copy the fixed temp file back to the original + assert mock_copyfile.call_count == 2 + + # The second call should copy from temp file to the original + mock_copyfile.call_args_list[1][0][0] # Source of second call + assert mock_copyfile.call_args_list[1][0][1] == str( + fixable_yaml_file + ) # Destination + + @patch("datahub.cli.specific.dataset_cli.Dataset") + def test_error_handling(self, mock_dataset, malformed_yaml_file): + """Test error handling when processing a malformed yaml file.""" + # Setup mock to raise an exception + mock_dataset.from_yaml.side_effect = Exception("YAML parsing error") + + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(malformed_yaml_file)] + ) + + # Verify exception is properly handled + assert result.exit_code != 0 + mock_dataset.from_yaml.assert_called_once() + + def test_temporary_file_cleanup(self, test_yaml_file): + """Test that temporary files are properly cleaned up.""" + # Count files in the directory before + files_before = len(list(TEST_RESOURCES_DIR.glob("*.tmp"))) + + runner = CliRunner() + with patch("datahub.cli.specific.dataset_cli.Dataset"): + with patch("filecmp.cmp", return_value=True): + runner.invoke(dataset, ["file", "--lintCheck", str(test_yaml_file)]) + + # Count files after + files_after = len(list(TEST_RESOURCES_DIR.glob("*.tmp"))) + + # Should be same count (our fixture creates one tmp file) + assert files_before == files_after + + @patch("datahub.cli.specific.dataset_cli.Dataset") + def test_multiple_datasets_in_file(self, mock_dataset, test_yaml_file): + """Test handling of multiple datasets defined in a single file.""" + # Create mock dataset instances + mock_dataset1 = MagicMock() + mock_dataset2 = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset1, mock_dataset2] + + with patch("filecmp.cmp", return_value=True): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(test_yaml_file)] + ) + 
+ # Verify + assert result.exit_code == 0 + assert "No differences found" in result.output + + # Verify both dataset instances had to_yaml called + mock_dataset1.to_yaml.assert_called_once() + mock_dataset2.to_yaml.assert_called_once() diff --git a/metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml b/metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml new file mode 100644 index 0000000000000..29fb0c73bca4c --- /dev/null +++ b/metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml @@ -0,0 +1,33 @@ +## This file is used to define the dataset schema for the dataset `foobar` +- id: user.clicksv2 + platform: hive + # - urn: urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD) # use urn instead of id and platform + subtype: Table + properties: + test_property: test_value + schema: + # file: examples/structured_properties/click_event.avsc + fields: + - id: ip + structured_properties: + io.acryl.privacy.retentionTime: + - 30 + - 90 + type: string + description: The IP address of the user + - id: user_id + type: string + description: The user ID of the user +- id: user.clicksv3 + platform: hive + urn: urn:li:dataset:(urn:li:dataPlatform:hive,user.clicksv3,PROD) # use urn instead of id and platform + subtype: View + schema: + # file: examples/structured_properties/click_event.avsc + fields: + - id: ip + type: string + description: The IP address of the user + - id: user_id + type: string + description: The user ID of the user \ No newline at end of file diff --git a/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py b/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py new file mode 100644 index 0000000000000..25acab433ad78 --- /dev/null +++ b/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py @@ -0,0 +1,264 @@ +import json +import logging +import os +import subprocess +import tempfile +from pathlib import Path +from random import randint + +import pytest +import yaml + +from datahub.api.entities.dataset.dataset import Dataset +from datahub.emitter.mce_builder import make_dataset_urn +from datahub.ingestion.graph.client import DataHubGraph +from tests.consistency_utils import wait_for_writes_to_sync +from tests.utils import delete_urns, get_sleep_info + +logger = logging.getLogger(__name__) + +# Generate random dataset IDs to avoid test interference +start_index = randint(10, 10000) +dataset_id = f"test_dataset_sync_{start_index}" +dataset_urn = make_dataset_urn("snowflake", dataset_id) + +sleep_sec, sleep_times = get_sleep_info() + + +@pytest.fixture(scope="module") +def setup_teardown_dataset(graph_client: DataHubGraph): + """Fixture to setup and teardown the test dataset""" + # Setup: Create empty environment + try: + # Teardown any existing dataset first to ensure clean state + if graph_client.exists(dataset_urn): + delete_urns(graph_client, [dataset_urn]) + wait_for_writes_to_sync() + + yield + finally: + # Teardown: Clean up created dataset + if graph_client.exists(dataset_urn): + delete_urns(graph_client, [dataset_urn]) + wait_for_writes_to_sync() + + +def create_dataset_yaml(file_path: Path, additional_properties=None): + """Create a test dataset YAML file""" + dataset_yaml = { + "id": dataset_id, + "platform": "snowflake", + "env": "PROD", + "description": "Test dataset for CLI sync smoke test", + "schema": { + "fields": [ + {"id": "id", "type": "number", "description": "Primary key"}, + {"id": "name", "type": "string", "description": "User name"}, + ] + }, + "tags": ["test", "smoke_test"], + "properties": {"origin": 
"cli_test"}, + } + + # Add any additional properties + if additional_properties: + for key, value in additional_properties.items(): + dataset_yaml[key] = value + + # Write YAML file + with open(file_path, "w") as f: + f.write("# Dataset sync test\n") # Add a comment to test comment preservation + yaml.dump(dataset_yaml, f, indent=2) + + +def run_cli_command(cmd): + """Run a DataHub CLI command""" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + + if result.returncode != 0: + logger.error(f"Command failed: {cmd}") + logger.error(f"STDOUT: {result.stdout}") + logger.error(f"STDERR: {result.stderr}") + raise Exception(f"Command failed with return code {result.returncode}") + + return result + + +def test_dataset_sync_to_datahub(setup_teardown_dataset, graph_client: DataHubGraph): + """Test syncing dataset from YAML to DataHub""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # Create a dataset YAML file + create_dataset_yaml(temp_file_path) + + # Run the CLI command to sync to DataHub + cmd = f"datahub dataset sync -f {temp_file_path} --to-datahub" + result = run_cli_command(cmd) + + # Verify success message in output + assert f"Update succeeded for urn {dataset_urn}" in result.stdout + + # Wait for changes to propagate + wait_for_writes_to_sync() + + # Verify the dataset exists in DataHub + assert graph_client.exists(dataset_urn) + + # Retrieve dataset and verify properties + dataset = Dataset.from_datahub(graph=graph_client, urn=dataset_urn) + assert dataset.id == dataset_id + assert dataset.platform == "snowflake" + assert dataset.description == "Test dataset for CLI sync smoke test" + assert dataset.tags == ["test", "smoke_test"] + assert dataset.properties is not None + assert dataset.properties["origin"] == "cli_test" + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) + + +def test_dataset_sync_from_datahub(setup_teardown_dataset, graph_client: DataHubGraph): + """Test syncing dataset from DataHub to YAML""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # First, create a dataset in DataHub + dataset = Dataset( + id=dataset_id, + platform="snowflake", + description="Test dataset created directly in DataHub", + tags=["from_datahub", "cli_test"], + properties={"origin": "direct_creation"}, + schema=None, + ) + + # Emit the dataset to DataHub + for mcp in dataset.generate_mcp(): + graph_client.emit(mcp) + + wait_for_writes_to_sync() + + # Create a minimal dataset YAML as a starting point + create_dataset_yaml( + temp_file_path, {"description": "This will be overwritten"} + ) + + # Run the CLI command to sync from DataHub + cmd = f"datahub dataset sync -f {temp_file_path} --from-datahub" + result = run_cli_command(cmd) + assert result.returncode == 0 + + # Wait to ensure file is updated + wait_for_writes_to_sync() + # Verify the YAML file was updated + with open(temp_file_path, "r") as f: + content = f.read() + # Check for the comment preservation + assert "# Dataset sync test" in content + # Check for content from DataHub + assert "Test dataset created directly in DataHub" in content + assert "from_datahub" in content + assert "direct_creation" in content + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) + + +def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHubGraph): + """Test bidirectional sync with 
modifications on both sides""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # 1. Create initial dataset in YAML + create_dataset_yaml(temp_file_path) + + # 2. Sync to DataHub + run_cli_command(f"datahub dataset sync -f {temp_file_path} --to-datahub") + wait_for_writes_to_sync() + + # 3. Modify directly in DataHub + dataset_patcher = Dataset.get_patch_builder(dataset_urn) + dataset_patcher.set_description("Modified directly in DataHub") + for mcp in dataset_patcher.build(): + graph_client.emit(mcp) + wait_for_writes_to_sync() + + # 4. Sync from DataHub to update YAML + run_cli_command(f"datahub dataset sync -f {temp_file_path} --from-datahub") + + # 5. Modify the YAML file directly + with open(temp_file_path, "r") as f: + import yaml + + data = yaml.safe_load(f) + + data["properties"]["modified_by"] = "cli_test" + data["tags"].append("modified_yaml") + + with open(temp_file_path, "w") as f: + f.write("# Modified comment\n") + json.dump(data, f, indent=2) + + # 6. Sync back to DataHub + run_cli_command(f"datahub dataset sync -f {temp_file_path} --to-datahub") + wait_for_writes_to_sync() + + # 7. Verify both modifications are present in DataHub + final_dataset = Dataset.from_datahub(graph=graph_client, urn=dataset_urn) + assert final_dataset.description == "Modified directly in DataHub" + assert final_dataset.properties is not None + assert final_dataset.properties["modified_by"] == "cli_test" + assert final_dataset.tags is not None + assert "modified_yaml" in final_dataset.tags + + # 8. Sync one more time from DataHub and verify YAML is intact + run_cli_command(f"datahub dataset sync -f {temp_file_path} --from-datahub") + + with open(temp_file_path, "r") as f: + content = f.read() + assert "# Modified comment" in content + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) + + +def test_dataset_sync_validation(setup_teardown_dataset, graph_client: DataHubGraph): + """Test validation during sync to DataHub with invalid references""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # Create dataset with invalid structured property reference + create_dataset_yaml( + temp_file_path, + { + "structured_properties": {"non_existent_property": "value"}, + "schema": { + "fields": [ + { + "id": "field1", + "type": "string", + "structured_properties": { + "non_existent_field_property": "value" + }, + } + ] + }, + }, + ) + # Attempt to sync to DataHub - should fail due to validation + cmd = f"datahub dataset sync -f {temp_file_path} --to-datahub" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True) + # Assert that the command failed (non-zero return code) + assert result.returncode != 0 + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) diff --git a/smoke-test/tests/structured_properties/test_dataset.yaml b/smoke-test/tests/structured_properties/test_dataset.yaml index 2ac1cca6c6dc2..89dd5e57bad53 100644 --- a/smoke-test/tests/structured_properties/test_dataset.yaml +++ b/smoke-test/tests/structured_properties/test_dataset.yaml @@ -5,15 +5,16 @@ schema: file: tests/structured_properties/click_event.avsc fields: - # - id: ip - - urn: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD),ip) - structured_properties: - io.acryl.dataManagement.deprecationDate: "2023-01-01" + - id: ip + structured_properties: + 
io.acryl.dataManagement.deprecationDate: '2023-01-01' properties: - retention: 365 + retention: '365' structured_properties: clusterType: primary clusterName: gold projectNames: - - Tracking - - DataHub + - Tracking + - DataHub + io.acryl.smoketest.dm.retentionTime: 365 + io.acryl.smoketest.dm.certifier: urn:li:corpuser:datahub diff --git a/smoke-test/tests/structured_properties/test_structured_properties.yaml b/smoke-test/tests/structured_properties/test_structured_properties.yaml index eee8d53aeccf5..a50221c5a338d 100644 --- a/smoke-test/tests/structured_properties/test_structured_properties.yaml +++ b/smoke-test/tests/structured_properties/test_structured_properties.yaml @@ -31,3 +31,22 @@ - dataFlow - dataJob - schemaField +- id: io.acryl.smoketest.dm.retentionTime + type: NUMBER + display_name: Retention Time + entity_types: + - dataset + description: 'Retention Time is used to figure out how long to retain records in a dataset' + allowed_values: + - value: 30 + description: 30 days, usually reserved for datasets that are ephemeral and contain pii + - value: 90 + description: Use this for datasets that drive monthly reporting but contain pii + - value: 365 + description: Use this for non-sensitive data that can be retained for longer +- id: io.acryl.smoketest.dm.certifier + type: URN + display_name: Person Certifying the asset + entity_types: + - dataset + - schemaField From df8d4c45c1e642ac0f5a1db8254fb1c007df9372 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 06:54:03 -0800 Subject: [PATCH 02/16] fix golden test file --- .../datahub/api/entities/dataset/dataset.py | 55 ++++++++----------- .../example_structured_properties_golden.json | 6 +- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 37db755ec17a3..2ab6b0a00ac4d 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -42,7 +42,12 @@ TagAssociationClass, UpstreamClass, ) -from datahub.metadata.urns import DataPlatformUrn, StructuredPropertyUrn, TagUrn, GlossaryTermUrn +from datahub.metadata.urns import ( + DataPlatformUrn, + GlossaryTermUrn, + StructuredPropertyUrn, + TagUrn, +) from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -137,9 +142,11 @@ def from_schema_field( globalTags=[TagUrn(tag.tag).name for tag in schema_field.globalTags.tags] if schema_field.globalTags else None, - glossaryTerms=[GlossaryTermUrn(term.urn).name for term in schema_field.glossaryTerms.terms] - if - schema_field.glossaryTerms + glossaryTerms=[ + GlossaryTermUrn(term.urn).name + for term in schema_field.glossaryTerms.terms + ] + if schema_field.glossaryTerms else None, isPartitioningKey=schema_field.isPartitioningKey, jsonProps=( @@ -154,7 +161,7 @@ def either_id_or_urn_must_be_filled_out(cls, v, values): return v @root_validator(pre=True) - def sync_description_and_doc(cls, values) -> dict: + def sync_description_and_doc(cls, values: Dict) -> Dict: """Synchronize doc and description fields if one is provided but not the other.""" description = values.get("description") doc = values.get("doc") @@ -178,7 +185,7 @@ def get_datahub_type(self) -> models.SchemaFieldDataTypeClass: "bytes", "fixed", ] - type = self.type.lower() + type = self.type.lower() if self.type else self.type if type not in set(get_args(PrimitiveType)): raise 
ValueError(f"Type {self.type} is not a valid primitive type") @@ -239,28 +246,6 @@ def dict(self, **kwargs): return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) - def model_dump(self, **kwargs): - """Custom model_dump to handle YAML serialization properly.""" - exclude = kwargs.pop("exclude", set()) - - # If description and doc are identical, exclude doc from the output - if self.description == self.doc and self.description is not None: - exclude.add("doc") - - # if nativeDataType and type are identical, exclude nativeDataType from the output - if self.nativeDataType == self.type and self.nativeDataType is not None: - exclude.add("nativeDataType") - - # if the id is the same as the urn's fieldPath, exclude id from the output - from datahub.metadata.urns import SchemaFieldUrn - - if self.urn: - field_urn = SchemaFieldUrn.from_string(self.urn) - if field_urn.field_path == self.id: - exclude.add("urn") - - return super().model_dump(exclude=exclude, exclude_defaults=True, **kwargs) - class SchemaSpecification(BaseModel): file: Optional[str] = None @@ -365,8 +350,9 @@ def _mint_owner(self, owner: Union[str, Ownership]) -> OwnerClass: @staticmethod def get_patch_builder(urn: str) -> DatasetPatchBuilder: return DatasetPatchBuilder(urn) - + def patch_builder(self) -> DatasetPatchBuilder: + assert self.urn return DatasetPatchBuilder(self.urn) @classmethod @@ -732,7 +718,8 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": else: structured_properties_map[sp.propertyUrn] = sp.values - from datahub.metadata.urns import TagUrn, GlossaryTermUrn + from datahub.metadata.urns import GlossaryTermUrn, TagUrn + return Dataset( # type: ignore[arg-type] id=dataset_urn.name, platform=platform_urn.platform_name, @@ -750,7 +737,9 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": schema=Dataset._schema_from_schema_metadata(graph, urn), tags=[TagUrn(tag.tag).name for tag in tags.tags] if tags else None, glossary_terms=( - [GlossaryTermUrn(term.urn).name for term in glossary_terms.terms] if glossary_terms else None + [GlossaryTermUrn(term.urn).name for term in glossary_terms.terms] + if glossary_terms + else None ), owners=yaml_owners, properties=( @@ -933,12 +922,14 @@ def to_yaml( def _update_dict_preserving_comments( - target: Dict, source: Dict, optional_fields: List[str] = ["urn"] + target: Dict, source: Dict, optional_fields: List[str] = None ) -> None: """ Updates a target dictionary with values from source, preserving comments and structure. This modifies the target dictionary in-place. 
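+    Passing None for optional_fields is treated as ["urn"].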
""" + if optional_fields is None: + optional_fields = ["urn"] # For each key in the source dict for key, value in source.items(): if key in target: diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json index 29386ece7b0ca..71f4c7126b2f9 100644 --- a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json +++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json @@ -12,19 +12,19 @@ "allowedValues": [ { "value": { - "string": "30" + "double": 30 }, "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" }, { "value": { - "string": "90" + "double": 90 }, "description": "Use this for datasets that drive monthly reporting but contain pii" }, { "value": { - "string": "365" + "double": 365 }, "description": "Use this for non-sensitive data that can be retained for longer" } From a2c356b4f6b345557d88bfb867d2a5f767ae8bfe Mon Sep 17 00:00:00 2001 From: Chakravarthy Racharla Date: Mon, 3 Mar 2025 15:22:21 +0530 Subject: [PATCH 03/16] add missing __init__.py in tests new tests folder --- smoke-test/tests/cli/dataset_cmd/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 smoke-test/tests/cli/dataset_cmd/__init__.py diff --git a/smoke-test/tests/cli/dataset_cmd/__init__.py b/smoke-test/tests/cli/dataset_cmd/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d From 9f2ceb29fa84d4d8bbe2927719473b0b0d9879fd Mon Sep 17 00:00:00 2001 From: Chakravarthy Racharla Date: Mon, 3 Mar 2025 15:41:38 +0530 Subject: [PATCH 04/16] lint fix --- .../datahub/api/entities/dataset/dataset.py | 78 ++++++++++++------- .../src/datahub/cli/specific/dataset_cli.py | 9 ++- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 2ab6b0a00ac4d..277fe74f317b2 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -244,7 +244,7 @@ def dict(self, **kwargs): self.simplify_structured_properties() - return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) + return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) # type: ignore[misc] class SchemaSpecification(BaseModel): @@ -370,7 +370,7 @@ def from_yaml(cls, file: str) -> Iterable["Dataset"]: def entity_references(self) -> List[str]: urn_prefix = f"{StructuredPropertyUrn.URN_PREFIX}:{StructuredPropertyUrn.LI_DOMAIN}:{StructuredPropertyUrn.ENTITY_TYPE}" references = [] - if self.schema_metadata: + if self.schema_metadata and self.schema_metadata.fields: for field in self.schema_metadata.fields: if field.structured_properties: references.extend( @@ -425,6 +425,7 @@ def generate_mcp( raise ValueError( "Either all fields must have type information or none of them should" ) + if all_fields_type_info_present: update_technical_schema = True else: @@ -439,20 +440,39 @@ def generate_mcp( hash="", fields=[ SchemaFieldClass( - fieldPath=field.id, + fieldPath=field.id, # type: ignore[arg-type] type=field.get_datahub_type(), - nativeDataType=field.nativeDataType or field.type, + nativeDataType=field.nativeDataType or field.type, # type: ignore[arg-type] nullable=field.nullable, 
description=field.description, label=field.label, - created=field.created, - lastModified=field.lastModified, + created=None, # This should be auto-populated. + lastModified=None, # This should be auto-populated. recursive=field.recursive, - globalTags=field.globalTags, - glossaryTerms=field.glossaryTerms, + globalTags=GlobalTagsClass( + tags=[ + TagAssociationClass(tag=make_tag_urn(tag)) + for tag in field.globalTags + ] + ) + if field.globalTags is not None + else None, + glossaryTerms=GlossaryTermsClass( + terms=[ + GlossaryTermAssociationClass( + urn=make_term_urn(term) + ) + for term in field.glossaryTerms + ], + auditStamp=self._mint_auditstamp("yaml"), + ) + if field.glossaryTerms is not None + else None, isPartOfKey=field.isPartOfKey, isPartitioningKey=field.isPartitioningKey, - jsonProps=field.jsonProps, + jsonProps=json.dumps(field.jsonProps) + if field.jsonProps is not None + else None, ) for field in self.schema_metadata.fields ], @@ -774,11 +794,12 @@ def dict(self, **kwargs): if "fields" in schema_data and isinstance(schema_data["fields"], list): # Process each field using its custom dict method processed_fields = [] - for field in self.schema_metadata.fields: - if field: - # Use dict method for Pydantic v1 - processed_field = field.dict(**kwargs) - processed_fields.append(processed_field) + if self.schema_metadata and self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field: + # Use dict method for Pydantic v1 + processed_field = field.dict(**kwargs) + processed_fields.append(processed_field) # Replace the fields in the result with the processed ones schema_data["fields"] = processed_fields @@ -801,10 +822,10 @@ def model_dump(self, **kwargs): # Check which method exists in the parent class if hasattr(super(), "model_dump"): # For Pydantic v2 - result = super().model_dump(exclude=exclude, **kwargs) + result = super().model_dump(exclude=exclude, **kwargs) # type: ignore[misc] elif hasattr(super(), "dict"): # For Pydantic v1 - result = super().dict(exclude=exclude, **kwargs) + result = super().dict(exclude=exclude, **kwargs) # type: ignore[misc] else: # Fallback to __dict__ if neither method exists result = {k: v for k, v in self.__dict__.items() if k not in exclude} @@ -817,16 +838,19 @@ def model_dump(self, **kwargs): if "fields" in schema_data and isinstance(schema_data["fields"], list): # Process each field using its custom model_dump processed_fields = [] - for field in self.schema_metadata.fields: - if field: - # Call the appropriate serialization method on each field - if hasattr(field, "model_dump"): - processed_field = field.model_dump(**kwargs) - elif hasattr(field, "dict"): - processed_field = field.dict(**kwargs) - else: - processed_field = {k: v for k, v in field.__dict__.items()} - processed_fields.append(processed_field) + if self.schema_metadata and self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field: + # Call the appropriate serialization method on each field + if hasattr(field, "model_dump"): + processed_field = field.model_dump(**kwargs) + elif hasattr(field, "dict"): + processed_field = field.dict(**kwargs) + else: + processed_field = { + k: v for k, v in field.__dict__.items() + } + processed_fields.append(processed_field) # Replace the fields in the result with the processed ones schema_data["fields"] = processed_fields @@ -849,7 +873,7 @@ def to_yaml( # Set up ruamel.yaml for preserving comments yaml_handler = YAML(typ="rt") # round-trip mode yaml_handler.default_flow_style = False - 
yaml_handler.preserve_quotes = True + yaml_handler.preserve_quotes = True # type: ignore[assignment] yaml_handler.indent(mapping=2, sequence=2, offset=0) if file.exists(): diff --git a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py index b54669498ee3b..91d6c066f429d 100644 --- a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py @@ -3,7 +3,7 @@ import os import shutil from pathlib import Path -from typing import Set, Tuple +from typing import List, Set, Tuple import click from click_default_group import DefaultGroup @@ -130,7 +130,7 @@ def file(lintcheck: bool, lintfix: bool, file: str) -> None: shutil.copyfile(file, temp_path) # Run the linting - datasets = Dataset.from_yaml(temp_path) + datasets = Dataset.from_yaml(temp.name) for dataset in datasets: dataset.to_yaml(temp_path) @@ -173,10 +173,13 @@ def file(lintcheck: bool, lintfix: bool, file: str) -> None: def sync(file: str, to_datahub: bool) -> None: """Sync a Dataset file to/from DataHub""" - failures = [] + failures: List[str] = [] with get_default_graph() as graph: datasets = Dataset.from_yaml(file) for dataset in datasets: + assert ( + dataset.urn is not None + ) # Validator should have ensured this is filled. Tell mypy it's not None if to_datahub: missing_entity_references = [ entity_reference From 50e910f6e770ee7e8b287697cd32f6fe63fe164f Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 07:13:28 -0800 Subject: [PATCH 05/16] better reference checking --- .../datahub/api/entities/dataset/dataset.py | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 277fe74f317b2..0e6d500f95261 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -244,7 +244,7 @@ def dict(self, **kwargs): self.simplify_structured_properties() - return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) # type: ignore[misc] + return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) class SchemaSpecification(BaseModel): @@ -370,18 +370,37 @@ def from_yaml(cls, file: str) -> Iterable["Dataset"]: def entity_references(self) -> List[str]: urn_prefix = f"{StructuredPropertyUrn.URN_PREFIX}:{StructuredPropertyUrn.LI_DOMAIN}:{StructuredPropertyUrn.ENTITY_TYPE}" references = [] - if self.schema_metadata and self.schema_metadata.fields: - for field in self.schema_metadata.fields: - if field.structured_properties: - references.extend( - [ - f"{urn_prefix}:{prop_key}" - if not prop_key.startswith(urn_prefix) - else prop_key - for prop_key in field.structured_properties.keys() - ] - ) - return references + if self.schema_metadata: + if self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field.structured_properties: + references.extend( + [ + f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key + for prop_key in field.structured_properties.keys() + ] + ) + if field.glossaryTerms: + references.extend( + [make_term_urn(term) for term in field.glossaryTerms] + ) + # We don't check references for tags + if self.structured_properties: + references.extend( + [ + f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key + for prop_key in 
self.structured_properties.keys() + ] + ) + if self.glossary_terms: + references.extend([make_term_urn(term) for term in self.glossary_terms]) + + # We don't check references for tags + return list(set(references)) def generate_mcp( self, From 2ebae89e81a707a7d9fc55e146ebfee2e64d6efd Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 07:59:04 -0800 Subject: [PATCH 06/16] fix lint --- metadata-ingestion/src/datahub/api/entities/dataset/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 0e6d500f95261..b44d4f486161e 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -965,7 +965,7 @@ def to_yaml( def _update_dict_preserving_comments( - target: Dict, source: Dict, optional_fields: List[str] = None + target: Dict, source: Dict, optional_fields: Optional[List[str]] = None ) -> None: """ Updates a target dictionary with values from source, preserving comments and structure. From 777b86f6576a8ff4b399520364d5ffaa5301a75f Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 12:14:18 -0800 Subject: [PATCH 07/16] fix nested schemas --- .../structured_properties/click_event.avsc | 12 +- .../structured_properties/dataset.yaml | 39 +-- .../datahub/api/entities/dataset/dataset.py | 237 ++++++++++++++---- .../src/datahub/cli/specific/dataset_cli.py | 7 +- 4 files changed, 227 insertions(+), 68 deletions(-) diff --git a/metadata-ingestion/examples/structured_properties/click_event.avsc b/metadata-ingestion/examples/structured_properties/click_event.avsc index b277674f8b62f..a0b8e57982fb9 100644 --- a/metadata-ingestion/examples/structured_properties/click_event.avsc +++ b/metadata-ingestion/examples/structured_properties/click_event.avsc @@ -9,6 +9,16 @@ { "name": "referer", "type": ["string", "null"] }, { "name": "user_agent", "type": ["string", "null"] }, { "name": "user_id", "type": ["string", "null"] }, - { "name": "session_id", "type": ["string", "null"] } + { "name": "session_id", "type": ["string", "null"] }, + { + "name": "locator", "type": { + "type": "record", + "name": "Locator", + "fields": [ + { "name": "latitude", "type": "float" }, + { "name": "longitude", "type": "float" } + ] + } + } ] } diff --git a/metadata-ingestion/examples/structured_properties/dataset.yaml b/metadata-ingestion/examples/structured_properties/dataset.yaml index 557bf0167a51b..b6186d8ee5195 100644 --- a/metadata-ingestion/examples/structured_properties/dataset.yaml +++ b/metadata-ingestion/examples/structured_properties/dataset.yaml @@ -6,17 +6,25 @@ schema: file: examples/structured_properties/click_event.avsc fields: - - id: ip - - urn: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD),ip) - structured_properties: # structured properties for schema fields/columns go here - io.acryl.dataManagement.deprecationDate: "2023-01-01" - io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com - io.acryl.dataManagement.replicationSLA: 90 + - id: ip + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com + io.acryl.dataManagement.replicationSLA: 90.0 + type: string + - id: url + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + type: string + - id: locator.latitude + 
structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + type: float structured_properties: # dataset level structured properties go here io.acryl.privacy.retentionTime: 365 projectNames: - - Tracking - - DataHub + - Tracking + - DataHub - id: ClickEvent platform: events subtype: Topic @@ -27,19 +35,20 @@ project_name: Tracking namespace: org.acryl.tracking version: 1.0.0 - retention: 30 + retention: '30' structured_properties: io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com schema: file: examples/structured_properties/click_event.avsc downstreams: - - urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD) + - urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD) - id: user.clicks platform: snowflake - schema: - fields: - - id: user_id - structured_properties: - io.acryl.dataManagement.deprecationDate: "2023-01-01" structured_properties: io.acryl.dataManagement.replicationSLA: 90 + schema: + fields: + - id: user_id + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + type: string diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index b44d4f486161e..f8f90a72eabe7 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -4,6 +4,7 @@ from pathlib import Path from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union, get_args +import avro import yaml from pydantic import BaseModel, Field, root_validator, validator from ruamel.yaml import YAML @@ -61,6 +62,46 @@ class Config: extra = "forbid" +class StructuredPropertiesHelper: + @staticmethod + def simplify_structured_properties_list( + structured_properties: Optional[ + Dict[str, Union[float, int, str, List[Union[float, int, str]]]] + ], + ) -> Optional[Dict[str, Union[float, int, str, List[Union[float, int, str]]]]]: + def urn_strip(urn: str) -> str: + if urn.startswith("urn:li:structuredProperty:"): + return urn[len("urn:li:structuredProperty:") :] + return urn + + if structured_properties: + simplified_structured_properties = ( + {urn_strip(k): v for k, v in structured_properties.items()} + if structured_properties + else None + ) + if simplified_structured_properties: + # convert lists to single values if possible + for k, v in simplified_structured_properties.items(): + if isinstance(v, list): + v = [ + int(x) if isinstance(x, float) and x.is_integer() else x + for x in v + ] + if len(v) == 1: + simplified_structured_properties[k] = v[0] + else: + simplified_structured_properties[k] = v + else: + simplified_structured_properties[k] = ( + int(v) + if v and isinstance(v, float) and v.is_integer() + else v + ) + return simplified_structured_properties + return None + + class SchemaFieldSpecification(StrictModel): id: Optional[str] = None urn: Optional[str] = None @@ -83,39 +124,17 @@ class SchemaFieldSpecification(StrictModel): isPartitioningKey: Optional[bool] = None jsonProps: Optional[dict] = None - def simplify_structured_properties(self) -> None: - if self.structured_properties: - # convert lists to single values if possible - for k, v in self.structured_properties.items(): - if isinstance(v, list): - v = [ - int(x) if isinstance(x, float) and x.is_integer() else x - for x in v - ] - if len(v) == 1: - self.structured_properties[k] = v[0] - else: - self.structured_properties[k] = v - else: - self.structured_properties[k] = ( - int(v) if v and isinstance(v, 
float) and v.is_integer() else v - ) - def with_structured_properties( self, - structured_properties: Optional[Dict[str, List[Union[str, int, float]]]], + structured_properties: Optional[ + Dict[str, Union[float, int, str, List[Union[float, int, str]]]] + ], ) -> "SchemaFieldSpecification": - def urn_strip(urn: str) -> str: - if urn.startswith("urn:li:structuredProperty:"): - return urn[len("urn:li:structuredProperty:") :] - return urn - self.structured_properties = ( - {urn_strip(k): v for k, v in structured_properties.items()} - if structured_properties - else None + StructuredPropertiesHelper.simplify_structured_properties_list( + structured_properties + ) ) - self.simplify_structured_properties() return self @classmethod @@ -126,7 +145,7 @@ def from_schema_field( id=Dataset._simplify_field_path(schema_field.fieldPath), urn=make_schema_field_urn(parent_urn, schema_field.fieldPath), type=SchemaFieldSpecification._from_datahub_type( - schema_field.type, schema_field.nativeDataType + schema_field.type, schema_field.nativeDataType, allow_complex=True ), nativeDataType=schema_field.nativeDataType, nullable=schema_field.nullable, @@ -204,7 +223,9 @@ def get_datahub_type(self) -> models.SchemaFieldDataTypeClass: @staticmethod def _from_datahub_type( - input_type: models.SchemaFieldDataTypeClass, native_data_type: str + input_type: models.SchemaFieldDataTypeClass, + native_data_type: str, + allow_complex: bool = False, ) -> str: if isinstance(input_type.type, models.StringTypeClass): return "string" @@ -218,6 +239,14 @@ def _from_datahub_type( return "bytes" elif isinstance(input_type.type, models.BooleanTypeClass): return "boolean" + elif allow_complex and isinstance(input_type.type, models.ArrayTypeClass): + return "array" + elif allow_complex and isinstance(input_type.type, models.MapTypeClass): + return "map" + elif allow_complex and isinstance(input_type.type, models.UnionTypeClass): + return "union" + elif allow_complex: + return "record" raise ValueError(f"Type {input_type} is not a valid primitive type") def dict(self, **kwargs): @@ -237,12 +266,16 @@ def dict(self, **kwargs): if self.urn: field_urn = SchemaFieldUrn.from_string(self.urn) - if field_urn.field_path == self.id: + if Dataset._simplify_field_path(field_urn.field_path) == self.id: exclude.add("urn") kwargs.pop("exclude_defaults", None) - self.simplify_structured_properties() + self.structured_properties = ( + StructuredPropertiesHelper.simplify_structured_properties_list( + self.structured_properties + ) + ) return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) @@ -250,6 +283,7 @@ def dict(self, **kwargs): class SchemaSpecification(BaseModel): file: Optional[str] = None fields: Optional[List[SchemaFieldSpecification]] = None + raw_schema: Optional[str] = None @validator("file") def file_must_be_avsc(cls, v): @@ -274,6 +308,10 @@ class StructuredPropertyValue(ConfigModel): lastModified: Optional[str] = None +class DatasetRetrievalConfig(BaseModel): + include_downstreams: Optional[bool] = False + + class Dataset(StrictModel): id: Optional[str] = None platform: Optional[str] = None @@ -325,6 +363,10 @@ def platform_must_not_be_urn(cls, v): return v[len("urn:li:dataPlatform:") :] return v + @validator("structured_properties") + def simplify_structured_properties(cls, v): + return StructuredPropertiesHelper.simplify_structured_properties_list(v) + def _mint_auditstamp(self, message: str) -> AuditStampClass: return AuditStampClass( time=int(time.time() * 1000.0), @@ -402,7 +444,7 @@ def entity_references(self) -> 
List[str]: # We don't check references for tags return list(set(references)) - def generate_mcp( + def generate_mcp( # noqa: C901 self, ) -> Iterable[Union[MetadataChangeProposalClass, MetadataChangeProposalWrapper]]: mcp = MetadataChangeProposalWrapper( @@ -417,9 +459,12 @@ def generate_mcp( yield mcp if self.schema_metadata: + schema_fields = set() if self.schema_metadata.file: with open(self.schema_metadata.file) as schema_fp: schema_string = schema_fp.read() + schema_fields_list = avro_schema_to_mce_fields(schema_string) + schema_fields = {field.fieldPath for field in schema_fields_list} schema_metadata = SchemaMetadataClass( schemaName=self.name or self.id or self.urn or "", platform=self.platform_urn, @@ -509,6 +554,27 @@ def generate_mcp( yield mcp for field in self.schema_metadata.fields: + if schema_fields: + # search for the field in the schema fields set + matched_fields = [ + schema_field + for schema_field in schema_fields + if field.id == schema_field + or field.id == Dataset._simplify_field_path(schema_field) + ] + if not matched_fields: + raise ValueError( + f"Field {field.id} not found in the schema file" + ) + if len(matched_fields) > 1: + raise ValueError( + f"Field {field.id} matches multiple entries {matched_fields}in the schema file. Use the fully qualified field path." + ) + assert len(matched_fields) == 1 + assert ( + self.urn is not None + ) # validator should have filled this in + field.urn = make_schema_field_urn(self.urn, matched_fields[0]) field_urn = field.urn or make_schema_field_urn( self.urn, # type: ignore[arg-type] field.id, # type: ignore[arg-type] @@ -650,6 +716,10 @@ def validate_type( @staticmethod def _simplify_field_path(field_path: str) -> str: + # field paths with [type=array] or [type=map] or [type=union] should never be simplified + for type in ["array", "map", "union"]: + if f"[type={type}]" in field_path: + return field_path if field_path.startswith("[version=2.0]"): # v2 field path field_components = [] @@ -681,7 +751,26 @@ def _schema_from_schema_metadata( ) if schema_metadata: + # If the schema is built off of an avro schema, we only extract the fields if they have structured properties + # Otherwise, we extract all fields + if ( + schema_metadata.platformSchema + and isinstance(schema_metadata.platformSchema, models.OtherSchemaClass) + and schema_metadata.platformSchema.rawSchema + ): + try: + maybe_avro_schema = avro.schema.parse( + schema_metadata.platformSchema.rawSchema + ) + schema_fields = avro_schema_to_mce_fields(maybe_avro_schema) + except Exception as e: + logger.debug("Failed to parse avro schema: %s", e) + schema_fields = [] + schema_specification = SchemaSpecification( + raw_schema=schema_metadata.platformSchema.rawSchema + if hasattr(schema_metadata.platformSchema, "rawSchema") + else None, fields=[ SchemaFieldSpecification.from_schema_field( field, urn @@ -709,8 +798,21 @@ def _schema_from_schema_metadata( ) for field in schema_metadata.fields ] - ] + ], ) + if schema_fields and schema_specification.fields: + # Source was an avro schema, so we only include fields with structured properties, tags or glossary terms + schema_specification.fields = [ + field + for field in schema_specification.fields + if field.structured_properties + or field.globalTags + or field.glossaryTerms + ] + if ( + not schema_specification.fields + ): # set fields to None if there are no fields after filtering + schema_specification.fields = None return schema_specification else: return None @@ -732,7 +834,12 @@ def extract_owners_if_exists( return 
yaml_owners @classmethod - def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": + def from_datahub( + cls, + graph: DataHubGraph, + urn: str, + config: DatasetRetrievalConfig = DatasetRetrievalConfig(), + ) -> "Dataset": dataset_urn = DatasetUrn.from_string(urn) platform_urn = DataPlatformUrn.from_string(dataset_urn.platform) dataset_properties: Optional[DatasetPropertiesClass] = graph.get_aspect( @@ -757,7 +864,16 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": else: structured_properties_map[sp.propertyUrn] = sp.values - from datahub.metadata.urns import GlossaryTermUrn, TagUrn + if config.include_downstreams: + related_downstreams = graph.get_related_entities( + urn, + relationship_types=[ + "DownstreamOf", + ], + direction=DataHubGraph.RelationshipDirection.INCOMING, + ) + downstreams = [r.urn for r in related_downstreams] + breakpoint() return Dataset( # type: ignore[arg-type] id=dataset_urn.name, @@ -788,6 +904,7 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": structured_properties=( structured_properties_map if structured_properties else None ), + downstreams=downstreams if config.include_downstreams else None, ) def dict(self, **kwargs): @@ -902,6 +1019,8 @@ def to_yaml( existing_data = yaml_handler.load(fp) # Determine if the file contains a list or a single document + if isinstance(existing_data, dict): + existing_data = [existing_data] if isinstance(existing_data, list): # Handle list case updated = False @@ -911,12 +1030,42 @@ def to_yaml( if model_id is not None: # Try to find and update existing item for item in existing_data: - item_identifier = item.get(identifier, Dataset(**item).urn) + existing_dataset = Dataset(**item) + item_identifier = item.get(identifier, existing_dataset.urn) if item_identifier == model_id: # Found the item to update - preserve structure while updating values updated = True + if ( + existing_dataset.schema_metadata + and existing_dataset.schema_metadata.file + ): + # Preserve the existing schema file path + new_data["schema"]["file"] = ( + existing_dataset.schema_metadata.file + ) + # Check if the content of the schema file has changed + with open( + existing_dataset.schema_metadata.file + ) as schema_fp: + schema_fp_content = schema_fp.read() + + if ( + schema_fp_content + != new_data["schema"]["raw_schema"] + ): + # If the content has changed, update the schema file + schema_file_path = Path( + existing_dataset.schema_metadata.file + ) + schema_file_path.write_text( + new_data["schema"]["raw_schema"] + ) + # Remove raw_schema from the schema aspect before updating + if "schema" in new_data: + new_data["schema"].pop("raw_schema") + _update_dict_preserving_comments( - item, new_data, ["urn", "properties"] + item, new_data, ["urn", "properties", "raw_schema"] ) break @@ -933,18 +1082,6 @@ def to_yaml( with open(file, "w") as fp: yaml_handler.dump(existing_data, fp) - else: - # Handle single document case - if _dict_equal(existing_data, new_data, ["urn"]): - return False # No changes needed - - # Update the existing document while preserving comments - _update_dict_preserving_comments(existing_data, new_data, ["urn"]) - - # Write the updated data back - with open(file, "w") as fp: - yaml_handler.dump(existing_data, fp) - return True except Exception as e: diff --git a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py index 91d6c066f429d..012a4fff4ad04 100644 --- a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py +++ 
b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py @@ -8,7 +8,7 @@ import click from click_default_group import DefaultGroup -from datahub.api.entities.dataset.dataset import Dataset +from datahub.api.entities.dataset.dataset import Dataset, DatasetRetrievalConfig from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, get_default_graph from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings @@ -210,8 +210,11 @@ def sync(file: str, to_datahub: bool) -> None: else: # Sync from DataHub if graph.exists(dataset.urn): + dataset_get_config = DatasetRetrievalConfig() + if dataset.downstreams: + dataset_get_config.include_downstreams = True existing_dataset: Dataset = Dataset.from_datahub( - graph=graph, urn=dataset.urn + graph=graph, urn=dataset.urn, config=dataset_get_config ) existing_dataset.to_yaml(Path(file)) else: From a293129c595355fd37079e9fd74e5c14c8032053 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 12:21:49 -0800 Subject: [PATCH 08/16] fix roundtripping of field types when using file-based schema --- .../structured_properties/dataset.yaml | 3 - .../datahub/api/entities/dataset/dataset.py | 68 ++++--------------- 2 files changed, 15 insertions(+), 56 deletions(-) diff --git a/metadata-ingestion/examples/structured_properties/dataset.yaml b/metadata-ingestion/examples/structured_properties/dataset.yaml index b6186d8ee5195..13cb3de7693d6 100644 --- a/metadata-ingestion/examples/structured_properties/dataset.yaml +++ b/metadata-ingestion/examples/structured_properties/dataset.yaml @@ -11,15 +11,12 @@ io.acryl.dataManagement.deprecationDate: '2023-01-01' io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com io.acryl.dataManagement.replicationSLA: 90.0 - type: string - id: url structured_properties: io.acryl.dataManagement.deprecationDate: '2023-01-01' - type: string - id: locator.latitude structured_properties: io.acryl.dataManagement.deprecationDate: '2023-01-01' - type: float structured_properties: # dataset level structured properties go here io.acryl.privacy.retentionTime: 365 projectNames: diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index f8f90a72eabe7..ef9e5115e420b 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -124,6 +124,20 @@ class SchemaFieldSpecification(StrictModel): isPartitioningKey: Optional[bool] = None jsonProps: Optional[dict] = None + def remove_type_metadata(self) -> "SchemaFieldSpecification": + """ + Removes type metadata from the schema field specification. + This is useful when syncing field metadata back to yaml when + the type information is already present in the schema file. 
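+        Returns self so the call can be chained inline (e.g. in a list comprehension).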
+ """ + self.type = None + self.nativeDataType = None + self.jsonPath = None + self.isPartitioningKey = None + self.isPartOfKey = None + self.jsonProps = None + return self + def with_structured_properties( self, structured_properties: Optional[ @@ -803,7 +817,7 @@ def _schema_from_schema_metadata( if schema_fields and schema_specification.fields: # Source was an avro schema, so we only include fields with structured properties, tags or glossary terms schema_specification.fields = [ - field + field.remove_type_metadata() for field in schema_specification.fields if field.structured_properties or field.globalTags @@ -873,7 +887,6 @@ def from_datahub( direction=DataHubGraph.RelationshipDirection.INCOMING, ) downstreams = [r.urn for r in related_downstreams] - breakpoint() return Dataset( # type: ignore[arg-type] id=dataset_urn.name, @@ -942,57 +955,6 @@ def dict(self, **kwargs): return result - def model_dump(self, **kwargs): - """Custom model_dump to handle YAML serialization properly.""" - exclude = kwargs.pop("exclude", set()) - - # If id and name are identical, exclude name from the output - if self.id == self.name and self.id is not None: - exclude.add("name") - - # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output - if self.subtypes and len(self.subtypes) == 1: - self.subtype = self.subtypes[0] - exclude.add("subtypes") - - # Check which method exists in the parent class - if hasattr(super(), "model_dump"): - # For Pydantic v2 - result = super().model_dump(exclude=exclude, **kwargs) # type: ignore[misc] - elif hasattr(super(), "dict"): - # For Pydantic v1 - result = super().dict(exclude=exclude, **kwargs) # type: ignore[misc] - else: - # Fallback to __dict__ if neither method exists - result = {k: v for k, v in self.__dict__.items() if k not in exclude} - - # Custom handling for schema_metadata/schema - if self.schema_metadata and "schema" in result: - schema_data = result["schema"] - - # Handle fields if they exist - if "fields" in schema_data and isinstance(schema_data["fields"], list): - # Process each field using its custom model_dump - processed_fields = [] - if self.schema_metadata and self.schema_metadata.fields: - for field in self.schema_metadata.fields: - if field: - # Call the appropriate serialization method on each field - if hasattr(field, "model_dump"): - processed_field = field.model_dump(**kwargs) - elif hasattr(field, "dict"): - processed_field = field.dict(**kwargs) - else: - processed_field = { - k: v for k, v in field.__dict__.items() - } - processed_fields.append(processed_field) - - # Replace the fields in the result with the processed ones - schema_data["fields"] = processed_fields - - return result - def to_yaml( self, file: Path, From 881cb3472e2c7d31611cd873ac82d54ec3030620 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 13:45:00 -0800 Subject: [PATCH 09/16] improve doc --- docs/cli-commands/dataset.md | 102 +++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/docs/cli-commands/dataset.md b/docs/cli-commands/dataset.md index 31e39f47d5dda..bd4e7b28c3968 100644 --- a/docs/cli-commands/dataset.md +++ b/docs/cli-commands/dataset.md @@ -4,24 +4,6 @@ The `dataset` command allows you to interact with Dataset entities in DataHub. T ## Commands -### upsert - -Create or update Dataset metadata in DataHub. 
- -```shell -datahub dataset upsert -f PATH_TO_YAML_FILE -``` - -**Options:** -- `-f, --file` - Path to the YAML file containing Dataset metadata (required) - -**Example:** -```shell -datahub dataset upsert -f dataset.yaml -``` - -This command will parse the YAML file, validate that any entity references exist in DataHub, and then emit the corresponding metadata change proposals to update or create the Dataset. - ### sync Synchronize Dataset metadata between YAML files and DataHub. @@ -46,47 +28,71 @@ datahub dataset sync -f dataset.yaml --from-datahub The `sync` command offers bidirectional synchronization, allowing you to keep your local YAML files in sync with the DataHub platform. The `upsert` command actually uses `sync` with the `--to-datahub` flag internally. -### get +For details on the supported YAML format, see the [Dataset YAML Format](#dataset-yaml-format) section. -Retrieve Dataset metadata from DataHub and optionally write it to a file. +### file + +Operate on a Dataset YAML file for validation or linting. ```shell -datahub dataset get --urn DATASET_URN [--to-file OUTPUT_FILE] +datahub dataset file [--lintCheck] [--lintFix] PATH_TO_YAML_FILE ``` **Options:** -- `--urn` - The Dataset URN to retrieve (required) -- `--to-file` - Path to write the Dataset metadata as YAML (optional) +- `--lintCheck` - Check the YAML file for formatting issues (optional) +- `--lintFix` - Fix formatting issues in the YAML file (optional) **Example:** ```shell -datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file my_dataset.yaml +# Check for linting issues +datahub dataset file --lintCheck dataset.yaml + +# Fix linting issues +datahub dataset file --lintFix dataset.yaml ``` -If the URN does not start with `urn:li:dataset:`, it will be automatically prefixed. +This command helps maintain consistent formatting of your Dataset YAML files. For more information on the expected format, refer to the [Dataset YAML Format](#dataset-yaml-format) section. -### file +### upsert -Operate on a Dataset YAML file for validation or linting. +Create or update Dataset metadata in DataHub. ```shell -datahub dataset file [--lintCheck] [--lintFix] PATH_TO_YAML_FILE +datahub dataset upsert -f PATH_TO_YAML_FILE ``` **Options:** -- `--lintCheck` - Check the YAML file for formatting issues (optional) -- `--lintFix` - Fix formatting issues in the YAML file (optional) +- `-f, --file` - Path to the YAML file containing Dataset metadata (required) **Example:** ```shell -# Check for linting issues -datahub dataset file --lintCheck dataset.yaml +datahub dataset upsert -f dataset.yaml +``` -# Fix linting issues -datahub dataset file --lintFix dataset.yaml +This command will parse the YAML file, validate that any entity references exist in DataHub, and then emit the corresponding metadata change proposals to update or create the Dataset. + +For details on the required structure of your YAML file, see the [Dataset YAML Format](#dataset-yaml-format) section. + +### get + +Retrieve Dataset metadata from DataHub and optionally write it to a file. + +```shell +datahub dataset get --urn DATASET_URN [--to-file OUTPUT_FILE] ``` -This command helps maintain consistent formatting of your Dataset YAML files. 
+**Options:** +- `--urn` - The Dataset URN to retrieve (required) +- `--to-file` - Path to write the Dataset metadata as YAML (optional) + +**Example:** +```shell +datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file my_dataset.yaml +``` + +If the URN does not start with `urn:li:dataset:`, it will be automatically prefixed. + +The output file will be formatted according to the [Dataset YAML Format](#dataset-yaml-format) section. ### add_sibling @@ -130,9 +136,7 @@ schema: doc: "First field" # Alias for description nativeDataType: "VARCHAR" # Native platform type (defaults to type if not specified) nullable: false # Whether field can be null (default: false) - jsonPath: "$.field1" # JSON path for the field - label: "Field One" # Display label - recursive: false # Whether field is recursive (default: false) + label: "Field One" # Display label (optional business label for the field) isPartOfKey: true # Whether field is part of primary key isPartitioningKey: false # Whether field is a partitioning key jsonProps: {"customProp": "value"} # Custom JSON properties @@ -146,6 +150,7 @@ schema: structured_properties: property1: "value1" property2: 42 + file: example.schema.avsc # Optional schema file (required if defining tables with nested fields) # Additional metadata (all optional) properties: # Custom properties as key-value pairs @@ -214,12 +219,11 @@ The Schema Field object supports the following properties: |----------|------|-------------| | `id` | string | Field identifier/path (required if `urn` not provided) | | `urn` | string | URN of the schema field (required if `id` not provided) | -| `type` | string | Data type (one of the supported field types) | +| `type` | string | Data type (one of the supported [Field Types](#field-types)) | | `nativeDataType` | string | Native data type in the source platform (defaults to `type` if not specified) | | `description` | string | Field description | | `doc` | string | Alias for description | | `nullable` | boolean | Whether the field can be null (default: false) | -| `jsonPath` | string | JSON path for the field | | `label` | string | Display label for the field | | `recursive` | boolean | Whether the field is recursive (default: false) | | `isPartOfKey` | boolean | Whether the field is part of the primary key | @@ -229,6 +233,24 @@ The Schema Field object supports the following properties: | `glossaryTerms` | array | List of glossary terms associated with the field | | `structured_properties` | object | Structured properties for the field | + +**Important Note on Schema Field Types**: +When specifying fields in the YAML file, you must follow an all-or-nothing approach with the `type` field: +- If you want the command to generate the schema for you, specify the `type` field for ALL fields. +- If you only want to add field-level metadata (like tags, glossary terms, or structured properties), do NOT specify the `type` field for ANY field. 
+ +Example of fields with only metadata (no types): +```yaml +schema: + fields: + - id: "field1" # Field identifier + structured_properties: + prop1: prop_value + - id: "field2" + structured_properties: + prop1: prop_value +``` + ### Ownership Types When specifying owners, the following ownership types are supported: From ead194c6aea094ea4d4eba06ff9cd2be69ac8bf2 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 18:21:14 -0800 Subject: [PATCH 10/16] fix auth in smoke tests --- .../datahub/api/entities/dataset/dataset.py | 5 ++ .../cli/dataset_cmd/test_dataset_command.py | 81 +++++++++++++------ 2 files changed, 62 insertions(+), 24 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index ef9e5115e420b..86fbcc01d4f66 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -983,6 +983,9 @@ def to_yaml( # Determine if the file contains a list or a single document if isinstance(existing_data, dict): existing_data = [existing_data] + is_original_list = False + else: + is_original_list = True if isinstance(existing_data, list): # Handle list case updated = False @@ -1042,6 +1045,8 @@ def to_yaml( # Write the updated data back with open(file, "w") as fp: + if not is_original_list: + existing_data = existing_data[0] yaml_handler.dump(existing_data, fp) return True diff --git a/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py b/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py index 25acab433ad78..dfcd59d335e2e 100644 --- a/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py +++ b/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py @@ -1,16 +1,17 @@ import json import logging import os -import subprocess import tempfile from pathlib import Path from random import randint import pytest import yaml +from click.testing import CliRunner from datahub.api.entities.dataset.dataset import Dataset from datahub.emitter.mce_builder import make_dataset_urn +from datahub.entrypoints import datahub from datahub.ingestion.graph.client import DataHubGraph from tests.consistency_utils import wait_for_writes_to_sync from tests.utils import delete_urns, get_sleep_info @@ -23,6 +24,7 @@ dataset_urn = make_dataset_urn("snowflake", dataset_id) sleep_sec, sleep_times = get_sleep_info() +runner = CliRunner(mix_stderr=False) @pytest.fixture(scope="module") @@ -71,20 +73,30 @@ def create_dataset_yaml(file_path: Path, additional_properties=None): yaml.dump(dataset_yaml, f, indent=2) -def run_cli_command(cmd): - """Run a DataHub CLI command""" - result = subprocess.run(cmd, shell=True, capture_output=True, text=True) +def run_cli_command(cmd, auth_session): + """Run a DataHub CLI command using CliRunner and auth_session""" + args = cmd.split() + result = runner.invoke( + datahub, + args, + env={ + "DATAHUB_GMS_URL": auth_session.gms_url(), + "DATAHUB_GMS_TOKEN": auth_session.gms_token(), + }, + ) - if result.returncode != 0: + if result.exit_code != 0: logger.error(f"Command failed: {cmd}") logger.error(f"STDOUT: {result.stdout}") logger.error(f"STDERR: {result.stderr}") - raise Exception(f"Command failed with return code {result.returncode}") + raise Exception(f"Command failed with return code {result.exit_code}") return result -def test_dataset_sync_to_datahub(setup_teardown_dataset, graph_client: DataHubGraph): +def test_dataset_sync_to_datahub( + setup_teardown_dataset, graph_client: 
DataHubGraph, auth_session +): """Test syncing dataset from YAML to DataHub""" with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: temp_file_path = Path(tmp.name) @@ -93,8 +105,8 @@ def test_dataset_sync_to_datahub(setup_teardown_dataset, graph_client: DataHubGr create_dataset_yaml(temp_file_path) # Run the CLI command to sync to DataHub - cmd = f"datahub dataset sync -f {temp_file_path} --to-datahub" - result = run_cli_command(cmd) + cmd = f"dataset sync -f {temp_file_path} --to-datahub" + result = run_cli_command(cmd, auth_session) # Verify success message in output assert f"Update succeeded for urn {dataset_urn}" in result.stdout @@ -120,7 +132,9 @@ def test_dataset_sync_to_datahub(setup_teardown_dataset, graph_client: DataHubGr os.unlink(temp_file_path) -def test_dataset_sync_from_datahub(setup_teardown_dataset, graph_client: DataHubGraph): +def test_dataset_sync_from_datahub( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): """Test syncing dataset from DataHub to YAML""" with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: temp_file_path = Path(tmp.name) @@ -147,9 +161,9 @@ def test_dataset_sync_from_datahub(setup_teardown_dataset, graph_client: DataHub ) # Run the CLI command to sync from DataHub - cmd = f"datahub dataset sync -f {temp_file_path} --from-datahub" - result = run_cli_command(cmd) - assert result.returncode == 0 + cmd = f"dataset sync -f {temp_file_path} --from-datahub" + result = run_cli_command(cmd, auth_session) + assert result.exit_code == 0 # Wait to ensure file is updated wait_for_writes_to_sync() @@ -169,7 +183,9 @@ def test_dataset_sync_from_datahub(setup_teardown_dataset, graph_client: DataHub os.unlink(temp_file_path) -def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHubGraph): +def test_dataset_sync_bidirectional( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): """Test bidirectional sync with modifications on both sides""" with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: temp_file_path = Path(tmp.name) @@ -178,7 +194,9 @@ def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHu create_dataset_yaml(temp_file_path) # 2. Sync to DataHub - run_cli_command(f"datahub dataset sync -f {temp_file_path} --to-datahub") + run_cli_command( + f"dataset sync -f {temp_file_path} --to-datahub", auth_session + ) wait_for_writes_to_sync() # 3. Modify directly in DataHub @@ -189,14 +207,15 @@ def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHu wait_for_writes_to_sync() # 4. Sync from DataHub to update YAML - run_cli_command(f"datahub dataset sync -f {temp_file_path} --from-datahub") + run_cli_command( + f"dataset sync -f {temp_file_path} --from-datahub", auth_session + ) # 5. Modify the YAML file directly with open(temp_file_path, "r") as f: import yaml data = yaml.safe_load(f) - data["properties"]["modified_by"] = "cli_test" data["tags"].append("modified_yaml") @@ -205,7 +224,9 @@ def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHu json.dump(data, f, indent=2) # 6. Sync back to DataHub - run_cli_command(f"datahub dataset sync -f {temp_file_path} --to-datahub") + run_cli_command( + f"dataset sync -f {temp_file_path} --to-datahub", auth_session + ) wait_for_writes_to_sync() # 7. 
Verify both modifications are present in DataHub @@ -217,7 +238,9 @@ def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHu assert "modified_yaml" in final_dataset.tags # 8. Sync one more time from DataHub and verify YAML is intact - run_cli_command(f"datahub dataset sync -f {temp_file_path} --from-datahub") + run_cli_command( + f"dataset sync -f {temp_file_path} --from-datahub", auth_session + ) with open(temp_file_path, "r") as f: content = f.read() @@ -229,7 +252,9 @@ def test_dataset_sync_bidirectional(setup_teardown_dataset, graph_client: DataHu os.unlink(temp_file_path) -def test_dataset_sync_validation(setup_teardown_dataset, graph_client: DataHubGraph): +def test_dataset_sync_validation( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): """Test validation during sync to DataHub with invalid references""" with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: temp_file_path = Path(tmp.name) @@ -252,11 +277,19 @@ def test_dataset_sync_validation(setup_teardown_dataset, graph_client: DataHubGr }, }, ) + # Attempt to sync to DataHub - should fail due to validation - cmd = f"datahub dataset sync -f {temp_file_path} --to-datahub" - result = subprocess.run(cmd, shell=True, capture_output=True, text=True) - # Assert that the command failed (non-zero return code) - assert result.returncode != 0 + cmd = f"dataset sync -f {temp_file_path} --to-datahub" + try: + run_cli_command(cmd, auth_session) + # If we get here, the command didn't fail as expected + raise AssertionError("Command should have failed due to validation") + except Exception as e: + if not isinstance(e, AssertionError): + # Command failed as expected + pass + else: + raise e finally: # Clean up temporary file From 178e4c3bc722ee60ed350fe25af6e19b9d0d6aa0 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 19:27:21 -0800 Subject: [PATCH 11/16] pydantic compat --- .../datahub/api/entities/dataset/dataset.py | 199 +++++++++++++----- .../src/datahub/cli/specific/dataset_cli.py | 3 +- 2 files changed, 147 insertions(+), 55 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 86fbcc01d4f66..cf9cf6a19511d 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -46,9 +46,13 @@ from datahub.metadata.urns import ( DataPlatformUrn, GlossaryTermUrn, + SchemaFieldUrn, StructuredPropertyUrn, TagUrn, ) +from datahub.pydantic.compat import ( + PYDANTIC_VERSION, +) from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -57,9 +61,22 @@ class StrictModel(BaseModel): - class Config: - validate_assignment = True - extra = "forbid" + """ + Base model with strict validation. + Compatible with both Pydantic v1 and v2. 
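+    On Pydantic v2 the strict settings are declared via model_config; on v1 via a nested Config class.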
+ """ + + if PYDANTIC_VERSION >= 2: + # Pydantic v2 config + model_config = { + "validate_assignment": True, + "extra": "forbid", + } + else: + # Pydantic v1 config + class Config: + validate_assignment = True + extra = "forbid" class StructuredPropertiesHelper: @@ -263,35 +280,66 @@ def _from_datahub_type( return "record" raise ValueError(f"Type {input_type} is not a valid primitive type") - def dict(self, **kwargs): - """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" - exclude = kwargs.pop("exclude", None) or set() + if PYDANTIC_VERSION < 2: - # If description and doc are identical, exclude doc from the output - if self.description == self.doc and self.description is not None: - exclude.add("doc") + def dict(self, **kwargs): + """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", None) or set() - # if nativeDataType and type are identical, exclude nativeDataType from the output - if self.nativeDataType == self.type and self.nativeDataType is not None: - exclude.add("nativeDataType") + # If description and doc are identical, exclude doc from the output + if self.description == self.doc and self.description is not None: + exclude.add("doc") - # if the id is the same as the urn's fieldPath, exclude id from the output - from datahub.metadata.urns import SchemaFieldUrn + # if nativeDataType and type are identical, exclude nativeDataType from the output + if self.nativeDataType == self.type and self.nativeDataType is not None: + exclude.add("nativeDataType") - if self.urn: - field_urn = SchemaFieldUrn.from_string(self.urn) - if Dataset._simplify_field_path(field_urn.field_path) == self.id: - exclude.add("urn") + # if the id is the same as the urn's fieldPath, exclude id from the output - kwargs.pop("exclude_defaults", None) + if self.urn: + field_urn = SchemaFieldUrn.from_string(self.urn) + if Dataset._simplify_field_path(field_urn.field_path) == self.id: + exclude.add("urn") - self.structured_properties = ( - StructuredPropertiesHelper.simplify_structured_properties_list( - self.structured_properties + kwargs.pop("exclude_defaults", None) + + self.structured_properties = ( + StructuredPropertiesHelper.simplify_structured_properties_list( + self.structured_properties + ) ) - ) - return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) + return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) + + else: + # For v2, implement model_dump with similar logic as dict + def model_dump(self, **kwargs): + """Custom model_dump method for Pydantic v2 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", None) or set() + + # If description and doc are identical, exclude doc from the output + if self.description == self.doc and self.description is not None: + exclude.add("doc") + + # if nativeDataType and type are identical, exclude nativeDataType from the output + if self.nativeDataType == self.type and self.nativeDataType is not None: + exclude.add("nativeDataType") + + # if the id is the same as the urn's fieldPath, exclude id from the output + if self.urn: + field_urn = SchemaFieldUrn.from_string(self.urn) + if Dataset._simplify_field_path(field_urn.field_path) == self.id: + exclude.add("urn") + + self.structured_properties = ( + StructuredPropertiesHelper.simplify_structured_properties_list( + self.structured_properties + ) + ) + if hasattr(super(), "model_dump"): + return super().model_dump( # type: ignore + exclude=exclude, exclude_defaults=True, **kwargs + ) 
class SchemaSpecification(BaseModel): @@ -920,40 +968,80 @@ def from_datahub( downstreams=downstreams if config.include_downstreams else None, ) - def dict(self, **kwargs): - """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" - exclude = kwargs.pop("exclude", set()) + if PYDANTIC_VERSION < 2: + + def dict(self, **kwargs): + """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) + + # If id and name are identical, exclude name from the output + if self.id == self.name and self.id is not None: + exclude.add("name") + + # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output + if self.subtypes and len(self.subtypes) == 1: + self.subtype = self.subtypes[0] + exclude.add("subtypes") + + result = super().dict(exclude=exclude, **kwargs) + + # Custom handling for schema_metadata/schema + if self.schema_metadata and "schema" in result: + schema_data = result["schema"] - # If id and name are identical, exclude name from the output - if self.id == self.name and self.id is not None: - exclude.add("name") + # Handle fields if they exist + if "fields" in schema_data and isinstance(schema_data["fields"], list): + # Process each field using its custom dict method + processed_fields = [] + if self.schema_metadata and self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field: + # Use dict method for Pydantic v1 + processed_field = field.dict(**kwargs) + processed_fields.append(processed_field) - # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output - if self.subtypes and len(self.subtypes) == 1: - self.subtype = self.subtypes[0] - exclude.add("subtypes") + # Replace the fields in the result with the processed ones + schema_data["fields"] = processed_fields - result = super().dict(exclude=exclude, **kwargs) + return result + else: - # Custom handling for schema_metadata/schema - if self.schema_metadata and "schema" in result: - schema_data = result["schema"] + def model_dump(self, **kwargs): + """Custom model_dump method for Pydantic v2 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) - # Handle fields if they exist - if "fields" in schema_data and isinstance(schema_data["fields"], list): - # Process each field using its custom dict method - processed_fields = [] - if self.schema_metadata and self.schema_metadata.fields: - for field in self.schema_metadata.fields: - if field: - # Use dict method for Pydantic v1 - processed_field = field.dict(**kwargs) - processed_fields.append(processed_field) + # If id and name are identical, exclude name from the output + if self.id == self.name and self.id is not None: + exclude.add("name") - # Replace the fields in the result with the processed ones - schema_data["fields"] = processed_fields + # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output + if self.subtypes and len(self.subtypes) == 1: + self.subtype = self.subtypes[0] + exclude.add("subtypes") - return result + if hasattr(super(), "model_dump"): + result = super().model_dump(exclude=exclude, **kwargs) # type: ignore + else: + result = super().dict(exclude=exclude, **kwargs) + + # Custom handling for schema_metadata/schema + if self.schema_metadata and "schema" in result: + schema_data = result["schema"] + + # Handle fields if they exist + if "fields" in schema_data and isinstance(schema_data["fields"], list): 
+ # Process each field using its custom model_dump method + processed_fields = [] + if self.schema_metadata and self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field: + processed_field = field.model_dump(**kwargs) + processed_fields.append(processed_field) + + # Replace the fields in the result with the processed ones + schema_data["fields"] = processed_fields + + return result def to_yaml( self, @@ -965,8 +1053,13 @@ def to_yaml( Returns True if file was written, False if no changes were detected. """ # Create new model data - # Create new model data with dict() for Pydantic v1 - new_data = self.dict(exclude_none=True, exclude_unset=True, by_alias=True) + # Create new model data - choose dict() or model_dump() based on Pydantic version + if PYDANTIC_VERSION >= 2: + new_data = self.model_dump( + exclude_none=True, exclude_unset=True, by_alias=True + ) + else: + new_data = self.dict(exclude_none=True, exclude_unset=True, by_alias=True) # Set up ruamel.yaml for preserving comments yaml_handler = YAML(typ="rt") # round-trip mode diff --git a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py index 012a4fff4ad04..c3b5a82721367 100644 --- a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py @@ -1,3 +1,4 @@ +import filecmp import json import logging import os @@ -135,8 +136,6 @@ def file(lintcheck: bool, lintfix: bool, file: str) -> None: dataset.to_yaml(temp_path) # Compare the files - import filecmp - files_match = filecmp.cmp(file, temp_path) if files_match: From b3fc57cf4b9bc0bfef8dd86be18e32bc7b79b731 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 19:46:26 -0800 Subject: [PATCH 12/16] add compat for pydantic v2 --- .../structured_properties/dataset.yaml | 2 +- .../datahub/api/entities/dataset/dataset.py | 41 +++++++++++-------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/examples/structured_properties/dataset.yaml b/metadata-ingestion/examples/structured_properties/dataset.yaml index 13cb3de7693d6..6f8c3a69bb951 100644 --- a/metadata-ingestion/examples/structured_properties/dataset.yaml +++ b/metadata-ingestion/examples/structured_properties/dataset.yaml @@ -10,7 +10,7 @@ structured_properties: io.acryl.dataManagement.deprecationDate: '2023-01-01' io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com - io.acryl.dataManagement.replicationSLA: 90.0 + io.acryl.dataManagement.replicationSLA: 90 - id: url structured_properties: io.acryl.dataManagement.deprecationDate: '2023-01-01' diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index cf9cf6a19511d..b7dfe0105f620 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -2,7 +2,17 @@ import logging import time from pathlib import Path -from typing import Dict, Iterable, List, Literal, Optional, Tuple, Union, get_args +from typing import ( + Dict, + Iterable, + List, + Literal, + Optional, + Tuple, + TypeAlias, + Union, + get_args, +) import avro import yaml @@ -79,13 +89,17 @@ class Config: extra = "forbid" +# Define type aliases for the complex types +PropertyValue: TypeAlias = Union[float, int, str] +PropertyValueList: TypeAlias = List[PropertyValue] +StructuredProperties: TypeAlias = Dict[str, Union[PropertyValue, 
PropertyValueList]] + + class StructuredPropertiesHelper: @staticmethod def simplify_structured_properties_list( - structured_properties: Optional[ - Dict[str, Union[float, int, str, List[Union[float, int, str]]]] - ], - ) -> Optional[Dict[str, Union[float, int, str, List[Union[float, int, str]]]]]: + structured_properties: Optional[StructuredProperties], + ) -> Optional[StructuredProperties]: def urn_strip(urn: str) -> str: if urn.startswith("urn:li:structuredProperty:"): return urn[len("urn:li:structuredProperty:") :] @@ -122,9 +136,7 @@ def urn_strip(urn: str) -> str: class SchemaFieldSpecification(StrictModel): id: Optional[str] = None urn: Optional[str] = None - structured_properties: Optional[ - Dict[str, Union[float, str, List[Union[float, str]]]] - ] = None + structured_properties: Optional[StructuredProperties] = None type: Optional[str] = None nativeDataType: Optional[str] = None jsonPath: Union[None, str] = None @@ -156,10 +168,7 @@ def remove_type_metadata(self) -> "SchemaFieldSpecification": return self def with_structured_properties( - self, - structured_properties: Optional[ - Dict[str, Union[float, int, str, List[Union[float, int, str]]]] - ], + self, structured_properties: Optional[StructuredProperties] ) -> "SchemaFieldSpecification": self.structured_properties = ( StructuredPropertiesHelper.simplify_structured_properties_list( @@ -365,7 +374,7 @@ def ownership_type_must_be_mappable_or_custom(cls, v: str) -> str: class StructuredPropertyValue(ConfigModel): - value: Union[str, float, List[str], List[float]] + value: Union[str, int, float, List[str], List[int], List[float]] created: Optional[str] = None lastModified: Optional[str] = None @@ -389,9 +398,7 @@ class Dataset(StrictModel): tags: Optional[List[str]] = None glossary_terms: Optional[List[str]] = None owners: Optional[List[Union[str, Ownership]]] = None - structured_properties: Optional[ - Dict[str, Union[str, float, List[Union[str, float]]]] - ] = None + structured_properties: Optional[StructuredProperties] = None external_url: Optional[str] = None @property @@ -918,7 +925,7 @@ def from_datahub( urn, StructuredPropertiesClass ) if structured_properties: - structured_properties_map: Dict[str, List[Union[str, float]]] = {} + structured_properties_map: StructuredProperties = {} for sp in structured_properties.properties: if sp.propertyUrn in structured_properties_map: assert isinstance(structured_properties_map[sp.propertyUrn], list) From 37ec051ab37e5ca9adb92e6989d990abbe0f2d15 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 19:47:56 -0800 Subject: [PATCH 13/16] add compat for pydantic v2 --- .../src/datahub/pydantic/__init__.py | 0 .../src/datahub/pydantic/compat.py | 58 +++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 metadata-ingestion/src/datahub/pydantic/__init__.py create mode 100644 metadata-ingestion/src/datahub/pydantic/compat.py diff --git a/metadata-ingestion/src/datahub/pydantic/__init__.py b/metadata-ingestion/src/datahub/pydantic/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/src/datahub/pydantic/compat.py b/metadata-ingestion/src/datahub/pydantic/compat.py new file mode 100644 index 0000000000000..f6c1208ae85af --- /dev/null +++ b/metadata-ingestion/src/datahub/pydantic/compat.py @@ -0,0 +1,58 @@ +import functools +from typing import Any, Callable, Optional, TypeVar, cast + +# Define a type variable for the decorator +F = TypeVar("F", bound=Callable[..., Any]) + + +# Check which Pydantic version is 
installed +def get_pydantic_version() -> int: + """Determine if Pydantic v1 or v2 is installed.""" + try: + import pydantic + + version = pydantic.__version__ + return 1 if version.startswith("1.") else 2 + except (ImportError, AttributeError): + # Default to v1 if we can't determine version + return 1 + + +PYDANTIC_VERSION = get_pydantic_version() + + +# Create compatibility layer for dict-like methods +def compat_dict_method(v1_method: Optional[Callable] = None) -> Callable: + """ + Decorator to make a dict method work with both Pydantic v1 and v2. + + In v1: Uses the decorated method (typically dict) + In v2: Redirects to model_dump with appropriate parameter mapping + """ + + def decorator(func: F) -> F: + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + if PYDANTIC_VERSION >= 2: + # Map v1 parameters to v2 parameters + # exclude -> exclude + # exclude_unset -> exclude_unset + # exclude_defaults -> exclude_defaults + # exclude_none -> exclude_none + # by_alias -> by_alias + model_dump_kwargs = kwargs.copy() + + # Handle the 'exclude' parameter differently between versions + exclude = kwargs.get("exclude", set()) + if isinstance(exclude, (set, dict)): + model_dump_kwargs["exclude"] = exclude + + return self.model_dump(**model_dump_kwargs) + return func(self, *args, **kwargs) + + return cast(F, wrapper) + + # Allow use as both @compat_dict_method and @compat_dict_method() + if v1_method is None: + return decorator + return decorator(v1_method) From cd8d1d6f195bd46a073cbc95db70ce915983844f Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 21:24:54 -0800 Subject: [PATCH 14/16] remove int --- .../src/datahub/api/entities/dataset/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index b7dfe0105f620..0330345b5e008 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -9,7 +9,6 @@ Literal, Optional, Tuple, - TypeAlias, Union, get_args, ) @@ -18,6 +17,7 @@ import yaml from pydantic import BaseModel, Field, root_validator, validator from ruamel.yaml import YAML +from typing_extensions import TypeAlias import datahub.metadata.schema_classes as models from datahub.api.entities.structuredproperties.structuredproperties import AllowedTypes @@ -90,7 +90,7 @@ class Config: # Define type aliases for the complex types -PropertyValue: TypeAlias = Union[float, int, str] +PropertyValue: TypeAlias = Union[float, str] PropertyValueList: TypeAlias = List[PropertyValue] StructuredProperties: TypeAlias = Dict[str, Union[PropertyValue, PropertyValueList]] From 6c72bbec9e0e0d20d1ad30afab598839580923c7 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Mon, 3 Mar 2025 23:01:56 -0800 Subject: [PATCH 15/16] fix test --- .../test_structured_properties.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/smoke-test/tests/structured_properties/test_structured_properties.py b/smoke-test/tests/structured_properties/test_structured_properties.py index 8b0f61d8a5ea2..d785e4274a53b 100644 --- a/smoke-test/tests/structured_properties/test_structured_properties.py +++ b/smoke-test/tests/structured_properties/test_structured_properties.py @@ -341,8 +341,9 @@ def test_dataset_yaml_loader(ingest_cleanup_data, graph_client): wait_for_writes_to_sync() property_name = "io.acryl.dataManagement.deprecationDate" + 
field_name = "[version=2.0].[type=ClickEvent].[type=string].ip" assert get_property_from_entity( - make_schema_field_urn(make_dataset_urn("hive", "user.clicks"), "ip"), + make_schema_field_urn(make_dataset_urn("hive", "user.clicks"), field_name), property_name, graph=graph_client, ) == ["2023-01-01"] @@ -351,19 +352,21 @@ def test_dataset_yaml_loader(ingest_cleanup_data, graph_client): graph=graph_client, urn="urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD)", ) - field_name = "ip" assert dataset.schema_metadata is not None assert dataset.schema_metadata.fields is not None matching_fields = [ f for f in dataset.schema_metadata.fields - if f.id is not None and Dataset._simplify_field_path(f.id) == field_name + if f.id is not None and f.id == Dataset._simplify_field_path(field_name) ] assert len(matching_fields) == 1 assert matching_fields[0].structured_properties is not None - assert matching_fields[0].structured_properties[ - Urn.make_structured_property_urn("io.acryl.dataManagement.deprecationDate") - ] == ["2023-01-01"] + assert ( + matching_fields[0].structured_properties[ + "io.acryl.dataManagement.deprecationDate" + ] + == "2023-01-01" + ) def test_structured_property_search( From d4f5597d3e708ae8dc1d19fccf88d879c0d31593 Mon Sep 17 00:00:00 2001 From: Chakravarthy Racharla Date: Tue, 4 Mar 2025 20:39:17 +0530 Subject: [PATCH 16/16] remove optional int conversion --- .../src/datahub/api/entities/dataset/dataset.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index 0330345b5e008..26f95991b0f99 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -115,20 +115,13 @@ def urn_strip(urn: str) -> str: # convert lists to single values if possible for k, v in simplified_structured_properties.items(): if isinstance(v, list): - v = [ - int(x) if isinstance(x, float) and x.is_integer() else x - for x in v - ] if len(v) == 1: simplified_structured_properties[k] = v[0] else: simplified_structured_properties[k] = v else: - simplified_structured_properties[k] = ( - int(v) - if v and isinstance(v, float) and v.is_integer() - else v - ) + simplified_structured_properties[k] = v + return simplified_structured_properties return None
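
Usage note (illustration only, not part of the patch series above): the new `datahub/pydantic/compat.py` module introduced in PATCH 13/16 exposes `PYDANTIC_VERSION` and the `compat_dict_method` decorator, but the hunks shown do not demonstrate how a model would adopt the decorator. The sketch below shows one plausible way to apply it, assuming the module is importable from an installed `datahub` checkout carrying this patch; `ExampleSpec` and its fields are hypothetical names invented for the example, not identifiers from the DataHub codebase.

```python
# Minimal usage sketch, assuming datahub.pydantic.compat from this patch is
# on the path. "ExampleSpec" is a hypothetical model used only to illustrate
# the decorator; it does not exist in the DataHub codebase.
from pydantic import BaseModel

from datahub.pydantic.compat import PYDANTIC_VERSION, compat_dict_method


class ExampleSpec(BaseModel):
    id: str
    description: str = ""

    @compat_dict_method
    def dict(self, **kwargs):
        # This body runs only under Pydantic v1; under v2 the decorator
        # reroutes the call to self.model_dump() with the same kwargs.
        return super().dict(**kwargs)


spec = ExampleSpec(id="example_table", description="demo")

# The call site stays identical on both major versions.
print(PYDANTIC_VERSION, spec.dict(exclude_none=True))
```

This mirrors the approach the patch takes directly in `dataset.py`, where `Dataset` defines `dict()` when `PYDANTIC_VERSION < 2` and `model_dump()` otherwise, and `to_yaml()` branches on the same constant when serializing to round-trip YAML.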