Skip to content

Commit

Permalink
Rename DataSource and FileSource fields for consistency (apache#14898)
Browse files Browse the repository at this point in the history

* Rename DataSource and FileSource fields for consistency

* reduce diff
  • Loading branch information
alamb authored Feb 27, 2025
1 parent fc2fbb3 commit 1ae06a4
Show file tree
Hide file tree
Showing 19 changed files with 126 additions and 104 deletions.
10 changes: 6 additions & 4 deletions datafusion-examples/examples/parquet_exec_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = data_source.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) =
data_source.as_any().downcast_ref::<FileScanConfig>()
{
if file_config
.file_source()
.as_any()
Expand All @@ -108,7 +110,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
{
self.file_groups = Some(file_config.file_groups.clone());

let metrics = match data_source.metrics() {
let metrics = match data_source_exec.metrics() {
None => return Ok(true),
Some(metrics) => metrics,
};
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,16 @@ impl ArrowExec {
}

fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
.clone()
}

fn json_source(&self) -> JsonSource {
let source = self.file_scan_config();
source
self.file_scan_config()
.file_source()
.as_any()
.downcast_ref::<JsonSource>()
Expand Down Expand Up @@ -130,7 +129,7 @@ impl ArrowExec {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
self.inner = self.inner.with_source(Arc::new(file_source));
self.inner = self.inner.with_data_source(Arc::new(file_source));
self
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ impl CsvExec {
}

fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
Expand Down Expand Up @@ -316,7 +316,7 @@ impl CsvExec {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
self.inner = self.inner.with_source(Arc::new(file_source));
self.inner = self.inner.with_data_source(Arc::new(file_source));
self
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ impl NdJsonExec {
}

fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
Expand Down Expand Up @@ -148,7 +148,7 @@ impl NdJsonExec {
self.base_config.file_groups = file_groups.clone();
let mut file_source = self.file_scan_config();
file_source = file_source.with_file_groups(file_groups);
self.inner = self.inner.with_source(Arc::new(file_source));
self.inner = self.inner.with_data_source(Arc::new(file_source));
self
}
}
Expand Down
19 changes: 9 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,16 @@ impl ParquetExec {
}
}
fn file_scan_config(&self) -> FileScanConfig {
let source = self.inner.source();
source
self.inner
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap()
.clone()
}

fn parquet_source(&self) -> ParquetSource {
let source = self.file_scan_config();
source
self.file_scan_config()
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
Expand Down Expand Up @@ -343,7 +342,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
self
}
Expand All @@ -366,7 +365,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.schema_adapter_factory = Some(schema_adapter_factory);
self
}
Expand All @@ -380,7 +379,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.table_parquet_options.global.pushdown_filters = pushdown_filters;
self
}
Expand All @@ -404,7 +403,7 @@ impl ParquetExec {
let file_source = self.file_scan_config();
self.inner = self
.inner
.with_source(Arc::new(file_source.with_source(Arc::new(parquet))));
.with_data_source(Arc::new(file_source.with_source(Arc::new(parquet))));
self.table_parquet_options.global.reorder_filters = reorder_filters;
self
}
Expand Down Expand Up @@ -463,7 +462,7 @@ impl ParquetExec {
) -> Self {
let mut config = self.file_scan_config();
config.file_groups = file_groups;
self.inner = self.inner.with_source(Arc::new(config));
self.inner = self.inner.with_data_source(Arc::new(config));
self
}
}
Expand Down Expand Up @@ -1469,7 +1468,7 @@ mod tests {
])
.build();
let partition_count = parquet_exec
.source()
.data_source()
.output_partitioning()
.partition_count();
assert_eq!(partition_count, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ use object_store::ObjectStore;
/// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
/// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
/// let exec = parquet_exec();
/// let source = exec.source();
/// let base_config = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
/// let data_source = exec.data_source();
/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
/// let existing_file_groups = &base_config.file_groups;
/// let new_execs = existing_file_groups
/// .iter()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ pub fn partitioned_file_groups(
pub fn partitioned_csv_config(
schema: SchemaRef,
file_groups: Vec<Vec<PartitionedFile>>,
source: Arc<dyn FileSource>,
file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
.with_file_groups(file_groups)
}

Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,18 @@ impl TestParquetFile {
/// Recursively searches for DataSourceExec and returns the metrics
/// on the first one it finds
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
if let Some(maybe_file) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = maybe_file.source();
if let Some(maybe_parquet) = source.as_any().downcast_ref::<FileScanConfig>()
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(maybe_parquet) =
data_source.as_any().downcast_ref::<FileScanConfig>()
{
if maybe_parquet
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
return maybe_file.metrics();
return data_source_exec.metrics();
}
}
}
Expand Down
27 changes: 18 additions & 9 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@ async fn list_files_with_session_level_cache() {
//Session 1 first time list files
assert_eq!(get_list_file_cache_size(&state1), 0);
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
let data_source = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let parquet1 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet1 = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();

assert_eq!(get_list_file_cache_size(&state1), 1);
let fg = &parquet1.file_groups;
Expand All @@ -163,9 +166,12 @@ async fn list_files_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_list_file_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
let data_source = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let parquet2 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet2 = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();

assert_eq!(get_list_file_cache_size(&state2), 1);
let fg2 = &parquet2.file_groups;
Expand All @@ -176,9 +182,12 @@ async fn list_files_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_list_file_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
let data_source = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let parquet3 = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet3 = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();

assert_eq!(get_list_file_cache_size(&state1), 1);
let fg = &parquet3.file_groups;
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/tests/parquet/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ impl MetricsFinder {
impl ExecutionPlanVisitor for MetricsFinder {
type Error = std::convert::Infallible;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let source = exec.source();
if let Some(file_config) = source.as_any().downcast_ref::<FileScanConfig>() {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
let data_source = data_source_exec.data_source();
if let Some(file_config) =
data_source.as_any().downcast_ref::<FileScanConfig>()
{
if file_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.is_some()
{
self.metrics = exec.metrics();
self.metrics = data_source_exec.metrics();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ fn test_memory_after_projection() -> Result<()> {
.as_any()
.downcast_ref::<DataSourceExec>()
.unwrap()
.source()
.data_source()
.as_any()
.downcast_ref::<MemorySourceConfig>()
.unwrap()
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/tests/sql/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
Expr::gt(col("id"), lit(1)),
];
let exec = table.scan(&ctx.state(), None, &filters, None).await?;
let data_source = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
let source = data_source.source();
let file_source = source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let file_source = data_source
.as_any()
.downcast_ref::<FileScanConfig>()
.unwrap();
let parquet_config = file_source
.file_source()
.as_any()
Expand Down
Loading

0 comments on commit 1ae06a4

Please sign in to comment.