Skip to content

Commit

Permalink
Minor: Further Clean-up in Enforce Sorting (#14732)
Browse files Browse the repository at this point in the history
* Update mod.rs

* Update mod.rs

* Review

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
berkaysynnada and ozankabak authored Feb 19, 2025
1 parent 8ab0661 commit 2f40f6c
Showing 1 changed file with 5 additions and 11 deletions.
16 changes: 5 additions & 11 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,6 @@ pub fn parallelize_sorts(
pub fn ensure_sorting(
mut requirements: PlanWithCorrespondingSort,
) -> Result<Transformed<PlanWithCorrespondingSort>> {
// Before starting, making requirements' children's ExecutionPlan be same as the requirements' plan's children's ExecutionPlan.
// It should be guaranteed by previous code, but we need to make sure to avoid any potential missing.
requirements = requirements.update_plan_from_children()?;
requirements = update_sort_ctx_children_data(requirements, false)?;

// Perform naive analysis at the beginning -- remove already-satisfied sorts:
Expand Down Expand Up @@ -419,7 +416,6 @@ pub fn ensure_sorting(
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
}
child = add_sort_above(child, required, None);
child = child.update_plan_from_children()?;
child = update_sort_ctx_children_data(child, true)?;
}
} else if physical_ordering.is_none()
Expand All @@ -433,25 +429,24 @@ pub fn ensure_sorting(
updated_children.push(child);
}
requirements.children = updated_children;
requirements = requirements.update_plan_from_children()?;
// For window expressions, we can remove some sorts when we can
// calculate the result in reverse:
let child_node = &requirements.children[0];
if is_window(plan) && child_node.data {
if is_window(&requirements.plan) && child_node.data {
return adjust_window_sort_removal(requirements).map(Transformed::yes);
} else if is_sort_preserving_merge(plan)
} else if is_sort_preserving_merge(&requirements.plan)
&& child_node.plan.output_partitioning().partition_count() <= 1
{
// This `SortPreservingMergeExec` is unnecessary, input already has a
// single partition and no fetch is required.
let mut child_node = requirements.children.swap_remove(0);
if let Some(fetch) = plan.fetch() {
// Add the limit exec if the spm has a fetch
if let Some(fetch) = requirements.plan.fetch() {
// Add the limit exec if the original SPM had a fetch:
child_node.plan =
Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch));
}
return Ok(Transformed::yes(child_node));
} else {
requirements = requirements.update_plan_from_children()?;
}
update_sort_ctx_children_data(requirements, false).map(Transformed::yes)
}
Expand Down Expand Up @@ -712,7 +707,6 @@ fn remove_corresponding_sort_from_sub_plan(
Arc::new(CoalescePartitionsExec::new(plan)) as _
};
node = PlanWithCorrespondingSort::new(plan, false, vec![node]);
node = node.update_plan_from_children()?;
node = update_sort_ctx_children_data(node, false)?;
}
Ok(node)
Expand Down

0 comments on commit 2f40f6c

Please sign in to comment.