From 1f6509f38cb9de166b8664964bfdc5eda9f2a2cd Mon Sep 17 00:00:00 2001 From: James Mineau Date: Fri, 20 Sep 2024 14:14:25 -0600 Subject: [PATCH] parquet trajectory output --- docs/configuration.md | 2 +- docs/output-files.md | 61 +++++++++++--------------------- r/src/read_traj.r | 78 +++++++++++++++++++++++++++++++++++++---- r/src/simulation_step.r | 15 ++++---- r/src/write_traj.r | 60 ++++++++++++++++++++++++++----- 5 files changed, 152 insertions(+), 64 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 14ca5ce..def764a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -79,7 +79,7 @@ str(receptors) | `rm_dat` | Logical indicating whether to delete `PARTICLE.DAT` after each simulation. Default to TRUE to reduce disk space since all of the trajectory information is also stored in `STILT_OUTPUT.rds` alongside the calculated upstream influence footprint | | `run_foot` | Logical indicating whether to produce footprints. If FALSE, `run_trajec` must be TRUE. This can be useful when calculating trajectories separate from footprints | | `run_trajec` | Logical indicating whether to produce new trajectories with `hycs_std`. If FALSE, will try to load the previous trajectory outputs. This is often useful for regridding purposes | -| `trajec_fmt` | Trajectory output format - file extension for trajectory output files. Defaults to `rds` (serialized R data). Other options include `h5` (HDF5). Other formats are less efficient than `rds` and may require additional dependencies to be installed, but can be useful for interoperability with other software packages | +| `trajec_fmt` | File extension for trajectory output files. Defaults to `rds` (serialized R data). 
Can be set to `parquet` for cross-platform output or as an empty string `''` to disable writing trajectory outputs | | `simulation_id` | Unique identifier for each simulation; defaults to NA which determines a unique identifier for each simulation by hashing the time and receptor location | | `timeout` | number of seconds to allow `hycs_std` to complete before sending SIGTERM and moving to the next simulation; defaults to 3600 (1 hour) | | `varsiwant` | character vector of 4-letter `hycs_std` variables. Defaults to the minimum required variables including `'time', 'indx', 'long', 'lati', 'zagl', 'foot', 'mlht', 'dens', 'samt', 'sigw', 'tlgr'`. Can optionally include options listed below. | diff --git a/docs/output-files.md b/docs/output-files.md index 9302235..ca8cfd2 100644 --- a/docs/output-files.md +++ b/docs/output-files.md @@ -9,16 +9,12 @@ Simulation identifiers follow a `yyyymmddHHMM_lati_long_zagl` convention, see [p ## Particle trajectories -Particle trajectories and simulation configuration information are packaged and saved in the format specified by `trajec_fmt` (defaults to `rds`). Options include +Particle trajectories and simulation configuration information are packaged and saved with the naming convention `_traj.`. Preserving the particle trajectories enables regridding the footprints at a later time without the computational cost of recalculating particle trajectories. - - `rds` - [serialized R data](https://stat.ethz.ch/R-manual/R-devel/library/base/html/readRDS.html) - - `h5` - [HDF5](https://www.hdfgroup.org/solutions/hdf5/) +Different output formats are available for the particle trajectories as specified by the `trajec_fmt` parameter. The default format is `rds` which is a [serialized R data](https://stat.ethz.ch/R-manual/R-devel/library/base/html/readRDS.html) object. Other options include `parquet` for [Apache Parquet](https://parquet.apache.org/) or an empty string `''` to disable writing trajectory outputs. 
+> Formats other than `rds` will be less efficient, but `parquet` offers the advantage of being a widely supported format by other software packages. -> Formats other than `rds` will be less efficient, but `h5` offers the advantage of being a widely supported format by other software packages. - -Trajectories are saved with the naming convention `_traj.`. Preserving the particle trajectories enables regridding the footprints at a later time without the computational cost of recalculating particle trajectories. - -This object can be loaded with `read_traj()` and is structured as +This object can be loaded using `R` with `read_traj()` and is structured as ```r source('r/dependencies.r') @@ -48,42 +44,25 @@ str(traj) The `traj$receptor` object is a named list with the time and location of the release point for the simulation. The `traj$particle` object is a data frame containing each particle's position and characteristics over time. -The `h5` output can be read using python using a combination of pandas and pytables: +Or `parquet` formatted files may be read using `Python` with the `pyarrow` package. 
```python -import numpy as np import pandas as pd -import tables as tb - -# Read the particle data as a pandas dataframe -particle = pd.read_hdf('_traj.h5', 'particle') - -# Function to get group attributes -def get_group_attrs(h5file: tb.File, group: str) -> dict: - 'Get the attributes of a group in an h5 file' - root = h5file.root - roots = ['', '/', 'root'] - group = root if group in roots else getattr(root, group) - - attrs = {} - keys = group._v_attrs._v_attrnamesuser - for k in keys: - val = group._v_attrs[k] - if val is not None: - val = val[0] # values are stored as arrays - if np.issubdtype(val.dtype, np.bytes_): - val = val.decode() # convert bytes to string - val = None if val == 'NA' else val - attrs[k] = val - return attrs # dictionary output - -# Open the h5 file with pytables -h5file = tb.open_file('_traj.h5') - -# Read group attributes -file = get_group_attrs(h5file, '/')['file'] -receptor = get_group_attrs(h5file, 'receptor') -params = get_group_attrs(h5file, 'params') +import pyarrow.parquet as pq + +# Read the parquet file into a pandas DataFrame +particle = pd.read_parquet('_traj.parquet') +particle.head() +# | | time | indx | long | lati | zagl | foot | mlht | dens | samt | sigw | tlgr | foot_no_hnf_dilution | +# |---:|-------:|-------:|-------:|-------:|-------:|--------:|-------:|-------:|-------:|-------:|-------:|-----------------------:| +# | 0 | -1 | 1 | -80.4 | 39.6 | 5.63 | 0.0626 | 1459 | 1.1 | 1 | 1.08 | 1.67 | 0.00224 | +# | 1 | -1 | 2 | -80.4 | 39.6 | 41.37 | 0.043 | 1459 | 1.1 | 1 | 1.09 | 4.98 | 0.00224 | +# | 2 | -1 | 3 | -80.4 | 39.6 | 30.95 | 0.043 | 1459 | 1.1 | 1 | 1.09 | 4.98 | 0.00224 | +# | 3 | -1 | 4 | -80.4 | 39.6 | 3.32 | 0.0626 | 1459 | 1.1 | 1 | 1.08 | 1.67 | 0.00224 | +# | 4 | -1 | 5 | -80.4 | 39.6 | 28.88 | 0.0627 | 1459 | 1.1 | 1 | 1.08 | 1.67 | 0.00224 | + +# Receptor & config parameter metadata is stored in the parquet `FileMetaData.metadata` attribute +metadata = pq.read_metadata('_traj.parquet').metadata ``` ## 
Gridded footprints diff --git a/r/src/read_traj.r b/r/src/read_traj.r index e4c5324..a824640 100644 --- a/r/src/read_traj.r +++ b/r/src/read_traj.r @@ -3,12 +3,12 @@ #' #' @param file filename argument. Must end with .rds (for serialized R data #' output, preferred), .h5 (for Hierarchical Data Format output), -#' or NULL to return the footprint object for programatic use. +#' or .parquet (for Apache Parquet output). #' rds files do not require any additional libraries and have -#' better compression. The \code{rhdf5} package is required for .h5 output. -#' HDF5 output is slightly less efficient than .rds output, but is more -#' portable and can be read by other languages. +#' better compression. Alternative output formats require additional +#' libraries. #' +#' @import arrow #' @import rhdf5 #' @export @@ -22,11 +22,10 @@ read_traj <- function(file) { output <- list() # Define output list to rebuild - # .h5 output + # Alternative .h5 output if (!is.null(file) && grepl('\\.h5$', file, ignore.case = T)) { - # Assign current file path - output$file <- file + output$file <- file # Assign current file path # Get receptor attributes output$receptor <- rhdf5::h5readAttributes(file, 'receptor') @@ -47,4 +46,69 @@ read_traj <- function(file) { return(output) } + + # Alternative .parquet output + if (!is.null(file) && grepl('\\.parquet$', file, ignore.case = T)) { + extract_meta <- function(meta, prefix) { + prefix <- paste0('^', prefix, ':') + keys <- grep(prefix, names(meta), value = T) + data <- meta[keys] + names(data) <- gsub(prefix, '', names(data)) + return(data) + } + + convert_to_numeric <- function(x) { + if (x == "NA") { + return(NA) # Handle the string "NA" case + } + num_val <- suppressWarnings(as.numeric(x)) # Attempt conversion + if (is.na(num_val) && !is.na(x)) { + return(x) # Return original string if conversion fails + } + return(num_val) # Otherwise, return the numeric value + } + + output$file <- file # Assign current file path + + # Read in parquet file 
+ particle <- arrow::read_parquet(file, as_data_frame = F) + metadata <- particle$metadata + + # Get receptor metadata + receptor <- extract_meta(metadata, 'receptor') + receptor$run_time <- as.POSIXct(receptor$run_time, tz='UTC') + receptor$lati <- as.numeric(receptor$lati) + receptor$long <- as.numeric(receptor$long) + receptor$zagl <- as.numeric(receptor$zagl) + + # Get param metadata + params <- extract_meta(metadata, 'param') + params <- lapply(params, convert_to_numeric) + + particle <- as.data.frame(particle) # Convert to data frame + + # Handle particle error data + err_cols <- grepl('_err$', names(particle)) + if (any(err_cols)) { + # Split particle data into particle and particle_error + # (extract the error columns BEFORE dropping them from particle) + particle_error <- particle[err_cols] + particle <- particle[!err_cols] + + # Strip '_err' from particle_error names + names(particle_error) <- gsub('_err$', '', names(particle_error)) + + # Get particle error params (stored under the 'err_param' prefix by write_traj) + particle_error_params <- extract_meta(metadata, 'err_param') + particle_error_params <- lapply(particle_error_params, convert_to_numeric) + + output$particle_error <- particle_error + output$particle_error_params <- particle_error_params + } + + output$receptor <- receptor + output$particle <- particle + output$params <- params + + return(output) + } } diff --git a/r/src/simulation_step.r b/r/src/simulation_step.r index 2eb6282..ac5c8ba 100644 --- a/r/src/simulation_step.r +++ b/r/src/simulation_step.r @@ -325,13 +325,14 @@ simulation_step <- function(before_footprint = list(function() {output}), horcorzierr = horcorzierr, winderrtf = winderrtf) } - - # Save output object and symlink to out/particles - write_traj(output, output$file) - - link <- file.path(output_wd, 'particles', basename(output$file)) - suppressWarnings(file.symlink(output$file, link)) - + + if (trajec_fmt != '') { # Do not write trajectory file if format is empty + # Save output object and symlink to out/particles + write_traj(output, output$file) + + link <- file.path(output_wd, 
'particles', basename(output$file)) + suppressWarnings(file.symlink(output$file, link)) + } } else { # If user opted to recycle existing trajectory files, read in the recycled # file to a data frame with an adjusted timestamp and index for the diff --git a/r/src/write_traj.r b/r/src/write_traj.r index 5803310..cdeeaeb 100644 --- a/r/src/write_traj.r +++ b/r/src/write_traj.r @@ -4,12 +4,12 @@ #' @param traj output created during simulation_step #' @param file filename argument. Must end with .rds (for serialized R data #' output, preferred), .h5 (for Hierarchical Data Format output), -#' or NULL to return the footprint object for programatic use. +#' or .parquet (for Apache Parquet output). #' rds files do not require any additional libraries and have -#' better compression. The \code{rhdf5} package is required for .h5 output. -#' HDF5 output is slightly less efficient than .rds output, but is more -#' portable and can be read by other languages. +#' better compression. Alternative output formats require additional +#' libraries. #' +#' @import arrow #' @import rhdf5 #' @export @@ -22,10 +22,10 @@ write_traj <- function(traj, file) { # .rds output (preferred) if (!is.null(file) && grepl('\\.rds$', file, ignore.case = T)) { saveRDS(traj, file) - return(file) + return(traj) } - # .h5 output + # Alternative .h5 output if (!is.null(file) && grepl('\\.h5$', file, ignore.case = T)) { # Check if rhdf5 package is installed if (!requireNamespace("rhdf5", quietly = TRUE)) { @@ -83,8 +83,52 @@ write_traj <- function(traj, file) { # Close file rhdf5::H5Fclose(fid) - return(file) + return(traj) } - return(traj) + # Alternative .parquet output + if (!is.null(file) && grepl('\\.parquet$', file, ignore.case = T)) { + # Check if arrow package is installed + if (!requireNamespace("arrow", quietly = TRUE)) { + stop("The 'arrow' package is required for parquet output. 
Please install it.") + } + + # Store obj's elements in the table's key-value metadata as + # 'prefix:name' character entries. Takes and returns the arrow table + # so the updated metadata is kept by the caller (a bare metadata list + # would be modified as a local copy and silently discarded). + write_meta <- function(tbl, prefix, obj) { + for (x in names(obj)) { + key <- paste0(prefix, ':', x) + tbl$metadata[[key]] <- as.character(obj[[x]]) + } + return(tbl) + } + + particle_df <- traj$particle + + if ('particle_error' %in% names(traj)) { + # Add '_err' suffix to particle_error columns + particle_error <- traj$particle_error + names(particle_error) <- paste0(names(particle_error), '_err') + + # Column-bind the error columns alongside the particle columns + # (arrow::concat_tables stacks rows, not columns, so bind before + # converting to an arrow table) + particle_df <- cbind(particle_df, particle_error) + } + + # Convert particle dataframe to arrow table + particle <- arrow::arrow_table(particle_df) + + # Write receptor metadata + particle <- write_meta(particle, 'receptor', traj$receptor) + + # Write params metadata + particle <- write_meta(particle, 'param', traj$params) + + if ('particle_error' %in% names(traj)) { + # Write particle_error_params metadata + particle <- write_meta(particle, 'err_param', + traj$particle_error_params) + } + + # Write parquet file + arrow::write_parquet(particle, file) + + return(traj) + } + + invisible(traj) }