From a5f5729ff5f98f6bbd6e02957b7ced21b3d98357 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Thu, 19 Dec 2024 14:24:06 +0100 Subject: [PATCH 01/19] Support temporal data types in interval arithmetics --- datafusion/common/src/scalar/mod.rs | 11 + .../src/physical_optimizer/join_selection.rs | 0 .../physical_optimizer/pipeline_checker.rs | 0 datafusion/expr-common/Cargo.toml | 3 + .../expr-common/src/interval_arithmetic.rs | 574 ++++++++++++++++-- datafusion/expr-common/src/operator.rs | 20 + .../expr-common/src/type_coercion/binary.rs | 30 + datafusion/expr/src/udf.rs | 22 +- .../src/expressions/column.rs | 0 .../physical-expr-common/src/physical_expr.rs | 7 +- datafusion/physical-expr-common/src/utils.rs | 26 + .../physical-expr/src/expressions/binary.rs | 16 + .../physical-expr/src/expressions/cast.rs | 6 +- .../physical-expr/src/expressions/column.rs | 9 + .../physical-expr/src/expressions/in_list.rs | 51 ++ .../src/expressions/is_not_null.rs | 12 + .../physical-expr/src/expressions/literal.rs | 17 +- .../physical-expr/src/expressions/negative.rs | 5 + .../physical-expr/src/expressions/not.rs | 1 + .../physical-expr/src/intervals/cp_solver.rs | 75 ++- .../physical-expr/src/intervals/utils.rs | 74 --- .../physical-expr/src/scalar_function.rs | 5 + .../physical-optimizer/src/sanity_checker.rs | 6 +- datafusion/physical-plan/src/filter.rs | 3 +- datafusion/sqllogictest/test_files/dates.slt | 1 + 25 files changed, 808 insertions(+), 166 deletions(-) create mode 100644 datafusion/core/src/physical_optimizer/join_selection.rs create mode 100644 datafusion/core/src/physical_optimizer/pipeline_checker.rs create mode 100644 datafusion/physical-expr-common/src/expressions/column.rs diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 6cd6a43941c8..74044c0bfb8b 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -1459,6 +1459,17 @@ impl ScalarValue { } } + /// Returns negation for a boolean scalar value + pub fn boolean_negate(&self) -> Result { + match self { + ScalarValue::Boolean(None) => Ok(self.clone()), + ScalarValue::Boolean(Some(value)) => Ok(ScalarValue::Boolean(Some(!value))), + value => { + _internal_err!("Can not run boolean negative on scalar value {value:?}") + } + } + } + /// Wrapping addition of `ScalarValue` /// /// NB: operating on `ScalarValue` directly is not efficient, performance sensitive code diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml index 109d8e0b89a6..4cc758861565 100644 --- a/datafusion/expr-common/Cargo.toml +++ b/datafusion/expr-common/Cargo.toml @@ -41,3 +41,6 @@ arrow = { workspace = true } datafusion-common = { workspace = true } itertools = { workspace = true } paste = "^1.0" + +[dev-dependencies] +arrow-buffer = { workspace = true } \ No newline at end of file diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index 7f20020c3457..4c1b74c1f71f 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -30,7 +30,7 @@ use arrow::datatypes::{ MIN_DECIMAL128_FOR_EACH_PRECISION, MIN_DECIMAL256_FOR_EACH_PRECISION, }; use datafusion_common::rounding::{alter_fp_rounding_mode, next_down, next_up}; -use datafusion_common::{internal_err, Result, ScalarValue}; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; macro_rules! get_extreme_value { ($extreme:ident, $value:expr) => { @@ -306,23 +306,15 @@ impl Interval { // Standardize floating-point endpoints: DataType::Float32 => handle_float_intervals!(Float32, f32, lower, upper), DataType::Float64 => handle_float_intervals!(Float64, f64, lower, upper), - // Unsigned null values for lower bounds are set to zero: - DataType::UInt8 if lower.is_null() => Self { - lower: ScalarValue::UInt8(Some(0)), - upper, - }, - DataType::UInt16 if lower.is_null() => Self { - lower: ScalarValue::UInt16(Some(0)), - upper, - }, - DataType::UInt32 if lower.is_null() => Self { - lower: ScalarValue::UInt32(Some(0)), - upper, - }, - DataType::UInt64 if lower.is_null() => Self { - lower: ScalarValue::UInt64(Some(0)), - upper, - }, + // Lower bounds of unsigned integer null values are set to zero: + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 + if lower.is_null() => + { + Self { + lower: ScalarValue::new_zero(&lower.data_type()).unwrap(), + upper, + } + } // Other data types do not require standardization: _ => Self { lower, upper }, } @@ -405,8 +397,8 @@ impl Interval { // There must be no way to create an interval whose endpoints have // different types. - assert!( - lower_type == upper_type, + assert_eq!( + lower_type, upper_type, "Interval bounds have different types: {lower_type} != {upper_type}" ); lower_type @@ -424,6 +416,10 @@ impl Interval { ) } + pub fn is_null(&self) -> bool { + self.lower.is_null() && self.upper.is_null() + } + pub const CERTAINLY_FALSE: Self = Self { lower: ScalarValue::Boolean(Some(false)), upper: ScalarValue::Boolean(Some(false)), @@ -617,6 +613,66 @@ impl Interval { } } + /// Compute the union of this interval with the given interval. + /// + /// NOTE: This function only works with intervals of the same data type. + /// Attempting to compare intervals of different data types will lead + /// to an error. + pub fn union>(&self, other: T) -> Result> { + let rhs = other.borrow(); + + if self.data_type().ne(&rhs.data_type()) { + BinaryTypeCoercer::new(&self.data_type(), &Operator::Plus, &rhs.data_type()).get_result_type() + .map_err(|e| + DataFusionError::Internal( + format!( + "Cannot coerce data types for interval union, lhs:{}, rhs:{}. internal error: {}", + self.data_type(), + rhs.data_type(), + e + )) + )?; + }; + + // If the upper bound of one side is less than the lower bound of the + // other side or vice versa, then the resulting interval is expanded + // accordingly. Note that, this can only happen if one side has a null + // value. + // + // Examples: + // [1, 2] ∪ [3, NULL] = [1, 3] + // [3, NULL] ∪ [1, NULL] = [1, 3] + // [3, NULL] ∪ [NULL, 1] = [1, 3] + let (lower, upper) = if (!(self.lower.is_null() || rhs.upper.is_null()) + && self.lower > rhs.upper) + || (!(self.upper.is_null() || rhs.lower.is_null()) && self.upper < rhs.lower) + { + ( + min_of_bounds( + &min_of_bounds(&min_of_bounds(&self.lower, &rhs.lower), &self.upper), + &rhs.upper, + ), + max_of_bounds( + &max_of_bounds(&max_of_bounds(&self.lower, &rhs.lower), &self.upper), + &rhs.upper, + ), + ) + } else { + ( + min_of_bounds(&self.lower, &rhs.lower), + max_of_bounds(&self.upper, &rhs.upper), + ) + }; + + // New lower and upper bounds must always construct a valid interval. + assert!( + lower.is_null() || upper.is_null() || (lower <= upper), + "The union of two intervals can not be an invalid interval" + ); + + Ok(Some(Self { lower, upper })) + } + /// Compute the intersection of this interval with the given interval. /// If the intersection is empty, return `None`. /// @@ -625,13 +681,19 @@ impl Interval { /// to an error. pub fn intersect>(&self, other: T) -> Result> { let rhs = other.borrow(); + if self.data_type().ne(&rhs.data_type()) { - return internal_err!( - "Only intervals with the same data type are intersectable, lhs:{}, rhs:{}", - self.data_type(), - rhs.data_type() - ); - }; + BinaryTypeCoercer::new(&self.data_type(), &Operator::Plus, &rhs.data_type()).get_result_type() + .map_err(|e| + DataFusionError::Internal( + format!( + "Cannot coerce data types for interval intersection, lhs:{}, rhs:{}. internal error: {}", + self.data_type(), + rhs.data_type(), + e + )) + )?; + } // If it is evident that the result is an empty interval, short-circuit // and directly return `None`. @@ -646,7 +708,7 @@ impl Interval { // New lower and upper bounds must always construct a valid interval. assert!( - (lower.is_null() || upper.is_null() || (lower <= upper)), + lower.is_null() || upper.is_null() || (lower <= upper), "The intersection of two intervals can not be an invalid interval" ); @@ -880,6 +942,19 @@ impl Interval { upper: self.lower().clone().arithmetic_negate()?, }) } + + pub fn boolean_negate(self) -> Result { + if self.data_type() != DataType::Boolean { + return internal_err!( + "Boolean negation is only supported for boolean intervals" + ); + } + + Ok(Self { + lower: self.lower().clone().boolean_negate()?, + upper: self.upper().clone().boolean_negate()?, + }) + } } impl Display for Interval { @@ -902,6 +977,15 @@ pub fn apply_operator(op: &Operator, lhs: &Interval, rhs: &Interval) -> Result lhs.sub(rhs), Operator::Multiply => lhs.mul(rhs), Operator::Divide => lhs.div(rhs), + Operator::IsDistinctFrom | Operator::IsNotDistinctFrom => { + NullableInterval::from(lhs) + .apply_operator(op, &rhs.into()) + .and_then(|x| { + x.values().cloned().ok_or(DataFusionError::Internal( + "Unexpected null value interval".to_string(), + )) + }) + } _ => internal_err!("Interval arithmetic does not support the operator {op}"), } } @@ -1058,14 +1142,14 @@ fn handle_overflow( } } -// This function should remain private since it may corrupt the an interval if +// This function should remain private since it may corrupt an interval if // used without caution. fn next_value(value: ScalarValue) -> ScalarValue { use ScalarValue::*; value_transition!(MAX, true, value) } -// This function should remain private since it may corrupt the an interval if +// This function should remain private since it may corrupt an interval if // used without caution. fn prev_value(value: ScalarValue) -> ScalarValue { use ScalarValue::*; @@ -1075,10 +1159,10 @@ fn prev_value(value: ScalarValue) -> ScalarValue { trait OneTrait: Sized + std::ops::Add + std::ops::Sub { fn one() -> Self; } -macro_rules! impl_OneTrait{ +macro_rules! impl_one_trait { ($($m:ty),*) => {$( impl OneTrait for $m { fn one() -> Self { 1 as $m } })*} } -impl_OneTrait! {u8, u16, u32, u64, i8, i16, i32, i64, i128} +impl_one_trait! {u8, u16, u32, u64, i8, i16, i32, i64, i128} impl OneTrait for IntervalDayTime { fn one() -> Self { @@ -1237,18 +1321,18 @@ pub fn satisfy_greater( } if !left.upper.is_null() && left.upper <= right.lower { - if !strict && left.upper == right.lower { + return if !strict && left.upper == right.lower { // Singleton intervals: - return Ok(Some(( + Ok(Some(( Interval::new(left.upper.clone(), left.upper.clone()), Interval::new(left.upper.clone(), left.upper.clone()), - ))); + ))) } else { // Left-hand side: <--======----0------------> // Right-hand side: <------------0--======----> // No intersection, infeasible to propagate: - return Ok(None); - } + Ok(None) + }; } // Only the lower bound of left-hand side and the upper bound of the right-hand @@ -1629,6 +1713,24 @@ impl Display for NullableInterval { } } +impl From<&Interval> for NullableInterval { + fn from(value: &Interval) -> Self { + if value.is_null() { + Self::Null { + datatype: value.data_type(), + } + } else if value.lower.is_null() || value.upper.is_null() { + Self::MaybeNull { + values: value.clone(), + } + } else { + Self::NotNull { + values: value.clone(), + } + } + } +} + impl From for NullableInterval { /// Create an interval that represents a single value. fn from(value: ScalarValue) -> Self { @@ -1868,7 +1970,13 @@ mod tests { }; use arrow::datatypes::DataType; + use arrow_buffer::IntervalDayTime as ArrowIntervalDayTime; + use datafusion_common::ScalarValue::{ + Date32, DurationMillisecond, DurationSecond, IntervalDayTime, IntervalYearMonth, + TimestampSecond, + }; use datafusion_common::{Result, ScalarValue}; + use ScalarValue::{Date64, Time32Millisecond}; #[test] fn test_next_prev_value() -> Result<()> { @@ -2085,6 +2193,33 @@ mod tests { prev_value(ScalarValue::Float32(Some(-1.0))), )?, ), + ( + Interval::new(Date64(Some(1)), Date64(Some(1))), + Interval::new( + Date64(Some(-1)), + Date64(Some(-1)), + ), + ), + ( + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + Interval::new( + TimestampSecond(Some(-10), None), + TimestampSecond(Some(-1), None), + ), + ), + ( + Interval::new( + DurationSecond(Some(1)), + DurationSecond(Some(10)), + ), + Interval::new( + DurationSecond(Some(-10)), + DurationSecond(Some(-1)), + ), + ), ]; for (first, second) in exactly_gt_cases { assert_eq!(first.gt(second.clone())?, Interval::CERTAINLY_TRUE); @@ -2122,6 +2257,33 @@ mod tests { ScalarValue::Float32(Some(-1.0_f32)), )?, ), + ( + Interval::new( + Date64(Some(1)), + Date64(Some(10)), + ), + Interval::new(Date64(Some(1)), Date64(Some(1))), + ), + ( + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(1), None), + ), + ), + ( + Interval::new( + DurationSecond(Some(1)), + DurationSecond(Some(10)), + ), + Interval::new( + DurationSecond(Some(1)), + DurationSecond(Some(1)), + ), + ), ]; for (first, second) in possibly_gt_cases { assert_eq!(first.gt(second.clone())?, Interval::UNCERTAIN); @@ -2159,6 +2321,36 @@ mod tests { next_value(ScalarValue::Float32(Some(-1.0_f32))), )?, ), + ( + Interval::new( + Date64(Some(1)), + Date64(Some(10)), + ), + Interval::new( + Date64(Some(10)), + Date64(Some(100)), + ), + ), + ( + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + Interval::new( + TimestampSecond(Some(10), None), + TimestampSecond(None, None), + ), + ), + ( + Interval::new( + DurationSecond(Some(-10)), + DurationSecond(Some(-1)), + ), + Interval::new( + DurationSecond(Some(1)), + DurationSecond(Some(1)), + ), + ), ]; for (first, second) in not_gt_cases { assert_eq!(first.gt(second.clone())?, Interval::CERTAINLY_FALSE); @@ -2205,6 +2397,36 @@ mod tests { ScalarValue::Float32(Some(-1.0)), )?, ), + ( + Interval::new( + ScalarValue::Time32Second(Some(0)), + ScalarValue::Time32Second(Some(10)), + ), + Interval::new( + ScalarValue::Time32Second(Some(-1)), + ScalarValue::Time32Second(Some(-1)), + ), + ), + ( + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(1), None), + ), + ), + ( + Interval::new( + DurationSecond(Some(-10)), + DurationSecond(Some(1)), + ), + Interval::new( + DurationSecond(Some(-10)), + DurationSecond(Some(-10)), + ), + ), ]; for (first, second) in exactly_gteq_cases { assert_eq!(first.gt_eq(second.clone())?, Interval::CERTAINLY_TRUE); @@ -2242,6 +2464,36 @@ mod tests { next_value(ScalarValue::Float32(Some(-1.0_f32))), )?, ), + ( + Interval::new( + ScalarValue::Time32Second(Some(0)), + ScalarValue::Time32Second(Some(10)), + ), + Interval::new( + ScalarValue::Time32Second(Some(0)), + ScalarValue::Time32Second(None), + ), + ), + ( + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + ), + ( + Interval::new( + DurationSecond(Some(-10)), + DurationSecond(Some(1)), + ), + Interval::new( + DurationSecond(None), + DurationSecond(Some(0)), + ), + ), ]; for (first, second) in possibly_gteq_cases { assert_eq!(first.gt_eq(second.clone())?, Interval::UNCERTAIN); @@ -2275,6 +2527,36 @@ mod tests { next_value(ScalarValue::Float32(Some(-1.0))), )?, ), + ( + Interval::new( + ScalarValue::Time32Second(Some(-10)), + ScalarValue::Time32Second(Some(0)), + ), + Interval::new( + ScalarValue::Time32Second(Some(1)), + ScalarValue::Time32Second(Some(10)), + ), + ), + ( + Interval::new( + TimestampSecond(Some(5), None), + TimestampSecond(Some(9), None), + ), + Interval::new( + TimestampSecond(Some(10), None), + TimestampSecond(Some(100), None), + ), + ), + ( + Interval::new( + DurationSecond(None), + DurationSecond(Some(-1)), + ), + Interval::new( + DurationSecond(Some(0)), + DurationSecond(Some(1)), + ), + ), ]; for (first, second) in not_gteq_cases { assert_eq!(first.gt_eq(second.clone())?, Interval::CERTAINLY_FALSE); @@ -2303,6 +2585,46 @@ mod tests { Interval::make(Some(f64::MIN), Some(f64::MIN))?, Interval::make(Some(f64::MIN), Some(f64::MIN))?, ), + ( + Interval::new( + Date64(Some(1000)), + Date64(Some(1000)), + ), + Interval::new( + Date64(Some(1000)), + Date64(Some(1000)), + ), + ), + ( + Interval::new( + Time32Millisecond(Some(1000)), + Time32Millisecond(Some(1000)), + ), + Interval::new( + Time32Millisecond(Some(1000)), + Time32Millisecond(Some(1000)), + ), + ), + ( + Interval::new( + IntervalYearMonth(Some(10)), + IntervalYearMonth(Some(10)), + ), + Interval::new( + IntervalYearMonth(Some(10)), + IntervalYearMonth(Some(10)), + ), + ), + ( + Interval::new( + DurationSecond(Some(10)), + DurationSecond(Some(10)), + ), + Interval::new( + DurationSecond(Some(10)), + DurationSecond(Some(10)), + ), + ), ]; for (first, second) in exactly_eq_cases { assert_eq!(first.equal(second.clone())?, Interval::CERTAINLY_TRUE); @@ -2488,6 +2810,17 @@ mod tests { Interval::make(Some(32.0_f64), Some(64.0_f64))?, Interval::make(Some(32.0_f64), Some(32.0_f64))?, ), + ( + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(10))), + Interval::new( + DurationMillisecond(Some(1001)), + DurationMillisecond(Some(1010)), + ), + Interval::new( + DurationMillisecond(Some(1001)), + DurationMillisecond(Some(1010)), + ), + ), ]; for (first, second, expected) in possible_cases { assert_eq!(first.intersect(second)?.unwrap(), expected) @@ -2532,6 +2865,107 @@ mod tests { Ok(()) } + #[test] + fn test_union() -> Result<()> { + let possible_cases: Vec<(Interval, Interval, Interval)> = vec![ + ( + Interval::make(Some(1000_i64), None)?, + Interval::make::(None, None)?, + Interval::make(Some(1000_i64), None)?, + ), + ( + Interval::make(Some(1000_i64), None)?, + Interval::make(Some(1000_i64), None)?, + Interval::make(Some(1000_i64), None)?, + ), + ( + Interval::make(Some(1000_i64), None)?, + Interval::make(None, Some(2000_i64))?, + Interval::make(Some(1000_i64), Some(2000_i64))?, + ), + ( + Interval::make::(None, None)?, + Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make(Some(1000_i64), Some(2000_i64))?, + ), + ( + Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make::(None, None)?, + Interval::make(Some(1000_i64), Some(2000_i64))?, + ), + ( + Interval::make::(None, None)?, + Interval::make::(None, None)?, + Interval::make::(None, None)?, + ), + ( + Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make(Some(1000_i64), None)?, + Interval::make(Some(1000_i64), Some(2000_i64))?, + ), + ( + Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make(Some(1000_i64), Some(1500_i64))?, + Interval::make(Some(1000_i64), Some(2000_i64))?, + ), + ( + Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make(Some(500_i64), Some(1500_i64))?, + Interval::make(Some(500_i64), Some(2000_i64))?, + ), + ( + Interval::make(Some(1000_i64), None)?, + Interval::make(None, Some(10_i64))?, + Interval::make(Some(10_i64), Some(1000_i64))?, + ), + ( + Interval::make(Some(1000_i64), None)?, + Interval::make(Some(1_i64), Some(10_i64))?, + Interval::make(Some(1_i64), Some(1000_i64))?, + ), + ( + Interval::make(None, Some(2000_u64))?, + Interval::make(Some(500_u64), None)?, + Interval::make(Some(0_u64), Some(2000_u64))?, + ), + ( + Interval::make(Some(0_u64), Some(0_u64))?, + Interval::make(Some(0_u64), None)?, + Interval::make(Some(0_u64), Some(0_u64))?, + ), + ( + Interval::make(Some(1000.0_f32), None)?, + Interval::make(None, Some(1000.0_f32))?, + Interval::make(Some(1000.0_f32), Some(1000.0_f32))?, + ), + ( + Interval::make(Some(1000.0_f32), Some(1500.0_f32))?, + Interval::make(Some(0.0_f32), Some(1500.0_f32))?, + Interval::make(Some(0.0_f32), Some(1500.0_f32))?, + ), + ( + Interval::make(Some(-1000.0_f64), Some(1500.0_f64))?, + Interval::make(Some(-1500.0_f64), Some(2000.0_f64))?, + Interval::make(Some(-1500.0_f64), Some(2000.0_f64))?, + ), + ( + Interval::make(Some(16.0_f64), Some(32.0_f64))?, + Interval::make(Some(32.0_f64), Some(64.0_f64))?, + Interval::make(Some(16.0_f64), Some(64.0_f64))?, + ), + ( + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(10))), + Interval::new(DurationSecond(Some(10)), DurationSecond(Some(100))), + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(100))), + ), + ]; + for (first, second, expected) in possible_cases { + assert_eq!(first.union(second.clone())?.unwrap(), expected) + } + + Ok(()) + } + #[test] fn test_contains() -> Result<()> { let possible_cases = vec![ @@ -2545,6 +2979,28 @@ mod tests { Interval::make(Some(1501_i64), Some(1999_i64))?, Interval::CERTAINLY_TRUE, ), + ( + Interval::new( + TimestampSecond(Some(1), None), + TimestampSecond(Some(10), None), + ), + Interval::new( + TimestampSecond(Some(2), None), + TimestampSecond(Some(5), None), + ), + Interval::CERTAINLY_TRUE, + ), + ( + Interval::new( + DurationSecond(Some(0)), + DurationSecond(Some(600)), + ), + Interval::new( + DurationSecond(Some(1)), + DurationSecond(Some(599)), + ), + Interval::CERTAINLY_TRUE, + ), ( Interval::make(Some(1000_i64), None)?, Interval::make::(None, None)?, @@ -2586,6 +3042,17 @@ mod tests { Interval::make(Some(1.0_f32), Some(1.0_f32))?, Interval::CERTAINLY_FALSE, ), + ( + Interval::new( + ScalarValue::Time32Second(Some(0)), + ScalarValue::Time32Second(Some(60)), + ), + Interval::new( + ScalarValue::Time32Second(Some(61)), + ScalarValue::Time32Second(Some(120)), + ), + Interval::CERTAINLY_FALSE, + ), ]; for (first, second, expected) in possible_cases { assert_eq!(first.contains(second)?, expected) @@ -2662,6 +3129,41 @@ mod tests { Interval::make(None, Some(200_f64))?, Interval::make(None, Some(300_f64))?, ), + ( + Interval::new( + TimestampSecond(Some(100), None), + TimestampSecond(Some(200), None), + ), + Interval::new(DurationSecond(Some(100)), DurationSecond(Some(200))), + Interval::new( + TimestampSecond(Some(200), None), + TimestampSecond(Some(400), None), + ), + ), + ( + Interval::new(Date32(Some(100)), Date32(Some(100))), + Interval::new( + IntervalDayTime(Some(ArrowIntervalDayTime { + days: 1, + milliseconds: 0, + })), + IntervalDayTime(Some(ArrowIntervalDayTime { + days: 10, + milliseconds: 0, + })), + ), + Interval::new(Date32(Some(101)), Date32(Some(110))), + ), + ( + Interval::new(DurationSecond(Some(100)), DurationSecond(Some(100))), + Interval::new(DurationSecond(Some(100)), DurationSecond(Some(100))), + Interval::new(DurationSecond(Some(200)), DurationSecond(Some(200))), + ), + ( + Interval::new(IntervalYearMonth(Some(100)), IntervalYearMonth(Some(100))), + Interval::new(IntervalYearMonth(Some(100)), IntervalYearMonth(Some(100))), + Interval::new(IntervalYearMonth(Some(200)), IntervalYearMonth(Some(200))), + ), ]; for case in cases { let result = case.0.add(case.1)?; diff --git a/datafusion/expr-common/src/operator.rs b/datafusion/expr-common/src/operator.rs index 6ca0f04897ac..2f471c898560 100644 --- a/datafusion/expr-common/src/operator.rs +++ b/datafusion/expr-common/src/operator.rs @@ -164,6 +164,26 @@ impl Operator { ) } + /// Indicates whether this operator supports interval arithmetic + pub fn supports_bounds_evaluation(&self) -> bool { + matches!( + &self, + &Operator::Plus + | &Operator::Minus + | &Operator::And + | &Operator::Gt + | &Operator::GtEq + | &Operator::Lt + | &Operator::LtEq + | &Operator::Eq + | &Operator::NotEq + | &Operator::Multiply + | &Operator::Divide + | &Operator::IsDistinctFrom + | &Operator::IsNotDistinctFrom + ) + } + /// Return true if the comparison operator can be used in interval arithmetic and constraint /// propagation /// diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 3be35490a4d0..5159333b9f56 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -1484,6 +1484,7 @@ fn null_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { #[cfg(test)] mod tests { use super::*; + use arrow::datatypes::IntervalUnit::{MonthDayNano, YearMonth}; use datafusion_common::assert_contains; @@ -1874,6 +1875,35 @@ mod tests { Ok(()) } + #[test] + fn test_type_coercion_temporal() -> Result<()> { + test_coercion_binary_rule!( + DataType::Duration(TimeUnit::Second), + DataType::Duration(TimeUnit::Second), + Operator::Plus, + DataType::Duration(TimeUnit::Second) + ); + test_coercion_binary_rule!( + DataType::Duration(TimeUnit::Second), + DataType::Duration(TimeUnit::Nanosecond), + Operator::Plus, + DataType::Duration(TimeUnit::Nanosecond) + ); + test_coercion_binary_rule!( + DataType::Interval(YearMonth), + DataType::Interval(YearMonth), + Operator::Plus, + DataType::Interval(YearMonth) + ); + test_coercion_binary_rule!( + DataType::Interval(YearMonth), + DataType::Interval(MonthDayNano), + Operator::Plus, + DataType::Interval(MonthDayNano) + ); + Ok(()) + } + #[test] fn test_type_coercion_arithmetic() -> Result<()> { // integer diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 7c91b6b3b4ab..6f1219c926ce 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,7 +23,7 @@ use crate::sort_properties::{ExprProperties, SortProperties}; use crate::{ ColumnarValue, Documentation, Expr, ScalarFunctionImplementation, Signature, }; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue}; use datafusion_expr_common::interval_arithmetic::Interval; use std::any::Any; @@ -281,6 +281,11 @@ impl ScalarUDF { self.inner.evaluate_bounds(inputs) } + /// Indicates whether this ['ScalarUDF'] supports interval arithmetic. + pub fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + self.inner.supports_bounds_evaluation(schema) + } + /// Updates bounds for child expressions, given a known interval for this /// function. This is used to propagate constraints down through an expression /// tree. @@ -733,9 +738,18 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`, /// then the output interval would be `[0, 3]`. - fn evaluate_bounds(&self, _input: &[&Interval]) -> Result { - // We cannot assume the input datatype is the same of output type. - Interval::make_unbounded(&DataType::Null) + fn evaluate_bounds(&self, input: &[&Interval]) -> Result { + let input_data_types = input + .iter() + .map(|i| i.data_type()) + .collect::>(); + let return_type = self.return_type(&input_data_types)?; + Interval::make_unbounded(&return_type) + } + + /// Indicates whether this ['ScalarUDFImpl'] supports interval arithmetic. + fn supports_bounds_evaluation(&self, _schema: &SchemaRef) -> bool { + false } /// Updates bounds for child expressions, given a known interval for this diff --git a/datafusion/physical-expr-common/src/expressions/column.rs b/datafusion/physical-expr-common/src/expressions/column.rs new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index b1b889136b35..f1e16c4b1c2a 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -24,7 +24,7 @@ use crate::utils::scatter; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr_common::columnar_value::ColumnarValue; @@ -111,6 +111,11 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { not_impl_err!("Not implemented for {self}") } + /// Indicates whether interval arithmetic is supported for this expression. + fn supports_bounds_evaluation(&self, _schema: &SchemaRef) -> bool { + false + } + /// Updates bounds for child expressions, given a known interval for this /// expression. /// diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 114007bfa6af..c55fccdedc73 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -26,6 +26,7 @@ use datafusion_expr_common::sort_properties::ExprProperties; use crate::physical_expr::PhysicalExpr; use crate::sort_expr::{LexOrdering, PhysicalSortExpr}; use crate::tree_node::ExprContext; +use arrow::datatypes::{DataType}; /// Represents a [`PhysicalExpr`] node with associated properties (order and /// range) in a context where properties are tracked. @@ -103,6 +104,31 @@ pub fn reverse_order_bys(order_bys: &LexOrdering) -> LexOrdering { .collect() } +/// Indicates whether interval arithmetic is supported for the given data type. +pub fn is_supported_datatype_for_bounds_eval(data_type: &DataType) -> bool { + matches!( + data_type, + &DataType::Int64 + | &DataType::Int32 + | &DataType::Int16 + | &DataType::Int8 + | &DataType::UInt64 + | &DataType::UInt32 + | &DataType::UInt16 + | &DataType::UInt8 + | &DataType::Float64 + | &DataType::Float32 + | &DataType::Float16 + | &DataType::Timestamp(_, _) + | &DataType::Date32 + | &DataType::Date64 + | &DataType::Time32(_) + | &DataType::Time64(_) + | &DataType::Interval(_) + | &DataType::Duration(_) + ) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 1713842f410e..e0a4717b93e9 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -376,6 +376,22 @@ impl PhysicalExpr for BinaryExpr { )) } + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + // Interval data types must be compatible for the given operation + if let (Ok(lhs), Ok(rhs)) = ( + self.left.data_type(schema.as_ref()), + self.right.data_type(schema.as_ref()), + ) { + if BinaryTypeCoercer::new(&lhs, &self.op, &rhs).get_result_type().is_err() { + return false; + } + } + + self.op().supports_bounds_evaluation() + && self.left.supports_bounds_evaluation(schema) + && self.right.supports_bounds_evaluation(schema) + } + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { // Get children intervals: let left_interval = children[0]; diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 8a093e0ae92e..88d465f4a6be 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::physical_expr::PhysicalExpr; use arrow::compute::{can_cast_types, CastOptions}; -use arrow::datatypes::{DataType, DataType::*, Schema}; +use arrow::datatypes::{DataType, DataType::*, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; use datafusion_common::{not_impl_err, Result}; @@ -164,6 +164,10 @@ impl PhysicalExpr for CastExpr { children[0].cast_to(&self.cast_type, &self.cast_options) } + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + self.expr().supports_bounds_evaluation(schema) + } + fn propagate_constraints( &self, interval: &Interval, diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 0649cbd65d34..52bd6e05a82d 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -30,6 +30,7 @@ use arrow_schema::SchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{internal_err, plan_err, Result}; use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::utils::is_supported_datatype_for_bounds_eval; /// Represents the column at a given index in a RecordBatch /// @@ -138,6 +139,14 @@ impl PhysicalExpr for Column { ) -> Result> { Ok(self) } + + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + if let Ok(field) = schema.field_with_name(self.name()) { + is_supported_datatype_for_bounds_eval(field.data_type()) + } else { + false + } + } } impl Column { diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index dfe9a905dfea..ea2c6a1b1a41 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -44,6 +44,7 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::datum::compare_with_eq; use ahash::RandomState; +use datafusion_expr::interval_arithmetic::Interval; use datafusion_common::HashMap; use hashbrown::hash_map::RawEntryMut; @@ -398,6 +399,56 @@ impl PhysicalExpr for InListExpr { self.static_filter.clone(), ))) } + + /// The output interval is computed by checking if the list item intervals are + /// a subset of, overlap, or are disjoint with the input expression's interval. + /// + /// If [InListExpr::negated] is true, the output interval gets negated. + /// + /// # Example: + /// If the input expression's interval is a superset of the + /// conjunction of the list items intervals, the output + /// interval is [`Interval::CERTAINLY_TRUE`]. + /// + /// ```text + /// interval of expr: ....---------------------.... + /// Some list items: ..........|..|.....|.|....... + /// + /// output interval: [`true`, `true`] + /// ``` + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { + let expr_bounds = children[0]; + + // conjunction of list item intervals + let mut list_bounds = children[1].clone(); + if children.len() > 2 { + list_bounds = children[2..] + .iter() + .try_fold(Some(list_bounds), |acc, item| { + if let Some(acc) = acc { + acc.union(*item) + } else { + Some(Interval::make_unbounded(&expr_bounds.data_type())) + .transpose() + } + })? + .unwrap_or(Interval::make_unbounded(&expr_bounds.data_type())?); + } + + if self.negated { + expr_bounds.contains(list_bounds)?.boolean_negate() + } else { + expr_bounds.contains(list_bounds) + } + } + + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + self.expr.supports_bounds_evaluation(schema) + && self + .list + .iter() + .all(|expr| expr.supports_bounds_evaluation(schema)) + } } impl PartialEq for InListExpr { diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 8e3544622b80..1235a89bcddd 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -27,6 +27,7 @@ use arrow::{ }; use datafusion_common::Result; use datafusion_common::ScalarValue; +use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::ColumnarValue; /// IS NOT NULL expression @@ -104,6 +105,17 @@ impl PhysicalExpr for IsNotNullExpr { ) -> Result> { Ok(Arc::new(IsNotNullExpr::new(Arc::clone(&children[0])))) } + + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { + let inner = children[0]; + Ok(if inner.is_null() { + Interval::CERTAINLY_FALSE + } else if inner.lower().is_null() || inner.upper().is_null() { + Interval::UNCERTAIN + } else { + Interval::CERTAINLY_TRUE + }) + } } /// Create an IS NOT NULL expression diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 232f9769b056..d3589f1defa0 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -27,11 +27,12 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; +use arrow_schema::SchemaRef; use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::Expr; -use datafusion_expr_common::columnar_value::ColumnarValue; -use datafusion_expr_common::interval_arithmetic::Interval; -use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; +use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; +use datafusion_expr::{ColumnarValue, Expr}; +use datafusion_physical_expr_common::utils::is_supported_datatype_for_bounds_eval; /// Represents a literal value #[derive(Debug, PartialEq, Eq, Hash)] @@ -93,6 +94,14 @@ impl PhysicalExpr for Literal { preserves_lex_ordering: true, }) } + + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + if let Ok(dt) = self.data_type(schema) { + is_supported_datatype_for_bounds_eval(&dt) + } else { + false + } + } } /// Create a literal expression diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 03f2111aca33..6bf865dfdfbf 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -28,6 +28,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; +use arrow_schema::SchemaRef; use datafusion_common::{plan_err, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; @@ -122,6 +123,10 @@ impl PhysicalExpr for NegativeExpr { ) } + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + self.arg().supports_bounds_evaluation(schema) + } + /// Returns a new [`Interval`] of a NegativeExpr that has the existing `interval` given that /// given the input interval is known to be `children`. fn propagate_constraints( diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 440c4e9557bd..ec8d2c64148b 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -112,6 +112,7 @@ impl PhysicalExpr for NotExpr { ) -> Result> { Ok(Arc::new(NotExpr::new(Arc::clone(&children[0])))) } + fn evaluate_bounds(&self, children: &[&Interval]) -> Result { children[0].not() } diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 166d2564fdf3..abafce750053 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -34,11 +34,11 @@ use datafusion_common::{internal_err, Result}; use datafusion_expr::interval_arithmetic::{apply_operator, satisfy_greater, Interval}; use datafusion_expr::Operator; +use datafusion_expr::type_coercion::{is_datetime, is_interval}; use petgraph::graph::NodeIndex; use petgraph::stable_graph::{DefaultIx, StableGraph}; use petgraph::visit::{Bfs, Dfs, DfsPostOrder, EdgeRef}; use petgraph::Outgoing; - // Interval arithmetic provides a way to perform mathematical operations on // intervals, which represent a range of possible values rather than a single // point value. This allows for the propagation of ranges through mathematical @@ -223,42 +223,27 @@ pub fn propagate_arithmetic( right_child: &Interval, ) -> Result> { let inverse_op = get_inverse_op(*op)?; - match (left_child.data_type(), right_child.data_type()) { - // If we have a child whose type is a time interval (i.e. DataType::Interval), - // we need special handling since timestamp differencing results in a - // Duration type. - (DataType::Timestamp(..), DataType::Interval(_)) => { - propagate_time_interval_at_right( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - (DataType::Interval(_), DataType::Timestamp(..)) => { - propagate_time_interval_at_left( - left_child, - right_child, - parent, - op, - &inverse_op, - ) - } - _ => { - // First, propagate to the left: - match apply_operator(&inverse_op, parent, right_child)? - .intersect(left_child)? - { - // Left is feasible: - Some(value) => Ok( - // Propagate to the right using the new left. - propagate_right(&value, parent, right_child, op, &inverse_op)? - .map(|right| (value, right)), - ), - // If the left child is infeasible, short-circuit. - None => Ok(None), - } + + // If we have a child whose data type is datetime (i.e. timestamp), + // we need special handling since timestamp differencing results in + // a Duration type. + if is_datetime(&left_child.data_type()) && is_interval(&right_child.data_type()) { + propagate_time_interval_at_right(left_child, right_child, parent, op, &inverse_op) + } else if is_interval(&left_child.data_type()) + && is_datetime(&right_child.data_type()) + { + propagate_time_interval_at_left(left_child, right_child, parent, op, &inverse_op) + } else { + // First, propagate to the left: + match apply_operator(&inverse_op, parent, right_child)?.intersect(left_child)? { + // Left is feasible: + Some(value) => Ok( + // Propagate to the right using the new left. + propagate_right(&value, parent, right_child, op, &inverse_op)? + .map(|right| (value, right)), + ), + // If the left child is infeasible, short-circuit. + None => Ok(None), } } } @@ -314,9 +299,14 @@ pub fn propagate_comparison( ) -> Result> { if parent == &Interval::CERTAINLY_TRUE { match op { - Operator::Eq => left_child.intersect(right_child).map(|result| { - result.map(|intersection| (intersection.clone(), intersection)) - }), + Operator::Eq | Operator::IsNotDistinctFrom => { + left_child.intersect(right_child).map(|result| { + result.map(|intersection| (intersection.clone(), intersection)) + }) + } + Operator::NotEq | Operator::IsDistinctFrom => left_child + .union(right_child) + .map(|result| result.map(|unin| (unin.clone(), unin))), Operator::Gt => satisfy_greater(left_child, right_child, true), Operator::GtEq => satisfy_greater(left_child, right_child, false), Operator::Lt => satisfy_greater(right_child, left_child, true) @@ -329,7 +319,10 @@ pub fn propagate_comparison( } } else if parent == &Interval::CERTAINLY_FALSE { match op { - Operator::Eq => { + Operator::Eq + | Operator::IsNotDistinctFrom + | Operator::NotEq + | Operator::IsDistinctFrom => { // TODO: Propagation is not possible until we support interval sets. Ok(None) } diff --git a/datafusion/physical-expr/src/intervals/utils.rs b/datafusion/physical-expr/src/intervals/utils.rs index 56af8238c04e..c6d1f2519e4c 100644 --- a/datafusion/physical-expr/src/intervals/utils.rs +++ b/datafusion/physical-expr/src/intervals/utils.rs @@ -17,51 +17,11 @@ //! Utility functions for the interval arithmetic library -use std::sync::Arc; - -use crate::{ - expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr}, - PhysicalExpr, -}; - use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano}; -use arrow_schema::{DataType, SchemaRef}; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::Operator; -/// Indicates whether interval arithmetic is supported for the given expression. -/// Currently, we do not support all [`PhysicalExpr`]s for interval calculations. -/// We do not support every type of [`Operator`]s either. Over time, this check -/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported. -/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported. -pub fn check_support(expr: &Arc, schema: &SchemaRef) -> bool { - let expr_any = expr.as_any(); - if let Some(binary_expr) = expr_any.downcast_ref::() { - is_operator_supported(binary_expr.op()) - && check_support(binary_expr.left(), schema) - && check_support(binary_expr.right(), schema) - } else if let Some(column) = expr_any.downcast_ref::() { - if let Ok(field) = schema.field_with_name(column.name()) { - is_datatype_supported(field.data_type()) - } else { - return false; - } - } else if let Some(literal) = expr_any.downcast_ref::() { - if let Ok(dt) = literal.data_type(schema) { - is_datatype_supported(&dt) - } else { - return false; - } - } else if let Some(cast) = expr_any.downcast_ref::() { - check_support(cast.expr(), schema) - } else if let Some(negative) = expr_any.downcast_ref::() { - check_support(negative.arg(), schema) - } else { - false - } -} - // This function returns the inverse operator of the given operator. pub fn get_inverse_op(op: Operator) -> Result { match op { @@ -73,40 +33,6 @@ pub fn get_inverse_op(op: Operator) -> Result { } } -/// Indicates whether interval arithmetic is supported for the given operator. -pub fn is_operator_supported(op: &Operator) -> bool { - matches!( - op, - &Operator::Plus - | &Operator::Minus - | &Operator::And - | &Operator::Gt - | &Operator::GtEq - | &Operator::Lt - | &Operator::LtEq - | &Operator::Eq - | &Operator::Multiply - | &Operator::Divide - ) -} - -/// Indicates whether interval arithmetic is supported for the given data type. -pub fn is_datatype_supported(data_type: &DataType) -> bool { - matches!( - data_type, - &DataType::Int64 - | &DataType::Int32 - | &DataType::Int16 - | &DataType::Int8 - | &DataType::UInt64 - | &DataType::UInt32 - | &DataType::UInt16 - | &DataType::UInt8 - | &DataType::Float64 - | &DataType::Float32 - ) -} - /// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`]. pub fn convert_interval_type_to_duration(interval: &Interval) -> Option { if let (Some(lower), Some(upper)) = ( diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 1cd4b673ce7f..a1ab1c72b143 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -40,6 +40,7 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use arrow_array::Array; +use arrow_schema::SchemaRef; use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; @@ -248,6 +249,10 @@ impl PhysicalExpr for ScalarFunctionExpr { self.fun.evaluate_bounds(children) } + fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { + self.fun.supports_bounds_evaluation(schema) + } + fn propagate_constraints( &self, interval: &Interval, diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 8edbb0f09114..476f23dfea68 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -29,7 +29,6 @@ use datafusion_physical_plan::ExecutionPlan; use datafusion_common::config::{ConfigOptions, OptimizerOptions}; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; @@ -37,6 +36,7 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use crate::PhysicalOptimizerRule; use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list; use itertools::izip; +use datafusion_physical_expr_common::utils::is_supported_datatype_for_bounds_eval; /// The SanityCheckPlan rule rejects the following query plans: /// 1. Invalid plans containing nodes whose order and/or distribution requirements @@ -113,12 +113,12 @@ pub fn check_finiteness_requirements( /// [`Operator`]: datafusion_expr::Operator fn is_prunable(join: &SymmetricHashJoinExec) -> bool { join.filter().is_some_and(|filter| { - check_support(filter.expression(), &join.schema()) + filter.expression().supports_bounds_evaluation(&join.schema()) && filter .schema() .fields() .iter() - .all(|f| is_datatype_supported(f.data_type())) + .all(|f| is_supported_datatype_for_bounds_eval(f.data_type())) }) } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 91c44a4139d2..3ab4c9934b53 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -47,7 +47,6 @@ use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::expressions::BinaryExpr; -use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{ analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, @@ -175,7 +174,7 @@ impl FilterExec { ) -> Result { let input_stats = input.statistics()?; let schema = input.schema(); - if !check_support(predicate, &schema) { + if !predicate.supports_bounds_evaluation(&schema) { let selectivity = default_selectivity as f64 / 100.0; let mut stats = input_stats.to_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index 4425eee33373..7d043bb6c23b 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -58,6 +58,7 @@ where d3_date > d2_date + INTERVAL '5 days'; g h + # date and other predicate query T rowsort select i_item_desc From 68784cc22d4fb8970a7922815727e2347fc1c308 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Thu, 6 Feb 2025 13:42:25 +0100 Subject: [PATCH 02/19] fix test code --- datafusion/core/src/physical_optimizer/join_selection.rs | 0 .../core/src/physical_optimizer/pipeline_checker.rs | 0 .../core/tests/physical_optimizer/join_selection.rs | 9 ++++----- 3 files changed, 4 insertions(+), 5 deletions(-) delete mode 100644 datafusion/core/src/physical_optimizer/join_selection.rs delete mode 100644 datafusion/core/src/physical_optimizer/pipeline_checker.rs diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index ae7adacadb19..996fdec117fd 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -33,7 +33,6 @@ use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskCon use datafusion_expr::Operator; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr}; -use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_optimizer::join_selection::{ @@ -1072,21 +1071,21 @@ fn check_expr_supported() { Operator::Plus, Arc::new(Column::new("a", 0)), )) as Arc; - assert!(check_support(&supported_expr, &schema)); + assert!(&supported_expr.supports_bounds_evaluation(&schema)); let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc; - assert!(check_support(&supported_expr_2, &schema)); + assert!(&supported_expr_2.supports_bounds_evaluation(&schema)); let unsupported_expr = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), Operator::Or, Arc::new(Column::new("a", 0)), )) as Arc; - assert!(!check_support(&unsupported_expr, &schema)); + assert!(!&unsupported_expr.supports_bounds_evaluation(&schema)); let unsupported_expr_2 = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), Operator::Or, Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))), )) as Arc; - assert!(!check_support(&unsupported_expr_2, &schema)); + assert!(!&unsupported_expr_2.supports_bounds_evaluation(&schema)); } struct TestCase { From bc9eb4dd355bba74b878218da86b6b5248aa433f Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Thu, 6 Feb 2025 13:53:30 +0100 Subject: [PATCH 03/19] cargo fmt --- .../expr-common/src/interval_arithmetic.rs | 120 ++++-------------- datafusion/physical-expr-common/src/utils.rs | 2 +- .../physical-expr/src/expressions/binary.rs | 5 +- .../physical-expr/src/expressions/in_list.rs | 8 +- .../physical-optimizer/src/sanity_checker.rs | 6 +- 5 files changed, 37 insertions(+), 104 deletions(-) diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index 4c1b74c1f71f..4dd67fc37e87 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -2195,10 +2195,7 @@ mod tests { ), ( Interval::new(Date64(Some(1)), Date64(Some(1))), - Interval::new( - Date64(Some(-1)), - Date64(Some(-1)), - ), + Interval::new(Date64(Some(-1)), Date64(Some(-1))), ), ( Interval::new( @@ -2211,14 +2208,8 @@ mod tests { ), ), ( - Interval::new( - DurationSecond(Some(1)), - DurationSecond(Some(10)), - ), - Interval::new( - DurationSecond(Some(-10)), - DurationSecond(Some(-1)), - ), + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(10))), + Interval::new(DurationSecond(Some(-10)), DurationSecond(Some(-1))), ), ]; for (first, second) in exactly_gt_cases { @@ -2258,10 +2249,7 @@ mod tests { )?, ), ( - Interval::new( - Date64(Some(1)), - Date64(Some(10)), - ), + Interval::new(Date64(Some(1)), Date64(Some(10))), Interval::new(Date64(Some(1)), Date64(Some(1))), ), ( @@ -2275,14 +2263,8 @@ mod tests { ), ), ( - Interval::new( - DurationSecond(Some(1)), - DurationSecond(Some(10)), - ), - Interval::new( - DurationSecond(Some(1)), - DurationSecond(Some(1)), - ), + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(10))), + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(1))), ), ]; for (first, second) in possibly_gt_cases { @@ -2322,14 +2304,8 @@ mod tests { )?, ), ( - Interval::new( - Date64(Some(1)), - Date64(Some(10)), - ), - Interval::new( - Date64(Some(10)), - Date64(Some(100)), - ), + Interval::new(Date64(Some(1)), Date64(Some(10))), + Interval::new(Date64(Some(10)), Date64(Some(100))), ), ( Interval::new( @@ -2342,14 +2318,8 @@ mod tests { ), ), ( - Interval::new( - DurationSecond(Some(-10)), - DurationSecond(Some(-1)), - ), - Interval::new( - DurationSecond(Some(1)), - DurationSecond(Some(1)), - ), + Interval::new(DurationSecond(Some(-10)), DurationSecond(Some(-1))), + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(1))), ), ]; for (first, second) in not_gt_cases { @@ -2418,14 +2388,8 @@ mod tests { ), ), ( - Interval::new( - DurationSecond(Some(-10)), - DurationSecond(Some(1)), - ), - Interval::new( - DurationSecond(Some(-10)), - DurationSecond(Some(-10)), - ), + Interval::new(DurationSecond(Some(-10)), DurationSecond(Some(1))), + Interval::new(DurationSecond(Some(-10)), DurationSecond(Some(-10))), ), ]; for (first, second) in exactly_gteq_cases { @@ -2485,14 +2449,8 @@ mod tests { ), ), ( - Interval::new( - DurationSecond(Some(-10)), - DurationSecond(Some(1)), - ), - Interval::new( - DurationSecond(None), - DurationSecond(Some(0)), - ), + Interval::new(DurationSecond(Some(-10)), DurationSecond(Some(1))), + Interval::new(DurationSecond(None), DurationSecond(Some(0))), ), ]; for (first, second) in possibly_gteq_cases { @@ -2548,14 +2506,8 @@ mod tests { ), ), ( - Interval::new( - DurationSecond(None), - DurationSecond(Some(-1)), - ), - Interval::new( - DurationSecond(Some(0)), - DurationSecond(Some(1)), - ), + Interval::new(DurationSecond(None), DurationSecond(Some(-1))), + Interval::new(DurationSecond(Some(0)), DurationSecond(Some(1))), ), ]; for (first, second) in not_gteq_cases { @@ -2586,14 +2538,8 @@ mod tests { Interval::make(Some(f64::MIN), Some(f64::MIN))?, ), ( - Interval::new( - Date64(Some(1000)), - Date64(Some(1000)), - ), - Interval::new( - Date64(Some(1000)), - Date64(Some(1000)), - ), + Interval::new(Date64(Some(1000)), Date64(Some(1000))), + Interval::new(Date64(Some(1000)), Date64(Some(1000))), ), ( Interval::new( @@ -2606,24 +2552,12 @@ mod tests { ), ), ( - Interval::new( - IntervalYearMonth(Some(10)), - IntervalYearMonth(Some(10)), - ), - Interval::new( - IntervalYearMonth(Some(10)), - IntervalYearMonth(Some(10)), - ), + Interval::new(IntervalYearMonth(Some(10)), IntervalYearMonth(Some(10))), + Interval::new(IntervalYearMonth(Some(10)), IntervalYearMonth(Some(10))), ), ( - Interval::new( - DurationSecond(Some(10)), - DurationSecond(Some(10)), - ), - Interval::new( - DurationSecond(Some(10)), - DurationSecond(Some(10)), - ), + Interval::new(DurationSecond(Some(10)), DurationSecond(Some(10))), + Interval::new(DurationSecond(Some(10)), DurationSecond(Some(10))), ), ]; for (first, second) in exactly_eq_cases { @@ -2991,14 +2925,8 @@ mod tests { Interval::CERTAINLY_TRUE, ), ( - Interval::new( - DurationSecond(Some(0)), - DurationSecond(Some(600)), - ), - Interval::new( - DurationSecond(Some(1)), - DurationSecond(Some(599)), - ), + Interval::new(DurationSecond(Some(0)), DurationSecond(Some(600))), + Interval::new(DurationSecond(Some(1)), DurationSecond(Some(599))), Interval::CERTAINLY_TRUE, ), ( diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index c55fccdedc73..abf488ca8240 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -26,7 +26,7 @@ use datafusion_expr_common::sort_properties::ExprProperties; use crate::physical_expr::PhysicalExpr; use crate::sort_expr::{LexOrdering, PhysicalSortExpr}; use crate::tree_node::ExprContext; -use arrow::datatypes::{DataType}; +use arrow::datatypes::DataType; /// Represents a [`PhysicalExpr`] node with associated properties (order and /// range) in a context where properties are tracked. diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index e0a4717b93e9..cc4f27f33dd4 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -382,7 +382,10 @@ impl PhysicalExpr for BinaryExpr { self.left.data_type(schema.as_ref()), self.right.data_type(schema.as_ref()), ) { - if BinaryTypeCoercer::new(&lhs, &self.op, &rhs).get_result_type().is_err() { + if BinaryTypeCoercer::new(&lhs, &self.op, &rhs) + .get_result_type() + .is_err() + { return false; } } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index ea2c6a1b1a41..0f89961c510f 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -44,8 +44,8 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::datum::compare_with_eq; use ahash::RandomState; -use datafusion_expr::interval_arithmetic::Interval; use datafusion_common::HashMap; +use datafusion_expr::interval_arithmetic::Interval; use hashbrown::hash_map::RawEntryMut; /// InList @@ -445,9 +445,9 @@ impl PhysicalExpr for InListExpr { fn supports_bounds_evaluation(&self, schema: &SchemaRef) -> bool { self.expr.supports_bounds_evaluation(schema) && self - .list - .iter() - .all(|expr| expr.supports_bounds_evaluation(schema)) + .list + .iter() + .all(|expr| expr.supports_bounds_evaluation(schema)) } } diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 476f23dfea68..cbdf1773d8f3 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -35,8 +35,8 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use crate::PhysicalOptimizerRule; use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list; -use itertools::izip; use datafusion_physical_expr_common::utils::is_supported_datatype_for_bounds_eval; +use itertools::izip; /// The SanityCheckPlan rule rejects the following query plans: /// 1. Invalid plans containing nodes whose order and/or distribution requirements @@ -113,7 +113,9 @@ pub fn check_finiteness_requirements( /// [`Operator`]: datafusion_expr::Operator fn is_prunable(join: &SymmetricHashJoinExec) -> bool { join.filter().is_some_and(|filter| { - filter.expression().supports_bounds_evaluation(&join.schema()) + filter + .expression() + .supports_bounds_evaluation(&join.schema()) && filter .schema() .fields() From 5960170a276e4ed7b98563dc42870a6b7ffe95d4 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Thu, 6 Feb 2025 13:58:38 +0100 Subject: [PATCH 04/19] clean up --- datafusion/physical-expr-common/src/expressions/column.rs | 0 datafusion/sqllogictest/test_files/dates.slt | 1 - 2 files changed, 1 deletion(-) delete mode 100644 datafusion/physical-expr-common/src/expressions/column.rs diff --git a/datafusion/physical-expr-common/src/expressions/column.rs b/datafusion/physical-expr-common/src/expressions/column.rs deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index 7d043bb6c23b..4425eee33373 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -58,7 +58,6 @@ where d3_date > d2_date + INTERVAL '5 days'; g h - # date and other predicate query T rowsort select i_item_desc From fb1d681062ac6349f9d6d8b4dfc9454f571317c5 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Thu, 6 Feb 2025 17:40:31 +0100 Subject: [PATCH 05/19] fix interval type coercion --- datafusion/expr-common/Cargo.toml | 2 +- .../expr-common/src/type_coercion/binary.rs | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml index 4cc758861565..1dfcf1bbf9dc 100644 --- a/datafusion/expr-common/Cargo.toml +++ b/datafusion/expr-common/Cargo.toml @@ -43,4 +43,4 @@ itertools = { workspace = true } paste = "^1.0" [dev-dependencies] -arrow-buffer = { workspace = true } \ No newline at end of file +arrow-buffer = { workspace = true } diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 5159333b9f56..1b612bfabce1 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -1485,7 +1485,7 @@ fn null_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { mod tests { use super::*; use arrow::datatypes::IntervalUnit::{MonthDayNano, YearMonth}; - + use arrow::datatypes::TimeUnit::Nanosecond; use datafusion_common::assert_contains; #[test] @@ -1665,7 +1665,7 @@ mod tests { #[test] fn test_date_timestamp_arithmetic_error() -> Result<()> { let (lhs, rhs) = BinaryTypeCoercer::new( - &DataType::Timestamp(TimeUnit::Nanosecond, None), + &DataType::Timestamp(Nanosecond, None), &Operator::Minus, &DataType::Timestamp(TimeUnit::Millisecond, None), ) @@ -1760,33 +1760,33 @@ mod tests { ); test_coercion_binary_rule!( DataType::Utf8, - DataType::Time64(TimeUnit::Nanosecond), + DataType::Time64(Nanosecond), Operator::Eq, - DataType::Time64(TimeUnit::Nanosecond) + DataType::Time64(Nanosecond) ); test_coercion_binary_rule!( DataType::Utf8, DataType::Timestamp(TimeUnit::Second, None), Operator::Lt, - DataType::Timestamp(TimeUnit::Nanosecond, None) + DataType::Timestamp(Nanosecond, None) ); test_coercion_binary_rule!( DataType::Utf8, DataType::Timestamp(TimeUnit::Millisecond, None), Operator::Lt, - DataType::Timestamp(TimeUnit::Nanosecond, None) + DataType::Timestamp(Nanosecond, None) ); test_coercion_binary_rule!( DataType::Utf8, DataType::Timestamp(TimeUnit::Microsecond, None), Operator::Lt, - DataType::Timestamp(TimeUnit::Nanosecond, None) + DataType::Timestamp(Nanosecond, None) ); test_coercion_binary_rule!( DataType::Utf8, - DataType::Timestamp(TimeUnit::Nanosecond, None), + DataType::Timestamp(Nanosecond, None), Operator::Lt, - DataType::Timestamp(TimeUnit::Nanosecond, None) + DataType::Timestamp(Nanosecond, None) ); test_coercion_binary_rule!( DataType::Utf8, @@ -1885,9 +1885,9 @@ mod tests { ); test_coercion_binary_rule!( DataType::Duration(TimeUnit::Second), - DataType::Duration(TimeUnit::Nanosecond), + DataType::Duration(Nanosecond), Operator::Plus, - DataType::Duration(TimeUnit::Nanosecond) + DataType::Interval(MonthDayNano) ); test_coercion_binary_rule!( DataType::Interval(YearMonth), From 4ea39eb04360318ed2146bc83c1565823e6cae71 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Tue, 11 Feb 2025 18:17:06 +0100 Subject: [PATCH 06/19] remove NotEq from supported operators --- datafusion/expr-common/src/operator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/expr-common/src/operator.rs b/datafusion/expr-common/src/operator.rs index 2f471c898560..808e009192e5 100644 --- a/datafusion/expr-common/src/operator.rs +++ b/datafusion/expr-common/src/operator.rs @@ -176,7 +176,6 @@ impl Operator { | &Operator::Lt | &Operator::LtEq | &Operator::Eq - | &Operator::NotEq | &Operator::Multiply | &Operator::Divide | &Operator::IsDistinctFrom From b1b8aec19404b9f32943d294233d56a2b7619e62 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 12 Feb 2025 14:19:51 +0100 Subject: [PATCH 07/19] fix imports --- datafusion/physical-expr/src/expressions/literal.rs | 2 +- datafusion/physical-expr/src/expressions/negative.rs | 2 +- datafusion/physical-expr/src/scalar_function.rs | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index d3589f1defa0..3162739ba283 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -27,7 +27,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow_schema::SchemaRef; +use arrow::datatypes::SchemaRef; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index b5d82db5117c..0ddb0965eb43 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -28,7 +28,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow_schema::SchemaRef; +use arrow::datatypes::SchemaRef; use datafusion_common::{plan_err, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index a93e51d30ffd..790bc7224101 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -38,8 +38,7 @@ use crate::expressions::Literal; use crate::PhysicalExpr; use arrow::array::{Array, RecordBatch}; -use arrow::datatypes::{DataType, Schema}; -use arrow_schema::SchemaRef; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; From cb11459387dac20c35963b7a85aca6a2b635df0d Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 12 Feb 2025 14:23:14 +0100 Subject: [PATCH 08/19] cargo fmt --- datafusion/physical-expr/src/expressions/literal.rs | 2 +- datafusion/physical-expr/src/expressions/negative.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 3162739ba283..5a3da5f58d9b 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -23,11 +23,11 @@ use std::sync::Arc; use crate::physical_expr::PhysicalExpr; +use arrow::datatypes::SchemaRef; use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow::datatypes::SchemaRef; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 0ddb0965eb43..5984d6b6764b 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -23,12 +23,12 @@ use std::sync::Arc; use crate::PhysicalExpr; +use arrow::datatypes::SchemaRef; use arrow::{ compute::kernels::numeric::neg_wrapping, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow::datatypes::SchemaRef; use datafusion_common::{plan_err, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; From b4bd85103961de6ed13a3a1864061f82fff37e3e Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Thu, 13 Feb 2025 17:36:51 +0100 Subject: [PATCH 09/19] revisit union interval logic --- .../expr-common/src/interval_arithmetic.rs | 103 ++++++++++-------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index 4dd67fc37e87..6157e16b1bc1 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -420,6 +420,10 @@ impl Interval { self.lower.is_null() && self.upper.is_null() } + pub fn is_not_null(&self) -> bool { + !(self.lower.is_null() || self.upper.is_null()) + } + pub const CERTAINLY_FALSE: Self = Self { lower: ScalarValue::Boolean(Some(false)), upper: ScalarValue::Boolean(Some(false)), @@ -620,57 +624,62 @@ impl Interval { /// to an error. pub fn union>(&self, other: T) -> Result> { let rhs = other.borrow(); - if self.data_type().ne(&rhs.data_type()) { - BinaryTypeCoercer::new(&self.data_type(), &Operator::Plus, &rhs.data_type()).get_result_type() - .map_err(|e| - DataFusionError::Internal( - format!( - "Cannot coerce data types for interval union, lhs:{}, rhs:{}. internal error: {}", - self.data_type(), - rhs.data_type(), - e - )) - )?; + return internal_err!( + "Cannot calculate the union of intervals with different data types, lhs:{}, rhs:{}", + self.data_type(), + rhs.data_type() + ); }; - // If the upper bound of one side is less than the lower bound of the - // other side or vice versa, then the resulting interval is expanded - // accordingly. Note that, this can only happen if one side has a null - // value. - // - // Examples: - // [1, 2] ∪ [3, NULL] = [1, 3] - // [3, NULL] ∪ [1, NULL] = [1, 3] - // [3, NULL] ∪ [NULL, 1] = [1, 3] - let (lower, upper) = if (!(self.lower.is_null() || rhs.upper.is_null()) - && self.lower > rhs.upper) - || (!(self.upper.is_null() || rhs.lower.is_null()) && self.upper < rhs.lower) - { - ( - min_of_bounds( - &min_of_bounds(&min_of_bounds(&self.lower, &rhs.lower), &self.upper), - &rhs.upper, - ), - max_of_bounds( - &max_of_bounds(&max_of_bounds(&self.lower, &rhs.lower), &self.upper), - &rhs.upper, - ), - ) - } else { - ( - min_of_bounds(&self.lower, &rhs.lower), - max_of_bounds(&self.upper, &rhs.upper), - ) + let lower_bound = match (&self.lower.is_null(), &rhs.lower.is_null()) { + (false, false) => Some(min_of_bounds(&self.lower, &rhs.lower)), + (false, true) => Some(self.lower.clone()), + (true, false) => Some(rhs.lower.clone()), + (true, true) => None, + }; + let upper_bound = match (&self.upper.is_null(), &rhs.upper.is_null()) { + (false, false) => Some(max_of_bounds(&self.upper, &rhs.upper)), + (false, true) => Some(self.upper.clone()), + (true, false) => Some(rhs.upper.clone()), + (true, true) => None, }; - // New lower and upper bounds must always construct a valid interval. - assert!( - lower.is_null() || upper.is_null() || (lower <= upper), - "The union of two intervals can not be an invalid interval" - ); + // If the intervals overlap or touch, return a single merged interval + if (self.is_not_null() && rhs.is_not_null()) && self.upper >= rhs.lower + || rhs.upper >= self.lower + { + return Ok(Some(Self::new(lower_bound.unwrap(), upper_bound.unwrap()))); + } - Ok(Some(Self { lower, upper })) + // Handle non-overlapping intervals since interval sets are not supported + // TODO: with interval sets, we should return a set of disjoint intervals + let mut lower_value = + lower_bound.unwrap_or(ScalarValue::try_from(self.lower.data_type())?); + let mut upper_value = + upper_bound.unwrap_or(ScalarValue::try_from(self.lower.data_type())?); + + // If both directions are unbounded, return unbounded set + // e.g. {10,None} ∪ {None,2} = {None, None} + if !(lower_value.is_null() || upper_value.is_null()) && lower_value > upper_value + { + return Ok(Some(Self::make_unbounded(&lower_value.data_type())?)); + } + + // If only one direction has a bound, the other direction is unbounded + // e.g. {5,NULL} ∪ {1,2} = {1,NULL} + if (self.upper.is_null() && !self.lower.is_null() && self.lower > rhs.upper) + || (rhs.upper.is_null() && !rhs.lower.is_null() && rhs.lower > self.upper) + { + upper_value = ScalarValue::try_new_null(&self.upper.data_type())?; + } + if (self.lower.is_null() && !self.upper.is_null() && self.upper < rhs.lower) + || (rhs.lower.is_null() && !rhs.upper.is_null() && rhs.upper < self.lower) + { + lower_value = ScalarValue::try_new_null(&self.lower.data_type())?; + } + + Ok(Some(Interval::new(lower_value, upper_value))) } /// Compute the intersection of this interval with the given interval. @@ -2850,12 +2859,12 @@ mod tests { ( Interval::make(Some(1000_i64), None)?, Interval::make(None, Some(10_i64))?, - Interval::make(Some(10_i64), Some(1000_i64))?, + Interval::make::(None, None)?, ), ( Interval::make(Some(1000_i64), None)?, Interval::make(Some(1_i64), Some(10_i64))?, - Interval::make(Some(1_i64), Some(1000_i64))?, + Interval::make(Some(1_i64), None)?, ), ( Interval::make(None, Some(2000_u64))?, From aaf116b7a96fac7a1a4ffe902cfbb85d6be67ac2 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Fri, 14 Feb 2025 21:07:27 +0100 Subject: [PATCH 10/19] revisit union interval logic --- datafusion/expr-common/src/interval_arithmetic.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index 6157e16b1bc1..ddba0cd391fc 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -646,8 +646,9 @@ impl Interval { }; // If the intervals overlap or touch, return a single merged interval - if (self.is_not_null() && rhs.is_not_null()) && self.upper >= rhs.lower - || rhs.upper >= self.lower + if self.is_not_null() + && rhs.is_not_null() + && (self.upper >= rhs.lower || rhs.upper >= self.lower) { return Ok(Some(Self::new(lower_bound.unwrap(), upper_bound.unwrap()))); } From 5d65ba168a11d7f8c2628de71e7b3043392a6f69 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Tue, 18 Feb 2025 13:38:34 +0100 Subject: [PATCH 11/19] treat null as unbounded --- .../expr-common/src/interval_arithmetic.rs | 91 +++++++------------ 1 file changed, 32 insertions(+), 59 deletions(-) diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index ddba0cd391fc..5f492f96e413 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -622,65 +622,38 @@ impl Interval { /// NOTE: This function only works with intervals of the same data type. /// Attempting to compare intervals of different data types will lead /// to an error. - pub fn union>(&self, other: T) -> Result> { + pub fn union>(&self, other: T) -> Result { let rhs = other.borrow(); if self.data_type().ne(&rhs.data_type()) { return internal_err!( - "Cannot calculate the union of intervals with different data types, lhs:{}, rhs:{}", - self.data_type(), - rhs.data_type() - ); - }; - - let lower_bound = match (&self.lower.is_null(), &rhs.lower.is_null()) { - (false, false) => Some(min_of_bounds(&self.lower, &rhs.lower)), - (false, true) => Some(self.lower.clone()), - (true, false) => Some(rhs.lower.clone()), - (true, true) => None, - }; - let upper_bound = match (&self.upper.is_null(), &rhs.upper.is_null()) { - (false, false) => Some(max_of_bounds(&self.upper, &rhs.upper)), - (false, true) => Some(self.upper.clone()), - (true, false) => Some(rhs.upper.clone()), - (true, true) => None, + "Cannot calculate the union of intervals with different data types, lhs:{}, rhs:{}", + self.data_type(), + rhs.data_type() + ); }; - // If the intervals overlap or touch, return a single merged interval - if self.is_not_null() - && rhs.is_not_null() - && (self.upper >= rhs.lower || rhs.upper >= self.lower) + let lower = if self.lower.is_null() + || (!rhs.lower.is_null() && self.lower <= rhs.lower) { - return Ok(Some(Self::new(lower_bound.unwrap(), upper_bound.unwrap()))); - } - - // Handle non-overlapping intervals since interval sets are not supported - // TODO: with interval sets, we should return a set of disjoint intervals - let mut lower_value = - lower_bound.unwrap_or(ScalarValue::try_from(self.lower.data_type())?); - let mut upper_value = - upper_bound.unwrap_or(ScalarValue::try_from(self.lower.data_type())?); - - // If both directions are unbounded, return unbounded set - // e.g. {10,None} ∪ {None,2} = {None, None} - if !(lower_value.is_null() || upper_value.is_null()) && lower_value > upper_value + self.lower.clone() + } else { + rhs.lower.clone() + }; + let upper = if self.upper.is_null() + || (!rhs.upper.is_null() && self.upper >= rhs.upper) { - return Ok(Some(Self::make_unbounded(&lower_value.data_type())?)); - } + self.upper.clone() + } else { + rhs.upper.clone() + }; - // If only one direction has a bound, the other direction is unbounded - // e.g. {5,NULL} ∪ {1,2} = {1,NULL} - if (self.upper.is_null() && !self.lower.is_null() && self.lower > rhs.upper) - || (rhs.upper.is_null() && !rhs.lower.is_null() && rhs.lower > self.upper) - { - upper_value = ScalarValue::try_new_null(&self.upper.data_type())?; - } - if (self.lower.is_null() && !self.upper.is_null() && self.upper < rhs.lower) - || (rhs.lower.is_null() && !rhs.upper.is_null() && rhs.upper < self.lower) - { - lower_value = ScalarValue::try_new_null(&self.lower.data_type())?; - } + // New lower and upper bounds must always construct a valid interval. + debug_assert!( + (lower.is_null() || upper.is_null() || (lower <= upper)), + "The union of two intervals can not be an invalid interval" + ); - Ok(Some(Interval::new(lower_value, upper_value))) + Ok(Self { lower, upper }) } /// Compute the intersection of this interval with the given interval. @@ -2815,7 +2788,7 @@ mod tests { ( Interval::make(Some(1000_i64), None)?, Interval::make::(None, None)?, - Interval::make(Some(1000_i64), None)?, + Interval::make_unbounded(&DataType::Int64)?, ), ( Interval::make(Some(1000_i64), None)?, @@ -2825,17 +2798,17 @@ mod tests { ( Interval::make(Some(1000_i64), None)?, Interval::make(None, Some(2000_i64))?, - Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make_unbounded(&DataType::Int64)?, ), ( Interval::make::(None, None)?, Interval::make(Some(1000_i64), Some(2000_i64))?, - Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make_unbounded(&DataType::Int64)?, ), ( Interval::make(Some(1000_i64), Some(2000_i64))?, Interval::make::(None, None)?, - Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make_unbounded(&DataType::Int64)?, ), ( Interval::make::(None, None)?, @@ -2845,7 +2818,7 @@ mod tests { ( Interval::make(Some(1000_i64), Some(2000_i64))?, Interval::make(Some(1000_i64), None)?, - Interval::make(Some(1000_i64), Some(2000_i64))?, + Interval::make(Some(1000_i64), None)?, ), ( Interval::make(Some(1000_i64), Some(2000_i64))?, @@ -2870,17 +2843,17 @@ mod tests { ( Interval::make(None, Some(2000_u64))?, Interval::make(Some(500_u64), None)?, - Interval::make(Some(0_u64), Some(2000_u64))?, + Interval::make_unbounded(&DataType::UInt64)?, ), ( Interval::make(Some(0_u64), Some(0_u64))?, Interval::make(Some(0_u64), None)?, - Interval::make(Some(0_u64), Some(0_u64))?, + Interval::make(Some(0_u64), None)?, ), ( Interval::make(Some(1000.0_f32), None)?, Interval::make(None, Some(1000.0_f32))?, - Interval::make(Some(1000.0_f32), Some(1000.0_f32))?, + Interval::make_unbounded(&DataType::Float32)?, ), ( Interval::make(Some(1000.0_f32), Some(1500.0_f32))?, @@ -2904,7 +2877,7 @@ mod tests { ), ]; for (first, second, expected) in possible_cases { - assert_eq!(first.union(second.clone())?.unwrap(), expected) + assert_eq!(first.union(second.clone())?, expected) } Ok(()) From 4642bfaa919d9fb88f5917a3f332a9a29aacf711 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Tue, 18 Feb 2025 14:42:19 +0100 Subject: [PATCH 12/19] clean up --- .../physical-expr/src/expressions/in_list.rs | 23 ++++++++----------- .../physical-expr/src/intervals/cp_solver.rs | 2 +- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 0f89961c510f..cc7a86581e49 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -419,21 +419,16 @@ impl PhysicalExpr for InListExpr { fn evaluate_bounds(&self, children: &[&Interval]) -> Result { let expr_bounds = children[0]; + debug_assert!( + children.len() >= 2, + "InListExpr requires at least one list item" + ); + // conjunction of list item intervals - let mut list_bounds = children[1].clone(); - if children.len() > 2 { - list_bounds = children[2..] - .iter() - .try_fold(Some(list_bounds), |acc, item| { - if let Some(acc) = acc { - acc.union(*item) - } else { - Some(Interval::make_unbounded(&expr_bounds.data_type())) - .transpose() - } - })? - .unwrap_or(Interval::make_unbounded(&expr_bounds.data_type())?); - } + let list_bounds = children + .iter() + .skip(2) + .try_fold(children[1].clone(), |acc, item| acc.union(*item))?; if self.negated { expr_bounds.contains(list_bounds)?.boolean_negate() diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index c17cdce5f458..1dc5da2005f2 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -306,7 +306,7 @@ pub fn propagate_comparison( } Operator::NotEq | Operator::IsDistinctFrom => left_child .union(right_child) - .map(|result| result.map(|unin| (unin.clone(), unin))), + .map(|union| Some((union.clone(), union.clone()))), Operator::Gt => satisfy_greater(left_child, right_child, true), Operator::GtEq => satisfy_greater(left_child, right_child, false), Operator::Lt => satisfy_greater(right_child, left_child, true) From 12f7b3c170d08b3b06b6b0431a0cd1cf94830c7f Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 19 Feb 2025 14:58:52 +0100 Subject: [PATCH 13/19] csv source yields too many column stats --- datafusion/physical-expr/src/analysis.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 5abd50f6d1b4..d2e2fe0baaaa 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -63,10 +63,13 @@ impl AnalysisContext { input_schema: &Schema, statistics: &[ColumnStatistics], ) -> Result { - statistics + input_schema + .fields .iter() .enumerate() - .map(|(idx, stats)| ExprBoundaries::try_from_column(input_schema, stats, idx)) + .map(|(idx, stats)| { + ExprBoundaries::try_from_column(input_schema, &statistics[idx], idx) + }) .collect::>>() .map(Self::new) } From cbeb26c9e4edf18199145e85b2ef2198a8123aaf Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 19 Feb 2025 15:20:44 +0100 Subject: [PATCH 14/19] clippy --- datafusion/physical-expr/src/analysis.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index d2e2fe0baaaa..b50f814c712b 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -63,11 +63,8 @@ impl AnalysisContext { input_schema: &Schema, statistics: &[ColumnStatistics], ) -> Result { - input_schema - .fields - .iter() - .enumerate() - .map(|(idx, stats)| { + (0..statistics.len()) + .map(|idx| { ExprBoundaries::try_from_column(input_schema, &statistics[idx], idx) }) .collect::>>() From ec9f8beb9dfb8d3d0b927a761146107d5a50fbd1 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Wed, 19 Feb 2025 15:24:32 +0100 Subject: [PATCH 15/19] csv source yields too many column stats --- datafusion/physical-expr/src/analysis.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index b50f814c712b..51b80b1d971b 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -63,7 +63,7 @@ impl AnalysisContext { input_schema: &Schema, statistics: &[ColumnStatistics], ) -> Result { - (0..statistics.len()) + (0..input_schema.fields.len()) .map(|idx| { ExprBoundaries::try_from_column(input_schema, &statistics[idx], idx) }) From d14747d71f6bae6b6f8ea4fc28c86c7a1241cab0 Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Mon, 24 Feb 2025 18:11:46 +0100 Subject: [PATCH 16/19] addressing comments --- datafusion-examples/examples/expr_api.rs | 13 ++++++-- datafusion/common/src/stats.rs | 2 +- .../expr-common/src/interval_arithmetic.rs | 14 ++++---- datafusion/expr-common/src/operator.rs | 2 +- datafusion/expr/src/udf.rs | 9 ++--- datafusion/physical-expr/src/analysis.rs | 30 +++++++++-------- .../physical-expr/src/expressions/binary.rs | 2 +- .../physical-expr/src/expressions/in_list.rs | 33 +++++++++++++++++++ .../src/expressions/is_not_null.rs | 2 +- 9 files changed, 74 insertions(+), 33 deletions(-) diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 349850df6148..fea11ec5d822 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::stats::Precision; use datafusion::common::tree_node::{Transformed, TreeNode}; -use datafusion::common::{ColumnStatistics, DFSchema}; +use datafusion::common::{internal_datafusion_err, ColumnStatistics, DFSchema}; use datafusion::common::{ScalarValue, ToDFSchema}; use datafusion::error::Result; use datafusion::functions_aggregate::first_last::first_value_udaf; @@ -302,10 +302,17 @@ fn boundary_analysis_and_selectivity_demo() -> Result<()> { distinct_count: Precision::Absent, }; + let field = schema.fields().first().ok_or_else(|| { + internal_datafusion_err!("schema does not have a field at index 0") + })?; + // We can then build our expression boundaries from the column statistics // allowing the analysis to be more precise. - let initial_boundaries = - vec![ExprBoundaries::try_from_column(&schema, &column_stats, 0)?]; + let initial_boundaries = vec![ExprBoundaries::try_from_column( + field.as_ref(), + &column_stats, + 0, + )?]; // With the above we can perform the boundary analysis similar to the previous // example. diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 5b841db53c5e..7ba754a12105 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -158,7 +158,7 @@ impl Precision { /// rows are selected. A selectivity of `0.5` means half the rows are /// selected. Will always return inexact statistics. pub fn with_estimated_selectivity(self, selectivity: f64) -> Self { - self.map(|v| ((v as f64 * selectivity).ceil()) as usize) + self.map(|v| (v as f64 * selectivity).ceil() as usize) .to_inexact() } } diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index 5f492f96e413..edd785a030a8 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -416,14 +416,11 @@ impl Interval { ) } - pub fn is_null(&self) -> bool { + /// Check whether this interval has no boundary in either direction + pub fn is_unbounded(&self) -> bool { self.lower.is_null() && self.upper.is_null() } - pub fn is_not_null(&self) -> bool { - !(self.lower.is_null() || self.upper.is_null()) - } - pub const CERTAINLY_FALSE: Self = Self { lower: ScalarValue::Boolean(Some(false)), upper: ScalarValue::Boolean(Some(false)), @@ -1698,7 +1695,7 @@ impl Display for NullableInterval { impl From<&Interval> for NullableInterval { fn from(value: &Interval) -> Self { - if value.is_null() { + if value.is_unbounded() { Self::Null { datatype: value.data_type(), } @@ -2927,6 +2924,11 @@ mod tests { Interval::make(Some(32.0), Some(64.0))?, Interval::UNCERTAIN, ), + ( + Interval::make::(Some(3_i64), Some(5_i64))?, + Interval::make::(Some(0_i64), Some(9_i64))?, + Interval::UNCERTAIN, + ), ( Interval::make(Some(1000_i64), None)?, Interval::make(None, Some(0_i64))?, diff --git a/datafusion/expr-common/src/operator.rs b/datafusion/expr-common/src/operator.rs index 808e009192e5..ac8a9d9e06dd 100644 --- a/datafusion/expr-common/src/operator.rs +++ b/datafusion/expr-common/src/operator.rs @@ -165,7 +165,7 @@ impl Operator { } /// Indicates whether this operator supports interval arithmetic - pub fn supports_bounds_evaluation(&self) -> bool { + pub fn supports_interval_evaluation(&self) -> bool { matches!( &self, &Operator::Plus diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 53f053db6b12..8b7ce0a9f012 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -722,13 +722,8 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`, /// then the output interval would be `[0, 3]`. - fn evaluate_bounds(&self, input: &[&Interval]) -> Result { - let input_data_types = input - .iter() - .map(|i| i.data_type()) - .collect::>(); - let return_type = self.return_type(&input_data_types)?; - Interval::make_unbounded(&return_type) + fn evaluate_bounds(&self, _input: &[&Interval]) -> Result { + not_impl_err!("Not implemented for UDF {:?}", self) } /// Indicates whether this ['ScalarUDFImpl'] supports interval arithmetic. diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 51b80b1d971b..6f47d843973a 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -25,7 +25,7 @@ use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult}; use crate::utils::collect_columns; use crate::PhysicalExpr; -use arrow::datatypes::Schema; +use arrow::datatypes::{Field, Schema}; use datafusion_common::stats::Precision; use datafusion_common::{ internal_datafusion_err, internal_err, ColumnStatistics, Result, ScalarValue, @@ -60,12 +60,23 @@ impl AnalysisContext { /// Create a new analysis context from column statistics. pub fn try_from_statistics( - input_schema: &Schema, + schema: &Schema, statistics: &[ColumnStatistics], ) -> Result { - (0..input_schema.fields.len()) - .map(|idx| { - ExprBoundaries::try_from_column(input_schema, &statistics[idx], idx) + assert_eq!(schema.fields().len(), + statistics.len(), + "Mismatching number of fields in schema and column statistics. schema: {}, column statistics: {}", + schema.fields().len(), + statistics.len() + ); + + schema + .fields() + .iter() + .zip(statistics.iter()) + .enumerate() + .map(|(idx, (field, stats))| { + ExprBoundaries::try_from_column(field.as_ref(), stats, idx) }) .collect::>>() .map(Self::new) @@ -94,17 +105,10 @@ pub struct ExprBoundaries { impl ExprBoundaries { /// Create a new `ExprBoundaries` object from column level statistics. pub fn try_from_column( - schema: &Schema, + field: &Field, col_stats: &ColumnStatistics, col_index: usize, ) -> Result { - let field = schema.fields().get(col_index).ok_or_else(|| { - internal_datafusion_err!( - "Could not create `ExprBoundaries`: in `try_from_column` `col_index` - has gone out of bounds with a value of {col_index}, the schema has {} columns.", - schema.fields.len() - ) - })?; let empty_field = ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null); let interval = Interval::try_new( diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 76527741b498..41e6bced80bd 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -390,7 +390,7 @@ impl PhysicalExpr for BinaryExpr { } } - self.op().supports_bounds_evaluation() + self.op().supports_interval_evaluation() && self.left.supports_bounds_evaluation(schema) && self.right.supports_bounds_evaluation(schema) } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index cc7a86581e49..b67f1811f8fb 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -1468,4 +1468,37 @@ mod tests { Ok(()) } + + #[test] + fn test_in_list_bounds_eval() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]); + let col_a = col("a", &schema)?; + let list = vec![lit(0i64), lit(2i64), lit(9i64), lit(6i64)]; + + let expr = in_list(col_a, list, &false, &schema).unwrap(); + + let child_intervals: &[&Interval] = &[ + &Interval::make(Some(3_i64), Some(5_i64))?, + &Interval::make(Some(0_i64), Some(2_i64))?, + &Interval::make(Some(6_i64), Some(9_i64))?, + ]; + let result = expr.evaluate_bounds(child_intervals)?; + debug_assert_eq!(result, Interval::UNCERTAIN); + + let child_intervals: &[&Interval] = &[ + &Interval::make(Some(3_i64), Some(5_i64))?, + &Interval::make(Some(4_i64), Some(4_i64))?, + ]; + let result = expr.evaluate_bounds(child_intervals)?; + debug_assert_eq!(result, Interval::CERTAINLY_TRUE); + + let child_intervals: &[&Interval] = &[ + &Interval::make(Some(3_i64), Some(5_i64))?, + &Interval::make(Some(10_i64), Some(10_i64))?, + ]; + let result = expr.evaluate_bounds(child_intervals)?; + debug_assert_eq!(result, Interval::CERTAINLY_FALSE); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 0f1214908724..bbd5d80908fc 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -108,7 +108,7 @@ impl PhysicalExpr for IsNotNullExpr { fn evaluate_bounds(&self, children: &[&Interval]) -> Result { let inner = children[0]; - Ok(if inner.is_null() { + Ok(if inner.is_unbounded() { Interval::CERTAINLY_FALSE } else if inner.lower().is_null() || inner.upper().is_null() { Interval::UNCERTAIN From d3810de1fdacfca49767afb9c8d0d1353ac4a64c Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Tue, 25 Feb 2025 12:43:52 +0100 Subject: [PATCH 17/19] omit clone --- .../expr-common/src/interval_arithmetic.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/datafusion/expr-common/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs index edd785a030a8..1fa18cc91438 100644 --- a/datafusion/expr-common/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -960,10 +960,18 @@ pub fn apply_operator(op: &Operator, lhs: &Interval, rhs: &Interval) -> Result { NullableInterval::from(lhs) .apply_operator(op, &rhs.into()) - .and_then(|x| { - x.values().cloned().ok_or(DataFusionError::Internal( - "Unexpected null value interval".to_string(), - )) + .and_then(|nullable_interval| match nullable_interval { + NullableInterval::Null { .. } => { + let return_type = BinaryTypeCoercer::new( + &lhs.data_type(), + op, + &rhs.data_type(), + ) + .get_result_type()?; + Interval::make_unbounded(&return_type) + } + NullableInterval::MaybeNull { values } + | NullableInterval::NotNull { values } => Ok(values), }) } _ => internal_err!("Interval arithmetic does not support the operator {op}"), From 8813adb6db1cc61487b675e10fdc6b5c1199e00a Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Tue, 25 Feb 2025 13:13:41 +0100 Subject: [PATCH 18/19] remove check --- datafusion/physical-expr/src/analysis.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 6f47d843973a..0dd8666d217c 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -63,13 +63,6 @@ impl AnalysisContext { schema: &Schema, statistics: &[ColumnStatistics], ) -> Result { - assert_eq!(schema.fields().len(), - statistics.len(), - "Mismatching number of fields in schema and column statistics. schema: {}, column statistics: {}", - schema.fields().len(), - statistics.len() - ); - schema .fields() .iter() From f521a6a47dd7a5877ba3fb3aa6db42b946a3dddd Mon Sep 17 00:00:00 2001 From: Christoph Schulze Date: Tue, 25 Feb 2025 14:34:12 +0100 Subject: [PATCH 19/19] UDF evaluate bounds default impl --- datafusion/expr/src/udf.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 8b7ce0a9f012..53f053db6b12 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -722,8 +722,13 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// /// If the function is `ABS(a)`, and the input interval is `a: [-3, 2]`, /// then the output interval would be `[0, 3]`. - fn evaluate_bounds(&self, _input: &[&Interval]) -> Result { - not_impl_err!("Not implemented for UDF {:?}", self) + fn evaluate_bounds(&self, input: &[&Interval]) -> Result { + let input_data_types = input + .iter() + .map(|i| i.data_type()) + .collect::>(); + let return_type = self.return_type(&input_data_types)?; + Interval::make_unbounded(&return_type) } /// Indicates whether this ['ScalarUDFImpl'] supports interval arithmetic.