nightly! fix: inner_chunk_byte_range and retrieve_encoded_inner_chunk
LDeakin committed Dec 31, 2024
1 parent 7a1ed48 commit d56840d
Showing 2 changed files with 60 additions and 10 deletions.
68 changes: 58 additions & 10 deletions zarrs/src/array/array_sync_sharded_readable_ext.rs
@@ -5,6 +5,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon_iter_concurrent_limit::iter_concurrent_limit;
 use unsafe_cell_slice::UnsafeCellSlice;
 use zarrs_storage::byte_range::ByteRange;
+use zarrs_storage::StorageHandle;
 
 use super::array_bytes::{merge_chunks_vlen, update_bytes_flen};
 use super::codec::array_to_bytes::sharding::ShardingPartialDecoder;
@@ -14,6 +15,7 @@ use super::{
     ArrayShardedExt, ChunkGrid,
 };
 use super::{ArrayBytes, ArraySize, DataTypeSize};
+use crate::array::codec::StoragePartialDecoder;
 use crate::storage::ReadableStorageTraits;
 use crate::{array::codec::ArrayPartialDecoderTraits, array_subset::ArraySubset};
 
@@ -88,6 +90,28 @@ impl ArrayShardedReadableExtCache {
         let mut cache = self.cache.lock().unwrap();
         if let Some(partial_decoder) = cache.get(shard_indices) {
             Ok(partial_decoder.clone())
+        } else if self.array_is_exclusively_sharded() {
+            // Create the sharding partial decoder directly, without a codec chain
+            let storage_handle = Arc::new(StorageHandle::new(array.storage.clone()));
+            let storage_transformer = array
+                .storage_transformers()
+                .create_readable_transformer(storage_handle)?;
+            let input_handle = Arc::new(StoragePartialDecoder::new(
+                storage_transformer,
+                array.chunk_key(shard_indices),
+            ));
+            let chunk_representation = array.chunk_array_representation(shard_indices)?;
+            let partial_decoder = array
+                .codecs()
+                .array_to_bytes_codec()
+                .clone()
+                .partial_decoder(
+                    input_handle,
+                    &chunk_representation,
+                    &CodecOptions::default(),
+                )?;
+            cache.insert(shard_indices.to_vec(), partial_decoder.clone());
+            Ok(partial_decoder)
         } else {
             let partial_decoder: Arc<dyn ArrayPartialDecoderTraits> =
                 array.partial_decoder(shard_indices)?;
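The branch above caches a bare `ShardingPartialDecoder` (rather than one wrapped by `Array::partial_decoder`) whenever the array is exclusively sharded, keyed by shard indices, so repeated inner-chunk queries against the same shard reuse one decoder. A minimal caller-side sketch of that pattern, assuming the sync sharded-ext API exercised in the tests below; the import paths and the `ArrayShardedReadableExtCache::new` constructor are assumptions, not shown in this diff:

```rust
use zarrs::array::{Array, ArrayShardedReadableExt, ArrayShardedReadableExtCache};
use zarrs::storage::ReadableStorageTraits;

// Print the byte range of a few inner chunks of an exclusively sharded array.
// Queries for inner chunks of the same shard share one cached ShardingPartialDecoder.
fn dump_inner_chunk_ranges<TStorage: ?Sized + ReadableStorageTraits + 'static>(
    array: &Array<TStorage>,
) -> Result<(), Box<dyn std::error::Error>> {
    let cache = ArrayShardedReadableExtCache::new(array);
    for inner_chunk_indices in [[0u64, 0], [0, 1], [1, 0], [1, 1]] {
        // Adjust the indices to your array's inner chunk grid.
        if let Some(byte_range) = array.inner_chunk_byte_range(&cache, &inner_chunk_indices)? {
            println!("inner chunk {inner_chunk_indices:?} -> {byte_range:?}");
        }
    }
    Ok(())
}
```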
@@ -282,17 +306,20 @@ impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt
         cache: &ArrayShardedReadableExtCache,
         inner_chunk_indices: &[u64],
     ) -> Result<Option<ByteRange>, ArrayError> {
-        if cache.array_is_sharded() {
+        if cache.array_is_exclusively_sharded() {
             let (shard_indices, chunk_indices) =
                 inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
             let partial_decoder = cache.retrieve(self, &shard_indices)?;
-            let partial_decoder = (&partial_decoder as &dyn Any)
-                .downcast_ref::<ShardingPartialDecoder>()
-                .expect("array is sharded");
+            let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
+            let partial_decoder = partial_decoder
+                .downcast::<ShardingPartialDecoder>()
+                .expect("array is exclusively sharded");
 
             Ok(partial_decoder.inner_chunk_byte_range(&chunk_indices)?)
         } else {
-            Ok(None)
+            Err(ArrayError::UnsupportedMethod(
+                "the array is not exclusively sharded".to_string(),
+            ))
         }
     }

@@ -301,19 +328,22 @@ impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt
         cache: &ArrayShardedReadableExtCache,
         inner_chunk_indices: &[u64],
     ) -> Result<Option<Vec<u8>>, ArrayError> {
-        if cache.array_is_sharded() {
+        if cache.array_is_exclusively_sharded() {
             let (shard_indices, chunk_indices) =
                 inner_chunk_shard_index_and_chunk_index(self, cache, inner_chunk_indices)?;
             let partial_decoder = cache.retrieve(self, &shard_indices)?;
-            let partial_decoder = (&partial_decoder as &dyn Any)
-                .downcast_ref::<ShardingPartialDecoder>()
-                .expect("array is sharded");
+            let partial_decoder: Arc<dyn Any + Send + Sync> = partial_decoder.clone();
+            let partial_decoder = partial_decoder
+                .downcast::<ShardingPartialDecoder>()
+                .expect("array is exclusively sharded");
 
             Ok(partial_decoder
                 .retrieve_inner_chunk_encoded(&chunk_indices)?
                 .map(Vec::from))
         } else {
-            Ok(self.retrieve_encoded_chunk(inner_chunk_indices)?)
+            Err(ArrayError::UnsupportedMethod(
+                "the array is not exclusively sharded".to_string(),
+            ))
         }
     }
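Note the behavioural change in the `else` arms: both methods now return `ArrayError::UnsupportedMethod` instead of silently falling back (`Ok(None)` / `retrieve_encoded_chunk`) when the array is not exclusively sharded. A hedged sketch of the calling pattern under the new semantics, continuing the previous example (imports as above; `Ok(None)` is taken to mean the inner chunk is not stored):

```rust
// `array` and `cache` as in the previous sketch.
fn report_inner_chunk<TStorage: ?Sized + ReadableStorageTraits + 'static>(
    array: &Array<TStorage>,
    cache: &ArrayShardedReadableExtCache,
) {
    match array.retrieve_encoded_inner_chunk(cache, &[0, 0]) {
        Ok(Some(encoded)) => println!("inner chunk [0, 0]: {} encoded bytes", encoded.len()),
        Ok(None) => println!("inner chunk [0, 0] is not stored"),
        // Previously a fallback path; now an error when the array is not exclusively sharded.
        Err(err) => eprintln!("inner chunks are not directly addressable: {err}"),
    }
}
```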

@@ -717,6 +747,21 @@ mod tests {
                 assert_eq!(compare, test);
                 assert_eq!(cache.len(), 4);
             }
+
+            let encoded_inner_chunk = array
+                .retrieve_encoded_inner_chunk(&cache, &[0, 0])?
+                .unwrap();
+            assert_eq!(
+                array
+                    .inner_chunk_byte_range(&cache, &[0, 0])?
+                    .unwrap()
+                    .length(u64::MAX),
+                encoded_inner_chunk.len() as u64
+            );
+            // assert_eq!(
+            //     u16::from_array_bytes(array.data_type(), encoded_inner_chunk.into())?,
+            //     array.retrieve_chunk_elements::<u16>(&[0, 0])?
+            // );
         } else {
             assert_eq!(array.inner_chunk_shape(), None);
             assert_eq!(
@@ -745,6 +790,9 @@ mod tests {
             )?;
             assert_eq!(compare, test);
             assert!(cache.is_empty());
+
+            assert!(array.retrieve_encoded_inner_chunk(&cache, &[0, 0]).is_err());
+            assert!(array.inner_chunk_byte_range(&cache, &[0, 0]).is_err());
         }
 
         Ok(())
2 changes: 2 additions & 0 deletions zarrs/src/lib.rs
@@ -184,6 +184,8 @@
 //! Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
 
+#![feature(trait_upcasting)] // FIXME: Remove
+
 pub mod array;
 pub mod array_subset;
 pub mod config;
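For context on the `#![feature(trait_upcasting)]` gate added above: the partial decoder fix upcasts an `Arc<dyn ArrayPartialDecoderTraits>` to `Arc<dyn Any + Send + Sync>` before downcasting it to the concrete `ShardingPartialDecoder`, and that trait-object upcast is what requires the nightly feature (the `// FIXME: Remove` anticipates its stabilization). A standalone sketch of the same pattern with stand-in types:

```rust
#![feature(trait_upcasting)] // Nightly-only until trait upcasting is stabilized.

use std::any::Any;
use std::sync::Arc;

// Stand-in for ArrayPartialDecoderTraits; having Any + Send + Sync as supertraits
// is what permits the upcast to `dyn Any + Send + Sync`.
trait PartialDecoder: Any + Send + Sync {}

struct Sharding;
impl PartialDecoder for Sharding {}

fn main() {
    let decoder: Arc<dyn PartialDecoder> = Arc::new(Sharding);

    // Trait-object upcast (requires the feature gate), then Arc downcast to the concrete type,
    // mirroring the change to inner_chunk_byte_range and retrieve_encoded_inner_chunk.
    let any: Arc<dyn Any + Send + Sync> = decoder;
    let sharding: Arc<Sharding> = any
        .downcast::<Sharding>()
        .expect("the decoder is a Sharding");
    let _ = sharding;
}
```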
