Skip to content

Commit

Permalink
Use Program.stop() for scheduled stop.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexhsamuel committed Dec 17, 2024
1 parent 5e9cca1 commit eecf760
Showing 1 changed file with 7 additions and 15 deletions.
22 changes: 7 additions & 15 deletions python/apsis/apsis.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def _start(self, run):
self if isinstance(run.program, _InternalProgram) else self.cfg,
)
# Start a task to process updates from the program.
run_task = _process_updates(self, run, updates)
run_task = _process_updates(self, run, updates, run.program)
self.__run_tasks.add(run.run_id, run_task)


Expand All @@ -265,7 +265,7 @@ def __reconnect(self, run):
self if isinstance(run.program, _InternalProgram) else self.cfg,
)
# Start a task to process updates from the program.
run_task = _process_updates(self, run, updates)
run_task = _process_updates(self, run, updates, run.program)
self.__run_tasks.add(run.run_id, run_task)


Expand Down Expand Up @@ -791,7 +791,7 @@ async def _cmpr(output):
return dict(zip(outputs.keys(), o))


async def _process_updates(apsis, run, updates):
async def _process_updates(apsis, run, updates, program):
"""
Processes program `updates` for `run` until the program is finished.
"""
Expand Down Expand Up @@ -829,24 +829,16 @@ async def _process_updates(apsis, run, updates):
# Does this run have a scheduled stop time?
try:
stop_time = run.times["stop"]

except KeyError:
stop_task = None

else:
# Start a task to stop the run at the scheduled time.
async def stop():
sleep = stop_time - now()
log.debug(f"{run_id}: sleeping {sleep} s until stop")
await asyncio.sleep(sleep)
duration = stop_time - now()
log.debug(f"{run_id}: running for {duration} s until stop")
await asyncio.sleep(duration)
log.debug(f"{run_id}: stopping")

# FIXME: Generalize to program.stop.
if not run.state.finished:
await apsis.send_signal(run, to_signal("SIGTERM"))
await asyncio.sleep(30)
if not run.state.finished:
await apsis.send_signal(run, to_signal("SIGKILL"))
await program.stop()

stop_task = asyncio.create_task(stop())

Expand Down

0 comments on commit eecf760

Please sign in to comment.