diff --git a/.github/workflows/test_s3_minio.yml b/.github/workflows/test_s3_minio.yml
index cceb63d..1232d76 100644
--- a/.github/workflows/test_s3_minio.yml
+++ b/.github/workflows/test_s3_minio.yml
@@ -6,7 +6,8 @@ on:
   push:
     branches:
       - main # keep this at all times
-      - pyfive
+      # - pyfive # reinstate
+      - new_api_pyfive
   pull_request:
   schedule:
     - cron: '0 0 * * *' # nightly
diff --git a/activestorage/active.py b/activestorage/active.py
index 761cf3b..734321f 100644
--- a/activestorage/active.py
+++ b/activestorage/active.py
@@ -4,10 +4,12 @@
 import pathlib
 import urllib
 import pyfive
+import s3fs
 import time
 
-from pyfive.h5d import StoreInfo
-import s3fs
+from pathlib import Path
+from pyfive.h5d import StoreInfo
+from typing import Optional
 
 from activestorage.config import *
 from activestorage import reductionist
@@ -47,21 +49,6 @@ def load_from_s3(uri, storage_options=None):
         print(f"Dataset loaded from S3 with s3fs and Pyfive: {uri} ({t2-t1:.2},{t3-t2:.2})")
     return ds
 
-def _metricise(method):
-    """ Decorator for class methods loads into metric_data"""
-    def timed(self, *args, **kw):
-        ts = time.time()
-        metric_name=''
-        if '__metric_name' in kw:
-            metric_name = kw['__metric_name']
-            del kw['__metric_name']
-        result = method(self,*args, **kw)
-        te = time.time()
-        if metric_name:
-            self.metric_data[metric_name] = te-ts
-        return result
-    return timed
-
 
 def get_missing_attributes(ds):
     """"
@@ -122,13 +109,13 @@ def __new__(cls, *args, **kwargs):
 
     def __init__(
         self,
-        uri,
-        ncvar,
-        storage_type=None,
-        max_threads=100,
-        storage_options=None,
-        active_storage_url=None
-    ):
+        dataset: Optional[str | Path | object] ,
+        ncvar: str = None,
+        storage_type: str = None,
+        max_threads: int = 100,
+        storage_options: dict = None,
+        active_storage_url: str = None
+    ) -> None:
         """
         Instantiate with a NetCDF4 dataset URI and the variable of interest within that
         file. (We need the variable, because we need variable specific metadata from within that
@@ -138,27 +125,44 @@ def __init__(
         :param storage_options: s3fs.S3FileSystem options
         :param active_storage_url: Reductionist server URL
         """
-        # Assume NetCDF4 for now
-        self.uri = uri
-        if self.uri is None:
-            raise ValueError(f"Must use a valid file for uri. Got {uri}")
+        self.ds = None
+        input_variable = False
+        if dataset is None:
+            raise ValueError(f"Must use a valid file name or variable object for dataset. Got {dataset!r}")
+        if isinstance(dataset, Path) and not dataset.exists():
+            raise ValueError(f"Path to input file {dataset!r} does not exist.")
+        if not isinstance(dataset, Path) and not isinstance(dataset, str):
+            print(f"Treating input {dataset} as variable object.")
+            if not type(dataset) is pyfive.high_level.Dataset:
+                raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset!r}")
+            input_variable = True
+            self.ds = dataset
+        self.uri = dataset
+
         # still allow for a passable storage_type
         # for special cases eg "special-POSIX" ie DDN
         if not storage_type and storage_options is not None:
-            storage_type = urllib.parse.urlparse(uri).scheme
+            storage_type = urllib.parse.urlparse(dataset).scheme
         self.storage_type = storage_type
 
+        # set correct filename attr
+        if input_variable and not self.storage_type:
+            self.filename = self.ds
+        elif input_variable and self.storage_type == "s3":
+            self.filename = self.ds.id._filename
+
         # get storage_options
         self.storage_options = storage_options
         self.active_storage_url = active_storage_url
 
         # basic check on file
-        if not os.path.isfile(self.uri) and not self.storage_type:
-            raise ValueError(f"Must use existing file for uri. {self.uri} not found")
+        if not input_variable:
+            if not os.path.isfile(self.uri) and not self.storage_type:
+                raise ValueError(f"Must use existing file for uri. {self.uri} not found")
 
         self.ncvar = ncvar
-        if self.ncvar is None:
+        if self.ncvar is None and not input_variable:
             raise ValueError("Must set a netCDF variable name to slice")
 
         self._version = 1
@@ -166,22 +170,24 @@ def __init__(
         self._method = None
         self._max_threads = max_threads
         self.missing = None
-        self.ds = None
-        self.metric_data = {}
         self.data_read = 0
 
-    @_metricise
     def __load_nc_file(self):
-        """ Get the netcdf file and it's b-tree"""
+        """
+        Get the netcdf file and its b-tree.
+
+        This private method is used only if the input to Active
+        is not a pyfive.high_level.Dataset object. In that case,
+        any file opening is skipped, and ncvar is not used. The
+        Dataset object will have already contained the b-tree,
+        and `_filename` attribute.
+        """
         ncvar = self.ncvar
-        # in all cases we need an open netcdf file to get at attributes
-        # we keep it open because we need it's b-tree
         if self.storage_type is None:
             nc = pyfive.File(self.uri)
         elif self.storage_type == "s3":
             nc = load_from_s3(self.uri, self.storage_options)
         self.filename = self.uri
-        self.ds = nc[ncvar]
 
 
     def __get_missing_attributes(self):
@@ -194,10 +200,8 @@ def __getitem__(self, index):
         Provides support for a standard get item.
         #FIXME-BNL: Why is the argument index?
         """
-        self.metric_data = {}
         if self.ds is None:
-            self.__load_nc_file(__metric_name='load nc time')
-            #self.__metricise('Load','__load_nc_file')
+            self.__load_nc_file()
 
         self.missing = self.__get_missing_attributes()
 
@@ -206,21 +210,20 @@ def __getitem__(self, index):
 
         if self.method is None and self._version == 0:
             # No active operation
-            return self._get_vanilla(index, __metric_name='vanilla_time')
+            return self._get_vanilla(index)
 
         elif self._version == 1:
             #FIXME: is the difference between version 1 and 2 still honoured?
-            return self._get_selection(index, __metric_name='selection 1 time (s)')
+            return self._get_selection(index)
 
         elif self._version == 2:
-            return self._get_selection(index, __metric_name='selection 2 time (s)')
+            return self._get_selection(index)
 
         else:
             raise ValueError(f'Version {self._version} not supported')
 
 
-    @_metricise
     def _get_vanilla(self, index):
         """
         Get the data without any active operation
@@ -294,7 +297,7 @@ def _get_active(self, method, *args):
         """
         raise NotImplementedError
 
-    @_metricise
+
     def _get_selection(self, *args):
         """
         At this point we have a Dataset object, but all the important information about
@@ -307,13 +310,8 @@ def _get_selection(self, *args):
 
         name = self.ds.name
        dtype = np.dtype(self.ds.dtype)
-        # hopefully fix pyfive to get a dtype directly
         array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
         ds = self.ds.id
-
-        self.metric_data['args'] = args
-        self.metric_data['dataset shape'] = self.ds.shape
-        self.metric_data['dataset chunks'] = self.ds.chunks
         if ds.filter_pipeline is None:
             compressor, filters = None, None
         else:
@@ -359,16 +357,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
         session = None
 
         # Process storage chunks using a thread pool.
-        # Because we do this, we need to read the dataset b-tree now, not as we go, so
-        # it is already in cache. If we remove the thread pool from here, we probably
-        # wouldn't need to do it before the first one.
-
-        if ds.chunks is not None:
-            t1 = time.time()
-            # ds._get_chunk_addresses()
-            t2 = time.time() - t1
-            self.metric_data['indexing time (s)'] = t2
-            # self.metric_data['chunk number'] = len(ds._zchunk_index)
         chunk_count = 0
         t1 = time.time()
         with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:
@@ -433,10 +421,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
             # size.
             out = out / np.sum(counts).reshape(shape1)
 
-        t2 = time.time()
-        self.metric_data['reduction time (s)'] = t2-t1
-        self.metric_data['chunks processed'] = chunk_count
-        self.metric_data['storage read (B)'] = self.data_read
         return out
 
     def _get_endpoint_url(self):
diff --git a/activestorage/storage.py b/activestorage/storage.py
index 80a575b..56a1ff8 100644
--- a/activestorage/storage.py
+++ b/activestorage/storage.py
@@ -1,8 +1,10 @@
 """Active storage module."""
 import numpy as np
+import pyfive
 
 from numcodecs.compat import ensure_ndarray
 
+
 def reduce_chunk(rfile,
                  offset, size, compression, filters, missing, dtype, shape,
                  order, chunk_selection, method=None):
@@ -25,20 +27,39 @@ def reduce_chunk(rfile,
     storage implementations we'll change to controlled vocabulary)
 
     """
-
-    #FIXME: for the moment, open the file every time ... we might want to do that, or not
-    with open(rfile,'rb') as open_file:
-        # get the data
-        chunk = read_block(open_file, offset, size)
-        # reverse any compression and filters
-        chunk = filter_pipeline(chunk, compression, filters)
-        # make it a numpy array of bytes
-        chunk = ensure_ndarray(chunk)
-        # convert to the appropriate data type
-        chunk = chunk.view(dtype)
-        # sort out ordering and convert to the parent hyperslab dimensions
-        chunk = chunk.reshape(-1, order='A')
-        chunk = chunk.reshape(shape, order=order)
+    obj_type = type(rfile)
+    print(f"Reducing chunk of object {obj_type}")
+
+    if not obj_type is pyfive.high_level.Dataset:
+        #FIXME: for the moment, open the file every time ... we might want to do that, or not
+        # we could just use an instance of pyfive.high_level.Dataset.id
+        # passed directly from active.py, as below
+        with open(rfile,'rb') as open_file:
+            # get the data
+            chunk = read_block(open_file, offset, size)
+            # reverse any compression and filters
+            chunk = filter_pipeline(chunk, compression, filters)
+            # make it a numpy array of bytes
+            chunk = ensure_ndarray(chunk)
+            # convert to the appropriate data type
+            chunk = chunk.view(dtype)
+            # sort out ordering and convert to the parent hyperslab dimensions
+            chunk = chunk.reshape(-1, order='A')
+            chunk = chunk.reshape(shape, order=order)
+    else:
+        class storeinfo: pass
+        storeinfo.byte_offset = offset
+        storeinfo.size = size
+        chunk = rfile.id._get_raw_chunk(storeinfo)
+        # reverse any compression and filters
+        chunk = filter_pipeline(chunk, compression, filters)
+        # make it a numpy array of bytes
+        chunk = ensure_ndarray(chunk)
+        # convert to the appropriate data type
+        chunk = chunk.view(dtype)
+        # sort out ordering and convert to the parent hyperslab dimensions
+        chunk = chunk.reshape(-1, order='A')
+        chunk = chunk.reshape(shape, order=order)
 
     tmp = chunk[chunk_selection]
     if method:
diff --git a/tests/test_real_s3.py b/tests/test_real_s3.py
new file mode 100644
index 0000000..2a3f0d5
--- /dev/null
+++ b/tests/test_real_s3.py
@@ -0,0 +1,42 @@
+import os
+import numpy as np
+
+from activestorage.active import Active
+from activestorage.active import load_from_s3
+
+S3_BUCKET = "bnl"
+
+# TODO Remove after full testing and right before deployment
+def test_s3_dataset():
+    """Run somewhat as the 'gold' test."""
+    storage_options = {
+        'key': "f2d55c6dcfc7618b2c34e00b58df3cef",
+        'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT",
+        'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}, # old proxy
+        # 'client_kwargs': {'endpoint_url': "https://uor-aces-o.ext.proxy.jc.rl.ac.uk"}, # new proxy
+    }
+    active_storage_url = "https://192.171.169.113:8080"
+    # bigger_file = "ch330a.pc19790301-bnl.nc" # 18GB 3400 HDF5 chunks
+    bigger_file = "ch330a.pc19790301-def.nc" # 17GB 64 HDF5 chunks
+    # bigger_file = "da193a_25_day__198808-198808.nc" # 3GB 30 HDF5 chunks
+
+    test_file_uri = os.path.join(
+        S3_BUCKET,
+        bigger_file
+    )
+    print("S3 Test file path:", test_file_uri)
+    dataset = load_from_s3(test_file_uri, storage_options=storage_options)
+    av = dataset['UM_m01s16i202_vn1106']
+
+    # big file bnl: 18GB/3400 HDF5 chunks; def: 17GB/64 HDF5 chunks
+    active = Active(av, storage_type="s3",
+                    storage_options=storage_options,
+                    active_storage_url=active_storage_url)
+    active._version = 2
+    active._method = "min"
+
+    # result = active[:]
+    result = active[0:3, 4:6, 7:9] # standardized slice
+
+    print("Result is", result)
+    assert result == 5098.625
diff --git a/tests/unit/test_active.py b/tests/unit/test_active.py
index 25f988c..1e6fd80 100644
--- a/tests/unit/test_active.py
+++ b/tests/unit/test_active.py
@@ -1,5 +1,6 @@
 import os
 import numpy as np
+import pyfive
 import pytest
 import threading
 
@@ -8,13 +9,14 @@
 from activestorage.config import *
 from botocore.exceptions import EndpointConnectionError as botoExc
 from botocore.exceptions import NoCredentialsError as NoCredsExc
+from netCDF4 import Dataset
 
 
 def test_uri_none():
     """Unit test for class:Active."""
     # test invalid uri
     some_file = None
-    expected = "Must use a valid file for uri. Got None"
+    expected = "Must use a valid file name or variable object for dataset. Got None"
     with pytest.raises(ValueError) as exc:
         active = Active(some_file, ncvar="")
     assert str(exc.value) == expected
@@ -78,7 +80,31 @@ def test_active():
     uri = "tests/test_data/cesm2_native.nc"
     ncvar = "TREFHT"
     active = Active(uri, ncvar=ncvar)
-    init = active.__init__(uri=uri, ncvar=ncvar)
+    init = active.__init__(dataset=uri, ncvar=ncvar)
+
+
+def test_activevariable_netCDF4():
+    uri = "tests/test_data/cesm2_native.nc"
+    ncvar = "TREFHT"
+    ds = Dataset(uri)[ncvar]
+    exc_str = "Variable object dataset can only be pyfive.high_level.Dataset"
+    with pytest.raises(TypeError) as exc:
+        av = Active(ds)
+    assert exc_str in str(exc)
+
+
+def test_activevariable_pyfive():
+    uri = "tests/test_data/cesm2_native.nc"
+    ncvar = "TREFHT"
+    ds = pyfive.File(uri)[ncvar]
+    av = Active(ds)
+    av._method = "min"
+    assert av.method([3,444]) == 3
+    av_slice_min = av[3:5]
+    assert av_slice_min == np.array(258.62814, dtype="float32")
+    # test with Numpy
+    np_slice_min = np.min(ds[3:5])
+    assert av_slice_min == np_slice_min
 
 
 @pytest.mark.xfail(reason="We don't employ locks with Pyfive anymore, yet.")
diff --git a/tests/unit/test_mock_s3.py b/tests/unit/test_mock_s3.py
index 63adf8d..9aea9a4 100644
--- a/tests/unit/test_mock_s3.py
+++ b/tests/unit/test_mock_s3.py
@@ -1,11 +1,13 @@
 import os
 import s3fs
 import pathlib
+import pyfive
 import pytest
 import h5netcdf
+import numpy as np
 
 from tempfile import NamedTemporaryFile
-from activestorage.active import load_from_s3
+from activestorage.active import load_from_s3, Active
 
 
 # needed by the spoofed s3 filesystem
@@ -133,7 +135,7 @@ def test_s3file_with_s3fs(s3fs_s3):
         anon=False, version_aware=True, client_kwargs={"endpoint_url": endpoint_uri}
     )
 
-    # test load by h5netcdf
+    # test load by standard h5netcdf
     with s3.open(os.path.join("MY_BUCKET", file_name), "rb") as f:
         print("File path", f.path)
         ncfile = h5netcdf.File(f, 'r', invalid_netcdf=True)
@@ -141,9 +143,21 @@ def test_s3file_with_s3fs(s3fs_s3):
         print(ncfile["ta"])
         assert "ta" in ncfile
 
-    # test Active
+    # test active.load_from_s3
     storage_options = dict(anon=False, version_aware=True,
                            client_kwargs={"endpoint_url": endpoint_uri})
     with load_from_s3(os.path.join("MY_BUCKET", file_name), storage_options) as ac_file:
        print(ac_file)
        assert "ta" in ac_file
+
+    # test loading with Pyfive and passing the Dataset to Active
+    with s3.open(os.path.join("MY_BUCKET", file_name), "rb") as f:
+        print("File path", f.path)
+        pie_ds = pyfive.File(f, 'r')
+        print("File loaded from spoof S3 with Pyfive:", pie_ds)
+        print("Pyfive dataset:", pie_ds["ta"])
+        av = Active(pie_ds["ta"])
+        av._method = "min"
+        assert av.method([3,444]) == 3
+        av_slice_min = av[3:5]
+        assert av_slice_min == np.array(249.6583, dtype="float32")