From 1ae06a497e7c6b117c211c52b33445c2063b9921 Mon Sep 17 00:00:00 2001
From: Andrew Lamb <andrew@nerdnetworks.org>
Date: Thu, 27 Feb 2025 09:51:28 -0500
Subject: [PATCH] Rename `DataSource` and `FileSource` fields for consistency
 (#14898)

* Rename DataSource and FileSource fields for consistency

* reduce diff
---
 .../examples/parquet_exec_visitor.rs          | 10 ++--
 .../datasource/physical_plan/arrow_file.rs    |  9 ++--
 .../core/src/datasource/physical_plan/csv.rs  |  6 +--
 .../core/src/datasource/physical_plan/json.rs |  6 +--
 .../datasource/physical_plan/parquet/mod.rs   | 19 ++++---
 .../physical_plan/parquet/source.rs           |  4 +-
 datafusion/core/src/test/mod.rs               |  4 +-
 datafusion/core/src/test_util/parquet.rs      |  9 ++--
 .../core/tests/parquet/file_statistics.rs     | 27 ++++++----
 datafusion/core/tests/parquet/utils.rs        | 10 ++--
 .../physical_optimizer/projection_pushdown.rs |  2 +-
 datafusion/core/tests/sql/path_partition.rs   |  9 ++--
 datafusion/datasource/src/file_scan_config.rs | 24 ++++-----
 datafusion/datasource/src/memory.rs           |  2 +-
 datafusion/datasource/src/source.rs           | 50 ++++++++++---------
 .../proto/src/physical_plan/from_proto.rs     |  7 +--
 datafusion/proto/src/physical_plan/mod.rs     | 20 +++++---
 .../tests/cases/roundtrip_physical_plan.rs    |  6 +--
 .../substrait/src/physical_plan/producer.rs   |  6 +--
 19 files changed, 126 insertions(+), 104 deletions(-)

diff --git a/datafusion-examples/examples/parquet_exec_visitor.rs b/datafusion-examples/examples/parquet_exec_visitor.rs
index 6c9f1a354430..96bd6c369dc1 100644
--- a/datafusion-examples/examples/parquet_exec_visitor.rs
+++ b/datafusion-examples/examples/parquet_exec_visitor.rs
@@ -97,9 +97,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
     /// or `post_visit` (visit each node after its children/inputs)
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool> {
         // If needed match on a specific `ExecutionPlan` node type
-        if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
-            let source = data_source.source();
-            if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
+        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(file_config) =
+                data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
                 if file_config
                     .file_source()
                     .as_any()
@@ -108,7 +110,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
                 {
                     self.file_groups = Some(file_config.file_groups.clone());
 
-                    let metrics = match data_source.metrics() {
+                    let metrics = match data_source_exec.metrics() {
                         None => return Ok(true),
                         Some(metrics) => metrics,
                     };
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 1cae5c5084b1..a0e1135e2cac 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -84,8 +84,8 @@ impl ArrowExec {
     }
 
     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
@@ -93,8 +93,7 @@ impl ArrowExec {
     }
 
     fn json_source(&self) -> JsonSource {
-        let source = self.file_scan_config();
-        source
+        self.file_scan_config()
             .file_source()
             .as_any()
             .downcast_ref::<JsonSource>()
@@ -130,7 +129,7 @@ impl ArrowExec {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
-        self.inner = self.inner.with_source(Arc::new(file_source));
+        self.inner = self.inner.with_data_source(Arc::new(file_source));
         self
     }
 }
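
Note for downstream users: any code that matched on the old `DataSourceExec::source()`
accessor needs the same mechanical update shown in the visitor above. A minimal sketch
of the new accessor chain (the surrounding plan-visitor setup is assumed, not part of
this patch):

    // Walk from the exec node down to the concrete file source
    if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
        // `data_source()` replaces the old `source()` accessor
        let data_source = data_source_exec.data_source();
        if let Some(file_config) = data_source.as_any().downcast_ref::<FileScanConfig>() {
            // `file_source()` is unchanged; it returns the format-specific source
            let is_parquet = file_config
                .file_source()
                .as_any()
                .downcast_ref::<ParquetSource>()
                .is_some();
            let _ = is_parquet;
        }
    }
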
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 1552060d067d..bc7d6779bbfd 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -254,8 +254,8 @@ impl CsvExec {
     }
 
     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
@@ -316,7 +316,7 @@ impl CsvExec {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
-        self.inner = self.inner.with_source(Arc::new(file_source));
+        self.inner = self.inner.with_data_source(Arc::new(file_source));
         self
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index c92d4dfdf835..c9a22add2afc 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -103,8 +103,8 @@ impl NdJsonExec {
     }
 
     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
@@ -148,7 +148,7 @@ impl NdJsonExec {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
-        self.inner = self.inner.with_source(Arc::new(file_source));
+        self.inner = self.inner.with_data_source(Arc::new(file_source));
         self
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 89902336ce5e..b83b2ebbaf0f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -292,8 +292,8 @@ impl ParquetExec {
         }
     }
     fn file_scan_config(&self) -> FileScanConfig {
-        let source = self.inner.source();
-        source
+        self.inner
+            .data_source()
             .as_any()
             .downcast_ref::<FileScanConfig>()
             .unwrap()
@@ -301,8 +301,7 @@ impl ParquetExec {
     }
 
     fn parquet_source(&self) -> ParquetSource {
-        let source = self.file_scan_config();
-        source
+        self.file_scan_config()
             .file_source()
             .as_any()
             .downcast_ref::<ParquetSource>()
@@ -343,7 +342,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
         self
     }
@@ -366,7 +365,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.schema_adapter_factory = Some(schema_adapter_factory);
         self
     }
@@ -380,7 +379,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.table_parquet_options.global.pushdown_filters = pushdown_filters;
         self
     }
@@ -404,7 +403,7 @@ impl ParquetExec {
         let file_source = self.file_scan_config();
         self.inner = self
             .inner
-            .with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
+            .with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
         self.table_parquet_options.global.reorder_filters = reorder_filters;
         self
     }
@@ -463,7 +462,7 @@ impl ParquetExec {
     ) -> Self {
         let mut config = self.file_scan_config();
         config.file_groups = file_groups;
-        self.inner = self.inner.with_source(Arc::new(config));
+        self.inner = self.inner.with_data_source(Arc::new(config));
         self
     }
 }
@@ -1469,7 +1468,7 @@ mod tests {
         ])
         .build();
         let partition_count = parquet_exec
-            .source()
+            .data_source()
             .output_partitioning()
             .partition_count();
         assert_eq!(partition_count, 1);
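
Note the two layers being renamed apart here: `FileScanConfig::with_source` still takes
the format-level `Arc<dyn FileSource>` (e.g. a `ParquetSource`), while the renamed
`DataSourceExec::with_data_source` takes the plan-level `Arc<dyn DataSource>` (e.g. the
`FileScanConfig` itself). A hedged sketch of the layering, using names from the hunks
above:

    // Inner layer: install a ParquetSource (a FileSource) into the scan config
    let config = self.file_scan_config().with_source(Arc::new(parquet));
    // Outer layer: install the FileScanConfig (a DataSource) into the exec node
    self.inner = self.inner.with_data_source(Arc::new(config));
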
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/source.rs b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
index 0f0863905a3c..142725524f1b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/source.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/source.rs
@@ -164,8 +164,8 @@ use object_store::ObjectStore;
 /// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
 /// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
 /// let exec = parquet_exec();
-/// let source = exec.source();
-/// let base_config = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+/// let data_source = exec.data_source();
+/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
 /// let existing_file_groups = &base_config.file_groups;
 /// let new_execs = existing_file_groups
 ///     .iter()
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 489e1ed240f4..23885c41c61a 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -190,9 +190,9 @@ pub fn partitioned_file_groups(
 
 pub fn partitioned_csv_config(
     schema: SchemaRef,
     file_groups: Vec<Vec<PartitionedFile>>,
-    source: Arc<dyn FileSource>,
+    file_source: Arc<dyn FileSource>,
 ) -> FileScanConfig {
-    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
+    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
         .with_file_groups(file_groups)
 }
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 9c6888bb8b10..c0be13baf21a 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -200,9 +200,10 @@ impl TestParquetFile {
 /// Recursively searches for DataSourceExec and returns the metrics
 /// on the first one it finds
 pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
-    if let Some(maybe_file) = plan.as_any().downcast_ref::<DataSourceExec>() {
-        let source = maybe_file.source();
-        if let Some(maybe_parquet) = source.as_any().downcast_ref::<FileScanConfig>()
+    if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+        let data_source = data_source_exec.data_source();
+        if let Some(maybe_parquet) =
+            data_source.as_any().downcast_ref::<FileScanConfig>()
         {
             if maybe_parquet
                 .file_source()
                 .as_any()
@@ -210,7 +211,7 @@ pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
                 .downcast_ref::<ParquetSource>()
                 .is_some()
             {
-                return maybe_file.metrics();
+                return data_source_exec.metrics();
             }
         }
     }
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index ad75cf2607c4..4c1d17c8426e 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -150,9 +150,12 @@ async fn list_files_with_session_level_cache() {
     //Session 1 first time list files
     assert_eq!(get_list_file_cache_size(&state1), 0);
     let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
-    let data_source = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let parquet1 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let parquet1 = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();
 
     assert_eq!(get_list_file_cache_size(&state1), 1);
     let fg = &parquet1.file_groups;
@@ -163,9 +166,12 @@ async fn list_files_with_session_level_cache() {
     //check session 1 cache result not show in session 2
     assert_eq!(get_list_file_cache_size(&state2), 0);
     let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
-    let data_source = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let parquet2 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let parquet2 = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();
 
     assert_eq!(get_list_file_cache_size(&state2), 1);
     let fg2 = &parquet2.file_groups;
@@ -176,9 +182,12 @@ async fn list_files_with_session_level_cache() {
     //check session 1 cache result not show in session 2
     assert_eq!(get_list_file_cache_size(&state1), 1);
     let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
-    let data_source = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let parquet3 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let parquet3 = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();
 
     assert_eq!(get_list_file_cache_size(&state1), 1);
     let fg = &parquet3.file_groups;
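
For tests, `parquet_metrics` remains the convenient entry point after the rename. A
small usage sketch (building and executing `plan` is assumed):

    // Collect metrics from the first DataSourceExec backed by a ParquetSource
    if let Some(metrics) = parquet_metrics(&plan) {
        // `output_rows` aggregates the row counts recorded by the scan
        println!("scan produced {:?} rows", metrics.output_rows());
    }
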
diff --git a/datafusion/core/tests/parquet/utils.rs b/datafusion/core/tests/parquet/utils.rs
index 8cb50b22cf63..b16d420bf75c 100644
--- a/datafusion/core/tests/parquet/utils.rs
+++ b/datafusion/core/tests/parquet/utils.rs
@@ -47,16 +47,18 @@ impl MetricsFinder {
 impl ExecutionPlanVisitor for MetricsFinder {
     type Error = std::convert::Infallible;
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
-        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
-            let source = exec.source();
-            if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
+        if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(file_config) =
+                data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
                 if file_config
                     .file_source()
                     .as_any()
                     .downcast_ref::<ParquetSource>()
                     .is_some()
                 {
-                    self.metrics = exec.metrics();
+                    self.metrics = data_source_exec.metrics();
                 }
             }
         }
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index dfba57a584ea..836758b21318 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -460,7 +460,7 @@ fn test_memory_after_projection() -> Result<()> {
             .as_any()
             .downcast_ref::<DataSourceExec>()
             .unwrap()
-            .source()
+            .data_source()
             .as_any()
             .downcast_ref::<MemorySourceConfig>()
             .unwrap()
diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs
index 1a19bfe9e86f..c88051d5c9ef 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -85,9 +85,12 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
         Expr::gt(col("id"), lit(1)),
     ];
     let exec = table.scan(&ctx.state(), None, &filters, None).await?;
-    let data_source = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    let source = data_source.source();
-    let file_source = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
+    let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
+    let data_source = data_source_exec.data_source();
+    let file_source = data_source
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .unwrap();
     let parquet_config = file_source
         .file_source()
         .as_any()
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index eb8fc2956af0..df38464f1b00 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -166,7 +166,7 @@ pub struct FileScanConfig {
     /// Are new lines in values supported for CSVOptions
     pub new_lines_in_values: bool,
     /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
-    pub source: Arc<dyn FileSource>,
+    pub file_source: Arc<dyn FileSource>,
 }
 
 impl DataSource for FileScanConfig {
@@ -178,7 +178,7 @@ impl DataSource for FileScanConfig {
         let object_store = context.runtime_env().object_store(&self.object_store_url)?;
 
         let source = self
-            .source
+            .file_source
             .with_batch_size(context.session_config().batch_size())
             .with_schema(Arc::clone(&self.file_schema))
             .with_projection(self);
@@ -223,7 +223,7 @@ impl DataSource for FileScanConfig {
         repartition_file_min_size: usize,
         output_ordering: Option<LexOrdering>,
     ) -> Result<Option<Arc<dyn DataSource>>> {
-        let source = self.source.repartitioned(
+        let source = self.file_source.repartitioned(
             target_partitions,
             repartition_file_min_size,
             output_ordering,
@@ -244,7 +244,7 @@ impl DataSource for FileScanConfig {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        self.source.statistics()
+        self.file_source.statistics()
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -257,7 +257,7 @@ impl DataSource for FileScanConfig {
     }
 
     fn metrics(&self) -> ExecutionPlanMetricsSet {
-        self.source.metrics().clone()
+        self.file_source.metrics().clone()
     }
 
     fn try_swapping_with_projection(
@@ -268,7 +268,7 @@ impl DataSource for FileScanConfig {
         // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
         Ok(all_alias_free_columns(projection.expr()).then(|| {
             let file_scan = self.clone();
-            let source = Arc::clone(&file_scan.source);
+            let source = Arc::clone(&file_scan.file_source);
             let new_projections = new_projections_for_columns(
                 projection,
                 &file_scan
@@ -315,7 +315,7 @@ impl FileScanConfig {
             output_ordering: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             new_lines_in_values: false,
-            source: Arc::clone(&file_source),
+            file_source: Arc::clone(&file_source),
         };
 
         config = config.with_source(Arc::clone(&file_source));
@@ -323,14 +323,14 @@ impl FileScanConfig {
     }
 
     /// Set the file source
-    pub fn with_source(mut self, source: Arc<dyn FileSource>) -> Self {
+    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
         let (
             _projected_schema,
             _constraints,
             projected_statistics,
             _projected_output_ordering,
         ) = self.project();
-        self.source = source.with_statistics(projected_statistics);
+        self.file_source = file_source.with_statistics(projected_statistics);
         self
     }
 
@@ -597,13 +597,13 @@ impl FileScanConfig {
 
     /// Write the data_type based on file_source
     fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
-        write!(f, ", file_type={}", self.source.file_type())?;
-        self.source.fmt_extra(t, f)
+        write!(f, ", file_type={}", self.file_source.file_type())?;
+        self.file_source.fmt_extra(t, f)
     }
 
     /// Returns the file_source
     pub fn file_source(&self) -> &Arc<dyn FileSource> {
-        &self.source
+        &self.file_source
     }
 }
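
For callers constructing a `FileScanConfig` directly, only the field spelling changes;
the constructor and builder methods keep their names. A minimal sketch, assuming
`schema` and `file_groups` are in scope:

    let file_source: Arc<dyn FileSource> = Arc::new(ParquetSource::default());
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        schema,
        Arc::clone(&file_source),
    )
    .with_file_groups(file_groups);

    // Field access now goes through `file_source` / `file_source()`
    assert!(config
        .file_source()
        .as_any()
        .downcast_ref::<ParquetSource>()
        .is_some());
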
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index 516226d56849..363ef4348e91 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -247,7 +247,7 @@ impl MemoryExec {
 
     fn memory_source_config(&self) -> MemorySourceConfig {
         self.inner
-            .source()
+            .data_source()
             .as_any()
             .downcast_ref::<MemorySourceConfig>()
             .unwrap()
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 40df75856ee1..b3089a6e59fe 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -45,6 +45,8 @@ pub trait DataSource: Send + Sync + Debug {
     ) -> datafusion_common::Result<SendableRecordBatchStream>;
     fn as_any(&self) -> &dyn Any;
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
+
+    /// Return a copy of this DataSource with a new partitioning scheme
     fn repartitioned(
         &self,
         _target_partitions: usize,
@@ -57,6 +59,7 @@ pub trait DataSource: Send + Sync + Debug {
     fn output_partitioning(&self) -> Partitioning;
     fn eq_properties(&self) -> EquivalenceProperties;
     fn statistics(&self) -> datafusion_common::Result<Statistics>;
+    /// Return a copy of this DataSource with a new fetch limit
     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
     fn fetch(&self) -> Option<usize>;
     fn metrics(&self) -> ExecutionPlanMetricsSet {
@@ -71,14 +74,14 @@
 /// Unified data source for file formats like JSON, CSV, AVRO, ARROW, PARQUET
 #[derive(Clone, Debug)]
 pub struct DataSourceExec {
-    source: Arc<dyn DataSource>,
+    data_source: Arc<dyn DataSource>,
     cache: PlanProperties,
 }
 
 impl DisplayAs for DataSourceExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
         write!(f, "DataSourceExec: ")?;
-        self.source.fmt_as(t, f)
+        self.data_source.fmt_as(t, f)
     }
 }
@@ -111,17 +114,17 @@ impl ExecutionPlan for DataSourceExec {
         target_partitions: usize,
         config: &ConfigOptions,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
-        let source = self.source.repartitioned(
+        let data_source = self.data_source.repartitioned(
             target_partitions,
             config.optimizer.repartition_file_min_size,
             self.properties().eq_properties.output_ordering(),
         )?;
-        if let Some(source) = source {
+        if let Some(source) = data_source {
             let output_partitioning = source.output_partitioning();
             let plan = self
                 .clone()
-                .with_source(source)
+                .with_data_source(source)
                 // Changing source partitioning may invalidate output partitioning. Update it also
                 .with_partitioning(output_partitioning);
             Ok(Some(Arc::new(plan)))
@@ -135,51 +138,50 @@ impl ExecutionPlan for DataSourceExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> datafusion_common::Result<SendableRecordBatchStream> {
-        self.source.open(partition, context)
+        self.data_source.open(partition, context)
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.source.metrics().clone_inner())
+        Some(self.data_source.metrics().clone_inner())
     }
 
     fn statistics(&self) -> datafusion_common::Result<Statistics> {
-        self.source.statistics()
+        self.data_source.statistics()
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
-        let mut source = Arc::clone(&self.source);
-        source = source.with_fetch(limit)?;
+        let data_source = self.data_source.with_fetch(limit)?;
         let cache = self.cache.clone();
-        Some(Arc::new(Self { source, cache }))
+        Some(Arc::new(Self { data_source, cache }))
     }
 
     fn fetch(&self) -> Option<usize> {
-        self.source.fetch()
+        self.data_source.fetch()
    }
 
     fn try_swapping_with_projection(
         &self,
         projection: &ProjectionExec,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
-        self.source.try_swapping_with_projection(projection)
+        self.data_source.try_swapping_with_projection(projection)
     }
 }
 
 impl DataSourceExec {
-    pub fn new(source: Arc<dyn DataSource>) -> Self {
-        let cache = Self::compute_properties(Arc::clone(&source));
-        Self { source, cache }
+    pub fn new(data_source: Arc<dyn DataSource>) -> Self {
+        let cache = Self::compute_properties(Arc::clone(&data_source));
+        Self { data_source, cache }
     }
 
     /// Return the source object
-    pub fn source(&self) -> &Arc<dyn DataSource> {
-        &self.source
+    pub fn data_source(&self) -> &Arc<dyn DataSource> {
+        &self.data_source
     }
 
-    pub fn with_source(mut self, source: Arc<dyn DataSource>) -> Self {
-        self.cache = Self::compute_properties(Arc::clone(&source));
-        self.source = source;
+    pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
+        self.cache = Self::compute_properties(Arc::clone(&data_source));
+        self.data_source = data_source;
         self
     }
@@ -195,10 +197,10 @@ impl DataSourceExec {
         self
     }
 
-    fn compute_properties(source: Arc<dyn DataSource>) -> PlanProperties {
+    fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
         PlanProperties::new(
-            source.eq_properties(),
-            source.output_partitioning(),
+            data_source.eq_properties(),
+            data_source.output_partitioning(),
             EmissionType::Incremental,
             Boundedness::Bounded,
         )
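
The renamed `DataSourceExec` surface in one place, as a hedged sketch (any `DataSource`
implementation works; `memory_config` is an assumed `MemorySourceConfig`):

    // Wrap a DataSource in the unified exec node
    let data_source: Arc<dyn DataSource> = Arc::new(memory_config);
    let exec = DataSourceExec::new(Arc::clone(&data_source));

    // Read access: `data_source()` replaces `source()`
    let _current = exec.data_source();

    // Write access: `with_data_source()` replaces `with_source()`; it also
    // recomputes the cached PlanProperties from the new source
    let exec = exec.with_data_source(data_source);
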
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index ce01865b8c73..6331b7fb3114 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -479,7 +479,7 @@ pub fn parse_protobuf_file_scan_config(
     proto: &protobuf::FileScanExecConf,
     registry: &dyn FunctionRegistry,
     codec: &dyn PhysicalExtensionCodec,
-    source: Arc<dyn FileSource>,
+    file_source: Arc<dyn FileSource>,
 ) -> Result<FileScanConfig> {
     let schema: Arc<Schema> = parse_protobuf_file_scan_schema(proto)?;
     let projection = proto
@@ -537,14 +537,15 @@ pub fn parse_protobuf_file_scan_config(
         output_ordering.push(sort_expr);
     }
 
-    Ok(FileScanConfig::new(object_store_url, file_schema, source)
+    let config = FileScanConfig::new(object_store_url, file_schema, file_source)
         .with_file_groups(file_groups)
         .with_constraints(constraints)
         .with_statistics(statistics)
         .with_projection(projection)
         .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize))
         .with_table_partition_cols(table_partition_cols)
-        .with_output_ordering(output_ordering))
+        .with_output_ordering(output_ordering);
+    Ok(config)
 }
 
 impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
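
On the deserialization side, the renamed parameter is still supplied by the caller that
knows the file format. A sketch of the call, assuming `proto`, `registry`, and `codec`
are in scope and the scan is Parquet-backed (`ParquetSource::default()` is illustrative):

    let file_source: Arc<dyn FileSource> = Arc::new(ParquetSource::default());
    let scan_config =
        parse_protobuf_file_scan_config(proto, registry, codec, file_source)?;
    // The rebuilt config carries the source under the renamed field
    let _fs = scan_config.file_source();
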
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 2c596255587b..d0a31097b5cd 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1630,9 +1630,10 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
             });
         }
 
-        if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
-            let source = exec.source();
-            if let Some(maybe_csv) = source.as_any().downcast_ref::<FileScanConfig>() {
+        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(maybe_csv) = data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
                 let source = maybe_csv.file_source();
                 if let Some(csv_config) = source.as_any().downcast_ref::<CsvSource>() {
                     return Ok(protobuf::PhysicalPlanNode {
@@ -1677,8 +1678,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
 
         #[cfg(feature = "parquet")]
         if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
-            let source = exec.source();
-            if let Some(maybe_parquet) = source.as_any().downcast_ref::<FileScanConfig>()
+            let data_source_exec = exec.data_source();
+            if let Some(maybe_parquet) =
+                data_source_exec.as_any().downcast_ref::<FileScanConfig>()
             {
                 let source = maybe_parquet.file_source();
                 if let Some(conf) = source.as_any().downcast_ref::<ParquetSource>() {
@@ -1704,9 +1706,11 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
             }
         }
 
-        if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
-            let source = exec.source();
-            if let Some(maybe_avro) = source.as_any().downcast_ref::<FileScanConfig>() {
+        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(maybe_avro) =
+                data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
                 let source = maybe_avro.file_source();
                 if source.as_any().downcast_ref::<AvroSource>().is_some() {
                     return Ok(protobuf::PhysicalPlanNode {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index a8ecb2d0749e..a2506bb318d2 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -738,7 +738,7 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         output_ordering: vec![],
         file_compression_type: FileCompressionType::UNCOMPRESSED,
         new_lines_in_values: false,
-        source,
+        file_source: source,
     };
 
     roundtrip_test(scan_config.build())
@@ -769,7 +769,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
         output_ordering: vec![],
         file_compression_type: FileCompressionType::UNCOMPRESSED,
         new_lines_in_values: false,
-        source,
+        file_source: source,
     };
 
     roundtrip_test(scan_config.build())
@@ -810,7 +810,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
         output_ordering: vec![],
         file_compression_type: FileCompressionType::UNCOMPRESSED,
         new_lines_in_values: false,
-        source,
+        file_source: source,
     };
 
     #[derive(Debug, Clone, Eq)]
diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs
index e8c15731228c..f84db541b7c3 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -51,9 +51,9 @@ pub fn to_substrait_rel(
         HashMap<String, u32>,
     ),
 ) -> Result<Box<Rel>> {
-    if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
-        let source = data_source.source();
-        if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
+    if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
+        let data_source = data_source_exec.data_source();
+        if let Some(file_config) = data_source.as_any().downcast_ref::<FileScanConfig>() {
             let is_parquet = file_config
                 .file_source()
                 .as_any()