From 34f5d17f8f698ee81a2ec8cce5ae6a2f4a3f9ce3 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Thu, 6 Mar 2025 20:37:28 +1100
Subject: [PATCH] Support cloud paths in sinks for the new streaming engine

---
 crates/polars-plan/src/dsl/scan_sources.rs    | 22 ++++++++++++
 crates/polars-python/src/lazyframe/sink.rs    |  5 +++
 .../polars-stream/src/nodes/io_sinks/csv.rs   | 33 ++++++++++-------
 .../polars-stream/src/nodes/io_sinks/ipc.rs   | 28 +++++++++------
 .../polars-stream/src/nodes/io_sinks/json.rs  | 36 +++++++++++++------
 .../src/nodes/io_sinks/parquet.rs             | 28 +++++++++------
 .../src/nodes/io_sinks/partition/mod.rs       |  8 ++++-
 .../polars-stream/src/nodes/io_sources/csv.rs |  1 +
 .../polars-stream/src/nodes/io_sources/ipc.rs | 34 ++++++++++++++----
 .../src/physical_plan/lower_ir.rs             | 11 ++++--
 crates/polars-stream/src/physical_plan/mod.rs |  3 ++
 .../src/physical_plan/to_graph.rs             |  9 ++++-
 py-polars/polars/io/cloud/_utils.py           |  3 ++
 py-polars/polars/io/partition.py              |  4 +++
 14 files changed, 168 insertions(+), 57 deletions(-)

diff --git a/crates/polars-plan/src/dsl/scan_sources.rs b/crates/polars-plan/src/dsl/scan_sources.rs
index 040a6553b37d..6e3dacf4b9d2 100644
--- a/crates/polars-plan/src/dsl/scan_sources.rs
+++ b/crates/polars-plan/src/dsl/scan_sources.rs
@@ -336,6 +336,28 @@ impl ScanSourceRef<'_> {
         }
     }
 
+    pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
+        match self {
+            ScanSourceRef::Path(path) => {
+                let file = if run_async {
+                    feature_gated!("cloud", {
+                        polars_io::file_cache::FILE_CACHE
+                            .get_entry(path.to_str().unwrap())
+                            // Safety: This was initialized by schema inference.
+                            .unwrap()
+                            .try_open_check_latest()?
+                    })
+                } else {
+                    polars_utils::open_file(path)?
+                };
+
+                MemSlice::from_file(&file)
+            },
+            ScanSourceRef::File(file) => MemSlice::from_file(file),
+            ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
+        }
+    }
+
     pub fn to_memslice_possibly_async(
         &self,
         run_async: bool,
diff --git a/crates/polars-python/src/lazyframe/sink.rs b/crates/polars-python/src/lazyframe/sink.rs
index 97b1f0530fd9..95d932668d45 100644
--- a/crates/polars-python/src/lazyframe/sink.rs
+++ b/crates/polars-python/src/lazyframe/sink.rs
@@ -33,6 +33,11 @@ impl PyPartitioning {
             variant: PartitionVariant::MaxSize(max_size),
         }
     }
+
+    #[getter]
+    fn path(&self) -> &str {
+        self.path.to_str().unwrap()
+    }
 }
 
 impl<'py> FromPyObject<'py> for SinkTarget {
diff --git a/crates/polars-stream/src/nodes/io_sinks/csv.rs b/crates/polars-stream/src/nodes/io_sinks/csv.rs
index 04e6ed450bdf..8f01c9ea6060 100644
--- a/crates/polars-stream/src/nodes/io_sinks/csv.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -5,7 +5,9 @@ use polars_core::frame::DataFrame;
 use polars_core::schema::SchemaRef;
 use polars_error::PolarsResult;
 use polars_expr::state::ExecutionState;
+use polars_io::cloud::CloudOptions;
 use polars_io::prelude::{CsvWriter, CsvWriterOptions};
+use polars_io::utils::file::AsyncWriteable;
 use polars_io::SerWriter;
 use polars_plan::dsl::SinkOptions;
 use polars_utils::priority::Priority;
@@ -22,19 +24,22 @@ pub struct CsvSinkNode {
     schema: SchemaRef,
     sink_options: SinkOptions,
     write_options: CsvWriterOptions,
+    cloud_options: Option<CloudOptions>,
 }
 impl CsvSinkNode {
     pub fn new(
-        schema: SchemaRef,
         path: PathBuf,
+        schema: SchemaRef,
         sink_options: SinkOptions,
         write_options: CsvWriterOptions,
+        cloud_options: Option<CloudOptions>,
     ) -> Self {
         Self {
             path,
             schema,
             sink_options,
             write_options,
+            cloud_options,
         }
     }
 }
@@ -144,35 +149,37 @@ impl SinkNode for CsvSinkNode {
         let schema = self.schema.clone();
         let include_header = self.write_options.include_header;
         let include_bom = self.write_options.include_bom;
+        let cloud_options = self.cloud_options.clone();
 
         let io_task = polars_io::pl_async::get_runtime().spawn(async move {
-            use tokio::fs::OpenOptions;
             use tokio::io::AsyncWriteExt;
 
-            let mut file = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(path.as_path())
-                .await
-                .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;
+            let mut file = polars_io::utils::file::Writeable::try_new(
+                path.to_str().unwrap(),
+                cloud_options.as_ref(),
+            )?;
 
             // Write the header
             if include_header || include_bom {
-                let mut std_file = file.into_std().await;
-                let mut writer = CsvWriter::new(&mut std_file)
+                let mut writer = CsvWriter::new(&mut *file)
                     .include_bom(include_bom)
                     .include_header(include_header)
                     .n_threads(1) // Disable rayon parallelism
                     .batched(&schema)?;
                 writer.write_batch(&DataFrame::empty_with_schema(&schema))?;
-                file = tokio::fs::File::from_std(std_file);
             }
 
+            let mut file = file.try_into_async_writeable()?;
+
             while let Some(Priority(_, buffer)) = lin_rx.get().await {
                 file.write_all(&buffer).await?;
             }
 
-            tokio_sync_on_close(sink_options.sync_on_close, &mut file).await?;
+            if let AsyncWriteable::Local(file) = &mut file {
+                tokio_sync_on_close(sink_options.sync_on_close, file).await?;
+            }
+
+            file.close().await?;
+
             PolarsResult::Ok(())
         });
         join_handles.push(spawn(TaskPriority::Low, async move {
diff --git a/crates/polars-stream/src/nodes/io_sinks/ipc.rs b/crates/polars-stream/src/nodes/io_sinks/ipc.rs
index c660f07a0868..e5695a66a22c 100644
--- a/crates/polars-stream/src/nodes/io_sinks/ipc.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/ipc.rs
@@ -12,7 +12,9 @@ use polars_core::utils::arrow::io::ipc::write::{
 };
 use polars_error::PolarsResult;
 use polars_expr::state::ExecutionState;
+use polars_io::cloud::CloudOptions;
 use polars_io::ipc::{IpcWriter, IpcWriterOptions};
+use polars_io::utils::file::Writeable;
 use polars_io::SerWriter;
 use polars_plan::dsl::SinkOptions;
 use polars_utils::priority::Priority;
@@ -39,6 +41,7 @@ pub struct IpcSinkNode {
 
     compat_level: CompatLevel,
     chunk_size: usize,
+    cloud_options: Option<CloudOptions>,
 }
 
 impl IpcSinkNode {
@@ -47,6 +50,7 @@ impl IpcSinkNode {
         path: PathBuf,
         sink_options: SinkOptions,
         write_options: IpcWriterOptions,
+        cloud_options: Option<CloudOptions>,
     ) -> Self {
         Self {
             path,
@@ -58,6 +62,7 @@ impl IpcSinkNode {
 
             compat_level: CompatLevel::newest(), // @TODO: make this accessible from outside
             chunk_size: get_ideal_morsel_size(), // @TODO: change to something more appropriate
+            cloud_options,
         }
     }
 }
@@ -322,18 +327,14 @@ impl SinkNode for IpcSinkNode {
         let path = self.path.clone();
         let sink_options = self.sink_options.clone();
         let write_options = self.write_options;
+        let cloud_options = self.cloud_options.clone();
         let input_schema = self.input_schema.clone();
 
         let io_task = polars_io::pl_async::get_runtime().spawn(async move {
-            use tokio::fs::OpenOptions;
-
-            let file = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(path.as_path())
-                .await?;
-            let mut file = file.into_std().await;
-            let writer = BufWriter::new(&mut file);
+            let mut file = polars_io::utils::file::Writeable::try_new(
+                path.to_str().unwrap(),
+                cloud_options.as_ref(),
+            )?;
+            let writer = BufWriter::new(&mut *file);
             let mut writer = IpcWriter::new(writer)
                 .with_compression(write_options.compression)
                 .with_parallel(false)
@@ -348,7 +349,12 @@ impl SinkNode for IpcSinkNode {
             writer.finish()?;
             drop(writer);
 
-            sync_on_close(sink_options.sync_on_close, &mut file)?;
+            if let Writeable::Local(file) = &mut file {
+                sync_on_close(sink_options.sync_on_close, file)?;
+            }
+
+            file.close()?;
+
             PolarsResult::Ok(())
         });
         join_handles.push(spawn(TaskPriority::Low, async move {
diff --git a/crates/polars-stream/src/nodes/io_sinks/json.rs b/crates/polars-stream/src/nodes/io_sinks/json.rs
index 748f732038ee..f81dc106b32d 100644
--- a/crates/polars-stream/src/nodes/io_sinks/json.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -3,7 +3,9 @@ use std::path::PathBuf;
 
 use polars_error::PolarsResult;
 use polars_expr::state::ExecutionState;
+use polars_io::cloud::CloudOptions;
 use polars_io::json::BatchedWriter;
+use polars_io::utils::file::AsyncWriteable;
 use polars_plan::dsl::SinkOptions;
 use polars_utils::priority::Priority;
 
@@ -17,10 +19,19 @@ type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
 pub struct NDJsonSinkNode {
     path: PathBuf,
     sink_options: SinkOptions,
+    cloud_options: Option<CloudOptions>,
 }
 impl NDJsonSinkNode {
-    pub fn new(path: PathBuf, sink_options: SinkOptions) -> Self {
-        Self { path, sink_options }
+    pub fn new(
+        path: PathBuf,
+        sink_options: SinkOptions,
+        cloud_options: Option<CloudOptions>,
+    ) -> Self {
+        Self {
+            path,
+            sink_options,
+            cloud_options,
+        }
     }
 }
@@ -102,6 +113,7 @@ impl SinkNode for NDJsonSinkNode {
                 PolarsResult::Ok(())
             })
         }));
+        let cloud_options = self.cloud_options.clone();
 
         // IO task.
         //
@@ -109,22 +121,24 @@ impl SinkNode for NDJsonSinkNode {
         let sink_options = self.sink_options.clone();
         let path = self.path.clone();
         let io_task = polars_io::pl_async::get_runtime().spawn(async move {
-            use tokio::fs::OpenOptions;
             use tokio::io::AsyncWriteExt;
 
-            let mut file = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(path.as_path())
-                .await
-                .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;
+            let mut file = polars_io::utils::file::AsyncWriteable::try_new(
+                path.to_str().unwrap(),
+                cloud_options.as_ref(),
+            )
+            .await?;
 
             while let Some(Priority(_, buffer)) = lin_rx.get().await {
                 file.write_all(&buffer).await?;
             }
 
-            tokio_sync_on_close(sink_options.sync_on_close, &mut file).await?;
+            if let AsyncWriteable::Local(file) = &mut file {
+                tokio_sync_on_close(sink_options.sync_on_close, file).await?;
+            }
+
+            file.close().await?;
+
             PolarsResult::Ok(())
         });
         join_handles.push(spawn(TaskPriority::Low, async move {
diff --git a/crates/polars-stream/src/nodes/io_sinks/parquet.rs b/crates/polars-stream/src/nodes/io_sinks/parquet.rs
index f7cd8aba3fd9..4b970c320eeb 100644
--- a/crates/polars-stream/src/nodes/io_sinks/parquet.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/parquet.rs
@@ -7,9 +7,11 @@ use polars_core::prelude::{ArrowSchema, CompatLevel};
 use polars_core::schema::SchemaRef;
 use polars_error::PolarsResult;
 use polars_expr::state::ExecutionState;
+use polars_io::cloud::CloudOptions;
 use polars_io::parquet::write::BatchedWriter;
 use polars_io::prelude::{get_encodings, ParquetWriteOptions};
 use polars_io::schema_to_arrow_checked;
+use polars_io::utils::file::Writeable;
 use polars_parquet::parquet::error::ParquetResult;
 use polars_parquet::read::ParquetError;
 use polars_parquet::write::{
@@ -40,6 +42,7 @@ pub struct ParquetSinkNode {
     parquet_schema: SchemaDescriptor,
     arrow_schema: ArrowSchema,
     encodings: Vec<Vec<Encoding>>,
+    cloud_options: Option<CloudOptions>,
 }
 
 impl ParquetSinkNode {
@@ -48,6 +51,7 @@ impl ParquetSinkNode {
         path: &Path,
         sink_options: SinkOptions,
         write_options: &ParquetWriteOptions,
+        cloud_options: Option<CloudOptions>,
     ) -> PolarsResult<Self> {
         let schema = schema_to_arrow_checked(&input_schema, CompatLevel::newest(), "parquet")?;
         let parquet_schema = to_parquet_schema(&schema)?;
@@ -63,6 +67,7 @@ impl ParquetSinkNode {
             parquet_schema,
             arrow_schema: schema,
             encodings,
+            cloud_options,
         })
     }
 }
@@ -251,22 +256,18 @@ impl SinkNode for ParquetSinkNode {
         // spawned once.
         let path = self.path.clone();
         let sink_options = self.sink_options.clone();
+        let cloud_options = self.cloud_options.clone();
         let write_options = self.write_options;
         let arrow_schema = self.arrow_schema.clone();
         let parquet_schema = self.parquet_schema.clone();
         let encodings = self.encodings.clone();
 
         let io_task = polars_io::pl_async::get_runtime().spawn(async move {
-            use tokio::fs::OpenOptions;
+            let mut file = polars_io::utils::file::Writeable::try_new(
+                path.to_str().unwrap(),
+                cloud_options.as_ref(),
+            )?;
 
-            let file = OpenOptions::new()
-                .write(true)
-                .create(true)
-                .truncate(true)
-                .open(path.as_path())
-                .await
-                .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;
-            let mut file = file.into_std().await;
-            let writer = BufWriter::new(&mut file);
+            let writer = BufWriter::new(&mut *file);
             let write_options = WriteOptions {
                 statistics: write_options.statistics,
                 compression: write_options.compression.into(),
@@ -292,7 +293,12 @@ impl SinkNode for ParquetSinkNode {
             writer.finish()?;
             drop(writer);
 
-            sync_on_close(sink_options.sync_on_close, &mut file)?;
+            if let Writeable::Local(file) = &mut file {
+                sync_on_close(sink_options.sync_on_close, file)?;
+            }
+
+            file.close()?;
+
             PolarsResult::Ok(())
         });
         join_handles.push(spawn(TaskPriority::Low, async move {
diff --git a/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs b/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs
index 854f7c4a80e6..3828a89c3d24 100644
--- a/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/partition/mod.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
 use polars_core::prelude::PlHashMap;
 use polars_core::schema::SchemaRef;
 use polars_error::PolarsResult;
+use polars_io::cloud::CloudOptions;
 use polars_plan::dsl::{FileType, PartitionVariant, SinkOptions};
 use polars_utils::pl_str::PlSmallStr;
 
@@ -50,6 +51,7 @@ pub fn get_create_new_fn(
     file_type: FileType,
     sink_options: SinkOptions,
     args_to_path: ArgsToPathFn,
+    cloud_options: Option<CloudOptions>,
 ) -> CreateNewSinkFn {
     match file_type {
         #[cfg(feature = "ipc")]
@@ -60,6 +62,7 @@ pub fn get_create_new_fn(
                 path.clone(),
                 sink_options.clone(),
                 ipc_writer_options,
+                cloud_options.clone(),
             )) as Box<dyn SinkNode + Send + Sync>;
             Ok((path, sink, args))
         }) as _,
@@ -69,6 +72,7 @@ pub fn get_create_new_fn(
             let sink = Box::new(super::json::NDJsonSinkNode::new(
                 path.clone(),
                 sink_options.clone(),
+                cloud_options.clone(),
             )) as Box<dyn SinkNode + Send + Sync>;
             Ok((path, sink, args))
         }) as _,
@@ -80,6 +84,7 @@ pub fn get_create_new_fn(
                 path.as_path(),
                 sink_options.clone(),
                 &parquet_writer_options,
+                cloud_options.clone(),
             )?) as Box<dyn SinkNode + Send + Sync>;
             Ok((path, sink, args))
         }) as _,
@@ -87,10 +92,11 @@ pub fn get_create_new_fn(
         FileType::Csv(csv_writer_options) => Arc::new(move |input_schema, args| {
             let (path, args) = args_to_path(args);
             let sink = Box::new(super::csv::CsvSinkNode::new(
-                input_schema,
                 path.clone(),
+                input_schema,
                 sink_options.clone(),
                 csv_writer_options.clone(),
+                cloud_options.clone(),
             )) as Box<dyn SinkNode + Send + Sync>;
             Ok((path, sink, args))
         }) as _,
diff --git a/crates/polars-stream/src/nodes/io_sources/csv.rs b/crates/polars-stream/src/nodes/io_sources/csv.rs
index d6499eace002..47a9c2fef40e 100644
--- a/crates/polars-stream/src/nodes/io_sources/csv.rs
+++ b/crates/polars-stream/src/nodes/io_sources/csv.rs
@@ -135,6 +135,7 @@ impl SourceNode for CsvSourceNode {
                 if morsel_output.port.send(morsel).await.is_err() {
                     break;
                 }
+
                 wait_group.wait().await;
 
                 if source_token.stop_requested() {
diff --git a/crates/polars-stream/src/nodes/io_sources/ipc.rs b/crates/polars-stream/src/nodes/io_sources/ipc.rs
index 2f7cae832a95..67e168b64e2f 100644
--- a/crates/polars-stream/src/nodes/io_sources/ipc.rs
+++ b/crates/polars-stream/src/nodes/io_sources/ipc.rs
@@ -64,7 +64,7 @@ impl IpcSourceNode {
         source: ScanSource,
         file_info: FileInfo,
         options: IpcScanOptions,
-        _cloud_options: Option<CloudOptions>,
+        cloud_options: Option<CloudOptions>,
         file_options: FileScanOptions,
         mut metadata: Option<Arc<FileMetadata>>,
     ) -> PolarsResult<Self> {
@@ -87,10 +87,30 @@ impl IpcSourceNode {
             allow_missing_columns: _,
         } = file_options;
 
-        let memslice = source.as_scan_source_ref().to_memslice()?;
+        let memslice = {
+            if let ScanSourceRef::Path(p) = source.as_scan_source_ref() {
+                if source.run_async() {
+                    polars_io::file_cache::init_entries_from_uri_list(
+                        &[Arc::from(p.to_str().unwrap())],
+                        cloud_options.as_ref(),
+                    )?;
+                }
+            }
+
+            // check_latest: IR resolution does not download IPC.
+            source
+                .as_scan_source_ref()
+                .to_memslice_async_check_latest(source.run_async())?
+        };
+
+        #[allow(clippy::match_single_binding)]
         let metadata = match metadata.take() {
-            Some(md) => md,
-            None => Arc::new(read_file_metadata(&mut std::io::Cursor::new(
+            // TODO: Don't know why, this metadata does not match the file. This was during testing
+            // against a cloud scan:
+            // * ComputeError: out-of-spec: InvalidBuffersLength { buffers_size: 7200, file_size: 453 }
+            // Some(md) => md,
+            _ => Arc::new(read_file_metadata(&mut std::io::Cursor::new(
                 memslice.as_ref(),
             ))?),
         };
@@ -138,7 +158,7 @@ fn slice_take(slice: &mut Range<usize>, n: usize) -> Range<usize> {
     let offset = slice.start;
     let length = slice.len();
 
-    assert!(offset < n);
+    assert!(offset <= n);
 
     let chunk_length = (n - offset).min(length);
     let rng = offset..offset + chunk_length;
@@ -498,7 +518,7 @@ impl MultiScanable for IpcSourceNode {
         let options = options.clone();
 
         // TODO
-        // * `to_memslice_async_assume_latest` being a non-async function is not ideal.
+        // * `to_memslice_async_check_latest` being a non-async function is not ideal.
        // * This is also downloading the whole file even if there is a projection
         let memslice = {
             if let ScanSourceRef::Path(p) = source.as_scan_source_ref() {
@@ -510,7 +530,7 @@ impl MultiScanable for IpcSourceNode {
 
             source
                 .as_scan_source_ref()
-                .to_memslice_async_assume_latest(source.run_async())?
+                .to_memslice_async_check_latest(source.run_async())?
         };
 
         let metadata = Arc::new(read_file_metadata(&mut std::io::Cursor::new(
             memslice.as_ref(),
diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs
index 836fc75a76dc..251068b8bc29 100644
--- a/crates/polars-stream/src/physical_plan/lower_ir.rs
+++ b/crates/polars-stream/src/physical_plan/lower_ir.rs
@@ -229,11 +229,12 @@ pub fn lower_ir(
             path,
             sink_options,
             file_type,
-            cloud_options: _,
+            cloud_options,
         } => {
             let path = path.clone();
             let sink_options = sink_options.clone();
             let file_type = file_type.clone();
+            let cloud_options = cloud_options.clone();
 
             match file_type {
                 #[cfg(feature = "ipc")]
@@ -244,6 +245,7 @@ pub fn lower_ir(
                         sink_options,
                         file_type,
                         input: phys_input,
+                        cloud_options,
                     }
                 },
                 #[cfg(feature = "parquet")]
@@ -254,6 +256,7 @@ pub fn lower_ir(
                         sink_options,
                         file_type,
                         input: phys_input,
+                        cloud_options,
                     }
                 },
                 #[cfg(feature = "csv")]
@@ -264,6 +267,7 @@ pub fn lower_ir(
                         sink_options,
                         file_type,
                         input: phys_input,
+                        cloud_options,
                     }
                 },
                 #[cfg(feature = "json")]
@@ -274,6 +278,7 @@ pub fn lower_ir(
                         sink_options,
                         file_type,
                         input: phys_input,
+                        cloud_options,
                     }
                 },
             }
@@ -283,12 +288,13 @@ pub fn lower_ir(
             sink_options,
             variant,
             file_type,
-            cloud_options: _,
+            cloud_options,
         } => {
             let path_f_string = path_f_string.clone();
             let sink_options = sink_options.clone();
             let variant = variant.clone();
             let file_type = file_type.clone();
+            let cloud_options = cloud_options.clone();
 
             let phys_input = lower_ir!(*input)?;
             PhysNodeKind::PartitionSink {
@@ -297,6 +303,7 @@ pub fn lower_ir(
                 variant,
                 file_type,
                 input: phys_input,
+                cloud_options,
             }
         },
     },
diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs
index aaba84c1d10f..9906a66dacbb 100644
--- a/crates/polars-stream/src/physical_plan/mod.rs
+++ b/crates/polars-stream/src/physical_plan/mod.rs
@@ -6,6 +6,7 @@ use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
 use polars_core::schema::{Schema, SchemaRef};
 use polars_core::utils::arrow::bitmap::Bitmap;
 use polars_error::PolarsResult;
+use polars_io::cloud::CloudOptions;
 use polars_io::RowIndex;
 use polars_ops::frame::JoinArgs;
 use polars_plan::dsl::{
@@ -134,6 +135,7 @@ pub enum PhysNodeKind {
         sink_options: SinkOptions,
         file_type: FileType,
         input: PhysStream,
+        cloud_options: Option<CloudOptions>,
     },
 
     PartitionSink {
@@ -142,6 +144,7 @@ pub enum PhysNodeKind {
         variant: PartitionVariant,
         file_type: FileType,
        input: PhysStream,
+        cloud_options: Option<CloudOptions>,
     },
 
     /// Generic fallback for (as-of-yet) unsupported streaming mappings.
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 4b68269c826e..7f0170677e36 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -225,6 +225,7 @@ fn to_graph_rec<'a>( sink_options, file_type, input, + cloud_options, } => { let sink_options = sink_options.clone(); let input_schema = ctx.phys_sm[input.node].output_schema.clone(); @@ -238,6 +239,7 @@ fn to_graph_rec<'a>( path.to_path_buf(), sink_options, *ipc_writer_options, + cloud_options.clone(), )), [(input_key, input.port)], ), @@ -246,6 +248,7 @@ fn to_graph_rec<'a>( SinkComputeNode::from(nodes::io_sinks::json::NDJsonSinkNode::new( path.to_path_buf(), sink_options, + cloud_options.clone(), )), [(input_key, input.port)], ), @@ -256,16 +259,18 @@ fn to_graph_rec<'a>( path, sink_options, parquet_writer_options, + cloud_options.clone(), )?), [(input_key, input.port)], ), #[cfg(feature = "csv")] FileType::Csv(csv_writer_options) => ctx.graph.add_node( SinkComputeNode::from(nodes::io_sinks::csv::CsvSinkNode::new( - input_schema, path.to_path_buf(), + input_schema, sink_options, csv_writer_options.clone(), + cloud_options.clone(), )), [(input_key, input.port)], ), @@ -287,6 +292,7 @@ fn to_graph_rec<'a>( variant, file_type, input, + cloud_options, } => { let input_schema = ctx.phys_sm[input.node].output_schema.clone(); let input_key = to_graph_rec(input.node, ctx)?; @@ -298,6 +304,7 @@ fn to_graph_rec<'a>( file_type.clone(), sink_options.clone(), args_to_path, + cloud_options.clone(), ); match variant { diff --git a/py-polars/polars/io/cloud/_utils.py b/py-polars/polars/io/cloud/_utils.py index dba620560db4..71f24ab81117 100644 --- a/py-polars/polars/io/cloud/_utils.py +++ b/py-polars/polars/io/cloud/_utils.py @@ -4,6 +4,7 @@ from typing import Any from polars._utils.various import is_path_or_str_sequence +from polars.io.partition import PartitionMaxSize def _first_scan_path( @@ -13,6 +14,8 @@ def _first_scan_path( return source elif is_path_or_str_sequence(source) and source: return source[0] + elif isinstance(source, PartitionMaxSize): + return source._path return None diff --git a/py-polars/polars/io/partition.py b/py-polars/polars/io/partition.py index 928f905a9942..1b26325cf55d 100644 --- a/py-polars/polars/io/partition.py +++ b/py-polars/polars/io/partition.py @@ -37,3 +37,7 @@ class PartitionMaxSize: def __init__(self, path: Path | str, *, max_size: int) -> None: issue_unstable_warning("Partitioning strategies are considered unstable.") self._p = PyPartitioning.new_max_size(path, max_size) + + @property + def _path(self) -> str: + return self._p.path
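
Usage sketch: the patch threads CloudOptions from the IR through lower_ir/to_graph into every sink node, so the new streaming engine's sinks (and partitioned sinks via PartitionMaxSize) can target object storage instead of only local files. A minimal illustration from the Python side; the bucket and region below are placeholders, and passing `storage_options` through `sink_parquet` (mirroring the scan functions) is an assumption, not something shown in this patch:

    import polars as pl
    from polars.io.partition import PartitionMaxSize

    opts = {"aws_region": "us-east-1"}  # placeholder object-store options

    lf = pl.scan_parquet("s3://my-bucket/in/*.parquet", storage_options=opts)

    # Single-file sink to a cloud path (opened via Writeable::try_new above).
    lf.sink_parquet("s3://my-bucket/out/result.parquet", storage_options=opts)

    # Partitioned sink to a cloud base path.
    lf.sink_parquet(PartitionMaxSize("s3://my-bucket/out/", max_size=1_000_000))

The `_path` property added to PartitionMaxSize (backed by the new PyPartitioning `path` getter) is what lets `_first_scan_path()` resolve cloud options and credentials for partitioned writes the same way it already does for plain path sources.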