From a599b9ea2f0869bba82eea7ffdca82eff97785b2 Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Thu, 6 Mar 2025 19:38:08 +1100 Subject: [PATCH] refactor(rust): Introduce `Writeable` and `AsyncWriteable` (#21599) --- crates/polars-io/src/cloud/adaptors.rs | 152 ++++++++------- crates/polars-io/src/utils/file.rs | 254 +++++++++++++++++++------ 2 files changed, 280 insertions(+), 126 deletions(-) diff --git a/crates/polars-io/src/cloud/adaptors.rs b/crates/polars-io/src/cloud/adaptors.rs index a77800bbd54e..433420ed5834 100644 --- a/crates/polars-io/src/cloud/adaptors.rs +++ b/crates/polars-io/src/cloud/adaptors.rs @@ -5,35 +5,15 @@ use std::sync::Arc; use object_store::buffered::BufWriter; use object_store::path::Path; use object_store::ObjectStore; -use polars_error::{to_compute_err, PolarsResult}; +use polars_error::PolarsResult; +use polars_utils::file::WriteClose; use tokio::io::AsyncWriteExt; use super::{object_path_from_str, CloudOptions}; use crate::pl_async::{get_runtime, get_upload_chunk_size}; -enum WriterState { - Open(BufWriter), - /// Note: `Err` state is also used as the close state on success. - Err(std::io::Error), -} - -impl WriterState { - fn try_with_writer(&mut self, func: F) -> std::io::Result - where - F: Fn(&mut BufWriter) -> std::io::Result, - { - match self { - Self::Open(writer) => match func(writer) { - Ok(v) => Ok(v), - Err(e) => { - let _ = get_runtime().block_on_potential_spawn(writer.abort()); - *self = Self::Err(e); - self.try_with_writer(func) - }, - }, - Self::Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())), - } - } +fn clone_io_err(e: &std::io::Error) -> std::io::Error { + std::io::Error::new(e.kind(), e.to_string()) } /// Adaptor which wraps the interface of [ObjectStore::BufWriter] exposing a synchronous interface @@ -43,13 +23,12 @@ impl WriterState { /// such as with `polars::prelude::CsvWriter`. /// /// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html -pub struct CloudWriter { - // Internal writer, constructed at creation - inner: WriterState, +pub struct BlockingCloudWriter { + state: std::io::Result, } -impl CloudWriter { - /// Construct a new CloudWriter, re-using the given `object_store` +impl BlockingCloudWriter { + /// Construct a new BlockingCloudWriter, re-using the given `object_store` /// /// Creates a new (current-thread) Tokio runtime /// which bridges the sync writing process with the async ObjectStore multipart uploading. @@ -59,21 +38,19 @@ impl CloudWriter { path: Path, ) -> PolarsResult { let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size()); - Ok(CloudWriter { - inner: WriterState::Open(writer), - }) + Ok(BlockingCloudWriter { state: Ok(writer) }) } - /// Constructs a new CloudWriter from a path and an optional set of CloudOptions. + /// Constructs a new BlockingCloudWriter from a path and an optional set of CloudOptions. /// - /// Wrapper around `CloudWriter::new_with_object_store` that is useful if you only have a single write task. + /// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task. /// TODO: Naming? pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult { if let Some(local_path) = uri.strip_prefix("file://") { // Local paths must be created first, otherwise object store will not write anything. if !matches!(std::fs::exists(local_path), Ok(true)) { panic!( - "[CloudWriter] Expected local file to be created: {}", + "[BlockingCloudWriter] Expected local file to be created: {}", local_path ); } @@ -87,64 +64,96 @@ impl CloudWriter { ) } - pub fn close(&mut self) -> PolarsResult<()> { - let WriterState::Open(writer) = &mut self.inner else { - panic!(); - }; - - get_runtime() - .block_on_potential_spawn(async { writer.shutdown().await }) - .map_err(to_compute_err)?; + /// Returns the underlying [`object_store::buffered::BufWriter`] + pub fn try_into_inner(mut self) -> std::io::Result { + // We can't just return self.state: + // * cannot move out of type `adaptors::BlockingCloudWriter`, which implements the `Drop` trait + std::mem::replace( + &mut self.state, + Err(std::io::Error::new(std::io::ErrorKind::Other, "")), + ) + } - self.inner = WriterState::Err(std::io::Error::new( - std::io::ErrorKind::Other, - "impl error: file was closed", - )); + /// Closes the writer, or returns the existing error if it exists. After this function is called + /// the writer is guaranteed to be in an error state. + pub fn close(&mut self) -> std::io::Result<()> { + match self + .try_with_writer(|writer| get_runtime().block_on_potential_spawn(writer.shutdown())) + { + Ok(_) => { + self.state = Err(std::io::Error::new(std::io::ErrorKind::Other, "closed")); + Ok(()) + }, + Err(e) => Err(e), + } + } - Ok(()) + fn try_with_writer(&mut self, func: F) -> std::io::Result + where + F: Fn(&mut BufWriter) -> std::io::Result, + { + let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?; + match func(writer) { + Ok(v) => Ok(v), + Err(e) => { + self.state = Err(clone_io_err(&e)); + Err(e) + }, + } } } -impl std::io::Write for CloudWriter { +impl std::io::Write for BlockingCloudWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { // SAFETY: - // We extend the lifetime for the duration of this function. This is safe as well block the + // We extend the lifetime for the duration of this function. This is safe as we block the // async runtime here let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }; - self.inner.try_with_writer(|writer| { + self.try_with_writer(|writer| { get_runtime() .block_on_potential_spawn(async { writer.write_all(buf).await.map(|_t| buf.len()) }) }) } fn flush(&mut self) -> std::io::Result<()> { - self.inner.try_with_writer(|writer| { - get_runtime().block_on_potential_spawn(async { writer.flush().await }) - }) + self.try_with_writer(|writer| get_runtime().block_on_potential_spawn(writer.flush())) + } +} + +impl WriteClose for BlockingCloudWriter { + fn close(mut self: Box) -> std::io::Result<()> { + BlockingCloudWriter::close(self.as_mut()) } } -impl Drop for CloudWriter { +impl Drop for BlockingCloudWriter { fn drop(&mut self) { - // TODO: Properly raise this error instead of panicking. - match self.inner { - WriterState::Open(_) => self.close().unwrap(), - WriterState::Err(_) => {}, + if self.state.is_err() { + return; + } + + // Note: We should not hit here - the writer should instead be explicitly closed. + // But we still have this here as a safety measure to prevent silently dropping errors. + match self.close() { + Ok(()) => {}, + e @ Err(_) => { + if std::thread::panicking() { + eprintln!("ERROR: CloudWriter errored on close: {:?}", e) + } else { + e.unwrap() + } + }, } } } -#[cfg(feature = "csv")] #[cfg(test)] mod tests { + use polars_core::df; use polars_core::prelude::DataFrame; - use super::*; - use crate::prelude::CsvReadOptions; - use crate::SerReader; - fn example_dataframe() -> DataFrame { df!( "foo" => &[1, 2, 3], @@ -154,7 +163,9 @@ mod tests { } #[test] + #[cfg(feature = "csv")] fn csv_to_local_objectstore_cloudwriter() { + use super::*; use crate::csv::write::CsvWriter; use crate::prelude::SerWriter; @@ -167,7 +178,8 @@ mod tests { let path: object_store::path::Path = "cloud_writer_example.csv".into(); - let mut cloud_writer = CloudWriter::new_with_object_store(object_store, path).unwrap(); + let mut cloud_writer = + BlockingCloudWriter::new_with_object_store(object_store, path).unwrap(); CsvWriter::new(&mut cloud_writer) .finish(&mut df) .expect("Could not write DataFrame as CSV to remote location"); @@ -175,10 +187,13 @@ mod tests { // Skip this tests on Windows since it does not have a convenient /tmp/ location. #[cfg_attr(target_os = "windows", ignore)] + #[cfg(feature = "csv")] #[test] fn cloudwriter_from_cloudlocation_test() { + use super::*; use crate::csv::write::CsvWriter; - use crate::prelude::SerWriter; + use crate::prelude::{CsvReadOptions, SerWriter}; + use crate::SerReader; let mut df = example_dataframe(); @@ -187,7 +202,10 @@ mod tests { std::fs::File::create(path).unwrap(); let mut cloud_writer = get_runtime() - .block_on(CloudWriter::new(format!("file://{}", path).as_str(), None)) + .block_on(BlockingCloudWriter::new( + format!("file://{}", path).as_str(), + None, + )) .unwrap(); CsvWriter::new(&mut cloud_writer) diff --git a/crates/polars-io/src/utils/file.rs b/crates/polars-io/src/utils/file.rs index a788871fc173..7f808c5a67b5 100644 --- a/crates/polars-io/src/utils/file.rs +++ b/crates/polars-io/src/utils/file.rs @@ -1,90 +1,226 @@ +use std::ops::{Deref, DerefMut}; use std::path::Path; +#[cfg(feature = "cloud")] +pub use async_writeable::AsyncWriteable; use polars_core::config; -use polars_error::{feature_gated, PolarsResult}; +use polars_error::{feature_gated, PolarsError, PolarsResult}; use polars_utils::create_file; use polars_utils::file::{ClosableFile, WriteClose}; use polars_utils::mmap::ensure_not_mapped; use crate::cloud::CloudOptions; -#[cfg(feature = "cloud")] -use crate::cloud::CloudWriter; use crate::{is_cloud_url, resolve_homedir}; -#[cfg(feature = "cloud")] -impl WriteClose for CloudWriter {} -/// Open a path for writing. Supports cloud paths. -pub fn try_get_writeable( - path: &str, - #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>, -) -> PolarsResult> { - let is_cloud = is_cloud_url(path); - let verbose = config::verbose(); +/// Holds a non-async writeable file, abstracted over local files or cloud files. +/// +/// This implements `DerefMut` to a trait object implementing [`std::io::Write`]. +/// +/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`. +pub enum Writeable { + Local(std::fs::File), + #[cfg(feature = "cloud")] + Cloud(crate::cloud::BlockingCloudWriter), +} - if is_cloud { - feature_gated!("cloud", { - use crate::cloud::CloudWriter; +impl Writeable { + pub fn try_new( + path: &str, + #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { + let is_cloud = is_cloud_url(path); + let verbose = config::verbose(); - if verbose { - eprintln!("try_get_writeable: cloud: {}", path) - } + if is_cloud { + feature_gated!("cloud", { + use crate::cloud::BlockingCloudWriter; - if path.starts_with("file://") { - create_file(Path::new(&path[const { "file://".len() }..]))?; - } + if verbose { + eprintln!("Writeable: try_new: cloud: {}", path) + } + + if path.starts_with("file://") { + create_file(Path::new(&path[const { "file://".len() }..]))?; + } + + let writer = crate::pl_async::get_runtime() + .block_on_potential_spawn(BlockingCloudWriter::new(path, cloud_options))?; + Ok(Self::Cloud(writer)) + }) + } else if config::force_async() { + feature_gated!("cloud", { + use crate::cloud::BlockingCloudWriter; + + let path = resolve_homedir(&path); + + if verbose { + eprintln!( + "Writeable: try_new: forced async: {}", + path.to_str().unwrap() + ) + } + + create_file(&path)?; + let path = std::fs::canonicalize(&path)?; + + ensure_not_mapped(&path.metadata()?)?; + + let path = format!( + "file://{}", + if cfg!(target_family = "windows") { + path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap() + } else { + path.to_str().unwrap() + } + ); - let writer = crate::pl_async::get_runtime() - .block_on_potential_spawn(CloudWriter::new(path, cloud_options))?; - Ok(Box::new(writer)) - }) - } else if config::force_async() { - feature_gated!("cloud", { - use crate::cloud::CloudWriter; + if verbose { + eprintln!("Writeable: try_new: forced async converted path: {}", path) + } + let writer = crate::pl_async::get_runtime() + .block_on_potential_spawn(BlockingCloudWriter::new(&path, cloud_options))?; + Ok(Self::Cloud(writer)) + }) + } else { let path = resolve_homedir(&path); + create_file(&path)?; + + // Note: `canonicalize` does not work on some systems. if verbose { eprintln!( - "try_get_writeable: forced async: {}", - path.to_str().unwrap() + "Writeable: try_new: local: {} (canonicalize: {:?})", + path.to_str().unwrap(), + std::fs::canonicalize(&path) ) } - create_file(&path)?; - let path = std::fs::canonicalize(&path)?; + Ok(Self::Local(polars_utils::open_file_write(&path)?)) + } + } - ensure_not_mapped(&path.metadata()?)?; + /// This returns `Result<>` - if a write was performed before calling this, + /// `CloudWriter` can be in an Err(_) state. + #[cfg(feature = "cloud")] + pub fn try_into_async_writeable(self) -> PolarsResult { + match self { + Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))), + // Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as + // `BlockingCloudWriter` has a `Drop` impl that we don't want. + Self::Cloud(v) => v + .try_into_inner() + .map(AsyncWriteable::Cloud) + .map_err(PolarsError::from), + } + } - let path = format!( - "file://{}", - if cfg!(target_family = "windows") { - path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap() - } else { - path.to_str().unwrap() - } - ); + pub fn close(self) -> std::io::Result<()> { + match self { + Self::Local(v) => ClosableFile::from(v).close(), + #[cfg(feature = "cloud")] + Self::Cloud(mut v) => v.close(), + } + } +} - if verbose { - eprintln!("try_get_writeable: forced async converted path: {}", path) +impl Deref for Writeable { + type Target = dyn std::io::Write + Send; + + fn deref(&self) -> &Self::Target { + match self { + Self::Local(v) => v, + #[cfg(feature = "cloud")] + Self::Cloud(v) => v, + } + } +} + +impl DerefMut for Writeable { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::Local(v) => v, + #[cfg(feature = "cloud")] + Self::Cloud(v) => v, + } + } +} + +/// Note: Prefer using [`Writeable`] / [`Writeable::try_new`] where possible. +/// +/// Open a path for writing. Supports cloud paths. +pub fn try_get_writeable( + path: &str, + cloud_options: Option<&CloudOptions>, +) -> PolarsResult> { + Writeable::try_new(path, cloud_options).map(|x| match x { + Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box, + #[cfg(feature = "cloud")] + Writeable::Cloud(v) => Box::new(v) as Box, + }) +} + +#[cfg(feature = "cloud")] +mod async_writeable { + use std::ops::{Deref, DerefMut}; + + use polars_error::{PolarsError, PolarsResult}; + use polars_utils::file::ClosableFile; + use tokio::io::AsyncWriteExt; + + use super::Writeable; + use crate::cloud::CloudOptions; + + /// Holds an async writeable file, abstracted over local files or cloud files. + /// + /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`]. + /// + /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object. + /// You should instead call the [`AsyncWriteable::close`] at the end. + pub enum AsyncWriteable { + Local(tokio::fs::File), + Cloud(object_store::buffered::BufWriter), + } + + impl AsyncWriteable { + pub async fn try_new( + path: &str, + cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { + // TODO: Native async impl + Writeable::try_new(path, cloud_options).and_then(|x| x.try_into_async_writeable()) + } + + pub async fn close(self) -> PolarsResult<()> { + match self { + Self::Local(v) => async { + let f = v.into_std().await; + ClosableFile::from(f).close() + } + .await + .map_err(PolarsError::from), + Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from), } + } + } - let writer = crate::pl_async::get_runtime() - .block_on_potential_spawn(CloudWriter::new(&path, cloud_options))?; - Ok(Box::new(writer)) - }) - } else { - let path = resolve_homedir(&path); - create_file(&path)?; - - if verbose { - eprintln!( - "try_get_writeable: local: {} (canonicalize: {:?})", - path.to_str().unwrap(), - std::fs::canonicalize(&path) - ) + impl Deref for AsyncWriteable { + type Target = dyn tokio::io::AsyncWrite + Send + Unpin; + + fn deref(&self) -> &Self::Target { + match self { + Self::Local(v) => v, + Self::Cloud(v) => v, + } } + } - let f: ClosableFile = polars_utils::open_file_write(&path)?.into(); - Ok(Box::new(f)) + impl DerefMut for AsyncWriteable { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::Local(v) => v, + Self::Cloud(v) => v, + } + } } }