diff --git a/python/apsis/cli.py b/python/apsis/cli.py index 23c661e9..a3610d29 100644 --- a/python/apsis/cli.py +++ b/python/apsis/cli.py @@ -272,7 +272,8 @@ def cmd_schedule(client, args): args.job_id, dict(args.args), args.time, - count=args.count, + count =args.count, + stop_time =args.stop_time, ) for run in runs: apsis.cmdline.print_run(run, con) @@ -289,6 +290,9 @@ def parse_arg(arg): cmd.add_argument( "--count", metavar="NUM", type=int, default=1, help="schedule NUM runs [def: 1]") + cmd.add_argument( + "--stop-time", metavar="TIME", default=None, + help="schedule program stop at TIME [time or duration]") cmd.add_argument( "time", metavar="TIME", type=apsis.cmdline.parse_at_time, help="time to run [time, daytime, 'now']") diff --git a/python/apsis/service/api.py b/python/apsis/service/api.py index 1aba5d4f..d06f8366 100644 --- a/python/apsis/service/api.py +++ b/python/apsis/service/api.py @@ -17,6 +17,7 @@ output_metadata_to_jso, run_log_to_jso, output_to_http_message ) import apsis.lib.itr +from apsis.lib.parse import parse_duration from apsis.lib.sys import to_signal from apsis.states import to_state from ..jobs import jso_to_job @@ -564,10 +565,25 @@ async def run_post(request): args = jso.get("args", {}) inst = Instance(job_id, args) - time = jso.get("times", {}).get("schedule", "now") - time = None if time == "now" else ora.Time(time) + times = jso.get("times", {}) + time = times.get("schedule", "now") + time = ora.now() if time == "now" else ora.Time(time) - runs = ( apsis.schedule(time, inst) for _ in range(count) ) + stop_time = times.get("stop", None) + if stop_time is not None: + # Either an absolute time or a duration ahead of schedule time. + try: + stop_time = ora.Time(stop_time) + except ValueError: + try: + stop_time = time + parse_duration(stop_time) + except ValueError: + raise ValueError(f"invalid stop time: {stop_time}") + + runs = ( + apsis.schedule(time, inst, stop_time=stop_time) + for _ in range(count) + ) runs = await asyncio.gather(*runs) jso = runs_to_jso(request.app, ora.now(), runs) return response_json(jso) diff --git a/python/apsis/service/client.py b/python/apsis/service/client.py index b9b0bc6d..d4feab58 100644 --- a/python/apsis/service/client.py +++ b/python/apsis/service/client.py @@ -13,6 +13,7 @@ import websockets.client import apsis.service +from apsis.lib.json import nkey #------------------------------------------------------------------------------- @@ -312,44 +313,40 @@ def rerun(self, run_id): return run - def schedule(self, job_id, args, time="now", *, count=None): - """ - Creates and schedules a new run. - """ - job_id = str(job_id) - args = { str(k): str(v) for k, v in args.items() } - time = "now" if time == "now" else str(Time(time)) - - data = { - "job_id": job_id, - "args": args, - "times": { - "schedule": time, + def __schedule(self, time, job_spec, *, count=None, stop_time=None): + time = "now" if time == "now" else str(Time(time)) + stop_time = None if stop_time is None else str(stop_time) + params = { + "data": job_spec | { + "times": { + "schedule": time, + } | nkey("stop", stop_time) } } - runs = self.__post("/api/v1/runs", data=data, count=count)["runs"] + runs = self.__post("/api/v1/runs", **params)["runs"] # FIXME: Hacky. return next(iter(runs.values())) if count is None else runs.values() - def __schedule(self, time, job, count): - time = "now" if time == "now" else str(Time(time)) - data = { - "job": job, - "times": { - "schedule": time, + def schedule(self, job_id, args, time="now", **kw_args): + """ + Creates and schedules a new run. + """ + return self.__schedule( + time, + { + "job_id": str(job_id), + "args" : { str(k): str(v) for k, v in args.items() }, }, - } - runs = self.__post("/api/v1/runs", data=data, count=count)["runs"] - # FIXME: Hacky. - return next(iter(runs.values())) if count is None else runs.values() + **kw_args + ) - def schedule_adhoc(self, time, job, *, count=None): - return self.__schedule(time, job, count) + def schedule_adhoc(self, time, job, **kw_args): + return self.__schedule(time, {"job": job}, **kw_args) - def schedule_program(self, time, args, *, count=None): + def schedule_program(self, time, args, **kw_args): """ :param time: The schedule time, or "now" for immediate. @@ -357,18 +354,25 @@ def schedule_program(self, time, args, *, count=None): The argument vector. The first item is the path to the program to run. """ - args = [ str(a) for a in args ] - return self.__schedule(time, {"program": args}, count) + return self.__schedule( + time, + {"job": {"program": [ str(a) for a in args ]}}, + **kw_args + ) - def schedule_shell_program(self, time, command, *, count=None): + def schedule_shell_program(self, time, command, **kw_args): """ :param time: The schedule time, or "now" for immediate. :param command: The shell command to run. """ - return self.__schedule(time, {"program": str(command)}, count) + return self.__schedule( + time, + {"job": {"program": str(command)}}, + **kw_args + ) def reload_jobs(self, *, dry_run=False):