From c7dea2b6a1edea6db269ff21c728e9131af44147 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Thu, 16 Mar 2023 22:38:47 +0000 Subject: [PATCH 1/4] Add BYTE_STREAM_SPLIT encoder/decoder --- src/encoding/byte_stream_split/decoder.rs | 57 +++++++++++++++++++++++ src/encoding/byte_stream_split/encoder.rs | 17 +++++++ src/encoding/byte_stream_split/mod.rs | 33 +++++++++++++ src/encoding/mod.rs | 1 + 4 files changed, 108 insertions(+) create mode 100644 src/encoding/byte_stream_split/decoder.rs create mode 100644 src/encoding/byte_stream_split/encoder.rs create mode 100644 src/encoding/byte_stream_split/mod.rs diff --git a/src/encoding/byte_stream_split/decoder.rs b/src/encoding/byte_stream_split/decoder.rs new file mode 100644 index 00000000..b519662b --- /dev/null +++ b/src/encoding/byte_stream_split/decoder.rs @@ -0,0 +1,57 @@ +use std::marker::PhantomData; +use crate::error::Error; +use crate::types::NativeType; + +/// Decodes according to [Byte Stream Split](https://github.com/apache/parquet-format/blob/master/Encodings.md#byte-stream-split-byte_stream_split--9). +/// # Implementation +/// This struct does not allocate on the heap. +#[derive(Debug)] +pub struct Decoder<'a, T: NativeType> { + values: &'a [u8], + num_elements: usize, + current: usize, + element_size: usize, + element_type: PhantomData +} + +impl<'a, T: NativeType> Decoder<'a, T> { + pub fn new(values: &'a [u8]) -> Self { + let element_size = std::mem::size_of::(); + let num_elements = values.len() / element_size; + Self { + values, + num_elements, + current: 0, + element_size, + element_type: PhantomData + } + } +} + +impl<'a, T: NativeType> Iterator for Decoder<'a, T> { + type Item = Result; + + #[inline] + fn next(&mut self) -> Option { + if self.current >= self.num_elements { + return None + } + + let mut buffer = vec![0_u8; self.element_size]; + + for n in 0..self.element_size { + buffer[n] = self.values[(self.num_elements * n) + self.current] + } + + let value = T::from_le_bytes(buffer.as_slice().try_into().unwrap()); + + self.current += 1; + + return Some(Ok(value)); + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (self.num_elements, Some(self.num_elements)) + } +} diff --git a/src/encoding/byte_stream_split/encoder.rs b/src/encoding/byte_stream_split/encoder.rs new file mode 100644 index 00000000..76db2671 --- /dev/null +++ b/src/encoding/byte_stream_split/encoder.rs @@ -0,0 +1,17 @@ +use crate::types::NativeType; + +/// Encodes an array of NativeType according to BYTE_STREAM_SPLIT +pub fn encode(data: &[T], buffer: &mut Vec) { + let element_size = std::mem::size_of::(); + let num_elements = data.len(); + let total_length = element_size * num_elements; + buffer.reserve(total_length); + + for (i, v) in data.iter().enumerate() { + let value_bytes = v.to_le_bytes(); + let value_bytes_ref = value_bytes.as_ref(); + for n in 0..element_size { + buffer[(num_elements * n) + i] = value_bytes_ref[n]; + } + } +} diff --git a/src/encoding/byte_stream_split/mod.rs b/src/encoding/byte_stream_split/mod.rs new file mode 100644 index 00000000..d2624289 --- /dev/null +++ b/src/encoding/byte_stream_split/mod.rs @@ -0,0 +1,33 @@ +mod decoder; +mod encoder; + +pub use decoder::Decoder; +pub use encoder::encode; + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + + #[test] + fn basic() -> Result<(), Error> { + let data = vec![1.0_f32, 2.0_f32, 3.0_f32]; + let mut buffer = vec![]; + encode(&data, &mut buffer); + + let mut decoder = Decoder::try_new(&buffer)?; + let prefixes = decoder.by_ref().collect::, _>>()?; + assert_eq!(prefixes, vec![0, 3]); + + // move to the lengths + let mut decoder = decoder.into_lengths()?; + + let lengths = decoder.by_ref().collect::, _>>()?; + assert_eq!(lengths, vec![5, 7]); + + // move to the values + let values = decoder.values(); + assert_eq!(values, b"Helloicopter"); + Ok(()) + } +} diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index b24aaafb..ccfe2029 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -1,6 +1,7 @@ use std::convert::TryInto; pub mod bitpacked; +pub mod byte_stream_split; pub mod delta_bitpacked; pub mod delta_byte_array; pub mod delta_length_byte_array; From 831252965d5ba376265ac19f1b0fd464906b9546 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Thu, 16 Mar 2023 23:04:05 +0000 Subject: [PATCH 2/4] More tests, check input size --- src/encoding/byte_stream_split/decoder.rs | 10 +++++-- src/encoding/byte_stream_split/encoder.rs | 2 +- src/encoding/byte_stream_split/mod.rs | 34 ++++++++++++++++------- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/encoding/byte_stream_split/decoder.rs b/src/encoding/byte_stream_split/decoder.rs index b519662b..fede1487 100644 --- a/src/encoding/byte_stream_split/decoder.rs +++ b/src/encoding/byte_stream_split/decoder.rs @@ -15,16 +15,20 @@ pub struct Decoder<'a, T: NativeType> { } impl<'a, T: NativeType> Decoder<'a, T> { - pub fn new(values: &'a [u8]) -> Self { + pub fn try_new(values: &'a [u8]) -> Result { let element_size = std::mem::size_of::(); + let values_size = values.len(); + if values_size % element_size != 0 { + return Err(Error::oos("Value array is not a multiple of element size")); + } let num_elements = values.len() / element_size; - Self { + Ok(Self { values, num_elements, current: 0, element_size, element_type: PhantomData - } + }) } } diff --git a/src/encoding/byte_stream_split/encoder.rs b/src/encoding/byte_stream_split/encoder.rs index 76db2671..b2c5c46e 100644 --- a/src/encoding/byte_stream_split/encoder.rs +++ b/src/encoding/byte_stream_split/encoder.rs @@ -5,7 +5,7 @@ pub fn encode(data: &[T], buffer: &mut Vec) { let element_size = std::mem::size_of::(); let num_elements = data.len(); let total_length = element_size * num_elements; - buffer.reserve(total_length); + buffer.resize(total_length, 0); for (i, v) in data.iter().enumerate() { let value_bytes = v.to_le_bytes(); diff --git a/src/encoding/byte_stream_split/mod.rs b/src/encoding/byte_stream_split/mod.rs index d2624289..b740ff44 100644 --- a/src/encoding/byte_stream_split/mod.rs +++ b/src/encoding/byte_stream_split/mod.rs @@ -15,19 +15,33 @@ mod tests { let mut buffer = vec![]; encode(&data, &mut buffer); - let mut decoder = Decoder::try_new(&buffer)?; - let prefixes = decoder.by_ref().collect::, _>>()?; - assert_eq!(prefixes, vec![0, 3]); + let mut decoder = Decoder::::try_new(&buffer).unwrap(); + let values = decoder.by_ref().collect::, _>>()?; - // move to the lengths - let mut decoder = decoder.into_lengths()?; + assert_eq!(data, values); - let lengths = decoder.by_ref().collect::, _>>()?; - assert_eq!(lengths, vec![5, 7]); + Ok(()) + } + + #[test] + fn from_pyarrow_page() -> Result<(), Error> { + let buffer = vec![0, 205, 0, 205, 0, 0, 204, 0, 204, 0, 128, 140, 0, 140, 128, 255, 191, 0, 63, 127]; + + let mut decoder = Decoder::::try_new(&buffer).unwrap(); + let values = decoder.by_ref().collect::, _>>()?; + + assert_eq!(values, vec![-f32::INFINITY, -1.1, 0.0, 1.1, f32::INFINITY]); + + Ok(()) + } + + #[test] + fn fails_for_bad_size() -> Result<(), Error> { + let buffer = vec![0; 12]; + + let result = Decoder::::try_new(&buffer); + assert!(result.is_err()); - // move to the values - let values = decoder.values(); - assert_eq!(values, b"Helloicopter"); Ok(()) } } From 5ca4bac615f70950b082723bd3f90dba5ce43b14 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Fri, 17 Mar 2023 08:38:54 +0000 Subject: [PATCH 3/4] No need to unwrap --- src/encoding/byte_stream_split/mod.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/encoding/byte_stream_split/mod.rs b/src/encoding/byte_stream_split/mod.rs index b740ff44..33731877 100644 --- a/src/encoding/byte_stream_split/mod.rs +++ b/src/encoding/byte_stream_split/mod.rs @@ -15,7 +15,7 @@ mod tests { let mut buffer = vec![]; encode(&data, &mut buffer); - let mut decoder = Decoder::::try_new(&buffer).unwrap(); + let mut decoder = Decoder::::try_new(&buffer)?; let values = decoder.by_ref().collect::, _>>()?; assert_eq!(data, values); @@ -24,10 +24,12 @@ mod tests { } #[test] - fn from_pyarrow_page() -> Result<(), Error> { - let buffer = vec![0, 205, 0, 205, 0, 0, 204, 0, 204, 0, 128, 140, 0, 140, 128, 255, 191, 0, 63, 127]; + fn pyarrow_integration() -> Result<(), Error> { + let buffer = vec![ + 0, 205, 0, 205, 0, 0, 204, 0, 204, 0, 128, 140, 0, 140, 128, 255, 191, 0, 63, 127 + ]; - let mut decoder = Decoder::::try_new(&buffer).unwrap(); + let mut decoder = Decoder::::try_new(&buffer)?; let values = decoder.by_ref().collect::, _>>()?; assert_eq!(values, vec![-f32::INFINITY, -1.1, 0.0, 1.1, f32::INFINITY]); From 91205cb1d45889d1d3384214b1262aef619f559d Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Thu, 13 Apr 2023 10:17:33 +0100 Subject: [PATCH 4/4] Allocate a buffer on the heap once --- src/encoding/byte_stream_split/decoder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/encoding/byte_stream_split/decoder.rs b/src/encoding/byte_stream_split/decoder.rs index fede1487..dce1b254 100644 --- a/src/encoding/byte_stream_split/decoder.rs +++ b/src/encoding/byte_stream_split/decoder.rs @@ -8,6 +8,7 @@ use crate::types::NativeType; #[derive(Debug)] pub struct Decoder<'a, T: NativeType> { values: &'a [u8], + buffer: Vec, num_elements: usize, current: usize, element_size: usize, @@ -24,6 +25,7 @@ impl<'a, T: NativeType> Decoder<'a, T> { let num_elements = values.len() / element_size; Ok(Self { values, + buffer: vec![0_u8; element_size], num_elements, current: 0, element_size, @@ -41,13 +43,11 @@ impl<'a, T: NativeType> Iterator for Decoder<'a, T> { return None } - let mut buffer = vec![0_u8; self.element_size]; - for n in 0..self.element_size { - buffer[n] = self.values[(self.num_elements * n) + self.current] + self.buffer[n] = self.values[(self.num_elements * n) + self.current] } - let value = T::from_le_bytes(buffer.as_slice().try_into().unwrap()); + let value = T::from_le_bytes(self.buffer.as_slice().try_into().unwrap()); self.current += 1;