diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index b0289ec56b0905..404f6f376ded8b 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -742,7 +742,7 @@ module.exports = { type: "category", label: "DataHub CLI", link: { type: "doc", id: "docs/cli" }, - items: ["docs/datahub_lite"], + items: ["docs/cli-commands/dataset", "docs/datahub_lite"], }, { type: "category", diff --git a/docs/cli-commands/dataset.md b/docs/cli-commands/dataset.md new file mode 100644 index 00000000000000..bd4e7b28c3968e --- /dev/null +++ b/docs/cli-commands/dataset.md @@ -0,0 +1,285 @@ +# DataHub Dataset Command + +The `dataset` command allows you to interact with Dataset entities in DataHub. This includes creating, updating, retrieving, validating, and synchronizing Dataset metadata. + +## Commands + +### sync + +Synchronize Dataset metadata between YAML files and DataHub. + +```shell +datahub dataset sync -f PATH_TO_YAML_FILE --to-datahub|--from-datahub +``` + +**Options:** +- `-f, --file` - Path to the YAML file (required) +- `--to-datahub` - Push metadata from YAML file to DataHub +- `--from-datahub` - Pull metadata from DataHub to YAML file + +**Example:** +```shell +# Push to DataHub +datahub dataset sync -f dataset.yaml --to-datahub + +# Pull from DataHub +datahub dataset sync -f dataset.yaml --from-datahub +``` + +The `sync` command offers bidirectional synchronization, allowing you to keep your local YAML files in sync with the DataHub platform. The `upsert` command actually uses `sync` with the `--to-datahub` flag internally. + +For details on the supported YAML format, see the [Dataset YAML Format](#dataset-yaml-format) section. + +### file + +Operate on a Dataset YAML file for validation or linting. + +```shell +datahub dataset file [--lintCheck] [--lintFix] PATH_TO_YAML_FILE +``` + +**Options:** +- `--lintCheck` - Check the YAML file for formatting issues (optional) +- `--lintFix` - Fix formatting issues in the YAML file (optional) + +**Example:** +```shell +# Check for linting issues +datahub dataset file --lintCheck dataset.yaml + +# Fix linting issues +datahub dataset file --lintFix dataset.yaml +``` + +This command helps maintain consistent formatting of your Dataset YAML files. For more information on the expected format, refer to the [Dataset YAML Format](#dataset-yaml-format) section. + +### upsert + +Create or update Dataset metadata in DataHub. + +```shell +datahub dataset upsert -f PATH_TO_YAML_FILE +``` + +**Options:** +- `-f, --file` - Path to the YAML file containing Dataset metadata (required) + +**Example:** +```shell +datahub dataset upsert -f dataset.yaml +``` + +This command will parse the YAML file, validate that any entity references exist in DataHub, and then emit the corresponding metadata change proposals to update or create the Dataset. + +For details on the required structure of your YAML file, see the [Dataset YAML Format](#dataset-yaml-format) section. + +### get + +Retrieve Dataset metadata from DataHub and optionally write it to a file. + +```shell +datahub dataset get --urn DATASET_URN [--to-file OUTPUT_FILE] +``` + +**Options:** +- `--urn` - The Dataset URN to retrieve (required) +- `--to-file` - Path to write the Dataset metadata as YAML (optional) + +**Example:** +```shell +datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file my_dataset.yaml +``` + +If the URN does not start with `urn:li:dataset:`, it will be automatically prefixed. 
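+A common workflow is to pull a Dataset into YAML with `get`, edit the file locally, and push the changes back with `upsert`. A minimal sketch using only the commands documented on this page (the URN and file name are illustrative):
+
+```shell
+# Pull the current metadata into a YAML file
+datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file my_dataset.yaml
+
+# ... edit my_dataset.yaml as needed ...
+
+# Push the edited metadata back to DataHub
+datahub dataset upsert -f my_dataset.yaml
+```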
+ +The output file will be formatted according to the [Dataset YAML Format](#dataset-yaml-format) section. + +### add_sibling + +Add sibling relationships between Datasets. + +```shell +datahub dataset add_sibling --urn PRIMARY_URN --sibling-urns SECONDARY_URN [--sibling-urns ANOTHER_URN ...] +``` + +**Options:** +- `--urn` - URN of the primary Dataset (required) +- `--sibling-urns` - URNs of secondary sibling Datasets (required, multiple allowed) + +**Example:** +```shell +datahub dataset add_sibling --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --sibling-urns "urn:li:dataset:(urn:li:dataPlatform:snowflake,example_table,PROD)" +``` + +Siblings are semantically equivalent datasets, typically representing the same data across different platforms or environments. + +## Dataset YAML Format + +The Dataset YAML file follows a structured format with various supported fields: + +```yaml +# Basic identification (required) +id: "example_table" # Dataset identifier +platform: "hive" # Platform name +env: "PROD" # Environment (PROD by default) + +# Metadata (optional) +name: "Example Table" # Display name (defaults to id if not specified) +description: "This is an example table" + +# Schema definition (optional) +schema: + fields: + - id: "field1" # Field identifier + type: "string" # Data type + description: "First field" # Field description + doc: "First field" # Alias for description + nativeDataType: "VARCHAR" # Native platform type (defaults to type if not specified) + nullable: false # Whether field can be null (default: false) + label: "Field One" # Display label (optional business label for the field) + isPartOfKey: true # Whether field is part of primary key + isPartitioningKey: false # Whether field is a partitioning key + jsonProps: {"customProp": "value"} # Custom JSON properties + + - id: "field2" + type: "number" + description: "Second field" + nullable: true + globalTags: ["PII", "Sensitive"] + glossaryTerms: ["urn:li:glossaryTerm:Revenue"] + structured_properties: + property1: "value1" + property2: 42 + file: example.schema.avsc # Optional schema file (required if defining tables with nested fields) + +# Additional metadata (all optional) +properties: # Custom properties as key-value pairs + origin: "external" + pipeline: "etl_daily" + +subtype: "View" # Dataset subtype +subtypes: ["View", "Materialized"] # Multiple subtypes (if only one, use subtype field instead) + +downstreams: # Downstream Dataset URNs + - "urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)" + +tags: # Tags + - "Tier1" + - "Verified" + +glossary_terms: # Associated glossary terms + - "urn:li:glossaryTerm:Revenue" + +owners: # Dataset owners + - "jdoe" # Simple format (defaults to TECHNICAL_OWNER) + - id: "alice" # Extended format with ownership type + type: "BUSINESS_OWNER" + +structured_properties: # Structured properties + priority: "P1" + cost_center: 123 + +external_url: "https://example.com/datasets/example_table" +``` + +You can also define multiple datasets in a single YAML file by using a list format: + +```yaml +- id: "dataset1" + platform: "hive" + description: "First dataset" + # other properties... + +- id: "dataset2" + platform: "snowflake" + description: "Second dataset" + # other properties... +``` + +### Schema Definition + +You can define Dataset schema in two ways: + +1. **Direct field definitions** as shown above + > **Important limitation**: When using inline schema field definitions, only non-nested (flat) fields are currently supported. 
For nested or complex schemas, you must use the Avro file approach described below. + +2. **Reference to an Avro schema file**: + ```yaml + schema: + file: "path/to/schema.avsc" + ``` + +Even when using the Avro file approach for the basic schema structure, you can still use the `fields` section to provide additional metadata like structured properties, tags, and glossary terms for your schema fields. + +#### Schema Field Properties + +The Schema Field object supports the following properties: + +| Property | Type | Description | +|----------|------|-------------| +| `id` | string | Field identifier/path (required if `urn` not provided) | +| `urn` | string | URN of the schema field (required if `id` not provided) | +| `type` | string | Data type (one of the supported [Field Types](#field-types)) | +| `nativeDataType` | string | Native data type in the source platform (defaults to `type` if not specified) | +| `description` | string | Field description | +| `doc` | string | Alias for description | +| `nullable` | boolean | Whether the field can be null (default: false) | +| `label` | string | Display label for the field | +| `recursive` | boolean | Whether the field is recursive (default: false) | +| `isPartOfKey` | boolean | Whether the field is part of the primary key | +| `isPartitioningKey` | boolean | Whether the field is a partitioning key | +| `jsonProps` | object | Custom JSON properties | +| `globalTags` | array | List of tags associated with the field | +| `glossaryTerms` | array | List of glossary terms associated with the field | +| `structured_properties` | object | Structured properties for the field | + + +**Important Note on Schema Field Types**: +When specifying fields in the YAML file, you must follow an all-or-nothing approach with the `type` field: +- If you want the command to generate the schema for you, specify the `type` field for ALL fields. +- If you only want to add field-level metadata (like tags, glossary terms, or structured properties), do NOT specify the `type` field for ANY field. + +Example of fields with only metadata (no types): +```yaml +schema: + fields: + - id: "field1" # Field identifier + structured_properties: + prop1: prop_value + - id: "field2" + structured_properties: + prop1: prop_value +``` + +### Ownership Types + +When specifying owners, the following ownership types are supported: +- `TECHNICAL_OWNER` (default) +- `BUSINESS_OWNER` +- `DATA_STEWARD` + +Custom ownership types can be specified using the URN format. 
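+For example, the `owners` list can mix the simple form, a built-in ownership type, and a custom ownership type referenced by URN. A minimal sketch (the `urn:li:ownershipType:architect` URN is a hypothetical custom type that would need to already exist in your DataHub instance):
+
+```yaml
+owners:
+  - "jdoe"                                  # simple form, defaults to TECHNICAL_OWNER
+  - id: "alice"
+    type: "BUSINESS_OWNER"                  # built-in ownership type
+  - id: "bob"
+    type: "urn:li:ownershipType:architect"  # custom ownership type referenced by URN
+```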
+ +### Field Types + +When defining schema fields, the following primitive types are supported: +- `string` +- `number` +- `int` +- `long` +- `float` +- `double` +- `boolean` +- `bytes` +- `fixed` + +## Implementation Notes + +- URNs are generated automatically if not provided, based on the platform, id, and env values +- The command performs validation to ensure referenced entities (like structured properties) exist +- When updating schema fields, changes are propagated correctly to maintain consistent metadata +- The Dataset object will check for existence of entity references and will skip datasets with missing references +- When using the `sync` command with `--from-datahub`, existing YAML files will be updated with metadata from DataHub while preserving comments and structure +- For structured properties, single values are simplified (not wrapped in lists) when appropriate +- Field paths are simplified for better readability +- When specifying field types, all fields must have type information or none of them should \ No newline at end of file diff --git a/docs/cli.md b/docs/cli.md index f332f77d9d21a5..6bedb1f6825123 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -404,21 +404,17 @@ datahub timeline --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccou ### dataset (Dataset Entity) -The `dataset` command allows you to interact with the dataset entity. - -The `get` operation can be used to read in a dataset into a yaml file. +The `dataset` command allows you to interact with Dataset entities in DataHub, including creating, updating, retrieving, and validating Dataset metadata. ```shell -datahub dataset get --urn "$URN" --to-file "$FILE_NAME" -``` +# Get a dataset and write to YAML file +datahub dataset get --urn "urn:li:dataset:(urn:li:dataPlatform:hive,example_table,PROD)" --to-file dataset.yaml -The `upsert` operation can be used to create a new user or update an existing one. - -```shell +# Create or update dataset from YAML file datahub dataset upsert -f dataset.yaml ``` -An example of `dataset.yaml` would look like as in [dataset.yaml](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/cli_usage/dataset/dataset.yaml). 
+➡️ [Learn more about the dataset command](./cli-commands/dataset.md) ### user (User Entity) diff --git a/metadata-ingestion/examples/structured_properties/click_event.avsc b/metadata-ingestion/examples/structured_properties/click_event.avsc index b277674f8b62fb..a0b8e57982fb95 100644 --- a/metadata-ingestion/examples/structured_properties/click_event.avsc +++ b/metadata-ingestion/examples/structured_properties/click_event.avsc @@ -9,6 +9,16 @@ { "name": "referer", "type": ["string", "null"] }, { "name": "user_agent", "type": ["string", "null"] }, { "name": "user_id", "type": ["string", "null"] }, - { "name": "session_id", "type": ["string", "null"] } + { "name": "session_id", "type": ["string", "null"] }, + { + "name": "locator", "type": { + "type": "record", + "name": "Locator", + "fields": [ + { "name": "latitude", "type": "float" }, + { "name": "longitude", "type": "float" } + ] + } + } ] } diff --git a/metadata-ingestion/examples/structured_properties/dataset.yaml b/metadata-ingestion/examples/structured_properties/dataset.yaml index 557bf0167a51bc..6f8c3a69bb951d 100644 --- a/metadata-ingestion/examples/structured_properties/dataset.yaml +++ b/metadata-ingestion/examples/structured_properties/dataset.yaml @@ -6,17 +6,22 @@ schema: file: examples/structured_properties/click_event.avsc fields: - - id: ip - - urn: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD),ip) - structured_properties: # structured properties for schema fields/columns go here - io.acryl.dataManagement.deprecationDate: "2023-01-01" - io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com - io.acryl.dataManagement.replicationSLA: 90 + - id: ip + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com + io.acryl.dataManagement.replicationSLA: 90 + - id: url + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + - id: locator.latitude + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' structured_properties: # dataset level structured properties go here io.acryl.privacy.retentionTime: 365 projectNames: - - Tracking - - DataHub + - Tracking + - DataHub - id: ClickEvent platform: events subtype: Topic @@ -27,19 +32,20 @@ project_name: Tracking namespace: org.acryl.tracking version: 1.0.0 - retention: 30 + retention: '30' structured_properties: io.acryl.dataManagement.certifier: urn:li:corpuser:john.doe@example.com schema: file: examples/structured_properties/click_event.avsc downstreams: - - urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD) + - urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD) - id: user.clicks platform: snowflake - schema: - fields: - - id: user_id - structured_properties: - io.acryl.dataManagement.deprecationDate: "2023-01-01" structured_properties: io.acryl.dataManagement.replicationSLA: 90 + schema: + fields: + - id: user_id + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' + type: string diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index bf824a11a77b5d..26f95991b0f993 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -2,11 +2,24 @@ import logging import time from pathlib import Path -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing 
import ( + Dict, + Iterable, + List, + Literal, + Optional, + Tuple, + Union, + get_args, +) -from pydantic import BaseModel, Field, validator +import avro +import yaml +from pydantic import BaseModel, Field, root_validator, validator from ruamel.yaml import YAML +from typing_extensions import TypeAlias +import datahub.metadata.schema_classes as models from datahub.api.entities.structuredproperties.structuredproperties import AllowedTypes from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import ( @@ -40,6 +53,16 @@ TagAssociationClass, UpstreamClass, ) +from datahub.metadata.urns import ( + DataPlatformUrn, + GlossaryTermUrn, + SchemaFieldUrn, + StructuredPropertyUrn, + TagUrn, +) +from datahub.pydantic.compat import ( + PYDANTIC_VERSION, +) from datahub.specific.dataset import DatasetPatchBuilder from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -47,35 +70,103 @@ logger = logging.getLogger(__name__) -class SchemaFieldSpecification(BaseModel): +class StrictModel(BaseModel): + """ + Base model with strict validation. + Compatible with both Pydantic v1 and v2. + """ + + if PYDANTIC_VERSION >= 2: + # Pydantic v2 config + model_config = { + "validate_assignment": True, + "extra": "forbid", + } + else: + # Pydantic v1 config + class Config: + validate_assignment = True + extra = "forbid" + + +# Define type aliases for the complex types +PropertyValue: TypeAlias = Union[float, str] +PropertyValueList: TypeAlias = List[PropertyValue] +StructuredProperties: TypeAlias = Dict[str, Union[PropertyValue, PropertyValueList]] + + +class StructuredPropertiesHelper: + @staticmethod + def simplify_structured_properties_list( + structured_properties: Optional[StructuredProperties], + ) -> Optional[StructuredProperties]: + def urn_strip(urn: str) -> str: + if urn.startswith("urn:li:structuredProperty:"): + return urn[len("urn:li:structuredProperty:") :] + return urn + + if structured_properties: + simplified_structured_properties = ( + {urn_strip(k): v for k, v in structured_properties.items()} + if structured_properties + else None + ) + if simplified_structured_properties: + # convert lists to single values if possible + for k, v in simplified_structured_properties.items(): + if isinstance(v, list): + if len(v) == 1: + simplified_structured_properties[k] = v[0] + else: + simplified_structured_properties[k] = v + else: + simplified_structured_properties[k] = v + + return simplified_structured_properties + return None + + +class SchemaFieldSpecification(StrictModel): id: Optional[str] = None urn: Optional[str] = None - structured_properties: Optional[ - Dict[str, Union[str, float, List[Union[str, float]]]] - ] = None + structured_properties: Optional[StructuredProperties] = None type: Optional[str] = None nativeDataType: Optional[str] = None jsonPath: Union[None, str] = None - nullable: Optional[bool] = None + nullable: bool = False description: Union[None, str] = None + doc: Union[None, str] = None # doc is an alias for description label: Optional[str] = None created: Optional[dict] = None lastModified: Optional[dict] = None - recursive: Optional[bool] = None + recursive: bool = False globalTags: Optional[List[str]] = None glossaryTerms: Optional[List[str]] = None isPartOfKey: Optional[bool] = None isPartitioningKey: Optional[bool] = None jsonProps: Optional[dict] = None + def remove_type_metadata(self) -> "SchemaFieldSpecification": + """ + Removes type metadata from the schema field specification. 
+ This is useful when syncing field metadata back to yaml when + the type information is already present in the schema file. + """ + self.type = None + self.nativeDataType = None + self.jsonPath = None + self.isPartitioningKey = None + self.isPartOfKey = None + self.jsonProps = None + return self + def with_structured_properties( - self, - structured_properties: Optional[Dict[str, List[Union[str, float]]]], + self, structured_properties: Optional[StructuredProperties] ) -> "SchemaFieldSpecification": self.structured_properties = ( - {k: v for k, v in structured_properties.items()} - if structured_properties - else None + StructuredPropertiesHelper.simplify_structured_properties_list( + structured_properties + ) ) return self @@ -85,10 +176,10 @@ def from_schema_field( ) -> "SchemaFieldSpecification": return SchemaFieldSpecification( id=Dataset._simplify_field_path(schema_field.fieldPath), - urn=make_schema_field_urn( - parent_urn, Dataset._simplify_field_path(schema_field.fieldPath) + urn=make_schema_field_urn(parent_urn, schema_field.fieldPath), + type=SchemaFieldSpecification._from_datahub_type( + schema_field.type, schema_field.nativeDataType, allow_complex=True ), - type=str(schema_field.type), nativeDataType=schema_field.nativeDataType, nullable=schema_field.nullable, description=schema_field.description, @@ -100,14 +191,15 @@ def from_schema_field( else None ), recursive=schema_field.recursive, - globalTags=( - schema_field.globalTags.__dict__ if schema_field.globalTags else None - ), - glossaryTerms=( - schema_field.glossaryTerms.__dict__ - if schema_field.glossaryTerms - else None - ), + globalTags=[TagUrn(tag.tag).name for tag in schema_field.globalTags.tags] + if schema_field.globalTags + else None, + glossaryTerms=[ + GlossaryTermUrn(term.urn).name + for term in schema_field.glossaryTerms.terms + ] + if schema_field.glossaryTerms + else None, isPartitioningKey=schema_field.isPartitioningKey, jsonProps=( json.loads(schema_field.jsonProps) if schema_field.jsonProps else None @@ -120,10 +212,142 @@ def either_id_or_urn_must_be_filled_out(cls, v, values): raise ValueError("Either id or urn must be present") return v + @root_validator(pre=True) + def sync_description_and_doc(cls, values: Dict) -> Dict: + """Synchronize doc and description fields if one is provided but not the other.""" + description = values.get("description") + doc = values.get("doc") + + if description is not None and doc is None: + values["doc"] = description + elif doc is not None and description is None: + values["description"] = doc + + return values + + def get_datahub_type(self) -> models.SchemaFieldDataTypeClass: + PrimitiveType = Literal[ + "string", + "number", + "int", + "long", + "float", + "double", + "boolean", + "bytes", + "fixed", + ] + type = self.type.lower() if self.type else self.type + if type not in set(get_args(PrimitiveType)): + raise ValueError(f"Type {self.type} is not a valid primitive type") + + if type == "string": + return models.SchemaFieldDataTypeClass(type=models.StringTypeClass()) + elif type in ["number", "long", "float", "double", "int"]: + return models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()) + elif type == "fixed": + return models.SchemaFieldDataTypeClass(type=models.FixedTypeClass()) + elif type == "bytes": + return models.SchemaFieldDataTypeClass(type=models.BytesTypeClass()) + elif type == "boolean": + return models.SchemaFieldDataTypeClass(type=models.BooleanTypeClass()) + + raise ValueError(f"Type {self.type} is not a valid primitive type") + + @staticmethod 
+ def _from_datahub_type( + input_type: models.SchemaFieldDataTypeClass, + native_data_type: str, + allow_complex: bool = False, + ) -> str: + if isinstance(input_type.type, models.StringTypeClass): + return "string" + elif isinstance(input_type.type, models.NumberTypeClass): + if native_data_type in ["long", "float", "double", "int"]: + return native_data_type + return "number" + elif isinstance(input_type.type, models.FixedTypeClass): + return "fixed" + elif isinstance(input_type.type, models.BytesTypeClass): + return "bytes" + elif isinstance(input_type.type, models.BooleanTypeClass): + return "boolean" + elif allow_complex and isinstance(input_type.type, models.ArrayTypeClass): + return "array" + elif allow_complex and isinstance(input_type.type, models.MapTypeClass): + return "map" + elif allow_complex and isinstance(input_type.type, models.UnionTypeClass): + return "union" + elif allow_complex: + return "record" + raise ValueError(f"Type {input_type} is not a valid primitive type") + + if PYDANTIC_VERSION < 2: + + def dict(self, **kwargs): + """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", None) or set() + + # If description and doc are identical, exclude doc from the output + if self.description == self.doc and self.description is not None: + exclude.add("doc") + + # if nativeDataType and type are identical, exclude nativeDataType from the output + if self.nativeDataType == self.type and self.nativeDataType is not None: + exclude.add("nativeDataType") + + # if the id is the same as the urn's fieldPath, exclude id from the output + + if self.urn: + field_urn = SchemaFieldUrn.from_string(self.urn) + if Dataset._simplify_field_path(field_urn.field_path) == self.id: + exclude.add("urn") + + kwargs.pop("exclude_defaults", None) + + self.structured_properties = ( + StructuredPropertiesHelper.simplify_structured_properties_list( + self.structured_properties + ) + ) + + return super().dict(exclude=exclude, exclude_defaults=True, **kwargs) + + else: + # For v2, implement model_dump with similar logic as dict + def model_dump(self, **kwargs): + """Custom model_dump method for Pydantic v2 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", None) or set() + + # If description and doc are identical, exclude doc from the output + if self.description == self.doc and self.description is not None: + exclude.add("doc") + + # if nativeDataType and type are identical, exclude nativeDataType from the output + if self.nativeDataType == self.type and self.nativeDataType is not None: + exclude.add("nativeDataType") + + # if the id is the same as the urn's fieldPath, exclude id from the output + if self.urn: + field_urn = SchemaFieldUrn.from_string(self.urn) + if Dataset._simplify_field_path(field_urn.field_path) == self.id: + exclude.add("urn") + + self.structured_properties = ( + StructuredPropertiesHelper.simplify_structured_properties_list( + self.structured_properties + ) + ) + if hasattr(super(), "model_dump"): + return super().model_dump( # type: ignore + exclude=exclude, exclude_defaults=True, **kwargs + ) + class SchemaSpecification(BaseModel): file: Optional[str] = None fields: Optional[List[SchemaFieldSpecification]] = None + raw_schema: Optional[str] = None @validator("file") def file_must_be_avsc(cls, v): @@ -143,12 +367,16 @@ def ownership_type_must_be_mappable_or_custom(cls, v: str) -> str: class StructuredPropertyValue(ConfigModel): - value: Union[str, float, List[str], List[float]] + value: Union[str, 
int, float, List[str], List[int], List[float]] created: Optional[str] = None lastModified: Optional[str] = None -class Dataset(BaseModel): +class DatasetRetrievalConfig(BaseModel): + include_downstreams: Optional[bool] = False + + +class Dataset(StrictModel): id: Optional[str] = None platform: Optional[str] = None env: str = "PROD" @@ -163,9 +391,7 @@ class Dataset(BaseModel): tags: Optional[List[str]] = None glossary_terms: Optional[List[str]] = None owners: Optional[List[Union[str, Ownership]]] = None - structured_properties: Optional[ - Dict[str, Union[str, float, List[Union[str, float]]]] - ] = None + structured_properties: Optional[StructuredProperties] = None external_url: Optional[str] = None @property @@ -199,6 +425,10 @@ def platform_must_not_be_urn(cls, v): return v[len("urn:li:dataPlatform:") :] return v + @validator("structured_properties") + def simplify_structured_properties(cls, v): + return StructuredPropertiesHelper.simplify_structured_properties_list(v) + def _mint_auditstamp(self, message: str) -> AuditStampClass: return AuditStampClass( time=int(time.time() * 1000.0), @@ -221,6 +451,14 @@ def _mint_owner(self, owner: Union[str, Ownership]) -> OwnerClass: typeUrn=ownership_type_urn, ) + @staticmethod + def get_patch_builder(urn: str) -> DatasetPatchBuilder: + return DatasetPatchBuilder(urn) + + def patch_builder(self) -> DatasetPatchBuilder: + assert self.urn + return DatasetPatchBuilder(self.urn) + @classmethod def from_yaml(cls, file: str) -> Iterable["Dataset"]: with open(file) as fp: @@ -230,9 +468,45 @@ def from_yaml(cls, file: str) -> Iterable["Dataset"]: datasets = [datasets] for dataset_raw in datasets: dataset = Dataset.parse_obj(dataset_raw) + # dataset = Dataset.model_validate(dataset_raw, strict=True) yield dataset - def generate_mcp( + def entity_references(self) -> List[str]: + urn_prefix = f"{StructuredPropertyUrn.URN_PREFIX}:{StructuredPropertyUrn.LI_DOMAIN}:{StructuredPropertyUrn.ENTITY_TYPE}" + references = [] + if self.schema_metadata: + if self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field.structured_properties: + references.extend( + [ + f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key + for prop_key in field.structured_properties.keys() + ] + ) + if field.glossaryTerms: + references.extend( + [make_term_urn(term) for term in field.glossaryTerms] + ) + # We don't check references for tags + if self.structured_properties: + references.extend( + [ + f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key + for prop_key in self.structured_properties.keys() + ] + ) + if self.glossary_terms: + references.extend([make_term_urn(term) for term in self.glossary_terms]) + + # We don't check references for tags + return list(set(references)) + + def generate_mcp( # noqa: C901 self, ) -> Iterable[Union[MetadataChangeProposalClass, MetadataChangeProposalWrapper]]: mcp = MetadataChangeProposalWrapper( @@ -247,9 +521,12 @@ def generate_mcp( yield mcp if self.schema_metadata: + schema_fields = set() if self.schema_metadata.file: with open(self.schema_metadata.file) as schema_fp: schema_string = schema_fp.read() + schema_fields_list = avro_schema_to_mce_fields(schema_string) + schema_fields = {field.fieldPath for field in schema_fields_list} schema_metadata = SchemaMetadataClass( schemaName=self.name or self.id or self.urn or "", platform=self.platform_urn, @@ -264,7 +541,102 @@ def generate_mcp( yield mcp if self.schema_metadata.fields: + field_type_info_present = any( + 
field.type for field in self.schema_metadata.fields + ) + all_fields_type_info_present = all( + field.type for field in self.schema_metadata.fields + ) + if field_type_info_present and not all_fields_type_info_present: + raise ValueError( + "Either all fields must have type information or none of them should" + ) + + if all_fields_type_info_present: + update_technical_schema = True + else: + update_technical_schema = False + if update_technical_schema and not self.schema_metadata.file: + # We produce a schema metadata aspect only if we have type information + # and a schema file is not provided. + schema_metadata = SchemaMetadataClass( + schemaName=self.name or self.id or self.urn or "", + platform=self.platform_urn, + version=0, + hash="", + fields=[ + SchemaFieldClass( + fieldPath=field.id, # type: ignore[arg-type] + type=field.get_datahub_type(), + nativeDataType=field.nativeDataType or field.type, # type: ignore[arg-type] + nullable=field.nullable, + description=field.description, + label=field.label, + created=None, # This should be auto-populated. + lastModified=None, # This should be auto-populated. + recursive=field.recursive, + globalTags=GlobalTagsClass( + tags=[ + TagAssociationClass(tag=make_tag_urn(tag)) + for tag in field.globalTags + ] + ) + if field.globalTags is not None + else None, + glossaryTerms=GlossaryTermsClass( + terms=[ + GlossaryTermAssociationClass( + urn=make_term_urn(term) + ) + for term in field.glossaryTerms + ], + auditStamp=self._mint_auditstamp("yaml"), + ) + if field.glossaryTerms is not None + else None, + isPartOfKey=field.isPartOfKey, + isPartitioningKey=field.isPartitioningKey, + jsonProps=json.dumps(field.jsonProps) + if field.jsonProps is not None + else None, + ) + for field in self.schema_metadata.fields + ], + platformSchema=OtherSchemaClass( + rawSchema=yaml.dump( + self.schema_metadata.dict( + exclude_none=True, exclude_unset=True + ) + ) + ), + ) + mcp = MetadataChangeProposalWrapper( + entityUrn=self.urn, aspect=schema_metadata + ) + yield mcp + for field in self.schema_metadata.fields: + if schema_fields: + # search for the field in the schema fields set + matched_fields = [ + schema_field + for schema_field in schema_fields + if field.id == schema_field + or field.id == Dataset._simplify_field_path(schema_field) + ] + if not matched_fields: + raise ValueError( + f"Field {field.id} not found in the schema file" + ) + if len(matched_fields) > 1: + raise ValueError( + f"Field {field.id} matches multiple entries {matched_fields}in the schema file. Use the fully qualified field path." 
+ ) + assert len(matched_fields) == 1 + assert ( + self.urn is not None + ) # validator should have filled this in + field.urn = make_schema_field_urn(self.urn, matched_fields[0]) field_urn = field.urn or make_schema_field_urn( self.urn, # type: ignore[arg-type] field.id, # type: ignore[arg-type] @@ -299,12 +671,15 @@ def generate_mcp( yield mcp if field.structured_properties: + urn_prefix = f"{StructuredPropertyUrn.URN_PREFIX}:{StructuredPropertyUrn.LI_DOMAIN}:{StructuredPropertyUrn.ENTITY_TYPE}" mcp = MetadataChangeProposalWrapper( entityUrn=field_urn, aspect=StructuredPropertiesClass( properties=[ StructuredPropertyValueAssignmentClass( - propertyUrn=f"urn:li:structuredProperty:{prop_key}", + propertyUrn=f"{urn_prefix}:{prop_key}" + if not prop_key.startswith(urn_prefix) + else prop_key, values=( prop_value if isinstance(prop_value, list) @@ -403,6 +778,10 @@ def validate_type( @staticmethod def _simplify_field_path(field_path: str) -> str: + # field paths with [type=array] or [type=map] or [type=union] should never be simplified + for type in ["array", "map", "union"]: + if f"[type={type}]" in field_path: + return field_path if field_path.startswith("[version=2.0]"): # v2 field path field_components = [] @@ -434,7 +813,26 @@ def _schema_from_schema_metadata( ) if schema_metadata: + # If the schema is built off of an avro schema, we only extract the fields if they have structured properties + # Otherwise, we extract all fields + if ( + schema_metadata.platformSchema + and isinstance(schema_metadata.platformSchema, models.OtherSchemaClass) + and schema_metadata.platformSchema.rawSchema + ): + try: + maybe_avro_schema = avro.schema.parse( + schema_metadata.platformSchema.rawSchema + ) + schema_fields = avro_schema_to_mce_fields(maybe_avro_schema) + except Exception as e: + logger.debug("Failed to parse avro schema: %s", e) + schema_fields = [] + schema_specification = SchemaSpecification( + raw_schema=schema_metadata.platformSchema.rawSchema + if hasattr(schema_metadata.platformSchema, "rawSchema") + else None, fields=[ SchemaFieldSpecification.from_schema_field( field, urn @@ -462,8 +860,21 @@ def _schema_from_schema_metadata( ) for field in schema_metadata.fields ] - ] + ], ) + if schema_fields and schema_specification.fields: + # Source was an avro schema, so we only include fields with structured properties, tags or glossary terms + schema_specification.fields = [ + field.remove_type_metadata() + for field in schema_specification.fields + if field.structured_properties + or field.globalTags + or field.glossaryTerms + ] + if ( + not schema_specification.fields + ): # set fields to None if there are no fields after filtering + schema_specification.fields = None return schema_specification else: return None @@ -485,7 +896,14 @@ def extract_owners_if_exists( return yaml_owners @classmethod - def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": + def from_datahub( + cls, + graph: DataHubGraph, + urn: str, + config: DatasetRetrievalConfig = DatasetRetrievalConfig(), + ) -> "Dataset": + dataset_urn = DatasetUrn.from_string(urn) + platform_urn = DataPlatformUrn.from_string(dataset_urn.platform) dataset_properties: Optional[DatasetPropertiesClass] = graph.get_aspect( urn, DatasetPropertiesClass ) @@ -500,7 +918,7 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": urn, StructuredPropertiesClass ) if structured_properties: - structured_properties_map: Dict[str, List[Union[str, float]]] = {} + structured_properties_map: StructuredProperties = {} for sp in 
structured_properties.properties: if sp.propertyUrn in structured_properties_map: assert isinstance(structured_properties_map[sp.propertyUrn], list) @@ -508,7 +926,19 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": else: structured_properties_map[sp.propertyUrn] = sp.values - return Dataset( # type: ignore[call-arg] + if config.include_downstreams: + related_downstreams = graph.get_related_entities( + urn, + relationship_types=[ + "DownstreamOf", + ], + direction=DataHubGraph.RelationshipDirection.INCOMING, + ) + downstreams = [r.urn for r in related_downstreams] + + return Dataset( # type: ignore[arg-type] + id=dataset_urn.name, + platform=platform_urn.platform_name, urn=urn, description=( dataset_properties.description @@ -521,9 +951,11 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": else None ), schema=Dataset._schema_from_schema_metadata(graph, urn), - tags=[tag.tag for tag in tags.tags] if tags else None, + tags=[TagUrn(tag.tag).name for tag in tags.tags] if tags else None, glossary_terms=( - [term.urn for term in glossary_terms.terms] if glossary_terms else None + [GlossaryTermUrn(term.urn).name for term in glossary_terms.terms] + if glossary_terms + else None ), owners=yaml_owners, properties=( @@ -533,14 +965,271 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset": structured_properties=( structured_properties_map if structured_properties else None ), + downstreams=downstreams if config.include_downstreams else None, ) + if PYDANTIC_VERSION < 2: + + def dict(self, **kwargs): + """Custom dict method for Pydantic v1 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) + + # If id and name are identical, exclude name from the output + if self.id == self.name and self.id is not None: + exclude.add("name") + + # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output + if self.subtypes and len(self.subtypes) == 1: + self.subtype = self.subtypes[0] + exclude.add("subtypes") + + result = super().dict(exclude=exclude, **kwargs) + + # Custom handling for schema_metadata/schema + if self.schema_metadata and "schema" in result: + schema_data = result["schema"] + + # Handle fields if they exist + if "fields" in schema_data and isinstance(schema_data["fields"], list): + # Process each field using its custom dict method + processed_fields = [] + if self.schema_metadata and self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field: + # Use dict method for Pydantic v1 + processed_field = field.dict(**kwargs) + processed_fields.append(processed_field) + + # Replace the fields in the result with the processed ones + schema_data["fields"] = processed_fields + + return result + else: + + def model_dump(self, **kwargs): + """Custom model_dump method for Pydantic v2 to handle YAML serialization properly.""" + exclude = kwargs.pop("exclude", set()) + + # If id and name are identical, exclude name from the output + if self.id == self.name and self.id is not None: + exclude.add("name") + + # if subtype and subtypes are identical or subtypes is a singleton list, exclude subtypes from the output + if self.subtypes and len(self.subtypes) == 1: + self.subtype = self.subtypes[0] + exclude.add("subtypes") + + if hasattr(super(), "model_dump"): + result = super().model_dump(exclude=exclude, **kwargs) # type: ignore + else: + result = super().dict(exclude=exclude, **kwargs) + + # Custom handling for schema_metadata/schema + if self.schema_metadata and 
"schema" in result: + schema_data = result["schema"] + + # Handle fields if they exist + if "fields" in schema_data and isinstance(schema_data["fields"], list): + # Process each field using its custom model_dump method + processed_fields = [] + if self.schema_metadata and self.schema_metadata.fields: + for field in self.schema_metadata.fields: + if field: + processed_field = field.model_dump(**kwargs) + processed_fields.append(processed_field) + + # Replace the fields in the result with the processed ones + schema_data["fields"] = processed_fields + + return result + def to_yaml( self, file: Path, - ) -> None: + ) -> bool: + """ + Write model to YAML file only if content has changed. + Preserves comments and structure of the existing YAML file. + Returns True if file was written, False if no changes were detected. + """ + # Create new model data + # Create new model data - choose dict() or model_dump() based on Pydantic version + if PYDANTIC_VERSION >= 2: + new_data = self.model_dump( + exclude_none=True, exclude_unset=True, by_alias=True + ) + else: + new_data = self.dict(exclude_none=True, exclude_unset=True, by_alias=True) + + # Set up ruamel.yaml for preserving comments + yaml_handler = YAML(typ="rt") # round-trip mode + yaml_handler.default_flow_style = False + yaml_handler.preserve_quotes = True # type: ignore[assignment] + yaml_handler.indent(mapping=2, sequence=2, offset=0) + + if file.exists(): + try: + # Load existing data with comments preserved + with open(file, "r") as fp: + existing_data = yaml_handler.load(fp) + + # Determine if the file contains a list or a single document + if isinstance(existing_data, dict): + existing_data = [existing_data] + is_original_list = False + else: + is_original_list = True + if isinstance(existing_data, list): + # Handle list case + updated = False + identifier = "urn" + model_id = self.urn + + if model_id is not None: + # Try to find and update existing item + for item in existing_data: + existing_dataset = Dataset(**item) + item_identifier = item.get(identifier, existing_dataset.urn) + if item_identifier == model_id: + # Found the item to update - preserve structure while updating values + updated = True + if ( + existing_dataset.schema_metadata + and existing_dataset.schema_metadata.file + ): + # Preserve the existing schema file path + new_data["schema"]["file"] = ( + existing_dataset.schema_metadata.file + ) + # Check if the content of the schema file has changed + with open( + existing_dataset.schema_metadata.file + ) as schema_fp: + schema_fp_content = schema_fp.read() + + if ( + schema_fp_content + != new_data["schema"]["raw_schema"] + ): + # If the content has changed, update the schema file + schema_file_path = Path( + existing_dataset.schema_metadata.file + ) + schema_file_path.write_text( + new_data["schema"]["raw_schema"] + ) + # Remove raw_schema from the schema aspect before updating + if "schema" in new_data: + new_data["schema"].pop("raw_schema") + + _update_dict_preserving_comments( + item, new_data, ["urn", "properties", "raw_schema"] + ) + break + + if not updated: + # Item not found, append to the list + existing_data.append(new_data) + updated = True + + # If no update was needed, return early + if not updated: + return False + + # Write the updated data back + with open(file, "w") as fp: + if not is_original_list: + existing_data = existing_data[0] + yaml_handler.dump(existing_data, fp) + + return True + + except Exception as e: + # If there's any error, we'll create a new file + print( + f"Error processing existing file 
{file}: {e}. Will create a new one." + ) + else: + # File doesn't exist or had errors - create a new one with default settings + yaml_handler.indent(mapping=2, sequence=2, offset=0) + + file.parent.mkdir(parents=True, exist_ok=True) + with open(file, "w") as fp: - yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip) - yaml.indent(mapping=2, sequence=4, offset=2) - yaml.default_flow_style = False - yaml.dump(self.dict(exclude_none=True, exclude_unset=True), fp) + yaml_handler.dump(new_data, fp) + + return True + + +def _update_dict_preserving_comments( + target: Dict, source: Dict, optional_fields: Optional[List[str]] = None +) -> None: + """ + Updates a target dictionary with values from source, preserving comments and structure. + This modifies the target dictionary in-place. + """ + if optional_fields is None: + optional_fields = ["urn"] + # For each key in the source dict + for key, value in source.items(): + if key in target: + if isinstance(value, dict) and isinstance(target[key], dict): + # Recursively update nested dictionaries + _update_dict_preserving_comments(target[key], value) + else: + # Update scalar or list values + # If target value is an int, and source value is a float that is equal to the int, convert to int + if isinstance(value, float) and int(value) == value: + target[key] = int(value) + else: + target[key] = value + elif key not in optional_fields: + # Add new keys + target[key] = value + + # Remove keys that are in target but not in source + keys_to_remove = [k for k in target if k not in source] + for key in keys_to_remove: + del target[key] + + +def _dict_equal(dict1: Dict, dict2: Dict, optional_keys: List[str]) -> bool: + """ + Compare two dictionaries for equality, ignoring ruamel.yaml's metadata. + """ + + if len(dict1) != len(dict2): + # Check if the difference is only in optional keys + if len(dict1) > len(dict2): + for key in optional_keys: + if key in dict1 and key not in dict2: + del dict1[key] + elif len(dict2) > len(dict1): + for key in optional_keys: + if key in dict2 and key not in dict1: + del dict2[key] + if len(dict1) != len(dict2): + return False + + for key, value in dict1.items(): + if key not in dict2: + return False + + if isinstance(value, dict) and isinstance(dict2[key], dict): + if not _dict_equal(value, dict2[key], optional_keys): + return False + elif isinstance(value, list) and isinstance(dict2[key], list): + if len(value) != len(dict2[key]): + return False + + # Check list items (simplified for brevity) + for i in range(len(value)): + if isinstance(value[i], dict) and isinstance(dict2[key][i], dict): + if not _dict_equal(value[i], dict2[key][i], optional_keys): + return False + elif value[i] != dict2[key][i]: + return False + elif value != dict2[key]: + return False + + return True diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index b0b434751ad2cc..25bd13ba077327 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -1,7 +1,7 @@ import logging from enum import Enum from pathlib import Path -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Union import yaml from pydantic import validator @@ -38,7 +38,7 @@ def values(): class AllowedValue(ConfigModel): - value: str + value: Union[int, float, 
str] description: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py index 5601d7e716c797..c3b5a827213676 100644 --- a/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/dataset_cli.py @@ -1,12 +1,15 @@ +import filecmp import json import logging +import os +import shutil from pathlib import Path -from typing import Set, Tuple +from typing import List, Set, Tuple import click from click_default_group import DefaultGroup -from datahub.api.entities.dataset.dataset import Dataset +from datahub.api.entities.dataset.dataset import Dataset, DatasetRetrievalConfig from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.graph.client import DataHubGraph, get_default_graph from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings @@ -30,18 +33,9 @@ def dataset() -> None: @telemetry.with_telemetry() def upsert(file: Path) -> None: """Upsert attributes to a Dataset in DataHub.""" - - with get_default_graph() as graph: - for dataset in Dataset.from_yaml(str(file)): - try: - for mcp in dataset.generate_mcp(): - graph.emit(mcp) - click.secho(f"Update succeeded for urn {dataset.urn}.", fg="green") - except Exception as e: - click.secho( - f"Update failed for id {id}. due to {e}", - fg="red", - ) + # Call the sync command with to_datahub=True to perform the upsert operation + ctx = click.get_current_context() + ctx.invoke(sync, file=str(file), to_datahub=True) @dataset.command( @@ -111,3 +105,123 @@ def _get_existing_siblings(graph: DataHubGraph, urn: str) -> Set[str]: return set(existing.siblings) else: return set() + + +@dataset.command( + name="file", +) +@click.option("--lintCheck", required=False, is_flag=True) +@click.option("--lintFix", required=False, is_flag=True) +@click.argument("file", type=click.Path(exists=True)) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def file(lintcheck: bool, lintfix: bool, file: str) -> None: + """Operate on a Dataset file""" + + if lintcheck or lintfix: + import tempfile + from pathlib import Path + + # Create a temporary file in a secure way + # The file will be automatically deleted when the context manager exits + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as temp: + temp_path = Path(temp.name) + try: + # Copy content to the temporary file + shutil.copyfile(file, temp_path) + + # Run the linting + datasets = Dataset.from_yaml(temp.name) + for dataset in datasets: + dataset.to_yaml(temp_path) + + # Compare the files + files_match = filecmp.cmp(file, temp_path) + + if files_match: + click.secho("No differences found", fg="green") + else: + # Show diff for visibility + os.system(f"diff {file} {temp_path}") + + if lintfix: + shutil.copyfile(temp_path, file) + click.secho(f"Fixed linting issues in {file}", fg="green") + else: + click.secho( + f"To fix these differences, run 'datahub dataset file --lintFix {file}'", + fg="yellow", + ) + finally: + # Ensure the temporary file is removed + if temp_path.exists(): + temp_path.unlink() + else: + click.secho( + "No operation specified. 
Choose from --lintCheck or --lintFix", fg="yellow" + ) + + +@dataset.command( + name="sync", +) +@click.option("-f", "--file", required=True, type=click.Path(exists=True)) +@click.option("--to-datahub/--from-datahub", required=True, is_flag=True) +@upgrade.check_upgrade +@telemetry.with_telemetry() +def sync(file: str, to_datahub: bool) -> None: + """Sync a Dataset file to/from DataHub""" + + failures: List[str] = [] + with get_default_graph() as graph: + datasets = Dataset.from_yaml(file) + for dataset in datasets: + assert ( + dataset.urn is not None + ) # Validator should have ensured this is filled. Tell mypy it's not None + if to_datahub: + missing_entity_references = [ + entity_reference + for entity_reference in dataset.entity_references() + if not graph.exists(entity_reference) + ] + if missing_entity_references: + click.secho( + "\n\t- ".join( + [ + f"Skipping Dataset {dataset.urn} due to missing entity references: " + ] + + missing_entity_references + ), + fg="red", + ) + failures.append(dataset.urn) + continue + try: + for mcp in dataset.generate_mcp(): + graph.emit(mcp) + click.secho(f"Update succeeded for urn {dataset.urn}.", fg="green") + except Exception as e: + click.secho( + f"Update failed for id {id}. due to {e}", + fg="red", + ) + else: + # Sync from DataHub + if graph.exists(dataset.urn): + dataset_get_config = DatasetRetrievalConfig() + if dataset.downstreams: + dataset_get_config.include_downstreams = True + existing_dataset: Dataset = Dataset.from_datahub( + graph=graph, urn=dataset.urn, config=dataset_get_config + ) + existing_dataset.to_yaml(Path(file)) + else: + click.secho(f"Dataset {dataset.urn} does not exist") + failures.append(dataset.urn) + if failures: + click.secho( + f"\nFailed to sync the following Datasets: {', '.join(failures)}", + fg="red", + ) + raise click.Abort() diff --git a/metadata-ingestion/src/datahub/pydantic/__init__.py b/metadata-ingestion/src/datahub/pydantic/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/pydantic/compat.py b/metadata-ingestion/src/datahub/pydantic/compat.py new file mode 100644 index 00000000000000..f6c1208ae85afd --- /dev/null +++ b/metadata-ingestion/src/datahub/pydantic/compat.py @@ -0,0 +1,58 @@ +import functools +from typing import Any, Callable, Optional, TypeVar, cast + +# Define a type variable for the decorator +F = TypeVar("F", bound=Callable[..., Any]) + + +# Check which Pydantic version is installed +def get_pydantic_version() -> int: + """Determine if Pydantic v1 or v2 is installed.""" + try: + import pydantic + + version = pydantic.__version__ + return 1 if version.startswith("1.") else 2 + except (ImportError, AttributeError): + # Default to v1 if we can't determine version + return 1 + + +PYDANTIC_VERSION = get_pydantic_version() + + +# Create compatibility layer for dict-like methods +def compat_dict_method(v1_method: Optional[Callable] = None) -> Callable: + """ + Decorator to make a dict method work with both Pydantic v1 and v2. 
+ + In v1: Uses the decorated method (typically dict) + In v2: Redirects to model_dump with appropriate parameter mapping + """ + + def decorator(func: F) -> F: + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + if PYDANTIC_VERSION >= 2: + # Map v1 parameters to v2 parameters + # exclude -> exclude + # exclude_unset -> exclude_unset + # exclude_defaults -> exclude_defaults + # exclude_none -> exclude_none + # by_alias -> by_alias + model_dump_kwargs = kwargs.copy() + + # Handle the 'exclude' parameter differently between versions + exclude = kwargs.get("exclude", set()) + if isinstance(exclude, (set, dict)): + model_dump_kwargs["exclude"] = exclude + + return self.model_dump(**model_dump_kwargs) + return func(self, *args, **kwargs) + + return cast(F, wrapper) + + # Allow use as both @compat_dict_method and @compat_dict_method() + if v1_method is None: + return decorator + return decorator(v1_method) diff --git a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json index 29386ece7b0ca1..71f4c7126b2f9d 100644 --- a/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json +++ b/metadata-ingestion/tests/unit/api/entities/structuredproperties/example_structured_properties_golden.json @@ -12,19 +12,19 @@ "allowedValues": [ { "value": { - "string": "30" + "double": 30 }, "description": "30 days, usually reserved for datasets that are ephemeral and contain pii" }, { "value": { - "string": "90" + "double": 90 }, "description": "Use this for datasets that drive monthly reporting but contain pii" }, { "value": { - "string": "365" + "double": 365 }, "description": "Use this for non-sensitive data that can be retained for longer" } diff --git a/metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py b/metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py new file mode 100644 index 00000000000000..f29caac531664f --- /dev/null +++ b/metadata-ingestion/tests/unit/cli/dataset/test_dataset_cmd.py @@ -0,0 +1,219 @@ +import shutil +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from click.testing import CliRunner + +from datahub.cli.specific.dataset_cli import dataset + +TEST_RESOURCES_DIR = Path(__file__).parent / "test_resources" + + +@pytest.fixture +def test_yaml_file(): + """Creates a temporary test yaml file for testing.""" + # Define test data path + test_file = TEST_RESOURCES_DIR / "dataset.yaml" + + # Create a temporary copy to work with + temp_file = Path(f"{test_file}.tmp") + shutil.copyfile(test_file, temp_file) + + yield temp_file + + # Clean up + if temp_file.exists(): + temp_file.unlink() + + +@pytest.fixture +def malformed_yaml_file(): + """Creates a temporary malformed yaml file for testing.""" + malformed_content = """ +## This file is intentionally malformed +- id: user.badformat + platform: hive + schema: + fields: + - id: ip + type string # Missing colon here + description: The IP address + """ + + # Create a temporary file + temp_file = TEST_RESOURCES_DIR / "malformed_dataset.yaml.tmp" + with open(temp_file, "w") as f: + f.write(malformed_content) + + yield temp_file + + # Clean up + if temp_file.exists(): + temp_file.unlink() + + +@pytest.fixture +def fixable_yaml_file(): + """Creates a temporary yaml file with fixable formatting issues.""" + fixable_content = """ +## This file has fixable formatting issues +- id: 
user.fixable + platform: hive + schema: + fields: + - id: ip + type: string + description: The IP address + - id: user_id # Extra spaces + type: string + description: The user ID # Extra spaces + """ + + temp_file = TEST_RESOURCES_DIR / "fixable_dataset.yaml.tmp" + with open(temp_file, "w") as f: + f.write(fixable_content) + + yield temp_file + + # Clean up + if temp_file.exists(): + temp_file.unlink() + + +class TestDatasetCli: + def test_dataset_file_command_exists(self): + """Test that the dataset file command exists.""" + runner = CliRunner() + result = runner.invoke(dataset, ["--help"]) + assert result.exit_code == 0 + assert "file" in result.output + + @patch("datahub.cli.specific.dataset_cli.Dataset") + def test_lint_check_no_issues(self, mock_dataset, test_yaml_file): + """Test the lintCheck option when no issues are found.""" + # Setup mocks + mock_dataset_instance = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset_instance] + mock_dataset_instance.to_yaml.return_value = None + + # Mock filecmp.cmp to return True (files match) + with patch("filecmp.cmp", return_value=True): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(test_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "No differences found" in result.output + mock_dataset.from_yaml.assert_called_once() + mock_dataset_instance.to_yaml.assert_called_once() + + @patch("datahub.cli.specific.dataset_cli.Dataset") + @patch("os.system") + def test_lint_check_with_issues(self, mock_system, mock_dataset, fixable_yaml_file): + """Test the lintCheck option when issues are found.""" + # Setup mocks + mock_dataset_instance = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset_instance] + + # Mock filecmp.cmp to return False (files don't match) + with patch("filecmp.cmp", return_value=False): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(fixable_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "To fix these differences" in result.output + mock_dataset.from_yaml.assert_called_once() + mock_dataset_instance.to_yaml.assert_called_once() + mock_system.assert_called_once() # Should call diff + + @patch("datahub.cli.specific.dataset_cli.Dataset") + @patch("os.system") + @patch("shutil.copyfile") + def test_lint_fix( + self, mock_copyfile, mock_system, mock_dataset, fixable_yaml_file + ): + """Test the lintFix option.""" + # Setup mocks + mock_dataset_instance = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset_instance] + + # Mock filecmp.cmp to return False (files don't match) + with patch("filecmp.cmp", return_value=False): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", "--lintFix", str(fixable_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "Fixed linting issues" in result.output + + # Check that copyfile was called twice: + # 1. To copy the original file to the temp file + # 2. 
To copy the fixed temp file back to the original + assert mock_copyfile.call_count == 2 + + # The second call should copy from temp file to the original + mock_copyfile.call_args_list[1][0][0] # Source of second call + assert mock_copyfile.call_args_list[1][0][1] == str( + fixable_yaml_file + ) # Destination + + @patch("datahub.cli.specific.dataset_cli.Dataset") + def test_error_handling(self, mock_dataset, malformed_yaml_file): + """Test error handling when processing a malformed yaml file.""" + # Setup mock to raise an exception + mock_dataset.from_yaml.side_effect = Exception("YAML parsing error") + + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(malformed_yaml_file)] + ) + + # Verify exception is properly handled + assert result.exit_code != 0 + mock_dataset.from_yaml.assert_called_once() + + def test_temporary_file_cleanup(self, test_yaml_file): + """Test that temporary files are properly cleaned up.""" + # Count files in the directory before + files_before = len(list(TEST_RESOURCES_DIR.glob("*.tmp"))) + + runner = CliRunner() + with patch("datahub.cli.specific.dataset_cli.Dataset"): + with patch("filecmp.cmp", return_value=True): + runner.invoke(dataset, ["file", "--lintCheck", str(test_yaml_file)]) + + # Count files after + files_after = len(list(TEST_RESOURCES_DIR.glob("*.tmp"))) + + # Should be same count (our fixture creates one tmp file) + assert files_before == files_after + + @patch("datahub.cli.specific.dataset_cli.Dataset") + def test_multiple_datasets_in_file(self, mock_dataset, test_yaml_file): + """Test handling of multiple datasets defined in a single file.""" + # Create mock dataset instances + mock_dataset1 = MagicMock() + mock_dataset2 = MagicMock() + mock_dataset.from_yaml.return_value = [mock_dataset1, mock_dataset2] + + with patch("filecmp.cmp", return_value=True): + runner = CliRunner() + result = runner.invoke( + dataset, ["file", "--lintCheck", str(test_yaml_file)] + ) + + # Verify + assert result.exit_code == 0 + assert "No differences found" in result.output + + # Verify both dataset instances had to_yaml called + mock_dataset1.to_yaml.assert_called_once() + mock_dataset2.to_yaml.assert_called_once() diff --git a/metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml b/metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml new file mode 100644 index 00000000000000..29fb0c73bca4c9 --- /dev/null +++ b/metadata-ingestion/tests/unit/cli/dataset/test_resources/dataset.yaml @@ -0,0 +1,33 @@ +## This file is used to define the dataset schema for the dataset `foobar` +- id: user.clicksv2 + platform: hive + # - urn: urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD) # use urn instead of id and platform + subtype: Table + properties: + test_property: test_value + schema: + # file: examples/structured_properties/click_event.avsc + fields: + - id: ip + structured_properties: + io.acryl.privacy.retentionTime: + - 30 + - 90 + type: string + description: The IP address of the user + - id: user_id + type: string + description: The user ID of the user +- id: user.clicksv3 + platform: hive + urn: urn:li:dataset:(urn:li:dataPlatform:hive,user.clicksv3,PROD) # use urn instead of id and platform + subtype: View + schema: + # file: examples/structured_properties/click_event.avsc + fields: + - id: ip + type: string + description: The IP address of the user + - id: user_id + type: string + description: The user ID of the user \ No newline at end of file diff --git 
a/smoke-test/tests/cli/dataset_cmd/__init__.py b/smoke-test/tests/cli/dataset_cmd/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py b/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py new file mode 100644 index 00000000000000..dfcd59d335e2e7 --- /dev/null +++ b/smoke-test/tests/cli/dataset_cmd/test_dataset_command.py @@ -0,0 +1,297 @@ +import json +import logging +import os +import tempfile +from pathlib import Path +from random import randint + +import pytest +import yaml +from click.testing import CliRunner + +from datahub.api.entities.dataset.dataset import Dataset +from datahub.emitter.mce_builder import make_dataset_urn +from datahub.entrypoints import datahub +from datahub.ingestion.graph.client import DataHubGraph +from tests.consistency_utils import wait_for_writes_to_sync +from tests.utils import delete_urns, get_sleep_info + +logger = logging.getLogger(__name__) + +# Generate random dataset IDs to avoid test interference +start_index = randint(10, 10000) +dataset_id = f"test_dataset_sync_{start_index}" +dataset_urn = make_dataset_urn("snowflake", dataset_id) + +sleep_sec, sleep_times = get_sleep_info() +runner = CliRunner(mix_stderr=False) + + +@pytest.fixture(scope="module") +def setup_teardown_dataset(graph_client: DataHubGraph): + """Fixture to setup and teardown the test dataset""" + # Setup: Create empty environment + try: + # Teardown any existing dataset first to ensure clean state + if graph_client.exists(dataset_urn): + delete_urns(graph_client, [dataset_urn]) + wait_for_writes_to_sync() + + yield + finally: + # Teardown: Clean up created dataset + if graph_client.exists(dataset_urn): + delete_urns(graph_client, [dataset_urn]) + wait_for_writes_to_sync() + + +def create_dataset_yaml(file_path: Path, additional_properties=None): + """Create a test dataset YAML file""" + dataset_yaml = { + "id": dataset_id, + "platform": "snowflake", + "env": "PROD", + "description": "Test dataset for CLI sync smoke test", + "schema": { + "fields": [ + {"id": "id", "type": "number", "description": "Primary key"}, + {"id": "name", "type": "string", "description": "User name"}, + ] + }, + "tags": ["test", "smoke_test"], + "properties": {"origin": "cli_test"}, + } + + # Add any additional properties + if additional_properties: + for key, value in additional_properties.items(): + dataset_yaml[key] = value + + # Write YAML file + with open(file_path, "w") as f: + f.write("# Dataset sync test\n") # Add a comment to test comment preservation + yaml.dump(dataset_yaml, f, indent=2) + + +def run_cli_command(cmd, auth_session): + """Run a DataHub CLI command using CliRunner and auth_session""" + args = cmd.split() + result = runner.invoke( + datahub, + args, + env={ + "DATAHUB_GMS_URL": auth_session.gms_url(), + "DATAHUB_GMS_TOKEN": auth_session.gms_token(), + }, + ) + + if result.exit_code != 0: + logger.error(f"Command failed: {cmd}") + logger.error(f"STDOUT: {result.stdout}") + logger.error(f"STDERR: {result.stderr}") + raise Exception(f"Command failed with return code {result.exit_code}") + + return result + + +def test_dataset_sync_to_datahub( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): + """Test syncing dataset from YAML to DataHub""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # Create a dataset YAML file + create_dataset_yaml(temp_file_path) + + # Run the CLI command to sync to DataHub + cmd = 
f"dataset sync -f {temp_file_path} --to-datahub" + result = run_cli_command(cmd, auth_session) + + # Verify success message in output + assert f"Update succeeded for urn {dataset_urn}" in result.stdout + + # Wait for changes to propagate + wait_for_writes_to_sync() + + # Verify the dataset exists in DataHub + assert graph_client.exists(dataset_urn) + + # Retrieve dataset and verify properties + dataset = Dataset.from_datahub(graph=graph_client, urn=dataset_urn) + assert dataset.id == dataset_id + assert dataset.platform == "snowflake" + assert dataset.description == "Test dataset for CLI sync smoke test" + assert dataset.tags == ["test", "smoke_test"] + assert dataset.properties is not None + assert dataset.properties["origin"] == "cli_test" + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) + + +def test_dataset_sync_from_datahub( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): + """Test syncing dataset from DataHub to YAML""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # First, create a dataset in DataHub + dataset = Dataset( + id=dataset_id, + platform="snowflake", + description="Test dataset created directly in DataHub", + tags=["from_datahub", "cli_test"], + properties={"origin": "direct_creation"}, + schema=None, + ) + + # Emit the dataset to DataHub + for mcp in dataset.generate_mcp(): + graph_client.emit(mcp) + + wait_for_writes_to_sync() + + # Create a minimal dataset YAML as a starting point + create_dataset_yaml( + temp_file_path, {"description": "This will be overwritten"} + ) + + # Run the CLI command to sync from DataHub + cmd = f"dataset sync -f {temp_file_path} --from-datahub" + result = run_cli_command(cmd, auth_session) + assert result.exit_code == 0 + + # Wait to ensure file is updated + wait_for_writes_to_sync() + # Verify the YAML file was updated + with open(temp_file_path, "r") as f: + content = f.read() + # Check for the comment preservation + assert "# Dataset sync test" in content + # Check for content from DataHub + assert "Test dataset created directly in DataHub" in content + assert "from_datahub" in content + assert "direct_creation" in content + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) + + +def test_dataset_sync_bidirectional( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): + """Test bidirectional sync with modifications on both sides""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # 1. Create initial dataset in YAML + create_dataset_yaml(temp_file_path) + + # 2. Sync to DataHub + run_cli_command( + f"dataset sync -f {temp_file_path} --to-datahub", auth_session + ) + wait_for_writes_to_sync() + + # 3. Modify directly in DataHub + dataset_patcher = Dataset.get_patch_builder(dataset_urn) + dataset_patcher.set_description("Modified directly in DataHub") + for mcp in dataset_patcher.build(): + graph_client.emit(mcp) + wait_for_writes_to_sync() + + # 4. Sync from DataHub to update YAML + run_cli_command( + f"dataset sync -f {temp_file_path} --from-datahub", auth_session + ) + + # 5. 
Modify the YAML file directly + with open(temp_file_path, "r") as f: + import yaml + + data = yaml.safe_load(f) + data["properties"]["modified_by"] = "cli_test" + data["tags"].append("modified_yaml") + + with open(temp_file_path, "w") as f: + f.write("# Modified comment\n") + json.dump(data, f, indent=2) + + # 6. Sync back to DataHub + run_cli_command( + f"dataset sync -f {temp_file_path} --to-datahub", auth_session + ) + wait_for_writes_to_sync() + + # 7. Verify both modifications are present in DataHub + final_dataset = Dataset.from_datahub(graph=graph_client, urn=dataset_urn) + assert final_dataset.description == "Modified directly in DataHub" + assert final_dataset.properties is not None + assert final_dataset.properties["modified_by"] == "cli_test" + assert final_dataset.tags is not None + assert "modified_yaml" in final_dataset.tags + + # 8. Sync one more time from DataHub and verify YAML is intact + run_cli_command( + f"dataset sync -f {temp_file_path} --from-datahub", auth_session + ) + + with open(temp_file_path, "r") as f: + content = f.read() + assert "# Modified comment" in content + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) + + +def test_dataset_sync_validation( + setup_teardown_dataset, graph_client: DataHubGraph, auth_session +): + """Test validation during sync to DataHub with invalid references""" + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + temp_file_path = Path(tmp.name) + try: + # Create dataset with invalid structured property reference + create_dataset_yaml( + temp_file_path, + { + "structured_properties": {"non_existent_property": "value"}, + "schema": { + "fields": [ + { + "id": "field1", + "type": "string", + "structured_properties": { + "non_existent_field_property": "value" + }, + } + ] + }, + }, + ) + + # Attempt to sync to DataHub - should fail due to validation + cmd = f"dataset sync -f {temp_file_path} --to-datahub" + try: + run_cli_command(cmd, auth_session) + # If we get here, the command didn't fail as expected + raise AssertionError("Command should have failed due to validation") + except Exception as e: + if not isinstance(e, AssertionError): + # Command failed as expected + pass + else: + raise e + + finally: + # Clean up temporary file + if temp_file_path.exists(): + os.unlink(temp_file_path) diff --git a/smoke-test/tests/structured_properties/test_dataset.yaml b/smoke-test/tests/structured_properties/test_dataset.yaml index 2ac1cca6c6dc28..89dd5e57bad53a 100644 --- a/smoke-test/tests/structured_properties/test_dataset.yaml +++ b/smoke-test/tests/structured_properties/test_dataset.yaml @@ -5,15 +5,16 @@ schema: file: tests/structured_properties/click_event.avsc fields: - # - id: ip - - urn: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD),ip) - structured_properties: - io.acryl.dataManagement.deprecationDate: "2023-01-01" + - id: ip + structured_properties: + io.acryl.dataManagement.deprecationDate: '2023-01-01' properties: - retention: 365 + retention: '365' structured_properties: clusterType: primary clusterName: gold projectNames: - - Tracking - - DataHub + - Tracking + - DataHub + io.acryl.smoketest.dm.retentionTime: 365 + io.acryl.smoketest.dm.certifier: urn:li:corpuser:datahub diff --git a/smoke-test/tests/structured_properties/test_structured_properties.py b/smoke-test/tests/structured_properties/test_structured_properties.py index 8b0f61d8a5ea26..d785e4274a53b8 100644 --- 
a/smoke-test/tests/structured_properties/test_structured_properties.py +++ b/smoke-test/tests/structured_properties/test_structured_properties.py @@ -341,8 +341,9 @@ def test_dataset_yaml_loader(ingest_cleanup_data, graph_client): wait_for_writes_to_sync() property_name = "io.acryl.dataManagement.deprecationDate" + field_name = "[version=2.0].[type=ClickEvent].[type=string].ip" assert get_property_from_entity( - make_schema_field_urn(make_dataset_urn("hive", "user.clicks"), "ip"), + make_schema_field_urn(make_dataset_urn("hive", "user.clicks"), field_name), property_name, graph=graph_client, ) == ["2023-01-01"] @@ -351,19 +352,21 @@ def test_dataset_yaml_loader(ingest_cleanup_data, graph_client): graph=graph_client, urn="urn:li:dataset:(urn:li:dataPlatform:hive,user.clicks,PROD)", ) - field_name = "ip" assert dataset.schema_metadata is not None assert dataset.schema_metadata.fields is not None matching_fields = [ f for f in dataset.schema_metadata.fields - if f.id is not None and Dataset._simplify_field_path(f.id) == field_name + if f.id is not None and f.id == Dataset._simplify_field_path(field_name) ] assert len(matching_fields) == 1 assert matching_fields[0].structured_properties is not None - assert matching_fields[0].structured_properties[ - Urn.make_structured_property_urn("io.acryl.dataManagement.deprecationDate") - ] == ["2023-01-01"] + assert ( + matching_fields[0].structured_properties[ + "io.acryl.dataManagement.deprecationDate" + ] + == "2023-01-01" + ) def test_structured_property_search( diff --git a/smoke-test/tests/structured_properties/test_structured_properties.yaml b/smoke-test/tests/structured_properties/test_structured_properties.yaml index eee8d53aeccf5a..a50221c5a338d1 100644 --- a/smoke-test/tests/structured_properties/test_structured_properties.yaml +++ b/smoke-test/tests/structured_properties/test_structured_properties.yaml @@ -31,3 +31,22 @@ - dataFlow - dataJob - schemaField +- id: io.acryl.smoketest.dm.retentionTime + type: NUMBER + display_name: Retention Time + entity_types: + - dataset + description: 'Retention Time is used to figure out how long to retain records in a dataset' + allowed_values: + - value: 30 + description: 30 days, usually reserved for datasets that are ephemeral and contain pii + - value: 90 + description: Use this for datasets that drive monthly reporting but contain pii + - value: 365 + description: Use this for non-sensitive data that can be retained for longer +- id: io.acryl.smoketest.dm.certifier + type: URN + display_name: Person Certifying the asset + entity_types: + - dataset + - schemaField