Load arbitrary number of datafiles from a master file (#6)
* add option to specify number of datafiles loaded

* bump version

* raise HTTPException when datafiles cannot be loaded

* update get_master_file return schema
fhernandezvivanco authored Jul 5, 2024
1 parent c6a80f3 commit 789d6e2
Showing 6 changed files with 234 additions and 359 deletions.
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
@@ -2,7 +2,7 @@ default_language_version:
   python: python3.11
 repos:
   - repo: https://github.com/myint/autoflake
-    rev: v2.2.1
+    rev: v2.3.1
     hooks:
       - id: autoflake
         name: Autoflake
@@ -14,13 +14,13 @@ repos:
           - --remove-duplicate-keys
           - --remove-unused-variables
   - repo: https://github.com/psf/black
-    rev: 23.7.0
+    rev: 24.4.2
     hooks:
       - id: black
         name: Black
         language_version: python3.11
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.4.0
+    rev: v4.6.0
     hooks:
       - id: trailing-whitespace
       - id: end-of-file-fixer
@@ -36,15 +36,15 @@ repos:
         args: ["--fix=lf"]
       - id: requirements-txt-fixer
   - repo: https://github.com/PyCQA/isort
-    rev: 5.12.0
+    rev: 5.13.2
     hooks:
      - id: isort
        name: Import Sort
        args:
          - --settings=.
        exclude: /__init__\.py$
   - repo: https://github.com/PyCQA/flake8
-    rev: 6.1.0
+    rev: 7.1.0
     hooks:
       - id: flake8
         name: Flake8
34 changes: 30 additions & 4 deletions ansto_simplon_api/routes/ansto_endpoints/load_hdf5_files.py
@@ -1,14 +1,40 @@
 from fastapi import APIRouter
+from fastapi.exceptions import HTTPException
+from starlette import status
 
-from ...schemas.configuration import SimplonRequestStr
+from ...schemas.ansto_endpoints import LoadHDF5File
 from ...simulate_zmq_stream import zmq_stream
 
 router = APIRouter(prefix="/ansto_endpoints", tags=["ANSTO Endpoints"])
 
 
-@router.put("/load_hdf5_master_file")
-async def set_user_data(hdf5_file_path: SimplonRequestStr):
-    zmq_stream.hdf5_file_path = hdf5_file_path.value
+@router.put("/hdf5_master_file")
+async def set_master_file(hdf5_model: LoadHDF5File):
+    try:
+        zmq_stream.create_list_of_compressed_frames(
+            hdf5_file_path=hdf5_model.hdf5_file_path,
+            compression=hdf5_model.compression,
+            number_of_datafiles=hdf5_model.number_of_datafiles,
+        )
+    except IndexError:
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail="The number of datafiles specified exceeds the number of datafiles available "
+            "in the master file. Reduce number_of_datafiles",
+        )
+    except Exception as e:
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
+        )
     zmq_stream.frame_id = 0
     zmq_stream.image_number = 0
     return {"value": zmq_stream.hdf5_file_path}
+
+
+@router.get("/hdf5_master_file")
+async def get_master_file() -> LoadHDF5File:
+    return LoadHDF5File(
+        hdf5_file_path=zmq_stream.hdf5_file_path,
+        number_of_datafiles=zmq_stream.number_of_data_files,
+        compression=zmq_stream.compression,
+    )
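A quick client-side sketch of the reworked endpoints (untested; it assumes the simulator API is reachable at http://localhost:8000, and the master-file path and datafile count below are placeholders):

import requests

BASE = "http://localhost:8000/ansto_endpoints"

# Load two datafiles from a master file; omitting number_of_datafiles
# (or sending null) loads every datafile the master file references.
response = requests.put(
    f"{BASE}/hdf5_master_file",
    json={
        "hdf5_file_path": "/path/to/master_file",
        "number_of_datafiles": 2,
        "compression": "bslz4",
    },
)
if response.status_code == 500:
    # e.g. number_of_datafiles exceeded what the master file provides
    print(response.json()["detail"])
else:
    print(response.json())  # {"value": "/path/to/master_file"}

# Read back what is currently loaded; mirrors the LoadHDF5File schema.
print(requests.get(f"{BASE}/hdf5_master_file").json())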
9 changes: 9 additions & 0 deletions ansto_simplon_api/schemas/ansto_endpoints.py
@@ -0,0 +1,9 @@
+from typing import Literal
+
+from pydantic import BaseModel, Field
+
+
+class LoadHDF5File(BaseModel):
+    hdf5_file_path: str = Field(examples=["/path/to/master_file"])
+    number_of_datafiles: int | None = Field(default=None, examples=[1])
+    compression: Literal["bslz4", "none"] = Field(default="bslz4", examples=["bslz4"])
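Because number_of_datafiles and compression both carry defaults, a request body only needs hdf5_file_path. A small sketch of how the model behaves under pydantic v2 (the path is a placeholder):

import pydantic

from ansto_simplon_api.schemas.ansto_endpoints import LoadHDF5File

# Only hdf5_file_path is required; the other fields fall back to their defaults.
payload = LoadHDF5File(hdf5_file_path="/path/to/master_file")
print(payload.model_dump())
# {'hdf5_file_path': '/path/to/master_file', 'number_of_datafiles': None,
#  'compression': 'bslz4'}

# compression is constrained by Literal["bslz4", "none"], so anything else
# is rejected before it ever reaches the ZMQ stream.
try:
    LoadHDF5File(hdf5_file_path="/path/to/master_file", compression="lz4")
except pydantic.ValidationError as e:
    print(e.error_count(), "validation error")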
218 changes: 29 additions & 189 deletions ansto_simplon_api/simulate_zmq_stream.py
@@ -5,7 +5,6 @@
 from copy import deepcopy
 from datetime import datetime, timezone
 from os import environ
-from typing import Any
 
 import bitshuffle
 import cbor2
@@ -72,7 +71,8 @@ def __init__(
         self.socket = self.context.socket(zmq.PUSH)
         self.socket.bind(self.address)
 
-        self._sequence_id = 0
+        self.frames = None
+        self.sequence_id = 0
 
         self.frame_id = 0
 
@@ -82,6 +82,9 @@ def __init__(
         self.series_unique_id = None
         self.frames = None
         self.hdf5_file_path = hdf5_file_path
+        self.create_list_of_compressed_frames(
+            self.hdf5_file_path, self.compression, self.number_of_data_files
+        )
 
         logging.info(f"ZMQ Address: {self.address}")
         logging.info(f"Hdf5 file path: {self.hdf5_file_path}")
@@ -90,7 +93,10 @@
         logging.info(f"Number of data files: {self.number_of_data_files}")
 
     def create_list_of_compressed_frames(
-        self, hdf5_file_path: str, compression: str
+        self,
+        hdf5_file_path: str,
+        compression: str,
+        number_of_datafiles: int | None = None,
     ) -> list[dict]:
         """
         Creates a list of compressed frames from a hdf5 file
@@ -102,6 +108,9 @@ def create_list_of_compressed_frames(
         compression : str
             Compression type. Accepted compression types are lz4 and bslz4.
             Default value is bslz4
+        number_of_datafiles: int | None = None
+            The number of datafiles loaded in memory. If number_of_datafiles=None,
+            we load all datafiles specified in the master file
 
         Raises
         ------
@@ -114,24 +123,26 @@
             A list containing a dictionary of compressed frames, and
             frame metadata
         """
+        self.hdf5_file_path = hdf5_file_path
+        self.compression = compression
+        self.number_of_data_files = number_of_datafiles
 
-        hdf5_file = h5py.File(hdf5_file_path)
-        keys = list(hdf5_file["entry"]["data"].keys())
+        with h5py.File(hdf5_file_path) as hdf5_file:
+            keys = list(hdf5_file["entry"]["data"].keys())
 
-        datafile_list: list[npt.NDArray] = [
-            np.array(hdf5_file["entry"]["data"][keys[i]])
-            for i in range(self.number_of_data_files)
-        ]
+            if number_of_datafiles is None:
+                self.number_of_data_files = len(keys)
 
-        # Would make more sense in the __init__ section
-        # but then we'd need to read the file twice
-        self.start_message, self.image_message, self.end_message = Parse(
-            hdf5_file
-        ).header()
+            datafile_list: list[npt.NDArray] = [
+                np.array(hdf5_file["entry"]["data"][keys[i]])
+                for i in range(self.number_of_data_files)
+            ]
 
-        # Delete the hdf5_file, we got what we needed
-        hdf5_file.close()
-        del hdf5_file
+            # Would make more sense in the __init__ section
+            # but then we'd need to read the file twice
+            self.start_message, self.image_message, self.end_message = Parse(
+                hdf5_file
+            ).header()
 
         number_of_frames_per_data_file = [
             datafile.shape[0] for datafile in datafile_list
@@ -175,7 +186,7 @@
 
         logging.info(f"Number of unique frames: {len(frame_list)}")
         del datafile_list
-        return frame_list
+        self.frames = frame_list
 
     def create_image_cbor_object(
         self,
@@ -338,177 +349,6 @@ def start_stream(self) -> None:
         self.stream_frames(self.frames)
         self.stream_end_message()
 
-    @property
-    def sequence_id(self) -> int:
-        """
-        Gets the sequence_id
-
-        Returns
-        -------
-        self._sequence_id: int
-            The sequence_id
-        """
-        return self._sequence_id
-
-    @sequence_id.setter
-    def sequence_id(self, value: int) -> None:
-        """
-        Sets the sequence_id
-
-        Returns
-        -------
-        None
-        """
-        self._sequence_id = value
-
-    @property
-    def image_number(self) -> int:
-        """
-        Gets the image_number
-
-        Returns
-        -------
-        self._image_number: int
-            The image_number
-        """
-        return self._image_number
-
-    @image_number.setter
-    def image_number(self, value: int) -> None:
-        """
-        Sets the image_number
-
-        Returns
-        -------
-        None
-        """
-        self._image_number = value
-
-    @property
-    def number_of_frames_per_trigger(self) -> None:
-        """
-        Sets the number of frames per trigger
-
-        Returns
-        -------
-        None
-        """
-        return self._number_of_frames_per_trigger
-
-    @number_of_frames_per_trigger.setter
-    def number_of_frames_per_trigger(self, value: int) -> None:
-        """
-        Sets the image_number
-
-        Parameters
-        ----------
-        value : int
-            The number of frames per trigger
-
-        Returns
-        -------
-        None
-        """
-        self._number_of_frames_per_trigger = value
-        logging.info(f"nimages set to: {value}")
-
-    @property
-    def user_data(self) -> Any:
-        """
-        Gets the user data
-
-        Returns
-        -------
-        self._user_data : Any
-            The user data
-        """
-        return self._user_data
-
-    @user_data.setter
-    def user_data(self, value: Any) -> None:
-        """
-        Sets the user data
-
-        Parameters
-        ----------
-        value : Any
-            New value
-
-        Returns
-        -------
-        None
-        """
-        self._user_data = value
-
-    @property
-    def compression(self) -> str:
-        """
-        Gets the compression type
-
-        Returns
-        -------
-        self._user_data : Any
-            The user data
-        """
-        return self._compression
-
-    @compression.setter
-    def compression(self, value: str) -> None:
-        """
-        Sets the compression type
-
-        Parameters
-        ----------
-        value : str
-            New value
-
-        Returns
-        -------
-        None
-        """
-        allowed_compressions = ["bslz4", "none"]
-        if value.lower() in allowed_compressions:
-            self._compression = value
-            try:
-                self.frames = self.create_list_of_compressed_frames(
-                    self.hdf5_file_path, self.compression
-                )
-            except AttributeError:
-                pass
-        else:
-            raise ValueError(
-                "Allowed compressions are bslz4 and none only" f"not {value}"
-            )
-
-    @property
-    def hdf5_file_path(self) -> str:
-        """
-        Gets the hdf5_file_path
-
-        Returns
-        -------
-        self._hdf5_file_path : str
-            The hdf5 file name
-        """
-        return self._hdf5_file_path
-
-    @hdf5_file_path.setter
-    def hdf5_file_path(self, value: str) -> None:
-        """
-        Sets the hdf5 file name and loads frames from the HDF5 file into memory
-
-        Parameters
-        ----------
-        value : str
-            The hdf5 filename
-        """
-        logging.info("Loading dataset...")
-        self._hdf5_file_path = value
-        self.frames = self.create_list_of_compressed_frames(
-            self._hdf5_file_path, self.compression
-        )
-        logging.info("Dataset loaded")
 
 
 ZMQ_ADDRESS = environ.get("ZMQ_ADDRESS", "tcp://*:5555")
 try:
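For orientation, the IndexError that the new route traps originates in the keys[i] lookup inside create_list_of_compressed_frames. A condensed, standalone sketch of the new loading behaviour, assuming the same HDF5 layout as the code above (the load_datafiles helper and the file path are hypothetical):

import h5py
import numpy as np

def load_datafiles(hdf5_file_path: str, number_of_datafiles: int | None = None):
    # The context manager replaces the old explicit close()/del: the master
    # file is released even if a lookup below raises.
    with h5py.File(hdf5_file_path) as hdf5_file:
        keys = list(hdf5_file["entry"]["data"].keys())
        if number_of_datafiles is None:
            number_of_datafiles = len(keys)  # load everything the master file lists
        # Requesting more datafiles than the master file references raises
        # IndexError here, which the route converts into an HTTP 500.
        return [
            np.array(hdf5_file["entry"]["data"][keys[i]])
            for i in range(number_of_datafiles)
        ]

frames = load_datafiles("/path/to/master_file", number_of_datafiles=2)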