Add unit tests for non-root groups
chuckwondo committed Jan 23, 2025
1 parent 07ed9e6 commit d88483f
Showing 2 changed files with 94 additions and 81 deletions.
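
The new tests exercise writing a virtual dataset into a non-root group of an icechunk store. A minimal sketch of the pattern under test, assuming `dataset_to_icechunk` comes from `virtualizarr.writers.icechunk` (its import sits outside the hunks shown) and reusing the `vds` and `icechunk_filestore` fixtures from the tests below:

```python
import zarr

from virtualizarr.writers.icechunk import dataset_to_icechunk  # import path assumed

# Write the virtual dataset under the group "a/b" instead of the root group.
dataset_to_icechunk(vds, icechunk_filestore, group="a/b")

# Open the same group back through zarr and check the attrs round-tripped.
group = zarr.group(store=icechunk_filestore, path="a/b")
assert group.attrs.asdict() == vds.attrs
```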
139 changes: 74 additions & 65 deletions virtualizarr/tests/test_writers/test_icechunk.py
@@ -10,10 +9,

import numpy as np
import numpy.testing as npt
from xarray import Coordinates, Dataset, concat, open_dataset, open_zarr
from xarray.core.variable import Variable
from zarr import Array, Group, group # type: ignore
from zarr.core.metadata import ArrayV3Metadata # type: ignore
import xarray as xr
import zarr
from zarr.core.metadata import ArrayV3Metadata

from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.readers.common import separate_coords
@@ -40,24 +39,27 @@ def icechunk_filestore(icechunk_storage: "Storage") -> "IcechunkStore":
return session.store


@pytest.mark.parametrize("group_path", [None, "", "/a", "a", "/a/b", "a/b", "a/b/"])
def test_write_new_virtual_variable(
icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset
icechunk_filestore: "IcechunkStore",
vds_with_manifest_arrays: xr.Dataset,
group_path: Optional[str],
):
vds = vds_with_manifest_arrays

dataset_to_icechunk(vds, icechunk_filestore)
dataset_to_icechunk(vds, icechunk_filestore, group=group_path)

# check attrs
root_group = group(store=icechunk_filestore)
assert isinstance(root_group, Group)
assert root_group.attrs == {"something": 0}
group = zarr.group(store=icechunk_filestore, path=group_path)
assert isinstance(group, zarr.Group)
assert group.attrs.asdict() == {"something": 0}

# TODO check against vds, then perhaps parametrize?

# check array exists
assert "a" in root_group
arr = root_group["a"]
assert isinstance(arr, Array)
assert "a" in group
arr = group["a"]
assert isinstance(arr, zarr.Array)

# check array metadata
assert arr.metadata.zarr_format == 3
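
The parametrization above covers equivalent spellings of the same group path (`"/a"` vs `"a"`, `"a/b"` vs `"a/b/"`), which relies on zarr normalizing leading and trailing slashes. A quick illustration of that assumption against a plain in-memory store:

```python
import zarr

# Stand-in for the icechunk store; path normalization is store-agnostic.
store = zarr.storage.MemoryStore()

g1 = zarr.group(store=store, path="/a/b")  # leading slash
g2 = zarr.group(store=store, path="a/b/")  # trailing slash

# Both spellings are expected to resolve to the same hierarchy node.
assert g1.path == g2.path == "a/b"
```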
@@ -101,24 +103,24 @@ def test_set_single_virtual_ref_without_encoding(
chunkmanifest=manifest,
zarray=zarray,
)
foo = Variable(data=ma, dims=["x", "y"])
vds = Dataset(
foo = xr.Variable(data=ma, dims=["x", "y"])
vds = xr.Dataset(
{"foo": foo},
)

dataset_to_icechunk(vds, icechunk_filestore)

root_group = group(store=icechunk_filestore)
root_group = zarr.group(store=icechunk_filestore)
array = root_group["foo"]

# check chunk references
# TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method

expected_ds = open_dataset(simple_netcdf4)
expected_ds = xr.open_dataset(simple_netcdf4)
expected_array = expected_ds["foo"].to_numpy()
npt.assert_equal(array, expected_array)

ds = open_zarr(store=icechunk_filestore, zarr_format=3, consolidated=False)
ds = xr.open_zarr(store=icechunk_filestore, zarr_format=3, consolidated=False)
# TODO: Check using xarray.testing.assert_identical
xrt.assert_identical(ds.foo, expected_ds.foo)
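
These tests construct virtual references by hand from a known byte range in the source file. A condensed sketch of that construction, with an illustrative offset (48 bytes covers the 3 x 4 int32 array) and assumed import paths:

```python
import numpy as np
import xarray as xr

from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.zarr import ZArray  # import path assumed

# A single chunk covering the whole array, pointing at a byte range inside
# the source file; the offset here is illustrative.
manifest = ChunkManifest({"0.0": {"path": "test.nc", "offset": 6144, "length": 48}})
zarray = ZArray(shape=(3, 4), chunks=(3, 4), dtype=np.dtype("int32"), zarr_format=2)
ma = ManifestArray(chunkmanifest=manifest, zarray=zarray)

# The ManifestArray is wrapped in ordinary xarray containers.
foo = xr.Variable(data=ma, dims=["x", "y"])
vds = xr.Dataset({"foo": foo})
```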

@@ -131,7 +133,7 @@ def test_set_single_virtual_ref_with_encoding(
):
import xarray.testing as xrt

expected_ds = open_dataset(netcdf4_file).drop_vars(["lon", "lat", "time"])
expected_ds = xr.open_dataset(netcdf4_file).drop_vars(["lon", "lat", "time"])
# these attributes encode floats differently and I am not sure why, but it's not important enough to block everything
expected_ds.air.attrs.pop("actual_range")

@@ -151,19 +153,19 @@ def test_set_single_virtual_ref_with_encoding(
chunkmanifest=manifest,
zarray=zarray,
)
air = Variable(
air = xr.Variable(
data=ma,
dims=["time", "lat", "lon"],
encoding={"scale_factor": 0.01},
attrs=expected_ds.air.attrs,
)
vds = Dataset({"air": air}, attrs=expected_ds.attrs)
vds = xr.Dataset({"air": air}, attrs=expected_ds.attrs)

dataset_to_icechunk(vds, icechunk_filestore)

root_group = group(store=icechunk_filestore)
root_group = zarr.group(store=icechunk_filestore)
air_array = root_group["air"]
assert isinstance(air_array, Array)
assert isinstance(air_array, zarr.Array)

# check array metadata
assert air_array.shape == (2920, 25, 53)
@@ -176,7 +178,7 @@ def test_set_single_virtual_ref_with_encoding(

# Load in the dataset; we drop the coordinates because we don't have them in the zarr test case
# Check with xarray
ds = open_zarr(store=icechunk_filestore, zarr_format=3, consolidated=False)
ds = xr.open_zarr(store=icechunk_filestore, zarr_format=3, consolidated=False)
xrt.assert_identical(ds, expected_ds)

# note: we don't need to test that committing works, because now we have confirmed
@@ -211,16 +213,16 @@ def test_set_grid_virtual_refs(icechunk_filestore: "IcechunkStore", netcdf4_file
chunkmanifest=manifest,
zarray=zarray,
)
air = Variable(data=ma, dims=["y", "x"])
vds = Dataset(
air = xr.Variable(data=ma, dims=["y", "x"])
vds = xr.Dataset(
{"air": air},
)

dataset_to_icechunk(vds, icechunk_filestore)

root_group = group(store=icechunk_filestore)
root_group = zarr.group(store=icechunk_filestore)
air_array = root_group["air"]
assert isinstance(air_array, Array)
assert isinstance(air_array, zarr.Array)

# check array metadata
assert air_array.shape == (4, 4)
@@ -263,33 +265,33 @@ def test_write_loadable_variable(
zarray=zarray,
)

ma_v = Variable(data=ma, dims=["x", "y"])
ma_v = xr.Variable(data=ma, dims=["x", "y"])

la_v = Variable(
la_v = xr.Variable(
dims=["x", "y"],
data=np.random.rand(3, 4),
attrs={"units": "km"},
)
vds = Dataset({"air": la_v}, {"pres": ma_v})
vds = xr.Dataset({"air": la_v}, {"pres": ma_v})

# Icechunk checksums are currently stored with second precision, so we need to
# make sure the checksum_date is at least one second in the future
checksum_date = datetime.now(timezone.utc) + timedelta(seconds=1)
dataset_to_icechunk(vds, icechunk_filestore, last_updated_at=checksum_date)

root_group = group(store=icechunk_filestore)
root_group = zarr.group(store=icechunk_filestore)
air_array = root_group["air"]
assert isinstance(air_array, Array)
assert isinstance(air_array, zarr.Array)
assert air_array.shape == (3, 4)
assert air_array.dtype == np.dtype("float64")
assert air_array.attrs["units"] == "km"
npt.assert_equal(air_array[:], la_v[:])

pres_array = root_group["pres"]
assert isinstance(pres_array, Array)
assert isinstance(pres_array, zarr.Array)
assert pres_array.shape == (3, 4)
assert pres_array.dtype == np.dtype("int32")
expected_ds = open_dataset(simple_netcdf4)
expected_ds = xr.open_dataset(simple_netcdf4)
expected_array = expected_ds["foo"].to_numpy()
npt.assert_equal(pres_array, expected_array)
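
`test_write_loadable_variable` confirms that one dataset can mix both kinds of variables: loadable (in-memory) variables are materialized as native zarr chunks, while `ManifestArray`-backed variables are stored as virtual chunk references. Condensed from the test, reusing its `ma`, `icechunk_filestore`, and `checksum_date` names:

```python
import numpy as np
import xarray as xr

loadable = xr.Variable(dims=["x", "y"], data=np.random.rand(3, 4))  # stored as real chunks
virtual = xr.Variable(dims=["x", "y"], data=ma)                     # stored as virtual refs

vds = xr.Dataset({"air": loadable}, {"pres": virtual})
dataset_to_icechunk(vds, icechunk_filestore, last_updated_at=checksum_date)
```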

@@ -302,8 +304,8 @@ def test_checksum(

netcdf_path = tmpdir / "test.nc"
arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4) * 2
var = Variable(data=arr, dims=["x", "y"])
ds = Dataset({"foo": var})
var = xr.Variable(data=arr, dims=["x", "y"])
ds = xr.Dataset({"foo": var})
ds.to_netcdf(netcdf_path)

# instead for now just write out byte ranges explicitly
@@ -323,9 +325,9 @@ def test_checksum(
zarray=zarray,
)

ma_v = Variable(data=ma, dims=["x", "y"])
ma_v = xr.Variable(data=ma, dims=["x", "y"])

vds = Dataset({"pres": ma_v})
vds = xr.Dataset({"pres": ma_v})

# Icechunk checksums are currently stored with second precision, so we need to
# make sure the checksum_date is at least one second in the future
@@ -336,29 +338,29 @@ def test_checksum(
with pytest.raises(TypeError):
dataset_to_icechunk(vds, icechunk_filestore, last_updated_at="not a datetime") # type: ignore

root_group = group(store=icechunk_filestore)
root_group = zarr.group(store=icechunk_filestore)
pres_array = root_group["pres"]
assert isinstance(pres_array, Array)
assert isinstance(pres_array, zarr.Array)
assert pres_array.shape == (3, 4)
assert pres_array.dtype == np.dtype("int32")
expected_ds = open_dataset(netcdf_path)
expected_ds = xr.open_dataset(netcdf_path)
expected_array = expected_ds["foo"].to_numpy()
npt.assert_equal(pres_array, expected_array)
expected_ds.close()

# Now we can overwrite the netcdf file with new data to make sure that the
# checksum_date is being used to determine whether the data is still valid
arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4) * 2
var = Variable(data=arr, dims=["x", "y"])
ds = Dataset({"foo": var})
var = xr.Variable(data=arr, dims=["x", "y"])
ds = xr.Dataset({"foo": var})
time.sleep(1) # Make sure the file is rewritten after the checksum_date has passed
ds.to_netcdf(netcdf_path)

# Now if we try to read the data back in, it should fail because the file was
# modified after the recorded last_updated_at timestamp
with pytest.raises(IcechunkError):
pres_array = root_group["pres"]
assert isinstance(pres_array, Array)
assert isinstance(pres_array, zarr.Array)
npt.assert_equal(pres_array, arr)
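
The checksum test pins down icechunk's staleness contract for virtual references: the write records `last_updated_at`, and if the referenced file changes after that timestamp, later chunk reads raise `IcechunkError` instead of silently returning stale bytes. The shape of the contract, condensed from the test and reusing its fixture names:

```python
from datetime import datetime, timedelta, timezone

# Record a checksum timestamp slightly in the future (second-level precision).
checksum_date = datetime.now(timezone.utc) + timedelta(seconds=1)
dataset_to_icechunk(vds, icechunk_filestore, last_updated_at=checksum_date)

# If the referenced netcdf file is rewritten after checksum_date, reading the
# virtual chunks is expected to raise IcechunkError, not return stale data.
```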


@@ -439,7 +441,7 @@ def gen_virtual_variable(
dims: list[str] = [],
zarr_format: Literal[2, 3] = 2,
attrs: dict[str, Any] = {},
) -> Variable:
) -> xr.Variable:
manifest = generate_chunk_manifest(
file_uri,
shape=shape,
@@ -457,7 +459,7 @@ def gen_virtual_variable(
zarr_format=zarr_format,
)
ma = ManifestArray(chunkmanifest=manifest, zarray=zarray)
return Variable(
return xr.Variable(
data=ma,
dims=dims,
encoding=encoding,
@@ -479,9 +481,9 @@ def gen_virtual_dataset(
length: int = 48,
dims: Optional[list[str]] = None,
zarr_format: Literal[2, 3] = 2,
coords: Optional[Coordinates] = None,
) -> Dataset:
ds = open_dataset(file_uri)
coords: Optional[xr.Coordinates] = None,
) -> xr.Dataset:
ds = xr.open_dataset(file_uri)
ds_dims: list[str] = cast(list[str], list(ds.dims))
dims = dims or ds_dims
var = gen_virtual_variable(
@@ -499,7 +501,7 @@ def gen_virtual_dataset(
zarr_format=zarr_format,
attrs=ds[variable_name].attrs,
)
return Dataset(
return xr.Dataset(
{variable_name: var},
coords=coords,
attrs=ds.attrs,
@@ -537,12 +539,12 @@ def test_append_virtual_ref_without_encoding(
icechunk_filestore_append = repo.writable_session("main")
dataset_to_icechunk(vds, icechunk_filestore_append.store, append_dim="x")
icechunk_filestore_append.commit("appended data again")
array = open_zarr(
array = xr.open_zarr(
icechunk_filestore_append.store, consolidated=False, zarr_format=3
)

expected_ds = open_dataset(simple_netcdf4)
expected_array = concat([expected_ds, expected_ds, expected_ds], dim="x")
expected_ds = xr.open_dataset(simple_netcdf4)
expected_array = xr.concat([expected_ds, expected_ds, expected_ds], dim="x")
xrt.assert_identical(array, expected_array)
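
The append tests all follow the same session lifecycle: write, commit, open a fresh writable session, then pass `append_dim` to extend an existing dimension. Condensed from the test above, using the repository fixture (`repo` / `icechunk_repo` in these tests) and hypothetical `vds` / `vds2` datasets:

```python
# Initial write and commit.
session = repo.writable_session("main")
dataset_to_icechunk(vds, session.store)
session.commit("initial write")

# Append along the existing "x" dimension in a new session.
session = repo.writable_session("main")
dataset_to_icechunk(vds2, session.store, append_dim="x")
session.commit("appended data")
```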

def test_append_virtual_ref_with_encoding(
@@ -592,12 +594,15 @@ def test_append_virtual_ref_with_encoding(
icechunk_filestore_append = icechunk_repo.writable_session("main")
dataset_to_icechunk(vds2, icechunk_filestore_append.store, append_dim="time")
icechunk_filestore_append.commit("appended data")
new_ds = open_zarr(
new_ds = xr.open_zarr(
icechunk_filestore_append.store, consolidated=False, zarr_format=3
)

expected_ds1, expected_ds2 = open_dataset(filepath1), open_dataset(filepath2)
expected_ds = concat([expected_ds1, expected_ds2], dim="time").drop_vars(
expected_ds1, expected_ds2 = (
xr.open_dataset(filepath1),
xr.open_dataset(filepath2),
)
expected_ds = xr.concat([expected_ds1, expected_ds2], dim="time").drop_vars(
["time", "lat", "lon"], errors="ignore"
)
# Because we encode attributes, attributes may differ, for example
@@ -695,7 +700,9 @@ async def test_append_with_multiple_root_arrays(
icechunk_filestore.commit(
"test commit"
) # need to commit it in order to append to it in the next lines
new_ds = open_zarr(icechunk_filestore.store, consolidated=False, zarr_format=3)
new_ds = xr.open_zarr(
icechunk_filestore.store, consolidated=False, zarr_format=3
)
first_time_chunk_before_append = await icechunk_filestore.store.get(
"time/c/0", prototype=default_buffer_prototype()
)
@@ -709,12 +716,15 @@ async def test_append_with_multiple_root_arrays(
"time/c/0", prototype=default_buffer_prototype()
)
) == first_time_chunk_before_append
new_ds = open_zarr(
new_ds = xr.open_zarr(
icechunk_filestore_append.store, consolidated=False, zarr_format=3
)

expected_ds1, expected_ds2 = open_dataset(filepath1), open_dataset(filepath2)
expected_ds = concat([expected_ds1, expected_ds2], dim="time")
expected_ds1, expected_ds2 = (
xr.open_dataset(filepath1),
xr.open_dataset(filepath2),
)
expected_ds = xr.concat([expected_ds1, expected_ds2], dim="time")
xrt.assert_equal(new_ds, expected_ds)

# When appending to a virtual ref with compression, it succeeds
Expand Down Expand Up @@ -775,12 +785,12 @@ def test_append_with_compression_succeeds(
icechunk_filestore_append = icechunk_repo.writable_session("main")
dataset_to_icechunk(vds2, icechunk_filestore_append.store, append_dim="time")
icechunk_filestore_append.commit("appended data")
updated_ds = open_zarr(
updated_ds = xr.open_zarr(
store=icechunk_filestore_append.store, consolidated=False, zarr_format=3
)

expected_ds1, expected_ds2 = open_dataset(file1), open_dataset(file2)
expected_ds = concat([expected_ds1, expected_ds2], dim="time")
expected_ds1, expected_ds2 = xr.open_dataset(file1), xr.open_dataset(file2)
expected_ds = xr.concat([expected_ds1, expected_ds2], dim="time")
expected_ds = expected_ds.drop_vars(["lon", "lat", "time"], errors="ignore")
xrt.assert_equal(updated_ds, expected_ds)

@@ -886,13 +896,12 @@ def test_append_dim_not_in_dims_raises_error(

# Attempt to append using a non-existent append_dim "z"
icechunk_filestore_append = icechunk_repo.writable_session("main")

with pytest.raises(
ValueError,
match="append_dim 'z' does not match any existing dataset dimensions",
):
dataset_to_icechunk(vds, icechunk_filestore_append.store, append_dim="z")


# TODO test writing to a group that isn't the root group

# TODO test with S3 / minio