Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Improve join performance for new-streaming engine #21620

Merged
merged 25 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions crates/polars-arrow/src/array/primitive/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ impl<T: NativeType> StaticArrayBuilder for PrimitiveArrayBuilder<T> {
idxs: &[IdxSize],
_share: ShareStrategy,
) {
self.values.reserve(idxs.len());
for idx in idxs {
self.values
.push_unchecked(other.value_unchecked(*idx as usize));
}
// TODO: SIMD gather kernels?
let other_values_slice = other.values().as_slice();
self.values.extend(
idxs.iter()
.map(|idx| *other_values_slice.get_unchecked(*idx as usize)),
);
self.validity
.gather_extend_from_opt_validity(other.validity(), idxs);
}
Expand Down
49 changes: 49 additions & 0 deletions crates/polars-arrow/src/bitmap/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,18 @@ impl BitmapBuilder {
self.extend_from_slice(slice, bm_offset + start, length);
}

pub fn subslice_extend_from_opt_validity(
&mut self,
bitmap: Option<&Bitmap>,
start: usize,
length: usize,
) {
match bitmap {
Some(bm) => self.subslice_extend_from_bitmap(bm, start, length),
None => self.extend_constant(length, true),
}
}

/// # Safety
/// The indices must be in-bounds.
pub unsafe fn gather_extend_from_slice(
Expand Down Expand Up @@ -308,6 +320,43 @@ impl BitmapBuilder {
self.opt_gather_extend_from_slice(slice, offset, length, idxs);
}

/// # Safety
/// The indices must be in-bounds.
pub unsafe fn gather_extend_from_opt_validity(
&mut self,
bitmap: Option<&Bitmap>,
idxs: &[IdxSize],
length: usize,
) {
if let Some(bm) = bitmap {
let (slice, offset, sl_length) = bm.as_slice();
debug_assert_eq!(sl_length, length);
self.gather_extend_from_slice(slice, offset, length, idxs);
} else {
self.extend_constant(length, true);
}
}

pub fn opt_gather_extend_from_opt_validity(
&mut self,
bitmap: Option<&Bitmap>,
idxs: &[IdxSize],
length: usize,
) {
if let Some(bm) = bitmap {
let (slice, offset, sl_length) = bm.as_slice();
debug_assert_eq!(sl_length, length);
self.opt_gather_extend_from_slice(slice, offset, sl_length, idxs);
} else {
unsafe {
self.reserve(idxs.len());
for idx in idxs {
self.push_unchecked((*idx as usize) < length);
}
}
}
}

/// # Safety
/// May only be called once at the end.
unsafe fn finish(&mut self) {
Expand Down
95 changes: 95 additions & 0 deletions crates/polars-core/src/chunked_array/object/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use arrow::array::builder::{ArrayBuilder, ShareStrategy};
use arrow::bitmap::BitmapBuilder;
use polars_utils::vec::PushUnchecked;

use super::*;
use crate::utils::get_iter_capacity;
Expand Down Expand Up @@ -176,3 +178,96 @@ pub(crate) fn object_series_to_arrow_array(s: &Series) -> ArrayRef {
let arr = arr.as_any().downcast_ref::<ListArray<i64>>().unwrap();
arr.values().to_boxed()
}

impl<T: PolarsObject> ArrayBuilder for ObjectChunkedBuilder<T> {
fn dtype(&self) -> &ArrowDataType {
&ArrowDataType::FixedSizeBinary(size_of::<T>())
}

fn reserve(&mut self, additional: usize) {
self.bitmask_builder.reserve(additional);
self.values.reserve(additional);
}

fn freeze(self) -> Box<dyn Array> {
Box::new(ObjectArray {
values: self.values.into(),
validity: self.bitmask_builder.into_opt_validity(),
})
}

fn freeze_reset(&mut self) -> Box<dyn Array> {
Box::new(ObjectArray {
values: core::mem::take(&mut self.values).into(),
validity: core::mem::take(&mut self.bitmask_builder).into_opt_validity(),
})
}

fn len(&self) -> usize {
self.values.len()
}

fn extend_nulls(&mut self, length: usize) {
self.values.resize(self.values.len() + length, T::default());
self.bitmask_builder.extend_constant(length, false);
}

fn subslice_extend(
&mut self,
other: &dyn Array,
start: usize,
length: usize,
_share: ShareStrategy,
) {
let other: &ObjectArray<T> = other.as_any().downcast_ref().unwrap();
self.values
.extend_from_slice(&other.values[start..start + length]);
self.bitmask_builder
.subslice_extend_from_opt_validity(other.validity(), start, length);
}

fn subslice_extend_repeated(
&mut self,
other: &dyn Array,
start: usize,
length: usize,
repeats: usize,
share: ShareStrategy,
) {
for _ in 0..repeats {
self.subslice_extend(other, start, length, share)
}
}

unsafe fn gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], _share: ShareStrategy) {
let other: &ObjectArray<T> = other.as_any().downcast_ref().unwrap();
let other_values_slice = other.values.as_slice();
self.values.extend(
idxs.iter()
.map(|idx| other_values_slice.get_unchecked(*idx as usize).clone()),
);
self.bitmask_builder
.gather_extend_from_opt_validity(other.validity(), idxs, other.len());
}

fn opt_gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], _share: ShareStrategy) {
let other: &ObjectArray<T> = other.as_any().downcast_ref().unwrap();
let other_values_slice = other.values.as_slice();
self.values.reserve(idxs.len());
unsafe {
for idx in idxs {
let val = if (*idx as usize) < other.len() {
other_values_slice.get_unchecked(*idx as usize).clone()
} else {
T::default()
};
self.values.push_unchecked(val);
}
}
self.bitmask_builder.opt_gather_extend_from_opt_validity(
other.validity(),
idxs,
other.len(),
);
}
}
12 changes: 10 additions & 2 deletions crates/polars-core/src/chunked_array/object/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::{Arc, RwLock};

use arrow::array::builder::ArrayBuilder;
use arrow::array::ArrayRef;
use arrow::datatypes::ArrowDataType;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -40,7 +41,9 @@ static GLOBAL_OBJECT_REGISTRY: Lazy<RwLock<Option<ObjectRegistry>>> = Lazy::new(

/// This trait can be registered, after which that global registration
/// can be used to materialize object types
pub trait AnonymousObjectBuilder {
pub trait AnonymousObjectBuilder: ArrayBuilder {
fn as_array_builder(self: Box<Self>) -> Box<dyn ArrayBuilder>;

/// # Safety
/// Expect `ObjectArray<T>` arrays.
unsafe fn from_chunks(self: Box<Self>, chunks: Vec<ArrayRef>) -> Series;
Expand Down Expand Up @@ -73,12 +76,17 @@ pub trait AnonymousObjectBuilder {
}

impl<T: PolarsObject> AnonymousObjectBuilder for ObjectChunkedBuilder<T> {
// Expect ObjectArray<T> arrays.
/// # Safety
/// Expects `ObjectArray<T>` arrays.
unsafe fn from_chunks(self: Box<Self>, chunks: Vec<ArrayRef>) -> Series {
ObjectChunked::<T>::new_with_compute_len(Arc::new(self.field().clone()), chunks)
.into_series()
}

fn as_array_builder(self: Box<Self>) -> Box<dyn ArrayBuilder> {
self
}

fn append_null(&mut self) {
self.append_null()
}
Expand Down
9 changes: 9 additions & 0 deletions crates/polars-core/src/series/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use arrow::array::builder::{make_builder, ArrayBuilder, ShareStrategy};
use polars_utils::IdxSize;

#[cfg(feature = "object")]
use crate::chunked_array::object::registry::get_object_builder;
use crate::prelude::*;
use crate::utils::Container;

Expand All @@ -12,6 +14,13 @@ pub struct SeriesBuilder {

impl SeriesBuilder {
pub fn new(dtype: DataType) -> Self {
// FIXME: get rid of this hack.
#[cfg(feature = "object")]
if matches!(dtype, DataType::Object(_)) {
let builder = get_object_builder(PlSmallStr::EMPTY, 0).as_array_builder();
return Self { dtype, builder };
}

let builder = make_builder(&dtype.to_physical().to_arrow(CompatLevel::newest()));
Self { dtype, builder }
}
Expand Down
77 changes: 61 additions & 16 deletions crates/polars-expr/src/hash_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,25 @@ impl HashKeys {
self.len() == 0
}

/// After this call partition_idxs[p] will contain the indices of hashes
/// that belong to partition p, and the cardinality sketches are updated
/// accordingly.
pub fn gen_partition_idxs(
/// After this call partitions will be extended with the partition for each
/// hash. Nulls are assigned IdxSize::MAX or a specific partition depending
/// on whether partition_nulls is true.
pub fn gen_partitions(
&self,
partitioner: &HashPartitioner,
partitions: &mut Vec<IdxSize>,
partition_nulls: bool,
) {
match self {
Self::RowEncoded(s) => s.gen_partitions(partitioner, partitions, partition_nulls),
Self::Single(s) => s.gen_partitions(partitioner, partitions, partition_nulls),
}
}

/// After this call partition_idxs[p] will be extended with the indices of
/// hashes that belong to partition p, and the cardinality sketches are
/// updated accordingly.
pub fn gen_idxs_per_partition(
&self,
partitioner: &HashPartitioner,
partition_idxs: &mut [Vec<IdxSize>],
Expand All @@ -86,13 +101,13 @@ impl HashKeys {
) {
if sketches.is_empty() {
match self {
Self::RowEncoded(s) => s.gen_partition_idxs::<false>(
Self::RowEncoded(s) => s.gen_idxs_per_partition::<false>(
partitioner,
partition_idxs,
sketches,
partition_nulls,
),
Self::Single(s) => s.gen_partition_idxs::<false>(
Self::Single(s) => s.gen_idxs_per_partition::<false>(
partitioner,
partition_idxs,
sketches,
Expand All @@ -101,13 +116,13 @@ impl HashKeys {
}
} else {
match self {
Self::RowEncoded(s) => s.gen_partition_idxs::<true>(
Self::RowEncoded(s) => s.gen_idxs_per_partition::<true>(
partitioner,
partition_idxs,
sketches,
partition_nulls,
),
Self::Single(s) => s.gen_partition_idxs::<true>(
Self::Single(s) => s.gen_idxs_per_partition::<true>(
partitioner,
partition_idxs,
sketches,
Expand Down Expand Up @@ -159,7 +174,33 @@ pub struct RowEncodedKeys {
}

impl RowEncodedKeys {
pub fn gen_partition_idxs<const BUILD_SKETCHES: bool>(
pub fn gen_partitions(
&self,
partitioner: &HashPartitioner,
partitions: &mut Vec<IdxSize>,
partition_nulls: bool,
) {
partitions.reserve(self.hashes.len());
if let Some(validity) = self.keys.validity() {
// Arbitrarily put nulls in partition 0.
let null_p = if partition_nulls { 0 } else { IdxSize::MAX };
partitions.extend(self.hashes.values_iter().zip(validity).map(|(h, is_v)| {
if is_v {
partitioner.hash_to_partition(*h) as IdxSize
} else {
null_p
}
}))
} else {
partitions.extend(
self.hashes
.values_iter()
.map(|h| partitioner.hash_to_partition(*h) as IdxSize),
)
}
}

pub fn gen_idxs_per_partition<const BUILD_SKETCHES: bool>(
&self,
partitioner: &HashPartitioner,
partition_idxs: &mut [Vec<IdxSize>],
Expand All @@ -168,9 +209,6 @@ impl RowEncodedKeys {
) {
assert!(partition_idxs.len() == partitioner.num_partitions());
assert!(!BUILD_SKETCHES || sketches.len() == partitioner.num_partitions());
for p in partition_idxs.iter_mut() {
p.clear();
}

if let Some(validity) = self.keys.validity() {
for (i, (h, is_v)) in self.hashes.values_iter().zip(validity).enumerate() {
Expand Down Expand Up @@ -264,17 +302,24 @@ pub struct SingleKeys {
}

impl SingleKeys {
pub fn gen_partition_idxs<const BUILD_SKETCHES: bool>(
#[allow(clippy::ptr_arg)] // Remove when implemented.
pub fn gen_partitions(
&self,
_partitioner: &HashPartitioner,
_partitions: &mut Vec<IdxSize>,
_partition_nulls: bool,
) {
todo!()
}

pub fn gen_idxs_per_partition<const BUILD_SKETCHES: bool>(
&self,
partitioner: &HashPartitioner,
partition_idxs: &mut [Vec<IdxSize>],
_sketches: &mut [CardinalitySketch],
_partition_nulls: bool,
) {
assert!(partitioner.num_partitions() == partition_idxs.len());
for p in partition_idxs.iter_mut() {
p.clear();
}

todo!()
}
Expand Down
Loading
Loading