Skip to content

Commit

Permalink
fix: Properly handle phase transitions in row-wise sinks
Browse files (browse the repository at this point in the history)
  • Loading branch information
coastalwhite committed Mar 5, 2025
1 parent 241d34b commit f9948ff
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 373 deletions.
100 changes: 41 additions & 59 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -10,13 +10,12 @@ use polars_io::SerWriter;
use polars_plan::dsl::SinkOptions;
use polars_utils::priority::Priority;

use super::{SinkInputPort, SinkNode, SinkRecvPort};
use super::{SinkInputPort, SinkNode};
use crate::async_executor::spawn;
use crate::async_primitives::linearizer::Linearizer;
use crate::nodes::io_sinks::{tokio_sync_on_close, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE};
use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
use crate::async_primitives::connector::Receiver;
use crate::nodes::io_sinks::{parallelize_receive_task, tokio_sync_on_close};
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};

type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
pub struct CsvSinkNode {
path: PathBuf,
schema: SchemaRef,
Expand Down Expand Up @@ -47,39 +46,18 @@ impl SinkNode for CsvSinkNode {
/// Always returns `true` for the CSV sink.
///
/// NOTE(review): presumably this tells the caller that the sink consumes a
/// parallel input port — confirm against the `SinkNode` trait definition.
fn is_sink_input_parallel(&self) -> bool {
true
}
/// Returns the `maintain_order` flag stored in this node's `sink_options`.
fn do_maintain_order(&self) -> bool {
self.sink_options.maintain_order
}

fn spawn_sink(
&mut self,
num_pipelines: usize,
recv_ports_recv: SinkRecvPort,
recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
let rxs = recv_ports_recv.parallel(join_handles);
self.spawn_sink_once(
num_pipelines,
SinkInputPort::Parallel(rxs),
_state,
let (pass_rxs, mut io_rx) = parallelize_receive_task(
join_handles,
);
}

fn spawn_sink_once(
&mut self,
num_pipelines: usize,
recv_port: SinkInputPort,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
// .. -> Encode task
let rxs = recv_port.parallel();
// Encode tasks -> IO task
let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
recv_port_rx,
num_pipelines,
DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
self.sink_options.maintain_order,
);

Expand All @@ -89,7 +67,7 @@ impl SinkNode for CsvSinkNode {
// Encode task.
//
// Task encodes the columns into their corresponding CSV encoding.
join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
let schema = self.schema.clone();
let options = self.write_options.clone();

Expand All @@ -99,34 +77,36 @@ impl SinkNode for CsvSinkNode {
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
let options = options.clone();

while let Ok(morsel) = rx.recv().await {
let (df, seq, _, consume_token) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = CsvWriter::new(&mut buffer)
.include_bom(false) // Handled once in the IO task.
.include_header(false) // Handled once in the IO task.
.with_separator(options.serialize_options.separator)
.with_line_terminator(options.serialize_options.line_terminator.clone())
.with_quote_char(options.serialize_options.quote_char)
.with_datetime_format(options.serialize_options.datetime_format.clone())
.with_date_format(options.serialize_options.date_format.clone())
.with_time_format(options.serialize_options.time_format.clone())
.with_float_scientific(options.serialize_options.float_scientific)
.with_float_precision(options.serialize_options.float_precision)
.with_null_value(options.serialize_options.null.clone())
.with_quote_style(options.serialize_options.quote_style)
.n_threads(1) // Disable rayon parallelism
.batched(&schema)?;

writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());
if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
return Ok(());
while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
while let Ok(morsel) = rx.recv().await {
let (df, seq, _, consume_token) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = CsvWriter::new(&mut buffer)
.include_bom(false) // Handled once in the IO task.
.include_header(false) // Handled once in the IO task.
.with_separator(options.serialize_options.separator)
.with_line_terminator(options.serialize_options.line_terminator.clone())
.with_quote_char(options.serialize_options.quote_char)
.with_datetime_format(options.serialize_options.datetime_format.clone())
.with_date_format(options.serialize_options.date_format.clone())
.with_time_format(options.serialize_options.time_format.clone())
.with_float_scientific(options.serialize_options.float_scientific)
.with_float_precision(options.serialize_options.float_precision)
.with_null_value(options.serialize_options.null.clone())
.with_quote_style(options.serialize_options.quote_style)
.n_threads(1) // Disable rayon parallelism
.batched(&schema)?;

writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());
if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
return Ok(());
}
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}

PolarsResult::Ok(())
Expand Down Expand Up @@ -165,8 +145,10 @@ impl SinkNode for CsvSinkNode {
file = tokio::fs::File::from_std(std_file);
}

while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
while let Ok(mut lin_rx) = io_rx.recv().await {
while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
}
}

tokio_sync_on_close(sink_options.sync_on_close, &mut file).await?;
Expand Down
28 changes: 5 additions & 23 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,16 +18,16 @@ use polars_plan::dsl::SinkOptions;
use polars_utils::priority::Priority;

use super::{
buffer_and_distribute_columns_task, SinkInputPort, SinkNode, SinkRecvPort,
buffer_and_distribute_columns_task, SinkInputPort, SinkNode,
DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
};
use crate::async_executor::spawn;
use crate::async_primitives::connector::connector;
use crate::async_primitives::connector::{connector, Receiver};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::morsel::get_ideal_morsel_size;
use crate::nodes::io_sinks::sync_on_close;
use crate::nodes::{JoinHandle, TaskPriority};
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};

pub struct IpcSinkNode {
path: PathBuf,
Expand Down Expand Up @@ -77,28 +77,10 @@ impl SinkNode for IpcSinkNode {
fn spawn_sink(
&mut self,
num_pipelines: usize,
recv_ports_recv: SinkRecvPort,
recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
let rx = recv_ports_recv.serial(join_handles);
self.spawn_sink_once(
num_pipelines,
SinkInputPort::Serial(rx),
_state,
join_handles,
);
}

fn spawn_sink_once(
&mut self,
num_pipelines: usize,
recv_port: SinkInputPort,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
// .. -> Buffer task
let buffer_rx = recv_port.serial();
// Buffer task -> Encode tasks
let (dist_tx, dist_rxs) =
distributor_channel(num_pipelines, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
Expand All @@ -124,7 +106,7 @@ impl SinkNode for IpcSinkNode {

// Buffer task.
join_handles.push(buffer_and_distribute_columns_task(
buffer_rx,
recv_port_rx,
dist_tx,
chunk_size,
self.input_schema.clone(),
Expand Down
63 changes: 24 additions & 39 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -7,13 +7,12 @@ use polars_io::json::BatchedWriter;
use polars_plan::dsl::SinkOptions;
use polars_utils::priority::Priority;

use super::{SinkInputPort, SinkNode, SinkRecvPort};
use super::{SinkInputPort, SinkNode};
use crate::async_executor::spawn;
use crate::async_primitives::linearizer::Linearizer;
use crate::nodes::io_sinks::{tokio_sync_on_close, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE};
use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
use crate::async_primitives::connector::Receiver;
use crate::nodes::io_sinks::{parallelize_receive_task, tokio_sync_on_close};
use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};

type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
pub struct NDJsonSinkNode {
path: PathBuf,
sink_options: SinkOptions,
Expand All @@ -39,32 +38,14 @@ impl SinkNode for NDJsonSinkNode {
fn spawn_sink(
&mut self,
num_pipelines: usize,
recv_ports_recv: SinkRecvPort,
recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
let rxs = recv_ports_recv.parallel(join_handles);
self.spawn_sink_once(
num_pipelines,
SinkInputPort::Parallel(rxs),
_state,
let (pass_rxs, mut io_rx) = parallelize_receive_task(
join_handles,
);
}

fn spawn_sink_once(
&mut self,
num_pipelines: usize,
recv_port: SinkInputPort,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
// .. -> Encode task
let rxs = recv_port.parallel();
// Encode tasks -> IO task
let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
recv_port_rx,
num_pipelines,
DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
self.sink_options.maintain_order,
);

Expand All @@ -74,26 +55,28 @@ impl SinkNode for NDJsonSinkNode {
// Encode task.
//
// Task encodes the columns into their corresponding JSON encoding.
join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
spawn(TaskPriority::High, async move {
// Amortize the allocations over time. If we see that we need to do way larger
// allocations, we adjust to that over time.
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;

while let Ok(morsel) = rx.recv().await {
let (df, seq, _, consume_token) = morsel.into_inner();
while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
while let Ok(morsel) = rx.recv().await {
let (df, seq, _, consume_token) = morsel.into_inner();

let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = BatchedWriter::new(&mut buffer);
let mut buffer = Vec::with_capacity(allocation_size);
let mut writer = BatchedWriter::new(&mut buffer);

writer.write_batch(&df)?;
writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());
if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
return Ok(());
allocation_size = allocation_size.max(buffer.len());
if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
return Ok(());
}
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}

PolarsResult::Ok(())
Expand All @@ -117,8 +100,10 @@ impl SinkNode for NDJsonSinkNode {
.await
.map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?;

while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
while let Ok(mut lin_rx) = io_rx.recv().await {
while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
}
}

tokio_sync_on_close(sink_options.sync_on_close, &mut file).await?;
Expand Down
Loading

0 comments on commit f9948ff

Please sign in to comment.