Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Mar 6, 2025
1 parent b3a3349 commit 5dbaf9e
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 126 deletions.
152 changes: 85 additions & 67 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,15 @@ use std::sync::Arc;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use polars_error::{to_compute_err, PolarsResult};
use polars_error::PolarsResult;
use polars_utils::file::WriteClose;
use tokio::io::AsyncWriteExt;

use super::{object_path_from_str, CloudOptions};
use crate::pl_async::{get_runtime, get_upload_chunk_size};

enum WriterState {
Open(BufWriter),
/// Note: `Err` state is also used as the close state on success.
Err(std::io::Error),
}

impl WriterState {
fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
where
F: Fn(&mut BufWriter) -> std::io::Result<O>,
{
match self {
Self::Open(writer) => match func(writer) {
Ok(v) => Ok(v),
Err(e) => {
let _ = get_runtime().block_on_potential_spawn(writer.abort());
*self = Self::Err(e);
self.try_with_writer(func)
},
},
Self::Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
}
}
/// Builds a fresh `std::io::Error` that mirrors `e`.
///
/// `std::io::Error` does not implement `Clone`, so the copy is reconstructed from the two
/// pieces that can be read back out: the error kind and the `Display` message.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let kind = e.kind();
    let message = e.to_string();
    std::io::Error::new(kind, message)
}

/// Adaptor which wraps the interface of [ObjectStore::BufWriter] exposing a synchronous interface
Expand All @@ -43,13 +23,12 @@ impl WriterState {
/// such as with `polars::prelude::CsvWriter`.
///
/// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html
pub struct CloudWriter {
// Internal writer, constructed at creation
inner: WriterState,
pub struct BlockingCloudWriter {
state: std::io::Result<BufWriter>,
}

impl CloudWriter {
/// Construct a new CloudWriter, re-using the given `object_store`
impl BlockingCloudWriter {
/// Construct a new BlockingCloudWriter, re-using the given `object_store`
///
/// Creates a new (current-thread) Tokio runtime
/// which bridges the sync writing process with the async ObjectStore multipart uploading.
Expand All @@ -59,21 +38,19 @@ impl CloudWriter {
path: Path,
) -> PolarsResult<Self> {
let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
Ok(CloudWriter {
inner: WriterState::Open(writer),
})
Ok(BlockingCloudWriter { state: Ok(writer) })
}

/// Constructs a new CloudWriter from a path and an optional set of CloudOptions.
/// Constructs a new BlockingCloudWriter from a path and an optional set of CloudOptions.
///
/// Wrapper around `CloudWriter::new_with_object_store` that is useful if you only have a single write task.
/// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task.
/// TODO: Naming?
pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
if let Some(local_path) = uri.strip_prefix("file://") {
// Local paths must be created first, otherwise object store will not write anything.
if !matches!(std::fs::exists(local_path), Ok(true)) {
panic!(
"[CloudWriter] Expected local file to be created: {}",
"[BlockingCloudWriter] Expected local file to be created: {}",
local_path
);
}
Expand All @@ -87,64 +64,96 @@ impl CloudWriter {
)
}

pub fn close(&mut self) -> PolarsResult<()> {
let WriterState::Open(writer) = &mut self.inner else {
panic!();
};

get_runtime()
.block_on_potential_spawn(async { writer.shutdown().await })
.map_err(to_compute_err)?;
/// Consumes this writer and returns whatever it currently holds: the underlying
/// `BufWriter` if it is still usable, or the stored error otherwise.
pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
    // `self.state` cannot simply be moved out, because this type implements `Drop`
    // ("cannot move out of type `adaptors::BlockingCloudWriter`, which implements the
    // `Drop` trait"). Swap a placeholder error into its place and hand back the
    // previous value instead.
    let placeholder = Err(std::io::Error::new(std::io::ErrorKind::Other, ""));
    std::mem::replace(&mut self.state, placeholder)
}

self.inner = WriterState::Err(std::io::Error::new(
std::io::ErrorKind::Other,
"impl error: file was closed",
));
/// Shuts the writer down, blocking until the shutdown future has completed.
///
/// If the writer already holds an error, a copy of that error is returned instead.
/// Whatever the outcome, the writer is left in an error state once this returns.
pub fn blocking_close(&mut self) -> std::io::Result<()> {
    // On failure the helper has already recorded the error in `self.state`,
    // so propagating with `?` is all that is needed here.
    self.blocking_try_with_writer(|writer| {
        get_runtime().block_on_potential_spawn(writer.shutdown())
    })?;

    // A successful close is also represented as an error state, so that any
    // later use of the writer fails instead of touching a shut-down writer.
    self.state = Err(std::io::Error::new(std::io::ErrorKind::Other, "closed"));
    Ok(())
}

Ok(())
/// Runs `func` against the inner writer, if there still is one.
///
/// * If the state already holds an error, a copy of it is returned and `func` is not called.
/// * If `func` fails, a copy of its error becomes the new state and the original error is
///   returned to the caller.
fn blocking_try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
where
    F: Fn(&mut BufWriter) -> std::io::Result<O>,
{
    let writer = match &mut self.state {
        Ok(writer) => writer,
        Err(existing) => return Err(clone_io_err(existing)),
    };

    let outcome = func(writer);
    if let Err(e) = &outcome {
        // Remember the failure so every subsequent call short-circuits with it.
        self.state = Err(clone_io_err(e));
    }
    outcome
}
}

impl std::io::Write for CloudWriter {
impl std::io::Write for BlockingCloudWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// SAFETY:
// We extend the lifetime for the duration of this function. This is safe as well block the
// We extend the lifetime for the duration of this function. This is safe as we block the
// async runtime here
let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };

self.inner.try_with_writer(|writer| {
self.blocking_try_with_writer(|writer| {
get_runtime()
.block_on_potential_spawn(async { writer.write_all(buf).await.map(|_t| buf.len()) })
})
}

fn flush(&mut self) -> std::io::Result<()> {
self.inner.try_with_writer(|writer| {
get_runtime().block_on_potential_spawn(async { writer.flush().await })
self.blocking_try_with_writer(|writer| {
get_runtime().block_on_potential_spawn(writer.flush())
})
}
}

impl Drop for CloudWriter {
/// Allows a boxed `BlockingCloudWriter` to be closed through the `WriteClose` trait.
impl WriteClose for BlockingCloudWriter {
/// Takes ownership of the boxed writer and closes it by delegating to `blocking_close`,
/// returning that call's result unchanged.
fn close(mut self: Box<Self>) -> std::io::Result<()> {
self.blocking_close()
}
}

impl Drop for BlockingCloudWriter {
fn drop(&mut self) {
// TODO: Properly raise this error instead of panicking.
match self.inner {
WriterState::Open(_) => self.close().unwrap(),
WriterState::Err(_) => {},
if self.state.is_err() {
return;
}

// Note: We should not hit here - the writer should instead be explicitly closed.
// But we still have this here as a safety measure to prevent silently dropping errors.
match self.blocking_close() {
Ok(()) => {},
e @ Err(_) => {
if std::thread::panicking() {
eprintln!("ERROR: CloudWriter errored on close: {:?}", e)
} else {
e.unwrap()
}
},
}
}
}

#[cfg(feature = "csv")]
#[cfg(test)]
mod tests {

use polars_core::df;
use polars_core::prelude::DataFrame;

use super::*;
use crate::prelude::CsvReadOptions;
use crate::SerReader;

fn example_dataframe() -> DataFrame {
df!(
"foo" => &[1, 2, 3],
Expand All @@ -154,7 +163,9 @@ mod tests {
}

#[test]
#[cfg(feature = "csv")]
fn csv_to_local_objectstore_cloudwriter() {
use super::*;
use crate::csv::write::CsvWriter;
use crate::prelude::SerWriter;

Expand All @@ -167,18 +178,22 @@ mod tests {

let path: object_store::path::Path = "cloud_writer_example.csv".into();

let mut cloud_writer = CloudWriter::new_with_object_store(object_store, path).unwrap();
let mut cloud_writer =
BlockingCloudWriter::new_with_object_store(object_store, path).unwrap();
CsvWriter::new(&mut cloud_writer)
.finish(&mut df)
.expect("Could not write DataFrame as CSV to remote location");
}

// Skip this test on Windows since it does not have a convenient /tmp/ location.
#[cfg_attr(target_os = "windows", ignore)]
#[cfg(feature = "csv")]
#[test]
fn cloudwriter_from_cloudlocation_test() {
use super::*;
use crate::csv::write::CsvWriter;
use crate::prelude::SerWriter;
use crate::prelude::{CsvReadOptions, SerWriter};
use crate::SerReader;

let mut df = example_dataframe();

Expand All @@ -187,14 +202,17 @@ mod tests {
std::fs::File::create(path).unwrap();

let mut cloud_writer = get_runtime()
.block_on(CloudWriter::new(format!("file://{}", path).as_str(), None))
.block_on(BlockingCloudWriter::new(
format!("file://{}", path).as_str(),
None,
))
.unwrap();

CsvWriter::new(&mut cloud_writer)
.finish(&mut df)
.expect("Could not write DataFrame as CSV to remote location");

cloud_writer.close().unwrap();
cloud_writer.blocking_close().unwrap();

assert_eq!(
CsvReadOptions::default()
Expand Down
Loading

0 comments on commit 5dbaf9e

Please sign in to comment.