Skip to content

Commit

Permalink
Disable missing-column row group skipping (#435)
Browse files Browse the repository at this point in the history
## What changes are proposed in this pull request?

The optimization to treat a physically missing column as all-null is
unsound, if the schema was not already verified to prove that the
table's logical schema actually includes the missing column. Disable it
until we can add the necessary validation.

## How was this change tested?

New unit tests.

Fixes #434
  • Loading branch information
scovich authored and zachschuermann committed Oct 28, 2024
1 parent b09078d commit a883c30
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 7 deletions.
8 changes: 7 additions & 1 deletion kernel/src/engine/parquet_row_group_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,14 @@ impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> {
fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
// NOTE: Stats for any given column are optional, which may produce a NULL nullcount. But if
// the column itself is missing, then we know all values are implied to be NULL.
//
let Some(stats) = self.get_stats(col) else {
return Some(self.get_rowcount_stat_value());
// WARNING: This optimization is only sound if the caller has verified that the column
// actually exists in the table's logical schema, and that any necessary logical to
// physical name mapping has been performed. Because we currently lack both the
// validation and the name mapping support, we must disable this optimization for the
// time being. See https://github.com/delta-incubator/delta-kernel-rs/issues/434.
return Some(self.get_rowcount_stat_value()).filter(|_| false);
};

// WARNING: [`Statistics::null_count_opt`] returns Some(0) when the underlying stat is
Expand Down
37 changes: 37 additions & 0 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,43 @@ mod tests {
assert_eq!(data.len(), 0);
}

#[test]
fn test_missing_column_row_group_skipping() {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
let url = url::Url::from_directory_path(path.unwrap()).unwrap();
let engine = SyncEngine::new();

let table = Table::new(url);
let snapshot = Arc::new(table.snapshot(&engine, None).unwrap());

// Predicate over a logically valid but physically missing column. No data files should be
// returned because the column is inferred to be all-null.
//
// WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 - This
// optimization is currently disabled, so the one data file is still returned.
let predicate = Arc::new(column_expr!("missing").lt(1000i64));
let scan = snapshot
.clone()
.scan_builder()
.with_predicate(predicate)
.build()
.unwrap();
let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
assert_eq!(data.len(), 1);

// Predicate over a logically missing column, so the one data file should be returned.
//
// TODO: This should ideally trigger an error instead?
let predicate = Arc::new(column_expr!("numeric.ints.invalid").lt(1000));
let scan = snapshot
.scan_builder()
.with_predicate(predicate)
.build()
.unwrap();
let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
assert_eq!(data.len(), 1);
}

#[test_log::test]
fn test_scan_with_checkpoint() -> DeltaResult<()> {
let path = std::fs::canonicalize(PathBuf::from(
Expand Down
19 changes: 14 additions & 5 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,13 +680,22 @@ mod tests {
.try_collect()
.unwrap();

// The checkpoint has five parts, each containing one action. The P&M come from first and
// third parts, respectively. The parquet reader will skip the other three parts. Note that
// the actual `read_metadata` would anyway skip the last two parts because it terminates the
// iteration immediately after finding both P&M.
// The checkpoint has five parts, each containing one action:
// 1. txn (physically missing P&M columns)
// 2. metaData
// 3. protocol
// 4. add
// 5. txn (physically missing P&M columns)
//
// The parquet reader should skip parts 1, 3, and 5. Note that the actual `read_metadata`
// always skips parts 4 and 5 because it terminates the iteration after finding both P&M.
//
// NOTE: Each checkpoint part is a single-row file -- guaranteed to produce one row group.
assert_eq!(data.len(), 2);
//
// WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 -- We currently
// read parts 1 and 5 (4 in all instead of 2) because row group skipping is disabled for
// missing columns, but can still skip part 3 because has valid nullcount stats for P&M.
assert_eq!(data.len(), 4);
}

#[test_log::test]
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version":1,"size":5,"sizeInBytes":51112,"parts":5,"numOfAddFiles":1,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}},{"name":"clusteringProvider","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}},{"name":"readerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"writerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"domainMetadata","type":{"type":"struct","fields":[{"name":"domain","type":"string","nullable":true,"metadata":{}},{"name":"configuration","type":"string","nullable":true,"metadata":{}},{"name":"removed","type":"boolean","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}}
{"version":1,"size":5,"sizeInBytes":51470,"parts":5,"numOfAddFiles":1,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}},{"name":"clusteringProvider","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}},{"name":"maxRowIndex","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"baseRowId","type":"long","nullable":true,"metadata":{}},{"name":"defaultRowCommitVersion","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}},{"name":"readerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"writerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"domainMetadata","type":{"type":"struct","fields":[{"name":"domain","type":"string","nullable":true,"metadata":{}},{"name":"configuration","type":"string","nullable":true,"metadata":{}},{"name":"removed","type":"boolean","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}}
57 changes: 57 additions & 0 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,3 +1073,60 @@ fn type_widening_decimal() -> Result<(), Box<dyn std::error::Error>> {
]);
read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}

// Verify that predicates over invalid/missing columns do not cause skipping.
#[test]
fn predicate_references_invalid_missing_column() -> Result<(), Box<dyn std::error::Error>> {
// Attempted skipping over a logically valid but physically missing column. We should be able to
// skip the data file because the missing column is inferred to be all-null.
//
// WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 -- currently disabled.
//
//let expected = vec![
// "+--------+",
// "| chrono |",
// "+--------+",
// "+--------+",
//];
let columns = &["chrono"];
let expected = vec![
"+-------------------------------------------------------------------------------------------+",
"| chrono |",
"+-------------------------------------------------------------------------------------------+",
"| {date32: 1971-01-01, timestamp: 1970-02-01T08:00:00Z, timestamp_ntz: 1970-01-02T00:00:00} |",
"| {date32: 1971-01-02, timestamp: 1970-02-01T09:00:00Z, timestamp_ntz: 1970-01-02T00:01:00} |",
"| {date32: 1971-01-03, timestamp: 1970-02-01T10:00:00Z, timestamp_ntz: 1970-01-02T00:02:00} |",
"| {date32: 1971-01-04, timestamp: 1970-02-01T11:00:00Z, timestamp_ntz: 1970-01-02T00:03:00} |",
"| {date32: 1971-01-05, timestamp: 1970-02-01T12:00:00Z, timestamp_ntz: 1970-01-02T00:04:00} |",
"+-------------------------------------------------------------------------------------------+",
];
let predicate = column_expr!("missing").lt(10i64);
read_table_data_str(
"./tests/data/parquet_row_group_skipping/",
Some(columns),
Some(predicate),
expected,
)?;

// Attempted skipping over an invalid (logically missing) column. Ideally this should throw a
// query error, but at a minimum it should not cause incorrect data skipping.
let expected = vec![
"+-------------------------------------------------------------------------------------------+",
"| chrono |",
"+-------------------------------------------------------------------------------------------+",
"| {date32: 1971-01-01, timestamp: 1970-02-01T08:00:00Z, timestamp_ntz: 1970-01-02T00:00:00} |",
"| {date32: 1971-01-02, timestamp: 1970-02-01T09:00:00Z, timestamp_ntz: 1970-01-02T00:01:00} |",
"| {date32: 1971-01-03, timestamp: 1970-02-01T10:00:00Z, timestamp_ntz: 1970-01-02T00:02:00} |",
"| {date32: 1971-01-04, timestamp: 1970-02-01T11:00:00Z, timestamp_ntz: 1970-01-02T00:03:00} |",
"| {date32: 1971-01-05, timestamp: 1970-02-01T12:00:00Z, timestamp_ntz: 1970-01-02T00:04:00} |",
"+-------------------------------------------------------------------------------------------+",
];
let predicate = column_expr!("invalid").lt(10);
read_table_data_str(
"./tests/data/parquet_row_group_skipping/",
Some(columns),
Some(predicate),
expected,
)?;
Ok(())
}

0 comments on commit a883c30

Please sign in to comment.