Skip to content

Commit

Permalink
fix: Don't drop rows in sinks between new streaming phases (#21489)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Feb 27, 2025
1 parent 828a1e7 commit 8622382
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 35 deletions.
58 changes: 41 additions & 17 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@ impl SinkNode for IpcSinkNode {
while let Ok(input) = recv_ports_recv.recv().await {
let mut receiver = input.port.serial();

let mut stop_requested = false;
loop {
if buffer.height() >= chunk_size || (buffer.height() > 0 && stop_requested) {
while let Ok(morsel) = receiver.recv().await {
let df = morsel.into_df();
// @NOTE: This also performs schema validation.
buffer.vstack_mut(&df)?;

while buffer.height() >= chunk_size {
let df;
(df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);

Expand Down Expand Up @@ -147,24 +150,45 @@ impl SinkNode for IpcSinkNode {
break;
}
}
}

// If we have no more rows to write and there are no more morsels coming, we can
// stop this task.
if buffer.is_empty() && stop_requested {
break;
}

let Ok(morsel) = receiver.recv().await else {
stop_requested = true;
continue;
};
input.outcome.stop();
}

let df = morsel.into_df();
// @NOTE: This also performs schema validation.
buffer.vstack_mut(&df)?;
// Flush the remaining rows.
while buffer.height() > 0 {
let df;
(df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);

// We want to rechunk for two reasons:
// 1. the IPC writer expects aligned column chunks
// 2. the IPC writer turns chunks / record batches into chunks in the file,
// so we want to respect the given `chunk_size`.
//
// This also properly sets the inner types of the record batches, which is
// important for dictionary and nested type encoding.
let record_batch = df.rechunk_to_record_batch(compat_level);

// If there are dictionaries, we might need to emit the original dictionary
// definitions or dictionary deltas. We have precomputed which columns contain
// dictionaries and only check those columns.
let mut dicts_to_encode = Vec::new();
for &i in &dict_columns_idxs {
dictionaries_to_encode(
&ipc_fields[i],
record_batch.arrays()[i].as_ref(),
&mut dictionary_tracker,
&mut dicts_to_encode,
)?;
}

input.outcome.stop();
// Send off the dictionaries and record batch to be encoded by an Encoder
// task. This is compute heavy, so distribute the chunks.
let msg = (seq, dicts_to_encode, record_batch);
seq += 1;
if distribute.send(msg).await.is_err() {
break;
}
}

PolarsResult::Ok(())
Expand Down
38 changes: 20 additions & 18 deletions crates/polars-stream/src/nodes/io_sinks/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,25 +109,17 @@ impl SinkNode for ParquetSinkNode {
.row_group_size
.unwrap_or(DEFAULT_ROW_GROUP_SIZE)
.max(1);
let mut stop_requested = false;
let mut row_group_index = 0;

while let Ok(input) = recv_ports_recv.recv().await {
let mut receiver = input.port.serial();
loop {
match receiver.recv().await {
Err(_) => stop_requested = true,
Ok(morsel) => {
let df = morsel.into_df();

// @NOTE: This also performs schema validation.
buffer.vstack_mut(&df)?;
},
}

while (stop_requested && buffer.height() > 0)
|| buffer.height() >= row_group_size
{
while let Ok(morsel) = receiver.recv().await {
let df = morsel.into_df();
// @NOTE: This also performs schema validation.
buffer.vstack_mut(&df)?;

while buffer.height() >= row_group_size {
let row_group;

(row_group, buffer) =
Expand All @@ -143,15 +135,25 @@ impl SinkNode for ParquetSinkNode {

row_group_index += 1;
}

if stop_requested {
break;
}
}

input.outcome.stop();
}

// Flush the remaining rows.
while buffer.height() > 0 {
let row_group;

(row_group, buffer) = buffer.split_at(row_group_size.min(buffer.height()) as i64);
for (column_idx, column) in row_group.take_columns().into_iter().enumerate() {
distribute
.send((row_group_index, column_idx, column))
.await
.unwrap();
}
row_group_index += 1;
}

PolarsResult::Ok(())
}));

Expand Down

0 comments on commit 8622382

Please sign in to comment.