perf: Branchless Parquet Prefiltering #19190

Merged: 35 commits, Nov 1, 2024

Changes from all commits

Commits
5f778ed
perf: Branchless Parquet Prefiltering
coastalwhite Oct 11, 2024
ed75aee
add filter_boolean_kernel
coastalwhite Oct 11, 2024
bd3d707
work
coastalwhite Oct 12, 2024
a75ad9b
working dictionary
coastalwhite Oct 14, 2024
9cf67ad
working plain
coastalwhite Oct 14, 2024
2fefb57
working nested
coastalwhite Oct 15, 2024
850bc31
use BitMask
coastalwhite Oct 15, 2024
5c1ee7f
clippy and format
coastalwhite Oct 15, 2024
9dd83e2
fix failing tests
coastalwhite Oct 15, 2024
5527433
clippy
coastalwhite Oct 15, 2024
6a8ec1a
massively reduce monomorphisations for kernels
coastalwhite Oct 17, 2024
a4babd6
move fixed size binary to branchless
coastalwhite Oct 17, 2024
56f32b5
impl fsb kernels
coastalwhite Oct 18, 2024
734a6c5
properly support and check fixed size binary
coastalwhite Oct 18, 2024
2f4b1e9
add size and alignment casting to sharedstorage
coastalwhite Oct 18, 2024
5d591af
fixedsizebinary with existing kernels
coastalwhite Oct 18, 2024
21e8a5d
properly drop backingstorage with original_type
coastalwhite Oct 18, 2024
d76ef66
some work
coastalwhite Oct 18, 2024
a4030f6
get started on string filters
coastalwhite Oct 18, 2024
97f9c67
add binview and booleans
coastalwhite Oct 22, 2024
115635f
move plain required to memcpy
coastalwhite Oct 23, 2024
955fb62
remove a whole load of unused code
coastalwhite Oct 23, 2024
65df8f9
remove a whole load more of unneeded code
coastalwhite Oct 23, 2024
8e619a9
remove even more code and simplify nested by a lot
coastalwhite Oct 24, 2024
8f56402
fix rebase problem
coastalwhite Oct 24, 2024
4d7069c
format
coastalwhite Oct 24, 2024
911adcd
fix Storage try_transmute
coastalwhite Oct 24, 2024
e43f651
add some docs
coastalwhite Oct 24, 2024
d10e391
pyfmt
coastalwhite Oct 24, 2024
b09c775
fix doc and broken test
coastalwhite Oct 24, 2024
590429b
typo
coastalwhite Oct 24, 2024
5d7ec68
rustfmt
coastalwhite Oct 24, 2024
d246d86
enable prefiltering by default
coastalwhite Oct 31, 2024
9ea1775
clippy
coastalwhite Oct 31, 2024
c43afbe
fix rebase
coastalwhite Nov 1, 2024
12 changes: 11 additions & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
@@ -161,6 +161,10 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
// Verify the invariants
#[cfg(debug_assertions)]
{
if let Some(validity) = validity.as_ref() {
assert_eq!(validity.len(), views.len());
}

// @TODO: Enable this. This is currently bugged with concatenate.
// let mut actual_total_buffer_len = 0;
// let mut actual_total_bytes_len = 0;
@@ -169,7 +173,13 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
// actual_total_buffer_len += buffer.len();
// }

for view in views.iter() {
for (i, view) in views.iter().enumerate() {
let is_valid = validity.as_ref().map_or(true, |v| v.get_bit(i));

if !is_valid {
continue;
}

// actual_total_bytes_len += view.length as usize;
if view.length > View::MAX_INLINE_SIZE {
assert!((view.buffer_idx as usize) < (buffers.len()));
6 changes: 5 additions & 1 deletion crates/polars-arrow/src/array/binview/mutable.rs
@@ -508,7 +508,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
Self::from_iterator(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()))
}

fn finish_in_progress(&mut self) -> bool {
pub fn finish_in_progress(&mut self) -> bool {
if !self.in_progress_buffer.is_empty() {
self.completed_buffers
.push(std::mem::take(&mut self.in_progress_buffer).into());
@@ -530,6 +530,10 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
arr
}

pub fn take(self) -> (Vec<View>, Vec<Buffer<u8>>) {
(self.views, self.completed_buffers)
}

#[inline]
pub fn value(&self, i: usize) -> &T {
assert!(i < self.len());
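A note on what this enables: making `finish_in_progress` public and adding `take` lets the new decoders build views with `MutableBinaryViewArray` and then pull out the raw parts for post-processing. A minimal sketch, assuming the existing builder API (`new`, `push_value`) and that `View` and `Buffer` are importable from these paths:

```rust
use polars_arrow::array::{MutableBinaryViewArray, View};
use polars_arrow::buffer::Buffer;

// Build a small string column, then decompose it into views + completed buffers.
fn views_and_buffers(values: &[&str]) -> (Vec<View>, Vec<Buffer<u8>>) {
    let mut builder = MutableBinaryViewArray::<str>::new();
    for v in values {
        builder.push_value(v);
    }
    // Flush the in-progress byte buffer so `take` also returns the last buffer.
    builder.finish_in_progress();
    builder.take()
}
```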
13 changes: 13 additions & 0 deletions crates/polars-arrow/src/array/binview/view.rs
@@ -153,6 +153,19 @@ impl View {
}
}

/// Construct a byte slice from an inline view.
///
/// # Safety
///
/// Assumes that this view is inlinable.
pub unsafe fn get_inlined_slice_unchecked(&self) -> &[u8] {
debug_assert!(self.length <= View::MAX_INLINE_SIZE);

let ptr = self as *const View as *const u8;
// SAFETY: Invariant of function
unsafe { std::slice::from_raw_parts(ptr.add(4), self.length as usize) }
}

/// Extend a `Vec<View>` with inline views slices of `src` with `width`.
///
/// This tries to use SIMD to optimize the copying and can be massively faster than doing a
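The new `get_inlined_slice_unchecked` reads an inline view's payload straight out of the 16-byte `View` struct: the bytes after the 4-byte length field hold up to 12 inlined bytes. A minimal sketch of a safe wrapper, assuming `View`, its `length` field, and `MAX_INLINE_SIZE` are visible to the caller:

```rust
use polars_arrow::array::View;

// Return the inlined bytes of a view, or None if the value lives in a buffer.
fn inline_bytes(view: &View) -> Option<&[u8]> {
    (view.length <= View::MAX_INLINE_SIZE)
        // SAFETY: we just checked that the view is inline.
        .then(|| unsafe { view.get_inlined_slice_unchecked() })
}
```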
41 changes: 40 additions & 1 deletion crates/polars-arrow/src/bitmap/bitmask.rs
@@ -3,6 +3,8 @@ use std::simd::{LaneCount, Mask, MaskElement, SupportedLaneCount};

use polars_utils::slice::load_padded_le_u64;

use super::iterator::FastU56BitmapIter;
use super::utils::{count_zeros, BitmapIter};
use crate::bitmap::Bitmap;

/// Returns the nth set bit in w, if n+1 bits are set. The indexing is
@@ -110,6 +112,39 @@ impl<'a> BitMask<'a> {
(left, right)
}

#[inline]
pub fn sliced(&self, offset: usize, length: usize) -> Self {
assert!(offset.checked_add(length).unwrap() <= self.len);
unsafe { self.sliced_unchecked(offset, length) }
}

/// # Safety
/// The index must be in-bounds.
#[inline]
pub unsafe fn sliced_unchecked(&self, offset: usize, length: usize) -> Self {
if cfg!(debug_assertions) {
assert!(offset.checked_add(length).unwrap() <= self.len);
}

Self {
bytes: self.bytes,
offset: self.offset + offset,
len: length,
}
}

pub fn unset_bits(&self) -> usize {
count_zeros(self.bytes, self.offset, self.len)
}

pub fn set_bits(&self) -> usize {
self.len - self.unset_bits()
}

pub fn fast_iter_u56(&self) -> FastU56BitmapIter {
FastU56BitmapIter::new(self.bytes, self.offset, self.len)
}

#[cfg(feature = "simd")]
#[inline]
pub fn get_simd<T, const N: usize>(&self, idx: usize) -> Mask<T, N>
@@ -162,7 +197,7 @@ impl<'a> BitMask<'a> {

/// Computes the index of the nth set bit after start.
///
/// Both are zero-indexed, so nth_set_bit_idx(0, 0) finds the index of the
/// Both are zero-indexed, so `nth_set_bit_idx(0, 0)` finds the index of the
/// first bit set (which can be 0 as well). The returned index is absolute,
/// not relative to start.
pub fn nth_set_bit_idx(&self, mut n: usize, mut start: usize) -> Option<usize> {
@@ -245,6 +280,10 @@ impl<'a> BitMask<'a> {
false
}
}

pub fn iter(&self) -> BitmapIter {
BitmapIter::new(self.bytes, self.offset, self.len)
}
}

#[cfg(test)]
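The `BitMask` additions (`sliced`, `set_bits`, `unset_bits`, `fast_iter_u56`, `iter`) give the prefilter kernels a cheap, zero-copy window into the filter mask. A small sketch of how they compose, assuming the pre-existing `BitMask::from_bitmap` constructor and that the `bitmask` module is importable at this path:

```rust
use polars_arrow::bitmap::bitmask::BitMask;
use polars_arrow::bitmap::Bitmap;

// Count how many rows of one row group survive the filter mask.
fn live_rows_in_row_group(mask: &Bitmap, rg_offset: usize, rg_len: usize) -> usize {
    let mask = BitMask::from_bitmap(mask);
    // Zero-copy slice covering only this row group's rows.
    let rg_mask = mask.sliced(rg_offset, rg_len);
    debug_assert_eq!(rg_mask.set_bits() + rg_mask.unset_bits(), rg_len);
    rg_mask.set_bits()
}
```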
1 change: 1 addition & 0 deletions crates/polars-arrow/src/bitmap/iterator.rs
@@ -173,6 +173,7 @@ impl Iterator for FastU32BitmapIter<'_> {

unsafe impl TrustedLen for FastU32BitmapIter<'_> {}

#[derive(Clone)]
pub struct FastU56BitmapIter<'a> {
bytes: &'a [u8],
shift: u32,
4 changes: 3 additions & 1 deletion crates/polars-arrow/src/buffer/immutable.rs
@@ -79,7 +79,7 @@ impl<T> Buffer<T> {
}

/// Auxiliary method to create a new Buffer
pub(crate) fn from_storage(storage: SharedStorage<T>) -> Self {
pub fn from_storage(storage: SharedStorage<T>) -> Self {
let ptr = storage.as_ptr();
let length = storage.len();
Buffer {
@@ -164,6 +164,8 @@ impl<T> Buffer<T> {
#[inline]
#[must_use]
pub unsafe fn sliced_unchecked(mut self, offset: usize, length: usize) -> Self {
debug_assert!(offset + length <= self.len());

self.slice_unchecked(offset, length);
self
}
1 change: 1 addition & 0 deletions crates/polars-arrow/src/pushable.rs
@@ -181,6 +181,7 @@ impl<T: NativeType> Pushable<Option<T>> for MutablePrimitiveArray<T> {
pub trait NoOption {}
impl NoOption for &str {}
impl NoOption for &[u8] {}
impl NoOption for Vec<u8> {}

impl<T, K> Pushable<T> for MutableBinaryViewArray<K>
where
6 changes: 4 additions & 2 deletions crates/polars-arrow/src/storage.rs
@@ -283,10 +283,12 @@ impl<T: Pod> SharedStorage<T> {
return Err(self);
}

Ok(SharedStorage {
let storage = SharedStorage {
inner: self.inner.cast(),
phantom: PhantomData,
})
};
std::mem::forget(self);
Ok(storage)
}
}

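Context for this fix: the old `try_transmute` copied `self.inner` into the returned `SharedStorage` and then still let `self` drop, releasing the shared allocation that the returned handle also points to. Forgetting `self` after the cast prevents that extra release. A minimal, self-contained illustration of the pattern (illustrative types, not the crate's):

```rust
use std::rc::Rc;

struct Shared(Rc<Vec<u8>>);

impl Shared {
    // Consume `self` and hand the same shared allocation to a new handle.
    fn into_cast(self) -> Shared {
        // Bitwise-copy the inner handle without touching the reference count...
        let out = Shared(unsafe { std::ptr::read(&self.0) });
        // ...then forget `self`, so its Drop does not decrement the count
        // that `out` still relies on.
        std::mem::forget(self);
        out
    }
}
```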
62 changes: 51 additions & 11 deletions crates/polars-arrow/src/types/aligned_bytes.rs
@@ -14,6 +14,7 @@ pub unsafe trait AlignedBytesCast<B: AlignedBytes>: Pod {}
pub trait AlignedBytes: Pod + Zeroable + Copy + Default + Eq {
const ALIGNMENT: usize;
const SIZE: usize;
const SIZE_ALIGNMENT_PAIR: PrimitiveSizeAlignmentPair;

type Unaligned: AsRef<[u8]>
+ AsMut<[u8]>
@@ -45,7 +46,7 @@ pub trait AlignedBytes: Pod + Zeroable + Copy + Default + Eq {

macro_rules! impl_aligned_bytes {
(
$(($name:ident, $size:literal, $alignment:literal, [$($eq_type:ty),*]),)+
$(($name:ident, $size:literal, $alignment:literal, $sap:ident, [$($eq_type:ty),*]),)+
) => {
$(
/// Bytes with a size and alignment.
@@ -59,6 +60,7 @@ macro_rules! impl_aligned_bytes {
impl AlignedBytes for $name {
const ALIGNMENT: usize = $alignment;
const SIZE: usize = $size;
const SIZE_ALIGNMENT_PAIR: PrimitiveSizeAlignmentPair = PrimitiveSizeAlignmentPair::$sap;

type Unaligned = [u8; $size];

@@ -98,15 +100,53 @@ macro_rules! impl_aligned_bytes {
}
}

#[derive(Clone, Copy)]
pub enum PrimitiveSizeAlignmentPair {
S1A1,
S2A2,
S4A4,
S8A4,
S8A8,
S12A4,
S16A4,
S16A8,
S16A16,
S32A16,
}

impl PrimitiveSizeAlignmentPair {
pub const fn size(self) -> usize {
match self {
Self::S1A1 => 1,
Self::S2A2 => 2,
Self::S4A4 => 4,
Self::S8A4 | Self::S8A8 => 8,
Self::S12A4 => 12,
Self::S16A4 | Self::S16A8 | Self::S16A16 => 16,
Self::S32A16 => 32,
}
}

pub const fn alignment(self) -> usize {
match self {
Self::S1A1 => 1,
Self::S2A2 => 2,
Self::S4A4 | Self::S8A4 | Self::S12A4 | Self::S16A4 => 4,
Self::S8A8 | Self::S16A8 => 8,
Self::S16A16 | Self::S32A16 => 16,
}
}
}

impl_aligned_bytes! {
(Bytes1Alignment1, 1, 1, [u8, i8]),
(Bytes2Alignment2, 2, 2, [u16, i16, f16]),
(Bytes4Alignment4, 4, 4, [u32, i32, f32]),
(Bytes8Alignment8, 8, 8, [u64, i64, f64]),
(Bytes8Alignment4, 8, 4, [days_ms]),
(Bytes12Alignment4, 12, 4, [[u32; 3]]),
(Bytes16Alignment4, 16, 4, [View]),
(Bytes16Alignment8, 16, 8, [months_days_ns]),
(Bytes16Alignment16, 16, 16, [u128, i128]),
(Bytes32Alignment16, 32, 16, [i256]),
(Bytes1Alignment1, 1, 1, S1A1, [u8, i8]),
(Bytes2Alignment2, 2, 2, S2A2, [u16, i16, f16]),
(Bytes4Alignment4, 4, 4, S4A4, [u32, i32, f32]),
(Bytes8Alignment8, 8, 8, S8A8, [u64, i64, f64]),
(Bytes8Alignment4, 8, 4, S8A4, [days_ms]),
(Bytes12Alignment4, 12, 4, S12A4, [[u32; 3]]),
(Bytes16Alignment4, 16, 4, S16A4, [View]),
(Bytes16Alignment8, 16, 8, S16A8, [months_days_ns]),
(Bytes16Alignment16, 16, 16, S16A16, [u128, i128]),
(Bytes32Alignment16, 32, 16, S32A16, [i256]),
}
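The new `SIZE_ALIGNMENT_PAIR` constant supports the "massively reduce monomorphisations" commits: kernels can be instantiated once per byte size and alignment rather than once per logical primitive type. A sketch of the intended dispatch, with the module path assumed and `run_kernel` purely illustrative:

```rust
use polars_arrow::types::aligned_bytes::{
    AlignedBytes, Bytes4Alignment4, Bytes8Alignment8, PrimitiveSizeAlignmentPair,
};

// One kernel instantiation per (size, alignment) pair instead of per primitive type.
fn dispatch(pair: PrimitiveSizeAlignmentPair) {
    match (pair.size(), pair.alignment()) {
        (4, 4) => run_kernel::<Bytes4Alignment4>(), // covers u32, i32, f32, ...
        (8, 8) => run_kernel::<Bytes8Alignment8>(), // covers u64, i64, f64, ...
        _ => todo!("remaining size/alignment pairs"),
    }
}

fn run_kernel<B: AlignedBytes>() {
    println!("kernel over {} bytes, {}-byte aligned", B::SIZE, B::ALIGNMENT);
}
```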
1 change: 1 addition & 0 deletions crates/polars-compute/src/filter/mod.rs
@@ -11,6 +11,7 @@ use arrow::array::{new_empty_array, Array, BinaryViewArray, BooleanArray, Primit
use arrow::bitmap::utils::SlicesIterator;
use arrow::bitmap::Bitmap;
use arrow::with_match_primitive_type_full;
pub use boolean::filter_boolean_kernel;

pub fn filter(array: &dyn Array, mask: &BooleanArray) -> Box<dyn Array> {
assert_eq!(array.len(), mask.len());
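With `filter_boolean_kernel` now exported, callers outside `polars-compute` can filter raw bitmaps the same way `filter` handles whole arrays. A usage sketch of the `filter` entry point whose signature appears above; the constructor names (`Int32Array::from_vec`, `BooleanArray::new`, collecting a `Bitmap` from bools) are assumptions about the existing polars-arrow API:

```rust
use polars_arrow::array::{Array, BooleanArray, Int32Array};
use polars_arrow::bitmap::Bitmap;
use polars_arrow::datatypes::ArrowDataType;
use polars_compute::filter::filter;

fn keep_marked_rows() -> Box<dyn Array> {
    let values = Int32Array::from_vec(vec![1, 2, 3, 4]);
    let mask_bits: Bitmap = [true, false, true, false].into_iter().collect();
    let mask = BooleanArray::new(ArrowDataType::Boolean, mask_bits, None);
    // Keeps positions 0 and 2, yielding [1, 3].
    filter(&values, &mask)
}
```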
35 changes: 28 additions & 7 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -212,6 +212,10 @@ fn rg_to_dfs(
use_statistics: bool,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
if config::verbose() {
eprintln!("parquet scan with parallel = {parallel:?}");
}

// If we are only interested in the row_index, we take a little special path here.
if projection.is_empty() {
if let Some(row_index) = row_index {
@@ -341,6 +345,10 @@ fn rg_to_dfs_prefiltered(
let num_live_columns = live_variables.len();
let num_dead_columns = projection.len() - num_live_columns;

if config::verbose() {
eprintln!("parquet live columns = {num_live_columns}, dead columns = {num_dead_columns}");
}

// @NOTE: This is probably already sorted, but just to be sure.
let mut projection_sorted = projection.to_vec();
projection_sorted.sort();
@@ -446,6 +454,10 @@ fn rg_to_dfs_prefiltered(
debug_assert_eq!(df.height(), filter_mask.set_bits());

if filter_mask.set_bits() == 0 {
if config::verbose() {
eprintln!("parquet filter mask found that row group can be skipped");
}

return Ok(None);
}

@@ -886,10 +898,19 @@ pub fn read_parquet<R: MmapBytesReader>(
.unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

if let Some(predicate) = predicate {
if std::env::var("POLARS_PARQUET_AUTO_PREFILTERED").is_ok_and(|v| v == "1")
&& predicate.live_variables().map_or(0, |v| v.len()) * n_row_groups
>= POOL.current_num_threads()
{
let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER");
let prefilter_env = prefilter_env.as_deref();

let num_live_variables = predicate.live_variables().map_or(0, |v| v.len());
let mut do_prefilter = false;

do_prefilter |= prefilter_env == Ok("1"); // Force enable
do_prefilter |= num_live_variables * n_row_groups >= POOL.current_num_threads()
&& materialized_projection.len() >= num_live_variables;

do_prefilter &= prefilter_env != Ok("0"); // Force disable

if do_prefilter {
parallel = ParallelStrategy::Prefiltered;
}
}
@@ -1419,12 +1440,12 @@ impl PrefilterMaskSetting {
pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
match self {
Self::Auto => {
// Prefiltering is more expensive for nested types so we make the cut-off
// higher.
// Prefiltering is only expensive for nested types so we make the cut-off quite
// high.
let is_nested = dtype.is_nested();

// We empirically selected these numbers.
(is_nested && prefilter_cost <= 0.01) || (!is_nested && prefilter_cost <= 0.02)
is_nested && prefilter_cost <= 0.01
},
Self::Pre => true,
Self::Post => false,
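The strategy selection in `read_parquet` now reduces to a small predicate: `POLARS_PARQUET_PREFILTER=1` force-enables prefiltering, `POLARS_PARQUET_PREFILTER=0` force-disables it, and otherwise prefiltering is chosen when the live-column work is large enough to occupy the thread pool and the projection is at least as wide as the set of live columns. Restated as a standalone function (a sketch mirroring the diff, not code from the PR):

```rust
// Sketch: the prefilter strategy decision as a standalone predicate.
fn should_use_prefiltered(
    prefilter_env: Option<&str>, // value of POLARS_PARQUET_PREFILTER, if set
    num_live_variables: usize,   // columns referenced by the predicate
    n_row_groups: usize,
    projection_len: usize,
    num_threads: usize,
) -> bool {
    let mut do_prefilter = false;

    do_prefilter |= prefilter_env == Some("1"); // force enable
    do_prefilter |= num_live_variables * n_row_groups >= num_threads
        && projection_len >= num_live_variables;

    do_prefilter &= prefilter_env != Some("0"); // force disable

    do_prefilter
}
```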