From 3cd0c62a3124b73c5c5d3336c4cb1caa24e5340a Mon Sep 17 00:00:00 2001
From: coastalwhite
Date: Tue, 15 Oct 2024 17:31:08 +0200
Subject: [PATCH 1/3] feat: Conserve Parquet `SortingColumns` for ints
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR makes it so that Parquet `SortingColumns` can be used to preserve
the sorted flag when reading into Polars. Currently, this is only enabled
for integers, as other types might require additional considerations;
enabling it for other types is now trivial, however.

```python
import polars as pl
import pyarrow.parquet as pq
import io

f = io.BytesIO()
df = pl.DataFrame({
    "a": [1, 2, 3, 4, 5, None],
    "b": [1.0, 2.0, 3.0, 4.0, 5.0, None],
    "c": range(6),
})
pq.write_table(
    df.to_arrow(),
    f,
    sorting_columns=[
        pq.SortingColumn(0, False, False),
        pq.SortingColumn(1, False, False),
    ],
)
f.seek(0)
df = pl.read_parquet(f)._to_metadata(stats='sorted_asc')
```

Before:

```console
shape: (3, 2)
┌─────────────┬────────────┐
│ column_name ┆ sorted_asc │
│ ---         ┆ ---        │
│ str         ┆ bool       │
╞═════════════╪════════════╡
│ a           ┆ false      │
│ b           ┆ false      │
│ c           ┆ false      │
└─────────────┴────────────┘
```

After:

```console
shape: (3, 2)
┌─────────────┬────────────┐
│ column_name ┆ sorted_asc │
│ ---         ┆ ---        │
│ str         ┆ bool       │
╞═════════════╪════════════╡
│ a           ┆ true       │
│ b           ┆ false      │
│ c           ┆ false      │
└─────────────┴────────────┘
```
---
 .../polars-io/src/parquet/read/read_impl.rs   | 103 +++++++++++++++---
 .../src/parquet/metadata/row_metadata.rs      |  10 +-
 py-polars/tests/unit/io/test_parquet.py      |  45 ++++++++
 3 files changed, 142 insertions(+), 16 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 0389a73b5081..1be5f554361a 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -7,8 +7,9 @@ use arrow::bitmap::MutableBitmap;
 use arrow::datatypes::ArrowSchemaRef;
 use polars_core::chunked_array::builder::NullChunkedBuilder;
 use polars_core::prelude::*;
+use polars_core::series::IsSorted;
 use polars_core::utils::{accumulate_dataframes_vertical, split_df};
-use polars_core::POOL;
+use polars_core::{config, POOL};
 use polars_parquet::parquet::error::ParquetResult;
 use polars_parquet::parquet::statistics::Statistics;
 use polars_parquet::read::{
@@ -60,6 +61,57 @@ fn assert_dtypes(dtype: &ArrowDataType) {
     }
 }
 
+fn should_copy_sortedness(dtype: &DataType) -> bool {
+    // @NOTE: For now, we are a bit conservative with this.
+    use DataType as D;
+
+    matches!(
+        dtype,
+        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
+    )
+}
+
+fn try_set_sorted_flag(
+    series: &mut Series,
+    col_idx: usize,
+    sorting_map: &PlHashMap<usize, IsSorted>,
+) {
+    if let Some(is_sorted) = sorting_map.get(&col_idx) {
+        if should_copy_sortedness(series.dtype()) {
+            if config::verbose() {
+                eprintln!(
+                    "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
+                    series.name()
+                );
+            }
+
+            series.set_sorted_flag(*is_sorted);
+        }
+    }
+}
+
+fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
+    let capacity = md.sorting_columns().map_or(0, |s| s.len());
+    let mut sorting_map = PlHashMap::with_capacity(capacity);
+
+    if let Some(sorting_columns) = md.sorting_columns() {
+        for sorting in sorting_columns {
+            let prev_value = sorting_map.insert(
+                sorting.column_idx as usize,
+                if sorting.descending {
+                    IsSorted::Descending
+                } else {
+                    IsSorted::Ascending
+                },
+            );
+
+            debug_assert!(prev_value.is_none());
+        }
+    }
+
+    sorting_map
+}
+
 fn column_idx_to_series(
     column_i: usize,
     // The metadata belonging to this column
@@ -320,6 +372,8 @@ fn rg_to_dfs_prefiltered(
         }
     }
 
+    let sorting_map = create_sorting_map(md);
+
     // Collect the data for the live columns
     let live_columns = (0..num_live_columns)
         .into_par_iter()
@@ -338,8 +392,12 @@ fn rg_to_dfs_prefiltered(
 
             let part = iter.collect::<Vec<_>>();
 
-            column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
-                .map(Column::from)
+            let mut series =
+                column_idx_to_series(col_idx, part.as_slice(), None, schema, store)?;
+
+            try_set_sorted_flag(&mut series, col_idx, &sorting_map);
+
+            Ok(series.into_column())
         })
         .collect::<PolarsResult<Vec<Column>>>()?;
@@ -445,7 +503,7 @@ fn rg_to_dfs_prefiltered(
                     array.filter(&mask_arr)
                 };
 
-                let array = if mask_setting.should_prefilter(
+                let mut series = if mask_setting.should_prefilter(
                     prefilter_cost,
                     &schema.get_at_index(col_idx).unwrap().1.dtype,
                 ) {
@@ -454,9 +512,11 @@ fn rg_to_dfs_prefiltered(
                     post()?
                 };
 
-                debug_assert_eq!(array.len(), filter_mask.set_bits());
+                debug_assert_eq!(series.len(), filter_mask.set_bits());
+
+                try_set_sorted_flag(&mut series, col_idx, &sorting_map);
 
-                Ok(array.into_column())
+                Ok(series.into_column())
             })
             .collect::<PolarsResult<Vec<Column>>>()?;
@@ -569,6 +629,8 @@ fn rg_to_dfs_optionally_par_over_columns(
         assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
     }
 
+    let sorting_map = create_sorting_map(md);
+
     let columns = if let ParallelStrategy::Columns = parallel {
         POOL.install(|| {
             projection
@@ -586,14 +648,17 @@ fn rg_to_dfs_optionally_par_over_columns(
 
                     let part = iter.collect::<Vec<_>>();
 
-                    column_idx_to_series(
+                    let mut series = column_idx_to_series(
                         *column_i,
                         part.as_slice(),
                         Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                         schema,
                         store,
-                    )
-                    .map(Column::from)
+                    )?;
+
+                    try_set_sorted_flag(&mut series, *column_i, &sorting_map);
+
+                    Ok(series.into_column())
                 })
                 .collect::<PolarsResult<Vec<Column>>>()
        })?
@@ -613,14 +678,17 @@ fn rg_to_dfs_optionally_par_over_columns(
 
                 let part = iter.collect::<Vec<_>>();
 
-                column_idx_to_series(
+                let mut series = column_idx_to_series(
                     *column_i,
                     part.as_slice(),
                     Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                     schema,
                     store,
-                )
-                .map(Column::from)
+                )?;
+
+                try_set_sorted_flag(&mut series, *column_i, &sorting_map);
+
+                Ok(series.into_column())
             })
            .collect::<PolarsResult<Vec<Column>>>()?
     };
@@ -705,6 +773,8 @@ fn rg_to_dfs_par_over_rg(
         assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
     }
 
+    let sorting_map = create_sorting_map(md);
+
     let columns = projection
         .iter()
         .map(|column_i| {
@@ -720,14 +790,17 @@ fn rg_to_dfs_par_over_rg(
 
             let part = iter.collect::<Vec<_>>();
 
-            column_idx_to_series(
+            let mut series = column_idx_to_series(
                 *column_i,
                 part.as_slice(),
                 Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                 schema,
                 store,
-            )
-            .map(Column::from)
+            )?;
+
+            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
+
+            Ok(series.into_column())
         })
         .collect::<PolarsResult<Vec<Column>>>()?;
diff --git a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
index 9cca27553415..e23a96893c8a 100644
--- a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
+++ b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use hashbrown::hash_map::RawEntryMut;
-use parquet_format_safe::RowGroup;
+use parquet_format_safe::{RowGroup, SortingColumn};
 use polars_utils::aliases::{InitHashMaps, PlHashMap};
 use polars_utils::idx_vec::UnitVec;
 use polars_utils::pl_str::PlSmallStr;
@@ -41,6 +41,7 @@ pub struct RowGroupMetadata {
     num_rows: usize,
     total_byte_size: usize,
     full_byte_range: core::ops::Range<u64>,
+    sorting_columns: Option<Vec<SortingColumn>>,
 }
 
 impl RowGroupMetadata {
@@ -85,6 +86,10 @@ impl RowGroupMetadata {
         self.columns.iter().map(|x| x.byte_range())
     }
 
+    pub fn sorting_columns(&self) -> Option<&[SortingColumn]> {
+        self.sorting_columns.as_deref()
+    }
+
     /// Method to convert from Thrift.
     pub(crate) fn try_from_thrift(
         schema_descr: &SchemaDescriptor,
@@ -106,6 +111,8 @@ impl RowGroupMetadata {
             0..0
         };
 
+        let sorting_columns = rg.sorting_columns.clone();
+
         let columns = rg
             .columns
             .into_iter()
@@ -131,6 +138,7 @@ impl RowGroupMetadata {
             num_rows,
             total_byte_size,
             full_byte_range,
+            sorting_columns,
         })
     }
 }
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 850bf61d978b..89848e5cb4e0 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1990,3 +1990,48 @@ def test_nested_nonnullable_19158() -> None:
 
     f.seek(0)
     assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl))
+
+
+@pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
+def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None:
+    f = io.BytesIO()
+
+    df = pl.DataFrame(
+        {
+            "a": [1, 2, 3, 4, 5, None],
+            "b": [1.0, 2.0, 3.0, 4.0, 5.0, None],
+            "c": [None, 5, 4, 3, 2, 1],
+            "d": [None, 5.0, 4.0, 3.0, 2.0, 1.0],
+            "a_nosort": [1, 2, 3, 4, 5, None],
+            "f": range(6),
+        }
+    )
+
+    pq.write_table(
+        df.to_arrow(),
+        f,
+        sorting_columns=[
+            pq.SortingColumn(0, False, False),
+            pq.SortingColumn(1, False, False),
+            pq.SortingColumn(2, True, True),
+            pq.SortingColumn(3, True, True),
+        ],
+    )
+
+    f.seek(0)
+    df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect()
+
+    cols = ["a", "b", "c", "d", "a_nosort"]
+
+    # @NOTE: We don't conserve sortedness for anything except integers at the
+    # moment.
+    assert_frame_equal(
+        df._to_metadata(cols, ["sorted_asc", "sorted_dsc"]),
+        pl.DataFrame(
+            {
+                "column_name": cols,
+                "sorted_asc": [True, False, False, False, False],
+                "sorted_dsc": [False, False, True, False, False],
+            }
+        ),
+    )

From 06ed71af90b01b28a92a7eb136888026e361460e Mon Sep 17 00:00:00 2001
From: coastalwhite
Date: Wed, 16 Oct 2024 09:23:25 +0200
Subject: [PATCH 2/3] add streaming impl and use verbose in test

---
 crates/polars-io/src/parquet/read/mod.rs      |  1 +
 .../polars-io/src/parquet/read/read_impl.rs   |  4 +--
 .../src/parquet/metadata/row_metadata.rs      |  5 +++
 .../parquet_source/row_group_data_fetch.rs    |  6 +++-
 .../nodes/parquet_source/row_group_decode.rs  | 35 ++++++++++++++-----
 py-polars/tests/unit/io/test_parquet.py       | 20 +++++------
 6 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs
index 1fec749af5ce..cc0020cc7857 100644
--- a/crates/polars-io/src/parquet/read/mod.rs
+++ b/crates/polars-io/src/parquet/read/mod.rs
@@ -33,6 +33,7 @@ or set 'streaming'",
 
 pub use options::{ParallelStrategy, ParquetOptions};
 use polars_error::{ErrString, PolarsError};
+pub use read_impl::{create_sorting_map, try_set_sorted_flag};
 #[cfg(feature = "cloud")]
 pub use reader::ParquetAsyncReader;
 pub use reader::{BatchedParquetReader, ParquetReader};
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 1be5f554361a..b6944755fdea 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -71,7 +71,7 @@ fn should_copy_sortedness(dtype: &DataType) -> bool {
     )
 }
 
-fn try_set_sorted_flag(
+pub fn try_set_sorted_flag(
     series: &mut Series,
     col_idx: usize,
     sorting_map: &PlHashMap<usize, IsSorted>,
@@ -90,7 +90,7 @@ fn try_set_sorted_flag(
     }
 }
 
-fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
+pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
diff --git a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
index e23a96893c8a..bf27bffb66ef 100644
--- a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
+++ b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -60,6 +60,11 @@ impl RowGroupMetadata {
             .map(|x| x.iter().map(|&x| &self.columns[x]))
     }
 
+    /// Fetch the indices of all columns under this root name, if it exists.
+    pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
+        self.column_lookup.get(root_name).map(|x| x.as_slice())
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> usize {
         self.num_rows
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
index dfa4b11e3b02..52d3003de7ea 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
@@ -2,11 +2,12 @@ use std::future::Future;
 use std::sync::Arc;
 
 use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap};
+use polars_core::series::IsSorted;
 use polars_core::utils::operation_exceeded_idxsize_msg;
 use polars_error::{polars_err, PolarsResult};
 use polars_io::predicates::PhysicalIoExpr;
-use polars_io::prelude::FileMetadata;
 use polars_io::prelude::_internal::read_this_row_group;
+use polars_io::prelude::{create_sorting_map, FileMetadata};
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_parquet::read::RowGroupMetadata;
@@ -27,6 +28,7 @@ pub(super) struct RowGroupData {
     pub(super) slice: Option<(usize, usize)>,
     pub(super) file_max_row_group_height: usize,
     pub(super) row_group_metadata: RowGroupMetadata,
+    pub(super) sorting_map: PlHashMap<usize, IsSorted>,
     pub(super) shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
 }
@@ -86,6 +88,7 @@ impl RowGroupDataFetcher {
             let current_row_group_idx = self.current_row_group_idx;
 
             let num_rows = row_group_metadata.num_rows();
+            let sorting_map = create_sorting_map(&row_group_metadata);
 
             self.current_row_offset = current_row_offset.saturating_add(num_rows);
             self.current_row_group_idx += 1;
@@ -246,6 +249,7 @@ impl RowGroupDataFetcher {
                     slice,
                     file_max_row_group_height: current_max_row_group_height,
                     row_group_metadata,
+                    sorting_map,
                     shared_file_state: current_shared_file_state.clone(),
                 })
             });
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
index 119345295686..975ff6de22cb 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -11,6 +11,7 @@ use polars_error::{polars_bail, PolarsResult};
 use polars_io::predicates::PhysicalIoExpr;
 use polars_io::prelude::_internal::calc_prefilter_cost;
 pub use polars_io::prelude::_internal::PrefilterMaskSetting;
+use polars_io::prelude::try_set_sorted_flag;
 use polars_io::RowIndex;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::ScanSources;
@@ -367,11 +368,20 @@ fn decode_column(
 
     assert_eq!(array.len(), expected_num_rows);
 
-    let series = Series::try_from((arrow_field, array))?;
+    let mut series = Series::try_from((arrow_field, array))?;
+
+    if let Some(col_idxs) = row_group_data
+        .row_group_metadata
+        .columns_idxs_under_root_iter(&arrow_field.name)
+    {
+        if col_idxs.len() == 1 {
+            try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
+        }
+    }
 
     // TODO: Also load in the metadata.
 
-    Ok(series.into())
+    Ok(series.into_column())
 }
 
 /// # Safety
@@ -652,17 +662,26 @@ fn decode_column_prefiltered(
         deserialize_filter,
     )?;
 
-    let column = Series::try_from((arrow_field, array))?.into_column();
+    let mut series = Series::try_from((arrow_field, array))?;
+
+    if let Some(col_idxs) = row_group_data
+        .row_group_metadata
+        .columns_idxs_under_root_iter(&arrow_field.name)
+    {
+        if col_idxs.len() == 1 {
+            try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
+        }
+    }
 
-    let column = if !prefilter {
-        column.filter(mask)?
+    let series = if !prefilter {
+        series.filter(mask)?
     } else {
-        column
+        series
     };
 
-    assert_eq!(column.len(), expected_num_rows);
+    assert_eq!(series.len(), expected_num_rows);
 
-    Ok(column)
+    Ok(series.into_column())
 }
 
 mod tests {
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 89848e5cb4e0..2bcd1af220c0 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1993,7 +1993,7 @@ def test_nested_nonnullable_19158() -> None:
 
 
 @pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
-def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None:
+def test_conserve_sortedness(monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy) -> None:
     f = io.BytesIO()
 
     df = pl.DataFrame(
@@ -2019,19 +2019,15 @@ def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None:
     )
 
     f.seek(0)
+
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+
     df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect()
 
-    cols = ["a", "b", "c", "d", "a_nosort"]
+    captured = capfd.readouterr().err
 
     # @NOTE: We don't conserve sortedness for anything except integers at the
     # moment.
-    assert_frame_equal(
-        df._to_metadata(cols, ["sorted_asc", "sorted_dsc"]),
-        pl.DataFrame(
-            {
-                "column_name": cols,
-                "sorted_asc": [True, False, False, False, False],
-                "sorted_dsc": [False, False, True, False, False],
-            }
-        ),
-    )
+    assert captured.count("Parquet conserved SortingColumn for column chunk of") == 2
+    assert "Parquet conserved SortingColumn for column chunk of 'a' to Ascending" in captured
+    assert "Parquet conserved SortingColumn for column chunk of 'c' to Descending" in captured

From 9696e5b9ea0bed544c7eb9815ae96948ee3c34e5 Mon Sep 17 00:00:00 2001
From: coastalwhite
Date: Wed, 16 Oct 2024 09:23:39 +0200
Subject: [PATCH 3/3] pyfmt

---
 py-polars/tests/unit/io/test_parquet.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 2bcd1af220c0..71dfab8913d9 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1993,7 +1993,9 @@ def test_nested_nonnullable_19158() -> None:
 
 
 @pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
-def test_conserve_sortedness(monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy) -> None:
+def test_conserve_sortedness(
+    monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy
+) -> None:
     f = io.BytesIO()
 
     df = pl.DataFrame(
@@ -2029,5 +2031,11 @@ def test_conserve_sortedness(monkeypatch: Any, capfd: Any, parallel: pl.Parallel
     # @NOTE: We don't conserve sortedness for anything except integers at the
     # moment.
     assert captured.count("Parquet conserved SortingColumn for column chunk of") == 2
-    assert "Parquet conserved SortingColumn for column chunk of 'a' to Ascending" in captured
-    assert "Parquet conserved SortingColumn for column chunk of 'c' to Descending" in captured
+    assert (
+        "Parquet conserved SortingColumn for column chunk of 'a' to Ascending"
+        in captured
+    )
+    assert (
+        "Parquet conserved SortingColumn for column chunk of 'c' to Descending"
+        in captured
+    )
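
For context, a minimal sketch (not part of the patches above) of what the conserved flag buys downstream. It assumes the `polars` crate's re-exported `Series::set_sorted_flag` / `Series::is_sorted_flag` API; sorted-aware kernels branch on this flag rather than re-scanning the data:

```rust
use polars::prelude::*;
use polars::series::IsSorted;

fn main() -> PolarsResult<()> {
    let mut s = Series::new("a".into(), &[1i64, 2, 3, 4, 5]);

    // This mirrors what `try_set_sorted_flag` does after matching a Parquet
    // `SortingColumn` to an integer column chunk: it only sets a flag; the
    // data itself is never re-checked or re-ordered.
    s.set_sorted_flag(IsSorted::Ascending);

    // Downstream, sorted-aware code can consult the flag instead of
    // verifying monotonicity in O(n).
    assert!(matches!(s.is_sorted_flag(), IsSorted::Ascending));

    Ok(())
}
```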