From 59509729de4d581e283c7d1477e15fbba66821f7 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Sun, 3 Nov 2024 22:33:12 -0800 Subject: [PATCH 1/8] dws: support file system maximums in config Problem: a number of rabbit configuration options, such as the maximum size of file system users can request, are set by command-line options to the coral2-dws service. Move the file system size options to a config file and make coral2-dws read from it. Drop the command-line options. --- src/modules/coral2_dws.py | 18 ++++-------------- t/data/workflow-obj/maximums/rabbit.toml | 7 +++++++ t/t1002-dws-workflow-obj.t | 4 ++-- 3 files changed, 13 insertions(+), 16 deletions(-) create mode 100644 t/data/workflow-obj/maximums/rabbit.toml diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index f5f611db..cf37e6df 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -890,18 +890,6 @@ def setup_parsing(): "owner" ), ) - for fs_option, fs_help in ( - ("xfs", "XFS"), - ("gfs2", "GFS2"), - ("lustre", "Lustre"), - ("raw", "raw"), - ): - parser.add_argument( - f"--max-{fs_option}", - metavar="N", - help=f"Maximum {fs_help} capacity per node, in GiB", - type=int, - ) return parser @@ -1014,11 +1002,14 @@ def main(): args = setup_parsing().parse_args() _MIN_ALLOCATION_SIZE = args.min_allocation_size config_logging(args) + handle = flux.Flux() WorkflowInfo.save_datamovements = args.save_datamovements # set the maximum allowable allocation sizes on the ResourceLimits class for fs_type in directivebreakdown.ResourceLimits.TYPES: setattr( - directivebreakdown.ResourceLimits, fs_type, getattr(args, f"max_{fs_type}") + directivebreakdown.ResourceLimits, + fs_type, + handle.conf_get(f"rabbit.policy.maximums.{fs_type}"), ) try: k8s_client = k8s.config.new_client_from_config(config_file=args.kubeconfig) @@ -1036,7 +1027,6 @@ def main(): LOGGER.exception("Cannot access kubernetes, service will shut down") sys.exit(_EXITCODE_NORESTART) 
populate_rabbits_dict(k8s_api) - handle = flux.Flux() # create a timer watcher for killing workflows that have been stuck in # the "Error" state for too long handle.timer_watcher_create( diff --git a/t/data/workflow-obj/maximums/rabbit.toml b/t/data/workflow-obj/maximums/rabbit.toml new file mode 100644 index 00000000..e0c1d9b5 --- /dev/null +++ b/t/data/workflow-obj/maximums/rabbit.toml @@ -0,0 +1,7 @@ +# maximum filesystem capacity per node, in GiB +[rabbit.policy.maximums] +xfs = 500 +gfs2 = 200 +raw = 300 +lustre = 100 + diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index 7aea84e2..4442b857 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -474,11 +474,11 @@ test_expect_success 'back-to-back job submissions with 10TiB file systems works' test_expect_success 'launch service with storage maximum arguments' ' flux cancel $DWS_JOBID && + flux config load ${DATADIR}/maximums && DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws4.out --error=dws4.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv -rR.local \ - --max-xfs 500 --max-lustre 100 --max-gfs2 200 --max-raw 300) && + python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv -rR.local) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' From 7a35be7d8e1faccd836fd30290c50b2cf093b21d Mon Sep 17 00:00:00 2001 From: James Corbett Date: Sun, 3 Nov 2024 22:48:53 -0800 Subject: [PATCH 2/8] dws: move command-line options to config Problem: a number of rabbit configuration options, such as the number of nnfdatamovement resources to save and whether or not to restrict the creation of persistent file systems to the instance owner, are set by command-line options to the coral2-dws service. Move those parameters to a config file and drop the command-line options. 
--- src/modules/coral2_dws.py | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index cf37e6df..ae4625d8 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -277,7 +277,7 @@ def create_cb(handle, _t, msg, arg): Triggered when a new job with a jobdw directive is submitted. """ - api_instance, unrestricted_persistent = arg + api_instance, restrict_persistent = arg dw_directives = msg.payload["dw_directives"] jobid = msg.payload["jobid"] userid = msg.payload["userid"] @@ -291,7 +291,7 @@ def create_cb(handle, _t, msg, arg): f"Malformed dw_directives, not list or string: {dw_directives!r}" ) for directive in dw_directives: - if not unrestricted_persistent and "create_persistent" in directive: + if restrict_persistent and "create_persistent" in directive: if userid != owner_uid(handle): raise ValueError( "only the instance owner can create persistent file systems" @@ -875,21 +875,6 @@ def setup_parsing(): "failures occur back-to-back" ), ) - parser.add_argument( - "--save-datamovements", - metavar="N", - default=5, - type=int, - help="Number of nnfdatamovements to save to job KVS, defaults to 5", - ) - parser.add_argument( - "--unrestricted-persistent", - action="store_true", - help=( - "Allow any user to create persistent file systems, not just the instance " - "owner" - ), - ) return parser @@ -930,14 +915,14 @@ def populate_rabbits_dict(k8s_api): _RABBITS_TO_HOSTLISTS[nnf["name"]] = hlist.encode() -def register_services(handle, k8s_api, unrestricted_persistent): +def register_services(handle, k8s_api, restrict_persistent): """register dws.create, dws.setup, and dws.post_run services.""" serv_reg_fut = handle.service_register("dws") create_watcher = handle.msg_watcher_create( create_cb, FLUX_MSGTYPE_REQUEST, "dws.create", - args=(k8s_api, unrestricted_persistent), + args=(k8s_api, restrict_persistent), ) create_watcher.start() setup_watcher = 
handle.msg_watcher_create( @@ -1003,7 +988,7 @@ def main(): _MIN_ALLOCATION_SIZE = args.min_allocation_size config_logging(args) handle = flux.Flux() - WorkflowInfo.save_datamovements = args.save_datamovements + WorkflowInfo.save_datamovements = handle.conf_get("rabbit.save_datamovements", 0) # set the maximum allowable allocation sizes on the ResourceLimits class for fs_type in directivebreakdown.ResourceLimits.TYPES: setattr( @@ -1044,7 +1029,9 @@ def main(): watchers, args, ) - services = register_services(handle, k8s_api, args.unrestricted_persistent) + services = register_services( + handle, k8s_api, handle.conf_get("rabbit.restrict_persistent", True) + ) watchers.add_watch( Watch( k8s_api, From dfc55cbe1d7c28101e57e04f5059439c0c5c633c Mon Sep 17 00:00:00 2001 From: James Corbett Date: Sun, 3 Nov 2024 22:57:42 -0800 Subject: [PATCH 3/8] dws: support toggling rabbit draining in config Problem: the option to disable the draining of compute nodes that lose connection with their local rabbit is set by a command-line option to the coral2-dws service. It should be a config file option. Move the draining option to a config file and make coral2-dws read from it. Drop the command-line option. --- src/modules/coral2_dws.py | 16 ++++------------ t/t1003-dws-nnf-watch.t | 17 +++++++++++------ 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index ae4625d8..ad374383 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -592,15 +592,15 @@ def _workflow_state_change_cb_inner(workflow, winfo, handle, k8s_api, disable_fl WORKFLOWS_IN_TC.discard(winfo) -def drain_offline_nodes(handle, rabbit_name, nodelist, disable_draining, allowlist): +def drain_offline_nodes(handle, rabbit_name, nodelist, allowlist): """Drain nodes listed as offline in a given Storage resource. Drain all the nodes in `nodelist` that are Offline, provided they are in `allowlist`. - If disable_draining is True, do nothing.
+ If draining is disabled in the rabbit config table, do nothing. """ - if disable_draining: + if not handle.conf_get("rabbit.drain_compute_nodes", True): return offline_nodes = Hostlist() for compute_node in nodelist: @@ -636,7 +636,7 @@ def mark_rabbit(handle, status, resource_path, ssdcount, name): def rabbit_state_change_cb( - event, handle, rabbit_rpaths, disable_draining, disable_fluxion, allowlist + event, handle, rabbit_rpaths, disable_fluxion, allowlist ): """Callback firing when a Storage object changes. @@ -666,7 +666,6 @@ def rabbit_state_change_cb( handle, name, computes, - disable_draining, allowlist, ) # TODO: add some check for whether rabbit capacity has changed @@ -749,7 +748,6 @@ def init_rabbits(k8s_api, handle, watchers, args): handle, name, computes, - args.disable_compute_node_draining, allowlist, ) watchers.add_watch( @@ -760,7 +758,6 @@ def init_rabbits(k8s_api, handle, watchers, args): rabbit_state_change_cb, handle, rabbit_rpaths, - args.disable_compute_node_draining, args.disable_fluxion, allowlist, ) @@ -850,11 +847,6 @@ def setup_parsing(): metavar="N", help="Minimum allocation size of rabbit allocations, in bytes", ) - parser.add_argument( - "--disable-compute-node-draining", - action="store_true", - help="Disable the draining of compute nodes based on k8s status", - ) parser.add_argument( "--drain-queues", nargs="+", diff --git a/t/t1003-dws-nnf-watch.t b/t/t1003-dws-nnf-watch.t index 63711887..e67a22cb 100755 --- a/t/t1003-dws-nnf-watch.t +++ b/t/t1003-dws-nnf-watch.t @@ -131,16 +131,19 @@ test_expect_success 'test that flux drains Offline compute nodes' ' test_must_fail bash -c "flux resource drain | grep compute-01" ' -test_expect_success 'exec Storage watching script with --disable-draining' ' +test_expect_success 'exec Storage watching script with draining disabled' ' flux cancel ${jobid} && + echo " +[rabbit] +drain_compute_nodes = false + " | flux config load && jobid=$(flux submit \ --setattr=system.alloc-bypass.R="$(flux R 
encode -r0)" --output=dws2.out --error=dws2.err \ - -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local \ - --disable-compute-node-draining) && + -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local) && flux job wait-event -vt 15 -p guest.exec.eventlog ${jobid} shell.start ' -test_expect_success 'test that flux does not drain Offline compute nodes with --disable-draining' ' +test_expect_success 'test that flux does not drain Offline compute nodes with draining disabled' ' kubectl get storages kind-worker2 -ojson | jq -e ".spec.mode == \"Testing\"" && kubectl patch storage kind-worker2 --subresource=status --type=json \ -p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Ready\"}]" && @@ -149,8 +152,9 @@ test_expect_success 'test that flux does not drain Offline compute nodes with -- kubectl patch storage kind-worker2 --subresource=status --type=json \ -p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Disabled\"}]" && kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Disabled\"" && - sleep 2 && - test_must_fail bash -c "flux resource drain | grep compute-01" + sleep 5 && + test_must_fail bash -c "flux resource drain | grep compute-01" && + test_must_fail flux job wait-event -vt 1 ${jobid} finish ' test_expect_success 'return the storage resource to Live mode' ' @@ -183,6 +187,7 @@ test_expect_success 'configure flux with queues' ' ' test_expect_success 'exec Storage watching script with --drain-queues' ' + flux config reload && jobid=$(flux submit \ --setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws4.out --error=dws4.err \ -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local.queues \ From 2d5636f9f1899104434ec9faa1c1fe18e0699959 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Mon, 4 Nov 2024 08:13:50 -0800 Subject: [PATCH 4/8] dws: drop resourcegraph command-line option Problem: the 
coral2_dws script has a command-line option to set the path of the file from which it reads JGF for the rabbit resources it will operate on. However, that is unnecessary and unhelpful because it could instead fetch the `resource.R` KVS key, which is a more reliable option. Drop the command-line argument and make the script read from the KVS. --- src/modules/coral2_dws.py | 28 ++++++++++------------------ t/t1002-dws-workflow-obj.t | 14 +++++++------- t/t1003-dws-nnf-watch.t | 9 ++++----- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index ad374383..eb765d72 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -24,6 +24,7 @@ import urllib3 import flux +import flux.kvs from flux.hostlist import Hostlist from flux.job.JobID import id_parse from flux.constants import FLUX_MSGTYPE_REQUEST @@ -672,16 +673,14 @@ def rabbit_state_change_cb( # TODO: update capacity of rabbit in resource graph (mark some slices down?)
-def map_rabbits_to_fluxion_paths(graph_path): +def map_rabbits_to_fluxion_paths(handle): """Read the fluxion resource graph and map rabbit hostnames to resource paths.""" rabbit_rpaths = {} try: - with open(graph_path, encoding="utf8") as graph_fd: - nodes = json.load(graph_fd)["scheduling"]["graph"]["nodes"] + nodes = flux.kvs.get(handle, "resource.R")["scheduling"]["graph"]["nodes"] except Exception as exc: raise ValueError( - f"Could not load rabbit resource graph data from {graph_path} " - "expected a Flux R file augmented with JGF from 'flux-dws2jgf'" + "Could not load rabbit resource graph data from KVS's resource.R" ) from exc for vertex in nodes: metadata = vertex["metadata"] @@ -703,7 +702,7 @@ def init_rabbits(k8s_api, handle, watchers, args): """ api_response = k8s_api.list_namespaced_custom_object(*crd.RABBIT_CRD) if not args.disable_fluxion: - rabbit_rpaths = map_rabbits_to_fluxion_paths(args.resourcegraph) + rabbit_rpaths = map_rabbits_to_fluxion_paths(handle) else: rabbit_rpaths = {} resource_version = 0 @@ -833,13 +832,6 @@ def setup_parsing(): metavar="FILE", help="Path to kubeconfig file to use", ) - parser.add_argument( - "--resourcegraph", - "-r", - default=str(pathlib.Path("/etc/flux/system/R").absolute()), - metavar="FILE", - help="Path to file containing Fluxion JGF resource graph", - ) parser.add_argument( "--min-allocation-size", "-m", @@ -937,16 +929,11 @@ def raise_self_exception(handle): testing purposes. Once https://github.com/flux-framework/flux-core/issues/3821 is implemented/closed, this can be replaced with that solution. - - Also, remove FLUX_KVS_NAMESPACE from the environment, because otherwise - KVS lookups will look relative to that namespace, changing the behavior - relative to when the script runs as a service.
""" try: jobid = id_parse(os.environ["FLUX_JOB_ID"]) except KeyError: return - del os.environ["FLUX_KVS_NAMESPACE"] Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get() @@ -979,6 +966,11 @@ def main(): args = setup_parsing().parse_args() _MIN_ALLOCATION_SIZE = args.min_allocation_size config_logging(args) + # Remove FLUX_KVS_NAMESPACE from the environment if set, because otherwise + # KVS lookups will look relative to that namespace, but this service + # must operate on the default namespace. + if "FLUX_KVS_NAMESPACE" in os.environ: + del os.environ["FLUX_KVS_NAMESPACE"] handle = flux.Flux() WorkflowInfo.save_datamovements = handle.conf_get("rabbit.save_datamovements", 0) # set the maximum allowable allocation sizes on the ResourceLimits class diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index 4442b857..19907858 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -42,11 +42,11 @@ test_expect_success 'job-manager: load dws-jobtap and alloc-bypass plugin' ' test_expect_success 'exec dws service-providing script with bad arguments' ' KUBECONFIG=/dev/null test_expect_code 3 flux python ${DWS_MODULE_PATH} \ - -e1 -v -rR.local && - test_expect_code 3 flux python ${DWS_MODULE_PATH} -e1 -v -rR.local \ + -e1 -v && + test_expect_code 3 flux python ${DWS_MODULE_PATH} -e1 -v \ --kubeconfig /dev/null && test_expect_code 2 flux python ${DWS_MODULE_PATH} \ - -e1 -v -rR.local --foobar + -e1 -v --foobar ' test_expect_success 'exec dws service-providing script with fluxion scheduling disabled' ' @@ -109,7 +109,7 @@ test_expect_success 'exec dws service-providing script' ' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws1.out --error=dws1.err \ - python ${DWS_MODULE_PATH} -e1 -vvv -rR.local) && + python ${DWS_MODULE_PATH} -e1 -vvv) && flux job wait-event -vt 15 -p guest.exec.eventlog ${DWS_JOBID} shell.start ' @@ -336,7 +336,7 @@ test_expect_success 'exec dws 
service-providing script with custom config path' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws2.out --error=dws2.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv -rR.local) && + python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' @@ -420,7 +420,7 @@ test_expect_success 'dws service script handles restarts while a job is running' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws3.out --error=dws3.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv -rR.local) && + python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv) && flux job wait-event -vt 5 -m status=0 ${jobid} finish && flux job wait-event -vt 5 -m description=${EPILOG_NAME} \ ${jobid} epilog-start && @@ -478,7 +478,7 @@ test_expect_success 'launch service with storage maximum arguments' ' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws4.out --error=dws4.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv -rR.local) && + python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' diff --git a/t/t1003-dws-nnf-watch.t b/t/t1003-dws-nnf-watch.t index e67a22cb..7288ba81 100755 --- a/t/t1003-dws-nnf-watch.t +++ b/t/t1003-dws-nnf-watch.t @@ -52,7 +52,7 @@ test_expect_success 'rabbits default to down and are not allocated' ' test_expect_success 'exec Storage watching script' ' jobid=$(flux submit \ --setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws1.out --error=dws1.err \ - -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local) && + -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv) && flux job wait-event 
-vt 15 -p guest.exec.eventlog ${jobid} shell.start ' @@ -139,7 +139,7 @@ drain_compute_nodes = false " | flux config load && jobid=$(flux submit \ --setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws2.out --error=dws2.err \ - -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local) && + -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv) && flux job wait-event -vt 15 -p guest.exec.eventlog ${jobid} shell.start ' @@ -168,7 +168,7 @@ test_expect_success 'exec Storage watching script with invalid --drain-queues ar flux cancel ${jobid} && jobid=$(flux submit \ --setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws3.out --error=dws3.err \ - -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local \ + -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv \ --drain-queues notaqueue alsonotaqueue) && flux job wait-event -vt 5 ${jobid} finish && test_must_fail flux job attach ${jobid} @@ -190,8 +190,7 @@ test_expect_success 'exec Storage watching script with --drain-queues' ' flux config reload && jobid=$(flux submit \ --setattr=system.alloc-bypass.R="$(flux R encode -r0)" --output=dws4.out --error=dws4.err \ - -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local.queues \ - --drain-queues debug) && + -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv --drain-queues debug) && kubectl patch storage kind-worker2 --type=json \ -p "[{\"op\":\"replace\", \"path\":\"/spec/mode\", \"value\": \"Testing\"}]" && kubectl get storages kind-worker2 -ojson | jq -e ".spec.mode == \"Testing\"" && From d2ea2cf2b9ac0b15576e25b95e8cc7b4ca672ae1 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Mon, 4 Nov 2024 15:28:45 -0800 Subject: [PATCH 5/8] dws: pass arguments more explicitly Problem: a function receives an argparse Namespace object with a number of variables set on it, but only uses two of them. Pass the two variables the function needs rather than the whole namespace object. 
--- src/modules/coral2_dws.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index eb765d72..e6734b6c 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -636,9 +636,7 @@ def mark_rabbit(handle, status, resource_path, ssdcount, name): handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response) -def rabbit_state_change_cb( - event, handle, rabbit_rpaths, disable_fluxion, allowlist -): +def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_fluxion, allowlist): """Callback firing when a Storage object changes. Marks a rabbit as up or down. @@ -692,7 +690,7 @@ def map_rabbits_to_fluxion_paths(handle): return rabbit_rpaths -def init_rabbits(k8s_api, handle, watchers, args): +def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues): """Watch every rabbit ('Storage' resources in k8s) known to k8s. Whenever a Storage resource changes, mark it as 'up' or 'down' in Fluxion. @@ -701,19 +699,17 @@ def init_rabbits(k8s_api, handle, watchers, args): down, because status may have changed while this service was inactive. 
""" api_response = k8s_api.list_namespaced_custom_object(*crd.RABBIT_CRD) - if not args.disable_fluxion: + if not disable_fluxion: rabbit_rpaths = map_rabbits_to_fluxion_paths(handle) else: rabbit_rpaths = {} resource_version = 0 - if args.drain_queues is not None: + if drain_queues is not None: rset = flux.resource.resource_list(handle).get().all - allowlist = set( - rset.copy_constraint({"properties": args.drain_queues}).nodelist - ) + allowlist = set(rset.copy_constraint({"properties": drain_queues}).nodelist) if not allowlist: raise ValueError( - f"No resources found associated with queues {args.drain_queues}" + f"No resources found associated with queues {drain_queues}" ) else: allowlist = None @@ -722,7 +718,7 @@ def init_rabbits(k8s_api, handle, watchers, args): resource_version = max( resource_version, int(rabbit["metadata"]["resourceVersion"]) ) - if args.disable_fluxion: + if disable_fluxion: # don't mark the rabbit up or down but add the rabbit to the mapping rabbit_rpaths[name] = None elif name not in rabbit_rpaths: @@ -757,7 +753,7 @@ def init_rabbits(k8s_api, handle, watchers, args): rabbit_state_change_cb, handle, rabbit_rpaths, - args.disable_fluxion, + disable_fluxion, allowlist, ) ) @@ -1011,7 +1007,8 @@ def main(): k8s_api, handle, watchers, - args, + args.disable_fluxion, + args.drain_queues, ) services = register_services( handle, k8s_api, handle.conf_get("rabbit.restrict_persistent", True) From 87e1b4d4ef97490908eee9393f8d8adb3f8a197c Mon Sep 17 00:00:00 2001 From: James Corbett Date: Mon, 4 Nov 2024 15:37:16 -0800 Subject: [PATCH 6/8] dws: drop transient-condition command-line arg Problem: flux-coral2-dws takes an optional flag to set the timeout after which it kills jobs whose workflows are stuck in TransientCondition. However, it should a config file option. Make coral2-dws read the parameter from the `rabbit` table in the config. Drop the command-line option. 
--- src/modules/coral2_dws.py | 15 ++++----------- t/t1002-dws-workflow-obj.t | 16 ++++++++-------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index e6734b6c..d7892f65 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -813,14 +813,6 @@ def setup_parsing(): default=0, help="Increase verbosity of output", ) - parser.add_argument( - "--transient-condition-timeout", - "-e", - type=float, - default=10, - metavar="N", - help="Kill workflows in TransientCondition state for more than 'N' seconds", - ) parser.add_argument( "--kubeconfig", "-k", @@ -994,11 +986,12 @@ def main(): populate_rabbits_dict(k8s_api) # create a timer watcher for killing workflows that have been stuck in # the "Error" state for too long + tc_timeout = handle.conf_get("rabbit.tc_timeout", 10) handle.timer_watcher_create( - args.transient_condition_timeout / 2, + tc_timeout / 2, kill_workflows_in_tc, - repeat=args.transient_condition_timeout / 2, - args=(args.transient_condition_timeout, k8s_api), + repeat=tc_timeout / 2, + args=(tc_timeout, k8s_api), ).start() # start watching k8s workflow resources and operate on them when updates occur # or new RPCs are received diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index 19907858..73dabf16 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -42,11 +42,11 @@ test_expect_success 'job-manager: load dws-jobtap and alloc-bypass plugin' ' test_expect_success 'exec dws service-providing script with bad arguments' ' KUBECONFIG=/dev/null test_expect_code 3 flux python ${DWS_MODULE_PATH} \ - -e1 -v && - test_expect_code 3 flux python ${DWS_MODULE_PATH} -e1 -v \ + -v && + test_expect_code 3 flux python ${DWS_MODULE_PATH} -v \ --kubeconfig /dev/null && test_expect_code 2 flux python ${DWS_MODULE_PATH} \ - -e1 -v --foobar + -v --foobar ' test_expect_success 'exec dws service-providing script with fluxion scheduling disabled' ' 
@@ -54,7 +54,7 @@ test_expect_success 'exec dws service-providing script with fluxion scheduling d DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws-fluxion-disabled.out \ - --error=dws-fluxion-disabled.err python ${DWS_MODULE_PATH} -e1 \ + --error=dws-fluxion-disabled.err python ${DWS_MODULE_PATH} \ -vvv --disable-fluxion) && flux job wait-event -vt 15 -p guest.exec.eventlog ${DWS_JOBID} shell.start && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && @@ -109,7 +109,7 @@ test_expect_success 'exec dws service-providing script' ' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws1.out --error=dws1.err \ - python ${DWS_MODULE_PATH} -e1 -vvv) && + python ${DWS_MODULE_PATH} -vvv) && flux job wait-event -vt 15 -p guest.exec.eventlog ${DWS_JOBID} shell.start ' @@ -336,7 +336,7 @@ test_expect_success 'exec dws service-providing script with custom config path' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws2.out --error=dws2.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv) && + python ${DWS_MODULE_PATH} --kubeconfig $PWD/kubeconfig -vvv) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' @@ -420,7 +420,7 @@ test_expect_success 'dws service script handles restarts while a job is running' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws3.out --error=dws3.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv) && + python ${DWS_MODULE_PATH} --kubeconfig $PWD/kubeconfig -vvv) && flux job wait-event -vt 5 -m status=0 ${jobid} finish && flux job wait-event -vt 5 -m description=${EPILOG_NAME} \ ${jobid} epilog-start && @@ -478,7 +478,7 @@ test_expect_success 'launch service with storage maximum arguments' ' DWS_JOBID=$(flux submit \ 
--setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws4.out --error=dws4.err \ - python ${DWS_MODULE_PATH} -e1 --kubeconfig $PWD/kubeconfig -vvv) && + python ${DWS_MODULE_PATH} --kubeconfig $PWD/kubeconfig -vvv) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' From a106d4b4e99928e80a2af7bbbea6c8309e1bf392 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Mon, 4 Nov 2024 16:32:31 -0800 Subject: [PATCH 7/8] dws: drop kubeconfig command-line arg Problem: flux-coral2-dws takes an optional flag to set the path to the kubeconfig file for it to use. However, it should a config file option. Make coral2-dws read the parameter from the `rabbit` table in the config. Drop the command-line option. --- src/modules/coral2_dws.py | 11 +++-------- t/t1002-dws-workflow-obj.t | 18 +++++++++++++----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index d7892f65..afeafb76 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -813,13 +813,6 @@ def setup_parsing(): default=0, help="Increase verbosity of output", ) - parser.add_argument( - "--kubeconfig", - "-k", - default=None, - metavar="FILE", - help="Path to kubeconfig file to use", - ) parser.add_argument( "--min-allocation-size", "-m", @@ -969,7 +962,9 @@ def main(): handle.conf_get(f"rabbit.policy.maximums.{fs_type}"), ) try: - k8s_client = k8s.config.new_client_from_config(config_file=args.kubeconfig) + k8s_client = k8s.config.new_client_from_config( + handle.conf_get("rabbit.kubeconfig") + ) except ConfigException: LOGGER.exception("Kubernetes misconfigured, service will shut down") sys.exit(_EXITCODE_NORESTART) diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index 73dabf16..93240bbd 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -43,13 +43,17 @@ test_expect_success 'job-manager: load dws-jobtap and 
alloc-bypass plugin' ' test_expect_success 'exec dws service-providing script with bad arguments' ' KUBECONFIG=/dev/null test_expect_code 3 flux python ${DWS_MODULE_PATH} \ -v && - test_expect_code 3 flux python ${DWS_MODULE_PATH} -v \ - --kubeconfig /dev/null && + echo " +[rabbit] +kubeconfig = \"/dev/null\" + " | flux config load && + test_expect_code 3 flux python ${DWS_MODULE_PATH} -v && test_expect_code 2 flux python ${DWS_MODULE_PATH} \ -v --foobar ' test_expect_success 'exec dws service-providing script with fluxion scheduling disabled' ' + flux config reload && R=$(flux R encode -r 0) && DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ @@ -333,10 +337,14 @@ test_expect_success 'exec dws service-providing script with custom config path' flux cancel ${DWS_JOBID} && cp $REAL_HOME/.kube/config ./kubeconfig R=$(flux R encode -r 0) && + echo " +[rabbit] +kubeconfig = \"$PWD/kubeconfig\" + " | flux config load && DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws2.out --error=dws2.err \ - python ${DWS_MODULE_PATH} --kubeconfig $PWD/kubeconfig -vvv) && + python ${DWS_MODULE_PATH} -vvv) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' @@ -420,7 +428,7 @@ test_expect_success 'dws service script handles restarts while a job is running' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws3.out --error=dws3.err \ - python ${DWS_MODULE_PATH} --kubeconfig $PWD/kubeconfig -vvv) && + python ${DWS_MODULE_PATH} -vvv) && flux job wait-event -vt 5 -m status=0 ${jobid} finish && flux job wait-event -vt 5 -m description=${EPILOG_NAME} \ ${jobid} epilog-start && @@ -478,7 +486,7 @@ test_expect_success 'launch service with storage maximum arguments' ' DWS_JOBID=$(flux submit \ --setattr=system.alloc-bypass.R="$R" \ -o per-resource.type=node --output=dws4.out --error=dws4.err \ - python ${DWS_MODULE_PATH} 
--kubeconfig $PWD/kubeconfig -vvv) && + python ${DWS_MODULE_PATH} -vvv) && flux job wait-event -vt 15 -m "note=dws watchers setup" ${DWS_JOBID} exception && ${RPC} "dws.create" ' From cfc450d5ace402669781b2364ee16a83510c0bed Mon Sep 17 00:00:00 2001 From: James Corbett Date: Mon, 4 Nov 2024 17:27:53 -0800 Subject: [PATCH 8/8] dws: add logic to validate rabbit config table Problem: there are no checks to ensure that the rabbit config table is valid. Add some simple validation. --- src/modules/coral2_dws.py | 33 +++++++++++++++++++++++++++++++++ t/t1002-dws-workflow-obj.t | 13 +++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index afeafb76..49ba590f 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -942,6 +942,38 @@ def kubernetes_backoff(handle, orig_retry_delay): time.sleep(retry_delay) +def validate_config(config): + """Validate the `rabbit` config table.""" + accepted_keys = { + "save_datamovements", + "kubeconfig", + "tc_timeout", + "drain_compute_nodes", + "restrict_persistent", + "policy", + } + keys = set(config.keys()) + if not keys <= accepted_keys: + raise RuntimeError( + f"misconfiguration: unrecognized " + f"`rabbit.{(keys - accepted_keys).pop()}` key in Flux config, accepted " + f"keys are {accepted_keys}" + ) + if "policy" in config: + if len(config["policy"]) != 1 or "maximums" not in config["policy"]: + raise RuntimeError( + "`rabbit.policy` config table must have a `maximums` table" + ) + keys = set(config["policy"]["maximums"].keys()) + accepted_keys = set(directivebreakdown.ResourceLimits.TYPES) + if not keys <= accepted_keys: + raise RuntimeError( + f"misconfiguration: unrecognized " + f"`rabbit.policy.maximums.{(keys - accepted_keys).pop()}` key in Flux " + f"config, accepted keys are {accepted_keys}" + ) + + def main(): """Init script, begin processing of services.""" args = setup_parsing().parse_args() @@ -953,6 +985,7 @@ def main(): if
"FLUX_KVS_NAMESPACE" in os.environ: del os.environ["FLUX_KVS_NAMESPACE"] handle = flux.Flux() + validate_config(handle.conf_get("rabbit", {})) WorkflowInfo.save_datamovements = handle.conf_get("rabbit.save_datamovements", 0) # set the maximum allowable allocation sizes on the ResourceLimits class for fs_type in directivebreakdown.ResourceLimits.TYPES: diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t index 93240bbd..deefebd4 100755 --- a/t/t1002-dws-workflow-obj.t +++ b/t/t1002-dws-workflow-obj.t @@ -52,6 +52,19 @@ kubeconfig = \"/dev/null\" -v --foobar ' +test_expect_success 'exec dws service-providing script with bad config' ' + echo " +[rabbit] +foobar = false + " | flux config load && + test_must_fail flux python ${DWS_MODULE_PATH} -v && + echo " +[rabbit.policy.maximums] +fake = 1 + " | flux config load && + test_must_fail flux python ${DWS_MODULE_PATH} -v +' + test_expect_success 'exec dws service-providing script with fluxion scheduling disabled' ' flux config reload && R=$(flux R encode -r 0) &&