Skip to content

Commit

Permalink
Merge pull request #234 from jameshcorbett/rabbit-configfile
Browse files Browse the repository at this point in the history
dws: rabbit config file support
  • Loading branch information
mergify[bot] authored Nov 5, 2024
2 parents d121c85 + cfc450d commit c3131b6
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 116 deletions.
171 changes: 75 additions & 96 deletions src/modules/coral2_dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import urllib3

import flux
import flux.kvs
from flux.hostlist import Hostlist
from flux.job.JobID import id_parse
from flux.constants import FLUX_MSGTYPE_REQUEST
Expand Down Expand Up @@ -277,7 +278,7 @@ def create_cb(handle, _t, msg, arg):
Triggered when a new job with a jobdw directive is submitted.
"""
api_instance, unrestricted_persistent = arg
api_instance, restrict_persistent = arg
dw_directives = msg.payload["dw_directives"]
jobid = msg.payload["jobid"]
userid = msg.payload["userid"]
Expand All @@ -291,7 +292,7 @@ def create_cb(handle, _t, msg, arg):
f"Malformed dw_directives, not list or string: {dw_directives!r}"
)
for directive in dw_directives:
if not unrestricted_persistent and "create_persistent" in directive:
if restrict_persistent and "create_persistent" in directive:
if userid != owner_uid(handle):
raise ValueError(
"only the instance owner can create persistent file systems"
Expand Down Expand Up @@ -592,15 +593,15 @@ def _workflow_state_change_cb_inner(workflow, winfo, handle, k8s_api, disable_fl
WORKFLOWS_IN_TC.discard(winfo)


def drain_offline_nodes(handle, rabbit_name, nodelist, disable_draining, allowlist):
def drain_offline_nodes(handle, rabbit_name, nodelist, allowlist):
"""Drain nodes listed as offline in a given Storage resource.
Drain all the nodes in `nodelist` that are Offline, provided they are
in `allowlist`.
If disable_draining is True, do nothing.
If draining is disabled in the rabbit config table, do nothing.
"""
if disable_draining:
if not handle.conf_get("rabbit.drain_compute_nodes", True):
return
offline_nodes = Hostlist()
for compute_node in nodelist:
Expand Down Expand Up @@ -635,9 +636,7 @@ def mark_rabbit(handle, status, resource_path, ssdcount, name):
handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response)


def rabbit_state_change_cb(
event, handle, rabbit_rpaths, disable_draining, disable_fluxion, allowlist
):
def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_fluxion, allowlist):
"""Callback firing when a Storage object changes.
Marks a rabbit as up or down.
Expand Down Expand Up @@ -666,23 +665,20 @@ def rabbit_state_change_cb(
handle,
name,
computes,
disable_draining,
allowlist,
)
# TODO: add some check for whether rabbit capacity has changed
# TODO: update capacity of rabbit in resource graph (mark some slices down?)


def map_rabbits_to_fluxion_paths(graph_path):
def map_rabbits_to_fluxion_paths(handle):
"""Read the fluxion resource graph and map rabbit hostnames to resource paths."""
rabbit_rpaths = {}
try:
with open(graph_path, encoding="utf8") as graph_fd:
nodes = json.load(graph_fd)["scheduling"]["graph"]["nodes"]
nodes = flux.kvs.get(handle, "resource.R")["scheduling"]["graph"]["nodes"]
except Exception as exc:
raise ValueError(
f"Could not load rabbit resource graph data from {graph_path} "
"expected a Flux R file augmented with JGF from 'flux-dws2jgf'"
f"Could not load rabbit resource graph data from KVS's resource.R"
) from exc
for vertex in nodes:
metadata = vertex["metadata"]
Expand All @@ -694,7 +690,7 @@ def map_rabbits_to_fluxion_paths(graph_path):
return rabbit_rpaths


def init_rabbits(k8s_api, handle, watchers, args):
def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues):
"""Watch every rabbit ('Storage' resources in k8s) known to k8s.
Whenever a Storage resource changes, mark it as 'up' or 'down' in Fluxion.
Expand All @@ -703,19 +699,17 @@ def init_rabbits(k8s_api, handle, watchers, args):
down, because status may have changed while this service was inactive.
"""
api_response = k8s_api.list_namespaced_custom_object(*crd.RABBIT_CRD)
if not args.disable_fluxion:
rabbit_rpaths = map_rabbits_to_fluxion_paths(args.resourcegraph)
if not disable_fluxion:
rabbit_rpaths = map_rabbits_to_fluxion_paths(handle)
else:
rabbit_rpaths = {}
resource_version = 0
if args.drain_queues is not None:
if drain_queues is not None:
rset = flux.resource.resource_list(handle).get().all
allowlist = set(
rset.copy_constraint({"properties": args.drain_queues}).nodelist
)
allowlist = set(rset.copy_constraint({"properties": drain_queues}).nodelist)
if not allowlist:
raise ValueError(
f"No resources found associated with queues {args.drain_queues}"
f"No resources found associated with queues {drain_queues}"
)
else:
allowlist = None
Expand All @@ -724,7 +718,7 @@ def init_rabbits(k8s_api, handle, watchers, args):
resource_version = max(
resource_version, int(rabbit["metadata"]["resourceVersion"])
)
if args.disable_fluxion:
if disable_fluxion:
# don't mark the rabbit up or down but add the rabbit to the mapping
rabbit_rpaths[name] = None
elif name not in rabbit_rpaths:
Expand All @@ -749,7 +743,6 @@ def init_rabbits(k8s_api, handle, watchers, args):
handle,
name,
computes,
args.disable_compute_node_draining,
allowlist,
)
watchers.add_watch(
Expand All @@ -760,8 +753,7 @@ def init_rabbits(k8s_api, handle, watchers, args):
rabbit_state_change_cb,
handle,
rabbit_rpaths,
args.disable_compute_node_draining,
args.disable_fluxion,
disable_fluxion,
allowlist,
)
)
Expand Down Expand Up @@ -821,40 +813,13 @@ def setup_parsing():
default=0,
help="Increase verbosity of output",
)
parser.add_argument(
"--transient-condition-timeout",
"-e",
type=float,
default=10,
metavar="N",
help="Kill workflows in TransientCondition state for more than 'N' seconds",
)
parser.add_argument(
"--kubeconfig",
"-k",
default=None,
metavar="FILE",
help="Path to kubeconfig file to use",
)
parser.add_argument(
"--resourcegraph",
"-r",
default=str(pathlib.Path("/etc/flux/system/R").absolute()),
metavar="FILE",
help="Path to file containing Fluxion JGF resource graph",
)
parser.add_argument(
"--min-allocation-size",
"-m",
default=_MIN_ALLOCATION_SIZE,
metavar="N",
help="Minimum allocation size of rabbit allocations, in bytes",
)
parser.add_argument(
"--disable-compute-node-draining",
action="store_true",
help="Disable the draining of compute nodes based on k8s status",
)
parser.add_argument(
"--drain-queues",
nargs="+",
Expand All @@ -875,33 +840,6 @@ def setup_parsing():
"failures occur back-to-back"
),
)
parser.add_argument(
"--save-datamovements",
metavar="N",
default=5,
type=int,
help="Number of nnfdatamovements to save to job KVS, defaults to 5",
)
parser.add_argument(
"--unrestricted-persistent",
action="store_true",
help=(
"Allow any user to create persistent file systems, not just the instance "
"owner"
),
)
for fs_option, fs_help in (
("xfs", "XFS"),
("gfs2", "GFS2"),
("lustre", "Lustre"),
("raw", "raw"),
):
parser.add_argument(
f"--max-{fs_option}",
metavar="N",
help=f"Maximum {fs_help} capacity per node, in GiB",
type=int,
)
return parser


Expand Down Expand Up @@ -942,14 +880,14 @@ def populate_rabbits_dict(k8s_api):
_RABBITS_TO_HOSTLISTS[nnf["name"]] = hlist.encode()


def register_services(handle, k8s_api, unrestricted_persistent):
def register_services(handle, k8s_api, restrict_persistent):
"""register dws.create, dws.setup, and dws.post_run services."""
serv_reg_fut = handle.service_register("dws")
create_watcher = handle.msg_watcher_create(
create_cb,
FLUX_MSGTYPE_REQUEST,
"dws.create",
args=(k8s_api, unrestricted_persistent),
args=(k8s_api, restrict_persistent),
)
create_watcher.start()
setup_watcher = handle.msg_watcher_create(
Expand All @@ -972,16 +910,11 @@ def raise_self_exception(handle):
testing purposes.
Once https://github.com/flux-framework/flux-core/issues/3821 is
implemented/closed, this can be replaced with that solution.
Also, remove FLUX_KVS_NAMESPACE from the environment, because otherwise
KVS lookups will look relative to that namespace, changing the behavior
relative to when the script runs as a service.
"""
try:
jobid = id_parse(os.environ["FLUX_JOB_ID"])
except KeyError:
return
del os.environ["FLUX_KVS_NAMESPACE"]
Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get()


Expand Down Expand Up @@ -1009,19 +942,62 @@ def kubernetes_backoff(handle, orig_retry_delay):
time.sleep(retry_delay)


def validate_config(config):
    """Validate the `rabbit` config table.

    Checks that every top-level key is one this service recognizes and,
    when a `policy` table is present, that it consists of exactly one
    `maximums` table whose keys are all file system types listed in
    `directivebreakdown.ResourceLimits.TYPES`.

    Args:
        config: the `rabbit` table from the Flux config, as a mapping
            (an empty mapping is valid).

    Raises:
        RuntimeError: if an unrecognized key is present or if the
            `policy` table is malformed.
    """
    accepted_keys = {
        "save_datamovements",
        "kubeconfig",
        "tc_timeout",
        "drain_compute_nodes",
        "restrict_persistent_creation",
        # The service looks up `rabbit.restrict_persistent` when registering
        # the dws.create callback, so that spelling has to pass validation
        # too; the longer name is kept for backward compatibility.
        "restrict_persistent",
        "policy",
    }
    unknown = set(config.keys()) - accepted_keys
    if unknown:
        # min()/sorted() make the message deterministic regardless of set order
        raise RuntimeError(
            f"misconfiguration: unrecognized "
            f"`rabbit.{min(unknown)}` key in Flux config, accepted "
            f"keys are {sorted(accepted_keys)}"
        )
    if "policy" not in config:
        return
    policy = config["policy"]
    # `policy` must be a table holding `maximums` and nothing else; a scalar
    # or array value is reported as a misconfiguration instead of crashing.
    if not hasattr(policy, "keys") or set(policy.keys()) != {"maximums"}:
        raise RuntimeError(
            "`rabbit.policy` config table must have a `maximums` table"
        )
    maximums = policy["maximums"]
    if not hasattr(maximums, "keys"):
        raise RuntimeError(
            "misconfiguration: `rabbit.policy.maximums` in Flux config must be "
            "a table"
        )
    accepted_types = set(directivebreakdown.ResourceLimits.TYPES)
    unknown = set(maximums.keys()) - accepted_types
    if unknown:
        raise RuntimeError(
            f"misconfiguration: unrecognized "
            f"`rabbit.policy.maximums.{min(unknown)}` key in Flux "
            f"config, accepted keys are {sorted(accepted_types)}"
        )


def main():
"""Init script, begin processing of services."""
args = setup_parsing().parse_args()
_MIN_ALLOCATION_SIZE = args.min_allocation_size
config_logging(args)
WorkflowInfo.save_datamovements = args.save_datamovements
# Remove FLUX_KVS_NAMESPACE from the environment if set, because otherwise
# KVS lookups will look relative to that namespace, but this service
# must operate on the default namespace.
if "FLUX_KVS_NAMESPACE" in os.environ:
del os.environ["FLUX_KVS_NAMESPACE"]
handle = flux.Flux()
validate_config(handle.conf_get("rabbit", {}))
WorkflowInfo.save_datamovements = handle.conf_get("rabbit.save_datamovements", 0)
# set the maximum allowable allocation sizes on the ResourceLimits class
for fs_type in directivebreakdown.ResourceLimits.TYPES:
setattr(
directivebreakdown.ResourceLimits, fs_type, getattr(args, f"max_{fs_type}")
directivebreakdown.ResourceLimits,
fs_type,
handle.conf_get(f"rabbit.policy.maximums.{fs_type}"),
)
try:
k8s_client = k8s.config.new_client_from_config(config_file=args.kubeconfig)
k8s_client = k8s.config.new_client_from_config(
handle.conf_get("rabbit.kubeconfig")
)
except ConfigException:
LOGGER.exception("Kubernetes misconfigured, service will shut down")
sys.exit(_EXITCODE_NORESTART)
Expand All @@ -1036,14 +1012,14 @@ def main():
LOGGER.exception("Cannot access kubernetes, service will shut down")
sys.exit(_EXITCODE_NORESTART)
populate_rabbits_dict(k8s_api)
handle = flux.Flux()
# create a timer watcher for killing workflows that have been stuck in
# the "Error" state for too long
tc_timeout = handle.conf_get("rabbit.tc_timeout", 10)
handle.timer_watcher_create(
args.transient_condition_timeout / 2,
tc_timeout / 2,
kill_workflows_in_tc,
repeat=args.transient_condition_timeout / 2,
args=(args.transient_condition_timeout, k8s_api),
repeat=tc_timeout / 2,
args=(tc_timeout, k8s_api),
).start()
# start watching k8s workflow resources and operate on them when updates occur
# or new RPCs are received
Expand All @@ -1052,9 +1028,12 @@ def main():
k8s_api,
handle,
watchers,
args,
args.disable_fluxion,
args.drain_queues,
)
services = register_services(
handle, k8s_api, handle.conf_get("rabbit.restrict_persistent", True)
)
services = register_services(handle, k8s_api, args.unrestricted_persistent)
watchers.add_watch(
Watch(
k8s_api,
Expand Down
7 changes: 7 additions & 0 deletions t/data/workflow-obj/maximums/rabbit.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# maximum filesystem capacity per node, in GiB
[rabbit.policy.maximums]
xfs = 500
gfs2 = 200
raw = 300
lustre = 100

Loading

0 comments on commit c3131b6

Please sign in to comment.