From 36aa76e2bb22d02b9b103b8586b3af2a5450a519 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 4 Mar 2025 13:18:44 -0800 Subject: [PATCH] chore(15003): add indentation to plans, to make easier to read --- .../enforce_distribution.rs | 1171 +++++++++-------- 1 file changed, 590 insertions(+), 581 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index fc2394d889d1..85d826109f89 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -19,11 +19,11 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; +use crate::physical_optimizer::test_utils::parquet_exec_with_sort; use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, repartition_exec, schema, sort_merge_join_exec, sort_preserving_merge_exec, }; -use crate::physical_optimizer::test_utils::{parquet_exec_with_sort, trim_plan_display}; use arrow::compute::SortOptions; use datafusion::config::ConfigOptions; @@ -61,7 +61,9 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_plan::PlanProperties; -use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; +use datafusion_physical_plan::{ + get_plan_string, DisplayAs, DisplayFormatType, Statistics, +}; /// Models operators like BoundedWindowExec that require an input /// ordering but is easy to construct @@ -358,8 +360,7 @@ fn ensure_distribution_helper( macro_rules! 
plans_matches_expected { ($EXPECTED_LINES: expr, $PLAN: expr) => { let physical_plan = $PLAN; - let formatted = displayable(physical_plan.as_ref()).indent(true).to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); + let actual = get_plan_string(&physical_plan); let expected_plan_lines: Vec<&str> = $EXPECTED_LINES .iter().map(|s| *s).collect(); @@ -485,8 +486,7 @@ macro_rules! assert_optimized { let optimized = optimizer.optimize(optimized, &config)?; // Now format correctly - let plan = displayable(optimized.as_ref()).indent(true).to_string(); - let actual_lines = trim_plan_display(&plan); + let actual_lines = get_plan_string(&optimized); assert_eq!( &expected_lines, &actual_lines, @@ -500,8 +500,7 @@ macro_rules! assert_plan_txt { ($EXPECTED_LINES: expr, $PLAN: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); // Now format correctly - let plan = displayable($PLAN.as_ref()).indent(true).to_string(); - let actual_lines = trim_plan_display(&plan); + let actual_lines = get_plan_string(&$PLAN); assert_eq!( &expected_lines, &actual_lines, @@ -542,9 +541,11 @@ fn multi_hash_joins() -> Result<()> { for join_type in join_types { let join = hash_join_exec(left.clone(), right.clone(), &join_on, &join_type); - let join_plan = format!( - "HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, b1@1)]" - ); + let join_plan = |shift| -> String { + format!("{}HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, b1@1)]", " ".repeat(shift)) + }; + let join_plan_indent2 = join_plan(2); + let join_plan_indent4 = join_plan(4); match join_type { JoinType::Inner @@ -572,33 +573,33 @@ fn multi_hash_joins() -> Result<()> { // Should include 3 RepartitionExecs JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => vec![ top_join_plan.as_str(), - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + &join_plan_indent2, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // Should include 4 RepartitionExecs _ => vec![ top_join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - 
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + &join_plan_indent4, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; assert_optimized!(expected, top_join.clone(), true); @@ -635,34 +636,34 @@ fn multi_hash_joins() -> Result<()> { JoinType::Inner | JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => vec![ top_join_plan.as_str(), - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - 
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + &join_plan_indent2, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // Should include 4 RepartitionExecs _ => vec![ top_join_plan.as_str(), - "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: 
expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", + &join_plan_indent4, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; assert_optimized!(expected, top_join.clone(), true); @@ -710,17 +711,17 @@ fn multi_joins_after_alias() -> Result<()> { // Output partition need to respect the Alias and should not introduce additional RepartitionExec let expected = &[ "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]", - "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", 
- "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 as a1, a@0 as a2]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, top_join.clone(), true); assert_optimized!(expected, top_join, false); @@ -736,17 +737,17 @@ fn multi_joins_after_alias() -> Result<()> { // Output partition need to respect the Alias and should not introduce additional RepartitionExec let expected = &[ "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]", - "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: 
file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 as a1, a@0 as a2]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, top_join.clone(), true); assert_optimized!(expected, top_join, false); @@ -787,19 +788,19 @@ fn multi_joins_after_multi_alias() -> Result<()> { // The original Output partition can not satisfy the Join requirements and need to add an additional RepartitionExec let expected = &[ "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, c@2)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "ProjectionExec: expr=[c1@0 as a]", - "ProjectionExec: expr=[c@2 as c1]", - "HashJoinExec: mode=Partitioned, 
join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " ProjectionExec: expr=[c1@0 as a]", + " ProjectionExec: expr=[c@2 as c1]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, top_join.clone(), true); @@ -831,16 +832,16 @@ fn join_after_agg_alias() -> Result<()> { // Only two RepartitionExecs added let expected = &[ "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, a2@0)]", - "AggregateExec: 
mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", - "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", + " RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, join.clone(), true); assert_optimized!(expected, join, false); @@ -883,17 +884,17 @@ fn hash_join_key_ordering() -> Result<()> { // Only two RepartitionExecs added let expected = &[ "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b1@1, b@0), (a1@0, a@1)]", - "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]", - "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10", - 
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]", + " AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, join.clone(), true); assert_optimized!(expected, join, false); @@ -1002,24 +1003,24 @@ fn multi_hash_join_key_ordering() -> Result<()> { // The bottom joins' join key ordering is adjusted based on the top join. 
And the top join should not introduce additional RepartitionExec let expected = &[ "FilterExec: c@6 > 1", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]", - "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]", + " ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", + " RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", + " RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", + " RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, filter_top_join.clone(), true); assert_optimized!(expected, filter_top_join, false); @@ -1141,23 +1142,23 @@ fn reorder_join_keys_to_left_input() -> Result<()> { // The top joins' join key ordering is adjusted based on the children inputs. 
let expected = &[ top_join_plan.as_str(), - "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]", - "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]", + " RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=10", + " RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", + " RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_plan_txt!(expected, reordered); @@ -1275,23 +1276,23 @@ fn reorder_join_keys_to_right_input() -> Result<()> { // The top joins' join key ordering is adjusted based on the children inputs. 
let expected = &[ top_join_plan.as_str(), - "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]", - "RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]", + " RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " 
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", + " RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_plan_txt!(expected, reordered); @@ -1331,7 +1332,15 @@ fn multi_smj_joins() -> Result<()> { for join_type in join_types { let join = sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type); - let join_plan = format!("SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]"); + let join_plan = |shift| -> String { + format!( + "{}SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]", + " ".repeat(shift) + ) + }; + let join_plan_indent2 = join_plan(2); + let join_plan_indent6 = join_plan(6); + let join_plan_indent10 = join_plan(10); // Top join on (a == c) let top_join_on = vec![( @@ -1348,20 +1357,20 @@ fn multi_smj_joins() -> Result<()> { JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), - join_plan.as_str(), - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", 
- "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + &join_plan_indent2, + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs // Since ordering of the left child is not preserved after SortMergeJoin @@ -1375,22 +1384,22 @@ fn multi_smj_joins() -> Result<()> { _ => vec![ top_join_plan.as_str(), // Below 2 operators are differences introduced, when join mode is changed - "SortExec: 
expr=[a@0 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - join_plan.as_str(), - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + &join_plan_indent6, + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " 
SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; assert_optimized!(expected, top_join.clone(), true, true); @@ -1400,20 +1409,20 @@ fn multi_smj_joins() -> Result<()> { JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + &join_plan_indent2, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 SortExecs // Since ordering of the left child is not preserved after SortMergeJoin @@ -1427,24 +1436,24 @@ fn multi_smj_joins() -> Result<()> { _ => vec![ top_join_plan.as_str(), // Below 4 operators are differences introduced, when join mode is changed - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - 
"SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " CoalescePartitionsExec", + &join_plan_indent10, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet", ], }; assert_optimized!(expected_first_sort_enforcement, top_join, false, true); @@ -1466,40 +1475,40 @@ fn multi_smj_joins() -> Result<()> { // Should include 6 RepartitionExecs(3 hash, 3 round-robin) and 3 SortExecs JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), - join_plan.as_str(), - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + &join_plan_indent2, + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), - "SortExec: expr=[b1@6 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", - join_plan.as_str(), - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b1@6 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", + &join_plan_indent6, + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // this match arm cannot be reached _ => unreachable!() @@ -1510,42 +1519,42 @@ fn multi_smj_joins() -> Result<()> { // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, 
preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + &join_plan_indent2, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), - "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[b1@6 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - 
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", - "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b1@6 ASC], preserve_partitioning=[false]", + " CoalescePartitionsExec", + &join_plan_indent10, + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]", + " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as 
e1]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], // this match arm cannot be reached _ => unreachable!() @@ -1608,47 +1617,47 @@ fn smj_join_key_ordering() -> Result<()> { // Only two RepartitionExecs added let expected = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", - "SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]", - "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]", - "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]", - "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[true]", - "ProjectionExec: expr=[a@1 as a2, b@0 as b2]", - "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]", + " ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]", + " ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]", + " AggregateExec: 
mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[true]", + " ProjectionExec: expr=[a@1 as a2, b@0 as b2]", + " AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, join.clone(), true, true); let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", - "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]", - "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]", - "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b2@1 
ASC, a2@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "ProjectionExec: expr=[a@1 as a2, b@0 as b2]", - "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]", + " ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]", + " AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC, a2@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[a@1 as a2, b@0 as b2]", + " AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10", + " AggregateExec: 
mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected_first_sort_enforcement, join, false, true); @@ -1678,8 +1687,8 @@ fn merge_does_not_need_sort() -> Result<()> { // data is already sorted let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", - "CoalesceBatchesExec: target_batch_size=4096", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " CoalesceBatchesExec: target_batch_size=4096", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; assert_optimized!(expected, exec, true); @@ -1689,9 +1698,9 @@ fn merge_does_not_need_sort() -> Result<()> { // SortExec at the top. let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "CoalesceBatchesExec: target_batch_size=4096", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " CoalescePartitionsExec", + " CoalesceBatchesExec: target_batch_size=4096", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; assert_optimized!(expected, exec, false); @@ -1721,18 +1730,18 @@ fn union_to_interleave() -> Result<()> { // Only two RepartitionExecs added, no final RepartitionExec required let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", - "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]", - "InterleaveExec", - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - 
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]", + " InterleaveExec", + " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan.clone(), false); @@ -1763,19 +1772,19 @@ fn union_not_to_interleave() -> Result<()> { // Only two RepartitionExecs added, no final RepartitionExec required let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]", - "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20", - "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]", - "UnionExec", - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - 
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20", + " AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]", + " UnionExec", + " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", + " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; // no sort in the plan but since we need it as a parameter, make it default false let prefer_existing_sort = false; @@ -1807,10 +1816,10 @@ fn added_repartition_to_single_partition() -> Result<()> { let expected = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - 
"DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -1825,11 +1834,11 @@ fn repartition_deepest_node() -> Result<()> { let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -1844,12 +1853,12 @@ fn repartition_unsorted_limit() -> Result<()> { let expected = &[ "GlobalLimitExec: skip=0, fetch=100", - "CoalescePartitionsExec", - "LocalLimitExec: fetch=100", - "FilterExec: c@2 = 0", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=100", + " FilterExec: c@2 = 0", // nothing sorts the data, so the local limit doesn't require sorted data either - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); @@ -1869,10 +1878,10 @@ fn repartition_sorted_limit() -> Result<()> { let expected = &[ "GlobalLimitExec: skip=0, fetch=100", - "LocalLimitExec: fetch=100", + " LocalLimitExec: fetch=100", // data is sorted so can't repartition here - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -1894,12 +1903,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { let expected = &[ "SortRequiredExec: [c@2 ASC]", - "FilterExec: c@2 = 0", + " FilterExec: c@2 = 0", // We can use repartition here, ordering requirement by SortRequiredExec // is still satisfied. 
- "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); @@ -1918,19 +1927,19 @@ fn repartition_ignores_limit() -> Result<()> { let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "GlobalLimitExec: skip=0, fetch=100", - "CoalescePartitionsExec", - "LocalLimitExec: fetch=100", - "FilterExec: c@2 = 0", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " GlobalLimitExec: skip=0, fetch=100", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=100", + " FilterExec: c@2 = 0", // repartition should happen prior to the filter to maximize parallelism - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "GlobalLimitExec: skip=0, fetch=100", - "LocalLimitExec: fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " GlobalLimitExec: skip=0, fetch=100", + " LocalLimitExec: fetch=100", // Expect no repartition to happen for local limit - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), 
true); assert_optimized!(expected, plan, false); @@ -1945,11 +1954,11 @@ fn repartition_ignores_union() -> Result<()> { let expected = &[ "UnionExec", // Expect no repartition of DataSourceExec - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); @@ -1971,7 +1980,7 @@ fn repartition_through_sort_preserving_merge() -> Result<()> { // need resort as the data was not sorted correctly let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -1996,15 +2005,15 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { // should not repartition, since increased parallelism is not beneficial for SortPReservingMerge let expected = &[ "SortPreservingMergeExec: [c@2 ASC]", - "DataSourceExec: file_groups={2 
groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " CoalescePartitionsExec", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, plan, false); @@ -2025,19 +2034,19 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { // should not repartition / sort (as the data was already sorted) let expected = &[ "SortPreservingMergeExec: [c@2 ASC]", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " CoalescePartitionsExec", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, plan, false); @@ -2061,9 +2070,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> { // during repartitioning ordering is preserved let expected = &[ "SortRequiredExec: [d@3 ASC]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true, true); @@ -2100,12 +2109,12 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { let expected = &[ "UnionExec", // union input 1: no repartitioning - "SortRequiredExec: [c@2 ASC]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " SortRequiredExec: [c@2 ASC]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", // union input 2: should repartition - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -2134,22 +2143,22 @@ fn repartition_transitively_with_projection() -> Result<()> 
{ let expected = &[ "SortPreservingMergeExec: [sum@0 ASC]", - "SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]", + " SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]", // Since this projection is not trivial, increasing parallelism is beneficial - "ProjectionExec: expr=[a@0 + b@1 as sum]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 + b@1 as sum]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); let expected_first_sort_enforcement = &[ "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", + " CoalescePartitionsExec", // Since this projection is not trivial, increasing parallelism is beneficial - "ProjectionExec: expr=[a@0 + b@1 as sum]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 + b@1 as sum]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected_first_sort_enforcement, plan, false); @@ -2180,8 +2189,8 @@ fn repartition_ignores_transitively_with_projection() -> Result<()> { let expected = &[ "SortRequiredExec: [c@2 ASC]", // Since this projection is trivial, increasing parallelism is not beneficial - "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", + " DataSourceExec: file_groups={2 
groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -2213,8 +2222,8 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]", // Since this projection is trivial, increasing parallelism is not beneficial - "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); assert_optimized!(expected, plan, false); @@ -2233,22 +2242,22 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "FilterExec: c@2 = 0", + " CoalescePartitionsExec", + " FilterExec: c@2 = 0", // Expect repartition on the input of the filter (as it can benefit from additional parallelism) - "RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected_first_sort_enforcement, plan, false); @@ -2279,23 +2288,23 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", - "FilterExec: c@2 = 0", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", + " FilterExec: c@2 = 0", // repartition is lowest down - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, plan.clone(), true); let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " CoalescePartitionsExec", + " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected_first_sort_enforcement, plan, false); @@ -2310,15 +2319,15 @@ fn parallelization_single_partition() -> Result<()> { let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10); @@ -2342,8 +2351,8 @@ fn parallelization_multiple_files() -> Result<()> { // https://github.com/apache/datafusion/issues/8451 let expected = [ "SortRequiredExec: [a@0 ASC]", - "FilterExec: c@2 = 0", - "DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; + " FilterExec: c@2 = 0", + " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], 
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; let target_partitions = 3; let repartition_size = 1; assert_optimized!( @@ -2359,8 +2368,8 @@ fn parallelization_multiple_files() -> Result<()> { let expected = [ "SortRequiredExec: [a@0 ASC]", - "FilterExec: c@2 = 0", - "DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; let target_partitions = 8; let repartition_size = 1; @@ -2392,17 +2401,17 @@ fn parallelization_compressed_csv() -> Result<()> { let expected_not_partitioned = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; let expected_partitioned = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " 
RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; for compression_type in compression_types { @@ -2436,17 +2445,17 @@ fn parallelization_two_partitions() -> Result<()> { let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Plan already has two partitions - "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Plan already has two partitions - "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10); @@ -2461,17 +2470,17 @@ fn parallelization_two_partitions_into_four() -> Result<()> { let expected_parquet = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: 
partitioning=Hash([a@0], 4), input_partitions=4", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Multiple source files splitted across partitions - "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Multiple source files splitted across partitions - "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 10); assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10); @@ -2491,19 +2500,19 @@ fn parallelization_sorted_limit() -> Result<()> { let expected_parquet = &[ "GlobalLimitExec: skip=0, fetch=100", - "LocalLimitExec: fetch=100", + " LocalLimitExec: fetch=100", // data is sorted so can't repartition here - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", // Doesn't parallelize for SortExec without preserve_partitioning - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = &[ "GlobalLimitExec: skip=0, fetch=100", - "LocalLimitExec: fetch=100", + " LocalLimitExec: fetch=100", // data is sorted so can't repartition here - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", // Doesn't parallelize for SortExec without preserve_partitioning - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true); assert_optimized!(expected_csv, plan_csv, true); @@ -2527,27 +2536,27 @@ fn parallelization_limit_with_filter() -> Result<()> { let expected_parquet = &[ "GlobalLimitExec: skip=0, fetch=100", - "CoalescePartitionsExec", - "LocalLimitExec: fetch=100", - "FilterExec: c@2 = 0", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=100", + " FilterExec: c@2 = 0", // even though data is sorted, we can use repartition here. Since // ordering is not used in subsequent stages anyway. 
- "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", // SortExec doesn't benefit from input partitioning - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = &[ "GlobalLimitExec: skip=0, fetch=100", - "CoalescePartitionsExec", - "LocalLimitExec: fetch=100", - "FilterExec: c@2 = 0", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=100", + " FilterExec: c@2 = 0", // even though data is sorted, we can use repartition here. Since // ordering is not used in subsequent stages anyway. - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", // SortExec doesn't benefit from input partitioning - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true); assert_optimized!(expected_csv, plan_csv, true); @@ -2567,35 +2576,35 @@ fn parallelization_ignores_limit() -> Result<()> { let expected_parquet = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "GlobalLimitExec: skip=0, fetch=100", - "CoalescePartitionsExec", - "LocalLimitExec: 
fetch=100", - "FilterExec: c@2 = 0", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " GlobalLimitExec: skip=0, fetch=100", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=100", + " FilterExec: c@2 = 0", // repartition should happen prior to the filter to maximize parallelism - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "GlobalLimitExec: skip=0, fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " GlobalLimitExec: skip=0, fetch=100", // Limit doesn't benefit from input partitioning - no parallelism - "LocalLimitExec: fetch=100", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " LocalLimitExec: fetch=100", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "GlobalLimitExec: skip=0, fetch=100", - "CoalescePartitionsExec", - "LocalLimitExec: fetch=100", - "FilterExec: c@2 = 0", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " GlobalLimitExec: skip=0, fetch=100", + " CoalescePartitionsExec", + " LocalLimitExec: fetch=100", + " FilterExec: c@2 = 0", // repartition should happen prior to the filter to maximize parallelism - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "GlobalLimitExec: skip=0, fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1", + " GlobalLimitExec: skip=0, fetch=100", // Limit doesn't benefit from input partitioning - no parallelism - "LocalLimitExec: fetch=100", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " LocalLimitExec: fetch=100", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true); assert_optimized!(expected_csv, plan_csv, true); @@ -2611,20 +2620,20 @@ fn parallelization_union_inputs() -> Result<()> { let expected_parquet = &[ "UnionExec", // Union doesn't benefit from input partitioning - no parallelism - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let expected_csv = &[ "UnionExec", // Union doesn't benefit from input partitioning - no parallelism - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, 
has_header=false", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true); assert_optimized!(expected_csv, plan_csv, true); @@ -2678,15 +2687,15 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { // should not sort (as the data was already sorted) let expected_parquet = &[ "SortPreservingMergeExec: [c@2 ASC]", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; let expected_csv = &[ "SortPreservingMergeExec: [c@2 ASC]", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", - "DataSourceExec: 
file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true); assert_optimized!(expected_csv, plan_csv, true); @@ -2713,11 +2722,11 @@ fn parallelization_does_not_benefit() -> Result<()> { // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism let expected_parquet = &[ "SortRequiredExec: [c@2 ASC]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; let expected_csv = &[ "SortRequiredExec: [c@2 ASC]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true); assert_optimized!(expected_csv, plan_csv, true); @@ -2757,7 +2766,7 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> // data should not be repartitioned / resorted let expected_parquet = &[ "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected_parquet, plan_parquet, true); @@ -2796,7 +2805,7 @@ fn 
parallelization_ignores_transitively_with_projection_csv() -> Result<()> { // data should not be repartitioned / resorted let expected_csv = &[ "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; assert_optimized!(expected_csv, plan_csv, true); @@ -2819,8 +2828,8 @@ fn remove_redundant_roundrobins() -> Result<()> { let expected = &[ "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, physical_plan.clone(), true); assert_optimized!(expected, physical_plan, false); @@ -2842,9 +2851,9 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { // This is still satisfied since, after filter that column is constant. 
let expected = &[ "CoalescePartitionsExec", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // last flag sets config.optimizer.PREFER_EXISTING_SORT assert_optimized!(expected, physical_plan.clone(), true, true); @@ -2865,9 +2874,9 @@ fn preserve_ordering_through_repartition() -> Result<()> { let expected = &[ "SortPreservingMergeExec: [d@3 ASC]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; // last flag sets config.optimizer.PREFER_EXISTING_SORT assert_optimized!(expected, physical_plan.clone(), true, true); @@ -2888,20 +2897,20 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; assert_optimized!(expected, physical_plan.clone(), true); let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " CoalescePartitionsExec", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; assert_optimized!(expected, physical_plan, false); @@ -2922,9 +2931,9 @@ fn no_need_for_sort_after_filter() -> Result<()> { // After CoalescePartitionsExec c is still constant. Hence c@2 ASC ordering is already satisfied. "CoalescePartitionsExec", // Since after this stage c is constant. c@2 ASC ordering is already satisfied. 
- "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, physical_plan.clone(), true); assert_optimized!(expected, physical_plan, false); @@ -2949,21 +2958,21 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { let expected = &[ "SortPreservingMergeExec: [a@0 ASC]", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, physical_plan.clone(), true); let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "CoalescePartitionsExec", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " CoalescePartitionsExec", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " DataSourceExec: file_groups={2 groups: 
[[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, physical_plan, false); @@ -2982,8 +2991,8 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> { let expected = &[ "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; assert_optimized!(expected, physical_plan.clone(), true); assert_optimized!(expected, physical_plan, false); @@ -3004,8 +3013,8 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> { // Ordering requirement of sort required exec is NOT satisfied // by existing ordering at the source. "SortRequiredExec: [a@0 ASC]", - "FilterExec: c@2 = 0", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " FilterExec: c@2 = 0", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_plan_txt!(expected, physical_plan); @@ -3013,9 +3022,9 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> { "SortRequiredExec: [a@0 ASC]", // Since at the start of the rule ordering requirement is not satisfied // EnforceDistribution rule doesn't satisfy this requirement either. 
- "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; let mut config = ConfigOptions::new(); @@ -3042,8 +3051,8 @@ fn put_sort_when_input_is_valid() -> Result<()> { // Ordering requirement of sort required exec is satisfied // by existing ordering at the source. "SortRequiredExec: [a@0 ASC]", - "FilterExec: c@2 = 0", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; assert_plan_txt!(expected, physical_plan); @@ -3051,8 +3060,8 @@ fn put_sort_when_input_is_valid() -> Result<()> { // Since at the start of the rule ordering requirement is satisfied // EnforceDistribution rule satisfy this requirement also. 
"SortRequiredExec: [a@0 ASC]", - "FilterExec: c@2 = 0", - "DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + " FilterExec: c@2 = 0", + " DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; let mut config = ConfigOptions::new(); @@ -3078,8 +3087,8 @@ fn do_not_add_unnecessary_hash() -> Result<()> { let expected = &[ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // Make sure target partition number is 1. In this case hash repartition is unnecessary assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024); @@ -3104,12 +3113,12 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", // Since hash requirements of this operator is satisfied. 
There shouldn't be // a hash repartition here - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", - "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", - "RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2", - "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4", + " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", + " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2", + " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // Make sure target partition number is larger than 2 (e.g partition number at the source). assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024); @@ -3153,9 +3162,9 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { let expected = &[ "FilterExec: c@2 = 0", - "FilterExec: c@2 = 0", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; assert_optimized!(expected, physical_plan.clone(), true); assert_optimized!(expected, physical_plan, false);