Skip to content

Commit

Permalink
Send signal to process.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexhsamuel committed Dec 30, 2023
1 parent 73321b7 commit 0c7b8b1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
20 changes: 20 additions & 0 deletions python/apsis/apsis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .host_group import config_host_groups
from .jobs import Jobs, load_jobs_dir, diff_jobs_dirs
from .lib.asyn import cancel_task
from .lib.sys import to_signal
from .program.base import _InternalProgram, Output, OutputMetadata, ProgramError, ProgramFailure
from .program.procstar.agent import start_server
from . import runs
Expand All @@ -22,6 +23,7 @@
from .runs import get_bind_args
from .scheduled import ScheduledRuns
from .scheduler import Scheduler, get_runs_to_schedule
from .states import State

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -679,6 +681,24 @@ async def rerun(self, run, *, time=None):
return new_run


async def send_signal(self, run, signal):
    """
    Sends `signal` to the program of a running run.

    :param run:
      The run whose program to signal.
    :param signal:
      A signal number or signal name, as accepted by `to_signal`.
    :raise RuntimeError:
      `run` is not running, or sending the signal failed.
    """
    # Normalize first, so the log and error messages below can use the
    # signal's name.
    signal = to_signal(signal)
    if run.state != State.running:
        # This message must be an f-string; otherwise callers receive the
        # literal placeholder text instead of the run's state name.
        raise RuntimeError(f"invalid run state for signal: {run.state.name}")
    assert run.program is not None

    self.run_log.info(run, f"sending {signal.name}")
    try:
        await run.program.signal(run.run_id, run.run_state, signal)
    except Exception as exc:
        # Record the full traceback in the run log, then present callers with
        # a single exception type, keeping the underlying cause chained.
        self.run_log.exc(run, f"sending {signal.name} failed")
        raise RuntimeError(f"sending {signal.name} failed") from exc


async def shut_down(self):
log.info("shutting down Apsis")
for task in list(self.__action_tasks):
Expand Down
2 changes: 1 addition & 1 deletion python/apsis/lib/sys.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_username():
return pwd.getpwuid(os.getuid()).pw_name


def to_signal(sig):
def to_signal(sig) -> signal.Signals:
"""
Parses a signal number or named signal.
Expand Down
13 changes: 9 additions & 4 deletions python/apsis/program/procstar/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from apsis.lib.json import check_schema
from apsis.lib.parse import parse_duration
from apsis.lib.py import or_none
from apsis.lib.sys import to_signal
from apsis.program import base
from apsis.runs import join_args, template_expand

Expand Down Expand Up @@ -255,7 +256,9 @@ async def __wait(self, run_id, proc):
try:
# Unless the proc is already terminated, await the next message.
result = (
proc.results.latest if proc.results.latest.state == "terminated"
proc.results.latest
if proc.results.latest is not None
and proc.results.latest.state == "terminated"
else await asyncio.wait_for(anext(proc.results), wait_timeout)
)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -316,7 +319,7 @@ async def __wait(self, run_id, proc):
else f"killed by {status.signal}"
)
raise base.ProgramFailure(
f"program failed: {cause}",
str(cause),
outputs =outputs,
meta =meta,
)
Expand Down Expand Up @@ -352,8 +355,10 @@ async def reconnect():


async def signal(self, run_id, run_state, signal):
    """
    Sends `signal` to the process of run `run_id`.

    :param run_id:
      The run ID; used here for logging only.
    :param run_state:
      The run's state mapping; its `"proc_id"` entry identifies the process
      to signal.
    :param signal:
      A signal number or signal name, as accepted by `to_signal`.
    """
    signal = to_signal(signal)
    log.info(f"sending signal: {run_id}: {signal}")
    proc_id = run_state["proc_id"]
    # The server is given the plain signal number rather than the enum.
    await SERVER.send_signal(proc_id, int(signal))



19 changes: 10 additions & 9 deletions python/apsis/service/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from apsis.lib.api import response_json, error, time_to_jso, to_bool, encode_response
import apsis.lib.itr
from apsis.lib.timing import Timer
from apsis.lib.sys import to_signal
from ..jobs import jso_to_job
from ..runs import Instance, Run, RunError

Expand Down Expand Up @@ -361,23 +362,23 @@ async def run_rerun(request, run_id):
return response_json(jso)


# PUT is probably right, but run actions currently are POST only.
@API.route("/runs/<run_id>/signal/<signal>", methods={"PUT", "POST"})
async def run_signal(request, run_id, signal):
    """
    Sends a signal to the program of run `run_id`.

    Responds with an empty JSON object on success, or with a 400 error if
    `signal` is not a valid signal or if `apsis.send_signal` raises
    `RuntimeError`.
    """
    apsis = request.app.apsis
    _, run = apsis.run_store.get(run_id)

    # Reject a bad signal here, so the client gets a specific error message.
    try:
        signal = to_signal(signal)
    except ValueError:
        return error(f"invalid signal: {signal}", 400)

    # Run state validation, run logging, and delivery all happen in
    # `apsis.send_signal`, rather than by calling the run's program directly.
    try:
        await apsis.send_signal(run, signal)
    except RuntimeError as exc:
        return error(str(exc), 400)
    else:
        return response_json({})


@API.route("/runs/<run_id>/mark/<state>", methods={"PUT", "POST"})
Expand Down

0 comments on commit 0c7b8b1

Please sign in to comment.