Skip to content

Commit

Permalink
perf: Pre-sort groups in group-by-dynamic
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 3, 2025
1 parent 5b6a7b7 commit 8d5449a
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 414 deletions.
19 changes: 17 additions & 2 deletions crates/polars-mem-engine/src/executors/group_by_dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,29 @@ impl GroupByDynamicExec {
state: &ExecutionState,
mut df: DataFrame,
) -> PolarsResult<DataFrame> {
use crate::executors::group_by_rolling::sort_and_groups;

df.as_single_chunk_par();
let keys = self

let mut keys = self
.keys
.iter()
.map(|e| e.evaluate(&df, state))
.collect::<PolarsResult<Vec<_>>>()?;

let (mut time_key, mut keys, groups) = df.group_by_dynamic(keys, &self.options)?;
let group_by = if !self.keys.is_empty() {
Some(sort_and_groups(&mut df, &mut keys)?)
} else {
None
};

let (mut time_key, bounds, groups) = df.group_by_dynamic(group_by, &self.options)?;
POOL.install(|| {
keys.iter_mut().for_each(|key| {
unsafe { *key = key.agg_first(&groups) };
})
});
keys.extend(bounds);

if let Some(f) = &self.apply {
let gb = GroupBy::new(&df, vec![], groups, None);
Expand Down
87 changes: 47 additions & 40 deletions crates/polars-mem-engine/src/executors/group_by_rolling.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use polars_utils::unique_column_name;

use super::*;

#[cfg_attr(not(feature = "dynamic_group_by"), allow(dead_code))]
Expand All @@ -12,16 +14,57 @@ pub(crate) struct GroupByRollingExec {
pub(crate) apply: Option<Arc<dyn DataFrameUdf>>,
}

pub(super) fn sort_and_groups(
df: &mut DataFrame,
keys: &mut Vec<Column>,
) -> PolarsResult<Vec<[IdxSize; 2]>> {
let encoded = row_encode::encode_rows_vertical_par_unordered(keys)?;
let encoded = encoded.rechunk().into_owned();
let encoded = encoded.with_name(unique_column_name());
let idx = encoded.arg_sort(SortOptions {
maintain_order: true,
..Default::default()
});

let encoded = unsafe {
df.with_column_unchecked(encoded.into_series().into());

// If not sorted on keys, sort.
let idx_s = idx.clone().into_series();
if !idx_s.is_sorted(Default::default()).unwrap() {
let (df_ordered, keys_ordered) = POOL.join(
|| df.take_unchecked(&idx),
|| {
keys.iter()
.map(|c| c.take_unchecked(&idx))
.collect::<Vec<_>>()
},
);
*df = df_ordered;
*keys = keys_ordered;
}

df.get_columns_mut().pop().unwrap()
};
let encoded = encoded.as_series().unwrap();
let encoded = encoded.binary_offset().unwrap();
let encoded = encoded.with_sorted_flag(polars_core::series::IsSorted::Ascending);
let groups = encoded.group_tuples(true, false).unwrap();

let GroupsType::Slice { groups, .. } = groups else {
// memory would explode
unreachable!();
};
Ok(groups)
}

impl GroupByRollingExec {
#[cfg(feature = "dynamic_group_by")]
fn execute_impl(
&mut self,
state: &ExecutionState,
mut df: DataFrame,
) -> PolarsResult<DataFrame> {
use polars_utils::itertools::Itertools;
use polars_utils::unique_column_name;

df.as_single_chunk_par();

let mut keys = self
Expand All @@ -31,43 +74,7 @@ impl GroupByRollingExec {
.collect::<PolarsResult<Vec<_>>>()?;

let group_by = if !self.keys.is_empty() {
// 1. Sort
// 1. Compute offsets on sorted groups (this will ensure the amount)

let encoded = row_encode::encode_rows_vertical_par_unordered(&keys)?;
let encoded = encoded.rechunk().into_owned();
let encoded = encoded.with_name(unique_column_name());
let idx = encoded.arg_sort(SortOptions {
maintain_order: true,
..Default::default()
});

let encoded = unsafe {
df.with_column_unchecked(encoded.into_series().into());

// If not sorted on keys, sort.
let idx_s = idx.clone().into_series();
if !idx_s.is_sorted(Default::default()).unwrap() {
let (df_ordered, keys_ordered) = POOL.join(
|| df.take_unchecked(&idx),
|| keys.iter().map(|c| c.take_unchecked(&idx)).collect_vec(),
);
df = df_ordered;
keys = keys_ordered;
}

df.get_columns_mut().pop().unwrap()
};
let encoded = encoded.as_series().unwrap();
let encoded = encoded.binary_offset().unwrap();
let encoded = encoded.with_sorted_flag(polars_core::series::IsSorted::Ascending);
let groups = encoded.group_tuples(true, false).unwrap();

let GroupsType::Slice { groups, .. } = groups else {
// memory would explode
unreachable!();
};
Some(groups)
Some(sort_and_groups(&mut df, &mut keys)?)
} else {
None
};
Expand Down
Loading

0 comments on commit 8d5449a

Please sign in to comment.