From fb5502b84251ce6adb83dedb7c0f9da15afd7c93 Mon Sep 17 00:00:00 2001
From: Marius S <39998+winding-lines@users.noreply.github.com>
Date: Sun, 5 Mar 2023 07:46:06 -0800
Subject: [PATCH] wip

---
 polars/polars-io/src/parquet/async_manager.rs | 21 ++++++++++++-------
 polars/polars-io/src/parquet/read_impl.rs     | 18 +++++++---------
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/polars/polars-io/src/parquet/async_manager.rs b/polars/polars-io/src/parquet/async_manager.rs
index 4099ef896b2b..d132bb68a7c7 100644
--- a/polars/polars-io/src/parquet/async_manager.rs
+++ b/polars/polars-io/src/parquet/async_manager.rs
@@ -10,9 +10,10 @@
 use futures::channel::mpsc::Sender;
 use futures::channel::oneshot;
 use once_cell::sync::Lazy;
 use polars_core::prelude::*;
-use tokio::runtime::Handle;
+use tokio::runtime::Runtime;
 
-use super::mmap::ColumnMapper;
+use super::async_impl::ParquetObjectStore;
+use super::mmap::CloudMapper;
 
 static GLOBAL_ASYNC_MANAGER: Lazy = Lazy::new(AsyncManager::default);
@@ -47,7 +48,7 @@ enum AsyncParquetReaderMessage {
         /// The row groups to fetch.
         row_groups: Range,
         /// The channel to send the result to.
-        tx: oneshot::Sender>,
+        tx: oneshot::Sender>,
     },
 }
@@ -56,7 +57,7 @@ pub(crate) struct AsyncManager {
     /// The channel to communicate with the manager.
     tx: Sender,
    /// A handle to the Tokio runtime running the manager.
-    handle: Handle,
+    runtime: Runtime,
     /// Opened readers.
     readers: PlHashMap>,
 }
@@ -64,12 +65,14 @@ impl AsyncManager {
     /// Create a new async manager.
     pub fn new() -> AsyncManager {
+        use futures::stream::StreamExt;
+
         let (tx, rx) = futures::channel::mpsc::channel(1);
-        let handle = tokio::runtime::Builder::new_multi_thread()
+        let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
             .build()
             .unwrap();
-        handle.spawn(async move {
+        runtime.spawn(async move {
             let mut reader = None;
             while let Some(message) = rx.next().await {
                 match message {
@@ -106,7 +109,11 @@ impl AsyncManager {
                 }
             }
         });
-        AsyncManager { tx, handle }
+        AsyncManager {
+            tx,
+            runtime,
+            readers: PlHashMap::new(),
+        }
     }
 }
diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs
index 5243f6856748..241275fd546e 100644
--- a/polars/polars-io/src/parquet/read_impl.rs
+++ b/polars/polars-io/src/parquet/read_impl.rs
@@ -109,7 +109,7 @@ fn rg_to_dfs(
         let md = &file_metadata.row_groups[rg];
         let current_row_count = md.num_rows() as IdxSize;
 
-        if use_statistics && !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg)? {
+        if use_statistics && !read_this_row_group(predicate, file_metadata, schema, rg)? {
             *previous_row_count += current_row_count;
             continue;
         }
@@ -363,11 +363,8 @@ impl FetchRowGroupsFromMmapReader {
 /// There is nothing to do when fetching a mmap-ed file.
 impl FetchRowGroups for FetchRowGroupsFromMmapReader {
-    fn fetch_row_groups(
-        &mut self,
-        _row_groups: Range,
-    ) -> PolarsResult {
-        Ok(mmap::CloudMapper::PassThrough(self.0.deref()))
+    fn fetch_row_groups(&mut self, _row_groups: Range) -> PolarsResult {
+        Ok(mmap::CloudMapper::PassThrough(self.0.deref()))
     }
 }
@@ -377,7 +374,6 @@ pub struct BatchedParquetReader {
     row_group_fetcher: Box,
     limit: usize,
     projection: Vec,
-    predicate: Option>,
     schema: ArrowSchema,
     metadata: FileMetaData,
     row_count: Option,
@@ -432,8 +428,8 @@ impl BatchedParquetReader {
     pub fn next_batches(&mut self, n: usize) -> PolarsResult>> {
         // fill up fifo stack
         if self.row_group_offset <= self.n_row_groups && self.chunks_fifo.len() < n {
-            let row_group_start = self.row_group_offset;
-            let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups);
+            let row_group_start = self.row_group_offset;
+            let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups);
             let store = self
                 .row_group_fetcher
                 .fetch_row_groups(row_group_start..row_group_end)?;
@@ -473,8 +469,8 @@ impl BatchedParquetReader {
                     self.row_group_offset += n;
                     dfs
                 }
-                _ => unimplemented!(),
-            };
+                _ => unimplemented!(),
+            };
 
             // TODO! this is slower than it needs to be
             // we also need to parallelize over row groups here.
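
Note (not part of the patch): the async_manager.rs change replaces a borrowed Tokio `Handle` with an owned `Runtime` stored inside the `AsyncManager`, so callers hand work to a single long-lived background task over an mpsc channel and receive each result over a per-request oneshot channel. The following is a minimal, self-contained sketch of that general pattern only; the `Manager`/`Msg` names, the string payload, and the use of Tokio's own `tokio::sync` channels (instead of the `futures` channels used in the patch) are illustrative assumptions, not Polars code.

```rust
use tokio::runtime::Runtime;
use tokio::sync::{mpsc, oneshot};

/// Messages understood by the background task. `Fetch` stands in for
/// "fetch these row groups"; the real message in the patch carries a
/// row-group range and replies with a column mapper.
enum Msg {
    Fetch {
        key: String,
        tx: oneshot::Sender<String>,
    },
}

struct Manager {
    /// Command channel into the background task.
    tx: mpsc::Sender<Msg>,
    /// Keep the runtime alive; dropping it would stop the background task.
    _runtime: Runtime,
}

impl Manager {
    fn new() -> Manager {
        let (tx, mut rx) = mpsc::channel::<Msg>(1);
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        // One long-lived task owns the async state and serializes all requests.
        runtime.spawn(async move {
            while let Some(msg) = rx.recv().await {
                match msg {
                    Msg::Fetch { key, tx } => {
                        // Placeholder for real async I/O (e.g. object-store reads).
                        let _ = tx.send(format!("data for {key}"));
                    }
                }
            }
        });
        Manager { tx, _runtime: runtime }
    }

    /// Synchronous entry point: send a request and block on the oneshot reply.
    fn fetch_blocking(&self, key: &str) -> String {
        let (tx, rx) = oneshot::channel();
        if self
            .tx
            .blocking_send(Msg::Fetch { key: key.to_string(), tx })
            .is_err()
        {
            panic!("manager task is gone");
        }
        rx.blocking_recv().expect("manager task dropped the reply")
    }
}

fn main() {
    let manager = Manager::new();
    println!("{}", manager.fetch_blocking("row-group 0..1"));
}
```

The design choice this illustrates: synchronous reader code never touches the runtime directly, it only blocks on channel operations, which keeps the Tokio runtime ownership in one place and avoids nesting `block_on` calls inside worker threads.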