Skip to content

Commit

Permalink
fix: Parquet predicate pushdown for lit(_) !=.
Browse the repository at this point in the history
Fixes #19238.
  • Loading branch information
coastalwhite committed Oct 15, 2024
1 parent e29e9df commit 7e54abe
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
8 changes: 0 additions & 8 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,14 +525,6 @@ fn apply_multiple_elementwise<'a>(
// `StatsEvaluator` implementation for `ApplyExpr`.
//
// NOTE(review): this is a diff hunk whose header reads `-525,14 +525,6`
// ("0 additions & 8 deletions"). The eight lines starting at the
// `if ExecutionState::new().verbose()` check and ending at the blank line before
// `Ok(read)` appear to be the lines this commit removes; similar messages worded
// for a "parquet row group" are added to `read_this_row_group` in the same commit.
impl StatsEvaluator for ApplyExpr {
/// Decides from `stats` whether the data must be read.
///
/// Delegates to `should_read_impl` and propagates its error with `?`.
/// Judging by the log messages below, `Ok(true)` means the statistics were not
/// sufficient and the data must be read, and `Ok(false)` means it can be skipped.
fn should_read(&self, stats: &BatchStats) -> PolarsResult<bool> {
let read = self.should_read_impl(stats)?;
// In verbose mode, report the outcome of the statistics check on stderr.
if ExecutionState::new().verbose() {
if read {
eprintln!("parquet file must be read, statistics not sufficient for predicate.")
} else {
eprintln!("parquet file can be skipped, the statistics were sufficient to apply the predicate.")
}
}

// The decision itself is returned unchanged; logging does not affect it.
Ok(read)
}
}
Expand Down
6 changes: 1 addition & 5 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ mod stats {
use ChunkCompareIneq as C;
match op {
Operator::Eq => apply_operator_stats_eq(min_max, literal),
Operator::NotEq => apply_operator_stats_eq(min_max, literal),
Operator::NotEq => apply_operator_stats_neq(min_max, literal),
Operator::Gt => {
// Literal is bigger than max value, selection needs all rows.
C::gt(literal, min_max).map(|ca| ca.any()).unwrap_or(false)
Expand Down Expand Up @@ -457,10 +457,6 @@ mod stats {

impl StatsEvaluator for BinaryExpr {
fn should_read(&self, stats: &BatchStats) -> PolarsResult<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
}

use Operator::*;
match (
self.left.as_stats_evaluator(),
Expand Down
31 changes: 25 additions & 6 deletions crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use polars_core::config;
use polars_core::prelude::*;
use polars_parquet::read::statistics::{deserialize, Statistics};
use polars_parquet::read::RowGroupMetadata;
Expand Down Expand Up @@ -50,18 +51,36 @@ pub fn read_this_row_group(
md: &RowGroupMetadata,
schema: &ArrowSchema,
) -> PolarsResult<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
}

let mut should_read = true;

if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(md, schema)? {
let should_read = pred.should_read(&stats);
let pred_result = pred.should_read(&stats);

// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
return Ok(false);
} else if !matches!(should_read, Err(PolarsError::ColumnNotFound(_))) {
let _ = should_read?;
match pred_result {
Err(PolarsError::ColumnNotFound(errstr)) => {
return Err(PolarsError::ColumnNotFound(errstr))
},
Ok(false) => should_read = false,
_ => {},
}
}
}
}
Ok(true)

if config::verbose() {
if should_read {
eprintln!("parquet row group must be read, statistics not sufficient for predicate.");
} else {
eprintln!("parquet row group can be skipped, the statistics were sufficient to apply the predicate.");
}
}

Ok(should_read)
}

0 comments on commit 7e54abe

Please sign in to comment.