diff --git a/crates/polars-io/src/parquet/write/batched_writer.rs b/crates/polars-io/src/parquet/write/batched_writer.rs index 86b95bc36f85..7fe7bc178ad0 100644 --- a/crates/polars-io/src/parquet/write/batched_writer.rs +++ b/crates/polars-io/src/parquet/write/batched_writer.rs @@ -1,17 +1,21 @@ use std::io::Write; use std::sync::Mutex; +use arrow::array::Array; use arrow::record_batch::RecordBatch; use polars_core::prelude::*; +use polars_core::series::IsSorted; use polars_core::POOL; use polars_parquet::read::ParquetError; use polars_parquet::write::{ array_to_columns, CompressedPage, Compressor, DynIter, DynStreamingIterator, Encoding, FallibleStreamingIterator, FileWriter, Page, ParquetType, RowGroupIterColumns, - SchemaDescriptor, WriteOptions, + RowGroupWriteOptions, SchemaDescriptor, SortingColumn, WriteOptions, }; use rayon::prelude::*; +use super::options::{MaterializedSortingColumns, MetadataOptions, SortingColumnBehavior}; + pub struct BatchedWriter { // A mutex so that streaming engine can get concurrent read access to // compress pages. @@ -19,6 +23,7 @@ pub struct BatchedWriter { pub(super) parquet_schema: SchemaDescriptor, pub(super) encodings: Vec>, pub(super) options: WriteOptions, + pub(super) metadata_options: MetadataOptions, pub(super) parallel: bool, } @@ -53,12 +58,14 @@ impl BatchedWriter { &self.parquet_schema, &self.encodings, self.options, + &self.metadata_options, self.parallel, ); // Lock before looping so that order is maintained under contention. let mut writer = self.writer.lock().unwrap(); - for group in row_group_iter { - writer.write(group?)?; + for item in row_group_iter { + let (group, rg_options) = item?; + writer.write(group, rg_options)?; } Ok(()) } @@ -67,14 +74,29 @@ impl BatchedWriter { &self.writer } - pub fn write_row_groups( + pub fn write_row_groups_default_options( &self, rgs: Vec>, ) -> PolarsResult<()> { // Lock before looping so that order is maintained. let mut writer = self.writer.lock().unwrap(); for group in rgs { - writer.write(group)?; + writer.write(group, RowGroupWriteOptions::default())?; + } + Ok(()) + } + + pub fn write_row_groups( + &self, + rgs: Vec<( + RowGroupIterColumns<'static, PolarsError>, + RowGroupWriteOptions, + )>, + ) -> PolarsResult<()> { + // Lock before looping so that order is maintained. + let mut writer = self.writer.lock().unwrap(); + for (group, rg_options) in rgs { + writer.write(group, rg_options)?; } Ok(()) } @@ -93,17 +115,40 @@ fn prepare_rg_iter<'a>( parquet_schema: &'a SchemaDescriptor, encodings: &'a [Vec], options: WriteOptions, + md_options: &'a MetadataOptions, parallel: bool, -) -> impl Iterator>> + 'a { +) -> impl Iterator< + Item = PolarsResult<( + RowGroupIterColumns<'static, PolarsError>, + RowGroupWriteOptions, + )>, +> + 'a { + // @TODO: This does not work for nested columns. + let sortedness = df + .get_columns() + .iter() + .map(|c| c.is_sorted_flag()) + .collect::>(); + // @TODO: This does not work for nested columns. + let dtypes = df + .get_columns() + .iter() + .map(|c| c.dtype()) + .collect::>(); + let rb_iter = df.iter_chunks(CompatLevel::newest(), false); rb_iter.filter_map(move |batch| match batch.len() { 0 => None, - _ => { - let row_group = - create_serializer(batch, parquet_schema.fields(), encodings, options, parallel); - - Some(row_group) - }, + _ => Some(create_serializer( + batch, + parquet_schema.fields(), + encodings, + &sortedness, + &dtypes, + options, + md_options, + parallel, + )), }) } @@ -147,9 +192,15 @@ fn create_serializer( batch: RecordBatch, fields: &[ParquetType], encodings: &[Vec], + sortedness: &[IsSorted], + dtypes: &[&DataType], options: WriteOptions, + md_options: &MetadataOptions, parallel: bool, -) -> PolarsResult> { +) -> PolarsResult<( + RowGroupIterColumns<'static, PolarsError>, + RowGroupWriteOptions, +)> { let func = move |((array, type_), encoding): ((&ArrayRef, &ParquetType), &Vec)| { array_to_pages_iter(array, type_, encoding, options) }; @@ -176,7 +227,101 @@ fn create_serializer( let row_group = DynIter::new(columns.into_iter()); - Ok(row_group) + let mut rg_options = RowGroupWriteOptions::default(); + + match &md_options.sorting_columns { + MaterializedSortingColumns::All(behavior) => { + // @TODO: properly handle nested columns. + rg_options.sorting_columns = (0..batch.columns().len()) + .filter_map(|leaf_idx| { + to_sorting_column(&batch, dtypes, sortedness, leaf_idx, *behavior) + }) + .collect(); + }, + MaterializedSortingColumns::PerLeaf(sorting_columns) => { + rg_options.sorting_columns = sorting_columns + .iter() + .filter_map(|(leaf_idx, behavior)| { + to_sorting_column(&batch, dtypes, sortedness, *leaf_idx as usize, *behavior) + }) + .collect(); + }, + } + + Ok((row_group, rg_options)) +} + +fn has_compatible_sortedness(dtype: &DataType, _array: &dyn Array) -> bool { + use DataType as DT; + + matches!( + dtype, + DT::UInt8 + | DT::UInt16 + | DT::UInt32 + | DT::UInt64 + | DT::Int8 + | DT::Int16 + | DT::Int32 + | DT::Int64 + ) +} + +fn to_sorting_column( + batch: &RecordBatch, + dtypes: &[&DataType], + sortedness: &[IsSorted], + leaf_idx: usize, + behavior: SortingColumnBehavior, +) -> Option { + use SortingColumnBehavior as B; + + // @TODO: This does not work for nested structures. + let col_idx = leaf_idx; + let array = &batch.columns()[col_idx as usize]; + let dtype = dtypes[leaf_idx as usize]; + + if matches!( + behavior, + B::Preserve { force: false } | B::Evaluate { force: false } + ) { + if !has_compatible_sortedness(dtype, array.as_ref()) { + return None; + } + } + + match (behavior, sortedness[leaf_idx as usize]) { + (B::NoPreserve, _) => None, + ( + B::Force { + descending, + nulls_first, + }, + _, + ) => Some(SortingColumn { + column_idx: leaf_idx as i32, + descending, + nulls_first, + }), + (B::Preserve { .. }, IsSorted::Not) => None, + (B::Preserve { .. } | B::Evaluate { .. }, IsSorted::Ascending) => { + let nulls_first = !array.is_empty() && unsafe { array.get_unchecked(0) }.is_null(); + Some(SortingColumn { + column_idx: leaf_idx as i32, + descending: false, + nulls_first, + }) + }, + (B::Preserve { .. } | B::Evaluate { .. }, IsSorted::Descending) => { + let nulls_first = !array.is_empty() && unsafe { array.get_unchecked(0) }.is_null(); + Some(SortingColumn { + column_idx: leaf_idx as i32, + descending: true, + nulls_first, + }) + }, + (B::Evaluate { .. }, IsSorted::Not) => todo!(), + } } /// This serializer encodes and compresses all eagerly in memory. diff --git a/crates/polars-io/src/parquet/write/mod.rs b/crates/polars-io/src/parquet/write/mod.rs index 705cf2a96d6a..55c704c6c7ab 100644 --- a/crates/polars-io/src/parquet/write/mod.rs +++ b/crates/polars-io/src/parquet/write/mod.rs @@ -5,6 +5,6 @@ mod options; mod writer; pub use batched_writer::BatchedWriter; -pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel}; +pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel, SortingColumnBehavior}; pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions}; -pub use writer::ParquetWriter; +pub use writer::{SortingColumns, ParquetWriter}; diff --git a/crates/polars-io/src/parquet/write/options.rs b/crates/polars-io/src/parquet/write/options.rs index 4e4bfa9e1edf..6109fd9f72b8 100644 --- a/crates/polars-io/src/parquet/write/options.rs +++ b/crates/polars-io/src/parquet/write/options.rs @@ -6,6 +6,39 @@ use polars_parquet::write::{ #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum SortingColumnBehavior { + /// Never set the `SortingColumn` metadata. + NoPreserve, + + /// Preserve the known sortedness information into the `SortingColumn` field. + Preserve { force: bool }, + + /// Evaluate whether a column is sorted and store found information in the `SortingColumn` + /// field. + Evaluate { force: bool }, + + /// Force the column to be of a certain `SortingColumn` value. + Force { descending: bool, nulls_first: bool }, +} + +impl Default for SortingColumnBehavior { + fn default() -> Self { + Self::Preserve { force: false } + } +} + +#[derive(Clone, PartialEq, Eq)] +pub enum MaterializedSortingColumns { + All(SortingColumnBehavior), + PerLeaf(Vec<(i32, SortingColumnBehavior)>), +} + +#[derive(Clone, PartialEq, Eq)] +pub(crate) struct MetadataOptions { + pub sorting_columns: MaterializedSortingColumns, +} + #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct ParquetWriteOptions { diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs index 99cf4c95a45b..4486e3f29753 100644 --- a/crates/polars-io/src/parquet/write/writer.rs +++ b/crates/polars-io/src/parquet/write/writer.rs @@ -4,12 +4,13 @@ use std::sync::Mutex; use arrow::datatypes::PhysicalType; use polars_core::prelude::*; use polars_parquet::write::{ - to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, StatisticsOptions, - Version, WriteOptions, + to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, SchemaDescriptor, + StatisticsOptions, Version, WriteOptions, }; +use polars_utils::idx_vec::UnitVec; use super::batched_writer::BatchedWriter; -use super::options::ParquetCompression; +use super::options::{MaterializedSortingColumns, MetadataOptions, ParquetCompression, SortingColumnBehavior}; use super::ParquetWriteOptions; use crate::prelude::chunk_df_for_writing; use crate::shared::schema_to_arrow_checked; @@ -27,6 +28,12 @@ impl ParquetWriteOptions { } } +pub enum SortingColumns { + None, + All(SortingColumnBehavior), + Fields(PlHashMap), +} + /// Write a DataFrame to Parquet format. #[must_use] pub struct ParquetWriter { @@ -39,6 +46,9 @@ pub struct ParquetWriter { row_group_size: Option, /// if `None` will be 1024^2 bytes data_page_size: Option, + + sorting_columns: SortingColumns, + /// Serialize columns in parallel parallel: bool, } @@ -58,6 +68,7 @@ where statistics: StatisticsOptions::default(), row_group_size: None, data_page_size: None, + sorting_columns: SortingColumns::None, parallel: true, } } @@ -90,6 +101,12 @@ where self } + /// Set the `SortingColumn` + pub fn with_sorting_columns(mut self, sorting_columns: SortingColumns) -> Self { + self.sorting_columns = sorting_columns; + self + } + /// Serialize columns in parallel pub fn set_parallel(mut self, parallel: bool) -> Self { self.parallel = parallel; @@ -100,18 +117,70 @@ where let schema = schema_to_arrow_checked(schema, CompatLevel::newest(), "parquet")?; let parquet_schema = to_parquet_schema(&schema)?; let encodings = get_encodings(&schema); + let metadata_options = self.materialize_md_options(&parquet_schema)?; let options = self.materialize_options(); - let writer = Mutex::new(FileWriter::try_new(self.writer, schema, options)?); + let writer = Mutex::new(FileWriter::try_new( + self.writer, + schema, + options, + )?); Ok(BatchedWriter { writer, parquet_schema, encodings, options, + metadata_options, parallel: self.parallel, }) } + fn materialize_md_options( + &self, + parquet_schema: &SchemaDescriptor, + ) -> PolarsResult { + let sorting_columns = match &self.sorting_columns { + SortingColumns::None => MaterializedSortingColumns::All(SortingColumnBehavior::Preserve { force: false }), + SortingColumns::All(behavior) => MaterializedSortingColumns::All(*behavior), + SortingColumns::Fields(fields) => { + let mut col_idx_lookup = PlHashMap::with_capacity(parquet_schema.columns().len()); + for (i, col_descriptor) in parquet_schema.columns().iter().enumerate() { + col_idx_lookup.insert(col_descriptor.path_in_schema.as_slice(), i as i32); + } + + let mut sorting_columns = Vec::new(); + let mut stack = vec![(UnitVec::default(), fields)]; + + loop { + let Some((path, fields)) = stack.pop() else { + break; + }; + + for (name, sc) in fields.iter() { + let mut field_path = path.clone(); + field_path.push(name.clone()); + + let col_idx = col_idx_lookup + .get(field_path.as_slice()) + .ok_or_else(|| polars_err!(col_not_found = path.as_slice().join(" ")))?; + + match sc { + SortingColumns::None => sorting_columns.push((*col_idx, SortingColumnBehavior::default())), + SortingColumns::All(sorting_column_behavior) => sorting_columns.push((*col_idx, *sorting_column_behavior)), + SortingColumns::Fields(fields) => stack.push((field_path, fields)), + } + } + } + + sorting_columns.sort_unstable_by_key(|(col_idx, _)| *col_idx); + + MaterializedSortingColumns::PerLeaf(sorting_columns) + } + }; + + Ok(MetadataOptions { sorting_columns }) + } + fn materialize_options(&self) -> WriteOptions { WriteOptions { statistics: self.statistics, diff --git a/crates/polars-parquet/src/arrow/write/file.rs b/crates/polars-parquet/src/arrow/write/file.rs index 0fd32deb5b07..98bed0a27f2a 100644 --- a/crates/polars-parquet/src/arrow/write/file.rs +++ b/crates/polars-parquet/src/arrow/write/file.rs @@ -4,7 +4,7 @@ use arrow::datatypes::ArrowSchema; use polars_error::{PolarsError, PolarsResult}; use super::schema::schema_to_metadata_key; -use super::{to_parquet_schema, ThriftFileMetadata, WriteOptions}; +use super::{to_parquet_schema, RowGroupWriteOptions, ThriftFileMetadata, WriteOptions}; use crate::parquet::metadata::{KeyValue, SchemaDescriptor}; use crate::parquet::write::{RowGroupIterColumns, WriteOptions as FileWriteOptions}; @@ -50,7 +50,11 @@ impl FileWriter { /// Returns a new [`FileWriter`]. /// # Error /// If it is unable to derive a parquet schema from [`ArrowSchema`]. - pub fn try_new(writer: W, schema: ArrowSchema, options: WriteOptions) -> PolarsResult { + pub fn try_new( + writer: W, + schema: ArrowSchema, + options: WriteOptions, + ) -> PolarsResult { let parquet_schema = to_parquet_schema(&schema)?; let created_by = Some("Polars".to_string()); @@ -71,8 +75,8 @@ impl FileWriter { } /// Writes a row group to the file. - pub fn write(&mut self, row_group: RowGroupIterColumns<'_, PolarsError>) -> PolarsResult<()> { - Ok(self.writer.write(row_group)?) + pub fn write(&mut self, row_group: RowGroupIterColumns<'_, PolarsError>, row_group_write_options: RowGroupWriteOptions) -> PolarsResult<()> { + Ok(self.writer.write(row_group, row_group_write_options)?) } /// Writes the footer of the parquet file. Returns the total size of the file. diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index 02f0165d04c7..7fb777c1916f 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -32,6 +32,7 @@ use arrow::datatypes::*; use arrow::types::{days_ms, i256, NativeType}; pub use nested::{num_values, write_rep_and_def}; pub use pages::{to_leaves, to_nested, to_parquet_leaves}; +pub use parquet_format_safe::SortingColumn; use polars_utils::pl_str::PlSmallStr; pub use utils::write_def_levels; @@ -79,6 +80,11 @@ pub enum EncodeNullability { Optional, } +#[derive(Clone, Default, PartialEq, Eq)] +pub struct RowGroupWriteOptions { + pub sorting_columns: Vec, +} + /// Currently supported options to write to parquet #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct WriteOptions { diff --git a/crates/polars-parquet/src/parquet/write/file.rs b/crates/polars-parquet/src/parquet/write/file.rs index 8dd3212bb76a..9e7d2e2de500 100644 --- a/crates/polars-parquet/src/parquet/write/file.rs +++ b/crates/polars-parquet/src/parquet/write/file.rs @@ -12,6 +12,7 @@ pub use crate::parquet::metadata::KeyValue; use crate::parquet::metadata::{SchemaDescriptor, ThriftFileMetadata}; use crate::parquet::write::State; use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC}; +use crate::write::RowGroupWriteOptions; pub(super) fn start_file(writer: &mut W) -> ParquetResult { writer.write_all(&PARQUET_MAGIC)?; @@ -149,7 +150,11 @@ impl FileWriter { /// Writes a row group to the file. /// /// This call is IO-bounded - pub fn write(&mut self, row_group: RowGroupIterColumns<'_, E>) -> ParquetResult<()> + pub fn write( + &mut self, + row_group: RowGroupIterColumns<'_, E>, + rg_options: RowGroupWriteOptions, + ) -> ParquetResult<()> where ParquetError: From, E: std::error::Error, @@ -158,10 +163,15 @@ impl FileWriter { self.start()?; } let ordinal = self.row_groups.len(); + + let sorting_columns = + (!rg_options.sorting_columns.is_empty()).then_some(rg_options.sorting_columns); + let (group, specs, size) = write_row_group( &mut self.writer, self.offset, self.schema.columns(), + sorting_columns, row_group, ordinal, )?; diff --git a/crates/polars-parquet/src/parquet/write/row_group.rs b/crates/polars-parquet/src/parquet/write/row_group.rs index 43404dc32a89..0f483e946d6f 100644 --- a/crates/polars-parquet/src/parquet/write/row_group.rs +++ b/crates/polars-parquet/src/parquet/write/row_group.rs @@ -2,7 +2,7 @@ use std::io::Write; #[cfg(feature = "async")] use futures::AsyncWrite; -use parquet_format_safe::{ColumnChunk, RowGroup}; +use parquet_format_safe::{ColumnChunk, RowGroup, SortingColumn}; use super::column_chunk::write_column_chunk; #[cfg(feature = "async")] @@ -74,6 +74,7 @@ pub fn write_row_group< writer: &mut W, mut offset: u64, descriptors: &[ColumnDescriptor], + sorting_columns: Option>, columns: DynIter<'a, std::result::Result, E>>, ordinal: usize, ) -> ParquetResult<(RowGroup, Vec>, u64)> @@ -121,7 +122,7 @@ where columns, total_byte_size, num_rows, - sorting_columns: None, + sorting_columns, file_offset, total_compressed_size: Some(total_compressed_size), ordinal: ordinal.try_into().ok(), diff --git a/crates/polars-pipe/src/executors/sinks/output/parquet.rs b/crates/polars-pipe/src/executors/sinks/output/parquet.rs index 2291b1e21fcd..8831f210eaf8 100644 --- a/crates/polars-pipe/src/executors/sinks/output/parquet.rs +++ b/crates/polars-pipe/src/executors/sinks/output/parquet.rs @@ -40,7 +40,7 @@ pub(super) fn init_row_group_writer_thread( batched.sort_by_key(|chunk| chunk.0); for (_, rg) in batched.drain(0..) { - writer.write_row_groups(rg).unwrap() + writer.write_row_groups_default_options(rg).unwrap() } } if last_write { diff --git a/crates/polars-python/src/conversion/mod.rs b/crates/polars-python/src/conversion/mod.rs index abde51745554..8a5209cad416 100644 --- a/crates/polars-python/src/conversion/mod.rs +++ b/crates/polars-python/src/conversion/mod.rs @@ -20,6 +20,8 @@ use polars_core::utils::arrow::types::NativeType; use polars_core::utils::materialize_dyn_int; use polars_lazy::prelude::*; #[cfg(feature = "parquet")] +use polars::io::parquet::write::SortingColumnBehavior; +#[cfg(feature = "parquet")] use polars_parquet::write::StatisticsOptions; use polars_plan::plans::ScanSources; use polars_utils::pl_str::PlSmallStr; @@ -507,6 +509,26 @@ impl<'s> FromPyObject<'s> for Wrap { } } +#[cfg(feature = "parquet")] +impl<'s> FromPyObject<'s> for Wrap { + fn extract_bound(ob: &Bound<'s, PyAny>) -> PyResult { + let value = ob.extract::()?; + + let sorting_columns = match value.as_ref() { + "no-preserve" => SortingColumns::All(SortingColumnBehavior::NoPreserve), + "preserve" => SortingColumns::All(SortingColumnBehavior::Preserve { force: false }), + "evaluate" => SortingColumns::All(SortingColumnBehavior::Evaluate { force: false }), + _ => { + return Err(PyTypeError::new_err(format!( + "'{value}' is not a valid sorting column behavior", + ))) + }, + }; + + Ok(Wrap(sorting_columns)) + } +} + impl<'s> FromPyObject<'s> for Wrap> { fn extract_bound(ob: &Bound<'s, PyAny>) -> PyResult { let vals = ob.extract::>>>()?; diff --git a/crates/polars-python/src/dataframe/io.rs b/crates/polars-python/src/dataframe/io.rs index 9b34eb7e8ae9..ed884471de0d 100644 --- a/crates/polars-python/src/dataframe/io.rs +++ b/crates/polars-python/src/dataframe/io.rs @@ -7,6 +7,8 @@ use polars::io::avro::AvroCompression; use polars::io::RowIndex; use polars::prelude::*; #[cfg(feature = "parquet")] +use polars::io::parquet::write::SortingColumns; +#[cfg(feature = "parquet")] use polars_parquet::arrow::write::StatisticsOptions; use polars_utils::mmap::ensure_not_mapped; use pyo3::prelude::*; @@ -382,7 +384,7 @@ impl PyDataFrame { } #[cfg(feature = "parquet")] - #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size, partition_by, partition_chunk_size_bytes))] + #[pyo3(signature = (py_f, compression, compression_level, statistics, row_group_size, data_page_size, partition_by, partition_chunk_size_bytes, sorting_columns))] pub fn write_parquet( &mut self, py: Python, @@ -394,6 +396,7 @@ impl PyDataFrame { data_page_size: Option, partition_by: Option>, partition_chunk_size_bytes: usize, + sorting_columns: Wrap, ) -> PyResult<()> { use polars_io::partition::write_partitioned_dataset; @@ -430,6 +433,7 @@ impl PyDataFrame { .with_statistics(statistics.0) .with_row_group_size(row_group_size) .with_data_page_size(data_page_size) + .with_sorting_columns(sorting_columns.0) .finish(&mut self.df) .map_err(PyPolarsErr::from) })?; diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs index 0a573eb4a186..1285b354f8ee 100644 --- a/crates/polars/tests/it/io/parquet/arrow/mod.rs +++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs @@ -1341,7 +1341,7 @@ fn integration_write( let mut writer = FileWriter::try_new(writer, schema.clone(), options)?; for group in row_groups { - writer.write(group?)?; + writer.write(group?, RowGroupWriteOptions::default())?; } writer.end(None)?; diff --git a/crates/polars/tests/it/io/parquet/arrow/write.rs b/crates/polars/tests/it/io/parquet/arrow/write.rs index 9619a083ddcb..61fc6ada7c4b 100644 --- a/crates/polars/tests/it/io/parquet/arrow/write.rs +++ b/crates/polars/tests/it/io/parquet/arrow/write.rs @@ -59,7 +59,7 @@ fn round_trip_opt_stats( let mut writer = FileWriter::try_new(writer, schema, options)?; for group in row_groups { - writer.write(group?)?; + writer.write(group?, RowGroupWriteOptions::default())?; } writer.end(None)?; diff --git a/crates/polars/tests/it/io/parquet/roundtrip.rs b/crates/polars/tests/it/io/parquet/roundtrip.rs index d20551432ec0..a67d0fc7d59e 100644 --- a/crates/polars/tests/it/io/parquet/roundtrip.rs +++ b/crates/polars/tests/it/io/parquet/roundtrip.rs @@ -7,7 +7,7 @@ use polars_error::PolarsResult; use polars_parquet::arrow::write::{FileWriter, WriteOptions}; use polars_parquet::read::read_metadata; use polars_parquet::write::{ - CompressionOptions, Encoding, RowGroupIterator, StatisticsOptions, Version, + CompressionOptions, Encoding, RowGroupIterator, RowGroupWriteOptions, StatisticsOptions, Version }; use crate::io::parquet::read::file::FileReader; @@ -37,7 +37,7 @@ fn round_trip( let mut writer = FileWriter::try_new(writer, schema.clone(), options)?; for group in row_groups { - writer.write(group?)?; + writer.write(group?, RowGroupWriteOptions::default())?; } writer.end(None)?; diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs index e98a0223937f..89632c015eab 100644 --- a/crates/polars/tests/it/io/parquet/write/mod.rs +++ b/crates/polars/tests/it/io/parquet/write/mod.rs @@ -19,6 +19,7 @@ use polars_parquet::parquet::write::{ Compressor, DynIter, DynStreamingIterator, FileWriter, Version, WriteOptions, }; use polars_parquet::read::read_metadata; +use polars_parquet::write::RowGroupWriteOptions; use polars_utils::mmap::MemReader; use primitive::array_to_page_v1; @@ -87,7 +88,7 @@ fn test_column(column: &str, compression: CompressionOptions) -> ParquetResult<( let writer = Cursor::new(vec![]); let mut writer = FileWriter::new(writer, schema, options, None); - writer.write(DynIter::new(columns))?; + writer.write(DynIter::new(columns), RowGroupWriteOptions::default())?; writer.end(None)?; let data = writer.into_inner().into_inner(); @@ -202,7 +203,7 @@ fn basic() -> ParquetResult<()> { let writer = Cursor::new(vec![]); let mut writer = FileWriter::new(writer, schema, options, None); - writer.write(DynIter::new(columns))?; + writer.write(DynIter::new(columns), RowGroupWriteOptions::default())?; writer.end(None)?; let data = writer.into_inner().into_inner(); diff --git a/py-polars/polars/_typing.py b/py-polars/polars/_typing.py index 1670b08aeb2f..98932504de70 100644 --- a/py-polars/polars/_typing.py +++ b/py-polars/polars/_typing.py @@ -132,6 +132,7 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: "gigabytes", "terabytes", ] +SortingColumnBehavior: TypeAlias = Literal["no-preserve", "preserve", "evaluate"] StartBy: TypeAlias = Literal[ "window", "datapoint", @@ -195,6 +196,24 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: "ArrowStreamExportable", ] +# type signature for sorting columns +class SortingColumnPrecise(TypedDict): + force: bool + behavior: SortingColumnBehavior + descending: bool + nulls_first: bool + +_SortingColumn: TypeAlias = Union[ + SortingColumnBehavior, + SortingColumnPrecise, + dict[str, Any], +] +SortingColumn: TypeAlias = Union[ + SortingColumnBehavior, + SortingColumnPrecise, + dict[str, _SortingColumn], +] + # Excel IO ColumnFormatDict: TypeAlias = Mapping[ # dict of colname(s) or selector(s) to format string or dict diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 8709af81983f..c4c4fe5c2e53 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -171,6 +171,7 @@ SingleColSelector, SingleIndexSelector, SizeUnit, + SortingColumn, StartBy, UniqueKeepStrategy, UnstackDirection, @@ -3678,6 +3679,7 @@ def write_parquet( pyarrow_options: dict[str, Any] | None = None, partition_by: str | Sequence[str] | None = None, partition_chunk_size_bytes: int = 4_294_967_296, + sorting_columns: SortingColumn = "preserve", ) -> None: """ Write to Apache Parquet file. @@ -3738,6 +3740,10 @@ def write_parquet( writing. Note this is calculated using the size of the DataFrame in memory - the size of the output file may differ depending on the file format / compression. + sorting_columns + .. warning:: + Sorting columns is considered **unstable**. It may be changed + at any point without it being considered a breaking change. Examples -------- @@ -3766,6 +3772,12 @@ def write_parquet( ... pyarrow_options={"partition_cols": ["watermark"]}, ... ) """ + + if sorting_columns != "preserve": + issue_unstable_warning( + "Using the highest compatibility level is considered unstable." + ) + if compression is None: compression = "uncompressed" if isinstance(file, (str, Path)): @@ -3853,6 +3865,7 @@ def write_parquet( data_page_size, partition_by=partition_by, partition_chunk_size_bytes=partition_chunk_size_bytes, + sorting_columns=sorting_columns, ) def write_database(