Skip to content

Commit

Permalink
Move FileSourceConfig and FileStream to the new `datafusion-datas…
Browse files Browse the repository at this point in the history
…ource` (#14838)

* Initial work

* Fix some CI issues

* remove cyclical dev-dependency on core

* Trying to keep some key things accessible in the same way

* ignore rustdoc test for example

* Restore doc test with mock parquet source

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
AdamGS and alamb authored Feb 25, 2025
1 parent d0ab003 commit 9285b84
Show file tree
Hide file tree
Showing 36 changed files with 2,727 additions and 2,597 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::JsonSource;
use datafusion::{
assert_batches_eq,
datasource::physical_plan::FileSource,
datasource::{
data_source::FileSource,
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/custom_file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ use arrow::{
array::{AsArray, RecordBatch, StringArray, UInt8Array},
datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
};
use datafusion::datasource::data_source::FileSource;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::LexRequirement;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::{
catalog::Session,
common::{GetExt, Statistics},
};
use datafusion::{
datasource::physical_plan::FileSource, execution::session_state::SessionStateBuilder,
};
use datafusion::{
datasource::{
file_format::{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::{error::Error, path::PathBuf};
///
/// Expects to be called about like this:
///
/// `assert_batch_eq!(expected_lines: &[&str], batches: &[RecordBatch])`
/// `assert_batches_eq!(expected_lines: &[&str], batches: &[RecordBatch])`
///
/// # Example
/// ```
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ nested_expressions = ["datafusion-functions-nested"]
# This feature is deprecated. Use the `nested_expressions` feature instead.
array_expressions = ["nested_expressions"]
# Used to enable the avro format
avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
avro = ["apache-avro", "num-traits", "datafusion-common/avro", "datafusion-datasource/avro"]
backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "datafusion-datasource/compression"]
crypto_expressions = ["datafusion-functions/crypto_expressions"]
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use super::write::{create_writer, SharedBuffer};
use super::FileFormatFactory;
use crate::datasource::file_format::write::get_writer_schema;
use crate::datasource::file_format::FileFormat;
use crate::datasource::physical_plan::{
ArrowSource, FileGroupDisplay, FileScanConfig, FileSink, FileSinkConfig,
};
use crate::datasource::physical_plan::{ArrowSource, FileSink, FileSinkConfig};
use crate::error::Result;
use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

Expand All @@ -49,13 +47,15 @@ use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};

use crate::datasource::data_source::FileSource;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ use super::file_compression_type::FileCompressionType;
use super::FileFormat;
use super::FileFormatFactory;
use crate::datasource::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::physical_plan::{AvroSource, FileScanConfig};
use crate::datasource::physical_plan::AvroSource;
use crate::error::Result;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

use crate::datasource::data_source::FileSource;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
Expand All @@ -40,6 +39,8 @@ use datafusion_common::internal_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::GetExt;
use datafusion_common::DEFAULT_AVRO_EXTENSION;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ use super::{
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::demux::DemuxedStreamReceiver;
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::{
CsvSource, FileGroupDisplay, FileScanConfig, FileSink, FileSinkConfig,
};
use crate::datasource::physical_plan::{CsvSource, FileSink, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, DataSinkExec};
Expand All @@ -51,12 +49,14 @@ use datafusion_common::{
exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::datasource::data_source::FileSource;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ use super::{
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::demux::DemuxedStreamReceiver;
use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::{
FileGroupDisplay, FileSink, FileSinkConfig, JsonSource,
};
use crate::datasource::physical_plan::{FileSink, FileSinkConfig, JsonSource};
use crate::error::Result;
use crate::execution::SessionState;
use crate::physical_plan::insert::{DataSink, DataSinkExec};
Expand All @@ -52,12 +50,13 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;

use crate::datasource::data_source::FileSource;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ pub mod json;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
use datafusion_datasource::file::FileSource;
pub use datafusion_datasource::file_compression_type;
use datafusion_datasource::file_scan_config::FileScanConfig;
pub use datafusion_datasource::write;

use std::any::Any;
Expand All @@ -40,7 +42,7 @@ use std::task::Poll;
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use crate::arrow::error::ArrowError;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::physical_plan::{ExecutionPlan, Statistics};

Expand All @@ -50,7 +52,6 @@ use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
use datafusion_physical_expr::PhysicalExpr;

use crate::datasource::data_source::FileSource;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ use super::write::{create_writer, SharedBuffer};
use super::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
FilePushdownSupport, FileScanConfig,
FilePushdownSupport,
};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::get_writer_schema;
use crate::datasource::physical_plan::parquet::can_expr_be_pushed_down_with_schemas;
use crate::datasource::physical_plan::parquet::source::ParquetSource;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSink, FileSinkConfig};
use crate::datasource::physical_plan::{FileSink, FileSinkConfig};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::SessionState;
Expand All @@ -57,6 +57,9 @@ use datafusion_common::{
DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
Expand All @@ -65,7 +68,6 @@ use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::datasource::data_source::FileSource;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ use crate::datasource::{
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
},
get_statistics_with_limit,
physical_plan::{FileScanConfig, FileSinkConfig},
physical_plan::FileSinkConfig,
};
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
pub mod avro_to_arrow;
pub mod cte_worktable;
pub mod data_source;
pub mod default_table_source;
pub mod dynamic_file;
pub mod empty;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
use std::any::Any;
use std::sync::Arc;

use crate::datasource::data_source::FileSource;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig, JsonSource,
FileMeta, FileOpenFuture, FileOpener, JsonSource,
};
use crate::error::Result;

Expand All @@ -32,6 +31,8 @@ use arrow::datatypes::SchemaRef;
use arrow_ipc::reader::FileDecoder;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;

use super::{FileOpener, FileScanConfig};
use super::FileOpener;
#[cfg(feature = "avro")]
use crate::datasource::avro_to_arrow::Reader as AvroReader;
use crate::datasource::data_source::FileSource;

use crate::error::Result;

use arrow::datatypes::SchemaRef;
use datafusion_common::{Constraints, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use super::{calculate_range, FileScanConfig, RangeCalculation};
use crate::datasource::data_source::FileSource;
use super::{calculate_range, RangeCalculation};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::{deserialize_stream, DecoderDeserializer};
use crate::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
Expand All @@ -37,6 +37,8 @@ use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
Expand Down
Loading

0 comments on commit 9285b84

Please sign in to comment.