From 765d91b10867c12afa6972dcef39ffa7b9574a77 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 21 Feb 2025 19:21:26 -0800 Subject: [PATCH] refactor: have sort pushdown use transform_down, and provide minor refactor in sort_pushdown_helper to make it more understandable --- .../src/enforce_sorting/sort_pushdown.rs | 137 +++++++++++------- 1 file changed, 83 insertions(+), 54 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 13d46940c87c..54a280f04547 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -23,9 +23,7 @@ use crate::utils::{ }; use arrow::datatypes::SchemaRef; -use datafusion_common::tree_node::{ - ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{plan_err, HashSet, JoinSide, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; @@ -72,16 +70,18 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) { } pub fn pushdown_sorts(sort_pushdown: SortPushDown) -> Result { - let mut new_node = pushdown_sorts_helper(sort_pushdown)?; - while new_node.tnr == TreeNodeRecursion::Stop { - new_node = pushdown_sorts_helper(new_node.data)?; + sort_pushdown + .transform_down(pushdown_sorts_helper) + .map(|transformed| transformed.data) +} + +fn min_fetch(f1: Option, f2: Option) -> Option { + match (f1, f2) { + (Some(f1), Some(f2)) => Some(f1.min(f2)), + (Some(_), _) => f1, + (_, Some(_)) => f2, + _ => None, } - let (new_node, children) = new_node.data.take_children(); - let new_children = children - .into_iter() - .map(pushdown_sorts) - .collect::>()?; - new_node.with_new_children(new_children) } fn pushdown_sorts_helper( @@ -98,66 +98,91 @@ fn pushdown_sorts_helper( .ordering_satisfy_requirement(&parent_reqs); if 
is_sort(plan) { - let sort_fetch = plan.fetch(); - let required_ordering = plan + let current_sort_fetch = plan.fetch(); + let parent_req_fetch = requirements.data.fetch; + + let child_reqs = plan .output_ordering() .cloned() .map(LexRequirement::from) .unwrap_or_default(); - if !satisfy_parent { - // Make sure this `SortExec` satisfies parent requirements: - let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); - // It's possible current plan (`SortExec`) has a fetch value. - // And if both of them have fetch values, we should use the minimum one. - if let Some(fetch) = sort_fetch { - if let Some(requirement_fetch) = requirements.data.fetch { - requirements.data.fetch = Some(fetch.min(requirement_fetch)); - } - } - let fetch = requirements.data.fetch.or(sort_fetch); + let parent_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&parent_reqs, &child_reqs); + let child_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&child_reqs, &parent_reqs); + + if !satisfy_parent && !parent_is_stricter { + // This new sort has different requirements than the ordering being pushed down. + // 1. add a `SortExec` here for the pushed down ordering (parent reqs). + // 2. continue sort pushdown, but with the new ordering of the new sort. + + // remove current sort (which will be the new ordering to pushdown) + let new_reqs = child_reqs; requirements = requirements.children.swap_remove(0); - requirements = add_sort_above(requirements, sort_reqs, fetch); - }; - // We can safely get the 0th index as we are dealing with a `SortExec`. - let mut child = requirements.children.swap_remove(0); - if let Some(adjusted) = - pushdown_requirement_to_children(&child.plan, &required_ordering)? 
- { - let fetch = sort_fetch.or_else(|| child.plan.fetch()); - for (grand_child, order) in child.children.iter_mut().zip(adjusted) { - grand_child.data = ParentRequirements { - ordering_requirement: order, - fetch, + // add back sort exec matching parent + requirements = add_sort_above(requirements, parent_reqs, parent_req_fetch); + + // If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort + // gets immediately re-sorted. + // e.g. Sort col1 ASC + // Sort col1 DESC + // + // Remove this redundant sort by not pushing down. + let is_orthogonal_sort = + !satisfy_parent && !parent_is_stricter && !child_is_stricter; + + // make pushdown requirements be the new ones. + if !is_orthogonal_sort || current_sort_fetch.is_some() { + requirements.children[0].data = ParentRequirements { + ordering_requirement: Some(new_reqs), + fetch: current_sort_fetch, }; } - // Can push down requirements - child.data = ParentRequirements { - ordering_requirement: Some(required_ordering), - fetch, - }; - - return Ok(Transformed { - data: child, - transformed: true, - tnr: TreeNodeRecursion::Stop, - }); } else { - // Can not push down requirements - requirements.children = vec![child]; - assign_initial_requirements(&mut requirements); + // Don't add a SortExec + // Do update what sort requirements to keep pushing down + + // remove current sort, and get the sort's child + requirements = requirements.children.swap_remove(0); + requirements = requirements.update_plan_from_children()?; + + // set the stricter fetch + requirements.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch); + + // set the stricter ordering + if child_is_stricter { + requirements.data.ordering_requirement = Some(child_reqs); + } else { + requirements.data.ordering_requirement = Some(parent_reqs); + } + + // recursive call to helper, so it doesn't transform_down and miss the new node (previous child of sort) + return pushdown_sorts_helper(requirements); } + } else if 
satisfy_parent && parent_reqs.is_empty() { + // Nothing to do. + return Ok(Transformed::no(requirements)); } else if satisfy_parent { - // For non-sort operators, immediately return if parent requirements are met: + // For non-sort operators which satisfy ordering: let reqs = plan.required_input_ordering(); + let parent_req_fetch = requirements.data.fetch; + for (child, order) in requirements.children.iter_mut().zip(reqs) { child.data.ordering_requirement = order; + child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch); } } else if let Some(adjusted) = pushdown_requirement_to_children(plan, &parent_reqs)? { - // Can not satisfy the parent requirements, check whether we can push - // requirements down: + // For operators that can take a sort pushdown. + + // Continue pushdown, with updated requirements: + let parent_fetch = requirements.data.fetch; + let current_fetch = plan.fetch(); for (child, order) in requirements.children.iter_mut().zip(adjusted) { child.data.ordering_requirement = order; + child.data.fetch = min_fetch(current_fetch, parent_fetch); } requirements.data.ordering_requirement = None; } else { @@ -171,9 +196,13 @@ fn pushdown_sorts_helper( requirements = add_sort_above(requirements, sort_reqs, fetch); assign_initial_requirements(&mut requirements); } + + requirements = requirements.update_plan_from_children()?; Ok(Transformed::yes(requirements)) } +/// Calculate the pushdown ordering requirements for children. +/// If sort cannot be pushed down, return None. fn pushdown_requirement_to_children( plan: &Arc, parent_required: &LexRequirement,