diff --git a/Cargo.lock b/Cargo.lock index 9cb702a..6a3cdbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,6 +117,17 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "async-scoped" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740" +dependencies = [ + "futures", + "pin-project", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -136,9 +147,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backon" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +checksum = "c79c8ef183b8b663e8cb19cf92fb7d98c56739977bd47eae2de2717bd5de2c2c" dependencies = [ "fastrand", "futures-core", @@ -202,15 +213,29 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.15.0" +version = "3.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d32a994c2b3ca201d9b263612a374263f05e7adde37c4707f693dcd375076d1f" +checksum = "a3b1be7772ee4501dba05acbe66bb1e8760f6a6c474a36035631638e4415f130" [[package]] name = "bytemuck" version = "1.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2ef034f05691a48569bd920a96c81b9d91bbad1ab5ac7c4616c1f6ef36cb79f" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] [[package]] name = "byteorder" @@ -226,11 +251,10 @@ checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cc" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730" dependencies = [ - "jobserver", "libc", ] @@ -579,12 +603,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "1.9.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "flagset" @@ -760,6 +781,7 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" dependencies = [ + "bytemuck", "cfg-if", "crunchy", ] @@ -1004,15 +1026,6 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" -[[package]] -name = "jobserver" -version = "0.1.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab46a6e9526ddef3ae7f787c06f0f2600639ba80ea3eade3d8e670a2230f51d6" -dependencies = [ - "libc", -] - [[package]] name = "js-sys" version = "0.3.68" @@ -1412,9 +1425,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.100" +version = "0.9.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae94056a791d0e1217d18b6cbdccb02c61e3054fc69893607f4067e3bb0b1fd1" +checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" dependencies = [ "cc", "libc", @@ -2371,9 +2384,9 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +checksum = 
"a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" dependencies = [ "tinyvec", ] @@ -2738,7 +2751,7 @@ dependencies = [ [[package]] name = "zarrs" version = "0.12.0" -source = "git+https://github.com/LDeakin/zarrs.git#842efa0674d71dc392d004c10e0fa8d24f8dea27" +source = "git+https://github.com/LDeakin/zarrs.git#2c62275d1842d39c9861b0748467387b423c209f" dependencies = [ "async-lock", "async-recursion", @@ -2774,6 +2787,7 @@ dependencies = [ name = "zarrs_tools" version = "0.3.0" dependencies = [ + "async-scoped", "clap", "futures", "indicatif", diff --git a/Cargo.toml b/Cargo.toml index e917302..9da87cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ categories = ["command-line-utilities", "encoding"] ncvar2zarr = ["dep:netcdf"] [dependencies] +async-scoped = { version = "0.9.0", features = ["use-tokio"] } clap = { version = "4.4.6", features = ["derive"] } futures = "0.3.29" indicatif = { version = "0.17.7", features = ["rayon"] } diff --git a/README.md b/README.md index c719dd0..fb0319b 100644 --- a/README.md +++ b/README.md @@ -7,15 +7,15 @@ Various tools for creating and manipulating [Zarr v3](https://zarr.dev) data wit [Changelog (CHANGELOG.md)](https://github.com/LDeakin/zarrs_tools/blob/main/CHANGELOG.md) ## Tools +- Benchmark scripts that measure the time to retrieve an array either chunk-by-chunk or at once into a single array. + - `zarrs_benchmark_read_sync`: benchmark with the zarrs sync API. + - `zarrs_benchmark_read_async`: benchmark with the zarrs async API. + - See [docs/benchmarks.md](https://github.com/LDeakin/zarrs_tools/blob/main/docs/benchmarks.md) for some benchmark measurements. - `zarrs_reencode`: reencode a Zarr v3 array. [Example](https://github.com/LDeakin/zarrs_tools/blob/main/docs/reencode_rechunk.md). - Can change the chunk size, shard size, compression, etc. - - Suitable for round trip benchmarking + - Suitable for round trip benchmarking. - `zarrs_binary2zarr`: create a Zarr v3 array from piped binary data. 
[Example](https://github.com/LDeakin/zarrs_tools/blob/main/docs/convert_binary.md). - `zarrs_ncvar2zarr` (requires `ncvar2zarr` feature): convert a netCDF variable to a zarr V3 array. [Example](https://github.com/LDeakin/zarrs_tools/blob/main/docs/convert_netcdf.md). -- `zarrs_benchmark_read_sync`: Measure the time to read (decode) each chunk of an array using the zarrs sync API. -- `zarrs_benchmark_read_async`: Measure the time to read (decode) each chunk of an array using the zarrs async API. - -See [docs/benchmarks.md](https://github.com/LDeakin/zarrs_tools/blob/main/docs/benchmarks.md) for some benchmark measurements. ## Install @@ -41,7 +41,7 @@ Non-default `zarrs` codecs (see [`zarrs` crate features](https://docs.rs/zarrs/l For example: ```bash -cargo install zarrs_tools --features zarrs/bitround,zarrs/zfp +cargo install zarrs_tools --features zarrs/bitround,zarrs/zfp,zarrs/bz2,zarrs/pcodec ``` ## Licence diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 23bf745..725515e 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -1,6 +1,16 @@ # Benchmarks +> [!CAUTION] +> Take these benchmarks with a grain of salt, they need to be audited. +> +> The `zarrs_benchmark_read_sync` and `zarrs_benchmark_read_async` binaries have been optimised to be as efficient as possible with the `zarrs` API. +> +> Conversely, the `tensorstore` benchmark script may not be using the optimal API for tensorstore, might not be handling async very well, and may not be doing exactly what the zarrs benchmark is doing. +> Furthermore, tensorstore benchmarks use the python rather than the C++ API and are subject to the overheads of python.
+ +> TODO: Audit benchmarks and benchmark more Zarr V3 implementations + ## Benchmark Data Benchmark data is generated with `scripts/generate_benchmark_array.py` as follows ```bash @@ -23,29 +33,34 @@ Benchmark data is generated with `scripts/generate_benchmark_array.py` as follow - `data/benchmark_compress_shard.zarr`: 1.1G ## Benchmark System -- Ryzen 5900X +- AMD Ryzen 5900X - 64GB DDR4 3600MHz (16-19-19-39) - 2TB Samsung 990 Pro - Ubuntu 22.04 (in Windows 11 WSL2, swap disabled, 24GB available memory) -- Rust 1.76.0 +- Rust 1.76.0 (07dca489a 2024-02-04) ## Implementation Versions Benchmarked -- zarrs_tools v0.3.0 installed with `RUSTFLAGS="-C target-cpu=native" cargo install --path .` +- zarrs_tools v0.3.0 (prerelease) installed with `RUSTFLAGS="-C target-cpu=native" cargo install --path .` - tensorstore v0.1.53 installed with `pip install tensorstore` ## Comparative Benchmarks - > TODO: Check benchmark equivalence between implementations, evaluate more implementations ### Read Entire Array ```bash python3 ./scripts/run_benchmark_read_all.py ``` +> [!NOTE] +> Rather than simply calling a single retrieve method like `async_retrieve_array_subset`, the zarrs async benchmark uses a ***complicated*** alternative routine. +> +> This is necessary to achieve decent performance with many chunks because the zarrs async API is unable to parallelise across chunks. +> See <https://docs.rs/zarrs/latest/zarrs/array/struct.Array.html#async-api>. + | Image | Wall time (s)
zarrs
sync |

async |
tensorstore
async | Memory usage (GB)
zarrs
sync |

async |
tensorstore
async | |:-----------------------------------|---------------------------------:|----------------:|---------------------------:|-------------------------------------:|----------------:|---------------------------:| -| data/benchmark.zarr | 3 | 14.88 | 3.29 | 8.41 | 8.4 | 8.6 | -| data/benchmark_compress.zarr | 2.84 | 17.36 | 2.76 | 8.44 | 8.41 | 8.53 | -| data/benchmark_compress_shard.zarr | 1.67 | 2.93 | 2.66 | 8.63 | 8.63 | 8.6 | +| data/benchmark.zarr | 3.03 | 9.27 | 3.23 | 8.42 | 8.41 | 8.58 | +| data/benchmark_compress.zarr | 2.84 | 8.45 | 2.68 | 8.44 | 8.43 | 8.53 | +| data/benchmark_compress_shard.zarr | 1.62 | 1.83 | 2.58 | 8.63 | 8.73 | 8.57 | ### Read Chunk-By-Chunk diff --git a/scripts/run_benchmark_read_all.py b/scripts/run_benchmark_read_all.py index 4605705..8daf4bb 100755 --- a/scripts/run_benchmark_read_all.py +++ b/scripts/run_benchmark_read_all.py @@ -3,7 +3,7 @@ import subprocess import re import pandas as pd -import os +import numpy as np import math implementation_to_args = { @@ -15,6 +15,8 @@ def clear_cache(): subprocess.call(['sudo', 'sh', '-c', "sync; echo 3 > /proc/sys/vm/drop_caches"]) +best_of = 3 + index = [] rows = [] for image in [ @@ -26,32 +28,42 @@ def clear_cache(): wall_times = [] memory_usages = [] for implementation in ["zarrs_sync", "zarrs_async", "tensorstore"]: - print(implementation, image) - args = implementation_to_args[implementation] + [image] - clear_cache() - pipes = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - std_out, std_err = pipes.communicate() - # print(std_err) - wall_time = re.search( - r"Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\d+?):([\d\.]+?)\\n", - str(std_err), - ) - memory_usage = re.search( - r"Maximum resident set size \(kbytes\): (\d+?)\\n", str(std_err) - ) - if wall_time and memory_usage: - m = int(wall_time.group(1)) - s = float(wall_time.group(2)) - wall_time_s = m * 60 + s - # print(wall_time_s) - memory_usage_kb = int(memory_usage.group(1)) - memory_usage_gb 
= float(memory_usage_kb) / 1.0e6 - # print(memory_usage_gb) - wall_times.append(f"{wall_time_s:.02f}") - memory_usages.append(f"{memory_usage_gb:.02f}") - else: - wall_times.append(math.nan) - memory_usages.append(math.nan) + wall_time_measurements = [] + memory_usage_measurements = [] + for i in range(best_of): + print(implementation, image, i) + args = implementation_to_args[implementation] + [image] + clear_cache() + pipes = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + std_out, std_err = pipes.communicate() + # print(std_err) + + wall_time = re.search( + r"Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\d+?):([\d\.]+?)\\n", + str(std_err), + ) + memory_usage = re.search( + r"Maximum resident set size \(kbytes\): (\d+?)\\n", str(std_err) + ) + if wall_time and memory_usage: + m = int(wall_time.group(1)) + s = float(wall_time.group(2)) + wall_time_s = m * 60 + s + # print(wall_time_s) + memory_usage_kb = int(memory_usage.group(1)) + memory_usage_gb = float(memory_usage_kb) / 1.0e6 + # print(memory_usage_gb) + wall_time_measurements.append(wall_time_s) + memory_usage_measurements.append(memory_usage_gb) + else: + wall_time_measurements.append(math.nan) + memory_usage_measurements.append(math.nan) + + wall_time_best = np.nanmin(wall_time_measurements) + memory_usages_best = np.nanmin(memory_usage_measurements) + wall_times.append(f"{wall_time_best:.02f}") + memory_usages.append(f"{memory_usages_best:.02f}") + row = wall_times + memory_usages rows.append(row) diff --git a/src/bin/zarrs_benchmark_read_async.rs b/src/bin/zarrs_benchmark_read_async.rs index 226d28c..1e51111 100644 --- a/src/bin/zarrs_benchmark_read_async.rs +++ b/src/bin/zarrs_benchmark_read_async.rs @@ -1,11 +1,13 @@ -use std::{sync::Arc, time::SystemTime}; +use std::{cell::UnsafeCell, sync::Arc, time::SystemTime}; +use async_scoped::spawner::Spawner; use clap::Parser; use futures::{FutureExt, StreamExt}; use zarrs::{ array::{ codec::{ArrayCodecTraits, 
CodecOptionsBuilder}, concurrency::RecommendedConcurrency, + ArrayView, }, array_subset::ArraySubset, config::global_config, @@ -58,17 +60,81 @@ async fn main() -> Result<(), Box> { // println!("{:#?}", array.metadata()); let chunks = ArraySubset::new_with_shape(array.chunk_grid_shape().unwrap()); - let chunks_shape = chunks.shape(); let start = SystemTime::now(); let mut bytes_decoded = 0; - let chunk_indices = (0..chunks.shape().iter().product()) - .map(|chunk_index| zarrs::array::unravel_index(chunk_index, chunks_shape)) - .collect::>(); + let element_size = array.data_type().size(); + let chunk_indices = chunks.indices().into_iter().collect::>(); if args.read_all { - let subset = ArraySubset::new_with_shape(array.shape().to_vec()); - bytes_decoded += array.async_retrieve_array_subset(&subset).await?.len(); + let array_shape = array.shape().to_vec(); + let array_subset = ArraySubset::new_with_shape(array_shape.to_vec()); + // -------------------------------------- SLOW -------------------------------------- + // See https://docs.rs/zarrs/latest/zarrs/array/struct.Array.html#async-api + // let array_data = array.async_retrieve_array_subset(&array_subset).await?; + // ---------------------------------------------------------------------------------- + + // -------------------------------------- FAST -------------------------------------- + // This might get integrated into zarrs itself as Array::async_retrieve_array_subset_tokio in the future + let array_data = { + // Calculate chunk/codec concurrency + let chunk_representation = + array.chunk_array_representation(&vec![0; array.chunk_grid().dimensionality()])?; + let concurrent_target = std::thread::available_parallelism().unwrap().get(); + let (chunk_concurrent_limit, codec_concurrent_target) = + zarrs::array::concurrency::calc_concurrency_outer_inner( + concurrent_target, + { + let concurrent_chunks = + std::cmp::min(chunks.num_elements_usize(), concurrent_target); + 
&RecommendedConcurrency::new(concurrent_chunks..concurrent_chunks) + }, + &array + .codecs() + .recommended_concurrency(&chunk_representation)?, + ); + let codec_options = CodecOptionsBuilder::new() + .concurrent_target(codec_concurrent_target) + .build(); + + // Allocate output and decode into it + let array_data = + UnsafeCell::new(vec![0u8; array_subset.num_elements_usize() * element_size]); + { + let decode_chunk_into_array = |chunk_indices: Vec| { + let chunk_subset = array.chunk_subset(&chunk_indices).unwrap(); + let codec_options = codec_options.clone(); + let array = array.clone(); + let data = unsafe { array_data.get().as_mut() }.unwrap().as_mut_slice(); + async move { + let array_shape = array.shape().to_vec(); + let array_subset = ArraySubset::new_with_shape(array_shape.clone()); + let array_view = ArrayView::new(data, &array_shape, array_subset).unwrap(); + array + .async_retrieve_array_subset_into_array_view_opt( + &chunk_subset, + &unsafe { array_view.subset_view(&chunk_subset).unwrap() }, + &codec_options, + ) + .await + } + }; + let spawner = async_scoped::spawner::use_tokio::Tokio::default(); + let futures = chunk_indices + .into_iter() + .map(|chunk_indices| decode_chunk_into_array(chunk_indices)); + let mut stream = futures::stream::iter(futures) + .map(|future| spawner.spawn(future)) + .buffer_unordered(chunk_concurrent_limit); + while let Some(item) = stream.next().await { + item??; + } + } + array_data.into_inner() + }; + // ---------------------------------------------------------------------------------- + bytes_decoded += array_data.len(); } else { + // Calculate chunk/codec concurrency let chunk_representation = array.chunk_array_representation(&vec![0; array.chunk_grid().dimensionality()])?; let concurrent_target = std::thread::available_parallelism().unwrap().get();