Commit c256195

fmt

orlp committed Mar 7, 2025
1 parent 69a13b2 commit c256195

Showing 3 changed files with 56 additions and 45 deletions.

crates/polars-stream/src/nodes/joins/equi_join.rs (46 additions & 34 deletions)

@@ -546,7 +546,7 @@ impl BuildState {
         let num_partitions = self.local_builders[0].sketch_per_p.len();
         let local_builders = &self.local_builders;
         let probe_tables: SparseInitVec<ProbeTable> = SparseInitVec::with_capacity(num_partitions);
-
+
         POOL.scope(|s| {
             for p in 0..num_partitions {
                 let probe_tables = &probe_tables;
@@ -560,7 +560,9 @@ impl BuildState {
                     let mut sketch = CardinalitySketch::new();
                     let mut payload_rows = 0;
                     for (l_idx, l) in local_builders.iter().enumerate() {
-                        let Some((seq, _, _)) = l.morsels.get(0) else { continue };
+                        let Some((seq, _, _)) = l.morsels.get(0) else {
+                            continue;
+                        };
                         kmerge.push(Priority(Reverse(seq), l_idx));

                         sketch.combine(&l.sketch_per_p[p]);
@@ -574,7 +576,7 @@ impl BuildState {
                     p_table.reserve(sketch.estimate() * 5 / 4);
                     let mut p_payload = DataFrameBuilder::new(payload_schema.clone());
                     p_payload.reserve(payload_rows);
-
+
                     let mut p_seq_ids = Vec::new();
                     if track_unmatchable {
                         p_seq_ids.reserve(payload_rows);
@@ -590,7 +592,7 @@ impl BuildState {
                             if let Some((next_seq, _, _)) = l.morsels.get(idx_in_l + 1) {
                                 kmerge.push(Priority(Reverse(next_seq), l_idx));
                             }
-
+
                             let (_mseq, payload, keys) = l.morsels.get_unchecked(idx_in_l);
                             let p_morsel_idxs_start =
                                 l.morsel_idxs_offsets_per_p[idx_in_l * num_partitions + p];
@@ -599,24 +601,26 @@ impl BuildState {
                             let p_morsel_idxs = &l.morsel_idxs_values_per_p[p]
                                 [p_morsel_idxs_start..p_morsel_idxs_stop];
                             p_table.insert_keys_subset(keys, p_morsel_idxs, track_unmatchable);
-                            p_payload.gather_extend(
-                                payload,
-                                p_morsel_idxs,
-                                ShareStrategy::Never,
-                            );
-
+                            p_payload.gather_extend(payload, p_morsel_idxs, ShareStrategy::Never);
+
                             if track_unmatchable {
                                 p_seq_ids.resize(p_payload.len(), norm_seq_id);
                                 norm_seq_id += 1;
                             }
                         }
                     }
-
-                    probe_tables.try_set(p, ProbeTable {
-                        hash_table: p_table,
-                        payload: p_payload.freeze(),
-                        seq_ids: p_seq_ids,
-                    }).ok().unwrap();
+
+                    probe_tables
+                        .try_set(
+                            p,
+                            ProbeTable {
+                                hash_table: p_table,
+                                payload: p_payload.freeze(),
+                                seq_ids: p_seq_ids,
+                            },
+                        )
+                        .ok()
+                        .unwrap();
                 });
             }
         });
@@ -645,7 +649,8 @@ impl BuildState {
             .iter_mut()
             .map(|b| Arc::new(core::mem::take(&mut b.morsels)))
             .collect_vec();
-        let (morsel_drop_q_send, morsel_drop_q_recv) = crossbeam_channel::bounded(morsels_per_local_builder.len());
+        let (morsel_drop_q_send, morsel_drop_q_recv) =
+            crossbeam_channel::bounded(morsels_per_local_builder.len());
         let num_partitions = self.local_builders[0].sketch_per_p.len();
         let local_builders = &self.local_builders;
         let probe_tables: SparseInitVec<ProbeTable> = SparseInitVec::with_capacity(num_partitions);
@@ -706,7 +711,7 @@ impl BuildState {
                             );
                         }
                     }
-
+
                     if let Some(l) = Arc::into_inner(l_morsels) {
                         // If we're the last thread to process this set of morsels we're probably
                         // falling behind the rest, since the drop can be quite expensive we skip
@@ -717,18 +722,24 @@ impl BuildState {
                             skip_drop_attempt = false;
                         }
                     }
-
+
                     // We're done, help others out by doing drops.
                     drop(morsel_drop_q_send); // So we don't deadlock trying to receive from ourselves.
                     while let Ok(l_morsels) = morsel_drop_q_recv.recv() {
                         drop(l_morsels);
                     }
-
-                    probe_tables.try_set(p, ProbeTable {
-                        hash_table: p_table,
-                        payload: p_payload.freeze(),
-                        seq_ids: Vec::new(),
-                    }).ok().unwrap();
+
+                    probe_tables
+                        .try_set(
+                            p,
+                            ProbeTable {
+                                hash_table: p_table,
+                                payload: p_payload.freeze(),
+                                seq_ids: Vec::new(),
+                            },
+                        )
+                        .ok()
+                        .unwrap();
                 });
             }

@@ -1003,10 +1014,7 @@ impl ProbeState {
         Ok(max_seq)
     }

-    fn ordered_unmatched(
-        &mut self,
-        params: &EquiJoinParams,
-    ) -> DataFrame {
+    fn ordered_unmatched(&mut self, params: &EquiJoinParams) -> DataFrame {
         // TODO: parallelize this operator.

         let build_payload_schema = if params.left_is_build.unwrap() {
@@ -1021,11 +1029,13 @@ impl ProbeState {
         for (p_idx, p) in self.table_per_partition.iter().enumerate_idx() {
             p.hash_table
                 .unmarked_keys(&mut unmarked_idxs, 0, IdxSize::MAX);
-            linearized_idxs.extend(unmarked_idxs.iter().map(|i| {
-                (unsafe { *p.seq_ids.get_unchecked(*i as usize) }, p_idx, *i)
-            }));
+            linearized_idxs.extend(
+                unmarked_idxs
+                    .iter()
+                    .map(|i| (unsafe { *p.seq_ids.get_unchecked(*i as usize) }, p_idx, *i)),
+            );
         }
-
+
         linearized_idxs.sort_by_key(|(seq_id, _, _)| *seq_id);

         unsafe {
@@ -1045,7 +1055,7 @@ impl ProbeState {
                     gather_idxs.push(linearized_idxs[group_end].2);
                     group_end += 1;
                 }
-
+
                 build_out.gather_extend(
                     &self.table_per_partition[p_idx as usize].payload,
                     &gather_idxs,
@@ -1057,11 +1067,13 @@ impl ProbeState {

         let mut build_df = build_out.freeze();
         let out_df = if params.left_is_build.unwrap() {
-            let probe_df = DataFrame::full_null(&params.right_payload_schema, build_df.height());
+            let probe_df =
+                DataFrame::full_null(&params.right_payload_schema, build_df.height());
             build_df.hstack_mut_unchecked(probe_df.get_columns());
             build_df
         } else {
-            let mut probe_df = DataFrame::full_null(&params.left_payload_schema, build_df.height());
+            let mut probe_df =
+                DataFrame::full_null(&params.left_payload_schema, build_df.height());
             probe_df.hstack_mut_unchecked(build_df.get_columns());
             probe_df
         };
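Aside, not part of the diff: the hunks above interleave per-thread morsel streams in global sequence order by pushing Priority(Reverse(seq), l_idx) entries into a k-way merge heap, then re-pushing a builder's next sequence id after consuming its current morsel (the hash table is sized from a cardinality sketch with 25% headroom, reserve(sketch.estimate() * 5 / 4)). Below is a minimal self-contained sketch of the same k-way merge idea using only std; Reverse flips the max-heap into a min-heap, exactly what Priority(Reverse(seq), ...) does in the real code. kway_merge, its Vec<Vec<u64>> input, and the cursor bookkeeping are illustrative stand-ins, not Polars APIs.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several queues, each already sorted by sequence id, yielding
/// (seq, source) pairs in globally ascending order.
fn kway_merge(sources: Vec<Vec<u64>>) -> Vec<(u64, usize)> {
    let mut heap = BinaryHeap::new();
    let mut cursors = vec![0usize; sources.len()];

    // Seed the heap with the head of every non-empty source,
    // mirroring the initial `kmerge.push(Priority(Reverse(seq), l_idx))`.
    for (src_idx, src) in sources.iter().enumerate() {
        if let Some(&seq) = src.first() {
            heap.push(Reverse((seq, src_idx)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((seq, src_idx))) = heap.pop() {
        out.push((seq, src_idx));
        cursors[src_idx] += 1;
        // Re-insert the next element from the same source, if any,
        // like the `l.morsels.get(idx_in_l + 1)` re-push in the diff.
        if let Some(&next) = sources[src_idx].get(cursors[src_idx]) {
            heap.push(Reverse((next, src_idx)));
        }
    }
    out
}

fn main() {
    let merged = kway_merge(vec![vec![0, 3, 5], vec![1, 2, 4]]);
    let seqs: Vec<u64> = merged.iter().map(|&(s, _)| s).collect();
    assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
}
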
crates/polars-stream/src/nodes/joins/mod.rs (1 addition & 1 deletion)

@@ -1,2 +1,2 @@
-pub mod in_memory;
 pub mod equi_join;
+pub mod in_memory;
crates/polars-utils/src/sparse_init_vec.rs (9 additions & 10 deletions)

@@ -1,17 +1,16 @@
-use std::sync::atomic::{AtomicUsize, AtomicU8, Ordering};
-
+use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

 pub struct SparseInitVec<T> {
     ptr: *mut T,
     len: usize,
     cap: usize,

     num_init: AtomicUsize,
     init_mask: Vec<AtomicU8>,
 }

-unsafe impl<T: Send> Send for SparseInitVec<T> { }
-unsafe impl<T: Send> Sync for SparseInitVec<T> { }
+unsafe impl<T: Send> Send for SparseInitVec<T> {}
+unsafe impl<T: Send> Sync for SparseInitVec<T> {}

 impl<T> SparseInitVec<T> {
     pub fn with_capacity(len: usize) -> Self {
@@ -28,7 +27,7 @@ impl<T> SparseInitVec<T> {
             init_mask,
         }
     }
-
+
     pub fn try_set(&self, idx: usize, value: T) -> Result<(), T> {
         unsafe {
             if idx >= self.len {
@@ -42,14 +41,14 @@ impl<T> SparseInitVec<T> {
             if init_mask_byte.fetch_or(bit_mask, Ordering::Relaxed) & bit_mask != 0 {
                 return Err(value);
             }
-
+
             self.ptr.add(idx).write(value);
             self.num_init.fetch_add(1, Ordering::Relaxed);
         }
-
+
         Ok(())
     }
-
+
     pub fn try_assume_init(mut self) -> Result<Vec<T>, Self> {
         unsafe {
             if *self.num_init.get_mut() == self.len {
@@ -69,7 +68,7 @@ impl<T> Drop for SparseInitVec<T> {
         unsafe {
             // Make sure storage gets dropped even if element drop panics.
             let _storage = Vec::from_raw_parts(self.ptr, 0, self.cap);
-
+
             for idx in 0..self.len {
                 let init_mask_byte = self.init_mask.get_unchecked_mut(idx / 8);
                 let bit_mask = 1 << (idx % 8);
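Aside, not part of the diff: a usage sketch of the SparseInitVec API shown above, assuming the type from this file is in scope. build_tables and its String payload are hypothetical, and std::thread::scope stands in for the rayon POOL.scope used in equi_join.rs; the real code fills one ProbeTable per partition in exactly this shape.

fn build_tables(num_partitions: usize) -> Vec<String> {
    let results: SparseInitVec<String> = SparseInitVec::with_capacity(num_partitions);
    std::thread::scope(|s| {
        for p in 0..num_partitions {
            let results = &results;
            s.spawn(move || {
                // try_set returns Err(value) if slot p was already taken, so a
                // double-initialization surfaces as a panic here rather than UB.
                results.try_set(p, format!("partition {p}")).ok().unwrap();
            });
        }
    });
    // Returns Err(self) unless every one of the `num_partitions` slots was set.
    results.try_assume_init().ok().unwrap()
}
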
