From ea138a92a02f1f8c8326c879be3cdcd74439cf94 Mon Sep 17 00:00:00 2001 From: Maxime Dion Date: Mon, 19 Aug 2024 13:57:19 -0500 Subject: [PATCH 1/3] first steps to allow one dimensional reads of multi dim replicated data --- src/async_reader/zarr_read_async.rs | 148 ++++++++++++++++++++--- src/lib.rs | 15 +++ src/reader/metadata.rs | 178 ++++++++++++++++++++++++---- src/reader/zarr_read.rs | 127 ++++++++++++++++++-- 4 files changed, 412 insertions(+), 56 deletions(-) diff --git a/src/async_reader/zarr_read_async.rs b/src/async_reader/zarr_read_async.rs index d5c8776..d28805e 100644 --- a/src/async_reader/zarr_read_async.rs +++ b/src/async_reader/zarr_read_async.rs @@ -64,28 +64,41 @@ impl ZarrPath { impl<'a> ZarrReadAsync<'a> for ZarrPath { async fn get_zarr_metadata(&self) -> ZarrResult { let mut meta = ZarrStoreMetadata::new(); - let stream = self.store.list(Some(&self.location)); + let stream = self.store.list(Some(&self.location)); pin_mut!(stream); + + let mut meta_strs: Vec<(String, String)> = Vec::new(); + let mut attrs_map: HashMap = HashMap::new(); while let Some(p) = stream.next().await { let p = p?.location; if let Some(s) = p.filename() { - if s == ".zarray" || s == "zarr.json" { + // parse the file with the array metadata or the attributes (only for zarr v2 for the latter) + if s == ".zarray" || s == "zarr.json" || s == ".zattrs" { if let Some(mut dir_name) = p.prefix_match(&self.location) { let array_name = dir_name.next().unwrap().as_ref().to_string(); let get_res = self.store.get(&p).await?; - let meta_str = match get_res.payload { + let data_str = match get_res.payload { GetResultPayload::File(_, p) => read_to_string(p)?, GetResultPayload::Stream(_) => { std::str::from_utf8(&get_res.bytes().await?)?.to_string() } }; - meta.add_column(array_name, &meta_str)?; + match s { + ".zarray" | "zarr.json" => meta_strs.push((array_name, data_str)), + ".zattrs" => _ = attrs_map.insert(array_name, data_str), + _ => {} + }; } } } } + for (array_name, meta_str) in 
meta_strs { + let attrs = attrs_map.get(&array_name).map(|x| x.as_str()); + meta.add_column(array_name, &meta_str, attrs)?; + } + if meta.get_num_columns() == 0 { return Err(ZarrError::InvalidMetadata( "Could not find valid metadata in zarr store".to_string(), @@ -157,26 +170,21 @@ impl<'a> ZarrReadAsync<'a> for ZarrPath { #[cfg(test)] mod zarr_read_async_tests { - use object_store::{local::LocalFileSystem, path::Path}; + use object_store::path::Path; use std::collections::HashSet; - use std::path::PathBuf; use std::sync::Arc; use super::*; - use crate::reader::codecs::{Endianness, ZarrCodec, ZarrDataType}; + use crate::reader::codecs::{ + BloscOptions, CompressorName, Endianness, ShuffleOptions, ZarrCodec, ZarrDataType, + }; use crate::reader::metadata::{ChunkSeparator, ZarrArrayMetadata}; use crate::reader::ZarrProjection; - - fn get_test_data_file_system() -> LocalFileSystem { - LocalFileSystem::new_with_prefix( - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/data/zarr/v2_data"), - ) - .unwrap() - } + use crate::tests::{get_test_v2_data_file_system, get_test_v3_data_file_system}; #[tokio::test] - async fn read_metadata() { - let file_sys = get_test_data_file_system(); + async fn read_v2_metadata() { + let file_sys = get_test_v2_data_file_system(); let p = Path::parse("raw_bytes_example.zarr").unwrap(); let store = ZarrPath::new(Arc::new(file_sys), p); @@ -211,9 +219,113 @@ mod zarr_read_async_tests { ); } + // read the store metadata, which includes one dim represenations of some variables, + // given a path to a zarr store. 
+ #[tokio::test] + async fn read_v2_metadata_w_one_dim_repr() { + let file_sys = get_test_v2_data_file_system(); + let p = Path::parse("lat_lon_example_w_1d_repr.zarr").unwrap(); + + let store = ZarrPath::new(Arc::new(file_sys), p); + let meta = store.get_zarr_metadata().await.unwrap(); + + // check the one dim repr for the lat + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!( + meta.get_one_dim_repr_meta("lat").unwrap().2, + ZarrArrayMetadata::new( + 2, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ + ZarrCodec::Bytes(Endianness::Little), + ZarrCodec::BloscCompressor(BloscOptions::new( + CompressorName::Lz4, + 5, + ShuffleOptions::ByteShuffle(8), + 0, + )), + ], + ) + ); + + // check the one dim repr for the lon + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!( + meta.get_one_dim_repr_meta("lon").unwrap().2, + ZarrArrayMetadata::new( + 2, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ + ZarrCodec::Bytes(Endianness::Little), + ZarrCodec::BloscCompressor(BloscOptions::new( + CompressorName::Lz4, + 5, + ShuffleOptions::ByteShuffle(8), + 0, + )), + ], + ) + ); + } + + #[tokio::test] + async fn read_v3_metadata_w_one_dim_repr() { + let file_sys = get_test_v3_data_file_system(); + let p = Path::parse("with_one_d_repr.zarr").unwrap(); + + let store = ZarrPath::new(Arc::new(file_sys), p); + let meta = store.get_zarr_metadata().await.unwrap(); + + // check the one dim repr for the lat + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!( + meta.get_one_dim_repr_meta("lat").unwrap().2, + ZarrArrayMetadata::new( + 3, + ZarrDataType::Float(8), 
+ ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ZarrCodec::Bytes(Endianness::Little)], + ) + ); + + // check the one dim repr for the lon + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!( + meta.get_one_dim_repr_meta("lon").unwrap().2, + ZarrArrayMetadata::new( + 3, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ZarrCodec::Bytes(Endianness::Little)], + ) + ); + } + #[tokio::test] - async fn read_raw_chunks() { - let file_sys = get_test_data_file_system(); + async fn read_v2_raw_chunks() { + let file_sys = get_test_v2_data_file_system(); let p = Path::parse("raw_bytes_example.zarr").unwrap(); let store = ZarrPath::new(Arc::new(file_sys), p); diff --git a/src/lib.rs b/src/lib.rs index 158bf7f..425ef5e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ pub mod datafusion; #[cfg(test)] mod tests { + use object_store::local::LocalFileSystem; use std::path::PathBuf; #[cfg(feature = "datafusion")] @@ -37,4 +38,18 @@ mod tests { .join("test-data/data/zarr/v3_data") .join(zarr_array) } + + pub(crate) fn get_test_v2_data_file_system() -> LocalFileSystem { + LocalFileSystem::new_with_prefix( + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/data/zarr/v2_data"), + ) + .unwrap() + } + + pub(crate) fn get_test_v3_data_file_system() -> LocalFileSystem { + LocalFileSystem::new_with_prefix( + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test-data/data/zarr/v3_data"), + ) + .unwrap() + } } diff --git a/src/reader/metadata.rs b/src/reader/metadata.rs index a61d945..5eb712b 100644 --- a/src/reader/metadata.rs +++ b/src/reader/metadata.rs @@ -212,6 +212,13 @@ pub struct ZarrStoreMetadata { shape: Option>, last_chunk_idx: Option>, array_params: HashMap, + + // This can be used to use a 1D representation of 2D or 3D data, when + // for example the data is 
a coordinate that is just duplicated along + // other dimensions. The usize is the dimension position (e.g. for + // lat by lon array, the lat would be 0, the lon would be 1) and the + // array metadata is for the files with the 1D representation. + one_dim_representations: HashMap, } impl ZarrStoreMetadata { @@ -223,6 +230,7 @@ impl ZarrStoreMetadata { shape: None, last_chunk_idx: None, array_params: HashMap::new(), + one_dim_representations: HashMap::new(), } } @@ -343,20 +351,56 @@ fn extract_compressor_params_v2( Ok(comp) } +fn extract_one_dim_representation( + params_map: &serde_json::Value, +) -> ZarrResult> { + let error_string = "error parsing one dimension representation"; + let params = params_map.get("one_dim_representation_of"); + if let Some(params) = params { + let col_name = extract_string_from_json(params, "col_name", error_string)?; + let dim_num = extract_u64_from_json(params, "dimension_number", error_string)? as usize; + return Ok(Some((dim_num, col_name))); + } + + Ok(None) +} + // Method to populate zarr metadata from zarr arrays metadata, using // version 2 of the zarr format. 
impl ZarrStoreMetadata { - fn add_column_v2(&mut self, col_name: String, meta_map: Value) -> ZarrResult<()> { + fn add_column_v2( + &mut self, + col_name: String, + meta_map: Value, + attrs_map: Option, + ) -> ZarrResult<()> { + // check if the column being added is a one dim representation on another array + let mut one_dim_repr = None; + if let Some(attrs_map) = attrs_map { + one_dim_repr = extract_one_dim_representation(&attrs_map)?; + } + // parse chunks let error_string = "error parsing metadata chunks"; - let chunks = extract_arr_and_check(&meta_map, "chunks", error_string, &self.chunks)?; - if self.chunks.is_none() { + let chnk_target = if one_dim_repr.is_some() { + &None + } else { + &self.chunks + }; + let chunks = extract_arr_and_check(&meta_map, "chunks", error_string, chnk_target)?; + + if self.chunks.is_none() && one_dim_repr.is_none() { self.chunks = Some(chunks.clone()); } // parse shape let error_string = "error parsing metadata shape"; - let shape = extract_arr_and_check(&meta_map, "shape", error_string, &self.shape)?; + let shp_target = if one_dim_repr.is_some() { + &None + } else { + &self.shape + }; + let shape = extract_arr_and_check(&meta_map, "shape", error_string, shp_target)?; if chunks.len() != shape.len() { return Err(ZarrError::InvalidMetadata( @@ -364,7 +408,7 @@ impl ZarrStoreMetadata { )); } - if self.shape.is_none() { + if self.shape.is_none() && one_dim_repr.is_none() { self.shape = Some(shape.clone()); } @@ -403,7 +447,7 @@ impl ZarrStoreMetadata { // parser order let error_string = "error parsing metadata order"; - let dim = self.shape.as_ref().unwrap().len(); + let dim = shape.len(); let order = extract_string_from_json(&meta_map, "order", error_string)?; let order = order.chars().next().unwrap(); @@ -444,6 +488,13 @@ impl ZarrStoreMetadata { codecs, }; + // insert differently is the column is a one dim representation + if let Some((dim_num, full_var_name)) = one_dim_repr { + self.one_dim_representations + .insert(full_var_name, 
(dim_num, col_name, array_meta)); + return Ok(()); + } + self.columns.push(col_name.to_string()); self.array_params.insert(col_name, array_meta); @@ -588,6 +639,15 @@ fn extract_sharding_options( // version 3 of the zarr format. impl ZarrStoreMetadata { fn add_column_v3(&mut self, col_name: String, meta_map: Value) -> ZarrResult<()> { + // check if this array is a one dimension representation of a different column + let mut one_dim_repr = None; + let attrbs = meta_map.get("attributes"); + if let Some(attrbs) = attrbs { + if let Some((dim_num, full_var_name)) = extract_one_dim_representation(attrbs)? { + one_dim_repr = Some((dim_num, full_var_name)) + } + } + // verify the metadata is for an array. let error_string = "error parsing node type from metadata"; let node_type = extract_string_from_json(&meta_map, "node_type", error_string)?; @@ -599,8 +659,13 @@ impl ZarrStoreMetadata { // parse shape let error_string = "error parsing metadata shape"; - let shape = extract_arr_and_check(&meta_map, "shape", error_string, &self.shape)?; - if self.shape.is_none() { + let shp_target = if one_dim_repr.is_some() { + &None + } else { + &self.shape + }; + let shape = extract_arr_and_check(&meta_map, "shape", error_string, shp_target)?; + if self.shape.is_none() && one_dim_repr.is_none() { self.shape = Some(shape.clone()); } @@ -617,8 +682,13 @@ impl ZarrStoreMetadata { "only regular chunks are supported".to_string(), )); } - let chunks = extract_arr_and_check(config, "chunk_shape", error_string, &self.chunks)?; - if self.chunks.is_none() { + let chnk_target = if one_dim_repr.is_some() { + &None + } else { + &self.chunks + }; + let chunks = extract_arr_and_check(config, "chunk_shape", error_string, chnk_target)?; + if self.chunks.is_none() && one_dim_repr.is_none() { self.chunks = Some(chunks.clone()); } @@ -700,6 +770,13 @@ impl ZarrStoreMetadata { let array_meta = ZarrArrayMetadata::new(3, data_type, chunk_key_encoding, sharding_options, codecs); + // insert differently is the 
column is a one dim representation + if let Some((dim_num, full_var_name)) = one_dim_repr { + self.one_dim_representations + .insert(full_var_name, (dim_num, col_name, array_meta)); + return Ok(()); + } + self.columns.push(col_name.to_string()); self.array_params.insert(col_name, array_meta); @@ -710,7 +787,12 @@ impl ZarrStoreMetadata { // Method to populate zarr metadata from zarr arrays metadata, works with either // zarr_format version 2 or 3. impl ZarrStoreMetadata { - pub(crate) fn add_column(&mut self, col_name: String, metadata_str: &str) -> ZarrResult<()> { + pub(crate) fn add_column( + &mut self, + col_name: String, + metadata_str: &str, + attrbs_map: Option<&str>, + ) -> ZarrResult<()> { let meta_map: Value = serde_json::from_str(metadata_str).or(Err( ZarrError::InvalidMetadata("could not parse metadata string".to_string()), ))?; @@ -719,9 +801,21 @@ impl ZarrStoreMetadata { match version { 2 => { - self.add_column_v2(col_name, meta_map)?; + let attrbs_map: Option = if let Some(attrbs_map) = attrbs_map { + serde_json::from_str(attrbs_map).or(Err(ZarrError::InvalidMetadata( + "could not parse aatributes string".to_string(), + )))? + } else { + None + }; + self.add_column_v2(col_name, meta_map, attrbs_map)?; } 3 => { + if attrbs_map.is_some() { + return Err(ZarrError::InvalidMetadata( + "zarr v3 does not support a separate attribute map".to_string(), + )); + } self.add_column_v3(col_name, meta_map)?; } _ => { @@ -753,6 +847,18 @@ impl ZarrStoreMetadata { ))) } + pub(crate) fn get_one_dim_repr_meta( + &self, + column: &str, + ) -> ZarrResult<&(usize, String, ZarrArrayMetadata)> { + self.one_dim_representations + .get(column) + .ok_or(ZarrError::InvalidMetadata(format!( + "Cannot find variable {} in one dimensional representations", + column + ))) + } + // return the real dimensions of a chhunk, given its position, taking into // account that it can be at the "edge" of the array for one or more dimension. 
pub(crate) fn get_real_dims(&self, pos: &[usize]) -> Vec { @@ -839,7 +945,8 @@ mod zarr_metadata_v3_tests { "shuffle": 1 } }"#; - meta.add_column("var1".to_string(), metadata_str).unwrap(); + meta.add_column("var1".to_string(), metadata_str, None) + .unwrap(); let metadata_str = r#" { @@ -855,7 +962,8 @@ mod zarr_metadata_v3_tests { "shuffle": 2 } }"#; - meta.add_column("var2".to_string(), metadata_str).unwrap(); + meta.add_column("var2".to_string(), metadata_str, None) + .unwrap(); let metadata_str = r#" { @@ -866,7 +974,8 @@ mod zarr_metadata_v3_tests { "order": "F", "compressor": null }"#; - meta.add_column("var3".to_string(), metadata_str).unwrap(); + meta.add_column("var3".to_string(), metadata_str, None) + .unwrap(); let metadata_str = r#" { @@ -882,7 +991,8 @@ mod zarr_metadata_v3_tests { "shuffle": 1 } }"#; - meta.add_column("var4".to_string(), metadata_str).unwrap(); + meta.add_column("var4".to_string(), metadata_str, None) + .unwrap(); assert_eq!(meta.chunks, Some(vec![10, 10])); assert_eq!(meta.shape, Some(vec![100, 100])); @@ -989,7 +1099,8 @@ mod zarr_metadata_v3_tests { "shuffle": 1 } }"#; - meta.add_column("var".to_string(), metadata_str).unwrap(); + meta.add_column("var".to_string(), metadata_str, None) + .unwrap(); assert_eq!(meta.last_chunk_idx, Some(vec![8, 8])) } @@ -1014,7 +1125,9 @@ mod zarr_metadata_v3_tests { "shuffle": 2 } }"#; - assert!(meta.add_column("var".to_string(), metadata_str).is_err()); + assert!(meta + .add_column("var".to_string(), metadata_str, None) + .is_err()); // invalid compressor let metadata_str = r#" @@ -1031,7 +1144,9 @@ mod zarr_metadata_v3_tests { "shuffle": 2 } }"#; - assert!(meta.add_column("var".to_string(), metadata_str).is_err()); + assert!(meta + .add_column("var".to_string(), metadata_str, None) + .is_err()); // mismatch between chunks // first let's create one valid array metadata @@ -1049,7 +1164,8 @@ mod zarr_metadata_v3_tests { "shuffle": 1 } }"#; - meta.add_column("var1".to_string(), 
metadata_str).unwrap(); + meta.add_column("var1".to_string(), metadata_str, None) + .unwrap(); let metadata_str = r#" { @@ -1065,7 +1181,9 @@ mod zarr_metadata_v3_tests { "shuffle": 1 } }"#; - assert!(meta.add_column("var2".to_string(), metadata_str).is_err()); + assert!(meta + .add_column("var2".to_string(), metadata_str, None) + .is_err()); // mismatch between shapes let metadata_str = r#" @@ -1082,7 +1200,9 @@ mod zarr_metadata_v3_tests { "shuffle": 1 } }"#; - assert!(meta.add_column("var2".to_string(), metadata_str).is_err()); + assert!(meta + .add_column("var2".to_string(), metadata_str, None) + .is_err()); } #[test] @@ -1109,7 +1229,8 @@ mod zarr_metadata_v3_tests { "zarr_format": 3, "node_type": "array" }"#; - meta.add_column("var1".to_string(), metadata_str).unwrap(); + meta.add_column("var1".to_string(), metadata_str, None) + .unwrap(); assert_eq!(meta.chunks, Some(vec![4, 4])); assert_eq!(meta.shape, Some(vec![16, 16])); @@ -1178,7 +1299,8 @@ mod zarr_metadata_v3_tests { "zarr_format": 3, "node_type": "array" }"#; - meta.add_column("var2".to_string(), metadata_str).unwrap(); + meta.add_column("var2".to_string(), metadata_str, None) + .unwrap(); assert_eq!(meta.chunks, Some(vec![8, 8])); assert_eq!(meta.shape, Some(vec![16, 16])); @@ -1243,7 +1365,9 @@ mod zarr_metadata_v3_tests { "zarr_format": 3, "node_type": "array" }"#; - assert!(meta.add_column("var1".to_string(), metadata_str).is_err()); + assert!(meta + .add_column("var1".to_string(), metadata_str, None) + .is_err()); // mismatch between shape and chunks let metadata_str = r#" @@ -1265,6 +1389,8 @@ mod zarr_metadata_v3_tests { "zarr_format": 3, "node_type": "array" }"#; - assert!(meta.add_column("var2".to_string(), metadata_str).is_err()); + assert!(meta + .add_column("var2".to_string(), metadata_str, None) + .is_err()); } } diff --git a/src/reader/zarr_read.rs b/src/reader/zarr_read.rs index 1b86426..c4c7ee7 100644 --- a/src/reader/zarr_read.rs +++ b/src/reader/zarr_read.rs @@ -274,6 +274,14 @@ 
impl ZarrRead for PathBuf { p = dir_entry.path().join("zarr.json"); } + // check if there's a file with attributes (only for v2) + let mut attrs: Option = None; + let attrs_path = dir_entry.path().join(".zattrs"); + if attrs_path.exists() { + let attrs_str = read_to_string(attrs_path)?; + attrs = Some(attrs_str.to_string()); + } + if p.exists() { let meta_str = read_to_string(p)?; meta.add_column( @@ -285,6 +293,7 @@ impl ZarrRead for PathBuf { .unwrap() .to_string(), &meta_str, + attrs.as_deref(), )?; } } @@ -346,22 +355,18 @@ impl ZarrRead for PathBuf { #[cfg(test)] mod zarr_read_tests { use std::collections::HashSet; - use std::path::PathBuf; use super::*; - use crate::reader::codecs::{Endianness, ZarrCodec, ZarrDataType}; + use crate::reader::codecs::{ + BloscOptions, CompressorName, Endianness, ShuffleOptions, ZarrCodec, ZarrDataType, + }; use crate::reader::metadata::{ChunkSeparator, ZarrArrayMetadata}; - - fn get_test_data_path(zarr_store: String) -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("test-data/data/zarr/v2_data") - .join(zarr_store) - } + use crate::tests::{get_test_v2_data_path, get_test_v3_data_path}; // read the store metadata, given a path to a zarr store. #[test] - fn read_metadata() { - let p = get_test_data_path("raw_bytes_example.zarr".to_string()); + fn read_v2_metadata() { + let p = get_test_v2_data_path("raw_bytes_example.zarr".to_string()); let meta = p.get_zarr_metadata().unwrap(); assert_eq!(meta.get_columns(), &vec!["byte_data", "float_data"]); @@ -393,11 +398,109 @@ mod zarr_read_tests { ); } + // read the store metadata, which includes one dim represenations of some variables, + // given a path to a zarr store. 
+ #[test] + fn read_v2_metadata_w_one_dim_repr() { + let p = get_test_v2_data_path("lat_lon_example_w_1d_repr.zarr".to_string()); + let meta = p.get_zarr_metadata().unwrap(); + + // check the one dim repr for the lat + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!( + meta.get_one_dim_repr_meta("lat").unwrap().2, + ZarrArrayMetadata::new( + 2, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ + ZarrCodec::Bytes(Endianness::Little), + ZarrCodec::BloscCompressor(BloscOptions::new( + CompressorName::Lz4, + 5, + ShuffleOptions::ByteShuffle(8), + 0, + )), + ], + ) + ); + + // check the one dim repr for the lon + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!( + meta.get_one_dim_repr_meta("lon").unwrap().2, + ZarrArrayMetadata::new( + 2, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ + ZarrCodec::Bytes(Endianness::Little), + ZarrCodec::BloscCompressor(BloscOptions::new( + CompressorName::Lz4, + 5, + ShuffleOptions::ByteShuffle(8), + 0, + )), + ], + ) + ); + } + + #[test] + fn read_v3_metadata_w_one_dim_repr() { + let p = get_test_v3_data_path("with_one_d_repr.zarr".to_string()); + let meta = p.get_zarr_metadata().unwrap(); + + // check the one dim repr for the lat + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); + assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!( + meta.get_one_dim_repr_meta("lat").unwrap().2, + ZarrArrayMetadata::new( + 3, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ZarrCodec::Bytes(Endianness::Little)], + ) + ); + + // check the one dim repr for the lon + 
assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); + assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!( + meta.get_one_dim_repr_meta("lon").unwrap().2, + ZarrArrayMetadata::new( + 3, + ZarrDataType::Float(8), + ChunkPattern { + separator: ChunkSeparator::Period, + c_prefix: false + }, + None, + vec![ZarrCodec::Bytes(Endianness::Little)], + ) + ); + } + // read the raw data contained into a zarr store. one of the variables contains // byte data, which we explicitly check here. #[test] - fn read_raw_chunks() { - let p = get_test_data_path("raw_bytes_example.zarr".to_string()); + fn read_v2_raw_chunks() { + let p = get_test_v2_data_path("raw_bytes_example.zarr".to_string()); let meta = p.get_zarr_metadata().unwrap(); // test read from an array where the data is just raw bytes From d987de963830775c2cf29f90467805285640e694 Mon Sep 17 00:00:00 2001 From: Maxime Dion Date: Mon, 9 Sep 2024 09:26:20 -0500 Subject: [PATCH 2/3] added functionality to read 2D or 3D arrays based on their 1D representation, when applicable --- src/async_reader/mod.rs | 93 +++++++++------ src/async_reader/zarr_read_async.rs | 76 ++++++++---- src/reader/codecs.rs | 80 ++++++++++++- src/reader/metadata.rs | 12 +- src/reader/mod.rs | 176 +++++++++++++++++----------- src/reader/zarr_read.rs | 76 +++++++++--- test-data | 2 +- 7 files changed, 360 insertions(+), 155 deletions(-) diff --git a/src/async_reader/mod.rs b/src/async_reader/mod.rs index 1f7a1ac..79e6b7b 100644 --- a/src/async_reader/mod.rs +++ b/src/async_reader/mod.rs @@ -161,6 +161,7 @@ where &cols, self.meta.get_real_dims(pos), self.meta.get_chunk_patterns(), + self.meta.get_one_dim_repr_meta(), ) .await; @@ -757,39 +758,8 @@ mod zarr_async_reader_tests { assert!(matched); } - #[tokio::test] - async fn projection_tests() { - let zp = get_v2_test_data_path("compression_example.zarr".to_string()); - let proj = ZarrProjection::keep(vec!["bool_data".to_string(), "int_data".to_string()]); - let 
stream_builder = ZarrRecordBatchStreamBuilder::new(zp).with_projection(proj); - - let stream = stream_builder.build().await.unwrap(); - let records: Vec<_> = stream.try_collect().await.unwrap(); - - let target_types = HashMap::from([ - ("bool_data".to_string(), DataType::Boolean), - ("int_data".to_string(), DataType::Int64), - ]); - - // center chunk - let rec = &records[4]; - validate_names_and_types(&target_types, rec); - validate_bool_column( - "bool_data", - rec, - &[false, true, false, false, true, false, false, true, false], - ); - validate_primitive_column::( - "int_data", - rec, - &[-4, -3, -2, 4, 5, 6, 12, 13, 14], - ); - } - - #[tokio::test] - async fn filters_tests() { - // set the filters to select part of the raster, based on lat and - // lon coordinates. + // create a test filter + fn create_filter() -> ZarrChunkFilter { let mut filters: Vec> = Vec::new(); let f = ZarrArrowPredicateFn::new( ZarrProjection::keep(vec!["lat".to_string()]), @@ -822,9 +792,42 @@ mod zarr_async_reader_tests { ); filters.push(Box::new(f)); + ZarrChunkFilter::new(filters) + } + + #[tokio::test] + async fn projection_tests() { + let zp = get_v2_test_data_path("compression_example.zarr".to_string()); + let proj = ZarrProjection::keep(vec!["bool_data".to_string(), "int_data".to_string()]); + let stream_builder = ZarrRecordBatchStreamBuilder::new(zp).with_projection(proj); + + let stream = stream_builder.build().await.unwrap(); + let records: Vec<_> = stream.try_collect().await.unwrap(); + + let target_types = HashMap::from([ + ("bool_data".to_string(), DataType::Boolean), + ("int_data".to_string(), DataType::Int64), + ]); + + // center chunk + let rec = &records[4]; + validate_names_and_types(&target_types, rec); + validate_bool_column( + "bool_data", + rec, + &[false, true, false, false, true, false, false, true, false], + ); + validate_primitive_column::( + "int_data", + rec, + &[-4, -3, -2, 4, 5, 6, 12, 13, 14], + ); + } + + #[tokio::test] + async fn filters_tests() { let zp 
= get_v2_test_data_path("lat_lon_example.zarr".to_string()); - let stream_builder = - ZarrRecordBatchStreamBuilder::new(zp).with_filter(ZarrChunkFilter::new(filters)); + let stream_builder = ZarrRecordBatchStreamBuilder::new(zp).with_filter(create_filter()); let stream = stream_builder.build().await.unwrap(); let records: Vec<_> = stream.try_collect().await.unwrap(); @@ -867,6 +870,26 @@ mod zarr_async_reader_tests { ); } + #[tokio::test] + async fn one_dim_repr_tests() { + let zp = get_v2_test_data_path("lat_lon_example_w_1d_repr.zarr".to_string()); + let stream_builder = ZarrRecordBatchStreamBuilder::new(zp).with_filter(create_filter()); + + let stream = stream_builder.build().await.unwrap(); + let records_from_one_d_repr: Vec<_> = stream.try_collect().await.unwrap(); + + let zp = get_v2_test_data_path("lat_lon_example.zarr".to_string()); + let stream_builder = ZarrRecordBatchStreamBuilder::new(zp).with_filter(create_filter()); + + let stream = stream_builder.build().await.unwrap(); + let records: Vec<_> = stream.try_collect().await.unwrap(); + + assert_eq!(records_from_one_d_repr.len(), records.len()); + for (rec, rec_from_one_d_repr) in records.iter().zip(records_from_one_d_repr.iter()) { + assert_eq!(rec, rec_from_one_d_repr); + } + } + #[tokio::test] async fn multiple_readers_tests() { let zp = get_v2_test_data_path("compression_example.zarr".to_string()); diff --git a/src/async_reader/zarr_read_async.rs b/src/async_reader/zarr_read_async.rs index d28805e..1ed63c2 100644 --- a/src/async_reader/zarr_read_async.rs +++ b/src/async_reader/zarr_read_async.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use std::fs::{read, read_to_string}; use std::sync::Arc; -use crate::reader::metadata::{ChunkPattern, ChunkSeparator}; +use crate::reader::metadata::{ChunkPattern, ChunkSeparator, ZarrArrayMetadata}; use crate::reader::{ZarrError, ZarrResult}; use crate::reader::{ZarrInMemoryChunk, ZarrStoreMetadata}; @@ -41,6 +41,7 @@ pub trait ZarrReadAsync<'a> { cols: &'a 
[String], real_dims: Vec, patterns: HashMap, + one_dim_repr: &HashMap, ) -> ZarrResult; } @@ -113,15 +114,35 @@ impl<'a> ZarrReadAsync<'a> for ZarrPath { cols: &'a [String], real_dims: Vec, patterns: HashMap, + one_dim_repr: &HashMap, ) -> ZarrResult { let mut chunk = ZarrInMemoryChunk::new(real_dims); for var in cols { - let s: Vec = position.iter().map(|i| i.to_string()).collect(); - let pattern = patterns - .get(var.as_str()) - .ok_or(ZarrError::InvalidMetadata( - "Could not find separator for column".to_string(), - ))?; + // this is admittedly a bit hard to follow without context. here we check to see if + // "var" has a one dimentional representation that we should use instead of the larger + // dimensional data. if there is, the file name will be different, and the "index" of + // the file will be one dimensional, e.g. if the real variable is [x, y], and the one + // dimensional representation is along the second dimension, then we would look for + // var.y (or var/y) instead of var.x.y. + let real_var_name = var; + let (s, var, pattern) = if let Some((pos, repr_name, meta)) = one_dim_repr.get(var) { + ( + vec![position[*pos].to_string()], + repr_name, + meta.get_chunk_pattern(), + ) + } else { + ( + position.iter().map(|i| i.to_string()).collect(), + var, + patterns + .get(var.as_str()) + .ok_or(ZarrError::InvalidMetadata( + "Could not find separator for column".to_string(), + ))? 
+ .clone(), + ) + }; let p = match pattern { ChunkPattern { @@ -161,7 +182,7 @@ impl<'a> ZarrReadAsync<'a> for ZarrPath { GetResultPayload::File(_, p) => read(p)?, GetResultPayload::Stream(_) => get_res.bytes().await?.to_vec(), }; - chunk.add_array(var.to_string(), data); + chunk.add_array(real_var_name.to_string(), data); } Ok(chunk) @@ -230,10 +251,13 @@ mod zarr_read_async_tests { let meta = store.get_zarr_metadata().await.unwrap(); // check the one dim repr for the lat - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!(meta.get_one_dim_repr_meta().get("lat").unwrap().0, 1); + assert_eq!( + meta.get_one_dim_repr_meta().get("lat").unwrap().1, + "one_d_lat" + ); assert_eq!( - meta.get_one_dim_repr_meta("lat").unwrap().2, + meta.get_one_dim_repr_meta().get("lat").unwrap().2, ZarrArrayMetadata::new( 2, ZarrDataType::Float(8), @@ -255,10 +279,13 @@ mod zarr_read_async_tests { ); // check the one dim repr for the lon - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!(meta.get_one_dim_repr_meta().get("lon").unwrap().0, 0); + assert_eq!( + meta.get_one_dim_repr_meta().get("lon").unwrap().1, + "one_d_lon" + ); assert_eq!( - meta.get_one_dim_repr_meta("lon").unwrap().2, + meta.get_one_dim_repr_meta().get("lon").unwrap().2, ZarrArrayMetadata::new( 2, ZarrDataType::Float(8), @@ -289,10 +316,13 @@ mod zarr_read_async_tests { let meta = store.get_zarr_metadata().await.unwrap(); // check the one dim repr for the lat - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!(meta.get_one_dim_repr_meta().get("lat").unwrap().0, 0); assert_eq!( - meta.get_one_dim_repr_meta("lat").unwrap().2, + meta.get_one_dim_repr_meta().get("lat").unwrap().1, + "one_d_lat" + ); + assert_eq!( + 
meta.get_one_dim_repr_meta().get("lat").unwrap().2, ZarrArrayMetadata::new( 3, ZarrDataType::Float(8), @@ -306,10 +336,13 @@ mod zarr_read_async_tests { ); // check the one dim repr for the lon - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!(meta.get_one_dim_repr_meta().get("lon").unwrap().0, 1); + assert_eq!( + meta.get_one_dim_repr_meta().get("lon").unwrap().1, + "one_d_lon" + ); assert_eq!( - meta.get_one_dim_repr_meta("lon").unwrap().2, + meta.get_one_dim_repr_meta().get("lon").unwrap().2, ZarrArrayMetadata::new( 3, ZarrDataType::Float(8), @@ -339,6 +372,7 @@ mod zarr_read_async_tests { meta.get_columns(), meta.get_real_dims(&pos), meta.get_chunk_patterns(), + &HashMap::new(), ) .await .unwrap(); @@ -360,6 +394,7 @@ mod zarr_read_async_tests { &cols, meta.get_real_dims(&pos), meta.get_chunk_patterns(), + &HashMap::new(), ) .await .unwrap(); @@ -377,6 +412,7 @@ mod zarr_read_async_tests { &cols, meta.get_real_dims(&pos), meta.get_chunk_patterns(), + &HashMap::new(), ) .await .unwrap(); diff --git a/src/reader/codecs.rs b/src/reader/codecs.rs index fcb7463..3e09b8a 100644 --- a/src/reader/codecs.rs +++ b/src/reader/codecs.rs @@ -286,6 +286,43 @@ fn decode_transpose( .collect::>()) } +// function to project 1D data to a larger dimensionality. this is useful for when a 1D +// representation of some 2D or 3D data is present (when the full data is just the 1D +// data projected along one or 2 extra dimentions). 
+fn project_one_d_repr( + input: Vec, + real_chunk_dims: &[usize], + dim_position: usize, +) -> ZarrResult> { + let err = "invalid parameters while processing one dimenesional representation"; + let l = input.len(); + let n_dims = real_chunk_dims.len(); + if dim_position >= n_dims || l != real_chunk_dims[dim_position] { + return Err(throw_invalid_meta(err)); + } + + match (n_dims, dim_position) { + (2, 0) => Ok(input + .into_iter() + .flat_map(|v| std::iter::repeat(v).take(real_chunk_dims[1])) + .collect()), + (2, 1) => Ok(vec![&input[..]; real_chunk_dims[0]].concat()), + (3, 0) => Ok(input + .into_iter() + .flat_map(|v| std::iter::repeat(v).take(real_chunk_dims[1] * real_chunk_dims[2])) + .collect()), + (3, 1) => { + let v: Vec<_> = input + .into_iter() + .flat_map(|v| std::iter::repeat(v).take(real_chunk_dims[2])) + .collect(); + Ok(vec![&v[..]; real_chunk_dims[0]].concat()) + } + (3, 2) => Ok(vec![&input[..]; real_chunk_dims[0] * real_chunk_dims[1]].concat()), + _ => Err(throw_invalid_meta(err)), + } +} + // function to only keep the data at the specified indices from a vector. // it function only works if the indices to keep are ordered. fn keep_indices(v: &mut Vec, indices: &[usize]) { @@ -592,7 +629,7 @@ macro_rules! create_decode_function { let inner_data = $func_name( bytes[*o..o + n].to_vec(), &sharding_params.chunk_shape, - &get_inner_chunk_real_dims(&sharding_params, &real_dims, pos), // TODO: fix this to real dims + &get_inner_chunk_real_dims(&sharding_params, &real_dims, pos), &sharding_params.codecs, None, )?; @@ -715,10 +752,14 @@ pub(crate) fn apply_codecs( data_type: &ZarrDataType, codecs: &Vec, sharding_params: Option, + one_d_repr_params: Option<(usize, &Vec)>, ) -> ZarrResult<(ArrayRef, FieldRef)> { macro_rules! 
// // (1, 1, n) 1D projected to 3D + let v = project_one_d_repr(vec![1, 2, 3], &[2, 4, 3], 2).unwrap(); + assert_eq!( + v, + vec![1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3] + ); + + // // (1, n, 1) 1D projected to 3D + let v = project_one_d_repr(vec![1, 2, 3, 4], &[2, 4, 3], 1).unwrap(); + assert_eq!( + v, + vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4] + ); + + // // (n, 1, 1) 1D projected to 3D
+ // [`apply_codecs`] function so that the 1D representation can be + // projected to obtain the real chunk.
+ let (meta, real_dims, chunk_dims, proj_params) = + if let Some((pos, _, one_dim_meta)) = one_dim_params { + ( + one_dim_meta, + vec![real_dims[*pos]], + vec![chunk_dims[*pos]], + Some((*pos, real_dims)), + ) + } else { + ( + self.meta.get_array_meta(&col_name)?, + real_dims.clone(), + chunk_dims.to_vec(), + None, + ) + }; // take the raw data from the chunk let data = arr_chnk.take_data(); @@ -212,11 +237,12 @@ impl ZarrRecordBatchReader { let (arr, field) = apply_codecs( col_name, data, - chunk_dims, - real_dims, + &chunk_dims, + &real_dims, meta.get_type(), meta.get_codecs(), meta.get_sharding_params(), + proj_params, )?; Ok((arr, field)) @@ -394,10 +420,11 @@ mod zarr_reader_tests { use arrow_array::types::*; use arrow_schema::{DataType, TimeUnit}; use itertools::enumerate; - use std::{boxed::Box, collections::HashMap, fmt::Debug, path::PathBuf}; + use std::{boxed::Box, collections::HashMap, fmt::Debug}; use super::*; use crate::reader::filters::{ZarrArrowPredicate, ZarrArrowPredicateFn}; + use crate::tests::{get_test_v2_data_path, get_test_v3_data_path}; fn validate_names_and_types(targets: &HashMap, rec: &RecordBatch) { let mut target_cols: Vec<&String> = targets.keys().collect(); @@ -456,18 +483,50 @@ mod zarr_reader_tests { assert!(matched); } + // create a test filter + fn create_filter() -> ZarrChunkFilter { + let mut filters: Vec> = Vec::new(); + let f = ZarrArrowPredicateFn::new( + ZarrProjection::keep(vec!["lat".to_string()]), + move |batch| { + gt_eq( + batch.column_by_name("lat").unwrap(), + &Scalar::new(&Float64Array::from(vec![38.6])), + ) + }, + ); + filters.push(Box::new(f)); + let f = ZarrArrowPredicateFn::new( + ZarrProjection::keep(vec!["lon".to_string()]), + move |batch| { + gt_eq( + batch.column_by_name("lon").unwrap(), + &Scalar::new(&Float64Array::from(vec![-109.7])), + ) + }, + ); + filters.push(Box::new(f)); + let f = ZarrArrowPredicateFn::new( + ZarrProjection::keep(vec!["lon".to_string()]), + move |batch| { + lt( + 
batch.column_by_name("lon").unwrap(), + &Scalar::new(&Float64Array::from(vec![-109.2])), + ) + }, + ); + filters.push(Box::new(f)); + + ZarrChunkFilter::new(filters) + } + //************************** // zarr format v2 tests //************************** - fn get_v2_test_data_path(zarr_store: String) -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("test-data/data/zarr/v2_data") - .join(zarr_store) - } #[test] fn compression_tests() { - let p = get_v2_test_data_path("compression_example.zarr".to_string()); + let p = get_test_v2_data_path("compression_example.zarr".to_string()); let reader = ZarrRecordBatchReaderBuilder::new(p).build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -545,7 +604,7 @@ mod zarr_reader_tests { #[test] fn projection_tests() { - let p = get_v2_test_data_path("compression_example.zarr".to_string()); + let p = get_test_v2_data_path("compression_example.zarr".to_string()); let proj = ZarrProjection::keep(vec!["bool_data".to_string(), "int_data".to_string()]); let builder = ZarrRecordBatchReaderBuilder::new(p).with_projection(proj); let reader = builder.build().unwrap(); @@ -573,7 +632,7 @@ mod zarr_reader_tests { #[test] fn multiple_readers_tests() { - let p = get_v2_test_data_path("compression_example.zarr".to_string()); + let p = get_test_v2_data_path("compression_example.zarr".to_string()); let reader1 = ZarrRecordBatchReaderBuilder::new(p.clone()) .build_partial_reader(Some((0, 5))) .unwrap(); @@ -644,7 +703,7 @@ mod zarr_reader_tests { #[test] fn endianness_and_order_tests() { - let p = get_v2_test_data_path("endianness_and_order_example.zarr".to_string()); + let p = get_test_v2_data_path("endianness_and_order_example.zarr".to_string()); let reader = ZarrRecordBatchReaderBuilder::new(p).build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -670,7 +729,7 @@ mod zarr_reader_tests { #[test] fn string_data_tests() { - let p = 
get_v2_test_data_path("string_example.zarr".to_string()); + let p = get_test_v2_data_path("string_example.zarr".to_string()); let reader = ZarrRecordBatchReaderBuilder::new(p).build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -698,7 +757,7 @@ mod zarr_reader_tests { #[test] fn ts_data_tests() { - let p = get_v2_test_data_path("ts_example.zarr".to_string()); + let p = get_test_v2_data_path("ts_example.zarr".to_string()); let reader = ZarrRecordBatchReaderBuilder::new(p).build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -782,7 +841,7 @@ mod zarr_reader_tests { #[test] fn one_dim_tests() { - let p = get_v2_test_data_path("one_dim_example.zarr".to_string()); + let p = get_test_v2_data_path("one_dim_example.zarr".to_string()); let reader = ZarrRecordBatchReaderBuilder::new(p).build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -806,7 +865,7 @@ mod zarr_reader_tests { #[test] fn three_dim_tests() { - let p = get_v2_test_data_path("three_dim_example.zarr".to_string()); + let p = get_test_v2_data_path("three_dim_example.zarr".to_string()); let reader = ZarrRecordBatchReaderBuilder::new(p).build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -856,44 +915,10 @@ mod zarr_reader_tests { #[test] fn filters_tests() { - let p = get_v2_test_data_path("lat_lon_example.zarr".to_string()); + let p = get_test_v2_data_path("lat_lon_example.zarr".to_string()); let mut builder = ZarrRecordBatchReaderBuilder::new(p); - // set the filters to select part of the raster, based on lat and - // lon coordinates. 
- let mut filters: Vec> = Vec::new(); - let f = ZarrArrowPredicateFn::new( - ZarrProjection::keep(vec!["lat".to_string()]), - move |batch| { - gt_eq( - batch.column_by_name("lat").unwrap(), - &Scalar::new(&Float64Array::from(vec![38.6])), - ) - }, - ); - filters.push(Box::new(f)); - let f = ZarrArrowPredicateFn::new( - ZarrProjection::keep(vec!["lon".to_string()]), - move |batch| { - gt_eq( - batch.column_by_name("lon").unwrap(), - &Scalar::new(&Float64Array::from(vec![-109.7])), - ) - }, - ); - filters.push(Box::new(f)); - let f = ZarrArrowPredicateFn::new( - ZarrProjection::keep(vec!["lon".to_string()]), - move |batch| { - lt( - batch.column_by_name("lon").unwrap(), - &Scalar::new(&Float64Array::from(vec![-109.2])), - ) - }, - ); - filters.push(Box::new(f)); - - builder = builder.with_filter(ZarrChunkFilter::new(filters)); + builder = builder.with_filter(create_filter()); let reader = builder.build().unwrap(); let records: Vec = reader.map(|x| x.unwrap()).collect(); @@ -938,7 +963,7 @@ mod zarr_reader_tests { #[test] fn empty_query_tests() { - let p = get_v2_test_data_path("lat_lon_example.zarr".to_string()); + let p = get_test_v2_data_path("lat_lon_example.zarr".to_string()); let mut builder = ZarrRecordBatchReaderBuilder::new(p); // set a filter that will filter out all the data, there should be nothing left after @@ -963,18 +988,31 @@ mod zarr_reader_tests { assert_eq!(records.len(), 0); } - //************************** - // zarr format v3 tests - //************************** - fn get_v3_test_data_path(zarr_store: String) -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("test-data/data/zarr/v3_data") - .join(zarr_store) + #[test] + fn one_dim_repr_tests() { + let p = get_test_v2_data_path("lat_lon_example_w_1d_repr.zarr".to_string()); + let mut builder = ZarrRecordBatchReaderBuilder::new(p); + + builder = builder.with_filter(create_filter()); + let reader = builder.build().unwrap(); + let records_from_one_d_repr: Vec = reader.map(|x| 
x.unwrap()).collect(); + + let p = get_test_v2_data_path("lat_lon_example.zarr".to_string()); + let mut builder = ZarrRecordBatchReaderBuilder::new(p); + + builder = builder.with_filter(create_filter()); + let reader = builder.build().unwrap(); + let records: Vec = reader.map(|x| x.unwrap()).collect(); + + assert_eq!(records_from_one_d_repr.len(), records.len()); + for (rec, rec_from_one_d_repr) in records.iter().zip(records_from_one_d_repr.iter()) { + assert_eq!(rec, rec_from_one_d_repr); + } } #[test] fn no_sharding_tests() { - let p = get_v3_test_data_path("no_sharding.zarr".to_string()); + let p = get_test_v3_data_path("no_sharding.zarr".to_string()); let builder = ZarrRecordBatchReaderBuilder::new(p); let reader = builder.build().unwrap(); @@ -993,7 +1031,7 @@ mod zarr_reader_tests { #[test] fn no_sharding_with_edge_tests() { - let p = get_v3_test_data_path("no_sharding_with_edge.zarr".to_string()); + let p = get_test_v3_data_path("no_sharding_with_edge.zarr".to_string()); let builder = ZarrRecordBatchReaderBuilder::new(p); let reader = builder.build().unwrap(); @@ -1022,7 +1060,7 @@ mod zarr_reader_tests { #[test] fn with_sharding_tests() { - let p = get_v3_test_data_path("with_sharding.zarr".to_string()); + let p = get_test_v3_data_path("with_sharding.zarr".to_string()); let builder = ZarrRecordBatchReaderBuilder::new(p); let reader = builder.build().unwrap(); @@ -1054,7 +1092,7 @@ mod zarr_reader_tests { #[test] fn with_sharding_with_edge_tests() { - let p = get_v3_test_data_path("with_sharding_with_edge.zarr".to_string()); + let p = get_test_v3_data_path("with_sharding_with_edge.zarr".to_string()); let builder = ZarrRecordBatchReaderBuilder::new(p); let reader = builder.build().unwrap(); @@ -1073,7 +1111,7 @@ mod zarr_reader_tests { #[test] fn three_dims_no_sharding_with_edge_tests() { - let p = get_v3_test_data_path("no_sharding_with_edge_3d.zarr".to_string()); + let p = get_test_v3_data_path("no_sharding_with_edge_3d.zarr".to_string()); let builder = 
+ // "var" has a one dimensional representation that we should use instead of the larger
+ let real_var_name = var; + let (s, var, pattern) = if let Some((pos, repr_name, meta)) = one_dim_repr.get(var) { + ( + vec![position[*pos].to_string()], + repr_name, + meta.get_chunk_pattern(), + ) + } else { + ( + position.iter().map(|i| i.to_string()).collect(), + var, + patterns + .get(var.as_str()) + .ok_or(ZarrError::InvalidMetadata( + "Could not find separator for column".to_string(), + ))? + .clone(), + ) + }; let chunk_file = match pattern { ChunkPattern { @@ -345,7 +368,7 @@ impl ZarrRead for PathBuf { return Err(ZarrError::MissingChunk(position.to_vec())); } let data = read(path)?; - chunk.add_array(var.to_string(), data); + chunk.add_array(real_var_name.to_string(), data); } Ok(chunk) @@ -406,10 +429,13 @@ mod zarr_read_tests { let meta = p.get_zarr_metadata().unwrap(); // check the one dim repr for the lat - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!(meta.get_one_dim_repr_meta().get("lat").unwrap().0, 1); assert_eq!( - meta.get_one_dim_repr_meta("lat").unwrap().2, + meta.get_one_dim_repr_meta().get("lat").unwrap().1, + "one_d_lat" + ); + assert_eq!( + meta.get_one_dim_repr_meta().get("lat").unwrap().2, ZarrArrayMetadata::new( 2, ZarrDataType::Float(8), @@ -431,10 +457,13 @@ mod zarr_read_tests { ); // check the one dim repr for the lon - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!(meta.get_one_dim_repr_meta().get("lon").unwrap().0, 0); assert_eq!( - meta.get_one_dim_repr_meta("lon").unwrap().2, + meta.get_one_dim_repr_meta().get("lon").unwrap().1, + "one_d_lon" + ); + assert_eq!( + meta.get_one_dim_repr_meta().get("lon").unwrap().2, ZarrArrayMetadata::new( 2, ZarrDataType::Float(8), @@ -462,10 +491,13 @@ mod zarr_read_tests { let meta = p.get_zarr_metadata().unwrap(); // check the one dim repr for the lat - 
assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().0, 0); - assert_eq!(meta.get_one_dim_repr_meta("lat").unwrap().1, "one_d_lat"); + assert_eq!(meta.get_one_dim_repr_meta().get("lat").unwrap().0, 0); + assert_eq!( + meta.get_one_dim_repr_meta().get("lat").unwrap().1, + "one_d_lat" + ); assert_eq!( - meta.get_one_dim_repr_meta("lat").unwrap().2, + meta.get_one_dim_repr_meta().get("lat").unwrap().2, ZarrArrayMetadata::new( 3, ZarrDataType::Float(8), @@ -479,10 +511,13 @@ mod zarr_read_tests { ); // check the one dim repr for the lon - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().0, 1); - assert_eq!(meta.get_one_dim_repr_meta("lon").unwrap().1, "one_d_lon"); + assert_eq!(meta.get_one_dim_repr_meta().get("lon").unwrap().0, 1); + assert_eq!( + meta.get_one_dim_repr_meta().get("lon").unwrap().1, + "one_d_lon" + ); assert_eq!( - meta.get_one_dim_repr_meta("lon").unwrap().2, + meta.get_one_dim_repr_meta().get("lon").unwrap().2, ZarrArrayMetadata::new( 3, ZarrDataType::Float(8), @@ -511,6 +546,7 @@ mod zarr_read_tests { meta.get_columns(), meta.get_real_dims(&pos), meta.get_chunk_patterns(), + &HashMap::new(), ) .unwrap(); assert_eq!( @@ -531,6 +567,7 @@ mod zarr_read_tests { &cols, meta.get_real_dims(&pos), meta.get_chunk_patterns(), + &HashMap::new(), ) .unwrap(); assert_eq!( @@ -547,6 +584,7 @@ mod zarr_read_tests { &cols, meta.get_real_dims(&pos), meta.get_chunk_patterns(), + &HashMap::new(), ) .unwrap(); assert_eq!( diff --git a/test-data b/test-data index 2af70b4..99a7a08 160000 --- a/test-data +++ b/test-data @@ -1 +1 @@ -Subproject commit 2af70b4c0f59921734b109520acc3ea83e681476 +Subproject commit 99a7a08ea6ca581cbf8d2933b7c9377df59a71f5 From a207bd5aca8549a7340cdcb4d373243fea4f0b92 Mon Sep 17 00:00:00 2001 From: Maxime Dion Date: Mon, 9 Sep 2024 22:44:26 -0500 Subject: [PATCH 3/3] linter fixes --- src/datafusion/helpers.rs | 10 +++------- src/lib.rs | 1 - 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/datafusion/helpers.rs 
b/src/datafusion/helpers.rs index 0bebc30..4ed1bdb 100644 --- a/src/datafusion/helpers.rs +++ b/src/datafusion/helpers.rs @@ -247,13 +247,9 @@ pub(crate) fn build_row_filter( let candidates: Vec = predicates .into_iter() .flat_map(|expr| { - if let Ok(candidate) = - ZarrFilterCandidateBuilder::new(expr.clone(), file_schema).build() - { - candidate - } else { - None - } + ZarrFilterCandidateBuilder::new(expr.clone(), file_schema) + .build() + .unwrap_or_default() }) .collect(); diff --git a/src/lib.rs b/src/lib.rs index 425ef5e..bc0a5c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,6 @@ mod tests { use object_store::local::LocalFileSystem; use std::path::PathBuf; - #[cfg(feature = "datafusion")] pub(crate) fn get_test_v2_data_path(zarr_store: String) -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("test-data/data/zarr/v2_data")