Skip to content

Commit

Permalink
Add stop_time to schedule API.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexhsamuel committed Dec 17, 2024
1 parent eecf760 commit fde2309
Showing 3 changed files with 59 additions and 35 deletions.
6 changes: 5 additions & 1 deletion python/apsis/cli.py
Original file line number Diff line number Diff line change
@@ -272,7 +272,8 @@ def cmd_schedule(client, args):
args.job_id,
dict(args.args),
args.time,
count=args.count,
count =args.count,
stop_time =args.stop_time,
)
for run in runs:
apsis.cmdline.print_run(run, con)
@@ -289,6 +290,9 @@ def parse_arg(arg):
cmd.add_argument(
"--count", metavar="NUM", type=int, default=1,
help="schedule NUM runs [def: 1]")
cmd.add_argument(
"--stop-time", metavar="TIME", default=None,
help="schedule program stop at TIME [time or duration]")
cmd.add_argument(
"time", metavar="TIME", type=apsis.cmdline.parse_at_time,
help="time to run [time, daytime, 'now']")
22 changes: 19 additions & 3 deletions python/apsis/service/api.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
output_metadata_to_jso, run_log_to_jso, output_to_http_message
)
import apsis.lib.itr
from apsis.lib.parse import parse_duration
from apsis.lib.sys import to_signal
from apsis.states import to_state
from ..jobs import jso_to_job
@@ -564,10 +565,25 @@ async def run_post(request):
args = jso.get("args", {})
inst = Instance(job_id, args)

time = jso.get("times", {}).get("schedule", "now")
time = None if time == "now" else ora.Time(time)
times = jso.get("times", {})
time = times.get("schedule", "now")
time = ora.now() if time == "now" else ora.Time(time)

runs = ( apsis.schedule(time, inst) for _ in range(count) )
stop_time = times.get("stop", None)
if stop_time is not None:
# Either an absolute time or a duration ahead of schedule time.
try:
stop_time = ora.Time(stop_time)
except ValueError:
try:
stop_time = time + parse_duration(stop_time)
except ValueError:
raise ValueError(f"invalid stop time: {stop_time}")

runs = (
apsis.schedule(time, inst, stop_time=stop_time)
for _ in range(count)
)
runs = await asyncio.gather(*runs)
jso = runs_to_jso(request.app, ora.now(), runs)
return response_json(jso)
66 changes: 35 additions & 31 deletions python/apsis/service/client.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
import websockets.client

import apsis.service
from apsis.lib.json import nkey

#-------------------------------------------------------------------------------

@@ -312,63 +313,66 @@ def rerun(self, run_id):
return run


def schedule(self, job_id, args, time="now", *, count=None):
"""
Creates and schedules a new run.
"""
job_id = str(job_id)
args = { str(k): str(v) for k, v in args.items() }
time = "now" if time == "now" else str(Time(time))

data = {
"job_id": job_id,
"args": args,
"times": {
"schedule": time,
def __schedule(self, time, job_spec, *, count=None, stop_time=None):
time = "now" if time == "now" else str(Time(time))
stop_time = None if stop_time is None else str(stop_time)
params = {
"data": job_spec | {
"times": {
"schedule": time,
} | nkey("stop", stop_time)
}
}
runs = self.__post("/api/v1/runs", data=data, count=count)["runs"]
runs = self.__post("/api/v1/runs", **params)["runs"]
# FIXME: Hacky.
return next(iter(runs.values())) if count is None else runs.values()


def __schedule(self, time, job, count):
time = "now" if time == "now" else str(Time(time))
data = {
"job": job,
"times": {
"schedule": time,
def schedule(self, job_id, args, time="now", **kw_args):
"""
Creates and schedules a new run.
"""
return self.__schedule(
time,
{
"job_id": str(job_id),
"args" : { str(k): str(v) for k, v in args.items() },
},
}
runs = self.__post("/api/v1/runs", data=data, count=count)["runs"]
# FIXME: Hacky.
return next(iter(runs.values())) if count is None else runs.values()
**kw_args
)


def schedule_adhoc(self, time, job, *, count=None):
return self.__schedule(time, job, count)
def schedule_adhoc(self, time, job, **kw_args):
return self.__schedule(time, {"job": job}, **kw_args)


def schedule_program(self, time, args, *, count=None):
def schedule_program(self, time, args, **kw_args):
"""
:param time:
The schedule time, or "now" for immediate.
:param args:
The argument vector. The first item is the path to the program
to run.
"""
args = [ str(a) for a in args ]
return self.__schedule(time, {"program": args}, count)
return self.__schedule(
time,
{"job": {"program": [ str(a) for a in args ]}},
**kw_args
)


def schedule_shell_program(self, time, command, *, count=None):
def schedule_shell_program(self, time, command, **kw_args):
"""
:param time:
The schedule time, or "now" for immediate.
:param command:
The shell command to run.
"""
return self.__schedule(time, {"program": str(command)}, count)
return self.__schedule(
time,
{"job": {"program": str(command)}},
**kw_args
)


def reload_jobs(self, *, dry_run=False):

0 comments on commit fde2309

Please sign in to comment.