diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 420c080f09c2..11f1d8751d83 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -385,9 +385,6 @@ pub fn parallelize_sorts( pub fn ensure_sorting( mut requirements: PlanWithCorrespondingSort, ) -> Result> { - // Before starting, making requirements' children's ExecutionPlan be same as the requirements' plan's children's ExecutionPlan. - // It should be guaranteed by previous code, but we need to make sure to avoid any potential missing. - requirements = requirements.update_plan_from_children()?; requirements = update_sort_ctx_children_data(requirements, false)?; // Perform naive analysis at the beginning -- remove already-satisfied sorts: @@ -419,7 +416,6 @@ pub fn ensure_sorting( child = update_child_to_remove_unnecessary_sort(idx, child, plan)?; } child = add_sort_above(child, required, None); - child = child.update_plan_from_children()?; child = update_sort_ctx_children_data(child, true)?; } } else if physical_ordering.is_none() @@ -433,25 +429,24 @@ pub fn ensure_sorting( updated_children.push(child); } requirements.children = updated_children; + requirements = requirements.update_plan_from_children()?; // For window expressions, we can remove some sorts when we can // calculate the result in reverse: let child_node = &requirements.children[0]; - if is_window(plan) && child_node.data { + if is_window(&requirements.plan) && child_node.data { return adjust_window_sort_removal(requirements).map(Transformed::yes); - } else if is_sort_preserving_merge(plan) + } else if is_sort_preserving_merge(&requirements.plan) && child_node.plan.output_partitioning().partition_count() <= 1 { // This `SortPreservingMergeExec` is unnecessary, input already has a // single partition and no fetch is required. let mut child_node = requirements.children.swap_remove(0); - if let Some(fetch) = plan.fetch() { - // Add the limit exec if the spm has a fetch + if let Some(fetch) = requirements.plan.fetch() { + // Add the limit exec if the original SPM had a fetch: child_node.plan = Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch)); } return Ok(Transformed::yes(child_node)); - } else { - requirements = requirements.update_plan_from_children()?; } update_sort_ctx_children_data(requirements, false).map(Transformed::yes) } @@ -712,7 +707,6 @@ fn remove_corresponding_sort_from_sub_plan( Arc::new(CoalescePartitionsExec::new(plan)) as _ }; node = PlanWithCorrespondingSort::new(plan, false, vec![node]); - node = node.update_plan_from_children()?; node = update_sort_ctx_children_data(node, false)?; } Ok(node)