From 25e61231687e29d5e1725e9c4ea9cdf7fd018c65 Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Fri, 21 Feb 2025 09:29:16 +0000
Subject: [PATCH] Update profiler.py

---
 .../ingestion/source/bigquery_v2/profiler.py | 101 +++++++++++-------
 1 file changed, 62 insertions(+), 39 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
index 445787a122940..b563c5df70379 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py
@@ -86,23 +86,42 @@ def _get_required_partition_filters(
                 col.name for col in table.partition_info.columns if col
             )
 
-        # Also check table columns for any partition columns
-        if hasattr(table, "columns") and table.columns:
-            required_partition_columns.update(
-                col.name for col in table.columns if col.is_partition_column
-            )
+        # If no partition columns found
+        if not required_partition_columns:
+            logger.debug(f"No partition columns found for table {table.name}")
+            if table.external:
+                # For external tables, check if it has external partitioning
+                try:
+                    query = f"""
+                        SELECT column_name
+                        FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
+                        WHERE table_name = '{table.name}'
+                        AND is_partitioning_column = 'YES'
+                    """
+                    query_job = self.config.get_bigquery_client().query(query)
+                    results = list(query_job)
 
-        logger.debug(f"Required partition columns: {required_partition_columns}")
+                    if results:
+                        required_partition_columns.update(
+                            row.column_name for row in results
+                        )
+                        logger.debug(
+                            f"Found external partition columns: {required_partition_columns}"
+                        )
+                    else:
+                        # No partitions found at all - this is valid for external tables
+                        return []
+                except Exception as e:
+                    logger.error(f"Error checking external table partitioning: {e}")
+                    return None
+            else:
+                # Internal table with no partitions - this is unexpected
+                return None
 
-        # Get all table columns for reference
-        all_columns = {}
-        if hasattr(table, "columns") and table.columns:
-            all_columns = {col.name: col for col in table.columns}
+        logger.debug(f"Required partition columns: {required_partition_columns}")
 
         # Add filters for each partition column
         for col_name in required_partition_columns:
-            col = all_columns.get(col_name)
-
             # Handle standard time-based columns first
             col_name_lower = col_name.lower()
             if col_name_lower == "year":
@@ -111,21 +130,16 @@ def _get_required_partition_filters(
                 partition_filters.append(f"`{col_name}` = {current_time.month}")
             elif col_name_lower == "day":
                 partition_filters.append(f"`{col_name}` = {current_time.day}")
-            elif (
-                col
-                and col.data_type
-                and col.data_type.lower() in ("timestamp", "date", "datetime")
-            ):
-                partition_filters.append(
-                    f"`{col_name}` = {col.data_type}('{current_time}')"
-                )
             else:
                 try:
-                    query = f"""SELECT DISTINCT {col_name} as val
-FROM `{project}.{schema}.{table.name}`
-WHERE {col_name} IS NOT NULL
-ORDER BY {col_name}
-LIMIT 1"""
+                    # Query for a valid partition value
+                    query = f"""
+                        SELECT DISTINCT {col_name} as val
+                        FROM `{project}.{schema}.{table.name}`
+                        WHERE {col_name} IS NOT NULL
+                        ORDER BY {col_name} DESC  -- Get most recent partition by default
+                        LIMIT 1
+                    """
                     query_job = self.config.get_bigquery_client().query(query)
                     results = list(query_job)
 
@@ -161,23 +175,30 @@ def get_batch_kwargs(
             "table_name": bq_table.name,
         }
 
-        # Get partition filters
+        # For external tables, add specific handling
+        if bq_table.external:
+            base_kwargs["is_external"] = "true"
+            # Add any specific external table options needed
+
         partition_filters = self._get_required_partition_filters(
             bq_table, db_name, schema_name
         )
 
-        if not partition_filters:
+        if partition_filters is None:
             logger.warning(
                 f"Could not construct partition filters for {bq_table.name}. "
                 "This may cause partition elimination errors."
            )
             return base_kwargs
 
-        # Construct base query with partition filters
+        # If no partition filters needed (e.g. some external tables), return base kwargs
+        if not partition_filters:
+            return base_kwargs
+
+        # Construct query with partition filters
         partition_where = " AND ".join(partition_filters)
         logger.debug(f"Using partition filters: {partition_where}")
 
-        # Apply partition filters via custom_sql
         custom_sql = f"""SELECT *
 FROM `{db_name}.{schema_name}.{table.name}`
 WHERE {partition_where}"""
@@ -211,7 +232,8 @@ def get_profile_request(
             bq_table, db_name, schema_name
         )
 
-        if not partition_filters:
+        # If we got None back, that means there was an error getting partition filters
+        if partition_filters is None:
             self.report.report_warning(
                 title="Profile skipped for partitioned table",
                 message="Could not construct partition filters - required for partition elimination",
@@ -228,16 +250,17 @@ def get_profile_request(
             )
             return None
 
-        # Construct query with partition filters
-        partition_where = " AND ".join(partition_filters)
-        custom_sql = f"""SELECT *
-FROM `{db_name}.{schema_name}.{bq_table.name}`
-WHERE {partition_where}"""
+        # Only add partition handling if we actually have partition filters
+        if partition_filters:
+            partition_where = " AND ".join(partition_filters)
+            custom_sql = f"""SELECT *
+            FROM `{db_name}.{schema_name}.{bq_table.name}`
+            WHERE {partition_where}"""
 
-        logger.debug(f"Using partition filters: {partition_where}")
-        profile_request.batch_kwargs.update(
-            dict(custom_sql=custom_sql, partition_handling="true")
-        )
+            logger.debug(f"Using partition filters: {partition_where}")
+            profile_request.batch_kwargs.update(
+                dict(custom_sql=custom_sql, partition_handling="true")
+            )
 
         return profile_request