diff --git a/python/lsst/daf/butler/direct_query_driver/_driver.py b/python/lsst/daf/butler/direct_query_driver/_driver.py index 7b8ccbf17e..a6e153a9e7 100644 --- a/python/lsst/daf/butler/direct_query_driver/_driver.py +++ b/python/lsst/daf/butler/direct_query_driver/_driver.py @@ -334,14 +334,14 @@ def build_query( order_by: Iterable[qt.OrderExpression] = (), find_first_dataset: str | None = None, ) -> tuple[QueryPlan, QueryBuilder]: - plan, builder = self._analyze_query(tree, final_columns, order_by, find_first_dataset) - self._apply_query_joins(plan.joins, builder.joiner) - self._apply_query_projection(plan.projection, builder) - builder = self._apply_find_first(plan.find_first, builder) + plan, builder = self.analyze_query(tree, final_columns, order_by, find_first_dataset) + self.apply_query_joins(plan.joins, builder.joiner) + self.apply_query_projection(plan.projection, builder) + builder = self.apply_query_find_first(plan.find_first, builder) builder.columns = plan.final_columns return plan, builder - def _analyze_query( + def analyze_query( self, tree: qt.QueryTree, final_columns: qt.ColumnSet, @@ -384,7 +384,7 @@ def _analyze_query( find_first_plan = None if find_first_dataset is not None: - find_first_plan = QueryFindFirstPlan(find_first_dataset, joins_plan.datasets[find_first_dataset]) + find_first_plan = QueryFindFirstPlan(joins_plan.datasets[find_first_dataset]) # If we're doing a find-first search and there's a calibration # collection in play, we need to make sure the rows coming out of # the base query have only one timespan for each data ID + @@ -404,118 +404,7 @@ def _analyze_query( ) return plan, builder - def _analyze_query_tree(self, tree: qt.QueryTree) -> tuple[QueryJoinsPlan, QueryBuilder]: - # Delegate to the dimensions manager to rewrite the predicate and - # start a QueryBuilder to cover any spatial overlap - # joins or constraints. We'll return that SqlBuilder at the end. - ( - predicate, - builder, - ) = self.managers.dimensions.process_query_overlaps( - tree.dimensions, - tree.predicate, - tree.get_joined_dimension_groups(), - ) - result = QueryJoinsPlan(predicate=predicate, columns=builder.columns) - # We also check that the predicate doesn't reference any dimensions - # without constraining their governor dimensions, since that's a - # particularly easy mistake to make and it's almost never intentional. - # We also allow the registry data ID values to provide governor values. - where_columns = qt.ColumnSet(self.universe.empty.as_group()) - result.predicate.gather_required_columns(where_columns) - for governor in where_columns.dimensions.governors: - if governor not in result.constraint_data_id: - if governor in self._defaults.dataId.dimensions: - result.constraint_data_id[governor] = self._defaults.dataId[governor] - else: - raise qt.InvalidQueryError( - f"Query 'where' expression references a dimension dependent on {governor} without " - "constraining it directly." - ) - # Add materializations, which can also bring in more postprocessing. - for m_key, m_dimensions in tree.materializations.items(): - _, m_postprocessing = self._materializations[m_key] - result.materializations[m_key] = m_dimensions - # When a query is materialized, the new tree has an empty - # (trivially true) predicate because the original was used to make - # the materialized rows. But the original postprocessing isn't - # executed when the materialization happens, so we have to include - # it here. 
- builder.postprocessing.spatial_join_filtering.extend(m_postprocessing.spatial_join_filtering) - builder.postprocessing.spatial_where_filtering.extend(m_postprocessing.spatial_where_filtering) - # Add data coordinate uploads. - result.data_coordinate_uploads.update(tree.data_coordinate_uploads) - # Add dataset_searches and filter out collections that don't have the - # right dataset type or governor dimensions. - for dataset_type_name, dataset_search in tree.datasets.items(): - resolved_dataset_search = self._resolve_dataset_search( - dataset_type_name, dataset_search, result.constraint_data_id - ) - result.datasets[dataset_type_name] = resolved_dataset_search - if not resolved_dataset_search.collection_records: - result.messages.append(f"Search for dataset type {dataset_type_name!r} is doomed to fail.") - result.messages.extend(resolved_dataset_search.messages) - return result, builder - - def _resolve_dataset_search( - self, - dataset_type_name: str, - dataset_search: qt.DatasetSearch, - constraint_data_id: Mapping[str, DataIdValue], - ) -> ResolvedDatasetSearch: - result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions) - for collection_record, collection_summary in self._resolve_collection_path( - dataset_search.collections - ): - rejected: bool = False - if result.name not in collection_summary.dataset_types.names: - result.messages.append( - f"No datasets of type {result.name!r} in collection {collection_record.name}." - ) - rejected = True - for governor in constraint_data_id.keys() & collection_summary.governors.keys(): - if constraint_data_id[governor] not in collection_summary.governors[governor]: - result.messages.append( - f"No datasets with {governor}={constraint_data_id[governor]!r} " - f"in collection {collection_record.name}." - ) - rejected = True - if not rejected: - if collection_record.type is CollectionType.CALIBRATION: - result.is_calibration_search = True - result.collection_records.append(collection_record) - if result.dimensions != self.get_dataset_type(dataset_type_name).dimensions.as_group(): - # This is really for server-side defensiveness; it's hard to - # imagine the query getting different dimensions for a dataset - # type in two calls to the same query driver. - raise qt.InvalidQueryError( - f"Incorrect dimensions {result.dimensions} for dataset {dataset_type_name} " - f"in query (vs. {self.get_dataset_type(dataset_type_name).dimensions.as_group()})." - ) - return result - - def _resolve_collection_path( - self, collections: Iterable[str] - ) -> list[tuple[CollectionRecord, CollectionSummary]]: - result: list[tuple[CollectionRecord, CollectionSummary]] = [] - done: set[str] = set() - - def recurse(collection_names: Iterable[str]) -> None: - for collection_name in collection_names: - if collection_name not in done: - done.add(collection_name) - record = self.managers.collections.find(collection_name) - - if record.type is CollectionType.CHAINED: - recurse(cast(ChainedCollectionRecord, record).children) - else: - result.append((record, self.managers.datasets.getCollectionSummary(record))) - - recurse(collections) - - return result - - def _apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None: + def apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None: # Process data coordinate upload joins. 
for upload_key, upload_dimensions in plan.data_coordinate_uploads.items(): joiner.join( @@ -558,7 +447,7 @@ def _apply_query_joins(self, plan: QueryJoinsPlan, joiner: QueryJoiner) -> None: # Add the WHERE clause to the joiner. joiner.where(plan.predicate.visit(SqlColumnVisitor(joiner, self))) - def _apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilder) -> None: + def apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuilder) -> None: builder.columns = plan.columns if not plan.needs_dimension_distinct and not builder.postprocessing.check_validity_match_count: # Rows are already unique; nothing else to do in this method. @@ -673,7 +562,7 @@ def _apply_query_projection(self, plan: QueryProjectionPlan, builder: QueryBuild unique_keys.append(builder.joiner.fields[logical_table][field]) builder.group_by = unique_keys - def _apply_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuilder) -> QueryBuilder: + def apply_query_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuilder) -> QueryBuilder: if not plan: return builder # The query we're building looks like this: @@ -722,6 +611,117 @@ def _apply_find_first(self, plan: QueryFindFirstPlan | None, builder: QueryBuild del builder.joiner.special["_ROWNUM"] return builder + def _analyze_query_tree(self, tree: qt.QueryTree) -> tuple[QueryJoinsPlan, QueryBuilder]: + # Delegate to the dimensions manager to rewrite the predicate and + # start a QueryBuilder to cover any spatial overlap + # joins or constraints. We'll return that SqlBuilder at the end. + ( + predicate, + builder, + ) = self.managers.dimensions.process_query_overlaps( + tree.dimensions, + tree.predicate, + tree.get_joined_dimension_groups(), + ) + result = QueryJoinsPlan(predicate=predicate, columns=builder.columns) + # We also check that the predicate doesn't reference any dimensions + # without constraining their governor dimensions, since that's a + # particularly easy mistake to make and it's almost never intentional. + # We also allow the registry data ID values to provide governor values. + where_columns = qt.ColumnSet(self.universe.empty.as_group()) + result.predicate.gather_required_columns(where_columns) + for governor in where_columns.dimensions.governors: + if governor not in result.constraint_data_id: + if governor in self._defaults.dataId.dimensions: + result.constraint_data_id[governor] = self._defaults.dataId[governor] + else: + raise qt.InvalidQueryError( + f"Query 'where' expression references a dimension dependent on {governor} without " + "constraining it directly." + ) + # Add materializations, which can also bring in more postprocessing. + for m_key, m_dimensions in tree.materializations.items(): + _, m_postprocessing = self._materializations[m_key] + result.materializations[m_key] = m_dimensions + # When a query is materialized, the new tree has an empty + # (trivially true) predicate because the original was used to make + # the materialized rows. But the original postprocessing isn't + # executed when the materialization happens, so we have to include + # it here. + builder.postprocessing.spatial_join_filtering.extend(m_postprocessing.spatial_join_filtering) + builder.postprocessing.spatial_where_filtering.extend(m_postprocessing.spatial_where_filtering) + # Add data coordinate uploads. + result.data_coordinate_uploads.update(tree.data_coordinate_uploads) + # Add dataset_searches and filter out collections that don't have the + # right dataset type or governor dimensions. 
+ for dataset_type_name, dataset_search in tree.datasets.items(): + resolved_dataset_search = self._resolve_dataset_search( + dataset_type_name, dataset_search, result.constraint_data_id + ) + result.datasets[dataset_type_name] = resolved_dataset_search + if not resolved_dataset_search.collection_records: + result.messages.append(f"Search for dataset type {dataset_type_name!r} is doomed to fail.") + result.messages.extend(resolved_dataset_search.messages) + return result, builder + + def _resolve_dataset_search( + self, + dataset_type_name: str, + dataset_search: qt.DatasetSearch, + constraint_data_id: Mapping[str, DataIdValue], + ) -> ResolvedDatasetSearch: + result = ResolvedDatasetSearch(dataset_type_name, dataset_search.dimensions) + for collection_record, collection_summary in self._resolve_collection_path( + dataset_search.collections + ): + rejected: bool = False + if result.name not in collection_summary.dataset_types.names: + result.messages.append( + f"No datasets of type {result.name!r} in collection {collection_record.name}." + ) + rejected = True + for governor in constraint_data_id.keys() & collection_summary.governors.keys(): + if constraint_data_id[governor] not in collection_summary.governors[governor]: + result.messages.append( + f"No datasets with {governor}={constraint_data_id[governor]!r} " + f"in collection {collection_record.name}." + ) + rejected = True + if not rejected: + if collection_record.type is CollectionType.CALIBRATION: + result.is_calibration_search = True + result.collection_records.append(collection_record) + if result.dimensions != self.get_dataset_type(dataset_type_name).dimensions.as_group(): + # This is really for server-side defensiveness; it's hard to + # imagine the query getting different dimensions for a dataset + # type in two calls to the same query driver. + raise qt.InvalidQueryError( + f"Incorrect dimensions {result.dimensions} for dataset {dataset_type_name} " + f"in query (vs. {self.get_dataset_type(dataset_type_name).dimensions.as_group()})." + ) + return result + + def _resolve_collection_path( + self, collections: Iterable[str] + ) -> list[tuple[CollectionRecord, CollectionSummary]]: + result: list[tuple[CollectionRecord, CollectionSummary]] = [] + done: set[str] = set() + + def recurse(collection_names: Iterable[str]) -> None: + for collection_name in collection_names: + if collection_name not in done: + done.add(collection_name) + record = self.managers.collections.find(collection_name) + + if record.type is CollectionType.CHAINED: + recurse(cast(ChainedCollectionRecord, record).children) + else: + result.append((record, self.managers.datasets.getCollectionSummary(record))) + + recurse(collections) + + return result + def _join_materialization( self, joiner: QueryJoiner, diff --git a/python/lsst/daf/butler/direct_query_driver/_query_plan.py b/python/lsst/daf/butler/direct_query_driver/_query_plan.py index ee22c33f34..b85703a075 100644 --- a/python/lsst/daf/butler/direct_query_driver/_query_plan.py +++ b/python/lsst/daf/butler/direct_query_driver/_query_plan.py @@ -33,7 +33,6 @@ "QueryProjectionPlan", "QueryFindFirstPlan", "ResolvedDatasetSearch", - "DataIdExtractionVisitor", ) import dataclasses @@ -48,33 +47,90 @@ @dataclasses.dataclass class ResolvedDatasetSearch: + """A struct describing a dataset search joined into a query, after + resolving its collection search path. 
+ """ + name: str + """Name of the dataset type.""" + dimensions: DimensionGroup + """Dimensions of the dataset type.""" + collection_records: list[CollectionRecord] = dataclasses.field(default_factory=list) + """Records of the collections to search for this dataset, in order, after + removing collections inconsistent with the dataset type or the query's + data ID constraint. + """ + messages: list[str] = dataclasses.field(default_factory=list) + """Diagnostic messages about collections that were filtered out of + collection records. + """ + is_calibration_search: bool = False + """`True` if any of the collections to be searched is a + `~CollectionType.CALIBRATION` collection, `False` otherwise. + + Since only calibration datasets can be present in + `~CollectionType.CALIBRATION` collections, this also + """ @dataclasses.dataclass class QueryJoinsPlan: + """A struct describing the "joins" section of a butler query. + + See `QueryPlan` and `QueryPlan.joins` for additional information. + """ + predicate: qt.Predicate + """Boolean expression to apply to rows.""" + columns: qt.ColumnSet + """All columns whose tables need to be joined into the query. + + This is updated after construction to include all columns required by + `predicate`. + """ + materializations: dict[qt.MaterializationKey, DimensionGroup] = dataclasses.field(default_factory=dict) + """Materializations to join into the query.""" + datasets: dict[str, ResolvedDatasetSearch] = dataclasses.field(default_factory=dict) + """Dataset searches to join into the query.""" + data_coordinate_uploads: dict[qt.DataCoordinateUploadKey, DimensionGroup] = dataclasses.field( default_factory=dict ) + """Data coordinate uploads to join into the query.""" + constraint_data_id: dict[str, DataIdValue] = dataclasses.field(default_factory=dict) + """A data ID that must be consistent with all result rows, extracted from + `predicate` at construction. + """ + messages: list[str] = dataclasses.field(default_factory=list) + """Diagnostic messages that report reasons the query may not return any + rows. + """ def __post_init__(self) -> None: self.predicate.gather_required_columns(self.columns) # Extract the data ID implied by the predicate; we can use the governor # dimensions in that to constrain the collections we search for # datasets later. - self.predicate.visit(DataIdExtractionVisitor(self.constraint_data_id, self.messages)) + self.predicate.visit(_DataIdExtractionVisitor(self.constraint_data_id, self.messages)) def iter_mandatory(self) -> Iterator[DimensionElement]: + """Return an iterator over the dimension elements that must be joined + into the query. + + These elements either provide "field" (non-key) columns or define + relationships that result rows must be consistent with. They do not + necessarily include all dimension keys in `columns`, since each of + those can typically be included in a query in multiple different ways. + """ for element_name in self.columns.dimensions.elements: element = self.columns.dimensions.universe[element_name] if self.columns.dimension_fields[element_name]: @@ -82,7 +138,7 @@ def iter_mandatory(self) -> Iterator[DimensionElement]: # its table is the only place to get those. yield element elif element.defines_relationships: - # We als need to join in DimensionElements tables that define + # We also need to join in DimensionElement tables that define # one-to-many and many-to-many relationships, but data # coordinate uploads, materializations, and datasets can also # provide these relationships. 
Data coordinate uploads and @@ -109,17 +165,55 @@ def iter_mandatory(self) -> Iterator[DimensionElement]: @dataclasses.dataclass class QueryProjectionPlan: + """A struct describing the "projection" stage of a butler query. + + See `QueryPlan` and `QueryPlan.projection` for additional information. + """ + columns: qt.ColumnSet + """The columns present in the query after the projection is applied. + + This is always a subset of `QueryJoinsPlan.columns`. + """ + datasets: dict[str, ResolvedDatasetSearch] + """Dataset searches to join into the query.""" + needs_dimension_distinct: bool = False + """If `True`, the projection's dimensions do not include all dimensions in + the "joins" stage, and hence a SELECT DISTINCT [ON] or GROUP BY must be + used to make post-projection rows unique. + """ + find_first_dataset: str | None = None + """If not `None`, this is a find-first query for this dataset. + + This is set even if the find-first search is trivial because there is only + one resolved collection. + """ + region_aggregates: list[DimensionElement] = dataclasses.field(default_factory=list) + """Dimension elements with spatial regions that must be aggregated by the + projection, since their dimension keys are being dropped. + + This can only be non-empty if `needs_dimension_distinct` is `True`. + """ @dataclasses.dataclass class QueryFindFirstPlan: - dataset_type: str + """A struct describing the "find-first" stage of a butler query. + + See `QueryPlan` and `QueryPlan.find_first` for additional information. + """ + search: ResolvedDatasetSearch + """Information about the dataset being searched for.""" + + @property + def dataset_type(self) -> str: + """Name of the dataset type.""" + return self.search.name def __bool__(self) -> bool: return len(self.search.collection_records) > 1 @@ -127,16 +221,82 @@ def __bool__(self) -> bool: @dataclasses.dataclass class QueryPlan: + """A struct that aggregates information about a complete butler query. + + Notes + ----- + Butler queries are transformed into a combination of SQL and Python-side + postprocessing in three stages, with each corresponding to an attribute of + this class and a method of `DirectQueryDriver`: + + - In the `joins` stage (`~DirectQueryDriver.apply_query_joins`), we define + the main SQL FROM and WHERE clauses by joining all tables needed to + bring in any columns or to constrain the keys of its rows. + + - In the `projection` stage (`~DirectQueryDriver.apply_query_projection`), + we select only the columns needed for the query's result rows (including + columns needed only by postprocessing and ORDER BY, as well as those needed + by the objects returned to users). If the result rows are not naturally + unique given what went into the query in the "joins" stage, the + projection involves a SELECT DISTINCT [ON] or GROUP BY to make them + unique, and in a few rare cases uses aggregate functions with GROUP BY. + + - In the `find_first` stage (`~DirectQueryDriver.apply_query_find_first`), + we use a window function (PARTITION BY) subquery to find only the first + dataset in the collection search path for each data ID. This stage does + nothing if there is no find-first dataset search, or if the search is + trivial because there is only one collection. + + In `DirectQueryDriver.build_query`, a `QueryPlan` instance is constructed + via `DirectQueryDriver.analyze_query`, which also returns an initial + `QueryBuilder`. 
After this point the plans are considered frozen, and the + nested plan attributes are then passed to each of the corresponding + `DirectQueryDriver` apply methods along with the builder, which is mutated (and occasionally + replaced) into the complete SQL/postprocessing form of the query. + """ + joins: QueryJoinsPlan + """Description of the "joins" stage of query construction.""" + projection: QueryProjectionPlan + """Description of the "projection" stage of query construction.""" + find_first: QueryFindFirstPlan | None + """Description of the "find_first" stage of query construction. + + This attribute is `None` if there is no find-first search at all, and + `False` in boolean contexts if the search is trivial because there is only + one collection after the collections have been resolved. + """ + final_columns: qt.ColumnSet + """The columns included in the SELECT clause of the complete SQL query + that is actually executed. + + This is a subset of `QueryProjectionPlan.columns` that differs only in + columns used by the `find_first` stage or an ORDER BY expression. + + Like all other `.queries.tree.ColumnSet` attributes, it does not include + fields added directly to `QueryBuilder.special`, which may also be added + to the SELECT clause. + """ -class DataIdExtractionVisitor( +class _DataIdExtractionVisitor( SimplePredicateVisitor, ColumnExpressionVisitor[tuple[str, None] | tuple[None, Any] | tuple[None, None]], ): + """A column-expression visitor that extracts equality constraints on + dimensions that are not OR'd with anything else. + + Parameters + ---------- + data_id : `dict` + Dictionary to populate in place. + messages : `list` [ `str` ] + List of diagnostic messages to populate in place. + """ + def __init__(self, data_id: dict[str, DataIdValue], messages: list[str]): self.data_id = data_id self.messages = messages
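Reviewer note (not part of the patch): the renamed public methods decompose `build_query` into explicitly callable stages. Below is a minimal sketch of how they compose, mirroring the new `build_query` body above; the helper name and the bare `driver` argument are illustrative, and `tree`, `final_columns`, `order_by`, and `find_first_dataset` are the same arguments `build_query` already receives.

# Sketch only (hypothetical helper, not in the patch): composes the now-public
# DirectQueryDriver stage methods exactly as build_query does.
def build_query_in_stages(driver, tree, final_columns, order_by=(), find_first_dataset=None):
    # Analysis: produces the frozen QueryPlan struct and an initial QueryBuilder.
    plan, builder = driver.analyze_query(tree, final_columns, order_by, find_first_dataset)
    # "joins" stage: populate the FROM/WHERE parts via the builder's joiner.
    driver.apply_query_joins(plan.joins, builder.joiner)
    # "projection" stage: restrict columns, adding DISTINCT/GROUP BY when needed.
    driver.apply_query_projection(plan.projection, builder)
    # "find_first" stage: may wrap the query in a window-function subquery and
    # returns a (possibly new) builder rather than mutating in place.
    builder = driver.apply_query_find_first(plan.find_first, builder)
    builder.columns = plan.final_columns
    return plan, builder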