fix: Properly handle phase transitions in row-wise sinks #21600

Merged
103 changes: 41 additions & 62 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -12,13 +12,12 @@ use polars_io::SerWriter;
 use polars_plan::dsl::SinkOptions;
 use polars_utils::priority::Priority;
 
-use super::{SinkInputPort, SinkNode, SinkRecvPort};
+use super::{SinkInputPort, SinkNode};
 use crate::async_executor::spawn;
-use crate::async_primitives::linearizer::Linearizer;
-use crate::nodes::io_sinks::{tokio_sync_on_close, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE};
-use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
+use crate::async_primitives::connector::Receiver;
+use crate::nodes::io_sinks::{parallelize_receive_task, tokio_sync_on_close};
+use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
 
-type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
 pub struct CsvSinkNode {
     path: PathBuf,
     schema: SchemaRef,
@@ -52,39 +51,18 @@ impl SinkNode for CsvSinkNode {
     fn is_sink_input_parallel(&self) -> bool {
         true
     }
-    fn do_maintain_order(&self) -> bool {
-        self.sink_options.maintain_order
-    }
 
     fn spawn_sink(
         &mut self,
         num_pipelines: usize,
-        recv_ports_recv: SinkRecvPort,
+        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
-        let rxs = recv_ports_recv.parallel(join_handles);
-        self.spawn_sink_once(
-            num_pipelines,
-            SinkInputPort::Parallel(rxs),
-            _state,
-            join_handles,
-        );
-    }
-
-    fn spawn_sink_once(
-        &mut self,
-        num_pipelines: usize,
-        recv_port: SinkInputPort,
-        _state: &ExecutionState,
-        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
-    ) {
-        // .. -> Encode task
-        let rxs = recv_port.parallel();
-        // Encode tasks -> IO task
-        let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
+        let (pass_rxs, mut io_rx) = parallelize_receive_task(
+            join_handles,
+            recv_port_rx,
             num_pipelines,
-            DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
             self.sink_options.maintain_order,
         );

@@ -94,7 +72,7 @@
         // Encode task.
         //
         // Task encodes the columns into their corresponding CSV encoding.
-        join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
+        join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
             let schema = self.schema.clone();
             let options = self.write_options.clone();
@@ -104,36 +82,35 @@
             let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
             let options = options.clone();
 
-            while let Ok(morsel) = rx.recv().await {
-                let (df, seq, _, consume_token) = morsel.into_inner();
-
-                let mut buffer = Vec::with_capacity(allocation_size);
-                let mut writer = CsvWriter::new(&mut buffer)
-                    .include_bom(false) // Handled once in the IO task.
-                    .include_header(false) // Handled once in the IO task.
-                    .with_separator(options.serialize_options.separator)
-                    .with_line_terminator(options.serialize_options.line_terminator.clone())
-                    .with_quote_char(options.serialize_options.quote_char)
-                    .with_datetime_format(options.serialize_options.datetime_format.clone())
-                    .with_date_format(options.serialize_options.date_format.clone())
-                    .with_time_format(options.serialize_options.time_format.clone())
-                    .with_float_scientific(options.serialize_options.float_scientific)
-                    .with_float_precision(options.serialize_options.float_precision)
-                    .with_null_value(options.serialize_options.null.clone())
-                    .with_quote_style(options.serialize_options.quote_style)
-                    .n_threads(1) // Disable rayon parallelism
-                    .batched(&schema)?;
-
-                writer.write_batch(&df)?;
-
-                allocation_size = allocation_size.max(buffer.len());
-
-                // Must drop before linearizer insert or will deadlock.
-                drop(consume_token); // Keep the consume_token until here to increase the
-                                     // backpressure.
-
-                if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
-                    return Ok(());
+            while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
+                while let Ok(morsel) = rx.recv().await {
+                    let (df, seq, _, consume_token) = morsel.into_inner();
+
+                    let mut buffer = Vec::with_capacity(allocation_size);
+                    let mut writer = CsvWriter::new(&mut buffer)
+                        .include_bom(false) // Handled once in the IO task.
+                        .include_header(false) // Handled once in the IO task.
+                        .with_separator(options.serialize_options.separator)
+                        .with_line_terminator(options.serialize_options.line_terminator.clone())
+                        .with_quote_char(options.serialize_options.quote_char)
+                        .with_datetime_format(options.serialize_options.datetime_format.clone())
+                        .with_date_format(options.serialize_options.date_format.clone())
+                        .with_time_format(options.serialize_options.time_format.clone())
+                        .with_float_scientific(options.serialize_options.float_scientific)
+                        .with_float_precision(options.serialize_options.float_precision)
+                        .with_null_value(options.serialize_options.null.clone())
+                        .with_quote_style(options.serialize_options.quote_style)
+                        .n_threads(1) // Disable rayon parallelism
+                        .batched(&schema)?;
+
+                    writer.write_batch(&df)?;
+
+                    allocation_size = allocation_size.max(buffer.len());
+                    if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
+                        return Ok(());
+                    }
+                    drop(consume_token); // Keep the consume_token until here to increase the
+                                         // backpressure.
                 }
             }

@@ -170,8 +147,10 @@
 
             let mut file = file.try_into_async_writeable()?;
 
-            while let Some(Priority(_, buffer)) = lin_rx.get().await {
-                file.write_all(&buffer).await?;
+            while let Ok(mut lin_rx) = io_rx.recv().await {
+                while let Some(Priority(_, buffer)) = lin_rx.get().await {
+                    file.write_all(&buffer).await?;
+                }
             }
 
             if let AsyncWriteable::Local(file) = &mut file {
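The substance of the fix is visible in the two loops above: the encode task and the IO task no longer read from a single channel that ends at the first phase boundary; instead, an outer loop obtains a fresh channel for every pipeline phase and an inner loop drains that phase's morsels. Below is a minimal, self-contained sketch of that control flow, using tokio mpsc channels as stand-ins for polars-stream's internal connector and linearizer primitives — every name and type in it is illustrative, and only the loop structure mirrors the diff.

// Sketch of the per-phase receive pattern; not polars code.
use tokio::sync::mpsc;

#[derive(Debug)]
struct Morsel(u32); // stand-in for a morsel of rows

#[tokio::main]
async fn main() {
    // Outer channel delivers one inner receiver per pipeline phase.
    let (phase_tx, mut pass_rx) = mpsc::channel::<mpsc::Receiver<Morsel>>(1);

    let encode = tokio::spawn(async move {
        // New behavior: when a phase's channel closes, wait for the next
        // phase instead of shutting the whole task down.
        while let Some(mut rx) = pass_rx.recv().await {
            while let Some(morsel) = rx.recv().await {
                println!("encode {morsel:?}");
            }
        }
    });

    // Two phases; with the old single-loop structure the second phase's
    // morsels would never be consumed.
    for phase in 0..2u32 {
        let (tx, rx) = mpsc::channel(4);
        phase_tx.send(rx).await.unwrap();
        tx.send(Morsel(phase)).await.unwrap();
        // Dropping `tx` ends the phase (closes the inner channel).
    }
    drop(phase_tx);
    encode.await.unwrap();
}

Note also the reordering around the consume token in the encode loop: the old code had to drop the token before the linearizer insert to avoid a deadlock, while the new structure keeps it until after the insert, holding backpressure on the source for longer.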
28 changes: 5 additions & 23 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
@@ -19,15 +19,15 @@ use polars_plan::dsl::SinkOptions;
 use polars_utils::priority::Priority;
 
 use super::{
-    buffer_and_distribute_columns_task, SinkInputPort, SinkNode, SinkRecvPort,
+    buffer_and_distribute_columns_task, SinkInputPort, SinkNode,
     DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
 };
 use crate::async_executor::spawn;
-use crate::async_primitives::connector::connector;
+use crate::async_primitives::connector::{connector, Receiver};
 use crate::async_primitives::distributor_channel::distributor_channel;
 use crate::async_primitives::linearizer::Linearizer;
 use crate::nodes::io_sinks::sync_on_close;
-use crate::nodes::{JoinHandle, TaskPriority};
+use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
 
 pub struct IpcSinkNode {
     path: PathBuf,
@@ -72,28 +72,10 @@ impl SinkNode for IpcSinkNode {
     fn spawn_sink(
         &mut self,
         num_pipelines: usize,
-        recv_ports_recv: SinkRecvPort,
+        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
-        let rx = recv_ports_recv.serial(join_handles);
-        self.spawn_sink_once(
-            num_pipelines,
-            SinkInputPort::Serial(rx),
-            _state,
-            join_handles,
-        );
-    }
-
-    fn spawn_sink_once(
-        &mut self,
-        num_pipelines: usize,
-        recv_port: SinkInputPort,
-        _state: &ExecutionState,
-        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
-    ) {
-        // .. -> Buffer task
-        let buffer_rx = recv_port.serial();
         // Buffer task -> Encode tasks
         let (dist_tx, dist_rxs) =
             distributor_channel(num_pipelines, DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
@@ -118,7 +100,7 @@
 
         // Buffer task.
         join_handles.push(buffer_and_distribute_columns_task(
-            buffer_rx,
+            recv_port_rx,
             dist_tx,
             chunk_size as usize,
             self.input_schema.clone(),
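The IPC sink consumes its input serially, so it needs no per-task pass loop of its own: the per-phase receiver goes straight into the shared buffer_and_distribute_columns_task. Taken together, the three files imply the following change to the SinkNode trait entry point; this is a hedged reduction with stand-in types, not the actual polars-stream definitions.

// Stand-in types for illustration; the real ones live in polars-stream.
struct PhaseOutcome;
struct ExecutionState;
struct SinkInputPort;
struct Receiver<T>(std::marker::PhantomData<T>);
struct JoinHandle<T>(std::marker::PhantomData<T>);
type PolarsResult<T> = Result<T, ()>;

trait SinkNode {
    // Old shape (removed): spawn_sink took a one-shot SinkRecvPort and each
    // sink delegated to its own spawn_sink_once helper.
    //
    // New shape: one entry point fed a receiver that yields a fresh
    // (PhaseOutcome, SinkInputPort) pair for every pipeline phase, so the
    // phase loop can live in shared code such as parallelize_receive_task.
    fn spawn_sink(
        &mut self,
        num_pipelines: usize,
        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &ExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    );
}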
66 changes: 24 additions & 42 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -9,13 +9,12 @@ use polars_io::utils::file::AsyncWriteable;
 use polars_plan::dsl::SinkOptions;
 use polars_utils::priority::Priority;
 
-use super::{SinkInputPort, SinkNode, SinkRecvPort};
+use super::{SinkInputPort, SinkNode};
 use crate::async_executor::spawn;
-use crate::async_primitives::linearizer::Linearizer;
-use crate::nodes::io_sinks::{tokio_sync_on_close, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE};
-use crate::nodes::{JoinHandle, MorselSeq, TaskPriority};
+use crate::async_primitives::connector::Receiver;
+use crate::nodes::io_sinks::{parallelize_receive_task, tokio_sync_on_close};
+use crate::nodes::{JoinHandle, PhaseOutcome, TaskPriority};
 
-type Linearized = Priority<Reverse<MorselSeq>, Vec<u8>>;
 pub struct NDJsonSinkNode {
     path: PathBuf,
     sink_options: SinkOptions,
@@ -50,32 +49,14 @@ impl SinkNode for NDJsonSinkNode {
     fn spawn_sink(
         &mut self,
         num_pipelines: usize,
-        recv_ports_recv: SinkRecvPort,
+        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
         _state: &ExecutionState,
         join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
     ) {
-        let rxs = recv_ports_recv.parallel(join_handles);
-        self.spawn_sink_once(
-            num_pipelines,
-            SinkInputPort::Parallel(rxs),
-            _state,
-            join_handles,
-        );
-    }
-
-    fn spawn_sink_once(
-        &mut self,
-        num_pipelines: usize,
-        recv_port: SinkInputPort,
-        _state: &ExecutionState,
-        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
-    ) {
-        // .. -> Encode task
-        let rxs = recv_port.parallel();
-        // Encode tasks -> IO task
-        let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
+        let (pass_rxs, mut io_rx) = parallelize_receive_task(
+            join_handles,
+            recv_port_rx,
             num_pipelines,
-            DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
             self.sink_options.maintain_order,
         );

@@ -85,28 +66,27 @@
         // Encode task.
         //
         // Task encodes the columns into their corresponding JSON encoding.
-        join_handles.extend(rxs.into_iter().zip(lin_txs).map(|(mut rx, mut lin_tx)| {
+        join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
             spawn(TaskPriority::High, async move {
                 // Amortize the allocations over time. If we see that we need to do way larger
                 // allocations, we adjust to that over time.
                 let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
 
-                while let Ok(morsel) = rx.recv().await {
-                    let (df, seq, _, consume_token) = morsel.into_inner();
-
-                    let mut buffer = Vec::with_capacity(allocation_size);
-                    let mut writer = BatchedWriter::new(&mut buffer);
-
-                    writer.write_batch(&df)?;
-
-                    allocation_size = allocation_size.max(buffer.len());
-
-                    // Must drop before linearizer insert or will deadlock.
-                    drop(consume_token); // Keep the consume_token until here to increase the
-                                         // backpressure.
-
-                    if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
-                        return Ok(());
+                while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
+                    while let Ok(morsel) = rx.recv().await {
+                        let (df, seq, _, consume_token) = morsel.into_inner();
+
+                        let mut buffer = Vec::with_capacity(allocation_size);
+                        let mut writer = BatchedWriter::new(&mut buffer);
+
+                        writer.write_batch(&df)?;
+
+                        allocation_size = allocation_size.max(buffer.len());
+                        if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
+                            return Ok(());
+                        }
+                        drop(consume_token); // Keep the consume_token until here to increase the
+                                             // backpressure.
                     }
                 }
Expand All @@ -129,8 +109,10 @@ impl SinkNode for NDJsonSinkNode {
)
.await?;

while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
while let Ok(mut lin_rx) = io_rx.recv().await {
while let Some(Priority(_, buffer)) = lin_rx.get().await {
file.write_all(&buffer).await?;
}
}

if let AsyncWriteable::Local(file) = &mut file {
Expand Down
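The NDJSON diff mirrors the CSV one, including on the IO side: the writer task now drains one linearized stream per phase while the output file stays open across phases. A sketch of that half of the pattern, again using a tokio mpsc channel as a stand-in for the linearizer and stdout as a stand-in for the sink file (all names here are illustrative):

// Sketch of the per-phase IO loop; not polars code.
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // One inner receiver of encoded buffers arrives per phase.
    let (phase_tx, mut io_rx) = mpsc::channel::<mpsc::Receiver<Vec<u8>>>(1);

    let io_task = tokio::spawn(async move {
        let mut out = tokio::io::stdout(); // stand-in for the sink file
        // The writer stays open across phases; only the stream is per-phase.
        while let Some(mut lin_rx) = io_rx.recv().await {
            while let Some(buffer) = lin_rx.recv().await {
                out.write_all(&buffer).await?;
            }
        }
        out.flush().await
    });

    for phase in 0..2 {
        let (tx, rx) = mpsc::channel(4);
        phase_tx.send(rx).await.unwrap();
        tx.send(format!("row from phase {phase}\n").into_bytes())
            .await
            .unwrap();
    }
    drop(phase_tx);
    io_task.await.unwrap()
}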