diff --git a/datafusion/catalog-listing/src/file_meta.rs b/datafusion/catalog-listing/src/file_meta.rs new file mode 100644 index 000000000000..098a15eeb38a --- /dev/null +++ b/datafusion/catalog-listing/src/file_meta.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use object_store::{path::Path, ObjectMeta}; + +use crate::FileRange; + +/// A single file or part of a file that should be read, along with its schema, statistics +pub struct FileMeta { + /// Path for the file (e.g. URL, filesystem path, etc) + pub object_meta: ObjectMeta, + /// An optional file range for a more fine-grained parallel execution + pub range: Option, + /// An optional field for user defined per object metadata + pub extensions: Option>, + /// Size hint for the metadata of this file + pub metadata_size_hint: Option, +} + +impl FileMeta { + /// The full path to the object + pub fn location(&self) -> &Path { + &self.object_meta.location + } +} + +impl From for FileMeta { + fn from(object_meta: ObjectMeta) -> Self { + Self { + object_meta, + range: None, + extensions: None, + metadata_size_hint: None, + } + } +} diff --git a/datafusion/catalog-listing/src/file_scan_config.rs b/datafusion/catalog-listing/src/file_scan_config.rs new file mode 100644 index 000000000000..bfddbc3a1fc4 --- /dev/null +++ b/datafusion/catalog-listing/src/file_scan_config.rs @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc}; + +use arrow::{ + array::{ + ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch, + RecordBatchOptions, + }, + buffer::Buffer, + datatypes::{ArrowNativeType, DataType, SchemaRef, UInt16Type}, +}; +use datafusion_common::{exec_err, Result}; +use datafusion_common::{DataFusionError, ScalarValue}; +use log::warn; + +/// A helper that projects partition columns into the file record batches. +/// +/// One interesting trick is the usage of a cache for the key buffers of the partition column +/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them +/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, +/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). +pub struct PartitionColumnProjector { + /// An Arrow buffer initialized to zeros that represents the key array of all partition + /// columns (partition columns are materialized by dictionary arrays with only one + /// value in the dictionary, thus all the keys are equal to zero). + key_buffer_cache: ZeroBufferGenerators, + /// Mapping between the indexes in the list of partition columns and the target + /// schema. Sorted by index in the target schema so that we can iterate on it to + /// insert the partition columns in the target record batch. + projected_partition_indexes: Vec<(usize, usize)>, + /// The schema of the table once the projection was applied. + projected_schema: SchemaRef, +} + +impl PartitionColumnProjector { + // Create a projector to insert the partitioning columns into batches read from files + // - `projected_schema`: the target schema with both file and partitioning columns + // - `table_partition_cols`: all the partitioning column names + pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + let mut idx_map = HashMap::new(); + for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { + if let Ok(schema_idx) = projected_schema.index_of(partition_name) { + idx_map.insert(partition_idx, schema_idx); + } + } + + let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); + projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); + + Self { + projected_partition_indexes, + key_buffer_cache: Default::default(), + projected_schema, + } + } + + // Transform the batch read from the file by inserting the partitioning columns + // to the right positions as deduced from `projected_schema` + // - `file_batch`: batch read from the file, with internal projection applied + // - `partition_values`: the list of partition values, one for each partition column + pub fn project( + &mut self, + file_batch: RecordBatch, + partition_values: &[ScalarValue], + ) -> Result { + let expected_cols = + self.projected_schema.fields().len() - self.projected_partition_indexes.len(); + + if file_batch.columns().len() != expected_cols { + return exec_err!( + "Unexpected batch schema from file, expected {} cols but got {}", + expected_cols, + file_batch.columns().len() + ); + } + + let mut cols = file_batch.columns().to_vec(); + for &(pidx, sidx) in &self.projected_partition_indexes { + let p_value = + partition_values + .get(pidx) + .ok_or(DataFusionError::Execution( + "Invalid partitioning found on disk".to_string(), + ))?; + + let mut partition_value = Cow::Borrowed(p_value); + + // check if user forgot to dict-encode the partition value + let field = self.projected_schema.field(sidx); + let expected_data_type = field.data_type(); + let actual_data_type = partition_value.data_type(); + if let DataType::Dictionary(key_type, _) = expected_data_type { + if !matches!(actual_data_type, DataType::Dictionary(_, _)) { + warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name()); + partition_value = Cow::Owned(ScalarValue::Dictionary( + key_type.clone(), + Box::new(partition_value.as_ref().clone()), + )); + } + } + + cols.insert( + sidx, + create_output_array( + &mut self.key_buffer_cache, + partition_value.as_ref(), + file_batch.num_rows(), + )?, + ) + } + + RecordBatch::try_new_with_options( + Arc::clone(&self.projected_schema), + cols, + &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())), + ) + .map_err(Into::into) + } +} + +#[derive(Debug, Default)] +struct ZeroBufferGenerators { + gen_i8: ZeroBufferGenerator, + gen_i16: ZeroBufferGenerator, + gen_i32: ZeroBufferGenerator, + gen_i64: ZeroBufferGenerator, + gen_u8: ZeroBufferGenerator, + gen_u16: ZeroBufferGenerator, + gen_u32: ZeroBufferGenerator, + gen_u64: ZeroBufferGenerator, +} + +/// Generate a arrow [`Buffer`] that contains zero values. +#[derive(Debug, Default)] +struct ZeroBufferGenerator +where + T: ArrowNativeType, +{ + cache: Option, + _t: PhantomData, +} + +impl ZeroBufferGenerator +where + T: ArrowNativeType, +{ + const SIZE: usize = size_of::(); + + fn get_buffer(&mut self, n_vals: usize) -> Buffer { + match &mut self.cache { + Some(buf) if buf.len() >= n_vals * Self::SIZE => { + buf.slice_with_length(0, n_vals * Self::SIZE) + } + _ => { + let mut key_buffer_builder = BufferBuilder::::new(n_vals); + key_buffer_builder.advance(n_vals); // keys are all 0 + self.cache.insert(key_buffer_builder.finish()).clone() + } + } + } +} + +fn create_dict_array( + buffer_gen: &mut ZeroBufferGenerator, + dict_val: &ScalarValue, + len: usize, + data_type: DataType, +) -> Result +where + T: ArrowNativeType, +{ + let dict_vals = dict_val.to_array()?; + + let sliced_key_buffer = buffer_gen.get_buffer(len); + + // assemble pieces together + let mut builder = ArrayData::builder(data_type) + .len(len) + .add_buffer(sliced_key_buffer); + builder = builder.add_child_data(dict_vals.to_data()); + Ok(Arc::new(DictionaryArray::::from( + builder.build().unwrap(), + ))) +} + +fn create_output_array( + key_buffer_cache: &mut ZeroBufferGenerators, + val: &ScalarValue, + len: usize, +) -> Result { + if let ScalarValue::Dictionary(key_type, dict_val) = &val { + match key_type.as_ref() { + DataType::Int8 => { + return create_dict_array( + &mut key_buffer_cache.gen_i8, + dict_val, + len, + val.data_type(), + ); + } + DataType::Int16 => { + return create_dict_array( + &mut key_buffer_cache.gen_i16, + dict_val, + len, + val.data_type(), + ); + } + DataType::Int32 => { + return create_dict_array( + &mut key_buffer_cache.gen_i32, + dict_val, + len, + val.data_type(), + ); + } + DataType::Int64 => { + return create_dict_array( + &mut key_buffer_cache.gen_i64, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt8 => { + return create_dict_array( + &mut key_buffer_cache.gen_u8, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt16 => { + return create_dict_array( + &mut key_buffer_cache.gen_u16, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt32 => { + return create_dict_array( + &mut key_buffer_cache.gen_u32, + dict_val, + len, + val.data_type(), + ); + } + DataType::UInt64 => { + return create_dict_array( + &mut key_buffer_cache.gen_u64, + dict_val, + len, + val.data_type(), + ); + } + _ => {} + } + } + + val.to_array_of_size(len) +} diff --git a/datafusion/catalog-listing/src/file_stream.rs b/datafusion/catalog-listing/src/file_stream.rs new file mode 100644 index 000000000000..570ca6678538 --- /dev/null +++ b/datafusion/catalog-listing/src/file_stream.rs @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A generic stream over file format readers that can be used by +//! any file format that read its files from start to end. +//! +//! Note: Most traits here need to be marked `Sync + Send` to be +//! compliant with the `SendableRecordBatchStream` trait. + +use crate::file_meta::FileMeta; +use datafusion_common::error::Result; +use datafusion_physical_plan::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, Time, +}; + +use arrow::error::ArrowError; +use arrow::record_batch::RecordBatch; +use datafusion_common::instant::Instant; +use datafusion_common::ScalarValue; + +use futures::future::BoxFuture; +use futures::stream::BoxStream; + +/// A fallible future that resolves to a stream of [`RecordBatch`] +pub type FileOpenFuture = + BoxFuture<'static, Result>>>; + +/// Describes the behavior of the `FileStream` if file opening or scanning fails +pub enum OnError { + /// Fail the entire stream and return the underlying error + Fail, + /// Continue scanning, ignoring the failed file + Skip, +} + +impl Default for OnError { + fn default() -> Self { + Self::Fail + } +} + +/// Generic API for opening a file using an [`ObjectStore`] and resolving to a +/// stream of [`RecordBatch`] +/// +/// [`ObjectStore`]: object_store::ObjectStore +pub trait FileOpener: Unpin + Send + Sync { + /// Asynchronously open the specified file and return a stream + /// of [`RecordBatch`] + fn open(&self, file_meta: FileMeta) -> Result; +} + +/// Represents the state of the next `FileOpenFuture`. Since we need to poll +/// this future while scanning the current file, we need to store the result if it +/// is ready +pub enum NextOpen { + Pending(FileOpenFuture), + Ready(Result>>), +} + +pub enum FileStreamState { + /// The idle state, no file is currently being read + Idle, + /// Currently performing asynchronous IO to obtain a stream of RecordBatch + /// for a given file + Open { + /// A [`FileOpenFuture`] returned by [`FileOpener::open`] + future: FileOpenFuture, + /// The partition values for this file + partition_values: Vec, + }, + /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] + /// returned by [`FileOpener::open`] + Scan { + /// Partitioning column values for the current batch_iter + partition_values: Vec, + /// The reader instance + reader: BoxStream<'static, Result>, + /// A [`FileOpenFuture`] for the next file to be processed, + /// and its corresponding partition column values, if any. + /// This allows the next file to be opened in parallel while the + /// current file is read. + next: Option<(NextOpen, Vec)>, + }, + /// Encountered an error + Error, + /// Reached the row limit + Limit, +} + +/// A timer that can be started and stopped. +pub struct StartableTime { + pub metrics: Time, + // use for record each part cost time, will eventually add into 'metrics'. + pub start: Option, +} + +impl StartableTime { + pub fn start(&mut self) { + assert!(self.start.is_none()); + self.start = Some(Instant::now()); + } + + pub fn stop(&mut self) { + if let Some(start) = self.start.take() { + self.metrics.add_elapsed(start); + } + } +} + +#[allow(rustdoc::broken_intra_doc_links)] +/// Metrics for [`FileStream`] +/// +/// Note that all of these metrics are in terms of wall clock time +/// (not cpu time) so they include time spent waiting on I/O as well +/// as other operators. +/// +/// [`FileStream`]: +pub struct FileStreamMetrics { + /// Wall clock time elapsed for file opening. + /// + /// Time between when [`FileOpener::open`] is called and when the + /// [`FileStream`] receives a stream for reading. + /// + /// If there are multiple files being scanned, the stream + /// will open the next file in the background while scanning the + /// current file. This metric will only capture time spent opening + /// while not also scanning. + /// [`FileStream`]: + pub time_opening: StartableTime, + /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding + /// + /// Time between when the [`FileStream`] requests data from the + /// stream and when the first [`RecordBatch`] is produced. + /// [`FileStream`]: + pub time_scanning_until_data: StartableTime, + /// Total elapsed wall clock time for scanning + record batch decompression / decoding + /// + /// Sum of time between when the [`FileStream`] requests data from + /// the stream and when a [`RecordBatch`] is produced for all + /// record batches in the stream. Note that this metric also + /// includes the time of the parent operator's execution. + pub time_scanning_total: StartableTime, + /// Wall clock time elapsed for data decompression + decoding + /// + /// Time spent waiting for the FileStream's input. + pub time_processing: StartableTime, + /// Count of errors opening file. + /// + /// If using `OnError::Skip` this will provide a count of the number of files + /// which were skipped and will not be included in the scan results. + pub file_open_errors: Count, + /// Count of errors scanning file + /// + /// If using `OnError::Skip` this will provide a count of the number of files + /// which were skipped and will not be included in the scan results. + pub file_scan_errors: Count, +} + +impl FileStreamMetrics { + pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + let time_opening = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_opening", partition), + start: None, + }; + + let time_scanning_until_data = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_scanning_until_data", partition), + start: None, + }; + + let time_scanning_total = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_scanning_total", partition), + start: None, + }; + + let time_processing = StartableTime { + metrics: MetricBuilder::new(metrics) + .subset_time("time_elapsed_processing", partition), + start: None, + }; + + let file_open_errors = + MetricBuilder::new(metrics).counter("file_open_errors", partition); + + let file_scan_errors = + MetricBuilder::new(metrics).counter("file_scan_errors", partition); + + Self { + time_opening, + time_scanning_until_data, + time_scanning_total, + time_processing, + file_open_errors, + file_scan_errors, + } + } +} diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs index 786c27acb95e..9eb79ec07ac8 100644 --- a/datafusion/catalog-listing/src/mod.rs +++ b/datafusion/catalog-listing/src/mod.rs @@ -20,7 +20,10 @@ pub mod file_compression_type; pub mod file_groups; +pub mod file_meta; +pub mod file_scan_config; pub mod file_sink_config; +pub mod file_stream; pub mod helpers; pub mod url; pub mod write; diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index b148c412c48e..6aa330caffab 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -265,8 +265,8 @@ impl FileSource for AvroSource { #[cfg(feature = "avro")] mod private { use super::*; - use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; + use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use bytes::Buf; use futures::StreamExt; diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index bfc2c1df8eab..5e017b992581 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -28,8 +28,8 @@ use crate::datasource::data_source::FileSource; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::file_format::{deserialize_stream, DecoderDeserializer}; use crate::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; -use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; +use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index c714fad6e9c1..3708fe6abd5e 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -27,23 +27,15 @@ use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; use crate::{error::Result, scalar::ScalarValue}; use std::any::Any; use std::fmt::Formatter; -use std::{ - borrow::Cow, collections::HashMap, fmt, fmt::Debug, marker::PhantomData, - mem::size_of, sync::Arc, vec, -}; +use std::{fmt, sync::Arc}; -use arrow::array::{ - ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch, RecordBatchOptions, -}; -use arrow::buffer::Buffer; -use arrow::datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::stats::Precision; -use datafusion_common::{ - exec_err, ColumnStatistics, Constraints, DataFusionError, Statistics, -}; +use datafusion_common::{ColumnStatistics, Constraints, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning}; use crate::datasource::data_source::FileSource; +pub use datafusion_catalog_listing::file_scan_config::*; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_plan::display::{display_orderings, ProjectSchemaDisplay}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -52,7 +44,6 @@ use datafusion_physical_plan::projection::{ }; use datafusion_physical_plan::source::{DataSource, DataSourceExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; -use log::warn; /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. Returns `Dictionary(UInt16, val_type)`, which is @@ -600,261 +591,13 @@ impl FileScanConfig { } } -/// A helper that projects partition columns into the file record batches. -/// -/// One interesting trick is the usage of a cache for the key buffers of the partition column -/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them -/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, -/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). -pub struct PartitionColumnProjector { - /// An Arrow buffer initialized to zeros that represents the key array of all partition - /// columns (partition columns are materialized by dictionary arrays with only one - /// value in the dictionary, thus all the keys are equal to zero). - key_buffer_cache: ZeroBufferGenerators, - /// Mapping between the indexes in the list of partition columns and the target - /// schema. Sorted by index in the target schema so that we can iterate on it to - /// insert the partition columns in the target record batch. - projected_partition_indexes: Vec<(usize, usize)>, - /// The schema of the table once the projection was applied. - projected_schema: SchemaRef, -} - -impl PartitionColumnProjector { - // Create a projector to insert the partitioning columns into batches read from files - // - `projected_schema`: the target schema with both file and partitioning columns - // - `table_partition_cols`: all the partitioning column names - pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { - let mut idx_map = HashMap::new(); - for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { - if let Ok(schema_idx) = projected_schema.index_of(partition_name) { - idx_map.insert(partition_idx, schema_idx); - } - } - - let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); - projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); - - Self { - projected_partition_indexes, - key_buffer_cache: Default::default(), - projected_schema, - } - } - - // Transform the batch read from the file by inserting the partitioning columns - // to the right positions as deduced from `projected_schema` - // - `file_batch`: batch read from the file, with internal projection applied - // - `partition_values`: the list of partition values, one for each partition column - pub fn project( - &mut self, - file_batch: RecordBatch, - partition_values: &[ScalarValue], - ) -> Result { - let expected_cols = - self.projected_schema.fields().len() - self.projected_partition_indexes.len(); - - if file_batch.columns().len() != expected_cols { - return exec_err!( - "Unexpected batch schema from file, expected {} cols but got {}", - expected_cols, - file_batch.columns().len() - ); - } - - let mut cols = file_batch.columns().to_vec(); - for &(pidx, sidx) in &self.projected_partition_indexes { - let p_value = - partition_values - .get(pidx) - .ok_or(DataFusionError::Execution( - "Invalid partitioning found on disk".to_string(), - ))?; - - let mut partition_value = Cow::Borrowed(p_value); - - // check if user forgot to dict-encode the partition value - let field = self.projected_schema.field(sidx); - let expected_data_type = field.data_type(); - let actual_data_type = partition_value.data_type(); - if let DataType::Dictionary(key_type, _) = expected_data_type { - if !matches!(actual_data_type, DataType::Dictionary(_, _)) { - warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name()); - partition_value = Cow::Owned(ScalarValue::Dictionary( - key_type.clone(), - Box::new(partition_value.as_ref().clone()), - )); - } - } - - cols.insert( - sidx, - create_output_array( - &mut self.key_buffer_cache, - partition_value.as_ref(), - file_batch.num_rows(), - )?, - ) - } - - RecordBatch::try_new_with_options( - Arc::clone(&self.projected_schema), - cols, - &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())), - ) - .map_err(Into::into) - } -} - -#[derive(Debug, Default)] -struct ZeroBufferGenerators { - gen_i8: ZeroBufferGenerator, - gen_i16: ZeroBufferGenerator, - gen_i32: ZeroBufferGenerator, - gen_i64: ZeroBufferGenerator, - gen_u8: ZeroBufferGenerator, - gen_u16: ZeroBufferGenerator, - gen_u32: ZeroBufferGenerator, - gen_u64: ZeroBufferGenerator, -} - -/// Generate a arrow [`Buffer`] that contains zero values. -#[derive(Debug, Default)] -struct ZeroBufferGenerator -where - T: ArrowNativeType, -{ - cache: Option, - _t: PhantomData, -} - -impl ZeroBufferGenerator -where - T: ArrowNativeType, -{ - const SIZE: usize = size_of::(); - - fn get_buffer(&mut self, n_vals: usize) -> Buffer { - match &mut self.cache { - Some(buf) if buf.len() >= n_vals * Self::SIZE => { - buf.slice_with_length(0, n_vals * Self::SIZE) - } - _ => { - let mut key_buffer_builder = BufferBuilder::::new(n_vals); - key_buffer_builder.advance(n_vals); // keys are all 0 - self.cache.insert(key_buffer_builder.finish()).clone() - } - } - } -} - -fn create_dict_array( - buffer_gen: &mut ZeroBufferGenerator, - dict_val: &ScalarValue, - len: usize, - data_type: DataType, -) -> Result -where - T: ArrowNativeType, -{ - let dict_vals = dict_val.to_array()?; - - let sliced_key_buffer = buffer_gen.get_buffer(len); - - // assemble pieces together - let mut builder = ArrayData::builder(data_type) - .len(len) - .add_buffer(sliced_key_buffer); - builder = builder.add_child_data(dict_vals.to_data()); - Ok(Arc::new(DictionaryArray::::from( - builder.build().unwrap(), - ))) -} - -fn create_output_array( - key_buffer_cache: &mut ZeroBufferGenerators, - val: &ScalarValue, - len: usize, -) -> Result { - if let ScalarValue::Dictionary(key_type, dict_val) = &val { - match key_type.as_ref() { - DataType::Int8 => { - return create_dict_array( - &mut key_buffer_cache.gen_i8, - dict_val, - len, - val.data_type(), - ); - } - DataType::Int16 => { - return create_dict_array( - &mut key_buffer_cache.gen_i16, - dict_val, - len, - val.data_type(), - ); - } - DataType::Int32 => { - return create_dict_array( - &mut key_buffer_cache.gen_i32, - dict_val, - len, - val.data_type(), - ); - } - DataType::Int64 => { - return create_dict_array( - &mut key_buffer_cache.gen_i64, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt8 => { - return create_dict_array( - &mut key_buffer_cache.gen_u8, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt16 => { - return create_dict_array( - &mut key_buffer_cache.gen_u16, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt32 => { - return create_dict_array( - &mut key_buffer_cache.gen_u32, - dict_val, - len, - val.data_type(), - ); - } - DataType::UInt64 => { - return create_dict_array( - &mut key_buffer_cache.gen_u64, - dict_val, - len, - val.data_type(), - ); - } - _ => {} - } - } - - val.to_array_of_size(len) -} - #[cfg(test)] mod tests { - use arrow::array::Int32Array; - use super::*; use crate::datasource::physical_plan::ArrowSource; use crate::{test::columns, test_util::aggr_test_schema}; + use arrow::array::{Int32Array, RecordBatch}; + use std::collections::HashMap; #[test] fn physical_plan_config_no_projection() { diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 497af101bee7..c88d4c4458a5 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -31,49 +31,20 @@ use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector; use crate::datasource::physical_plan::{FileMeta, FileScanConfig}; use crate::error::Result; -use crate::physical_plan::metrics::{ - BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, -}; +use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use crate::physical_plan::RecordBatchStream; use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_common::instant::Instant; +pub use datafusion_catalog_listing::file_stream::{FileOpenFuture, FileOpener, OnError}; +use datafusion_catalog_listing::file_stream::{ + FileStreamMetrics, FileStreamState, NextOpen, +}; use datafusion_common::ScalarValue; -use futures::future::BoxFuture; -use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; -/// A fallible future that resolves to a stream of [`RecordBatch`] -pub type FileOpenFuture = - BoxFuture<'static, Result>>>; - -/// Describes the behavior of the `FileStream` if file opening or scanning fails -pub enum OnError { - /// Fail the entire stream and return the underlying error - Fail, - /// Continue scanning, ignoring the failed file - Skip, -} - -impl Default for OnError { - fn default() -> Self { - Self::Fail - } -} - -/// Generic API for opening a file using an [`ObjectStore`] and resolving to a -/// stream of [`RecordBatch`] -/// -/// [`ObjectStore`]: object_store::ObjectStore -pub trait FileOpener: Unpin + Send + Sync { - /// Asynchronously open the specified file and return a stream - /// of [`RecordBatch`] - fn open(&self, file_meta: FileMeta) -> Result; -} - /// A stream that iterates record batch by record batch, file over file. pub struct FileStream { /// An iterator over input files. @@ -98,151 +69,6 @@ pub struct FileStream { on_error: OnError, } -/// Represents the state of the next `FileOpenFuture`. Since we need to poll -/// this future while scanning the current file, we need to store the result if it -/// is ready -enum NextOpen { - Pending(FileOpenFuture), - Ready(Result>>), -} - -enum FileStreamState { - /// The idle state, no file is currently being read - Idle, - /// Currently performing asynchronous IO to obtain a stream of RecordBatch - /// for a given file - Open { - /// A [`FileOpenFuture`] returned by [`FileOpener::open`] - future: FileOpenFuture, - /// The partition values for this file - partition_values: Vec, - }, - /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] - /// returned by [`FileOpener::open`] - Scan { - /// Partitioning column values for the current batch_iter - partition_values: Vec, - /// The reader instance - reader: BoxStream<'static, Result>, - /// A [`FileOpenFuture`] for the next file to be processed, - /// and its corresponding partition column values, if any. - /// This allows the next file to be opened in parallel while the - /// current file is read. - next: Option<(NextOpen, Vec)>, - }, - /// Encountered an error - Error, - /// Reached the row limit - Limit, -} - -/// A timer that can be started and stopped. -pub struct StartableTime { - pub(crate) metrics: Time, - // use for record each part cost time, will eventually add into 'metrics'. - pub(crate) start: Option, -} - -impl StartableTime { - pub(crate) fn start(&mut self) { - assert!(self.start.is_none()); - self.start = Some(Instant::now()); - } - - pub(crate) fn stop(&mut self) { - if let Some(start) = self.start.take() { - self.metrics.add_elapsed(start); - } - } -} - -/// Metrics for [`FileStream`] -/// -/// Note that all of these metrics are in terms of wall clock time -/// (not cpu time) so they include time spent waiting on I/O as well -/// as other operators. -struct FileStreamMetrics { - /// Wall clock time elapsed for file opening. - /// - /// Time between when [`FileOpener::open`] is called and when the - /// [`FileStream`] receives a stream for reading. - /// - /// If there are multiple files being scanned, the stream - /// will open the next file in the background while scanning the - /// current file. This metric will only capture time spent opening - /// while not also scanning. - pub time_opening: StartableTime, - /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding - /// - /// Time between when the [`FileStream`] requests data from the - /// stream and when the first [`RecordBatch`] is produced. - pub time_scanning_until_data: StartableTime, - /// Total elapsed wall clock time for scanning + record batch decompression / decoding - /// - /// Sum of time between when the [`FileStream`] requests data from - /// the stream and when a [`RecordBatch`] is produced for all - /// record batches in the stream. Note that this metric also - /// includes the time of the parent operator's execution. - pub time_scanning_total: StartableTime, - /// Wall clock time elapsed for data decompression + decoding - /// - /// Time spent waiting for the FileStream's input. - pub time_processing: StartableTime, - /// Count of errors opening file. - /// - /// If using `OnError::Skip` this will provide a count of the number of files - /// which were skipped and will not be included in the scan results. - pub file_open_errors: Count, - /// Count of errors scanning file - /// - /// If using `OnError::Skip` this will provide a count of the number of files - /// which were skipped and will not be included in the scan results. - pub file_scan_errors: Count, -} - -impl FileStreamMetrics { - fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { - let time_opening = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_opening", partition), - start: None, - }; - - let time_scanning_until_data = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_scanning_until_data", partition), - start: None, - }; - - let time_scanning_total = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_scanning_total", partition), - start: None, - }; - - let time_processing = StartableTime { - metrics: MetricBuilder::new(metrics) - .subset_time("time_elapsed_processing", partition), - start: None, - }; - - let file_open_errors = - MetricBuilder::new(metrics).counter("file_open_errors", partition); - - let file_scan_errors = - MetricBuilder::new(metrics).counter("file_scan_errors", partition); - - Self { - time_opening, - time_scanning_until_data, - time_scanning_total, - time_processing, - file_open_errors, - file_scan_errors, - } - } -} - impl FileStream { /// Create a new `FileStream` using the give `FileOpener` to scan underlying files pub fn new( diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 76cb657b0c5f..51e0a46d942e 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -27,8 +27,8 @@ use crate::datasource::data_source::FileSource; use crate::datasource::file_format::file_compression_type::FileCompressionType; use crate::datasource::file_format::{deserialize_stream, DecoderDeserializer}; use crate::datasource::listing::{ListingTableUrl, PartitionedFile}; -use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; +use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index f08981605f2f..18174bd54e4f 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -51,6 +51,7 @@ pub use avro::AvroSource; pub use csv::{CsvExec, CsvExecBuilder}; pub use csv::{CsvOpener, CsvSource}; pub use datafusion_catalog_listing::file_groups::FileGroupPartitioner; +pub use datafusion_catalog_listing::file_meta::FileMeta; pub use datafusion_catalog_listing::file_sink_config::*; pub use file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, @@ -61,7 +62,7 @@ use futures::StreamExt; pub use json::NdJsonExec; pub use json::{JsonOpener, JsonSource}; use log::debug; -use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore}; +use object_store::{path::Path, GetOptions, GetRange, ObjectStore}; use std::{ fmt::{Debug, Formatter, Result as FmtResult}, ops::Range, @@ -219,36 +220,6 @@ where Ok(()) } -/// A single file or part of a file that should be read, along with its schema, statistics -pub struct FileMeta { - /// Path for the file (e.g. URL, filesystem path, etc) - pub object_meta: ObjectMeta, - /// An optional file range for a more fine-grained parallel execution - pub range: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, - /// Size hint for the metadata of this file - pub metadata_size_hint: Option, -} - -impl FileMeta { - /// The full path to the object - pub fn location(&self) -> &Path { - &self.object_meta.location - } -} - -impl From for FileMeta { - fn from(object_meta: ObjectMeta) -> Self { - Self { - object_meta, - range: None, - extensions: None, - metadata_size_hint: None, - } - } -} - /// The various listing tables does not attempt to read all files /// concurrently, instead they will read files in sequence within a /// partition. This is an important property as it allows plans to @@ -490,6 +461,7 @@ mod tests { StringArray, UInt64Array, }; use arrow::datatypes::{DataType, Field, Schema}; + use object_store::ObjectMeta; use crate::datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory,