Skip to content

Commit

Permalink
Update profiler.py
Browse files: browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Feb 21, 2025
1 parent ade17f6 commit 25e6123
Showing 1 changed file with 62 additions and 39 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -86,23 +86,42 @@ def _get_required_partition_filters(
col.name for col in table.partition_info.columns if col
)

# Also check table columns for any partition columns
if hasattr(table, "columns") and table.columns:
required_partition_columns.update(
col.name for col in table.columns if col.is_partition_column
)
# If no partition columns found
if not required_partition_columns:
logger.debug(f"No partition columns found for table {table.name}")
if table.external:
# For external tables, check if it has external partitioning
try:
query = f"""
SELECT column_name
FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = '{table.name}'
AND is_partitioning_column = 'YES'
"""
query_job = self.config.get_bigquery_client().query(query)
results = list(query_job)

logger.debug(f"Required partition columns: {required_partition_columns}")
if results:
required_partition_columns.update(
row.column_name for row in results
)
logger.debug(
f"Found external partition columns: {required_partition_columns}"
)
else:
# No partitions found at all - this is valid for external tables
return []
except Exception as e:
logger.error(f"Error checking external table partitioning: {e}")
return None
else:
# Internal table with no partitions - this is unexpected
return None

# Get all table columns for reference
all_columns = {}
if hasattr(table, "columns") and table.columns:
all_columns = {col.name: col for col in table.columns}
logger.debug(f"Required partition columns: {required_partition_columns}")

# Add filters for each partition column
for col_name in required_partition_columns:
col = all_columns.get(col_name)

# Handle standard time-based columns first
col_name_lower = col_name.lower()
if col_name_lower == "year":
Expand All @@ -111,21 +130,16 @@ def _get_required_partition_filters(
partition_filters.append(f"`{col_name}` = {current_time.month}")
elif col_name_lower == "day":
partition_filters.append(f"`{col_name}` = {current_time.day}")
elif (
col
and col.data_type
and col.data_type.lower() in ("timestamp", "date", "datetime")
):
partition_filters.append(
f"`{col_name}` = {col.data_type}('{current_time}')"
)
else:
try:
query = f"""SELECT DISTINCT {col_name} as val
FROM `{project}.{schema}.{table.name}`
WHERE {col_name} IS NOT NULL
ORDER BY {col_name}
LIMIT 1"""
# Query for a valid partition value
query = f"""
SELECT DISTINCT {col_name} as val
FROM `{project}.{schema}.{table.name}`
WHERE {col_name} IS NOT NULL
ORDER BY {col_name} DESC -- Get most recent partition by default
LIMIT 1
"""
query_job = self.config.get_bigquery_client().query(query)
results = list(query_job)

Expand Down Expand Up @@ -161,23 +175,30 @@ def get_batch_kwargs(
"table_name": bq_table.name,
}

# Get partition filters
# For external tables, add specific handling
if bq_table.external:
base_kwargs["is_external"] = "true"
# Add any specific external table options needed

partition_filters = self._get_required_partition_filters(
bq_table, db_name, schema_name
)

if not partition_filters:
if partition_filters is None:
logger.warning(
f"Could not construct partition filters for {bq_table.name}. "
"This may cause partition elimination errors."
)
return base_kwargs

# Construct base query with partition filters
# If no partition filters needed (e.g. some external tables), return base kwargs
if not partition_filters:
return base_kwargs

# Construct query with partition filters
partition_where = " AND ".join(partition_filters)
logger.debug(f"Using partition filters: {partition_where}")

# Apply partition filters via custom_sql
custom_sql = f"""SELECT *
FROM `{db_name}.{schema_name}.{table.name}`
WHERE {partition_where}"""
Expand Down Expand Up @@ -211,7 +232,8 @@ def get_profile_request(
bq_table, db_name, schema_name
)

if not partition_filters:
# If we got None back, that means there was an error getting partition filters
if partition_filters is None:
self.report.report_warning(
title="Profile skipped for partitioned table",
message="Could not construct partition filters - required for partition elimination",
Expand All @@ -228,16 +250,17 @@ def get_profile_request(
)
return None

# Construct query with partition filters
partition_where = " AND ".join(partition_filters)
custom_sql = f"""SELECT *
FROM `{db_name}.{schema_name}.{bq_table.name}`
WHERE {partition_where}"""
# Only add partition handling if we actually have partition filters
if partition_filters:
partition_where = " AND ".join(partition_filters)
custom_sql = f"""SELECT *
FROM `{db_name}.{schema_name}.{bq_table.name}`
WHERE {partition_where}"""

logger.debug(f"Using partition filters: {partition_where}")
profile_request.batch_kwargs.update(
dict(custom_sql=custom_sql, partition_handling="true")
)
logger.debug(f"Using partition filters: {partition_where}")
profile_request.batch_kwargs.update(
dict(custom_sql=custom_sql, partition_handling="true")
)

return profile_request

Expand Down

0 comments on commit 25e6123

Please sign in to comment.