From 67b4e96c4a98965d4529efee048d7cbd7f3dd2c2 Mon Sep 17 00:00:00 2001 From: Marius S <39998+winding-lines@users.noreply.github.com> Date: Sat, 4 Mar 2023 16:25:27 -0800 Subject: [PATCH 1/2] feat(rust,python): Enable the object_store integration in python --- examples/read_parquet_cloud/src/main.rs | 2 + polars/polars-core/Cargo.toml | 2 +- polars/polars-io/Cargo.toml | 4 +- polars/polars-io/src/cloud/glob.rs | 11 +- polars/polars-io/src/lib.rs | 7 +- polars/polars-io/src/parquet/async_impl.rs | 31 +- polars/polars-io/src/parquet/async_manager.rs | 117 +++++ polars/polars-io/src/parquet/mmap.rs | 18 +- polars/polars-io/src/parquet/mod.rs | 3 + polars/polars-io/src/parquet/read.rs | 38 +- polars/polars-io/src/parquet/read_impl.rs | 89 ++-- polars/polars-lazy/polars-plan/Cargo.toml | 2 +- .../polars-plan/src/logical_plan/builder.rs | 54 ++- .../src/physical_plan/executors/scan/ipc.rs | 4 +- .../src/physical_plan/executors/scan/mod.rs | 7 +- py-polars/Cargo.lock | 417 ++++++++++++++++++ py-polars/Cargo.toml | 7 + py-polars/polars/internals/lazyframe/frame.py | 16 +- py-polars/src/lib.rs | 11 + 19 files changed, 760 insertions(+), 80 deletions(-) create mode 100644 polars/polars-io/src/parquet/async_manager.rs diff --git a/examples/read_parquet_cloud/src/main.rs b/examples/read_parquet_cloud/src/main.rs index e179266e1de3..6b371ca275b1 100644 --- a/examples/read_parquet_cloud/src/main.rs +++ b/examples/read_parquet_cloud/src/main.rs @@ -22,6 +22,8 @@ fn main() -> PolarsResult<()> { .select([ // select all columns all(), + // and do some aggregations + cols(["fats_g", "sugars_g"]).sum().suffix("_summed"), ]) .collect()?; diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml index 407fd01bb0b4..d1a7ff0ffe15 100644 --- a/polars/polars-core/Cargo.toml +++ b/polars/polars-core/Cargo.toml @@ -162,7 +162,7 @@ indexmap = { version = "1", features = ["std"] } itoap = { version = "1", optional = true } ndarray = { version = "0.15", optional = true, default_features = false } num-traits.workspace = true -object_store = { version = "0.5.3", default-features = false, optional = true } +object_store = { version = "0.5.5", default-features = false, optional = true } once_cell.workspace = true polars-arrow = { version = "0.27.2", path = "../polars-arrow", features = ["compute"] } polars-error = { version = "0.27.2", path = "../polars-error" } diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml index 08ad00fe5500..2c92fb4c00c1 100644 --- a/polars/polars-io/Cargo.toml +++ b/polars/polars-io/Cargo.toml @@ -64,7 +64,7 @@ lexical-core = { version = "0.8", optional = true } memchr.workspace = true memmap = { package = "memmap2", version = "0.5.2", optional = true } num-traits.workspace = true -object_store = { version = "0.5.3", default-features = false, optional = true } +object_store = { version = "0.5.5", default-features = false, optional = true } once_cell = "1" polars-arrow = { version = "0.27.2", path = "../polars-arrow" } polars-core = { version = "0.27.2", path = "../polars-core", features = ["private"], default-features = false } @@ -76,7 +76,7 @@ serde = { version = "1", features = ["derive"], optional = true } serde_json = { version = "1", optional = true, default-features = false, features = ["alloc", "raw_value"] } simd-json = { version = "0.7.0", optional = true, features = ["allow-non-simd", "known-key"] } simdutf8 = "0.1" -tokio = { version = "1.22.0", features = ["net"], optional = true } +tokio = { version = "1.24.0", features = ["net", "rt-multi-thread"], 
optional = true } url = { version = "2.3.1", optional = true } [dev-dependencies] diff --git a/polars/polars-io/src/cloud/glob.rs b/polars/polars-io/src/cloud/glob.rs index 344600f35514..4b86ba1ebdd8 100644 --- a/polars/polars-io/src/cloud/glob.rs +++ b/polars/polars-io/src/cloud/glob.rs @@ -149,7 +149,6 @@ impl Matcher { } } -#[tokio::main(flavor = "current_thread")] /// List files with a prefix derived from the pattern. pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult> { // Find the fixed prefix, up to the first '*'. @@ -164,6 +163,9 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu store, ) = super::build(url, cloud_options)?; let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?; + if expansion.is_none() { + return Ok(vec![url.into()]); + } let list_stream = store .list(Some(&Path::from(prefix))) @@ -260,6 +262,13 @@ mod test { assert!(!a.is_matching(&Path::from("folder/1parquet"))); // Intermediary folders are not allowed. assert!(!a.is_matching(&Path::from("folder/other/1.parquet"))); + + // Match full name. + let cloud_location = CloudLocation::new("s3://bucket/folder/some.parquet").unwrap(); + let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap(); + // Regular match. + assert!(a.is_matching(&Path::from("folder/some.parquet"))); + assert!(!a.is_matching(&Path::from("folder/other.parquet"))); } #[test] diff --git a/polars/polars-io/src/lib.rs b/polars/polars-io/src/lib.rs index c30c3768d7e7..94a5436e30fa 100644 --- a/polars/polars-io/src/lib.rs +++ b/polars/polars-io/src/lib.rs @@ -41,12 +41,14 @@ pub mod partition; use std::io::{Read, Seek, Write}; use std::path::{Path, PathBuf}; +use std::str::FromStr; #[allow(unused)] // remove when updating to rust nightly >= 1.61 use arrow::array::new_empty_array; use arrow::error::Result as ArrowResult; pub use options::*; use polars_core::config::verbose; +use polars_core::cloud::CloudType; use polars_core::frame::ArrowChunk; use polars_core::prelude::*; @@ -173,7 +175,6 @@ pub(crate) fn finish_reader( /// Check if the path is a cloud url. pub fn is_cloud_url>(p: P) -> bool { - p.as_ref().starts_with("s3://") - || p.as_ref().starts_with("file://") - || p.as_ref().starts_with("gcs://") + let path = p.as_ref(); + return CloudType::from_str(&path.to_string_lossy()).is_ok(); } diff --git a/polars/polars-io/src/parquet/async_impl.rs b/polars/polars-io/src/parquet/async_impl.rs index 01befe97ecf4..26a65df4dcd2 100644 --- a/polars/polars-io/src/parquet/async_impl.rs +++ b/polars/polars-io/src/parquet/async_impl.rs @@ -8,7 +8,7 @@ use arrow::io::parquet::read::{ use arrow::io::parquet::write::FileMetaData; use futures::future::BoxFuture; use futures::lock::Mutex; -use futures::{stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{stream, StreamExt, TryFutureExt, TryStreamExt, FutureExt}; use object_store::path::Path as ObjectPath; use object_store::ObjectStore; use polars_core::cloud::CloudOptions; @@ -20,7 +20,7 @@ use polars_core::schema::Schema; use super::cloud::{build, CloudLocation, CloudReader}; use super::mmap; -use super::mmap::ColumnStore; +use super::mmap::CloudMapper; use super::read_impl::FetchRowGroups; pub struct ParquetObjectStore { @@ -104,7 +104,6 @@ type RowGroupChunks<'a> = Vec)>>; /// Download rowgroups for the column whose indexes are given in `projection`. /// We concurrently download the columns for each field. 
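/// # Concurrency sketch (illustrative, not this function's exact code)
///
/// The per-field downloads follow roughly this shape, assuming a
/// hypothetical `fetch_range` helper that reads one byte range from the
/// object store:
///
/// ```ignore
/// use futures::{stream, StreamExt, TryStreamExt};
///
/// async fn fetch_all(ranges: Vec<std::ops::Range<usize>>) -> PolarsResult<Vec<Vec<u8>>> {
///     stream::iter(ranges.into_iter().map(fetch_range))
///         // Keep a bounded number of range requests in flight at once.
///         .buffered(8)
///         .try_collect()
///         .await
/// }
/// ```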
-#[tokio::main(flavor = "current_thread")] async fn download_projection<'a: 'b, 'b>( projection: &[usize], row_groups: &'a [RowGroupMetaData], @@ -149,7 +148,7 @@ pub(crate) struct FetchRowGroupsFromObjectStore { row_groups_metadata: Vec, projection: Vec, logging: bool, - schema: ArrowSchema, + pub schema: ArrowSchema, } impl FetchRowGroupsFromObjectStore { @@ -173,11 +172,12 @@ impl FetchRowGroupsFromObjectStore { schema, }) } -} -impl FetchRowGroups for FetchRowGroupsFromObjectStore { - fn fetch_row_groups(&mut self, row_groups: Range) -> PolarsResult { - // Fetch the required row groups. + /// Fetch the required row groups asynchronously. + pub async fn fetch_row_groups_async( + &mut self, + row_groups: Range, + ) -> PolarsResult { let row_groups = &self .row_groups_metadata .get(row_groups.clone()) @@ -194,9 +194,9 @@ impl FetchRowGroups for FetchRowGroupsFromObjectStore { Ok, )?; - // Package in the format required by ColumnStore. + // Package in the format required by ColumnAccess. let downloaded = - download_projection(&self.projection, row_groups, &self.schema, &self.reader)?; + download_projection(&self.projection, row_groups, &self.schema, &self.reader).await?; if self.logging { eprintln!( "BatchedParquetReader: fetched {} row_groups for {} fields, yielding {} column chunks.", @@ -224,6 +224,15 @@ impl FetchRowGroups for FetchRowGroupsFromObjectStore { ); } - Ok(mmap::ColumnStore::Fetched(downloaded_per_filepos)) + Ok(mmap::CloudMapper::Fetched(downloaded_per_filepos)) + } +} + +impl FetchRowGroups for FetchRowGroupsFromObjectStore { + fn fetch_row_groups<'a>( + &'a mut self, + row_groups: Range, + ) -> BoxFuture<'a, PolarsResult> { + self.fetch_row_groups_async(row_groups).boxed() } } diff --git a/polars/polars-io/src/parquet/async_manager.rs b/polars/polars-io/src/parquet/async_manager.rs new file mode 100644 index 000000000000..4099ef896b2b --- /dev/null +++ b/polars/polars-io/src/parquet/async_manager.rs @@ -0,0 +1,117 @@ +//! Polars is a heavily multi-threaded library. Some IO operations, specially cloud based ones, +//! are best served by an async module. The AsyncManager owns a multi-threaded Tokio runtime +//! and is responsible for managing the async calls to the object store and the associated state. + +use std::ops::Range; + +use arrow::io::parquet::read::RowGroupMetaData; +use arrow::io::parquet::write::FileMetaData; +use futures::channel::mpsc::Sender; +use futures::channel::oneshot; +use once_cell::sync::Lazy; +use polars_core::prelude::*; +use tokio::runtime::Handle; + +use super::mmap::ColumnMapper; + +static GLOBAL_ASYNC_MANAGER: Lazy = Lazy::new(AsyncManager::default); + +enum AsyncParquetReaderMessage { + /// Fetch the metadata of the parquet file, do not memoize it. + FetchMetadata { + /// The channel to send the result to. + tx: oneshot::Sender>, + }, + /// Fetch and memoize the metadata of the parquet file. + GetMetadata { + /// The channel to send the result to. + tx: oneshot::Sender>, + }, + /// Fetch the number of rows of the parquet file. + NumRows { + /// The channel to send the result to. + tx: oneshot::Sender>, + }, + /// Fetch the schema of the parquet file. + Schema { + /// The channel to send the result to. + tx: oneshot::Sender>, + }, + /// Fetch the row groups of the parquet file. + RowGroups { + /// The channel to send the result to. + tx: oneshot::Sender>>, + }, + /// Fetch the row groups of the parquet file. + FetchRowGroups { + /// The row groups to fetch. + row_groups: Range, + /// The channel to send the result to. 
+ tx: oneshot::Sender>, + }, +} + +/// Separate the async calls in their own manager and interact with the rest of the code with a channel. +pub(crate) struct AsyncManager { + /// The channel to communicate with the manager. + tx: Sender, + /// A handle to the Tokio runtime running the manager. + handle: Handle, + /// Opened readers. + readers: PlHashMap>, +} + +impl AsyncManager { + /// Create a new async manager. + pub fn new() -> AsyncManager { + let (tx, rx) = futures::channel::mpsc::channel(1); + let handle = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + handle.spawn(async move { + let mut reader = None; + while let Some(message) = rx.next().await { + match message { + AsyncParquetReaderMessage::FetchMetadata { tx } => { + let reader = reader.as_mut().unwrap(); + let result = reader.fetch_metadata().await; + tx.send(result).unwrap(); + } + AsyncParquetReaderMessage::GetMetadata { tx } => { + let reader = reader.as_mut().unwrap(); + let result = reader.get_metadata().await; + tx.send(result).unwrap(); + } + AsyncParquetReaderMessage::NumRows { tx } => { + let reader = reader.as_mut().unwrap(); + let result = reader.num_rows().await; + tx.send(result).unwrap(); + } + AsyncParquetReaderMessage::Schema { tx } => { + let reader = reader.as_mut().unwrap(); + let result = reader.schema().await; + tx.send(result).unwrap(); + } + AsyncParquetReaderMessage::RowGroups { tx } => { + let reader = reader.as_mut().unwrap(); + let result = reader.row_groups().await; + tx.send(result).unwrap(); + } + AsyncParquetReaderMessage::FetchRowGroups { row_groups, tx } => { + let reader = reader.as_mut().unwrap(); + let result = reader.fetch_row_groups(row_groups).await; + tx.send(result).unwrap(); + } + } + } + }); + AsyncManager { tx, handle } + } +} + +impl Default for AsyncManager { + fn default() -> Self { + AsyncManager::new() + } +} diff --git a/polars/polars-io/src/parquet/mmap.rs b/polars/polars-io/src/parquet/mmap.rs index c3891b207c34..1dae25b7ac90 100644 --- a/polars/polars-io/src/parquet/mmap.rs +++ b/polars/polars-io/src/parquet/mmap.rs @@ -8,8 +8,8 @@ use polars_core::datatypes::PlHashMap; use super::*; -/// Store columns data in two scenarios: -/// 1. a local memory mapped file +/// Map column data in two scenarios: +/// 1. a local memory mapped file, there is nothing to do in this case. /// 2. data fetched from cloud storage on demand, in this case /// a. the key in the hashmap is the start in the file /// b. the value in the hashmap is the actual data. @@ -19,8 +19,8 @@ use super::*; /// b. asynchronously fetch them in parallel, for example using object_store /// c. store the data in this data structure /// d. when all the data is available deserialize on multiple threads, for example using rayon -pub enum ColumnStore<'a> { - Local(&'a [u8]), +pub enum CloudMapper<'a> { + PassThrough(&'a [u8]), #[cfg(feature = "async")] Fetched(PlHashMap>), } @@ -28,7 +28,7 @@ pub enum ColumnStore<'a> { /// For local files memory maps all columns that are part of the parquet field `field_name`. /// For cloud files the relevant memory regions should have been prefetched. 
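/// # Lookup sketch (illustrative; this is what `_mmap_single_column` below does)
///
/// ```ignore
/// let bytes: &[u8] = match store {
///     // Local file: the mmap-ed byte range is already addressable.
///     CloudMapper::PassThrough(file) => &file[start..start + len],
///     // Cloud file: the range must have been downloaded beforehand.
///     CloudMapper::Fetched(map) => map.get(&start).expect("prefetched").as_slice(),
/// };
/// ```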
pub(super) fn mmap_columns<'a>( - store: &'a ColumnStore, + store: &'a CloudMapper, columns: &'a [ColumnChunkMetaData], field_name: &str, ) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> { @@ -39,17 +39,17 @@ pub(super) fn mmap_columns<'a>( } fn _mmap_single_column<'a>( - store: &'a ColumnStore, + store: &'a CloudMapper, meta: &'a ColumnChunkMetaData, ) -> (&'a ColumnChunkMetaData, &'a [u8]) { let (start, len) = meta.byte_range(); let chunk = match store { - ColumnStore::Local(file) => &file[start as usize..(start + len) as usize], + CloudMapper::PassThrough(file) => &file[start as usize..(start + len) as usize], #[cfg(all(feature = "async", feature = "parquet"))] - ColumnStore::Fetched(fetched) => { + CloudMapper::Fetched(fetched) => { let entry = fetched.get(&start).unwrap_or_else(|| { panic!( - "mmap_columns: column with start {start} must be prefetched in ColumnStore.\n" + "mmap_columns: column with start {start} must be prefetched in ColumnAccess.\n" ) }); entry.as_slice() diff --git a/polars/polars-io/src/parquet/mod.rs b/polars/polars-io/src/parquet/mod.rs index 95da99e48b53..25393d35256c 100644 --- a/polars/polars-io/src/parquet/mod.rs +++ b/polars/polars-io/src/parquet/mod.rs @@ -16,6 +16,9 @@ //! #[cfg(feature = "async")] pub(super) mod async_impl; +#[cfg(feature = "async")] +pub(super) mod async_manager; + pub(super) mod mmap; pub mod predicates; mod read; diff --git a/polars/polars-io/src/parquet/read.rs b/polars/polars-io/src/parquet/read.rs index 25132fa04400..5796059bf2d0 100644 --- a/polars/polars-io/src/parquet/read.rs +++ b/polars/polars-io/src/parquet/read.rs @@ -9,7 +9,7 @@ use polars_core::prelude::*; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use super::read_impl::FetchRowGroupsFromMmapReader; +use super::read_impl::{read_parquet_from_cloud_mapper, FetchRowGroupsFromMmapReader}; use crate::mmap::MmapBytesReader; #[cfg(feature = "async")] use crate::parquet::async_impl::FetchRowGroupsFromObjectStore; @@ -160,6 +160,7 @@ impl ParquetReader { metadata, self.n_rows.unwrap_or(usize::MAX), self.projection, + None, self.row_count, chunk_size, self.use_statistics, @@ -228,6 +229,7 @@ pub struct ParquetAsyncReader { row_count: Option, low_memory: bool, use_statistics: bool, + predicate: Option>, } #[cfg(feature = "async")] @@ -244,11 +246,11 @@ impl ParquetAsyncReader { row_count: None, low_memory: false, use_statistics: true, + predicate: None, }) } - /// Fetch the file info in a synchronous way to for the query planning phase. - #[tokio::main(flavor = "current_thread")] + /// Fetch the file info from the cloud storage. pub async fn file_info( uri: &str, options: Option<&CloudOptions>, @@ -298,7 +300,7 @@ impl ParquetAsyncReader { self } - #[tokio::main(flavor = "current_thread")] + #[tokio::main(flavor = "multi_thread")] pub async fn batched(mut self, chunk_size: usize) -> PolarsResult { let metadata = self.reader.get_metadata().await?.to_owned(); let row_group_fetcher = Box::new(FetchRowGroupsFromObjectStore::new( @@ -311,9 +313,37 @@ impl ParquetAsyncReader { metadata, self.n_rows.unwrap_or(usize::MAX), self.projection, + self.predicate, self.row_count, chunk_size, self.use_statistics, ) } + + /// Actually read the parquet file. 
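+ ///
+ /// # Example (illustrative; assumes an async runtime, valid cloud
+ /// credentials, and a `from_uri`-style constructor)
+ ///
+ /// ```ignore
+ /// let reader = ParquetAsyncReader::from_uri("s3://bucket/file.parquet", None)?;
+ /// let df = reader.finish(true).await?;
+ /// ```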
+ pub async fn finish(mut self, rechunk: bool) -> PolarsResult { + let metadata = self.reader.get_metadata().await?.to_owned(); + let mut row_group_fetcher = + FetchRowGroupsFromObjectStore::new(self.reader, &metadata, &self.projection)?; + let schema = row_group_fetcher.schema.clone(); + let column_store = row_group_fetcher + .fetch_row_groups_async(0..metadata.row_groups.len()) + .await?; + read_parquet_from_cloud_mapper( + &column_store, + self.n_rows.unwrap_or(usize::MAX), + self.projection.as_deref(), + &schema, + metadata, + self.predicate, + Default::default(), + self.row_count, + ) + .map(|mut df| { + if rechunk { + df.rechunk(); + }; + df + }) + } } diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs index 840dd3b41015..5243f6856748 100644 --- a/polars/polars-io/src/parquet/read_impl.rs +++ b/polars/polars-io/src/parquet/read_impl.rs @@ -7,12 +7,14 @@ use std::sync::Arc; use arrow::array::new_empty_array; use arrow::io::parquet::read; use arrow::io::parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData}; +use futures::future::BoxFuture; +use futures::TryFutureExt; use polars_core::prelude::*; use polars_core::utils::{accumulate_dataframes_vertical, split_df}; use polars_core::POOL; use rayon::prelude::*; -use super::mmap::ColumnStore; +use super::mmap::CloudMapper; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::mmap::mmap_columns; use crate::parquet::predicates::read_this_row_group; @@ -27,7 +29,7 @@ fn column_idx_to_series( md: &RowGroupMetaData, remaining_rows: usize, schema: &ArrowSchema, - store: &mmap::ColumnStore, + store: &mmap::CloudMapper, chunk_size: usize, ) -> PolarsResult { let mut field = schema.fields[column_i].clone(); @@ -88,14 +90,14 @@ pub(super) fn array_iter_to_series( #[allow(clippy::too_many_arguments)] // might parallelize over columns fn rg_to_dfs( - store: &mmap::ColumnStore, + store: &mmap::CloudMapper, previous_row_count: &mut IdxSize, row_group_start: usize, row_group_end: usize, remaining_rows: &mut usize, file_metadata: &FileMetaData, schema: &ArrowSchema, - predicate: Option>, + predicate: Option<&Arc>, row_count: Option, parallel: ParallelStrategy, projection: &[usize], @@ -165,14 +167,14 @@ fn rg_to_dfs( #[allow(clippy::too_many_arguments)] // parallelizes over row groups fn rg_to_dfs_par( - store: &mmap::ColumnStore, + store: &mmap::CloudMapper, row_group_start: usize, row_group_end: usize, previous_row_count: &mut IdxSize, remaining_rows: &mut usize, file_metadata: &FileMetaData, schema: &ArrowSchema, - predicate: Option>, + predicate: Option<&Arc>, row_count: Option, projection: &[usize], use_statistics: bool, @@ -224,7 +226,7 @@ fn rg_to_dfs_par( df.with_row_count_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset)); } - apply_predicate(&mut df, predicate.as_deref(), false)?; + apply_predicate(&mut df, predicate, false)?; Ok(Some(df)) }) @@ -233,21 +235,18 @@ fn rg_to_dfs_par( } #[allow(clippy::too_many_arguments)] -pub fn read_parquet( - mut reader: R, +pub fn read_parquet_from_cloud_mapper( + cloud_mapper: &mmap::CloudMapper, mut limit: usize, projection: Option<&[usize]>, schema: &ArrowSchema, - metadata: Option, + metadata: FileMetaData, predicate: Option>, mut parallel: ParallelStrategy, row_count: Option, use_statistics: bool, ) -> PolarsResult { - let file_metadata = metadata - .map(Ok) - .unwrap_or_else(|| read::read_metadata(&mut reader))?; - let row_group_len = file_metadata.row_groups.len(); + let row_group_len = metadata.row_groups.len(); let projection 
= projection .map(Cow::Borrowed) @@ -265,33 +264,30 @@ pub fn read_parquet( parallel = ParallelStrategy::None; } - let reader = ReaderBytes::from(&reader); - let bytes = reader.deref(); - let store = mmap::ColumnStore::Local(bytes); let dfs = match parallel { ParallelStrategy::Columns | ParallelStrategy::None => rg_to_dfs( - &store, + cloud_mapper, &mut 0, 0, row_group_len, &mut limit, - &file_metadata, + &metadata, schema, - predicate, + predicate.as_ref(), row_count, parallel, &projection, use_statistics, )?, ParallelStrategy::RowGroups => rg_to_dfs_par( - &store, + cloud_mapper, 0, - file_metadata.row_groups.len(), + row_group_len, &mut 0, &mut limit, - &file_metadata, + &metadata, schema, - predicate, + predicate.as_ref(), row_count, &projection, use_statistics, @@ -312,6 +308,35 @@ pub fn read_parquet( } } +#[allow(clippy::too_many_arguments)] +pub fn read_parquet( + mut reader: R, + limit: usize, + projection: Option<&[usize]>, + schema: &ArrowSchema, + metadata: Option, + predicate: Option>, + parallel: ParallelStrategy, + row_count: Option, +) -> PolarsResult { + let file_metadata = metadata + .map(Ok) + .unwrap_or_else(|| read::read_metadata(&mut reader))?; + let reader = ReaderBytes::from(&reader); + let bytes = reader.deref(); + let column_store = mmap::CloudMapper::PassThrough(bytes); + + read_parquet_from_cloud_mapper( + &column_store, + limit, + projection, + schema, + file_metadata, + predicate, + parallel, + row_count, + ) +} /// Provide RowGroup content to the BatchedReader. /// This allows us to share the code to do in-memory processing for different use cases. pub trait FetchRowGroups: Sync + Send { @@ -338,8 +363,11 @@ impl FetchRowGroupsFromMmapReader { /// There is nothing to do when fetching a mmap-ed file. impl FetchRowGroups for FetchRowGroupsFromMmapReader { - fn fetch_row_groups(&mut self, _row_groups: Range) -> PolarsResult { - Ok(mmap::ColumnStore::Local(self.0.deref())) + fn fetch_row_groups( + &mut self, + _row_groups: Range, + ) -> PolarsResult { + Ok(mmap::CloudMapper::PassThrough(self.0.deref())) } } @@ -349,6 +377,7 @@ pub struct BatchedParquetReader { row_group_fetcher: Box, limit: usize, projection: Vec, + predicate: Option>, schema: ArrowSchema, metadata: FileMetaData, row_count: Option, @@ -403,8 +432,8 @@ impl BatchedParquetReader { pub fn next_batches(&mut self, n: usize) -> PolarsResult>> { // fill up fifo stack if self.row_group_offset <= self.n_row_groups && self.chunks_fifo.len() < n { - let row_group_start = self.row_group_offset; - let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups); + let row_group_start = self.row_group_offset; + let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups); let store = self .row_group_fetcher .fetch_row_groups(row_group_start..row_group_end)?; @@ -444,8 +473,8 @@ impl BatchedParquetReader { self.row_group_offset += n; dfs } - _ => unimplemented!(), - }; + _ => unimplemented!(), + }; // TODO! this is slower than it needs to be // we also need to parallelize over row groups here. 
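The `FetchRowGroups` trait above is the seam between the batched reader and its data source. A minimal sketch of a third implementation, assuming an already-resident in-memory buffer (hypothetical, not part of this patch):

    use std::ops::Range;

    /// Serves row groups from a fully materialized in-memory parquet file.
    struct FetchRowGroupsFromVec(Vec<u8>);

    impl FetchRowGroups for FetchRowGroupsFromVec {
        fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<CloudMapper> {
            // Everything is already local, so hand the bytes through unchanged,
            // exactly like the mmap-backed implementation does.
            Ok(CloudMapper::PassThrough(&self.0))
        }
    }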
diff --git a/polars/polars-lazy/polars-plan/Cargo.toml b/polars/polars-lazy/polars-plan/Cargo.toml index 3a65abd0d9d7..a895c24ea92c 100644 --- a/polars/polars-lazy/polars-plan/Cargo.toml +++ b/polars/polars-lazy/polars-plan/Cargo.toml @@ -34,7 +34,7 @@ compile = [] default = ["compile", "private"] streaming = [] parquet = ["polars-core/parquet", "polars-io/parquet"] -async = [] +async = ["futures"] ipc = ["polars-io/ipc"] json = ["polars-io/json"] csv-file = ["polars-io/csv-file"] diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs index 35145a12e5ee..695c649aee80 100644 --- a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs +++ b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs @@ -2,6 +2,8 @@ use std::io::{Read, Seek}; use std::ops::Deref; use std::path::PathBuf; +#[cfg(all(feature = "parquet", feature = "async"))] +use futures::future::BoxFuture; #[cfg(feature = "parquet")] use polars_core::cloud::CloudOptions; use polars_core::frame::_duplicate_err; @@ -12,7 +14,7 @@ use polars_core::utils::try_get_supertype; use polars_io::ipc::IpcReader; #[cfg(all(feature = "parquet", feature = "async"))] use polars_io::parquet::ParquetAsyncReader; -#[cfg(feature = "parquet")] +#[cfg(all(feature = "parquet", not(feature = "async")))] use polars_io::parquet::ParquetReader; use polars_io::RowCount; #[cfg(feature = "csv-file")] @@ -113,7 +115,46 @@ impl LogicalPlanBuilder { .into()) } - #[cfg(any(feature = "parquet", feature = "parquet_async"))] + #[cfg(all(feature = "parquet", feature = "async"))] + #[allow(clippy::too_many_arguments)] + pub fn scan_parquet_async( + uri: String, + n_rows: Option, + cache: bool, + parallel: polars_io::parquet::ParallelStrategy, + row_count: Option, + rechunk: bool, + low_memory: bool, + cloud_options: Option, + ) -> BoxFuture<'static, PolarsResult> { + Box::pin(async move { + let (schema, num_rows) = + ParquetAsyncReader::file_info(&uri, cloud_options.as_ref()).await?; + let file_info = FileInfo { + schema: Arc::new(schema), + row_estimation: (Some(num_rows), num_rows), + }; + Ok(LogicalPlan::ParquetScan { + path: PathBuf::from(uri), + file_info, + predicate: None, + options: ParquetOptions { + n_rows, + with_columns: None, + cache, + parallel, + row_count, + rechunk, + file_counter: Default::default(), + low_memory, + }, + cloud_options, + } + .into()) + }) + } + + #[cfg(feature = "parquet")] #[allow(clippy::too_many_arguments)] pub fn scan_parquet>( path: P, @@ -126,6 +167,7 @@ impl LogicalPlanBuilder { cloud_options: Option, use_statistics: bool, ) -> PolarsResult { + use polars_io::prelude::ParquetReader; use polars_io::{is_cloud_url, SerReader as _}; let path = path.into(); @@ -137,13 +179,7 @@ impl LogicalPlanBuilder { #[cfg(feature = "async")] { - let uri = path.to_string_lossy(); - let (schema, num_rows) = - ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())?; - Ok(FileInfo { - schema: Arc::new(schema), - row_estimation: (Some(num_rows), num_rows), - }) + panic!("Use scan_parquet_async for cloud storage.") } } else { let file = std::fs::File::open(&path)?; diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 08437546c667..562c65eaf69b 100644 --- a/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -9,8 +9,8 @@ pub struct IpcExec { impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult 
{ - let (file, projection, n_rows, predicate) = prepare_scan_args( - &self.path, + let file = std::fs::File::open(&self.path).unwrap(); + let (projection, n_rows, predicate) = prepare_scan_args( &self.predicate, &mut self.options.with_columns, &mut self.schema, diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs b/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs index 3661aa09a966..01458ed687f5 100644 --- a/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs +++ b/polars/polars-lazy/src/physical_plan/executors/scan/mod.rs @@ -38,14 +38,11 @@ type Predicate = Option>; #[cfg(any(feature = "ipc", feature = "parquet"))] fn prepare_scan_args( - path: &std::path::Path, predicate: &Option>, with_columns: &mut Option>>, schema: &mut SchemaRef, n_rows: Option, -) -> (std::fs::File, Projection, StopNRows, Predicate) { - let file = std::fs::File::open(path).unwrap(); - +) -> (Projection, StopNRows, Predicate) { let with_columns = mem::take(with_columns); let schema = mem::take(schema); @@ -61,7 +58,7 @@ fn prepare_scan_args( .clone() .map(|expr| Arc::new(PhysicalIoHelper { expr }) as Arc); - (file, projection, n_rows, predicate) + (projection, n_rows, predicate) } /// Producer of an in memory DataFrame diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 917a85416c0e..d6ad4ea1141a 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -539,6 +539,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dyn-clone" version = "1.0.11" @@ -551,6 +557,15 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum_dispatch" version = "0.3.11" @@ -594,6 +609,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foreign_vec" version = "0.1.0" @@ -750,6 +771,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "halfbrown" version = "0.1.18" @@ -804,6 +844,77 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" +dependencies = [ + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -874,6 +985,21 @@ dependencies = [ "ghost", ] +[[package]] +name = "ipnet" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.5" @@ -1172,6 +1298,12 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "miniz_oxide" version = "0.6.2" @@ -1298,6 +1430,33 @@ dependencies = [ "rustc-hash", ] +[[package]] +name = "object_store" +version = "0.5.3" +source = "git+https://github.com/apache/arrow-rs?rev=0f1a92a5f31916570d70b78562913cf877e8929c#0f1a92a5f31916570d70b78562913cf877e8929c" +dependencies = [ + "async-trait", + "base64 0.21.0", + "bytes", + "chrono", + "futures", + "itertools", + "parking_lot 0.12.1", + "percent-encoding", + "quick-xml", + "rand", + "reqwest", + "ring", + "rustls-pemfile", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.17.1" @@ -1518,6 +1677,7 @@ dependencies = [ "serde_json", "smartstring", "thiserror", + "url", "wasm-timer", "xxhash-rust", ] @@ -1539,11 +1699,13 @@ dependencies = [ "ahash", "anyhow", "arrow2", + "async-trait", "bytes", "chrono", "chrono-tz 0.8.1", "dirs", "flate2", + "futures", "lexical", "lexical-core", "memchr", @@ -1560,6 +1722,8 @@ dependencies = [ "serde_json", "simd-json", "simdutf8", + "tokio", + "url", ] [[package]] @@ -1623,6 +1787,7 @@ name = "polars-plan" version = "0.27.2" dependencies = [ "ahash", + "futures", "once_cell", "polars-arrow", "polars-core", @@ -1793,6 +1958,16 @@ dependencies = [ "syn", ] +[[package]] +name = "quick-xml" +version = "0.27.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.23" @@ -1907,6 +2082,62 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +[[package]] +name = "reqwest" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21eed90ec8570952d53b772ecf8f206aa1ec9a3d76b2521c56c42973f2d91ee9" +dependencies = [ + "base64 0.21.0", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots", + "winreg", +] + +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -1928,6 +2159,27 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +dependencies = [ + "base64 0.21.0", +] + [[package]] name = "rustversion" version = "1.0.11" @@ -1940,6 +2192,15 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -1952,6 +2213,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "semver" version = "1.0.16" @@ -1999,6 +2270,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "signal-hook" version = "0.3.15" @@ -2082,6 +2365,28 @@ dependencies = [ "version_check", ] +[[package]] +name = "snafu" +version = "0.7.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "snap" version = "1.1.0" @@ -2227,6 +2532,50 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "try-lock" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" + [[package]] name = "uncased" version = "0.9.7" @@ -2269,6 +2618,12 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.3.1" @@ -2304,6 +2659,27 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2376,6 +2752,19 @@ version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-timer" version = "0.2.5" @@ -2401,6 +2790,25 @@ 
dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +dependencies = [ + "webpki", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2498,6 +2906,15 @@ version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "xxhash-rust" version = "0.8.6" diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml index 6efe56d1a1b0..f5af06bf5b0f 100644 --- a/py-polars/Cargo.toml +++ b/py-polars/Cargo.toml @@ -65,6 +65,10 @@ merge_sorted = ["polars/merge_sorted"] list_take = ["polars/list_take"] list_count = ["polars/list_count"] binary_encoding = ["polars/binary_encoding"] +"async" = ["polars/async"] +"aws" = ["polars/aws", "async"] +"gcp" = ["polars/gcp", "async"] +"azure" = ["polars/azure", "async"] all = [ "json", @@ -99,6 +103,9 @@ all = [ "performant", "list_take", "list_count", + "aws", + "gcp", + "azure", ] # we cannot conditionaly activate simd diff --git a/py-polars/polars/internals/lazyframe/frame.py b/py-polars/polars/internals/lazyframe/frame.py index eaeb4bbdf8a1..25dd1867f486 100644 --- a/py-polars/polars/internals/lazyframe/frame.py +++ b/py-polars/polars/internals/lazyframe/frame.py @@ -57,7 +57,7 @@ ) with contextlib.suppress(ImportError): # Module not available when building docs - from polars.polars import PyLazyFrame + from polars.polars import PyLazyFrame, is_object_store_url if TYPE_CHECKING: @@ -101,6 +101,10 @@ def wrap_ldf(ldf: PyLazyFrame) -> LazyFrame: return LazyFrame._from_pyldf(ldf) +# The storage_options key to enable the Rust implementation +# of the download code. 
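+# Illustrative opt-in (hypothetical bucket; credentials are resolved by the
+# underlying object_store crate):
+#
+#     pl.scan_parquet(
+#         "s3://bucket/data.parquet",
+#         storage_options={"object_store": True},
+#     )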
+OBJECT_STORE_KEY = "object_store" + @redirect( { @@ -384,8 +388,14 @@ def _scan_parquet( polars.io.scan_parquet """ + object_store = ( + storage_options + and OBJECT_STORE_KEY in storage_options + and bool(storage_options[OBJECT_STORE_KEY]) + and is_object_store_url(file) + ) # try fsspec scanner - if not pli._is_local_file(file): + if not (pli._is_local_file(file) or object_store): scan = pli._scan_parquet_fsspec(file, storage_options) if n_rows: scan = scan.head(n_rows) @@ -393,6 +403,8 @@ def _scan_parquet( scan = scan.with_row_count(row_count_name, row_count_offset) return scan # type: ignore[return-value] + if storage_options and OBJECT_STORE_KEY in storage_options: + del storage_options[OBJECT_STORE_KEY] self = cls.__new__(cls) self._ldf = PyLazyFrame.new_from_parquet( file, diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 645aabaec836..99ed0ff981e4 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -456,6 +456,15 @@ fn parquet_schema(py: Python, py_f: PyObject) -> PyResult { Ok(dict.to_object(py)) } +#[pyfunction] +fn is_object_store_url(py: Python, url: &str) -> PyResult { + #[cfg(feature = "async")] + let is_supported_url = polars_rs::io::is_cloud_url(url); + #[cfg(not(feature = "async"))] + let is_supported_url = false; + Ok(is_supported_url.to_object(py)) +} + #[pyfunction] fn collect_all(lfs: Vec, py: Python) -> PyResult> { use polars_core::utils::rayon::prelude::*; @@ -672,6 +681,8 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(ipc_schema)).unwrap(); #[cfg(feature = "parquet")] m.add_wrapped(wrap_pyfunction!(parquet_schema)).unwrap(); + m.add_wrapped(wrap_pyfunction!(is_object_store_url)) + .unwrap(); m.add_wrapped(wrap_pyfunction!(collect_all)).unwrap(); m.add_wrapped(wrap_pyfunction!(spearman_rank_corr)).unwrap(); m.add_wrapped(wrap_pyfunction!(map_mul)).unwrap(); From fb5502b84251ce6adb83dedb7c0f9da15afd7c93 Mon Sep 17 00:00:00 2001 From: Marius S <39998+winding-lines@users.noreply.github.com> Date: Sun, 5 Mar 2023 07:46:06 -0800 Subject: [PATCH 2/2] wip --- polars/polars-io/src/parquet/async_manager.rs | 21 ++++++++++++------- polars/polars-io/src/parquet/read_impl.rs | 18 +++++++--------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/polars/polars-io/src/parquet/async_manager.rs b/polars/polars-io/src/parquet/async_manager.rs index 4099ef896b2b..d132bb68a7c7 100644 --- a/polars/polars-io/src/parquet/async_manager.rs +++ b/polars/polars-io/src/parquet/async_manager.rs @@ -10,9 +10,10 @@ use futures::channel::mpsc::Sender; use futures::channel::oneshot; use once_cell::sync::Lazy; use polars_core::prelude::*; -use tokio::runtime::Handle; +use tokio::runtime::Runtime; -use super::mmap::ColumnMapper; +use super::async_impl::ParquetObjectStore; +use super::mmap::CloudMapper; static GLOBAL_ASYNC_MANAGER: Lazy = Lazy::new(AsyncManager::default); @@ -47,7 +48,7 @@ enum AsyncParquetReaderMessage { /// The row groups to fetch. row_groups: Range, /// The channel to send the result to. - tx: oneshot::Sender>, + tx: oneshot::Sender>, }, } @@ -56,7 +57,7 @@ pub(crate) struct AsyncManager { /// The channel to communicate with the manager. tx: Sender, /// A handle to the Tokio runtime running the manager. - handle: Handle, + runtime: Runtime, /// Opened readers. readers: PlHashMap>, } @@ -64,12 +65,14 @@ pub(crate) struct AsyncManager { impl AsyncManager { /// Create a new async manager. 
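///
/// # Message round trip (illustrative)
///
/// Callers hand the manager a `oneshot` sender inside the message and then
/// await the reply, e.g. (sketch, assuming `futures::SinkExt` is in scope):
///
/// ```ignore
/// let (tx, rx) = futures::channel::oneshot::channel();
/// manager_tx.send(AsyncParquetReaderMessage::NumRows { tx }).await?;
/// let num_rows: usize = rx.await??;
/// ```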
pub fn new() -> AsyncManager { + use futures::stream::StreamExt; + let (tx, rx) = futures::channel::mpsc::channel(1); - let handle = tokio::runtime::Builder::new_multi_thread() + let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); - handle.spawn(async move { + runtime.spawn(async move { let mut reader = None; while let Some(message) = rx.next().await { match message { @@ -106,7 +109,11 @@ impl AsyncManager { } } }); - AsyncManager { tx, handle } + AsyncManager { + tx, + runtime, + readers: PlHashMap::new(), + } } } diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs index 5243f6856748..241275fd546e 100644 --- a/polars/polars-io/src/parquet/read_impl.rs +++ b/polars/polars-io/src/parquet/read_impl.rs @@ -109,7 +109,7 @@ fn rg_to_dfs( let md = &file_metadata.row_groups[rg]; let current_row_count = md.num_rows() as IdxSize; - if use_statistics && !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg)? { + if use_statistics && !read_this_row_group(predicate, file_metadata, schema, rg)? { *previous_row_count += current_row_count; continue; } @@ -363,11 +363,8 @@ impl FetchRowGroupsFromMmapReader { /// There is nothing to do when fetching a mmap-ed file. impl FetchRowGroups for FetchRowGroupsFromMmapReader { - fn fetch_row_groups( - &mut self, - _row_groups: Range, - ) -> PolarsResult { - Ok(mmap::CloudMapper::PassThrough(self.0.deref())) + fn fetch_row_groups(&mut self, _row_groups: Range) -> PolarsResult { + Ok(mmap::CloudMapper::PassThrough(self.0.deref())) } } @@ -377,7 +374,6 @@ pub struct BatchedParquetReader { row_group_fetcher: Box, limit: usize, projection: Vec, - predicate: Option>, schema: ArrowSchema, metadata: FileMetaData, row_count: Option, @@ -432,8 +428,8 @@ impl BatchedParquetReader { pub fn next_batches(&mut self, n: usize) -> PolarsResult>> { // fill up fifo stack if self.row_group_offset <= self.n_row_groups && self.chunks_fifo.len() < n { - let row_group_start = self.row_group_offset; - let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups); + let row_group_start = self.row_group_offset; + let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups); let store = self .row_group_fetcher .fetch_row_groups(row_group_start..row_group_end)?; @@ -473,8 +469,8 @@ impl BatchedParquetReader { self.row_group_offset += n; dfs } - _ => unimplemented!(), - }; + _ => unimplemented!(), + }; // TODO! this is slower than it needs to be // we also need to parallelize over row groups here.
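For an end-to-end picture, the example crate touched at the top of this patch reads a parquet file straight from S3 and now also aggregates two columns. A condensed sketch of that example (the bucket URL is a placeholder; the column names come from the example's diff):

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        let df = LazyFrame::scan_parquet(
            "s3://bucket/foods.parquet".into(),
            ScanArgsParquet::default(),
        )?
        .select([
            // select all columns
            all(),
            // and do some aggregations
            cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
        ])
        .collect()?;
        println!("{df}");
        Ok(())
    }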