Skip to content

Commit

Permalink
Use consistent variable naming
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Feb 25, 2025
1 parent bcdebc1 commit a3b6813
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ pub struct ParentRequirements {
pub type SortPushDown = PlanContext<ParentRequirements>;

/// Assigns the ordering requirement of the root node to the its children.
pub fn assign_initial_requirements(node: &mut SortPushDown) {
let reqs = node.plan.required_input_ordering();
for (child, requirement) in node.children.iter_mut().zip(reqs) {
pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
let reqs = sort_push_down.plan.required_input_ordering();
for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
child.data = ParentRequirements {
ordering_requirement: requirement,
// If the parent has a fetch value, assign it to the children
Expand All @@ -69,8 +69,8 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
}
}

pub fn pushdown_sorts(sort_pushdown: SortPushDown) -> Result<SortPushDown> {
sort_pushdown
pub fn pushdown_sorts(sort_push_down: SortPushDown) -> Result<SortPushDown> {
sort_push_down
.transform_down(pushdown_sorts_helper)
.map(|transformed| transformed.data)
}
Expand All @@ -85,10 +85,10 @@ fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
}

fn pushdown_sorts_helper(
mut requirements: SortPushDown,
mut sort_push_down: SortPushDown,
) -> Result<Transformed<SortPushDown>> {
let plan = &requirements.plan;
let parent_reqs = requirements
let plan = &sort_push_down.plan;
let parent_reqs = sort_push_down
.data
.ordering_requirement
.clone()
Expand All @@ -99,7 +99,7 @@ fn pushdown_sorts_helper(

if is_sort(plan) {
let current_sort_fetch = plan.fetch();
let parent_req_fetch = requirements.data.fetch;
let parent_req_fetch = sort_push_down.data.fetch;

let child_reqs = plan
.output_ordering()
Expand All @@ -120,10 +120,11 @@ fn pushdown_sorts_helper(

// remove current sort (which will be the new ordering to pushdown)
let new_reqs = child_reqs;
requirements = requirements.children.swap_remove(0);
sort_push_down = sort_push_down.children.swap_remove(0);

// add back sort exec matching parent
requirements = add_sort_above(requirements, parent_reqs, parent_req_fetch);
sort_push_down =
add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);

// If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort
// gets immdiately re-sorted.
Expand All @@ -136,7 +137,7 @@ fn pushdown_sorts_helper(

// make pushdown requirements be the new ones.
if !is_orthogonal_sort || current_sort_fetch.is_some() {
requirements.children[0].data = ParentRequirements {
sort_push_down.children[0].data = ParentRequirements {
ordering_requirement: Some(new_reqs),
fetch: current_sort_fetch,
};
Expand All @@ -146,59 +147,59 @@ fn pushdown_sorts_helper(
// Do update what sort requirements to keep pushing down

// remove current sort, and get the sort's child
requirements = requirements.children.swap_remove(0);
requirements = requirements.update_plan_from_children()?;
sort_push_down = sort_push_down.children.swap_remove(0);
sort_push_down = sort_push_down.update_plan_from_children()?;

// set the stricter fetch
requirements.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch);
sort_push_down.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch);

// set the stricter ordering
if child_is_stricter {
requirements.data.ordering_requirement = Some(child_reqs);
sort_push_down.data.ordering_requirement = Some(child_reqs);
} else {
requirements.data.ordering_requirement = Some(parent_reqs);
sort_push_down.data.ordering_requirement = Some(parent_reqs);
}

// recursive call to helper, so it doesn't transform_down and miss the new node (previous child of sort)
return pushdown_sorts_helper(requirements);
return pushdown_sorts_helper(sort_push_down);
}
} else if satisfy_parent && parent_reqs.is_empty() {
// Nothing to do.
return Ok(Transformed::no(requirements));
return Ok(Transformed::no(sort_push_down));
} else if satisfy_parent {
// For non-sort operators which satisfy ordering:
let reqs = plan.required_input_ordering();
let parent_req_fetch = requirements.data.fetch;
let parent_req_fetch = sort_push_down.data.fetch;

for (child, order) in requirements.children.iter_mut().zip(reqs) {
for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch);
}
} else if let Some(adjusted) = pushdown_requirement_to_children(plan, &parent_reqs)? {
// For operators that can take a sort pushdown.

// Continue pushdown, with updated requirements:
let parent_fetch = requirements.data.fetch;
let parent_fetch = sort_push_down.data.fetch;
let current_fetch = plan.fetch();
for (child, order) in requirements.children.iter_mut().zip(adjusted) {
for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
child.data.ordering_requirement = order;
child.data.fetch = min_fetch(current_fetch, parent_fetch);
}
requirements.data.ordering_requirement = None;
sort_push_down.data.ordering_requirement = None;
} else {
// Can not push down requirements, add new `SortExec`:
let sort_reqs = requirements
let sort_reqs = sort_push_down
.data
.ordering_requirement
.clone()
.unwrap_or_default();
let fetch = requirements.data.fetch;
requirements = add_sort_above(requirements, sort_reqs, fetch);
assign_initial_requirements(&mut requirements);
let fetch = sort_push_down.data.fetch;
sort_push_down = add_sort_above(sort_push_down, sort_reqs, fetch);
assign_initial_requirements(&mut sort_push_down);
}

requirements = requirements.update_plan_from_children()?;
Ok(Transformed::yes(requirements))
sort_push_down = sort_push_down.update_plan_from_children()?;
Ok(Transformed::yes(sort_push_down))
}

/// Calculate the pushdown ordering requirements for children.
Expand Down

0 comments on commit a3b6813

Please sign in to comment.