Skip to content

Commit

Permalink
Merge pull request #312 from CABLE-LSM/300-add-connection-to-meorg_cl…
Browse files Browse the repository at this point in the history
…ient-to-successful-run

300 add connection to meorg client to successful run
  • Loading branch information
bschroeter authored Oct 13, 2024
2 parents a984e8d + 8e0d58f commit 9c9a954
Show file tree
Hide file tree
Showing 19 changed files with 349 additions and 35 deletions.
6 changes: 4 additions & 2 deletions .conda/benchcab-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ channels:
- defaults

dependencies:
- python=3.9
- python=3.13
- payu>=1.0.30
- pip
- f90nml
Expand All @@ -16,6 +16,8 @@ dependencies:
- cerberus>=1.3.5
- gitpython
- jinja2
- hpcpy>=0.5.0
- meorg_client
# CI
- pytest-cov
# Dev Dependencies
Expand All @@ -24,4 +26,4 @@ dependencies:
- black
- ruff
- pip:
- -r mkdocs-requirements.txt
- -r mkdocs-requirements.txt
6 changes: 4 additions & 2 deletions .conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ build:

requirements:
host:
- python ==3.9
- python >=3.9,<3.13
- pip
run:
- python ==3.9
- python >=3.9,<3.13
- payu >=1.0.30
- netCDF4
- PyYAML
Expand All @@ -29,3 +29,5 @@ requirements:
- cerberus >=1.3.5
- gitpython
- jinja2
- hpcpy>=0.5.0
- meorg_client
21 changes: 20 additions & 1 deletion docs/user_guide/config_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ fluxsite:
walltime: 06:00:00
storage: [scratch/a00, gdata/xy11]
multiprocess: True
meorg_model_output_id: XXXXXXXX
```
### [experiment](#experiment)
Expand Down Expand Up @@ -163,6 +164,14 @@ fluxsites:
```

### [meorg_model_output_id](#meorg_model_output_id)

: **Default:** False, _optional key_. :octicons-dash-24: The unique Model Output ID from modelevaluation.org to which output files will be automatically uploaded for analysis.

A separate upload job will be submitted at the successful completion of benchcab tasks if this key is present, however, the validity is not checked by benchcab at this stage.

Note: It is the user's responsbility to ensure the model output is configured on modelevaluation.org.

## spatial

Contains settings specific to spatial tests.
Expand Down Expand Up @@ -493,4 +502,14 @@ codecov:
[f90nml-github]: https://github.com/marshallward/f90nml
[environment-modules]: https://modules.sourceforge.net/
[nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained
[cable-github]: https://github.com/CABLE-LSM/CABLE
[cable-github]: https://github.com/CABLE-LSM/CABLE

## meorg_bin

: **Default:** False, _optional key. :octicons-dash-24: Specifies the absolute system path to the ME.org client executable. In the absence of this key it will be inferred from the same directory as benchcab should `meorg_model_output_id` be set in `fluxsite` above.

``` yaml
meorg_bin: /path/to/meorg
```
22 changes: 16 additions & 6 deletions src/benchcab/benchcab.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from subprocess import CalledProcessError
from typing import Optional

import benchcab.utils.meorg as bm
from benchcab import fluxsite, internal, spatial
from benchcab.comparison import run_comparisons, run_comparisons_in_parallel
from benchcab.config import read_config
Expand All @@ -22,7 +23,7 @@
from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface
from benchcab.internal import get_met_forcing_file_names
from benchcab.model import Model
from benchcab.utils import is_verbose
from benchcab.utils import is_verbose, task_summary
from benchcab.utils.fs import mkdir, next_path
from benchcab.utils.pbs import render_job_script
from benchcab.utils.repo import create_repo
Expand Down Expand Up @@ -234,14 +235,25 @@ def fluxsite_submit_job(self, config_path: str, skip: list[str]) -> None:
logger.error(exc.output)
raise

logger.info(f"PBS job submitted: {proc.stdout.strip()}")
# Get the job ID
job_id = proc.stdout.strip()

logger.info(f"PBS job submitted: {job_id}")
logger.info("CABLE log file for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['LOG']}/<task_name>_log.txt")
logger.info("The CABLE standard output for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['TASKS']}/<task_name>/out.txt")
logger.info("The NetCDF output for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc")

# Upload to meorg by default
bm.do_meorg(
config,
upload_dir=internal.FLUXSITE_DIRS["OUTPUT"],
benchcab_bin=str(self.benchcab_exe_path),
benchcab_job_id=job_id,
)

def gen_codecov(self, config_path: str):
"""Endpoint for `benchcab codecov`."""
logger = self._get_logger()
Expand Down Expand Up @@ -347,8 +359,7 @@ def fluxsite_run_tasks(self, config_path: str):
else:
fluxsite.run_tasks(tasks)

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
_, n_success, n_failed, _ = task_summary(tasks)
logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite_bitwise_cmp(self, config_path: str):
Expand Down Expand Up @@ -376,8 +387,7 @@ def fluxsite_bitwise_cmp(self, config_path: str):
else:
run_comparisons(comparisons)

tasks_failed = [task for task in comparisons if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
_, n_success, n_failed, _ = task_summary(comparisons)
logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
Expand Down
3 changes: 3 additions & 0 deletions src/benchcab/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ def read_optional_key(config: dict):
config["fluxsite"]["pbs"] = internal.FLUXSITE_DEFAULT_PBS | config["fluxsite"].get(
"pbs", {}
)
config["fluxsite"]["meorg_model_output_id"] = config["fluxsite"].get(
"meorg_model_output_id", internal.FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID
)

config["codecov"] = config.get("codecov", False)

Expand Down
15 changes: 14 additions & 1 deletion src/benchcab/data/config-schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ fluxsite:
schema:
type: "string"
required: false
meorg_model_output_id:
type:
- "boolean"
- "string"
required: false
default: false

spatial:
type: "dict"
Expand Down Expand Up @@ -134,4 +140,11 @@ spatial:

codecov:
type: "boolean"
required: false
required: false

meorg_bin:
type:
- "boolean"
- "string"
required: False
default: False
43 changes: 43 additions & 0 deletions src/benchcab/data/meorg_jobscript.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/bin/bash
#PBS -l wd
#PBS -l ncpus={{num_threads}}
#PBS -l mem={{mem}}
#PBS -l walltime={{walltime}}
#PBS -q copyq
#PBS -P {{project}}
#PBS -j oe
#PBS -m e
#PBS -l storage={{storage_str}}

module purge
{% for module in modules -%}
module load {{module}}
{% endfor %}
set -ev

# Set some things
DATA_DIR={{data_dir}}
NUM_THREADS={{num_threads}}
MODEL_OUTPUT_ID={{model_output_id}}
CACHE_DELAY={{cache_delay}}
MEORG_BIN={{meorg_bin}}

{% if purge_outputs %}
# Purge existing model outputs
echo "Purging existing outputs from $MODEL_OUTPUT_ID"
$MEORG_BIN file detach_all $MODEL_OUTPUT_ID
{% endif %}

# Upload the data
echo "Uploading data to $MODEL_OUTPUT_ID"
$MEORG_BIN file upload $DATA_DIR/*.nc -n $NUM_THREADS --attach_to $MODEL_OUTPUT_ID

# Wait for the cache to transfer to the object store.
echo "Waiting for object store transfer ($CACHE_DELAY sec)"
sleep $CACHE_DELAY

# Trigger the analysis
echo "Triggering analysis on $MODEL_OUTPUT_ID"
$MEORG_BIN analysis start $MODEL_OUTPUT_ID

echo "DONE"
1 change: 1 addition & 0 deletions src/benchcab/data/test/config-optional.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ project: hh5

fluxsite:
experiment: AU-Tum
meorg_model_output_id: False
multiprocess: False
pbs:
ncpus: 6
Expand Down
53 changes: 53 additions & 0 deletions src/benchcab/data/test/integration_meorg.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/bin/bash

set -ex

CABLE_REPO="git@github.com:CABLE-LSM/CABLE.git"
CABLE_DIR=/scratch/$PROJECT/$USER/benchcab/CABLE

TEST_DIR=/scratch/$PROJECT/$USER/benchcab/integration
EXAMPLE_REPO="git@github.com:CABLE-LSM/bench_example.git"

# Remove CABLE and test work space, then recreate
rm -rf $CABLE_DIR
mkdir -p $CABLE_DIR

rm -rf $TEST_DIR
mkdir -p $TEST_DIR

# Clone local checkout for CABLE
git clone $CABLE_REPO $CABLE_DIR
cd $CABLE_DIR

# Clone the example repo
git clone $EXAMPLE_REPO $TEST_DIR
cd $TEST_DIR
git reset --hard 9bfba54ee8bf23141d95b1abe4b7207b0f3498e2

cat > config.yaml << EOL
project: $PROJECT
realisations:
- repo:
local:
path: $CABLE_DIR
- repo:
git:
branch: main
modules: [
intel-compiler/2021.1.1,
netcdf/4.7.4,
openmpi/4.1.0
]
fluxsite:
experiment: AU-Tum
pbs:
storage:
- scratch/$PROJECT
- gdata/$PROJECT
# This ID is currently configured on the me.org server.
meorg_model_output_id: Sss7qupAHEZ8ovbCv
EOL

benchcab run -v
11 changes: 11 additions & 0 deletions src/benchcab/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
}

FLUXSITE_DEFAULT_EXPERIMENT = "forty-two-site-test"
FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID = False

OPTIONAL_COMMANDS = ["fluxsite-bitwise-cmp", "gen_codecov"]

Expand All @@ -274,3 +275,13 @@ def get_met_forcing_file_names(experiment: str) -> list[str]:
]

return file_names


# Configuration for the client upload
MEORG_CLIENT = dict(
num_threads=1, # Parallel uploads over 4 cores
cache_delay=60 * 5, # 5mins between upload and analysis triggering
mem="8G",
walltime="01:00:00",
storage=["gdata/ks32", "gdata/hh5", "gdata/wd9", "gdata/rp23"],
)
23 changes: 22 additions & 1 deletion src/benchcab/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import sys
from importlib import resources
from pathlib import Path
from typing import Union
from typing import Iterable, Union

import yaml
from jinja2 import BaseLoader, Environment
Expand Down Expand Up @@ -148,3 +148,24 @@ def get_logger(name="benchcab", level="debug"):
def is_verbose():
"""Return True if verbose output is enabled, False otherwise."""
return get_logger().getEffectiveLevel() == logging.DEBUG


def task_summary(tasks: Iterable) -> tuple:
"""Return a summary of task completions.
Parameters
----------
tasks : Iterable
Iterable of tasks with an .is_done() method available.
Returns
-------
tuple
num_tasks, num_complete, num_failed, all_complete
"""
num_tasks = len(tasks)
num_complete = len([task for task in tasks if task.is_done()])
num_failed = num_tasks - num_complete

return num_tasks, num_complete, num_failed, num_complete == num_tasks
Loading

0 comments on commit 9c9a954

Please sign in to comment.