Skip to content

Commit

Permalink
fix: Fix deadlock in cache + hconcat (#21640)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 7, 2025
1 parent 40c7019 commit 52b93ef
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 65 deletions.
17 changes: 15 additions & 2 deletions crates/polars-mem-engine/src/executors/merge_sorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub(crate) struct MergeSorted {
pub(crate) input_left: Box<dyn Executor>,
pub(crate) input_right: Box<dyn Executor>,
pub(crate) key: PlSmallStr,
pub(crate) parallel: bool,
}

impl Executor for MergeSorted {
Expand All @@ -17,8 +18,20 @@ impl Executor for MergeSorted {
eprintln!("run MergeSorted")
}
}
let left = self.input_left.execute(state)?;
let right = self.input_right.execute(state)?;
let (left, right) = if self.parallel {
let mut state2 = state.split();
state2.branch_idx += 1;
let (left, right) = POOL.join(
|| self.input_left.execute(state),
|| self.input_right.execute(&mut state2),
);
(left?, right?)
} else {
(
self.input_left.execute(state)?,
self.input_right.execute(state)?,
)
};

let profile_name = Cow::Borrowed("Merge Sorted");
state.record(
Expand Down
1 change: 0 additions & 1 deletion crates/polars-mem-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod executors;
mod planner;
mod predicate;
mod prelude;
mod utils;

pub use executors::Executor;
pub use planner::{create_physical_plan, create_scan_predicate};
Expand Down
96 changes: 67 additions & 29 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use self::python_dsl::PythonScanSource;
use super::super::executors::{self, Executor};
use super::*;
use crate::predicate::PhysicalColumnPredicates;
use crate::utils::*;
use crate::ScanPredicate;

fn partitionable_gb(
Expand Down Expand Up @@ -48,32 +47,43 @@ fn partitionable_gb(
}
}

#[derive(Clone)]
struct ConversionState {
expr_depth: u16,
has_cache: bool,
}

impl ConversionState {
fn new() -> PolarsResult<Self> {
Ok(ConversionState {
expr_depth: get_expr_depth_limit()?,
has_cache: false,
})
}

fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> (K, Self) {
let mut new_state = self.clone();
new_state.has_cache = false;
let out = func(&mut new_state);
self.has_cache = new_state.has_cache;
(out, new_state)
}
}

pub fn create_physical_plan(
root: Node,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Box<dyn Executor>> {
let state = ConversionState::new()?;
create_physical_plan_impl(root, lp_arena, expr_arena, &state)
let mut state = ConversionState::new()?;
create_physical_plan_impl(root, lp_arena, expr_arena, &mut state)
}

fn create_physical_plan_impl(
root: Node,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
state: &ConversionState,
state: &mut ConversionState,
) -> PolarsResult<Box<dyn Executor>> {
use IR::*;

Expand Down Expand Up @@ -136,20 +146,39 @@ fn create_physical_plan_impl(
)
},
},
Union { inputs, options } => {
let inputs = inputs
.into_iter()
.map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state))
.collect::<PolarsResult<Vec<_>>>()?;
Union {
inputs,
mut options,
} => {
let (inputs, new_state) = state.with_new_branch(|new_state| {
inputs
.into_iter()
.map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, new_state))
.collect::<PolarsResult<Vec<_>>>()
});
if new_state.has_cache {
options.parallel = false
}
let inputs = inputs?;
Ok(Box::new(executors::UnionExec { inputs, options }))
},
HConcat {
inputs, options, ..
inputs,
mut options,
..
} => {
let inputs = inputs
.into_iter()
.map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state))
.collect::<PolarsResult<Vec<_>>>()?;
let (inputs, new_state) = state.with_new_branch(|new_state| {
inputs
.into_iter()
.map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, new_state))
.collect::<PolarsResult<Vec<_>>>()
});

if new_state.has_cache {
options.parallel = false
}
let inputs = inputs?;

Ok(Box::new(executors::HConcatExec { inputs, options }))
},
Slice { input, offset, len } => {
Expand Down Expand Up @@ -385,6 +414,7 @@ fn create_physical_plan_impl(
cache_hits,
} => {
let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?;
state.has_cache = true;
Ok(Box::new(executors::CacheExec {
id,
input,
Expand Down Expand Up @@ -503,25 +533,26 @@ fn create_physical_plan_impl(
schema,
..
} => {
let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

let ((input_left, input_right), new_state) = state.with_new_branch(|new_state| {
(
create_physical_plan_impl(input_left, lp_arena, expr_arena, new_state),
create_physical_plan_impl(input_right, lp_arena, expr_arena, new_state),
)
});
let input_left = input_left?;
let input_right = input_right?;

// Todo! remove the force option. It can deadlock.
let parallel = if options.force_parallel {
true
} else if options.allow_parallel {
// check if two DataFrames come from a separate source.
// If they don't we can parallelize,
// we may deadlock if we don't check this
let mut sources_left = PlHashSet::new();
agg_source_paths(input_left, &mut sources_left, lp_arena);
let mut sources_right = PlHashSet::new();
agg_source_paths(input_right, &mut sources_right, lp_arena);
sources_left.intersection(&sources_right).next().is_none()
!new_state.has_cache
} else {
false
};
let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

let input_left = create_physical_plan_impl(input_left, lp_arena, expr_arena, state)?;
let input_right = create_physical_plan_impl(input_right, lp_arena, expr_arena, state)?;

let left_on = create_physical_expressions_from_irs(
&left_on,
Expand Down Expand Up @@ -638,13 +669,20 @@ fn create_physical_plan_impl(
input_right,
key,
} => {
let input_left = create_physical_plan_impl(input_left, lp_arena, expr_arena, state)?;
let input_right = create_physical_plan_impl(input_right, lp_arena, expr_arena, state)?;
let ((input_left, input_right), new_state) = state.with_new_branch(|new_state| {
(
create_physical_plan_impl(input_left, lp_arena, expr_arena, new_state),
create_physical_plan_impl(input_right, lp_arena, expr_arena, new_state),
)
});
let input_left = input_left?;
let input_right = input_right?;

let exec = executors::MergeSorted {
input_left,
input_right,
key,
parallel: new_state.has_cache,
};
Ok(Box::new(exec))
},
Expand Down
29 changes: 0 additions & 29 deletions crates/polars-mem-engine/src/utils.rs

This file was deleted.

4 changes: 0 additions & 4 deletions crates/polars-plan/src/plans/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ More information on the new streaming engine: https://github.com/pola-rs/polars/
#[cfg(not(feature = "cse"))]
let comm_subexpr_elim = false;

#[allow(unused_variables)]
let agg_scan_projection =
opt_flags.contains(OptFlags::FILE_CACHING) && !opt_flags.streaming() && !opt_flags.eager();

// During debug we check if the optimizations have not modified the final schema.
#[cfg(debug_assertions)]
let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();
Expand Down

0 comments on commit 52b93ef

Please sign in to comment.