perf: Don't maintain order when maintain_order=False in new streaming sinks #21586

Merged · 1 commit · Mar 4, 2025 (changes from all commits shown below)
2 changes: 0 additions & 2 deletions crates/polars-io/src/csv/write/options.rs
@@ -10,7 +10,6 @@ pub struct CsvWriterOptions {
     pub include_bom: bool,
     pub include_header: bool,
     pub batch_size: NonZeroUsize,
-    pub maintain_order: bool,
     pub serialize_options: SerializeOptions,
 }

@@ -20,7 +19,6 @@ impl Default for CsvWriterOptions {
             include_bom: false,
             include_header: true,
             batch_size: NonZeroUsize::new(1024).unwrap(),
-            maintain_order: false,
             serialize_options: SerializeOptions::default(),
         }
     }
2 changes: 0 additions & 2 deletions crates/polars-io/src/ipc/write.rs
@@ -14,8 +14,6 @@ use crate::shared::schema_to_arrow_checked;
 pub struct IpcWriterOptions {
     /// Data page compression
     pub compression: Option<IpcCompression>,
-    /// maintain the order the data was processed
-    pub maintain_order: bool,
 }
 
 impl IpcWriterOptions {
5 changes: 1 addition & 4 deletions crates/polars-io/src/json/mod.rs
@@ -82,10 +82,7 @@ use crate::prelude::*;
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-pub struct JsonWriterOptions {
-    /// maintain the order the data was processed
-    pub maintain_order: bool,
-}
+pub struct JsonWriterOptions {}
 
 /// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
 /// or `JsonLines` (each row output on a separate line).
2 changes: 0 additions & 2 deletions crates/polars-io/src/parquet/write/options.rs
@@ -17,8 +17,6 @@ pub struct ParquetWriteOptions {
     pub row_group_size: Option<usize>,
     /// if `None` will be 1024^2 bytes
     pub data_page_size: Option<usize>,
-    /// maintain the order the data was processed
-    pub maintain_order: bool,
 }
 
 /// The compression strategy to use for writing Parquet files.
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/output/csv.rs
@@ -45,7 +45,7 @@ impl CsvSink {
         let io_thread_handle = Arc::new(Some(init_writer_thread(
             receiver,
             writer,
-            options.maintain_order,
+            true,
             morsels_per_sink,
         )));
 
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/output/ipc.rs
@@ -32,7 +32,7 @@ impl IpcSink {
         let io_thread_handle = Arc::new(Some(init_writer_thread(
             receiver,
             writer,
-            options.maintain_order,
+            true,
             morsels_per_sink,
         )));
 
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/output/json.rs
@@ -24,7 +24,7 @@ impl JsonSink {
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
         path: &Path,
-        options: JsonWriterOptions,
+        _options: JsonWriterOptions,
         _schema: &Schema,
         cloud_options: Option<&CloudOptions>,
     ) -> PolarsResult<FilesSink> {
@@ -38,7 +38,7 @@ impl JsonSink {
         let io_thread_handle = Arc::new(Some(init_writer_thread(
             receiver,
             writer,
-            options.maintain_order,
+            true,
             morsels_per_sink,
         )));
 
14 changes: 13 additions & 1 deletion crates/polars-plan/src/dsl/options.rs
@@ -304,11 +304,23 @@ pub enum SyncOnCloseType {
 }
 
 /// Options that apply to all sinks.
-#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
+#[derive(Clone, PartialEq, Eq, Debug, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct SinkOptions {
     /// Call sync when closing the file.
     pub sync_on_close: SyncOnCloseType,
+
+    /// The output file needs to maintain order of the data that comes in.
+    pub maintain_order: bool,
+}
+
+impl Default for SinkOptions {
+    fn default() -> Self {
+        Self {
+            sync_on_close: Default::default(),
+            maintain_order: true,
+        }
+    }
 }
 
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
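
The diff above folds the per-format `maintain_order` flags into a single sink-wide option, with a hand-written `Default` that keeps ordering on. A minimal sketch of the resulting behavior, assuming `SinkOptions` is re-exported at `polars_plan::dsl` (this diff does not show the re-export):

```rust
use polars_plan::dsl::SinkOptions; // assumed re-export path

fn main() {
    // The manual Default impl keeps the pre-PR behavior: ordering stays on.
    let ordered = SinkOptions::default();
    assert!(ordered.maintain_order);

    // Callers now opt out of ordering explicitly, per sink.
    let unordered = SinkOptions {
        maintain_order: false,
        ..Default::default()
    };
    assert!(!unordered.maintain_order);
}
```

Defaulting `maintain_order` to `true` preserves the pre-PR output order for existing pipelines; only callers that opt out get the faster unordered path.
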
1 change: 0 additions & 1 deletion crates/polars-python/src/dataframe/io.rs
@@ -443,7 +443,6 @@
             statistics: statistics.0,
             row_group_size,
             data_page_size,
-            maintain_order: true,
         };
         write_partitioned_dataset(
             &mut self.df,
17 changes: 5 additions & 12 deletions crates/polars-python/src/lazyframe/general.rs
@@ -704,7 +704,7 @@
     #[cfg(all(feature = "streaming", feature = "parquet"))]
     #[pyo3(signature = (
         target, compression, compression_level, statistics, row_group_size, data_page_size,
-        maintain_order, cloud_options, credential_provider, retries, sink_options
+        cloud_options, credential_provider, retries, sink_options
     ))]
     fn sink_parquet(
         &self,
@@ -715,7 +715,6 @@
         statistics: Wrap<StatisticsOptions>,
         row_group_size: Option<usize>,
         data_page_size: Option<usize>,
-        maintain_order: bool,
         cloud_options: Option<Vec<(String, String)>>,
         credential_provider: Option<PyObject>,
         retries: usize,
@@ -728,7 +727,6 @@
             statistics: statistics.0,
             row_group_size,
             data_page_size,
-            maintain_order,
         };
 
         let cloud_options = {
@@ -766,21 +764,19 @@
     }
 
     #[cfg(all(feature = "streaming", feature = "ipc"))]
-    #[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries, sink_options))]
+    #[pyo3(signature = (target, compression, cloud_options, credential_provider, retries, sink_options))]
     fn sink_ipc(
         &self,
         py: Python,
         target: SinkTarget,
         compression: Option<Wrap<IpcCompression>>,
-        maintain_order: bool,
         cloud_options: Option<Vec<(String, String)>>,
         credential_provider: Option<PyObject>,
         retries: usize,
         sink_options: Wrap<SinkOptions>,
     ) -> PyResult<()> {
         let options = IpcWriterOptions {
             compression: compression.map(|c| c.0),
-            maintain_order,
         };
 
         #[cfg(feature = "cloud")]
@@ -822,7 +818,7 @@
     #[pyo3(signature = (
         target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
         datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
-        quote_style, maintain_order, cloud_options, credential_provider, retries, sink_options
+        quote_style, cloud_options, credential_provider, retries, sink_options
     ))]
     fn sink_csv(
         &self,
@@ -841,7 +837,6 @@
         float_precision: Option<usize>,
         null_value: Option<String>,
         quote_style: Option<Wrap<QuoteStyle>>,
-        maintain_order: bool,
         cloud_options: Option<Vec<(String, String)>>,
         credential_provider: Option<PyObject>,
         retries: usize,
@@ -866,7 +861,6 @@
         let options = CsvWriterOptions {
             include_bom,
             include_header,
-            maintain_order,
             batch_size,
             serialize_options,
         };
@@ -908,18 +902,17 @@
 
     #[allow(clippy::too_many_arguments)]
     #[cfg(all(feature = "streaming", feature = "json"))]
-    #[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries, sink_options))]
+    #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
     fn sink_json(
         &self,
         py: Python,
         target: SinkTarget,
-        maintain_order: bool,
         cloud_options: Option<Vec<(String, String)>>,
         credential_provider: Option<PyObject>,
         retries: usize,
         sink_options: Wrap<SinkOptions>,
     ) -> PyResult<()> {
-        let options = JsonWriterOptions { maintain_order };
+        let options = JsonWriterOptions {};
 
         let cloud_options = {
             let cloud_options = parse_cloud_options(
15 changes: 12 additions & 3 deletions crates/polars-python/src/lazyframe/sink.rs
@@ -74,16 +74,25 @@ impl<'py> FromPyObject<'py> for Wrap<SinkOptions> {
     fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
         let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
 
-        if parsed.len() != 1 {
+        if parsed.len() != 2 {
             return Err(PyValueError::new_err(
-                "`sink_options` must be a dictionary with the exactly 1 field.",
+                "`sink_options` must be a dictionary with exactly 2 fields.",
             ));
         }
 
         let sync_on_close = PyDictMethods::get_item(&parsed, "sync_on_close")?
             .ok_or_else(|| PyValueError::new_err("`sink_options` must be `sync_on_close` field"))?;
         let sync_on_close = sync_on_close.extract::<Wrap<SyncOnCloseType>>()?.0;
 
-        Ok(Wrap(SinkOptions { sync_on_close }))
+        let maintain_order =
+            PyDictMethods::get_item(&parsed, "maintain_order")?.ok_or_else(|| {
+                PyValueError::new_err("`sink_options` must have a `maintain_order` field")
+            })?;
+        let maintain_order = maintain_order.extract::<bool>()?;
+
+        Ok(Wrap(SinkOptions {
+            sync_on_close,
+            maintain_order,
+        }))
     }
 }
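
The extractor above now demands exactly two keys. A hedged sketch of the accepted dict shape, built with pyo3 directly; it assumes pyo3 0.23+ with the `auto-initialize` feature, and uses a placeholder `sync_on_close` value since the accepted spellings are not shown in this diff:

```rust
use pyo3::prelude::*;
use pyo3::types::PyDict;

fn main() -> PyResult<()> {
    Python::with_gil(|py| {
        // Build the two-field dict that `Wrap<SinkOptions>` now expects.
        let opts = PyDict::new(py);
        opts.set_item("sync_on_close", py.None())?; // placeholder value
        opts.set_item("maintain_order", false)?;
        // Both keys present: the `len() != 2` guard above passes;
        // dropping either key would raise the PyValueError instead.
        assert_eq!(opts.len(), 2);
        Ok(())
    })
}
```
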
25 changes: 25 additions & 0 deletions crates/polars-stream/src/async_primitives/linearizer.rs
@@ -59,6 +59,31 @@ impl<T: Ord> Linearizer<T> {
         (slf, inserters)
     }
 
+    pub fn new_with_maintain_order(
+        num_inserters: usize,
+        buffer_size: usize,
+        maintain_order: bool,
+    ) -> (Self, Vec<Inserter<T>>) {
+        if maintain_order {
+            return Self::new(num_inserters, buffer_size);
+        }
+
+        let (sender, receiver) = channel(buffer_size * num_inserters);
+        let receivers = vec![receiver];
+        let inserters = (0..num_inserters)
+            .map(|_| Inserter {
+                sender: sender.clone(),
+            })
+            .collect();
+
+        let slf = Self {
+            receivers,
+            poll_state: PollState::PollAll,
+            heap: BinaryHeap::with_capacity(1),
+        };
+        (slf, inserters)
+    }
+
     /// Fetch the next ordered item produced by senders.
     ///
     /// This may wait for at each sender to have sent at least one value before the [`Linearizer`]
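
Conceptually, the unordered path trades the per-inserter channels plus min-heap merge for one shared channel, so values are yielded in arrival order. A self-contained sketch of that idea with std threads and channels, as an illustration of the technique rather than the async `Linearizer` above:

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    // One channel shared by every producer, as in the
    // `maintain_order == false` branch above.
    let (tx, rx) = channel::<(usize, u64)>();
    for id in 0..4usize {
        let tx = tx.clone();
        thread::spawn(move || {
            for seq in 0..3u64 {
                tx.send((id, seq)).unwrap(); // (producer id, sequence)
            }
        });
    }
    drop(tx); // close our handle so `rx` sees end-of-stream

    // Items arrive in completion order, not producer order; the ordered
    // variant instead merges one stream per producer through a heap.
    for (id, seq) in rx {
        println!("item {seq} from producer {id}");
    }
}
```

Sizing the shared channel at `buffer_size * num_inserters` keeps total capacity equal to the ordered variant's per-inserter buffers.
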
10 changes: 8 additions & 2 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -47,6 +47,9 @@
     fn is_sink_input_parallel(&self) -> bool {
         true
     }
+    fn do_maintain_order(&self) -> bool {
+        self.sink_options.maintain_order
+    }
 
     fn spawn_sink(
         &mut self,
@@ -74,8 +77,11 @@
         // .. -> Encode task
         let rxs = recv_port.parallel();
         // Encode tasks -> IO task
-        let (mut lin_rx, lin_txs) =
-            Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
+        let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
+            num_pipelines,
+            DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
+            self.sink_options.maintain_order,
+        );
 
         // 16MB
         const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
3 changes: 3 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
@@ -70,6 +70,9 @@
     fn is_sink_input_parallel(&self) -> bool {
         false
     }
+    fn do_maintain_order(&self) -> bool {
+        self.sink_options.maintain_order
+    }
 
     fn spawn_sink(
         &mut self,
10 changes: 8 additions & 2 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -32,6 +32,9 @@
     fn is_sink_input_parallel(&self) -> bool {
         true
     }
+    fn do_maintain_order(&self) -> bool {
+        self.sink_options.maintain_order
+    }
 
     fn spawn_sink(
         &mut self,
@@ -59,8 +62,11 @@
         // .. -> Encode task
         let rxs = recv_port.parallel();
         // Encode tasks -> IO task
-        let (mut lin_rx, lin_txs) =
-            Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
+        let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
+            num_pipelines,
+            DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
+            self.sink_options.maintain_order,
+        );
 
         // 16MB
         const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
3 changes: 2 additions & 1 deletion crates/polars-stream/src/nodes/io_sinks/mod.rs
@@ -185,6 +185,7 @@ fn buffer_and_distribute_columns_task(
 pub trait SinkNode {
     fn name(&self) -> &str;
     fn is_sink_input_parallel(&self) -> bool;
+    fn do_maintain_order(&self) -> bool;
 
     fn spawn_sink(
         &mut self,
@@ -304,7 +305,7 @@
         let sink_input = if self.sink.is_sink_input_parallel() {
             SinkInputPort::Parallel(recv.parallel())
         } else {
-            SinkInputPort::Serial(recv.serial())
+            SinkInputPort::Serial(recv.serial_with_maintain_order(self.sink.do_maintain_order()))
         };
         join_handles.push(scope.spawn_task(TaskPriority::High, async move {
             let (token, outcome) = PhaseOutcome::new_shared_wait(wait_group.token());
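
The serial input port now honors the new trait hook. Below is a standalone, simplified sketch of the dispatch pattern (a capability method on the trait, branched on at the wiring site); the `Sink` and `InputPort` types are hypothetical stand-ins, not the real `SinkNode` API:

```rust
// Simplified stand-ins for SinkNode / SinkInputPort (hypothetical).
trait Sink {
    fn is_input_parallel(&self) -> bool;
    fn maintain_order(&self) -> bool;
}

enum InputPort {
    Parallel,
    Serial { maintain_order: bool },
}

// Mirrors SinkComputeNode: parallel sinks split the stream, while serial
// sinks get one stream whose ordering follows the sink's preference.
fn wire_input(sink: &dyn Sink) -> InputPort {
    if sink.is_input_parallel() {
        InputPort::Parallel
    } else {
        InputPort::Serial {
            maintain_order: sink.maintain_order(),
        }
    }
}

struct OrderedFileSink;

impl Sink for OrderedFileSink {
    fn is_input_parallel(&self) -> bool {
        false
    }
    fn maintain_order(&self) -> bool {
        true
    }
}

fn main() {
    let port = wire_input(&OrderedFileSink);
    assert!(matches!(port, InputPort::Serial { maintain_order: true }));
}
```
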
3 changes: 3 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/parquet.rs
@@ -78,6 +78,9 @@
     fn is_sink_input_parallel(&self) -> bool {
         false
     }
+    fn do_maintain_order(&self) -> bool {
+        self.sink_options.maintain_order
+    }
 
     fn spawn_sink(
         &mut self,