Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/ict-table-property' into ict-tab…
Browse files Browse the repository at this point in the history
…le-property
  • Loading branch information
zachschuermann committed Feb 6, 2025
2 parents 19d4d89 + 6e63f34 commit bf7759d
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 128 deletions.
22 changes: 10 additions & 12 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

Expand Down Expand Up @@ -81,7 +80,7 @@ fn main() -> ExitCode {
struct ScanFile {
path: String,
size: i64,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
dv_info: DvInfo,
}

Expand Down Expand Up @@ -111,13 +110,13 @@ fn send_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
_: HashMap<String, String>,
) {
let scan_file = ScanFile {
path: path.to_string(),
size,
partition_values,
transform,
dv_info,
};
scan_tx.send(scan_file).unwrap();
Expand Down Expand Up @@ -258,7 +257,6 @@ fn do_work(
) {
// get the type for the function calls
let engine: &dyn Engine = engine.as_ref();
let physical_schema = scan_state.physical_schema.clone();
// in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
// hangs up, which indicates there's no more data to process.
while let Ok(scan_file) = scan_file_rx.recv() {
Expand Down Expand Up @@ -289,19 +287,19 @@ fn do_work(
// vector
let read_results = engine
.get_parquet_handler()
.read_parquet_files(&[meta], physical_schema.clone(), None)
.read_parquet_files(&[meta], scan_state.physical_schema.clone(), None)
.unwrap();

for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();

// ask the kernel to transform the physical data into the correct logical form
// transform the physical data into the correct logical form
let logical = transform_to_logical(
engine,
read_result,
&scan_state,
&scan_file.partition_values,
&scan_state.physical_schema,
&scan_state.logical_schema,
&scan_file.transform,
)
.unwrap();

Expand Down
58 changes: 51 additions & 7 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SIDECAR_NAME: &str = "sidecar";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
Expand All @@ -58,6 +60,7 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Sidecar>::get_struct_field(SIDECAR_NAME),
// We don't support the following actions yet
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
Expand Down Expand Up @@ -326,9 +329,8 @@ where

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
pub(crate) struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
pub(crate) timestamp: Option<i64>,
Expand Down Expand Up @@ -417,9 +419,8 @@ impl Add {

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
pub(crate) struct Remove {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
Expand Down Expand Up @@ -468,9 +469,8 @@ struct Remove {

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
pub(crate) struct Cdc {
/// A relative path to a change data file from the root of the table or an absolute path to a
/// change data file that should be added to the table. The path is a URI as specified by
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the file path.
Expand Down Expand Up @@ -511,6 +511,33 @@ pub struct SetTransaction {
pub last_updated: Option<i64>,
}

/// The sidecar action references a sidecar file which provides some of the checkpoint's
/// file actions. This action is only allowed in checkpoints following the V2 spec.
///
/// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Schema, Debug, PartialEq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct Sidecar {
/// A path to a sidecar file that can be either:
/// - A relative path (just the file name) within the `_delta_log/_sidecars` directory.
/// - An absolute path
/// The path is a URI as specified by [RFC 2396 URI Generic Syntax], which needs to be decoded
/// to get the file path.
///
/// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
pub path: String,

/// The size of the sidecar file in bytes.
pub size_in_bytes: i64,

/// The time this logical file was created, as milliseconds since the epoch.
pub modification_time: i64,

/// A map containing any additional metadata about the logicial file.
pub tags: Option<HashMap<String, String>>,
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -637,7 +664,7 @@ mod tests {
fn test_cdc_schema() {
let schema = get_log_schema()
.project(&[CDC_NAME])
.expect("Couldn't get remove field");
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new([StructField::nullable(
"cdc",
StructType::new([
Expand All @@ -654,6 +681,23 @@ mod tests {
assert_eq!(schema, expected);
}

#[test]
fn test_sidecar_schema() {
let schema = get_log_schema()
.project(&[SIDECAR_NAME])
.expect("Couldn't get sidecar field");
let expected = Arc::new(StructType::new([StructField::nullable(
"sidecar",
StructType::new([
StructField::not_null("path", DataType::STRING),
StructField::not_null("sizeInBytes", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
tags_field(),
]),
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_transaction_schema() {
let schema = get_log_schema()
Expand Down
92 changes: 78 additions & 14 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use crate::{DeltaResult, Error};
use super::deletion_vector::DeletionVectorDescriptor;
use super::schemas::ToSchema as _;
use super::{
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME,
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};

#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct MetadataVisitor {
pub(crate) struct MetadataVisitor {
pub(crate) metadata: Option<Metadata>,
}

Expand Down Expand Up @@ -114,8 +113,7 @@ impl RowVisitor for SelectionVectorVisitor {

#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ProtocolVisitor {
pub(crate) struct ProtocolVisitor {
pub(crate) protocol: Option<Protocol>,
}

Expand Down Expand Up @@ -318,15 +316,13 @@ impl RowVisitor for RemoveVisitor {
#[allow(unused)]
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct CdcVisitor {
pub(crate) struct CdcVisitor {
pub(crate) cdcs: Vec<Cdc>,
}

impl CdcVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_cdc<'a>(
pub(crate) fn visit_cdc<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
Expand Down Expand Up @@ -377,7 +373,6 @@ pub type SetTransactionMap = HashMap<String, SetTransaction>;
///
#[derive(Default, Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
pub(crate) struct SetTransactionVisitor {
pub(crate) set_transactions: SetTransactionMap,
pub(crate) application_id: Option<String>,
Expand All @@ -393,8 +388,7 @@ impl SetTransactionVisitor {
}

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_txn<'a>(
pub(crate) fn visit_txn<'a>(
row_index: usize,
app_id: String,
getters: &[&'a dyn GetData<'a>],
Expand Down Expand Up @@ -444,6 +438,52 @@ impl RowVisitor for SetTransactionVisitor {
}
}

#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct SidecarVisitor {
pub(crate) sidecars: Vec<Sidecar>,
}

impl SidecarVisitor {
fn visit_sidecar<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Sidecar> {
Ok(Sidecar {
path,
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?,
modification_time: getters[2].get(row_index, "sidecar.modificationTime")?,
tags: getters[3].get_opt(row_index, "sidecar.tags")?,
})
}
}

impl RowVisitor for SidecarVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of SidecarVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of a sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
}
Ok(())
}
}

/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such
/// that the first element contains the `storageType` element of the deletion vector.
pub(crate) fn visit_deletion_vector_at<'a>(
Expand Down Expand Up @@ -501,7 +541,8 @@ mod tests {
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableChangeDataFeed":"true"},"createdTime":1677811175819}}"#,
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#,
r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{"tag_foo":"tag_bar"}}}"#,
]
.into();
let output_schema = get_log_schema().clone();
Expand Down Expand Up @@ -544,6 +585,29 @@ mod tests {
Ok(())
}

#[test]
fn test_parse_sidecar() -> DeltaResult<()> {
let data = action_batch();

let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(data.as_ref())?;

let sidecar1 = Sidecar {
path: "016ae953-37a9-438e-8683-9a9a4a79a395.parquet".into(),
size_in_bytes: 9268,
modification_time: 1714496113961,
tags: Some(HashMap::from([(
"tag_foo".to_string(),
"tag_bar".to_string(),
)])),
};

assert_eq!(visitor.sidecars.len(), 1);
assert_eq!(visitor.sidecars[0], sidecar1);

Ok(())
}

#[test]
fn test_parse_metadata() -> DeltaResult<()> {
let data = action_batch();
Expand Down
Loading

0 comments on commit bf7759d

Please sign in to comment.