Skip to content

Commit

Permalink
add memoization and profile chunk sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 5, 2024
1 parent 92af4ae commit 1a1d3ab
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 96 deletions.
177 changes: 87 additions & 90 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,113 +441,110 @@ impl<'a> Iterator for SplitLines<'a> {
if self.v.is_empty() {
return None;
}
{
self.total_index = 0;
let mut not_in_field_previous_iter = true;
self.total_index = 0;
let mut not_in_field_previous_iter = true;

if self.previous_valid_eols != 0 {
let pos = self.previous_valid_eols.trailing_zeros() as usize;
self.previous_valid_eols >>= (pos + 1) as u64;
if self.previous_valid_eols != 0 {
let pos = self.previous_valid_eols.trailing_zeros() as usize;
self.previous_valid_eols >>= (pos + 1) as u64;

unsafe {
debug_assert!((pos) <= self.v.len());
unsafe {
debug_assert!((pos) <= self.v.len());

// return line up to this position
let ret = Some(self.v.get_unchecked(..pos));
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked_release(pos + 1..);
return ret;
}
// return line up to this position
let ret = Some(self.v.get_unchecked(..pos));
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked_release(pos + 1..);
return ret;
}
}

loop {
let bytes = unsafe { self.v.get_unchecked_release(self.total_index..) };
if bytes.len() > SIMD_SIZE {
let lane: [u8; SIMD_SIZE] = unsafe {
bytes
.get_unchecked(0..SIMD_SIZE)
.try_into()
.unwrap_unchecked_release()
};
let simd_bytes = SimdVec::from(lane);
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

let valid_eols = if self.quoting {
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

if not_in_field_previous_iter {
not_in_quote_field = !not_in_quote_field;
}
not_in_field_previous_iter =
(not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
eol_mask & not_in_quote_field
} else {
eol_mask
};
loop {
let bytes = unsafe { self.v.get_unchecked_release(self.total_index..) };
if bytes.len() > SIMD_SIZE {
let lane: [u8; SIMD_SIZE] = unsafe {
bytes
.get_unchecked(0..SIMD_SIZE)
.try_into()
.unwrap_unchecked_release()
};
let simd_bytes = SimdVec::from(lane);
let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

if valid_eols != 0 {
let pos = valid_eols.trailing_zeros() as usize;
if pos == SIMD_SIZE - 1 {
self.previous_valid_eols = 0;
} else {
self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
}
let valid_eols = if self.quoting {
let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

unsafe {
let pos = self.total_index + pos;
debug_assert!((pos) <= self.v.len());

// return line up to this position
let ret = Some(self.v.get_unchecked(..pos));
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked_release(pos + 1..);
return ret;
}
} else {
self.total_index += SIMD_SIZE;
if not_in_field_previous_iter {
not_in_quote_field = !not_in_quote_field;
}
not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
eol_mask & not_in_quote_field
} else {
// Denotes if we are in a string field, started with a quote
let mut in_field = !not_in_field_previous_iter;
let mut pos = 0u32;
let mut iter = bytes.iter();
loop {
match iter.next() {
Some(&c) => {
pos += 1;

if self.quoting && c == self.quote_char {
// toggle between string field enclosure
// if we encounter a starting '"' -> in_field = true;
// if we encounter a closing '"' -> in_field = false;
in_field = !in_field;
}
// if we are not in a string and we encounter '\n' we can stop at this position.
else if c == self.eol_char && !in_field {
break;
}
},
None => {
let remainder = self.v;
self.v = &[];
return Some(remainder);
},
}
eol_mask
};

if valid_eols != 0 {
let pos = valid_eols.trailing_zeros() as usize;
if pos == SIMD_SIZE - 1 {
self.previous_valid_eols = 0;
} else {
self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
}

unsafe {
debug_assert!((pos as usize) <= self.v.len());
let pos = self.total_index + pos;
debug_assert!((pos) <= self.v.len());

// return line up to this position
let ret = Some(
self.v
.get_unchecked(..(self.total_index + pos as usize - 1)),
);
let ret = Some(self.v.get_unchecked(..pos));
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked(self.total_index + pos as usize..);
self.v = self.v.get_unchecked_release(pos + 1..);
return ret;
}
} else {
self.total_index += SIMD_SIZE;
}
} else {
// Denotes if we are in a string field, started with a quote
let mut in_field = !not_in_field_previous_iter;
let mut pos = 0u32;
let mut iter = bytes.iter();
loop {
match iter.next() {
Some(&c) => {
pos += 1;

if self.quoting && c == self.quote_char {
// toggle between string field enclosure
// if we encounter a starting '"' -> in_field = true;
// if we encounter a closing '"' -> in_field = false;
in_field = !in_field;
}
// if we are not in a string and we encounter '\n' we can stop at this position.
else if c == self.eol_char && !in_field {
break;
}
},
None => {
let remainder = self.v;
self.v = &[];
return Some(remainder);
},
}
}

unsafe {
debug_assert!((pos as usize) <= self.v.len());

// return line up to this position
let ret = Some(
self.v
.get_unchecked(..(self.total_index + pos as usize - 1)),
);
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked(self.total_index + pos as usize..);
return ret;
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,13 @@ impl<'a> CoreReader<'a> {
}

let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
let n_parts_hint = n_threads * 32;
let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 1024 * 128);

// These values were chosen by benchmarking on the New York City trip CSV dataset.
// We want chunks small enough that threads can start working as soon as possible,
// but large enough to keep the per-chunk overhead low.
// The chunk size is capped at 16 MB so that a chunk still fits in the L3 cache.
let n_parts_hint = n_threads * 16;
let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

// Use a small min chunk size to catch failures in tests.
#[cfg(debug_assertions)]
Expand Down
41 changes: 37 additions & 4 deletions crates/polars-io/src/csv/read/splitfields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ mod inner {
use polars_utils::slice::GetSaferUnchecked;
use polars_utils::unwrap::UnwrapUncheckedRelease;

const SIMD_SIZE: usize = 16;
type SimdVec = u8x16;
const SIMD_SIZE: usize = 64;
type SimdVec = u8x64;

/// An adapted version of std::iter::Split.
/// This exists solely because we cannot split the lines naively as
Expand All @@ -157,6 +157,7 @@ mod inner {
simd_separator: SimdVec,
simd_eol_char: SimdVec,
simd_quote_char: SimdVec,
previous_valid_ends: u64,
}

impl<'a> SplitFields<'a> {
Expand All @@ -182,6 +183,7 @@ mod inner {
simd_separator,
simd_eol_char,
simd_quote_char,
previous_valid_ends: 0,
}
}

Expand All @@ -195,6 +197,7 @@ mod inner {
Some((self.v.get_unchecked(..idx), need_escaping))
}

#[inline]
fn finish(&mut self, need_escaping: bool) -> Option<(&'a [u8], bool)> {
self.finished = true;
Some((self.v, need_escaping))
Expand All @@ -213,9 +216,31 @@ mod inner {
fn next(&mut self) -> Option<(&'a [u8], bool)> {
if self.finished {
return None;
} else if self.v.is_empty() {
}
if self.v.is_empty() {
return self.finish(false);
}
if self.previous_valid_ends != 0 {
let pos = self.previous_valid_ends.trailing_zeros() as usize;
self.previous_valid_ends >>= (pos + 1) as u64;

unsafe {
debug_assert!(pos < self.v.len());
// SAFETY:
// we are in bounds
let bytes = self.v.get_unchecked_release(..pos);
self.v = self.v.get_unchecked_release(pos + 1..);
let ret = Some((
bytes,
bytes
.first()
.map(|c| *c == self.quote_char && self.quoting)
.unwrap_or(false),
));

return ret;
}
}

let mut needs_escaping = false;
// There can be strings with separators:
Expand Down Expand Up @@ -254,11 +279,19 @@ mod inner {
end_mask &= not_in_quote_field;

if end_mask != 0 {
total_idx += end_mask.trailing_zeros() as usize;
let pos = end_mask.trailing_zeros() as usize;
total_idx += pos;
debug_assert!(
self.v[total_idx] == self.eol_char
|| self.v[total_idx] == self.separator
);

if pos == SIMD_SIZE - 1 {
self.previous_valid_ends = 0;
} else {
self.previous_valid_ends = end_mask >> (pos + 1) as u64;
}

break;
} else {
total_idx += SIMD_SIZE;
Expand Down

0 comments on commit 1a1d3ab

Please sign in to comment.