Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ch-sc committed Feb 24, 2025
1 parent 621809f commit d14747d
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 33 deletions.
13 changes: 10 additions & 3 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::stats::Precision;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{ColumnStatistics, DFSchema};
use datafusion::common::{internal_datafusion_err, ColumnStatistics, DFSchema};
use datafusion::common::{ScalarValue, ToDFSchema};
use datafusion::error::Result;
use datafusion::functions_aggregate::first_last::first_value_udaf;
Expand Down Expand Up @@ -302,10 +302,17 @@ fn boundary_analysis_and_selectivity_demo() -> Result<()> {
distinct_count: Precision::Absent,
};

let field = schema.fields().first().ok_or_else(|| {
internal_datafusion_err!("schema does not have a field at index 0")
})?;

// We can then build our expression boundaries from the column statistics
// allowing the analysis to be more precise.
let initial_boundaries =
vec![ExprBoundaries::try_from_column(&schema, &column_stats, 0)?];
let initial_boundaries = vec![ExprBoundaries::try_from_column(
field.as_ref(),
&column_stats,
0,
)?];

// With the above we can perform the boundary analysis similar to the previous
// example.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Precision<usize> {
/// rows are selected. A selectivity of `0.5` means half the rows are
/// selected. Will always return inexact statistics.
pub fn with_estimated_selectivity(self, selectivity: f64) -> Self {
self.map(|v| ((v as f64 * selectivity).ceil()) as usize)
self.map(|v| (v as f64 * selectivity).ceil() as usize)
.to_inexact()
}
}
Expand Down
14 changes: 8 additions & 6 deletions datafusion/expr-common/src/interval_arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,11 @@ impl Interval {
)
}

pub fn is_null(&self) -> bool {
/// Check whether this interval has no boundary in either direction
pub fn is_unbounded(&self) -> bool {
self.lower.is_null() && self.upper.is_null()
}

pub fn is_not_null(&self) -> bool {
!(self.lower.is_null() || self.upper.is_null())
}

pub const CERTAINLY_FALSE: Self = Self {
lower: ScalarValue::Boolean(Some(false)),
upper: ScalarValue::Boolean(Some(false)),
Expand Down Expand Up @@ -1698,7 +1695,7 @@ impl Display for NullableInterval {

impl From<&Interval> for NullableInterval {
fn from(value: &Interval) -> Self {
if value.is_null() {
if value.is_unbounded() {
Self::Null {
datatype: value.data_type(),
}
Expand Down Expand Up @@ -2927,6 +2924,11 @@ mod tests {
Interval::make(Some(32.0), Some(64.0))?,
Interval::UNCERTAIN,
),
(
Interval::make::<i64>(Some(3_i64), Some(5_i64))?,
Interval::make::<i64>(Some(0_i64), Some(9_i64))?,
Interval::UNCERTAIN,
),
(
Interval::make(Some(1000_i64), None)?,
Interval::make(None, Some(0_i64))?,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr-common/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Operator {
}

/// Indicates whether this operator supports interval arithmetic
pub fn supports_bounds_evaluation(&self) -> bool {
pub fn supports_interval_evaluation(&self) -> bool {
matches!(
&self,
&Operator::Plus
Expand Down
9 changes: 2 additions & 7 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,13 +722,8 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
///
/// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`,
/// then the output interval would be `[0, 3]`.
fn evaluate_bounds(&self, input: &[&Interval]) -> Result<Interval> {
let input_data_types = input
.iter()
.map(|i| i.data_type())
.collect::<Vec<DataType>>();
let return_type = self.return_type(&input_data_types)?;
Interval::make_unbounded(&return_type)
fn evaluate_bounds(&self, _input: &[&Interval]) -> Result<Interval> {
not_impl_err!("Not implemented for UDF {:?}", self)
}

/// Indicates whether this [`ScalarUDFImpl`] supports interval arithmetic.
Expand Down
30 changes: 17 additions & 13 deletions datafusion/physical-expr/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult};
use crate::utils::collect_columns;
use crate::PhysicalExpr;

use arrow::datatypes::Schema;
use arrow::datatypes::{Field, Schema};
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_datafusion_err, internal_err, ColumnStatistics, Result, ScalarValue,
Expand Down Expand Up @@ -60,12 +60,23 @@ impl AnalysisContext {

/// Create a new analysis context from column statistics.
pub fn try_from_statistics(
input_schema: &Schema,
schema: &Schema,
statistics: &[ColumnStatistics],
) -> Result<Self> {
(0..input_schema.fields.len())
.map(|idx| {
ExprBoundaries::try_from_column(input_schema, &statistics[idx], idx)
assert_eq!(schema.fields().len(),
statistics.len(),
"Mismatching number of fields in schema and column statistics. schema: {}, column statistics: {}",
schema.fields().len(),
statistics.len()
);

schema
.fields()
.iter()
.zip(statistics.iter())
.enumerate()
.map(|(idx, (field, stats))| {
ExprBoundaries::try_from_column(field.as_ref(), stats, idx)
})
.collect::<Result<Vec<_>>>()
.map(Self::new)
Expand Down Expand Up @@ -94,17 +105,10 @@ pub struct ExprBoundaries {
impl ExprBoundaries {
/// Create a new `ExprBoundaries` object from column level statistics.
pub fn try_from_column(
schema: &Schema,
field: &Field,
col_stats: &ColumnStatistics,
col_index: usize,
) -> Result<Self> {
let field = schema.fields().get(col_index).ok_or_else(|| {
internal_datafusion_err!(
"Could not create `ExprBoundaries`: in `try_from_column` `col_index`
has gone out of bounds with a value of {col_index}, the schema has {} columns.",
schema.fields.len()
)
})?;
let empty_field =
ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null);
let interval = Interval::try_new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl PhysicalExpr for BinaryExpr {
}
}

self.op().supports_bounds_evaluation()
self.op().supports_interval_evaluation()
&& self.left.supports_bounds_evaluation(schema)
&& self.right.supports_bounds_evaluation(schema)
}
Expand Down
33 changes: 33 additions & 0 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1468,4 +1468,37 @@ mod tests {

Ok(())
}

#[test]
fn test_in_list_bounds_eval() -> Result<()> {
    // Exercises `evaluate_bounds` on an `IN` list expression built over a
    // nullable Int64 column `a`, checking the three boolean outcomes
    // (uncertain, certainly true, certainly false).
    //
    // The checks use `assert_eq!` rather than `debug_assert_eq!`: the debug
    // variant is compiled out when debug assertions are disabled (for example
    // under `cargo test --release`), which would leave the test verifying
    // nothing.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let col_a = col("a", &schema)?;
    let list = vec![lit(0i64), lit(2i64), lit(9i64), lit(6i64)];

    // The test already returns `Result`, so propagate construction errors
    // with `?` instead of panicking through `unwrap()`.
    let expr = in_list(col_a, list, &false, &schema)?;

    // NOTE(review): the list above holds four literals, while the child
    // interval slices below hold two or three entries. Presumably the first
    // interval belongs to the column and the rest to list items; confirm this
    // matches what `evaluate_bounds` expects for its children.

    // Case 1: expected to be undecidable.
    let child_intervals: &[&Interval] = &[
        &Interval::make(Some(3_i64), Some(5_i64))?,
        &Interval::make(Some(0_i64), Some(2_i64))?,
        &Interval::make(Some(6_i64), Some(9_i64))?,
    ];
    let result = expr.evaluate_bounds(child_intervals)?;
    assert_eq!(result, Interval::UNCERTAIN);

    // Case 2: expected to be certainly true.
    let child_intervals: &[&Interval] = &[
        &Interval::make(Some(3_i64), Some(5_i64))?,
        &Interval::make(Some(4_i64), Some(4_i64))?,
    ];
    let result = expr.evaluate_bounds(child_intervals)?;
    assert_eq!(result, Interval::CERTAINLY_TRUE);

    // Case 3: expected to be certainly false.
    let child_intervals: &[&Interval] = &[
        &Interval::make(Some(3_i64), Some(5_i64))?,
        &Interval::make(Some(10_i64), Some(10_i64))?,
    ];
    let result = expr.evaluate_bounds(child_intervals)?;
    assert_eq!(result, Interval::CERTAINLY_FALSE);

    Ok(())
}
}
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/is_not_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl PhysicalExpr for IsNotNullExpr {

fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
let inner = children[0];
Ok(if inner.is_null() {
Ok(if inner.is_unbounded() {
Interval::CERTAINLY_FALSE
} else if inner.lower().is_null() || inner.upper().is_null() {
Interval::UNCERTAIN
Expand Down

0 comments on commit d14747d

Please sign in to comment.