fix: Fix CSV count with comment prefix skipped empty lines #21577

Merged · 2 commits · Mar 4, 2025
Changes from all commits
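A minimal reproduction sketch of the behavior this PR fixes (not part of the PR itself). It assumes a recent py-polars where `scan_csv` accepts in-memory bytes and a `comment_prefix` argument, as the new test in this PR does; before the fix, the comment-prefix counting path also dropped empty lines, so `pl.len()` disagreed with the height of the collected frame:

```python
import polars as pl

csv = b"""a,b
1,2

# a comment line
3,4
"""

q = pl.scan_csv(csv, comment_prefix="#")

# The empty line parses as a row of nulls, so the frame has 3 rows.
print(q.collect().height)  # 3

# Before this fix the count skipped the empty line and returned 2;
# with this PR it matches the collected height.
print(q.select(pl.len()).collect().item())  # 3
```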
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/mod.rs
@@ -26,7 +26,7 @@ mod splitfields;
mod utils;

pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues};
-pub use parser::{count_rows, count_rows_from_slice};
+pub use parser::{count_rows, count_rows_from_slice, count_rows_from_slice_par};
pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
pub use reader::CsvReader;
pub use schema_inference::infer_file_schema;
55 changes: 41 additions & 14 deletions crates/polars-io/src/csv/read/parser.rs
@@ -5,7 +5,6 @@ use num_traits::Pow;
use polars_core::prelude::*;
use polars_core::{config, POOL};
use polars_error::feature_gated;
-use polars_utils::index::Bounded;
use polars_utils::select::select_unpredictable;
use rayon::prelude::*;

@@ -43,7 +42,7 @@ pub fn count_rows(
    let owned = &mut vec![];
    let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;

-    count_rows_from_slice(
+    count_rows_from_slice_par(
        reader_bytes,
        separator,
        quote_char,
@@ -55,7 +54,7 @@

/// Read the number of rows without parsing columns
/// useful for count(*) queries
-pub fn count_rows_from_slice(
+pub fn count_rows_from_slice_par(
    mut bytes: &[u8],
    separator: u8,
    quote_char: Option<u8>,
@@ -89,27 +88,55 @@
    })
    .unwrap_or(1);

+    if n_threads == 1 {
+        return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header);
+    }
+
    let file_chunks: Vec<(usize, usize)> =
        get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);

    let iter = file_chunks.into_par_iter().map(|(start, stop)| {
-        let local_bytes = &bytes[start..stop];
-        let row_iterator = SplitLines::new(local_bytes, quote_char, eol_char, comment_prefix);
+        let bytes = &bytes[start..stop];

        if comment_prefix.is_some() {
-            Ok(row_iterator
-                .filter(|line| !line.is_empty() && !is_comment_line(line, comment_prefix))
-                .count())
+            SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
+                .filter(|line| !is_comment_line(line, comment_prefix))
+                .count()
        } else {
-            Ok(row_iterator.count())
+            CountLines::new(quote_char, eol_char).count(bytes).0
        }
    });

-    let count_result: PolarsResult<usize> = POOL.install(|| iter.sum());
+    let n: usize = POOL.install(|| iter.sum());

-    match count_result {
-        Ok(val) => Ok(val - (has_header as usize)),
-        Err(err) => Err(err),
-    }
+    Ok(n - (has_header as usize))
+}
+
+/// Read the number of rows without parsing columns
+pub fn count_rows_from_slice(
+    mut bytes: &[u8],
+    quote_char: Option<u8>,
+    comment_prefix: Option<&CommentPrefix>,
+    eol_char: u8,
+    has_header: bool,
+) -> PolarsResult<usize> {
+    for _ in 0..bytes.len() {
+        if bytes[0] != eol_char {
+            break;
+        }
+
+        bytes = &bytes[1..];
+    }
+
+    let n = if comment_prefix.is_some() {
+        SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
+            .filter(|line| !is_comment_line(line, comment_prefix))
+            .count()
+    } else {
+        CountLines::new(quote_char, eol_char).count(bytes).0
+    };
+
+    Ok(n - (has_header as usize))
}

/// Skip the utf-8 Byte Order Mark.
@@ -740,7 +767,7 @@ impl CountLines {
        }
    }

-    // Returns count and offset in slice
+    /// Returns count and offset to split for remainder in slice.
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        let mut total_idx = 0;
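The split above leaves two entry points: `count_rows_from_slice_par` picks a thread count, falls back to the single-threaded `count_rows_from_slice` when only one thread is used, and otherwise sums per-chunk counts in parallel, while `count_rows_from_slice` first strips leading newline bytes so the count agrees with the parser, which skips blank lines before the header. A hedged sketch of that leading-blank-line case from the Python side (not part of the PR; the exact count assumes the parser skips the leading blank line, just as the new stripping loop does):

```python
import polars as pl

# One blank line before the header, no comment prefix: this exercises the
# CountLines path after the leading newline bytes have been stripped.
csv = b"""
a,b
1,2
3,4
"""

q = pl.scan_csv(csv)
print(q.collect().height)                   # 2
print(q.select(pl.len()).collect().item())  # expected to match: 2
```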
3 changes: 3 additions & 0 deletions crates/polars-io/src/csv/read/utils.rs
@@ -7,6 +7,9 @@ use super::parser::next_line_position;
use super::parser::next_line_position_naive;
use super::splitfields::SplitFields;

/// TODO: Remove this in favor of parallel CountLines::analyze_chunk
///
/// (see https://github.com/pola-rs/polars/issues/19078)
pub(crate) fn get_file_chunks(
bytes: &[u8],
n_chunks: usize,
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -292,7 +292,7 @@ impl ScanExec for CsvExec {

        let bytes = maybe_decompress_bytes(&memslice, owned)?;

-        let num_rows = count_rows_from_slice(
+        let num_rows = polars_io::csv::read::count_rows_from_slice_par(
            bytes,
            popt.separator,
            popt.quote_char,
8 changes: 2 additions & 6 deletions crates/polars-plan/src/plans/functions/count.rs
@@ -9,10 +9,6 @@ use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync;
use polars_core::error::feature_gated;
#[cfg(any(feature = "parquet", feature = "json"))]
use polars_io::cloud::CloudOptions;
-#[cfg(feature = "csv")]
-use polars_io::csv::read::{
-    count_rows as count_rows_csv, count_rows_from_slice as count_rows_csv_from_slice,
-};
#[cfg(all(feature = "parquet", feature = "async"))]
use polars_io::parquet::read::ParquetAsyncReader;
#[cfg(feature = "parquet")]
@@ -96,7 +92,7 @@ fn count_all_rows_csv(
    sources
        .iter()
        .map(|source| match source {
-            ScanSourceRef::Path(path) => count_rows_csv(
+            ScanSourceRef::Path(path) => polars_io::csv::read::count_rows(
                path,
                parse_options.separator,
                parse_options.quote_char,
@@ -107,7 +103,7 @@
            _ => {
                let memslice = source.to_memslice()?;

-                count_rows_csv_from_slice(
+                polars_io::csv::read::count_rows_from_slice_par(
                    &memslice[..],
                    parse_options.separator,
                    parse_options.quote_char,
2 changes: 1 addition & 1 deletion crates/polars-stream/src/nodes/io_sources/csv.rs
@@ -618,9 +618,9 @@ impl MultiScanable for CsvSourceNode {
            }
        };

+        // TODO: Parallelize this over the async executor
        let num_rows = polars_io::csv::read::count_rows_from_slice(
            &mem_slice[..],
-            parse_options.separator,
            parse_options.quote_char,
            parse_options.comment_prefix.as_ref(),
            parse_options.eol_char,
18 changes: 18 additions & 0 deletions py-polars/tests/unit/io/test_lazy_count_star.py
@@ -27,6 +27,24 @@ def test_count_csv(io_files_path: Path, path: str, n_rows: int) -> None:
    assert_frame_equal(lf.collect(), expected)


def test_count_csv_comment_char() -> None:
    q = pl.scan_csv(
        b"""
a,b
1,2

#
3,4
""",
        comment_prefix="#",
    )

    assert_frame_equal(
        q.collect(), pl.DataFrame({"a": [1, None, 3], "b": [2, None, 4]})
    )
    assert q.select(pl.len()).collect().item() == 3


@pytest.mark.write_disk
def test_commented_csv() -> None:
    with NamedTemporaryFile() as csv_a:
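A possible follow-up sketch in the same style as `test_commented_csv` (not part of the PR): the count should also agree with the collected height when the commented CSV is scanned from a file on disk, which routes through `count_rows` and the memory-mapped slice. The temporary file name here is purely illustrative.

```python
import tempfile
from pathlib import Path

import polars as pl

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "commented.csv"  # hypothetical file name
    path.write_bytes(b"a,b\n1,2\n\n#\n3,4\n")

    q = pl.scan_csv(path, comment_prefix="#")
    assert q.collect().height == 3
    assert q.select(pl.len()).collect().item() == 3
```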