refactor: Refactor code for re-use by streaming NDJSON source (#21520)
nameexhaustion authored Mar 1, 2025
1 parent 6d33653 commit 20ee230
Showing 10 changed files with 318 additions and 223 deletions.
59 changes: 42 additions & 17 deletions crates/polars-io/src/ndjson/core.rs
@@ -253,15 +253,7 @@ impl<'a> CoreJsonReader<'a> {

fn count(mut self) -> PolarsResult<usize> {
let bytes = self.reader_bytes.take().unwrap();
let n_threads = self.n_threads.unwrap_or(POOL.current_num_threads());
let file_chunks = get_file_chunks_json(bytes.as_ref(), n_threads);

let iter = file_chunks.par_iter().map(|(start_pos, stop_at_nbytes)| {
let bytes = &bytes[*start_pos..*stop_at_nbytes];
let iter = json_lines(bytes);
iter.count()
});
Ok(POOL.install(|| iter.sum()))
Ok(super::count_rows_par(&bytes, self.n_threads))
}

fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
@@ -304,13 +296,11 @@ impl<'a> CoreJsonReader<'a> {
file_chunks
.into_par_iter()
.map(|(start_pos, stop_at_nbytes)| {
let mut buffers = init_buffers(&self.schema, capacity, self.ignore_errors)?;
parse_lines(&bytes[start_pos..stop_at_nbytes], &mut buffers)?;
let mut local_df = DataFrame::new(
buffers
.into_values()
.map(|buf| buf.into_series().into_column())
.collect::<_>(),
let mut local_df = parse_ndjson(
&bytes[start_pos..stop_at_nbytes],
Some(capacity),
&self.schema,
self.ignore_errors,
)?;

let prepredicate_height = local_df.height() as IdxSize;
@@ -394,7 +384,7 @@ struct Scratch {
buffers: simd_json::Buffers,
}

fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
// This previously used `serde_json`'s `RawValue` to deserialize chunks without really deserializing them.
// However, this convenience comes at a cost. serde_json allocates and parses and does UTF-8 validation, all
// things we don't need since we use simd_json for them. Also, `serde_json::StreamDeserializer` has a more
@@ -417,6 +407,41 @@ fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
Ok(())
}

pub fn parse_ndjson(
bytes: &[u8],
n_rows_hint: Option<usize>,
schema: &Schema,
ignore_errors: bool,
) -> PolarsResult<DataFrame> {
let capacity = n_rows_hint.unwrap_or_else(|| {
// Default to total len divided by max len of first and last non-empty lines or 1.
bytes
.split(|&c| c == b'\n')
.find(|x| !x.is_empty())
.map_or(1, |x| {
bytes.len().div_ceil(
x.len().max(
bytes
.rsplit(|&c| c == b'\n')
.find(|x| !x.is_empty())
.unwrap()
.len(),
),
)
})
});

let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
parse_lines(bytes, &mut buffers)?;

DataFrame::new(
buffers
.into_values()
.map(|buf| buf.into_series().into_column())
.collect::<_>(),
)
}
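
A quick worked example of the row-count heuristic used above when `n_rows_hint` is `None`: the estimate is the buffer length divided (rounding up) by the longer of the first and last non-empty lines, falling back to 1 if the buffer contains no non-empty line. The snippet below is a standalone reproduction for illustration only, not part of the commit:

// Standalone sketch of the capacity heuristic in `parse_ndjson` above.
fn capacity_hint(bytes: &[u8]) -> usize {
    bytes
        .split(|&c| c == b'\n')
        .find(|x| !x.is_empty())
        .map_or(1, |first| {
            let last = bytes
                .rsplit(|&c| c == b'\n')
                .find(|x| !x.is_empty())
                .unwrap(); // safe: a non-empty line exists, so rsplit also finds one
            bytes.len().div_ceil(first.len().max(last.len()))
        })
}

#[test]
fn capacity_hint_example() {
    // Three 11-byte lines plus newlines -> 36 bytes total -> ceil(36 / 11) = 4.
    let bytes = b"{\"a\": 1111}\n{\"a\": 2222}\n{\"a\": 3333}\n";
    assert_eq!(capacity_hint(bytes), 4);
}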

/// Find the nearest next line position.
/// Does not check for new line characters embedded in String fields.
/// This just looks for `}\n`
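
The function under the preceding doc comment is collapsed in this view. As a rough sketch of the `}\n` scan it describes (an illustrative, hypothetical helper written under the assumption that each record ends with a closing brace followed by a newline; not the committed implementation):

// Sketch: offset just past the next `}\n` pair, or None if there is none.
// Deliberately ignores `}` and newline characters embedded in string fields,
// matching the caveat in the doc comment above.
fn next_line_position_naive(input: &[u8]) -> Option<usize> {
    input.windows(2).position(|w| w == b"}\n").map(|pos| pos + 2)
}

// Example: next_line_position_naive(b"{\"a\":1}\n{\"a\":2}\n") == Some(8)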
28 changes: 28 additions & 0 deletions crates/polars-io/src/ndjson/mod.rs
@@ -1,7 +1,10 @@
use core::{get_file_chunks_json, json_lines};
use std::num::NonZeroUsize;

use arrow::array::StructArray;
use polars_core::prelude::*;
use polars_core::POOL;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

pub(crate) mod buffer;
pub mod core;
@@ -19,3 +22,28 @@ pub fn infer_schema<R: std::io::BufRead>(
.collect();
Ok(schema)
}

/// Count the number of rows. The slice passed must represent the entire file. This will
/// potentially parallelize using rayon.
///
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
let file_chunks = get_file_chunks_json(full_bytes, n_threads);

if file_chunks.len() == 1 {
count_rows(full_bytes)
} else {
let iter = file_chunks
.into_par_iter()
.map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));

POOL.install(|| iter.sum())
}
}

/// Count the number of rows. The slice passed must represent the entire file.
/// This does not check if the lines are valid NDJSON - it assumes that is the case.
pub fn count_rows(full_bytes: &[u8]) -> usize {
json_lines(full_bytes).count()
}
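
A hedged usage sketch for the two new counting helpers (the file path and the surrounding error handling are illustrative assumptions, not from the commit; this assumes `polars-io` is a dependency with its JSON feature enabled):

use polars_io::ndjson::{count_rows, count_rows_par};

fn main() -> std::io::Result<()> {
    // The helpers expect the full file contents in memory.
    let bytes = std::fs::read("data.ndjson")?;

    // Single-threaded count.
    let n = count_rows(&bytes);

    // Parallel count; passing None defaults to the global rayon pool size.
    let n_par = count_rows_par(&bytes, None);

    assert_eq!(n, n_par);
    println!("{n} rows");
    Ok(())
}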
6 changes: 3 additions & 3 deletions crates/polars-plan/src/plans/conversion/scans.rs
@@ -310,10 +310,10 @@ pub fn csv_file_info(
}

#[cfg(feature = "json")]
pub(super) fn ndjson_file_info(
pub fn ndjson_file_info(
sources: &ScanSources,
file_options: &FileScanOptions,
ndjson_options: &mut NDJsonReadOptions,
ndjson_options: &NDJsonReadOptions,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
use polars_core::config;
@@ -346,7 +346,7 @@ pub(super) fn ndjson_file_info(

let owned = &mut vec![];

let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.clone() {
if file_options.row_index.is_none() {
(schema.clone(), schema.clone())
} else {
24 changes: 24 additions & 0 deletions crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
@@ -244,6 +244,30 @@ impl SlicePushDown {

self.no_pushdown_finish_opt(lp, Some(state), lp_arena)
},
#[cfg(feature = "json")]
(Scan {
sources,
file_info,
hive_parts,
output_schema,
mut file_options,
predicate,
scan_type: FileScan::NDJson { options, cloud_options },
}, Some(state)) if predicate.is_none() && self.new_streaming => {
file_options.slice = Some((state.offset, state.len as usize));

let lp = Scan {
sources,
file_info,
hive_parts,
output_schema,
scan_type: FileScan::NDJson { options, cloud_options },
file_options,
predicate,
};

Ok(lp)
},
#[cfg(feature = "parquet")]
(Scan {
sources,
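
The new NDJson match arm only records the slice on `file_options` and leaves the rest of the scan untouched; honoring it is up to the streaming NDJSON source. As a loose, self-contained illustration of what a pushed-down `(offset, len)` slice means when reading lines (a simplified sketch that ignores negative offsets; not the actual source implementation):

// Sketch: apply a pushed-down slice while iterating NDJSON lines, so rows
// outside the requested window are never parsed.
fn sliced_lines<'a>(
    lines: impl Iterator<Item = &'a [u8]> + 'a,
    slice: Option<(i64, usize)>,
) -> Box<dyn Iterator<Item = &'a [u8]> + 'a> {
    match slice {
        Some((offset, len)) if offset >= 0 => Box::new(lines.skip(offset as usize).take(len)),
        _ => Box::new(lines),
    }
}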

This file was deleted.

3 changes: 1 addition & 2 deletions crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
@@ -1,7 +1,6 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;

use mem_prefetch_funcs::get_memory_prefetch_func;
use polars_core::config;
use polars_core::prelude::ArrowSchema;
use polars_core::schema::{Schema, SchemaExt, SchemaRef};
@@ -19,6 +18,7 @@ use polars_plan::plans::hive::HivePartitions;
use polars_plan::plans::FileInfo;
use polars_plan::prelude::FileScanOptions;
use polars_utils::index::AtomicIdxSize;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::IdxSize;

@@ -33,7 +33,6 @@ use crate::nodes::TaskPriority;
use crate::utils::task_handles_ext;

mod init;
mod mem_prefetch_funcs;
mod metadata_fetch;
mod metadata_utils;
mod row_group_data_fetch;