diff --git a/CHANGELOG.md b/CHANGELOG.md index 9198269..f27f92c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + - Add `zarrs_validate` to check that arrays are equivalent + ### Changed - [#12](https://github.com/LDeakin/zarrs_tools/pull/12) Bump netcdf to 0.10.2 by [@magnusuMET] - **Breaking**: Bump MSRV to 1.80 diff --git a/Cargo.toml b/Cargo.toml index 9517992..7e8ea90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ filter = [] info = [] ncvar2zarr = ["dep:netcdf"] ome = ["dep:ome_zarr_metadata"] +validate = [] [dependencies] anyhow = "1.0.86" @@ -90,6 +91,10 @@ required-features = ["ome"] name = "zarrs_reencode" required-features = [] +[[bin]] +name = "zarrs_validate" +required-features = ["validate"] + [profile.dist] inherits = "release" lto = "thin" diff --git a/README.md b/README.md index 91f3a84..a049e98 100644 --- a/README.md +++ b/README.md @@ -12,20 +12,21 @@ A changelog can be found [here](https://github.com/LDeakin/zarrs_tools/blob/main All tools support input and output of [Zarr V3](https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html) data. Some tools additionally support input of a [V3 compatible subset](https://docs.rs/zarrs/latest/zarrs/#implementation-status) of [Zarr V2](https://zarr-specs.readthedocs.io/en/latest/v2/v2.0.html). -- [zarrs_reencode](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_reencode.md): reencode an array. Manipulate the chunk size, shard size, codecs, fill value, chunk key encoding separator, and attributes. -- [zarrs_filter](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_filter.md) (feature `filter`): apply simple image filters (transformations) to an array. -- [zarrs_ome](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_ome.md) (feature `ome`): convert an array to an [OME-Zarr](https://ngff.openmicroscopy.org/latest/index.html) multi-scale image. 
+- [`zarrs_reencode`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_reencode.md): reencode an array. Manipulate the chunk size, shard size, codecs, fill value, chunk key encoding separator, and attributes. +- [`zarrs_filter`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_filter.md) (feature `filter`): apply simple image filters (transformations) to an array. +- [`zarrs_ome`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_ome.md) (feature `ome`): convert an array to an [OME-Zarr](https://ngff.openmicroscopy.org/latest/index.html) multi-scale image. - Supports OME-Zarr `0.5-dev` (as Zarr V3) and `0.5-dev1`. The first is recognised by [Neuroglancer](https://github.com/google/neuroglancer). -- zarrs_info (feature `info`): return metadata related info or the range/histogram of an array. -- [zarrs_binary2zarr](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_binary2zarr.md) (feature `binary2zarr`): create an array from piped binary data. -- [zarrs_ncvar2zarr](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_ncvar2zarr.md) (feature `ncvar2zarr`): convert a netCDF variable to an array. +- `zarrs_info` (feature `info`): return metadata related info or the range/histogram of an array. +- `zarrs_validate` (feature `validate`): validate that two arrays are equivalent. +- [`zarrs_binary2zarr`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_binary2zarr.md) (feature `binary2zarr`): create an array from piped binary data. +- [`zarrs_ncvar2zarr`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_ncvar2zarr.md) (feature `ncvar2zarr`): convert a netCDF variable to an array. See [docs/](https://github.com/LDeakin/zarrs_tools/blob/main/docs/) for tool documentation. ## `zarrs` Benchmarking -- zarrs_reencode: suitable for round trip benchmarking. -- zarrs_benchmark_read_sync (feature `benchmark`): benchmark the zarrs sync API. 
-- zarrs_benchmark_read_async (feature `benchmark`): benchmark the zarrs async API. +- `zarrs_reencode`: suitable for round trip benchmarking. +- `zarrs_benchmark_read_sync` (feature `benchmark`): benchmark the zarrs sync API. +- `zarrs_benchmark_read_async` (feature `benchmark`): benchmark the zarrs async API. See the [LDeakin/zarr_benchmarks](https://github.com/LDeakin/zarr_benchmarks) repository for benchmarks of `zarrs` against other Zarr V3 implementations. diff --git a/src/bin/zarrs_validate.rs b/src/bin/zarrs_validate.rs new file mode 100644 index 0000000..1ca3936 --- /dev/null +++ b/src/bin/zarrs_validate.rs @@ -0,0 +1,157 @@ +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use clap::Parser; +use indicatif::{ProgressBar, ProgressStyle}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon_iter_concurrent_limit::iter_concurrent_limit; +use zarrs::array::codec::CodecOptionsBuilder; +use zarrs::array_subset::ArraySubset; +use zarrs::filesystem::{FilesystemStore, FilesystemStoreOptions}; +use zarrs::storage::{ + storage_adapter::async_to_sync::{AsyncToSyncBlockOn, AsyncToSyncStorageAdapter}, + AsyncReadableListableStorage, ReadableListableStorage, +}; +use zarrs_opendal::AsyncOpendalStore; +use zarrs_tools::calculate_chunk_and_codec_concurrency; + +/// Compare the data in two Zarr arrays. +/// +/// Equality of the arrays is determined by comparing the shape, data type, and data. +/// +/// Differences in encoding (e.g codecs, chunk key encoding) and attributes are ignored. +#[derive(Parser, Debug)] +#[command(author, version)] +struct Args { + /// The path to the first zarr array. + first: String, + + /// The path to the second zarr array. + second: String, + + /// Number of concurrent chunks to compare. 
+ #[arg(long)] + concurrent_chunks: Option<usize>, +} + +fn bar_style_run() -> ProgressStyle { + ProgressStyle::with_template( + "[{elapsed_precise}/{duration_precise}] {bar:40.black/bold} {pos}/{len} ({percent}%) {prefix} {msg}", + ) + .unwrap_or(ProgressStyle::default_bar()) +} + +struct TokioBlockOn(tokio::runtime::Runtime); + +impl AsyncToSyncBlockOn for TokioBlockOn { + fn block_on<F: core::future::Future>(&self, future: F) -> F::Output { + self.0.block_on(future) + } +} + +fn get_storage(path: &str) -> anyhow::Result<ReadableListableStorage> { + if path.starts_with("http://") || path.starts_with("https://") { + let builder = opendal::services::Http::default().endpoint(path); + let operator = opendal::Operator::new(builder)?.finish(); + let storage: AsyncReadableListableStorage = Arc::new(AsyncOpendalStore::new(operator)); + let block_on = TokioBlockOn(tokio::runtime::Runtime::new()?); + Ok(Arc::new(AsyncToSyncStorageAdapter::new(storage, block_on))) + // } else if path.starts_with("s3://") { + // let endpoint = ""; + // let bucket = ""; + // let root = ""; + // let region = "auto"; + // let builder = opendal::services::S3::default() + // .endpoint(&endpoint) + // .region(&region) + // .root(path) + // .allow_anonymous() + // .bucket(&bucket); + // let operator = opendal::Operator::new(builder)?.finish(); + // Arc::new(AsyncOpendalStore::new(operator)) + } else { + Ok(Arc::new(FilesystemStore::new_with_options( + path, + FilesystemStoreOptions::default().direct_io(true).clone(), + )?)) + } +} + +fn main() { + match try_main() { + Ok(success) => println!("{}", success), + Err(err) => { + eprintln!("{}", err); + std::process::exit(1); + } + } +} + +fn try_main() -> anyhow::Result<String> { + let args = Args::parse(); + + let storage1 = get_storage(&args.first)?; + let storage2 = get_storage(&args.second)?; + let array1 = zarrs::array::Array::open(storage1.clone(), "/").unwrap(); + let array2 = zarrs::array::Array::open(storage2.clone(), "/").unwrap(); + + let bar = ProgressBar::new(0); + bar.set_style(bar_style_run()); + + if
array1.shape() != array2.shape() { + anyhow::bail!( + "Array shapes do not match: {:?} vs {:?}", + array1.shape(), + array2.shape() + ); + } else if array1.data_type() != array2.data_type() { + anyhow::bail!( + "Array data types do not match: {} vs {}", + array1.data_type(), + array2.data_type() + ); + } + + let chunks = ArraySubset::new_with_shape(array1.chunk_grid_shape().unwrap()); + + let chunk_representation = array1 + .chunk_array_representation(&vec![0; array1.chunk_grid().dimensionality()]) + .unwrap(); + + let concurrent_target = std::thread::available_parallelism().unwrap().get(); + let (chunks_concurrent_limit, codec_concurrent_target) = calculate_chunk_and_codec_concurrency( + concurrent_target, + args.concurrent_chunks, + array1.codecs(), + chunks.num_elements_usize(), + &chunk_representation, + ); + let codec_options = CodecOptionsBuilder::new() + .concurrent_target(codec_concurrent_target) + .build(); + + let num_iterations = chunks.num_elements_usize(); + bar.set_length(num_iterations as u64); + let indices = chunks.indices(); + let step = AtomicU64::new(0); + iter_concurrent_limit!( + chunks_concurrent_limit, + indices, + try_for_each, + |chunk_indices: Vec<u64>| { + let step = step.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + bar.set_position(step); + let chunk_subset = array1.chunk_subset(&chunk_indices).unwrap(); + let bytes_first = array1.retrieve_chunk_opt(&chunk_indices, &codec_options)?; + let bytes_second = array2.retrieve_array_subset_opt(&chunk_subset, &codec_options)?; + if bytes_first == bytes_second { + Ok(()) + } else { + anyhow::bail!("Data differs in region: {chunk_subset}"); + } + } + )?; + bar.finish_and_clear(); + + Ok(format!("Success: {} and {} match", args.first, args.second)) +}