From 1a1d3ab6b35e81f429f0e2b5da8fca846abd6ff3 Mon Sep 17 00:00:00 2001 From: ritchie Date: Sat, 5 Oct 2024 16:23:29 +0200 Subject: [PATCH] add memoization and profile chunk sizes --- crates/polars-io/src/csv/read/parser.rs | 177 +++++++++---------- crates/polars-io/src/csv/read/read_impl.rs | 9 +- crates/polars-io/src/csv/read/splitfields.rs | 41 ++++- 3 files changed, 131 insertions(+), 96 deletions(-) diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs index 2aefe43caa4a..789f8bfd254f 100644 --- a/crates/polars-io/src/csv/read/parser.rs +++ b/crates/polars-io/src/csv/read/parser.rs @@ -441,113 +441,110 @@ impl<'a> Iterator for SplitLines<'a> { if self.v.is_empty() { return None; } - { - self.total_index = 0; - let mut not_in_field_previous_iter = true; + self.total_index = 0; + let mut not_in_field_previous_iter = true; - if self.previous_valid_eols != 0 { - let pos = self.previous_valid_eols.trailing_zeros() as usize; - self.previous_valid_eols >>= (pos + 1) as u64; + if self.previous_valid_eols != 0 { + let pos = self.previous_valid_eols.trailing_zeros() as usize; + self.previous_valid_eols >>= (pos + 1) as u64; - unsafe { - debug_assert!((pos) <= self.v.len()); + unsafe { + debug_assert!((pos) <= self.v.len()); - // return line up to this position - let ret = Some(self.v.get_unchecked(..pos)); - // skip the '\n' token and update slice. - self.v = self.v.get_unchecked_release(pos + 1..); - return ret; - } + // return line up to this position + let ret = Some(self.v.get_unchecked(..pos)); + // skip the '\n' token and update slice. + self.v = self.v.get_unchecked_release(pos + 1..); + return ret; } + } - loop { - let bytes = unsafe { self.v.get_unchecked_release(self.total_index..) }; - if bytes.len() > SIMD_SIZE { - let lane: [u8; SIMD_SIZE] = unsafe { - bytes - .get_unchecked(0..SIMD_SIZE) - .try_into() - .unwrap_unchecked_release() - }; - let simd_bytes = SimdVec::from(lane); - let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask(); - - let valid_eols = if self.quoting { - let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask(); - let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask); - - if not_in_field_previous_iter { - not_in_quote_field = !not_in_quote_field; - } - not_in_field_previous_iter = - (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0; - eol_mask & not_in_quote_field - } else { - eol_mask - }; + loop { + let bytes = unsafe { self.v.get_unchecked_release(self.total_index..) }; + if bytes.len() > SIMD_SIZE { + let lane: [u8; SIMD_SIZE] = unsafe { + bytes + .get_unchecked(0..SIMD_SIZE) + .try_into() + .unwrap_unchecked_release() + }; + let simd_bytes = SimdVec::from(lane); + let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask(); - if valid_eols != 0 { - let pos = valid_eols.trailing_zeros() as usize; - if pos == SIMD_SIZE - 1 { - self.previous_valid_eols = 0; - } else { - self.previous_valid_eols = valid_eols >> (pos + 1) as u64; - } + let valid_eols = if self.quoting { + let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask(); + let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask); - unsafe { - let pos = self.total_index + pos; - debug_assert!((pos) <= self.v.len()); - - // return line up to this position - let ret = Some(self.v.get_unchecked(..pos)); - // skip the '\n' token and update slice. - self.v = self.v.get_unchecked_release(pos + 1..); - return ret; - } - } else { - self.total_index += SIMD_SIZE; + if not_in_field_previous_iter { + not_in_quote_field = !not_in_quote_field; } + not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0; + eol_mask & not_in_quote_field } else { - // Denotes if we are in a string field, started with a quote - let mut in_field = !not_in_field_previous_iter; - let mut pos = 0u32; - let mut iter = bytes.iter(); - loop { - match iter.next() { - Some(&c) => { - pos += 1; - - if self.quoting && c == self.quote_char { - // toggle between string field enclosure - // if we encounter a starting '"' -> in_field = true; - // if we encounter a closing '"' -> in_field = false; - in_field = !in_field; - } - // if we are not in a string and we encounter '\n' we can stop at this position. - else if c == self.eol_char && !in_field { - break; - } - }, - None => { - let remainder = self.v; - self.v = &[]; - return Some(remainder); - }, - } + eol_mask + }; + + if valid_eols != 0 { + let pos = valid_eols.trailing_zeros() as usize; + if pos == SIMD_SIZE - 1 { + self.previous_valid_eols = 0; + } else { + self.previous_valid_eols = valid_eols >> (pos + 1) as u64; } unsafe { - debug_assert!((pos as usize) <= self.v.len()); + let pos = self.total_index + pos; + debug_assert!((pos) <= self.v.len()); // return line up to this position - let ret = Some( - self.v - .get_unchecked(..(self.total_index + pos as usize - 1)), - ); + let ret = Some(self.v.get_unchecked(..pos)); // skip the '\n' token and update slice. - self.v = self.v.get_unchecked(self.total_index + pos as usize..); + self.v = self.v.get_unchecked_release(pos + 1..); return ret; } + } else { + self.total_index += SIMD_SIZE; + } + } else { + // Denotes if we are in a string field, started with a quote + let mut in_field = !not_in_field_previous_iter; + let mut pos = 0u32; + let mut iter = bytes.iter(); + loop { + match iter.next() { + Some(&c) => { + pos += 1; + + if self.quoting && c == self.quote_char { + // toggle between string field enclosure + // if we encounter a starting '"' -> in_field = true; + // if we encounter a closing '"' -> in_field = false; + in_field = !in_field; + } + // if we are not in a string and we encounter '\n' we can stop at this position. + else if c == self.eol_char && !in_field { + break; + } + }, + None => { + let remainder = self.v; + self.v = &[]; + return Some(remainder); + }, + } + } + + unsafe { + debug_assert!((pos as usize) <= self.v.len()); + + // return line up to this position + let ret = Some( + self.v + .get_unchecked(..(self.total_index + pos as usize - 1)), + ); + // skip the '\n' token and update slice. + self.v = self.v.get_unchecked(self.total_index + pos as usize..); + return ret; } } } diff --git a/crates/polars-io/src/csv/read/read_impl.rs b/crates/polars-io/src/csv/read/read_impl.rs index 4e1574d2ead8..47fa19f2e95d 100644 --- a/crates/polars-io/src/csv/read/read_impl.rs +++ b/crates/polars-io/src/csv/read/read_impl.rs @@ -403,8 +403,13 @@ impl<'a> CoreReader<'a> { } let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads()); - let n_parts_hint = n_threads * 32; - let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 1024 * 128); + + // This is chosen by benchmarking on ny city trip csv dataset. + // We want small enough chunks such that threads start working as soon as possible + // But we also want them large enough, so that we have less chunks related overhead, but + // We minimize chunks to 16 MB to still fit L3 cache. + let n_parts_hint = n_threads * 16; + let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024); // Use a small min chunk size to catch failures in tests. #[cfg(debug_assertions)] diff --git a/crates/polars-io/src/csv/read/splitfields.rs b/crates/polars-io/src/csv/read/splitfields.rs index bf28c6ad05c1..490a0a27ebf0 100644 --- a/crates/polars-io/src/csv/read/splitfields.rs +++ b/crates/polars-io/src/csv/read/splitfields.rs @@ -142,8 +142,8 @@ mod inner { use polars_utils::slice::GetSaferUnchecked; use polars_utils::unwrap::UnwrapUncheckedRelease; - const SIMD_SIZE: usize = 16; - type SimdVec = u8x16; + const SIMD_SIZE: usize = 64; + type SimdVec = u8x64; /// An adapted version of std::iter::Split. /// This exists solely because we cannot split the lines naively as @@ -157,6 +157,7 @@ mod inner { simd_separator: SimdVec, simd_eol_char: SimdVec, simd_quote_char: SimdVec, + previous_valid_ends: u64, } impl<'a> SplitFields<'a> { @@ -182,6 +183,7 @@ mod inner { simd_separator, simd_eol_char, simd_quote_char, + previous_valid_ends: 0, } } @@ -195,6 +197,7 @@ mod inner { Some((self.v.get_unchecked(..idx), need_escaping)) } + #[inline] fn finish(&mut self, need_escaping: bool) -> Option<(&'a [u8], bool)> { self.finished = true; Some((self.v, need_escaping)) @@ -213,9 +216,31 @@ mod inner { fn next(&mut self) -> Option<(&'a [u8], bool)> { if self.finished { return None; - } else if self.v.is_empty() { + } + if self.v.is_empty() { return self.finish(false); } + if self.previous_valid_ends != 0 { + let pos = self.previous_valid_ends.trailing_zeros() as usize; + self.previous_valid_ends >>= (pos + 1) as u64; + + unsafe { + debug_assert!(pos < self.v.len()); + // SAFETY: + // we are in bounds + let bytes = self.v.get_unchecked_release(..pos); + self.v = self.v.get_unchecked_release(pos + 1..); + let ret = Some(( + bytes, + bytes + .first() + .map(|c| *c == self.quote_char && self.quoting) + .unwrap_or(false), + )); + + return ret; + } + } let mut needs_escaping = false; // There can be strings with separators: @@ -254,11 +279,19 @@ mod inner { end_mask &= not_in_quote_field; if end_mask != 0 { - total_idx += end_mask.trailing_zeros() as usize; + let pos = end_mask.trailing_zeros() as usize; + total_idx += pos; debug_assert!( self.v[total_idx] == self.eol_char || self.v[total_idx] == self.separator ); + + if pos == SIMD_SIZE - 1 { + self.previous_valid_ends = 0; + } else { + self.previous_valid_ends = end_mask >> (pos + 1) as u64; + } + break; } else { total_idx += SIMD_SIZE;