diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs
index 349850df6148..b61a350a5a9c 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -84,6 +84,9 @@ async fn main() -> Result<()> {
     // See how to analyze boundaries in different kinds of expressions.
     boundary_analysis_and_selectivity_demo()?;
 
+    // See how boundary analysis works for `AND` & `OR` conjunctions.
+    boundary_analysis_in_conjunctions_demo()?;
+
     // See how to determine the data types of expressions
     expression_type_demo()?;
 
@@ -279,15 +282,15 @@ fn range_analysis_demo() -> Result<()> {
     Ok(())
 }
 
-// DataFusion's analysis can infer boundary statistics and selectivity in
-// various situations which can be helpful in building more efficient
-// query plans.
+/// DataFusion's analysis can infer boundary statistics and selectivity in
+/// various situations, which can be helpful in building more efficient
+/// query plans.
 fn boundary_analysis_and_selectivity_demo() -> Result<()> {
     // Consider the example where we want all rows with an `id` greater than
     // 5000.
     let id_greater_5000 = col("id").gt_eq(lit(5000i64));
 
-    // As in most examples we must tell DaataFusion the type of the column.
+    // As in most examples we must tell DataFusion the type of the column.
     let schema = Arc::new(Schema::new(vec![make_field("id", DataType::Int64)]));
 
     // DataFusion is able to do cardinality estimation on various column types
@@ -312,10 +315,10 @@ fn boundary_analysis_and_selectivity_demo() -> Result<()> {
     let df_schema = DFSchema::try_from(schema.clone())?;
 
     // Analysis case id >= 5000
-    let physical_expr1 =
+    let physical_expr =
         SessionContext::new().create_physical_expr(id_greater_5000, &df_schema)?;
     let analysis = analyze(
-        &physical_expr1,
+        &physical_expr,
         AnalysisContext::new(initial_boundaries.clone()),
         df_schema.as_ref(),
     )?;
@@ -347,14 +350,116 @@ fn boundary_analysis_and_selectivity_demo() -> Result<()> {
     Ok(())
 }
 
-fn make_field(name: &str, data_type: DataType) -> Field {
-    let nullable = false;
-    Field::new(name, data_type, nullable)
-}
+/// This function shows how to think about and leverage the analysis API
+/// to infer boundaries in `AND` & `OR` conjunctions.
+fn boundary_analysis_in_conjunctions_demo() -> Result<()> {
+    // Let us consider the more common case of `AND` & `OR` conjunctions.
+    //
+    // age > 18 AND age <= 25
+    let age_between_18_25 = col("age").gt(lit(18i64)).and(col("age").lt_eq(lit(25i64)));
 
-fn make_ts_field(name: &str) -> Field {
-    let tz = None;
-    make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
+    // As always we need to tell DataFusion the type of the column.
+    let schema = Arc::new(Schema::new(vec![make_field("age", DataType::Int64)]));
+
+    // Similar to the example in `boundary_analysis_and_selectivity_demo`, we
+    // can establish column statistics that describe certain properties of
+    // the column.
+    let column_stats = ColumnStatistics {
+        null_count: Precision::Exact(0),
+        max_value: Precision::Exact(ScalarValue::Int64(Some(79))),
+        min_value: Precision::Exact(ScalarValue::Int64(Some(14))),
+        sum_value: Precision::Absent,
+        distinct_count: Precision::Absent,
+    };
+
+    let initial_boundaries =
+        vec![ExprBoundaries::try_from_column(&schema, &column_stats, 0)?];
+
+    // Before we run the analysis pass, let us describe what we can infer from
+    // the initial information.
+    //
+    // To recap, the expression is `age > 18 AND age <= 25`.
+    //
+    // The column `age` can take any value in the `Int64` range.
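+    // That is, the initial interval is `[i64::MIN, i64::MAX]`.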
+    //
+    // But using the `min` and `max` statistics we can narrow that initial
+    // range to `[min_value, max_value]`, which is [14, 79].
+    //
+    // During analysis, when evaluating, say, the left-hand side of the `AND`
+    // expression, we know that `age` must be greater than 18, so our range
+    // is now [19, 79].
+    // Evaluating the right-hand side then gives an upper bound, allowing
+    // us to infer that `age` must be in the range [19, 25] inclusive.
+    let df_schema = DFSchema::try_from(schema.clone())?;
+
+    let physical_expr =
+        SessionContext::new().create_physical_expr(age_between_18_25, &df_schema)?;
+    let analysis = analyze(
+        &physical_expr,
+        // We re-use `initial_boundaries` below, so we must clone it here.
+        AnalysisContext::new(initial_boundaries.clone()),
+        df_schema.as_ref(),
+    )?;
+
+    // We can check that DataFusion's analysis inferred the same bounds.
+    assert_eq!(
+        analysis.boundaries.first().map(|boundary| boundary
+            .interval
+            .clone()
+            .unwrap()
+            .into_bounds()),
+        Some((ScalarValue::Int64(Some(19)), ScalarValue::Int64(Some(25))))
+    );
+
+    // We can also infer the selectivity using the same approach as before.
+    //
+    // Granted, a column such as `age` more likely follows a normal distribution,
+    // so our selectivity estimate will not be as accurate as it could be.
+    assert!(analysis
+        .selectivity
+        .is_some_and(|selectivity| (0.1..=0.2).contains(&selectivity)));
+
+    // The above example showed how we can derive a tighter interval and a
+    // lower selectivity estimate during boundary analysis.
+    //
+    // `AND` conjunctions are easy to reason about because their interval
+    // arithmetic follows naturally from set intersection. Let us now look
+    // at something a tad more complicated: `OR` conjunctions.
+
+    // The expression we will look at is `age > 60 OR age <= 18`.
+    let age_greater_than_60_less_than_18 =
+        col("age").gt(lit(60i64)).or(col("age").lt_eq(lit(18i64)));
+
+    // We can re-use the same schema, initial boundaries, and column statistics
+    // described above. Let us think this through for a bit.
+    //
+    // Initial range: [14, 79], as described by our column statistics.
+    //
+    // From the left-hand and right-hand sides of our `OR` conjunction
+    // we end up with two ranges instead of just one:
+    //
+    // - age > 60: [61, 79]
+    // - age <= 18: [14, 18]
+    //
+    // Thus the range of possible values the `age` column might take is the
+    // union of both sets: [14, 18] U [61, 79].
+    let physical_expr = SessionContext::new()
+        .create_physical_expr(age_greater_than_60_less_than_18, &df_schema)?;
+
+    // Since we don't yet handle interval arithmetic for the `OR` operator, this will error out.
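+    // (Supporting `OR` would require tracking a union of disjoint intervals,
+    // whereas the analysis currently represents each column's bounds as a
+    // single interval.)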
+    let analysis = analyze(
+        &physical_expr,
+        AnalysisContext::new(initial_boundaries),
+        df_schema.as_ref(),
+    );
+
+    assert!(analysis.is_err());
+
+    Ok(())
 }
 
 /// This function shows how to use `Expr::get_type` to retrieve the DataType
@@ -494,3 +599,13 @@ fn type_coercion_demo() -> Result<()> {
 
     Ok(())
 }
+
+fn make_field(name: &str, data_type: DataType) -> Field {
+    let nullable = false;
+    Field::new(name, data_type, nullable)
+}
+
+fn make_ts_field(name: &str) -> Field {
+    let tz = None;
+    make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
+}
diff --git a/docs/source/library-user-guide/query-optimizer.md b/docs/source/library-user-guide/query-optimizer.md
index fad8adf83d81..af27bb75053b 100644
--- a/docs/source/library-user-guide/query-optimizer.md
+++ b/docs/source/library-user-guide/query-optimizer.md
@@ -388,3 +388,173 @@ In the following example, the `type_coercion` and `simplify_expressions` passes
 ```
 
 [df]: https://crates.io/crates/datafusion
+
+## Thinking about Query Optimization
+
+Query optimization in DataFusion uses a cost-based model. The model relies on
+table- and column-level statistics to estimate selectivity. Selectivity
+estimates are an important input to cost analysis for filters and projections,
+as they make it possible to estimate the cost of joins and filters.
+
+An important piece of building these estimates is _boundary analysis_, which
+uses interval arithmetic to take an expression such as `a > 2500 AND a <= 5000`
+and build an accurate selectivity estimate that can then be used to find more
+efficient plans.
+
+### `AnalysisContext` API
+
+The `AnalysisContext` serves as a shared knowledge base during expression evaluation
+and boundary analysis. Think of it as a dynamic repository that maintains information about:
+
+1. The currently known boundaries for columns and expressions
+2. Statistics that have been gathered or inferred
+3. Mutable state that can be updated as the analysis progresses
+
+What makes `AnalysisContext` particularly powerful is its ability to propagate information
+through the expression tree. As each node in the expression tree is analyzed, it can both
+read from and write to this shared context, allowing for sophisticated boundary analysis and inference.
+
+### `ColumnStatistics` for Cardinality Estimation
+
+Column statistics form the foundation of optimization decisions. Rather than just tracking
+simple metrics, DataFusion's `ColumnStatistics` provides a rich set of information, including:
+
+- Null value counts
+- Maximum and minimum values
+- Value sums (for numeric columns)
+- Distinct value counts
+
+Each of these statistics is wrapped in a `Precision` type that indicates whether the value is
+exact or estimated, allowing the optimizer to make informed decisions about the reliability
+of its cardinality estimates.
+
+### Boundary Analysis Flow
+
+The boundary analysis process flows through several stages, with each stage building
+upon the information gathered in previous stages. The `AnalysisContext` is continuously
+updated as the analysis progresses through the expression tree.
+
+#### Expression Boundary Analysis
+
+When analyzing expressions, DataFusion runs boundary analysis using interval arithmetic.
+Consider a simple predicate like `age > 18 AND age <= 25`. The analysis flows as follows:
+
+1. Context Initialization
+
+   - Begin with known column statistics
+   - Set up initial boundaries based on column constraints
+   - Initialize the shared analysis context
+
+2. Expression Tree Walk
+
+   - Analyze each node in the expression tree
+   - Propagate boundary information upward
+   - Allow child nodes to influence parent boundaries
+
+3. Boundary Updates
+
+   - Each expression can update the shared context
+   - Changes flow through the entire expression tree
+   - Final boundaries inform optimization decisions
+
+### Working with the Analysis API
+
+The following example shows how you can run an analysis pass on a physical expression
+to infer the selectivity of the expression and the space of possible values it can
+take.
+
+```rust
+# use std::sync::Arc;
+# use datafusion::prelude::*;
+# use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
+# use datafusion::arrow::datatypes::{DataType, Field, Schema};
+# use datafusion::common::stats::Precision;
+#
+# use datafusion::common::{ColumnStatistics, DFSchema};
+# use datafusion::common::ScalarValue;
+# use datafusion::error::Result;
+fn analyze_filter_example() -> Result<()> {
+    // Create a schema with an 'age' column
+    let age = Field::new("age", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![age]));
+
+    // Define column statistics
+    let column_stats = ColumnStatistics {
+        null_count: Precision::Exact(0),
+        max_value: Precision::Exact(ScalarValue::Int64(Some(79))),
+        min_value: Precision::Exact(ScalarValue::Int64(Some(14))),
+        distinct_count: Precision::Absent,
+        sum_value: Precision::Absent,
+    };
+
+    // Create expression: age > 18 AND age <= 25
+    let expr = col("age")
+        .gt(lit(18i64))
+        .and(col("age").lt_eq(lit(25i64)));
+
+    // Initialize analysis context
+    let initial_boundaries = vec![ExprBoundaries::try_from_column(
+        &schema, &column_stats, 0)?];
+    let context = AnalysisContext::new(initial_boundaries);
+
+    // Analyze expression
+    let df_schema = DFSchema::try_from(schema)?;
+    let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;
+    let analysis = analyze(&physical_expr, context, df_schema.as_ref())?;
+
+    // DataFusion infers that `age` must lie in the range [19, 25].
+    assert_eq!(
+        analysis.boundaries[0].interval.clone().unwrap().into_bounds(),
+        (ScalarValue::Int64(Some(19)), ScalarValue::Int64(Some(25)))
+    );
+
+    Ok(())
+}
+```
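+
+Note that not every operator supports boundary analysis yet. In particular,
+interval arithmetic is not currently implemented for the `OR` operator, so
+analyzing a disjunction such as `age > 60 OR age <= 18` errors out rather than
+returning the union of intervals `[14, 18] U [61, 79]`. The sketch below
+(the function name is illustrative) reuses the schema and statistics from the
+example above to show the failure mode:
+
+```rust
+# use std::sync::Arc;
+# use datafusion::prelude::*;
+# use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
+# use datafusion::arrow::datatypes::{DataType, Field, Schema};
+# use datafusion::common::stats::Precision;
+# use datafusion::common::{ColumnStatistics, DFSchema, ScalarValue};
+# use datafusion::error::Result;
+fn analyze_disjunction_example() -> Result<()> {
+    // Same schema and statistics as the example above
+    let age = Field::new("age", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![age]));
+    let column_stats = ColumnStatistics {
+        null_count: Precision::Exact(0),
+        max_value: Precision::Exact(ScalarValue::Int64(Some(79))),
+        min_value: Precision::Exact(ScalarValue::Int64(Some(14))),
+        distinct_count: Precision::Absent,
+        sum_value: Precision::Absent,
+    };
+    let initial_boundaries =
+        vec![ExprBoundaries::try_from_column(&schema, &column_stats, 0)?];
+
+    // Create expression: age > 60 OR age <= 18
+    let expr = col("age").gt(lit(60i64)).or(col("age").lt_eq(lit(18i64)));
+
+    let df_schema = DFSchema::try_from(schema)?;
+    let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;
+
+    // The analysis returns an error rather than a boundary estimate.
+    let analysis = analyze(
+        &physical_expr,
+        AnalysisContext::new(initial_boundaries),
+        df_schema.as_ref(),
+    );
+    assert!(analysis.is_err());
+
+    Ok(())
+}
+```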