lh5concat: Command line tool to concatenate array-like LGDOs in a single file #73

Merged 5 commits on Mar 6, 2024
1 change: 1 addition & 0 deletions pyproject.toml
@@ -79,6 +79,7 @@ test = [

[project.scripts]
lh5ls = "lgdo.cli:lh5ls"
lh5concat = "lgdo.cli:lh5concat"

[tool.setuptools]
include-package-data = true
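
The new entry point makes the installer generate an `lh5concat` executable next to the existing `lh5ls`. Roughly, the generated console-script wrapper is equivalent to this sketch (illustrative only, not the actual generated file):

import sys

from lgdo.cli import lh5concat

if __name__ == "__main__":
    sys.exit(lh5concat())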
215 changes: 203 additions & 12 deletions src/lgdo/cli.py
@@ -2,23 +2,27 @@
from __future__ import annotations

import argparse
import fnmatch
import logging
import sys

import lgdo
import lgdo.logging
from lgdo.lh5 import show
from . import Array, Table, VectorOfVectors, __version__, lh5
from . import logging as lgdogging # eheheh

log = logging.getLogger(__name__)

def lh5ls():

def lh5ls(args=None):
""":func:`.lh5.show` command line interface."""
parser = argparse.ArgumentParser(
prog="lh5ls", description="Inspect LEGEND HDF5 (LH5) file contents"
)

# global options
parser.add_argument(
"--version", action="store_true", help="""Print lgdo version and exit"""
"--version",
action="store_true",
help="""Print legend-pydataobj version and exit""",
)
parser.add_argument(
"--verbose",
@@ -34,7 +38,7 @@ def lh5ls():

parser.add_argument(
"lh5_file",
help="""Input LH5 file.""",
help="""Input LH5 file""",
)
parser.add_argument("lh5_group", nargs="?", help="""LH5 group.""", default="/")
parser.add_argument(
@@ -48,17 +52,204 @@
help="""Maximum tree depth of groups to print""",
)

args = parser.parse_args()
args = parser.parse_args(args)

if args.verbose:
lgdo.logging.setup(logging.DEBUG)
lgdogging.setup(logging.DEBUG)
elif args.debug:
lgdo.logging.setup(logging.DEBUG, logging.root)
lgdogging.setup(logging.DEBUG, logging.root)
else:
lgdo.logging.setup()
lgdogging.setup()

if args.version:
print(lgdo.__version__) # noqa: T201
print(__version__) # noqa: T201
sys.exit()

show(args.lh5_file, args.lh5_group, attrs=args.attributes, depth=args.depth)
lh5.show(args.lh5_file, args.lh5_group, attrs=args.attributes, depth=args.depth)


def lh5concat(args=None):
"""Command line interface for concatenating array-like LGDOs in LH5 files."""
parser = argparse.ArgumentParser(
prog="lh5concat", description="Concatenate LGDO Arrays and Tables in LH5 files"
)

# global options
parser.add_argument(
"--version",
action="store_true",
help="""Print legend-pydataobj version and exit""",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="""Increase the program verbosity""",
)
parser.add_argument(
"--debug",
action="store_true",
help="""Increase the program verbosity to maximum""",
)

parser.add_argument(
"lh5_file",
nargs="+",
help="""Input LH5 files""",
)
parser.add_argument(
"--output",
"-o",
help="""Output file""",
required=True,
)
parser.add_argument(
"--overwrite",
"-w",
action="store_true",
help="""Overwrite output file""",
)
parser.add_argument(
"--include",
"-i",
help="""Regular expression (fnmatch style) for object names that should
be concatenated. The option can be passed multiple times to pass a list
of patterns.
""",
action="append",
default=None,
)
parser.add_argument(
"--exclude",
"-e",
help="""List of object names that should be excluded. Takes priority over --include""",
action="append",
default=None,
)

args = parser.parse_args(args)

if args.verbose:
lgdogging.setup(logging.DEBUG, log)
elif args.debug:
lgdogging.setup(logging.DEBUG, logging.root)
else:
lgdogging.setup()

if args.version:
print(__version__) # noqa: T201
sys.exit()

if len(args.lh5_file) < 2:
msg = "you must provide at least two input files"
raise RuntimeError(msg)

# determine list of objects by recursively ls'ing first file
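# (recursive ls returns every nested object name; e.g. a table "tbl" with a
# column "col" shows up as both "tbl" and "tbl/col")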
file0 = args.lh5_file[0]
obj_list_full = set(lh5.ls(file0, recursive=True))

# let's remove objects with nested LGDOs inside
to_remove = set()
for name in obj_list_full:
if len(fnmatch.filter(obj_list_full, f"{name}/*")) > 1:
to_remove.add(name)
obj_list_full -= to_remove
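# (e.g. a table "tbl" with columns "tbl/a" and "tbl/b" matches twice and is
# dropped here; its column names stay in the list and drive the field
# filtering in step 2 below)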

obj_list = set()
# now first remove excluded stuff
if args.exclude is not None:
for exc in args.exclude:
obj_list_full -= set(fnmatch.filter(obj_list_full, exc.strip("/")))

# then make list of included, based on latest list
if args.include is not None:
for inc in args.include:
obj_list |= set(fnmatch.filter(obj_list_full, inc.strip("/")))
else:
obj_list = obj_list_full

# sort
obj_list = sorted(obj_list)

msg = f"objects matching include patterns {args.include} in {file0}: {obj_list}"
log.debug(msg)

# 1. read first valid lgdo from left to right
store = lh5.LH5Store()
h5f0 = store.gimme_file(file0)
lgdos = {}
# loop over object list in the first file
for name in obj_list:
# now loop over groups starting from root
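# (walk the path one level at a time: the first array-like ancestor gets
# read in full, and the breaks below skip everything nested inside it)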
current = ""
for item in name.split("/"):
current = f"{current}/{item}".strip("/")

if current in lgdos:
break

# not even an LGDO!
if "datatype" not in h5f0[current].attrs:
continue

# read as little as possible
obj, _ = store.read(current, h5f0, n_rows=1)
if isinstance(obj, (Table, Array, VectorOfVectors)):
# read all!
obj, _ = store.read(current, h5f0)
lgdos[current] = obj

break

msg = f"first-level, array-like objects: {lgdos.keys()}"
log.debug(msg)

h5f0.close()

# 2. remove (nested) table fields based on obj_list

def _inplace_table_filter(name, table):
# filter objects nested in this LGDO
skm = fnmatch.filter(obj_list, f"{name}/*")
kept = {it.removeprefix(name).strip("/").split("/")[0] for it in skm}
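# (e.g. for name="tbl", a match "tbl/sub/col" keeps the top-level field "sub")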

# now remove fields
for k in list(table.keys()):
if k not in kept:
table.pop(k)

msg = f"fields left in table '{name}': {table.keys()}"
log.debug(msg)

# recurse!
for k2, v2 in table.items():
if not isinstance(v2, Table):
continue

_inplace_table_filter(f"{name}/{k2}", v2)

for key, val in lgdos.items():
if not isinstance(val, Table):
continue

_inplace_table_filter(key, val)

# 3. write to output file

for name, obj in lgdos.items():
store.write(
obj,
name,
args.output,
wo_mode="overwrite_file" if args.overwrite else "write_safe",
)

# 4. loop over rest of files/names and write-append

for file in args.lh5_file[1:]:
msg = f"chaining file {file}"
log.debug(msg)

for name in obj_list:
obj, _ = store.read(name, file)
store.write(obj, name, args.output, wo_mode="append")
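
Since the entry points take an optional argument list (parse_args(args) falls back to sys.argv when args is None), the tool can also be driven programmatically, e.g. from tests. A minimal sketch, with hypothetical file names:

from lgdo.cli import lh5concat

# concatenate two files, keeping only objects under "geds" and
# overwriting any pre-existing output file
lh5concat(
    [
        "--output", "concat.lh5",
        "--overwrite",
        "--include", "geds/*",
        "file1.lh5",
        "file2.lh5",
    ]
)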
75 changes: 43 additions & 32 deletions src/lgdo/lh5/store.py
Original file line number Diff line number Diff line change
@@ -202,29 +202,31 @@ def read(
actual number of rows read will be returned as one of the return
values (see below).
idx
For NumPy-style "fancy indexing" for the read to select only some
rows, e.g. after applying some cuts to particular columns.
Only selection along the first axis is supported, so tuple arguments
must be one-tuples. If `n_rows` is not false, `idx` will be truncated to
`n_rows` before reading. To use with a list of files, can pass in a list of
`idx`'s (one for each file) or use a long contiguous list (e.g. built from a previous
identical read). If used in conjunction with `start_row` and `n_rows`,
will be sliced to obey those constraints, where `n_rows` is
interpreted as the (max) number of *selected* values (in `idx`) to be
read out. Note that the ``use_h5idx`` parameter controls some behaviour of the
read and that the default behavior (``use_h5idx=False``) prioritizes speed over
a small memory penalty.
For NumPy-style "fancy indexing" for the read to select only
some rows, e.g. after applying some cuts to particular columns.
Only selection along the first axis is supported, so tuple
arguments must be one-tuples. If `n_rows` is not false, `idx` will
be truncated to `n_rows` before reading. To use with a list of
files, can pass in a list of `idx`'s (one for each file) or use a
long contiguous list (e.g. built from a previous identical read).
If used in conjunction with `start_row` and `n_rows`, will be
sliced to obey those constraints, where `n_rows` is interpreted as
the (max) number of *selected* values (in `idx`) to be read out.
Note that the ``use_h5idx`` parameter controls some behaviour of
the read and that the default behavior (``use_h5idx=False``)
prioritizes speed over a small memory penalty.
use_h5idx
``True`` will directly pass the ``idx`` parameter to the underlying
``h5py`` call such that only the selected rows are read directly into memory,
which conserves memory at the cost of speed. There can be a significant penalty
to speed for larger files (1 - 2 orders of magnitude longer time).
``False`` (default) will read the entire object into memory before
performing the indexing. The default is much faster but requires additional memory,
though a relatively small amount in the typical use case. It is recommended to
leave this parameter as its default.
``h5py`` call such that only the selected rows are read directly
into memory, which conserves memory at the cost of speed. There can
be a significant penalty to speed for larger files (1 - 2 orders of
magnitude longer time). ``False`` (default) will read the entire
object into memory before performing the indexing. The default is
much faster but requires additional memory, though a relatively
small amount in the typical use case. It is recommended to leave
this parameter as its default.
field_mask
For tables and structs, determines which fields get written out.
For tables and structs, determines which fields get read out.
Only applies to immediate fields of the requested objects. If a dict
is used, a default dict will be made with the default set to the
opposite of the first element in the dict. This way if one specifies
@@ -375,7 +377,7 @@ def read(
# fields. If implemented, get_buffer() above should probably also
# (optionally?) prep buffers for each field
if obj_buf is not None:
msg = "obj_buf not implemented for LGOD Structs"
msg = "obj_buf not implemented for LGDO Structs"
raise NotImplementedError(msg)

# loop over fields and read
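
For reference, a sketch of the idx/field_mask semantics documented above (file and group names are hypothetical):

from lgdo import lh5

store = lh5.LH5Store()
# fancy-index rows 0, 2 and 5 along the first axis and read only two fields;
# with the default use_h5idx=False the object is read whole, then indexed
obj, n_rows_read = store.read(
    "geds/raw",
    "data.lh5",
    idx=[0, 2, 5],
    field_mask=["energy", "timestamp"],
)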
@@ -934,10 +936,11 @@ def write(
end of array is the same as ``append``.
- ``overwrite_file`` or ``of``: delete file if present prior to
writing to it. `write_start` should be 0 (it's ignored).
- ``append_column`` or ``ac``: append columns from an :class:`~.lgdo.table.Table`
`obj` only if there is an existing :class:`~.lgdo.table.Table` in the `lh5_file` with
the same `name` and :class:`~.lgdo.table.Table.size`. If the sizes don't match,
or if there are matching fields, it errors out.
- ``append_column`` or ``ac``: append columns from an
:class:`~.lgdo.table.Table` `obj` only if there is an existing
:class:`~.lgdo.table.Table` in the `lh5_file` with the same
`name` and :class:`~.lgdo.table.Table.size`. If the sizes don't
match, or if there are matching fields, it errors out.
write_start
row in the output file (if already existing) to start overwriting
from.
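
A sketch of the append_column mode described above (object names hypothetical; the sizes must match and field names must not collide, otherwise write raises):

import numpy as np

from lgdo import Array, Table, lh5

store = lh5.LH5Store()
store.write(
    Table(col_dict={"energy": Array(np.zeros(10))}),
    "tbl",
    "out.lh5",
    wo_mode="overwrite_file",
)
# append a new 10-row column to the existing table
store.write(
    Table(col_dict={"timestamp": Array(np.arange(10))}),
    "tbl",
    "out.lh5",
    wo_mode="append_column",
)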
@@ -982,17 +985,22 @@

# struct or table or waveform table
if isinstance(obj, Struct):
# In order to append a column, we need to update the `table{old_fields}` value in `group.attrs['datatype"]` to include the new fields.
# One way to do this is to override `obj.attrs["datatype"]` to include old and new fields. Then we can write the fields to the table as normal.
# In order to append a column, we need to update the
# `table{old_fields}` value in `group.attrs["datatype"]` to include
# the new fields. One way to do this is to override
# `obj.attrs["datatype"]` to include old and new fields. Then we
# can write the fields to the table as normal.
if wo_mode == "ac":
old_group = self.gimme_group(name, group)
datatype, shape, fields = parse_datatype(old_group.attrs["datatype"])
if datatype not in ["table", "struct"]:
msg = f"Trying to append columns to an object of type {datatype}"
raise RuntimeError(msg)

# If the mode is `append_column`, make sure we aren't appending a table that has a column of the same name as in the existing table
# Also make sure that the field we are adding has the same size
# If the mode is `append_column`, make sure we aren't appending
# a table that has a column of the same name as in the existing
# table. Also make sure that the field we are adding has the
# same size
if len(list(set(fields).intersection(set(obj.keys())))) != 0:
msg = f"Can't append {list(set(fields).intersection(set(obj.keys())))} column(s) to a table with the same field(s)"
raise ValueError(msg)
@@ -1012,10 +1020,13 @@
grp_attrs=obj.attrs,
overwrite=(wo_mode in ["o", "ac"]),
)
# If the mode is overwrite, then we need to peek into the file's table's existing fields
# If we are writing a new table to the group that does not contain an old field, we should delete that old field from the file
# If the mode is overwrite, then we need to peek into the file's
# table's existing fields. If we are writing a new table to the
# group that does not contain an old field, we should delete that
# old field from the file
if wo_mode == "o":
# Find the old keys in the group that are not present in the new table's keys, then delete them
# Find the old keys in the group that are not present in the
# new table's keys, then delete them
for key in list(set(group.keys()) - set(obj.keys())):
log.debug(f"{key} is not present in new table, deleting field")
del group[key]