parquet trajectory output
jmineau committed Sep 20, 2024
1 parent 42f5d65 commit 1f6509f
Showing 5 changed files with 152 additions and 64 deletions.
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -79,7 +79,7 @@ str(receptors)
| `rm_dat` | Logical indicating whether to delete `PARTICLE.DAT` after each simulation. Defaults to TRUE to reduce disk space since all of the trajectory information is also stored in `STILT_OUTPUT.rds` alongside the calculated upstream influence footprint |
| `run_foot` | Logical indicating whether to produce footprints. If FALSE, `run_trajec` must be TRUE. This can be useful when calculating trajectories separately from footprints |
| `run_trajec` | Logical indicating whether to produce new trajectories with `hycs_std`. If FALSE, will try to load the previous trajectory outputs. This is often useful for regridding purposes |
| `trajec_fmt` | Trajectory output format - file extension for trajectory output files. Defaults to `rds` (serialized R data). Other options include `h5` (HDF5). Other formats are less efficient than `rds` and may require additional dependencies to be installed, but can be useful for interoperability with other software packages |
| `trajec_fmt` | File extension for trajectory output files. Defaults to `rds` (serialized R data). Can be set to `parquet` for cross-platform output or as an empty string `''` to disable writing trajectory outputs |
| `simulation_id` | Unique identifier for each simulation; defaults to NA, which generates a unique identifier for each simulation by hashing the time and receptor location |
| `timeout` | Number of seconds to allow `hycs_std` to complete before sending SIGTERM and moving to the next simulation; defaults to 3600 (1 hour) |
| `varsiwant` | character vector of 4-letter `hycs_std` variables. Defaults to the minimum required variables including `'time', 'indx', 'long', 'lati', 'zagl', 'foot', 'mlht', 'dens', 'samt', 'sigw', 'tlgr'`. Can optionally include options listed below. |
61 changes: 20 additions & 41 deletions docs/output-files.md
@@ -9,16 +9,12 @@ Simulation identifiers follow a `yyyymmddHHMM_lati_long_zagl` convention, see [p

## Particle trajectories

Particle trajectories and simulation configuration information are packaged and saved in the format specified by `trajec_fmt` (defaults to `rds`). Options include
Particle trajectories and simulation configuration information are packaged and saved with the naming convention `<simulation_id>_traj.<trajec_fmt>`. Preserving the particle trajectories enables regridding the footprints at a later time without the computational cost of recalculating particle trajectories.
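As a quick illustration of these conventions (a sketch only; the receptor values below are hypothetical, and STILT generates the identifier itself when `simulation_id` is NA):

```python
# Simulation IDs follow the yyyymmddHHMM_lati_long_zagl convention, and
# trajectory files are named <simulation_id>_traj.<trajec_fmt>.
# The receptor values here are illustrative only.
run_time, lati, long, zagl = '202409201200', 39.6, -80.4, 5
simulation_id = f'{run_time}_{lati}_{long}_{zagl}'

for trajec_fmt in ('rds', 'parquet'):
    print(f'{simulation_id}_traj.{trajec_fmt}')
# 202409201200_39.6_-80.4_5_traj.rds
# 202409201200_39.6_-80.4_5_traj.parquet
```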

- `rds` - [serialized R data](https://stat.ethz.ch/R-manual/R-devel/library/base/html/readRDS.html)
- `h5` - [HDF5](https://www.hdfgroup.org/solutions/hdf5/)
Different output formats are available for the particle trajectories, as specified by the `trajec_fmt` parameter. The default format is `rds`, which is a [serialized R data](https://stat.ethz.ch/R-manual/R-devel/library/base/html/readRDS.html) object. Other options include `parquet` for [Apache Parquet](https://parquet.apache.org/) or an empty string `''` to disable writing trajectory outputs.
> Formats other than `rds` will be less efficient, but `parquet` offers the advantage of being widely supported by other software packages.
> Formats other than `rds` will be less efficient, but `h5` offers the advantage of being a widely supported format by other software packages.
Trajectories are saved with the naming convention `<simulation_id>_traj.<trajec_fmt>`. Preserving the particle trajectories enables regridding the footprints at a later time without the computational cost of recalculating particle trajectories.

This object can be loaded with `read_traj(<path>)` and is structured as
In R, this object can be loaded with `read_traj(<path>)` and is structured as

```r
source('r/dependencies.r')
@@ -48,42 +44,25 @@ str(traj)

The `traj$receptor` object is a named list with the time and location of the release point for the simulation. The `traj$particle` object is a data frame containing each particle's position and characteristics over time.

The `h5` output can be read using python using a combination of pandas and pytables:
Alternatively, `parquet` files can be read in Python using `pandas` together with the `pyarrow` package:

```python
import numpy as np
import pandas as pd
import tables as tb

# Read the particle data as a pandas dataframe
particle = pd.read_hdf('<simulation_id>_traj.h5', 'particle')

# Function to get group attributes
def get_group_attrs(h5file: tb.File, group: str) -> dict:
    'Get the attributes of a group in an h5 file'
    root = h5file.root
    roots = ['', '/', 'root']
    group = root if group in roots else getattr(root, group)

    attrs = {}
    keys = group._v_attrs._v_attrnamesuser
    for k in keys:
        val = group._v_attrs[k]
        if val is not None:
            val = val[0]  # values are stored as arrays
            if np.issubdtype(val.dtype, np.bytes_):
                val = val.decode()  # convert bytes to string
            val = None if val == 'NA' else val
        attrs[k] = val
    return attrs  # dictionary output

# Open the h5 file with pytables
h5file = tb.open_file('<simulation_id>_traj.h5')

# Read group attributes
file = get_group_attrs(h5file, '/')['file']
receptor = get_group_attrs(h5file, 'receptor')
params = get_group_attrs(h5file, 'params')
import pandas as pd
import pyarrow.parquet as pq

# Read the parquet file into a pandas DataFrame
particle = pd.read_parquet('<simulation_id>_traj.parquet')
particle.head()
# | | time | indx | long | lati | zagl | foot | mlht | dens | samt | sigw | tlgr | foot_no_hnf_dilution |
# |---:|-------:|-------:|-------:|-------:|-------:|--------:|-------:|-------:|-------:|-------:|-------:|-----------------------:|
# | 0 | -1 | 1 | -80.4 | 39.6 | 5.63 | 0.0626 | 1459 | 1.1 | 1 | 1.08 | 1.67 | 0.00224 |
# | 1 | -1 | 2 | -80.4 | 39.6 | 41.37 | 0.043 | 1459 | 1.1 | 1 | 1.09 | 4.98 | 0.00224 |
# | 2 | -1 | 3 | -80.4 | 39.6 | 30.95 | 0.043 | 1459 | 1.1 | 1 | 1.09 | 4.98 | 0.00224 |
# | 3 | -1 | 4 | -80.4 | 39.6 | 3.32 | 0.0626 | 1459 | 1.1 | 1 | 1.08 | 1.67 | 0.00224 |
# | 4 | -1 | 5 | -80.4 | 39.6 | 28.88 | 0.0627 | 1459 | 1.1 | 1 | 1.08 | 1.67 | 0.00224 |

# Receptor & config parameter metadata is stored in the parquet `FileMetaData.metadata` attribute
metadata = pq.read_metadata('<simulation_id>_traj.parquet').metadata
```

## Gridded footprints
78 changes: 71 additions & 7 deletions r/src/read_traj.r
@@ -3,12 +3,12 @@
#'
#' @param file filename argument. Must end with .rds (for serialized R data
#' output, preferred), .h5 (for Hierarchical Data Format output),
#' or NULL to return the footprint object for programmatic use.
#' or .parquet (for Apache Parquet output).
#' rds files do not require any additional libraries and have
#' better compression. The \code{rhdf5} package is required for .h5 output.
#' HDF5 output is slightly less efficient than .rds output, but is more
#' portable and can be read by other languages.
#' better compression. Alternative output formats require additional
#' libraries.
#'
#' @import arrow
#' @import rhdf5
#' @export

@@ -22,11 +22,10 @@ read_traj <- function(file) {

output <- list() # Define output list to rebuild

# .h5 output
# Alternative .h5 output
if (!is.null(file) && grepl('\\.h5$', file, ignore.case = T)) {

# Assign current file path
output$file <- file
output$file <- file # Assign current file path

# Get receptor attributes
output$receptor <- rhdf5::h5readAttributes(file, 'receptor')
@@ -47,4 +46,69 @@

return(output)
}

  # Alternative .parquet output
  if (!is.null(file) && grepl('\\.parquet$', file, ignore.case = T)) {

    # Extract metadata entries stored under a '<prefix>:<name>' key convention
    extract_meta <- function(meta, prefix) {
      prefix <- paste0('^', prefix, ':')
      keys <- grep(prefix, names(meta), value = T)
      data <- meta[keys]
      names(data) <- gsub(prefix, '', names(data))
      return(data)
    }

    convert_to_numeric <- function(x) {
      if (x == "NA") {
        return(NA)  # Handle the string "NA" case
      }
      num_val <- suppressWarnings(as.numeric(x))  # Attempt conversion
      if (is.na(num_val) && !is.na(x)) {
        return(x)  # Return original string if conversion fails
      }
      return(num_val)  # Otherwise, return the numeric value
    }

    output$file <- file  # Assign current file path

    # Read in parquet file as an arrow table
    particle <- arrow::read_parquet(file, as_data_frame = F)
    metadata <- particle$metadata

    # Get receptor metadata
    receptor <- extract_meta(metadata, 'receptor')
    receptor$run_time <- as.POSIXct(receptor$run_time, tz = 'UTC')
    receptor$lati <- as.numeric(receptor$lati)
    receptor$long <- as.numeric(receptor$long)
    receptor$zagl <- as.numeric(receptor$zagl)

    # Get param metadata
    params <- extract_meta(metadata, 'param')
    params <- lapply(params, convert_to_numeric)

    particle <- as.data.frame(particle)  # Convert to data frame

    # Handle particle error data
    err_cols <- grepl('_err$', names(particle))
    if (any(err_cols)) {
      # Split the error columns off before dropping them from particle
      particle_error <- particle[err_cols]
      particle <- particle[!err_cols]

      # Strip '_err' from particle_error names
      names(particle_error) <- gsub('_err$', '', names(particle_error))

      # Get particle error params
      particle_error_params <- extract_meta(metadata, 'error_param')
      particle_error_params <- lapply(particle_error_params, convert_to_numeric)

      output$particle_error <- particle_error
      output$particle_error_params <- particle_error_params
    }

    output$receptor <- receptor
    output$particle <- particle
    output$params <- params

    return(output)
  }
}
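When working with the parquet output from Python instead of `read_traj`, the same `_err` column split can be mirrored with pandas (the column names and values below are hypothetical):

```python
import pandas as pd

# A frame mimicking a trajectory table where error-perturbed columns
# carry an '_err' suffix, as produced by write_traj (values hypothetical)
df = pd.DataFrame({'zagl': [5.6, 41.4], 'foot': [0.06, 0.04],
                   'zagl_err': [6.1, 40.2], 'foot_err': [0.05, 0.05]})

# Split off the error columns, then strip the suffix so both frames
# share the same column names
err_cols = df.columns.str.endswith('_err')
particle_error = df.loc[:, err_cols].rename(columns=lambda c: c[:-len('_err')])
particle = df.loc[:, ~err_cols]

print(list(particle.columns))        # ['zagl', 'foot']
print(list(particle_error.columns))  # ['zagl', 'foot']
```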
15 changes: 8 additions & 7 deletions r/src/simulation_step.r
@@ -325,13 +325,14 @@ simulation_step <- function(before_footprint = list(function() {output}),
horcorzierr = horcorzierr,
winderrtf = winderrtf)
}

# Save output object and symlink to out/particles
write_traj(output, output$file)

link <- file.path(output_wd, 'particles', basename(output$file))
suppressWarnings(file.symlink(output$file, link))


if (trajec_fmt != '') { # Do not write trajectory file if format is empty
  # Save output object and symlink to out/particles
  write_traj(output, output$file)

  link <- file.path(output_wd, 'particles', basename(output$file))
  suppressWarnings(file.symlink(output$file, link))
}
} else {
# If user opted to recycle existing trajectory files, read in the recycled
# file to a data frame with an adjusted timestamp and index for the
60 changes: 52 additions & 8 deletions r/src/write_traj.r
@@ -4,12 +4,12 @@
#' @param traj output created during simulation_step
#' @param file filename argument. Must end with .rds (for serialized R data
#' output, preferred), .h5 (for Hierarchical Data Format output),
#' or NULL to return the footprint object for programmatic use.
#' or .parquet (for Apache Parquet output).
#' rds files do not require any additional libraries and have
#' better compression. The \code{rhdf5} package is required for .h5 output.
#' HDF5 output is slightly less efficient than .rds output, but is more
#' portable and can be read by other languages.
#' better compression. Alternative output formats require additional
#' libraries.
#'
#' @import arrow
#' @import rhdf5
#' @export

@@ -22,10 +22,10 @@ write_traj <- function(traj, file) {
# .rds output (preferred)
if (!is.null(file) && grepl('\\.rds$', file, ignore.case = T)) {
saveRDS(traj, file)
return(file)
return(traj)
}

# .h5 output
# Alternative .h5 output
if (!is.null(file) && grepl('\\.h5$', file, ignore.case = T)) {
# Check if rhdf5 package is installed
if (!requireNamespace("rhdf5", quietly = TRUE)) {
@@ -83,8 +83,52 @@
# Close file
rhdf5::H5Fclose(fid)

return(file)
return(traj)
}

return(traj)
  # Alternative .parquet output
  if (!is.null(file) && grepl('\\.parquet$', file, ignore.case = T)) {
    # Check if arrow package is installed
    if (!requireNamespace("arrow", quietly = TRUE)) {
      stop("The 'arrow' package is required for parquet output. Please install it.")
    }

    # Store metadata under '<prefix>:<name>' keys, returning the updated
    # metadata list so it can be assigned back to the table
    write_meta <- function(meta, prefix, obj) {
      for (x in names(obj)) {
        key <- paste0(prefix, ':', x)
        meta[[key]] <- as.character(obj[[x]])
      }
      return(meta)
    }

    particle_df <- traj$particle

    if ('particle_error' %in% names(traj)) {
      # Add '_err' suffix to particle_error columns and column-bind them onto
      # the particle data; read_traj splits them back apart by the suffix
      particle_error <- traj$particle_error
      names(particle_error) <- paste0(names(particle_error), '_err')
      particle_df <- cbind(particle_df, particle_error)
    }

    # Convert particle dataframe to arrow table
    particle <- arrow::arrow_table(particle_df)

    # Write receptor and params metadata; the table metadata is a plain list,
    # so the modified copy must be assigned back
    particle$metadata <- write_meta(particle$metadata, 'receptor', traj$receptor)
    particle$metadata <- write_meta(particle$metadata, 'param', traj$params)

    if ('particle_error' %in% names(traj)) {
      # Write particle_error_params metadata under the 'error_param' prefix
      # expected by read_traj
      particle$metadata <- write_meta(particle$metadata, 'error_param',
                                      traj$particle_error_params)
    }

    # Write parquet file
    arrow::write_parquet(particle, file)

    return(traj)
  }

invisible(traj)
}
