Improve async decode perf
This is a huge workaround for zarrs being async runtime agnostic
LDeakin committed Feb 22, 2024
1 parent 92fcb73 commit 3e3317d
Showing 6 changed files with 181 additions and 73 deletions.
66 changes: 40 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -15,6 +15,7 @@ categories = ["command-line-utilities", "encoding"]
ncvar2zarr = ["dep:netcdf"]

[dependencies]
+async-scoped = { version = "0.9.0", features = ["use-tokio"] }
clap = { version = "4.4.6", features = ["derive"] }
futures = "0.3.29"
indicatif = { version = "0.17.7", features = ["rayon"] }
12 changes: 6 additions & 6 deletions README.md
@@ -7,15 +7,15 @@ Various tools for creating and manipulating [Zarr v3](https://zarr.dev) data wit
[Changelog (CHANGELOG.md)](https://github.com/LDeakin/zarrs_tools/blob/main/CHANGELOG.md)

## Tools
-- Benchmark scripts that measure the time to retrieve an array either chunk-by-chunk or at once into a single array.
-  - `zarrs_benchmark_read_sync`: benchmark with the zarrs sync API.
-  - `zarrs_benchmark_read_async`: benchmark with the zarrs async API.
-  - See [docs/benchmarks.md](https://github.com/LDeakin/zarrs_tools/blob/main/docs/benchmarks.md) for some benchmark measurements.
- `zarrs_reencode`: reencode a Zarr v3 array. [Example](https://github.com/LDeakin/zarrs_tools/blob/main/docs/reencode_rechunk.md).
  - Can change the chunk size, shard size, compression, etc.
-  - Suitable for round trip benchmarking
+  - Suitable for round trip benchmarking.
- `zarrs_binary2zarr`: create a Zarr v3 array from piped binary data. [Example](https://github.com/LDeakin/zarrs_tools/blob/main/docs/convert_binary.md).
- `zarrs_ncvar2zarr` (requires `ncvar2zarr` feature): convert a netCDF variable to a zarr V3 array. [Example](https://github.com/LDeakin/zarrs_tools/blob/main/docs/convert_netcdf.md).
+- `zarrs_benchmark_read_sync`: Measure the time to read (decode) each chunk of an array using the zarrs sync API.
+- `zarrs_benchmark_read_async`: Measure the time to read (decode) each chunk of an array using the zarrs async API.
+
+See [docs/benchmarks.md](https://github.com/LDeakin/zarrs_tools/blob/main/docs/benchmarks.md) for some benchmark measurements.

## Install

@@ -41,7 +41,7 @@ Non-default `zarrs` codecs (see [`zarrs` crate features](https://docs.rs/zarrs/l

For example:
```bash
-cargo install zarrs_tools --features zarrs/bitround,zarrs/zfp
+cargo install zarrs_tools --features zarrs/bitround,zarrs/zfp,zarrs/bz2,zarrs/pcodec
```

## Licence
29 changes: 22 additions & 7 deletions docs/benchmarks.md
@@ -1,6 +1,16 @@

# Benchmarks

+> [!CAUTION]
+> Take these benchmarks with a grain of salt, they need to be audited.
+>
+> The `zarrs_benchmark_read` and `zarrs_benchmark_read_async` binaries have been optimised to be as efficient as possible with the `zarrs` API.
+>
+> Conversely, the `tensorstore` benchmark script may not be using the optimal API for tensorstore, might not be handling async very well, and may not be doing exactly what the zarrs benchmark is doing.
+> Furthermore, tensorstore benchmarks use the python rather than the C++ API and are subject to the overheads of python.
+> TODO: Audit benchmarks and benchmark more Zarr V3 implementations
+
## Benchmark Data
Benchmark data is generated with `scripts/generate_benchmark_array.py` as follows
```bash
@@ -23,29 +33,34 @@ Benchmark data is generated with `scripts/generate_benchmark_array.py` as follow
- `data/benchmark_compress_shard.zarr`: 1.1G

## Benchmark System
-- Ryzen 5900X
+- AMD Ryzen 5900X
- 64GB DDR4 3600MHz (16-19-19-39)
- 2TB Samsung 990 Pro
- Ubuntu 22.04 (in Windows 11 WSL2, swap disabled, 24GB available memory)
-- Rust 1.76.0
+- Rust 1.76.0 (07dca489a 2024-02-04)

## Implementation Versions Benchmarked
-- zarrs_tools v0.3.0 installed with `RUSTFLAGS="-C target-cpu=native" cargo install --path .`
+- zarrs_tools v0.3.0 (prerelease) installed with `RUSTFLAGS="-C target-cpu=native" cargo install --path .`
- tensorstore v0.1.53 installed with `pip install tensorstore`

## Comparative Benchmarks
+> TODO: Check benchmark equivalence between implementations, evaluate more implementations

### Read Entire Array
```bash
python3 ./scripts/run_benchmark_read_all.py
```

+> [!NOTE]
+> Rather than simply calling a single retrieve method like `async_retrieve_array_subset`, the zarrs async benchmark uses a ***complicated*** alternative routine.
+>
+> This is necessary to achieve decent performance with many chunks because the zarrs async API is unable to parallelise across chunks.
+> See <https://docs.rs/zarrs/latest/zarrs/array/struct.Array.html#async-api>.
+
| Image | Wall time (s)<br>zarrs<br>sync | <br><br>async | <br>tensorstore<br>async | Memory usage (GB)<br>zarrs<br>sync | <br><br>async | <br>tensorstore<br>async |
|:-----------------------------------|---------------------------------:|----------------:|---------------------------:|-------------------------------------:|----------------:|---------------------------:|
-| data/benchmark.zarr | 3 | 14.88 | 3.29 | 8.41 | 8.4 | 8.6 |
-| data/benchmark_compress.zarr | 2.84 | 17.36 | 2.76 | 8.44 | 8.41 | 8.53 |
-| data/benchmark_compress_shard.zarr | 1.67 | 2.93 | 2.66 | 8.63 | 8.63 | 8.6 |
+| data/benchmark.zarr | 3.03 | 9.27 | 3.23 | 8.42 | 8.41 | 8.58 |
+| data/benchmark_compress.zarr | 2.84 | 8.45 | 2.68 | 8.44 | 8.43 | 8.53 |
+| data/benchmark_compress_shard.zarr | 1.62 | 1.83 | 2.58 | 8.63 | 8.73 | 8.57 |
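The note above explains that the zarrs async API cannot parallelise across chunks, so the benchmark has to drive many single-chunk reads concurrently itself. A minimal sketch of that idea using Python's asyncio (the `read_chunk` coroutine, chunk indices, and timings are hypothetical stand-ins, not the zarrs API):

```python
import asyncio

async def read_chunk(index: int) -> bytes:
    # Hypothetical stand-in for a single-chunk decode; the sleep models I/O latency.
    await asyncio.sleep(0.01)
    return bytes([index % 256]) * 4

async def read_all_chunks(num_chunks: int, concurrency: int = 4) -> list:
    # Cap the number of in-flight reads with a semaphore, then issue them all at once.
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded_read(index: int) -> bytes:
        async with semaphore:
            return await read_chunk(index)

    # gather preserves chunk order even though reads complete out of order.
    return await asyncio.gather(*(bounded_read(i) for i in range(num_chunks)))

chunks = asyncio.run(read_all_chunks(8))
```

With sequential awaits the total time scales with the chunk count; with this pattern it scales with the slowest batch, which is the effect the workaround in this commit is after.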


### Read Chunk-By-Chunk
66 changes: 39 additions & 27 deletions scripts/run_benchmark_read_all.py
@@ -3,7 +3,7 @@
import subprocess
import re
import pandas as pd
-import os
+import numpy as np
import math

implementation_to_args = {
@@ -15,6 +15,8 @@
def clear_cache():
    subprocess.call(['sudo', 'sh', '-c', "sync; echo 3 > /proc/sys/vm/drop_caches"])

+best_of = 3
+
index = []
rows = []
for image in [
@@ -26,32 +28,42 @@ def clear_cache():
    wall_times = []
    memory_usages = []
    for implementation in ["zarrs_sync", "zarrs_async", "tensorstore"]:
-        print(implementation, image)
-        args = implementation_to_args[implementation] + [image]
-        clear_cache()
-        pipes = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        std_out, std_err = pipes.communicate()
-        # print(std_err)
-        wall_time = re.search(
-            r"Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\d+?):([\d\.]+?)\\n",
-            str(std_err),
-        )
-        memory_usage = re.search(
-            r"Maximum resident set size \(kbytes\): (\d+?)\\n", str(std_err)
-        )
-        if wall_time and memory_usage:
-            m = int(wall_time.group(1))
-            s = float(wall_time.group(2))
-            wall_time_s = m * 60 + s
-            # print(wall_time_s)
-            memory_usage_kb = int(memory_usage.group(1))
-            memory_usage_gb = float(memory_usage_kb) / 1.0e6
-            # print(memory_usage_gb)
-            wall_times.append(f"{wall_time_s:.02f}")
-            memory_usages.append(f"{memory_usage_gb:.02f}")
-        else:
-            wall_times.append(math.nan)
-            memory_usages.append(math.nan)
+        wall_time_measurements = []
+        memory_usage_measurements = []
+        for i in range(best_of):
+            print(implementation, image, i)
+            args = implementation_to_args[implementation] + [image]
+            clear_cache()
+            pipes = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            std_out, std_err = pipes.communicate()
+            # print(std_err)
+
+            wall_time = re.search(
+                r"Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\d+?):([\d\.]+?)\\n",
+                str(std_err),
+            )
+            memory_usage = re.search(
+                r"Maximum resident set size \(kbytes\): (\d+?)\\n", str(std_err)
+            )
+            if wall_time and memory_usage:
+                m = int(wall_time.group(1))
+                s = float(wall_time.group(2))
+                wall_time_s = m * 60 + s
+                # print(wall_time_s)
+                memory_usage_kb = int(memory_usage.group(1))
+                memory_usage_gb = float(memory_usage_kb) / 1.0e6
+                # print(memory_usage_gb)
+                wall_time_measurements.append(wall_time_s)
+                memory_usage_measurements.append(memory_usage_gb)
+            else:
+                wall_time_measurements.append(math.nan)
+                memory_usage_measurements.append(math.nan)
+
+        wall_time_best = np.nanmin(wall_time_measurements)
+        memory_usages_best = np.nanmin(memory_usage_measurements)
+        wall_times.append(f"{wall_time_best:.02f}")
+        memory_usages.append(f"{memory_usages_best:.02f}")
+
    row = wall_times + memory_usages
    rows.append(row)

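The measurement logic in the script above can be exercised in isolation: parse the wall time from GNU time's `-v` output and take the best of several runs while ignoring failed ones. A small sketch (the sample `stderr` strings are made up; `min` over non-NaN values stands in for `np.nanmin` so no numpy is needed):

```python
import math
import re

def parse_wall_time_s(std_err: str) -> float:
    # Same pattern as the script: matches "m:ss" in str()-ified stderr,
    # where the newline appears as a literal backslash-n.
    match = re.search(
        r"Elapsed \(wall clock\) time \(h:mm:ss or m:ss\): (\d+?):([\d\.]+?)\\n",
        std_err,
    )
    if not match:
        return math.nan
    return int(match.group(1)) * 60 + float(match.group(2))

# Made-up stderr captures for three runs; the second run failed to report a time.
runs = [
    r"Elapsed (wall clock) time (h:mm:ss or m:ss): 0:14.88\n",
    r"process killed",
    r"Elapsed (wall clock) time (h:mm:ss or m:ss): 0:09.27\n",
]
times = [parse_wall_time_s(err) for err in runs]
best = min(t for t in times if not math.isnan(t))  # like np.nanmin, without numpy
```

Taking the minimum rather than the mean reduces the impact of one-off interference (page cache, scheduler noise) on the reported figure, which is why the script benchmarks `best_of` runs.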
