perf(rust, python): improve batched csv readers perf and memory perf (#…
ritchie46 authored Mar 3, 2023
1 parent 7c56fdb commit b542fee
Showing 7 changed files with 210 additions and 55 deletions.
173 changes: 144 additions & 29 deletions polars/polars-io/src/csv/read_impl/batched.rs
@@ -1,16 +1,132 @@
use polars_core::config::verbose;
use std::collections::VecDeque;

use super::*;
use crate::csv::CsvReader;
use crate::mmap::MmapBytesReader;
use crate::prelude::update_row_counts2;

#[allow(clippy::too_many_arguments)]
pub(crate) fn get_file_chunks_iterator(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: usize,
    bytes: &[u8],
    expected_fields: usize,
    delimiter: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) {
    for _ in 0..n_chunks {
        let search_pos = *last_pos + chunk_size;

        if search_pos >= bytes.len() {
            break;
        }

        let end_pos = match next_line_position(
            &bytes[search_pos..],
            Some(expected_fields),
            delimiter,
            quote_char,
            eol_char,
        ) {
            Some(pos) => search_pos + pos,
            None => {
                break;
            }
        };
        offsets.push_back((*last_pos, end_pos));
        *last_pos = end_pos;
    }
}

struct ChunkOffsetIter<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    n_chunks: usize,
    // not a promise, but something we want
    rows_per_batch: usize,
    expected_fields: usize,
    delimiter: u8,
    quote_char: Option<u8>,
    eol_char: u8,
}

impl<'a> Iterator for ChunkOffsetIter<'a> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<Self::Item> {
        match self.offsets.pop_front() {
            Some(offsets) => Some(offsets),
            None => {
                if self.last_offset == self.bytes.len() {
                    return None;
                }
                let bytes_first_row = if self.rows_per_batch > 1 {
                    let bytes_first_row = next_line_position(
                        &self.bytes[self.last_offset + 2..],
                        Some(self.expected_fields),
                        self.delimiter,
                        self.quote_char,
                        self.eol_char,
                    )
                    .unwrap_or(1);
                    bytes_first_row + 2
                } else {
                    1
                };
                get_file_chunks_iterator(
                    &mut self.offsets,
                    &mut self.last_offset,
                    self.n_chunks,
                    self.rows_per_batch * bytes_first_row,
                    self.bytes,
                    self.expected_fields,
                    self.delimiter,
                    self.quote_char,
                    self.eol_char,
                );
                match self.offsets.pop_front() {
                    Some(offsets) => Some(offsets),
                    // We depleted the iterator. Ensure we deplete the slice as well
                    None => {
                        let out = Some((self.last_offset, self.bytes.len()));
                        self.last_offset = self.bytes.len();
                        out
                    }
                }
            }
        }
    }
}

impl<'a> CoreReader<'a> {
    pub fn batched(mut self, _has_cat: bool) -> PolarsResult<BatchedCsvReader<'a>> {
        let mut n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
        let reader_bytes = self.reader_bytes.take().unwrap();
        let logging = verbose();
        let (file_chunks, chunk_size, _total_rows, starting_point_offset, _bytes) = self
            .determine_file_chunks_and_statistics(&mut n_threads, &reader_bytes, logging, true)?;
        let bytes = reader_bytes.as_ref();
        let (bytes, starting_point_offset) = self.find_starting_point(bytes, self.eol_char)?;

        // this is arbitrarily chosen.
        // we don't want this to depend on the thread pool size
        // otherwise the chunks are not deterministic
        let offset_batch_size = 16;
        // extend lifetime. It is bound to `readerbytes` and we keep track of that
        // lifetime so this is sound.
        let bytes = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(bytes) };
        let file_chunks = ChunkOffsetIter {
            bytes,
            offsets: VecDeque::with_capacity(offset_batch_size),
            last_offset: 0,
            n_chunks: offset_batch_size,
            rows_per_batch: self.chunk_size,
            expected_fields: self.schema.len(),
            delimiter: self.delimiter,
            quote_char: self.quote_char,
            eol_char: self.eol_char,
        };

        let projection = self.get_projection();

        let str_columns = self.get_string_columns(&projection)?;
@@ -28,10 +144,10 @@ impl<'a> CoreReader<'a> {

        Ok(BatchedCsvReader {
            reader_bytes,
            chunk_size,
            file_chunks,
            chunk_offset: 0,
            str_capacities: self.init_string_size_stats(&str_columns, chunk_size),
            chunk_size: self.chunk_size,
            file_chunks_iter: file_chunks,
            file_chunks: vec![],
            str_capacities: self.init_string_size_stats(&str_columns, self.chunk_size),
            str_columns,
            projection,
            starting_point_offset,
@@ -56,8 +172,8 @@
pub struct BatchedCsvReader<'a> {
    reader_bytes: ReaderBytes<'a>,
    chunk_size: usize,
    file_chunks_iter: ChunkOffsetIter<'a>,
    file_chunks: Vec<(usize, usize)>,
    chunk_offset: IdxSize,
    str_capacities: Vec<RunningSize>,
    str_columns: StringColumns,
    projection: Vec<usize>,
Expand All @@ -82,22 +198,25 @@ pub struct BatchedCsvReader<'a> {
}

impl<'a> BatchedCsvReader<'a> {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<(IdxSize, DataFrame)>>> {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        if n == 0 {
            return Ok(None);
        }
        if self.chunk_offset == self.file_chunks.len() as IdxSize {
            return Ok(None);
        }
        if let Some(n_rows) = self.n_rows {
            if self.rows_read >= n_rows as IdxSize {
                return Ok(None);
            }
        }
        let end = std::cmp::min(self.chunk_offset as usize + n, self.file_chunks.len());

        let chunks = &self.file_chunks[self.chunk_offset as usize..end];
        self.chunk_offset = end as IdxSize;
        // get next `n` offset positions.
        let file_chunks_iter = (&mut self.file_chunks_iter).take(n);
        self.file_chunks.extend(file_chunks_iter);
        // depleted the offsets iterator, we are done as well.
        if self.file_chunks.is_empty() {
            return Ok(None);
        }
        let chunks = &self.file_chunks;

        let mut bytes = self.reader_bytes.deref();
        if let Some(pos) = self.starting_point_offset {
            bytes = &bytes[pos..];
@@ -134,23 +253,19 @@ impl<'a> BatchedCsvReader<'a> {
                if let Some(rc) = &self.row_count {
                    df.with_row_count_mut(&rc.name, Some(rc.offset));
                }
                let n_read = df.height() as IdxSize;
                Ok((df, n_read))
                Ok(df)
            })
            .collect::<PolarsResult<Vec<_>>>()
        })?;
        self.file_chunks.clear();

        if self.row_count.is_some() {
            update_row_counts(&mut chunks, self.rows_read)
            update_row_counts2(&mut chunks, self.rows_read)
        }
        self.rows_read += chunks[chunks.len() - 1].1;
        Ok(Some(
            chunks
                .into_iter()
                .enumerate()
                .map(|(i, t)| (i as IdxSize + self.chunk_offset, t.0))
                .collect(),
        ))
        for df in &chunks {
            self.rows_read += df.height() as IdxSize;
        }
        Ok(Some(chunks))
    }
}

@@ -166,7 +281,7 @@ unsafe impl Send for OwnedBatchedCsvReader {}
unsafe impl Sync for OwnedBatchedCsvReader {}

impl OwnedBatchedCsvReader {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<(IdxSize, DataFrame)>>> {
    pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
        let reader = unsafe { &mut *self.batched_reader };
        reader.next_batches(n)
    }
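The core of this change is that chunk boundaries are no longer computed for the whole file up front: `ChunkOffsetIter` refills a small `VecDeque` of `(start, end)` offsets on demand (16 at a time here) and falls back to the remaining tail of the buffer once no further boundary is found. The sketch below is a simplified, self-contained illustration of that pattern, not the polars code: it splits on a bare `\n` instead of `next_line_position`, uses a fixed byte budget per chunk instead of estimating bytes per row, and the names (`LazyChunks`, `refill`) are invented for the example.

```rust
use std::collections::VecDeque;

/// Simplified illustration of the lazy-chunking idea (not the polars code):
/// yield `(start, end)` byte ranges of a buffer, refilling a small queue of
/// offsets on demand instead of pre-computing every chunk up front.
struct LazyChunks<'a> {
    bytes: &'a [u8],
    offsets: VecDeque<(usize, usize)>,
    last_offset: usize,
    refill: usize,     // how many offsets to compute per refill
    chunk_size: usize, // target bytes per chunk, snapped to the next newline
}

impl<'a> Iterator for LazyChunks<'a> {
    type Item = (usize, usize);

    fn next(&mut self) -> Option<(usize, usize)> {
        if let Some(item) = self.offsets.pop_front() {
            return Some(item);
        }
        if self.last_offset == self.bytes.len() {
            return None;
        }
        // refill the queue with up to `refill` chunk boundaries
        for _ in 0..self.refill {
            let search_pos = self.last_offset + self.chunk_size;
            if search_pos >= self.bytes.len() {
                break;
            }
            // snap the boundary to the next newline so rows stay intact
            match self.bytes[search_pos..].iter().position(|&b| b == b'\n') {
                Some(pos) => {
                    let end = search_pos + pos + 1;
                    self.offsets.push_back((self.last_offset, end));
                    self.last_offset = end;
                }
                None => break,
            }
        }
        // either hand out a freshly computed chunk or the trailing remainder
        self.offsets.pop_front().or_else(|| {
            let out = (self.last_offset, self.bytes.len());
            self.last_offset = self.bytes.len();
            Some(out)
        })
    }
}

fn main() {
    let data = b"a,1\nb,2\nc,3\nd,4\ne,5\n";
    let chunks = LazyChunks {
        bytes: data,
        offsets: VecDeque::with_capacity(2),
        last_offset: 0,
        refill: 2,
        chunk_size: 7,
    };
    for (start, end) in chunks {
        println!("{:?}", std::str::from_utf8(&data[start..end]).unwrap());
    }
}
```

Because offsets are produced lazily, a consumer that only asks for a few batches never pays for splitting the rest of the file.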
40 changes: 25 additions & 15 deletions polars/polars-io/src/csv/read_impl/mod.rs
@@ -341,17 +341,14 @@ impl<'a> CoreReader<'a> {
        Ok((bytes, starting_point_offset))
    }

    #[allow(clippy::type_complexity)]
    fn determine_file_chunks_and_statistics(
    /// Estimates number of rows and optionally ensure we don't read more than `n_rows`
    /// by slicing `bytes` to the upper bound.
    fn estimate_rows_and_set_upper_bound<'b>(
        &self,
        n_threads: &mut usize,
        bytes: &'a [u8],
        mut bytes: &'b [u8],
        logging: bool,
        streaming: bool,
    ) -> PolarsResult<(Vec<(usize, usize)>, usize, usize, Option<usize>, &'a [u8])> {
        // Make the variable mutable so that we can reassign the sliced file to this variable.
        let (mut bytes, starting_point_offset) = self.find_starting_point(bytes, self.eol_char)?;

        set_upper_bound: bool,
    ) -> (&'b [u8], usize) {
        // initial row guess. We use the line statistic to guess the number of rows to allocate
        let mut total_rows = 128;

@@ -388,14 +385,30 @@ impl<'a> CoreReader<'a> {
                        self.quote_char,
                        self.eol_char,
                    ) {
                        bytes = &bytes[..n_bytes + pos]
                        if set_upper_bound {
                            bytes = &bytes[..n_bytes + pos]
                        }
                    }
                }
            }
            if logging {
                eprintln!("initial row estimate: {total_rows}")
            }
        }
        (bytes, total_rows)
    }

    #[allow(clippy::type_complexity)]
    fn determine_file_chunks_and_statistics(
        &self,
        n_threads: &mut usize,
        bytes: &'a [u8],
        logging: bool,
    ) -> PolarsResult<(Vec<(usize, usize)>, usize, usize, Option<usize>, &'a [u8])> {
        // Make the variable mutable so that we can reassign the sliced file to this variable.
        let (bytes, starting_point_offset) = self.find_starting_point(bytes, self.eol_char)?;

        let (bytes, total_rows) = self.estimate_rows_and_set_upper_bound(bytes, logging, true);
        if total_rows == 128 {
            *n_threads = 1;

@@ -405,9 +418,7 @@
        }

        let chunk_size = std::cmp::min(self.chunk_size, total_rows);
        let n_chunks = total_rows / chunk_size;

        let n_file_chunks = if streaming { n_chunks } else { *n_threads };
        let n_file_chunks = *n_threads;

        // split the file by the nearest new line characters such that every thread processes
        // approximately the same number of rows.
@@ -430,7 +441,6 @@

        Ok((chunks, chunk_size, total_rows, starting_point_offset, bytes))
    }

    fn get_projection(&mut self) -> Vec<usize> {
        // we also need to sort the projection to have predictable output.
        // the `parse_lines` function expects this.
@@ -487,7 +497,7 @@ impl<'a> CoreReader<'a> {
    ) -> PolarsResult<DataFrame> {
        let logging = verbose();
        let (file_chunks, chunk_size, total_rows, starting_point_offset, bytes) =
            self.determine_file_chunks_and_statistics(&mut n_threads, bytes, logging, false)?;
            self.determine_file_chunks_and_statistics(&mut n_threads, bytes, logging)?;
        let projection = self.get_projection();
        let str_columns = self.get_string_columns(&projection)?;

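`estimate_rows_and_set_upper_bound` now carries the row-estimation and `n_rows` slicing logic on its own so both the batched and the non-batched paths can reuse it. A rough, hypothetical sketch of the idea (sample a few lines, extrapolate a row count from the mean line length, and optionally cut the buffer at the next newline past the `n_rows` estimate) could look like the following; it ignores headers, quoting, and the fuller line statistics the real code relies on:

```rust
/// Hypothetical sketch of sample-based row estimation; not the polars implementation.
fn estimate_rows(bytes: &[u8], sample_lines: usize, n_rows: Option<usize>) -> (&[u8], usize) {
    // sample the first `sample_lines` line lengths
    let mut line_lengths = Vec::with_capacity(sample_lines);
    let mut offset = 0;
    for _ in 0..sample_lines {
        match bytes[offset..].iter().position(|&b| b == b'\n') {
            Some(pos) => {
                line_lengths.push(pos + 1);
                offset += pos + 1;
            }
            None => break,
        }
    }
    if line_lengths.is_empty() {
        // fewer than one full line: nothing to estimate
        return (bytes, 1);
    }
    let mean = line_lengths.iter().sum::<usize>() as f64 / line_lengths.len() as f64;
    let total_rows = (bytes.len() as f64 / mean).ceil() as usize;

    // if the caller only wants `n_rows`, cap the slice at roughly that many lines,
    // snapped to the next newline so no row is cut in half
    if let Some(n) = n_rows {
        if n < total_rows {
            let approx = ((n + 1) as f64 * mean) as usize;
            if approx < bytes.len() {
                if let Some(pos) = bytes[approx..].iter().position(|&b| b == b'\n') {
                    return (&bytes[..approx + pos + 1], n);
                }
            }
        }
    }
    (bytes, total_rows)
}

fn main() {
    let csv = b"name,value\na,1\nbb,22\nccc,333\ndddd,4444\n";
    let (sliced, estimate) = estimate_rows(csv, 3, Some(2));
    println!("estimated rows: {estimate}, kept {} of {} bytes", sliced.len(), csv.len());
}
```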
1 change: 1 addition & 0 deletions polars/polars-io/src/csv/write_impl.rs
@@ -130,6 +130,7 @@ fn write_anyvalue(
        ))?,
    }
    .map_err(|err| match value {
        #[cfg(feature = "dtype-datetime")]
        AnyValue::Datetime(_, _, tz) => {
            // If this is a datetime, then datetime_format was either set or inferred.
            let datetime_format = datetime_format.unwrap();
15 changes: 15 additions & 0 deletions polars/polars-io/src/utils.rs
@@ -96,6 +96,21 @@ pub(crate) fn update_row_counts(dfs: &mut [(DataFrame, IdxSize)], offset: IdxSiz
    }
}

/// Because of threading every row starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    if !dfs.is_empty() {
        let mut previous = dfs[0].height() as IdxSize + offset;
        for df in &mut dfs[1..] {
            let n_read = df.height() as IdxSize;
            if let Some(s) = df.get_columns_mut().get_mut(0) {
                *s = &*s + previous;
            }
            previous += n_read;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;
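`update_row_counts2` mirrors the existing `update_row_counts`, but works on plain `DataFrame`s now that `next_batches` no longer returns `(IdxSize, DataFrame)` pairs: every batch's row-count column restarts at `offset`, so each subsequent batch is shifted by the number of rows already seen. A small numeric illustration of that shift (operating on plain vectors rather than DataFrame columns, with a made-up helper name):

```rust
/// Hypothetical helper mirroring the arithmetic of `update_row_counts2`,
/// but on plain vectors instead of DataFrame row-count columns.
fn shift_row_counts(row_count_cols: &mut [Vec<u32>], offset: u32) {
    if row_count_cols.is_empty() {
        return;
    }
    // the first batch is assumed to already be correct
    let mut previous = row_count_cols[0].len() as u32 + offset;
    for col in &mut row_count_cols[1..] {
        let n_read = col.len() as u32;
        for v in col.iter_mut() {
            // shift this batch by the number of rows seen so far
            *v += previous;
        }
        previous += n_read;
    }
}

fn main() {
    // three batches of 3, 2 and 2 rows, each restarting its row count at 0
    let mut cols = vec![vec![0u32, 1, 2], vec![0, 1], vec![0, 1]];
    shift_row_counts(&mut cols, 0);
    assert_eq!(cols, vec![vec![0, 1, 2], vec![3, 4], vec![5, 6]]);
    println!("{cols:?}");
}
```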
11 changes: 10 additions & 1 deletion polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
@@ -17,6 +17,7 @@ pub(crate) struct CsvSource {
    reader: *mut CsvReader<'static, File>,
    batched_reader: *mut BatchedCsvReader<'static>,
    n_threads: usize,
    chunk_index: IdxSize,
}

impl CsvSource {
@@ -80,6 +81,7 @@ impl CsvSource {
            reader,
            batched_reader,
            n_threads: POOL.current_num_threads(),
            chunk_index: 0,
        })
    }
}
@@ -106,7 +108,14 @@ impl Source for CsvSource {
            Some(batches) => SourceResult::GotMoreData(
                batches
                    .into_iter()
                    .map(|(chunk_index, data)| DataChunk { chunk_index, data })
                    .map(|data| {
                        let out = DataChunk {
                            chunk_index: self.chunk_index,
                            data,
                        };
                        self.chunk_index += 1;
                        out
                    })
                    .collect(),
            ),
        })
6 changes: 5 additions & 1 deletion py-polars/src/batched_csv.rs
@@ -117,6 +117,10 @@ impl PyBatchedCsv {

    fn next_batches(&mut self, n: usize) -> PyResult<Option<Vec<PyDataFrame>>> {
        let batches = self.reader.next_batches(n).map_err(PyPolarsErr::from)?;
        Ok(batches.map(|batches| batches.into_iter().map(|out| out.1.into()).collect()))
        // safety: same memory layout
        let batches = unsafe {
            std::mem::transmute::<Option<Vec<DataFrame>>, Option<Vec<PyDataFrame>>>(batches)
        };
        Ok(batches)
    }
}
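On the Python side, `next_batches` now receives `Vec<DataFrame>` and reinterprets the whole `Option<Vec<_>>` as `Option<Vec<PyDataFrame>>` in one transmute instead of converting element by element, which avoids rebuilding the vector. The toy comparison below uses stand-in newtypes (`Frame`, `PyFrame`) rather than the real polars types; it only illustrates the trade-off, and the soundness of the transmute path rests on the wrapper having the same memory layout as the wrapped type, which is the assumption the commit's `// safety` comment records.

```rust
// Stand-in newtypes only; the real `DataFrame` / `PyDataFrame` live in
// polars-core and py-polars and are not reproduced here.
struct Frame(u64);
#[repr(transparent)]
struct PyFrame(Frame);

// Always-sound alternative: rebuild the Vec, converting element by element.
fn wrap_by_mapping(frames: Option<Vec<Frame>>) -> Option<Vec<PyFrame>> {
    frames.map(|v| v.into_iter().map(PyFrame).collect())
}

// The commit's approach: reinterpret the whole Option<Vec<_>> in place.
// This is only sound when wrapper and inner type share the same memory
// layout (here guaranteed by `#[repr(transparent)]` on the stand-in).
fn wrap_by_transmute(frames: Option<Vec<Frame>>) -> Option<Vec<PyFrame>> {
    unsafe { std::mem::transmute::<Option<Vec<Frame>>, Option<Vec<PyFrame>>>(frames) }
}

fn main() {
    let a = wrap_by_mapping(Some(vec![Frame(1), Frame(2)])).unwrap();
    let b = wrap_by_transmute(Some(vec![Frame(3), Frame(4)])).unwrap();
    println!("mapped {} frames, transmuted {} frames", a.len(), b.len());
}
```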