Add zarrs_validate #17

Merged: 1 commit, merged Nov 2, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Add `zarrs_validate` to check that arrays are equivalent

### Changed
- [#12](https://github.com/LDeakin/zarrs_tools/pull/12) Bump netcdf to 0.10.2 by [@magnusuMET]
- **Breaking**: Bump MSRV to 1.80
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ filter = []
info = []
ncvar2zarr = ["dep:netcdf"]
ome = ["dep:ome_zarr_metadata"]
validate = []

[dependencies]
anyhow = "1.0.86"
@@ -90,6 +91,10 @@ required-features = ["ome"]
name = "zarrs_reencode"
required-features = []

[[bin]]
name = "zarrs_validate"
required-features = ["validate"]

[profile.dist]
inherits = "release"
lto = "thin"
19 changes: 10 additions & 9 deletions README.md
@@ -12,20 +12,21 @@ A changelog can be found [here](https://github.com/LDeakin/zarrs_tools/blob/main
All tools support input and output of [Zarr V3](https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html) data.
Some tools additionally support input of a [V3 compatible subset](https://docs.rs/zarrs/latest/zarrs/#implementation-status) of [Zarr V2](https://zarr-specs.readthedocs.io/en/latest/v2/v2.0.html).

- [`zarrs_reencode`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_reencode.md): reencode an array. Manipulate the chunk size, shard size, codecs, fill value, chunk key encoding separator, and attributes.
- [`zarrs_filter`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_filter.md) (feature `filter`): apply simple image filters (transformations) to an array.
- [`zarrs_ome`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_ome.md) (feature `ome`): convert an array to an [OME-Zarr](https://ngff.openmicroscopy.org/latest/index.html) multi-scale image.
- Supports OME-Zarr `0.5-dev` (as Zarr V3) and `0.5-dev1`. The first is recognised by [Neuroglancer](https://github.com/google/neuroglancer).
- `zarrs_info` (feature `info`): return metadata-related info or the range/histogram of an array.
- `zarrs_validate` (feature `validate`): validate that two arrays are equivalent.
- [`zarrs_binary2zarr`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_binary2zarr.md) (feature `binary2zarr`): create an array from piped binary data.
- [`zarrs_ncvar2zarr`](https://github.com/LDeakin/zarrs_tools/blob/main/docs/zarrs_ncvar2zarr.md) (feature `ncvar2zarr`): convert a netCDF variable to an array.

See [docs/](https://github.com/LDeakin/zarrs_tools/blob/main/docs/) for tool documentation.
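The new tool can be exercised from the command line. A hypothetical session is sketched below: the binary name and `--concurrent-chunks` flag come from this PR's diff, while the install command and array paths are illustrative placeholders.

```shell
# Install with the validate feature enabled (hypothetical invocation).
cargo install zarrs_tools --features validate

# Compare two arrays; the paths are placeholders. Per this PR's main(),
# a success message is printed on a match; on mismatch or error, a message
# goes to stderr and the process exits with status 1.
zarrs_validate /data/first.zarr /data/second.zarr --concurrent-chunks 4
```

Because the tool signals mismatch through its exit status, it composes naturally with `&&`/`||` in scripts.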

## `zarrs` Benchmarking
- `zarrs_reencode`: suitable for round trip benchmarking.
- `zarrs_benchmark_read_sync` (feature `benchmark`): benchmark the zarrs sync API.
- `zarrs_benchmark_read_async` (feature `benchmark`): benchmark the zarrs async API.

See the [LDeakin/zarr_benchmarks](https://github.com/LDeakin/zarr_benchmarks) repository for benchmarks of `zarrs` against other Zarr V3 implementations.

157 changes: 157 additions & 0 deletions src/bin/zarrs_validate.rs
@@ -0,0 +1,157 @@
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use zarrs::array::codec::CodecOptionsBuilder;
use zarrs::array_subset::ArraySubset;
use zarrs::filesystem::{FilesystemStore, FilesystemStoreOptions};
use zarrs::storage::{
    storage_adapter::async_to_sync::{AsyncToSyncBlockOn, AsyncToSyncStorageAdapter},
    AsyncReadableListableStorage, ReadableListableStorage,
};
use zarrs_opendal::AsyncOpendalStore;
use zarrs_tools::calculate_chunk_and_codec_concurrency;

/// Compare the data in two Zarr arrays.
///
/// Equality of the arrays is determined by comparing the shape, data type, and data.
///
/// Differences in encoding (e.g. codecs, chunk key encoding) and attributes are ignored.
#[derive(Parser, Debug)]
#[command(author, version)]
struct Args {
    /// The path to the first Zarr array.
    first: String,

    /// The path to the second Zarr array.
    second: String,

    /// Number of concurrent chunks to compare.
    #[arg(long)]
    concurrent_chunks: Option<usize>,
}

fn bar_style_run() -> ProgressStyle {
    ProgressStyle::with_template(
        "[{elapsed_precise}/{duration_precise}] {bar:40.black/bold} {pos}/{len} ({percent}%) {prefix} {msg}",
    )
    .unwrap_or(ProgressStyle::default_bar())
}

struct TokioBlockOn(tokio::runtime::Runtime);

impl AsyncToSyncBlockOn for TokioBlockOn {
    fn block_on<F: core::future::Future>(&self, future: F) -> F::Output {
        self.0.block_on(future)
    }
}

fn get_storage(path: &str) -> anyhow::Result<ReadableListableStorage> {
    if path.starts_with("http://") || path.starts_with("https://") {
        let builder = opendal::services::Http::default().endpoint(path);
        let operator = opendal::Operator::new(builder)?.finish();
        let storage: AsyncReadableListableStorage = Arc::new(AsyncOpendalStore::new(operator));
        let block_on = TokioBlockOn(tokio::runtime::Runtime::new()?);
        Ok(Arc::new(AsyncToSyncStorageAdapter::new(storage, block_on)))
    // } else if path.starts_with("s3://") {
    //     let endpoint = "";
    //     let bucket = "";
    //     let root = "";
    //     let region = "auto";
    //     let builder = opendal::services::S3::default()
    //         .endpoint(&endpoint)
    //         .region(&region)
    //         .root(path)
    //         .allow_anonymous()
    //         .bucket(&bucket);
    //     let operator = opendal::Operator::new(builder)?.finish();
    //     Arc::new(AsyncOpendalStore::new(operator))
    } else {
        Ok(Arc::new(FilesystemStore::new_with_options(
            path,
            FilesystemStoreOptions::default().direct_io(true).clone(),
        )?))
    }
}

fn main() {
    match try_main() {
        Ok(success) => println!("{}", success),
        Err(err) => {
            eprintln!("{}", err);
            std::process::exit(1);
        }
    }
}

fn try_main() -> anyhow::Result<String> {
    let args = Args::parse();

    let storage1 = get_storage(&args.first)?;
    let storage2 = get_storage(&args.second)?;
    let array1 = zarrs::array::Array::open(storage1.clone(), "/").unwrap();
    let array2 = zarrs::array::Array::open(storage2.clone(), "/").unwrap();

    let bar = ProgressBar::new(0);
    bar.set_style(bar_style_run());

    if array1.shape() != array2.shape() {
        anyhow::bail!(
            "Array shapes do not match: {:?} vs {:?}",
            array1.shape(),
            array2.shape()
        );
    } else if array1.data_type() != array2.data_type() {
        anyhow::bail!(
            "Array data types do not match: {} vs {}",
            array1.data_type(),
            array2.data_type()
        );
    }

    let chunks = ArraySubset::new_with_shape(array1.chunk_grid_shape().unwrap());

    let chunk_representation = array1
        .chunk_array_representation(&vec![0; array1.chunk_grid().dimensionality()])
        .unwrap();

    let concurrent_target = std::thread::available_parallelism().unwrap().get();
    let (chunks_concurrent_limit, codec_concurrent_target) = calculate_chunk_and_codec_concurrency(
        concurrent_target,
        args.concurrent_chunks,
        array1.codecs(),
        chunks.num_elements_usize(),
        &chunk_representation,
    );
    let codec_options = CodecOptionsBuilder::new()
        .concurrent_target(codec_concurrent_target)
        .build();

    let num_iterations = chunks.num_elements_usize();
    bar.set_length(num_iterations as u64);
    let indices = chunks.indices();
    let step = AtomicU64::new(0);
    iter_concurrent_limit!(
        chunks_concurrent_limit,
        indices,
        try_for_each,
        |chunk_indices: Vec<u64>| {
            let step = step.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            bar.set_position(step);
            // The arrays may have different chunk grids, so read a chunk of the
            // first array and the corresponding array subset of the second.
            let chunk_subset = array1.chunk_subset(&chunk_indices).unwrap();
            let bytes_first = array1.retrieve_chunk_opt(&chunk_indices, &codec_options)?;
            let bytes_second = array2.retrieve_array_subset_opt(&chunk_subset, &codec_options)?;
            if bytes_first == bytes_second {
                Ok(())
            } else {
                anyhow::bail!("Data differs in region: {chunk_subset}");
            }
        }
    )?;
    bar.finish_and_clear();

    Ok(format!("Success: {} and {} match", args.first, args.second))
}