Get missing values and dtype without re-opening the dataset #180

Merged · 15 commits · Feb 12, 2024
152 changes: 78 additions & 74 deletions activestorage/active.py
@@ -68,7 +68,7 @@
}
return instance

def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue=None, valid_min=None, valid_max=None, max_threads=100):
def __init__(self, uri, ncvar, storage_type=None, max_threads=100):
"""
Instantiate with a NetCDF4 dataset and the variable of interest within that file.
(We need the variable, because we need variable specific metadata from within that
@@ -92,65 +92,6 @@
self._method = None
self._lock = False
self._max_threads = max_threads

# obtain metadata, using netcdf4_python for now
# FIXME: There is an outstanding issue with ._FillValue to be handled.
# If the user actually wrote the data with no fill value, or the
# default fill value is in play, then this might go wrong.
if storage_type is None:
ds = Dataset(uri)
elif storage_type == "s3":
with load_from_s3(uri) as _ds:
ds = _ds
try:
ds_var = ds[ncvar]
except IndexError as exc:
print(f"Dataset {ds} does not contain ncvar {ncvar!r}.")
raise exc

# FIXME: We do not get the correct byte order on the Zarr Array's dtype
# when using S3, so capture it here.
self._dtype = ds_var.dtype

if (missing_value, _FillValue, valid_min, valid_max) == (None, None, None, None):
if isinstance(ds, Dataset):
self._missing = getattr(ds_var, 'missing_value', None)
self._fillvalue = getattr(ds_var, '_FillValue', None)
# could be fill_value set as netCDF4 attr
if self._fillvalue is None:
self._fillvalue = getattr(ds_var, 'fill_value', None)
valid_min = getattr(ds_var, 'valid_min', None)
valid_max = getattr(ds_var, 'valid_max', None)
valid_range = getattr(ds_var, 'valid_range', None)
elif storage_type == "s3":
self._missing = ds_var.attrs.get('missing_value')
self._fillvalue = ds_var.attrs.get('_FillValue')
# could be fill_value set as netCDF4 attr
if self._fillvalue is None:
self._fillvalue = ds_var.attrs.get('fill_value')
valid_min = ds_var.attrs.get('valid_min')
valid_max = ds_var.attrs.get('valid_max')
valid_range = ds_var.attrs.get('valid_range')

if valid_max is not None or valid_min is not None:
if valid_range is not None:
raise ValueError(
"Invalid combination in the file of valid_min, "
"valid_max, valid_range: "
f"{valid_min}, {valid_max}, {valid_range}"
)
valid_range = (valid_min, valid_max)
elif valid_range is None:
valid_range = (None, None)
self._valid_min, self._valid_max = valid_range

else:
self._missing = missing_value
self._fillvalue = _FillValue
self._valid_min = valid_min
self._valid_max = valid_max

ds.close()
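
With that block removed, instantiation no longer opens the dataset for metadata. A hedged sketch of the intended behaviour (file name hypothetical): on the kerchunk code paths, the missing-value attributes and dtype are now captured lazily, on first access, by _via_kerchunk below.

from activestorage.active import Active

active = Active("daily_data.nc", "ta")  # no dataset open for metadata here
data = active[:]                        # kerchunk paths record zarray/zattrs/_dtype on first read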

def __getitem__(self, index):
"""
@@ -174,22 +115,16 @@
elif self.storage_type == "s3":
with load_from_s3(self.uri) as nc:
data = nc[ncvar][index]
# h5netcdf doesn't return masked arrays.
if self._fillvalue:
data = np.ma.masked_equal(data, self._fillvalue)
if self._missing:
data = np.ma.masked_equal(data, self._missing)
if self._valid_max:
data = np.ma.masked_greater(data, self._valid_max)
if self._valid_min:
data = np.ma.masked_less(data, self._valid_min)
data = self._mask_data(data, nc[ncvar])

if lock:
lock.release()

return data

elif self._version == 1:
return self._via_kerchunk(index)

elif self._version == 2:
# No active operation either
lock = self.lock
@@ -202,6 +137,7 @@
lock.release()

return data

else:
raise ValueError(f'Version {self._version} not supported')
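
For orientation, a minimal usage sketch of the read paths dispatched above (file and variable names are hypothetical; the pattern mirrors the tests later in this PR):

from activestorage.active import Active

uri, ncvar = "daily_data.nc", "ta"   # hypothetical local file

# _version 0: plain read, masked via _mask_data
active = Active(uri, ncvar)
active._version = 0
data = active[:]

# _version 2: kerchunk/Zarr path with an active reduction method
active = Active(uri, ncvar)
active._version = 2
active.method = "mean"
result = active[:]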

@@ -299,14 +235,27 @@
if self.zds is None:
print(f"Kerchunking file {self.uri} with variable "
f"{self.ncvar} for storage type {self.storage_type}")
ds = nz.load_netcdf_zarr_generic(self.uri,
self.ncvar,
self.storage_type)
ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
self.uri,
self.ncvar,
self.storage_type
)
# The following is a hangover from exploration and is only
# needed if we construct the active array instance ourselves:
# self.zds = make_an_array_instance_active(ds)
self.zds = ds

# Retain attributes and other information
if zarray.get('fill_value') is not None:
zattrs['_FillValue'] = zarray['fill_value']

self.zarray = zarray
self.zattrs = zattrs

# FIXME: We do not get the correct byte order on the Zarr
# Array's dtype when using S3, so capture it here.
self._dtype = np.dtype(zarray['dtype'])

return self._get_selection(index)
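
A small sketch of the fill-value promotion above, using values from the sample kerchunk refs reproduced at the bottom of netcdf_to_zarr.py in this diff:

import numpy as np

zarray = {"dtype": "<f4", "fill_value": -1073741824.0, "shape": [80, 324, 432]}
zattrs = {"units": "K"}
if zarray.get("fill_value") is not None:
    zattrs["_FillValue"] = zarray["fill_value"]
assert zattrs["_FillValue"] == -1073741824.0
print(np.dtype(zarray["dtype"]))  # float32, little-endian byte order preserved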

def _get_selection(self, *args):
@@ -319,7 +268,28 @@
compressor = self.zds._compressor
filters = self.zds._filters

missing = self._fillvalue, self._missing, self._valid_min, self._valid_max
# Get missing values
_FillValue = self.zattrs.get('_FillValue')
missing_value = self.zattrs.get('missing_value')
valid_min = self.zattrs.get('valid_min')
valid_max = self.zattrs.get('valid_max')
valid_range = self.zattrs.get('valid_range')
if valid_max is not None or valid_min is not None:
if valid_range is not None:
raise ValueError(
"Invalid combination in the file of valid_min, "
"valid_max, valid_range: "
f"{valid_min}, {valid_max}, {valid_range}"
)
elif valid_range is not None:
valid_min, valid_max = valid_range

missing = (
_FillValue,
missing_value,
valid_min,
valid_max,
)

indexer = OrthogonalIndexer(*args, self.zds)
out_shape = indexer.shape
@@ -468,3 +438,37 @@
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)
return tmp, out_selection
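
The squeeze above drops axes that were indexed with scalars; a tiny numpy illustration:

import numpy as np

tmp = np.zeros((1, 324, 432))
out = np.squeeze(tmp, axis=(0,))   # drop_axes == (0,)
print(out.shape)                   # (324, 432)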

def _mask_data(self, data, ds_var):
"""ppp"""
# TODO: replace with cfdm.NetCDFIndexer, hopefully.
attrs = ds_var.attrs
missing_value = attrs.get('missing_value')
_FillValue = attrs.get('_FillValue')
valid_min = attrs.get('valid_min')
valid_max = attrs.get('valid_max')
valid_range = attrs.get('valid_range')

if valid_max is not None or valid_min is not None:
if valid_range is not None:
raise ValueError(
"Invalid combination in the file of valid_min, "
"valid_max, valid_range: "
f"{valid_min}, {valid_max}, {valid_range}"
)
elif valid_range is not None:
valid_min, valid_max = valid_range

if _FillValue is not None:
data = np.ma.masked_equal(data, _FillValue)

if missing_value is not None:
data = np.ma.masked_equal(data, missing_value)

if valid_max is not None:
data = np.ma.masked_greater(data, valid_max)

if valid_min is not None:
data = np.ma.masked_less(data, valid_min)

return data
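
A hedged sketch of the masking semantics _mask_data relies on (the values are made up):

import numpy as np

data = np.arange(6, dtype="f4")
data[0] = 999.

masked = np.ma.masked_equal(data, 999.)     # missing_value or _FillValue
masked = np.ma.masked_less(masked, 2.)      # valid_min
masked = np.ma.masked_greater(masked, 4.)   # valid_max
print(masked)         # [-- -- 2.0 3.0 4.0 --]
print(masked.mean())  # 3.0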
34 changes: 27 additions & 7 deletions activestorage/netcdf_to_zarr.py
@@ -10,7 +10,7 @@
from kerchunk.hdf import SingleHdf5ToZarr


def gen_json(file_url, outf, storage_type):
def gen_json(file_url, varname, outf, storage_type):
"""Generate a json file that contains the kerchunk-ed data for Zarr."""
if storage_type == "s3":
fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,
@@ -24,7 +24,8 @@
h5chunks = SingleHdf5ToZarr(s3file, file_url,
inline_threshold=0)
with fs2.open(outf, 'wb') as f:
f.write(ujson.dumps(h5chunks.translate()).encode())
content = h5chunks.translate()
f.write(ujson.dumps(content).encode())

else:
fs = fsspec.filesystem('')
with fs.open(file_url, 'rb') as local_file:
@@ -43,9 +44,13 @@
# faster loading time
# for active storage, we don't want anything inline
with fs.open(outf, 'wb') as f:
f.write(ujson.dumps(h5chunks.translate()).encode())
content = h5chunks.translate()
f.write(ujson.dumps(content).encode())

return outf
zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])

return outf, zarray, zattrs
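
The two refs keys read above are JSON strings inside the kerchunk dict. A hedged sketch with a hand-made content dict (real ones come from SingleHdf5ToZarr.translate(); the variable name "ta" is hypothetical):

import ujson

content = {
    "version": 1,
    "refs": {
        "ta/.zarray": '{"chunks":[1,324,432],"dtype":"<f4","fill_value":-1073741824.0,"shape":[80,324,432],"zarr_format":2}',
        "ta/.zattrs": '{"missing_value":-1073741824.0,"units":"K"}',
    },
}
zarray = ujson.loads(content["refs"]["ta/.zarray"])
zattrs = ujson.loads(content["refs"]["ta/.zattrs"])
print(zarray["fill_value"], zattrs["missing_value"])  # -1073741824.0 -1073741824.0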


def open_zarr_group(out_json, varname):
@@ -60,14 +65,15 @@
mapper = fs.get_mapper("") # local FS mapper
#mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array?
zarr_group = zarr.open_group(mapper)

try:
zarr_array = getattr(zarr_group, varname)
except AttributeError as attrerr:
print(f"Zarr Group does not contain variable {varname}. "
f"Zarr Group info: {zarr_group.info}")
raise attrerr
#print("Zarr array info:", zarr_array.info)

return zarr_array
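
For context, a minimal sketch of how a kerchunk JSON becomes a Zarr group via fsspec's reference filesystem (the "refs.json" path and "ta" variable are hypothetical; the mapper and group calls match the function above):

import fsspec
import zarr

fs = fsspec.filesystem("reference", fo="refs.json")  # kerchunk reference file
mapper = fs.get_mapper("")
group = zarr.open_group(mapper)
array = getattr(group, "ta")   # same lookup open_zarr_group performs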


@@ -77,10 +83,24 @@

# Write the Zarr group JSON to a temporary file.
with tempfile.NamedTemporaryFile() as out_json:
gen_json(fileloc, out_json.name, storage_type)
_, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type)

# open this monster
print(f"Attempting to open and convert {fileloc}.")
ref_ds = open_zarr_group(out_json.name, varname)

return ref_ds
return ref_ds, zarray, zattrs
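
Hedged usage of the widened return value (local file, so storage_type=None; file and variable names hypothetical):

from activestorage import netcdf_to_zarr as nz

ds, zarray, zattrs = nz.load_netcdf_zarr_generic("daily_data.nc", "ta", None)
print(zarray["dtype"], zattrs.get("_FillValue"))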


#d = {'version': 1,
# 'refs': {
# '.zgroup': '{"zarr_format":2}',
# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[324],"zarr_format":2}', 'lat/.zattrs': '{"_ARRAY_DIMENSIONS":["lat"],"axis":"Y","long_name":"Latitude","standard_name":"latitude","units":"degrees_north"}',
# 'lat/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 26477, 560],
# 'lon/.zarray': '{"chunks":[432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[432],"zarr_format":2}',
# 'lon/.zattrs': '{"_ARRAY_DIMENSIONS":["lon"],"axis":"X","long_name":"Longitude","standard_name":"longitude","units":"degrees_east"}',
# 'lon/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 27037, 556],
# 'm01s00i507_10/.zarray': '{"chunks":[1,324,432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":-1073741824.0,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[80,324,432],"zarr_format":2}',
# 'm01s00i507_10/.zattrs': '{"_ARRAY_DIMENSIONS":["time_counter","lat","lon"],"cell_methods":"time: mean (interval: 900 s)","coordinates":"time_centered","interval_offset":"0ts","interval_operation":"900 s","interval_write":"3 h","long_name":"OPEN SEA SURFACE TEMP AFTER TIMESTEP","missing_value":-1073741824.0,"online_operation":"average","standard_name":"surface_temperature","units":"K"}',
# }}
9 changes: 5 additions & 4 deletions tests/test_bigger_data.py
@@ -149,7 +149,7 @@ def test_native_emac_model_fails(test_data_path):
"""
An example of netCDF file that doesn't work

The actual issue is with h5py - it can't read it (netCDF classic)
The actual issue is with h5py - it can't read it (netCDF3 classic)

h5py/_objects.pyx:54: in h5py._objects.with_phil.wrapper
???
@@ -175,8 +175,9 @@ def test_native_emac_model_fails(test_data_path):
pass

if USE_S3:
active = Active(uri, "aps_ave", utils.get_storage_type())
with pytest.raises(OSError):
active = Active(uri, "aps_ave", utils.get_storage_type())
active[...]
else:
active = Active(uri, "aps_ave")
active._version = 2
@@ -243,13 +244,13 @@ def test_daily_data_masked(test_data_path):
"""
ncfile = str(test_data_path / "daily_data_masked.nc")
uri = utils.write_to_storage(ncfile)
active = Active(uri, "ta", utils.get_storage_type(), missing_value=999.)
active = Active(uri, "ta", utils.get_storage_type())
active._version = 0
d = active[:]
d = np.ma.masked_where(d==999., d)
mean_result = np.ma.mean(d)

active = Active(uri, "ta", utils.get_storage_type(), missing_value=999.)
active = Active(uri, "ta", utils.get_storage_type())
active._version = 2
active.method = "mean"
active.components = True
Binary file modified tests/test_data/daily_data_masked.nc
Binary file not shown.
62 changes: 62 additions & 0 deletions tests/test_missing.py
@@ -7,6 +7,8 @@
import tempfile
import unittest

import h5netcdf

from netCDF4 import Dataset

from activestorage.active import Active
@@ -240,3 +242,63 @@ def test_validrange(tmp_path):

np.testing.assert_array_equal(masked_numpy_mean, active_mean)
np.testing.assert_array_equal(no_active_mean, active_mean)


def test_active_mask_data(tmp_path):
testfile = str(tmp_path / 'test_partially_missing_data.nc')

# with valid min
r = dd.make_validmin_ncdata(testfile, valid_min=500)

# retrieve the actual numpy-ed result
actual_data = load_dataset(testfile)

# dataset variable
ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
dsvar = ds["data"]

# test the function
data = Active._mask_data(None, actual_data, dsvar)
ds.close()

# with valid range
r = dd.make_validrange_ncdata(testfile, valid_range=[750., 850.])

# retrieve the actual numpy-ed result
actual_data = load_dataset(testfile)

# dataset variable
ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
dsvar = ds["data"]

# test the function
data = Active._mask_data(None, actual_data, dsvar)
ds.close()

# with missing
r = dd.make_missing_ncdata(testfile)

# retrieve the actual numpy-ed result
actual_data = load_dataset(testfile)

# dataset variable
ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
dsvar = ds["data"]

# test the function
data = Active._mask_data(None, actual_data, dsvar)
ds.close()

# with _FillValue
r = dd.make_fillvalue_ncdata(testfile)

# retrieve the actual numpy-ed result
actual_data = load_dataset(testfile)

# dataset variable
ds = h5netcdf.File(testfile, 'r', invalid_netcdf=True)
dsvar = ds["data"]

# test the function
data = Active._mask_data(None, actual_data, dsvar)
ds.close()
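
The four blocks above share one shape; a compressed sketch of that pattern (the fixture helpers dd.* and load_dataset are module-scope names used in this test file, assumed available here):

import h5netcdf
from activestorage.active import Active

def check_mask(testfile, make_fixture, **kwargs):
    """Write a fixture file, load the reference result, run _mask_data on it."""
    make_fixture(testfile, **kwargs)
    actual_data = load_dataset(testfile)
    with h5netcdf.File(testfile, "r", invalid_netcdf=True) as ds:
        return Active._mask_data(None, actual_data, ds["data"])

# e.g. check_mask(testfile, dd.make_validmin_ncdata, valid_min=500)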