From ba31b0e47d4addb7213cc5e33da7c67158149dd1 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Tue, 4 Mar 2025 16:30:59 +0100 Subject: [PATCH] perf: Don't maintain order when maintain_order=False in new streaming sinks (#21586) --- crates/polars-io/src/csv/write/options.rs | 2 -- crates/polars-io/src/ipc/write.rs | 2 -- crates/polars-io/src/json/mod.rs | 5 +--- crates/polars-io/src/parquet/write/options.rs | 2 -- .../src/executors/sinks/output/csv.rs | 2 +- .../src/executors/sinks/output/ipc.rs | 2 +- .../src/executors/sinks/output/json.rs | 4 +-- crates/polars-plan/src/dsl/options.rs | 14 ++++++++++- crates/polars-python/src/dataframe/io.rs | 1 - crates/polars-python/src/lazyframe/general.rs | 17 ++++--------- crates/polars-python/src/lazyframe/sink.rs | 15 ++++++++--- .../src/async_primitives/linearizer.rs | 25 +++++++++++++++++++ .../polars-stream/src/nodes/io_sinks/csv.rs | 10 ++++++-- .../polars-stream/src/nodes/io_sinks/ipc.rs | 3 +++ .../polars-stream/src/nodes/io_sinks/json.rs | 10 ++++++-- .../polars-stream/src/nodes/io_sinks/mod.rs | 3 ++- .../src/nodes/io_sinks/parquet.rs | 3 +++ .../src/nodes/io_sinks/partition/max_size.rs | 14 ++++++++++- .../src/physical_plan/to_graph.rs | 1 + crates/polars-stream/src/pipe.rs | 25 ++++++++++++------- py-polars/polars/lazyframe/frame.py | 8 +++--- 21 files changed, 118 insertions(+), 50 deletions(-) diff --git a/crates/polars-io/src/csv/write/options.rs b/crates/polars-io/src/csv/write/options.rs index b0602cbc2a92..e49595dbafde 100644 --- a/crates/polars-io/src/csv/write/options.rs +++ b/crates/polars-io/src/csv/write/options.rs @@ -10,7 +10,6 @@ pub struct CsvWriterOptions { pub include_bom: bool, pub include_header: bool, pub batch_size: NonZeroUsize, - pub maintain_order: bool, pub serialize_options: SerializeOptions, } @@ -20,7 +19,6 @@ impl Default for CsvWriterOptions { include_bom: false, include_header: true, batch_size: NonZeroUsize::new(1024).unwrap(), - maintain_order: false, serialize_options: SerializeOptions::default(), } } diff --git a/crates/polars-io/src/ipc/write.rs b/crates/polars-io/src/ipc/write.rs index 0f13b9967b07..b598373becea 100644 --- a/crates/polars-io/src/ipc/write.rs +++ b/crates/polars-io/src/ipc/write.rs @@ -14,8 +14,6 @@ use crate::shared::schema_to_arrow_checked; pub struct IpcWriterOptions { /// Data page compression pub compression: Option, - /// maintain the order the data was processed - pub maintain_order: bool, } impl IpcWriterOptions { diff --git a/crates/polars-io/src/json/mod.rs b/crates/polars-io/src/json/mod.rs index df2ff8bb9c28..e3f37a156d6d 100644 --- a/crates/polars-io/src/json/mod.rs +++ b/crates/polars-io/src/json/mod.rs @@ -82,10 +82,7 @@ use crate::prelude::*; #[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct JsonWriterOptions { - /// maintain the order the data was processed - pub maintain_order: bool, -} +pub struct JsonWriterOptions {} /// The format to use to write the DataFrame to JSON: `Json` (a JSON array) /// or `JsonLines` (each row output on a separate line). diff --git a/crates/polars-io/src/parquet/write/options.rs b/crates/polars-io/src/parquet/write/options.rs index 4e4bfa9e1edf..2b733ad17e80 100644 --- a/crates/polars-io/src/parquet/write/options.rs +++ b/crates/polars-io/src/parquet/write/options.rs @@ -17,8 +17,6 @@ pub struct ParquetWriteOptions { pub row_group_size: Option, /// if `None` will be 1024^2 bytes pub data_page_size: Option, - /// maintain the order the data was processed - pub maintain_order: bool, } /// The compression strategy to use for writing Parquet files. diff --git a/crates/polars-pipe/src/executors/sinks/output/csv.rs b/crates/polars-pipe/src/executors/sinks/output/csv.rs index 68859fa4654a..26afc2cb4d9b 100644 --- a/crates/polars-pipe/src/executors/sinks/output/csv.rs +++ b/crates/polars-pipe/src/executors/sinks/output/csv.rs @@ -45,7 +45,7 @@ impl CsvSink { let io_thread_handle = Arc::new(Some(init_writer_thread( receiver, writer, - options.maintain_order, + true, morsels_per_sink, ))); diff --git a/crates/polars-pipe/src/executors/sinks/output/ipc.rs b/crates/polars-pipe/src/executors/sinks/output/ipc.rs index 2ef592c19fcd..ce32249af554 100644 --- a/crates/polars-pipe/src/executors/sinks/output/ipc.rs +++ b/crates/polars-pipe/src/executors/sinks/output/ipc.rs @@ -32,7 +32,7 @@ impl IpcSink { let io_thread_handle = Arc::new(Some(init_writer_thread( receiver, writer, - options.maintain_order, + true, morsels_per_sink, ))); diff --git a/crates/polars-pipe/src/executors/sinks/output/json.rs b/crates/polars-pipe/src/executors/sinks/output/json.rs index c0cab43e067a..24412a144517 100644 --- a/crates/polars-pipe/src/executors/sinks/output/json.rs +++ b/crates/polars-pipe/src/executors/sinks/output/json.rs @@ -24,7 +24,7 @@ impl JsonSink { #[allow(clippy::new_ret_no_self)] pub fn new( path: &Path, - options: JsonWriterOptions, + _options: JsonWriterOptions, _schema: &Schema, cloud_options: Option<&CloudOptions>, ) -> PolarsResult { @@ -38,7 +38,7 @@ impl JsonSink { let io_thread_handle = Arc::new(Some(init_writer_thread( receiver, writer, - options.maintain_order, + true, morsels_per_sink, ))); diff --git a/crates/polars-plan/src/dsl/options.rs b/crates/polars-plan/src/dsl/options.rs index 4c6beaf7419e..92ff28f62015 100644 --- a/crates/polars-plan/src/dsl/options.rs +++ b/crates/polars-plan/src/dsl/options.rs @@ -304,11 +304,23 @@ pub enum SyncOnCloseType { } /// Options that apply to all sinks. -#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)] +#[derive(Clone, PartialEq, Eq, Debug, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct SinkOptions { /// Call sync when closing the file. pub sync_on_close: SyncOnCloseType, + + /// The output file needs to maintain order of the data that comes in. + pub maintain_order: bool, +} + +impl Default for SinkOptions { + fn default() -> Self { + Self { + sync_on_close: Default::default(), + maintain_order: true, + } + } } #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/crates/polars-python/src/dataframe/io.rs b/crates/polars-python/src/dataframe/io.rs index 9cdf1eeb413d..9cbe678211a2 100644 --- a/crates/polars-python/src/dataframe/io.rs +++ b/crates/polars-python/src/dataframe/io.rs @@ -443,7 +443,6 @@ impl PyDataFrame { statistics: statistics.0, row_group_size, data_page_size, - maintain_order: true, }; write_partitioned_dataset( &mut self.df, diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index 1397dadeb19a..f90300e4fd61 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -704,7 +704,7 @@ impl PyLazyFrame { #[cfg(all(feature = "streaming", feature = "parquet"))] #[pyo3(signature = ( target, compression, compression_level, statistics, row_group_size, data_page_size, - maintain_order, cloud_options, credential_provider, retries, sink_options + cloud_options, credential_provider, retries, sink_options ))] fn sink_parquet( &self, @@ -715,7 +715,6 @@ impl PyLazyFrame { statistics: Wrap, row_group_size: Option, data_page_size: Option, - maintain_order: bool, cloud_options: Option>, credential_provider: Option, retries: usize, @@ -728,7 +727,6 @@ impl PyLazyFrame { statistics: statistics.0, row_group_size, data_page_size, - maintain_order, }; let cloud_options = { @@ -766,13 +764,12 @@ impl PyLazyFrame { } #[cfg(all(feature = "streaming", feature = "ipc"))] - #[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries, sink_options))] + #[pyo3(signature = (target, compression, cloud_options, credential_provider, retries, sink_options))] fn sink_ipc( &self, py: Python, target: SinkTarget, compression: Option>, - maintain_order: bool, cloud_options: Option>, credential_provider: Option, retries: usize, @@ -780,7 +777,6 @@ impl PyLazyFrame { ) -> PyResult<()> { let options = IpcWriterOptions { compression: compression.map(|c| c.0), - maintain_order, }; #[cfg(feature = "cloud")] @@ -822,7 +818,7 @@ impl PyLazyFrame { #[pyo3(signature = ( target, include_bom, include_header, separator, line_terminator, quote_char, batch_size, datetime_format, date_format, time_format, float_scientific, float_precision, null_value, - quote_style, maintain_order, cloud_options, credential_provider, retries, sink_options + quote_style, cloud_options, credential_provider, retries, sink_options ))] fn sink_csv( &self, @@ -841,7 +837,6 @@ impl PyLazyFrame { float_precision: Option, null_value: Option, quote_style: Option>, - maintain_order: bool, cloud_options: Option>, credential_provider: Option, retries: usize, @@ -866,7 +861,6 @@ impl PyLazyFrame { let options = CsvWriterOptions { include_bom, include_header, - maintain_order, batch_size, serialize_options, }; @@ -908,18 +902,17 @@ impl PyLazyFrame { #[allow(clippy::too_many_arguments)] #[cfg(all(feature = "streaming", feature = "json"))] - #[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries, sink_options))] + #[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))] fn sink_json( &self, py: Python, target: SinkTarget, - maintain_order: bool, cloud_options: Option>, credential_provider: Option, retries: usize, sink_options: Wrap, ) -> PyResult<()> { - let options = JsonWriterOptions { maintain_order }; + let options = JsonWriterOptions {}; let cloud_options = { let cloud_options = parse_cloud_options( diff --git a/crates/polars-python/src/lazyframe/sink.rs b/crates/polars-python/src/lazyframe/sink.rs index 10aa7985631c..97b1f0530fd9 100644 --- a/crates/polars-python/src/lazyframe/sink.rs +++ b/crates/polars-python/src/lazyframe/sink.rs @@ -74,9 +74,9 @@ impl<'py> FromPyObject<'py> for Wrap { fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { let parsed = ob.extract::>()?; - if parsed.len() != 1 { + if parsed.len() != 2 { return Err(PyValueError::new_err( - "`sink_options` must be a dictionary with the exactly 1 field.", + "`sink_options` must be a dictionary with the exactly 2 field.", )); } @@ -84,6 +84,15 @@ impl<'py> FromPyObject<'py> for Wrap { .ok_or_else(|| PyValueError::new_err("`sink_options` must be `sync_on_close` field"))?; let sync_on_close = sync_on_close.extract::>()?.0; - Ok(Wrap(SinkOptions { sync_on_close })) + let maintain_order = + PyDictMethods::get_item(&parsed, "maintain_order")?.ok_or_else(|| { + PyValueError::new_err("`sink_options` must be `maintain_order` field") + })?; + let maintain_order = maintain_order.extract::()?; + + Ok(Wrap(SinkOptions { + sync_on_close, + maintain_order, + })) } } diff --git a/crates/polars-stream/src/async_primitives/linearizer.rs b/crates/polars-stream/src/async_primitives/linearizer.rs index 49e466d8a126..b2fe4f371324 100644 --- a/crates/polars-stream/src/async_primitives/linearizer.rs +++ b/crates/polars-stream/src/async_primitives/linearizer.rs @@ -59,6 +59,31 @@ impl Linearizer { (slf, inserters) } + pub fn new_with_maintain_order( + num_inserters: usize, + buffer_size: usize, + maintain_order: bool, + ) -> (Self, Vec>) { + if maintain_order { + return Self::new(num_inserters, buffer_size); + } + + let (sender, receiver) = channel(buffer_size * num_inserters); + let receivers = vec![receiver]; + let inserters = (0..num_inserters) + .map(|_| Inserter { + sender: sender.clone(), + }) + .collect(); + + let slf = Self { + receivers, + poll_state: PollState::PollAll, + heap: BinaryHeap::with_capacity(1), + }; + (slf, inserters) + } + /// Fetch the next ordered item produced by senders. /// /// This may wait for at each sender to have sent at least one value before the [`Linearizer`] diff --git a/crates/polars-stream/src/nodes/io_sinks/csv.rs b/crates/polars-stream/src/nodes/io_sinks/csv.rs index 7f252e4991fb..e52b0244b317 100644 --- a/crates/polars-stream/src/nodes/io_sinks/csv.rs +++ b/crates/polars-stream/src/nodes/io_sinks/csv.rs @@ -47,6 +47,9 @@ impl SinkNode for CsvSinkNode { fn is_sink_input_parallel(&self) -> bool { true } + fn do_maintain_order(&self) -> bool { + self.sink_options.maintain_order + } fn spawn_sink( &mut self, @@ -74,8 +77,11 @@ impl SinkNode for CsvSinkNode { // .. -> Encode task let rxs = recv_port.parallel(); // Encode tasks -> IO task - let (mut lin_rx, lin_txs) = - Linearizer::::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); + let (mut lin_rx, lin_txs) = Linearizer::::new_with_maintain_order( + num_pipelines, + DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, + self.sink_options.maintain_order, + ); // 16MB const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24; diff --git a/crates/polars-stream/src/nodes/io_sinks/ipc.rs b/crates/polars-stream/src/nodes/io_sinks/ipc.rs index f328ecec2aea..c660f07a0868 100644 --- a/crates/polars-stream/src/nodes/io_sinks/ipc.rs +++ b/crates/polars-stream/src/nodes/io_sinks/ipc.rs @@ -70,6 +70,9 @@ impl SinkNode for IpcSinkNode { fn is_sink_input_parallel(&self) -> bool { false } + fn do_maintain_order(&self) -> bool { + self.sink_options.maintain_order + } fn spawn_sink( &mut self, diff --git a/crates/polars-stream/src/nodes/io_sinks/json.rs b/crates/polars-stream/src/nodes/io_sinks/json.rs index 37ca3db22cee..85c5653dc075 100644 --- a/crates/polars-stream/src/nodes/io_sinks/json.rs +++ b/crates/polars-stream/src/nodes/io_sinks/json.rs @@ -32,6 +32,9 @@ impl SinkNode for NDJsonSinkNode { fn is_sink_input_parallel(&self) -> bool { true } + fn do_maintain_order(&self) -> bool { + self.sink_options.maintain_order + } fn spawn_sink( &mut self, @@ -59,8 +62,11 @@ impl SinkNode for NDJsonSinkNode { // .. -> Encode task let rxs = recv_port.parallel(); // Encode tasks -> IO task - let (mut lin_rx, lin_txs) = - Linearizer::::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE); + let (mut lin_rx, lin_txs) = Linearizer::::new_with_maintain_order( + num_pipelines, + DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, + self.sink_options.maintain_order, + ); // 16MB const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24; diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs index 51386115ed3e..ffa888268582 100644 --- a/crates/polars-stream/src/nodes/io_sinks/mod.rs +++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs @@ -185,6 +185,7 @@ fn buffer_and_distribute_columns_task( pub trait SinkNode { fn name(&self) -> &str; fn is_sink_input_parallel(&self) -> bool; + fn do_maintain_order(&self) -> bool; fn spawn_sink( &mut self, @@ -304,7 +305,7 @@ impl ComputeNode for SinkComputeNode { let sink_input = if self.sink.is_sink_input_parallel() { SinkInputPort::Parallel(recv.parallel()) } else { - SinkInputPort::Serial(recv.serial()) + SinkInputPort::Serial(recv.serial_with_maintain_order(self.sink.do_maintain_order())) }; join_handles.push(scope.spawn_task(TaskPriority::High, async move { let (token, outcome) = PhaseOutcome::new_shared_wait(wait_group.token()); diff --git a/crates/polars-stream/src/nodes/io_sinks/parquet.rs b/crates/polars-stream/src/nodes/io_sinks/parquet.rs index 72bcd1b93a95..f7cd8aba3fd9 100644 --- a/crates/polars-stream/src/nodes/io_sinks/parquet.rs +++ b/crates/polars-stream/src/nodes/io_sinks/parquet.rs @@ -78,6 +78,9 @@ impl SinkNode for ParquetSinkNode { fn is_sink_input_parallel(&self) -> bool { false } + fn do_maintain_order(&self) -> bool { + self.sink_options.maintain_order + } fn spawn_sink( &mut self, diff --git a/crates/polars-stream/src/nodes/io_sinks/partition/max_size.rs b/crates/polars-stream/src/nodes/io_sinks/partition/max_size.rs index fb74548ac450..d743de929800 100644 --- a/crates/polars-stream/src/nodes/io_sinks/partition/max_size.rs +++ b/crates/polars-stream/src/nodes/io_sinks/partition/max_size.rs @@ -8,6 +8,7 @@ use polars_core::prelude::{InitHashMaps, PlHashMap}; use polars_core::schema::SchemaRef; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; +use polars_plan::dsl::SinkOptions; use polars_utils::pl_str::PlSmallStr; use polars_utils::{format_pl_smallstr, IdxSize}; @@ -25,6 +26,8 @@ pub struct MaxSizePartitionSinkNode { max_size: IdxSize, create_new: CreateNewSinkFn, + sink_options: SinkOptions, + /// The number of tasks that get used to wait for finished files. If you are write large enough /// files (i.e. they would be formed by multiple morsels) this should almost always be 1. But /// if you are writing many small files, this should scan up to allow for your threads to @@ -37,7 +40,12 @@ pub struct MaxSizePartitionSinkNode { const DEFAULT_RETIRE_TASKS: usize = 1; impl MaxSizePartitionSinkNode { - pub fn new(input_schema: SchemaRef, max_size: IdxSize, create_new: CreateNewSinkFn) -> Self { + pub fn new( + input_schema: SchemaRef, + max_size: IdxSize, + create_new: CreateNewSinkFn, + sink_options: SinkOptions, + ) -> Self { assert!(max_size > 0); let num_retire_tasks = std::env::var("POLARS_MAX_SIZE_SINK_RETIRE_TASKS").map_or(DEFAULT_RETIRE_TASKS, |v| { @@ -50,6 +58,7 @@ impl MaxSizePartitionSinkNode { input_schema, max_size, create_new, + sink_options, num_retire_tasks, } } @@ -68,6 +77,9 @@ impl SinkNode for MaxSizePartitionSinkNode { fn is_sink_input_parallel(&self) -> bool { false } + fn do_maintain_order(&self) -> bool { + self.sink_options.maintain_order + } fn spawn_sink( &mut self, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 98b3a80fc6e9..f4fe48fb093d 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -307,6 +307,7 @@ fn to_graph_rec<'a>( input_schema, *max_size, create_new, + sink_options.clone(), ), ), [(input_key, input.port)], diff --git a/crates/polars-stream/src/pipe.rs b/crates/polars-stream/src/pipe.rs index 60ff8ceeca59..d79d349c4726 100644 --- a/crates/polars-stream/src/pipe.rs +++ b/crates/polars-stream/src/pipe.rs @@ -13,9 +13,11 @@ use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE}; pub enum PhysicalPipe { Uninit(usize), - SerialReceiver(usize, Sender), + /// (_, _, maintain_order) + SerialReceiver(usize, Sender, bool), ParallelReceiver(Vec>), - NeedsLinearizer(Vec>, Sender), + /// (_, _, maintain_order) + NeedsLinearizer(Vec>, Sender, bool), NeedsDistributor(Receiver, Vec>), Initialized, } @@ -25,11 +27,15 @@ pub struct RecvPort<'a>(&'a mut PhysicalPipe); impl RecvPort<'_> { pub fn serial(self) -> Receiver { + self.serial_with_maintain_order(true) + } + + pub fn serial_with_maintain_order(self, maintain_order: bool) -> Receiver { let PhysicalPipe::Uninit(num_pipelines) = self.0 else { unreachable!() }; let (send, recv) = connector(); - *self.0 = PhysicalPipe::SerialReceiver(*num_pipelines, send); + *self.0 = PhysicalPipe::SerialReceiver(*num_pipelines, send, maintain_order); recv } @@ -52,7 +58,7 @@ impl SendPort<'_> { pub fn serial(self) -> Sender { match core::mem::replace(self.0, PhysicalPipe::Uninit(0)) { - PhysicalPipe::SerialReceiver(_, send) => { + PhysicalPipe::SerialReceiver(_, send, _) => { *self.0 = PhysicalPipe::Initialized; send }, @@ -67,10 +73,10 @@ impl SendPort<'_> { pub fn parallel(self) -> Vec> { match core::mem::replace(self.0, PhysicalPipe::Uninit(0)) { - PhysicalPipe::SerialReceiver(num_pipelines, send) => { + PhysicalPipe::SerialReceiver(num_pipelines, send, maintain_order) => { let (senders, receivers): (Vec>, Vec>) = (0..num_pipelines).map(|_| connector()).unzip(); - *self.0 = PhysicalPipe::NeedsLinearizer(receivers, send); + *self.0 = PhysicalPipe::NeedsLinearizer(receivers, send, maintain_order); senders }, PhysicalPipe::ParallelReceiver(senders) => { @@ -109,18 +115,19 @@ impl PhysicalPipe { handles: &mut Vec>>, ) { match core::mem::replace(self, Self::Initialized) { - Self::Uninit(_) | Self::SerialReceiver(_, _) | Self::ParallelReceiver(_) => { + Self::Uninit(_) | Self::SerialReceiver(_, _, _) | Self::ParallelReceiver(_) => { panic!("PhysicalPipe::spawn called on (partially) initialized pipe"); }, Self::Initialized => {}, - Self::NeedsLinearizer(receivers, mut sender) => { + Self::NeedsLinearizer(receivers, mut sender, maintain_order) => { let num_pipelines = receivers.len(); let (mut linearizer, inserters) = - Linearizer::, Morsel>>::new( + Linearizer::, Morsel>>::new_with_maintain_order( num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE, + maintain_order, ); handles.push(scope.spawn_task(TaskPriority::High, async move { diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index aa92222773da..c5b91dffd4d0 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -2528,6 +2528,7 @@ def sink_parquet( sink_options = { "sync_on_close": sync_on_close or "none", + "maintain_order": maintain_order, } return lf.sink_parquet( @@ -2537,7 +2538,6 @@ def sink_parquet( statistics=statistics, row_group_size=row_group_size, data_page_size=data_page_size, - maintain_order=maintain_order, cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, @@ -2673,12 +2673,12 @@ def sink_ipc( sink_options = { "sync_on_close": sync_on_close or "none", + "maintain_order": maintain_order, } return lf.sink_ipc( target=target, compression=compression, - maintain_order=maintain_order, cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, @@ -2881,6 +2881,7 @@ def sink_csv( sink_options = { "sync_on_close": sync_on_close or "none", + "maintain_order": maintain_order, } return lf.sink_csv( @@ -2898,7 +2899,6 @@ def sink_csv( float_precision=float_precision, null_value=null_value, quote_style=quote_style, - maintain_order=maintain_order, cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, @@ -3030,11 +3030,11 @@ def sink_ndjson( sink_options = { "sync_on_close": sync_on_close or "none", + "maintain_order": maintain_order, } return lf.sink_json( target=target, - maintain_order=maintain_order, cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries,