From da1a795f3f61e3b0c398051029993b5e1b7af18b Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Tue, 4 Mar 2025 11:17:04 +0100 Subject: [PATCH] add sync_on_close flag --- crates/polars-lazy/src/frame/mod.rs | 16 ++++ crates/polars-pipe/src/pipeline/convert.rs | 1 + crates/polars-plan/src/dsl/options.rs | 23 +++++ crates/polars-python/src/functions/mod.rs | 4 - .../src/functions/partitioning.rs | 24 ----- crates/polars-python/src/lazyframe/general.rs | 65 ++++++-------- crates/polars-python/src/lazyframe/mod.rs | 2 + crates/polars-python/src/lazyframe/sink.rs | 89 +++++++++++++++++++ .../polars-stream/src/nodes/io_sinks/csv.rs | 14 ++- .../polars-stream/src/nodes/io_sinks/ipc.rs | 17 +++- .../polars-stream/src/nodes/io_sinks/json.rs | 13 +-- .../polars-stream/src/nodes/io_sinks/mod.rs | 22 +++++ .../src/nodes/io_sinks/parquet.rs | 18 ++-- .../src/nodes/io_sinks/partition/mod.rs | 17 +++- .../src/physical_plan/lower_ir.rs | 9 ++ crates/polars-stream/src/physical_plan/mod.rs | 6 +- .../src/physical_plan/to_graph.rs | 14 ++- py-polars/polars/_typing.py | 1 + py-polars/polars/lazyframe/frame.py | 49 ++++++++++ py-polars/src/lib.rs | 4 +- py-polars/tests/unit/io/test_partition.py | 19 ++-- .../tests/unit/streaming/test_streaming_io.py | 54 ----------- 22 files changed, 326 insertions(+), 155 deletions(-) delete mode 100644 crates/polars-python/src/functions/partitioning.rs create mode 100644 crates/polars-python/src/lazyframe/sink.rs diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 88c102e5ff69..ed27dc41a772 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -772,10 +772,12 @@ impl LazyFrame { path: &dyn AsRef, options: ParquetWriteOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::File { path: Arc::new(path.as_ref().to_path_buf()), + sink_options, file_type: FileType::Parquet(options), cloud_options, }, @@ -792,10 +794,12 @@ impl LazyFrame { path: impl AsRef, options: IpcWriterOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::File { path: Arc::new(path.as_ref().to_path_buf()), + sink_options, file_type: FileType::Ipc(options), cloud_options, }, @@ -812,10 +816,12 @@ impl LazyFrame { path: impl AsRef, options: CsvWriterOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::File { path: Arc::new(path.as_ref().to_path_buf()), + sink_options, file_type: FileType::Csv(options), cloud_options, }, @@ -832,10 +838,12 @@ impl LazyFrame { path: impl AsRef, options: JsonWriterOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::File { path: Arc::new(path.as_ref().to_path_buf()), + sink_options, file_type: FileType::Json(options), cloud_options, }, @@ -853,10 +861,12 @@ impl LazyFrame { variant: PartitionVariant, options: ParquetWriteOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::Partition { path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()), + sink_options, variant, file_type: FileType::Parquet(options), cloud_options, @@ -875,10 +885,12 @@ impl LazyFrame { variant: PartitionVariant, options: IpcWriterOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::Partition { path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()), + sink_options, variant, file_type: FileType::Ipc(options), 
cloud_options, @@ -897,10 +909,12 @@ impl LazyFrame { variant: PartitionVariant, options: CsvWriterOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::Partition { path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()), + sink_options, variant, file_type: FileType::Csv(options), cloud_options, @@ -919,10 +933,12 @@ impl LazyFrame { variant: PartitionVariant, options: JsonWriterOptions, cloud_options: Option, + sink_options: SinkOptions, ) -> PolarsResult<()> { self.sink( SinkType::Partition { path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()), + sink_options, variant, file_type: FileType::Json(options), cloud_options, diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 51f03cce4478..f03cea313a04 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -183,6 +183,7 @@ where #[allow(unused_variables)] SinkType::File { path, + sink_options: _, file_type, cloud_options, } => { diff --git a/crates/polars-plan/src/dsl/options.rs b/crates/polars-plan/src/dsl/options.rs index bea9a181ee77..4c6beaf7419e 100644 --- a/crates/polars-plan/src/dsl/options.rs +++ b/crates/polars-plan/src/dsl/options.rs @@ -290,6 +290,27 @@ pub struct AnonymousScanOptions { pub fmt_str: &'static str, } +#[derive(Clone, Copy, PartialEq, Eq, Debug, Default, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum SyncOnCloseType { + /// Don't call sync on close. + #[default] + None, + + /// Sync only the file contents. + Data, + /// Sync the file contents and the metadata. + All, +} + +/// Options that apply to all sinks. +#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct SinkOptions { + /// Call sync when closing the file.
+ pub sync_on_close: SyncOnCloseType, +} + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum SinkType { @@ -297,11 +318,13 @@ pub enum SinkType { File { path: Arc, file_type: FileType, + sink_options: SinkOptions, cloud_options: Option, }, Partition { path_f_string: Arc, file_type: FileType, + sink_options: SinkOptions, variant: PartitionVariant, cloud_options: Option, }, diff --git a/crates/polars-python/src/functions/mod.rs b/crates/polars-python/src/functions/mod.rs index 2a18b884f2c6..a8a40f999faa 100644 --- a/crates/polars-python/src/functions/mod.rs +++ b/crates/polars-python/src/functions/mod.rs @@ -5,8 +5,6 @@ mod io; mod lazy; mod meta; mod misc; -#[cfg(feature = "pymethods")] -mod partitioning; mod random; mod range; mod string_cache; @@ -21,8 +19,6 @@ pub use io::*; pub use lazy::*; pub use meta::*; pub use misc::*; -#[cfg(feature = "pymethods")] -pub use partitioning::*; pub use random::*; pub use range::*; pub use string_cache::*; diff --git a/crates/polars-python/src/functions/partitioning.rs b/crates/polars-python/src/functions/partitioning.rs deleted file mode 100644 index a3e29d30601a..000000000000 --- a/crates/polars-python/src/functions/partitioning.rs +++ /dev/null @@ -1,24 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; - -use polars::prelude::PartitionVariant; -use polars_utils::IdxSize; -use pyo3::{pyclass, pymethods}; - -#[pyclass] -#[derive(Clone)] -pub struct PyPartitioning { - pub path: Arc, - pub variant: PartitionVariant, -} - -#[pymethods] -impl PyPartitioning { - #[staticmethod] - pub fn new_max_size(path: PathBuf, max_size: IdxSize) -> PyPartitioning { - PyPartitioning { - path: Arc::new(path), - variant: PartitionVariant::MaxSize(max_size), - } - } -} diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index 269cb0827a96..24a87e4f06e1 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -13,10 +13,9 @@ use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; use pyo3::types::{PyDict, PyList}; -use super::PyLazyFrame; +use super::{PyLazyFrame, SinkTarget}; use crate::error::PyPolarsErr; use crate::expr::ToExprs; -use crate::functions::PyPartitioning; use crate::interop::arrow::to_rust::pyarrow_schema_to_rust; use crate::lazyframe::visit::NodeTraverser; use crate::prelude::*; @@ -36,31 +35,6 @@ fn pyobject_to_first_path_and_scan_sources( }) } -#[derive(Clone)] -enum SinkTarget { - Path(PathBuf), - Partition(PyPartitioning), -} - -impl<'py> FromPyObject<'py> for SinkTarget { - fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { - if let Ok(v) = ob.extract::() { - return Ok(Self::Partition(v)); - } - - Ok(Self::Path(ob.extract::()?)) - } -} - -impl SinkTarget { - fn unformatted_path(&self) -> &Path { - match self { - Self::Path(path) => path.as_path(), - Self::Partition(partition) => partition.path.as_ref().as_path(), - } - } -} - #[pymethods] #[allow(clippy::should_implement_trait)] impl PyLazyFrame { @@ -708,7 +682,7 @@ impl PyLazyFrame { #[cfg(all(feature = "streaming", feature = "parquet"))] #[pyo3(signature = ( target, compression, compression_level, statistics, row_group_size, data_page_size, - maintain_order, cloud_options, credential_provider, retries + maintain_order, cloud_options, credential_provider, retries, sink_options ))] fn sink_parquet( &self, @@ -723,6 +697,7 @@ impl PyLazyFrame { cloud_options: Option>, credential_provider: Option, retries: usize, + 
sink_options: Wrap, ) -> PyResult<()> { let compression = parse_parquet_compression(compression, compression_level)?; @@ -751,21 +726,25 @@ impl PyLazyFrame { py.enter_polars(|| { let ldf = self.ldf.clone(); match target { - SinkTarget::Path(path) => { - ldf.sink_parquet(&path as &dyn AsRef, options, cloud_options) - }, + SinkTarget::Path(path) => ldf.sink_parquet( + &path as &dyn AsRef, + options, + cloud_options, + sink_options.0, + ), SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned( partition.path.as_ref(), partition.variant, options, cloud_options, + sink_options.0, ), } }) } #[cfg(all(feature = "streaming", feature = "ipc"))] - #[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries))] + #[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries, sink_options))] fn sink_ipc( &self, py: Python, @@ -775,6 +754,7 @@ impl PyLazyFrame { cloud_options: Option>, credential_provider: Option, retries: usize, + sink_options: Wrap, ) -> PyResult<()> { let options = IpcWriterOptions { compression: compression.map(|c| c.0), @@ -802,12 +782,15 @@ impl PyLazyFrame { py.enter_polars(|| { let ldf = self.ldf.clone(); match target { - SinkTarget::Path(path) => ldf.sink_ipc(path, options, cloud_options), + SinkTarget::Path(path) => { + ldf.sink_ipc(path, options, cloud_options, sink_options.0) + }, SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned( partition.path.as_ref(), partition.variant, options, cloud_options, + sink_options.0, ), } }) @@ -817,7 +800,7 @@ impl PyLazyFrame { #[pyo3(signature = ( target, include_bom, include_header, separator, line_terminator, quote_char, batch_size, datetime_format, date_format, time_format, float_scientific, float_precision, null_value, - quote_style, maintain_order, cloud_options, credential_provider, retries + quote_style, maintain_order, cloud_options, credential_provider, retries, sink_options ))] fn sink_csv( &self, @@ -840,6 +823,7 @@ impl PyLazyFrame { cloud_options: Option>, credential_provider: Option, retries: usize, + sink_options: Wrap, ) -> PyResult<()> { let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0); let null_value = null_value.unwrap_or(SerializeOptions::default().null); @@ -886,12 +870,15 @@ impl PyLazyFrame { py.enter_polars(|| { let ldf = self.ldf.clone(); match target { - SinkTarget::Path(path) => ldf.sink_csv(path, options, cloud_options), + SinkTarget::Path(path) => { + ldf.sink_csv(path, options, cloud_options, sink_options.0) + }, SinkTarget::Partition(partition) => ldf.sink_csv_partitioned( partition.path.as_ref(), partition.variant, options, cloud_options, + sink_options.0, ), } }) @@ -899,7 +886,7 @@ impl PyLazyFrame { #[allow(clippy::too_many_arguments)] #[cfg(all(feature = "streaming", feature = "json"))] - #[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries))] + #[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries, sink_options))] fn sink_json( &self, py: Python, @@ -908,6 +895,7 @@ impl PyLazyFrame { cloud_options: Option>, credential_provider: Option, retries: usize, + sink_options: Wrap, ) -> PyResult<()> { let options = JsonWriterOptions { maintain_order }; @@ -928,12 +916,15 @@ impl PyLazyFrame { py.enter_polars(|| { let ldf = self.ldf.clone(); match target { - SinkTarget::Path(path) => ldf.sink_json(path, options, cloud_options), + SinkTarget::Path(path) => { + ldf.sink_json(path, options, cloud_options, sink_options.0) 
+ }, SinkTarget::Partition(partition) => ldf.sink_json_partitioned( partition.path.as_ref(), partition.variant, options, cloud_options, + sink_options.0, ), } }) diff --git a/crates/polars-python/src/lazyframe/mod.rs b/crates/polars-python/src/lazyframe/mod.rs index 132959d54bff..1fed7196133f 100644 --- a/crates/polars-python/src/lazyframe/mod.rs +++ b/crates/polars-python/src/lazyframe/mod.rs @@ -3,6 +3,7 @@ mod exitable; mod general; #[cfg(feature = "pymethods")] mod serde; +mod sink; pub mod visit; pub mod visitor; @@ -10,6 +11,7 @@ pub mod visitor; pub use exitable::PyInProcessQuery; use polars::prelude::LazyFrame; use pyo3::pyclass; +pub use sink::{PyPartitioning, SinkTarget}; #[pyclass] #[repr(transparent)] diff --git a/crates/polars-python/src/lazyframe/sink.rs b/crates/polars-python/src/lazyframe/sink.rs new file mode 100644 index 000000000000..10aa7985631c --- /dev/null +++ b/crates/polars-python/src/lazyframe/sink.rs @@ -0,0 +1,89 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use polars::prelude::{PartitionVariant, SinkOptions, SyncOnCloseType}; +use polars_utils::IdxSize; +use pyo3::exceptions::PyValueError; +use pyo3::pybacked::PyBackedStr; +use pyo3::types::{PyAnyMethods, PyDict, PyDictMethods}; +use pyo3::{pyclass, pymethods, Bound, FromPyObject, PyAny, PyResult}; + +use crate::prelude::Wrap; + +#[derive(Clone)] +pub enum SinkTarget { + Path(PathBuf), + Partition(PyPartitioning), +} + +#[pyclass] +#[derive(Clone)] +pub struct PyPartitioning { + pub path: Arc, + pub variant: PartitionVariant, +} + +#[cfg(feature = "pymethods")] +#[pymethods] +impl PyPartitioning { + #[staticmethod] + pub fn new_max_size(path: PathBuf, max_size: IdxSize) -> PyPartitioning { + PyPartitioning { + path: Arc::new(path), + variant: PartitionVariant::MaxSize(max_size), + } + } +} + +impl<'py> FromPyObject<'py> for SinkTarget { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + if let Ok(v) = ob.extract::() { + return Ok(Self::Partition(v)); + } + + Ok(Self::Path(ob.extract::()?)) + } +} + +impl SinkTarget { + pub fn unformatted_path(&self) -> &Path { + match self { + Self::Path(path) => path.as_path(), + Self::Partition(partition) => partition.path.as_ref().as_path(), + } + } +} + +impl<'py> FromPyObject<'py> for Wrap { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let parsed = match &*ob.extract::()? { + "none" => SyncOnCloseType::None, + "data" => SyncOnCloseType::Data, + "all" => SyncOnCloseType::All, + v => { + return Err(PyValueError::new_err(format!( + "`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}", + ))) + }, + }; + Ok(Wrap(parsed)) + } +} + +impl<'py> FromPyObject<'py> for Wrap { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let parsed = ob.extract::>()?; + + if parsed.len() != 1 { + return Err(PyValueError::new_err( + "`sink_options` must be a dictionary with exactly 1 field.", + )); + } + + let sync_on_close = PyDictMethods::get_item(&parsed, "sync_on_close")?
+ .ok_or_else(|| PyValueError::new_err("`sink_options` must contain a `sync_on_close` field"))?; + let sync_on_close = sync_on_close.extract::>()?.0; + + Ok(Wrap(SinkOptions { sync_on_close })) + } +} diff --git a/crates/polars-stream/src/nodes/io_sinks/csv.rs b/crates/polars-stream/src/nodes/io_sinks/csv.rs index ce988bc67033..7f252e4991fb 100644 --- a/crates/polars-stream/src/nodes/io_sinks/csv.rs +++ b/crates/polars-stream/src/nodes/io_sinks/csv.rs @@ -7,25 +7,33 @@ use polars_error::PolarsResult; use polars_expr::state::ExecutionState; use polars_io::prelude::{CsvWriter, CsvWriterOptions}; use polars_io::SerWriter; +use polars_plan::dsl::SinkOptions; use polars_utils::priority::Priority; use super::{SinkInputPort, SinkNode, SinkRecvPort}; use crate::async_executor::spawn; use crate::async_primitives::linearizer::Linearizer; -use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE; +use crate::nodes::io_sinks::{tokio_sync_on_close, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE}; use crate::nodes::{JoinHandle, MorselSeq, TaskPriority}; type Linearized = Priority, Vec>; pub struct CsvSinkNode { path: PathBuf, schema: SchemaRef, + sink_options: SinkOptions, write_options: CsvWriterOptions, } impl CsvSinkNode { - pub fn new(schema: SchemaRef, path: PathBuf, write_options: CsvWriterOptions) -> Self { + pub fn new( + schema: SchemaRef, + path: PathBuf, + sink_options: SinkOptions, + write_options: CsvWriterOptions, + ) -> Self { Self { path, schema, + sink_options, write_options, } } @@ -123,6 +131,7 @@ impl SinkNode for CsvSinkNode { // // Task that will actually do write to the target file. let path = self.path.clone(); + let sink_options = self.sink_options.clone(); let schema = self.schema.clone(); let include_header = self.write_options.include_header; let include_bom = self.write_options.include_bom; @@ -154,6 +163,7 @@ impl SinkNode for CsvSinkNode { file.write_all(&buffer).await?; } + tokio_sync_on_close(sink_options.sync_on_close, &mut file).await?; PolarsResult::Ok(()) }); join_handles.push(spawn(TaskPriority::Low, async move { diff --git a/crates/polars-stream/src/nodes/io_sinks/ipc.rs b/crates/polars-stream/src/nodes/io_sinks/ipc.rs index 6c7791546cbc..f328ecec2aea 100644 --- a/crates/polars-stream/src/nodes/io_sinks/ipc.rs +++ b/crates/polars-stream/src/nodes/io_sinks/ipc.rs @@ -14,6 +14,7 @@ use polars_error::PolarsResult; use polars_expr::state::ExecutionState; use polars_io::ipc::{IpcWriter, IpcWriterOptions}; use polars_io::SerWriter; +use polars_plan::dsl::SinkOptions; use polars_utils::priority::Priority; use super::{ @@ -25,6 +26,7 @@ use crate::async_primitives::connector::connector; use crate::async_primitives::distributor_channel::distributor_channel; use crate::async_primitives::linearizer::Linearizer; use crate::morsel::get_ideal_morsel_size; +use crate::nodes::io_sinks::sync_on_close; use crate::nodes::{JoinHandle, TaskPriority}; pub struct IpcSinkNode { path: PathBuf, input_schema: SchemaRef, write_options: IpcWriterOptions, + sink_options: SinkOptions, compat_level: CompatLevel, } impl IpcSinkNode { - pub fn new(input_schema: SchemaRef, path: PathBuf, write_options: IpcWriterOptions) -> Self { + pub fn new( + input_schema: SchemaRef, + path: PathBuf, + sink_options: SinkOptions, + write_options: IpcWriterOptions, + ) -> Self { Self { path, input_schema, write_options, + sink_options, compat_level: CompatLevel::newest(), // @TODO: make this accessible from outside } } @@ -308,6 +317,7 @@ impl SinkNode for
IpcSinkNode { // // Task that will actually do write to the target file. let path = self.path.clone(); + let sink_options = self.sink_options.clone(); let write_options = self.write_options; let input_schema = self.input_schema.clone(); let io_task = polars_io::pl_async::get_runtime().spawn(async move { @@ -319,7 +329,8 @@ impl SinkNode for IpcSinkNode { .truncate(true) .open(path.as_path()) .await?; - let writer = BufWriter::new(file.into_std().await); + let mut file = file.into_std().await; + let writer = BufWriter::new(&mut file); let mut writer = IpcWriter::new(writer) .with_compression(write_options.compression) .with_parallel(false) @@ -332,7 +343,9 @@ impl SinkNode for IpcSinkNode { } writer.finish()?; + drop(writer); + sync_on_close(sink_options.sync_on_close, &mut file)?; PolarsResult::Ok(()) }); join_handles.push(spawn(TaskPriority::Low, async move { diff --git a/crates/polars-stream/src/nodes/io_sinks/json.rs b/crates/polars-stream/src/nodes/io_sinks/json.rs index 574a2a2d5c12..37ca3db22cee 100644 --- a/crates/polars-stream/src/nodes/io_sinks/json.rs +++ b/crates/polars-stream/src/nodes/io_sinks/json.rs @@ -4,21 +4,23 @@ use std::path::PathBuf; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; use polars_io::json::BatchedWriter; +use polars_plan::dsl::SinkOptions; use polars_utils::priority::Priority; use super::{SinkInputPort, SinkNode, SinkRecvPort}; use crate::async_executor::spawn; use crate::async_primitives::linearizer::Linearizer; -use crate::nodes::io_sinks::DEFAULT_SINK_LINEARIZER_BUFFER_SIZE; +use crate::nodes::io_sinks::{tokio_sync_on_close, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE}; use crate::nodes::{JoinHandle, MorselSeq, TaskPriority}; type Linearized = Priority, Vec>; pub struct NDJsonSinkNode { path: PathBuf, + sink_options: SinkOptions, } impl NDJsonSinkNode { - pub fn new(path: PathBuf) -> Self { - Self { path } + pub fn new(path: PathBuf, sink_options: SinkOptions) -> Self { + Self { path, sink_options } } } @@ -92,11 +94,11 @@ impl SinkNode for NDJsonSinkNode { }) })); - let path = self.path.clone(); - // IO task. // // Task that will actually do write to the target file. 
+ let sink_options = self.sink_options.clone(); + let path = self.path.clone(); let io_task = polars_io::pl_async::get_runtime().spawn(async move { use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; @@ -113,6 +115,7 @@ impl SinkNode for NDJsonSinkNode { file.write_all(&buffer).await?; } + tokio_sync_on_close(sink_options.sync_on_close, &mut file).await?; PolarsResult::Ok(()) }); join_handles.push(spawn(TaskPriority::Low, async move { diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs index 53a3bbe8f23a..51386115ed3e 100644 --- a/crates/polars-stream/src/nodes/io_sinks/mod.rs +++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs @@ -1,3 +1,5 @@ +use std::{fs, io}; + use futures::stream::FuturesUnordered; use futures::StreamExt; use polars_core::config; @@ -6,6 +8,7 @@ use polars_core::prelude::Column; use polars_core::schema::SchemaRef; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; +use polars_plan::dsl::SyncOnCloseType; use super::io_sources::PhaseOutcomeToken; use super::{ @@ -326,3 +329,22 @@ impl ComputeNode for SinkComputeNode { })); } } + +pub fn sync_on_close(sync_on_close: SyncOnCloseType, file: &mut fs::File) -> io::Result<()> { + match sync_on_close { + SyncOnCloseType::None => Ok(()), + SyncOnCloseType::Data => file.sync_data(), + SyncOnCloseType::All => file.sync_all(), + } +} + +pub async fn tokio_sync_on_close( + sync_on_close: SyncOnCloseType, + file: &mut tokio::fs::File, +) -> io::Result<()> { + match sync_on_close { + SyncOnCloseType::None => Ok(()), + SyncOnCloseType::Data => file.sync_data().await, + SyncOnCloseType::All => file.sync_all().await, + } +} diff --git a/crates/polars-stream/src/nodes/io_sinks/parquet.rs b/crates/polars-stream/src/nodes/io_sinks/parquet.rs index f13013447468..72bcd1b93a95 100644 --- a/crates/polars-stream/src/nodes/io_sinks/parquet.rs +++ b/crates/polars-stream/src/nodes/io_sinks/parquet.rs @@ -16,6 +16,7 @@ use polars_parquet::write::{ array_to_columns, to_parquet_schema, CompressedPage, Compressor, Encoding, FileWriter, SchemaDescriptor, Version, WriteOptions, }; +use polars_plan::dsl::SinkOptions; use polars_utils::priority::Priority; use super::{ @@ -26,12 +27,14 @@ use crate::async_executor::spawn; use crate::async_primitives::connector::connector; use crate::async_primitives::distributor_channel::distributor_channel; use crate::async_primitives::linearizer::Linearizer; +use crate::nodes::io_sinks::sync_on_close; use crate::nodes::{JoinHandle, TaskPriority}; pub struct ParquetSinkNode { path: PathBuf, input_schema: SchemaRef, + sink_options: SinkOptions, write_options: ParquetWriteOptions, parquet_schema: SchemaDescriptor, @@ -43,6 +46,7 @@ impl ParquetSinkNode { pub fn new( input_schema: SchemaRef, path: &Path, + sink_options: SinkOptions, write_options: &ParquetWriteOptions, ) -> PolarsResult { let schema = schema_to_arrow_checked(&input_schema, CompatLevel::newest(), "parquet")?; @@ -53,6 +57,7 @@ impl ParquetSinkNode { path: path.to_path_buf(), input_schema, + sink_options, write_options: *write_options, parquet_schema, @@ -242,6 +247,7 @@ impl SinkNode for ParquetSinkNode { // Task that will actually do write to the target file. It is important that this is only // spawned once. 
let path = self.path.clone(); + let sink_options = self.sink_options.clone(); let write_options = self.write_options; let arrow_schema = self.arrow_schema.clone(); let parquet_schema = self.parquet_schema.clone(); @@ -256,9 +262,9 @@ impl SinkNode for ParquetSinkNode { .open(path.as_path()) .await .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?; - let file = file.into_std().await; - let writer = BufWriter::new(file); - let options = WriteOptions { + let mut file = file.into_std().await; + let writer = BufWriter::new(&mut file); + let write_options = WriteOptions { statistics: write_options.statistics, compression: write_options.compression.into(), version: Version::V1, @@ -268,9 +274,9 @@ impl SinkNode for ParquetSinkNode { writer, arrow_schema, parquet_schema, - options, + write_options, )); - let mut writer = BatchedWriter::new(file_writer, encodings, options, false); + let mut writer = BatchedWriter::new(file_writer, encodings, write_options, false); let num_parquet_columns = writer.parquet_schema().leaves().len(); while let Ok(current_row_group) = io_rx.recv().await { @@ -281,7 +287,9 @@ impl SinkNode for ParquetSinkNode { } writer.finish()?; + drop(writer); + sync_on_close(sink_options.sync_on_close, &mut file)?; PolarsResult::Ok(()) }); join_handles.push(spawn(TaskPriority::Low, async move { diff --git a/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs b/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs index 31cd32912a47..854f7c4a80e6 100644 --- a/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs +++ b/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use polars_core::prelude::PlHashMap; use polars_core::schema::SchemaRef; use polars_error::PolarsResult; -use polars_plan::dsl::{FileType, PartitionVariant}; +use polars_plan::dsl::{FileType, PartitionVariant, SinkOptions}; use polars_utils::pl_str::PlSmallStr; use super::SinkNode; @@ -46,7 +46,11 @@ pub fn get_args_to_path_fn( }) as _, } } -pub fn get_create_new_fn(file_type: FileType, args_to_path: ArgsToPathFn) -> CreateNewSinkFn { +pub fn get_create_new_fn( + file_type: FileType, + sink_options: SinkOptions, + args_to_path: ArgsToPathFn, +) -> CreateNewSinkFn { match file_type { #[cfg(feature = "ipc")] FileType::Ipc(ipc_writer_options) => Arc::new(move |input_schema, args| { @@ -54,6 +58,7 @@ pub fn get_create_new_fn(file_type: FileType, args_to_path: ArgsToPathFn) -> Cre let sink = Box::new(super::ipc::IpcSinkNode::new( input_schema, path.clone(), + sink_options.clone(), ipc_writer_options, )) as Box; Ok((path, sink, args)) @@ -61,8 +66,10 @@ pub fn get_create_new_fn(file_type: FileType, args_to_path: ArgsToPathFn) -> Cre #[cfg(feature = "json")] FileType::Json(_ndjson_writer_options) => Arc::new(move |_input_schema, args| { let (path, args) = args_to_path(args); - let sink = Box::new(super::json::NDJsonSinkNode::new(path.clone())) - as Box; + let sink = Box::new(super::json::NDJsonSinkNode::new( + path.clone(), + sink_options.clone(), + )) as Box; Ok((path, sink, args)) }) as _, #[cfg(feature = "parquet")] @@ -71,6 +78,7 @@ pub fn get_create_new_fn(file_type: FileType, args_to_path: ArgsToPathFn) -> Cre let sink = Box::new(super::parquet::ParquetSinkNode::new( input_schema, path.as_path(), + sink_options.clone(), &parquet_writer_options, )?) 
as Box; Ok((path, sink, args)) @@ -81,6 +89,7 @@ pub fn get_create_new_fn(file_type: FileType, args_to_path: ArgsToPathFn) -> Cre let sink = Box::new(super::csv::CsvSinkNode::new( input_schema, path.clone(), + sink_options.clone(), csv_writer_options.clone(), )) as Box; Ok((path, sink, args)) diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 19ada5b01510..9d3dde3cf968 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -227,10 +227,12 @@ pub fn lower_ir( }, SinkType::File { path, + sink_options, file_type, cloud_options: _, } => { let path = path.clone(); + let sink_options = sink_options.clone(); let file_type = file_type.clone(); match file_type { @@ -239,6 +241,7 @@ pub fn lower_ir( let phys_input = lower_ir!(*input)?; PhysNodeKind::FileSink { path, + sink_options, file_type, input: phys_input, } @@ -248,6 +251,7 @@ pub fn lower_ir( let phys_input = lower_ir!(*input)?; PhysNodeKind::FileSink { path, + sink_options, file_type, input: phys_input, } @@ -257,6 +261,7 @@ pub fn lower_ir( let phys_input = lower_ir!(*input)?; PhysNodeKind::FileSink { path, + sink_options, file_type, input: phys_input, } @@ -266,6 +271,7 @@ pub fn lower_ir( let phys_input = lower_ir!(*input)?; PhysNodeKind::FileSink { path, + sink_options, file_type, input: phys_input, } @@ -274,17 +280,20 @@ pub fn lower_ir( }, SinkType::Partition { path_f_string, + sink_options, variant, file_type, cloud_options: _, } => { let path_f_string = path_f_string.clone(); + let sink_options = sink_options.clone(); let variant = variant.clone(); let file_type = file_type.clone(); let phys_input = lower_ir!(*input)?; PhysNodeKind::PartitionSink { path_f_string, + sink_options, variant, file_type, input: phys_input, diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index d78e93266818..aaba84c1d10f 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -8,7 +8,9 @@ use polars_core::utils::arrow::bitmap::Bitmap; use polars_error::PolarsResult; use polars_io::RowIndex; use polars_ops::frame::JoinArgs; -use polars_plan::dsl::{FileScan, JoinTypeOptionsIR, PartitionVariant, ScanSource, ScanSources}; +use polars_plan::dsl::{ + FileScan, JoinTypeOptionsIR, PartitionVariant, ScanSource, ScanSources, SinkOptions, +}; use polars_plan::plans::hive::HivePartitionsDf; use polars_plan::plans::{AExpr, DataFrameUdf, FileInfo, IR}; use polars_plan::prelude::expr_ir::ExprIR; @@ -129,12 +131,14 @@ pub enum PhysNodeKind { FileSink { path: Arc, + sink_options: SinkOptions, file_type: FileType, input: PhysStream, }, PartitionSink { path_f_string: Arc, + sink_options: SinkOptions, variant: PartitionVariant, file_type: FileType, input: PhysStream, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 8e79337e711f..98b3a80fc6e9 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -222,9 +222,11 @@ fn to_graph_rec<'a>( FileSink { path, + sink_options, file_type, input, } => { + let sink_options = sink_options.clone(); let input_schema = ctx.phys_sm[input.node].output_schema.clone(); let input_key = to_graph_rec(input.node, ctx)?; @@ -234,6 +236,7 @@ fn to_graph_rec<'a>( SinkComputeNode::from(nodes::io_sinks::ipc::IpcSinkNode::new( input_schema, path.to_path_buf(), + 
sink_options, *ipc_writer_options, )), [(input_key, input.port)], @@ -242,6 +245,7 @@ FileType::Json(_) => ctx.graph.add_node( SinkComputeNode::from(nodes::io_sinks::json::NDJsonSinkNode::new( path.to_path_buf(), + sink_options, )), [(input_key, input.port)], ), @@ -250,6 +254,7 @@ SinkComputeNode::from(nodes::io_sinks::parquet::ParquetSinkNode::new( input_schema, path, + sink_options, parquet_writer_options, )?), [(input_key, input.port)], @@ -259,6 +264,7 @@ SinkComputeNode::from(nodes::io_sinks::csv::CsvSinkNode::new( input_schema, path.to_path_buf(), + sink_options, csv_writer_options.clone(), )), [(input_key, input.port)], @@ -277,6 +283,7 @@ PartitionSink { path_f_string, + sink_options, variant, file_type, input, @@ -287,8 +294,11 @@ let path_f_string = path_f_string.clone(); let args_to_path = nodes::io_sinks::partition::get_args_to_path_fn(variant, path_f_string); - let create_new = - nodes::io_sinks::partition::get_create_new_fn(file_type.clone(), args_to_path); + let create_new = nodes::io_sinks::partition::get_create_new_fn( + file_type.clone(), + sink_options.clone(), + args_to_path, + ); match variant { PartitionVariant::MaxSize(max_size) => ctx.graph.add_node( diff --git a/py-polars/polars/_typing.py b/py-polars/polars/_typing.py index 01bb370dcb20..b3fe9aadf672 100644 --- a/py-polars/polars/_typing.py +++ b/py-polars/polars/_typing.py @@ -150,6 +150,7 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: "saturday", "sunday", ] +SyncOnClose: TypeAlias = Literal["none", "data", "all"] TimeUnit: TypeAlias = Literal["ns", "us", "ms"] UnicodeForm: TypeAlias = Literal["NFC", "NFKC", "NFD", "NFKD"] UniqueKeepStrategy: TypeAlias = Literal["first", "last", "any", "none"] diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 32ed3714c9fd..8e194993d8f3 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -127,6 +127,7 @@ SchemaDict, SerializationFormat, StartBy, + SyncOnClose, UniqueKeepStrategy, ) from polars.dependencies import numpy as np @@ -2311,6 +2312,7 @@ def sink_parquet( | Literal["auto"] | None = "auto", retries: int = 2, + sync_on_close: SyncOnClose = "none", ) -> None: """ Evaluate the query in streaming mode and write to a Parquet file. @@ -2403,6 +2405,12 @@ at any point without it being considered a breaking change. retries Number of retries if accessing a cloud instance fails. + sync_on_close + Sync to disk before closing a file. + + * `none` does not sync. + * `data` syncs the file contents. + * `all` syncs the file contents and metadata. Returns ------- @@ -2462,6 +2470,10 @@ else: target = normalize_filepath(path) + sink_options = { + "sync_on_close": sync_on_close, + } + return lf.sink_parquet( target=target, compression=compression, @@ -2473,6 +2485,7 @@ cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, + sink_options=sink_options, ) @unstable() @@ -2495,6 +2508,7 @@ def sink_ipc( | Literal["auto"] | None = "auto", retries: int = 2, + sync_on_close: SyncOnClose = "none", ) -> None: """ Evaluate the query in streaming mode and write to an IPC file. @@ -2553,6 +2567,12 @@ at any point without it being considered a breaking change. retries Number of retries if accessing a cloud instance fails.
+ sync_on_close + Sync to disk before closing a file. + + * `none` does not sync. + * `data` syncs the file contents. + * `all` syncs the file contents and metadata. Returns ------- @@ -2595,6 +2615,10 @@ else: target = path + sink_options = { + "sync_on_close": sync_on_close, + } + return lf.sink_ipc( target=target, compression=compression, @@ -2602,6 +2626,7 @@ cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, + sink_options=sink_options, ) @unstable() @@ -2636,6 +2661,7 @@ def sink_csv( | Literal["auto"] | None = "auto", retries: int = 2, + sync_on_close: SyncOnClose = "none", ) -> None: """ Evaluate the query in streaming mode and write to a CSV file. @@ -2742,6 +2768,12 @@ at any point without it being considered a breaking change. retries Number of retries if accessing a cloud instance fails. + sync_on_close + Sync to disk before closing a file. + + * `none` does not sync. + * `data` syncs the file contents. + * `all` syncs the file contents and metadata. Returns ------- @@ -2791,6 +2823,10 @@ else: target = normalize_filepath(path) + sink_options = { + "sync_on_close": sync_on_close, + } + return lf.sink_csv( target=target, include_bom=include_bom, @@ -2810,6 +2846,7 @@ cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, + sink_options=sink_options, ) @unstable() @@ -2831,6 +2868,7 @@ def sink_ndjson( | Literal["auto"] | None = "auto", retries: int = 2, + sync_on_close: SyncOnClose = "none", ) -> None: """ Evaluate the query in streaming mode and write to an NDJSON file. @@ -2886,6 +2924,12 @@ at any point without it being considered a breaking change. retries Number of retries if accessing a cloud instance fails. + sync_on_close + Sync to disk before closing a file. + + * `none` does not sync. + * `data` syncs the file contents. + * `all` syncs the file contents and metadata.
Returns ------- @@ -2928,12 +2972,17 @@ def sink_ndjson( else: target = path + sink_options = { + "sync_on_close": sync_on_close, + } + return lf.sink_json( target=target, maintain_order=maintain_order, cloud_options=storage_options, credential_provider=credential_provider_builder, retries=retries, + sink_options=sink_options, ) def _set_sink_optimizations( diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index d95c41cb0644..377205aa6a21 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -14,10 +14,10 @@ use polars_python::catalog::unity::PyCatalogClient; use polars_python::cloud; use polars_python::dataframe::PyDataFrame; use polars_python::expr::PyExpr; -use polars_python::functions::{PyPartitioning, PyStringCacheHolder}; +use polars_python::functions::PyStringCacheHolder; #[cfg(not(target_arch = "wasm32"))] use polars_python::lazyframe::PyInProcessQuery; -use polars_python::lazyframe::PyLazyFrame; +use polars_python::lazyframe::{PyLazyFrame, PyPartitioning}; use polars_python::lazygroupby::PyLazyGroupBy; use polars_python::series::PySeries; #[cfg(feature = "sql")] diff --git a/py-polars/tests/unit/io/test_partition.py b/py-polars/tests/unit/io/test_partition.py index e271edf87c53..8aa44a2064c5 100644 --- a/py-polars/tests/unit/io/test_partition.py +++ b/py-polars/tests/unit/io/test_partition.py @@ -1,7 +1,5 @@ from __future__ import annotations -import os -import sys from typing import TYPE_CHECKING, Any, TypedDict import pytest @@ -32,10 +30,6 @@ class IOType(TypedDict): @pytest.mark.parametrize("io_type", io_types) @pytest.mark.parametrize("length", [0, 1, 4, 5, 6, 7]) @pytest.mark.parametrize("max_size", [1, 2, 3]) -@pytest.mark.skipif( - sys.platform == "win32", - reason="sync does not exist on Windows", -) @pytest.mark.write_disk def test_max_size_partition( tmp_path: Path, @@ -50,15 +44,14 @@ def test_max_size_partition( (io_type["sink"])( lf, MaxSizePartitioning(tmp_path / f"{{part}}.{io_type['ext']}", max_size=max_size), + # We need to sync here because platforms do not guarantee that a close on + # one thread is immediately visible on another thread. + # + # "Multithreaded processes and close()" + # https://man7.org/linux/man-pages/man2/close.2.html + sync_on_close="data", ) - # We need to sync here because platforms do not guarantee that a close on - # one thread is immediately visible on another thread. - # - # "Multithreaded processes and close()" - # https://man7.org/linux/man-pages/man2/close.2.html - os.sync() - i = 0 while length > 0: assert (io_type["scan"])(tmp_path / f"{i}.{io_type['ext']}").select( diff --git a/py-polars/tests/unit/streaming/test_streaming_io.py b/py-polars/tests/unit/streaming/test_streaming_io.py index 2197fac4c4ea..85858fc0bc9c 100644 --- a/py-polars/tests/unit/streaming/test_streaming_io.py +++ b/py-polars/tests/unit/streaming/test_streaming_io.py @@ -2,7 +2,6 @@ import io from typing import TYPE_CHECKING, Any -from unittest.mock import patch import pytest @@ -128,59 +127,6 @@ def test_sink_csv_14494(tmp_path: Path) -> None: assert pl.read_csv(tmp_path / "sink.csv").columns == ["c"] -def test_sink_csv_with_options() -> None: - """ - Test with all possible options. - - As we already tested the main read/write functionality of the `sink_csv` method in - the `test_sink_csv` method above, we only need to verify that all the options are - passed into the rust-polars correctly. 
- """ - df = pl.LazyFrame({"dummy": ["abc"]}) - with patch.object(df, "_ldf") as ldf: - df.sink_csv( - "target", - include_bom=True, - include_header=False, - separator=";", - line_terminator="|", - quote_char="$", - batch_size=42, - datetime_format="%Y", - date_format="%d", - time_format="%H", - float_scientific=True, - float_precision=42, - null_value="BOOM", - quote_style="always", - maintain_order=False, - storage_options=None, - credential_provider="auto", - retries=2, - ) - - ldf.optimization_toggle().sink_csv.assert_called_with( - target="target", - include_bom=True, - include_header=False, - separator=ord(";"), - line_terminator="|", - quote_char=ord("$"), - batch_size=42, - datetime_format="%Y", - date_format="%d", - time_format="%H", - float_scientific=True, - float_precision=42, - null_value="BOOM", - quote_style="always", - maintain_order=False, - cloud_options=None, - credential_provider=None, - retries=2, - ) - - @pytest.mark.parametrize(("value"), ["abc", ""]) def test_sink_csv_exception_for_separator(value: str) -> None: df = pl.LazyFrame({"dummy": ["abc"]})