From 306f255050352c1b224b04c60309428ab01d49c9 Mon Sep 17 00:00:00 2001
From: Adam Gutglick
Date: Tue, 25 Feb 2025 14:53:52 +0000
Subject: [PATCH 1/5] initial work

---
 .../core/src/datasource/file_format/json.rs  |   3 +-
 .../core/src/datasource/file_format/mod.rs   | 396 ++----------------
 datafusion/datasource/src/file_format.rs     | 377 +++++++++++++++++
 datafusion/datasource/src/mod.rs             |   1 +
 4 files changed, 406 insertions(+), 371 deletions(-)
 create mode 100644 datafusion/datasource/src/file_format.rs

diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 7a5aaf7c64e8..1a2aaf3af8be 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 
 use super::write::orchestration::spawn_writer_tasks_and_join;
 use super::{
-    Decoder, DecoderDeserializer, FileFormat, FileFormatFactory, FileScanConfig,
+    Decoder, DecoderDeserializer, FileFormat, FileFormatFactory,
     DEFAULT_SCHEMA_INFER_MAX_RECORD,
 };
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -52,6 +52,7 @@ use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::display::FileGroupDisplay;
 use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::TaskContext;
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 2b46748d0a52..2a992d0c3d34 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -18,9 +18,6 @@
 //! Module containing helper methods for the various file formats
 //!
See write.rs for write related helper methods -/// Default max records to scan to infer the schema -pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; - pub mod arrow; pub mod avro; pub mod csv; @@ -28,154 +25,21 @@ pub mod json; pub mod options; #[cfg(feature = "parquet")] pub mod parquet; -use datafusion_datasource::file::FileSource; + +use ::arrow::array::RecordBatch; +use arrow_schema::ArrowError; +use bytes::Buf; +use bytes::Bytes; +use datafusion_common::Result; pub use datafusion_datasource::file_compression_type; -use datafusion_datasource::file_scan_config::FileScanConfig; +pub use datafusion_datasource::file_format::*; pub use datafusion_datasource::write; - -use std::any::Any; -use std::collections::{HashMap, VecDeque}; -use std::fmt::{self, Debug, Display}; -use std::sync::Arc; -use std::task::Poll; - -use crate::arrow::array::RecordBatch; -use crate::arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; -use crate::arrow::error::ArrowError; -use crate::datasource::physical_plan::FileSinkConfig; -use crate::error::Result; -use crate::physical_plan::{ExecutionPlan, Statistics}; - -use datafusion_catalog::Session; -use datafusion_common::file_options::file_type::FileType; -use datafusion_common::{internal_err, not_impl_err, GetExt}; -use datafusion_expr::Expr; -use datafusion_physical_expr::PhysicalExpr; - -use async_trait::async_trait; -use bytes::{Buf, Bytes}; -use datafusion_physical_expr_common::sort_expr::LexRequirement; -use file_compression_type::FileCompressionType; use futures::stream::BoxStream; -use futures::{ready, Stream, StreamExt}; -use object_store::{ObjectMeta, ObjectStore}; - -/// Factory for creating [`FileFormat`] instances based on session and command level options -/// -/// Users can provide their own `FileFormatFactory` to support arbitrary file formats -pub trait FileFormatFactory: Sync + Send + GetExt + Debug { - /// Initialize a [FileFormat] and configure based on session and command level options - fn create( - &self, - state: &dyn Session, - format_options: &HashMap, - ) -> Result>; - - /// Initialize a [FileFormat] with all options set to default values - fn default(&self) -> Arc; - - /// Returns the table source as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; -} - -/// This trait abstracts all the file format specific implementations -/// from the [`TableProvider`]. This helps code re-utilization across -/// providers that support the same file formats. -/// -/// [`TableProvider`]: crate::catalog::TableProvider -#[async_trait] -pub trait FileFormat: Send + Sync + Debug { - /// Returns the table provider as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv - fn get_ext(&self) -> String; - - /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv - fn get_ext_with_compression( - &self, - _file_compression_type: &FileCompressionType, - ) -> Result; - - /// Infer the common schema of the provided objects. The objects will usually - /// be analysed up to a given number of records or files (as specified in the - /// format config) then give the estimated common schema. This might fail if - /// the files have schemas that cannot be merged. - async fn infer_schema( - &self, - state: &dyn Session, - store: &Arc, - objects: &[ObjectMeta], - ) -> Result; - - /// Infer the statistics for the provided object. 
The cost and accuracy of the - /// estimated statistics might vary greatly between file formats. - /// - /// `table_schema` is the (combined) schema of the overall table - /// and may be a superset of the schema contained in this file. - /// - /// TODO: should the file source return statistics for only columns referred to in the table schema? - async fn infer_stats( - &self, - state: &dyn Session, - store: &Arc, - table_schema: SchemaRef, - object: &ObjectMeta, - ) -> Result; - - /// Take a list of files and convert it to the appropriate executor - /// according to this file format. - async fn create_physical_plan( - &self, - state: &dyn Session, - conf: FileScanConfig, - filters: Option<&Arc>, - ) -> Result>; - - /// Take a list of files and the configuration to convert it to the - /// appropriate writer executor according to this file format. - async fn create_writer_physical_plan( - &self, - _input: Arc, - _state: &dyn Session, - _conf: FileSinkConfig, - _order_requirements: Option, - ) -> Result> { - not_impl_err!("Writer not implemented for this format") - } - - /// Check if the specified file format has support for pushing down the provided filters within - /// the given schemas. Added initially to support the Parquet file format's ability to do this. - fn supports_filters_pushdown( - &self, - _file_schema: &Schema, - _table_schema: &Schema, - _filters: &[&Expr], - ) -> Result { - Ok(FilePushdownSupport::NoSupport) - } - - /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc. - fn file_source(&self) -> Arc; -} - -/// An enum to distinguish between different states when determining if certain filters can be -/// pushed down to file scanning -#[derive(Debug, PartialEq)] -pub enum FilePushdownSupport { - /// The file format/system being asked does not support any sort of pushdown. This should be - /// used even if the file format theoretically supports some sort of pushdown, but it's not - /// enabled or implemented yet. - NoSupport, - /// The file format/system being asked *does* support pushdown, but it can't make it work for - /// the provided filter/expression - NotSupportedForFilter, - /// The file format/system being asked *does* support pushdown and *can* make it work for the - /// provided filter/expression - Supported, -} +use futures::StreamExt as _; +use futures::{ready, Stream}; +use std::collections::VecDeque; +use std::fmt; +use std::task::Poll; /// Possible outputs of a [`BatchDeserializer`]. #[derive(Debug, PartialEq)] @@ -191,7 +55,7 @@ pub enum DeserializerOutput { /// Trait defining a scheme for deserializing byte streams into structured data. /// Implementors of this trait are responsible for converting raw bytes into /// `RecordBatch` objects. -pub trait BatchDeserializer: Send + Debug { +pub trait BatchDeserializer: Send + fmt::Debug { /// Feeds a message for deserialization, updating the internal state of /// this `BatchDeserializer`. Note that one can call this function multiple /// times before calling `next`, which will queue multiple messages for @@ -217,7 +81,7 @@ pub trait BatchDeserializer: Send + Debug { /// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder /// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode /// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush -pub(crate) trait Decoder: Send + Debug { +pub trait Decoder: Send + fmt::Debug { /// See [`arrow::json::reader::Decoder::decode`]. 
/// /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode @@ -232,7 +96,7 @@ pub(crate) trait Decoder: Send + Debug { fn can_flush_early(&self) -> bool; } -impl Debug for DecoderDeserializer { +impl fmt::Debug for DecoderDeserializer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Deserializer") .field("buffered_queue", &self.buffered_queue) @@ -291,7 +155,7 @@ impl BatchDeserializer for DecoderDeserializer { /// encoded data, into a stream of `RecordBatch` objects, following the specified /// schema and formatting options. It also handles any buffering necessary to satisfy /// the `Decoder` interface. -pub(crate) struct DecoderDeserializer { +pub struct DecoderDeserializer { /// The underlying decoder used for deserialization pub(crate) decoder: T, /// The buffer used to store the remaining bytes to be decoded @@ -302,7 +166,7 @@ pub(crate) struct DecoderDeserializer { impl DecoderDeserializer { /// Creates a new `DecoderDeserializer` with the provided decoder. - pub(crate) fn new(decoder: T) -> Self { + pub fn new(decoder: T) -> Self { DecoderDeserializer { decoder, buffered_queue: VecDeque::new(), @@ -336,237 +200,29 @@ pub(crate) fn deserialize_stream<'a>( .boxed() } -/// A container of [FileFormatFactory] which also implements [FileType]. -/// This enables converting a dyn FileFormat to a dyn FileType. -/// The former trait is a superset of the latter trait, which includes execution time -/// relevant methods. [FileType] is only used in logical planning and only implements -/// the subset of methods required during logical planning. -#[derive(Debug)] -pub struct DefaultFileType { - file_format_factory: Arc, -} - -impl DefaultFileType { - /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory] - pub fn new(file_format_factory: Arc) -> Self { - Self { - file_format_factory, - } - } - - /// get a reference to the inner [FileFormatFactory] struct - pub fn as_format_factory(&self) -> &Arc { - &self.file_format_factory - } -} - -impl FileType for DefaultFileType { - fn as_any(&self) -> &dyn Any { - self - } -} - -impl Display for DefaultFileType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.file_format_factory) - } -} - -impl GetExt for DefaultFileType { - fn get_ext(&self) -> String { - self.file_format_factory.get_ext() - } -} - -/// Converts a [FileFormatFactory] to a [FileType] -pub fn format_as_file_type( - file_format_factory: Arc, -) -> Arc { - Arc::new(DefaultFileType { - file_format_factory, - }) -} - -/// Converts a [FileType] to a [FileFormatFactory]. -/// Returns an error if the [FileType] cannot be -/// downcasted to a [DefaultFileType]. 
-pub fn file_type_to_format( - file_type: &Arc, -) -> Result> { - match file_type - .as_ref() - .as_any() - .downcast_ref::() - { - Some(source) => Ok(Arc::clone(&source.file_format_factory)), - _ => internal_err!("FileType was not DefaultFileType"), - } -} - -/// Create a new field with the specified data type, copying the other -/// properties from the input field -fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef { - Arc::new(field.as_ref().clone().with_data_type(new_type)) -} - -/// Transform a schema to use view types for Utf8 and Binary -/// -/// See [parquet::ParquetFormat::force_view_types] for details -pub fn transform_schema_to_view(schema: &Schema) -> Schema { - let transformed_fields: Vec> = schema - .fields - .iter() - .map(|field| match field.data_type() { - DataType::Utf8 | DataType::LargeUtf8 => { - field_with_new_type(field, DataType::Utf8View) - } - DataType::Binary | DataType::LargeBinary => { - field_with_new_type(field, DataType::BinaryView) - } - _ => Arc::clone(field), - }) - .collect(); - Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) -} - -/// Coerces the file schema if the table schema uses a view type. -#[cfg(not(target_arch = "wasm32"))] -pub(crate) fn coerce_file_schema_to_view_type( - table_schema: &Schema, - file_schema: &Schema, -) -> Option { - let mut transform = false; - let table_fields: HashMap<_, _> = table_schema - .fields - .iter() - .map(|f| { - let dt = f.data_type(); - if dt.equals_datatype(&DataType::Utf8View) - || dt.equals_datatype(&DataType::BinaryView) - { - transform = true; - } - (f.name(), dt) - }) - .collect(); - - if !transform { - return None; - } - - let transformed_fields: Vec> = file_schema - .fields - .iter() - .map( - |field| match (table_fields.get(field.name()), field.data_type()) { - (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => { - field_with_new_type(field, DataType::Utf8View) - } - ( - Some(DataType::BinaryView), - DataType::Binary | DataType::LargeBinary, - ) => field_with_new_type(field, DataType::BinaryView), - _ => Arc::clone(field), - }, - ) - .collect(); - - Some(Schema::new_with_metadata( - transformed_fields, - file_schema.metadata.clone(), - )) -} - -/// Transform a schema so that any binary types are strings -pub fn transform_binary_to_string(schema: &Schema) -> Schema { - let transformed_fields: Vec> = schema - .fields - .iter() - .map(|field| match field.data_type() { - DataType::Binary => field_with_new_type(field, DataType::Utf8), - DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8), - DataType::BinaryView => field_with_new_type(field, DataType::Utf8View), - _ => Arc::clone(field), - }) - .collect(); - Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) -} - -/// If the table schema uses a string type, coerce the file schema to use a string type. 
-/// -/// See [parquet::ParquetFormat::binary_as_string] for details -#[cfg(not(target_arch = "wasm32"))] -pub(crate) fn coerce_file_schema_to_string_type( - table_schema: &Schema, - file_schema: &Schema, -) -> Option { - let mut transform = false; - let table_fields: HashMap<_, _> = table_schema - .fields - .iter() - .map(|f| (f.name(), f.data_type())) - .collect(); - let transformed_fields: Vec> = file_schema - .fields - .iter() - .map( - |field| match (table_fields.get(field.name()), field.data_type()) { - // table schema uses string type, coerce the file schema to use string type - ( - Some(DataType::Utf8), - DataType::Binary | DataType::LargeBinary | DataType::BinaryView, - ) => { - transform = true; - field_with_new_type(field, DataType::Utf8) - } - // table schema uses large string type, coerce the file schema to use large string type - ( - Some(DataType::LargeUtf8), - DataType::Binary | DataType::LargeBinary | DataType::BinaryView, - ) => { - transform = true; - field_with_new_type(field, DataType::LargeUtf8) - } - // table schema uses string view type, coerce the file schema to use view type - ( - Some(DataType::Utf8View), - DataType::Binary | DataType::LargeBinary | DataType::BinaryView, - ) => { - transform = true; - field_with_new_type(field, DataType::Utf8View) - } - _ => Arc::clone(field), - }, - ) - .collect(); - - if !transform { - None - } else { - Some(Schema::new_with_metadata( - transformed_fields, - file_schema.metadata.clone(), - )) - } -} - #[cfg(test)] pub(crate) mod test_util { + use std::fmt::{self, Display}; use std::ops::Range; - use std::sync::Mutex; + use std::sync::{Arc, Mutex}; - use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::test::object_store::local_unpartitioned_file; + use async_trait::async_trait; use bytes::Bytes; + use datafusion_catalog::Session; + use datafusion_common::Result; + use datafusion_datasource::file_format::FileFormat; + use datafusion_datasource::file_scan_config::FileScanConfig; + use datafusion_physical_plan::ExecutionPlan; use futures::stream::BoxStream; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, - PutMultipartOpts, PutOptions, PutPayload, PutResult, + ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, }; pub async fn scan_format( diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs new file mode 100644 index 000000000000..3c46196c52f9 --- /dev/null +++ b/datafusion/datasource/src/file_format.rs @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Module containing helper methods for the various file formats +//! See write.rs for write related helper methods + +use std::any::Any; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +use crate::file::FileSource; +use crate::file_compression_type::FileCompressionType; +use crate::file_scan_config::FileScanConfig; +use crate::file_sink_config::FileSinkConfig; + +use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion_catalog::Session; +use datafusion_common::file_options::file_type::FileType; +use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics}; +use datafusion_expr::Expr; +use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; +use datafusion_physical_plan::ExecutionPlan; + +use object_store::{ObjectMeta, ObjectStore}; + +/// Default max records to scan to infer the schema +pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; + +/// This trait abstracts all the file format specific implementations +/// from the [`TableProvider`]. This helps code re-utilization across +/// providers that support the same file formats. +/// +/// [`TableProvider`]: crate::catalog::TableProvider +#[async_trait] +pub trait FileFormat: Send + Sync + fmt::Debug { + /// Returns the table provider as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv + fn get_ext(&self) -> String; + + /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv + fn get_ext_with_compression( + &self, + _file_compression_type: &FileCompressionType, + ) -> Result; + + /// Infer the common schema of the provided objects. The objects will usually + /// be analysed up to a given number of records or files (as specified in the + /// format config) then give the estimated common schema. This might fail if + /// the files have schemas that cannot be merged. + async fn infer_schema( + &self, + state: &dyn Session, + store: &Arc, + objects: &[ObjectMeta], + ) -> Result; + + /// Infer the statistics for the provided object. The cost and accuracy of the + /// estimated statistics might vary greatly between file formats. + /// + /// `table_schema` is the (combined) schema of the overall table + /// and may be a superset of the schema contained in this file. + /// + /// TODO: should the file source return statistics for only columns referred to in the table schema? + async fn infer_stats( + &self, + state: &dyn Session, + store: &Arc, + table_schema: SchemaRef, + object: &ObjectMeta, + ) -> Result; + + /// Take a list of files and convert it to the appropriate executor + /// according to this file format. + async fn create_physical_plan( + &self, + state: &dyn Session, + conf: FileScanConfig, + filters: Option<&Arc>, + ) -> Result>; + + /// Take a list of files and the configuration to convert it to the + /// appropriate writer executor according to this file format. + async fn create_writer_physical_plan( + &self, + _input: Arc, + _state: &dyn Session, + _conf: FileSinkConfig, + _order_requirements: Option, + ) -> Result> { + not_impl_err!("Writer not implemented for this format") + } + + /// Check if the specified file format has support for pushing down the provided filters within + /// the given schemas. Added initially to support the Parquet file format's ability to do this. 
+ fn supports_filters_pushdown( + &self, + _file_schema: &Schema, + _table_schema: &Schema, + _filters: &[&Expr], + ) -> Result { + Ok(FilePushdownSupport::NoSupport) + } + + /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc. + fn file_source(&self) -> Arc; +} + +/// An enum to distinguish between different states when determining if certain filters can be +/// pushed down to file scanning +#[derive(Debug, PartialEq)] +pub enum FilePushdownSupport { + /// The file format/system being asked does not support any sort of pushdown. This should be + /// used even if the file format theoretically supports some sort of pushdown, but it's not + /// enabled or implemented yet. + NoSupport, + /// The file format/system being asked *does* support pushdown, but it can't make it work for + /// the provided filter/expression + NotSupportedForFilter, + /// The file format/system being asked *does* support pushdown and *can* make it work for the + /// provided filter/expression + Supported, +} + +/// Factory for creating [`FileFormat`] instances based on session and command level options +/// +/// Users can provide their own `FileFormatFactory` to support arbitrary file formats +pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug { + /// Initialize a [FileFormat] and configure based on session and command level options + fn create( + &self, + state: &dyn Session, + format_options: &HashMap, + ) -> Result>; + + /// Initialize a [FileFormat] with all options set to default values + fn default(&self) -> Arc; + + /// Returns the table source as [`Any`] so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; +} + +/// A container of [FileFormatFactory] which also implements [FileType]. +/// This enables converting a dyn FileFormat to a dyn FileType. +/// The former trait is a superset of the latter trait, which includes execution time +/// relevant methods. [FileType] is only used in logical planning and only implements +/// the subset of methods required during logical planning. +#[derive(Debug)] +pub struct DefaultFileType { + file_format_factory: Arc, +} + +impl DefaultFileType { + /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory] + pub fn new(file_format_factory: Arc) -> Self { + Self { + file_format_factory, + } + } + + /// get a reference to the inner [FileFormatFactory] struct + pub fn as_format_factory(&self) -> &Arc { + &self.file_format_factory + } +} + +impl FileType for DefaultFileType { + fn as_any(&self) -> &dyn Any { + self + } +} + +impl fmt::Display for DefaultFileType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.file_format_factory) + } +} + +impl GetExt for DefaultFileType { + fn get_ext(&self) -> String { + self.file_format_factory.get_ext() + } +} + +/// Converts a [FileFormatFactory] to a [FileType] +pub fn format_as_file_type( + file_format_factory: Arc, +) -> Arc { + Arc::new(DefaultFileType { + file_format_factory, + }) +} + +/// Converts a [FileType] to a [FileFormatFactory]. +/// Returns an error if the [FileType] cannot be +/// downcasted to a [DefaultFileType]. 
+pub fn file_type_to_format( + file_type: &Arc, +) -> Result> { + match file_type + .as_ref() + .as_any() + .downcast_ref::() + { + Some(source) => Ok(Arc::clone(&source.file_format_factory)), + _ => internal_err!("FileType was not DefaultFileType"), + } +} + +/// Create a new field with the specified data type, copying the other +/// properties from the input field +fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef { + Arc::new(field.as_ref().clone().with_data_type(new_type)) +} + +/// Transform a schema to use view types for Utf8 and Binary +/// +/// See [parquet::ParquetFormat::force_view_types] for details +pub fn transform_schema_to_view(schema: &Schema) -> Schema { + let transformed_fields: Vec> = schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + field_with_new_type(field, DataType::Utf8View) + } + DataType::Binary | DataType::LargeBinary => { + field_with_new_type(field, DataType::BinaryView) + } + _ => Arc::clone(field), + }) + .collect(); + Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) +} + +/// Coerces the file schema if the table schema uses a view type. +#[cfg(not(target_arch = "wasm32"))] +pub fn coerce_file_schema_to_view_type( + table_schema: &Schema, + file_schema: &Schema, +) -> Option { + let mut transform = false; + let table_fields: HashMap<_, _> = table_schema + .fields + .iter() + .map(|f| { + let dt = f.data_type(); + if dt.equals_datatype(&DataType::Utf8View) + || dt.equals_datatype(&DataType::BinaryView) + { + transform = true; + } + (f.name(), dt) + }) + .collect(); + + if !transform { + return None; + } + + let transformed_fields: Vec> = file_schema + .fields + .iter() + .map( + |field| match (table_fields.get(field.name()), field.data_type()) { + (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => { + field_with_new_type(field, DataType::Utf8View) + } + ( + Some(DataType::BinaryView), + DataType::Binary | DataType::LargeBinary, + ) => field_with_new_type(field, DataType::BinaryView), + _ => Arc::clone(field), + }, + ) + .collect(); + + Some(Schema::new_with_metadata( + transformed_fields, + file_schema.metadata.clone(), + )) +} + +/// Transform a schema so that any binary types are strings +pub fn transform_binary_to_string(schema: &Schema) -> Schema { + let transformed_fields: Vec> = schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Binary => field_with_new_type(field, DataType::Utf8), + DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8), + DataType::BinaryView => field_with_new_type(field, DataType::Utf8View), + _ => Arc::clone(field), + }) + .collect(); + Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) +} + +/// If the table schema uses a string type, coerce the file schema to use a string type. 
+/// +/// See [parquet::ParquetFormat::binary_as_string] for details +#[cfg(not(target_arch = "wasm32"))] +pub fn coerce_file_schema_to_string_type( + table_schema: &Schema, + file_schema: &Schema, +) -> Option { + let mut transform = false; + let table_fields: HashMap<_, _> = table_schema + .fields + .iter() + .map(|f| (f.name(), f.data_type())) + .collect(); + let transformed_fields: Vec> = file_schema + .fields + .iter() + .map( + |field| match (table_fields.get(field.name()), field.data_type()) { + // table schema uses string type, coerce the file schema to use string type + ( + Some(DataType::Utf8), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView, + ) => { + transform = true; + field_with_new_type(field, DataType::Utf8) + } + // table schema uses large string type, coerce the file schema to use large string type + ( + Some(DataType::LargeUtf8), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView, + ) => { + transform = true; + field_with_new_type(field, DataType::LargeUtf8) + } + // table schema uses string view type, coerce the file schema to use view type + ( + Some(DataType::Utf8View), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView, + ) => { + transform = true; + field_with_new_type(field, DataType::Utf8View) + } + _ => Arc::clone(field), + }, + ) + .collect(); + + if !transform { + None + } else { + Some(Schema::new_with_metadata( + transformed_fields, + file_schema.metadata.clone(), + )) + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index e60b02f9c9e6..0ed59758476a 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -27,6 +27,7 @@ pub mod display; pub mod file; pub mod file_compression_type; +pub mod file_format; pub mod file_groups; pub mod file_meta; pub mod file_scan_config; From c7230b803794e998da5ed80c85f87149d4ba4e86 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 25 Feb 2025 14:54:52 +0000 Subject: [PATCH 2/5] Trigger Build From e017c87cda5e2c3b28363b4ecca00fca59f75b5b Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 25 Feb 2025 15:06:49 +0000 Subject: [PATCH 3/5] restore pub crate --- datafusion/core/src/datasource/file_format/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 2a992d0c3d34..5dbf4957a4b5 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -81,7 +81,7 @@ pub trait BatchDeserializer: Send + fmt::Debug { /// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder /// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode /// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush -pub trait Decoder: Send + fmt::Debug { +pub(crate) trait Decoder: Send + fmt::Debug { /// See [`arrow::json::reader::Decoder::decode`]. /// /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode @@ -155,7 +155,7 @@ impl BatchDeserializer for DecoderDeserializer { /// encoded data, into a stream of `RecordBatch` objects, following the specified /// schema and formatting options. It also handles any buffering necessary to satisfy /// the `Decoder` interface. 
-pub struct DecoderDeserializer { +pub(crate) struct DecoderDeserializer { /// The underlying decoder used for deserialization pub(crate) decoder: T, /// The buffer used to store the remaining bytes to be decoded From 71c5a04010d5ccc29eb37668e8cfd578cca04ca8 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 25 Feb 2025 15:12:31 +0000 Subject: [PATCH 4/5] move things into parquet --- .../src/datasource/file_format/parquet.rs | 154 +++++++++++++++++- .../physical_plan/parquet/opener.rs | 2 +- datafusion/datasource/src/file_format.rs | 152 +---------------- 3 files changed, 151 insertions(+), 157 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e9ecff7baff5..0a07ecf4c312 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -25,11 +25,7 @@ use std::sync::Arc; use super::write::demux::DemuxedStreamReceiver; use super::write::{create_writer, SharedBuffer}; -use super::{ - coerce_file_schema_to_string_type, coerce_file_schema_to_view_type, - transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory, - FilePushdownSupport, -}; +use super::{FileFormat, FileFormatFactory, FilePushdownSupport}; use crate::arrow::array::RecordBatch; use crate::arrow::datatypes::{Fields, Schema, SchemaRef}; use crate::datasource::file_format::file_compression_type::FileCompressionType; @@ -47,6 +43,7 @@ use crate::physical_plan::{ }; use arrow::compute::sum; +use arrow_schema::{DataType, Field, FieldRef}; use datafusion_catalog::Session; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; use datafusion_common::parsers::CompressionTypeVariant; @@ -471,6 +468,153 @@ impl FileFormat for ParquetFormat { } } +/// Coerces the file schema if the table schema uses a view type. +#[cfg(not(target_arch = "wasm32"))] +pub fn coerce_file_schema_to_view_type( + table_schema: &Schema, + file_schema: &Schema, +) -> Option { + let mut transform = false; + let table_fields: HashMap<_, _> = table_schema + .fields + .iter() + .map(|f| { + let dt = f.data_type(); + if dt.equals_datatype(&DataType::Utf8View) + || dt.equals_datatype(&DataType::BinaryView) + { + transform = true; + } + (f.name(), dt) + }) + .collect(); + + if !transform { + return None; + } + + let transformed_fields: Vec> = file_schema + .fields + .iter() + .map( + |field| match (table_fields.get(field.name()), field.data_type()) { + (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => { + field_with_new_type(field, DataType::Utf8View) + } + ( + Some(DataType::BinaryView), + DataType::Binary | DataType::LargeBinary, + ) => field_with_new_type(field, DataType::BinaryView), + _ => Arc::clone(field), + }, + ) + .collect(); + + Some(Schema::new_with_metadata( + transformed_fields, + file_schema.metadata.clone(), + )) +} + +/// If the table schema uses a string type, coerce the file schema to use a string type. 
+/// +/// See [parquet::ParquetFormat::binary_as_string] for details +#[cfg(not(target_arch = "wasm32"))] +pub fn coerce_file_schema_to_string_type( + table_schema: &Schema, + file_schema: &Schema, +) -> Option { + let mut transform = false; + let table_fields: HashMap<_, _> = table_schema + .fields + .iter() + .map(|f| (f.name(), f.data_type())) + .collect(); + let transformed_fields: Vec> = file_schema + .fields + .iter() + .map( + |field| match (table_fields.get(field.name()), field.data_type()) { + // table schema uses string type, coerce the file schema to use string type + ( + Some(DataType::Utf8), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView, + ) => { + transform = true; + field_with_new_type(field, DataType::Utf8) + } + // table schema uses large string type, coerce the file schema to use large string type + ( + Some(DataType::LargeUtf8), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView, + ) => { + transform = true; + field_with_new_type(field, DataType::LargeUtf8) + } + // table schema uses string view type, coerce the file schema to use view type + ( + Some(DataType::Utf8View), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView, + ) => { + transform = true; + field_with_new_type(field, DataType::Utf8View) + } + _ => Arc::clone(field), + }, + ) + .collect(); + + if !transform { + None + } else { + Some(Schema::new_with_metadata( + transformed_fields, + file_schema.metadata.clone(), + )) + } +} + +/// Create a new field with the specified data type, copying the other +/// properties from the input field +fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef { + Arc::new(field.as_ref().clone().with_data_type(new_type)) +} + +/// Transform a schema to use view types for Utf8 and Binary +/// +/// See [parquet::ParquetFormat::force_view_types] for details +pub fn transform_schema_to_view(schema: &Schema) -> Schema { + let transformed_fields: Vec> = schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + field_with_new_type(field, DataType::Utf8View) + } + DataType::Binary | DataType::LargeBinary => { + field_with_new_type(field, DataType::BinaryView) + } + _ => Arc::clone(field), + }) + .collect(); + Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) +} + +/// Transform a schema so that any binary types are strings +pub fn transform_binary_to_string(schema: &Schema) -> Schema { + let transformed_fields: Vec> = schema + .fields + .iter() + .map(|field| match field.data_type() { + DataType::Binary => field_with_new_type(field, DataType::Utf8), + DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8), + DataType::BinaryView => field_with_new_type(field, DataType::Utf8View), + _ => Arc::clone(field), + }) + .collect(); + Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) +} + /// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`] struct ObjectStoreFetch<'a> { store: &'a dyn ObjectStore, diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 138b44897931..4230a1bdce38 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use crate::datasource::file_format::{ +use crate::datasource::file_format::parquet::{ coerce_file_schema_to_string_type, coerce_file_schema_to_view_type, 
}; use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter; diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs index 3c46196c52f9..1eb51490918f 100644 --- a/datafusion/datasource/src/file_format.rs +++ b/datafusion/datasource/src/file_format.rs @@ -28,7 +28,7 @@ use crate::file_compression_type::FileCompressionType; use crate::file_scan_config::FileScanConfig; use crate::file_sink_config::FileSinkConfig; -use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; +use arrow::datatypes::{Schema, SchemaRef}; use async_trait::async_trait; use datafusion_catalog::Session; use datafusion_common::file_options::file_type::FileType; @@ -225,153 +225,3 @@ pub fn file_type_to_format( _ => internal_err!("FileType was not DefaultFileType"), } } - -/// Create a new field with the specified data type, copying the other -/// properties from the input field -fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef { - Arc::new(field.as_ref().clone().with_data_type(new_type)) -} - -/// Transform a schema to use view types for Utf8 and Binary -/// -/// See [parquet::ParquetFormat::force_view_types] for details -pub fn transform_schema_to_view(schema: &Schema) -> Schema { - let transformed_fields: Vec> = schema - .fields - .iter() - .map(|field| match field.data_type() { - DataType::Utf8 | DataType::LargeUtf8 => { - field_with_new_type(field, DataType::Utf8View) - } - DataType::Binary | DataType::LargeBinary => { - field_with_new_type(field, DataType::BinaryView) - } - _ => Arc::clone(field), - }) - .collect(); - Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) -} - -/// Coerces the file schema if the table schema uses a view type. -#[cfg(not(target_arch = "wasm32"))] -pub fn coerce_file_schema_to_view_type( - table_schema: &Schema, - file_schema: &Schema, -) -> Option { - let mut transform = false; - let table_fields: HashMap<_, _> = table_schema - .fields - .iter() - .map(|f| { - let dt = f.data_type(); - if dt.equals_datatype(&DataType::Utf8View) - || dt.equals_datatype(&DataType::BinaryView) - { - transform = true; - } - (f.name(), dt) - }) - .collect(); - - if !transform { - return None; - } - - let transformed_fields: Vec> = file_schema - .fields - .iter() - .map( - |field| match (table_fields.get(field.name()), field.data_type()) { - (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => { - field_with_new_type(field, DataType::Utf8View) - } - ( - Some(DataType::BinaryView), - DataType::Binary | DataType::LargeBinary, - ) => field_with_new_type(field, DataType::BinaryView), - _ => Arc::clone(field), - }, - ) - .collect(); - - Some(Schema::new_with_metadata( - transformed_fields, - file_schema.metadata.clone(), - )) -} - -/// Transform a schema so that any binary types are strings -pub fn transform_binary_to_string(schema: &Schema) -> Schema { - let transformed_fields: Vec> = schema - .fields - .iter() - .map(|field| match field.data_type() { - DataType::Binary => field_with_new_type(field, DataType::Utf8), - DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8), - DataType::BinaryView => field_with_new_type(field, DataType::Utf8View), - _ => Arc::clone(field), - }) - .collect(); - Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) -} - -/// If the table schema uses a string type, coerce the file schema to use a string type. 
-/// -/// See [parquet::ParquetFormat::binary_as_string] for details -#[cfg(not(target_arch = "wasm32"))] -pub fn coerce_file_schema_to_string_type( - table_schema: &Schema, - file_schema: &Schema, -) -> Option { - let mut transform = false; - let table_fields: HashMap<_, _> = table_schema - .fields - .iter() - .map(|f| (f.name(), f.data_type())) - .collect(); - let transformed_fields: Vec> = file_schema - .fields - .iter() - .map( - |field| match (table_fields.get(field.name()), field.data_type()) { - // table schema uses string type, coerce the file schema to use string type - ( - Some(DataType::Utf8), - DataType::Binary | DataType::LargeBinary | DataType::BinaryView, - ) => { - transform = true; - field_with_new_type(field, DataType::Utf8) - } - // table schema uses large string type, coerce the file schema to use large string type - ( - Some(DataType::LargeUtf8), - DataType::Binary | DataType::LargeBinary | DataType::BinaryView, - ) => { - transform = true; - field_with_new_type(field, DataType::LargeUtf8) - } - // table schema uses string view type, coerce the file schema to use view type - ( - Some(DataType::Utf8View), - DataType::Binary | DataType::LargeBinary | DataType::BinaryView, - ) => { - transform = true; - field_with_new_type(field, DataType::Utf8View) - } - _ => Arc::clone(field), - }, - ) - .collect(); - - if !transform { - None - } else { - Some(Schema::new_with_metadata( - transformed_fields, - file_schema.metadata.clone(), - )) - } -} - -#[cfg(test)] -mod tests {} From 2e8be539157acdf27f735cf5afc4955c8291d3ed Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 25 Feb 2025 15:27:43 +0000 Subject: [PATCH 5/5] Fix docs references --- datafusion/core/src/datasource/file_format/parquet.rs | 4 ++-- datafusion/datasource/src/file_format.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0a07ecf4c312..98aa24ad00cb 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -518,7 +518,7 @@ pub fn coerce_file_schema_to_view_type( /// If the table schema uses a string type, coerce the file schema to use a string type. /// -/// See [parquet::ParquetFormat::binary_as_string] for details +/// See [ParquetFormat::binary_as_string] for details #[cfg(not(target_arch = "wasm32"))] pub fn coerce_file_schema_to_string_type( table_schema: &Schema, @@ -582,7 +582,7 @@ fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef { /// Transform a schema to use view types for Utf8 and Binary /// -/// See [parquet::ParquetFormat::force_view_types] for details +/// See [ParquetFormat::force_view_types] for details pub fn transform_schema_to_view(schema: &Schema) -> Schema { let transformed_fields: Vec> = schema .fields diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs index 1eb51490918f..aa0338fab71d 100644 --- a/datafusion/datasource/src/file_format.rs +++ b/datafusion/datasource/src/file_format.rs @@ -46,7 +46,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the same file formats. 
 ///
-/// [`TableProvider`]: crate::catalog::TableProvider
+/// [`TableProvider`]: datafusion_catalog::TableProvider
 #[async_trait]
 pub trait FileFormat: Send + Sync + fmt::Debug {
     /// Returns the table provider as [`Any`](std::any::Any) so that it can be
     /// downcast to a specific implementation.
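
Usage illustration (not part of the patch series): after this move, downstream code converts between the
planning-time FileType wrapper and an executable FileFormatFactory through the helpers now exported from
datafusion_datasource::file_format. The sketch below is a minimal, non-authoritative example; it assumes the
generic parameters elided in the diff above are Arc<dyn FileFormatFactory> and Arc<dyn FileType>, and that the
caller already holds some factory value implementing FileFormatFactory.

// Illustrative sketch: round-tripping a factory through the logical-plan
// `FileType` wrapper using helpers from `datafusion_datasource::file_format`.
// Generic parameters elided in the diff are assumed to be
// `Arc<dyn FileFormatFactory>` / `Arc<dyn FileType>`.
use std::sync::Arc;

use datafusion_common::{GetExt, Result};
use datafusion_datasource::file_format::{
    file_type_to_format, format_as_file_type, FileFormatFactory,
};

fn round_trip(factory: Arc<dyn FileFormatFactory>) -> Result<()> {
    // Wrap the factory so logical planning can carry it as a `FileType`
    // (internally a `DefaultFileType`).
    let file_type = format_as_file_type(Arc::clone(&factory));

    // Recover the factory for physical planning; this returns an internal
    // error if the `FileType` is not the `DefaultFileType` wrapper.
    let recovered = file_type_to_format(&file_type)?;

    // `FileFormatFactory: GetExt`, so both handles report the same extension.
    assert_eq!(recovered.get_ext(), factory.get_ext());
    Ok(())
}

Because patch 1 re-exports the moved items in datafusion/core via `pub use datafusion_datasource::file_format::*;`,
existing imports such as `datafusion::datasource::file_format::FileFormat` continue to resolve; only code that wants
the canonical location needs to switch to `datafusion_datasource::file_format`.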