Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Introduce Writeable and AsyncWriteable #21599

Merged
merged 5 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 85 additions & 67 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,15 @@ use std::sync::Arc;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use polars_error::{to_compute_err, PolarsResult};
use polars_error::PolarsResult;
use polars_utils::file::WriteClose;
use tokio::io::AsyncWriteExt;

use super::{object_path_from_str, CloudOptions};
use crate::pl_async::{get_runtime, get_upload_chunk_size};

enum WriterState {
Open(BufWriter),
/// Note: `Err` state is also used as the close state on success.
Err(std::io::Error),
}

impl WriterState {
fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
where
F: Fn(&mut BufWriter) -> std::io::Result<O>,
{
match self {
Self::Open(writer) => match func(writer) {
Ok(v) => Ok(v),
Err(e) => {
let _ = get_runtime().block_on_potential_spawn(writer.abort());
*self = Self::Err(e);
self.try_with_writer(func)
},
},
Self::Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
}
}
/// Builds a fresh [`std::io::Error`] that mirrors `e`.
///
/// The copy carries the same [`std::io::ErrorKind`] as `e`, and its payload is
/// the rendered `Display` text of `e`. Only the kind and message are carried
/// over; the returned value is a new, independent error object.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let kind = e.kind();
    let message = e.to_string();
    std::io::Error::new(kind, message)
}

/// Adaptor which wraps the interface of [ObjectStore::BufWriter] exposing a synchronous interface
Expand All @@ -43,13 +23,12 @@ impl WriterState {
/// such as with `polars::prelude::CsvWriter`.
///
/// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html
pub struct CloudWriter {
// Internal writer, constructed at creation
inner: WriterState,
pub struct BlockingCloudWriter {
    // `Ok` holds the live writer that operations are forwarded to.
    // `Err` is used both as the failure state (stored when an operation on the
    // writer returns an error) and as the "closed" state after a successful
    // `close()`; once set, later operations return a copy of the stored error.
    state: std::io::Result<BufWriter>,
}

impl CloudWriter {
/// Construct a new CloudWriter, re-using the given `object_store`
impl BlockingCloudWriter {
/// Construct a new BlockingCloudWriter, re-using the given `object_store`
///
/// Creates a new (current-thread) Tokio runtime
/// which bridges the sync writing process with the async ObjectStore multipart uploading.
Expand All @@ -59,21 +38,19 @@ impl CloudWriter {
path: Path,
) -> PolarsResult<Self> {
let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
Ok(CloudWriter {
inner: WriterState::Open(writer),
})
Ok(BlockingCloudWriter { state: Ok(writer) })
}

/// Constructs a new CloudWriter from a path and an optional set of CloudOptions.
/// Constructs a new BlockingCloudWriter from a path and an optional set of CloudOptions.
///
/// Wrapper around `CloudWriter::new_with_object_store` that is useful if you only have a single write task.
/// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task.
/// TODO: Naming?
pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
if let Some(local_path) = uri.strip_prefix("file://") {
// Local paths must be created first, otherwise object store will not write anything.
if !matches!(std::fs::exists(local_path), Ok(true)) {
panic!(
"[CloudWriter] Expected local file to be created: {}",
"[BlockingCloudWriter] Expected local file to be created: {}",
local_path
);
}
Expand All @@ -87,64 +64,96 @@ impl CloudWriter {
)
}

pub fn close(&mut self) -> PolarsResult<()> {
let WriterState::Open(writer) = &mut self.inner else {
panic!();
};

get_runtime()
.block_on_potential_spawn(async { writer.shutdown().await })
.map_err(to_compute_err)?;
/// Consumes this writer and hands back the underlying
/// [`object_store::buffered::BufWriter`], or the stored error if the writer
/// is already in an error (or closed) state.
pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
    // `self.state` cannot simply be moved out, because this type implements
    // `Drop`. Swap in a placeholder error instead; `Drop` returns early when
    // the state is an `Err`, so it will not try to close anything.
    let placeholder = Err(std::io::Error::new(std::io::ErrorKind::Other, ""));
    std::mem::replace(&mut self.state, placeholder)
}

self.inner = WriterState::Err(std::io::Error::new(
std::io::ErrorKind::Other,
"impl error: file was closed",
));
/// Shuts down the underlying writer, or returns the stored error if the
/// writer is already in an error state.
///
/// Whatever the outcome, the writer is in an error state afterwards: a
/// failed shutdown is recorded by `try_with_writer`, and a successful one is
/// marked with a "closed" error so that later operations are rejected.
pub fn close(&mut self) -> std::io::Result<()> {
    // Bail out early on failure; `try_with_writer` has already stored the error.
    self.try_with_writer(|writer| get_runtime().block_on_potential_spawn(writer.shutdown()))?;

    self.state = Err(std::io::Error::new(std::io::ErrorKind::Other, "closed"));
    Ok(())
}

Ok(())
/// Runs `func` against the inner writer if the writer is still usable.
///
/// * If the state already holds an error, `func` is not called and a copy of
///   that error is returned.
/// * If `func` fails, a copy of its error is stored as the new state and the
///   original error is returned to the caller.
fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
where
    F: Fn(&mut BufWriter) -> std::io::Result<O>,
{
    let writer: &mut BufWriter = match self.state.as_mut() {
        Ok(writer) => writer,
        Err(stored) => return Err(clone_io_err(stored)),
    };

    let outcome = func(writer);
    if let Err(failure) = &outcome {
        // Remember the failure so that every later call is short-circuited.
        self.state = Err(clone_io_err(failure));
    }
    outcome
}
}

impl std::io::Write for CloudWriter {
impl std::io::Write for BlockingCloudWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// SAFETY:
// We extend the lifetime for the duration of this function. This is safe as well block the
// We extend the lifetime for the duration of this function. This is safe as we block the
// async runtime here
let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };

self.inner.try_with_writer(|writer| {
self.try_with_writer(|writer| {
get_runtime()
.block_on_potential_spawn(async { writer.write_all(buf).await.map(|_t| buf.len()) })
})
}

fn flush(&mut self) -> std::io::Result<()> {
self.inner.try_with_writer(|writer| {
get_runtime().block_on_potential_spawn(async { writer.flush().await })
})
self.try_with_writer(|writer| get_runtime().block_on_potential_spawn(writer.flush()))
}
}

/// Lets a boxed [`BlockingCloudWriter`] be closed through the `WriteClose` trait.
impl WriteClose for BlockingCloudWriter {
    /// Delegates to the inherent [`BlockingCloudWriter::close`].
    fn close(mut self: Box<Self>) -> std::io::Result<()> {
        let writer: &mut BlockingCloudWriter = &mut *self;
        BlockingCloudWriter::close(writer)
    }
}

impl Drop for CloudWriter {
impl Drop for BlockingCloudWriter {
fn drop(&mut self) {
// TODO: Properly raise this error instead of panicking.
match self.inner {
WriterState::Open(_) => self.close().unwrap(),
WriterState::Err(_) => {},
if self.state.is_err() {
return;
}

// Note: We should not hit here - the writer should instead be explicitly closed.
// But we still have this here as a safety measure to prevent silently dropping errors.
match self.close() {
Ok(()) => {},
e @ Err(_) => {
if std::thread::panicking() {
eprintln!("ERROR: CloudWriter errored on close: {:?}", e)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by, just print an error message if already panicking to avoid abort

} else {
e.unwrap()
}
},
}
}
}

#[cfg(feature = "csv")]
#[cfg(test)]
mod tests {

use polars_core::df;
use polars_core::prelude::DataFrame;

use super::*;
use crate::prelude::CsvReadOptions;
use crate::SerReader;

fn example_dataframe() -> DataFrame {
df!(
"foo" => &[1, 2, 3],
Expand All @@ -154,7 +163,9 @@ mod tests {
}

#[test]
#[cfg(feature = "csv")]
fn csv_to_local_objectstore_cloudwriter() {
use super::*;
use crate::csv::write::CsvWriter;
use crate::prelude::SerWriter;

Expand All @@ -167,18 +178,22 @@ mod tests {

let path: object_store::path::Path = "cloud_writer_example.csv".into();

let mut cloud_writer = CloudWriter::new_with_object_store(object_store, path).unwrap();
let mut cloud_writer =
BlockingCloudWriter::new_with_object_store(object_store, path).unwrap();
CsvWriter::new(&mut cloud_writer)
.finish(&mut df)
.expect("Could not write DataFrame as CSV to remote location");
}

// Skip this test on Windows since it does not have a convenient /tmp/ location.
#[cfg_attr(target_os = "windows", ignore)]
#[cfg(feature = "csv")]
#[test]
fn cloudwriter_from_cloudlocation_test() {
use super::*;
use crate::csv::write::CsvWriter;
use crate::prelude::SerWriter;
use crate::prelude::{CsvReadOptions, SerWriter};
use crate::SerReader;

let mut df = example_dataframe();

Expand All @@ -187,7 +202,10 @@ mod tests {
std::fs::File::create(path).unwrap();

let mut cloud_writer = get_runtime()
.block_on(CloudWriter::new(format!("file://{}", path).as_str(), None))
.block_on(BlockingCloudWriter::new(
format!("file://{}", path).as_str(),
None,
))
.unwrap();

CsvWriter::new(&mut cloud_writer)
Expand Down
Loading
Loading