From 1c608e41615ff992a706b4b18ee19cb59bb9b4a7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 7 Mar 2025 11:44:23 -0800 Subject: [PATCH] refactor: added test cases for orthogonal sorting, and remove 1 unneeded conditional --- .../physical_optimizer/enforce_sorting.rs | 63 +++++++++++++++++-- .../tests/physical_optimizer/test_utils.rs | 10 ++- .../src/enforce_sorting/sort_pushdown.rs | 19 ++---- 3 files changed, 72 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 4ed6e4d393e48..c8b535ccdc94a 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -21,10 +21,10 @@ use crate::physical_optimizer::test_utils::{ aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec, - local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr, - sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, - sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, - union_exec, RequirementsTestExec, + local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, + sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec, + sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, + spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec, }; use arrow::compute::SortOptions; @@ -3346,3 +3346,58 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { Ok(()) } + +#[test] +fn test_removes_unused_orthogonal_sort() -> Result<()> { + let schema = create_test_schema3()?; + let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)]; + let unbounded_input = 
stream_exec_ordered(&schema, input_sort_exprs.clone()); + + let orthogonal_sort = sort_exec(vec![sort_expr("a", &schema)], unbounded_input); + let output_sort = sort_exec(input_sort_exprs, orthogonal_sort); // same sort as data source + + // Test scenario/input has an orthogonal sort: + let expected_input = [ + "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]" + ]; + assert_eq!(get_plan_string(&output_sort), expected_input,); + + // Test: should remove orthogonal sort, and the uppermost (unneeded) sort: + let expected_optimized = [ + "StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]" + ]; + assert_optimized!(expected_input, expected_optimized, output_sort, true); + + Ok(()) +} + +#[test] +fn test_keeps_used_orthogonal_sort() -> Result<()> { + let schema = create_test_schema3()?; + let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)]; + let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs.clone()); + + let orthogonal_sort = + sort_exec_with_fetch(vec![sort_expr("a", &schema)], Some(3), unbounded_input); + let output_sort = sort_exec(input_sort_exprs, orthogonal_sort); // same sort as data source + + // Test scenario/input has an orthogonal sort: + let expected_input = [ + "SortExec: expr=[b@1 ASC, c@2 ASC], preserve_partitioning=[false]", + " SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]" + ]; + assert_eq!(get_plan_string(&output_sort), expected_input,); + + // Test: should keep the orthogonal sort, since it modifies the output: + let expected_optimized = [ + "SortExec: expr=[b@1 ASC, c@2 ASC], 
preserve_partitioning=[false]", + " SortExec: TopK(fetch=3), expr=[a@0 ASC], preserve_partitioning=[false]", + " StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]", + ]; + assert_optimized!(expected_input, expected_optimized, output_sort, true); + + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 1b8e754ee3570..99a75e6e5067d 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -295,9 +295,17 @@ pub fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execution pub fn sort_exec( sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>, +) -> Arc<dyn ExecutionPlan> { + sort_exec_with_fetch(sort_exprs, None, input) +} + +pub fn sort_exec_with_fetch( + sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>, + fetch: Option<usize>, + input: Arc<dyn ExecutionPlan>, ) -> Arc<dyn ExecutionPlan> { let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortExec::new(sort_exprs, input)) + Arc::new(SortExec::new(sort_exprs, input).with_fetch(fetch)) } /// A test [`ExecutionPlan`] whose requirements can be configured. diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 645b6acf8298c..099aa8ffb94e3 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -127,22 +127,11 @@ fn pushdown_sorts_helper( sort_push_down = add_sort_above(sort_push_down, parent_reqs, parent_req_fetch); - // If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort - // gets immdiately re-sorted. - // e.g. Sort col1 ASC - // Sort col1 DESC - // - // Remove this redundant sort by not pushing down. - let is_orthogonal_sort = - !satisfy_parent && !parent_is_stricter && !current_is_stricter; - // make pushdown requirements be the new ones. 
- if !is_orthogonal_sort || current_sort_fetch.is_some() { - sort_push_down.children[0].data = ParentRequirements { - ordering_requirement: Some(new_reqs), - fetch: current_sort_fetch, - }; - } + sort_push_down.children[0].data = ParentRequirements { + ordering_requirement: Some(new_reqs), + fetch: current_sort_fetch, + }; } else { // Don't add a SortExec // Do update what sort requirements to keep pushing down