feat(rust,python): Enable the object_store integration in python
winding-lines committed Mar 5, 2023
1 parent d5ca27e commit 67b4e96
Showing 19 changed files with 760 additions and 80 deletions.
2 changes: 2 additions & 0 deletions examples/read_parquet_cloud/src/main.rs
@@ -22,6 +22,8 @@ fn main() -> PolarsResult<()> {
         .select([
             // select all columns
             all(),
+            // and do some aggregations
+            cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
         ])
         .collect()?;

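For orientation, here is a sketch of the updated example in full. The bucket URL is hypothetical and the exact `scan_parquet` signature in this polars version is an assumption, so treat it as an illustration rather than the shipped example:

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Hypothetical cloud URL; credentials are read from the environment.
    let url = "s3://bucket/foods/*.parquet";
    let df = LazyFrame::scan_parquet(url.into(), ScanArgsParquet::default())?
        .select([
            // select all columns
            all(),
            // and do some aggregations
            cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
        ])
        .collect()?;
    println!("{df}");
    Ok(())
}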
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -162,7 +162,7 @@ indexmap = { version = "1", features = ["std"] }
 itoap = { version = "1", optional = true }
 ndarray = { version = "0.15", optional = true, default_features = false }
 num-traits.workspace = true
-object_store = { version = "0.5.3", default-features = false, optional = true }
+object_store = { version = "0.5.5", default-features = false, optional = true }
 once_cell.workspace = true
 polars-arrow = { version = "0.27.2", path = "../polars-arrow", features = ["compute"] }
 polars-error = { version = "0.27.2", path = "../polars-error" }
4 changes: 2 additions & 2 deletions polars/polars-io/Cargo.toml
@@ -64,7 +64,7 @@ lexical-core = { version = "0.8", optional = true }
 memchr.workspace = true
 memmap = { package = "memmap2", version = "0.5.2", optional = true }
 num-traits.workspace = true
-object_store = { version = "0.5.3", default-features = false, optional = true }
+object_store = { version = "0.5.5", default-features = false, optional = true }
 once_cell = "1"
 polars-arrow = { version = "0.27.2", path = "../polars-arrow" }
 polars-core = { version = "0.27.2", path = "../polars-core", features = ["private"], default-features = false }
@@ -76,7 +76,7 @@ serde = { version = "1", features = ["derive"], optional = true }
 serde_json = { version = "1", optional = true, default-features = false, features = ["alloc", "raw_value"] }
 simd-json = { version = "0.7.0", optional = true, features = ["allow-non-simd", "known-key"] }
 simdutf8 = "0.1"
-tokio = { version = "1.22.0", features = ["net"], optional = true }
+tokio = { version = "1.24.0", features = ["net", "rt-multi-thread"], optional = true }
 url = { version = "2.3.1", optional = true }
 
 [dev-dependencies]
11 changes: 10 additions & 1 deletion polars/polars-io/src/cloud/glob.rs
@@ -149,7 +149,6 @@ impl Matcher {
     }
 }
 
-#[tokio::main(flavor = "current_thread")]
 /// List files with a prefix derived from the pattern.
 pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
     // Find the fixed prefix, up to the first '*'.
@@ -164,6 +163,9 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
         store,
     ) = super::build(url, cloud_options)?;
     let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?;
+    if expansion.is_none() {
+        return Ok(vec![url.into()]);
+    }
 
     let list_stream = store
         .list(Some(&Path::from(prefix)))
@@ -260,6 +262,13 @@ mod test {
         assert!(!a.is_matching(&Path::from("folder/1parquet")));
         // Intermediary folders are not allowed.
         assert!(!a.is_matching(&Path::from("folder/other/1.parquet")));
+
+        // Match full name.
+        let cloud_location = CloudLocation::new("s3://bucket/folder/some.parquet").unwrap();
+        let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
+        // Regular match.
+        assert!(a.is_matching(&Path::from("folder/some.parquet")));
+        assert!(!a.is_matching(&Path::from("folder/other.parquet")));
     }
 
     #[test]
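With the `#[tokio::main]` attribute removed, `glob` is now a plain `async fn` and the caller owns the runtime. A minimal sketch of driving it from synchronous code; the module path and bucket name are assumptions, not part of this diff:

use polars_core::prelude::PolarsResult;
use polars_io::cloud::glob::glob;

fn list_files() -> PolarsResult<Vec<String>> {
    // Build a small runtime locally instead of relying on the removed attribute.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build tokio runtime");
    rt.block_on(glob("s3://bucket/folder/*.parquet", None))
}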
7 changes: 4 additions & 3 deletions polars/polars-io/src/lib.rs
@@ -41,12 +41,14 @@ pub mod partition;
 
 use std::io::{Read, Seek, Write};
 use std::path::{Path, PathBuf};
+use std::str::FromStr;
 
 #[allow(unused)] // remove when updating to rust nightly >= 1.61
 use arrow::array::new_empty_array;
 use arrow::error::Result as ArrowResult;
 pub use options::*;
+use polars_core::cloud::CloudType;
 use polars_core::config::verbose;
 use polars_core::frame::ArrowChunk;
 use polars_core::prelude::*;
 
@@ -173,7 +175,6 @@ pub(crate) fn finish_reader<R: ArrowReader>(
 
 /// Check if the path is a cloud url.
 pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
-    p.as_ref().starts_with("s3://")
-        || p.as_ref().starts_with("file://")
-        || p.as_ref().starts_with("gcs://")
+    let path = p.as_ref();
+    return CloudType::from_str(&path.to_string_lossy()).is_ok();
 }
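The rewrite delegates scheme detection to `CloudType::from_str`, so a new scheme only has to be added in one place. A sketch of the expected behavior, assuming `CloudType` still parses at least the three schemes that were previously hard-coded:

// After the change above:
assert!(is_cloud_url("s3://bucket/key.parquet"));
assert!(is_cloud_url("gcs://bucket/key.parquet"));
assert!(is_cloud_url("file:///tmp/key.parquet"));
// A bare local path has no recognized scheme.
assert!(!is_cloud_url("/tmp/key.parquet"));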
31 changes: 20 additions & 11 deletions polars/polars-io/src/parquet/async_impl.rs
@@ -8,7 +8,7 @@ use arrow::io::parquet::read::{
 use arrow::io::parquet::write::FileMetaData;
 use futures::future::BoxFuture;
 use futures::lock::Mutex;
-use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use object_store::path::Path as ObjectPath;
 use object_store::ObjectStore;
 use polars_core::cloud::CloudOptions;
@@ -20,7 +20,7 @@ use polars_core::schema::Schema;
 
 use super::cloud::{build, CloudLocation, CloudReader};
 use super::mmap;
-use super::mmap::ColumnStore;
+use super::mmap::CloudMapper;
 use super::read_impl::FetchRowGroups;
 
 pub struct ParquetObjectStore {
@@ -104,7 +104,6 @@ type RowGroupChunks<'a> = Vec<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>>;
 
 /// Download rowgroups for the column whose indexes are given in `projection`.
 /// We concurrently download the columns for each field.
-#[tokio::main(flavor = "current_thread")]
 async fn download_projection<'a: 'b, 'b>(
     projection: &[usize],
     row_groups: &'a [RowGroupMetaData],
@@ -149,7 +148,7 @@ pub(crate) struct FetchRowGroupsFromObjectStore {
     row_groups_metadata: Vec<RowGroupMetaData>,
     projection: Vec<usize>,
     logging: bool,
-    schema: ArrowSchema,
+    pub schema: ArrowSchema,
 }
 
 impl FetchRowGroupsFromObjectStore {
@@ -173,11 +172,12 @@ impl FetchRowGroupsFromObjectStore {
             schema,
         })
     }
-}
 
-impl FetchRowGroups for FetchRowGroupsFromObjectStore {
-    fn fetch_row_groups(&mut self, row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
-        // Fetch the required row groups.
+    /// Fetch the required row groups asynchronously.
+    pub async fn fetch_row_groups_async(
+        &mut self,
+        row_groups: Range<usize>,
+    ) -> PolarsResult<CloudMapper> {
         let row_groups = &self
             .row_groups_metadata
             .get(row_groups.clone())
@@ -194,9 +194,9 @@ impl FetchRowGroups for FetchRowGroupsFromObjectStore {
                 Ok,
             )?;
 
-        // Package in the format required by ColumnStore.
+        // Package in the format required by CloudMapper.
         let downloaded =
-            download_projection(&self.projection, row_groups, &self.schema, &self.reader)?;
+            download_projection(&self.projection, row_groups, &self.schema, &self.reader).await?;
         if self.logging {
             eprintln!(
                 "BatchedParquetReader: fetched {} row_groups for {} fields, yielding {} column chunks.",
@@ -224,6 +224,15 @@
             );
         }
 
-        Ok(mmap::ColumnStore::Fetched(downloaded_per_filepos))
+        Ok(mmap::CloudMapper::Fetched(downloaded_per_filepos))
     }
 }
+
+impl FetchRowGroups for FetchRowGroupsFromObjectStore {
+    fn fetch_row_groups<'a>(
+        &'a mut self,
+        row_groups: Range<usize>,
+    ) -> BoxFuture<'a, PolarsResult<CloudMapper>> {
+        self.fetch_row_groups_async(row_groups).boxed()
+    }
+}
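The split above is a common object-safety pattern: the inherent `async fn` carries the logic, and the trait impl returns a `BoxFuture` via `FutureExt::boxed`. A self-contained sketch of the same shape with toy types (not the Polars API):

use futures::future::BoxFuture;
use futures::FutureExt;

trait Fetcher {
    // Returning a boxed future keeps the trait usable as a trait object.
    fn fetch<'a>(&'a mut self, n: usize) -> BoxFuture<'a, usize>;
}

struct Doubler;

impl Doubler {
    async fn fetch_async(&mut self, n: usize) -> usize {
        n * 2
    }
}

impl Fetcher for Doubler {
    fn fetch<'a>(&'a mut self, n: usize) -> BoxFuture<'a, usize> {
        self.fetch_async(n).boxed()
    }
}

fn main() {
    let mut d = Doubler;
    assert_eq!(futures::executor::block_on(d.fetch(21)), 42);
}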
124 changes: 124 additions & 0 deletions polars/polars-io/src/parquet/async_manager.rs
@@ -0,0 +1,124 @@
+//! Polars is a heavily multi-threaded library. Some IO operations, especially cloud-based ones,
+//! are best served by an async module. The AsyncManager owns a multi-threaded Tokio runtime
+//! and is responsible for managing the async calls to the object store and the associated state.
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::io::parquet::read::RowGroupMetaData;
+use arrow::io::parquet::write::FileMetaData;
+use futures::channel::mpsc::Sender;
+use futures::channel::oneshot;
+use futures::StreamExt;
+use once_cell::sync::Lazy;
+use polars_core::prelude::*;
+use tokio::runtime::Runtime;
+
+use super::async_impl::ParquetObjectStore;
+use super::mmap::CloudMapper;
+
+static GLOBAL_ASYNC_MANAGER: Lazy<AsyncManager> = Lazy::new(AsyncManager::default);
+
+enum AsyncParquetReaderMessage {
+    /// Fetch the metadata of the parquet file, do not memoize it.
+    FetchMetadata {
+        /// The channel to send the result to.
+        tx: oneshot::Sender<PolarsResult<FileMetaData>>,
+    },
+    /// Fetch and memoize the metadata of the parquet file.
+    GetMetadata {
+        /// The channel to send the result to.
+        tx: oneshot::Sender<PolarsResult<FileMetaData>>,
+    },
+    /// Fetch the number of rows of the parquet file.
+    NumRows {
+        /// The channel to send the result to.
+        tx: oneshot::Sender<PolarsResult<usize>>,
+    },
+    /// Fetch the schema of the parquet file.
+    Schema {
+        /// The channel to send the result to.
+        tx: oneshot::Sender<PolarsResult<ArrowSchema>>,
+    },
+    /// Fetch the row group metadata of the parquet file.
+    RowGroups {
+        /// The channel to send the result to.
+        tx: oneshot::Sender<PolarsResult<Vec<RowGroupMetaData>>>,
+    },
+    /// Fetch the given row groups of the parquet file.
+    FetchRowGroups {
+        /// The row groups to fetch.
+        row_groups: Range<usize>,
+        /// The channel to send the result to; the fetched data is owned, hence 'static.
+        tx: oneshot::Sender<PolarsResult<CloudMapper<'static>>>,
+    },
+}
+
+/// Separate the async calls into their own manager and interact with the rest of the code through a channel.
+pub(crate) struct AsyncManager {
+    /// The channel to communicate with the manager.
+    tx: Sender<AsyncParquetReaderMessage>,
+    /// The Tokio runtime running the manager; kept alive for the lifetime of the manager.
+    runtime: Runtime,
+    /// Opened readers.
+    readers: PlHashMap<String, Arc<ParquetObjectStore>>,
+}
+
+impl AsyncManager {
+    /// Create a new async manager.
+    pub fn new() -> AsyncManager {
+        let (tx, mut rx) = futures::channel::mpsc::channel(1);
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        runtime.spawn(async move {
+            // The reader is expected to be installed before any of the queries below arrive.
+            let mut reader: Option<ParquetObjectStore> = None;
+            while let Some(message) = rx.next().await {
+                match message {
+                    AsyncParquetReaderMessage::FetchMetadata { tx } => {
+                        let reader = reader.as_mut().unwrap();
+                        let result = reader.fetch_metadata().await;
+                        let _ = tx.send(result);
+                    }
+                    AsyncParquetReaderMessage::GetMetadata { tx } => {
+                        let reader = reader.as_mut().unwrap();
+                        let result = reader.get_metadata().await;
+                        let _ = tx.send(result);
+                    }
+                    AsyncParquetReaderMessage::NumRows { tx } => {
+                        let reader = reader.as_mut().unwrap();
+                        let result = reader.num_rows().await;
+                        let _ = tx.send(result);
+                    }
+                    AsyncParquetReaderMessage::Schema { tx } => {
+                        let reader = reader.as_mut().unwrap();
+                        let result = reader.schema().await;
+                        let _ = tx.send(result);
+                    }
+                    AsyncParquetReaderMessage::RowGroups { tx } => {
+                        let reader = reader.as_mut().unwrap();
+                        let result = reader.row_groups().await;
+                        let _ = tx.send(result);
+                    }
+                    AsyncParquetReaderMessage::FetchRowGroups { row_groups, tx } => {
+                        let reader = reader.as_mut().unwrap();
+                        let result = reader.fetch_row_groups(row_groups).await;
+                        let _ = tx.send(result);
+                    }
+                }
+            }
+        });
+        AsyncManager {
+            tx,
+            runtime,
+            readers: PlHashMap::default(),
+        }
+    }
+}
+
+impl Default for AsyncManager {
+    fn default() -> Self {
+        AsyncManager::new()
+    }
+}
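Every request to the manager is a small actor round trip: the message carries a `oneshot::Sender` and the caller awaits the reply. A self-contained sketch of that mechanism with a toy message type, assuming tokio with the `macros` and `rt-multi-thread` features:

use futures::channel::{mpsc, oneshot};
use futures::StreamExt;

enum Msg {
    // Toy request; the real messages wrap parquet metadata queries.
    Add { a: i32, b: i32, tx: oneshot::Sender<i32> },
}

#[tokio::main]
async fn main() {
    let (mut req_tx, mut req_rx) = mpsc::channel::<Msg>(1);

    // The actor loop owns the state and answers each request on its oneshot channel.
    tokio::spawn(async move {
        while let Some(Msg::Add { a, b, tx }) = req_rx.next().await {
            let _ = tx.send(a + b);
        }
    });

    let (tx, rx) = oneshot::channel();
    req_tx.try_send(Msg::Add { a: 2, b: 3, tx }).unwrap();
    assert_eq!(rx.await.unwrap(), 5);
}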
18 changes: 9 additions & 9 deletions polars/polars-io/src/parquet/mmap.rs
@@ -8,8 +8,8 @@ use polars_core::datatypes::PlHashMap;
 
 use super::*;
 
-/// Store columns data in two scenarios:
-/// 1. a local memory mapped file
+/// Map column data in two scenarios:
+/// 1. a local memory-mapped file; there is nothing to do in this case.
 /// 2. data fetched from cloud storage on demand, in this case
 ///    a. the key in the hashmap is the start in the file
 ///    b. the value in the hashmap is the actual data.
@@ -19,16 +19,16 @@ use super::*;
 ///    b. asynchronously fetch them in parallel, for example using object_store
 ///    c. store the data in this data structure
 ///    d. when all the data is available deserialize on multiple threads, for example using rayon
-pub enum ColumnStore<'a> {
-    Local(&'a [u8]),
+pub enum CloudMapper<'a> {
+    PassThrough(&'a [u8]),
     #[cfg(feature = "async")]
     Fetched(PlHashMap<u64, Vec<u8>>),
 }
 
 /// For local files memory maps all columns that are part of the parquet field `field_name`.
 /// For cloud files the relevant memory regions should have been prefetched.
 pub(super) fn mmap_columns<'a>(
-    store: &'a ColumnStore,
+    store: &'a CloudMapper,
     columns: &'a [ColumnChunkMetaData],
     field_name: &str,
 ) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
@@ -39,17 +39,17 @@ pub(super) fn mmap_columns<'a>(
 }
 
 fn _mmap_single_column<'a>(
-    store: &'a ColumnStore,
+    store: &'a CloudMapper,
     meta: &'a ColumnChunkMetaData,
 ) -> (&'a ColumnChunkMetaData, &'a [u8]) {
     let (start, len) = meta.byte_range();
     let chunk = match store {
-        ColumnStore::Local(file) => &file[start as usize..(start + len) as usize],
+        CloudMapper::PassThrough(file) => &file[start as usize..(start + len) as usize],
         #[cfg(all(feature = "async", feature = "parquet"))]
-        ColumnStore::Fetched(fetched) => {
+        CloudMapper::Fetched(fetched) => {
             let entry = fetched.get(&start).unwrap_or_else(|| {
                 panic!(
-                    "mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
+                    "mmap_columns: column with start {start} must be prefetched in CloudMapper.\n"
                 )
             });
             entry.as_slice()
3 changes: 3 additions & 0 deletions polars/polars-io/src/parquet/mod.rs
@@ -16,6 +16,9 @@
 //!
 #[cfg(feature = "async")]
 pub(super) mod async_impl;
+#[cfg(feature = "async")]
+pub(super) mod async_manager;
+
 pub(super) mod mmap;
 pub mod predicates;
 mod read;