refactor: Refactor code for re-use by streaming NDJSON source #21520

Merged 2 commits on Mar 1, 2025
59 changes: 42 additions & 17 deletions crates/polars-io/src/ndjson/core.rs
@@ -253,15 +253,7 @@ impl<'a> CoreJsonReader<'a> {

     fn count(mut self) -> PolarsResult<usize> {
         let bytes = self.reader_bytes.take().unwrap();
-        let n_threads = self.n_threads.unwrap_or(POOL.current_num_threads());
-        let file_chunks = get_file_chunks_json(bytes.as_ref(), n_threads);
-
-        let iter = file_chunks.par_iter().map(|(start_pos, stop_at_nbytes)| {
-            let bytes = &bytes[*start_pos..*stop_at_nbytes];
-            let iter = json_lines(bytes);
-            iter.count()
-        });
-        Ok(POOL.install(|| iter.sum()))
+        Ok(super::count_rows_par(&bytes, self.n_threads))
     }

     fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
@@ -304,13 +296,11 @@ impl<'a> CoreJsonReader<'a> {
             file_chunks
                 .into_par_iter()
                 .map(|(start_pos, stop_at_nbytes)| {
-                    let mut buffers = init_buffers(&self.schema, capacity, self.ignore_errors)?;
-                    parse_lines(&bytes[start_pos..stop_at_nbytes], &mut buffers)?;
-                    let mut local_df = DataFrame::new(
-                        buffers
-                            .into_values()
-                            .map(|buf| buf.into_series().into_column())
-                            .collect::<_>(),
+                    let mut local_df = parse_ndjson(
+                        &bytes[start_pos..stop_at_nbytes],
+                        Some(capacity),
+                        &self.schema,
+                        self.ignore_errors,
                     )?;

                     let prepredicate_height = local_df.height() as IdxSize;
@@ -394,7 +384,7 @@ struct Scratch {
     buffers: simd_json::Buffers,
 }

-fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
+pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
     // This previously used `serde_json`'s `RawValue` to deserialize chunks without really deserializing them.
     // However, this convenience comes at a cost. serde_json allocates and parses and does UTF-8 validation, all
     // things we don't need since we use simd_json for them. Also, `serde_json::StreamDeserializer` has a more
@@ -417,6 +407,41 @@ fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> Pol
     Ok(())
 }

+pub fn parse_ndjson(
+    bytes: &[u8],
+    n_rows_hint: Option<usize>,
+    schema: &Schema,
+    ignore_errors: bool,
+) -> PolarsResult<DataFrame> {
+    let capacity = n_rows_hint.unwrap_or_else(|| {
+        // Default to total len divided by max len of first and last non-empty lines or 1.
+        bytes
+            .split(|&c| c == b'\n')
+            .find(|x| !x.is_empty())
+            .map_or(1, |x| {
+                bytes.len().div_ceil(
+                    x.len().max(
+                        bytes
+                            .rsplit(|&c| c == b'\n')
+                            .find(|x| !x.is_empty())
+                            .unwrap()
+                            .len(),
+                    ),
+                )
+            })
+    });
+
+    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
+    parse_lines(bytes, &mut buffers)?;
+
+    DataFrame::new(
+        buffers
+            .into_values()
+            .map(|buf| buf.into_series().into_column())
+            .collect::<_>(),
+    )
+}
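The capacity heuristic in the new `parse_ndjson` above estimates the row count as the total byte length divided by the longer of the first and last non-empty lines. A standalone sketch of the same estimate, for illustration only (the helper name `estimate_row_capacity` is invented here, not part of polars-io):

```rust
/// Estimate how many NDJSON rows `bytes` holds: total length divided by the
/// longer of the first and last non-empty lines, defaulting to 1.
fn estimate_row_capacity(bytes: &[u8]) -> usize {
    match bytes.split(|&c| c == b'\n').find(|l| !l.is_empty()) {
        None => 1,
        Some(first) => {
            // A non-empty first line guarantees a non-empty last line exists.
            let last = bytes
                .rsplit(|&c| c == b'\n')
                .find(|l| !l.is_empty())
                .unwrap();
            bytes.len().div_ceil(first.len().max(last.len()))
        },
    }
}

fn main() {
    let data = b"{\"a\":1}\n{\"a\":22}\n{\"a\":333}\n";
    // 27 bytes total; the first line is 7 bytes and the last is 9 bytes,
    // so the estimate is ceil(27 / 9) = 3, matching the real row count here.
    assert_eq!(estimate_row_capacity(data), 3);
}
```

Over-estimating only costs a larger initial buffer allocation, while under-estimating means the buffers grow during parsing, so a cheap length-based guess is a reasonable default when no hint is given.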

/// Find the nearest next line position.
/// Does not check for new line characters embedded in String fields.
/// This just looks for `}\n`
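To make that boundary search concrete: a rough standalone sketch of scanning for the `}\n` byte pair (the name `find_next_row_start` is made up for illustration; it is not the function in the collapsed lines below):

```rust
/// Return the offset just past the next `}\n` pair, i.e. a plausible start of
/// the next NDJSON row, or None if no boundary is found. As the comment above
/// notes, this can be fooled by a `}\n` embedded inside a string field.
fn find_next_row_start(bytes: &[u8]) -> Option<usize> {
    bytes
        .windows(2)
        .position(|w| w == b"}\n".as_slice())
        .map(|pos| pos + 2)
}

fn main() {
    let data = b"\"x\":2}\n{\"x\":3}\n"; // starts mid-row
    assert_eq!(find_next_row_start(data), Some(7));
}
```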
28 changes: 28 additions & 0 deletions crates/polars-io/src/ndjson/mod.rs
@@ -1,7 +1,10 @@
+use core::{get_file_chunks_json, json_lines};
 use std::num::NonZeroUsize;

 use arrow::array::StructArray;
 use polars_core::prelude::*;
+use polars_core::POOL;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};

 pub(crate) mod buffer;
 pub mod core;
@@ -19,3 +22,28 @@ pub fn infer_schema<R: std::io::BufRead>(
         .collect();
     Ok(schema)
 }
+
+/// Count the number of rows. The slice passed must represent the entire file. This will
+/// potentially parallelize using rayon.
+///
+/// This does not check if the lines are valid NDJSON - it assumes that is the case.
+pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
+    let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
+    let file_chunks = get_file_chunks_json(full_bytes, n_threads);
+
+    if file_chunks.len() == 1 {
+        count_rows(full_bytes)
+    } else {
+        let iter = file_chunks
+            .into_par_iter()
+            .map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));
+
+        POOL.install(|| iter.sum())
+    }
+}
+
+/// Count the number of rows. The slice passed must represent the entire file.
+/// This does not check if the lines are valid NDJSON - it assumes that is the case.
+pub fn count_rows(full_bytes: &[u8]) -> usize {
+    json_lines(full_bytes).count()
+}
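With `count_rows_par` and `count_rows` now public on the ndjson module, callers outside `CoreJsonReader` (such as the streaming NDJSON source this PR prepares for) can count rows directly. A rough usage sketch, assuming `polars-io` as a dependency with its `json` feature enabled and a file small enough to read into memory (the path is made up):

```rust
use polars_io::ndjson;

fn main() -> std::io::Result<()> {
    // The slice must cover the entire file, as the doc comment requires.
    let bytes = std::fs::read("data.ndjson")?;

    // `None` lets the count fall back to the rayon pool's thread count.
    let n_rows = ndjson::count_rows_par(&bytes, None);
    println!("{n_rows} rows");
    Ok(())
}
```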
6 changes: 3 additions & 3 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -310,10 +310,10 @@ pub fn csv_file_info(
 }

 #[cfg(feature = "json")]
-pub(super) fn ndjson_file_info(
+pub fn ndjson_file_info(
     sources: &ScanSources,
     file_options: &FileScanOptions,
-    ndjson_options: &mut NDJsonReadOptions,
+    ndjson_options: &NDJsonReadOptions,
     cloud_options: Option<&polars_io::cloud::CloudOptions>,
 ) -> PolarsResult<FileInfo> {
     use polars_core::config;
@@ -346,7 +346,7 @@ pub(super) fn ndjson_file_info(

     let owned = &mut vec![];

-    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
+    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.clone() {
         if file_options.row_index.is_none() {
             (schema.clone(), schema.clone())
         } else {
24 changes: 24 additions & 0 deletions crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
@@ -244,6 +244,30 @@ impl SlicePushDown {

             self.no_pushdown_finish_opt(lp, Some(state), lp_arena)
         },
+        #[cfg(feature = "json")]
+        (Scan {
+            sources,
+            file_info,
+            hive_parts,
+            output_schema,
+            mut file_options,
+            predicate,
+            scan_type: FileScan::NDJson { options, cloud_options },
+        }, Some(state)) if predicate.is_none() && self.new_streaming => {
+            file_options.slice = Some((state.offset, state.len as usize));
+
+            let lp = Scan {
+                sources,
+                file_info,
+                hive_parts,
+                output_schema,
+                scan_type: FileScan::NDJson { options, cloud_options },
+                file_options,
+                predicate,
+            };
+
+            Ok(lp)
+        },
         #[cfg(feature = "parquet")]
         (Scan {
             sources,
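For context, the new match arm pushes the slice into the NDJSON scan's `file_options`, so the reader only has to produce the requested window. A hedged sketch of the kind of query that benefits, assuming the `polars` crate with its `lazy` and `json` features and a `LazyFrame` already built from an NDJSON scan (e.g. via `LazyJsonLineReader`); whether the slice actually reaches the scan also depends on running the new streaming engine, per the `self.new_streaming` guard above:

```rust
use polars::prelude::*;

fn first_rows(lf: LazyFrame) -> PolarsResult<DataFrame> {
    // `limit` adds a Slice to the plan; with no predicate and the new streaming
    // engine, the rule above moves it into the scan itself, so the NDJSON reader
    // can stop after 5 rows instead of materializing the whole file.
    lf.limit(5).collect()
}
```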

This file was deleted.

3 changes: 1 addition & 2 deletions crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
@@ -1,7 +1,6 @@
 use std::sync::atomic::Ordering;
 use std::sync::Arc;

-use mem_prefetch_funcs::get_memory_prefetch_func;
 use polars_core::config;
 use polars_core::prelude::ArrowSchema;
 use polars_core::schema::{Schema, SchemaExt, SchemaRef};
@@ -19,6 +18,7 @@ use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::FileInfo;
 use polars_plan::prelude::FileScanOptions;
 use polars_utils::index::AtomicIdxSize;
+use polars_utils::mem::prefetch::get_memory_prefetch_func;
 use polars_utils::pl_str::PlSmallStr;
 use polars_utils::IdxSize;

@@ -33,7 +33,6 @@ use crate::nodes::TaskPriority;
 use crate::utils::task_handles_ext;

 mod init;
-mod mem_prefetch_funcs;
 mod metadata_fetch;
 mod metadata_utils;
 mod row_group_data_fetch;