From 6f893e87a3034f7081c3cfad1e3cfb56b0a52223 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 28 Feb 2025 12:52:16 +0000 Subject: [PATCH] feat(rust, python): Support engine callback for LazyFrame.profile So that we can support the GPU engine in profiled collection of a lazyframe, plumb through a mechanism for recording raw timings for nodes that were executed through the PythonScan node. This necessitates some small changes to the internals of the NodeTimer, since `Instant`s are opaque. We instead directly store durations (as nanoseconds since the query start) and when calling into the IR post-optimization callback, provide a duration that is the number of nanoseconds since the query was started. On the Python side we can then keep track and record durations independently, offsetted by this optimisation duration. As a side-effect, `profile` now correctly measures the optimisation time in the logical plan, rather than as previously just the time to produce the physical plan from the optimised logical plan. --- .../polars-expr/src/state/execution_state.rs | 15 ++- crates/polars-expr/src/state/node_timer.rs | 28 +++-- crates/polars-lazy/src/frame/exitable.rs | 2 +- crates/polars-lazy/src/frame/mod.rs | 54 ++++++--- .../src/executors/scan/python_scan.rs | 29 ++++- crates/polars-python/src/lazyframe/general.rs | 76 +++++++----- py-polars/polars/lazyframe/frame.py | 108 +++++++++++++----- 7 files changed, 220 insertions(+), 92 deletions(-) diff --git a/crates/polars-expr/src/state/execution_state.rs b/crates/polars-expr/src/state/execution_state.rs index e1e073fc0083..9b72235ec774 100644 --- a/crates/polars-expr/src/state/execution_state.rs +++ b/crates/polars-expr/src/state/execution_state.rs @@ -136,8 +136,8 @@ impl ExecutionState { } /// Toggle this to measure execution times. - pub fn time_nodes(&mut self) { - self.node_timer = Some(NodeTimer::new()) + pub fn time_nodes(&mut self, start: std::time::Instant) { + self.node_timer = Some(NodeTimer::new(start)) } pub fn has_node_timer(&self) -> bool { self.node_timer.is_some() @@ -147,6 +147,17 @@ impl ExecutionState { self.node_timer.unwrap().finish() } + // Timings should be a list of (start, end, name) where the start + // and end are raw durations since the query start as nanoseconds. + pub fn record_raw_timings(&self, timings: &[(u64, u64, String)]) -> () { + for &(start, end, ref name) in timings { + self.node_timer + .as_ref() + .unwrap() + .store_raw(start, end, name.to_string()); + } + } + // This is wrong when the U64 overflows which will never happen. pub fn should_stop(&self) -> PolarsResult<()> { try_raise_keyboard_interrupt(); diff --git a/crates/polars-expr/src/state/node_timer.rs b/crates/polars-expr/src/state/node_timer.rs index c3114d3029cd..d95c329a0a09 100644 --- a/crates/polars-expr/src/state/node_timer.rs +++ b/crates/polars-expr/src/state/node_timer.rs @@ -1,5 +1,5 @@ use std::sync::Mutex; -use std::time::Instant; +use std::time::{Duration, Instant}; use polars_core::prelude::*; use polars_core::utils::NoNull; @@ -8,7 +8,7 @@ type StartInstant = Instant; type EndInstant = Instant; type Nodes = Vec; -type Ticks = Vec<(StartInstant, EndInstant)>; +type Ticks = Vec<(Duration, Duration)>; #[derive(Clone)] pub(super) struct NodeTimer { @@ -17,9 +17,9 @@ pub(super) struct NodeTimer { } impl NodeTimer { - pub(super) fn new() -> Self { + pub(super) fn new(query_start: Instant) -> Self { Self { - query_start: Instant::now(), + query_start, data: Arc::new(Mutex::new((Vec::with_capacity(16), Vec::with_capacity(16)))), } } @@ -29,7 +29,19 @@ impl NodeTimer { let nodes = &mut data.0; nodes.push(name); let ticks = &mut data.1; - ticks.push((start, end)) + ticks.push(( + start.duration_since(self.query_start), + end.duration_since(self.query_start), + )) + } + + // start and end should be raw nanosecond durations since query_start + pub(super) fn store_raw(&self, start: u64, end: u64, name: String) { + let mut data = self.data.lock().unwrap(); + let nodes = &mut data.0; + nodes.push(name); + let ticks = &mut data.1; + ticks.push((Duration::from_nanos(start), Duration::from_nanos(end))) } pub(super) fn finish(self) -> PolarsResult { @@ -41,18 +53,18 @@ impl NodeTimer { // first value is end of optimization polars_ensure!(!ticks.is_empty(), ComputeError: "no data to time"); let start = ticks[0].0; - ticks.push((self.query_start, start)); + ticks.push((Duration::from_nanos(0), start)); let nodes_s = Column::new(PlSmallStr::from_static("node"), nodes); let start: NoNull = ticks .iter() - .map(|(start, _)| (start.duration_since(self.query_start)).as_micros() as u64) + .map(|(start, _)| start.as_micros() as u64) .collect(); let mut start = start.into_inner(); start.rename(PlSmallStr::from_static("start")); let end: NoNull = ticks .iter() - .map(|(_, end)| (end.duration_since(self.query_start)).as_micros() as u64) + .map(|(_, end)| end.as_micros() as u64) .collect(); let mut end = end.into_inner(); end.rename(PlSmallStr::from_static("end")); diff --git a/crates/polars-lazy/src/frame/exitable.rs b/crates/polars-lazy/src/frame/exitable.rs index 0e32f3b3cbee..a223f14fd24f 100644 --- a/crates/polars-lazy/src/frame/exitable.rs +++ b/crates/polars-lazy/src/frame/exitable.rs @@ -8,7 +8,7 @@ use super::*; impl LazyFrame { pub fn collect_concurrently(self) -> PolarsResult { - let (mut state, mut physical_plan, _) = self.prepare_collect(false)?; + let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?; let (tx, rx) = channel(); let token = state.cancel_token(); diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 6088c7fffedd..651d7afc47ef 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -47,6 +47,10 @@ use crate::frame::cached_arenas::CachedArena; use crate::physical_plan::streaming::insert_streaming_nodes; use crate::prelude::*; +// Function called after logical plan optimization that can potentially change the plan. +type PostOptFn = + Fn(Node, &mut Arena, &mut Arena, Option) -> PolarsResult<()>; + pub trait IntoLazy { fn lazy(self) -> LazyFrame; } @@ -668,18 +672,23 @@ impl LazyFrame { fn prepare_collect_post_opt

( mut self, check_sink: bool, - post_opt: P, - ) -> PolarsResult<(ExecutionState, Box, bool)> - where - P: Fn(Node, &mut Arena, &mut Arena) -> PolarsResult<()>, - { + query_start: Option, + post_opt: PostOptFn, + ) -> PolarsResult<(ExecutionState, Box, bool)> { let (mut lp_arena, mut expr_arena) = self.get_arenas(); let mut scratch = vec![]; let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?; - post_opt(lp_top, &mut lp_arena, &mut expr_arena)?; + post_opt( + lp_top, + &mut lp_arena, + &mut expr_arena, + // Post optimization callback gets the time since the + // query was started as its "base" timepoint. + query_start.map(|s| s.elapsed()), + )?; // sink should be replaced let no_file_sink = if check_sink { @@ -694,11 +703,9 @@ impl LazyFrame { } // post_opt: A function that is called after optimization. This can be used to modify the IR jit. - pub fn _collect_post_opt

(self, post_opt: P) -> PolarsResult - where - P: Fn(Node, &mut Arena, &mut Arena) -> PolarsResult<()>, - { - let (mut state, mut physical_plan, _) = self.prepare_collect_post_opt(false, post_opt)?; + pub fn _collect_post_opt

(self, post_opt: PostOptFn) -> PolarsResult { + let (mut state, mut physical_plan, _) = + self.prepare_collect_post_opt(false, None, post_opt)?; physical_plan.execute(&mut state) } @@ -706,8 +713,9 @@ impl LazyFrame { fn prepare_collect( self, check_sink: bool, + query_start: Option, ) -> PolarsResult<(ExecutionState, Box, bool)> { - self.prepare_collect_post_opt(check_sink, |_, _, _| Ok(())) + self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(())) } /// Execute all the lazy operations and collect them into a [`DataFrame`]. @@ -745,7 +753,19 @@ impl LazyFrame { physical_plan.execute(&mut state) } #[cfg(not(feature = "new_streaming"))] - self._collect_post_opt(|_, _, _| Ok(())) + self._collect_post_opt(|_, _, _, _| Ok(())) + } + + // post_opt: A function that is called after optimization. This can be used to modify the IR jit. + // This version does profiling of the node execution. + pub fn _profile_post_opt

(self, post_opt: PostOptFn) -> PolarsResult<(DataFrame, DataFrame)> { + let query_start = std::time::Instant::now(); + let (mut state, mut physical_plan, _) = + self.prepare_collect_post_opt(false, Some(query_start), post_opt)?; + state.time_nodes(query_start); + let out = physical_plan.execute(&mut state)?; + let timer_df = state.finish_timer()?; + Ok((out, timer_df)) } /// Profile a LazyFrame. @@ -756,11 +776,7 @@ impl LazyFrame { /// /// The units of the timings are microseconds. pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> { - let (mut state, mut physical_plan, _) = self.prepare_collect(false)?; - state.time_nodes(); - let out = physical_plan.execute(&mut state)?; - let timer_df = state.finish_timer()?; - Ok((out, timer_df)) + self._profile_post_opt(|_, _, _, _| Ok(())) } /// Stream a query result into a parquet file. This is useful if the final result doesn't fit @@ -919,7 +935,7 @@ impl LazyFrame { payload, }; self.opt_state |= OptFlags::STREAMING; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true, None)?; polars_ensure!( is_streaming, ComputeError: format!("cannot run the whole query in a streaming order; \ diff --git a/crates/polars-mem-engine/src/executors/scan/python_scan.rs b/crates/polars-mem-engine/src/executors/scan/python_scan.rs index f3e3727bbd3a..f39939770b93 100644 --- a/crates/polars-mem-engine/src/executors/scan/python_scan.rs +++ b/crates/polars-mem-engine/src/executors/scan/python_scan.rs @@ -51,10 +51,31 @@ impl Executor for PythonScanExec { }, }; - let generator_init = if matches!( - self.options.python_source, - PythonScanSource::Pyarrow | PythonScanSource::Cuda - ) { + let generator_init = if matches!(self.options.python_source, PythonScanSource::Cuda) { + let args = ( + python_scan_function, + with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::>()), + predicate, + n_rows, + // Is this boolean is true, callback should return + // a dataframe and list of timings [(start, end, + // name)] + state.has_node_timer(), + ); + let result = callable.call1(args).map_err(to_compute_err)?; + if state.has_node_timer() { + let df = result.get_item(0).map_err(to_compute_err); + let timing_info: Vec<(u64, u64, String)> = result + .get_item(1) + .map_err(to_compute_err)? + .extract() + .map_err(to_compute_err)?; + state.record_raw_timings(&timing_info); + df + } else { + Ok(result) + } + } else if matches!(self.options.python_source, PythonScanSource::Pyarrow) { let args = ( python_scan_function, with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::>()), diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index c38039e09641..cbd64d3cea83 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -9,6 +9,8 @@ use polars_core::prelude::*; #[cfg(feature = "parquet")] use polars_parquet::arrow::write::StatisticsOptions; use polars_plan::dsl::ScanSources; +use polars_plan::plans::{AExpr, IR}; +use polars_utils::arena::{Arena, Node}; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; use pyo3::types::{PyDict, PyList}; @@ -35,6 +37,35 @@ fn pyobject_to_first_path_and_scan_sources( }) } +fn post_opt_callback( + lambda: &PyObject, + root: Node, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + duration_since_start: Option, +) -> PolarsResult<()> { + Python::with_gil(|py| { + let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena)); + + // Get a copy of the arenas. + let arenas = nt.get_arenas(); + + // Pass the node visitor which allows the python callback to replace parts of the query plan. + // Remove "cuda" or specify better once we have multiple post-opt callbacks. + lambda + .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64))) + .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?; + + // Unpack the arenas. + // At this point the `nt` is useless. + + std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); + std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); + + Ok(()) + }) +} + #[pymethods] #[allow(clippy::should_implement_trait)] impl PyLazyFrame { @@ -613,8 +644,22 @@ impl PyLazyFrame { ldf.cache().into() } - fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> { - let (df, time_df) = py.enter_polars(|| self.ldf.clone().profile())?; + #[pyo3(signature = (lambda_post_opt=None))] + fn profile( + &self, + py: Python, + lambda_post_opt: Option, + ) -> PyResult<(PyDataFrame, PyDataFrame)> { + let (df, time_df) = py.enter_polars(|| { + let ldf = self.ldf.clone(); + if let Some(lambda) = lambda_post_opt { + ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| { + post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start) + }) + } else { + ldf.profile() + } + })?; Ok((df.into(), time_df.into())) } @@ -623,31 +668,8 @@ impl PyLazyFrame { py.enter_polars_df(|| { let ldf = self.ldf.clone(); if let Some(lambda) = lambda_post_opt { - ldf._collect_post_opt(|root, lp_arena, expr_arena| { - Python::with_gil(|py| { - let nt = NodeTraverser::new( - root, - std::mem::take(lp_arena), - std::mem::take(expr_arena), - ); - - // Get a copy of the arena's. - let arenas = nt.get_arenas(); - - // Pass the node visitor which allows the python callback to replace parts of the query plan. - // Remove "cuda" or specify better once we have multiple post-opt callbacks. - lambda.call1(py, (nt,)).map_err( - |e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e), - )?; - - // Unpack the arena's. - // At this point the `nt` is useless. - - std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); - std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); - - Ok(()) - }) + ldf._collect_post_opt(|root, lp_arena, expr_arena, _| { + post_opt_callback(&lambda, root, lp_arena, expr_arena, None) }) } else { ldf.collect() diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index cc4df14fdce6..a1e16eedd0c3 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -143,6 +143,46 @@ P = ParamSpec("P") +def _gpu_engine_callback( + engine: EngineType, + *, + streaming: bool, + background: bool, + new_streaming: bool, + _eager: bool, +) -> Callable[[Any, int | None], None] | None: + is_gpu = (is_config_obj := isinstance(engine, GPUEngine)) or engine == "gpu" + if not (is_config_obj or engine in ("cpu", "gpu")): + msg = f"Invalid engine argument {engine=}" + raise ValueError(msg) + if (streaming or background or new_streaming) and is_gpu: + issue_warning( + "GPU engine does not support streaming or background collection, " + "disabling GPU engine.", + category=UserWarning, + ) + is_gpu = False + if _eager: + # Don't run on GPU in _eager mode (but don't warn) + is_gpu = False + + if not is_gpu: + return None + cudf_polars = import_optional( + "cudf_polars", + err_prefix="GPU engine requested, but required package", + install_message=( + "Please install using the command " + "`pip install cudf-polars-cu12` " + "(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` " + "if your system has a CUDA 11 driver)." + ), + ) + if not is_config_obj: + engine = GPUEngine() + return partial(cudf_polars.execute_with_cudf, config=engine) + + class LazyFrame: """ Representation of a Lazy computation graph/query against a DataFrame. @@ -1630,6 +1670,7 @@ def profile( truncate_nodes: int = 0, figsize: tuple[int, int] = (18, 8), streaming: bool = False, + engine: EngineType = "cpu", _check_order: bool = True, ) -> tuple[DataFrame, DataFrame]: """ @@ -1672,6 +1713,27 @@ def profile( matplotlib figsize of the profiling plot streaming Run parts of the query in a streaming fashion (this is in an alpha state) + engine + Select the engine used to process the query, optional. + If set to `"cpu"` (default), the query is run using the + polars CPU engine. If set to `"gpu"`, the GPU engine is + used. Fine-grained control over the GPU engine, for + example which device to use on a system with multiple + devices, is possible by providing a :class:`~.GPUEngine` object + with configuration options. + + .. note:: + GPU mode is considered **unstable**. Not all queries will run + successfully on the GPU, however, they should fall back transparently + to the default engine if execution is not supported. + + Running with `POLARS_VERBOSE=1` will provide information if a query + falls back (and why). + + .. note:: + The GPU engine does not support streaming, if streaming + is enabled then GPU execution is switched off. + Examples -------- @@ -1731,7 +1793,14 @@ def profile( _check_order=_check_order, new_streaming=False, ) - df, timings = ldf.profile() + callback = _gpu_engine_callback( + engine, + streaming=streaming, + background=False, + new_streaming=False, + _eager=False, + ) + df, timings = ldf.profile(callback) (df, timings) = wrap_df(df), wrap_df(timings) if show_plot: @@ -2008,21 +2077,13 @@ def collect( if streaming: issue_unstable_warning("Streaming mode is considered unstable.") - is_gpu = (is_config_obj := isinstance(engine, GPUEngine)) or engine == "gpu" - if not (is_config_obj or engine in ("cpu", "gpu")): - msg = f"Invalid engine argument {engine=}" - raise ValueError(msg) - if (streaming or background or new_streaming) and is_gpu: - issue_warning( - "GPU engine does not support streaming or background collection, " - "disabling GPU engine.", - category=UserWarning, - ) - is_gpu = False - if _eager: - # Don't run on GPU in _eager mode (but don't warn) - is_gpu = False - + callback = _gpu_engine_callback( + engine, + streaming=streaming, + background=background, + new_streaming=new_streaming, + _eager=_eager, + ) type_check = _type_check ldf = self._ldf.optimization_toggle( type_coercion=type_coercion, @@ -2045,21 +2106,6 @@ def collect( issue_unstable_warning("Background mode is considered unstable.") return InProcessQuery(ldf.collect_concurrently()) - callback = None - if is_gpu: - cudf_polars = import_optional( - "cudf_polars", - err_prefix="GPU engine requested, but required package", - install_message=( - "Please install using the command " - "`pip install cudf-polars-cu12` " - "(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` " - "if your system has a CUDA 11 driver)." - ), - ) - if not is_config_obj: - engine = GPUEngine() - callback = partial(cudf_polars.execute_with_cudf, config=engine) # Only for testing purposes callback = _kwargs.get("post_opt_callback", callback) return wrap_df(ldf.collect(callback))