fix: Ensure parquet schema arg is propagated to IR
nameexhaustion committed Oct 3, 2024
1 parent 35946cf commit ee07020
Showing 8 changed files with 43 additions and 17 deletions.
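In user-facing terms: an explicit `schema` passed to `pl.scan_parquet` was not carried through to the IR's file info, so downstream operations that reference columns existing only in that schema (e.g. a `select`) could fail. A minimal sketch of the scenario, with hypothetical paths (any Parquet files containing only columns "a" and "b" would do):

    import polars as pl

    # Hypothetical files containing only columns "a" and "b".
    paths = ["data/f1.parquet", "data/f2.parquet"]

    # "1" exists only in the explicit schema, not in the files;
    # the Int32 dtype is an arbitrary choice for illustration.
    schema = {"1": pl.Int32, "a": pl.Int64, "b": pl.Int64}

    lf = pl.scan_parquet(paths, schema=schema, allow_missing_columns=True).select("1")

    # Before this fix the explicit schema was dropped on the way to the IR and
    # the select could fail; with it, "1" resolves and is filled with nulls.
    print(lf.collect())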
4 changes: 3 additions & 1 deletion crates/polars-io/src/parquet/read/options.rs
@@ -1,11 +1,13 @@
 use arrow::datatypes::ArrowSchemaRef;
+use polars_core::schema::SchemaRef;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ParquetOptions {
-    pub schema: Option<ArrowSchemaRef>,
+    pub schema: Option<SchemaRef>,
+    pub arrow_schema: Option<ArrowSchemaRef>,
     pub parallel: ParallelStrategy,
     pub low_memory: bool,
     pub use_statistics: bool,
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
@@ -75,7 +75,7 @@ impl LazyFileListReader for LazyParquetReader {
             self.args.low_memory,
             self.args.cloud_options,
             self.args.use_statistics,
-            self.args.schema.as_deref(),
+            self.args.schema,
             self.args.hive_options,
             self.args.glob,
             self.args.include_file_paths,
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -64,7 +64,7 @@ impl ParquetExec {
 
         let first_schema = self
             .options
-            .schema
+            .arrow_schema
             .clone()
             .unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());
 
@@ -264,7 +264,7 @@ impl ParquetExec {
 
         let first_schema = self
             .options
-            .schema
+            .arrow_schema
             .clone()
             .unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());
 
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sources/parquet.rs
@@ -262,7 +262,7 @@ impl ParquetSource {
         let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();
 
         let first_schema = options
-            .schema
+            .arrow_schema
             .clone()
             .unwrap_or_else(|| file_info.reader_schema.clone().unwrap().unwrap_left());
 
5 changes: 3 additions & 2 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -85,7 +85,7 @@ impl DslBuilder {
         low_memory: bool,
         cloud_options: Option<CloudOptions>,
         use_statistics: bool,
-        schema: Option<&Schema>,
+        schema: Option<SchemaRef>,
         hive_options: HiveOptions,
         glob: bool,
         include_file_paths: Option<PlSmallStr>,
@@ -109,7 +109,8 @@ impl DslBuilder {
             file_options: options,
             scan_type: FileScan::Parquet {
                 options: ParquetOptions {
-                    schema: schema.map(|x| Arc::new(x.to_arrow(CompatLevel::newest()))),
+                    schema,
+                    arrow_schema: None,
                     parallel,
                     low_memory,
                     use_statistics,
31 changes: 22 additions & 9 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -145,18 +145,31 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
             let mut file_info = match &mut scan_type {
                 #[cfg(feature = "parquet")]
                 FileScan::Parquet {
+                    options,
                     cloud_options,
                     metadata,
                     ..
                 } => {
-                    let (file_info, md) = scans::parquet_file_info(
-                        &sources,
-                        &file_options,
-                        cloud_options.as_ref(),
-                    )
-                    .map_err(|e| e.context(failed_here!(parquet scan)))?;
-                    *metadata = md;
-                    file_info
+                    if let Some(schema) = &options.schema {
+                        // We were passed a schema, we don't have to call `parquet_file_info`,
+                        // but this does mean we don't have `row_estimation` and `first_metadata`.
+                        FileInfo {
+                            schema: schema.clone(),
+                            reader_schema: Some(either::Either::Left(Arc::new(
+                                schema.to_arrow(CompatLevel::newest()),
+                            ))),
+                            row_estimation: (None, 0),
+                        }
+                    } else {
+                        let (file_info, md) = scans::parquet_file_info(
+                            &sources,
+                            &file_options,
+                            cloud_options.as_ref(),
+                        )
+                        .map_err(|e| e.context(failed_here!(parquet scan)))?;
+
+                        *metadata = md;
+                        file_info
+                    }
                 },
                 #[cfg(feature = "ipc")]
                 FileScan::Ipc {
2 changes: 1 addition & 1 deletion crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -158,7 +158,7 @@ impl ComputeNode for ParquetSourceNode {
 
         self.schema = Some(
             self.options
-                .schema
+                .arrow_schema
                 .take()
                 .unwrap_or_else(|| self.file_info.reader_schema.take().unwrap().unwrap_left()),
         );
10 changes: 10 additions & 0 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -694,6 +694,16 @@ def test_parquet_schema_arg(
         pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
     )
 
+    # Issue #19081: Ensure explicit schema fields are propagated to the DSL/IR,
+    # otherwise downstream `select()`s etc. fail.
+    lf = pl.scan_parquet(
+        paths, parallel=parallel, schema=schema, allow_missing_columns=True
+    ).select("1")
+
+    s = lf.collect(streaming=streaming).to_series()
+    assert s.len() == 2
+    assert s.null_count() == 2
+
     # Test files containing extra columns not in `schema`
 
     schema: dict[str, type[pl.DataType]] = {"a": pl.Int64}  # type: ignore[no-redef]
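For reference, a self-contained sketch of the behavior covered by the new test, with made-up file names and data (the fixture parameters `parallel` and `streaming` are dropped, and the dtypes are arbitrary):

    import polars as pl

    # Two small files that contain only columns "a" and "b".
    pl.DataFrame({"a": [1], "b": [1]}).write_parquet("f1.parquet")
    pl.DataFrame({"a": [2], "b": [2]}).write_parquet("f2.parquet")

    # "1" exists only in the explicit schema, not in the files.
    schema = {"1": pl.Int32, "a": pl.Int64, "b": pl.Int64}

    s = (
        pl.scan_parquet(
            ["f1.parquet", "f2.parquet"], schema=schema, allow_missing_columns=True
        )
        .select("1")
        .collect()
        .to_series()
    )

    assert s.len() == 2         # one row per input file
    assert s.null_count() == 2  # the schema-only column is filled with nulls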
