From fed146370b7f9c53f688770570e02b02be0aae16 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Feb 2025 20:16:21 +1100
Subject: [PATCH 1/2] c

---
 crates/polars-io/src/ndjson/core.rs           |  59 +++-
 crates/polars-io/src/ndjson/mod.rs            | 295 ++++++++++++++++++
 .../polars-plan/src/plans/conversion/scans.rs |   6 +-
 .../src/plans/optimizer/slice_pushdown_lp.rs  |  24 ++
 .../io_sources/parquet/mem_prefetch_funcs.rs  |  73 -----
 .../src/nodes/io_sources/parquet/mod.rs       |   3 +-
 .../parquet/row_group_data_fetch.rs           | 115 ++++---
 crates/polars-utils/src/mem.rs                | 216 +++++++++----
 crates/polars-utils/src/mmap.rs               |   2 +-
 py-polars/tests/unit/io/test_lazy_json.py    |  15 +
 10 files changed, 585 insertions(+), 223 deletions(-)
 delete mode 100644 crates/polars-stream/src/nodes/io_sources/parquet/mem_prefetch_funcs.rs

diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs
index 7567116fd33c..e33596b638fd 100644
--- a/crates/polars-io/src/ndjson/core.rs
+++ b/crates/polars-io/src/ndjson/core.rs
@@ -253,15 +253,7 @@ impl<'a> CoreJsonReader<'a> {
     fn count(mut self) -> PolarsResult<usize> {
         let bytes = self.reader_bytes.take().unwrap();
-        let n_threads = self.n_threads.unwrap_or(POOL.current_num_threads());
-        let file_chunks = get_file_chunks_json(bytes.as_ref(), n_threads);
-
-        let iter = file_chunks.par_iter().map(|(start_pos, stop_at_nbytes)| {
-            let bytes = &bytes[*start_pos..*stop_at_nbytes];
-            let iter = json_lines(bytes);
-            iter.count()
-        });
-        Ok(POOL.install(|| iter.sum()))
+        Ok(super::count_rows_par(&bytes))
     }
 
     fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
@@ -304,13 +296,11 @@ impl<'a> CoreJsonReader<'a> {
                 file_chunks
                     .into_par_iter()
                     .map(|(start_pos, stop_at_nbytes)| {
-                        let mut buffers = init_buffers(&self.schema, capacity, self.ignore_errors)?;
-                        parse_lines(&bytes[start_pos..stop_at_nbytes], &mut buffers)?;
-                        let mut local_df = DataFrame::new(
-                            buffers
-                                .into_values()
-                                .map(|buf| buf.into_series().into_column())
-                                .collect::<_>(),
+                        let mut local_df = parse_ndjson(
+                            &bytes[start_pos..stop_at_nbytes],
+                            Some(capacity),
+                            &self.schema,
+                            self.ignore_errors,
                         )?;
 
                         let prepredicate_height = local_df.height() as IdxSize;
@@ -394,7 +384,7 @@ struct Scratch {
     buffers: simd_json::Buffers,
 }
 
-fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
+pub fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
     // This previously used `serde_json`'s `RawValue` to deserialize chunks without really deserializing them.
     // However, this convenience comes at a cost. serde_json allocates and parses and does UTF-8 validation, all
     // things we don't need since we use simd_json for them. Also, `serde_json::StreamDeserializer` has a more
@@ -417,6 +407,41 @@ fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
     Ok(())
 }
 
+pub fn parse_ndjson(
+    bytes: &[u8],
+    n_rows_hint: Option<usize>,
+    schema: &Schema,
+    ignore_errors: bool,
+) -> PolarsResult<DataFrame> {
+    let capacity = n_rows_hint.unwrap_or_else(|| {
+        // Default to the total length divided by the max length of the first and last
+        // non-empty lines, or 1 if there are no non-empty lines.
+        bytes
+            .split(|&c| c == b'\n')
+            .find(|x| !x.is_empty())
+            .map_or(1, |x| {
+                bytes.len().div_ceil(
+                    x.len().max(
+                        bytes
+                            .rsplit(|&c| c == b'\n')
+                            .find(|x| !x.is_empty())
+                            .unwrap()
+                            .len(),
+                    ),
+                )
+            })
+    });
+
+    let mut buffers = init_buffers(schema, capacity, ignore_errors)?;
+    parse_lines(bytes, &mut buffers)?;
+
+    DataFrame::new(
+        buffers
+            .into_values()
+            .map(|buf| buf.into_series().into_column())
+            .collect::<_>(),
+    )
+}
+
 /// Find the nearest next line position.
 /// Does not check for new line characters embedded in String fields.
 /// This just looks for `}\n`
diff --git a/crates/polars-io/src/ndjson/mod.rs b/crates/polars-io/src/ndjson/mod.rs
index e01be6e12804..6d462fa9ca9e 100644
--- a/crates/polars-io/src/ndjson/mod.rs
+++ b/crates/polars-io/src/ndjson/mod.rs
@@ -1,7 +1,10 @@
+use core::json_lines;
 use std::num::NonZeroUsize;
 
 use arrow::array::StructArray;
 use polars_core::prelude::*;
+use polars_core::POOL;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 
 pub(crate) mod buffer;
 pub mod core;
@@ -19,3 +22,295 @@ pub fn infer_schema(
         .collect();
     Ok(schema)
 }
+
+/// Statistics for a chunk of text used for NDJSON parsing.
+#[derive(Debug, Clone, PartialEq)]
+struct ChunkStats {
+    non_empty_rows: usize,
+    /// Set to None if the chunk was empty.
+    last_newline_offset: Option<usize>,
+    /// Used when counting rows.
+    has_leading_empty_line: bool,
+    has_non_empty_remainder: bool,
+}
+
+impl ChunkStats {
+    /// Assumes that:
+    /// * There is no quoting of newline characters (unlike CSV)
+    /// * We do not count empty lines (successive newlines, or lines containing only whitespace / tab)
+    fn from_chunk(chunk: &[u8]) -> Self {
+        // Note: `memrchr` searches right-to-left but still returns a left-based offset.
+        let first_newline_offset = memchr::memchr(b'\n', chunk);
+        let last_newline_offset = memchr::memrchr(b'\n', chunk);
+
+        let has_leading_empty_line =
+            first_newline_offset.is_some_and(|i| json_lines(&chunk[..i]).next().is_none());
+        let has_non_empty_remainder =
+            json_lines(&chunk[last_newline_offset.map_or(0, |i| 1 + i)..chunk.len()])
+                .next()
+                .is_some();
+
+        let mut non_empty_rows = if first_newline_offset.is_some() && !has_leading_empty_line {
+            1
+        } else {
+            0
+        };
+
+        if first_newline_offset.is_some() {
+            let range = first_newline_offset.unwrap() + 1..last_newline_offset.unwrap() + 1;
+            non_empty_rows += json_lines(&chunk[range]).count()
+        }
+
+        Self {
+            non_empty_rows,
+            has_leading_empty_line,
+            last_newline_offset,
+            has_non_empty_remainder,
+        }
+    }
+
+    /// Reduction state for counting rows.
+    ///
+    /// Note: `rhs` should be from the chunk immediately after `slf`, otherwise the results will be
+    /// incorrect.
+    pub fn reduce_count_rows(slf: &Self, rhs: &Self) -> Self {
+        let mut non_empty_rows = slf.non_empty_rows + rhs.non_empty_rows;
+
+        if slf.has_non_empty_remainder && rhs.has_leading_empty_line {
+            non_empty_rows += 1;
+        }
+
+        ChunkStats {
+            non_empty_rows,
+            last_newline_offset: rhs.last_newline_offset,
+            has_leading_empty_line: slf.has_leading_empty_line,
+            has_non_empty_remainder: rhs.has_non_empty_remainder
+                || (rhs.last_newline_offset.is_none() && slf.has_non_empty_remainder),
+        }
+    }
+
+    /// The non-empty row count of this chunk assuming it is the last chunk (adds 1 if there is a
+    /// non-empty remainder).
+    pub fn non_empty_row_count_as_last_chunk(&self) -> usize {
+        self.non_empty_rows + self.has_non_empty_remainder as usize
+    }
+}
+
+/// Count the number of rows. The slice passed must represent the entire file. This will
+/// potentially parallelize using rayon.
+///
+/// This does not check if the lines are valid NDJSON - it assumes that is the case.
+pub fn count_rows_par(full_bytes: &[u8]) -> usize {
+    _count_rows_impl(
+        full_bytes,
+        std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE")
+            .ok()
+            .and_then(|x| x.parse::<usize>().ok()),
+    )
+}
+
+/// Count the number of rows. The slice passed must represent the entire file.
+/// This does not check if the lines are valid NDJSON - it assumes that is the case.
+pub fn count_rows(full_bytes: &[u8]) -> usize {
+    json_lines(full_bytes).count()
+}
+
+/// This is separate for testing purposes.
+fn _count_rows_impl(full_bytes: &[u8], force_chunk_size: Option<usize>) -> usize {
+    let min_chunk_size = if cfg!(debug_assertions) { 0 } else { 16 * 1024 };
+
+    // Row count does not have a parsing dependency between threads, so we can just split into
+    // the same number of chunks as threads.
+    let chunk_size = force_chunk_size.unwrap_or(
+        full_bytes
+            .len()
+            .div_ceil(POOL.current_num_threads())
+            .max(min_chunk_size),
+    );
+
+    if full_bytes.is_empty() {
+        return 0;
+    }
+
+    let n_chunks = full_bytes.len().div_ceil(chunk_size);
+
+    if n_chunks > 1 {
+        let identity = ChunkStats::from_chunk(&[]);
+        let acc_stats = POOL.install(|| {
+            (0..n_chunks)
+                .into_par_iter()
+                .map(|i| {
+                    ChunkStats::from_chunk(
+                        &full_bytes[i * chunk_size
+                            ..(1 + i).saturating_mul(chunk_size).min(full_bytes.len())],
+                    )
+                })
+                .reduce(
+                    || identity.clone(),
+                    |l, r| ChunkStats::reduce_count_rows(&l, &r),
+                )
+        });
+
+        acc_stats.non_empty_row_count_as_last_chunk()
+    } else {
+        count_rows(full_bytes)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::ChunkStats;
+
+    #[test]
+    fn test_chunk_stats() {
+        let bytes = r#"
+{"a": 1}
+{"a": 2}
+"#
+        .as_bytes();
+
+        assert_eq!(
+            ChunkStats::from_chunk(bytes),
+            ChunkStats {
+                non_empty_rows: 2,
+                last_newline_offset: Some(18),
+                has_leading_empty_line: true,
+                has_non_empty_remainder: false,
+            }
+        );
+
+        assert_eq!(
+            ChunkStats::from_chunk(&bytes[..bytes.len() - 3]),
+            ChunkStats {
+                non_empty_rows: 1,
+                last_newline_offset: Some(9),
+                has_leading_empty_line: true,
+                has_non_empty_remainder: true,
+            }
+        );
+
+        assert_eq!(super::_count_rows_impl(&[], Some(1)), 0);
+        assert_eq!(super::_count_rows_impl(bytes, Some(1)), 2);
+        assert_eq!(super::_count_rows_impl(bytes, Some(3)), 2);
+        assert_eq!(super::_count_rows_impl(bytes, Some(5)), 2);
+        assert_eq!(super::_count_rows_impl(bytes, Some(7)), 2);
+        assert_eq!(super::_count_rows_impl(bytes, Some(bytes.len())), 2);
+
+        assert_eq!(super::count_rows_par(&[]), 0);
+
+        assert_eq!(
+            ChunkStats::from_chunk(&[]),
+            ChunkStats {
+                non_empty_rows: 0,
+                last_newline_offset: None,
+                has_leading_empty_line: false,
+                has_non_empty_remainder: false,
+            }
+        );
+
+        // Single-chars
+
+        assert_eq!(
+            ChunkStats::from_chunk(b"\n"),
+            ChunkStats {
+                non_empty_rows: 0,
+                last_newline_offset: Some(0),
+                has_leading_empty_line: true,
+                has_non_empty_remainder: false,
+            }
+        );
+
+        assert_eq!(
+            ChunkStats::from_chunk(b"a"),
+            ChunkStats {
+                non_empty_rows: 0,
+                last_newline_offset: None,
+                has_leading_empty_line: false,
+                has_non_empty_remainder: true,
+            }
+        );
+
+        assert_eq!(
+            ChunkStats::from_chunk(b" "),
+            ChunkStats {
+                non_empty_rows: 0,
+                last_newline_offset: None,
+                has_leading_empty_line: false,
+                has_non_empty_remainder: false,
+            }
+        );
+
+        // Double-char combinations
+
+        assert_eq!(
+            ChunkStats::from_chunk(b"a\n"),
+            ChunkStats {
+                non_empty_rows: 1,
+                last_newline_offset: Some(1),
+                has_leading_empty_line: false,
+                has_non_empty_remainder: false,
+            }
+        );
+
+        assert_eq!(
+            ChunkStats::from_chunk(b" \n"),
+            ChunkStats {
+                non_empty_rows: 0,
+                last_newline_offset: Some(1),
+                has_leading_empty_line: true,
+                has_non_empty_remainder: false,
+            }
+        );
+
+        assert_eq!(
+            ChunkStats::from_chunk(b"a "),
+            ChunkStats {
+                non_empty_rows: 0,
+                last_newline_offset: None,
+                has_leading_empty_line: false,
+                has_non_empty_remainder: true,
+            }
+        );
+    }
+
+    #[test]
+    fn test_chunk_stats_whitespace() {
+        let space_char = ' ';
+        let tab_char = '\t';
+        // This is not valid JSON, but we simply need to test that ChunkStats only counts lines
+        // containing at least 1 non-whitespace character.
+        let bytes = format!(
+            "
+abc
+
+abc
+
+{tab_char}
+{space_char}{space_char}{space_char}
+
+ abc{space_char}
+
+"
+        );
+        let bytes = bytes.as_bytes();
+
+        assert_eq!(
+            ChunkStats::from_chunk(bytes),
+            ChunkStats {
+                non_empty_rows: 3,
+                last_newline_offset: Some(28),
+                has_leading_empty_line: true,
+                has_non_empty_remainder: false,
+            }
+        );
+    }
+
+    #[test]
+    fn test_count_rows() {
+        let bytes = r#"{"text": "\"hello", "id": 1}
+{"text": "\"hello", "id": 1}   "#
+            .as_bytes();
+
+        assert_eq!(super::count_rows_par(bytes), 2);
+    }
+}
diff --git a/crates/polars-plan/src/plans/conversion/scans.rs b/crates/polars-plan/src/plans/conversion/scans.rs
index 14096d9cc2d6..4ef409cd6756 100644
--- a/crates/polars-plan/src/plans/conversion/scans.rs
+++ b/crates/polars-plan/src/plans/conversion/scans.rs
@@ -310,10 +310,10 @@ pub fn csv_file_info(
 }
 
 #[cfg(feature = "json")]
-pub(super) fn ndjson_file_info(
+pub fn ndjson_file_info(
     sources: &ScanSources,
     file_options: &FileScanOptions,
-    ndjson_options: &mut NDJsonReadOptions,
+    ndjson_options: &NDJsonReadOptions,
     cloud_options: Option<&polars_io::cloud::CloudOptions>,
 ) -> PolarsResult<FileInfo> {
     use polars_core::config;
@@ -346,7 +346,7 @@ pub(super) fn ndjson_file_info(
 
     let owned = &mut vec![];
 
-    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
+    let (mut reader_schema, schema) = if let Some(schema) = ndjson_options.schema.clone() {
         if file_options.row_index.is_none() {
             (schema.clone(), schema.clone())
         } else {
diff --git a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
index c79193eaea91..d1f9d2c6f5a0 100644
--- a/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
+++ b/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
@@ -244,6 +244,30 @@ impl SlicePushDown {
 
             self.no_pushdown_finish_opt(lp, Some(state), lp_arena)
         },
+        #[cfg(feature = "json")]
+        (Scan {
+            sources,
+            file_info,
+            hive_parts,
+            output_schema,
+            mut file_options,
+            predicate,
+            scan_type: FileScan::NDJson { options, cloud_options },
+        }, Some(state)) if predicate.is_none() && self.new_streaming => {
+            file_options.slice = Some((state.offset, state.len as usize));
+
+            let lp = Scan {
+                sources,
+                file_info,
+                hive_parts,
+                output_schema,
+                scan_type: FileScan::NDJson { options, cloud_options },
+                file_options,
+                predicate,
+            };
+
+            Ok(lp)
+        },
         #[cfg(feature = "parquet")]
         (Scan {
             sources,
diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/mem_prefetch_funcs.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mem_prefetch_funcs.rs
deleted file mode 100644
index 414e88c3e4aa..000000000000
--- a/crates/polars-stream/src/nodes/io_sources/parquet/mem_prefetch_funcs.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-pub(super) use polars_utils::mem::{
-    force_populate_read, madvise_populate_read, madvise_sequential, madvise_willneed, prefetch_l2,
-};
-pub(super) fn no_prefetch(_: &[u8]) {}
-
-pub(super) fn get_memory_prefetch_func(verbose: bool) -> fn(&[u8]) -> () {
-    let memory_prefetch_func = match std::env::var("POLARS_MEMORY_PREFETCH").ok().as_deref() {
-        None => {
-            // madvise_willneed performed the best on both MacOS on Apple Silicon and Ubuntu on x86-64,
-            // using PDS-H query 3 SF=10 after clearing file cache as a benchmark.
- #[cfg(target_family = "unix")] - { - madvise_willneed - } - #[cfg(not(target_family = "unix"))] - { - no_prefetch - } - }, - Some("no_prefetch") => no_prefetch, - Some("prefetch_l2") => prefetch_l2, - Some("madvise_sequential") => { - #[cfg(target_family = "unix")] - { - madvise_sequential - } - #[cfg(not(target_family = "unix"))] - { - panic!("POLARS_MEMORY_PREFETCH=madvise_sequential is not supported by this system"); - } - }, - Some("madvise_willneed") => { - #[cfg(target_family = "unix")] - { - madvise_willneed - } - #[cfg(not(target_family = "unix"))] - { - panic!("POLARS_MEMORY_PREFETCH=madvise_willneed is not supported by this system"); - } - }, - Some("madvise_populate_read") => { - #[cfg(target_os = "linux")] - { - madvise_populate_read - } - #[cfg(not(target_os = "linux"))] - { - panic!( - "POLARS_MEMORY_PREFETCH=madvise_populate_read is not supported by this system" - ); - } - }, - Some("force_populate_read") => force_populate_read, - Some(v) => panic!("invalid value for POLARS_MEMORY_PREFETCH: {}", v), - }; - - if verbose { - let func_name = match memory_prefetch_func as usize { - v if v == no_prefetch as usize => "no_prefetch", - v if v == prefetch_l2 as usize => "prefetch_l2", - v if v == madvise_sequential as usize => "madvise_sequential", - v if v == madvise_willneed as usize => "madvise_willneed", - v if v == madvise_populate_read as usize => "madvise_populate_read", - v if v == force_populate_read as usize => "force_populate_read", - _ => unreachable!(), - }; - - eprintln!("[ParquetSource] Memory prefetch function: {}", func_name); - } - - memory_prefetch_func -} diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs index b696503bb5bb..86ace3e570b9 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs @@ -1,7 +1,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; -use mem_prefetch_funcs::get_memory_prefetch_func; use polars_core::config; use polars_core::prelude::ArrowSchema; use polars_core::schema::{Schema, SchemaExt, SchemaRef}; @@ -19,6 +18,7 @@ use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::FileInfo; use polars_plan::prelude::FileScanOptions; use polars_utils::index::AtomicIdxSize; +use polars_utils::mem::prefetch::get_memory_prefetch_func; use polars_utils::pl_str::PlSmallStr; use polars_utils::IdxSize; @@ -33,7 +33,6 @@ use crate::nodes::TaskPriority; use crate::utils::task_handles_ext; mod init; -mod mem_prefetch_funcs; mod metadata_fetch; mod metadata_utils; mod row_group_data_fetch; diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs index c1dbd74c2c0a..947b29c24a9f 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs @@ -14,7 +14,6 @@ use polars_utils::mmap::MemSlice; use polars_utils::pl_str::PlSmallStr; use polars_utils::IdxSize; -use super::mem_prefetch_funcs; use super::row_group_decode::SharedFileState; use crate::utils::task_handles_ext; @@ -161,70 +160,70 @@ impl RowGroupDataFetcher { let current_max_row_group_height = self.current_max_row_group_height; let handle = io_runtime.spawn(async move { - let fetched_bytes = if let DynByteSource::MemSlice(mem_slice) = - current_byte_source.as_ref() - { - // Skip byte range calculation for `no_prefetch`. 
-                if memory_prefetch_func as usize != mem_prefetch_funcs::no_prefetch as usize
-                {
-                    let slice = mem_slice.0.as_ref();
-
-                    if let Some(columns) = projection.as_ref() {
-                        for range in get_row_group_byte_ranges_for_projection(
-                            &row_group_metadata,
-                            columns.as_ref(),
-                        ) {
-                            memory_prefetch_func(unsafe { slice.get_unchecked(range) })
-                        }
-                    } else {
-                        let range = row_group_metadata.full_byte_range();
-                        let range = range.start as usize..range.end as usize;
-
-                        memory_prefetch_func(unsafe { slice.get_unchecked(range) })
-                    };
-                }
-
-                // We have a mmapped or in-memory slice representing the entire
-                // file that can be sliced directly, so we can skip the byte-range
-                // calculations and HashMap allocation.
-                let mem_slice = mem_slice.0.clone();
-                FetchedBytes::MemSlice {
-                    offset: 0,
-                    mem_slice,
-                }
-            } else if let Some(columns) = projection.as_ref() {
-                let mut ranges = get_row_group_byte_ranges_for_projection(
-                    &row_group_metadata,
-                    columns.as_ref(),
-                )
-                .collect::<Vec<_>>();
-
-                let n_ranges = ranges.len();
-
-                let bytes_map = current_byte_source.get_ranges(&mut ranges).await?;
-
-                assert_eq!(bytes_map.len(), n_ranges);
-
-                FetchedBytes::BytesMap(bytes_map)
-            } else {
-                // We still prefer `get_ranges()` over a single `get_range()` for downloading
-                // the entire row group, as it can have less memory-copying. A single `get_range()`
-                // would naively concatenate the memory blocks of the entire row group, while
-                // `get_ranges()` can skip concatenation since the downloaded blocks are
-                // aligned to the columns.
-                let mut ranges = row_group_metadata
-                    .byte_ranges_iter()
-                    .map(|x| x.start as usize..x.end as usize)
-                    .collect::<Vec<_>>();
-
-                let n_ranges = ranges.len();
-
-                let bytes_map = current_byte_source.get_ranges(&mut ranges).await?;
-
-                assert_eq!(bytes_map.len(), n_ranges);
-
-                FetchedBytes::BytesMap(bytes_map)
-            };
+            let fetched_bytes =
+                if let DynByteSource::MemSlice(mem_slice) = current_byte_source.as_ref() {
+                    // Skip byte range calculation for `no_prefetch`.
+                    if memory_prefetch_func as usize
+                        != polars_utils::mem::prefetch::no_prefetch as usize
+                    {
+                        let slice = mem_slice.0.as_ref();
+
+                        if let Some(columns) = projection.as_ref() {
+                            for range in get_row_group_byte_ranges_for_projection(
+                                &row_group_metadata,
+                                columns.as_ref(),
+                            ) {
+                                memory_prefetch_func(unsafe { slice.get_unchecked(range) })
+                            }
+                        } else {
+                            let range = row_group_metadata.full_byte_range();
+                            let range = range.start as usize..range.end as usize;
+
+                            memory_prefetch_func(unsafe { slice.get_unchecked(range) })
+                        };
+                    }
+
+                    // We have a mmapped or in-memory slice representing the entire
+                    // file that can be sliced directly, so we can skip the byte-range
+                    // calculations and HashMap allocation.
+                    let mem_slice = mem_slice.0.clone();
+                    FetchedBytes::MemSlice {
+                        offset: 0,
+                        mem_slice,
+                    }
+                } else if let Some(columns) = projection.as_ref() {
+                    let mut ranges = get_row_group_byte_ranges_for_projection(
+                        &row_group_metadata,
+                        columns.as_ref(),
+                    )
+                    .collect::<Vec<_>>();
+
+                    let n_ranges = ranges.len();
+
+                    let bytes_map = current_byte_source.get_ranges(&mut ranges).await?;
+
+                    assert_eq!(bytes_map.len(), n_ranges);
+
+                    FetchedBytes::BytesMap(bytes_map)
+                } else {
+                    // We still prefer `get_ranges()` over a single `get_range()` for downloading
+                    // the entire row group, as it can have less memory-copying. A single `get_range()`
+                    // would naively concatenate the memory blocks of the entire row group, while
+                    // `get_ranges()` can skip concatenation since the downloaded blocks are
+                    // aligned to the columns.
+                    let mut ranges = row_group_metadata
+                        .byte_ranges_iter()
+                        .map(|x| x.start as usize..x.end as usize)
+                        .collect::<Vec<_>>();
+
+                    let n_ranges = ranges.len();
+
+                    let bytes_map = current_byte_source.get_ranges(&mut ranges).await?;
+
+                    assert_eq!(bytes_map.len(), n_ranges);
+
+                    FetchedBytes::BytesMap(bytes_map)
+                };
 
             PolarsResult::Ok(RowGroupData {
                 fetched_bytes,
diff --git a/crates/polars-utils/src/mem.rs b/crates/polars-utils/src/mem.rs
index bbe15eea6e3d..20bf08bf4784 100644
--- a/crates/polars-utils/src/mem.rs
+++ b/crates/polars-utils/src/mem.rs
@@ -1,15 +1,3 @@
-use once_cell::sync::Lazy;
-static PAGE_SIZE: Lazy<usize> = Lazy::new(|| {
-    #[cfg(target_family = "unix")]
-    unsafe {
-        libc::sysconf(libc::_SC_PAGESIZE) as usize
-    }
-    #[cfg(not(target_family = "unix"))]
-    {
-        4096
-    }
-});
-
 /// # Safety
 /// This may break aliasing rules, make sure you are the only owner.
 #[allow(clippy::mut_from_ref)]
@@ -19,82 +7,172 @@ pub unsafe fn to_mutable_slice<T: Copy>(s: &[T]) -> &mut [T] {
     std::slice::from_raw_parts_mut(ptr, len)
 }
 
-/// # Safety
-///
-/// This should only be called with pointers to valid memory.
-unsafe fn prefetch_l2_impl(ptr: *const u8) {
-    #[cfg(target_arch = "x86_64")]
-    {
-        use std::arch::x86_64::*;
-        unsafe { _mm_prefetch(ptr as *const _, _MM_HINT_T1) };
+pub mod prefetch {
+    use once_cell::sync::Lazy;
+    static PAGE_SIZE: Lazy<usize> = Lazy::new(|| {
+        #[cfg(target_family = "unix")]
+        unsafe {
+            libc::sysconf(libc::_SC_PAGESIZE) as usize
+        }
+        #[cfg(not(target_family = "unix"))]
+        {
+            4096
+        }
+    });
+
+    /// # Safety
+    ///
+    /// This should only be called with pointers to valid memory.
+    unsafe fn prefetch_l2_impl(ptr: *const u8) {
+        #[cfg(target_arch = "x86_64")]
+        {
+            use std::arch::x86_64::*;
+            unsafe { _mm_prefetch(ptr as *const _, _MM_HINT_T1) };
+        }
+
+        #[cfg(all(target_arch = "aarch64", feature = "nightly"))]
+        {
+            use std::arch::aarch64::*;
+            unsafe { _prefetch(ptr as *const _, _PREFETCH_READ, _PREFETCH_LOCALITY2) };
+        }
     }
 
-    #[cfg(all(target_arch = "aarch64", feature = "nightly"))]
-    {
-        use std::arch::aarch64::*;
-        unsafe { _prefetch(ptr as *const _, _PREFETCH_READ, _PREFETCH_LOCALITY2) };
+    /// Attempt to prefetch the memory in the slice to the L2 cache.
+    pub fn prefetch_l2(slice: &[u8]) {
+        if slice.is_empty() {
+            return;
+        }
+
+        // @TODO: We can play a bit more with this prefetching. Maybe introduce a maximum number of
+        // prefetches so as not to overwhelm the processor. The linear prefetcher should pick it up
+        // at a certain point.
+
+        for i in (0..slice.len()).step_by(*PAGE_SIZE) {
+            unsafe { prefetch_l2_impl(slice[i..].as_ptr()) };
+        }
+
+        unsafe { prefetch_l2_impl(slice[slice.len() - 1..].as_ptr()) }
     }
-}
 
-/// Attempt to prefetch the memory in the slice to the L2 cache.
-pub fn prefetch_l2(slice: &[u8]) {
-    if slice.is_empty() {
-        return;
+    /// `madvise()` with `MADV_SEQUENTIAL` on unix systems. This is a no-op on non-unix systems.
+    pub fn madvise_sequential(#[allow(unused)] slice: &[u8]) {
+        #[cfg(target_family = "unix")]
+        madvise(slice, libc::MADV_SEQUENTIAL);
     }
 
-    // @TODO: We can play a bit more with this prefetching. Maybe introduce a maximum number of
-    // prefetches as to not overwhelm the processor. The linear prefetcher should pick it up
-    // at a certain point.
+    /// `madvise()` with `MADV_WILLNEED` on unix systems. This is a no-op on non-unix systems.
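+    ///
+    /// A minimal usage sketch (the `mapped` slice is hypothetical and assumed to point
+    /// into a memory-mapped file region on a unix system):
+    ///
+    /// ```ignore
+    /// // Ask the kernel to start paging this region in before we scan it.
+    /// madvise_willneed(mapped);
+    /// ```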
+    pub fn madvise_willneed(#[allow(unused)] slice: &[u8]) {
+        #[cfg(target_family = "unix")]
+        madvise(slice, libc::MADV_WILLNEED);
+    }
 
-    for i in (0..slice.len()).step_by(*PAGE_SIZE) {
-        unsafe { prefetch_l2_impl(slice[i..].as_ptr()) };
+    /// `madvise()` with `MADV_POPULATE_READ` on linux systems. This is a no-op on non-linux systems.
+    pub fn madvise_populate_read(#[allow(unused)] slice: &[u8]) {
+        #[cfg(target_os = "linux")]
+        madvise(slice, libc::MADV_POPULATE_READ);
     }
 
-    unsafe { prefetch_l2_impl(slice[slice.len() - 1..].as_ptr()) }
-}
+    /// Forcibly reads at least one byte from each page.
+    pub fn force_populate_read(slice: &[u8]) {
+        for i in (0..slice.len()).step_by(*PAGE_SIZE) {
+            std::hint::black_box(slice[i]);
+        }
 
-/// `madvise()` with `MADV_SEQUENTIAL` on unix systems. This is a no-op on non-unix systems.
-pub fn madvise_sequential(#[allow(unused)] slice: &[u8]) {
-    #[cfg(target_family = "unix")]
-    madvise(slice, libc::MADV_SEQUENTIAL);
-}
+        std::hint::black_box(slice.last().copied());
+    }
 
-/// `madvise()` with `MADV_WILLNEED` on unix systems. This is a no-op on non-unix systems.
-pub fn madvise_willneed(#[allow(unused)] slice: &[u8]) {
     #[cfg(target_family = "unix")]
-    madvise(slice, libc::MADV_WILLNEED);
-}
+    fn madvise(slice: &[u8], advice: libc::c_int) {
+        if slice.is_empty() {
+            return;
+        }
+        let ptr = slice.as_ptr();
 
-/// `madvise()` with `MADV_POPULATE_READ` on linux systems. This a no-op on non-linux systems.
-pub fn madvise_populate_read(#[allow(unused)] slice: &[u8]) {
-    #[cfg(target_os = "linux")]
-    madvise(slice, libc::MADV_POPULATE_READ);
-}
+        let align = ptr as usize % *PAGE_SIZE;
+        let ptr = ptr.wrapping_sub(align);
+        let len = slice.len() + align;
 
-/// Forcibly reads at least one byte each page.
-pub fn force_populate_read(slice: &[u8]) {
-    for i in (0..slice.len()).step_by(*PAGE_SIZE) {
-        std::hint::black_box(slice[i]);
+        if unsafe { libc::madvise(ptr as *mut libc::c_void, len, advice) } != 0 {
+            let err = std::io::Error::last_os_error();
+            if let std::io::ErrorKind::InvalidInput = err.kind() {
+                panic!("{}", err);
+            }
+        }
     }
 
-    std::hint::black_box(slice.last().copied());
-}
+    pub fn no_prefetch(_: &[u8]) {}
 
-#[cfg(target_family = "unix")]
-fn madvise(slice: &[u8], advice: libc::c_int) {
-    if slice.is_empty() {
-        return;
-    }
-    let ptr = slice.as_ptr();
+    /// Get the configured memory prefetch function.
+    pub fn get_memory_prefetch_func(verbose: bool) -> fn(&[u8]) -> () {
+        let memory_prefetch_func = match std::env::var("POLARS_MEMORY_PREFETCH").ok().as_deref() {
+            None => {
+                // madvise_willneed performed the best on both macOS on Apple Silicon and Ubuntu on x86-64,
+                // using PDS-H query 3 SF=10 after clearing file cache as a benchmark.
+ #[cfg(target_family = "unix")] + { + madvise_willneed + } + #[cfg(not(target_family = "unix"))] + { + no_prefetch + } + }, + Some("no_prefetch") => no_prefetch, + Some("prefetch_l2") => prefetch_l2, + Some("madvise_sequential") => { + #[cfg(target_family = "unix")] + { + madvise_sequential + } + #[cfg(not(target_family = "unix"))] + { + panic!( + "POLARS_MEMORY_PREFETCH=madvise_sequential is not supported by this system" + ); + } + }, + Some("madvise_willneed") => { + #[cfg(target_family = "unix")] + { + madvise_willneed + } + #[cfg(not(target_family = "unix"))] + { + panic!( + "POLARS_MEMORY_PREFETCH=madvise_willneed is not supported by this system" + ); + } + }, + Some("madvise_populate_read") => { + #[cfg(target_os = "linux")] + { + madvise_populate_read + } + #[cfg(not(target_os = "linux"))] + { + panic!( + "POLARS_MEMORY_PREFETCH=madvise_populate_read is not supported by this system" + ); + } + }, + Some("force_populate_read") => force_populate_read, + Some(v) => panic!("invalid value for POLARS_MEMORY_PREFETCH: {}", v), + }; - let align = ptr as usize % *PAGE_SIZE; - let ptr = ptr.wrapping_sub(align); - let len = slice.len() + align; + if verbose { + let func_name = match memory_prefetch_func as usize { + v if v == no_prefetch as usize => "no_prefetch", + v if v == prefetch_l2 as usize => "prefetch_l2", + v if v == madvise_sequential as usize => "madvise_sequential", + v if v == madvise_willneed as usize => "madvise_willneed", + v if v == madvise_populate_read as usize => "madvise_populate_read", + v if v == force_populate_read as usize => "force_populate_read", + _ => unreachable!(), + }; - if unsafe { libc::madvise(ptr as *mut libc::c_void, len, advice) } != 0 { - let err = std::io::Error::last_os_error(); - if let std::io::ErrorKind::InvalidInput = err.kind() { - panic!("{}", err); + eprintln!("memory prefetch function: {}", func_name); } + + memory_prefetch_func } } diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs index bdac6e095a94..922e78d30cc2 100644 --- a/crates/polars-utils/src/mmap.rs +++ b/crates/polars-utils/src/mmap.rs @@ -11,7 +11,7 @@ mod private { use polars_error::PolarsResult; use super::MMapSemaphore; - use crate::mem::prefetch_l2; + use crate::mem::prefetch::prefetch_l2; /// A read-only reference to a slice of memory that can potentially be memory-mapped. 
 ///
diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py
index a5a53d78f94c..c5c6ff145b5e 100644
--- a/py-polars/tests/unit/io/test_lazy_json.py
+++ b/py-polars/tests/unit/io/test_lazy_json.py
@@ -172,3 +172,18 @@ def test_glob_single_scan(io_files_path: Path) -> None:
 
     assert explain.count("SCAN") == 1
     assert "UNION" not in explain
+
+
+def test_scan_ndjson_empty_lines_in_middle() -> None:
+    assert_frame_equal(
+        pl.scan_ndjson(
+            f"""\
+{{"a": 1}}
+{" "}
+{{"a": 2}}{" "}
+{" "}
+{{"a": 3}}
+""".encode()
+        ).collect(),
+        pl.DataFrame({"a": [1, 2, 3]}),
+    )

From 317b7b12908aada03c920b3b26caaee4dfcef15c Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 28 Feb 2025 20:38:26 +1100
Subject: [PATCH 2/2] c

---
 crates/polars-io/src/ndjson/core.rs |   2 +-
 crates/polars-io/src/ndjson/mod.rs  | 295 ++--------------------------
 2 files changed, 15 insertions(+), 282 deletions(-)

diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs
index e33596b638fd..fbbf89471920 100644
--- a/crates/polars-io/src/ndjson/core.rs
+++ b/crates/polars-io/src/ndjson/core.rs
@@ -253,7 +253,7 @@ impl<'a> CoreJsonReader<'a> {
     fn count(mut self) -> PolarsResult<usize> {
         let bytes = self.reader_bytes.take().unwrap();
-        Ok(super::count_rows_par(&bytes))
+        Ok(super::count_rows_par(&bytes, self.n_threads))
     }
 
     fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> PolarsResult<DataFrame> {
diff --git a/crates/polars-io/src/ndjson/mod.rs b/crates/polars-io/src/ndjson/mod.rs
index 6d462fa9ca9e..676ee77ae9f8 100644
--- a/crates/polars-io/src/ndjson/mod.rs
+++ b/crates/polars-io/src/ndjson/mod.rs
@@ -1,4 +1,4 @@
-use core::json_lines;
+use core::{get_file_chunks_json, json_lines};
 use std::num::NonZeroUsize;
 
 use arrow::array::StructArray;
 use polars_core::prelude::*;
@@ -23,90 +23,23 @@ pub fn infer_schema(
         .collect();
     Ok(schema)
 }
 
-/// Statistics for a chunk of text used for NDJSON parsing.
-#[derive(Debug, Clone, PartialEq)]
-struct ChunkStats {
-    non_empty_rows: usize,
-    /// Set to None if the chunk was empty.
-    last_newline_offset: Option<usize>,
-    /// Used when counting rows.
-    has_leading_empty_line: bool,
-    has_non_empty_remainder: bool,
-}
-
-impl ChunkStats {
-    /// Assumes that:
-    /// * There is no quoting of newline characters (unlike CSV)
-    /// * We do not count empty lines (successive newlines, or lines containing only whitespace / tab)
-    fn from_chunk(chunk: &[u8]) -> Self {
-        // Note: `memrchr` searches right-to-left but still returns a left-based offset.
-        let first_newline_offset = memchr::memchr(b'\n', chunk);
-        let last_newline_offset = memchr::memrchr(b'\n', chunk);
-
-        let has_leading_empty_line =
-            first_newline_offset.is_some_and(|i| json_lines(&chunk[..i]).next().is_none());
-        let has_non_empty_remainder =
-            json_lines(&chunk[last_newline_offset.map_or(0, |i| 1 + i)..chunk.len()])
-                .next()
-                .is_some();
-
-        let mut non_empty_rows = if first_newline_offset.is_some() && !has_leading_empty_line {
-            1
-        } else {
-            0
-        };
-
-        if first_newline_offset.is_some() {
-            let range = first_newline_offset.unwrap() + 1..last_newline_offset.unwrap() + 1;
-            non_empty_rows += json_lines(&chunk[range]).count()
-        }
-
-        Self {
-            non_empty_rows,
-            has_leading_empty_line,
-            last_newline_offset,
-            has_non_empty_remainder,
-        }
-    }
-
-    /// Reduction state for counting rows.
-    ///
-    /// Note: `rhs` should be from the chunk immediately after `slf`, otherwise the results will be
-    /// incorrect.
-    pub fn reduce_count_rows(slf: &Self, rhs: &Self) -> Self {
-        let mut non_empty_rows = slf.non_empty_rows + rhs.non_empty_rows;
-
-        if slf.has_non_empty_remainder && rhs.has_leading_empty_line {
-            non_empty_rows += 1;
-        }
-
-        ChunkStats {
-            non_empty_rows,
-            last_newline_offset: rhs.last_newline_offset,
-            has_leading_empty_line: slf.has_leading_empty_line,
-            has_non_empty_remainder: rhs.has_non_empty_remainder
-                || (rhs.last_newline_offset.is_none() && slf.has_non_empty_remainder),
-        }
-    }
-
-    /// The non-empty row count of this chunk assuming it is the last chunk (adds 1 if there is a
-    /// non-empty remainder).
-    pub fn non_empty_row_count_as_last_chunk(&self) -> usize {
-        self.non_empty_rows + self.has_non_empty_remainder as usize
-    }
-}
-
 /// Count the number of rows. The slice passed must represent the entire file. This will
 /// potentially parallelize using rayon.
 ///
 /// This does not check if the lines are valid NDJSON - it assumes that is the case.
-pub fn count_rows_par(full_bytes: &[u8]) -> usize {
-    _count_rows_impl(
-        full_bytes,
-        std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE")
-            .ok()
-            .and_then(|x| x.parse::<usize>().ok()),
-    )
+pub fn count_rows_par(full_bytes: &[u8], n_threads: Option<usize>) -> usize {
+    let n_threads = n_threads.unwrap_or(POOL.current_num_threads());
+    let file_chunks = get_file_chunks_json(full_bytes, n_threads);
+
+    if file_chunks.len() == 1 {
+        count_rows(full_bytes)
+    } else {
+        let iter = file_chunks
+            .into_par_iter()
+            .map(|(start_pos, stop_at_nbytes)| count_rows(&full_bytes[start_pos..stop_at_nbytes]));
+
+        POOL.install(|| iter.sum())
+    }
 }
 
 /// Count the number of rows. The slice passed must represent the entire file.
@@ -114,203 +47,3 @@ pub fn count_rows_par(full_bytes: &[u8]) -> usize {
 pub fn count_rows(full_bytes: &[u8]) -> usize {
     json_lines(full_bytes).count()
 }
-
-/// This is separate for testing purposes.
-fn _count_rows_impl(full_bytes: &[u8], force_chunk_size: Option<usize>) -> usize {
-    let min_chunk_size = if cfg!(debug_assertions) { 0 } else { 16 * 1024 };
-
-    // Row count does not have a parsing dependency between threads, so we can just split into
-    // the same number of chunks as threads.
- let chunk_size = force_chunk_size.unwrap_or( - full_bytes - .len() - .div_ceil(POOL.current_num_threads()) - .max(min_chunk_size), - ); - - if full_bytes.is_empty() { - return 0; - } - - let n_chunks = full_bytes.len().div_ceil(chunk_size); - - if n_chunks > 1 { - let identity = ChunkStats::from_chunk(&[]); - let acc_stats = POOL.install(|| { - (0..n_chunks) - .into_par_iter() - .map(|i| { - ChunkStats::from_chunk( - &full_bytes[i * chunk_size - ..(1 + i).saturating_mul(chunk_size).min(full_bytes.len())], - ) - }) - .reduce( - || identity.clone(), - |l, r| ChunkStats::reduce_count_rows(&l, &r), - ) - }); - - acc_stats.non_empty_row_count_as_last_chunk() - } else { - count_rows(full_bytes) - } -} - -#[cfg(test)] -mod tests { - use super::ChunkStats; - - #[test] - fn test_chunk_stats() { - let bytes = r#" -{"a": 1} -{"a": 2} -"# - .as_bytes(); - - assert_eq!( - ChunkStats::from_chunk(bytes), - ChunkStats { - non_empty_rows: 2, - last_newline_offset: Some(18), - has_leading_empty_line: true, - has_non_empty_remainder: false, - } - ); - - assert_eq!( - ChunkStats::from_chunk(&bytes[..bytes.len() - 3]), - ChunkStats { - non_empty_rows: 1, - last_newline_offset: Some(9), - has_leading_empty_line: true, - has_non_empty_remainder: true, - } - ); - - assert_eq!(super::_count_rows_impl(&[], Some(1)), 0); - assert_eq!(super::_count_rows_impl(bytes, Some(1)), 2); - assert_eq!(super::_count_rows_impl(bytes, Some(3)), 2); - assert_eq!(super::_count_rows_impl(bytes, Some(5)), 2); - assert_eq!(super::_count_rows_impl(bytes, Some(7)), 2); - assert_eq!(super::_count_rows_impl(bytes, Some(bytes.len())), 2); - - assert_eq!(super::count_rows_par(&[]), 0); - - assert_eq!( - ChunkStats::from_chunk(&[]), - ChunkStats { - non_empty_rows: 0, - last_newline_offset: None, - has_leading_empty_line: false, - has_non_empty_remainder: false, - } - ); - - // Single-chars - - assert_eq!( - ChunkStats::from_chunk(b"\n"), - ChunkStats { - non_empty_rows: 0, - last_newline_offset: Some(0), - has_leading_empty_line: true, - has_non_empty_remainder: false, - } - ); - - assert_eq!( - ChunkStats::from_chunk(b"a"), - ChunkStats { - non_empty_rows: 0, - last_newline_offset: None, - has_leading_empty_line: false, - has_non_empty_remainder: true, - } - ); - - assert_eq!( - ChunkStats::from_chunk(b" "), - ChunkStats { - non_empty_rows: 0, - last_newline_offset: None, - has_leading_empty_line: false, - has_non_empty_remainder: false, - } - ); - - // Double-char combinations - - assert_eq!( - ChunkStats::from_chunk(b"a\n"), - ChunkStats { - non_empty_rows: 1, - last_newline_offset: Some(1), - has_leading_empty_line: false, - has_non_empty_remainder: false, - } - ); - - assert_eq!( - ChunkStats::from_chunk(b" \n"), - ChunkStats { - non_empty_rows: 0, - last_newline_offset: Some(1), - has_leading_empty_line: true, - has_non_empty_remainder: false, - } - ); - - assert_eq!( - ChunkStats::from_chunk(b"a "), - ChunkStats { - non_empty_rows: 0, - last_newline_offset: None, - has_leading_empty_line: false, - has_non_empty_remainder: true, - } - ); - } - - #[test] - fn test_chunk_stats_whitespace() { - let space_char = ' '; - let tab_char = '\t'; - // This is not valid JSON, but we simply need to test that ChunkStats only counts lines - // containing at least 1 non-whitespace character. 
- let bytes = format!( - " -abc - -abc - -{tab_char} -{space_char}{space_char}{space_char} - - abc{space_char} - -" - ); - let bytes = bytes.as_bytes(); - - assert_eq!( - ChunkStats::from_chunk(bytes), - ChunkStats { - non_empty_rows: 3, - last_newline_offset: Some(28), - has_leading_empty_line: true, - has_non_empty_remainder: false, - } - ); - } - - #[test] - fn test_count_rows() { - let bytes = r#"{"text": "\"hello", "id": 1} -{"text": "\"hello", "id": 1} "# - .as_bytes(); - - assert_eq!(super::count_rows_par(bytes), 2); - } -}
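---

A minimal usage sketch of the row-count helpers this series settles on (assuming
`polars-io` built with the `json` feature; the input buffer below is made up):

    use polars_io::ndjson;

    fn main() {
        // Two JSON lines separated by a whitespace-only line.
        let full_bytes: &[u8] = b"{\"a\": 1}\n \n{\"a\": 2}\n";

        // Single-threaded count: one pass over the newline-delimited lines;
        // whitespace-only lines are not counted.
        assert_eq!(ndjson::count_rows(full_bytes), 2);

        // Parallel count: the buffer is split at line boundaries via
        // `get_file_chunks_json` and the per-chunk counts are summed on the
        // rayon pool. `None` falls back to the pool's current thread count.
        assert_eq!(ndjson::count_rows_par(full_bytes, None), 2);
    }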