diff --git a/crates/polars-arrow/src/array/fixed_size_binary/mutable.rs b/crates/polars-arrow/src/array/fixed_size_binary/mutable.rs
index ddec4455ca85..d0736972ddca 100644
--- a/crates/polars-arrow/src/array/fixed_size_binary/mutable.rs
+++ b/crates/polars-arrow/src/array/fixed_size_binary/mutable.rs
@@ -222,6 +222,14 @@ impl MutableFixedSizeBinaryArray {
             validity.shrink_to_fit()
         }
     }
+
+    pub fn freeze(self) -> FixedSizeBinaryArray {
+        FixedSizeBinaryArray::new(
+            ArrowDataType::FixedSizeBinary(self.size),
+            self.values.into(),
+            self.validity.map(|x| x.into()),
+        )
+    }
 }
 
 /// Accessors
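The new `freeze` completes the builder API for fixed-size binary: it consumes the mutable array and re-wraps its buffers as an immutable `FixedSizeBinaryArray` without copying, matching the `freeze` that `MutablePrimitiveArray` and the other mutable arrays already provide, and it is what the statistics code further down relies on. A minimal usage sketch; the helper and its inputs are hypothetical, and the `with_capacity(capacity, size)` argument order is assumed from the `rmap!` call site later in this diff:

    use polars_arrow::array::{Array, MutableFixedSizeBinaryArray};

    fn min_values_column(width: usize, mins: &[Option<Vec<u8>>]) -> Box<dyn Array> {
        let mut arr = MutableFixedSizeBinaryArray::with_capacity(mins.len(), width);
        for v in mins {
            // A `None` becomes a null slot in the validity bitmap.
            arr.push(v.as_deref());
        }
        // `freeze` moves the buffers out of the builder; nothing is copied.
        arr.freeze().to_boxed()
    }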
diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs
index d713217a5cc8..960e04ada2e3 100644
--- a/crates/polars-io/src/parquet/read/predicates.rs
+++ b/crates/polars-io/src/parquet/read/predicates.rs
@@ -1,69 +1,45 @@
 use polars_core::config;
 use polars_core::prelude::*;
-use polars_parquet::read::statistics::{deserialize, Statistics};
+use polars_parquet::read::statistics::{
+    deserialize, deserialize_all, ArrowColumnStatisticsArrays, Statistics,
+};
 use polars_parquet::read::RowGroupMetadata;
 
 use crate::predicates::{BatchStats, ColumnStats, ScanIOPredicate};
 
 /// Collect the statistics in a row-group
 pub fn collect_statistics_with_live_columns(
-    md: &RowGroupMetadata,
+    row_groups: &[RowGroupMetadata],
     schema: &ArrowSchema,
-    pl_schema: &SchemaRef,
     live_columns: &PlIndexSet<PlSmallStr>,
-) -> PolarsResult<Option<BatchStats>> {
-    // TODO! fix this performance. This is a full sequential scan.
-    let stats = live_columns
+) -> PolarsResult<Vec<Option<ArrowColumnStatisticsArrays>>> {
+    if row_groups.is_empty() {
+        return Ok((0..live_columns.len()).map(|_| None).collect());
+    }
+
+    let md = &row_groups[0];
+    live_columns
         .iter()
         .map(|c| {
             let field = schema.get(c).unwrap();
 
-            let default_fn = || ColumnStats::new(field.into(), None, None, None);
-
             // This can be None in the allow_missing_columns case.
-            let Some(mut iter) = md.columns_under_root_iter(&field.name) else {
-                return Ok(default_fn());
+            let Some(idxs) = md.columns_idxs_under_root_iter(&field.name) else {
+                return Ok(None);
             };
 
-            let statistics = deserialize(field, &mut iter)?;
-            assert!(iter.next().is_none());
-
-            // We don't support reading nested statistics for now. It does not really make any
-            // sense at the moment with how we structure statistics.
-            let Some(Statistics::Column(stats)) = statistics else {
-                return Ok(default_fn());
-            };
-
-            let stats = stats.into_arrow()?;
-
-            let null_count = stats
-                .null_count
-                .map(|x| Scalar::from(x).into_series(PlSmallStr::EMPTY));
-            let min_value = stats
-                .min_value
-                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
-            let max_value = stats
-                .max_value
-                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
+            // 0 is possible for empty structs.
+            //
+            // 2+ is for structs. We don't support reading nested statistics for now. It does not
+            // really make any sense at the moment with how we structure statistics.
+            if idxs.is_empty() || idxs.len() > 1 {
+                return Ok(None);
+            }
 
-            Ok(ColumnStats::new(
-                field.into(),
-                null_count,
-                min_value,
-                max_value,
-            ))
+            let idx = idxs[0];
+            Ok(deserialize_all(field, row_groups, idx)?)
         })
-        .collect::<PolarsResult<Vec<_>>>()?;
-
-    if stats.is_empty() {
-        return Ok(None);
-    }
-
-    Ok(Some(BatchStats::new(
-        pl_schema.clone(),
-        stats,
-        Some(md.num_rows()),
-    )))
+        .collect::<PolarsResult<Vec<_>>>()
 }
 
 /// Collect the statistics in a row-group
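This signature change is the core of the refactor: instead of producing one `BatchStats` per row group (each requiring its own sequential scan), the function now takes every row group at once and returns one entry per live column, where each entry holds statistics arrays with one slot per row group. A hedged sketch of the new calling convention; `describe_stats` is hypothetical and the import paths follow the usage in this diff:

    use polars_core::prelude::*;
    use polars_io::prelude::_internal::collect_statistics_with_live_columns;
    use polars_parquet::read::RowGroupMetadata;

    fn describe_stats(
        row_groups: &[RowGroupMetadata],
        schema: &ArrowSchema,
        live_columns: &PlIndexSet<PlSmallStr>,
    ) -> PolarsResult<()> {
        // `stats[i]` lines up with the i-th live column; within an entry,
        // slot j corresponds to `row_groups[j]`.
        let stats = collect_statistics_with_live_columns(row_groups, schema, live_columns)?;
        for (c, stat) in live_columns.iter().zip(&stats) {
            match stat {
                // Missing column (allow_missing_columns) or a nested dtype.
                None => eprintln!("{c}: no statistics"),
                Some(s) => eprintln!("{c}: statistics for {} row groups", s.null_count.len()),
            }
        }
        Ok(())
    }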
diff --git a/crates/polars-parquet/src/arrow/read/statistics.rs b/crates/polars-parquet/src/arrow/read/statistics.rs
index 5951298de264..5fe2d524f042 100644
--- a/crates/polars-parquet/src/arrow/read/statistics.rs
+++ b/crates/polars-parquet/src/arrow/read/statistics.rs
@@ -1,13 +1,16 @@
 //! APIs exposing `crate::parquet`'s statistics as arrow's statistics.
 use arrow::array::{
-    Array, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, PrimitiveArray, Utf8ViewArray,
+    Array, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, MutableBinaryViewArray,
+    MutableBooleanArray, MutableFixedSizeBinaryArray, MutablePrimitiveArray, NullArray,
+    PrimitiveArray, Utf8ViewArray,
 };
 use arrow::datatypes::{ArrowDataType, Field, IntegerType, IntervalUnit, TimeUnit};
-use arrow::types::{f16, i256, NativeType};
+use arrow::types::{days_ms, f16, i256, NativeType};
 use ethnum::I256;
 use polars_utils::pl_str::PlSmallStr;
+use polars_utils::IdxSize;
 
-use super::ParquetTimeUnit;
+use super::{ParquetTimeUnit, RowGroupMetadata};
 use crate::parquet::error::{ParquetError, ParquetResult};
 use crate::parquet::schema::types::PhysicalType as ParquetPhysicalType;
 use crate::parquet::statistics::Statistics as ParquetStatistics;
@@ -60,6 +63,14 @@ pub struct ArrowColumnStatistics {
     pub max_value: Option<Box<dyn Array>>,
 }
 
+/// Arrow-deserialized parquet statistics of a leaf-column
+pub struct ArrowColumnStatisticsArrays {
+    pub null_count: PrimitiveArray<IdxSize>,
+    pub distinct_count: PrimitiveArray<IdxSize>,
+    pub min_value: Box<dyn Array>,
+    pub max_value: Box<dyn Array>,
+}
+
 fn timestamp(logical_type: Option<&PrimitiveLogicalType>, time_unit: TimeUnit, x: i64) -> i64 {
     let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type {
         unit
@@ -277,6 +288,259 @@ impl ColumnStatistics {
     }
 }
 
+/// Deserializes the statistics in the column chunks from all `row_groups`
+/// into [`ArrowColumnStatisticsArrays`] associated with `field`'s name.
+///
+/// # Errors
+/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
+pub fn deserialize_all(
+    field: &Field,
+    row_groups: &[RowGroupMetadata],
+    field_idx: usize,
+) -> ParquetResult<Option<ArrowColumnStatisticsArrays>> {
+    assert!(!row_groups.is_empty());
+    use ArrowDataType as D;
+    match field.dtype() {
+        // @TODO: These are all a bit more complex, skip for now.
+        D::List(..) | D::LargeList(..) => Ok(None),
+        D::Dictionary(..) => Ok(None),
+        D::FixedSizeList(..) => Ok(None),
+        D::Struct(..) => Ok(None),
+
+        _ => {
+            let mut null_count = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
+            let mut distinct_count =
+                MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
+
+            let primitive_type = &row_groups[0].parquet_columns()[field_idx]
+                .descriptor()
+                .descriptor
+                .primitive_type;
+
+            let logical_type = &primitive_type.logical_type;
+            let physical_type = &primitive_type.physical_type;
+
+            macro_rules! rmap {
+                ($expect:ident, $map:expr, $arr:ty$(, $arg:expr)?) => {{
+                    let mut min_arr = <$arr>::with_capacity(row_groups.len()$(, $arg)?);
+                    let mut max_arr = <$arr>::with_capacity(row_groups.len()$(, $arg)?);
+
+                    for rg in row_groups {
+                        let column = &rg.parquet_columns()[field_idx];
+                        let s = column.statistics().transpose()?;
+
+                        let (v_min, v_max, v_null_count, v_distinct_count) = match s {
+                            None => (None, None, None, None),
+                            Some(s) => {
+                                let s = s.$expect();
+
+                                let min = s.min_value;
+                                let max = s.max_value;
+
+                                let min = ($map)(min)?;
+                                let max = ($map)(max)?;
+
+                                (
+                                    min,
+                                    max,
+                                    s.null_count.map(|v| v as IdxSize),
+                                    s.distinct_count.map(|v| v as IdxSize),
+                                )
+                            }
+                        };
+
+                        min_arr.push(v_min);
+                        max_arr.push(v_max);
+                        null_count.push(v_null_count);
+                        distinct_count.push(v_distinct_count);
+                    }
+
+                    (min_arr.freeze().to_boxed(), max_arr.freeze().to_boxed())
+                }};
+                ($expect:ident, $arr:ty, @prim $from:ty $(as $to:ty)? $(, $map:expr)?) => {{
+                    rmap!(
+                        $expect,
+                        |x: Option<$from>| {
+                            $(
+                                let x = x.map(|x| x as $to);
+                            )?
+                            $(
+                                let x = x.map($map);
+                            )?
+                            ParquetResult::Ok(x)
+                        },
+                        $arr
+                    )
+                }};
+                (@binary $(, $map:expr)?) => {{
+                    rmap!(
+                        expect_binary,
+                        |x: Option<Vec<u8>>| {
+                            $(
+                                let x = x.map($map);
+                            )?
+                            ParquetResult::Ok(x)
+                        },
+                        MutableBinaryViewArray<[u8]>
+                    )
+                }};
+                (@string) => {{
+                    rmap!(
+                        expect_binary,
+                        |x: Option<Vec<u8>>| {
+                            let x = x.map(String::from_utf8).transpose().map_err(|_| {
+                                ParquetError::oos("Invalid UTF8 in Statistics")
+                            })?;
+                            ParquetResult::Ok(x)
+                        },
+                        MutableBinaryViewArray<str>
+                    )
+                }};
+            }
+
+            use {ArrowDataType as D, ParquetPhysicalType as PPT};
+            let (min_value, max_value) = match (field.dtype(), physical_type) {
+                (D::Null, _) => (
+                    NullArray::new(ArrowDataType::Null, row_groups.len()).to_boxed(),
+                    NullArray::new(ArrowDataType::Null, row_groups.len()).to_boxed(),
+                ),
+
+                (D::Boolean, _) => rmap!(
+                    expect_boolean,
+                    |x: Option<bool>| ParquetResult::Ok(x),
+                    MutableBooleanArray
+                ),
+
+                (D::Int8, _) => rmap!(expect_int32, MutablePrimitiveArray::<i8>, @prim i32 as i8),
+                (D::Int16, _) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<i16>, @prim i32 as i16)
+                },
+                (D::Int32 | D::Date32 | D::Time32(_), _) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<i32>, @prim i32 as i32)
+                },
+
+                // some implementations of parquet write arrow's date64 into i32.
+                (D::Date64, PPT::Int32) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<i64>, @prim i32 as i64, |x| x * 86400000)
+                },
+
+                (D::Int64 | D::Time64(_) | D::Duration(_), _) | (D::Date64, PPT::Int64) => {
+                    rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64 as i64)
+                },
+
+                (D::Interval(IntervalUnit::YearMonth), _) => rmap!(
+                    expect_binary,
+                    MutablePrimitiveArray::<i32>,
+                    @prim Vec<u8>,
+                    |x| convert_year_month(&x)
+                ),
+                (D::Interval(IntervalUnit::DayTime), _) => rmap!(
+                    expect_binary,
+                    MutablePrimitiveArray::<days_ms>,
+                    @prim Vec<u8>,
+                    |x| convert_days_ms(&x)
+                ),
+
+                (D::UInt8, _) => rmap!(expect_int32, MutablePrimitiveArray::<u8>, @prim i32 as u8),
+                (D::UInt16, _) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<u16>, @prim i32 as u16)
+                },
+                (D::UInt32, PPT::Int32) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<u32>, @prim i32 as u32)
+                },
+
+                // some implementations of parquet write arrow's u32 into i64.
+                (D::UInt32, PPT::Int64) => {
+                    rmap!(expect_int64, MutablePrimitiveArray::<u32>, @prim i64 as u32)
+                },
+                (D::UInt64, _) => {
+                    rmap!(expect_int64, MutablePrimitiveArray::<u64>, @prim i64 as u64)
+                },
+
+                (D::Timestamp(time_unit, _), PPT::Int96) => {
+                    rmap!(expect_int96, MutablePrimitiveArray::<i64>, @prim [u32; 3], |x| {
+                        timestamp(logical_type.as_ref(), *time_unit, int96_to_i64_ns(x))
+                    })
+                },
+                (D::Timestamp(time_unit, _), PPT::Int64) => {
+                    rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64, |x| {
+                        timestamp(logical_type.as_ref(), *time_unit, x)
+                    })
+                },
+
+                // Read Float16, since we don't have a f16 type in Polars we read it to a Float32.
+                (_, PPT::FixedLenByteArray(2))
+                    if matches!(logical_type.as_ref(), Some(PrimitiveLogicalType::Float16)) =>
+                {
+                    rmap!(expect_fixedlen, MutablePrimitiveArray::<f32>, @prim Vec<u8>, |v| f16::from_le_bytes([v[0], v[1]]).to_f32())
+                },
+                (D::Float32, _) => rmap!(expect_float, MutablePrimitiveArray::<f32>, @prim f32),
+                (D::Float64, _) => rmap!(expect_double, MutablePrimitiveArray::<f64>, @prim f64),
+
+                (D::Decimal(_, _), PPT::Int32) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<i128>, @prim i32 as i128)
+                },
+                (D::Decimal(_, _), PPT::Int64) => {
+                    rmap!(expect_int64, MutablePrimitiveArray::<i128>, @prim i64 as i128)
+                },
+                (D::Decimal(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {
+                    return Err(ParquetError::not_supported(format!(
+                        "Can't decode Decimal128 type from Fixed Size Byte Array of len {n:?}",
+                    )))
+                },
+                (D::Decimal(_, _), PPT::FixedLenByteArray(n)) => rmap!(
+                    expect_fixedlen,
+                    MutablePrimitiveArray::<i128>,
+                    @prim Vec<u8>,
+                    |x| convert_i128(&x, *n)
+                ),
+                (D::Decimal256(_, _), PPT::Int32) => {
+                    rmap!(expect_int32, MutablePrimitiveArray::<i256>, @prim i32, |x: i32| i256(I256::new(x.into())))
+                },
+                (D::Decimal256(_, _), PPT::Int64) => {
+                    rmap!(expect_int64, MutablePrimitiveArray::<i256>, @prim i64, |x: i64| i256(I256::new(x.into())))
+                },
+                (D::Decimal256(_, _), PPT::FixedLenByteArray(n)) if *n > 16 => {
+                    return Err(ParquetError::not_supported(format!(
+                        "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",
+                    )))
+                },
+                (D::Decimal256(_, _), PPT::FixedLenByteArray(_)) => rmap!(
+                    expect_fixedlen,
+                    MutablePrimitiveArray::<i256>,
+                    @prim Vec<u8>,
+                    |x| convert_i256(&x)
+                ),
+                (D::Binary, _) => rmap!(@binary),
+                (D::LargeBinary, _) => rmap!(@binary),
+                (D::Utf8, _) => rmap!(@string),
+                (D::LargeUtf8, _) => rmap!(@string),
+
+                (D::BinaryView, _) => rmap!(@binary),
+                (D::Utf8View, _) => rmap!(@string),
+
+                (D::FixedSizeBinary(width), _) => {
+                    rmap!(
+                        expect_fixedlen,
+                        |x: Option<Vec<u8>>| ParquetResult::Ok(x),
+                        MutableFixedSizeBinaryArray,
+                        *width
+                    )
+                },
+
+                other => todo!("{:?}", other),
+            };
+
+            Ok(Some(ArrowColumnStatisticsArrays {
+                null_count: null_count.freeze(),
+                distinct_count: distinct_count.freeze(),
+                min_value,
+                max_value,
+            }))
+        },
+    }
+}
+
 /// Deserializes the statistics in the column chunks from a single `row_group`
 /// into [`Statistics`] associated from `field`'s name.
 ///
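The `rmap!` macro monomorphizes one gather loop per (arrow dtype, parquet physical type) pair: it walks the row groups once, pushes `None` into the min/max builders whenever a chunk has no statistics (so a single chunk without statistics no longer discards the whole column), and freezes the builders into arrays at the end. Hand-expanding one invocation makes the shape easier to see. A simplified sketch of roughly what `rmap!(expect_int64, MutablePrimitiveArray::<i64>, @prim i64 as i64)` produces, assuming the same module scope as `deserialize_all`; the distinct-count column is elided:

    fn min_max_i64(
        row_groups: &[RowGroupMetadata],
        field_idx: usize,
        null_count: &mut MutablePrimitiveArray<IdxSize>,
    ) -> ParquetResult<(Box<dyn Array>, Box<dyn Array>)> {
        let mut min_arr = MutablePrimitiveArray::<i64>::with_capacity(row_groups.len());
        let mut max_arr = MutablePrimitiveArray::<i64>::with_capacity(row_groups.len());

        for rg in row_groups {
            let column = &rg.parquet_columns()[field_idx];
            match column.statistics().transpose()? {
                // No statistics in this chunk: record a null slot and move on.
                None => {
                    min_arr.push(None);
                    max_arr.push(None);
                    null_count.push(None);
                },
                Some(s) => {
                    let s = s.expect_int64();
                    min_arr.push(s.min_value);
                    max_arr.push(s.max_value);
                    null_count.push(s.null_count.map(|v| v as IdxSize));
                },
            }
        }

        Ok((min_arr.freeze().to_boxed(), max_arr.freeze().to_boxed()))
    }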
diff --git a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
index 65f119a51232..47953bfb7c6d 100644
--- a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
+++ b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -68,6 +68,10 @@ impl RowGroupMetadata {
         self.column_lookup.get(root_name).map(|x| x.as_slice())
     }
 
+    pub fn parquet_columns(&self) -> &[ColumnChunkMetadata] {
+        self.columns.as_ref().as_slice()
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> usize {
         self.num_rows
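`parquet_columns` exposes the column-chunk metadata as a positional slice, so the leaf indices returned by `columns_idxs_under_root_iter` (visible in the context above) address the same column across every row group; this is what lets `deserialize_all` index with `field_idx` instead of re-resolving names per group. A hypothetical helper showing the pairing:

    use polars_parquet::read::RowGroupMetadata;

    // Returns true if every leaf chunk under `root_name` carries statistics
    // in this row group.
    fn leaf_statistics_present(md: &RowGroupMetadata, root_name: &str) -> bool {
        md.columns_idxs_under_root_iter(root_name).is_some_and(|idxs| {
            idxs.iter()
                .all(|&i| md.parquet_columns()[i].statistics().is_some())
        })
    }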
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
index 11f8c856fd9f..5faae7fa8ac2 100644
--- a/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
+++ b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs
@@ -3,14 +3,13 @@ use std::sync::Arc;
 
 use polars_core::frame::DataFrame;
 use polars_core::prelude::{Column, DataType, IntoColumn, IDX_DTYPE};
-use polars_core::schema::{Schema, SchemaExt};
+use polars_core::series::Series;
 use polars_core::utils::arrow::bitmap::Bitmap;
 use polars_core::utils::arrow::datatypes::ArrowSchemaRef;
 use polars_error::{polars_ensure, PolarsResult};
 use polars_io::predicates::ScanIOPredicate;
 use polars_io::prelude::_internal::{collect_statistics_with_live_columns, PrefilterMaskSetting};
 use polars_io::prelude::{FileMetadata, ParallelStrategy};
-use polars_utils::pl_str::PlSmallStr;
 use polars_utils::{format_pl_smallstr, IdxSize};
 
 use super::row_group_data_fetch::RowGroupDataFetcher;
@@ -47,7 +46,12 @@ async fn calculate_row_group_pred_pushdown_skip_mask(
     let live_columns = predicate.live_columns.clone();
     let reader_schema = reader_schema.clone();
     let skip_row_group_mask = async_executor::spawn(TaskPriority::High, async move {
-        let pl_schema = Arc::new(Schema::from_arrow_schema(reader_schema.as_ref()));
+        let stats = collect_statistics_with_live_columns(
+            &metadata.row_groups[row_group_slice.clone()],
+            reader_schema.as_ref(),
+            &live_columns,
+        )?;
+
         let mut columns = Vec::with_capacity(1 + live_columns.len() * 3);
 
         let lengths: Vec<IdxSize> = metadata.row_groups[row_group_slice.clone()]
@@ -55,60 +59,54 @@ async fn calculate_row_group_pred_pushdown_skip_mask(
             .map(|rg| rg.num_rows() as IdxSize)
             .collect();
         columns.push(Column::new("len".into(), lengths));
-        for c in live_columns.iter() {
-            let dtype = DataType::from_arrow_field(reader_schema.get(c).unwrap());
-            columns.push(Column::new_empty(format_pl_smallstr!("{c}_min"), &dtype));
-            columns.push(Column::new_empty(format_pl_smallstr!("{c}_max"), &dtype));
-            columns.push(Column::new_empty(format_pl_smallstr!("{c}_nc"), &IDX_DTYPE));
-        }
-
-        for rg in &metadata.row_groups[row_group_slice.clone()] {
-            if let Some(stats) = collect_statistics_with_live_columns(
-                rg,
-                reader_schema.as_ref(),
-                &pl_schema,
-                &live_columns,
-            )? {
-                // @TODO:
-                // 1. Only collect statistics for live columns
-                // 2. Gather into a contiguous buffer, not this rechunking
-                for col in stats.column_stats().iter() {
-                    let Some(idx) = live_columns.get_index_of(col.field_name()) else {
-                        continue;
-                    };
+        for (c, stat) in live_columns.iter().zip(stats) {
+            let field = reader_schema.get(c).unwrap();
+
+            let min_name = format_pl_smallstr!("{c}_min");
+            let max_name = format_pl_smallstr!("{c}_max");
+            let nc_name = format_pl_smallstr!("{c}_nc");
+
+            let (min, max, nc) = match stat {
+                None => {
+                    let dtype = DataType::from_arrow_field(field);
+
+                    (
+                        Column::full_null(min_name, num_row_groups, &dtype),
+                        Column::full_null(max_name, num_row_groups, &dtype),
+                        Column::full_null(nc_name, num_row_groups, &IDX_DTYPE),
+                    )
+                },
+                Some(stat) => {
+                    let md = field.metadata.as_deref();
+
+                    (
+                        unsafe {
+                            Series::_try_from_arrow_unchecked_with_md(
+                                min_name,
+                                vec![stat.min_value],
+                                field.dtype(),
+                                md,
+                            )
+                        }?
+                        .into_column(),
+                        unsafe {
+                            Series::_try_from_arrow_unchecked_with_md(
+                                max_name,
+                                vec![stat.max_value],
+                                field.dtype(),
+                                md,
+                            )
+                        }?
+                        .into_column(),
+                        Series::from_arrow(nc_name, stat.null_count.boxed())?.into_column(),
+                    )
+                },
+            };
 
-                    let min = col.to_min().map_or(
-                        Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + idx * 3].dtype()),
-                        |s| s.clone().into_column(),
-                    );
-                    let max = col.to_max().map_or(
-                        Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + idx * 3].dtype()),
-                        |s| s.clone().into_column(),
-                    );
-                    let nc = col
-                        .null_count()
-                        .map_or(Column::full_null(PlSmallStr::EMPTY, 1, &IDX_DTYPE), |nc| {
-                            Column::new_scalar(PlSmallStr::EMPTY, (nc as IdxSize).into(), 1)
-                        });
-
-                    columns[1 + idx * 3].append_owned(min)?;
-                    columns[1 + idx * 3 + 1].append_owned(max)?;
-                    columns[1 + idx * 3 + 2].append_owned(nc)?;
-                }
-            } else {
-                for i in 0..live_columns.len() {
-                    let min = Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + i * 3].dtype());
-                    let max = Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + i * 3].dtype());
-                    let nc = Column::full_null(PlSmallStr::EMPTY, 1, &IDX_DTYPE);
-                    columns[1 + i * 3].append_owned(min)?;
-                    columns[1 + i * 3 + 1].append_owned(max)?;
-                    columns[1 + i * 3 + 2].append_owned(nc)?;
-                }
-            }
+            columns.extend([min, max, nc]);
         }
 
-        let mut statistics_df = DataFrame::new_with_height(num_row_groups, columns)?;
-        statistics_df.rechunk_mut();
+        let statistics_df = DataFrame::new_with_height(num_row_groups, columns)?;
         sbp.evaluate_with_stat_df(&statistics_df)
     })
     .await?;
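With this change the statistics frame handed to `evaluate_with_stat_df` is built column-by-column in one pass: one row per row group, a leading `len` column, and per live column `c` the columns `{c}_min`, `{c}_max`, and `{c}_nc`, so the predicate is evaluated once, vectorized over all row groups, instead of once per group with per-group appends and rechunking. A hypothetical stand-alone illustration of that frame's shape for a single live column `a` over three row groups; `IdxSize` is `u32` unless the `bigidx` feature is enabled, and `None` marks a group whose chunk had no usable statistics:

    use polars_core::prelude::*;

    fn example_stat_df() -> PolarsResult<DataFrame> {
        polars_core::df! {
            "len"   => &[1000u32, 1000, 512],
            "a_min" => &[Some(0i64), Some(10), None],
            "a_max" => &[Some(9i64), Some(99), None],
            "a_nc"  => &[Some(0u32), Some(3), None]
        }
    }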