Skip to content

Commit

Permalink
simplify engine options
Browse files: browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Mar 6, 2025
1 parent 8e653dd commit 6198d0b
Showing 1 changed file with 30 additions and 41 deletions.
71 changes: 30 additions & 41 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -759,8 +759,7 @@ impl LazyFrame {
#[cfg(feature = "new_streaming")]
{
let mut slf = self;
if let Some(df) = slf.try_new_streaming_if_requested(SinkType::Memory, engine)
{
if let Some(df) = slf.try_new_streaming_if_requested(SinkType::Memory, engine) {
return Ok(df?.unwrap());
}

Expand Down Expand Up @@ -818,23 +817,25 @@ impl LazyFrame {
options: ParquetWriteOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
if engine == Engine::InMemory {
use std::ops::DerefMut;
use std::io::BufWriter;

use polars_io::parquet::write::ParquetWriter;

let mut df = self.collect()?;

let path = path.as_ref().display().to_string();
let f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
ParquetWriter::new(BufWriter::new(f))
let mut f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
ParquetWriter::new(BufWriter::new(f.deref_mut()))
.with_compression(options.compression)
.with_statistics(options.statistics)
.with_row_group_size(options.row_group_size)
.with_data_page_size(options.data_page_size)
.finish(&mut df)?;
f.close()?;

return Ok(());
}
Expand All @@ -861,8 +862,10 @@ impl LazyFrame {
options: IpcWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
use std::ops::DerefMut;

if engine == Engine::InMemory {
use std::io::BufWriter;

Expand All @@ -872,11 +875,12 @@ impl LazyFrame {
let mut df = self.collect()?;

let path = path.as_ref().display().to_string();
let f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
IpcWriter::new(BufWriter::new(f))
let mut f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
IpcWriter::new(BufWriter::new(f.deref_mut()))
.with_compression(options.compression)
.with_compat_level(options.compat_level)
.finish(&mut df)?;
f.close()?;

return Ok(());
}
Expand All @@ -903,8 +907,10 @@ impl LazyFrame {
options: CsvWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
use std::ops::DerefMut;

if engine == Engine::InMemory {
use std::io::BufWriter;

Expand All @@ -914,8 +920,8 @@ impl LazyFrame {
let mut df = self.collect()?;

let path = path.as_ref().display().to_string();
let f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
CsvWriter::new(BufWriter::new(f))
let mut f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
CsvWriter::new(BufWriter::new(f.deref_mut()))
.include_bom(options.include_bom)
.include_header(options.include_header)
.with_separator(options.serialize_options.separator)
Expand All @@ -930,6 +936,7 @@ impl LazyFrame {
.with_null_value(options.serialize_options.null)
.with_quote_style(options.serialize_options.quote_style)
.finish(&mut df)?;
f.close()?;

return Ok(());
}
Expand All @@ -956,21 +963,23 @@ impl LazyFrame {
options: JsonWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
if engine == Engine::InMemory {
use std::io::BufWriter;
use std::ops::DerefMut;

use polars_io::json::{JsonFormat, JsonWriter};
use polars_io::SerWriter;

let mut df = self.collect()?;

let path = path.as_ref().display().to_string();
let f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
JsonWriter::new(BufWriter::new(f))
let mut f = polars_io::utils::file::try_get_writeable(&path, cloud_options.as_ref())?;
JsonWriter::new(BufWriter::new(f.deref_mut()))
.with_json_format(JsonFormat::JsonLines)
.finish(&mut df)?;
f.close()?;

return Ok(());
}
Expand Down Expand Up @@ -998,13 +1007,8 @@ impl LazyFrame {
options: ParquetWriteOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
polars_ensure!(
engine != Engine::InMemory,
nyi = "partitioned sinks are not implemented for the in-memory engine"
);

self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
Expand All @@ -1029,13 +1033,8 @@ impl LazyFrame {
options: IpcWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
polars_ensure!(
engine != Engine::InMemory,
nyi = "partitioned sinks are not implemented for the in-memory engine"
);

self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
Expand All @@ -1060,13 +1059,8 @@ impl LazyFrame {
options: CsvWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
polars_ensure!(
engine != Engine::InMemory,
nyi = "partitioned sinks are not implemented for the in-memory engine"
);

self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
Expand All @@ -1091,13 +1085,8 @@ impl LazyFrame {
options: JsonWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
mut engine: Engine,
engine: Engine,
) -> PolarsResult<()> {
polars_ensure!(
engine != Engine::InMemory,
nyi = "partitioned sinks are not implemented for the in-memory engine"
);

self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
Expand Down Expand Up @@ -1217,9 +1206,9 @@ impl LazyFrame {
Engine::Gpu => {
Err(polars_err!(InvalidOperation: "sink is not supported for the gpu engine"))
},
Engine::InMemory => {
Err(polars_err!(InvalidOperation: "this sink is not supported for the in-memory engine"))
},
Engine::InMemory => Err(
polars_err!(InvalidOperation: "this sink is not supported for the in-memory engine"),
),
Engine::OldStreaming => {
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
Expand Down

0 comments on commit 6198d0b

Please sign in to comment.