Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get missing values and dtype without re-opening the dataset #180

Merged
merged 15 commits into from
Feb 12, 2024
152 changes: 78 additions & 74 deletions activestorage/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
}
return instance

def __init__(self, uri, ncvar, storage_type=None, missing_value=None, _FillValue=None, valid_min=None, valid_max=None, max_threads=100):
def __init__(self, uri, ncvar, storage_type=None, max_threads=100):
"""
Instantiate with a NetCDF4 dataset and the variable of interest within that file.
(We need the variable, because we need variable specific metadata from within that
Expand All @@ -92,65 +92,6 @@
self._method = None
self._lock = False
self._max_threads = max_threads

# obtain metadata, using netcdf4_python for now
# FIXME: There is an outstanding issue with ._FilLValue to be handled.
# If the user actually wrote the data with no fill value, or the
# default fill value is in play, then this might go wrong.
if storage_type is None:
ds = Dataset(uri)
elif storage_type == "s3":
with load_from_s3(uri) as _ds:
ds = _ds
try:
ds_var = ds[ncvar]
except IndexError as exc:
print(f"Dataset {ds} does not contain ncvar {ncvar!r}.")
raise exc

# FIXME: We do not get the correct byte order on the Zarr Array's dtype
# when using S3, so capture it here.
self._dtype = ds_var.dtype

if (missing_value, _FillValue, valid_min, valid_max) == (None, None, None, None):
if isinstance(ds, Dataset):
self._missing = getattr(ds_var, 'missing_value', None)
self._fillvalue = getattr(ds_var, '_FillValue', None)
# could be fill_value set as netCDF4 attr
if self._fillvalue is None:
self._fillvalue = getattr(ds_var, 'fill_value', None)
valid_min = getattr(ds_var, 'valid_min', None)
valid_max = getattr(ds_var, 'valid_max', None)
valid_range = getattr(ds_var, 'valid_range', None)
elif storage_type == "s3":
self._missing = ds_var.attrs.get('missing_value')
self._fillvalue = ds_var.attrs.get('_FillValue')
# could be fill_value set as netCDF4 attr
if self._fillvalue is None:
self._fillvalue = ds_var.attrs.get('fill_value')
valid_min = ds_var.attrs.get('valid_min')
valid_max = ds_var.attrs.get('valid_max')
valid_range = ds_var.attrs.get('valid_range')

if valid_max is not None or valid_min is not None:
if valid_range is not None:
raise ValueError(
"Invalid combination in the file of valid_min, "
"valid_max, valid_range: "
f"{valid_min}, {valid_max}, {valid_range}"
)
valid_range = (valid_min, valid_max)
elif valid_range is None:
valid_range = (None, None)
self._valid_min, self._valid_max = valid_range

else:
self._missing = missing_value
self._fillvalue = _FillValue
self._valid_min = valid_min
self._valid_max = valid_max

ds.close()

def __getitem__(self, index):
"""
Expand All @@ -174,22 +115,16 @@
elif self.storage_type == "s3":
with load_from_s3(self.uri) as nc:
data = nc[ncvar][index]
# h5netcdf doesn't return masked arrays.
if self._fillvalue:
data = np.ma.masked_equal(data, self._fillvalue)
if self._missing:
data = np.ma.masked_equal(data, self._missing)
if self._valid_max:
data = np.ma.masked_greater(data, self._valid_max)
if self._valid_min:
data = np.ma.masked_less(data, self._valid_min)
data = self._mask_data(data, nc[ncvar])

if lock:
lock.release()

return data

elif self._version == 1:
return self._via_kerchunk(index)

elif self._version == 2:
# No active operation either
lock = self.lock
Expand All @@ -202,6 +137,7 @@
lock.release()

return data

else:
raise ValueError(f'Version {self._version} not supported')

Expand Down Expand Up @@ -299,14 +235,27 @@
if self.zds is None:
print(f"Kerchunking file {self.uri} with variable "
f"{self.ncvar} for storage type {self.storage_type}")
ds = nz.load_netcdf_zarr_generic(self.uri,
self.ncvar,
self.storage_type)
ds, zarray, zattrs = nz.load_netcdf_zarr_generic(
self.uri,
self.ncvar,
self.storage_type
)
# The following is a hangover from exploration
# and is needed if using the original approach of doing it ourselves
# self.zds = make_an_array_instance_active(ds)
self.zds = ds

# Retain attributes and other information
if zarray.get('fill_value') is not None:
zattrs['_FillValue'] = zarray['fill_value']

self.zarray = zarray
self.zattrs = zattrs

# FIXME: We do not get the correct byte order on the Zarr
# Array's dtype when using S3, so capture it here.
self._dtype = np.dtype(zarray['dtype'])

return self._get_selection(index)

def _get_selection(self, *args):
Expand All @@ -319,7 +268,28 @@
compressor = self.zds._compressor
filters = self.zds._filters

missing = self._fillvalue, self._missing, self._valid_min, self._valid_max
# Get missing values
_FillValue = self.zattrs.get('_FillValue')
missing_value = self.zattrs.get('missing_value')
valid_min = self.zattrs.get('valid_min')
valid_max = self.zattrs.get('valid_max')
valid_range = self.zattrs.get('valid_range')
if valid_max is not None or valid_min is not None:
if valid_range is not None:
raise ValueError(

Check warning on line 279 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L279

Added line #L279 was not covered by tests
"Invalid combination in the file of valid_min, "
"valid_max, valid_range: "
f"{valid_min}, {valid_max}, {valid_range}"
)
elif valid_range:
valid_min, valid_max = valid_range

missing = (
_FillValue,
missing_value,
valid_min,
valid_max,
)

indexer = OrthogonalIndexer(*args, self.zds)
out_shape = indexer.shape
Expand Down Expand Up @@ -468,3 +438,37 @@
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)
return tmp, out_selection

def _mask_data(self, data, ds_var):
"""ppp"""
# TODO: replace with cfdm.NetCDFIndexer, hopefully.
attrs = ds_var.attrs
missing_value = attrs.get('missing_value')
_FillValue = attrs.get('_FillValue')
valid_min = attrs.get('valid_min')
valid_max = attrs.get('valid_max')
valid_range = attrs.get('valid_range')

if valid_max is not None or valid_min is not None:
if valid_range is not None:
raise ValueError(

Check warning on line 454 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L453-L454

Added lines #L453 - L454 were not covered by tests
"Invalid combination in the file of valid_min, "
"valid_max, valid_range: "
f"{valid_min}, {valid_max}, {valid_range}"
)
elif valid_range:
valid_min, valid_max = valid_range

Check warning on line 460 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L460

Added line #L460 was not covered by tests

if _FillValue is not None:
data = np.ma.masked_equal(data, fillvalue)

Check warning on line 463 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L463

Added line #L463 was not covered by tests

if missing_value is not None:
data = np.ma.masked_equal(data, missing)

Check warning on line 466 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L466

Added line #L466 was not covered by tests

if valid_max is not None:
data = np.ma.masked_greater(data, valid_max)

Check warning on line 469 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L469

Added line #L469 was not covered by tests

if valid_min is not None:
data = np.ma.masked_less(data, valid_min)

Check warning on line 472 in activestorage/active.py

View check run for this annotation

Codecov / codecov/patch

activestorage/active.py#L472

Added line #L472 was not covered by tests

return data
34 changes: 27 additions & 7 deletions activestorage/netcdf_to_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from kerchunk.hdf import SingleHdf5ToZarr


def gen_json(file_url, outf, storage_type):
def gen_json(file_url, varname, outf, storage_type):
"""Generate a json file that contains the kerchunk-ed data for Zarr."""
if storage_type == "s3":
fs = s3fs.S3FileSystem(key=S3_ACCESS_KEY,
Expand All @@ -24,7 +24,8 @@
h5chunks = SingleHdf5ToZarr(s3file, file_url,
inline_threshold=0)
with fs2.open(outf, 'wb') as f:
f.write(ujson.dumps(h5chunks.translate()).encode())
content = h5chunks.translate()
f.write(ujson.dumps(content).encode())

Check warning on line 28 in activestorage/netcdf_to_zarr.py

View check run for this annotation

Codecov / codecov/patch

activestorage/netcdf_to_zarr.py#L27-L28

Added lines #L27 - L28 were not covered by tests
else:
fs = fsspec.filesystem('')
with fs.open(file_url, 'rb') as local_file:
Expand All @@ -43,9 +44,13 @@
# faster loading time
# for active storage, we don't want anything inline
with fs.open(outf, 'wb') as f:
f.write(ujson.dumps(h5chunks.translate()).encode())
content = h5chunks.translate()
f.write(ujson.dumps(content).encode())

return outf
zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])

return outf, zarray, zattrs


def open_zarr_group(out_json, varname):
Expand All @@ -60,14 +65,15 @@
mapper = fs.get_mapper("") # local FS mapper
#mapper.fs.reference has the kerchunk mapping, how does this propagate into the Zarr array?
zarr_group = zarr.open_group(mapper)

try:
zarr_array = getattr(zarr_group, varname)
except AttributeError as attrerr:
print(f"Zarr Group does not contain variable {varname}. "
f"Zarr Group info: {zarr_group.info}")
raise attrerr
#print("Zarr array info:", zarr_array.info)

return zarr_array


Expand All @@ -77,10 +83,24 @@

# Write the Zarr group JSON to a temporary file.
with tempfile.NamedTemporaryFile() as out_json:
gen_json(fileloc, out_json.name, storage_type)
_, zarray, zattrs = gen_json(fileloc, varname, out_json.name, storage_type)

# open this monster
print(f"Attempting to open and convert {fileloc}.")
ref_ds = open_zarr_group(out_json.name, varname)

return ref_ds
return ref_ds, zarray, zattrs


#d = {'version': 1,
# 'refs': {
# '.zgroup': '{"zarr_format":2}',
# '.zattrs': '{"Conventions":"CF-1.6","access-list":"grenvillelister simonwilson jeffcole","awarning":"**** THIS SUITE WILL ARCHIVE NON-DUPLEXED DATA TO MOOSE. FOR CRITICAL MODEL RUNS SWITCH TO DUPLEXED IN: postproc --> Post Processing - common settings --> Moose Archiving --> non_duplexed_set. Follow guidance in http:\\/\\/www-twiki\\/Main\\/MassNonDuplexPolicy","branch-date":"1950-01-01","calendar":"360_day","code-version":"UM 11.6, NEMO vn3.6","creation_time":"2022-10-28 12:28","decription":"Initialised from EN4 climatology","description":"Copy of u-ar696\\/trunk@77470","email":"r.k.schieman@reading.ac.uk","end-date":"2015-01-01","experiment-id":"historical","forcing":"AA,BC,CO2","forcing-info":"blah, blah, blah","institution":"NCAS","macro-parent-experiment-id":"historical","macro-parent-experiment-mip":"CMIP","macro-parent-variant-id":"r1i1p1f3","model-id":"HadGEM3-CG31-MM","name":"\\/work\\/n02\\/n02\\/grenvill\\/cylc-run\\/u-cn134\\/share\\/cycle\\/19500101T0000Z\\/3h_","owner":"rosalynhatcher","project":"Coupled Climate","timeStamp":"2022-Oct-28 12:20:33 GMT","title":"[CANARI] GC3.1 N216 ORCA025 UM11.6","uuid":"51e5ef20-d376-4aa6-938e-4c242886b7b1"}',
# 'lat/.zarray': '{"chunks":[324],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[324],"zarr_format":2}', 'lat/.zattrs': '{"_ARRAY_DIMENSIONS":["lat"],"axis":"Y","long_name":"Latitude","standard_name":"latitude","units":"degrees_north"}',
# 'lat/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 26477, 560],
# 'lon/.zarray': '{"chunks":[432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":null,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[432],"zarr_format":2}',
# 'lon/.zattrs': '{"_ARRAY_DIMENSIONS":["lon"],"axis":"X","long_name":"Longitude","standard_name":"longitude","units":"degrees_east"}',
# 'lon/0': ['/home/david/Downloads/3h__19500101-19500110.nc', 27037, 556],
# 'm01s00i507_10/.zarray': '{"chunks":[1,324,432],"compressor":{"id":"zlib","level":1},"dtype":"<f4","fill_value":-1073741824.0,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[80,324,432],"zarr_format":2}',
# 'm01s00i507_10/.zattrs': '{"_ARRAY_DIMENSIONS":["time_counter","lat","lon"],"cell_methods":"time: mean (interval: 900 s)","coordinates":"time_centered","interval_offset":"0ts","interval_operation":"900 s","interval_write":"3 h","long_name":"OPEN SEA SURFACE TEMP AFTER TIMESTEP","missing_value":-1073741824.0,"online_operation":"average","standard_name":"surface_temperature","units":"K"}',
# }}
4 changes: 2 additions & 2 deletions tests/test_bigger_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ def test_daily_data_masked(test_data_path):
"""
ncfile = str(test_data_path / "daily_data_masked.nc")
uri = utils.write_to_storage(ncfile)
active = Active(uri, "ta", utils.get_storage_type(), missing_value=999.)
active = Active(uri, "ta", utils.get_storage_type())
active._version = 0
d = active[:]
d = np.ma.masked_where(d==999., d)
mean_result = np.ma.mean(d)

active = Active(uri, "ta", utils.get_storage_type(), missing_value=999.)
active = Active(uri, "ta", utils.get_storage_type())
active._version = 2
active.method = "mean"
active.components = True
Expand Down
Binary file modified tests/test_data/daily_data_masked.nc
Binary file not shown.
11 changes: 1 addition & 10 deletions tests/unit/test_active.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ def test_getitem():
active = Active(uri, ncvar=None)
assert str(exc.value) == "Must set a netCDF variable name to slice"

# unopenable file
ncvar = "tas"
baseexc = "tas not found in /"
with pytest.raises(IndexError) as exc:
active = Active(uri, ncvar=ncvar)
assert baseexc in str(exc.value)

# openable file and correct variable
uri = "tests/test_data/cesm2_native.nc"
ncvar = "TREFHT"
Expand Down Expand Up @@ -81,9 +74,7 @@ def test_active():
uri = "tests/test_data/cesm2_native.nc"
ncvar = "TREFHT"
active = Active(uri, ncvar=ncvar)
init = active.__init__(uri=uri, ncvar=ncvar, missing_value=True,
_FillValue=1e20, valid_min=-1,
valid_max=1200)
init = active.__init__(uri=uri, ncvar=ncvar)


def test_lock():
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/test_storage_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ def reduce_chunk(

assert result == 999.0

mock_load.assert_called_once_with(uri)
# S3 loading is not done from Active anymore
# mock_load.assert_called_once_with(uri)

mock_nz.assert_called_once_with(uri, "data", "s3")
# NOTE: This gets called multiple times with various arguments. Match on
# the common ones.
Expand Down Expand Up @@ -134,6 +136,7 @@ def load_from_s3(uri):
assert np.max(result) == 999.0


@pytest.mark.skip(reason="No more valid file load in Active")
@mock.patch.object(activestorage.active, "load_from_s3")
def test_s3_load_failure(mock_load):
"""Test when an S3 object doesn't exist."""
Expand Down
Loading