From 7e090568716633aa1cd647287f8557e920c93f57 Mon Sep 17 00:00:00 2001 From: Fernando Elizalde Date: Mon, 15 Aug 2022 08:49:46 +0100 Subject: [PATCH] primary page decoder --- parquet-tools/Cargo.toml | 4 +- parquet-tools/src/lib/dump.rs | 186 ++++++++++++++++------------------ 2 files changed, 92 insertions(+), 98 deletions(-) diff --git a/parquet-tools/Cargo.toml b/parquet-tools/Cargo.toml index 982d5fcd3..d4f221d72 100644 --- a/parquet-tools/Cargo.toml +++ b/parquet-tools/Cargo.toml @@ -15,5 +15,5 @@ name = "parquet_tools" path = "src/main.rs" [dependencies] -parquet2 = { version = "0.14", path = "../" } -clap = {version = "2.33", features = ["yaml"]} +parquet2 = { version = "0.15", path = "../" } +clap = {version = "3.2.17", features = ["yaml"]} diff --git a/parquet-tools/src/lib/dump.rs b/parquet-tools/src/lib/dump.rs index ee2a7972e..f0dedc595 100644 --- a/parquet-tools/src/lib/dump.rs +++ b/parquet-tools/src/lib/dump.rs @@ -1,15 +1,40 @@ //! Subcommand `dump`. This subcommand shows the parquet metadata information use parquet2::{ - page::{ - BinaryPageDict, DataPageHeader, DictPage, FixedLenByteArrayPageDict, PrimitivePageDict, - }, - read::{get_page_iterator, read_metadata}, + error::{Error, Result}, + page::Page, + read::{decompress, get_page_iterator, read_metadata}, schema::types::PhysicalType, + types::{decode, NativeType}, }; -use std::{fs::File, io::Write, path::Path, sync::Arc}; +use std::{fs::File, io::Write, path::Path}; -use crate::{Result, SEPARATOR}; +use crate::SEPARATOR; + +pub struct PrimitivePageDict { + values: Vec, +} + +impl PrimitivePageDict { + pub fn new(values: Vec) -> Self { + Self { values } + } + + pub fn values(&self) -> &[T] { + &self.values + } + + #[inline] + pub fn value(&self, index: usize) -> Result<&T> { + let a = self.values.get(index).ok_or_else(|| { + Error::OutOfSpec( + "The data page has an index larger than the dictionary page values".to_string(), + ) + }); + + a + } +} // Dumps data from the file. // The function prints a sample of the data from each of the RowGroups. @@ -40,7 +65,7 @@ where for (i, group) in metadata.row_groups.iter().enumerate() { writeln!( writer, - "Group: {:<10}Rows: {:<15}Bytes: {:}", + "Group: {:<10}Rows: {:<15} Bytes: {:}", i, group.num_rows(), group.total_byte_size() @@ -49,42 +74,43 @@ where for column in &columns { let column_meta = &group.columns()[*column]; - let iter = - get_page_iterator(column_meta, &mut file, None, Vec::with_capacity(4 * 1024))?; + let iter = get_page_iterator( + column_meta, + &mut file, + None, + Vec::with_capacity(4 * 1024), + 1024 * 1024, + )?; + + let mut decompress_buffer = vec![]; for (page_ind, page) in iter.enumerate() { let page = page?; - writeln!( - writer, - "\nPage: {:<10}Column: {:<15} Bytes: {:}", - page_ind, - column, - page.uncompressed_size() - )?; - let (dict, msg_type) = match page.header() { - DataPageHeader::V1(_) => { - if let Some(dict) = page.dictionary_page { - (dict, "PageV1") - } else { - continue; - } - } - DataPageHeader::V2(_) => { - if let Some(dict) = page.dictionary_page { - (dict, "PageV2") - } else { - continue; - } - } - }; - writeln!( - writer, - "Compressed page: {:<15} Physical type: {:?}", - msg_type, - dict.physical_type() - )?; + writeln!(writer, "\nPage: {:<10}Column: {:<15}", page_ind, column,)?; - print_dictionary(dict, sample_size, writer)?; + let page = decompress(page, &mut decompress_buffer)?; + match page { + Page::Dict(_) => { + todo!() + } + Page::Data(page) => { + match page.descriptor.primitive_type.physical_type { + PhysicalType::Int32 => { + print_page::(page.buffer(), sample_size, true, writer)? + } + PhysicalType::Int64 => { + print_page::(page.buffer(), sample_size, true, writer)? + } + PhysicalType::Float => { + print_page::(page.buffer(), sample_size, true, writer)? + } + PhysicalType::Double => { + print_page::(page.buffer(), sample_size, true, writer)? + } + _ => continue, + }; + } + } } } } @@ -92,66 +118,34 @@ where Ok(()) } -fn print_dictionary(dict: Arc, sample_size: usize, writer: &mut W) -> Result<()> +pub fn read( + buf: &[u8], + num_values: usize, + _is_sorted: bool, +) -> Result> { + let size_of = std::mem::size_of::(); + + let typed_size = num_values.wrapping_mul(size_of); + + let values = buf.get(..typed_size).ok_or_else(|| { + Error::OutOfSpec( + "The number of values declared in the dict page does not match the length of the page" + .to_string(), + ) + })?; + + let values = values.chunks_exact(size_of).map(decode::).collect(); + + Ok(PrimitivePageDict::new(values)) +} + +fn print_page(buffer: &[u8], sample_size: usize, sorted: bool, writer: &mut W) -> Result<()> where + T: NativeType, W: Write, { - match dict.physical_type() { - PhysicalType::Boolean => { - writeln!(writer, "Boolean physical type cannot be dictionary-encoded")?; - } - PhysicalType::Int32 => { - if let Some(res) = dict.as_any().downcast_ref::>() { - print_iterator(res.values().iter(), sample_size, writer)?; - } - } - PhysicalType::Int64 => { - if let Some(res) = dict.as_any().downcast_ref::>() { - print_iterator(res.values().iter(), sample_size, writer)?; - } - } - PhysicalType::Int96 => { - if let Some(res) = dict.as_any().downcast_ref::>() { - print_iterator(res.values().iter(), sample_size, writer)?; - } - } - PhysicalType::Float => { - if let Some(res) = dict.as_any().downcast_ref::>() { - print_iterator(res.values().iter(), sample_size, writer)?; - } - } - PhysicalType::Double => { - if let Some(res) = dict.as_any().downcast_ref::>() { - print_iterator(res.values().iter(), sample_size, writer)?; - } - } - PhysicalType::ByteArray => { - if let Some(res) = dict.as_any().downcast_ref::() { - for (i, pair) in res.offsets().windows(2).enumerate().take(sample_size) { - let bytes = &res.values()[pair[0] as usize..pair[1] as usize]; - let msg = String::from_utf8_lossy(bytes); - - writeln!(writer, "Value: {:<10}\t{:?}", i, msg)?; - } - } - } - PhysicalType::FixedLenByteArray(size) => { - if let Some(res) = dict.as_any().downcast_ref::() { - for (i, bytes) in res - .values() - .chunks(*size as usize) - .enumerate() - .take(sample_size) - { - let msg = String::from_utf8_lossy(bytes); - - writeln!(writer, "Value: {:<10}\t{:?}", i, msg)?; - } - } - } - } - - Ok(()) + let dict = read::(buffer, sample_size, sorted)?; + print_iterator(dict.values().iter(), sample_size, writer) } fn print_iterator(iter: I, sample_size: usize, writer: &mut W) -> Result<()>