feat: Support schema arg in read/scan_parquet()
nameexhaustion committed Sep 30, 2024
1 parent c23266b commit 2a0c345
Showing 15 changed files with 179 additions and 37 deletions.
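
The new `schema` argument lets read/scan_parquet() take an explicit schema up front instead of inferring one from the first file, and every scanned file is then checked against it. A minimal usage sketch from the Rust lazy API, assuming the `polars` facade crate with the `lazy` and `parquet` features enabled; the path and column names are made up for illustration:

use std::sync::Arc;
use polars::prelude::*;

fn scan_with_explicit_schema() -> PolarsResult<DataFrame> {
    // Columns and dtypes every file is expected to conform to.
    let schema = Schema::from_iter([
        Field::new("a".into(), DataType::Int64),
        Field::new("b".into(), DataType::String),
    ]);

    let args = ScanArgsParquet {
        // New in this commit: skip schema inference and use this schema
        // for every scanned file.
        schema: Some(Arc::new(schema)),
        ..Default::default()
    };

    LazyFrame::scan_parquet("data/*.parquet", args)?.collect()
}

The Python-side `schema` keyword is wired through `new_from_parquet` in the pyo3 bindings further down.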
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -42,4 +42,5 @@ pub mod _internal {
pub use super::mmap::to_deserializer;
pub use super::predicates::read_this_row_group;
pub use super::read_impl::{calc_prefilter_cost, PrefilterMaskSetting};
pub use super::utils::ensure_matching_dtypes_if_found;
}
4 changes: 3 additions & 1 deletion crates/polars-io/src/parquet/read/options.rs
@@ -1,9 +1,11 @@
use arrow::datatypes::ArrowSchemaRef;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, Copy, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetOptions {
pub schema: Option<ArrowSchemaRef>,
pub parallel: ParallelStrategy,
pub low_memory: bool,
pub use_statistics: bool,
6 changes: 5 additions & 1 deletion crates/polars-io/src/parquet/read/reader.rs
@@ -15,7 +15,7 @@ pub use super::read_impl::BatchedParquetReader;
use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader};
#[cfg(feature = "cloud")]
use super::utils::materialize_empty_df;
use super::utils::projected_arrow_schema_to_projection_indices;
use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_projection_indices};
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
@@ -90,6 +90,8 @@ impl<R: MmapBytesReader> ParquetReader<R> {
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
// Must check the dtypes
ensure_matching_dtypes_if_found(first_schema, self.schema()?.as_ref())?;
self.schema.replace(first_schema.clone());
}

@@ -327,6 +329,8 @@ impl ParquetAsyncReader {
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
// Must check the dtypes
ensure_matching_dtypes_if_found(first_schema, self.schema().await?.as_ref())?;
self.schema.replace(first_schema.clone());
}

34 changes: 32 additions & 2 deletions crates/polars-io/src/parquet/read/utils.rs
@@ -1,6 +1,7 @@
use std::borrow::Cow;

use polars_core::prelude::{ArrowSchema, DataFrame, DataType, Series, IDX_DTYPE};
use polars_core::schema::SchemaNamesAndDtypes;
use polars_error::{polars_bail, PolarsResult};

use crate::hive::materialize_hive_partitions;
@@ -51,11 +52,40 @@ pub(super) fn projected_arrow_schema_to_projection_indices(
let expected_dtype = DataType::from_arrow(&field.dtype, true);

if dtype.clone() != expected_dtype {
polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
&field.name, dtype, expected_dtype
polars_bail!(SchemaMismatch: "data type mismatch for column {}: expected: {}, found: {}",
&field.name, expected_dtype, dtype
)
}
}

Ok((!is_full_ordered_projection).then_some(projection_indices))
}

/// Utility to ensure the dtype of the column in `current_schema` matches the dtype in `schema` if
/// that column exists in `schema`.
pub fn ensure_matching_dtypes_if_found(
schema: &ArrowSchema,
current_schema: &ArrowSchema,
) -> PolarsResult<()> {
current_schema
.iter_names_and_dtypes()
.try_for_each(|(name, dtype)| {
if let Some(field) = schema.get(name) {
if dtype != &field.dtype {
// Check again with timezone normalization
// TODO: Add an ArrowDtype eq wrapper?
let lhs = DataType::from_arrow(dtype, true);
let rhs = DataType::from_arrow(&field.dtype, true);

if lhs != rhs {
polars_bail!(
SchemaMismatch:
"dtypes differ for column {}: {:?} != {:?}"
, name, dtype, &field.dtype
);
}
}
}
Ok(())
})
}
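
A sketch of what the new helper enforces, assuming the `Schema::from_iter`/`to_arrow` constructors used elsewhere in this commit and hypothetical column names; this snippet is not part of the commit itself:

use polars_core::prelude::*;
use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;

fn dtype_check_sketch() -> PolarsResult<()> {
    // Schema supplied by the user (or taken from the first file).
    let given = Schema::from_iter([Field::new("a".into(), DataType::Int64)])
        .to_arrow(CompatLevel::newest());

    // A later file where "a" exists but with a different dtype must fail.
    let conflicting = Schema::from_iter([Field::new("a".into(), DataType::String)])
        .to_arrow(CompatLevel::newest());
    assert!(ensure_matching_dtypes_if_found(&given, &conflicting).is_err());

    // A file that simply lacks "a" passes here; missing columns are handled
    // separately by `allow_missing_columns`.
    let missing = Schema::default().to_arrow(CompatLevel::newest());
    assert!(ensure_matching_dtypes_if_found(&given, &missing).is_ok());

    Ok(())
}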
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
@@ -15,6 +15,7 @@ pub struct ScanArgsParquet {
pub cloud_options: Option<CloudOptions>,
pub hive_options: HiveOptions,
pub use_statistics: bool,
pub schema: Option<SchemaRef>,
pub low_memory: bool,
pub rechunk: bool,
pub cache: bool,
@@ -33,6 +34,7 @@ impl Default for ScanArgsParquet {
cloud_options: None,
hive_options: Default::default(),
use_statistics: true,
schema: None,
rechunk: false,
low_memory: false,
cache: true,
@@ -73,6 +75,7 @@ impl LazyFileListReader for LazyParquetReader {
self.args.low_memory,
self.args.cloud_options,
self.args.use_statistics,
self.args.schema.as_deref(),
self.args.hive_options,
self.args.glob,
self.args.include_file_paths,
12 changes: 10 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -62,7 +62,11 @@ impl ParquetExec {
// Modified if we have a negative slice
let mut first_source = 0;

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();
let first_schema = self
.options
.schema
.clone()
.unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
@@ -258,7 +262,11 @@ impl ParquetExec {
eprintln!("POLARS PREFETCH_SIZE: {}", batch_size)
}

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();
let first_schema = self
.options
.schema
.clone()
.unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
7 changes: 5 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -81,7 +81,7 @@ impl ParquetSource {
.as_paths()
.ok_or_else(|| polars_err!(nyi = "Streaming scanning of in-memory buffers"))?;
let path = &paths[index];
let options = self.options;
let options = self.options.clone();
let file_options = self.file_options.clone();

let hive_partitions = self
@@ -261,7 +261,10 @@ impl ParquetSource {
}
let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();

let first_schema = file_info.reader_schema.clone().unwrap().unwrap_left();
let first_schema = options
.schema
.clone()
.unwrap_or_else(|| file_info.reader_schema.clone().unwrap().unwrap_left());

let projected_arrow_schema = {
if let Some(with_columns) = file_options.with_columns.as_deref() {
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -85,6 +85,7 @@ impl DslBuilder {
low_memory: bool,
cloud_options: Option<CloudOptions>,
use_statistics: bool,
schema: Option<&Schema>,
hive_options: HiveOptions,
glob: bool,
include_file_paths: Option<PlSmallStr>,
@@ -108,6 +109,7 @@ impl DslBuilder {
file_options: options,
scan_type: FileScan::Parquet {
options: ParquetOptions {
schema: schema.map(|x| Arc::new(x.to_arrow(CompatLevel::newest()))),
parallel,
low_memory,
use_statistics,
4 changes: 3 additions & 1 deletion crates/polars-python/src/lazyframe/general.rs
@@ -240,7 +240,7 @@ impl PyLazyFrame {
#[cfg(feature = "parquet")]
#[staticmethod]
#[pyo3(signature = (source, sources, n_rows, cache, parallel, rechunk, row_index,
low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
low_memory, cloud_options, use_statistics, hive_partitioning, schema, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
)]
fn new_from_parquet(
source: Option<PyObject>,
@@ -254,6 +254,7 @@ fn new_from_parquet(
cloud_options: Option<Vec<(String, String)>>,
use_statistics: bool,
hive_partitioning: Option<bool>,
schema: Option<Wrap<Schema>>,
hive_schema: Option<Wrap<Schema>>,
try_parse_hive_dates: bool,
retries: usize,
@@ -285,6 +286,7 @@
low_memory,
cloud_options: None,
use_statistics,
schema: schema.map(|x| Arc::new(x.0)),
hive_options,
glob,
include_file_paths: include_file_paths.map(|x| x.into()),
18 changes: 2 additions & 16 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -85,14 +85,7 @@ impl ParquetSourceNode {
);
}

let reader_schema = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.clone();
let reader_schema = self.schema.clone().unwrap();

let (normalized_slice_oneshot_rx, metadata_rx, metadata_task_handle) =
self.init_metadata_fetcher();
@@ -361,14 +354,7 @@ impl ParquetSourceNode {
}

pub(super) fn init_projected_arrow_schema(&mut self) {
let reader_schema = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.clone();
let reader_schema = self.schema.clone().unwrap();

self.projected_arrow_schema = Some(
if let Some(columns) = self.file_options.with_columns.as_deref() {
17 changes: 7 additions & 10 deletions crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use futures::StreamExt;
use polars_error::{polars_bail, PolarsResult};
use polars_io::prelude::FileMetadata;
use polars_io::prelude::_internal::ensure_matching_dtypes_if_found;
use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_utils::mmap::MemSlice;
@@ -106,21 +107,15 @@ impl ParquetSourceNode {
};

let first_metadata = self.first_metadata.clone();
let reader_schema_len = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.len();
let first_schema = self.schema.clone().unwrap();
let has_projection = self.file_options.with_columns.is_some();
let allow_missing_columns = self.file_options.allow_missing_columns;

let process_metadata_bytes = {
move |handle: task_handles_ext::AbortOnDropHandle<
PolarsResult<(usize, Arc<DynByteSource>, MemSlice)>,
>| {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let first_metadata = first_metadata.clone();
// Run on CPU runtime - metadata deserialization is expensive, especially
@@ -138,14 +133,16 @@

let schema = polars_parquet::arrow::read::infer_schema(&metadata)?;

if !has_projection && schema.len() > reader_schema_len {
if !has_projection && schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
}

if !allow_missing_columns {
if allow_missing_columns {
ensure_matching_dtypes_if_found(&first_schema, &schema)?;
} else {
ensure_schema_has_projected_fields(
&schema,
projected_arrow_schema.as_ref(),
@@ -136,8 +136,8 @@ pub(super) fn ensure_schema_has_projected_fields(
};

if dtype != expected_dtype {
polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
&field.name, dtype, expected_dtype
polars_bail!(SchemaMismatch: "data type mismatch for column {}: expected: {}, found: {}",
&field.name, expected_dtype, dtype
)
}
}
15 changes: 15 additions & 0 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -47,6 +47,7 @@ pub struct ParquetSourceNode {
config: Config,
verbose: bool,
physical_predicate: Option<Arc<dyn PhysicalIoExpr>>,
schema: Option<Arc<ArrowSchema>>,
projected_arrow_schema: Option<Arc<ArrowSchema>>,
byte_source_builder: DynByteSourceBuilder,
memory_prefetch_func: fn(&[u8]) -> (),
@@ -112,6 +113,7 @@ impl ParquetSourceNode {
},
verbose,
physical_predicate: None,
schema: None,
projected_arrow_schema: None,
byte_source_builder,
memory_prefetch_func,
@@ -154,6 +156,19 @@ impl ComputeNode for ParquetSourceNode {
eprintln!("[ParquetSource]: {:?}", &self.config);
}

self.schema = Some(
self.options
.schema
.take()
.unwrap_or_else(|| self.file_info.reader_schema.take().unwrap().unwrap_left()),
);

{
// Ensure these are not used anymore
self.options.schema.take();
self.file_info.reader_schema.take();
}

self.init_projected_arrow_schema();
self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr);
