Skip to content

Commit

Permalink
undo separate gather
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Oct 4, 2024
1 parent 4596ba3 commit 1a24e7e
Showing 1 changed file with 11 additions and 25 deletions.
36 changes: 11 additions & 25 deletions crates/polars-core/src/frame/group_by/hashing.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use hashbrown::hash_map::Entry;
use polars_utils::hashing::{hash_to_partition, DirtyHash};
use polars_utils::idx_vec::IdxVec;
use polars_utils::itertools::Itertools;
use polars_utils::sync::SyncPtr;
use polars_utils::total_ord::{ToTotalOrd, TotalHash, TotalOrdWrap};
use polars_utils::unitvec;
use polars_utils::vec::PushUnchecked;
use rayon::prelude::*;

use crate::hashing::*;
Expand Down Expand Up @@ -81,45 +79,34 @@ where
let (mut first, mut groups);
if sorted {
groups = Vec::with_capacity(get_init_size());
let mut hash_tbl: PlHashMap<TotalOrdWrap<K>, IdxSize> = PlHashMap::with_capacity(init_size);
first = Vec::with_capacity(get_init_size());
let mut hash_tbl = PlHashMap::with_capacity(init_size);
for (idx, k) in keys.enumerate() {
let idx = idx as IdxSize;
match hash_tbl.entry(TotalOrdWrap(k)) {
Entry::Vacant(entry) => {
let group_idx = groups.len() as IdxSize;
entry.insert(group_idx);
groups.push(unitvec![idx]);
first.push(idx);
},
Entry::Occupied(entry) => unsafe {
groups.get_unchecked_mut(*entry.get() as usize).push(idx)
},
}
}

first = groups
.iter()
.map(|v| unsafe { *v.first().unwrap_unchecked() })
.collect_vec();
} else {
let mut hash_tbl: PlHashMap<TotalOrdWrap<K>, IdxVec> = PlHashMap::with_capacity(init_size);
let mut hash_tbl = PlHashMap::with_capacity(init_size);
for (idx, k) in keys.enumerate() {
let idx = idx as IdxSize;
match hash_tbl.entry(TotalOrdWrap(k)) {
Entry::Vacant(entry) => {
entry.insert(unitvec![idx]);
entry.insert((idx, unitvec![idx]));
},
Entry::Occupied(mut entry) => entry.get_mut().push(idx),
}
}

first = Vec::with_capacity(hash_tbl.len());
groups = Vec::with_capacity(hash_tbl.len());
unsafe {
for v in hash_tbl.into_values() {
first.push_unchecked(*v.first().unwrap_unchecked());
groups.push_unchecked(v);
Entry::Occupied(mut entry) => entry.get_mut().1.push(idx),
}
}
(first, groups) = hash_tbl.into_values().unzip();
}
GroupsProxy::Idx(GroupsIdx::new(first, groups, sorted))
}
Expand All @@ -146,8 +133,7 @@ where
(0..n_partitions)
.into_par_iter()
.map(|thread_no| {
let mut hash_tbl: PlHashMap<T::TotalOrdItem, IdxVec> =
PlHashMap::with_capacity(init_size);
let mut hash_tbl = PlHashMap::with_capacity(init_size);

let mut offset = 0;
for keys in &keys {
Expand All @@ -161,10 +147,10 @@ where
if thread_no == hash_to_partition(k.dirty_hash(), n_partitions) {
match hash_tbl.entry(k) {
Entry::Vacant(entry) => {
entry.insert(unitvec![idx]);
entry.insert((idx, unitvec![idx]));
},
Entry::Occupied(mut entry) => {
entry.get_mut().push(idx);
entry.get_mut().1.push(idx);
},
}
}
Expand All @@ -173,7 +159,7 @@ where
}
hash_tbl
.into_iter()
.map(|(_k, v)| (unsafe { *v.first().unwrap_unchecked() }, v))
.map(|(_k, v)| v)
.collect_trusted::<Vec<_>>()
})
.collect::<Vec<_>>()
Expand Down

0 comments on commit 1a24e7e

Please sign in to comment.