diff --git a/Cargo.lock b/Cargo.lock index dcf382913791..53d086eabae4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3516,6 +3516,7 @@ name = "polars-stream" version = "0.46.0" dependencies = [ "atomic-waker", + "crossbeam-channel", "crossbeam-deque", "crossbeam-queue", "crossbeam-utils", @@ -3523,6 +3524,7 @@ dependencies = [ "memmap2", "parking_lot", "pin-project-lite", + "polars-arrow", "polars-core", "polars-error", "polars-expr", diff --git a/crates/polars-arrow/src/array/primitive/builder.rs b/crates/polars-arrow/src/array/primitive/builder.rs index 82177674926f..c942ae71553a 100644 --- a/crates/polars-arrow/src/array/primitive/builder.rs +++ b/crates/polars-arrow/src/array/primitive/builder.rs @@ -76,11 +76,12 @@ impl StaticArrayBuilder for PrimitiveArrayBuilder { idxs: &[IdxSize], _share: ShareStrategy, ) { - self.values.reserve(idxs.len()); - for idx in idxs { - self.values - .push_unchecked(other.value_unchecked(*idx as usize)); - } + // TODO: SIMD gather kernels? + let other_values_slice = other.values().as_slice(); + self.values.extend( + idxs.iter() + .map(|idx| *other_values_slice.get_unchecked(*idx as usize)), + ); self.validity .gather_extend_from_opt_validity(other.validity(), idxs); } diff --git a/crates/polars-arrow/src/bitmap/builder.rs b/crates/polars-arrow/src/bitmap/builder.rs index 6a4bf6013ec6..1ea13f865f43 100644 --- a/crates/polars-arrow/src/bitmap/builder.rs +++ b/crates/polars-arrow/src/bitmap/builder.rs @@ -251,6 +251,18 @@ impl BitmapBuilder { self.extend_from_slice(slice, bm_offset + start, length); } + pub fn subslice_extend_from_opt_validity( + &mut self, + bitmap: Option<&Bitmap>, + start: usize, + length: usize, + ) { + match bitmap { + Some(bm) => self.subslice_extend_from_bitmap(bm, start, length), + None => self.extend_constant(length, true), + } + } + /// # Safety /// The indices must be in-bounds. pub unsafe fn gather_extend_from_slice( @@ -308,6 +320,43 @@ impl BitmapBuilder { self.opt_gather_extend_from_slice(slice, offset, length, idxs); } + /// # Safety + /// The indices must be in-bounds. + pub unsafe fn gather_extend_from_opt_validity( + &mut self, + bitmap: Option<&Bitmap>, + idxs: &[IdxSize], + length: usize, + ) { + if let Some(bm) = bitmap { + let (slice, offset, sl_length) = bm.as_slice(); + debug_assert_eq!(sl_length, length); + self.gather_extend_from_slice(slice, offset, length, idxs); + } else { + self.extend_constant(length, true); + } + } + + pub fn opt_gather_extend_from_opt_validity( + &mut self, + bitmap: Option<&Bitmap>, + idxs: &[IdxSize], + length: usize, + ) { + if let Some(bm) = bitmap { + let (slice, offset, sl_length) = bm.as_slice(); + debug_assert_eq!(sl_length, length); + self.opt_gather_extend_from_slice(slice, offset, sl_length, idxs); + } else { + unsafe { + self.reserve(idxs.len()); + for idx in idxs { + self.push_unchecked((*idx as usize) < length); + } + } + } + } + /// # Safety /// May only be called once at the end. 
unsafe fn finish(&mut self) { diff --git a/crates/polars-core/src/chunked_array/object/builder.rs b/crates/polars-core/src/chunked_array/object/builder.rs index 7010ab59da49..05e522c80150 100644 --- a/crates/polars-core/src/chunked_array/object/builder.rs +++ b/crates/polars-core/src/chunked_array/object/builder.rs @@ -1,4 +1,6 @@ +use arrow::array::builder::{ArrayBuilder, ShareStrategy}; use arrow::bitmap::BitmapBuilder; +use polars_utils::vec::PushUnchecked; use super::*; use crate::utils::get_iter_capacity; @@ -176,3 +178,96 @@ pub(crate) fn object_series_to_arrow_array(s: &Series) -> ArrayRef { let arr = arr.as_any().downcast_ref::>().unwrap(); arr.values().to_boxed() } + +impl ArrayBuilder for ObjectChunkedBuilder { + fn dtype(&self) -> &ArrowDataType { + &ArrowDataType::FixedSizeBinary(size_of::()) + } + + fn reserve(&mut self, additional: usize) { + self.bitmask_builder.reserve(additional); + self.values.reserve(additional); + } + + fn freeze(self) -> Box { + Box::new(ObjectArray { + values: self.values.into(), + validity: self.bitmask_builder.into_opt_validity(), + }) + } + + fn freeze_reset(&mut self) -> Box { + Box::new(ObjectArray { + values: core::mem::take(&mut self.values).into(), + validity: core::mem::take(&mut self.bitmask_builder).into_opt_validity(), + }) + } + + fn len(&self) -> usize { + self.values.len() + } + + fn extend_nulls(&mut self, length: usize) { + self.values.resize(self.values.len() + length, T::default()); + self.bitmask_builder.extend_constant(length, false); + } + + fn subslice_extend( + &mut self, + other: &dyn Array, + start: usize, + length: usize, + _share: ShareStrategy, + ) { + let other: &ObjectArray = other.as_any().downcast_ref().unwrap(); + self.values + .extend_from_slice(&other.values[start..start + length]); + self.bitmask_builder + .subslice_extend_from_opt_validity(other.validity(), start, length); + } + + fn subslice_extend_repeated( + &mut self, + other: &dyn Array, + start: usize, + length: usize, + repeats: usize, + share: ShareStrategy, + ) { + for _ in 0..repeats { + self.subslice_extend(other, start, length, share) + } + } + + unsafe fn gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], _share: ShareStrategy) { + let other: &ObjectArray = other.as_any().downcast_ref().unwrap(); + let other_values_slice = other.values.as_slice(); + self.values.extend( + idxs.iter() + .map(|idx| other_values_slice.get_unchecked(*idx as usize).clone()), + ); + self.bitmask_builder + .gather_extend_from_opt_validity(other.validity(), idxs, other.len()); + } + + fn opt_gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], _share: ShareStrategy) { + let other: &ObjectArray = other.as_any().downcast_ref().unwrap(); + let other_values_slice = other.values.as_slice(); + self.values.reserve(idxs.len()); + unsafe { + for idx in idxs { + let val = if (*idx as usize) < other.len() { + other_values_slice.get_unchecked(*idx as usize).clone() + } else { + T::default() + }; + self.values.push_unchecked(val); + } + } + self.bitmask_builder.opt_gather_extend_from_opt_validity( + other.validity(), + idxs, + other.len(), + ); + } +} diff --git a/crates/polars-core/src/chunked_array/object/registry.rs b/crates/polars-core/src/chunked_array/object/registry.rs index 7364cf0361d7..6a1d30a97821 100644 --- a/crates/polars-core/src/chunked_array/object/registry.rs +++ b/crates/polars-core/src/chunked_array/object/registry.rs @@ -7,6 +7,7 @@ use std::fmt::{Debug, Formatter}; use std::ops::Deref; use std::sync::{Arc, RwLock}; +use 
arrow::array::builder::ArrayBuilder; use arrow::array::ArrayRef; use arrow::datatypes::ArrowDataType; use once_cell::sync::Lazy; @@ -40,7 +41,9 @@ static GLOBAL_OBJECT_REGISTRY: Lazy>> = Lazy::new( /// This trait can be registered, after which that global registration /// can be used to materialize object types -pub trait AnonymousObjectBuilder { +pub trait AnonymousObjectBuilder: ArrayBuilder { + fn as_array_builder(self: Box) -> Box; + /// # Safety /// Expect `ObjectArray` arrays. unsafe fn from_chunks(self: Box, chunks: Vec) -> Series; @@ -73,12 +76,17 @@ pub trait AnonymousObjectBuilder { } impl AnonymousObjectBuilder for ObjectChunkedBuilder { - // Expect ObjectArray arrays. + /// # Safety + /// Expects `ObjectArray` arrays. unsafe fn from_chunks(self: Box, chunks: Vec) -> Series { ObjectChunked::::new_with_compute_len(Arc::new(self.field().clone()), chunks) .into_series() } + fn as_array_builder(self: Box) -> Box { + self + } + fn append_null(&mut self) { self.append_null() } diff --git a/crates/polars-core/src/series/builder.rs b/crates/polars-core/src/series/builder.rs index 1d1f92de2b5e..71119f91171b 100644 --- a/crates/polars-core/src/series/builder.rs +++ b/crates/polars-core/src/series/builder.rs @@ -1,6 +1,8 @@ use arrow::array::builder::{make_builder, ArrayBuilder, ShareStrategy}; use polars_utils::IdxSize; +#[cfg(feature = "object")] +use crate::chunked_array::object::registry::get_object_builder; use crate::prelude::*; use crate::utils::Container; @@ -12,6 +14,13 @@ pub struct SeriesBuilder { impl SeriesBuilder { pub fn new(dtype: DataType) -> Self { + // FIXME: get rid of this hack. + #[cfg(feature = "object")] + if matches!(dtype, DataType::Object(_)) { + let builder = get_object_builder(PlSmallStr::EMPTY, 0).as_array_builder(); + return Self { dtype, builder }; + } + let builder = make_builder(&dtype.to_physical().to_arrow(CompatLevel::newest())); Self { dtype, builder } } diff --git a/crates/polars-expr/src/hash_keys.rs b/crates/polars-expr/src/hash_keys.rs index 639fa6fa8fdd..1674eb705fd1 100644 --- a/crates/polars-expr/src/hash_keys.rs +++ b/crates/polars-expr/src/hash_keys.rs @@ -74,10 +74,25 @@ impl HashKeys { self.len() == 0 } - /// After this call partition_idxs[p] will contain the indices of hashes - /// that belong to partition p, and the cardinality sketches are updated - /// accordingly. - pub fn gen_partition_idxs( + /// After this call partitions will be extended with the partition for each + /// hash. Nulls are assigned IdxSize::MAX or a specific partition depending + /// on whether partition_nulls is true. + pub fn gen_partitions( + &self, + partitioner: &HashPartitioner, + partitions: &mut Vec, + partition_nulls: bool, + ) { + match self { + Self::RowEncoded(s) => s.gen_partitions(partitioner, partitions, partition_nulls), + Self::Single(s) => s.gen_partitions(partitioner, partitions, partition_nulls), + } + } + + /// After this call partition_idxs[p] will be extended with the indices of + /// hashes that belong to partition p, and the cardinality sketches are + /// updated accordingly. 
+ pub fn gen_idxs_per_partition( &self, partitioner: &HashPartitioner, partition_idxs: &mut [Vec], @@ -86,13 +101,13 @@ impl HashKeys { ) { if sketches.is_empty() { match self { - Self::RowEncoded(s) => s.gen_partition_idxs::( + Self::RowEncoded(s) => s.gen_idxs_per_partition::( partitioner, partition_idxs, sketches, partition_nulls, ), - Self::Single(s) => s.gen_partition_idxs::( + Self::Single(s) => s.gen_idxs_per_partition::( partitioner, partition_idxs, sketches, @@ -101,13 +116,13 @@ impl HashKeys { } } else { match self { - Self::RowEncoded(s) => s.gen_partition_idxs::( + Self::RowEncoded(s) => s.gen_idxs_per_partition::( partitioner, partition_idxs, sketches, partition_nulls, ), - Self::Single(s) => s.gen_partition_idxs::( + Self::Single(s) => s.gen_idxs_per_partition::( partitioner, partition_idxs, sketches, @@ -159,7 +174,33 @@ pub struct RowEncodedKeys { } impl RowEncodedKeys { - pub fn gen_partition_idxs( + pub fn gen_partitions( + &self, + partitioner: &HashPartitioner, + partitions: &mut Vec, + partition_nulls: bool, + ) { + partitions.reserve(self.hashes.len()); + if let Some(validity) = self.keys.validity() { + // Arbitrarily put nulls in partition 0. + let null_p = if partition_nulls { 0 } else { IdxSize::MAX }; + partitions.extend(self.hashes.values_iter().zip(validity).map(|(h, is_v)| { + if is_v { + partitioner.hash_to_partition(*h) as IdxSize + } else { + null_p + } + })) + } else { + partitions.extend( + self.hashes + .values_iter() + .map(|h| partitioner.hash_to_partition(*h) as IdxSize), + ) + } + } + + pub fn gen_idxs_per_partition( &self, partitioner: &HashPartitioner, partition_idxs: &mut [Vec], @@ -168,9 +209,6 @@ impl RowEncodedKeys { ) { assert!(partition_idxs.len() == partitioner.num_partitions()); assert!(!BUILD_SKETCHES || sketches.len() == partitioner.num_partitions()); - for p in partition_idxs.iter_mut() { - p.clear(); - } if let Some(validity) = self.keys.validity() { for (i, (h, is_v)) in self.hashes.values_iter().zip(validity).enumerate() { @@ -264,7 +302,17 @@ pub struct SingleKeys { } impl SingleKeys { - pub fn gen_partition_idxs( + #[allow(clippy::ptr_arg)] // Remove when implemented. + pub fn gen_partitions( + &self, + _partitioner: &HashPartitioner, + _partitions: &mut Vec, + _partition_nulls: bool, + ) { + todo!() + } + + pub fn gen_idxs_per_partition( &self, partitioner: &HashPartitioner, partition_idxs: &mut [Vec], @@ -272,9 +320,6 @@ impl SingleKeys { _partition_nulls: bool, ) { assert!(partitioner.num_partitions() == partition_idxs.len()); - for p in partition_idxs.iter_mut() { - p.clear(); - } todo!() } diff --git a/crates/polars-expr/src/chunked_idx_table/mod.rs b/crates/polars-expr/src/idx_table/mod.rs similarity index 52% rename from crates/polars-expr/src/chunked_idx_table/mod.rs rename to crates/polars-expr/src/idx_table/mod.rs index 948e34effad0..76a5093a1102 100644 --- a/crates/polars-expr/src/chunked_idx_table/mod.rs +++ b/crates/polars-expr/src/idx_table/mod.rs @@ -1,30 +1,38 @@ use std::any::Any; use polars_core::prelude::*; -use polars_utils::index::ChunkId; use polars_utils::IdxSize; use crate::hash_keys::HashKeys; mod row_encoded; -pub trait ChunkedIdxTable: Any + Send + Sync { - /// Creates a new empty ChunkedIdxTable similar to this one. - fn new_empty(&self) -> Box; +pub trait IdxTable: Any + Send + Sync { + /// Creates a new empty IdxTable similar to this one. + fn new_empty(&self) -> Box; /// Reserves space for the given number additional keys. 
fn reserve(&mut self, additional: usize); - /// Returns the number of unique keys in this ChunkedIdxTable. + /// Returns the number of unique keys in this IdxTable. fn num_keys(&self) -> IdxSize; - /// Inserts the given key chunk into this ChunkedIdxTable. - fn insert_key_chunk(&mut self, keys: HashKeys, track_unmatchable: bool); + /// Inserts the given keys into this IdxTable. + fn insert_keys(&mut self, keys: &HashKeys, track_unmatchable: bool); - /// Probe the table, updating table_match and probe_match with - /// (ChunkId, IdxSize) pairs for each match. Will stop processing new keys - /// once limit matches have been generated, returning the number of keys - /// processed. + /// Inserts a subset of the given keys into this IdxTable. + /// # Safety + /// The provided subset indices must be in-bounds. + unsafe fn insert_keys_subset( + &mut self, + keys: &HashKeys, + subset: &[IdxSize], + track_unmatchable: bool, + ); + + /// Probe the table, adding an entry to table_match and probe_match for each + /// match. Will stop processing new keys once limit matches have been + /// generated, returning the number of keys processed. /// /// If mark_matches is true, matches are marked in the table as such. /// @@ -33,7 +41,7 @@ pub trait ChunkedIdxTable: Any + Send + Sync { fn probe( &self, hash_keys: &HashKeys, - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, mark_matches: bool, emit_unmatched: bool, @@ -48,7 +56,7 @@ pub trait ChunkedIdxTable: Any + Send + Sync { &self, hash_keys: &HashKeys, subset: &[IdxSize], - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, mark_matches: bool, emit_unmatched: bool, @@ -56,10 +64,9 @@ pub trait ChunkedIdxTable: Any + Send + Sync { ) -> IdxSize; /// Get the ChunkIds for each key which was never marked during probing. - fn unmarked_keys(&self, out: &mut Vec>, offset: IdxSize, limit: IdxSize) - -> IdxSize; + fn unmarked_keys(&self, out: &mut Vec, offset: IdxSize, limit: IdxSize) -> IdxSize; } -pub fn new_chunked_idx_table(_key_schema: Arc) -> Box { - Box::new(row_encoded::RowEncodedChunkedIdxTable::new()) +pub fn new_idx_table(_key_schema: Arc) -> Box { + Box::new(row_encoded::RowEncodedIdxTable::new()) } diff --git a/crates/polars-expr/src/chunked_idx_table/row_encoded.rs b/crates/polars-expr/src/idx_table/row_encoded.rs similarity index 67% rename from crates/polars-expr/src/chunked_idx_table/row_encoded.rs rename to crates/polars-expr/src/idx_table/row_encoded.rs index 7cd8505252d0..881a78cb30bc 100644 --- a/crates/polars-expr/src/chunked_idx_table/row_encoded.rs +++ b/crates/polars-expr/src/idx_table/row_encoded.rs @@ -1,3 +1,5 @@ +#![allow(clippy::unnecessary_cast)] // Clippy doesn't recognize that IdxSize and u64 can be different. + use std::sync::atomic::{AtomicU64, Ordering}; use arrow::array::Array; @@ -10,40 +12,38 @@ use super::*; use crate::hash_keys::HashKeys; #[derive(Default)] -pub struct RowEncodedChunkedIdxTable { - // These AtomicU64s actually are ChunkIds, but we use the top bit of the - // first chunk in each to mark keys during probing. +pub struct RowEncodedIdxTable { + // These AtomicU64s actually are IdxSizes, but we use the top bit of the + // first index in each to mark keys during probing. 
idx_map: BytesIndexMap>, - chunk_ctr: u32, - null_keys: Vec>, + idx_offset: IdxSize, + null_keys: Vec, } -impl RowEncodedChunkedIdxTable { +impl RowEncodedIdxTable { pub fn new() -> Self { Self { idx_map: BytesIndexMap::new(), - chunk_ctr: 0, + idx_offset: 0, null_keys: Vec::new(), } } } -impl RowEncodedChunkedIdxTable { +impl RowEncodedIdxTable { #[inline(always)] fn probe_one( &self, key_idx: IdxSize, hash: u64, key: &[u8], - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, ) -> bool { - if let Some(chunk_ids) = self.idx_map.get(hash, key) { - for chunk_id in &chunk_ids[..] { + if let Some(idxs) = self.idx_map.get(hash, key) { + for idx in &idxs[..] { // Create matches, making sure to clear top bit. - let raw_chunk_id = chunk_id.load(Ordering::Relaxed); - let chunk_id = ChunkId::from_inner(raw_chunk_id & !(1 << 63)); - table_match.push(chunk_id); + table_match.push((idx.load(Ordering::Relaxed) & !(1 << 63)) as IdxSize); probe_match.push(key_idx); } @@ -51,10 +51,10 @@ impl RowEncodedChunkedIdxTable { // need any synchronization on the load, nor does it need a // fetch_or to do it atomically. if MARK_MATCHES { - let first_chunk_id = unsafe { chunk_ids.get_unchecked(0) }; - let first_chunk_val = first_chunk_id.load(Ordering::Relaxed); - if first_chunk_val >> 63 == 0 { - first_chunk_id.store(first_chunk_val | (1 << 63), Ordering::Release); + let first_idx = unsafe { idxs.get_unchecked(0) }; + let first_idx_val = first_idx.load(Ordering::Relaxed); + if first_idx_val >> 63 == 0 { + first_idx.store(first_idx_val | (1 << 63), Ordering::Release); } } true @@ -66,13 +66,10 @@ impl RowEncodedChunkedIdxTable { fn probe_impl<'a, const MARK_MATCHES: bool, const EMIT_UNMATCHED: bool>( &self, hash_keys: impl Iterator)>, - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, limit: IdxSize, ) -> IdxSize { - table_match.clear(); - probe_match.clear(); - let mut keys_processed = 0; for (key_idx, hash, key) in hash_keys { let found_match = if let Some(key) = key { @@ -82,7 +79,7 @@ impl RowEncodedChunkedIdxTable { }; if EMIT_UNMATCHED && !found_match { - table_match.push(ChunkId::null()); + table_match.push(IdxSize::MAX); probe_match.push(key_idx); } @@ -97,7 +94,7 @@ impl RowEncodedChunkedIdxTable { fn probe_dispatch<'a>( &self, hash_keys: impl Iterator)>, - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, mark_matches: bool, emit_unmatched: bool, @@ -120,8 +117,8 @@ impl RowEncodedChunkedIdxTable { } } -impl ChunkedIdxTable for RowEncodedChunkedIdxTable { - fn new_empty(&self) -> Box { +impl IdxTable for RowEncodedIdxTable { + fn new_empty(&self) -> Box { Box::new(Self::new()) } @@ -133,13 +130,17 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable { self.idx_map.len() } - fn insert_key_chunk(&mut self, hash_keys: HashKeys, track_unmatchable: bool) { + fn insert_keys(&mut self, hash_keys: &HashKeys, track_unmatchable: bool) { let HashKeys::RowEncoded(hash_keys) = hash_keys else { unreachable!() }; - if hash_keys.keys.len() >= 1 << 31 { - panic!("overly large chunk in RowEncodedChunkedIdxTable"); - } + let new_idx_offset = (self.idx_offset as usize) + .checked_add(hash_keys.keys.len()) + .unwrap(); + assert!( + new_idx_offset < IdxSize::MAX as usize, + "overly large index in RowEncodedIdxTable" + ); for (i, (hash, key)) in hash_keys .hashes @@ -147,29 +148,66 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable { .zip(hash_keys.keys.iter()) .enumerate_idx() { - let chunk_id = ChunkId::<32>::store(self.chunk_ctr as IdxSize, i); + let idx = 
self.idx_offset + i; if let Some(key) = key { - let chunk_id = AtomicU64::new(chunk_id.into_inner()); match self.idx_map.entry(*hash, key) { Entry::Occupied(o) => { - o.into_mut().push(chunk_id); + o.into_mut().push(AtomicU64::new(idx as u64)); + }, + Entry::Vacant(v) => { + v.insert(unitvec![AtomicU64::new(idx as u64)]); + }, + } + } else if track_unmatchable { + self.null_keys.push(idx); + } + } + + self.idx_offset = new_idx_offset as IdxSize; + } + + unsafe fn insert_keys_subset( + &mut self, + hash_keys: &HashKeys, + subset: &[IdxSize], + track_unmatchable: bool, + ) { + let HashKeys::RowEncoded(hash_keys) = hash_keys else { + unreachable!() + }; + let new_idx_offset = (self.idx_offset as usize) + .checked_add(subset.len()) + .unwrap(); + assert!( + new_idx_offset < IdxSize::MAX as usize, + "overly large index in RowEncodedIdxTable" + ); + + for (i, subset_idx) in subset.iter().enumerate_idx() { + let hash = unsafe { hash_keys.hashes.value_unchecked(*subset_idx as usize) }; + let key = unsafe { hash_keys.keys.get_unchecked(*subset_idx as usize) }; + let idx = self.idx_offset + i; + if let Some(key) = key { + match self.idx_map.entry(hash, key) { + Entry::Occupied(o) => { + o.into_mut().push(AtomicU64::new(idx as u64)); }, Entry::Vacant(v) => { - v.insert(unitvec![chunk_id]); + v.insert(unitvec![AtomicU64::new(idx as u64)]); }, } } else if track_unmatchable { - self.null_keys.push(chunk_id); + self.null_keys.push(idx); } } - self.chunk_ctr = self.chunk_ctr.checked_add(1).unwrap(); + self.idx_offset = new_idx_offset as IdxSize; } fn probe( &self, hash_keys: &HashKeys, - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, mark_matches: bool, emit_unmatched: bool, @@ -218,7 +256,7 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable { &self, hash_keys: &HashKeys, subset: &[IdxSize], - table_match: &mut Vec>, + table_match: &mut Vec, probe_match: &mut Vec, mark_matches: bool, emit_unmatched: bool, @@ -265,7 +303,7 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable { fn unmarked_keys( &self, - out: &mut Vec>, + out: &mut Vec, mut offset: IdxSize, limit: IdxSize, ) -> IdxSize { @@ -288,14 +326,12 @@ impl ChunkedIdxTable for RowEncodedChunkedIdxTable { offset -= self.null_keys.len() as IdxSize; - while let Some((_, _, chunk_ids)) = self.idx_map.get_index(offset) { - let first_chunk_id = unsafe { chunk_ids.get_unchecked(0) }; - let first_chunk_val = first_chunk_id.load(Ordering::Acquire); - if first_chunk_val >> 63 == 0 { - for chunk_id in &chunk_ids[..] { - let raw_chunk_id = chunk_id.load(Ordering::Relaxed); - let chunk_id = ChunkId::from_inner(raw_chunk_id & !(1 << 63)); - out.push(chunk_id); + while let Some((_, _, idxs)) = self.idx_map.get_index(offset) { + let first_idx = unsafe { idxs.get_unchecked(0) }; + let first_idx_val = first_idx.load(Ordering::Acquire); + if first_idx_val >> 63 == 0 { + for idx in &idxs[..] 
{ + out.push((idx.load(Ordering::Relaxed) & !(1 << 63)) as IdxSize); } } diff --git a/crates/polars-expr/src/lib.rs b/crates/polars-expr/src/lib.rs index 138068e3c268..2683971d96de 100644 --- a/crates/polars-expr/src/lib.rs +++ b/crates/polars-expr/src/lib.rs @@ -1,7 +1,7 @@ -pub mod chunked_idx_table; mod expressions; pub mod groups; pub mod hash_keys; +pub mod idx_table; pub mod planner; pub mod prelude; pub mod reduce; diff --git a/crates/polars-stream/Cargo.toml b/crates/polars-stream/Cargo.toml index cbf2f9fbe528..731e373ceff1 100644 --- a/crates/polars-stream/Cargo.toml +++ b/crates/polars-stream/Cargo.toml @@ -9,7 +9,9 @@ repository = { workspace = true } description = "Private crate for the streaming execution engine for the Polars DataFrame library" [dependencies] +arrow = { workspace = true } atomic-waker = { workspace = true } +crossbeam-channel = { workspace = true } crossbeam-deque = { workspace = true } crossbeam-queue = { workspace = true } crossbeam-utils = { workspace = true } diff --git a/crates/polars-stream/src/nodes/joins/equi_join.rs b/crates/polars-stream/src/nodes/joins/equi_join.rs index 051440985e40..14a37dab855f 100644 --- a/crates/polars-stream/src/nodes/joins/equi_join.rs +++ b/crates/polars-stream/src/nodes/joins/equi_join.rs @@ -1,22 +1,25 @@ +use std::cmp::Reverse; +use std::collections::BinaryHeap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock}; +use arrow::array::builder::ShareStrategy; use crossbeam_queue::ArrayQueue; +use polars_core::frame::builder::DataFrameBuilder; use polars_core::prelude::*; use polars_core::schema::{Schema, SchemaExt}; -use polars_core::series::IsSorted; -use polars_core::utils::accumulate_dataframes_vertical_unchecked; use polars_core::{config, POOL}; -use polars_expr::chunked_idx_table::{new_chunked_idx_table, ChunkedIdxTable}; use polars_expr::hash_keys::HashKeys; +use polars_expr::idx_table::{new_idx_table, IdxTable}; use polars_io::pl_async::get_runtime; use polars_ops::frame::{JoinArgs, JoinType, MaintainOrderJoin}; -use polars_ops::prelude::TakeChunked; use polars_ops::series::coalesce_columns; use polars_utils::cardinality_sketch::CardinalitySketch; use polars_utils::hashing::HashPartitioner; use polars_utils::itertools::Itertools; use polars_utils::pl_str::PlSmallStr; +use polars_utils::priority::Priority; +use polars_utils::sparse_init_vec::SparseInitVec; use polars_utils::{format_pl_smallstr, IdxSize}; use rayon::prelude::*; @@ -157,15 +160,48 @@ fn estimate_cardinality( key_selectors: &[StreamExpr], params: &EquiJoinParams, state: &ExecutionState, -) -> PolarsResult { - // TODO: parallelize. 
- let mut sketch = CardinalitySketch::new(); - for morsel in morsels { - let hash_keys = - get_runtime().block_on(select_keys(morsel.df(), key_selectors, params, state))?; - hash_keys.sketch_cardinality(&mut sketch); +) -> PolarsResult { + let sample_limit = *SAMPLE_LIMIT; + if morsels.is_empty() || sample_limit == 0 { + return Ok(0.0); } - Ok(sketch.estimate()) + + let mut total_height = 0; + let mut to_process_end = 0; + while to_process_end < morsels.len() && total_height < sample_limit { + total_height += morsels[to_process_end].df().height(); + to_process_end += 1; + } + let last_morsel_idx = to_process_end - 1; + let last_morsel_len = morsels[last_morsel_idx].df().height(); + let last_morsel_slice = last_morsel_len - total_height.saturating_sub(sample_limit); + let runtime = get_runtime(); + + POOL.install(|| { + let sample_cardinality = morsels[..to_process_end] + .par_iter() + .enumerate() + .try_fold( + CardinalitySketch::new, + |mut sketch, (morsel_idx, morsel)| { + let sliced; + let df = if morsel_idx == last_morsel_idx { + sliced = morsel.df().slice(0, last_morsel_slice); + &sliced + } else { + morsel.df() + }; + let hash_keys = + runtime.block_on(select_keys(df, key_selectors, params, state))?; + hash_keys.sketch_cardinality(&mut sketch); + PolarsResult::Ok(sketch) + }, + ) + .map(|sketch| PolarsResult::Ok(sketch?.estimate())) + .try_reduce_with(|a, b| Ok(a + b)) + .unwrap()?; + Ok(sample_cardinality as f64 / total_height.min(sample_limit) as f64) + }) } struct BufferedStream { @@ -311,7 +347,7 @@ impl SampleState { recv: &[PortState], num_pipelines: usize, params: &mut EquiJoinParams, - table: &mut Option>, + table: &mut Option>, ) -> PolarsResult> { let left_saturated = self.left_len >= *SAMPLE_LIMIT; let right_saturated = self.right_len >= *SAMPLE_LIMIT; @@ -346,38 +382,18 @@ impl SampleState { params, &execution_state, )?; - let norm_left_factor = self.left_len.min(*SAMPLE_LIMIT) as f64 / self.left_len as f64; - let norm_right_factor = - self.right_len.min(*SAMPLE_LIMIT) as f64 / self.right_len as f64; - let norm_left_cardinality = (left_cardinality as f64 * norm_left_factor) as usize; - let norm_right_cardinality = (right_cardinality as f64 * norm_right_factor) as usize; if config::verbose() { - eprintln!("estimated cardinalities are: {norm_left_cardinality} vs. {norm_right_cardinality}"); + eprintln!( + "estimated cardinalities are: {left_cardinality} vs. {right_cardinality}" + ); } - PolarsResult::Ok((norm_left_cardinality, norm_right_cardinality)) + PolarsResult::Ok((left_cardinality, right_cardinality)) }; let left_is_build = match (left_saturated, right_saturated) { - (false, false) => { - if self.left_len * LOPSIDED_SAMPLE_FACTOR < self.right_len - || self.left_len > self.right_len * LOPSIDED_SAMPLE_FACTOR - { - // Don't bother estimating cardinality, just choose smaller as it's highly - // imbalanced. - self.left_len < self.right_len - } else { - let (lc, rc) = estimate_cardinalities()?; - // Let's assume for now that per element building a - // table is 3x more expensive than a probe, with - // unique keys getting an additional 3x factor for - // having to update the hash table in addition to the probe. 
- let left_build_cost = self.left_len * 3 + 3 * lc; - let left_probe_cost = self.left_len; - let right_build_cost = self.right_len * 3 + 3 * rc; - let right_probe_cost = self.right_len; - left_build_cost + right_probe_cost < left_probe_cost + right_build_cost - } - }, + // Don't bother estimating cardinality, just choose smaller side as + // we have everything in-memory anyway. + (false, false) => self.left_len < self.right_len, // Choose the unsaturated side, the saturated side could be // arbitrarily big. @@ -401,9 +417,9 @@ impl SampleState { // Transition to building state. params.left_is_build = Some(left_is_build); *table = Some(if left_is_build { - new_chunked_idx_table(params.left_key_schema.clone()) + new_idx_table(params.left_key_schema.clone()) } else { - new_chunked_idx_table(params.right_key_schema.clone()) + new_idx_table(params.right_key_schema.clone()) }); let mut sampled_build_morsels = @@ -416,7 +432,9 @@ impl SampleState { let partitioner = HashPartitioner::new(num_pipelines, 0); let mut build_state = BuildState { - partitions_per_worker: (0..num_pipelines).map(|_| Vec::new()).collect(), + local_builders: (0..num_pipelines) + .map(|_| LocalBuilder::default()) + .collect(), sampled_probe_morsels, }; @@ -429,13 +447,12 @@ impl SampleState { .reinsert(num_pipelines, None, scope, &mut join_handles) .unwrap(); - for (worker_ps, recv) in build_state.partitions_per_worker.iter_mut().zip(receivers) - { + for (local_builder, recv) in build_state.local_builders.iter_mut().zip(receivers) { join_handles.push(scope.spawn_task( TaskPriority::High, BuildState::partition_and_sink( recv, - worker_ps, + local_builder, partitioner.clone(), params, &state, @@ -457,30 +474,48 @@ impl SampleState { } #[derive(Default)] -struct BuildPartition { - hash_keys: Vec, - frames: Vec<(MorselSeq, DataFrame)>, - sketch: Option, +struct LocalBuilder { + // The complete list of morsels and their computed hashes seen by this builder. + morsels: Vec<(MorselSeq, DataFrame, HashKeys)>, + + // A cardinality sketch per partition for the keys seen by this builder. 
+ sketch_per_p: Vec, + + // morsel_idxs_values_per_p[p][start..stop] contains the offsets into morsels[i] + // for partition p, where start, stop are: + // let start = morsel_idxs_offsets[i * num_partitions + p]; + // let stop = morsel_idxs_offsets[(i + 1) * num_partitions + p]; + morsel_idxs_values_per_p: Vec>, + morsel_idxs_offsets_per_p: Vec, } #[derive(Default)] struct BuildState { - partitions_per_worker: Vec>, + local_builders: Vec, sampled_probe_morsels: BufferedStream, } impl BuildState { async fn partition_and_sink( mut recv: Receiver, - partitions: &mut Vec, + local: &mut LocalBuilder, partitioner: HashPartitioner, params: &EquiJoinParams, state: &ExecutionState, ) -> PolarsResult<()> { let track_unmatchable = params.emit_unmatched_build(); - let mut partition_idxs = vec![Vec::new(); partitioner.num_partitions()]; - partitions.resize_with(partitioner.num_partitions(), BuildPartition::default); - let mut sketches = vec![CardinalitySketch::default(); partitioner.num_partitions()]; + local + .sketch_per_p + .resize_with(partitioner.num_partitions(), Default::default); + local + .morsel_idxs_values_per_p + .resize_with(partitioner.num_partitions(), Default::default); + + if local.morsel_idxs_offsets_per_p.is_empty() { + local + .morsel_idxs_offsets_per_p + .resize(partitioner.num_partitions(), 0); + } let (key_selectors, payload_selector); if params.left_is_build.unwrap() { @@ -493,137 +528,254 @@ impl BuildState { while let Ok(morsel) = recv.recv().await { // Compute hashed keys and payload. We must rechunk the payload for - // later chunked gathers. + // later gathers. let hash_keys = select_keys(morsel.df(), key_selectors, params, state).await?; let mut payload = select_payload(morsel.df().clone(), payload_selector); payload.rechunk_mut(); - payload._deshare_views_mut(); - unsafe { - hash_keys.gen_partition_idxs( - &partitioner, - &mut partition_idxs, - &mut sketches, - track_unmatchable, - ); - for (p, idxs_in_p) in partitions.iter_mut().zip(&partition_idxs) { - let payload_for_partition = payload.take_slice_unchecked_impl(idxs_in_p, false); - p.hash_keys.push(hash_keys.gather(idxs_in_p)); - p.frames.push((morsel.seq(), payload_for_partition)); - } - } - } + hash_keys.gen_idxs_per_partition( + &partitioner, + &mut local.morsel_idxs_values_per_p, + &mut local.sketch_per_p, + track_unmatchable, + ); - for (p, sketch) in sketches.into_iter().enumerate() { - partitions[p].sketch = Some(sketch); + local + .morsel_idxs_offsets_per_p + .extend(local.morsel_idxs_values_per_p.iter().map(|vp| vp.len())); + local.morsels.push((morsel.seq(), payload, hash_keys)); } - Ok(()) } - fn finalize(&mut self, params: &EquiJoinParams, table: &dyn ChunkedIdxTable) -> ProbeState { - // Transpose. - let num_workers = self.partitions_per_worker.len(); - let num_partitions = self.partitions_per_worker[0].len(); - let mut results_per_partition = (0..num_partitions) - .map(|_| Vec::with_capacity(num_workers)) - .collect_vec(); - for worker in self.partitions_per_worker.drain(..) 
{ - for (p, result) in worker.into_iter().enumerate() { - results_per_partition[p].push(result); - } - } + fn finalize_ordered(&mut self, params: &EquiJoinParams, table: &dyn IdxTable) -> ProbeState { + let track_unmatchable = params.emit_unmatched_build(); + let payload_schema = if params.left_is_build.unwrap() { + ¶ms.left_payload_schema + } else { + ¶ms.right_payload_schema + }; - POOL.install(|| { - let track_unmatchable = params.emit_unmatched_build(); - let table_per_partition: Vec<_> = results_per_partition - .into_par_iter() - .with_max_len(1) - .map(|results| { - // Estimate sizes and cardinality. + let num_partitions = self.local_builders[0].sketch_per_p.len(); + let local_builders = &self.local_builders; + let probe_tables: SparseInitVec = SparseInitVec::with_capacity(num_partitions); + + POOL.scope(|s| { + for p in 0..num_partitions { + let probe_tables = &probe_tables; + s.spawn(move |_| { + // TODO: every thread does an identical linearize, we can do a single parallel one. + let mut kmerge = BinaryHeap::with_capacity(local_builders.len()); + let mut cur_idx_per_loc = vec![0; local_builders.len()]; + + // Compute cardinality estimate and total amount of + // payload for this partition, and initialize k-way merge. let mut sketch = CardinalitySketch::new(); - let mut num_frames = 0; - for result in &results { - sketch.combine(result.sketch.as_ref().unwrap()); - num_frames += result.frames.len(); + let mut payload_rows = 0; + for (l_idx, l) in local_builders.iter().enumerate() { + let Some((seq, _, _)) = l.morsels.first() else { + continue; + }; + kmerge.push(Priority(Reverse(seq), l_idx)); + + sketch.combine(&l.sketch_per_p[p]); + let offsets_len = l.morsel_idxs_offsets_per_p.len(); + payload_rows += + l.morsel_idxs_offsets_per_p[offsets_len - num_partitions + p]; } - // Build table for this partition. - let mut combined_frames = Vec::with_capacity(num_frames); - let mut chunk_seq_ids = Vec::with_capacity(num_frames); - let mut table = table.new_empty(); - table.reserve(sketch.estimate() * 5 / 4); - if params.preserve_order_build { - let mut combined = Vec::with_capacity(num_frames); - for result in results { - for (hash_keys, (seq, frame)) in - result.hash_keys.into_iter().zip(result.frames) - { - combined.push((seq, hash_keys, frame)); + // Allocate hash table and payload builder. + let mut p_table = table.new_empty(); + p_table.reserve(sketch.estimate() * 5 / 4); + let mut p_payload = DataFrameBuilder::new(payload_schema.clone()); + p_payload.reserve(payload_rows); + + let mut p_seq_ids = Vec::new(); + if track_unmatchable { + p_seq_ids.reserve(payload_rows); + } + + // Linearize and build. + unsafe { + let mut norm_seq_id = 0 as IdxSize; + while let Some(Priority(Reverse(_seq), l_idx)) = kmerge.pop() { + let l = local_builders.get_unchecked(l_idx); + let idx_in_l = *cur_idx_per_loc.get_unchecked(l_idx); + *cur_idx_per_loc.get_unchecked_mut(l_idx) += 1; + if let Some((next_seq, _, _)) = l.morsels.get(idx_in_l + 1) { + kmerge.push(Priority(Reverse(next_seq), l_idx)); } - } - combined.sort_unstable_by_key(|c| c.0); - for (seq, hash_keys, frame) in combined { - // Zero-sized chunks can get deleted, so skip entirely to avoid messing - // up the chunk counter. 
- if frame.height() == 0 { - continue; + let (_mseq, payload, keys) = l.morsels.get_unchecked(idx_in_l); + let p_morsel_idxs_start = + l.morsel_idxs_offsets_per_p[idx_in_l * num_partitions + p]; + let p_morsel_idxs_stop = + l.morsel_idxs_offsets_per_p[(idx_in_l + 1) * num_partitions + p]; + let p_morsel_idxs = &l.morsel_idxs_values_per_p[p] + [p_morsel_idxs_start..p_morsel_idxs_stop]; + p_table.insert_keys_subset(keys, p_morsel_idxs, track_unmatchable); + p_payload.gather_extend(payload, p_morsel_idxs, ShareStrategy::Never); + + if track_unmatchable { + p_seq_ids.resize(p_payload.len(), norm_seq_id); + norm_seq_id += 1; } + } + } + + probe_tables + .try_set( + p, + ProbeTable { + hash_table: p_table, + payload: p_payload.freeze(), + seq_ids: p_seq_ids, + }, + ) + .ok() + .unwrap(); + }); + } + }); + + ProbeState { + table_per_partition: probe_tables.try_assume_init().ok().unwrap(), + max_seq_sent: MorselSeq::default(), + sampled_probe_morsels: core::mem::take(&mut self.sampled_probe_morsels), + } + } + + fn finalize_unordered(&mut self, params: &EquiJoinParams, table: &dyn IdxTable) -> ProbeState { + let track_unmatchable = params.emit_unmatched_build(); + let payload_schema = if params.left_is_build.unwrap() { + ¶ms.left_payload_schema + } else { + ¶ms.right_payload_schema + }; - table.insert_key_chunk(hash_keys, track_unmatchable); - combined_frames.push(frame); - chunk_seq_ids.push(seq); + // To reduce maximum memory usage we want to drop the morsels + // as soon as they're processed, so we move into Arcs. The drops might + // also be expensive, so instead of directly dropping we put that on + // a work queue. + let morsels_per_local_builder = self + .local_builders + .iter_mut() + .map(|b| Arc::new(core::mem::take(&mut b.morsels))) + .collect_vec(); + let (morsel_drop_q_send, morsel_drop_q_recv) = + crossbeam_channel::bounded(morsels_per_local_builder.len()); + let num_partitions = self.local_builders[0].sketch_per_p.len(); + let local_builders = &self.local_builders; + let probe_tables: SparseInitVec = SparseInitVec::with_capacity(num_partitions); + + POOL.scope(|s| { + // Wrap in outer Arc to move to each thread, performing the + // expensive clone on that thread. + let arc_morsels_per_local_builder = Arc::new(morsels_per_local_builder); + for p in 0..num_partitions { + let arc_morsels_per_local_builder = Arc::clone(&arc_morsels_per_local_builder); + let morsel_drop_q_send = morsel_drop_q_send.clone(); + let morsel_drop_q_recv = morsel_drop_q_recv.clone(); + let probe_tables = &probe_tables; + s.spawn(move |_| { + // Extract from outer arc and drop outer arc. + let morsels_per_local_builder = + Arc::unwrap_or_clone(arc_morsels_per_local_builder); + + // Compute cardinality estimate and total amount of + // payload for this partition. + let mut sketch = CardinalitySketch::new(); + let mut payload_rows = 0; + for l in local_builders { + sketch.combine(&l.sketch_per_p[p]); + let offsets_len = l.morsel_idxs_offsets_per_p.len(); + payload_rows += + l.morsel_idxs_offsets_per_p[offsets_len - num_partitions + p]; + } + + // Allocate hash table and payload builder. + let mut p_table = table.new_empty(); + p_table.reserve(sketch.estimate() * 5 / 4); + let mut p_payload = DataFrameBuilder::new(payload_schema.clone()); + p_payload.reserve(payload_rows); + + // Build. + let mut skip_drop_attempt = false; + for (l, l_morsels) in local_builders.iter().zip(morsels_per_local_builder) { + // Try to help with dropping the processed morsels. 
+ if !skip_drop_attempt { + drop(morsel_drop_q_recv.try_recv()); } - } else { - for result in results { - for (hash_keys, (_, frame)) in - result.hash_keys.into_iter().zip(result.frames) - { - // Zero-sized chunks can get deleted, so skip entirely to avoid messing - // up the chunk counter. - if frame.height() == 0 { - continue; - } - table.insert_key_chunk(hash_keys, track_unmatchable); - combined_frames.push(frame); + for (i, morsel) in l_morsels.iter().enumerate() { + let (_mseq, payload, keys) = morsel; + unsafe { + let p_morsel_idxs_start = + l.morsel_idxs_offsets_per_p[i * num_partitions + p]; + let p_morsel_idxs_stop = + l.morsel_idxs_offsets_per_p[(i + 1) * num_partitions + p]; + let p_morsel_idxs = &l.morsel_idxs_values_per_p[p] + [p_morsel_idxs_start..p_morsel_idxs_stop]; + p_table.insert_keys_subset(keys, p_morsel_idxs, track_unmatchable); + p_payload.gather_extend( + payload, + p_morsel_idxs, + ShareStrategy::Never, + ); } } - } - let df = if combined_frames.is_empty() { - if params.left_is_build.unwrap() { - DataFrame::empty_with_schema(¶ms.left_payload_schema) + if let Some(l) = Arc::into_inner(l_morsels) { + // If we're the last thread to process this set of morsels we're probably + // falling behind the rest, since the drop can be quite expensive we skip + // a drop attempt hoping someone else will pick up the slack. + morsel_drop_q_send.send(l).unwrap(); + skip_drop_attempt = true; } else { - DataFrame::empty_with_schema(¶ms.right_payload_schema) + skip_drop_attempt = false; } - } else { - accumulate_dataframes_vertical_unchecked(combined_frames) - }; - ProbeTable { - table, - df, - chunk_seq_ids, } - }) - .collect(); - ProbeState { - table_per_partition, - max_seq_sent: MorselSeq::default(), - sampled_probe_morsels: core::mem::take(&mut self.sampled_probe_morsels), + // We're done, help others out by doing drops. + drop(morsel_drop_q_send); // So we don't deadlock trying to receive from ourselves. + while let Ok(l_morsels) = morsel_drop_q_recv.recv() { + drop(l_morsels); + } + + probe_tables + .try_set( + p, + ProbeTable { + hash_table: p_table, + payload: p_payload.freeze(), + seq_ids: Vec::new(), + }, + ) + .ok() + .unwrap(); + }); } - }) + + // Drop outer arc after spawning each thread so the inner arcs + // can get dropped as soon as they're processed. We also have to + // drop the drop queue sender so we don't deadlock waiting for it + // to end. + drop(arc_morsels_per_local_builder); + drop(morsel_drop_q_send); + }); + + ProbeState { + table_per_partition: probe_tables.try_assume_init().ok().unwrap(), + max_seq_sent: MorselSeq::default(), + sampled_probe_morsels: core::mem::take(&mut self.sampled_probe_morsels), + } } } struct ProbeTable { - // Important that df is not rechunked, the chunks it was inserted with - // into the table must be preserved for chunked gathers. - table: Box, - df: DataFrame, - chunk_seq_ids: Vec, + hash_table: Box, + payload: DataFrame, + seq_ids: Vec, } struct ProbeState { @@ -644,6 +796,8 @@ impl ProbeState { ) -> PolarsResult { // TODO: shuffle after partitioning and keep probe tables thread-local. 
let mut partition_idxs = vec![Vec::new(); partitioner.num_partitions()]; + let mut probe_partitions = Vec::new(); + let mut materialized_idxsize_range = Vec::new(); let mut table_match = Vec::new(); let mut probe_match = Vec::new(); let mut max_seq = MorselSeq::default(); @@ -652,238 +806,291 @@ impl ProbeState { let mark_matches = params.emit_unmatched_build(); let emit_unmatched = params.emit_unmatched_probe(); - let (key_selectors, payload_selector); + let (key_selectors, payload_selector, build_payload_schema, probe_payload_schema); if params.left_is_build.unwrap() { - payload_selector = ¶ms.right_payload_select; key_selectors = ¶ms.right_key_selectors; + payload_selector = ¶ms.right_payload_select; + build_payload_schema = ¶ms.left_payload_schema; + probe_payload_schema = ¶ms.right_payload_schema; } else { - payload_selector = ¶ms.left_payload_select; key_selectors = ¶ms.left_key_selectors; + payload_selector = ¶ms.left_payload_select; + build_payload_schema = ¶ms.right_payload_schema; + probe_payload_schema = ¶ms.left_payload_schema; }; + let mut build_out = DataFrameBuilder::new(build_payload_schema.clone()); + let mut probe_out = DataFrameBuilder::new(probe_payload_schema.clone()); + + // A simple estimate used to size reserves. + let mut selectivity_estimate = 1.0; + while let Ok(morsel) = recv.recv().await { // Compute hashed keys and payload. let (df, seq, src_token, wait_token) = morsel.into_inner(); + max_seq = seq; + + let df_height = df.height(); + if df_height == 0 { + continue; + } + let hash_keys = select_keys(&df, key_selectors, params, state).await?; let mut payload = select_payload(df, payload_selector); let mut payload_rechunked = false; // We don't eagerly rechunk because there might be no matches. + let mut total_matches = 0; - max_seq = seq; + // Use selectivity estimate to reserve for morsel builders. + let max_match_per_key_est = selectivity_estimate as usize + 16; + let out_est_size = ((selectivity_estimate * 1.2 * df_height as f64) as usize) + .min(probe_limit as usize); + build_out.reserve(out_est_size + max_match_per_key_est); unsafe { - // Partition and probe the tables. - hash_keys.gen_partition_idxs( - &partitioner, - &mut partition_idxs, - &mut [], - emit_unmatched, - ); - if params.preserve_order_probe { - // TODO: non-sort based implementation, can directly scatter - // after finding matches for each partition. - let mut out_per_partition = Vec::with_capacity(partitioner.num_partitions()); - let name = PlSmallStr::from_static("__POLARS_PROBE_PRESERVE_ORDER_IDX"); - for (p, idxs_in_p) in partitions.iter().zip(&partition_idxs) { - p.table.probe_subset( - &hash_keys, - idxs_in_p, - &mut table_match, - &mut probe_match, - mark_matches, - emit_unmatched, - IdxSize::MAX, - ); + let new_morsel = |build: &mut DataFrameBuilder, probe: &mut DataFrameBuilder| { + let mut build_df = build.freeze_reset(); + let mut probe_df = probe.freeze_reset(); + let out_df = if params.left_is_build.unwrap() { + build_df.hstack_mut_unchecked(probe_df.get_columns()); + build_df + } else { + probe_df.hstack_mut_unchecked(build_df.get_columns()); + probe_df + }; + let out_df = postprocess_join(out_df, params); + Morsel::new(out_df, seq, src_token.clone()) + }; - if table_match.is_empty() { - continue; + if params.preserve_order_probe { + // To preserve the order we can't do bulk probes per partition and must follow + // the order of the probe morsel. We can still group probes that are + // consecutively on the same partition. 
+ probe_partitions.clear(); + hash_keys.gen_partitions(&partitioner, &mut probe_partitions, emit_unmatched); + let mut probe_group_start = 0; + while probe_group_start < probe_partitions.len() { + let p_idx = probe_partitions[probe_group_start]; + let mut probe_group_end = probe_group_start + 1; + while probe_partitions.get(probe_group_end) == Some(&p_idx) { + probe_group_end += 1; } - - // Gather output and add to buffer. - let mut build_df = if emit_unmatched { - p.df.take_opt_chunked_unchecked(&table_match, false) - } else { - p.df.take_chunked_unchecked(&table_match, IsSorted::Not, false) + let Some(p) = partitions.get(p_idx as usize) else { + probe_group_start = probe_group_end; + continue; }; - if !payload_rechunked { - // TODO: can avoid rechunk? We have to rechunk here or else we do it - // multiple times during the gather. - payload.rechunk_mut(); - payload_rechunked = true; - } - let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false); + materialized_idxsize_range.extend( + materialized_idxsize_range.len() as IdxSize..probe_group_end as IdxSize, + ); - let mut out_df = if params.left_is_build.unwrap() { - build_df.hstack_mut_unchecked(probe_df.get_columns()); - build_df - } else { - probe_df.hstack_mut_unchecked(build_df.get_columns()); - probe_df - }; + while probe_group_start < probe_group_end { + let matches_before_limit = probe_limit - probe_match.len() as IdxSize; + table_match.clear(); + probe_group_start += p.hash_table.probe_subset( + &hash_keys, + &materialized_idxsize_range[probe_group_start..probe_group_end], + &mut table_match, + &mut probe_match, + mark_matches, + emit_unmatched, + matches_before_limit, + ) as usize; - let idxs_ca = - IdxCa::from_vec(name.clone(), core::mem::take(&mut probe_match)); - out_df.with_column_unchecked(idxs_ca.into_column()); - out_per_partition.push(out_df); - } + if emit_unmatched { + build_out.opt_gather_extend( + &p.payload, + &table_match, + ShareStrategy::Always, + ); + } else { + build_out.gather_extend( + &p.payload, + &table_match, + ShareStrategy::Always, + ); + }; - if !out_per_partition.is_empty() { - let sort_options = SortMultipleOptions { - descending: vec![false], - nulls_last: vec![false], - multithreaded: false, - maintain_order: true, - limit: None, - }; - let mut out_df = - accumulate_dataframes_vertical_unchecked(out_per_partition); - out_df.sort_in_place([name.clone()], sort_options).unwrap(); - out_df.drop_in_place(&name).unwrap(); - out_df = postprocess_join(out_df, params); - - // TODO: break in smaller morsels. - let out_morsel = Morsel::new(out_df, seq, src_token.clone()); - if send.send(out_morsel).await.is_err() { - break; + if probe_match.len() >= probe_limit as usize + || probe_group_start == probe_partitions.len() + { + if !payload_rechunked { + payload.rechunk_mut(); + payload_rechunked = true; + } + probe_out.gather_extend( + &payload, + &probe_match, + ShareStrategy::Always, + ); + probe_match.clear(); + let out_morsel = new_morsel(&mut build_out, &mut probe_out); + if send.send(out_morsel).await.is_err() { + return Ok(max_seq); + } + if probe_group_end != probe_partitions.len() { + // We had enough matches to need a mid-partition flush, let's assume there are a lot of + // matches and just do a large reserve. + build_out.reserve(probe_limit as usize + max_match_per_key_est); + } + } } } } else { - let mut out_frames = Vec::new(); - let mut out_len = 0; + // Partition and probe the tables. 
+ for p in partition_idxs.iter_mut() { + p.clear(); + } + hash_keys.gen_idxs_per_partition( + &partitioner, + &mut partition_idxs, + &mut [], + emit_unmatched, + ); + for (p, idxs_in_p) in partitions.iter().zip(&partition_idxs) { let mut offset = 0; while offset < idxs_in_p.len() { - offset += p.table.probe_subset( + let matches_before_limit = probe_limit - probe_match.len() as IdxSize; + table_match.clear(); + offset += p.hash_table.probe_subset( &hash_keys, &idxs_in_p[offset..], &mut table_match, &mut probe_match, mark_matches, emit_unmatched, - probe_limit - out_len, + matches_before_limit, ) as usize; if table_match.is_empty() { continue; } - - // Gather output and send. - let mut build_df = if emit_unmatched { - p.df.take_opt_chunked_unchecked(&table_match, false) + total_matches += table_match.len(); + + if emit_unmatched { + build_out.opt_gather_extend( + &p.payload, + &table_match, + ShareStrategy::Always, + ); } else { - p.df.take_chunked_unchecked(&table_match, IsSorted::Not, false) + build_out.gather_extend( + &p.payload, + &table_match, + ShareStrategy::Always, + ); }; - if !payload_rechunked { - // TODO: can avoid rechunk? We have to rechunk here or else we do it - // multiple times during the gather. - payload.rechunk_mut(); - payload_rechunked = true; - } - let mut probe_df = - payload.take_slice_unchecked_impl(&probe_match, false); - let out_df = if params.left_is_build.unwrap() { - build_df.hstack_mut_unchecked(probe_df.get_columns()); - build_df - } else { - probe_df.hstack_mut_unchecked(build_df.get_columns()); - probe_df - }; - let out_df = postprocess_join(out_df, params); - - out_len = out_len - .checked_add(out_df.height().try_into().unwrap()) - .unwrap(); - out_frames.push(out_df); - - if out_len >= probe_limit { - out_len = 0; - let df = - accumulate_dataframes_vertical_unchecked(out_frames.drain(..)); - let out_morsel = Morsel::new(df, seq, src_token.clone()); + if probe_match.len() >= probe_limit as usize { + if !payload_rechunked { + payload.rechunk_mut(); + payload_rechunked = true; + } + probe_out.gather_extend( + &payload, + &probe_match, + ShareStrategy::Always, + ); + probe_match.clear(); + let out_morsel = new_morsel(&mut build_out, &mut probe_out); if send.send(out_morsel).await.is_err() { - break; + return Ok(max_seq); } + // We had enough matches to need a mid-partition flush, let's assume there are a lot of + // matches and just do a large reserve. + build_out.reserve(probe_limit as usize + max_match_per_key_est); } } } - if out_len > 0 { - let df = accumulate_dataframes_vertical_unchecked(out_frames.drain(..)); - let out_morsel = Morsel::new(df, seq, src_token.clone()); + if !probe_match.is_empty() { + if !payload_rechunked { + payload.rechunk_mut(); + } + probe_out.gather_extend(&payload, &probe_match, ShareStrategy::Always); + probe_match.clear(); + let out_morsel = new_morsel(&mut build_out, &mut probe_out); if send.send(out_morsel).await.is_err() { - break; + return Ok(max_seq); } } } } drop(wait_token); + + // Move selectivity estimate a bit towards latest value. 
+ selectivity_estimate = + 0.8 * selectivity_estimate + 0.2 * (total_matches as f64 / df_height as f64); } Ok(max_seq) } - fn ordered_unmatched( - &mut self, - partitioner: &HashPartitioner, - params: &EquiJoinParams, - ) -> DataFrame { - let mut out_per_partition = Vec::with_capacity(partitioner.num_partitions()); - let seq_name = PlSmallStr::from_static("__POLARS_PROBE_PRESERVE_ORDER_SEQ"); - let idx_name = PlSmallStr::from_static("__POLARS_PROBE_PRESERVE_ORDER_IDX"); - let mut unmarked_idxs = Vec::new(); - unsafe { - for p in self.table_per_partition.iter() { - p.table.unmarked_keys(&mut unmarked_idxs, 0, IdxSize::MAX); + fn ordered_unmatched(&mut self, params: &EquiJoinParams) -> DataFrame { + // TODO: parallelize this operator. - // Gather and create full-null counterpart. - let mut build_df = - p.df.take_chunked_unchecked(&unmarked_idxs, IsSorted::Not, false); - let len = build_df.height(); - let mut out_df = if params.left_is_build.unwrap() { - let probe_df = DataFrame::full_null(¶ms.right_payload_schema, len); - build_df.hstack_mut_unchecked(probe_df.get_columns()); - build_df - } else { - let mut probe_df = DataFrame::full_null(¶ms.left_payload_schema, len); - probe_df.hstack_mut_unchecked(build_df.get_columns()); - probe_df - }; + let build_payload_schema = if params.left_is_build.unwrap() { + ¶ms.left_payload_schema + } else { + ¶ms.right_payload_schema + }; + + let mut unmarked_idxs = Vec::new(); + let mut linearized_idxs = Vec::new(); - // The indices are not ordered globally, but within each chunk they are, so sorting - // by chunk sequence id, breaking ties by inner chunk idx works. - let (chunk_seqs, idx_in_chunk) = unmarked_idxs + for (p_idx, p) in self.table_per_partition.iter().enumerate_idx() { + p.hash_table + .unmarked_keys(&mut unmarked_idxs, 0, IdxSize::MAX); + linearized_idxs.extend( + unmarked_idxs .iter() - .map(|chunk_id| { - let (chunk, idx_in_chunk) = chunk_id.extract(); - (p.chunk_seq_ids[chunk as usize].to_u64(), idx_in_chunk) - }) - .unzip(); + .map(|i| (unsafe { *p.seq_ids.get_unchecked(*i as usize) }, p_idx, *i)), + ); + } + + linearized_idxs.sort_by_key(|(seq_id, _, _)| *seq_id); - let chunk_seqs_ca = UInt64Chunked::from_vec(seq_name.clone(), chunk_seqs); - let idxs_ca = IdxCa::from_vec(idx_name.clone(), idx_in_chunk); - out_df.with_column_unchecked(chunk_seqs_ca.into_column()); - out_df.with_column_unchecked(idxs_ca.into_column()); - out_per_partition.push(out_df); + unsafe { + let mut build_out = DataFrameBuilder::new(build_payload_schema.clone()); + build_out.reserve(linearized_idxs.len()); + + // Group indices from the same partition. + let mut group_start = 0; + let mut gather_idxs = Vec::new(); + while group_start < linearized_idxs.len() { + gather_idxs.clear(); + + let (_seq, p_idx, idx_in_p) = linearized_idxs[group_start]; + gather_idxs.push(idx_in_p); + let mut group_end = group_start + 1; + while group_end < linearized_idxs.len() && linearized_idxs[group_end].1 == p_idx { + gather_idxs.push(linearized_idxs[group_end].2); + group_end += 1; + } + + build_out.gather_extend( + &self.table_per_partition[p_idx as usize].payload, + &gather_idxs, + ShareStrategy::Never, // Don't keep entire table alive for unmatched indices. + ); + + group_start = group_end; } - // Sort by chunk sequence id, then by inner chunk idx. 
- let sort_options = SortMultipleOptions { - descending: vec![false], - nulls_last: vec![false], - multithreaded: true, - maintain_order: false, - limit: None, + let mut build_df = build_out.freeze(); + let out_df = if params.left_is_build.unwrap() { + let probe_df = + DataFrame::full_null(¶ms.right_payload_schema, build_df.height()); + build_df.hstack_mut_unchecked(probe_df.get_columns()); + build_df + } else { + let mut probe_df = + DataFrame::full_null(¶ms.left_payload_schema, build_df.height()); + probe_df.hstack_mut_unchecked(build_df.get_columns()); + probe_df }; - let mut out_df = accumulate_dataframes_vertical_unchecked(out_per_partition); - out_df - .sort_in_place([seq_name.clone(), idx_name.clone()], sort_options) - .unwrap(); - out_df.drop_in_place(&seq_name).unwrap(); - out_df.drop_in_place(&idx_name).unwrap(); - out_df = postprocess_join(out_df, params); - out_df + postprocess_join(out_df, params) } } } @@ -914,7 +1121,7 @@ impl EmitUnmatchedState { let total_len: usize = self .partitions .iter() - .map(|p| p.table.num_keys() as usize) + .map(|p| p.hash_table.num_keys() as usize) .sum(); let ideal_morsel_count = (total_len / get_ideal_morsel_size()).max(1); let morsel_count = ideal_morsel_count.next_multiple_of(num_pipelines); @@ -926,7 +1133,7 @@ impl EmitUnmatchedState { while let Some(p) = self.partitions.get(self.active_partition_idx) { loop { // Generate a chunk of unmarked key indices. - self.offset_in_active_p += p.table.unmarked_keys( + self.offset_in_active_p += p.hash_table.unmarked_keys( &mut unmarked_idxs, self.offset_in_active_p as IdxSize, morsel_size as IdxSize, @@ -937,8 +1144,7 @@ impl EmitUnmatchedState { // Gather and create full-null counterpart. let out_df = unsafe { - let mut build_df = - p.df.take_chunked_unchecked(&unmarked_idxs, IsSorted::Not, false); + let mut build_df = p.payload.take_slice_unchecked_impl(&unmarked_idxs, false); let len = build_df.height(); if params.left_is_build.unwrap() { let probe_df = DataFrame::full_null(¶ms.right_payload_schema, len); @@ -993,8 +1199,8 @@ struct EquiJoinParams { right_key_selectors: Vec, left_payload_select: Vec>, right_payload_select: Vec>, - left_payload_schema: Schema, - right_payload_schema: Schema, + left_payload_schema: Arc, + right_payload_schema: Arc, args: JoinArgs, random_state: PlRandomState, } @@ -1023,7 +1229,7 @@ pub struct EquiJoinNode { state: EquiJoinState, params: EquiJoinParams, num_pipelines: usize, - table: Option>, + table: Option>, } impl EquiJoinNode { @@ -1050,9 +1256,9 @@ impl EquiJoinNode { let table = left_is_build.map(|lib| { if lib { - new_chunked_idx_table(left_key_schema.clone()) + new_idx_table(left_key_schema.clone()) } else { - new_chunked_idx_table(right_key_schema.clone()) + new_idx_table(right_key_schema.clone()) } }); @@ -1083,8 +1289,9 @@ impl EquiJoinNode { EquiJoinState::Sample(SampleState::default()) }; - let left_payload_schema = select_schema(&left_input_schema, &left_payload_select); - let right_payload_schema = select_schema(&right_input_schema, &right_payload_select); + let left_payload_schema = Arc::new(select_schema(&left_input_schema, &left_payload_select)); + let right_payload_schema = + Arc::new(select_schema(&right_input_schema, &right_payload_select)); Ok(Self { state, num_pipelines: 0, @@ -1147,9 +1354,12 @@ impl ComputeNode for EquiJoinNode { // If we are building and the build input is done, transition to probing. 
         if let EquiJoinState::Build(build_state) = &mut self.state {
             if recv[build_idx] == PortState::Done {
-                self.state = EquiJoinState::Probe(
-                    build_state.finalize(&self.params, self.table.as_deref().unwrap()),
-                );
+                let probe_state = if self.params.preserve_order_build {
+                    build_state.finalize_ordered(&self.params, self.table.as_deref().unwrap())
+                } else {
+                    build_state.finalize_unordered(&self.params, self.table.as_deref().unwrap())
+                };
+                self.state = EquiJoinState::Probe(probe_state);
             }
         }
 
@@ -1160,8 +1370,7 @@
             if samples_consumed && recv[probe_idx] == PortState::Done {
                 if self.params.emit_unmatched_build() {
                     if self.params.preserve_order_build {
-                        let partitioner = HashPartitioner::new(self.num_pipelines, 0);
-                        let unmatched = probe_state.ordered_unmatched(&partitioner, &self.params);
+                        let unmatched = probe_state.ordered_unmatched(&self.params);
                         let mut src = InMemorySourceNode::new(
                             Arc::new(unmatched),
                             probe_state.max_seq_sent.successor(),
@@ -1321,16 +1530,15 @@
                 let receivers = recv_ports[build_idx].take().unwrap().parallel();
 
                 build_state
-                    .partitions_per_worker
-                    .resize_with(self.num_pipelines, Vec::new);
+                    .local_builders
+                    .resize_with(self.num_pipelines, Default::default);
                 let partitioner = HashPartitioner::new(self.num_pipelines, 0);
-                for (worker_ps, recv) in build_state.partitions_per_worker.iter_mut().zip(receivers)
-                {
+                for (local_builder, recv) in build_state.local_builders.iter_mut().zip(receivers) {
                     join_handles.push(scope.spawn_task(
                         TaskPriority::High,
                         BuildState::partition_and_sink(
                             recv,
-                            worker_ps,
+                            local_builder,
                             partitioner.clone(),
                             &self.params,
                             state,
diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs
index f0126825a537..88b456f0e33f 100644
--- a/crates/polars-utils/src/lib.rs
+++ b/crates/polars-utils/src/lib.rs
@@ -31,6 +31,7 @@ pub mod priority;
 pub mod select;
 pub mod slice;
 pub mod sort;
+pub mod sparse_init_vec;
 pub mod sync;
 #[cfg(feature = "sysinfo")]
 pub mod sys;
diff --git a/crates/polars-utils/src/sparse_init_vec.rs b/crates/polars-utils/src/sparse_init_vec.rs
new file mode 100644
index 000000000000..16de05bad2c7
--- /dev/null
+++ b/crates/polars-utils/src/sparse_init_vec.rs
@@ -0,0 +1,81 @@
+use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
+
+pub struct SparseInitVec<T> {
+    ptr: *mut T,
+    len: usize,
+    cap: usize,
+
+    num_init: AtomicUsize,
+    init_mask: Vec<AtomicU8>,
+}
+
+unsafe impl<T: Send> Send for SparseInitVec<T> {}
+unsafe impl<T: Send> Sync for SparseInitVec<T> {}
+
+impl<T> SparseInitVec<T> {
+    pub fn with_capacity(len: usize) -> Self {
+        let init_mask = (0..len.div_ceil(8)).map(|_| AtomicU8::new(0)).collect();
+        let mut storage = Vec::with_capacity(len);
+        let cap = storage.capacity();
+        let ptr = storage.as_mut_ptr();
+        core::mem::forget(storage);
+        Self {
+            len,
+            cap,
+            ptr,
+            num_init: AtomicUsize::new(0),
+            init_mask,
+        }
+    }
+
+    pub fn try_set(&self, idx: usize, value: T) -> Result<(), T> {
+        unsafe {
+            if idx >= self.len {
+                return Err(value);
+            }
+
+            // SAFETY: we use Relaxed orderings as we only ever read data back in methods that take
+            // self mutably or owned, already implying synchronization.
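+            //
+            // Claiming a slot is a single fetch_or on the byte that holds this index's bit:
+            // if the bit was already set, another caller owns the slot and the value is
+            // handed back through Err without being written.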
+            let init_mask_byte = self.init_mask.get_unchecked(idx / 8);
+            let bit_mask = 1 << (idx % 8);
+            if init_mask_byte.fetch_or(bit_mask, Ordering::Relaxed) & bit_mask != 0 {
+                return Err(value);
+            }
+
+            self.ptr.add(idx).write(value);
+            self.num_init.fetch_add(1, Ordering::Relaxed);
+        }
+
+        Ok(())
+    }
+
+    pub fn try_assume_init(mut self) -> Result<Vec<T>, Self> {
+        unsafe {
+            if *self.num_init.get_mut() == self.len {
+                let ret = Vec::from_raw_parts(self.ptr, self.len, self.cap);
+                drop(core::mem::take(&mut self.init_mask));
+                core::mem::forget(self);
+                Ok(ret)
+            } else {
+                Err(self)
+            }
+        }
+    }
+}
+
+impl<T> Drop for SparseInitVec<T> {
+    fn drop(&mut self) {
+        unsafe {
+            // Make sure storage gets dropped even if element drop panics.
+            let _storage = Vec::from_raw_parts(self.ptr, 0, self.cap);
+
+            for idx in 0..self.len {
+                let init_mask_byte = self.init_mask.get_unchecked_mut(idx / 8);
+                let bit_mask = 1 << (idx % 8);
+                if *init_mask_byte.get_mut() & bit_mask != 0 {
+                    self.ptr.add(idx).drop_in_place();
+                }
+            }
+        }
+    }
+}
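For reviewers, here is a minimal usage sketch of the new SparseInitVec (not part of the patch itself; the scoped-thread setup and the concrete values are illustrative only). Several writers fill disjoint slots through a shared reference with try_set, and try_assume_init only hands back an ordinary Vec<T> once every slot has been initialized:

// Illustrative sketch, not part of the diff. Assumes the module added above is
// reachable as polars_utils::sparse_init_vec::SparseInitVec.
use polars_utils::sparse_init_vec::SparseInitVec;

fn main() {
    let n = 8;
    let out = SparseInitVec::<u64>::with_capacity(n);

    // Each scoped thread claims and initializes its own slot through &SparseInitVec.
    std::thread::scope(|s| {
        for idx in 0..n {
            let out = &out;
            s.spawn(move || {
                // try_set only succeeds for the first writer of a given index.
                out.try_set(idx, (idx as u64) * 10).unwrap();
            });
        }
    });

    // Every slot was set, so ownership can be taken as a plain Vec<u64>.
    let vec = out.try_assume_init().ok().unwrap();
    assert_eq!(vec, (0..n as u64).map(|i| i * 10).collect::<Vec<_>>());
}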