From 1303b456f4fa5800a3e30cd5d8767587d8e34dce Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 7 Mar 2025 09:38:41 +0100 Subject: [PATCH] fix: Fix deadlock in cache + hconcat --- .../src/executors/merge_sorted.rs | 17 +++- crates/polars-mem-engine/src/lib.rs | 1 - crates/polars-mem-engine/src/planner/lp.rs | 96 +++++++++++++------ crates/polars-mem-engine/src/utils.rs | 29 ------ crates/polars-plan/src/plans/optimizer/mod.rs | 4 - 5 files changed, 82 insertions(+), 65 deletions(-) delete mode 100644 crates/polars-mem-engine/src/utils.rs diff --git a/crates/polars-mem-engine/src/executors/merge_sorted.rs b/crates/polars-mem-engine/src/executors/merge_sorted.rs index 229ab4c20821..a70aae6888e1 100644 --- a/crates/polars-mem-engine/src/executors/merge_sorted.rs +++ b/crates/polars-mem-engine/src/executors/merge_sorted.rs @@ -6,6 +6,7 @@ pub(crate) struct MergeSorted { pub(crate) input_left: Box, pub(crate) input_right: Box, pub(crate) key: PlSmallStr, + pub(crate) parallel: bool, } impl Executor for MergeSorted { @@ -17,8 +18,20 @@ impl Executor for MergeSorted { eprintln!("run MergeSorted") } } - let left = self.input_left.execute(state)?; - let right = self.input_right.execute(state)?; + let (left, right) = if self.parallel { + let mut state2 = state.split(); + state2.branch_idx += 1; + let (left, right) = POOL.join( + || self.input_left.execute(state), + || self.input_right.execute(&mut state2), + ); + (left?, right?) + } else { + ( + self.input_left.execute(state)?, + self.input_right.execute(state)?, + ) + }; let profile_name = Cow::Borrowed("Merge Sorted"); state.record( diff --git a/crates/polars-mem-engine/src/lib.rs b/crates/polars-mem-engine/src/lib.rs index 8ab1c2e59ff0..4de7170c78d8 100644 --- a/crates/polars-mem-engine/src/lib.rs +++ b/crates/polars-mem-engine/src/lib.rs @@ -2,7 +2,6 @@ mod executors; mod planner; mod predicate; mod prelude; -mod utils; pub use executors::Executor; pub use planner::{create_physical_plan, create_scan_predicate}; diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 22e80314b76c..57759baffa42 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -12,7 +12,6 @@ use self::python_dsl::PythonScanSource; use super::super::executors::{self, Executor}; use super::*; use crate::predicate::PhysicalColumnPredicates; -use crate::utils::*; use crate::ScanPredicate; fn partitionable_gb( @@ -48,16 +47,27 @@ fn partitionable_gb( } } +#[derive(Clone)] struct ConversionState { expr_depth: u16, + has_cache: bool, } impl ConversionState { fn new() -> PolarsResult { Ok(ConversionState { expr_depth: get_expr_depth_limit()?, + has_cache: false, }) } + + fn with_new_branch K>(&mut self, func: F) -> (K, Self) { + let mut new_state = self.clone(); + new_state.has_cache = false; + let out = func(&mut new_state); + self.has_cache = new_state.has_cache; + (out, new_state) + } } pub fn create_physical_plan( @@ -65,15 +75,15 @@ pub fn create_physical_plan( lp_arena: &mut Arena, expr_arena: &mut Arena, ) -> PolarsResult> { - let state = ConversionState::new()?; - create_physical_plan_impl(root, lp_arena, expr_arena, &state) + let mut state = ConversionState::new()?; + create_physical_plan_impl(root, lp_arena, expr_arena, &mut state) } fn create_physical_plan_impl( root: Node, lp_arena: &mut Arena, expr_arena: &mut Arena, - state: &ConversionState, + state: &mut ConversionState, ) -> PolarsResult> { use IR::*; @@ -136,20 +146,39 @@ fn create_physical_plan_impl( ) }, }, - Union { inputs, options } => { - let inputs = inputs - .into_iter() - .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state)) - .collect::>>()?; + Union { + inputs, + mut options, + } => { + let (inputs, new_state) = state.with_new_branch(|new_state| { + inputs + .into_iter() + .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, new_state)) + .collect::>>() + }); + if new_state.has_cache { + options.parallel = false + } + let inputs = inputs?; Ok(Box::new(executors::UnionExec { inputs, options })) }, HConcat { - inputs, options, .. + inputs, + mut options, + .. } => { - let inputs = inputs - .into_iter() - .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, state)) - .collect::>>()?; + let (inputs, new_state) = state.with_new_branch(|new_state| { + inputs + .into_iter() + .map(|node| create_physical_plan_impl(node, lp_arena, expr_arena, new_state)) + .collect::>>() + }); + + if new_state.has_cache { + options.parallel = false + } + let inputs = inputs?; + Ok(Box::new(executors::HConcatExec { inputs, options })) }, Slice { input, offset, len } => { @@ -385,6 +414,7 @@ fn create_physical_plan_impl( cache_hits, } => { let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; + state.has_cache = true; Ok(Box::new(executors::CacheExec { id, input, @@ -503,25 +533,26 @@ fn create_physical_plan_impl( schema, .. } => { + let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned(); + let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned(); + + let ((input_left, input_right), new_state) = state.with_new_branch(|new_state| { + ( + create_physical_plan_impl(input_left, lp_arena, expr_arena, new_state), + create_physical_plan_impl(input_right, lp_arena, expr_arena, new_state), + ) + }); + let input_left = input_left?; + let input_right = input_right?; + + // Todo! remove the force option. It can deadlock. let parallel = if options.force_parallel { true } else if options.allow_parallel { - // check if two DataFrames come from a separate source. - // If they don't we can parallelize, - // we may deadlock if we don't check this - let mut sources_left = PlHashSet::new(); - agg_source_paths(input_left, &mut sources_left, lp_arena); - let mut sources_right = PlHashSet::new(); - agg_source_paths(input_right, &mut sources_right, lp_arena); - sources_left.intersection(&sources_right).next().is_none() + !new_state.has_cache } else { false }; - let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned(); - let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned(); - - let input_left = create_physical_plan_impl(input_left, lp_arena, expr_arena, state)?; - let input_right = create_physical_plan_impl(input_right, lp_arena, expr_arena, state)?; let left_on = create_physical_expressions_from_irs( &left_on, @@ -638,13 +669,20 @@ fn create_physical_plan_impl( input_right, key, } => { - let input_left = create_physical_plan_impl(input_left, lp_arena, expr_arena, state)?; - let input_right = create_physical_plan_impl(input_right, lp_arena, expr_arena, state)?; + let ((input_left, input_right), new_state) = state.with_new_branch(|new_state| { + ( + create_physical_plan_impl(input_left, lp_arena, expr_arena, new_state), + create_physical_plan_impl(input_right, lp_arena, expr_arena, new_state), + ) + }); + let input_left = input_left?; + let input_right = input_right?; let exec = executors::MergeSorted { input_left, input_right, key, + parallel: new_state.has_cache, }; Ok(Box::new(exec)) }, diff --git a/crates/polars-mem-engine/src/utils.rs b/crates/polars-mem-engine/src/utils.rs deleted file mode 100644 index 71542a1dfac3..000000000000 --- a/crates/polars-mem-engine/src/utils.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::path::Path; - -use polars_plan::dsl::ScanSources; -pub(crate) use polars_plan::plans::ArenaLpIter; -use polars_plan::plans::IR; -use polars_utils::aliases::PlHashSet; -use polars_utils::arena::{Arena, Node}; - -/// Get a set of the data source paths in this LogicalPlan -/// -/// # Notes -/// -/// - Scan sources with opened files or in-memory buffers are ignored. -pub(crate) fn agg_source_paths<'a>( - root_lp: Node, - acc_paths: &mut PlHashSet<&'a Path>, - lp_arena: &'a Arena, -) { - for (_, lp) in lp_arena.iter(root_lp) { - if let IR::Scan { sources, .. } = lp { - match sources { - ScanSources::Paths(paths) => acc_paths.extend(paths.iter().map(|p| p.as_path())), - ScanSources::Buffers(_) | ScanSources::Files(_) => { - // Ignore - }, - } - } - } -} diff --git a/crates/polars-plan/src/plans/optimizer/mod.rs b/crates/polars-plan/src/plans/optimizer/mod.rs index 3b53050e1306..19bdcde04c20 100644 --- a/crates/polars-plan/src/plans/optimizer/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/mod.rs @@ -108,10 +108,6 @@ More information on the new streaming engine: https://github.com/pola-rs/polars/ #[cfg(not(feature = "cse"))] let comm_subexpr_elim = false; - #[allow(unused_variables)] - let agg_scan_projection = - opt_flags.contains(OptFlags::FILE_CACHING) && !opt_flags.streaming() && !opt_flags.eager(); - // During debug we check if the optimizations have not modified the final schema. #[cfg(debug_assertions)] let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();