diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index e1b50d9c0cfa7..c5af1e35b6b44 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -409,17 +409,17 @@ const SORT_DISTRIB_DISTRIB: [Run; 3] = #[derive(Clone)] struct TestConfig { config: ConfigOptions, - optimizers_to_run: Vec, } -impl TestConfig { - fn new(optimizers_to_run: Vec) -> Self { +impl Default for TestConfig { + fn default() -> Self { Self { config: test_suite_default_config_options(), - optimizers_to_run, } } +} +impl TestConfig { /// If preferred, will not repartition / resort data if it is already sorted. fn with_prefer_existing_sort(mut self) -> Self { self.config.optimizer.prefer_existing_sort = true; @@ -445,40 +445,30 @@ impl TestConfig { self.config.execution.target_partitions = target_partitions; self } -} - -/// Runs the repartition optimizer and asserts the plan against the expected -/// Arguments -/// * `EXPECTED_LINES` - Expected output plan -/// * `PLAN` - Input plan -/// * `CONFIG` - [`TestConfig`] -macro_rules! assert_optimized { - ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => { - let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); - - let TestConfig { - config, - optimizers_to_run, - } = $CONFIG; - // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade - // because they were written prior to the separation of `BasicEnforcement` into - // `EnforceSorting` and `EnforceDistribution`. - // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create - // new tests for the cascade. + /// Perform a series of runs using the current [`TestConfig`], + /// assert the expected plan result, + /// and return the result plan (for potential subsequent runs). 
+ fn run( + &self, + expected_lines: &[&str], + plan: Arc, + optimizers_to_run: Vec, + ) -> Result<()> { + let expected_lines: Vec<&str> = expected_lines.to_vec(); // Add the ancillary output requirements operator at the start: let optimizer = OutputRequirements::new_add_mode(); - let mut optimized = optimizer.optimize($PLAN.clone(), &config)?; + let mut optimized = optimizer.optimize(plan.clone(), &self.config)?; // This file has 2 rules that use tree node, apply these rules to original plan consecutively // After these operations tree nodes should be in a consistent state. // This code block makes sure that these rules doesn't violate tree node integrity. { - let adjusted = if config.optimizer.top_down_join_key_reordering { + let adjusted = if self.config.optimizer.top_down_join_key_reordering { // Run adjust_input_keys_ordering rule let plan_requirements = - PlanWithKeyRequirements::new_default($PLAN.clone()); + PlanWithKeyRequirements::new_default(plan.clone()); let adjusted = plan_requirements .transform_down(adjust_input_keys_ordering) .data() @@ -487,16 +477,17 @@ macro_rules! assert_optimized { adjusted.plan } else { // Run reorder_join_keys_to_inputs rule - $PLAN.clone().transform_up(|plan| { - Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) - }) - .data()? + plan.clone() + .transform_up(|plan| { + Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?)) + }) + .data()? }; // Then run ensure_distribution rule DistributionContext::new_default(adjusted) .transform_up(|distribution_context| { - ensure_distribution(distribution_context, &config) + ensure_distribution(distribution_context, &self.config) }) .data() .and_then(check_integrity)?; @@ -507,18 +498,18 @@ macro_rules! assert_optimized { optimized = match run { Run::Distribution => { let optimizer = EnforceDistribution::new(); - optimizer.optimize(optimized, &config)? - }, + optimizer.optimize(optimized, &self.config)? 
+ } Run::Sorting => { let optimizer = EnforceSorting::new(); - optimizer.optimize(optimized, &config)? - }, + optimizer.optimize(optimized, &self.config)? + } }; } // Remove the ancillary output requirements operator when done: let optimizer = OutputRequirements::new_remove_mode(); - let optimized = optimizer.optimize(optimized, &config)?; + let optimized = optimizer.optimize(optimized, &self.config)?; // Now format correctly let actual_lines = get_plan_string(&optimized); @@ -528,7 +519,9 @@ macro_rules! assert_optimized { "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", expected_lines, actual_lines ); - }; + + Ok(()) + } } macro_rules! assert_plan_txt { @@ -637,16 +630,18 @@ fn multi_hash_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected, + + let test_config = TestConfig::default(); + test_config.run( + &expected, top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( - expected, + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; } JoinType::RightSemi | JoinType::RightAnti => {} } @@ -709,16 +704,18 @@ fn multi_hash_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected, + + let test_config = TestConfig::default(); + test_config.run( + &expected, top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( - expected, + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {} } @@ -774,16 +771,17 @@ fn multi_joins_after_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; // Join on (a2 == c) let top_join_on = vec![( @@ -808,16 +806,17 @@ fn multi_joins_after_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -870,16 +869,17 @@ fn multi_joins_after_multi_alias() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. 
+ )?; + test_config.run( expected, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -918,16 +918,17 @@ fn join_after_agg_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -979,16 +980,17 @@ fn hash_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -1113,16 +1115,17 @@ fn multi_hash_join_key_ordering() -> Result<()> { " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, filter_top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, filter_top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -1403,6 +1406,8 @@ fn reorder_join_keys_to_right_input() -> Result<()> { /// These test cases use [`TestConfig::with_prefer_existing_sort`]. #[test] fn multi_smj_joins() -> Result<()> { + let test_config = TestConfig::default().with_prefer_existing_sort(); + let left = parquet_exec(); let alias_pairs: Vec<(String, String)> = vec![ ("a".to_string(), "a1".to_string()), @@ -1502,11 +1507,8 @@ fn multi_smj_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected, - top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_prefer_existing_sort() - ); + // TODO(wiedld): show different test result if enforce sorting first. + test_config.run(&expected, top_join.clone(), DISTRIB_DISTRIB_SORT.into())?; let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs @@ -1560,11 +1562,12 @@ fn multi_smj_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!( - expected_first_sort_enforcement, + // TODO(wiedld): show different test result if enforce distribution first. 
+ test_config.run( + &expected_first_sort_enforcement, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_prefer_existing_sort() - ); + SORT_DISTRIB_DISTRIB.into(), + )?; match join_type { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { @@ -1621,12 +1624,12 @@ fn multi_smj_joins() -> Result<()> { // this match arm cannot be reached _ => unreachable!() }; - assert_optimized!( - expected, + // TODO(wiedld): show different test result if enforce sorting first. + test_config.run( + &expected, top_join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - .with_prefer_existing_sort() - ); + DISTRIB_DISTRIB_SORT.into(), + )?; let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs @@ -1673,12 +1676,12 @@ fn multi_smj_joins() -> Result<()> { _ => unreachable!() }; - assert_optimized!( - expected_first_sort_enforcement, + // TODO(wiedld): show different test result if enforce distribution first. + test_config.run( + &expected_first_sort_enforcement, top_join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - .with_prefer_existing_sort() - ); + SORT_DISTRIB_DISTRIB.into(), + )?; } _ => {} } @@ -1734,6 +1737,10 @@ fn smj_join_key_ordering() -> Result<()> { ]; let join = sort_merge_join_exec(left, right.clone(), &join_on, &JoinType::Inner); + // TestConfig: Prefer existing sort. + let test_config = TestConfig::default().with_prefer_existing_sort(); + + // Test: run EnforceDistribution, then EnforceSort. 
// Only two RepartitionExecs added let expected = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", @@ -1753,12 +1760,9 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, - join.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_prefer_existing_sort() - ); + test_config.run(expected, join.clone(), DISTRIB_DISTRIB_SORT.into())?; + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", " RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC", @@ -1783,11 +1787,11 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + test_config.run( expected_first_sort_enforcement, join, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_prefer_existing_sort() - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -1811,6 +1815,8 @@ fn merge_does_not_need_sort() -> Result<()> { let exec: Arc = Arc::new(SortPreservingMergeExec::new(sort_key, exec)); + // Test: run EnforceDistribution, then EnforceSort. 
+ // // The optimizer should not add an additional SortExec as the // data is already sorted let expected = &[ @@ -1818,27 +1824,26 @@ fn merge_does_not_need_sort() -> Result<()> { " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - exec, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + let test_config = TestConfig::default(); + test_config.run(expected, exec.clone(), DISTRIB_DISTRIB_SORT.into())?; + // Test: result IS DIFFERENT, if EnforceSorting is run first: + // // In this case preserving ordering through order preserving operators is not desirable // (according to flag: PREFER_EXISTING_SORT) // hence in this case ordering lost during CoalescePartitionsExec and re-introduced with // SortExec at the top. - let expected = &[ + let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - expected, + test_config.run( + expected_first_sort_enforcement, exec, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -1879,16 +1884,18 @@ fn union_to_interleave() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. 
+ )?; + test_config.run( expected, - plan.clone(), - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + plan, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -1931,16 +1938,19 @@ fn union_not_to_interleave() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + // TestConfig: Prefer existing union. + let test_config = TestConfig::default().with_prefer_existing_union(); + + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_prefer_existing_union() - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_prefer_existing_union() - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -1957,16 +1967,18 @@ fn added_repartition_to_single_partition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( - expected, + + let test_config = TestConfig::default(); + test_config.run( + &expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( - expected, + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -1984,16 +1996,18 @@ fn repartition_deepest_node() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2012,16 +2026,17 @@ fn repartition_unsorted_limit() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2042,16 +2057,18 @@ fn repartition_sorted_limit() -> Result<()> { " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -2078,16 +2095,17 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2116,16 +2134,18 @@ fn repartition_ignores_limit() -> Result<()> { // Expect no repartition to happen for local limit " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2144,16 +2164,17 @@ fn repartition_ignores_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -2173,16 +2194,18 @@ fn repartition_through_sort_preserving_merge() -> Result<()> { "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2200,29 +2223,28 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { parquet_exec_multiple_sorted(vec![sort_key]), ); + // Test: run EnforceDistribution, then EnforceSort + // // should not sort (as the data was already sorted) // should not repartition, since increased parallelism is not beneficial for SortPReservingMerge let expected = &[ "SortPreservingMergeExec: [c@2 ASC]", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - - let expected = &[ + // Test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( - expected, + test_config.run( + expected_first_sort_enforcement, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ 
-2238,6 +2260,8 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { let input = union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]); let plan = sort_preserving_merge_exec(sort_key, input); + // Test: run EnforceDistribution, then EnforceSort. + // // should not repartition / sort (as the data was already sorted) let expected = &[ "SortPreservingMergeExec: [c@2 ASC]", @@ -2245,25 +2269,22 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - - let expected = &[ + // test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " UnionExec", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( - expected, + test_config.run( + expected_first_sort_enforcement, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -2283,6 +2304,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> { sort_key, ); + // TestConfig: Prefer existing sort. 
+ let test_config = TestConfig::default().with_prefer_existing_sort(); + // during repartitioning ordering is preserved let expected = &[ "SortRequiredExec: [d@3 ASC]", @@ -2291,16 +2315,16 @@ fn repartition_does_not_destroy_sort() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - assert_optimized!( + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_prefer_existing_sort() - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_prefer_existing_sort() - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2340,16 +2364,18 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2373,6 +2399,7 @@ fn repartition_transitively_with_projection() -> Result<()> { }]); let plan = sort_preserving_merge_exec(sort_key, proj); + // Test: run EnforceDistribution, then EnforceSort. 
let expected = &[ "SortPreservingMergeExec: [sum@0 ASC]", " SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]", @@ -2381,13 +2408,10 @@ fn repartition_transitively_with_projection() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", @@ -2396,11 +2420,11 @@ fn repartition_transitively_with_projection() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + test_config.run( expected_first_sort_enforcement, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -2432,16 +2456,18 @@ fn repartition_ignores_transitively_with_projection() -> Result<()> { " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -2473,16 +2499,18 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2496,6 +2524,7 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { }]); let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false); + // Test: run EnforceDistribution, then EnforceSort. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -2504,13 +2533,10 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", @@ -2519,11 +2545,11 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - 
assert_optimized!( + test_config.run( expected_first_sort_enforcement, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -2549,6 +2575,7 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> false, ); + // Test: run EnforceDistribution, then EnforceSort. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) @@ -2559,13 +2586,10 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + let test_config = TestConfig::default(); + test_config.run(expected, plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - + // Test: result IS DIFFERENT, if EnforceSorting is run first: let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", @@ -2574,11 +2598,11 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + test_config.run( expected_first_sort_enforcement, plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -2589,24 +2613,45 @@ fn parallelization_single_partition() -> Result<()> { let plan_parquet = aggregate_exec_with_alias(parquet_exec(), alias.clone()); let plan_csv = aggregate_exec_with_alias(csv_exec(), alias); + let test_config = TestConfig::default() + .with_prefer_repartition_file_scans(10) + .with_query_execution_partitions(2); + + // Test: with 
parquet let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + &expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; + + // Test: with csv let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - - let test_config = TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - .with_prefer_repartition_file_scans(10) - .with_query_execution_partitions(2); - assert_optimized!(expected_parquet, plan_parquet, &test_config); - assert_optimized!(expected_csv, plan_csv, &test_config); + test_config.run( + &expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected_csv, + plan_csv, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -2622,34 +2667,47 @@ fn parallelization_multiple_files() -> Result<()> { let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()])); let plan = sort_required_exec_with_req(plan, sort_key); - let test_config = TestConfig::new(DISTRIB_DISTRIB_SORT.into()) + let test_config = TestConfig::default() .with_prefer_existing_sort() .with_prefer_repartition_file_scans(1); // The groups must have only contiguous ranges of rows from the same file // if any group has rows from multiple files, the data is no longer sorted destroyed // https://github.com/apache/datafusion/issues/8451 - let expected = [ + let expected_with_3_target_partitions = [ "SortRequiredExec: [a@0 ASC]", " FilterExec: c@2 = 0", " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - expected, - plan, - &test_config.clone().with_query_execution_partitions(3) - ); + let test_config_concurrency_3 = + test_config.clone().with_query_execution_partitions(3); + test_config_concurrency_3.run( + &expected_with_3_target_partitions, // same if distribution enforced before sort. + plan.clone(), + DISTRIB_DISTRIB_SORT.into(), + )?; + test_config_concurrency_3.run( + &expected_with_3_target_partitions, + plan.clone(), + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; - let expected = [ + let expected_with_8_target_partitions = [ "SortRequiredExec: [a@0 ASC]", " FilterExec: c@2 = 0", " DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - expected, + let test_config_concurrency_8 = test_config.with_query_execution_partitions(8); + test_config_concurrency_8.run( + &expected_with_8_target_partitions, + plan.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config_concurrency_8.run( + &expected_with_8_target_partitions, plan, - &test_config.with_query_execution_partitions(8) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2699,13 +2757,19 @@ fn parallelization_compressed_csv() -> Result<()> { .build(), vec![("a".to_string(), "a".to_string())], ); - assert_optimized!( + let test_config = TestConfig::default() + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + test_config.run( + expected, + plan.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, plan, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - .with_query_execution_partitions(2) - .with_prefer_repartition_file_scans(10) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; } Ok(()) } @@ -2716,6 +2780,11 @@ fn parallelization_two_partitions() -> Result<()> { let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone()); let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias); + let test_config = TestConfig::default() + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + + // Test: with parquet let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", @@ -2723,6 +2792,18 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + &expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; + + // Test: with csv let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", @@ -2730,11 +2811,17 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - let test_config = TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - .with_query_execution_partitions(2) - .with_prefer_repartition_file_scans(10); - assert_optimized!(expected_parquet, plan_parquet, &test_config); - assert_optimized!(expected_csv, plan_csv, &test_config); + test_config.run( + &expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. 
+ )?; + test_config.run( + &expected_csv, + plan_csv, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; + Ok(()) } @@ -2744,6 +2831,11 @@ fn parallelization_two_partitions_into_four() -> Result<()> { let plan_parquet = aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone()); let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias); + let test_config = TestConfig::default() + .with_query_execution_partitions(4) + .with_prefer_repartition_file_scans(10); + + // Test: with parquet let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", @@ -2751,6 +2843,18 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + &expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; + + // Test: with csv let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", @@ -2758,11 +2862,16 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - let test_config = TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - .with_query_execution_partitions(4) - .with_prefer_repartition_file_scans(10); - assert_optimized!(expected_parquet, plan_parquet, &test_config); - assert_optimized!(expected_csv, plan_csv, &test_config); + test_config.run( + &expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + &expected_csv, + plan_csv, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2777,6 +2886,9 @@ fn parallelization_sorted_limit() -> Result<()> { let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(), false)); let plan_csv = limit_exec(sort_exec(sort_key, csv_exec(), false)); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", @@ -2785,6 +2897,18 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; + + // Test: with csv let expected_csv = &[ "GlobalLimitExec: skip=0, fetch=100", " LocalLimitExec: fetch=100", @@ -2793,16 +2917,16 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2821,6 +2945,9 @@ fn parallelization_limit_with_filter() -> Result<()> { ))); let plan_csv = limit_exec(filter_exec(sort_exec(sort_key, csv_exec(), false))); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "GlobalLimitExec: skip=0, fetch=100", " CoalescePartitionsExec", @@ -2833,6 +2960,18 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; + + // Test: with csv let expected_csv = &[ "GlobalLimitExec: skip=0, fetch=100", " CoalescePartitionsExec", @@ -2845,16 +2984,16 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2869,6 +3008,9 @@ fn parallelization_ignores_limit() -> Result<()> { let plan_csv = aggregate_exec_with_alias(limit_exec(filter_exec(limit_exec(csv_exec()))), alias); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", @@ -2885,6 +3027,18 @@ fn parallelization_ignores_limit() -> Result<()> { " LocalLimitExec: fetch=100", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; + + // Test: with csv let expected_csv = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", @@ -2901,16 +3055,16 @@ fn parallelization_ignores_limit() -> Result<()> { " LocalLimitExec: fetch=100", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2920,6 +3074,9 @@ fn parallelization_union_inputs() -> Result<()> { let plan_parquet = union_exec(vec![parquet_exec(); 5]); let plan_csv = union_exec(vec![csv_exec(); 5]); + let test_config = TestConfig::default(); + + // Test: with parquet let expected_parquet = &[ "UnionExec", // Union doesn't benefit from input partitioning - no parallelism @@ -2929,6 +3086,18 @@ fn parallelization_union_inputs() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; + + // Test: with csv let expected_csv = &[ "UnionExec", // Union doesn't benefit from input partitioning - no parallelism @@ -2938,16 +3107,16 @@ fn parallelization_union_inputs() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -2967,23 +3136,40 @@ fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let plan_csv = sort_preserving_merge_exec(sort_key.clone(), csv_exec_with_sort(vec![sort_key])); + let test_config = TestConfig::default(); + + // Expected Outcome: // parallelization is not beneficial for SortPreservingMerge + + // Test: with parquet let expected_parquet = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( + expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; + + // Test: with csv let expected_csv = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3002,30 +3188,61 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { let plan_parquet = sort_preserving_merge_exec(sort_key.clone(), input_parquet); let plan_csv = sort_preserving_merge_exec(sort_key, input_csv); + let test_config = TestConfig::default(); + + // Expected Outcome: // should not repartition (union doesn't benefit from increased parallelism) // should not sort (as the data was already sorted) + + // Test: with parquet let expected_parquet = &[ "SortPreservingMergeExec: [c@2 ASC]", " UnionExec", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), + )?; + let expected_parquet_first_sort_enforcement = &[ + // no SPM + "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + // has coalesce + " CoalescePartitionsExec", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + ]; + test_config.run( + 
expected_parquet_first_sort_enforcement, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), + )?; + + // Test: with csv let expected_csv = &[ "SortPreservingMergeExec: [c@2 ASC]", " UnionExec", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( - expected_csv, - plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + test_config.run(expected_csv, plan_csv.clone(), DISTRIB_DISTRIB_SORT.into())?; + let expected_csv_first_sort_enforcement = &[ + // no SPM + "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + // has coalesce + " CoalescePartitionsExec", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + ]; + test_config.run( + expected_csv_first_sort_enforcement, + plan_csv.clone(), + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -3046,25 +3263,42 @@ fn parallelization_does_not_benefit() -> Result<()> { let plan_csv = sort_required_exec_with_req(csv_exec_with_sort(vec![sort_key.clone()]), sort_key); + let test_config = TestConfig::default(); + + // Expected Outcome: // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism + + // Test: with parquet let expected_parquet = &[ "SortRequiredExec: [c@2 ASC]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if 
distribution enforced before sort. + )?; + test_config.run( + expected_parquet, + plan_parquet, + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; + + // Test: with csv let expected_csv = &[ "SortRequiredExec: [c@2 ASC]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( - expected_parquet, - plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3098,16 +3332,23 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> ]; plans_matches_expected!(expected, &plan_parquet); + // Expected Outcome: // data should not be repartitioned / resorted let expected_parquet = &[ "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( + expected_parquet, + plan_parquet.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_parquet, plan_parquet, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -3141,16 +3382,23 @@ fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> { ]; plans_matches_expected!(expected, &plan_csv); + // Expected Outcome: // data should not be repartitioned / resorted let expected_csv = &[ "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( + expected_csv, + plan_csv.clone(), + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected_csv, plan_csv, - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3174,16 +3422,18 @@ fn remove_redundant_roundrobins() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3199,6 +3449,10 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + // TestConfig: Prefer existing sort. + let test_config = TestConfig::default().with_prefer_existing_sort(); + + // Expected Outcome: // Original plan expects its output to be ordered by c@2 ASC. 
// This is still satisfied since, after filter that column is constant. let expected = &[ @@ -3207,16 +3461,17 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( + + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_prefer_existing_sort() - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_prefer_existing_sort() - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3232,22 +3487,25 @@ fn preserve_ordering_through_repartition() -> Result<()> { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + // TestConfig: Prefer existing sort. + let test_config = TestConfig::default().with_prefer_existing_sort(); + let expected = &[ "SortPreservingMergeExec: [d@3 ASC]", " FilterExec: c@2 = 0", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - assert_optimized!( + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_prefer_existing_sort() - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. 
+ )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_prefer_existing_sort() - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3262,6 +3520,9 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input)); + let test_config = TestConfig::default(); + + // Test: run EnforceDistribution, then EnforceSort. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -3269,25 +3530,21 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; + test_config.run(expected, physical_plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - - let expected = &[ + // Test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " FilterExec: c@2 = 0", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!( - expected, + test_config.run( + expected_first_sort_enforcement, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -3310,16 +3567,17 @@ fn no_need_for_sort_after_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], 
[y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3339,6 +3597,9 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { }]); let physical_plan = sort_preserving_merge_exec(sort_req, filter_exec(input)); + let test_config = TestConfig::default(); + + // Test: run EnforceDistribution, then EnforceSort. let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -3346,14 +3607,10 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; + test_config.run(expected, physical_plan.clone(), DISTRIB_DISTRIB_SORT.into())?; - assert_optimized!( - expected, - physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - - let expected = &[ + // Test: result IS DIFFERENT, if EnforceSorting is run first: + let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", @@ -3361,11 +3618,11 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - 
assert_optimized!( - expected, + test_config.run( + expected_first_sort_enforcement, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), + )?; Ok(()) } @@ -3385,16 +3642,17 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3484,22 +3742,25 @@ fn do_not_add_unnecessary_hash() -> Result<()> { let input = parquet_exec_with_sort(vec![sort_key]); let physical_plan = aggregate_exec_with_alias(input, alias); + // TestConfig: + // Make sure target partition number is 1. In this case hash repartition is unnecessary. + let test_config = TestConfig::default().with_query_execution_partitions(1); + let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - // Make sure target partition number is 1. In this case hash repartition is unnecessary - assert_optimized!( + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_query_execution_partitions(1) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. 
+ )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_query_execution_partitions(1) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3516,6 +3777,10 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { let aggregate = aggregate_exec_with_alias(input, alias.clone()); let physical_plan = aggregate_exec_with_alias(aggregate, alias); + // TestConfig: + // Make sure target partition number is larger than 2 (e.g partition number at the source). + let test_config = TestConfig::default().with_query_execution_partitions(4); + let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", // Since hash requirements of this operator is satisfied. There shouldn't be @@ -3527,17 +3792,16 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - // Make sure target partition number is larger than 2 (e.g partition number at the source). - assert_optimized!( + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()).with_query_execution_partitions(4) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()).with_query_execution_partitions(4) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. 
+ )?; Ok(()) } @@ -3554,16 +3818,18 @@ fn optimize_away_unnecessary_repartition() -> Result<()> { let expected = &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"]; - assert_optimized!( + + let test_config = TestConfig::default(); + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) } @@ -3589,16 +3855,17 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!( + let test_config = TestConfig::default(); + test_config.run( expected, physical_plan.clone(), - &TestConfig::new(DISTRIB_DISTRIB_SORT.into()) - ); - assert_optimized!( + DISTRIB_DISTRIB_SORT.into(), // same if distribution enforced before sort. + )?; + test_config.run( expected, physical_plan, - &TestConfig::new(SORT_DISTRIB_DISTRIB.into()) - ); + SORT_DISTRIB_DISTRIB.into(), // same if sort enforced before distribution. + )?; Ok(()) }