diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index 5e81a7855e9c4..44b8a37f14655 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -309,7 +309,7 @@ "displayName": "Dremio", "description": "Import Spaces, Sources, Tables and statistics from Dremio.", "docsUrl": "https://datahubproject.io/docs/metadata-ingestion/", - "recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: \n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n stateful_ingestion:\n enabled: true" + "recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: \n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n ingest_owner: true\n\n stateful_ingestion:\n enabled: true" }, { "urn": "urn:li:dataPlatform:cassandra", @@ -317,5 +317,13 @@ "displayName": "CassandraDB", "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra", "recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]" + }, + { + "urn": "urn:li:dataPlatform:iceberg", + "name": "iceberg", + "displayName": "Iceberg", + "description": "Ingest databases and tables from any Iceberg catalog implementation", + "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg", + "recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n" } ] diff --git a/docs/managed-datahub/release-notes/v_0_3_7.md b/docs/managed-datahub/release-notes/v_0_3_7.md index 694283d945fac..94cbdd79dbf5e 100644 --- a/docs/managed-datahub/release-notes/v_0_3_7.md +++ b/docs/managed-datahub/release-notes/v_0_3_7.md @@ -7,7 +7,7 @@ Release Availability Date Recommended CLI/SDK --- -- `v0.14.1.11` with release notes at https://github.com/datahub/datahub/releases/tag/v0.14.1.11 +- `v0.14.1.12` with release notes at https://github.com/datahub/datahub/releases/tag/v0.14.1.12 If you are using an older CLI/SDK version, then please upgrade it. 
This applies for all CLI/SDK usages, if you are using it through your terminal, GitHub Actions, Airflow, in Python SDK somewhere, Java SDK, etc. This is a strong recommendation to upgrade, as we keep on pushing fixes in the CLI, and it helps us support you better. @@ -116,7 +116,7 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies - Improved UX for setting up and managing SSO - Ingestion changes -   - In addition to the improvements listed here: https://github.com/acryldata/datahub/releases/tag/v0.14.1.11 +   - In addition to the improvements listed here: https://github.com/acryldata/datahub/releases/tag/v0.14.1.12 - PowerBI: Support for PowerBI Apps and cross-workspace lineage - Fivetran: Major improvements to configurability and improved reliability with large Fivetran setups - Snowflake & BigQuery: Improved handling of temporary tables and swap statements when generating lineage diff --git a/docs/what/urn.md b/docs/what/urn.md index 2f4dffb985653..c7fb0555cd992 100644 --- a/docs/what/urn.md +++ b/docs/what/urn.md @@ -35,11 +35,17 @@ urn:li:dataset:(urn:li:dataPlatform:hdfs,PageViewEvent,EI) ## Restrictions -There are a few restrictions when creating an urn: +There are a few restrictions when creating a URN: -1. Commas are reserved character in URN fields: `,` -2. Parentheses are reserved characters in URN fields: `(` or `)` -3. Colons are reserved characters in URN fields: `:` -4. Urn separator UTF-8 character `␟` +The following characters are not allowed anywhere in a URN: + +1. Parentheses are reserved characters in URN fields: `(` or `)` +2. The "unit separator" unicode character `␟` (U+241F) + +The following characters are not allowed within a URN tuple: + +1. Commas are reserved characters in URN tuples: `,` + +Example: `urn:li:dashboard:(looker,dashboards.thelook)` is a valid URN, but `urn:li:dashboard:(looker,dashboards.the,look)` is invalid. Please do not use these characters when creating or generating urns. One approach is to use URL encoding for the characters. diff --git a/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml b/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml index 9dcd4f8b337d1..d18d19da2de84 100644 --- a/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml +++ b/metadata-ingestion/docs/sources/dremio/dremio_recipe.yml @@ -20,6 +20,8 @@ source: include_query_lineage: True + ingest_owner: true + #Optional source_mappings: - platform: s3 diff --git a/metadata-ingestion/docs/sources/iceberg/iceberg.md b/metadata-ingestion/docs/sources/iceberg/iceberg.md index 7e40315a2e319..92aac5ffa6ce5 100644 --- a/metadata-ingestion/docs/sources/iceberg/iceberg.md +++ b/metadata-ingestion/docs/sources/iceberg/iceberg.md @@ -18,6 +18,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce ## Troubleshooting -### [Common Issue] +### Exceptions while increasing `processing_threads` -[Provide description of common issues with this integration and steps to resolve] +Each processing thread will open several files/sockets to download manifest files from blob storage. If you experience +exceptions when increasing the `processing_threads` configuration parameter, try increasing the limit of open +files (e.g. using `ulimit` on Linux). 
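As an illustration of the `ulimit` advice above, here is a minimal sketch (Python standard library only, Unix-specific, and not part of the connector or its docs) for checking and, if needed, raising the open-file limit of the process that will run the ingestion; the value 4096 is an arbitrary example, not a value recommended by the connector:

```python
import resource  # Unix-only standard library module

# Each Iceberg processing thread opens several files/sockets while downloading
# manifest files, so the soft open-file limit should comfortably exceed what
# `processing_threads` will need.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file limit: soft={soft}, hard={hard}")

# Raise the soft limit (never above the hard limit) before starting ingestion;
# this is roughly what `ulimit -n` does in the launching shell. 4096 is only an
# illustrative target, not a documented recommendation.
wanted = 4096
if soft < wanted <= hard:
    resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
```

Running `ulimit -n` in the shell that launches the CLI achieves the same effect without any code.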
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 74c2e611cf68f..d7e056b31370d 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -14,8 +14,8 @@ ) base_requirements = { - # Typing extension should be >=3.10.0.2 ideally but we can't restrict due to a Airflow 2.1 dependency conflict. - "typing_extensions>=3.7.4.3", + # Our min version of typing_extensions is somewhat constrained by Airflow. + "typing_extensions>=3.10.0.2", # Actual dependencies. "typing-inspect", # pydantic 1.8.2 is incompatible with mypy 0.910. @@ -249,7 +249,8 @@ iceberg_common = { # Iceberg Python SDK - "pyiceberg>=0.4,<0.7", + # Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency + "pyiceberg>=0.4.0", } mssql_common = { diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 4598ae388b827..499e7e1231d05 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -53,19 +53,7 @@ make_assertion_from_test, make_assertion_result_from_test, ) -from datahub.ingestion.source.sql.sql_types import ( - ATHENA_SQL_TYPES_MAP, - BIGQUERY_TYPES_MAP, - POSTGRES_TYPES_MAP, - SNOWFLAKE_TYPES_MAP, - SPARK_SQL_TYPES_MAP, - TRINO_SQL_TYPES_MAP, - VERTICA_SQL_TYPES_MAP, - resolve_athena_modified_type, - resolve_postgres_modified_type, - resolve_trino_modified_type, - resolve_vertica_modified_type, -) +from datahub.ingestion.source.sql.sql_types import resolve_sql_type from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, StaleEntityRemovalSourceReport, @@ -89,17 +77,11 @@ from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.com.linkedin.pegasus2avro.schema import ( - BooleanTypeClass, - DateTypeClass, MySqlDDL, NullTypeClass, - NumberTypeClass, - RecordType, SchemaField, SchemaFieldDataType, SchemaMetadata, - StringTypeClass, - TimeTypeClass, ) from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, @@ -804,28 +786,6 @@ def make_mapping_upstream_lineage( ) -# See https://github.com/fishtown-analytics/dbt/blob/master/core/dbt/adapters/sql/impl.py -_field_type_mapping = { - "boolean": BooleanTypeClass, - "date": DateTypeClass, - "time": TimeTypeClass, - "numeric": NumberTypeClass, - "text": StringTypeClass, - "timestamp with time zone": DateTypeClass, - "timestamp without time zone": DateTypeClass, - "integer": NumberTypeClass, - "float8": NumberTypeClass, - "struct": RecordType, - **POSTGRES_TYPES_MAP, - **SNOWFLAKE_TYPES_MAP, - **BIGQUERY_TYPES_MAP, - **SPARK_SQL_TYPES_MAP, - **TRINO_SQL_TYPES_MAP, - **ATHENA_SQL_TYPES_MAP, - **VERTICA_SQL_TYPES_MAP, -} - - def get_column_type( report: DBTSourceReport, dataset_name: str, @@ -835,24 +795,10 @@ def get_column_type( """ Maps known DBT types to datahub types """ - TypeClass: Any = _field_type_mapping.get(column_type) if column_type else None - - if TypeClass is None and column_type: - # resolve a modified type - if dbt_adapter == "trino": - TypeClass = resolve_trino_modified_type(column_type) - elif dbt_adapter == "athena": - TypeClass = resolve_athena_modified_type(column_type) - elif dbt_adapter == "postgres" or dbt_adapter == "redshift": - # Redshift uses a variant of Postgres, so we can use the 
same logic. - TypeClass = resolve_postgres_modified_type(column_type) - elif dbt_adapter == "vertica": - TypeClass = resolve_vertica_modified_type(column_type) - elif dbt_adapter == "snowflake": - # Snowflake types are uppercase, so we check that. - TypeClass = _field_type_mapping.get(column_type.upper()) - - # if still not found, report the warning + + TypeClass = resolve_sql_type(column_type, dbt_adapter) + + # if still not found, report a warning if TypeClass is None: if column_type: report.info( @@ -861,9 +807,9 @@ def get_column_type( context=f"{dataset_name} - {column_type}", log=False, ) - TypeClass = NullTypeClass + TypeClass = NullTypeClass() - return SchemaFieldDataType(type=TypeClass()) + return SchemaFieldDataType(type=TypeClass) @platform_name("dbt") diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py index b29fc91a25e74..d9d85edbf4f7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_aspects.py @@ -142,6 +142,7 @@ def __init__( platform: str, ui_url: str, env: str, + ingest_owner: bool, domain: Optional[str] = None, platform_instance: Optional[str] = None, ): @@ -150,6 +151,7 @@ def __init__( self.env = env self.domain = domain self.ui_url = ui_url + self.ingest_owner = ingest_owner def get_container_key( self, name: Optional[str], path: Optional[List[str]] @@ -426,21 +428,23 @@ def _create_external_url(self, dataset: DremioDataset) -> str: return f'{self.ui_url}/{container_type}/{dataset_url_path}"{dataset.resource_name}"' def _create_ownership(self, dataset: DremioDataset) -> Optional[OwnershipClass]: - if not dataset.owner: - return None - owner = ( - make_user_urn(dataset.owner) - if dataset.owner_type == "USER" - else make_group_urn(dataset.owner) - ) - return OwnershipClass( - owners=[ - OwnerClass( - owner=owner, - type=OwnershipTypeClass.TECHNICAL_OWNER, - ) - ] - ) + if self.ingest_owner and dataset.owner: + owner_urn = ( + make_user_urn(dataset.owner) + if dataset.owner_type == "USER" + else make_group_urn(dataset.owner) + ) + ownership: OwnershipClass = OwnershipClass( + owners=[ + OwnerClass( + owner=owner_urn, + type=OwnershipTypeClass.TECHNICAL_OWNER, + ) + ] + ) + return ownership + + return None def _create_glossary_terms(self, entity: DremioDataset) -> GlossaryTermsClass: return GlossaryTermsClass( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py index d966d575c0332..b3f2107a1dfaa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_config.py @@ -174,3 +174,8 @@ def is_profiling_enabled(self) -> bool: default=False, description="Whether to include query-based lineage information.", ) + + ingest_owner: bool = Field( + default=True, + description="Ingest Owner from source. 
This will override Owner info entered from UI", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index 5b96845ec0496..5535a40617701 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -97,6 +97,7 @@ class DremioSource(StatefulIngestionSourceBase): - Ownership and Glossary Terms: - Metadata related to ownership of datasets, extracted from Dremio’s ownership model. - Glossary terms and business metadata associated with datasets, providing additional context to the data. + - Note: Ownership information will only be available for the Cloud and Enterprise editions, it will not be available for the Community edition. - Optional SQL Profiling (if enabled): - Table, row, and column statistics can be profiled and ingested via optional SQL queries. @@ -123,6 +124,7 @@ def __init__(self, config: DremioSourceConfig, ctx: PipelineContext): self.dremio_aspects = DremioAspects( platform=self.get_platform(), domain=self.config.domain, + ingest_owner=self.config.ingest_owner, platform_instance=self.config.platform_instance, env=self.config.env, ui_url=dremio_api.ui_url, diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index e097fd1f221ea..6330fe0291660 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -42,10 +42,14 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathsClass, + GlobalTagsClass, MLFeaturePropertiesClass, MLFeatureTablePropertiesClass, MLPrimaryKeyPropertiesClass, + OwnerClass, + OwnershipClass, StatusClass, + TagAssociationClass, ) # FIXME: ValueType module cannot be used as a type @@ -91,6 +95,24 @@ class FeastRepositorySourceConfig(ConfigModel): environment: str = Field( default=DEFAULT_ENV, description="Environment to use when constructing URNs" ) + # owner_mappings example: + # This must be added to the recipe in order to extract owners, otherwise NO owners will be extracted + # owner_mappings: + # - feast_owner_name: "" + # datahub_owner_urn: "urn:li:corpGroup:" + # datahub_ownership_type: "BUSINESS_OWNER" + owner_mappings: Optional[List[Dict[str, str]]] = Field( + default=None, description="Mapping of owner names to owner types" + ) + enable_owner_extraction: bool = Field( + default=False, + description="If this is disabled, then we NEVER try to map owners. " + "If this is enabled, then owner_mappings is REQUIRED to extract ownership.", + ) + enable_tag_extraction: bool = Field( + default=False, + description="If this is disabled, then we NEVER try to extract tags.", + ) @platform_name("Feast") @@ -215,10 +237,15 @@ def _get_entity_workunit( """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = ( + [StatusClass(removed=False)] + + self._get_tags(entity) + + self._get_owners(entity) + ) entity_snapshot = MLPrimaryKeySnapshot( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), - aspects=[StatusClass(removed=False)], + aspects=aspects, ) entity_snapshot.aspects.append( @@ -243,10 +270,11 @@ def _get_feature_workunit( Generate an MLFeature work unit for a Feast feature. 
""" feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = [StatusClass(removed=False)] + self._get_tags(field) feature_snapshot = MLFeatureSnapshot( urn=builder.make_ml_feature_urn(feature_view_name, field.name), - aspects=[StatusClass(removed=False)], + aspects=aspects, ) feature_sources = [] @@ -295,13 +323,18 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = ( + [ + BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), + StatusClass(removed=False), + ] + + self._get_tags(feature_view) + + self._get_owners(feature_view) + ) feature_view_snapshot = MLFeatureTableSnapshot( urn=builder.make_ml_feature_table_urn("feast", feature_view_name), - aspects=[ - BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), - StatusClass(removed=False), - ], + aspects=aspects, ) feature_view_snapshot.aspects.append( @@ -360,6 +393,64 @@ def _get_on_demand_feature_view_workunit( return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce) + # If a tag is specified in a Feast object, then the tag will be ingested into Datahub if enable_tag_extraction is + # True, otherwise NO tags will be ingested + def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list: + """ + Extracts tags from the given object and returns a list of aspects. + """ + aspects: List[Union[GlobalTagsClass]] = [] + + # Extract tags + if self.source_config.enable_tag_extraction: + if obj.tags.get("name"): + tag_name: str = obj.tags["name"] + tag_association = TagAssociationClass( + tag=builder.make_tag_urn(tag_name) + ) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + aspects.append(global_tags_aspect) + + return aspects + + # If an owner is specified in a Feast object, it will only be ingested into Datahub if owner_mappings is specified + # and enable_owner_extraction is True in FeastRepositorySourceConfig, otherwise NO owners will be ingested + def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: + """ + Extracts owners from the given object and returns a list of aspects. + """ + aspects: List[Union[OwnershipClass]] = [] + + # Extract owner + if self.source_config.enable_owner_extraction: + owner = getattr(obj, "owner", None) + if owner: + # Create owner association, skipping if None + owner_association = self._create_owner_association(owner) + if owner_association: # Only add valid owner associations + owners_aspect = OwnershipClass(owners=[owner_association]) + aspects.append(owners_aspect) + + return aspects + + def _create_owner_association(self, owner: str) -> Optional[OwnerClass]: + """ + Create an OwnerClass instance for the given owner using the owner mappings. 
+ """ + if self.source_config.owner_mappings is not None: + for mapping in self.source_config.owner_mappings: + if mapping["feast_owner_name"] == owner: + ownership_type_class: str = mapping.get( + "datahub_ownership_type", "TECHNICAL_OWNER" + ) + datahub_owner_urn = mapping.get("datahub_owner_urn") + if datahub_owner_urn: + return OwnerClass( + owner=datahub_owner_urn, + type=ownership_type_class, + ) + return None + @classmethod def create(cls, config_dict, ctx): config = FeastRepositorySourceConfig.parse_obj(config_dict) diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 258a4b9ad6daf..5931873f54236 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -9,6 +9,7 @@ NoSuchIcebergTableError, NoSuchNamespaceError, NoSuchPropertyException, + NoSuchTableError, ) from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit from pyiceberg.table import Table @@ -104,7 +105,7 @@ @capability(SourceCapability.DESCRIPTIONS, "Enabled by default.") @capability( SourceCapability.OWNERSHIP, - "Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership.", + "Automatically ingests ownership information from table properties based on `user_ownership_property` and `group_ownership_property`", ) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") class IcebergSource(StatefulIngestionSourceBase): @@ -192,9 +193,7 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: table = thread_local.local_catalog.load_table(dataset_path) time_taken = timer.elapsed_seconds() self.report.report_table_load_time(time_taken) - LOGGER.debug( - f"Loaded table: {table.identifier}, time taken: {time_taken}" - ) + LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}") yield from self._create_iceberg_workunit(dataset_name, table) except NoSuchPropertyException as e: self.report.report_warning( @@ -206,12 +205,20 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]: ) except NoSuchIcebergTableError as e: self.report.report_warning( - "no-iceberg-table", + "not-an-iceberg-table", f"Failed to create workunit for {dataset_name}. {e}", ) LOGGER.warning( f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.", ) + except NoSuchTableError as e: + self.report.report_warning( + "no-such-table", + f"Failed to create workunit for {dataset_name}. 
{e}", + ) + LOGGER.warning( + f"NoSuchTableError while processing table {dataset_path}, skipping it.", + ) except Exception as e: self.report.report_failure("general", f"Failed to create workunit: {e}") LOGGER.exception( diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py index e57dc853a83c6..709ba431f0f87 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py @@ -148,7 +148,7 @@ def get_kafka_consumer( ) -> confluent_kafka.Consumer: consumer = confluent_kafka.Consumer( { - "group.id": "test", + "group.id": "datahub-kafka-ingestion", "bootstrap.servers": connection.bootstrap, **connection.consumer_config, } @@ -164,6 +164,25 @@ def get_kafka_consumer( return consumer +def get_kafka_admin_client( + connection: KafkaConsumerConnectionConfig, +) -> AdminClient: + client = AdminClient( + { + "group.id": "datahub-kafka-ingestion", + "bootstrap.servers": connection.bootstrap, + **connection.consumer_config, + } + ) + if CallableConsumerConfig.is_callable_config(connection.consumer_config): + # As per documentation, we need to explicitly call the poll method to make sure OAuth callback gets executed + # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration + logger.debug("Initiating polling for kafka admin client") + client.poll(timeout=30) + logger.debug("Initiated polling for kafka admin client") + return client + + @dataclass class KafkaSourceReport(StaleEntityRemovalSourceReport): topics_scanned: int = 0 @@ -278,13 +297,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext): def init_kafka_admin_client(self) -> None: try: # TODO: Do we require separate config than existing consumer_config ? 
- self.admin_client = AdminClient( - { - "group.id": "test", - "bootstrap.servers": self.source_config.connection.bootstrap, - **self.source_config.connection.consumer_config, - } - ) + self.admin_client = get_kafka_admin_client(self.source_config.connection) except Exception as e: logger.debug(e, exc_info=e) self.report.report_warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py index 672fcbceb0603..a43f5f32493f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/qlik_sense/data_classes.py @@ -15,6 +15,7 @@ TimeType, ) +# TODO: Replace with standardized types in sql_types.py FIELD_TYPE_MAPPING: Dict[ str, Type[ diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 4bc4c1451c262..06cbb7fbae27c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -222,6 +222,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource): ``` """ + # TODO: Replace with standardized types in sql_types.py REDSHIFT_FIELD_TYPE_MAPPINGS: Dict[ str, Type[ diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index d4442749a0622..2bd8e8017f549 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -103,6 +103,7 @@ logger = logging.getLogger(__name__) # https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html +# TODO: Move to the standardized types in sql_types.py SNOWFLAKE_FIELD_TYPE_MAPPINGS = { "DATE": DateType, "BIGINT": NumberType, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 71cfd0268ee6b..6f7decc79b1df 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -26,6 +26,7 @@ platform_name, support_status, ) +from datahub.ingestion.api.source import StructuredLogLevel from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes @@ -35,6 +36,7 @@ register_custom_type, ) from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri +from datahub.ingestion.source.sql.sql_report import SQLSourceReport from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, gen_database_container, @@ -48,6 +50,15 @@ get_schema_fields_for_sqlalchemy_column, ) +try: + from typing_extensions import override +except ImportError: + _F = typing.TypeVar("_F", bound=typing.Callable[..., typing.Any]) + + def override(f: _F, /) -> _F: # noqa: F811 + return f + + logger = logging.getLogger(__name__) assert STRUCT, "required type modules are not available" @@ -322,12 +333,15 @@ class AthenaSource(SQLAlchemySource): - Profiling when enabled. 
""" - table_partition_cache: Dict[str, Dict[str, Partitionitem]] = {} + config: AthenaConfig + report: SQLSourceReport def __init__(self, config, ctx): super().__init__(config, ctx, "athena") self.cursor: Optional[BaseCursor] = None + self.table_partition_cache: Dict[str, Dict[str, Partitionitem]] = {} + @classmethod def create(cls, config_dict, ctx): config = AthenaConfig.parse_obj(config_dict) @@ -452,6 +466,7 @@ def add_table_to_schema_container( ) # It seems like database/schema filter in the connection string does not work and this to work around that + @override def get_schema_names(self, inspector: Inspector) -> List[str]: athena_config = typing.cast(AthenaConfig, self.config) schemas = inspector.get_schema_names() @@ -459,34 +474,42 @@ def get_schema_names(self, inspector: Inspector) -> List[str]: return [schema for schema in schemas if schema == athena_config.database] return schemas - # Overwrite to get partitions + @classmethod + def _casted_partition_key(cls, key: str) -> str: + # We need to cast the partition keys to a VARCHAR, since otherwise + # Athena may throw an error during concatenation / comparison. + return f"CAST({key} as VARCHAR)" + + @override def get_partitions( self, inspector: Inspector, schema: str, table: str - ) -> List[str]: - partitions = [] - - athena_config = typing.cast(AthenaConfig, self.config) - - if not athena_config.extract_partitions: - return [] + ) -> Optional[List[str]]: + if not self.config.extract_partitions: + return None if not self.cursor: - return [] + return None metadata: AthenaTableMetadata = self.cursor.get_table_metadata( table_name=table, schema_name=schema ) - if metadata.partition_keys: - for key in metadata.partition_keys: - if key.name: - partitions.append(key.name) - - if not partitions: - return [] + partitions = [] + for key in metadata.partition_keys: + if key.name: + partitions.append(key.name) + if not partitions: + return [] - # We create an artiificaial concatenated partition key to be able to query max partition easier - part_concat = "|| '-' ||".join(partitions) + with self.report.report_exc( + message="Failed to extract partition details", + context=f"{schema}.{table}", + level=StructuredLogLevel.WARN, + ): + # We create an artifical concatenated partition key to be able to query max partition easier + part_concat = " || '-' || ".join( + self._casted_partition_key(key) for key in partitions + ) max_partition_query = f'select {",".join(partitions)} from "{schema}"."{table}$partitions" where {part_concat} = (select max({part_concat}) from "{schema}"."{table}$partitions")' ret = self.cursor.execute(max_partition_query) max_partition: Dict[str, str] = {} @@ -500,9 +523,8 @@ def get_partitions( partitions=partitions, max_partition=max_partition, ) - return partitions - return [] + return partitions # Overwrite to modify the creation of schema fields def get_schema_fields_for_column( @@ -551,7 +573,9 @@ def generate_partition_profiler_query( if partition and partition.max_partition: max_partition_filters = [] for key, value in partition.max_partition.items(): - max_partition_filters.append(f"CAST({key} as VARCHAR) = '{value}'") + max_partition_filters.append( + f"{self._casted_partition_key(key)} = '{value}'" + ) max_partition = str(partition.max_partition) return ( max_partition, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py index 8ea4209784063..89ca160ba1f48 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py @@ -1,5 +1,5 @@ import re -from typing import Any, Dict, ValuesView +from typing import Any, Dict, Optional, Type, Union, ValuesView from datahub.metadata.com.linkedin.pegasus2avro.schema import ( ArrayType, @@ -16,14 +16,28 @@ UnionType, ) -# these can be obtained by running `select format_type(oid, null),* from pg_type;` -# we've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.) -# (run `\copy (select format_type(oid, null),* from pg_type) to 'pg_type.csv' csv header;` to get a CSV) +DATAHUB_FIELD_TYPE = Union[ + ArrayType, + BooleanType, + BytesType, + DateType, + EnumType, + MapType, + NullType, + NumberType, + RecordType, + StringType, + TimeType, + UnionType, +] -# we map from format_type since this is what dbt uses -# see https://github.com/fishtown-analytics/dbt/blob/master/plugins/postgres/dbt/include/postgres/macros/catalog.sql#L22 -# see https://www.npgsql.org/dev/types.html for helpful type annotations +# These can be obtained by running `select format_type(oid, null),* from pg_type;` +# We've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.) +# (run `\copy (select format_type(oid, null),* from pg_type) to 'pg_type.csv' csv header;` to get a CSV) +# We map from format_type since this is what dbt uses. +# See https://github.com/fishtown-analytics/dbt/blob/master/plugins/postgres/dbt/include/postgres/macros/catalog.sql#L22 +# See https://www.npgsql.org/dev/types.html for helpful type annotations POSTGRES_TYPES_MAP: Dict[str, Any] = { "boolean": BooleanType, "bytea": BytesType, @@ -430,3 +444,54 @@ def resolve_vertica_modified_type(type_string: str) -> Any: "geography": None, "uuid": StringType, } + + +_merged_mapping = { + "boolean": BooleanType, + "date": DateType, + "time": TimeType, + "numeric": NumberType, + "text": StringType, + "timestamp with time zone": DateType, + "timestamp without time zone": DateType, + "integer": NumberType, + "float8": NumberType, + "struct": RecordType, + **POSTGRES_TYPES_MAP, + **SNOWFLAKE_TYPES_MAP, + **BIGQUERY_TYPES_MAP, + **SPARK_SQL_TYPES_MAP, + **TRINO_SQL_TYPES_MAP, + **ATHENA_SQL_TYPES_MAP, + **VERTICA_SQL_TYPES_MAP, +} + + +def resolve_sql_type( + column_type: Optional[str], + platform: Optional[str] = None, +) -> Optional[DATAHUB_FIELD_TYPE]: + # In theory, we should use the platform-specific mapping where available. + # However, the types don't ever conflict, so the merged mapping is fine. + TypeClass: Optional[Type[DATAHUB_FIELD_TYPE]] = ( + _merged_mapping.get(column_type) if column_type else None + ) + + if TypeClass is None and column_type: + # resolve a modified type + if platform == "trino": + TypeClass = resolve_trino_modified_type(column_type) + elif platform == "athena": + TypeClass = resolve_athena_modified_type(column_type) + elif platform == "postgres" or platform == "redshift": + # Redshift uses a variant of Postgres, so we can use the same logic. + TypeClass = resolve_postgres_modified_type(column_type) + elif platform == "vertica": + TypeClass = resolve_vertica_modified_type(column_type) + elif platform == "snowflake": + # Snowflake types are uppercase, so we check that. 
+ TypeClass = _merged_mapping.get(column_type.upper()) + + if TypeClass: + return TypeClass() + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index f84f6c1b0c08d..9c5752c518df1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -33,6 +33,7 @@ logger = logging.getLogger(__name__) +# TODO: (maybe) Replace with standardized types in sql_types.py DATA_TYPE_REGISTRY: dict = { ColumnTypeName.BOOLEAN: BooleanTypeClass, ColumnTypeName.BYTE: BytesTypeClass, diff --git a/metadata-ingestion/src/datahub/utilities/urn_encoder.py b/metadata-ingestion/src/datahub/utilities/urn_encoder.py index 88c0a128b8e46..4f19eeff3e70f 100644 --- a/metadata-ingestion/src/datahub/utilities/urn_encoder.py +++ b/metadata-ingestion/src/datahub/utilities/urn_encoder.py @@ -4,7 +4,8 @@ # NOTE: Frontend relies on encoding these three characters. Specifically, we decode and encode schema fields for column level lineage. # If this changes, make appropriate changes to datahub-web-react/src/app/lineage/utils/columnLineageUtils.ts # We also rely on encoding these exact three characters when generating schemaField urns in our graphQL layer. Update SchemaFieldUtils if this changes. -RESERVED_CHARS = {",", "(", ")"} +# Also see https://datahubproject.io/docs/what/urn/#restrictions +RESERVED_CHARS = {",", "(", ")", "␟"} RESERVED_CHARS_EXTENDED = RESERVED_CHARS.union({"%"}) diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py index 390d8d7698dd4..c6a3dc4fd590b 100644 --- a/metadata-ingestion/tests/integration/dbt/test_dbt.py +++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py @@ -11,12 +11,6 @@ from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig from datahub.ingestion.source.dbt.dbt_common import DBTEntitiesEnabled, EmitDirective from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig, DBTCoreSource -from datahub.ingestion.source.sql.sql_types import ( - ATHENA_SQL_TYPES_MAP, - TRINO_SQL_TYPES_MAP, - resolve_athena_modified_type, - resolve_trino_modified_type, -) from tests.test_helpers import mce_helpers, test_connection_helpers FROZEN_TIME = "2022-02-03 07:00:00" @@ -362,69 +356,6 @@ def test_dbt_tests(test_resources_dir, pytestconfig, tmp_path, mock_time, **kwar ) -@pytest.mark.parametrize( - "data_type, expected_data_type", - [ - ("boolean", "boolean"), - ("tinyint", "tinyint"), - ("smallint", "smallint"), - ("int", "int"), - ("integer", "integer"), - ("bigint", "bigint"), - ("real", "real"), - ("double", "double"), - ("decimal(10,0)", "decimal"), - ("varchar(20)", "varchar"), - ("char", "char"), - ("varbinary", "varbinary"), - ("json", "json"), - ("date", "date"), - ("time", "time"), - ("time(12)", "time"), - ("timestamp", "timestamp"), - ("timestamp(3)", "timestamp"), - ("row(x bigint, y double)", "row"), - ("array(row(x bigint, y double))", "array"), - ("map(varchar, varchar)", "map"), - ], -) -def test_resolve_trino_modified_type(data_type, expected_data_type): - assert ( - resolve_trino_modified_type(data_type) - == TRINO_SQL_TYPES_MAP[expected_data_type] - ) - - -@pytest.mark.parametrize( - "data_type, expected_data_type", - [ - ("boolean", "boolean"), - ("tinyint", "tinyint"), - ("smallint", "smallint"), - ("int", "int"), - ("integer", "integer"), - ("bigint", "bigint"), - ("float", "float"), 
- ("double", "double"), - ("decimal(10,0)", "decimal"), - ("varchar(20)", "varchar"), - ("char", "char"), - ("binary", "binary"), - ("date", "date"), - ("timestamp", "timestamp"), - ("timestamp(3)", "timestamp"), - ("struct", "struct"), - ("array>", "array"), - ("map", "map"), - ], -) -def test_resolve_athena_modified_type(data_type, expected_data_type): - assert ( - resolve_athena_modified_type(data_type) - == ATHENA_SQL_TYPES_MAP[expected_data_type] - ) - - @pytest.mark.integration @freeze_time(FROZEN_TIME) def test_dbt_tests_only_assertions( diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json index 1b91925289845..a4fd9843c5cf4 100644 --- a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json +++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json @@ -9,8 +9,33 @@ "removed": false } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:deprecated" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpGroup:MOCK_OWNER", + "type": "BUSINESS_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, { "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { + "customProperties": {}, "description": "Driver ID", "dataType": "ORDINAL", "sources": [ @@ -23,7 +48,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -36,8 +62,18 @@ "removed": false } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:needs_documentation" + } + ] + } + }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "Conv rate", "dataType": "CONTINUOUS", "sources": [ @@ -50,7 +86,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -65,6 +102,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "Acc rate", "dataType": "CONTINUOUS", "sources": [ @@ -77,7 +115,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -92,6 +131,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "Avg daily trips", "dataType": "ORDINAL", "sources": [ @@ -104,7 +144,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -119,6 +160,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "description": "String feature", "dataType": "TEXT", "sources": [ @@ -131,7 +173,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -151,6 +194,30 @@ "removed": false } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:deprecated" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": 
"urn:li:corpGroup:MOCK_OWNER", + "type": "BUSINESS_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { "customProperties": {}, @@ -170,7 +237,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -189,7 +257,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -204,6 +273,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "dataType": "CONTINUOUS", "sources": [ "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", @@ -216,7 +286,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -231,6 +302,7 @@ }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, "dataType": "CONTINUOUS", "sources": [ "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", @@ -243,7 +315,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -278,7 +351,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } }, { @@ -297,7 +371,40 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "feast-repository-test" + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:deprecated", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "deprecated" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:needs_documentation", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "needs_documentation" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db index a511ff56c9770..5dca29d92afe5 100644 Binary files a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db and b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db differ diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py index a6e6cd3616e92..dcfd417637958 100644 --- a/metadata-ingestion/tests/integration/feast/feature_store/features.py +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -19,6 +19,8 @@ join_keys=["driver_id"], value_type=ValueType.INT64, description="Driver ID", + owner="MOCK_OWNER", + tags={"name": "deprecated"}, ) driver_hourly_stats_view = FeatureView( @@ -29,7 +31,7 @@ Field( name="conv_rate", dtype=feast.types.Float64, - tags=dict(description="Conv rate"), + tags={"name": 
"needs_documentation", "description": "Conv rate"}, ), Field( name="acc_rate", @@ -49,7 +51,8 @@ ], online=True, source=driver_hourly_stats_source, - tags={}, + tags={"name": "deprecated"}, + owner="MOCK_OWNER", ) input_request = RequestSource( diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py index a6bdce6722289..7f04337145dc3 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -19,6 +19,15 @@ def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): "config": { "path": str(test_resources_dir / "feature_store"), "environment": "PROD", + "enable_tag_extraction": True, + "enable_owner_extraction": True, + "owner_mappings": [ + { + "feast_owner_name": "MOCK_OWNER", + "datahub_owner_urn": "urn:li:corpGroup:MOCK_OWNER", + "datahub_ownership_type": "BUSINESS_OWNER", + } + ], }, }, "sink": { diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py index 597889c8440b7..7462f177684b7 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py @@ -128,11 +128,32 @@ def test_kafka_oauth_callback( pipeline.run() - is_found: bool = False + # Initialize flags to track oauth events + checks = { + "consumer_polling": False, + "consumer_oauth_callback": False, + "admin_polling": False, + "admin_oauth_callback": False, + } + + # Read log file and check for oauth events with open(log_file, "r") as file: - for line_number, line in enumerate(file, 1): + for line in file: + # Check for polling events + if "Initiating polling for kafka admin client" in line: + checks["admin_polling"] = True + elif "Initiating polling for kafka consumer" in line: + checks["consumer_polling"] = True + + # Check for oauth callbacks if oauth.MESSAGE in line: - is_found = True - break - - assert is_found + if checks["consumer_polling"] and not checks["admin_polling"]: + checks["consumer_oauth_callback"] = True + elif checks["consumer_polling"] and checks["admin_polling"]: + checks["admin_oauth_callback"] = True + + # Verify all oauth events occurred + assert checks["consumer_polling"], "Consumer polling was not initiated" + assert checks["consumer_oauth_callback"], "Consumer oauth callback not found" + assert checks["admin_polling"], "Admin polling was not initiated" + assert checks["admin_oauth_callback"], "Admin oauth callback not found" diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py index 875cf3800daf8..f8b6220d18273 100644 --- a/metadata-ingestion/tests/unit/test_athena_source.py +++ b/metadata-ingestion/tests/unit/test_athena_source.py @@ -93,7 +93,8 @@ def test_athena_get_table_properties(): "CreateTime": datetime.now(), "LastAccessTime": datetime.now(), "PartitionKeys": [ - {"Name": "testKey", "Type": "string", "Comment": "testComment"} + {"Name": "year", "Type": "string", "Comment": "testComment"}, + {"Name": "month", "Type": "string", "Comment": "testComment"}, ], "Parameters": { "comment": "testComment", @@ -112,8 +113,18 @@ def test_athena_get_table_properties(): response=table_metadata ) + # Mock partition query results + mock_cursor.execute.return_value.description = [ + ["year"], + ["month"], + ] + mock_cursor.execute.return_value.__iter__.return_value = [["2023", "12"]] + ctx = 
PipelineContext(run_id="test") source = AthenaSource(config=config, ctx=ctx) + source.cursor = mock_cursor + + # Test table properties description, custom_properties, location = source.get_table_properties( inspector=mock_inspector, table=table, schema=schema ) @@ -124,13 +135,35 @@ def test_athena_get_table_properties(): "last_access_time": "2020-04-14 07:00:00", "location": "s3://testLocation", "outputformat": "testOutputFormat", - "partition_keys": '[{"name": "testKey", "type": "string", "comment": "testComment"}]', + "partition_keys": '[{"name": "year", "type": "string", "comment": "testComment"}, {"name": "month", "type": "string", "comment": "testComment"}]', "serde.serialization.lib": "testSerde", "table_type": "testType", } - assert location == make_s3_urn("s3://testLocation", "PROD") + # Test partition functionality + partitions = source.get_partitions( + inspector=mock_inspector, schema=schema, table=table + ) + assert partitions == ["year", "month"] + + # Verify the correct SQL query was generated for partitions + expected_query = """\ +select year,month from "test_schema"."test_table$partitions" \ +where CAST(year as VARCHAR) || '-' || CAST(month as VARCHAR) = \ +(select max(CAST(year as VARCHAR) || '-' || CAST(month as VARCHAR)) \ +from "test_schema"."test_table$partitions")""" + mock_cursor.execute.assert_called_once() + actual_query = mock_cursor.execute.call_args[0][0] + assert actual_query == expected_query + + # Verify partition cache was populated correctly + assert source.table_partition_cache[schema][table].partitions == partitions + assert source.table_partition_cache[schema][table].max_partition == { + "year": "2023", + "month": "12", + } + def test_get_column_type_simple_types(): assert isinstance( @@ -214,3 +247,9 @@ def test_column_type_complex_combination(): assert isinstance( result._STRUCT_fields[2][1].item_type._STRUCT_fields[1][1], types.String ) + + +def test_casted_partition_key(): + from datahub.ingestion.source.sql.athena import AthenaSource + + assert AthenaSource._casted_partition_key("test_col") == "CAST(test_col as VARCHAR)" diff --git a/metadata-ingestion/tests/unit/test_sql_types.py b/metadata-ingestion/tests/unit/test_sql_types.py new file mode 100644 index 0000000000000..ebe5ade115cdd --- /dev/null +++ b/metadata-ingestion/tests/unit/test_sql_types.py @@ -0,0 +1,78 @@ +import pytest + +from datahub.ingestion.source.sql.sql_types import ( + ATHENA_SQL_TYPES_MAP, + TRINO_SQL_TYPES_MAP, + resolve_athena_modified_type, + resolve_sql_type, + resolve_trino_modified_type, +) +from datahub.metadata.schema_classes import BooleanTypeClass, StringTypeClass + + +@pytest.mark.parametrize( + "data_type, expected_data_type", + [ + ("boolean", "boolean"), + ("tinyint", "tinyint"), + ("smallint", "smallint"), + ("int", "int"), + ("integer", "integer"), + ("bigint", "bigint"), + ("real", "real"), + ("double", "double"), + ("decimal(10,0)", "decimal"), + ("varchar(20)", "varchar"), + ("char", "char"), + ("varbinary", "varbinary"), + ("json", "json"), + ("date", "date"), + ("time", "time"), + ("time(12)", "time"), + ("timestamp", "timestamp"), + ("timestamp(3)", "timestamp"), + ("row(x bigint, y double)", "row"), + ("array(row(x bigint, y double))", "array"), + ("map(varchar, varchar)", "map"), + ], +) +def test_resolve_trino_modified_type(data_type, expected_data_type): + assert ( + resolve_trino_modified_type(data_type) + == TRINO_SQL_TYPES_MAP[expected_data_type] + ) + + +@pytest.mark.parametrize( + "data_type, expected_data_type", + [ + ("boolean", "boolean"), + 
("tinyint", "tinyint"), + ("smallint", "smallint"), + ("int", "int"), + ("integer", "integer"), + ("bigint", "bigint"), + ("float", "float"), + ("double", "double"), + ("decimal(10,0)", "decimal"), + ("varchar(20)", "varchar"), + ("char", "char"), + ("binary", "binary"), + ("date", "date"), + ("timestamp", "timestamp"), + ("timestamp(3)", "timestamp"), + ("struct", "struct"), + ("array>", "array"), + ("map", "map"), + ], +) +def test_resolve_athena_modified_type(data_type, expected_data_type): + assert ( + resolve_athena_modified_type(data_type) + == ATHENA_SQL_TYPES_MAP[expected_data_type] + ) + + +def test_resolve_sql_type() -> None: + assert resolve_sql_type("boolean") == BooleanTypeClass() + assert resolve_sql_type("varchar") == StringTypeClass() diff --git a/metadata-ingestion/tests/unit/urns/test_urn.py b/metadata-ingestion/tests/unit/urns/test_urn.py index 1bf48082fec8c..73badb3d1b423 100644 --- a/metadata-ingestion/tests/unit/urns/test_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_urn.py @@ -1,6 +1,12 @@ import pytest -from datahub.metadata.urns import DatasetUrn, Urn +from datahub.metadata.urns import ( + CorpUserUrn, + DashboardUrn, + DataPlatformUrn, + DatasetUrn, + Urn, +) from datahub.utilities.urns.error import InvalidUrnError pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -36,20 +42,51 @@ def test_url_encode_urn() -> None: def test_invalid_urn() -> None: with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc") + Urn.from_string("urn:li:abc") with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc:") + Urn.from_string("urn:li:abc:") with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc:()") + Urn.from_string("urn:li:abc:()") with pytest.raises(InvalidUrnError): - Urn.create_from_string("urn:li:abc:(abc,)") + Urn.from_string("urn:li:abc:(abc,)") + + with pytest.raises(InvalidUrnError): + Urn.from_string("urn:li:corpuser:abc)") + + +def test_urn_colon() -> None: + # Colon characters are valid in urns, and should not mess up parsing. + + urn = Urn.from_string( + "urn:li:dashboard:(looker,dashboards.thelook::customer_lookup)" + ) + assert isinstance(urn, DashboardUrn) + + assert DataPlatformUrn.from_string("urn:li:dataPlatform:abc:def") + assert DatasetUrn.from_string( + "urn:li:dataset:(urn:li:dataPlatform:abc:def,table_name,PROD)" + ) + assert Urn.from_string("urn:li:corpuser:foo:bar@example.com") + + # I'm not sure why you'd ever want this, but technically it's a valid urn. 
+ urn = Urn.from_string("urn:li:corpuser::") + assert isinstance(urn, CorpUserUrn) + assert urn.username == ":" + assert urn == CorpUserUrn(":") + + +def test_urn_coercion() -> None: + urn = CorpUserUrn("foo␟bar") + assert urn.urn() == "urn:li:corpuser:foo%E2%90%9Fbar" + + assert urn == Urn.from_string(urn.urn()) def test_urn_type_dispatch() -> None: - urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,prod)") + urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)") assert isinstance(urn, DatasetUrn) with pytest.raises(InvalidUrnError, match="Passed an urn of type corpuser"): diff --git a/metadata-io/metadata-io-api/build.gradle b/metadata-io/metadata-io-api/build.gradle index b8028fad07bb6..5273177b75281 100644 --- a/metadata-io/metadata-io-api/build.gradle +++ b/metadata-io/metadata-io-api/build.gradle @@ -16,3 +16,7 @@ dependencies { testImplementation externalDependency.lombok testAnnotationProcessor externalDependency.lombok } + +test { + environment 'STRICT_URN_VALIDATION_ENABLED', 'true' +} \ No newline at end of file diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java index c2e1c47eca1fd..5e1f09fcc6439 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/validation/ValidationApiUtils.java @@ -30,7 +30,8 @@ public class ValidationApiUtils { // Related to BrowsePathv2 public static final String URN_DELIMITER_SEPARATOR = "␟"; // https://datahubproject.io/docs/what/urn/#restrictions - public static final Set ILLEGAL_URN_COMPONENT_CHARACTERS = Set.of(":", "(", ")", ","); + public static final Set ILLEGAL_URN_COMPONENT_CHARACTERS = Set.of("(", ")"); + public static final Set ILLEGAL_URN_TUPLE_CHARACTERS = Set.of(","); /** * Validates a {@link RecordTemplate} and throws {@link ValidationException} if validation fails. 
@@ -86,11 +87,10 @@ public static void validateUrn( "Error: URN cannot contain " + URN_DELIMITER_SEPARATOR + " character"); } + int totalParts = urn.getEntityKey().getParts().size(); List illegalComponents = urn.getEntityKey().getParts().stream() - .flatMap(ValidationApiUtils::processUrnPartRecursively) - .filter( - urnPart -> ILLEGAL_URN_COMPONENT_CHARACTERS.stream().anyMatch(urnPart::contains)) + .flatMap(part -> processUrnPartRecursively(part, totalParts)) .collect(Collectors.toList()); if (!illegalComponents.isEmpty()) { @@ -114,15 +114,25 @@ public static void validateUrn( } /** Recursively process URN parts with URL decoding */ - private static Stream processUrnPartRecursively(String urnPart) { + private static Stream processUrnPartRecursively(String urnPart, int totalParts) { String decodedPart = URLDecoder.decode(URLEncodingFixer.fixURLEncoding(urnPart), StandardCharsets.UTF_8); if (decodedPart.startsWith("urn:li:")) { // Recursively process nested URN after decoding + int nestedParts = UrnUtils.getUrn(decodedPart).getEntityKey().getParts().size(); return UrnUtils.getUrn(decodedPart).getEntityKey().getParts().stream() - .flatMap(ValidationApiUtils::processUrnPartRecursively); + .flatMap(part -> processUrnPartRecursively(part, nestedParts)); } - return Stream.of(decodedPart); + if (totalParts > 1) { + if (ILLEGAL_URN_TUPLE_CHARACTERS.stream().anyMatch(c -> urnPart.contains(c))) { + return Stream.of(urnPart); + } + } + if (ILLEGAL_URN_COMPONENT_CHARACTERS.stream().anyMatch(c -> urnPart.contains(c))) { + return Stream.of(urnPart); + } + + return Stream.empty(); } /** diff --git a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java index e683e594d8766..a2c9a15d92f90 100644 --- a/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java +++ b/metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/validation/ValidationApiUtilsTest.java @@ -18,10 +18,36 @@ public void testValidateDatasetUrn() { // If no exception is thrown, test passes } - @Test(expectedExceptions = IllegalArgumentException.class) + @Test public void testSimpleUrnColon() { - Urn invalidUrn = UrnUtils.getUrn("urn:li:corpuser:foo:bar"); - ValidationApiUtils.validateUrn(entityRegistry, invalidUrn, true); + ValidationApiUtils.validateUrn( + entityRegistry, UrnUtils.getUrn("urn:li:corpuser:foo:bar"), true); + ValidationApiUtils.validateUrn( + entityRegistry, UrnUtils.getUrn("urn:li:dataPlatform:abc:def"), true); + ValidationApiUtils.validateUrn( + entityRegistry, UrnUtils.getUrn("urn:li:corpuser:foo:bar@example.com"), true); + // If no exception is thrown, test passes + } + + @Test + public void testSimpleUrnComma() { + ValidationApiUtils.validateUrn(entityRegistry, UrnUtils.getUrn("urn:li:corpuser:,"), true); + // If no exception is thrown, test passes + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testTupleUrnComma() { + ValidationApiUtils.validateUrn( + entityRegistry, UrnUtils.getUrn("urn:li:dashboard:(looker,dashboards,thelook)"), true); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testFabricTypeCasing() { + // prod != PROD + ValidationApiUtils.validateUrn( + entityRegistry, + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:abc:def,table_name,prod)"), + true); } @Test @@ -34,7 +60,7 @@ public void 
testComplexUrnColon() throws URISyntaxException { } @Test(expectedExceptions = IllegalArgumentException.class) - public void testUrnFabricType() { + public void testFabricTypeParen() { Urn invalidUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,/path/to/data,())"); ValidationApiUtils.validateUrn(entityRegistry, invalidUrn, true); } @@ -83,20 +109,20 @@ public void testValidComplexUrn() { UrnUtils.getUrn( "urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.dataset.table,PROD)"); - ValidationApiUtils.validateUrn(entityRegistry, validUrn); + ValidationApiUtils.validateUrn(entityRegistry, validUrn, true); // If no exception is thrown, test passes } @Test(expectedExceptions = NullPointerException.class) public void testUrnNull() { - ValidationApiUtils.validateUrn(entityRegistry, null); + ValidationApiUtils.validateUrn(entityRegistry, null, true); } @Test public void testValidPartialUrlEncode() { Urn validUrn = UrnUtils.getUrn("urn:li:assertion:123=-%28__% weekly__%29"); - ValidationApiUtils.validateUrn(entityRegistry, validUrn); + ValidationApiUtils.validateUrn(entityRegistry, validUrn, true); // If no exception is thrown, test passes } @@ -106,7 +132,23 @@ public void testValidPartialUrlEncode2() { UrnUtils.getUrn( "urn:li:dataset:(urn:li:dataPlatform:s3,urn:li:dataset:%28urn:li:dataPlatform:s3%2Ctest-datalake-concepts%prog_maintenance%2CPROD%29,PROD)"); - ValidationApiUtils.validateUrn(entityRegistry, validUrn); + ValidationApiUtils.validateUrn(entityRegistry, validUrn, true); + // If no exception is thrown, test passes + } + + @Test + public void testValidColon() { + Urn validUrn = + UrnUtils.getUrn("urn:li:dashboard:(looker,dashboards.thelook::cohort_data_tool)"); + + ValidationApiUtils.validateUrn(entityRegistry, validUrn, true); + // If no exception is thrown, test passes + } + + @Test + public void testNoTupleComma() { + Urn invalidUrn = UrnUtils.getUrn("urn:li:corpuser:,"); + ValidationApiUtils.validateUrn(entityRegistry, invalidUrn, true); // If no exception is thrown, test passes } } diff --git a/metadata-service/configuration/src/main/resources/search_config.yaml b/metadata-service/configuration/src/main/resources/search_config.yaml index e93f8af8b1d6c..47494c8cb1ca4 100644 --- a/metadata-service/configuration/src/main/resources/search_config.yaml +++ b/metadata-service/configuration/src/main/resources/search_config.yaml @@ -65,9 +65,9 @@ queryConfigurations: boost_mode: replace # Criteria for exact-match only - # Contains quotes, is a single term with `_`, `.`, or `-` (normally consider for tokenization) then use exact match query + # Contains quotes then use exact match query - queryRegex: >- - ^["'].+["']$|^[a-zA-Z0-9]\S+[_.-]\S+[a-zA-Z0-9]$ + ^["'].+["']$ simpleQuery: false prefixMatchQuery: true exactMatchQuery: true diff --git a/smoke-test/tests/cypress/cypress/e2e/siblings/siblings.js b/smoke-test/tests/cypress/cypress/e2e/siblings/siblings.js index fb772bd7af1e7..57617d7721e59 100644 --- a/smoke-test/tests/cypress/cypress/e2e/siblings/siblings.js +++ b/smoke-test/tests/cypress/cypress/e2e/siblings/siblings.js @@ -98,7 +98,7 @@ describe("siblings", () => { it("will combine results in search", () => { cy.login(); - cy.visit("/search?page=1&query=raw_orders"); + cy.visit("/search?page=1&query=%22raw_orders%22"); cy.contains("Showing 1 - 2 of ");
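The last two changes work together: exact-match scoring in `search_config.yaml` now applies only to quoted queries, so the siblings smoke test searches for `%22raw_orders%22`, i.e. `"raw_orders"` with the quotes URL-encoded. A minimal sketch of producing such a query string (Python standard library only; the `/search` route and the `page`/`query` parameters are simply taken from the Cypress test above, not from a DataHub API):

```python
from urllib.parse import quote, urlencode

# Quoting the term triggers the exact-match path, which the updated
# search_config.yaml now limits to queries matching ^["'].+["']$.
query = '"raw_orders"'

print(quote(query))
# -> %22raw_orders%22

print("/search?" + urlencode({"page": 1, "query": query}))
# -> /search?page=1&query=%22raw_orders%22
```

Any standard URL-encoding helper behaves the same way; the key point is that the quote characters must survive into the `query` parameter for the exact-match regex to apply.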