Skip to content

Commit

Permalink
feat: Dispatch new-streaming CSV negative slice to separate node (#21579
Browse files Browse the repository at this point in the history
)
  • Loading branch information
nameexhaustion authored Mar 4, 2025
1 parent f9b9a0d commit f492e56
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 32 deletions.
8 changes: 3 additions & 5 deletions crates/polars-stream/src/nodes/io_sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,9 @@ impl CsvSourceNode {
async_executor::spawn(TaskPriority::Low, async move {
let global_slice = if let Some((offset, len)) = global_slice {
if offset < 0 {
polars_bail!(
ComputeError:
"not implemented: negative slice offset {} for CSV source",
offset
);
// IR lowering puts negative slice in separate node.
// TODO: Native line buffering for negative slice
unreachable!()
}
Some(offset as usize..offset as usize + len)
} else {
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,12 @@ pub fn lower_ir(
FileScan::Ipc { .. } => (None, None, predicate.take()),
#[cfg(feature = "csv")]
FileScan::Csv { options, .. } => {
// Note: We dispatch negative slice to separate node.
#[allow(clippy::nonminimal_bool)]
if options.parse_options.comment_prefix.is_none()
&& !file_options
.pre_slice
.is_some_and(|(offset, _)| offset < 0)
&& std::env::var("POLARS_DISABLE_EXPERIMENTAL_CSV_SLICE")
.as_deref()
!= Ok("1")
Expand Down
36 changes: 9 additions & 27 deletions py-polars/tests/unit/io/test_multiscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,10 @@ def test_multiscan_head(
(pl.scan_ipc, pl.DataFrame.write_ipc),
(pl.scan_parquet, pl.DataFrame.write_parquet),
(pl.scan_ndjson, pl.DataFrame.write_ndjson),
# (
# pl.scan_csv,
# pl.DataFrame.write_csv,
# ),
(
pl.scan_csv,
pl.DataFrame.write_csv,
),
],
)
def test_multiscan_tail(
Expand All @@ -442,10 +442,10 @@ def test_multiscan_tail(
(pl.scan_ipc, pl.DataFrame.write_ipc),
(pl.scan_parquet, pl.DataFrame.write_parquet),
(pl.scan_ndjson, pl.DataFrame.write_ndjson),
# (
# pl.scan_csv,
# pl.DataFrame.write_csv,
# ),
(
pl.scan_csv,
pl.DataFrame.write_csv,
),
],
)
def test_multiscan_slice_middle(
Expand Down Expand Up @@ -495,13 +495,7 @@ def test_multiscan_slice_middle(
(pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"),
(pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"),
(pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"),
pytest.param(
pl.scan_csv,
pl.DataFrame.write_csv,
"csv",
marks=pytest.mark.may_fail_auto_streaming,
# negatives slices are not yet implemented for CSV
),
(pl.scan_csv, pl.DataFrame.write_csv, "jsonl"),
],
)
@given(offset=st.integers(-100, 100), length=st.integers(0, 101))
Expand All @@ -512,18 +506,6 @@ def test_multiscan_slice_parametric(
offset: int,
length: int,
) -> None:
# Once CSV negative slicing is implemented this should be removed. If we
# don't do this, this test is flaky.
if ext == "csv":
f = io.BytesIO()
write(pl.Series("a", [1]).to_frame(), f)
f.seek(0)
try:
scan(f).slice(-1, 1).collect(new_streaming=True) # type: ignore[call-overload]
pytest.fail("This should crash")
except pl.exceptions.ComputeError:
pass

ref = io.BytesIO()
write(pl.Series("c1", [i % 7 for i in range(13 * 7)]).to_frame(), ref)
ref.seek(0)
Expand Down

0 comments on commit f492e56

Please sign in to comment.