From 82b03420b147c44dd5c646c20fc9a2d035feb86c Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 4 Mar 2025 16:17:47 -0800 Subject: [PATCH] refactor(15003): refactor test suite in EnforceDistribution, to use test config builder --- .../enforce_distribution.rs | 654 +++++++++++++----- 1 file changed, 486 insertions(+), 168 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index fc2394d889d1..14d4accbb357 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -371,46 +371,91 @@ macro_rules! plans_matches_expected { } } +fn test_suite_default_config_options() -> ConfigOptions { + let mut config = ConfigOptions::new(); + + // By default, does not prefer the existing sort: will repartition / resort data even if it is already sorted. + config.optimizer.prefer_existing_sort = false; + + // By default, will attempt to convert Union to Interleave. + config.optimizer.prefer_existing_union = false; + + // By default, will not repartition file scans. + config.optimizer.repartition_file_scans = false; + config.optimizer.repartition_file_min_size = 1024; + + // By default, set query execution concurrency to 10. + config.execution.target_partitions = 10; + + // Use a small batch size, to trigger RoundRobin in tests + config.execution.batch_size = 1; + + config +} + +/// How the optimizers are run. 
+#[derive(PartialEq, Clone)] +enum DoFirst { + /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting) + Distribution, + /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution) + Sorting, +} + +#[derive(Clone)] +struct TestConfig { + config: ConfigOptions, + optimizers_to_run: DoFirst, +} + +impl TestConfig { + fn new(optimizers_to_run: DoFirst) -> Self { + Self { + config: test_suite_default_config_options(), + optimizers_to_run, + } + } + + /// If preferred, will not repartition / resort data if it is already sorted. + fn with_prefer_existing_sort(mut self) -> Self { + self.config.optimizer.prefer_existing_sort = true; + self + } + + /// If preferred, will not attempt to convert Union to Interleave. + fn with_prefer_existing_union(mut self) -> Self { + self.config.optimizer.prefer_existing_union = true; + self + } + + /// If preferred, will repartition file scans. + /// Accepts a minimum file size to repartition. + fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> Self { + self.config.optimizer.repartition_file_scans = true; + self.config.optimizer.repartition_file_min_size = file_min_size; + self + } + + /// Set the preferred target partitions for query execution concurrency. 
+ fn with_query_execution_partitions(mut self, target_partitions: usize) -> Self { + self.config.execution.target_partitions = target_partitions; + self + } +} + /// Runs the repartition optimizer and asserts the plan against the expected /// Arguments /// * `EXPECTED_LINES` - Expected output plan /// * `PLAN` - Input plan -/// * `FIRST_ENFORCE_DIST` - -/// true: (EnforceDistribution, EnforceDistribution, EnforceSorting) -/// false: else runs (EnforceSorting, EnforceDistribution, EnforceDistribution) -/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted -/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to -/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans -/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition -/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave +/// * `CONFIG` - [`TestConfig`] macro_rules! 
assert_optimized { - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => { + ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); - let mut config = ConfigOptions::new(); - config.execution.target_partitions = $TARGET_PARTITIONS; - config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS; - config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE; - config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT; - config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION; - // Use a small batch size, to trigger RoundRobin in tests - config.execution.batch_size = 1; + let TestConfig { + config, + optimizers_to_run, + } = $CONFIG; // NOTE: These tests verify the joint 
`EnforceDistribution` + `EnforceSorting` cascade // because they were written prior to the separation of `BasicEnforcement` into @@ -454,7 +499,7 @@ macro_rules! assert_optimized { // TODO: End state payloads will be checked here. } - let optimized = if $FIRST_ENFORCE_DIST { + let optimized = if *optimizers_to_run == DoFirst::Distribution { // Run enforce distribution rule first: let optimizer = EnforceDistribution::new(); let optimized = optimizer.optimize(optimized, &config)?; @@ -601,8 +646,12 @@ fn multi_hash_joins() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); } JoinType::RightSemi | JoinType::RightAnti => {} } @@ -665,8 +714,12 @@ fn multi_hash_joins() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {} } @@ -722,8 +775,12 @@ fn multi_joins_after_alias() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, 
&TestConfig::new(DoFirst::Sorting)); // Join on (a2 == c) let top_join_on = vec![( @@ -748,8 +805,12 @@ fn multi_joins_after_alias() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -802,8 +863,12 @@ fn multi_joins_after_multi_alias() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -842,8 +907,12 @@ fn join_after_agg_alias() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, join.clone(), true); - assert_optimized!(expected, join, false); + assert_optimized!( + expected, + join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -895,8 +964,12 @@ fn hash_join_key_ordering() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, join.clone(), true); - assert_optimized!(expected, join, false); + assert_optimized!( + expected, + join.clone(), + &TestConfig::new(DoFirst::Distribution) + 
); + assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1021,8 +1094,16 @@ fn multi_hash_join_key_ordering() -> Result<()> { "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, filter_top_join.clone(), true); - assert_optimized!(expected, filter_top_join, false); + assert_optimized!( + expected, + filter_top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected, + filter_top_join, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -1300,6 +1381,7 @@ fn reorder_join_keys_to_right_input() -> Result<()> { Ok(()) } +/// These test cases use [`TestConfig::with_prefer_existing_sort`]. #[test] fn multi_smj_joins() -> Result<()> { let left = parquet_exec(); @@ -1393,7 +1475,11 @@ fn multi_smj_joins() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected, top_join.clone(), true, true); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs @@ -1447,7 +1533,11 @@ fn multi_smj_joins() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected_first_sort_enforcement, top_join, false, true); + assert_optimized!( + expected_first_sort_enforcement, + top_join, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); match join_type { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { @@ -1504,7 +1594,11 @@ fn multi_smj_joins() -> Result<()> { // this match arm cannot be reached _ => unreachable!() }; - assert_optimized!(expected, top_join.clone(), true, true); + 
assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs @@ -1550,7 +1644,12 @@ fn multi_smj_joins() -> Result<()> { // this match arm cannot be reached _ => unreachable!() }; - assert_optimized!(expected_first_sort_enforcement, top_join, false, true); + + assert_optimized!( + expected_first_sort_enforcement, + top_join, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); } _ => {} } @@ -1559,6 +1658,7 @@ fn multi_smj_joins() -> Result<()> { Ok(()) } +/// These test cases use [`TestConfig::with_prefer_existing_sort`]. #[test] fn smj_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) @@ -1624,7 +1724,11 @@ fn smj_join_key_ordering() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, join.clone(), true, true); + assert_optimized!( + expected, + join.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", @@ -1650,7 +1754,11 @@ fn smj_join_key_ordering() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, join, false, true); + assert_optimized!( + expected_first_sort_enforcement, + join, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } @@ -1681,7 +1789,7 @@ fn merge_does_not_need_sort() -> Result<()> { "CoalesceBatchesExec: target_batch_size=4096", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, 
d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, exec, true); + assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution)); // In this case preserving ordering through order preserving operators is not desirable // (according to flag: PREFER_EXISTING_SORT) @@ -1693,7 +1801,7 @@ fn merge_does_not_need_sort() -> Result<()> { "CoalesceBatchesExec: target_batch_size=4096", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, exec, false); + assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1734,8 +1842,12 @@ fn union_to_interleave() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan.clone(), false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan.clone(), &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1777,24 +1889,16 @@ fn union_not_to_interleave() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - // no sort in the plan but since we need it as a parameter, make it default false - let prefer_existing_sort = false; - let first_enforce_distribution = true; - let prefer_existing_union = true; assert_optimized!( expected, plan.clone(), - first_enforce_distribution, - prefer_existing_sort, - prefer_existing_union + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union() ); assert_optimized!( expected, plan, - !first_enforce_distribution, - prefer_existing_sort, - prefer_existing_union + 
&TestConfig::new(DoFirst::Sorting).with_prefer_existing_union() ); Ok(()) @@ -1812,8 +1916,12 @@ fn added_repartition_to_single_partition() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1831,14 +1939,17 @@ fn repartition_deepest_node() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } #[test] - fn repartition_unsorted_limit() -> Result<()> { let plan = limit_exec(filter_exec(parquet_exec())); @@ -1852,8 +1963,12 @@ fn repartition_unsorted_limit() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1874,8 +1989,12 @@ fn repartition_sorted_limit() -> Result<()> { "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + 
assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1902,8 +2021,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1932,8 +2055,12 @@ fn repartition_ignores_limit() -> Result<()> { // Expect no repartition to happen for local limit "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1952,8 +2079,12 @@ fn repartition_ignores_union() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1973,8 +2104,12 @@ fn repartition_through_sort_preserving_merge() -> Result<()> { "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + 
assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1999,14 +2134,18 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", "CoalescePartitionsExec", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan, false); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2030,7 +2169,11 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", @@ -2039,11 +2182,12 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan, false); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } +/// These test cases use [`TestConfig::with_prefer_existing_sort`]. 
#[test] fn repartition_does_not_destroy_sort() -> Result<()> { // SortRequired @@ -2066,8 +2210,16 @@ fn repartition_does_not_destroy_sort() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true, true); - assert_optimized!(expected, plan, false, true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); + assert_optimized!( + expected, + plan, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } @@ -2107,8 +2259,12 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2141,7 +2297,11 @@ fn repartition_transitively_with_projection() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected_first_sort_enforcement = &[ "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]", @@ -2151,7 +2311,11 @@ fn repartition_transitively_with_projection() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, plan, false); + assert_optimized!( + expected_first_sort_enforcement, + plan, + 
&TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -2183,8 +2347,12 @@ fn repartition_ignores_transitively_with_projection() -> Result<()> { "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2216,8 +2384,12 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2240,7 +2412,11 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2250,7 +2426,11 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, plan, false); + assert_optimized!( + expected_first_sort_enforcement, + plan, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) 
} @@ -2287,7 +2467,11 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2297,7 +2481,11 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, plan, false); + assert_optimized!( + expected_first_sort_enforcement, + plan, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -2320,8 +2508,12 @@ fn parallelization_single_partition() -> Result<()> { "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); - assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10); + + let test_config = TestConfig::new(DoFirst::Distribution) + .with_prefer_repartition_file_scans(10) + .with_query_execution_partitions(2); + assert_optimized!(expected_parquet, plan_parquet, &test_config); + assert_optimized!(expected_csv, plan_csv, &test_config); Ok(()) } @@ -2337,6 +2529,10 @@ fn parallelization_multiple_files() -> Result<()> { let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()])); let plan = sort_required_exec_with_req(plan, sort_key); + let test_config = TestConfig::new(DoFirst::Distribution) + .with_prefer_existing_sort() + .with_prefer_repartition_file_scans(1); + // The groups must have only contiguous ranges of rows 
from the same file // if any group has rows from multiple files, the data is no longer sorted destroyed // https://github.com/apache/datafusion/issues/8451 @@ -2344,17 +2540,10 @@ fn parallelization_multiple_files() -> Result<()> { "SortRequiredExec: [a@0 ASC]", "FilterExec: c@2 = 0", "DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - let target_partitions = 3; - let repartition_size = 1; assert_optimized!( expected, plan, - true, - true, - target_partitions, - true, - repartition_size, - false + &test_config.clone().with_query_execution_partitions(3) ); let expected = [ @@ -2362,17 +2551,10 @@ fn parallelization_multiple_files() -> Result<()> { "FilterExec: c@2 = 0", "DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - let target_partitions = 8; - let repartition_size = 1; assert_optimized!( expected, plan, - true, - true, - target_partitions, - true, - repartition_size, - false + &test_config.with_query_execution_partitions(8) ); Ok(()) @@ -2423,7 +2605,13 @@ fn parallelization_compressed_csv() -> Result<()> { .build(), vec![("a".to_string(), "a".to_string())], ); - assert_optimized!(expected, plan, true, false, 2, true, 10, false); + assert_optimized!( + expected, + plan, + &TestConfig::new(DoFirst::Distribution) + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10) + ); } Ok(()) } @@ -2448,8 +2636,11 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); - assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10); + 
let test_config = TestConfig::new(DoFirst::Distribution) + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + assert_optimized!(expected_parquet, plan_parquet, &test_config); + assert_optimized!(expected_csv, plan_csv, &test_config); Ok(()) } @@ -2473,8 +2664,11 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 10); - assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10); + let test_config = TestConfig::new(DoFirst::Distribution) + .with_query_execution_partitions(4) + .with_prefer_repartition_file_scans(10); + assert_optimized!(expected_parquet, plan_parquet, &test_config); + assert_optimized!(expected_csv, plan_csv, &test_config); Ok(()) } @@ -2505,8 +2699,16 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2549,8 +2751,16 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + 
&TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2597,8 +2807,16 @@ fn parallelization_ignores_limit() -> Result<()> { "LocalLimitExec: fetch=100", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2626,8 +2844,16 @@ fn parallelization_union_inputs() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2654,8 +2880,16 @@ fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let expected_csv = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2688,8 +2922,16 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { "DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2719,8 +2961,16 @@ fn parallelization_does_not_benefit() -> Result<()> { "SortRequiredExec: [c@2 ASC]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2759,7 +3009,11 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected_parquet, plan_parquet, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2798,7 +3052,11 @@ fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> { "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + 
); Ok(()) } @@ -2822,12 +3080,17 @@ fn remove_redundant_roundrobins() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } +/// This test case uses [`TestConfig::with_prefer_existing_sort`]. #[test] fn remove_unnecessary_spm_after_filter() -> Result<()> { let schema = schema(); @@ -2846,13 +3109,21 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - // last flag sets config.optimizer.PREFER_EXISTING_SORT - assert_optimized!(expected, physical_plan.clone(), true, true); - assert_optimized!(expected, physical_plan, false, true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } +/// This test case uses [`TestConfig::with_prefer_existing_sort`]. 
#[test] fn preserve_ordering_through_repartition() -> Result<()> { let schema = schema(); @@ -2869,9 +3140,16 @@ fn preserve_ordering_through_repartition() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - // last flag sets config.optimizer.PREFER_EXISTING_SORT - assert_optimized!(expected, physical_plan.clone(), true, true); - assert_optimized!(expected, physical_plan, false, true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } @@ -2894,7 +3172,11 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2903,7 +3185,7 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan, false); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2926,8 +3208,12 @@ fn no_need_for_sort_after_filter() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], 
output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2955,7 +3241,11 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2965,7 +3255,7 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan, false); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2985,8 +3275,12 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -3082,8 +3376,16 @@ fn do_not_add_unnecessary_hash() -> Result<()> { "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // Make sure target partition number is 1. In this case hash repartition is unnecessary - assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024); - assert_optimized!(expected, physical_plan, false, false, 1, false, 1024); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1) + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1) + ); Ok(()) } @@ -3112,8 +3414,16 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // Make sure target partition number is larger than 2 (e.g partition number at the source). - assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024); - assert_optimized!(expected, physical_plan, false, false, 4, false, 1024); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4) + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4) + ); Ok(()) } @@ -3130,8 +3440,12 @@ fn optimize_away_unnecessary_repartition() -> Result<()> { let expected = &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -3157,8 +3471,12 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "DataSourceExec: 
file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) }