Skip to content

Commit

Permalink
refactor: have sort pushdown use transform_down, and provide minor re…
Browse files Browse the repository at this point in the history
…factor in sort_pushdown_helper to make it more understandable
  • Loading branch information
wiedld committed Feb 22, 2025
1 parent e03f9f6 commit 765d91b
Showing 1 changed file with 83 additions and 54 deletions.
137 changes: 83 additions & 54 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use crate::utils::{
};

use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{
ConcreteTreeNode, Transformed, TreeNode, TreeNodeRecursion,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, HashSet, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
Expand Down Expand Up @@ -72,16 +70,18 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
}

pub fn pushdown_sorts(sort_pushdown: SortPushDown) -> Result<SortPushDown> {
let mut new_node = pushdown_sorts_helper(sort_pushdown)?;
while new_node.tnr == TreeNodeRecursion::Stop {
new_node = pushdown_sorts_helper(new_node.data)?;
sort_pushdown
.transform_down(pushdown_sorts_helper)
.map(|transformed| transformed.data)
}

fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
match (f1, f2) {
(Some(f1), Some(f2)) => Some(f1.min(f2)),
(Some(_), _) => f1,
(_, Some(_)) => f2,
_ => None,
}
let (new_node, children) = new_node.data.take_children();
let new_children = children
.into_iter()
.map(pushdown_sorts)
.collect::<Result<_>>()?;
new_node.with_new_children(new_children)
}

fn pushdown_sorts_helper(
Expand All @@ -98,66 +98,91 @@ fn pushdown_sorts_helper(
.ordering_satisfy_requirement(&parent_reqs);

if is_sort(plan) {
let sort_fetch = plan.fetch();
let required_ordering = plan
let current_sort_fetch = plan.fetch();
let parent_req_fetch = requirements.data.fetch;

let child_reqs = plan
.output_ordering()
.cloned()
.map(LexRequirement::from)
.unwrap_or_default();
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
// It's possible current plan (`SortExec`) has a fetch value.
// And if both of them have fetch values, we should use the minimum one.
if let Some(fetch) = sort_fetch {
if let Some(requirement_fetch) = requirements.data.fetch {
requirements.data.fetch = Some(fetch.min(requirement_fetch));
}
}
let fetch = requirements.data.fetch.or(sort_fetch);
let parent_is_stricter = plan
.equivalence_properties()
.requirements_compatible(&parent_reqs, &child_reqs);
let child_is_stricter = plan
.equivalence_properties()
.requirements_compatible(&child_reqs, &parent_reqs);

if !satisfy_parent && !parent_is_stricter {
// This new sort has different requirements than the ordering being pushed down.
// 1. add a `SortExec` here for the pushed down ordering (parent reqs).
// 2. continue sort pushdown, but with the new ordering of the new sort.

// remove current sort (which will be the new ordering to pushdown)
let new_reqs = child_reqs;
requirements = requirements.children.swap_remove(0);
requirements = add_sort_above(requirements, sort_reqs, fetch);
};

// We can safely get the 0th index as we are dealing with a `SortExec`.
let mut child = requirements.children.swap_remove(0);
if let Some(adjusted) =
pushdown_requirement_to_children(&child.plan, &required_ordering)?
{
let fetch = sort_fetch.or_else(|| child.plan.fetch());
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
grand_child.data = ParentRequirements {
ordering_requirement: order,
fetch,
// add back sort exec matching parent
requirements = add_sort_above(requirements, parent_reqs, parent_req_fetch);

// If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort
// gets immdiately re-sorted.
// e.g. Sort col1 ASC
// Sort col1 DESC
//
// Remove this redundant sort by not pushing down.
let is_orthogonal_sort =
!satisfy_parent && !parent_is_stricter && !child_is_stricter;

// make pushdown requirements be the new ones.
if !is_orthogonal_sort || current_sort_fetch.is_some() {
requirements.children[0].data = ParentRequirements {
ordering_requirement: Some(new_reqs),
fetch: current_sort_fetch,
};
}
// Can push down requirements
child.data = ParentRequirements {
ordering_requirement: Some(required_ordering),
fetch,
};

return Ok(Transformed {
data: child,
transformed: true,
tnr: TreeNodeRecursion::Stop,
});
} else {
// Can not push down requirements
requirements.children = vec![child];
assign_initial_requirements(&mut requirements);
// Don't add a SortExec
// Do update what sort requirements to keep pushing down

// remove current sort, and get the sort's child
requirements = requirements.children.swap_remove(0);
requirements = requirements.update_plan_from_children()?;

// set the stricter fetch
requirements.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch);

// set the stricter ordering
if child_is_stricter {
requirements.data.ordering_requirement = Some(child_reqs);
} else {
requirements.data.ordering_requirement = Some(parent_reqs);
}

// recursive call to helper, so it doesn't transform_down and miss the new node (previous child of sort)
return pushdown_sorts_helper(requirements);
}
} else if satisfy_parent && parent_reqs.is_empty() {
// Nothing to do.
return Ok(Transformed::no(requirements));
} else if satisfy_parent {
// For non-sort operators, immediately return if parent requirements are met:
// For non-sort operators which satisfy ordering:
let reqs = plan.required_input_ordering();
let parent_req_fetch = requirements.data.fetch;

for (child, order) in requirements.children.iter_mut().zip(reqs) {
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch);
}
} else if let Some(adjusted) = pushdown_requirement_to_children(plan, &parent_reqs)? {
// Can not satisfy the parent requirements, check whether we can push
// requirements down:
// For operators that can take a sort pushdown.

// Continue pushdown, with updated requirements:
let parent_fetch = requirements.data.fetch;
let current_fetch = plan.fetch();
for (child, order) in requirements.children.iter_mut().zip(adjusted) {
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(current_fetch, parent_fetch);
}
requirements.data.ordering_requirement = None;
} else {
Expand All @@ -171,9 +196,13 @@ fn pushdown_sorts_helper(
requirements = add_sort_above(requirements, sort_reqs, fetch);
assign_initial_requirements(&mut requirements);
}

requirements = requirements.update_plan_from_children()?;
Ok(Transformed::yes(requirements))
}

/// Calculate the pushdown ordering requirements for children.
/// If sort cannot be pushed down, return None.
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: &LexRequirement,
Expand Down

0 comments on commit 765d91b

Please sign in to comment.