Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dws: rabbit config file support #234

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 75 additions & 96 deletions src/modules/coral2_dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import urllib3

import flux
import flux.kvs
from flux.hostlist import Hostlist
from flux.job.JobID import id_parse
from flux.constants import FLUX_MSGTYPE_REQUEST
Expand Down Expand Up @@ -277,7 +278,7 @@ def create_cb(handle, _t, msg, arg):

Triggered when a new job with a jobdw directive is submitted.
"""
api_instance, unrestricted_persistent = arg
api_instance, restrict_persistent = arg
dw_directives = msg.payload["dw_directives"]
jobid = msg.payload["jobid"]
userid = msg.payload["userid"]
Expand All @@ -291,7 +292,7 @@ def create_cb(handle, _t, msg, arg):
f"Malformed dw_directives, not list or string: {dw_directives!r}"
)
for directive in dw_directives:
if not unrestricted_persistent and "create_persistent" in directive:
if restrict_persistent and "create_persistent" in directive:
if userid != owner_uid(handle):
raise ValueError(
"only the instance owner can create persistent file systems"
Expand Down Expand Up @@ -592,15 +593,15 @@ def _workflow_state_change_cb_inner(workflow, winfo, handle, k8s_api, disable_fl
WORKFLOWS_IN_TC.discard(winfo)


def drain_offline_nodes(handle, rabbit_name, nodelist, disable_draining, allowlist):
def drain_offline_nodes(handle, rabbit_name, nodelist, allowlist):
"""Drain nodes listed as offline in a given Storage resource.

Drain all the nodes in `nodelist` that are Offline, provided they are
in `allowlist`.

If disable_draining is True, do nothing.
If draining is disabled in the rabbit config table, do nothing.
"""
if disable_draining:
if not handle.conf_get("rabbit.drain_compute_nodes", True):
return
offline_nodes = Hostlist()
for compute_node in nodelist:
Expand Down Expand Up @@ -635,9 +636,7 @@ def mark_rabbit(handle, status, resource_path, ssdcount, name):
handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response)


def rabbit_state_change_cb(
event, handle, rabbit_rpaths, disable_draining, disable_fluxion, allowlist
):
def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_fluxion, allowlist):
"""Callback firing when a Storage object changes.

Marks a rabbit as up or down.
Expand Down Expand Up @@ -666,23 +665,20 @@ def rabbit_state_change_cb(
handle,
name,
computes,
disable_draining,
allowlist,
)
# TODO: add some check for whether rabbit capacity has changed
# TODO: update capacity of rabbit in resource graph (mark some slices down?)


def map_rabbits_to_fluxion_paths(graph_path):
def map_rabbits_to_fluxion_paths(handle):
"""Read the fluxion resource graph and map rabbit hostnames to resource paths."""
rabbit_rpaths = {}
try:
with open(graph_path, encoding="utf8") as graph_fd:
nodes = json.load(graph_fd)["scheduling"]["graph"]["nodes"]
nodes = flux.kvs.get(handle, "resource.R")["scheduling"]["graph"]["nodes"]
except Exception as exc:
raise ValueError(
f"Could not load rabbit resource graph data from {graph_path} "
"expected a Flux R file augmented with JGF from 'flux-dws2jgf'"
f"Could not load rabbit resource graph data from KVS's resource.R"
) from exc
for vertex in nodes:
metadata = vertex["metadata"]
Expand All @@ -694,7 +690,7 @@ def map_rabbits_to_fluxion_paths(graph_path):
return rabbit_rpaths


def init_rabbits(k8s_api, handle, watchers, args):
def init_rabbits(k8s_api, handle, watchers, disable_fluxion, drain_queues):
"""Watch every rabbit ('Storage' resources in k8s) known to k8s.

Whenever a Storage resource changes, mark it as 'up' or 'down' in Fluxion.
Expand All @@ -703,19 +699,17 @@ def init_rabbits(k8s_api, handle, watchers, args):
down, because status may have changed while this service was inactive.
"""
api_response = k8s_api.list_namespaced_custom_object(*crd.RABBIT_CRD)
if not args.disable_fluxion:
rabbit_rpaths = map_rabbits_to_fluxion_paths(args.resourcegraph)
if not disable_fluxion:
rabbit_rpaths = map_rabbits_to_fluxion_paths(handle)
else:
rabbit_rpaths = {}
resource_version = 0
if args.drain_queues is not None:
if drain_queues is not None:
rset = flux.resource.resource_list(handle).get().all
allowlist = set(
rset.copy_constraint({"properties": args.drain_queues}).nodelist
)
allowlist = set(rset.copy_constraint({"properties": drain_queues}).nodelist)
if not allowlist:
raise ValueError(
f"No resources found associated with queues {args.drain_queues}"
f"No resources found associated with queues {drain_queues}"
)
else:
allowlist = None
Expand All @@ -724,7 +718,7 @@ def init_rabbits(k8s_api, handle, watchers, args):
resource_version = max(
resource_version, int(rabbit["metadata"]["resourceVersion"])
)
if args.disable_fluxion:
if disable_fluxion:
# don't mark the rabbit up or down but add the rabbit to the mapping
rabbit_rpaths[name] = None
elif name not in rabbit_rpaths:
Expand All @@ -749,7 +743,6 @@ def init_rabbits(k8s_api, handle, watchers, args):
handle,
name,
computes,
args.disable_compute_node_draining,
allowlist,
)
watchers.add_watch(
Expand All @@ -760,8 +753,7 @@ def init_rabbits(k8s_api, handle, watchers, args):
rabbit_state_change_cb,
handle,
rabbit_rpaths,
args.disable_compute_node_draining,
args.disable_fluxion,
disable_fluxion,
allowlist,
)
)
Expand Down Expand Up @@ -821,40 +813,13 @@ def setup_parsing():
default=0,
help="Increase verbosity of output",
)
parser.add_argument(
"--transient-condition-timeout",
"-e",
type=float,
default=10,
metavar="N",
help="Kill workflows in TransientCondition state for more than 'N' seconds",
)
parser.add_argument(
"--kubeconfig",
"-k",
default=None,
metavar="FILE",
help="Path to kubeconfig file to use",
)
parser.add_argument(
"--resourcegraph",
"-r",
default=str(pathlib.Path("/etc/flux/system/R").absolute()),
metavar="FILE",
help="Path to file containing Fluxion JGF resource graph",
)
parser.add_argument(
"--min-allocation-size",
"-m",
default=_MIN_ALLOCATION_SIZE,
metavar="N",
help="Minimum allocation size of rabbit allocations, in bytes",
)
parser.add_argument(
"--disable-compute-node-draining",
action="store_true",
help="Disable the draining of compute nodes based on k8s status",
)
parser.add_argument(
"--drain-queues",
nargs="+",
Expand All @@ -875,33 +840,6 @@ def setup_parsing():
"failures occur back-to-back"
),
)
parser.add_argument(
"--save-datamovements",
metavar="N",
default=5,
type=int,
help="Number of nnfdatamovements to save to job KVS, defaults to 5",
)
parser.add_argument(
"--unrestricted-persistent",
action="store_true",
help=(
"Allow any user to create persistent file systems, not just the instance "
"owner"
),
)
for fs_option, fs_help in (
("xfs", "XFS"),
("gfs2", "GFS2"),
("lustre", "Lustre"),
("raw", "raw"),
):
parser.add_argument(
f"--max-{fs_option}",
metavar="N",
help=f"Maximum {fs_help} capacity per node, in GiB",
type=int,
)
return parser


Expand Down Expand Up @@ -942,14 +880,14 @@ def populate_rabbits_dict(k8s_api):
_RABBITS_TO_HOSTLISTS[nnf["name"]] = hlist.encode()


def register_services(handle, k8s_api, unrestricted_persistent):
def register_services(handle, k8s_api, restrict_persistent):
"""register dws.create, dws.setup, and dws.post_run services."""
serv_reg_fut = handle.service_register("dws")
create_watcher = handle.msg_watcher_create(
create_cb,
FLUX_MSGTYPE_REQUEST,
"dws.create",
args=(k8s_api, unrestricted_persistent),
args=(k8s_api, restrict_persistent),
)
create_watcher.start()
setup_watcher = handle.msg_watcher_create(
Expand All @@ -972,16 +910,11 @@ def raise_self_exception(handle):
testing purposes.
Once https://github.com/flux-framework/flux-core/issues/3821 is
implemented/closed, this can be replaced with that solution.

Also, remove FLUX_KVS_NAMESPACE from the environment, because otherwise
KVS lookups will look relative to that namespace, changing the behavior
relative to when the script runs as a service.
"""
try:
jobid = id_parse(os.environ["FLUX_JOB_ID"])
except KeyError:
return
del os.environ["FLUX_KVS_NAMESPACE"]
Future(handle.job_raise(jobid, "exception", 7, "dws watchers setup")).get()


Expand Down Expand Up @@ -1009,19 +942,62 @@ def kubernetes_backoff(handle, orig_retry_delay):
time.sleep(retry_delay)


def validate_config(config):
    """Validate the `rabbit` config table.

    Args:
        config: the `rabbit` table of the Flux config as a mapping of key
            names to values (an empty mapping when the table is absent).

    Raises:
        RuntimeError: if the table contains an unrecognized key, if
            `rabbit.policy` is present but is not a table holding exactly
            one `maximums` table, or if `rabbit.policy.maximums` contains
            an unrecognized key.
    """
    # NOTE(review): main() reads `rabbit.restrict_persistent`, which is not in
    # this list (only `restrict_persistent_creation` is) -- confirm which
    # spelling is intended and make the two agree.
    accepted_keys = {
        "save_datamovements",
        "kubeconfig",
        "tc_timeout",
        "drain_compute_nodes",
        "restrict_persistent_creation",
        "policy",
    }
    unrecognized = set(config) - accepted_keys
    if unrecognized:
        # Report the same key on every run (set.pop() picks arbitrarily) and
        # print the accepted keys in a stable order.
        raise RuntimeError(
            f"misconfiguration: unrecognized "
            f"`rabbit.{min(unrecognized, key=str)}` key in Flux config, accepted "
            f"keys are {sorted(accepted_keys)}"
        )
    if "policy" in config:
        policy = config["policy"]
        # `policy` must be a table whose only entry is the `maximums` table;
        # a scalar or array here is a misconfiguration, not a crash.
        if (
            not isinstance(policy, dict)
            or len(policy) != 1
            or not isinstance(policy.get("maximums"), dict)
        ):
            raise RuntimeError(
                "`rabbit.policy` config table must have a `maximums` table"
            )
        # The permitted file system names are whatever ResourceLimits declares.
        accepted_keys = set(directivebreakdown.ResourceLimits.TYPES)
        unrecognized = set(policy["maximums"]) - accepted_keys
        if unrecognized:
            raise RuntimeError(
                f"misconfiguration: unrecognized "
                f"`rabbit.policy.maximums.{min(unrecognized, key=str)}` key in Flux "
                f"config, accepted keys are {sorted(accepted_keys)}"
            )


def main():
"""Init script, begin processing of services."""
args = setup_parsing().parse_args()
_MIN_ALLOCATION_SIZE = args.min_allocation_size
config_logging(args)
WorkflowInfo.save_datamovements = args.save_datamovements
# Remove FLUX_KVS_NAMESPACE from the environment if set, because otherwise
# KVS lookups will look relative to that namespace, but this service
# must operate on the default namespace.
if "FLUX_KVS_NAMESPACE" in os.environ:
del os.environ["FLUX_KVS_NAMESPACE"]
handle = flux.Flux()
validate_config(handle.conf_get("rabbit", {}))
WorkflowInfo.save_datamovements = handle.conf_get("rabbit.save_datamovements", 0)
# set the maximum allowable allocation sizes on the ResourceLimits class
for fs_type in directivebreakdown.ResourceLimits.TYPES:
setattr(
directivebreakdown.ResourceLimits, fs_type, getattr(args, f"max_{fs_type}")
directivebreakdown.ResourceLimits,
fs_type,
handle.conf_get(f"rabbit.policy.maximums.{fs_type}"),
)
try:
k8s_client = k8s.config.new_client_from_config(config_file=args.kubeconfig)
k8s_client = k8s.config.new_client_from_config(
handle.conf_get("rabbit.kubeconfig")
)
except ConfigException:
LOGGER.exception("Kubernetes misconfigured, service will shut down")
sys.exit(_EXITCODE_NORESTART)
Expand All @@ -1036,14 +1012,14 @@ def main():
LOGGER.exception("Cannot access kubernetes, service will shut down")
sys.exit(_EXITCODE_NORESTART)
populate_rabbits_dict(k8s_api)
handle = flux.Flux()
# create a timer watcher for killing workflows that have been stuck in
# the "Error" state for too long
tc_timeout = handle.conf_get("rabbit.tc_timeout", 10)
handle.timer_watcher_create(
args.transient_condition_timeout / 2,
tc_timeout / 2,
kill_workflows_in_tc,
repeat=args.transient_condition_timeout / 2,
args=(args.transient_condition_timeout, k8s_api),
repeat=tc_timeout / 2,
args=(tc_timeout, k8s_api),
).start()
# start watching k8s workflow resources and operate on them when updates occur
# or new RPCs are received
Expand All @@ -1052,9 +1028,12 @@ def main():
k8s_api,
handle,
watchers,
args,
args.disable_fluxion,
args.drain_queues,
)
services = register_services(
handle, k8s_api, handle.conf_get("rabbit.restrict_persistent", True)
)
services = register_services(handle, k8s_api, args.unrestricted_persistent)
watchers.add_watch(
Watch(
k8s_api,
Expand Down
7 changes: 7 additions & 0 deletions t/data/workflow-obj/maximums/rabbit.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# maximum filesystem capacity per node, in GiB
[rabbit.policy.maximums]
xfs = 500
gfs2 = 200
raw = 300
lustre = 100

Loading
Loading