Improving API: part 1: functionality for input pyfive.high_level.Dataset #241

Merged: 30 commits, merged on Feb 28, 2025
Changes from all commits — 30 commits:
97d6360
start api changes
valeriupredoi Feb 25, 2025
d4f0588
start api changes
valeriupredoi Feb 25, 2025
3b6f0ce
set structure
valeriupredoi Feb 26, 2025
bf0c3fd
add test
valeriupredoi Feb 26, 2025
35a29d5
actual test with pyfive variable
valeriupredoi Feb 26, 2025
28d97e5
actual test with pyfive variable
valeriupredoi Feb 26, 2025
25668bd
clean up
valeriupredoi Feb 26, 2025
8cdd4cb
clean up and add bits
valeriupredoi Feb 26, 2025
a2f41ab
add chunking test case
valeriupredoi Feb 26, 2025
e471dee
add exception if not pyfive dataset
valeriupredoi Feb 27, 2025
620e2b6
test for that exception
valeriupredoi Feb 27, 2025
dca443f
start reduce chunk with correct syntax
valeriupredoi Feb 27, 2025
1bd40f5
it bloody works
valeriupredoi Feb 27, 2025
0bdc310
it bloody works
valeriupredoi Feb 27, 2025
9f46ab9
add inline comment
valeriupredoi Feb 27, 2025
0c562e1
correct handling for s3
valeriupredoi Feb 27, 2025
74f0c26
run s3 tests
valeriupredoi Feb 27, 2025
51e27ca
add real world s3 dataset test
valeriupredoi Feb 27, 2025
2499066
add note to test
valeriupredoi Feb 27, 2025
c495019
remove leftover
valeriupredoi Feb 27, 2025
ad3fb54
unused import
valeriupredoi Feb 27, 2025
9bf31bd
unused return
valeriupredoi Feb 27, 2025
61cf5dd
add correct function docstring
valeriupredoi Feb 28, 2025
4cdeb1b
removed obsolete inline
valeriupredoi Feb 28, 2025
9aca259
remove obsolete inline
valeriupredoi Feb 28, 2025
2507fe4
Update activestorage/active.py
valeriupredoi Feb 28, 2025
17684aa
Update activestorage/active.py
valeriupredoi Feb 28, 2025
b80862d
Update activestorage/active.py
valeriupredoi Feb 28, 2025
ebd54e8
fix test
valeriupredoi Feb 28, 2025
050bc8f
test mock s3 dataset
valeriupredoi Feb 28, 2025
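
In short, this PR lets Active accept an already-open pyfive.high_level.Dataset in place of a file URI. A minimal usage sketch of the two entry points, assuming a hypothetical local file example.nc with a variable tas; the reduction settings mirror tests/test_real_s3.py further down:

import pyfive
from activestorage.active import Active

# Existing path-based API: Active opens the file and selects the variable itself.
active = Active("example.nc", ncvar="tas")

# New object-based API: pass a pyfive Dataset directly; ncvar is not needed
# because the variable is already selected.
ds = pyfive.File("example.nc")["tas"]
active = Active(ds)

# Reduction settings as used in the S3 test below.
active._version = 2
active._method = "min"
result = active[0:3, 4:6]  # active minimum over the selected slice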
3 changes: 2 additions & 1 deletion .github/workflows/test_s3_minio.yml
@@ -6,7 +6,8 @@ on:
push:
branches:
- main # keep this at all times
- pyfive
# - pyfive # reinstate
- new_api_pyfive
pull_request:
schedule:
- cron: '0 0 * * *' # nightly
116 changes: 50 additions & 66 deletions activestorage/active.py
@@ -4,10 +4,12 @@
import pathlib
import urllib
import pyfive
import s3fs
import time
from pyfive.h5d import StoreInfo

import s3fs
from pathlib import Path
from pyfive.h5d import StoreInfo
from typing import Optional

from activestorage.config import *
from activestorage import reductionist
@@ -47,21 +49,6 @@ def load_from_s3(uri, storage_options=None):
print(f"Dataset loaded from S3 with s3fs and Pyfive: {uri} ({t2-t1:.2},{t3-t2:.2})")
return ds

def _metricise(method):
""" Decorator for class methods loads into metric_data"""
def timed(self, *args, **kw):
ts = time.time()
metric_name=''
if '__metric_name' in kw:
metric_name = kw['__metric_name']
del kw['__metric_name']
result = method(self,*args, **kw)
te = time.time()
if metric_name:
self.metric_data[metric_name] = te-ts
return result
return timed


def get_missing_attributes(ds):
""""
Expand Down Expand Up @@ -122,13 +109,13 @@ def __new__(cls, *args, **kwargs):

def __init__(
self,
uri,
ncvar,
storage_type=None,
max_threads=100,
storage_options=None,
active_storage_url=None
):
dataset: Optional[str | Path | object],
ncvar: str = None,
storage_type: str = None,
max_threads: int = 100,
storage_options: dict = None,
active_storage_url: str = None
) -> None:
"""
Instantiate with a NetCDF4 dataset URI and the variable of interest within that file.
(We need the variable, because we need variable specific metadata from within that
@@ -138,50 +125,69 @@ def __init__(
:param storage_options: s3fs.S3FileSystem options
:param active_storage_url: Reductionist server URL
"""
# Assume NetCDF4 for now
self.uri = uri
if self.uri is None:
raise ValueError(f"Must use a valid file for uri. Got {uri}")
self.ds = None
input_variable = False
if dataset is None:
raise ValueError(f"Must use a valid file name or variable object for dataset. Got {dataset!r}")
if isinstance(dataset, Path) and not dataset.exists():
raise ValueError(f"Path to input file {dataset!r} does not exist.")
if not isinstance(dataset, Path) and not isinstance(dataset, str):
print(f"Treating input {dataset} as variable object.")
if type(dataset) is not pyfive.high_level.Dataset:
raise TypeError(f"Variable object dataset can only be pyfive.high_level.Dataset. Got {dataset!r}")
input_variable = True
self.ds = dataset
self.uri = dataset


# still allow for a passable storage_type
# for special cases eg "special-POSIX" ie DDN
if not storage_type and storage_options is not None:
storage_type = urllib.parse.urlparse(uri).scheme
storage_type = urllib.parse.urlparse(dataset).scheme
self.storage_type = storage_type

# set correct filename attr
if input_variable and not self.storage_type:
self.filename = self.ds
elif input_variable and self.storage_type == "s3":
self.filename = self.ds.id._filename

# get storage_options
self.storage_options = storage_options
self.active_storage_url = active_storage_url

# basic check on file
if not os.path.isfile(self.uri) and not self.storage_type:
raise ValueError(f"Must use existing file for uri. {self.uri} not found")
if not input_variable:
if not os.path.isfile(self.uri) and not self.storage_type:
raise ValueError(f"Must use existing file for uri. {self.uri} not found")

self.ncvar = ncvar
if self.ncvar is None:
if self.ncvar is None and not input_variable:
raise ValueError("Must set a netCDF variable name to slice")

self._version = 1
self._components = False
self._method = None
self._max_threads = max_threads
self.missing = None
self.ds = None
self.metric_data = {}
self.data_read = 0

@_metricise
def __load_nc_file(self):
""" Get the netcdf file and it's b-tree"""
"""
Get the netcdf file and its b-tree.

This private method is used only when the input to Active is a
file path rather than a pyfive.high_level.Dataset object. When a
Dataset is passed in, file opening is skipped and ncvar is not
used: the Dataset already carries the b-tree and the
`_filename` attribute.
"""
ncvar = self.ncvar
# in all cases we need an open netcdf file to get at attributes
# we keep it open because we need its b-tree
if self.storage_type is None:
nc = pyfive.File(self.uri)
elif self.storage_type == "s3":
nc = load_from_s3(self.uri, self.storage_options)
self.filename = self.uri

self.ds = nc[ncvar]

def __get_missing_attributes(self):
@@ -194,10 +200,8 @@ def __getitem__(self, index):
Provides support for a standard get item.
#FIXME-BNL: Why is the argument index?
"""
self.metric_data = {}
if self.ds is None:
self.__load_nc_file(__metric_name='load nc time')
#self.__metricise('Load','__load_nc_file')
self.__load_nc_file()

self.missing = self.__get_missing_attributes()

@@ -206,21 +210,20 @@
if self.method is None and self._version == 0:

# No active operation
return self._get_vanilla(index, __metric_name='vanilla_time')
return self._get_vanilla(index)

elif self._version == 1:

#FIXME: is the difference between version 1 and 2 still honoured?
return self._get_selection(index, __metric_name='selection 1 time (s)')
return self._get_selection(index)

elif self._version == 2:

return self._get_selection(index, __metric_name='selection 2 time (s)')
return self._get_selection(index)

else:
raise ValueError(f'Version {self._version} not supported')

@_metricise
def _get_vanilla(self, index):
"""
Get the data without any active operation
@@ -294,7 +297,7 @@ def _get_active(self, method, *args):
"""
raise NotImplementedError

@_metricise

def _get_selection(self, *args):
"""
At this point we have a Dataset object, but all the important information about
@@ -307,13 +310,8 @@ def _get_selection(self, *args):

name = self.ds.name
dtype = np.dtype(self.ds.dtype)
# hopefully fix pyfive to get a dtype directly
array = pyfive.indexing.ZarrArrayStub(self.ds.shape, self.ds.chunks)
ds = self.ds.id

self.metric_data['args'] = args
self.metric_data['dataset shape'] = self.ds.shape
self.metric_data['dataset chunks'] = self.ds.chunks
if ds.filter_pipeline is None:
compressor, filters = None, None
else:
@@ -359,16 +357,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
session = None

# Process storage chunks using a thread pool.
# Because we do this, we need to read the dataset b-tree now, not as we go, so
# it is already in cache. If we remove the thread pool from here, we probably
# wouldn't need to do it before the first one.

if ds.chunks is not None:
t1 = time.time()
# ds._get_chunk_addresses()
t2 = time.time() - t1
self.metric_data['indexing time (s)'] = t2
# self.metric_data['chunk number'] = len(ds._zchunk_index)
chunk_count = 0
t1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_threads) as executor:
@@ -433,10 +421,6 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype, compressor, f
# size.
out = out / np.sum(counts).reshape(shape1)

t2 = time.time()
self.metric_data['reduction time (s)'] = t2-t1
self.metric_data['chunks processed'] = chunk_count
self.metric_data['storage read (B)'] = self.data_read
return out

def _get_endpoint_url(self):
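
For context on the _from_storage hunk above: storage chunks are reduced in parallel on a thread pool and the partial results are combined afterwards. A minimal sketch of that pattern, with reduce_one_chunk and chunk_ids as hypothetical stand-ins for the real per-chunk work and chunk index:

import concurrent.futures
import numpy as np

def reduce_chunks_threaded(chunk_ids, reduce_one_chunk, max_threads=100):
    # Reduce each storage chunk on its own thread, collecting partial results.
    partials = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(reduce_one_chunk, cid) for cid in chunk_ids]
        for future in concurrent.futures.as_completed(futures):
            partials.append(future.result())
    # For a "min" reduction, the global result is the minimum of the per-chunk minima.
    return np.min(partials)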
49 changes: 35 additions & 14 deletions activestorage/storage.py
@@ -1,8 +1,10 @@
"""Active storage module."""
import numpy as np
import pyfive

from numcodecs.compat import ensure_ndarray


def reduce_chunk(rfile,
offset, size, compression, filters, missing, dtype, shape,
order, chunk_selection, method=None):
@@ -25,20 +27,39 @@ def reduce_chunk(rfile,
storage implementations we'll change to controlled vocabulary)

"""

#FIXME: for the moment, open the file every time ... we might want to do that, or not
with open(rfile,'rb') as open_file:
# get the data
chunk = read_block(open_file, offset, size)
# reverse any compression and filters
chunk = filter_pipeline(chunk, compression, filters)
# make it a numpy array of bytes
chunk = ensure_ndarray(chunk)
# convert to the appropriate data type
chunk = chunk.view(dtype)
# sort out ordering and convert to the parent hyperslab dimensions
chunk = chunk.reshape(-1, order='A')
chunk = chunk.reshape(shape, order=order)
obj_type = type(rfile)
print(f"Reducing chunk of object {obj_type}")

if obj_type is not pyfive.high_level.Dataset:
#FIXME: for the moment, open the file every time ... we might want to do that, or not
# we could just use an instance of pyfive.high_level.Dataset.id
# passed directly from active.py, as below
with open(rfile,'rb') as open_file:
# get the data
chunk = read_block(open_file, offset, size)
# reverse any compression and filters
chunk = filter_pipeline(chunk, compression, filters)
# make it a numpy array of bytes
chunk = ensure_ndarray(chunk)
# convert to the appropriate data type
chunk = chunk.view(dtype)
# sort out ordering and convert to the parent hyperslab dimensions
chunk = chunk.reshape(-1, order='A')
chunk = chunk.reshape(shape, order=order)
else:
class storeinfo: pass
Collaborator:
Why don't we import the class so it's more obvious what it's for?

Collaborator Author:
I tried this just now; unfortunately there is flakiness involved:

E       AssertionError: assert 265.90347 == array(258.62814, dtype=float32)
E        +  where array(258.62814, dtype=float32) = <built-in function array>(258.62814, dtype='float32')
E        +    where <built-in function array> = np.array

tests/unit/test_active.py:104: AssertionError
----------------------------------------------------------------- Captured stdout call ------------------------------------------------------------------
Treating input <HDF5 dataset "TREFHT": shape (12, 4, 8), type "float32"> as variable object.
Reducing chunk of object <class 'pyfive.high_level.Dataset'>
Reducing chunk of object <class 'pyfive.high_level.Dataset'>XXX StI byte offset 12394 StI size 128
XXX 
StI byte offset 12522actual offset  StI size12394 actual size 128
 128
actual offset 12522 actual size 128

with this implementation:

            from pyfive.h5d import StoreInfo as storeinfo
            storeinfo.byte_offset = offset
            storeinfo.size = size
            print("XXX", "StI byte offset", storeinfo.byte_offset, "StI size", storeinfo.size)
            print("actual offset", offset, "actual size", size)

Collaborator Author:
StoreInfo does not seem to be a safe class the way it is implemented at the moment, since it gets frozen in at times, and doesn't update on the go

storeinfo.byte_offset = offset
storeinfo.size = size
chunk = rfile.id._get_raw_chunk(storeinfo)
# reverse any compression and filters
chunk = filter_pipeline(chunk, compression, filters)
# make it a numpy array of bytes
chunk = ensure_ndarray(chunk)
# convert to the appropriate data type
chunk = chunk.view(dtype)
# sort out ordering and convert to the parent hyperslab dimensions
chunk = chunk.reshape(-1, order='A')
chunk = chunk.reshape(shape, order=order)

tmp = chunk[chunk_selection]
if method:
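
On the review thread above: the flakiness when importing StoreInfo appears to come from setting byte_offset and size as attributes on the shared class, so concurrent threads in the pool overwrite each other's values (as the interleaved captured output suggests); the local class storeinfo: pass works because a fresh class object is created on every reduce_chunk call. A sketch of an equivalent per-call holder, assuming only that _get_raw_chunk reads the byte_offset and size attributes of its argument, as in the diff above:

from types import SimpleNamespace

def read_raw_chunk(dataset, offset, size):
    # One fresh holder per call, so no offset/size state is shared between threads.
    storeinfo = SimpleNamespace(byte_offset=offset, size=size)
    return dataset.id._get_raw_chunk(storeinfo)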
42 changes: 42 additions & 0 deletions tests/test_real_s3.py
@@ -0,0 +1,42 @@
import os
import numpy as np

from activestorage.active import Active
from activestorage.active import load_from_s3

S3_BUCKET = "bnl"

# TODO Remove after full testing and right before deployment
def test_s3_dataset():
"""Run somewhat as the 'gold' test."""
storage_options = {
'key': "f2d55c6dcfc7618b2c34e00b58df3cef",
'secret': "$/'#M{0{/4rVhp%n^(XeX$q@y#&(NM3W1->~N.Q6VP.5[@bLpi='nt]AfH)>78pT",
'client_kwargs': {'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"}, # old proxy
# 'client_kwargs': {'endpoint_url': "https://uor-aces-o.ext.proxy.jc.rl.ac.uk"}, # new proxy
}
active_storage_url = "https://192.171.169.113:8080"
# bigger_file = "ch330a.pc19790301-bnl.nc" # 18GB 3400 HDF5 chunks
bigger_file = "ch330a.pc19790301-def.nc" # 17GB 64 HDF5 chunks
# bigger_file = "da193a_25_day__198808-198808.nc" # 3GB 30 HDF5 chunks

test_file_uri = os.path.join(
S3_BUCKET,
bigger_file
)
print("S3 Test file path:", test_file_uri)
dataset = load_from_s3(test_file_uri, storage_options=storage_options)
av = dataset['UM_m01s16i202_vn1106']

# big file bnl: 18GB/3400 HDF5 chunks; def: 17GB/64 HDF5 chunks
active = Active(av, storage_type="s3",
storage_options=storage_options,
active_storage_url=active_storage_url)
active._version = 2
active._method = "min"

# result = active[:]
result = active[0:3, 4:6, 7:9] # standardized slice

print("Result is", result)
assert result == 5098.625
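
A hedged cross-check, not part of the PR and assuming the slice contains no masked or fill values: with _method = "min", the active result should match a plain NumPy reduction over the same slice read directly through pyfive:

import numpy as np

# Read the same slice through pyfive and reduce it locally for comparison.
expected = np.min(av[0:3, 4:6, 7:9])
assert result == expected  # should agree with the hard-coded 5098.625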