300 add connection to meorg client to successful run #312

Merged
merged 16 commits into from
Oct 13, 2024
Changes from 12 commits
6 changes: 4 additions & 2 deletions .conda/benchcab-dev.yaml
@@ -6,7 +6,7 @@ channels:
- defaults

dependencies:
- python=3.9
- python<3.13
- payu>=1.0.30
- pip
- f90nml
@@ -16,6 +16,8 @@ dependencies:
- cerberus>=1.3.5
- gitpython
- jinja2
- hpcpy>=0.5.0
- meorg_client
# CI
- pytest-cov
# Dev Dependencies
@@ -24,4 +26,4 @@ dependencies:
- black
- ruff
- pip:
- -r mkdocs-requirements.txt
- -r mkdocs-requirements.txt
6 changes: 4 additions & 2 deletions .conda/meta.yaml
@@ -17,10 +17,10 @@ build:

requirements:
host:
- python ==3.9
- python <3.13
- pip
run:
- python ==3.9
- python <3.13
- payu >=1.0.30
- netCDF4
- PyYAML
@@ -29,3 +29,5 @@ requirements:
- cerberus >=1.3.5
- gitpython
- jinja2
- hpcpy>=0.5.0
- meorg_client
21 changes: 20 additions & 1 deletion docs/user_guide/config_options.md
@@ -68,6 +68,7 @@ fluxsite:
walltime: 06:00:00
storage: [scratch/a00, gdata/xy11]
multiprocess: True
meorg_model_output_id: XXXXXXXX
```

### [experiment](#experiment)
@@ -163,6 +164,14 @@ fluxsites:

```

### [meorg_model_output_id](#meorg_model_output_id)

: **Default:** False, _optional key_. :octicons-dash-24: The unique Model Output ID from modelevaluation.org to which output files will be automatically uploaded for analysis.

If this key is present, a separate upload job is submitted once the benchcab tasks complete successfully; however, benchcab does not check the validity of the ID at this stage.

Note: It is the user's responsibility to ensure the model output is configured on modelevaluation.org.
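
For example, to attach fluxsite outputs to an existing model output record (`XXXXXXXX` stands for the ID shown on the model output page at modelevaluation.org):

``` yaml
fluxsite:
  meorg_model_output_id: XXXXXXXX
```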

## spatial

Contains settings specific to spatial tests.
@@ -493,4 +502,14 @@
[f90nml-github]: https://github.com/marshallward/f90nml
[environment-modules]: https://modules.sourceforge.net/
[nci-pbs-directives]: https://opus.nci.org.au/display/Help/PBS+Directives+Explained
[cable-github]: https://github.com/CABLE-LSM/CABLE
[cable-github]: https://github.com/CABLE-LSM/CABLE

## meorg_bin

: **Default:** False, _optional key_. :octicons-dash-24: Specifies the absolute system path to the ME.org client executable. If this key is absent and `meorg_model_output_id` is set in `fluxsite` above, the executable is assumed to be in the same directory as the benchcab executable.

``` yaml

meorg_bin: /path/to/meorg

```
22 changes: 16 additions & 6 deletions src/benchcab/benchcab.py
@@ -11,6 +11,7 @@
from subprocess import CalledProcessError
from typing import Optional

import benchcab.utils.meorg as bm
from benchcab import fluxsite, internal, spatial
from benchcab.comparison import run_comparisons, run_comparisons_in_parallel
from benchcab.config import read_config
@@ -22,7 +23,7 @@
from benchcab.environment_modules import EnvironmentModules, EnvironmentModulesInterface
from benchcab.internal import get_met_forcing_file_names
from benchcab.model import Model
from benchcab.utils import is_verbose
from benchcab.utils import is_verbose, task_summary
from benchcab.utils.fs import mkdir, next_path
from benchcab.utils.pbs import render_job_script
from benchcab.utils.repo import create_repo
@@ -234,14 +235,25 @@
logger.error(exc.output)
raise

logger.info(f"PBS job submitted: {proc.stdout.strip()}")
# Get the job ID
job_id = proc.stdout.strip()

logger.info(f"PBS job submitted: {job_id}")

logger.info("CABLE log file for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['LOG']}/<task_name>_log.txt")
logger.info("The CABLE standard output for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['TASKS']}/<task_name>/out.txt")
logger.info("The NetCDF output for each task is written to:")
logger.info(f"{internal.FLUXSITE_DIRS['OUTPUT']}/<task_name>_out.nc")

# Upload to meorg by default
bm.do_meorg(

config,
upload_dir=internal.FLUXSITE_DIRS["OUTPUT"],
benchcab_bin=str(self.benchcab_exe_path),
benchcab_job_id=job_id,
)

def gen_codecov(self, config_path: str):
"""Endpoint for `benchcab codecov`."""
logger = self._get_logger()
@@ -347,8 +359,7 @@
else:
fluxsite.run_tasks(tasks)

tasks_failed = [task for task in tasks if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(tasks) - len(tasks_failed)
_, n_success, n_failed, _ = task_summary(tasks)

logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite_bitwise_cmp(self, config_path: str):
@@ -376,8 +387,7 @@
else:
run_comparisons(comparisons)

tasks_failed = [task for task in comparisons if not task.is_done()]
n_failed, n_success = len(tasks_failed), len(comparisons) - len(tasks_failed)
_, n_success, n_failed, _ = task_summary(comparisons)

logger.info(f"{n_failed} failed, {n_success} passed")

def fluxsite(self, config_path: str, no_submit: bool, skip: list[str]):
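
The implementation of `benchcab.utils.meorg.do_meorg` is not shown in this diff. The sketch below only illustrates the call-site contract visible above: a hypothetical helper that no-ops unless `fluxsite.meorg_model_output_id` is set and otherwise submits the upload job. Everything beyond the argument names (the fallback path, the script name, the PBS dependency and variable flags) is an assumption, not benchcab's actual code.

```python
from pathlib import Path
from subprocess import run


def do_meorg(config: dict, upload_dir: str, benchcab_bin: str, benchcab_job_id: str) -> bool:
    """Hypothetical sketch of the upload hook called from fluxsite_submit_job()."""
    model_output_id = config["fluxsite"]["meorg_model_output_id"]
    if not model_output_id:
        # The feature is opt-in: key absent or False means do nothing.
        return False

    # Assumed fallback: a meorg client sitting next to the benchcab executable
    # when meorg_bin is not configured (see the config_options.md change above).
    meorg_bin = config.get("meorg_bin") or str(Path(benchcab_bin).parent / "meorg")

    # Assumed submission: queue the upload job so it runs only after the main
    # fluxsite PBS job completes; "meorg_upload.sh" is a placeholder name.
    run(
        [
            "qsub",
            "-W", f"depend=afterok:{benchcab_job_id}",
            "-v", f"MEORG_BIN={meorg_bin},DATA_DIR={upload_dir},MODEL_OUTPUT_ID={model_output_id}",
            "meorg_upload.sh",
        ],
        check=True,
    )
    return True
```
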
3 changes: 3 additions & 0 deletions src/benchcab/config.py
@@ -119,6 +119,9 @@ def read_optional_key(config: dict):
config["fluxsite"]["pbs"] = internal.FLUXSITE_DEFAULT_PBS | config["fluxsite"].get(
"pbs", {}
)
config["fluxsite"]["meorg_model_output_id"] = config["fluxsite"].get(
"meorg_model_output_id", internal.FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID
)

config["codecov"] = config.get("codecov", False)

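
The new default above follows the same pattern as the other optional keys: a plain `dict.get` with a constant fallback, so a config that omits the key ends up carrying an explicit `False`. A standalone sketch of that behaviour (not importing benchcab):

```python
FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID = False  # mirrors the constant added in internal.py below

# A user config that does not mention the key at all.
config = {"fluxsite": {"experiment": "AU-Tum"}}

config["fluxsite"]["meorg_model_output_id"] = config["fluxsite"].get(
    "meorg_model_output_id", FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID
)

# The upload step stays disabled unless the user sets a real ID.
assert config["fluxsite"]["meorg_model_output_id"] is False
```
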
15 changes: 14 additions & 1 deletion src/benchcab/data/config-schema.yml
@@ -107,6 +107,12 @@ fluxsite:
schema:
type: "string"
required: false
meorg_model_output_id:
type:
- "boolean"
- "string"
required: false
default: false

spatial:
type: "dict"
@@ -134,4 +140,11 @@

codecov:
type: "boolean"
required: false
required: false

meorg_bin:
type:
- "boolean"
- "string"
required: False
default: False
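
Both new keys accept either a boolean or a string and default to `false`. A standalone sketch of how a cerberus validator treats the `meorg_bin` fragment (the schema dict is hand-copied from the YAML above; benchcab's own wiring of the schema is not shown here):

```python
from cerberus import Validator

schema = {
    "meorg_bin": {
        "type": ["boolean", "string"],  # False (disabled) or an absolute path
        "required": False,
        "default": False,
    }
}

v = Validator(schema)

# Normalisation fills in the default when the key is omitted.
assert v.normalized({}) == {"meorg_bin": False}

# A path string or a boolean passes validation; other types do not.
assert v.validate({"meorg_bin": "/path/to/meorg"})
assert v.validate({"meorg_bin": False})
assert not v.validate({"meorg_bin": 42})
```
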
43 changes: 43 additions & 0 deletions src/benchcab/data/meorg_jobscript.j2
@@ -0,0 +1,43 @@
#!/bin/bash
#PBS -l wd
#PBS -l ncpus={{num_threads}}
#PBS -l mem={{mem}}
#PBS -l walltime={{walltime}}
#PBS -q copyq
#PBS -P {{project}}
#PBS -j oe
#PBS -m e
#PBS -l storage={{storage_str}}

module purge
{% for module in modules -%}
module load {{module}}
{% endfor %}
set -ev

# Set some things
DATA_DIR={{data_dir}}
NUM_THREADS={{num_threads}}
MODEL_OUTPUT_ID={{model_output_id}}
CACHE_DELAY={{cache_delay}}
MEORG_BIN={{meorg_bin}}

{% if purge_outputs %}
# Purge existing model outputs
echo "Purging existing outputs from $MODEL_OUTPUT_ID"
$MEORG_BIN file detach_all $MODEL_OUTPUT_ID
{% endif %}

# Upload the data
echo "Uploading data to $MODEL_OUTPUT_ID"
$MEORG_BIN file upload $DATA_DIR/*.nc -n $NUM_THREADS --attach_to $MODEL_OUTPUT_ID

# Wait for the cache to transfer to the object store.
echo "Waiting for object store transfer ($CACHE_DELAY sec)"
sleep $CACHE_DELAY

# Trigger the analysis
echo "Triggering analysis on $MODEL_OUTPUT_ID"
$MEORG_BIN analysis start $MODEL_OUTPUT_ID

echo "DONE"
1 change: 1 addition & 0 deletions src/benchcab/data/test/config-optional.yml
@@ -3,6 +3,7 @@ project: hh5

fluxsite:
experiment: AU-Tum
meorg_model_output_id: False
multiprocess: False
pbs:
ncpus: 6
53 changes: 53 additions & 0 deletions src/benchcab/data/test/integration_meorg.sh
@@ -0,0 +1,53 @@
#!/bin/bash

set -ex

CABLE_REPO="git@github.com:CABLE-LSM/CABLE.git"
CABLE_DIR=/scratch/$PROJECT/$USER/benchcab/CABLE

TEST_DIR=/scratch/$PROJECT/$USER/benchcab/integration
EXAMPLE_REPO="git@github.com:CABLE-LSM/bench_example.git"

# Remove CABLE and test work space, then recreate
rm -rf $CABLE_DIR
mkdir -p $CABLE_DIR

rm -rf $TEST_DIR
mkdir -p $TEST_DIR

# Clone local checkout for CABLE
git clone $CABLE_REPO $CABLE_DIR
cd $CABLE_DIR

# Clone the example repo
git clone $EXAMPLE_REPO $TEST_DIR
cd $TEST_DIR
git reset --hard 9bfba54ee8bf23141d95b1abe4b7207b0f3498e2

cat > config.yaml << EOL
project: $PROJECT

realisations:
- repo:
local:
path: $CABLE_DIR
- repo:
git:
branch: main
modules: [
intel-compiler/2021.1.1,
netcdf/4.7.4,
openmpi/4.1.0
]

fluxsite:
experiment: AU-Tum
pbs:
storage:
- scratch/$PROJECT
- gdata/$PROJECT
# This ID is currently configured on the me.org server.
meorg_model_output_id: Sss7qupAHEZ8ovbCv
EOL

benchcab run -v
11 changes: 11 additions & 0 deletions src/benchcab/internal.py
@@ -252,6 +252,7 @@
}

FLUXSITE_DEFAULT_EXPERIMENT = "forty-two-site-test"
FLUXSITE_DEFAULT_MEORG_MODEL_OUTPUT_ID = False

OPTIONAL_COMMANDS = ["fluxsite-bitwise-cmp", "gen_codecov"]

@@ -274,3 +275,13 @@ def get_met_forcing_file_names(experiment: str) -> list[str]:
]

return file_names


# Configuration for the client upload
MEORG_CLIENT = dict(
num_threads=1, # Number of parallel upload threads
cache_delay=60 * 5, # 5 minutes between upload and analysis triggering
mem="8G",
walltime="01:00:00",
storage=["gdata/ks32", "gdata/hh5", "gdata/wd9", "gdata/rp23"],
)
23 changes: 22 additions & 1 deletion src/benchcab/utils/__init__.py
@@ -11,7 +11,7 @@
import sys
from importlib import resources
from pathlib import Path
from typing import Union
from typing import Iterable, Union

import yaml
from jinja2 import BaseLoader, Environment
@@ -148,3 +148,24 @@ def get_logger(name="benchcab", level="debug"):
def is_verbose():
"""Return True if verbose output is enabled, False otherwise."""
return get_logger().getEffectiveLevel() == logging.DEBUG


def task_summary(tasks: Iterable) -> tuple:
"""Return a summary of task completions.

Parameters
----------
tasks : Iterable
Iterable of tasks with an .is_done() method available.

Returns
-------
tuple
num_tasks, num_complete, num_failed, all_complete

"""
num_tasks = len(tasks)
num_complete = len([task for task in tasks if task.is_done()])
num_failed = num_tasks - num_complete

return num_tasks, num_complete, num_failed, num_complete == num_tasks
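
A quick usage sketch of `task_summary` with stand-in task objects (only `is_done()` is required), matching the four-way unpacking used in benchcab.py above:

```python
class StubTask:
    """Stand-in for a fluxsite or comparison task; only is_done() matters here."""

    def __init__(self, done: bool):
        self._done = done

    def is_done(self) -> bool:
        return self._done


tasks = [StubTask(True), StubTask(True), StubTask(False)]
num_tasks, num_complete, num_failed, all_complete = task_summary(tasks)
assert (num_tasks, num_complete, num_failed, all_complete) == (3, 2, 1, False)
```
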