refactor: Move various parts of datasource out of core #14616

Merged 10 commits on Feb 12, 2025
52 changes: 52 additions & 0 deletions datafusion/catalog-listing/src/file_meta.rs
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use object_store::{path::Path, ObjectMeta};

use crate::FileRange;

/// A single file or part of a file that should be read, along with its schema and statistics
pub struct FileMeta {
/// Path for the file (e.g. URL, filesystem path, etc)
pub object_meta: ObjectMeta,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
/// Size hint for the metadata of this file
pub metadata_size_hint: Option<usize>,
}

impl FileMeta {
/// The full path to the object
pub fn location(&self) -> &Path {
&self.object_meta.location
}
}

impl From<ObjectMeta> for FileMeta {
fn from(object_meta: ObjectMeta) -> Self {
Self {
object_meta,
range: None,
extensions: None,
metadata_size_hint: None,
}
}
}
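
For orientation, here is a minimal sketch (not part of the diff) of how the `From<ObjectMeta>` conversion above might be used; the function name, import path, file path, and size are invented for illustration:

use chrono::Utc;
use object_store::{path::Path, ObjectMeta};
// Hypothetical import path; the exact re-export location in the new crate may differ.
use datafusion_catalog_listing::file_meta::FileMeta;

fn file_meta_from_listing() -> FileMeta {
    // Pretend this ObjectMeta came back from an object store LIST call.
    let object_meta = ObjectMeta {
        location: Path::from("data/part-0.parquet"),
        last_modified: Utc::now(),
        size: 4096,
        e_tag: None,
        version: None,
    };
    // The From impl leaves range, extensions and metadata_size_hint unset.
    let file_meta = FileMeta::from(object_meta);
    assert_eq!(file_meta.location(), &Path::from("data/part-0.parquet"));
    file_meta
}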
278 changes: 278 additions & 0 deletions datafusion/catalog-listing/src/file_scan_config.rs
@@ -0,0 +1,278 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc};

use arrow::{
array::{
ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
RecordBatchOptions,
},
buffer::Buffer,
datatypes::{ArrowNativeType, DataType, SchemaRef, UInt16Type},
};
use datafusion_common::{exec_err, Result};
use datafusion_common::{DataFusionError, ScalarValue};
use log::warn;

/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
pub struct PartitionColumnProjector {
/// An Arrow buffer initialized to zeros that represents the key array of all partition
/// columns (partition columns are materialized by dictionary arrays with only one
/// value in the dictionary, thus all the keys are equal to zero).
key_buffer_cache: ZeroBufferGenerators,
/// Mapping between the indexes in the list of partition columns and the target
/// schema. Sorted by index in the target schema so that we can iterate on it to
/// insert the partition columns in the target record batch.
projected_partition_indexes: Vec<(usize, usize)>,
/// The schema of the table once the projection was applied.
projected_schema: SchemaRef,
}

impl PartitionColumnProjector {
// Create a projector to insert the partitioning columns into batches read from files
// - `projected_schema`: the target schema with both file and partitioning columns
// - `table_partition_cols`: all the partitioning column names
pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
let mut idx_map = HashMap::new();
for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
idx_map.insert(partition_idx, schema_idx);
}
}

let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));

Self {
projected_partition_indexes,
key_buffer_cache: Default::default(),
projected_schema,
}
}

// Transform the batch read from the file by inserting the partitioning columns
// to the right positions as deduced from `projected_schema`
// - `file_batch`: batch read from the file, with internal projection applied
// - `partition_values`: the list of partition values, one for each partition column
pub fn project(
&mut self,
file_batch: RecordBatch,
partition_values: &[ScalarValue],
) -> Result<RecordBatch> {
let expected_cols =
self.projected_schema.fields().len() - self.projected_partition_indexes.len();

if file_batch.columns().len() != expected_cols {
return exec_err!(
"Unexpected batch schema from file, expected {} cols but got {}",
expected_cols,
file_batch.columns().len()
);
}

let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
let p_value =
partition_values
.get(pidx)
.ok_or(DataFusionError::Execution(
"Invalid partitioning found on disk".to_string(),
))?;

let mut partition_value = Cow::Borrowed(p_value);

// check if user forgot to dict-encode the partition value
let field = self.projected_schema.field(sidx);
let expected_data_type = field.data_type();
let actual_data_type = partition_value.data_type();
if let DataType::Dictionary(key_type, _) = expected_data_type {
if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
partition_value = Cow::Owned(ScalarValue::Dictionary(
key_type.clone(),
Box::new(partition_value.as_ref().clone()),
));
}
}

cols.insert(
sidx,
create_output_array(
&mut self.key_buffer_cache,
partition_value.as_ref(),
file_batch.num_rows(),
)?,
)
}

RecordBatch::try_new_with_options(
Arc::clone(&self.projected_schema),
cols,
&RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
)
.map_err(Into::into)
}
}
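
As a side note (not part of the diff), here is a minimal sketch of how the projector might be driven, assuming `PartitionColumnProjector` is in scope; the schema, column names, and partition value are invented for illustration. Passing a plain `Utf8` partition value exercises the dict-encoding auto-fix warned about above:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;

fn project_partition_column() -> datafusion_common::Result<RecordBatch> {
    // Target schema: one column read from the file plus one partition column,
    // materialized as Dictionary(UInt16, Utf8) as the projector expects.
    let projected_schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, false),
        Field::new(
            "date",
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
            false,
        ),
    ]));
    let mut projector = PartitionColumnProjector::new(
        Arc::clone(&projected_schema),
        &["date".to_string()],
    );

    // Batch as read from the file: only the file columns, no partition column yet.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    let file_batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef],
    )?;

    // The constant "date" value is inserted at index 1 for all three rows.
    projector.project(file_batch, &[ScalarValue::from("2025-02-12")])
}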

#[derive(Debug, Default)]
struct ZeroBufferGenerators {
gen_i8: ZeroBufferGenerator<i8>,
gen_i16: ZeroBufferGenerator<i16>,
gen_i32: ZeroBufferGenerator<i32>,
gen_i64: ZeroBufferGenerator<i64>,
gen_u8: ZeroBufferGenerator<u8>,
gen_u16: ZeroBufferGenerator<u16>,
gen_u32: ZeroBufferGenerator<u32>,
gen_u64: ZeroBufferGenerator<u64>,
}

/// Generate an Arrow [`Buffer`] that contains zero values.
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
T: ArrowNativeType,
{
cache: Option<Buffer>,
_t: PhantomData<T>,
}

impl<T> ZeroBufferGenerator<T>
where
T: ArrowNativeType,
{
const SIZE: usize = size_of::<T>();

fn get_buffer(&mut self, n_vals: usize) -> Buffer {
match &mut self.cache {
Some(buf) if buf.len() >= n_vals * Self::SIZE => {
buf.slice_with_length(0, n_vals * Self::SIZE)
}
_ => {
let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
key_buffer_builder.advance(n_vals); // keys are all 0
self.cache.insert(key_buffer_builder.finish()).clone()
}
}
}
}
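
A quick sketch (again not in the diff; since the generator is private this would be a unit test next to it, and the function name is invented) of the re-use this provides: a second, smaller request is served by slicing the cached allocation rather than building a new buffer, which is what keeps the partition columns' key storage proportional to the batch size:

fn zero_buffer_reuse() {
    let mut generator = ZeroBufferGenerator::<u16>::default();

    // First call allocates 8192 zeroed u16 keys (16 KiB).
    let first = generator.get_buffer(8192);
    assert_eq!(first.len(), 8192 * std::mem::size_of::<u16>());

    // Second call is a zero-copy slice of the cached buffer.
    let second = generator.get_buffer(1024);
    assert_eq!(second.len(), 1024 * std::mem::size_of::<u16>());
}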

fn create_dict_array<T>(
buffer_gen: &mut ZeroBufferGenerator<T>,
dict_val: &ScalarValue,
len: usize,
data_type: DataType,
) -> Result<ArrayRef>
where
T: ArrowNativeType,
{
let dict_vals = dict_val.to_array()?;

let sliced_key_buffer = buffer_gen.get_buffer(len);

// assemble pieces together
let mut builder = ArrayData::builder(data_type)
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.to_data());
Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
)))
}

fn create_output_array(
key_buffer_cache: &mut ZeroBufferGenerators,
val: &ScalarValue,
len: usize,
) -> Result<ArrayRef> {
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
match key_type.as_ref() {
DataType::Int8 => {
return create_dict_array(
&mut key_buffer_cache.gen_i8,
dict_val,
len,
val.data_type(),
);
}
DataType::Int16 => {
return create_dict_array(
&mut key_buffer_cache.gen_i16,
dict_val,
len,
val.data_type(),
);
}
DataType::Int32 => {
return create_dict_array(
&mut key_buffer_cache.gen_i32,
dict_val,
len,
val.data_type(),
);
}
DataType::Int64 => {
return create_dict_array(
&mut key_buffer_cache.gen_i64,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt8 => {
return create_dict_array(
&mut key_buffer_cache.gen_u8,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt16 => {
return create_dict_array(
&mut key_buffer_cache.gen_u16,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt32 => {
return create_dict_array(
&mut key_buffer_cache.gen_u32,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt64 => {
return create_dict_array(
&mut key_buffer_cache.gen_u64,
dict_val,
len,
val.data_type(),
);
}
_ => {}
}
}

val.to_array_of_size(len)
}
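
Finally, a short sketch (not in the diff; the function name is invented and, since these helpers are private, it would live in this module as a unit test) of what `create_output_array` yields for a dictionary-encoded partition value: a `DictionaryArray` whose `len` keys are all zero and point at a single dictionary value.

fn constant_partition_column() -> datafusion_common::Result<()> {
    // Bring the Array trait into scope for len()/data_type() on the ArrayRef.
    use arrow::array::Array;

    let mut cache = ZeroBufferGenerators::default();
    let value = ScalarValue::Dictionary(
        Box::new(DataType::UInt16),
        Box::new(ScalarValue::from("2025-02-12")),
    );

    // Three rows, all sharing the single dictionary entry "2025-02-12".
    let array = create_output_array(&mut cache, &value, 3)?;
    assert_eq!(array.len(), 3);
    assert_eq!(
        array.data_type(),
        &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
    );
    Ok(())
}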