diff --git a/crates/polars-arrow/src/array/binview/builder.rs b/crates/polars-arrow/src/array/binview/builder.rs new file mode 100644 index 000000000000..86d9048914be --- /dev/null +++ b/crates/polars-arrow/src/array/binview/builder.rs @@ -0,0 +1,291 @@ +use std::marker::PhantomData; +use std::sync::{Arc, LazyLock}; + +use hashbrown::hash_map::Entry; +use polars_utils::aliases::{InitHashMaps, PlHashMap}; +use polars_utils::IdxSize; + +use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE}; +use crate::array::builder::{ShareStrategy, StaticArrayBuilder}; +use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType}; +use crate::bitmap::OptBitmapBuilder; +use crate::buffer::Buffer; +use crate::datatypes::ArrowDataType; + +static PLACEHOLDER_BUFFER: LazyLock> = LazyLock::new(|| Buffer::from_static(&[])); + +pub struct BinaryViewArrayGenericBuilder { + dtype: ArrowDataType, + views: Vec, + active_buffer: Vec, + active_buffer_idx: u32, + buffer_set: Vec>, + stolen_buffers: PlHashMap<*const u8, u32>, + + // With these we can amortize buffer set translation costs if repeatedly + // stealing from the same set of buffers. + last_buffer_set_stolen_from: Option]>>, + buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation) + buffer_set_translation_generation: u32, + + validity: OptBitmapBuilder, + /// Total bytes length if we would concatenate them all. + total_bytes_len: usize, + /// Total bytes in the buffer set (excluding remaining capacity). + total_buffer_len: usize, + view_type: PhantomData, +} + +impl BinaryViewArrayGenericBuilder { + pub fn new(dtype: ArrowDataType) -> Self { + Self { + dtype, + views: Vec::new(), + active_buffer: Vec::new(), + active_buffer_idx: 0, + buffer_set: Vec::new(), + stolen_buffers: PlHashMap::new(), + last_buffer_set_stolen_from: None, + buffer_set_translation_idxs: Vec::new(), + buffer_set_translation_generation: 0, + validity: OptBitmapBuilder::default(), + total_bytes_len: 0, + total_buffer_len: 0, + view_type: PhantomData, + } + } + + #[inline] + fn reserve_active_buffer(&mut self, additional: usize) { + let len = self.active_buffer.len(); + let cap = self.active_buffer.capacity(); + if additional > cap - len || len + additional >= (u32::MAX - 1) as usize { + self.reserve_active_buffer_slow(additional); + } + } + + #[cold] + fn reserve_active_buffer_slow(&mut self, additional: usize) { + assert!( + additional <= (u32::MAX - 1) as usize, + "strings longer than 2^32 - 2 are not supported" + ); + + // Allocate a new buffer and flush the old buffer. + let new_capacity = (self.active_buffer.capacity() * 2) + .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE) + .max(additional); + + let old_buffer = + core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity)); + if !old_buffer.is_empty() { + // Replace dummy with real buffer. + self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer); + } + self.active_buffer_idx = self.buffer_set.len().try_into().unwrap(); + self.buffer_set.push(PLACEHOLDER_BUFFER.clone()) // Push placeholder so active_buffer_idx stays valid. + } + + fn push_value_ignore_validity(&mut self, bytes: &V) { + let bytes = bytes.to_bytes(); + self.total_bytes_len += bytes.len(); + unsafe { + let view = if bytes.len() > View::MAX_INLINE_SIZE as usize { + self.reserve_active_buffer(bytes.len()); + + let buffer_idx = u32::try_from(self.buffer_set.len()).unwrap(); + let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer. + self.active_buffer.extend_from_slice(bytes); + self.total_buffer_len += bytes.len(); + View::new_noninline_unchecked(bytes, buffer_idx, offset) + } else { + View::new_inline_unchecked(bytes) + }; + self.views.push(view); + } + } + + fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Arc<[Buffer]>) { + // Fat pointer equality, checks both start and length. + if self + .last_buffer_set_stolen_from + .as_ref() + .is_some_and(|stolen_bs| std::ptr::eq(Arc::as_ptr(stolen_bs), Arc::as_ptr(buffer_set))) + { + return; // Already active. + } + + // Switch to new generation (invalidating all old translation indices), + // and resizing the buffer with invalid indices if necessary. + let old_gen = self.buffer_set_translation_generation; + self.buffer_set_translation_generation = old_gen.wrapping_add(1); + if self.buffer_set_translation_idxs.len() < buffer_set.len() { + self.buffer_set_translation_idxs + .resize(buffer_set.len(), (0, old_gen)); + } + } + + unsafe fn extend_views_dedup_ignore_validity( + &mut self, + views: impl IntoIterator, + other_bufferset: &Arc<[Buffer]>, + ) { + // TODO: if there are way more buffers than length translate per-view + // rather than all at once. + self.switch_active_stealing_bufferset_to(other_bufferset); + + for mut view in views { + if view.length > View::MAX_INLINE_SIZE { + // Translate from old array-local buffer idx to global stolen buffer idx. + let (mut new_buffer_idx, gen) = *self + .buffer_set_translation_idxs + .get_unchecked(view.buffer_idx as usize); + if gen != self.buffer_set_translation_generation { + // This buffer index wasn't seen before for this array, do a dedup lookup. + // Since we map by starting pointer and different subslices may have different lengths, we expand + // the buffer to the maximum it could be. + let buffer = other_bufferset + .get_unchecked(view.buffer_idx as usize) + .clone() + .expand_end_to_storage(); + let buf_id = buffer.as_slice().as_ptr(); + let idx = match self.stolen_buffers.entry(buf_id) { + Entry::Occupied(o) => *o.get(), + Entry::Vacant(v) => { + let idx = self.buffer_set.len() as u32; + self.total_buffer_len += buffer.len(); + self.buffer_set.push(buffer); + v.insert(idx); + idx + }, + }; + + // Cache result for future lookups. + *self + .buffer_set_translation_idxs + .get_unchecked_mut(view.buffer_idx as usize) = + (idx, self.buffer_set_translation_generation); + new_buffer_idx = idx; + } + view.buffer_idx = new_buffer_idx; + } + + self.total_bytes_len += view.length as usize; + self.views.push(view); + } + } +} + +impl StaticArrayBuilder for BinaryViewArrayGenericBuilder { + type Array = BinaryViewArrayGeneric; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + self.views.reserve(additional); + self.validity.reserve(additional); + } + + fn freeze(mut self) -> Self::Array { + // Flush active buffer and/or remove extra placeholder buffer. + if !self.active_buffer.is_empty() { + self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer); + } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) { + self.buffer_set.pop(); + } + + unsafe { + BinaryViewArrayGeneric::new_unchecked( + self.dtype, + Buffer::from(self.views), + Arc::from(self.buffer_set), + self.validity.into_opt_validity(), + self.total_bytes_len, + self.total_buffer_len, + ) + } + } + + fn subslice_extend( + &mut self, + other: &Self::Array, + start: usize, + length: usize, + share: ShareStrategy, + ) { + self.views.reserve(length); + + unsafe { + match share { + ShareStrategy::Never => { + if let Some(v) = other.validity() { + for i in start..start + length { + if v.get_bit_unchecked(i) { + self.push_value_ignore_validity(other.value_unchecked(i)); + } else { + self.views.push(View::default()) + } + } + } else { + for i in start..start + length { + self.push_value_ignore_validity(other.value_unchecked(i)); + } + } + }, + ShareStrategy::Always => { + let other_views = &other.views()[start..start + length]; + self.extend_views_dedup_ignore_validity( + other_views.iter().copied(), + other.data_buffers(), + ); + }, + } + } + + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + } + + unsafe fn gather_extend( + &mut self, + other: &Self::Array, + idxs: &[IdxSize], + share: ShareStrategy, + ) { + self.views.reserve(idxs.len()); + + unsafe { + match share { + ShareStrategy::Never => { + if let Some(v) = other.validity() { + for idx in idxs { + if v.get_bit_unchecked(*idx as usize) { + self.push_value_ignore_validity( + other.value_unchecked(*idx as usize), + ); + } else { + self.views.push(View::default()) + } + } + } else { + for idx in idxs { + self.push_value_ignore_validity(other.value_unchecked(*idx as usize)); + } + } + }, + ShareStrategy::Always => { + let other_view_slice = other.views().as_slice(); + let other_views = idxs + .iter() + .map(|idx| *other_view_slice.get_unchecked(*idx as usize)); + self.extend_views_dedup_ignore_validity(other_views, other.data_buffers()); + }, + } + } + + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + } +} diff --git a/crates/polars-arrow/src/array/binview/mod.rs b/crates/polars-arrow/src/array/binview/mod.rs index 1062874b120e..bd684a7b2bc2 100644 --- a/crates/polars-arrow/src/array/binview/mod.rs +++ b/crates/polars-arrow/src/array/binview/mod.rs @@ -1,4 +1,7 @@ //! See thread: https://lists.apache.org/thread/w88tpz76ox8h3rxkjl4so6rg3f1rv7wt + +mod builder; +pub use builder::*; mod ffi; pub(super) mod fmt; mod iterator; @@ -44,6 +47,10 @@ pub type MutablePlBinary = MutableBinaryViewArray<[u8]>; static BIN_VIEW_TYPE: ArrowDataType = ArrowDataType::BinaryView; static UTF8_VIEW_TYPE: ArrowDataType = ArrowDataType::Utf8View; +// Growth parameters of view array buffers. +const DEFAULT_BLOCK_SIZE: usize = 8 * 1024; +const MAX_EXP_BLOCK_SIZE: usize = 16 * 1024 * 1024; + pub trait ViewType: Sealed + 'static + PartialEq + AsRef { const IS_UTF8: bool; const DATA_TYPE: ArrowDataType; @@ -356,6 +363,35 @@ impl BinaryViewArrayGeneric { T::from_bytes_unchecked(v.get_slice_unchecked(&self.buffers)) } + /// Returns the element at index `i`, or None if it is null. + /// # Panics + /// iff `i >= self.len()` + #[inline] + pub fn get(&self, i: usize) -> Option<&T> { + assert!(i < self.len()); + unsafe { self.get_unchecked(i) } + } + + /// Returns the element at index `i`, or None if it is null. + /// + /// # Safety + /// Assumes that the `i < self.len`. + #[inline] + pub unsafe fn get_unchecked(&self, i: usize) -> Option<&T> { + if self + .validity + .as_ref() + .is_none_or(|v| v.get_bit_unchecked(i)) + { + let v = self.views.get_unchecked(i); + Some(T::from_bytes_unchecked( + v.get_slice_unchecked(&self.buffers), + )) + } else { + None + } + } + /// Returns an iterator of `Option<&T>` over every element of this array. pub fn iter(&self) -> ZipValidity<&T, BinaryViewValueIter, BitmapIter> { ZipValidity::new_with_validity(self.values_iter(), self.validity.as_ref()) diff --git a/crates/polars-arrow/src/array/binview/mutable.rs b/crates/polars-arrow/src/array/binview/mutable.rs index 2f5746421e4c..cb3b38243ab1 100644 --- a/crates/polars-arrow/src/array/binview/mutable.rs +++ b/crates/polars-arrow/src/array/binview/mutable.rs @@ -9,7 +9,9 @@ use polars_utils::aliases::{InitHashMaps, PlHashMap}; use crate::array::binview::iterator::MutableBinaryViewValueIter; use crate::array::binview::view::validate_utf8_only; -use crate::array::binview::{BinaryViewArrayGeneric, ViewType}; +use crate::array::binview::{ + BinaryViewArrayGeneric, ViewType, DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE, +}; use crate::array::{Array, MutableArray, TryExtend, TryPush, View}; use crate::bitmap::MutableBitmap; use crate::buffer::Buffer; @@ -17,9 +19,6 @@ use crate::datatypes::ArrowDataType; use crate::legacy::trusted_len::TrustedLenPush; use crate::trusted_len::TrustedLen; -const DEFAULT_BLOCK_SIZE: usize = 8 * 1024; -const MAX_EXP_BLOCK_SIZE: usize = 16 * 1024 * 1024; - // Invariants: // // - Each view must point to a valid slice of a buffer diff --git a/crates/polars-arrow/src/array/boolean/builder.rs b/crates/polars-arrow/src/array/boolean/builder.rs new file mode 100644 index 000000000000..990c9697b0f0 --- /dev/null +++ b/crates/polars-arrow/src/array/boolean/builder.rs @@ -0,0 +1,69 @@ +use polars_utils::IdxSize; + +use super::BooleanArray; +use crate::array::builder::{ShareStrategy, StaticArrayBuilder}; +use crate::bitmap::{BitmapBuilder, OptBitmapBuilder}; +use crate::datatypes::ArrowDataType; + +pub struct BooleanArrayBuilder { + dtype: ArrowDataType, + values: BitmapBuilder, + validity: OptBitmapBuilder, +} + +impl BooleanArrayBuilder { + pub fn new(dtype: ArrowDataType) -> Self { + Self { + dtype, + values: BitmapBuilder::new(), + validity: OptBitmapBuilder::default(), + } + } +} + +impl StaticArrayBuilder for BooleanArrayBuilder { + type Array = BooleanArray; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + self.values.reserve(additional); + self.validity.reserve(additional); + } + + fn freeze(self) -> BooleanArray { + let values = self.values.freeze(); + let validity = self.validity.into_opt_validity(); + BooleanArray::try_new(self.dtype, values, validity).unwrap() + } + + fn subslice_extend( + &mut self, + other: &BooleanArray, + start: usize, + length: usize, + _share: ShareStrategy, + ) { + self.values + .subslice_extend_from_bitmap(other.values(), start, length); + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + } + + unsafe fn gather_extend( + &mut self, + other: &BooleanArray, + idxs: &[IdxSize], + _share: ShareStrategy, + ) { + self.values.reserve(idxs.len()); + for idx in idxs { + self.values + .push_unchecked(other.value_unchecked(*idx as usize)); + } + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + } +} diff --git a/crates/polars-arrow/src/array/boolean/mod.rs b/crates/polars-arrow/src/array/boolean/mod.rs index 320def5a940a..07d47e510e9c 100644 --- a/crates/polars-arrow/src/array/boolean/mod.rs +++ b/crates/polars-arrow/src/array/boolean/mod.rs @@ -1,4 +1,5 @@ use either::Either; +use polars_error::{polars_bail, PolarsResult}; use super::{Array, Splitable}; use crate::array::iterator::NonNullValuesIter; @@ -12,9 +13,9 @@ pub(super) mod fmt; mod from; mod iterator; mod mutable; - pub use mutable::*; -use polars_error::{polars_bail, PolarsResult}; +mod builder; +pub use builder::*; /// A [`BooleanArray`] is Arrow's semantically equivalent of an immutable `Vec>`. /// It implements [`Array`]. diff --git a/crates/polars-arrow/src/array/builder.rs b/crates/polars-arrow/src/array/builder.rs new file mode 100644 index 000000000000..c7cfc3952e9a --- /dev/null +++ b/crates/polars-arrow/src/array/builder.rs @@ -0,0 +1,207 @@ +use polars_utils::IdxSize; + +use crate::array::binview::BinaryViewArrayGenericBuilder; +use crate::array::boolean::BooleanArrayBuilder; +use crate::array::fixed_size_binary::FixedSizeBinaryArrayBuilder; +use crate::array::fixed_size_list::FixedSizeListArrayBuilder; +use crate::array::list::ListArrayBuilder; +use crate::array::null::NullArrayBuilder; +use crate::array::struct_::StructArrayBuilder; +use crate::array::{Array, PrimitiveArrayBuilder}; +use crate::datatypes::{ArrowDataType, PhysicalType}; +use crate::with_match_primitive_type_full; + +/// Used for arrays which can share buffers with input arrays to appends, +/// gathers, etc. +#[derive(Copy, Clone, Debug)] +pub enum ShareStrategy { + Never, + Always, +} + +pub trait StaticArrayBuilder { + type Array: Array; + + fn dtype(&self) -> &ArrowDataType; + fn reserve(&mut self, additional: usize); + + /// Consume this builder returning the built array. + fn freeze(self) -> Self::Array; + + /// Extends this builder with the contents of the given array. May panic if + /// other does not match the dtype of this array. + fn extend(&mut self, other: &Self::Array, share: ShareStrategy) { + self.subslice_extend(other, 0, other.len(), share); + } + + /// Extends this builder with the contents of the given array subslice. May + /// panic if other does not match the dtype of this array. + fn subslice_extend( + &mut self, + other: &Self::Array, + start: usize, + length: usize, + share: ShareStrategy, + ); + + /// Extends this builder with the contents of the given array at the given + /// indices. That is, other[idxs[i]] is appended to this array in order, + /// for each i=0..idxs.len(). May panic if other does not match the + /// dtype of this array. + /// + /// # Safety + /// The indices must be in-bounds. + unsafe fn gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy); +} + +impl ArrayBuilder for T { + #[inline(always)] + fn dtype(&self) -> &ArrowDataType { + StaticArrayBuilder::dtype(self) + } + + #[inline(always)] + fn reserve(&mut self, additional: usize) { + StaticArrayBuilder::reserve(self, additional) + } + + #[inline(always)] + fn freeze(self) -> Box { + Box::new(StaticArrayBuilder::freeze(self)) + } + + #[inline(always)] + fn subslice_extend( + &mut self, + other: &dyn Array, + start: usize, + length: usize, + share: ShareStrategy, + ) { + let other: &T::Array = other.as_any().downcast_ref().unwrap(); + StaticArrayBuilder::subslice_extend(self, other, start, length, share); + } + + #[inline(always)] + unsafe fn gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], share: ShareStrategy) { + let other: &T::Array = other.as_any().downcast_ref().unwrap(); + StaticArrayBuilder::gather_extend(self, other, idxs, share); + } +} + +#[allow(private_bounds)] +pub trait ArrayBuilder: ArrayBuilderBoxedHelper { + fn dtype(&self) -> &ArrowDataType; + fn reserve(&mut self, additional: usize); + + /// Consume this builder returning the built array. + fn freeze(self) -> Box; + + /// Extends this builder with the contents of the given array. May panic if + /// other does not match the dtype of this array. + fn extend(&mut self, other: &dyn Array, share: ShareStrategy) { + self.subslice_extend(other, 0, other.len(), share); + } + + /// Extends this builder with the contents of the given array subslice. May + /// panic if other does not match the dtype of this array. + fn subslice_extend( + &mut self, + other: &dyn Array, + start: usize, + length: usize, + share: ShareStrategy, + ); + + /// Extends this builder with the contents of the given array at the given + /// indices. That is, other[idxs[i]] is appended to this array in order, + /// for each i=0..idxs.len(). May panic if other does not match the + /// dtype of this array. + /// + /// # Safety + /// The indices must be in-bounds. + unsafe fn gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], share: ShareStrategy); +} + +/// A hack that lets us call the consuming `freeze` method on Box. +trait ArrayBuilderBoxedHelper { + fn freeze_boxed(self: Box) -> Box; +} + +impl ArrayBuilderBoxedHelper for T { + fn freeze_boxed(self: Box) -> Box { + self.freeze() + } +} + +impl ArrayBuilder for Box { + fn dtype(&self) -> &ArrowDataType { + (**self).dtype() + } + + fn reserve(&mut self, additional: usize) { + (**self).reserve(additional) + } + + fn freeze(self) -> Box { + self.freeze_boxed() + } + + fn subslice_extend( + &mut self, + other: &dyn Array, + start: usize, + length: usize, + share: ShareStrategy, + ) { + (**self).subslice_extend(other, start, length, share); + } + + unsafe fn gather_extend(&mut self, other: &dyn Array, idxs: &[IdxSize], share: ShareStrategy) { + (**self).gather_extend(other, idxs, share); + } +} + +/// Construct an ArrayBuilder for the given type. +pub fn make_builder(dtype: &ArrowDataType) -> Box { + use PhysicalType::*; + match dtype.to_physical_type() { + Null => Box::new(NullArrayBuilder::new(dtype.clone())), + Boolean => Box::new(BooleanArrayBuilder::new(dtype.clone())), + Primitive(prim_t) => with_match_primitive_type_full!(prim_t, |$T| { + Box::new(PrimitiveArrayBuilder::<$T>::new(dtype.clone())) + }), + FixedSizeBinary => Box::new(FixedSizeBinaryArrayBuilder::new(dtype.clone())), + LargeList => { + let ArrowDataType::LargeList(inner_dt) = dtype else { + unreachable!() + }; + Box::new(ListArrayBuilder::::new( + dtype.clone(), + make_builder(inner_dt.dtype()), + )) + }, + FixedSizeList => { + let ArrowDataType::FixedSizeList(inner_dt, _) = dtype else { + unreachable!() + }; + Box::new(FixedSizeListArrayBuilder::new( + dtype.clone(), + make_builder(inner_dt.dtype()), + )) + }, + Struct => { + let ArrowDataType::Struct(fields) = dtype else { + unreachable!() + }; + let builders = fields.iter().map(|f| make_builder(f.dtype())).collect(); + Box::new(StructArrayBuilder::new(dtype.clone(), builders)) + }, + BinaryView => Box::new(BinaryViewArrayGenericBuilder::<[u8]>::new(dtype.clone())), + Utf8View => Box::new(BinaryViewArrayGenericBuilder::::new(dtype.clone())), + + List | Binary | LargeBinary | Utf8 | LargeUtf8 | Map | Union | Dictionary(_) => { + unimplemented!() + }, + } +} diff --git a/crates/polars-arrow/src/array/fixed_size_binary/builder.rs b/crates/polars-arrow/src/array/fixed_size_binary/builder.rs new file mode 100644 index 000000000000..b5579f23a865 --- /dev/null +++ b/crates/polars-arrow/src/array/fixed_size_binary/builder.rs @@ -0,0 +1,76 @@ +use polars_utils::IdxSize; + +use super::FixedSizeBinaryArray; +use crate::array::builder::{ShareStrategy, StaticArrayBuilder}; +use crate::bitmap::OptBitmapBuilder; +use crate::buffer::Buffer; +use crate::datatypes::ArrowDataType; + +pub struct FixedSizeBinaryArrayBuilder { + dtype: ArrowDataType, + values: Vec, + validity: OptBitmapBuilder, +} + +impl FixedSizeBinaryArrayBuilder { + pub fn new(dtype: ArrowDataType) -> Self { + Self { + dtype, + values: Vec::new(), + validity: OptBitmapBuilder::default(), + } + } +} + +impl StaticArrayBuilder for FixedSizeBinaryArrayBuilder { + type Array = FixedSizeBinaryArray; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + let bytes = additional * FixedSizeBinaryArray::get_size(&self.dtype); + self.values.reserve(bytes); + self.validity.reserve(additional); + } + + fn freeze(self) -> FixedSizeBinaryArray { + let values = Buffer::from(self.values); + let validity = self.validity.into_opt_validity(); + FixedSizeBinaryArray::new(self.dtype, values, validity) + } + + fn subslice_extend( + &mut self, + other: &FixedSizeBinaryArray, + start: usize, + length: usize, + _share: ShareStrategy, + ) { + let other_slice = other.values().as_slice(); + let size = FixedSizeBinaryArray::get_size(&self.dtype); + self.values + .extend_from_slice(&other_slice[start * size..(start + length) * size]); + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + } + + unsafe fn gather_extend( + &mut self, + other: &FixedSizeBinaryArray, + idxs: &[IdxSize], + _share: ShareStrategy, + ) { + let other_slice = other.values().as_slice(); + let size = FixedSizeBinaryArray::get_size(&self.dtype); + self.values.reserve(idxs.len() * size); + for idx in idxs { + let idx = *idx as usize; + let subslice = other_slice.get_unchecked(idx * size..(idx + 1) * size); + self.values.extend_from_slice(subslice); + } + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + } +} diff --git a/crates/polars-arrow/src/array/fixed_size_binary/mod.rs b/crates/polars-arrow/src/array/fixed_size_binary/mod.rs index 8217964ea44d..c92fb43a796d 100644 --- a/crates/polars-arrow/src/array/fixed_size_binary/mod.rs +++ b/crates/polars-arrow/src/array/fixed_size_binary/mod.rs @@ -3,9 +3,11 @@ use crate::bitmap::Bitmap; use crate::buffer::Buffer; use crate::datatypes::ArrowDataType; +mod builder; mod ffi; pub(super) mod fmt; mod iterator; +pub use builder::*; mod mutable; pub use mutable::*; use polars_error::{polars_bail, polars_ensure, PolarsResult}; diff --git a/crates/polars-arrow/src/array/fixed_size_list/builder.rs b/crates/polars-arrow/src/array/fixed_size_list/builder.rs new file mode 100644 index 000000000000..5e0862119886 --- /dev/null +++ b/crates/polars-arrow/src/array/fixed_size_list/builder.rs @@ -0,0 +1,95 @@ +use polars_utils::IdxSize; + +use super::FixedSizeListArray; +use crate::array::builder::{ArrayBuilder, ShareStrategy, StaticArrayBuilder}; +use crate::bitmap::OptBitmapBuilder; +use crate::datatypes::ArrowDataType; + +pub struct FixedSizeListArrayBuilder { + dtype: ArrowDataType, + size: usize, + length: usize, + inner_builder: B, + validity: OptBitmapBuilder, +} +impl FixedSizeListArrayBuilder { + pub fn new(dtype: ArrowDataType, inner_builder: B) -> Self { + Self { + size: FixedSizeListArray::get_child_and_size(&dtype).1, + dtype, + length: 0, + inner_builder, + validity: OptBitmapBuilder::default(), + } + } +} + +impl StaticArrayBuilder for FixedSizeListArrayBuilder { + type Array = FixedSizeListArray; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + self.inner_builder.reserve(additional); + self.validity.reserve(additional); + } + + fn freeze(self) -> FixedSizeListArray { + let values = self.inner_builder.freeze(); + let validity = self.validity.into_opt_validity(); + FixedSizeListArray::new(self.dtype, self.length, values, validity) + } + + fn subslice_extend( + &mut self, + other: &FixedSizeListArray, + start: usize, + length: usize, + share: ShareStrategy, + ) { + self.inner_builder.subslice_extend( + &**other.values(), + start * self.size, + length * self.size, + share, + ); + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + self.length += length; + } + + unsafe fn gather_extend( + &mut self, + other: &FixedSizeListArray, + idxs: &[IdxSize], + share: ShareStrategy, + ) { + let other_values = &**other.values(); + self.inner_builder.reserve(idxs.len() * self.size); + + // Group consecutive indices into larger copies. + let mut group_start = 0; + while group_start < idxs.len() { + let start_idx = idxs[group_start] as usize; + let mut group_len = 1; + while group_start + group_len < idxs.len() + && idxs[group_start + group_len] as usize == start_idx + group_len + { + group_len += 1; + } + self.inner_builder.subslice_extend( + other_values, + start_idx * self.size, + group_len * self.size, + share, + ); + group_start += group_len; + } + + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + self.length += idxs.len(); + } +} diff --git a/crates/polars-arrow/src/array/fixed_size_list/mod.rs b/crates/polars-arrow/src/array/fixed_size_list/mod.rs index 58bb04b4dfe7..7b4005c807be 100644 --- a/crates/polars-arrow/src/array/fixed_size_list/mod.rs +++ b/crates/polars-arrow/src/array/fixed_size_list/mod.rs @@ -6,6 +6,8 @@ mod ffi; pub(super) mod fmt; mod iterator; +mod builder; +pub use builder::*; mod mutable; pub use mutable::*; use polars_error::{polars_bail, polars_ensure, PolarsResult}; diff --git a/crates/polars-arrow/src/array/list/builder.rs b/crates/polars-arrow/src/array/list/builder.rs new file mode 100644 index 000000000000..3d1b5e83833b --- /dev/null +++ b/crates/polars-arrow/src/array/list/builder.rs @@ -0,0 +1,120 @@ +use polars_utils::IdxSize; + +use super::ListArray; +use crate::array::builder::{ArrayBuilder, ShareStrategy, StaticArrayBuilder}; +use crate::bitmap::OptBitmapBuilder; +use crate::datatypes::ArrowDataType; +use crate::offset::{Offsets, OffsetsBuffer}; +use crate::types::Offset; + +pub struct ListArrayBuilder { + dtype: ArrowDataType, + offsets: Offsets, + inner_builder: B, + validity: OptBitmapBuilder, +} + +impl ListArrayBuilder { + pub fn new(dtype: ArrowDataType, inner_builder: B) -> Self { + Self { + dtype, + inner_builder, + offsets: Offsets::new(), + validity: OptBitmapBuilder::default(), + } + } +} + +impl StaticArrayBuilder for ListArrayBuilder { + type Array = ListArray; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + self.offsets.reserve(additional); + self.validity.reserve(additional); + // No inner reserve, we have no idea how large it needs to be. + } + + fn freeze(self) -> ListArray { + let offsets = OffsetsBuffer::from(self.offsets); + let values = self.inner_builder.freeze(); + let validity = self.validity.into_opt_validity(); + ListArray::new(self.dtype, offsets, values, validity) + } + + fn subslice_extend( + &mut self, + other: &ListArray, + start: usize, + length: usize, + share: ShareStrategy, + ) { + let start_offset = other.offsets()[start].to_usize(); + let stop_offset = other.offsets()[start + length].to_usize(); + self.offsets + .try_extend_from_slice(other.offsets(), start, length) + .unwrap(); + self.inner_builder.subslice_extend( + &**other.values(), + start_offset, + stop_offset - start_offset, + share, + ); + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + } + + unsafe fn gather_extend( + &mut self, + other: &ListArray, + idxs: &[IdxSize], + share: ShareStrategy, + ) { + let other_values = &**other.values(); + let other_offsets = other.offsets(); + + // Pre-compute proper length for reserve. + let total_len: usize = idxs + .iter() + .map(|i| { + let start = other_offsets.get_unchecked(*i as usize).to_usize(); + let stop = other_offsets.get_unchecked(*i as usize + 1).to_usize(); + stop - start + }) + .sum(); + self.inner_builder.reserve(total_len); + + // Group consecutive indices into larger copies. + let mut group_start = 0; + while group_start < idxs.len() { + let start_idx = idxs[group_start] as usize; + let mut group_len = 1; + while group_start + group_len < idxs.len() + && idxs[group_start + group_len] as usize == start_idx + group_len + { + group_len += 1; + } + + let start_offset = other_offsets.get_unchecked(start_idx).to_usize(); + let stop_offset = other_offsets + .get_unchecked(start_idx + group_len) + .to_usize(); + self.offsets + .try_extend_from_slice(other_offsets, group_start, group_len) + .unwrap(); + self.inner_builder.subslice_extend( + other_values, + start_offset, + stop_offset - start_offset, + share, + ); + group_start += group_len; + } + + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + } +} diff --git a/crates/polars-arrow/src/array/list/mod.rs b/crates/polars-arrow/src/array/list/mod.rs index e340b4abbbb9..ab5400c1af1d 100644 --- a/crates/polars-arrow/src/array/list/mod.rs +++ b/crates/polars-arrow/src/array/list/mod.rs @@ -4,6 +4,8 @@ use crate::bitmap::Bitmap; use crate::datatypes::{ArrowDataType, Field}; use crate::offset::{Offset, Offsets, OffsetsBuffer}; +mod builder; +pub use builder::*; mod ffi; pub(super) mod fmt; mod iterator; diff --git a/crates/polars-arrow/src/array/mod.rs b/crates/polars-arrow/src/array/mod.rs index 76d68d0a1b5a..ff73e1d32e27 100644 --- a/crates/polars-arrow/src/array/mod.rs +++ b/crates/polars-arrow/src/array/mod.rs @@ -653,6 +653,7 @@ impl<'a> AsRef<(dyn Array + 'a)> for dyn Array { mod binary; mod boolean; +pub mod builder; mod dictionary; mod fixed_size_binary; mod fixed_size_list; diff --git a/crates/polars-arrow/src/array/null.rs b/crates/polars-arrow/src/array/null.rs index d4e63cca0288..d8602e123eec 100644 --- a/crates/polars-arrow/src/array/null.rs +++ b/crates/polars-arrow/src/array/null.rs @@ -1,8 +1,10 @@ use std::any::Any; use polars_error::{polars_bail, PolarsResult}; +use polars_utils::IdxSize; use super::Splitable; +use crate::array::builder::{ArrayBuilder, ShareStrategy}; use crate::array::{Array, FromFfi, MutableArray, ToFfi}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{ArrowDataType, PhysicalType}; @@ -213,3 +215,45 @@ impl FromFfi for NullArray { Self::try_new(dtype, array.array().len()) } } + +pub struct NullArrayBuilder { + dtype: ArrowDataType, + length: usize, +} + +impl NullArrayBuilder { + pub fn new(dtype: ArrowDataType) -> Self { + Self { dtype, length: 0 } + } +} + +impl ArrayBuilder for NullArrayBuilder { + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, _additional: usize) {} + + fn freeze(self) -> Box { + NullArray::new(self.dtype, self.length).to_boxed() + } + + fn subslice_extend( + &mut self, + _other: &dyn Array, + _start: usize, + length: usize, + _share: ShareStrategy, + ) { + self.length += length; + } + + unsafe fn gather_extend( + &mut self, + _other: &dyn Array, + idxs: &[IdxSize], + _share: ShareStrategy, + ) { + self.length += idxs.len(); + } +} diff --git a/crates/polars-arrow/src/array/primitive/builder.rs b/crates/polars-arrow/src/array/primitive/builder.rs new file mode 100644 index 000000000000..74aa71c849b5 --- /dev/null +++ b/crates/polars-arrow/src/array/primitive/builder.rs @@ -0,0 +1,75 @@ +use polars_utils::vec::PushUnchecked; +use polars_utils::IdxSize; + +use super::PrimitiveArray; +use crate::array::builder::{ShareStrategy, StaticArrayBuilder}; +use crate::array::Array; +use crate::bitmap::OptBitmapBuilder; +use crate::buffer::Buffer; +use crate::datatypes::ArrowDataType; +use crate::types::NativeType; + +pub struct PrimitiveArrayBuilder { + dtype: ArrowDataType, + values: Vec, + validity: OptBitmapBuilder, +} + +impl PrimitiveArrayBuilder { + pub fn new(dtype: ArrowDataType) -> Self { + Self { + dtype, + values: Vec::new(), + validity: OptBitmapBuilder::default(), + } + } +} + +impl StaticArrayBuilder for PrimitiveArrayBuilder { + type Array = PrimitiveArray; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + self.values.reserve(additional); + self.validity.reserve(additional); + } + + fn freeze(self) -> PrimitiveArray { + let values = Buffer::from(self.values); + let validity = self.validity.into_opt_validity(); + PrimitiveArray::new(self.dtype, values, validity) + } + + fn subslice_extend( + &mut self, + other: &PrimitiveArray, + start: usize, + length: usize, + _share: ShareStrategy, + ) { + let other: &PrimitiveArray = other.as_any().downcast_ref().unwrap(); + self.values + .extend_from_slice(&other.values()[start..start + length]); + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + } + + unsafe fn gather_extend( + &mut self, + other: &PrimitiveArray, + idxs: &[IdxSize], + _share: ShareStrategy, + ) { + let other: &PrimitiveArray = other.as_any().downcast_ref().unwrap(); + self.values.reserve(idxs.len()); + for idx in idxs { + self.values + .push_unchecked(other.value_unchecked(*idx as usize)); + } + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + } +} diff --git a/crates/polars-arrow/src/array/primitive/mod.rs b/crates/polars-arrow/src/array/primitive/mod.rs index e28fbc009233..776d2c65853d 100644 --- a/crates/polars-arrow/src/array/primitive/mod.rs +++ b/crates/polars-arrow/src/array/primitive/mod.rs @@ -18,6 +18,8 @@ pub mod iterator; mod mutable; pub use mutable::*; +mod builder; +pub use builder::*; use polars_error::{polars_bail, PolarsResult}; use polars_utils::index::{Bounded, Indexable, NullCount}; use polars_utils::slice::SliceAble; diff --git a/crates/polars-arrow/src/array/struct_/builder.rs b/crates/polars-arrow/src/array/struct_/builder.rs new file mode 100644 index 000000000000..83ceae292690 --- /dev/null +++ b/crates/polars-arrow/src/array/struct_/builder.rs @@ -0,0 +1,78 @@ +use polars_utils::IdxSize; + +use super::StructArray; +use crate::array::builder::{ArrayBuilder, ShareStrategy, StaticArrayBuilder}; +use crate::bitmap::OptBitmapBuilder; +use crate::datatypes::ArrowDataType; + +pub struct StructArrayBuilder { + dtype: ArrowDataType, + length: usize, + inner_builders: Vec>, + validity: OptBitmapBuilder, +} + +impl StructArrayBuilder { + pub fn new(dtype: ArrowDataType, inner_builders: Vec>) -> Self { + Self { + dtype, + length: 0, + inner_builders, + validity: OptBitmapBuilder::default(), + } + } +} + +impl StaticArrayBuilder for StructArrayBuilder { + type Array = StructArray; + + fn dtype(&self) -> &ArrowDataType { + &self.dtype + } + + fn reserve(&mut self, additional: usize) { + for builder in &mut self.inner_builders { + builder.reserve(additional); + } + self.validity.reserve(additional); + } + + fn freeze(self) -> StructArray { + let values = self + .inner_builders + .into_iter() + .map(|b| b.freeze()) + .collect(); + let validity = self.validity.into_opt_validity(); + StructArray::new(self.dtype, self.length, values, validity) + } + + fn subslice_extend( + &mut self, + other: &StructArray, + start: usize, + length: usize, + share: ShareStrategy, + ) { + for (builder, other_values) in self.inner_builders.iter_mut().zip(other.values()) { + builder.subslice_extend(&**other_values, start, length, share); + } + self.validity + .subslice_extend_from_opt_validity(other.validity(), start, length); + self.length += length; + } + + unsafe fn gather_extend( + &mut self, + other: &StructArray, + idxs: &[IdxSize], + share: ShareStrategy, + ) { + for (builder, other_values) in self.inner_builders.iter_mut().zip(other.values()) { + builder.gather_extend(&**other_values, idxs, share); + } + self.validity + .gather_extend_from_opt_validity(other.validity(), idxs); + self.length += idxs.len(); + } +} diff --git a/crates/polars-arrow/src/array/struct_/mod.rs b/crates/polars-arrow/src/array/struct_/mod.rs index 3e34ef425c31..46824a1c4c26 100644 --- a/crates/polars-arrow/src/array/struct_/mod.rs +++ b/crates/polars-arrow/src/array/struct_/mod.rs @@ -2,6 +2,8 @@ use super::{new_empty_array, new_null_array, Array, Splitable}; use crate::bitmap::Bitmap; use crate::datatypes::{ArrowDataType, Field}; +mod builder; +pub use builder::*; mod ffi; pub(super) mod fmt; mod iterator; diff --git a/crates/polars-arrow/src/bitmap/builder.rs b/crates/polars-arrow/src/bitmap/builder.rs index 9289a7ac9ef4..61c86cb6fdf0 100644 --- a/crates/polars-arrow/src/bitmap/builder.rs +++ b/crates/polars-arrow/src/bitmap/builder.rs @@ -1,4 +1,5 @@ use polars_utils::slice::load_padded_le_u64; +use polars_utils::IdxSize; use super::bitmask::BitMask; use crate::bitmap::{Bitmap, MutableBitmap}; @@ -243,6 +244,42 @@ impl BitmapBuilder { self.extend_from_slice(slice, offset, length); } + /// Extends this BitmapBuilder with a subslice of a bitmap. + pub fn subslice_extend_from_bitmap(&mut self, bitmap: &Bitmap, start: usize, length: usize) { + let (slice, bm_offset, bm_length) = bitmap.as_slice(); + assert!(start + length <= bm_length); + self.extend_from_slice(slice, bm_offset + start, length); + } + + /// # Safety + /// The indices must be in-bounds. + pub unsafe fn gather_extend_from_slice( + &mut self, + slice: &[u8], + offset: usize, + length: usize, + idxs: &[IdxSize], + ) { + assert!(8 * slice.len() >= offset + length); + + self.reserve(idxs.len()); + unsafe { + for idx in idxs { + debug_assert!((*idx as usize) < length); + let idx_in_slice = offset + *idx as usize; + let bit = (*slice.get_unchecked(idx_in_slice / 8) >> (idx_in_slice % 8)) & 1; + self.push_unchecked(bit != 0); + } + } + } + + /// # Safety + /// The indices must be in-bounds. + pub unsafe fn gather_extend_from_bitmap(&mut self, bitmap: &Bitmap, idxs: &[IdxSize]) { + let (slice, offset, length) = bitmap.as_slice(); + self.gather_extend_from_slice(slice, offset, length, idxs); + } + /// # Safety /// May only be called once at the end. unsafe fn finish(&mut self) { @@ -327,3 +364,101 @@ impl BitmapBuilder { builder } } + +/// A wrapper for BitmapBuilder that does not allocate until the first false is +/// pushed. Less efficient if you know there are false values because it must +/// check if it has allocated for each push. +pub enum OptBitmapBuilder { + AllTrue { bit_len: usize, bit_cap: usize }, + MayHaveFalse(BitmapBuilder), +} + +impl Default for OptBitmapBuilder { + fn default() -> Self { + Self::AllTrue { + bit_len: 0, + bit_cap: 0, + } + } +} + +impl OptBitmapBuilder { + pub fn reserve(&mut self, additional: usize) { + match self { + Self::AllTrue { bit_len, bit_cap } => { + *bit_cap = usize::max(*bit_cap, *bit_len + additional); + }, + Self::MayHaveFalse(inner) => inner.reserve(additional), + } + } + + pub fn extend_constant(&mut self, length: usize, value: bool) { + match self { + Self::AllTrue { bit_len, bit_cap } => { + if value { + *bit_cap = usize::max(*bit_cap, *bit_len + length); + *bit_len += length; + } else { + self.get_builder().extend_constant(length, value); + } + }, + Self::MayHaveFalse(inner) => inner.extend_constant(length, value), + } + } + + pub fn into_opt_validity(self) -> Option { + match self { + Self::AllTrue { .. } => None, + Self::MayHaveFalse(inner) => inner.into_opt_validity(), + } + } + + pub fn subslice_extend_from_opt_validity( + &mut self, + bitmap: Option<&Bitmap>, + start: usize, + length: usize, + ) { + match bitmap { + Some(bm) => { + self.get_builder() + .subslice_extend_from_bitmap(bm, start, length); + }, + None => { + self.extend_constant(length, true); + }, + } + } + + /// # Safety + /// The indices must be in-bounds. + pub unsafe fn gather_extend_from_opt_validity( + &mut self, + bitmap: Option<&Bitmap>, + idxs: &[IdxSize], + ) { + match bitmap { + Some(bm) => { + self.get_builder().gather_extend_from_bitmap(bm, idxs); + }, + None => { + self.extend_constant(idxs.len(), true); + }, + } + } + + fn get_builder(&mut self) -> &mut BitmapBuilder { + match self { + Self::AllTrue { bit_len, bit_cap } => { + let mut builder = BitmapBuilder::with_capacity(*bit_cap); + builder.extend_constant(*bit_len, true); + *self = Self::MayHaveFalse(builder); + let Self::MayHaveFalse(inner) = self else { + unreachable!() + }; + inner + }, + Self::MayHaveFalse(inner) => inner, + } + } +} diff --git a/crates/polars-arrow/src/buffer/immutable.rs b/crates/polars-arrow/src/buffer/immutable.rs index a3ad6721a6f8..11300940f9c7 100644 --- a/crates/polars-arrow/src/buffer/immutable.rs +++ b/crates/polars-arrow/src/buffer/immutable.rs @@ -89,6 +89,10 @@ impl Buffer { } } + pub fn from_static(data: &'static [T]) -> Self { + Self::from_storage(SharedStorage::from_static(data)) + } + /// Returns the number of bytes in the buffer #[inline] pub fn len(&self) -> usize { @@ -108,6 +112,20 @@ impl Buffer { self.storage.len() != self.length } + /// Expands this slice to the maximum allowed by the underlying storage. + /// Only expands towards the end, the offset isn't changed. That is, element + /// i before and after this operation refer to the same element. + pub fn expand_end_to_storage(self) -> Self { + unsafe { + let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize; + Self { + ptr: self.ptr, + length: self.storage.len() - offset, + storage: self.storage, + } + } + } + /// Returns the byte slice stored in this buffer #[inline] pub fn as_slice(&self) -> &[T] {