Skip to content

Commit

Permalink
get started on writing sorting columns
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Oct 16, 2024
1 parent d8dde13 commit 7f6f1a2
Show file tree
Hide file tree
Showing 17 changed files with 362 additions and 35 deletions.
173 changes: 159 additions & 14 deletions crates/polars-io/src/parquet/write/batched_writer.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
use std::io::Write;
use std::sync::Mutex;

use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::POOL;
use polars_parquet::read::ParquetError;
use polars_parquet::write::{
array_to_columns, CompressedPage, Compressor, DynIter, DynStreamingIterator, Encoding,
FallibleStreamingIterator, FileWriter, Page, ParquetType, RowGroupIterColumns,
SchemaDescriptor, WriteOptions,
RowGroupWriteOptions, SchemaDescriptor, SortingColumn, WriteOptions,
};
use rayon::prelude::*;

use super::options::{MaterializedSortingColumns, MetadataOptions, SortingColumnBehavior};

/// A Parquet writer that accepts data in batches and serializes each batch
/// into one or more row groups.
pub struct BatchedWriter<W: Write> {
// A mutex so that streaming engine can get concurrent read access to
// compress pages.
pub(super) writer: Mutex<FileWriter<W>>,
// The Parquet schema that written row groups must conform to.
pub(super) parquet_schema: SchemaDescriptor,
// Encodings per column; the inner `Vec` presumably holds one encoding per
// leaf of that column — TODO confirm for nested columns.
pub(super) encodings: Vec<Vec<Encoding>>,
// General Parquet write options passed through to page serialization.
pub(super) options: WriteOptions,
// Options controlling optional per-row-group metadata (e.g. `SortingColumn`).
pub(super) metadata_options: MetadataOptions,
// Whether columns are serialized in parallel (on the rayon pool).
pub(super) parallel: bool,
}

Expand Down Expand Up @@ -53,12 +58,14 @@ impl<W: Write> BatchedWriter<W> {
&self.parquet_schema,
&self.encodings,
self.options,
&self.metadata_options,
self.parallel,
);
// Lock before looping so that order is maintained under contention.
let mut writer = self.writer.lock().unwrap();
for group in row_group_iter {
writer.write(group?)?;
for item in row_group_iter {
let (group, rg_options) = item?;
writer.write(group, rg_options)?;
}
Ok(())
}
Expand All @@ -67,14 +74,29 @@ impl<W: Write> BatchedWriter<W> {
&self.writer
}

pub fn write_row_groups(
pub fn write_row_groups_default_options(
&self,
rgs: Vec<RowGroupIterColumns<'static, PolarsError>>,
) -> PolarsResult<()> {
// Lock before looping so that order is maintained.
let mut writer = self.writer.lock().unwrap();
for group in rgs {
writer.write(group)?;
writer.write(group, RowGroupWriteOptions::default())?;
}
Ok(())
}

/// Write pre-built row groups, each paired with its own write options.
///
/// Row groups are written in the order given.
pub fn write_row_groups(
    &self,
    rgs: Vec<(
        RowGroupIterColumns<'static, PolarsError>,
        RowGroupWriteOptions,
    )>,
) -> PolarsResult<()> {
    // Take the writer lock once up front so that ordering is maintained
    // even when multiple callers contend for the writer.
    let mut writer = self.writer.lock().unwrap();
    rgs.into_iter()
        .try_for_each(|(group, rg_options)| writer.write(group, rg_options))
}
Expand All @@ -93,17 +115,40 @@ fn prepare_rg_iter<'a>(
parquet_schema: &'a SchemaDescriptor,
encodings: &'a [Vec<Encoding>],
options: WriteOptions,
md_options: &'a MetadataOptions,
parallel: bool,
) -> impl Iterator<Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>> + 'a {
) -> impl Iterator<
Item = PolarsResult<(
RowGroupIterColumns<'static, PolarsError>,
RowGroupWriteOptions,
)>,
> + 'a {
// @TODO: This does not work for nested columns.
let sortedness = df
.get_columns()
.iter()
.map(|c| c.is_sorted_flag())
.collect::<Vec<_>>();
// @TODO: This does not work for nested columns.
let dtypes = df
.get_columns()
.iter()
.map(|c| c.dtype())
.collect::<Vec<_>>();

let rb_iter = df.iter_chunks(CompatLevel::newest(), false);
rb_iter.filter_map(move |batch| match batch.len() {
0 => None,
_ => {
let row_group =
create_serializer(batch, parquet_schema.fields(), encodings, options, parallel);

Some(row_group)
},
_ => Some(create_serializer(
batch,
parquet_schema.fields(),
encodings,
&sortedness,
&dtypes,
options,
md_options,
parallel,
)),
})
}

Expand Down Expand Up @@ -147,9 +192,15 @@ fn create_serializer(
batch: RecordBatch,
fields: &[ParquetType],
encodings: &[Vec<Encoding>],
sortedness: &[IsSorted],
dtypes: &[&DataType],
options: WriteOptions,
md_options: &MetadataOptions,
parallel: bool,
) -> PolarsResult<RowGroupIterColumns<'static, PolarsError>> {
) -> PolarsResult<(
RowGroupIterColumns<'static, PolarsError>,
RowGroupWriteOptions,
)> {
let func = move |((array, type_), encoding): ((&ArrayRef, &ParquetType), &Vec<Encoding>)| {
array_to_pages_iter(array, type_, encoding, options)
};
Expand All @@ -176,7 +227,101 @@ fn create_serializer(

let row_group = DynIter::new(columns.into_iter());

Ok(row_group)
let mut rg_options = RowGroupWriteOptions::default();

match &md_options.sorting_columns {
MaterializedSortingColumns::All(behavior) => {
// @TODO: properly handle nested columns.
rg_options.sorting_columns = (0..batch.columns().len())
.filter_map(|leaf_idx| {
to_sorting_column(&batch, dtypes, sortedness, leaf_idx, *behavior)
})
.collect();
},
MaterializedSortingColumns::PerLeaf(sorting_columns) => {
rg_options.sorting_columns = sorting_columns
.iter()
.filter_map(|(leaf_idx, behavior)| {
to_sorting_column(&batch, dtypes, sortedness, *leaf_idx as usize, *behavior)
})
.collect();
},
}

Ok((row_group, rg_options))
}

/// Whether sortedness metadata (`SortingColumn`) derived from the column's
/// sorted flag is trusted for this dtype.
///
/// Currently only primitive integer columns qualify; the array itself is not
/// inspected yet.
fn has_compatible_sortedness(dtype: &DataType, _array: &dyn Array) -> bool {
    use DataType as DT;

    match dtype {
        DT::UInt8 | DT::UInt16 | DT::UInt32 | DT::UInt64 => true,
        DT::Int8 | DT::Int16 | DT::Int32 | DT::Int64 => true,
        _ => false,
    }
}

/// Compute the Parquet `SortingColumn` metadata entry for one leaf column, if
/// any should be written under the given `behavior`.
///
/// Returns `None` when no sortedness metadata should be recorded for this
/// column (behavior is `NoPreserve`, the dtype is not trusted and the behavior
/// is not forced, or the column is not known to be sorted).
fn to_sorting_column(
    batch: &RecordBatch,
    dtypes: &[&DataType],
    sortedness: &[IsSorted],
    leaf_idx: usize,
    behavior: SortingColumnBehavior,
) -> Option<SortingColumn> {
    use SortingColumnBehavior as B;

    // @TODO: This does not work for nested structures; for now a leaf index
    // maps one-to-one onto a top-level column index.
    let col_idx = leaf_idx;
    let array = &batch.columns()[col_idx];
    let dtype = dtypes[leaf_idx];

    // Unless forced, only emit metadata for dtypes whose sortedness we trust.
    if matches!(
        behavior,
        B::Preserve { force: false } | B::Evaluate { force: false }
    ) && !has_compatible_sortedness(dtype, array.as_ref())
    {
        return None;
    }

    match (behavior, sortedness[leaf_idx]) {
        (B::NoPreserve, _) => None,
        (
            B::Force {
                descending,
                nulls_first,
            },
            _,
        ) => Some(SortingColumn {
            column_idx: leaf_idx as i32,
            descending,
            nulls_first,
        }),
        (B::Preserve { .. }, IsSorted::Not) => None,
        (
            B::Preserve { .. } | B::Evaluate { .. },
            s @ (IsSorted::Ascending | IsSorted::Descending),
        ) => {
            // Nulls sort first iff the first element is null.
            // SAFETY: `array` is non-empty here, so index 0 is in bounds.
            let nulls_first = !array.is_empty() && unsafe { array.get_unchecked(0) }.is_null();
            Some(SortingColumn {
                column_idx: leaf_idx as i32,
                descending: matches!(s, IsSorted::Descending),
                nulls_first,
            })
        },
        // @TODO: actually evaluate sortedness when it is not already known.
        (B::Evaluate { .. }, IsSorted::Not) => todo!(),
    }
}

/// This serializer encodes and compresses all eagerly in memory.
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ mod options;
mod writer;

pub use batched_writer::BatchedWriter;
pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel, SortingColumnBehavior};
pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
pub use writer::ParquetWriter;
pub use writer::{SortingColumns, ParquetWriter};
33 changes: 33 additions & 0 deletions crates/polars-io/src/parquet/write/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,39 @@ use polars_parquet::write::{
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

/// Behavior for writing the Parquet `SortingColumn` metadata field of a
/// row group for a given column.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SortingColumnBehavior {
    /// Never set the `SortingColumn` metadata.
    NoPreserve,

    /// Preserve the known sortedness information into the `SortingColumn` field.
    ///
    /// When `force` is `false`, columns whose dtype is not trusted for
    /// sortedness metadata are skipped.
    Preserve { force: bool },

    /// Evaluate whether a column is sorted and store found information in the `SortingColumn`
    /// field.
    ///
    /// When `force` is `false`, columns whose dtype is not trusted for
    /// sortedness metadata are skipped.
    Evaluate { force: bool },

    /// Force the column to be of a certain `SortingColumn` value.
    Force { descending: bool, nulls_first: bool },
}

impl Default for SortingColumnBehavior {
fn default() -> Self {
Self::Preserve { force: false }
}
}

/// Resolved specification of which leaf columns receive `SortingColumn`
/// metadata and with which behavior.
#[derive(Clone, PartialEq, Eq)]
pub enum MaterializedSortingColumns {
/// Apply the same behavior to every leaf column.
All(SortingColumnBehavior),
/// Apply a specific behavior per leaf column index.
PerLeaf(Vec<(i32, SortingColumnBehavior)>),
}

/// Options controlling optional metadata written per row group.
#[derive(Clone, PartialEq, Eq)]
pub(crate) struct MetadataOptions {
// How `SortingColumn` metadata is materialized for row groups.
pub sorting_columns: MaterializedSortingColumns,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetWriteOptions {
Expand Down
Loading

0 comments on commit 7f6f1a2

Please sign in to comment.