Skip to content

Commit

Permalink
refactor: added test cases for orthogonal sorting, and remove 1 unnee…
Browse files Browse the repository at this point in the history
…ded conditional
  • Loading branch information
wiedld committed Mar 7, 2025
1 parent 820e08b commit 1c608e4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 20 deletions.
63 changes: 59 additions & 4 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
union_exec, RequirementsTestExec,
local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec,
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
};

use arrow::compute::SortOptions;
Expand Down Expand Up @@ -3346,3 +3346,58 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {

Ok(())
}

/// An intermediate sort on `a`, placed between a source already ordered by
/// `[b, c]` and an outer sort on `[b, c]`, is expected to be removed by the
/// optimizer together with the (unneeded) outer sort.
#[test]
fn test_removes_unused_orthogonal_sort() -> Result<()> {
    let schema = create_test_schema3()?;
    let source_ordering = vec![sort_expr("b", &schema), sort_expr("c", &schema)];

    // Plan under test: the ordered streaming source is re-sorted by `a`, and
    // then sorted again with the same ordering the source already provides.
    let plan = sort_exec(
        source_ordering.clone(),
        sort_exec(
            vec![sort_expr("a", &schema)],
            stream_exec_ordered(&schema, source_ordering),
        ),
    );

    // Sanity-check the starting plan, orthogonal sort included.
    let plan_before = [
        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
        " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
        " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
    ];
    let rendered = get_plan_string(&plan);
    assert_eq!(rendered, plan_before);

    // After optimization only the source should remain: neither the
    // orthogonal sort nor the uppermost sort is needed.
    let plan_after = [
        "StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
    ];
    assert_optimized!(plan_before, plan_after, plan, true);

    Ok(())
}

/// Same shape as the "unused orthogonal sort" scenario, except the
/// intermediate sort on `a` carries `fetch = Some(3)`. Because that sort
/// modifies the output, the optimizer is expected to leave the plan unchanged.
#[test]
fn test_keeps_used_orthogonal_sort() -> Result<()> {
    let schema = create_test_schema3()?;
    let source_ordering = vec![sort_expr("b", &schema), sort_expr("c", &schema)];

    // Plan under test: the ordered streaming source goes through a sort on `a`
    // with a fetch of 3, then is sorted back to the source's own ordering.
    let plan = sort_exec(
        source_ordering.clone(),
        sort_exec_with_fetch(
            vec![sort_expr("a", &schema)],
            Some(3),
            stream_exec_ordered(&schema, source_ordering),
        ),
    );

    // Sanity-check the starting plan, orthogonal sort included.
    let plan_before = [
        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
        " SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]",
        " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
    ];
    let rendered = get_plan_string(&plan);
    assert_eq!(rendered, plan_before);

    // The orthogonal sort must survive optimization, since it changes the output.
    let plan_after = [
        "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]",
        " SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]",
        " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]",
    ];
    assert_optimized!(plan_before, plan_after, plan, true);

    Ok(())
}
10 changes: 9 additions & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,17 @@ pub fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execution
/// Wraps `input` in a sort over `sort_exprs` without any fetch.
///
/// Delegates to [`sort_exec_with_fetch`], passing `None` as the fetch.
pub fn sort_exec(
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let no_fetch: Option<usize> = None;
    sort_exec_with_fetch(sort_exprs, no_fetch, input)
}

/// Wraps `input` in a [`SortExec`] over `sort_exprs`, with `fetch` applied
/// through `with_fetch`.
///
/// * `sort_exprs` - sort expressions, collected into the ordering handed to
///   `SortExec::new`.
/// * `fetch` - forwarded unchanged to `with_fetch`; [`sort_exec`] passes `None`.
/// * `input` - the child plan to sort.
pub fn sort_exec_with_fetch(
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    fetch: Option<usize>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let sort_exprs = sort_exprs.into_iter().collect();
    // Single construction site: the fetch must be attached here, otherwise the
    // `fetch` argument would be silently ignored.
    Arc::new(SortExec::new(sort_exprs, input).with_fetch(fetch))
}

/// A test [`ExecutionPlan`] whose requirements can be configured.
Expand Down
19 changes: 4 additions & 15 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,11 @@ fn pushdown_sorts_helper(
sort_push_down =
add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);

// If we have a totally orthogonal sort (2 different sorts in a row), that means the child sort
// gets immediately re-sorted.
// e.g. Sort col1 ASC
// Sort col1 DESC
//
// Remove this redundant sort by not pushing down.
let is_orthogonal_sort =
!satisfy_parent && !parent_is_stricter && !current_is_stricter;

// make pushdown requirements be the new ones.
if !is_orthogonal_sort || current_sort_fetch.is_some() {
sort_push_down.children[0].data = ParentRequirements {
ordering_requirement: Some(new_reqs),
fetch: current_sort_fetch,
};
}
sort_push_down.children[0].data = ParentRequirements {
ordering_requirement: Some(new_reqs),
fetch: current_sort_fetch,
};
} else {
// Don't add a SortExec
// Do update what sort requirements to keep pushing down
Expand Down

0 comments on commit 1c608e4

Please sign in to comment.