diff --git a/.conda/benchcab-dev.yaml b/.conda/benchcab-dev.yaml
index b304a3b..3e97206 100644
--- a/.conda/benchcab-dev.yaml
+++ b/.conda/benchcab-dev.yaml
@@ -6,7 +6,7 @@ channels:
   - defaults
 dependencies:
-  - python=3.9
+  - python=3.13
   - payu>=1.0.30
   - pip
   - f90nml
@@ -16,6 +16,8 @@ dependencies:
   - cerberus>=1.3.5
   - gitpython
   - jinja2
+  - hpcpy>=0.5.0
+  - meorg_client
   # CI
   - pytest-cov
   # Dev Dependencies
@@ -24,4 +26,4 @@ dependencies:
   - black
   - ruff
   - pip:
-    - -r mkdocs-requirements.txt
\ No newline at end of file
+    - -r mkdocs-requirements.txt
diff --git a/.conda/meta.yaml b/.conda/meta.yaml
index 4c61db2..6944735 100644
--- a/.conda/meta.yaml
+++ b/.conda/meta.yaml
@@ -17,10 +17,10 @@ build:
 requirements:
   host:
-    - python ==3.9
+    - python >=3.9,<3.13
     - pip
   run:
-    - python ==3.9
+    - python >=3.9,<3.13
     - payu >=1.0.30
     - netCDF4
     - PyYAML
@@ -29,3 +29,5 @@ requirements:
     - cerberus >=1.3.5
     - gitpython
     - jinja2
+    - hpcpy>=0.5.0
+    - meorg_client
diff --git a/docs/user_guide/config_options.md b/docs/user_guide/config_options.md
index fdc76b5..b5803fe 100644
--- a/docs/user_guide/config_options.md
+++ b/docs/user_guide/config_options.md
@@ -68,6 +68,7 @@ fluxsite:
     walltime: 06:00:00
     storage: [scratch/a00, gdata/xy11]
   multiprocess: True
+  meorg_model_output_id: XXXXXXXX
 ```
 
 ### [experiment](#experiment)
@@ -163,6 +164,14 @@ fluxsites:
 ```
 
+### [meorg_model_output_id](#meorg_model_output_id)
+
+: **Default:** False, _optional key_. :octicons-dash-24: The unique Model Output ID from modelevaluation.org to which output files will be automatically uploaded for analysis.
+
+A separate upload job will be submitted at the successful completion of benchcab tasks if this key is present; however, benchcab does not check the validity of the ID at this stage.
+
+Note: It is the user's responsibility to ensure the model output is configured on modelevaluation.org.
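+
+For example, to attach fluxsite outputs to an existing model output profile (`XXXXXXXX` is a placeholder for an ID generated on modelevaluation.org):
+
+``` yaml
+fluxsite:
+  meorg_model_output_id: XXXXXXXX
+```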
+
 ## spatial
 
 Contains settings specific to spatial tests.
@@ -493,4 +502,14 @@ codecov:
 [f90nml-github]: https://github.com/marshallward/f90nml
 [environment-modules]: https://modules.sourceforge.net/
 [nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained
-[cable-github]: https://github.com/CABLE-LSM/CABLE
\ No newline at end of file
+[cable-github]: https://github.com/CABLE-LSM/CABLE
+
+## meorg_bin
+
+: **Default:** False, _optional key_. :octicons-dash-24: Specifies the absolute system path to the ME.org client executable. If this key is absent and `meorg_model_output_id` is set in `fluxsite` above, the path is inferred from the directory containing the benchcab executable.
+
+``` yaml
+meorg_bin: /path/to/meorg
+```
diff --git a/src/benchcab/benchcab.py b/src/benchcab/benchcab.py
index 81b5574..c4d1b9b 100644
--- a/src/benchcab/benchcab.py
+++ b/src/benchcab/benchcab.py
@@ -11,6 +11,7 @@
 from subprocess import CalledProcessError
 from typing import Optional
 
+import benchcab.utils.meorg as bm
 from benchcab import fluxsite, internal, spatial
 from benchcab.comparison import run_comparisons, run_comparisons_in_parallel
 from benchcab.config import read_config
@@ -22,7 +23,7 @@
 from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface
 from benchcab.internal import get_met_forcing_file_names
 from benchcab.model import Model
-from benchcab.utils import is_verbose
+from benchcab.utils import is_verbose, task_summary
 from benchcab.utils.fs import mkdir, next_path
 from benchcab.utils.pbs import render_job_script
 from benchcab.utils.repo import create_repo
@@ -234,7 +235,10 @@ def fluxsite_submit_job(self, config_path: str, skip: list[str]) -> None:
             logger.error(exc.output)
             raise
 
-        logger.info(f"PBS job submitted: {proc.stdout.strip()}")
+        # Get the job ID
+        job_id = proc.stdout.strip()
+
+        logger.info(f"PBS job submitted: {job_id}")
         logger.info("CABLE log file for each task is written to:")
         logger.info(f"{internal.FLUXSITE_DIRS['LOG']}/<task_name>_log.txt")
         logger.info("The CABLE standard output for each task is written to:")
@@ -242,6 +246,14 @@
         logger.info("The NetCDF output for each task is written to:")
         logger.info(f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc")
 
+        # Upload outputs to modelevaluation.org if configured
+        bm.do_meorg(
+            config,
+            upload_dir=internal.FLUXSITE_DIRS["OUTPUT"],
+            benchcab_bin=str(self.benchcab_exe_path),
+            benchcab_job_id=job_id,
+        )
+
     def gen_codecov(self, config_path: str):
         """Endpoint for `benchcab codecov`."""
         logger = self._get_logger()
@@ -347,8 +359,7 @@ def fluxsite_run_tasks(self, config_path: str):
         else:
             fluxsite.run_tasks(tasks)
 
-        tasks_failed = [task for task in tasks if not task.is_done()]
-        n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
+        _, n_success, n_failed, _ = task_summary(tasks)
         logger.info(f"{n_failed} failed, {n_success} passed")
 
     def fluxsite_bitwise_cmp(self, config_path: str):
@@ -376,8 +387,7 @@
         else:
             run_comparisons(comparisons)
 
-        tasks_failed = [task for task in comparisons if not task.is_done()]
-        n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
+        _, n_success, n_failed, _ = task_summary(comparisons)
         logger.info(f"{n_failed} failed, {n_success} passed")
 
     def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
diff --git a/src/benchcab/config.py b/src/benchcab/config.py
index 102a9bb..14766e1 100644
--- a/src/benchcab/config.py
+++ b/src/benchcab/config.py
@@ -119,6 +119,9 @@ def read_optional_key(config: dict):
     config["fluxsite"]["pbs"] = internal.FLUXSITE_DEFAULT_PBS | config["fluxsite"].get(
         "pbs", {}
     )
+    config["fluxsite"]["meorg_model_output_id"] = config["fluxsite"].get(
+        "meorg_model_output_id", internal.FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID
+    )
 
     config["codecov"] = config.get("codecov", False)
 
diff --git a/src/benchcab/data/config-schema.yml b/src/benchcab/data/config-schema.yml
index 07a55f4..654d1a5 100644
--- a/src/benchcab/data/config-schema.yml
+++ b/src/benchcab/data/config-schema.yml
@@ -107,6 +107,12 @@ fluxsite:
         schema:
           type: "string"
       required: false
+    meorg_model_output_id:
+      type:
+        - "boolean"
"string" + required: false + default: false spatial: type: "dict" @@ -134,4 +140,11 @@ spatial: codecov: type: "boolean" - required: false \ No newline at end of file + required: false + +meorg_bin: + type: + - "boolean" + - "string" + required: False + default: False \ No newline at end of file diff --git a/src/benchcab/data/meorg_jobscript.j2 b/src/benchcab/data/meorg_jobscript.j2 new file mode 100644 index 0000000..1c89caa --- /dev/null +++ b/src/benchcab/data/meorg_jobscript.j2 @@ -0,0 +1,43 @@ +#!/bin/bash +#PBS -l wd +#PBS -l ncpus={{num_threads}} +#PBS -l mem={{mem}} +#PBS -l walltime={{walltime}} +#PBS -q copyq +#PBS -P {{project}} +#PBS -j oe +#PBS -m e +#PBS -l storage={{storage_str}} + +module purge +{% for module in modules -%} +module load {{module}} +{% endfor %} +set -ev + +# Set some things +DATA_DIR={{data_dir}} +NUM_THREADS={{num_threads}} +MODEL_OUTPUT_ID={{model_output_id}} +CACHE_DELAY={{cache_delay}} +MEORG_BIN={{meorg_bin}} + +{% if purge_outputs %} +# Purge existing model outputs +echo "Purging existing outputs from $MODEL_OUTPUT_ID" +$MEORG_BIN file detach_all $MODEL_OUTPUT_ID +{% endif %} + +# Upload the data +echo "Uploading data to $MODEL_OUTPUT_ID" +$MEORG_BIN file upload $DATA_DIR/*.nc -n $NUM_THREADS --attach_to $MODEL_OUTPUT_ID + +# Wait for the cache to transfer to the object store. +echo "Waiting for object store transfer ($CACHE_DELAY sec)" +sleep $CACHE_DELAY + +# Trigger the analysis +echo "Triggering analysis on $MODEL_OUTPUT_ID" +$MEORG_BIN analysis start $MODEL_OUTPUT_ID + +echo "DONE" \ No newline at end of file diff --git a/src/benchcab/data/test/config-optional.yml b/src/benchcab/data/test/config-optional.yml index f4605e8..e36c436 100644 --- a/src/benchcab/data/test/config-optional.yml +++ b/src/benchcab/data/test/config-optional.yml @@ -3,6 +3,7 @@ project: hh5 fluxsite: experiment: AU-Tum + meorg_model_output_id: False multiprocess: False pbs: ncpus: 6 diff --git a/src/benchcab/data/test/integration_meorg.sh b/src/benchcab/data/test/integration_meorg.sh new file mode 100644 index 0000000..e90c531 --- /dev/null +++ b/src/benchcab/data/test/integration_meorg.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +set -ex + +CABLE_REPO="git@github.com:CABLE-LSM/CABLE.git" +CABLE_DIR=/scratch/$PROJECT/$USER/benchcab/CABLE + +TEST_DIR=/scratch/$PROJECT/$USER/benchcab/integration +EXAMPLE_REPO="git@github.com:CABLE-LSM/bench_example.git" + +# Remove CABLE and test work space, then recreate +rm -rf $CABLE_DIR +mkdir -p $CABLE_DIR + +rm -rf $TEST_DIR +mkdir -p $TEST_DIR + +# Clone local checkout for CABLE +git clone $CABLE_REPO $CABLE_DIR +cd $CABLE_DIR + +# Clone the example repo +git clone $EXAMPLE_REPO $TEST_DIR +cd $TEST_DIR +git reset --hard 9bfba54ee8bf23141d95b1abe4b7207b0f3498e2 + +cat > config.yaml << EOL +project: $PROJECT + +realisations: + - repo: + local: + path: $CABLE_DIR + - repo: + git: + branch: main +modules: [ + intel-compiler/2021.1.1, + netcdf/4.7.4, + openmpi/4.1.0 +] + +fluxsite: + experiment: AU-Tum + pbs: + storage: + - scratch/$PROJECT + - gdata/$PROJECT + # This ID is currently configured on the me.org server. 
+  meorg_model_output_id: Sss7qupAHEZ8ovbCv
+EOL
+
+benchcab run -v
diff --git a/src/benchcab/internal.py b/src/benchcab/internal.py
index 5c064a8..66af72d 100644
--- a/src/benchcab/internal.py
+++ b/src/benchcab/internal.py
@@ -252,6 +252,7 @@
 }
 
 FLUXSITE_DEFAULT_EXPERIMENT = "forty-two-site-test"
+FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID = False
 
 OPTIONAL_COMMANDS = ["fluxsite-bitwise-cmp", "gen_codecov"]
 
@@ -274,3 +275,13 @@ def get_met_forcing_file_names(experiment: str) -> list[str]:
     ]
 
     return file_names
+
+
+# Configuration for the meorg client upload job
+MEORG_CLIENT = dict(
+    num_threads=1,  # Number of parallel upload threads
+    cache_delay=60 * 5,  # 5 minutes between upload and analysis triggering
+    mem="8G",
+    walltime="01:00:00",
+    storage=["gdata/ks32", "gdata/hh5", "gdata/wd9", "gdata/rp23"],
+)
diff --git a/src/benchcab/utils/__init__.py b/src/benchcab/utils/__init__.py
index db8c427..b628e6e 100644
--- a/src/benchcab/utils/__init__.py
+++ b/src/benchcab/utils/__init__.py
@@ -11,7 +11,7 @@
 import sys
 from importlib import resources
 from pathlib import Path
-from typing import Union
+from typing import Iterable, Union
 
 import yaml
 from jinja2 import BaseLoader, Environment
@@ -148,3 +148,24 @@ def get_logger(name="benchcab", level="debug"):
 def is_verbose():
     """Return True if verbose output is enabled, False otherwise."""
     return get_logger().getEffectiveLevel() == logging.DEBUG
+
+
+def task_summary(tasks: Iterable) -> tuple:
+    """Return a summary of task completions.
+
+    Parameters
+    ----------
+    tasks : Iterable
+        Sized iterable (e.g. list) of tasks with an .is_done() method available.
+
+    Returns
+    -------
+    tuple
+        num_tasks, num_complete, num_failed, all_complete
+
+    """
+    num_tasks = len(tasks)
+    num_complete = len([task for task in tasks if task.is_done()])
+    num_failed = num_tasks - num_complete
+
+    return num_tasks, num_complete, num_failed, num_complete == num_tasks
diff --git a/src/benchcab/utils/meorg.py b/src/benchcab/utils/meorg.py
new file mode 100644
index 0000000..9ee35df
--- /dev/null
+++ b/src/benchcab/utils/meorg.py
@@ -0,0 +1,105 @@
+"""Utility methods for interacting with the ME.org client."""
+
+import os
+
+from hpcpy import get_client
+from meorg_client.client import Client as MeorgClient
+
+import benchcab.utils as bu
+from benchcab.internal import MEORG_CLIENT
+
+
+def do_meorg(config: dict, upload_dir: str, benchcab_bin: str, benchcab_job_id: str):
+    """Perform the upload of model outputs to modelevaluation.org.
+
+    Parameters
+    ----------
+    config : dict
+        The master config dictionary
+    upload_dir : str
+        Absolute path to the data dir for upload
+    benchcab_bin : str
+        Path to the benchcab bin, from which to infer the client bin
+    benchcab_job_id : str
+        PBS job ID of the benchcab run, on which the upload job will depend
+
+    Returns
+    -------
+    bool
+        True if successful, False otherwise
+
+    """
+    logger = bu.get_logger()
+
+    model_output_id = config["fluxsite"]["meorg_model_output_id"]
+    num_threads = MEORG_CLIENT["num_threads"]
+
+    # Check if a model output id has been assigned
+    if model_output_id is False:
+        logger.info("No model_output_id found in fluxsite configuration.")
+        logger.info("NOT uploading to modelevaluation.org")
+        return False
+
+    # Allow the user to specify an absolute path to the meorg bin in config
+    meorg_bin = config.get("meorg_bin", False)
+
+    # Otherwise infer the path from the benchcab installation
+    if meorg_bin is False:
+        logger.debug(f"Inferring meorg bin from {benchcab_bin}")
+        bin_segments = benchcab_bin.split("/")
+        bin_segments[-1] = "meorg"
+        meorg_bin = "/".join(bin_segments)
+
+    logger.debug(f"meorg_bin = {meorg_bin}")
+
+    # Now, check that the executable actually exists
+    if not os.path.isfile(meorg_bin):
+        logger.error(f"No meorg_client executable found at {meorg_bin}")
+        logger.error("NOT uploading to modelevaluation.org")
+        return False
+
+    # Also, only run if the client is initialised
+    if not MeorgClient().is_initialised():
+        logger.warning(
+            "A model_output_id has been supplied, but the meorg_client is not initialised."
+        )
+        logger.warning(
+            "To initialise, run `meorg initialise` in the installation environment."
+        )
+        logger.warning(
+            "Once initialised, the outputs from this run can be uploaded with the following command:"
+        )
+        logger.warning(
+            f"meorg file upload {upload_dir}/*.nc -n {num_threads} --attach_to {model_output_id}"
+        )
+        logger.warning("Then the analysis can be triggered with:")
+        logger.warning(f"meorg analysis start {model_output_id}")
+        return False
+
+    # Finally, attempt the upload!
+    logger.info("Uploading outputs to modelevaluation.org")
+
+    # Submit the upload job, to run once the benchcab PBS job completes
+    client = get_client()
+    meorg_jobid = client.submit(
+        bu.get_installed_root() / "data" / "meorg_jobscript.j2",
+        render=True,
+        dry_run=False,
+        depends_on=benchcab_job_id,
+        # Interpolate into the job script
+        model_output_id=model_output_id,
+        data_dir=upload_dir,
+        cache_delay=MEORG_CLIENT["cache_delay"],
+        mem=MEORG_CLIENT["mem"],
+        num_threads=MEORG_CLIENT["num_threads"],
+        walltime=MEORG_CLIENT["walltime"],
+        storage=MEORG_CLIENT["storage"],
+        project=config["project"],
+        modules=config["modules"],
+        purge_outputs=True,
+        meorg_bin=meorg_bin,
+    )
+
+    logger.info(f"Upload job submitted: {meorg_jobid}")
+    return True
diff --git a/tests/test_comparison.py b/tests/test_comparison.py
index cec5125..568b1c2 100644
--- a/tests/test_comparison.py
+++ b/tests/test_comparison.py
@@ -8,6 +8,7 @@
 from pathlib import Path
 
 import pytest
+
 from benchcab import internal
 from benchcab.comparison import ComparisonTask
 
diff --git a/tests/test_config.py b/tests/test_config.py
index 5f741be..e995d99 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -75,6 +75,7 @@ def all_optional_default_config(no_optional_config) -> dict:
             "experiment": bi.FLUXSITE_DEFAULT_EXPERIMENT,
             "multiprocess": bi.FLUXSITE_DEFAULT_MULTIPROCESS,
             "pbs": bi.FLUXSITE_DEFAULT_PBS,
+            "meorg_model_output_id": bi.FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID,
         },
         "science_configurations": bi.DEFAULT_SCIENCE_CONFIGURATIONS,
         "spatial": {
@@ -106,6 +107,7 @@ def all_optional_custom_config(no_optional_config) -> dict:
                 "walltime": "10:00:00",
                 "storage": ["scratch/$PROJECT"],
             },
+            "meorg_model_output_id": False,
         },
         "science_configurations": [
             {
diff --git a/tests/test_fluxsite.py b/tests/test_fluxsite.py
index e4732ba..a7558d5 100644
--- a/tests/test_fluxsite.py
+++ b/tests/test_fluxsite.py
@@ -10,6 +10,7 @@
 import f90nml
 import netCDF4
 import pytest
+
 from benchcab import __version__, internal
 from benchcab.fluxsite import (
     CableError,
diff --git a/tests/test_fs.py b/tests/test_fs.py
index 23d95d0..3101699 100644
--- a/tests/test_fs.py
+++ b/tests/test_fs.py
@@ -10,6 +10,7 @@
 from pathlib import Path
 
 import pytest
+
 from benchcab.utils.fs import chdir, mkdir, next_path, prepend_path
 
 
diff --git a/tests/test_spatial.py b/tests/test_spatial.py
index aed4283..7a0d950 100644
--- a/tests/test_spatial.py
+++ b/tests/test_spatial.py
@@ -10,6 +10,7 @@
 import f90nml
 import pytest
 import yaml
+
 from benchcab import internal
 from benchcab.model import Model
 from benchcab.spatial import SpatialTask, get_spatial_tasks
diff --git a/tests/test_state.py b/tests/test_state.py
index 51738bd..cda2e73 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -1,39 +1,43 @@
 import time
 from pathlib import Path
+from tempfile import TemporaryDirectory
 
 import pytest
 
-from benchcab.utils.state import State, StateAttributeError
-
-
-@pytest.fixture()
-def state():
-    """Return a State object."""
-    return State(state_dir=Path("my_state"))
+from benchcab.utils.state import State, StateAttributeError
 
 
-def test_state_is_set(state):
+def test_state_is_set():
     """Success case: test state is set."""
-    state.set("foo")
-    assert state.is_set("foo")
+    with TemporaryDirectory() as tmp_dir:
+        state = State(state_dir=Path(tmp_dir))
+        state.set("foo")
+        assert state.is_set("foo")
 
 
-def test_state_reset(state):
+def test_state_reset():
     """Success case: test state is reset."""
-    state.set("foo")
-    state.reset()
-    assert not state.is_set("foo")
+    with TemporaryDirectory() as tmp_dir:
+        state = State(state_dir=Path(tmp_dir))
+        state.set("foo")
+        state.reset()
+        assert not state.is_set("foo")
 
 
-def test_state_get(state):
+def test_state_get():
     """Success case: test get() returns the most recent state attribute."""
-    state.set("foo")
-    # This is done so that time stamps can be resolved between state attributes
-    time.sleep(0.01)
-    state.set("bar")
-    assert state.get() == "bar"
+    with TemporaryDirectory() as tmp_dir:
+        state = State(state_dir=Path(tmp_dir))
+        state.set("foo")
+        # Sleep so that the time stamps of the two state attributes can be resolved
+        time.sleep(1)
+        state.set("bar")
+        assert state.get() == "bar"
 
 
-def test_state_get_raises_exception(state):
+def test_state_get_raises_exception():
     """Failure case: test get() raises an exception when no attributes are set."""
-    with pytest.raises(StateAttributeError):
-        state.get()
+    with TemporaryDirectory() as tmp_dir:
+        state = State(state_dir=Path(tmp_dir))
+        with pytest.raises(StateAttributeError):
+            state.get()
diff --git a/tests/test_utils.py b/tests/test_utils.py
index a99fadd..d6ce264 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -3,6 +3,7 @@
 import pytest
 
 import benchcab.utils as bu
+from benchcab.comparison import ComparisonTask
 
 
 def test_get_installed_root():
@@ -60,3 +61,23 @@ def test_get_logger_singleton_fail():
     logger2 = bu.get_logger(name="benchcab2")
 
     assert logger1 is not logger2
+
+
+def test_task_summary():
+    """Success case: test task_summary counts completed and failed tasks."""
+    # Create some mocked tasks
+    t1 = ComparisonTask(files=(), task_name="t1")
+    t2 = ComparisonTask(files=(), task_name="t2")
+
+    # Inject success/fail cases
+    t1.is_done = lambda: True
+    t2.is_done = lambda: False
+
+    # Run the function
+    n_tasks, n_success, n_failed, all_complete = bu.task_summary([t1, t2])
+
+    # Check correct results
+    assert n_tasks == 2
+    assert n_success == 1
+    assert n_failed == 1
+    assert all_complete is False
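+
+
+def test_task_summary_all_complete():
+    """Success case: test all_complete is True when every task is done (illustrative)."""
+    # Companion to the case above, exercising the all-complete path of task_summary
+    t1 = ComparisonTask(files=(), task_name="t1")
+    t1.is_done = lambda: True
+
+    n_tasks, n_success, n_failed, all_complete = bu.task_summary([t1])
+
+    assert n_tasks == 1
+    assert n_success == 1
+    assert n_failed == 0
+    assert all_complete is True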