diff --git a/docs/config.rst b/docs/config.rst
index 6b799987..d6120fd3 100644
--- a/docs/config.rst
+++ b/docs/config.rst
@@ -198,9 +198,10 @@ This configures the server.
 
   procstar:
     agent:
-      groups:
+      connection:
         start_timeout: "1 min"
         reconnect_timeout: 60
+        update_interval: 60
 
 This configures how Apsis handles Procstar groups.  When a Procstar instance
 connects, it provides a group ID to which it belongs.  Each Procstar program
@@ -219,3 +220,6 @@ If Apsis reconnects a _running_ run with a Procstar program, the
 `reconnect_timeout` determines how long it waits for the Procstar instance to
 reconnect.  The default is 0.
 
+While a process is running, Apsis requests an update of its results every
+`update_interval` if the agent is connected.  The default is 0, which disables
+update requests.
diff --git a/jobs/test/procstar/long.yaml b/jobs/test/procstar/long.yaml
new file mode 100644
index 00000000..cb8c08f3
--- /dev/null
+++ b/jobs/test/procstar/long.yaml
@@ -0,0 +1,16 @@
+params: [time]
+
+program:
+  type: procstar
+  argv:
+  - "/usr/bin/bash"
+  - "-c"
+  - |
+    time={{ time }}
+    echo "starting for $time sec"
+    for ((i = 1; i <= $time; i++)); do
+      echo "$i sec"
+      sleep 1
+    done
+    echo "done"
+
diff --git a/python/apsis/program/procstar/agent.py b/python/apsis/program/procstar/agent.py
index 3593b6cc..aff64fed 100644
--- a/python/apsis/program/procstar/agent.py
+++ b/python/apsis/program/procstar/agent.py
@@ -1,8 +1,10 @@
 import asyncio
 import logging
+from procstar import proto
 import procstar.spec
 from procstar.agent.exc import NoConnectionError
 import procstar.agent.server
+import time
 import uuid
 
 from apsis.lib.json import check_schema
@@ -76,7 +78,7 @@ def start_server(cfg):
       Awaitable that runs the server.
     """
     global SERVER
-    assert SERVER is None
+    assert SERVER is None, "server already created"
 
     # Network stuff.
     FROM_ENV = procstar.agent.server.FROM_ENV
@@ -89,13 +91,15 @@ def start_server(cfg):
     key_path = tls_cfg.get("key_path", FROM_ENV)
 
     # Group config.
-    groups_cfg = cfg.get("groups", {})
-    start_timeout = parse_duration(groups_cfg.get("start_timeout", "0"))
-    rec_timeout = parse_duration(groups_cfg.get("reconnect_timeout", "0"))
+    conn_cfg = cfg.get("connection", {})
+    start_timeout = parse_duration(conn_cfg.get("start_timeout", "0"))
+    rec_timeout = parse_duration(conn_cfg.get("reconnect_timeout", "0"))
+    update_interval = parse_duration(conn_cfg.get("update_interval", "0"))
 
     SERVER = procstar.agent.server.Server()
     SERVER.start_timeout = start_timeout
     SERVER.reconnect_timeout = rec_timeout
+    SERVER.update_interval = update_interval
 
     return SERVER.run_forever(
         host =host,
@@ -213,29 +217,77 @@ async def start(self, run_id, cfg):
 
 
     async def __wait(self, run_id, proc):
+        # Timeout to receive a result update from the agent, before marking the
+        # run as error.
+        # FIXME: This is temporary, until we handle WebSocket connection and
+        # disconnection, and aging out of connections, properly.
+        TIMEOUT = 600
+
+        conn = SERVER.connections[proc.conn_id]
+        last_result_time = time.monotonic()
+
         while True:
-            # Wait for the next result from the agent.
+            # How far are we from timing out?
+            timeout = last_result_time + TIMEOUT - time.monotonic()
+            if timeout < 0:
+                # We haven't received a result in too long, so mark the run as
+                # error.  Use output and metadata from the most recent results,
+                # if available.
+                if proc.results.latest is None:
+                    meta = outputs = None
+                else:
+                    meta = _get_metadata(proc.proc_id, proc.results.latest)
+                    output = proc.results.latest.fds.stdout.text.encode()
+                    outputs = base.program_outputs(output)
+                log.warning(f"no result update in {TIMEOUT} s: {run_id}")
+                raise base.ProgramError(
+                    f"lost Procstar process after {TIMEOUT} s",
+                    outputs =outputs,
+                    meta    =meta,
+                )
+
+            # Wait for the next result from the agent, but no longer than update_interval.
+            if SERVER.update_interval > 0:
+                wait_timeout = min(timeout, SERVER.update_interval)
+            else:
+                wait_timeout = timeout
+            log.debug(f"waiting for result {wait_timeout} (timeout in {timeout}): {run_id}")
             try:
-                result = await anext(proc.results)
+                result = await asyncio.wait_for(anext(proc.results), wait_timeout)
+            except asyncio.TimeoutError:
+                # Didn't receive a result.
+                if conn.open:
+                    # Request an update.
+                    # FIXME: There should be an API!
+                    await conn.send(proto.ProcResultRequest(proc.proc_id))
+                else:
+                    # Can't request an update; the agent is not connected.
+                    log.warning(f"no connection to agent: {run_id}")
+                continue
             except Exception as exc:
                 raise base.ProgramError(f"procstar: {exc}")
+            else:
+                # Received a result update.  Reset the timeout clock.
+                last_result_time = time.monotonic()
 
             if result.state == "running":
                 # Not completed yet.
                 # FIXME: Do something with this!
                 continue
 
-            output = result.fds.stdout.text.encode()
-            outputs = base.program_outputs(output)
-            meta = _get_metadata(proc.proc_id, result)
-
-            # Clean up the process.
+            # The process is no longer running.  Clean it up from the agent.
             try:
                 await SERVER.delete(proc.proc_id)
             except Exception as exc:
                 log.error(f"delete {proc.proc_id}: {exc}")
 
+            # Collect results.
+            output = result.fds.stdout.text.encode()
+            outputs = base.program_outputs(output)
+            meta = _get_metadata(proc.proc_id, result)
+
             if result.state == "error":
+                # The process failed to start on the agent.
                 raise base.ProgramError(
                     "; ".join(result.errors),
                     outputs =outputs,
@@ -243,14 +295,17 @@ async def __wait(self, run_id, proc):
                 )
 
             elif result.state == "terminated":
+                # The process ran on the agent.
                 status = result.status
                 if status.exit_code == 0:
+                    # The process completed successfully.
                     return base.ProgramSuccess(
                         outputs =outputs,
                         meta    =meta,
                     )
 
                 else:
+                    # The process terminated unsuccessfully.
                     cause = (
                         f"exit code {status.exit_code}"
                         if status.signal is None
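
For reference, here is how the renamed `connection` section might look in an Apsis server config. This is an illustrative sketch based on the docs hunk above: the keys come from this diff, while the values and comments are examples only. Per the existing examples, durations may be given as bare seconds or as strings like `"1 min"` (both forms pass through `parse_duration`):

```yaml
procstar:
  agent:
    connection:
      start_timeout: "1 min"   # how long to wait for an agent when a run starts
      reconnect_timeout: 60    # how long to wait for a running run's agent to reconnect
      update_interval: 60      # request a result update every 60 s; 0 (default) disables
```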
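
The new `jobs/test/procstar/long.yaml` test job substitutes the `time` parameter into the script; the `{{ time }}` syntax suggests Jinja-style templating. For example, with `time=3` the body handed to `bash -c` would render to roughly:

```sh
time=3
echo "starting for $time sec"
for ((i = 1; i <= $time; i++)); do
  echo "$i sec"
  sleep 1
done
echo "done"
```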
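
The control flow of the rewritten `__wait` loop may be easier to follow in isolation. The sketch below is a simplified stand-in, not the actual Apsis code: `results`, `request_update`, and `connected` are hypothetical placeholders for `proc.results`, sending `proto.ProcResultRequest`, and checking `conn.open`, respectively:

```python
import asyncio
import time

TIMEOUT = 600  # seconds without any result before the run is marked as error

async def watch(results, request_update, connected, update_interval=60):
    """Consume result updates, polling the agent when none arrive.

    results         -- async iterator of result updates (stand-in for proc.results)
    request_update  -- coroutine function that asks the agent to resend results
    connected       -- callable reporting whether the agent is reachable
    update_interval -- stand-in for SERVER.update_interval; 0 disables polling
    """
    last = time.monotonic()
    while True:
        # Remaining budget before the process is considered lost.
        budget = last + TIMEOUT - time.monotonic()
        if budget < 0:
            raise RuntimeError(f"no result update in {TIMEOUT} s")
        # Wait no longer than the budget, nor than the polling interval.
        wait = min(budget, update_interval) if update_interval > 0 else budget
        try:
            result = await asyncio.wait_for(anext(results), wait)
        except asyncio.TimeoutError:
            if connected():
                await request_update()  # nudge the agent for a fresh result
            continue                    # either way, loop and re-check the budget
        last = time.monotonic()         # got a result: reset the timeout clock
        if result.state != "running":
            return result               # terminated or error: hand back to caller
```

The key property is that only a received result resets the clock; a timeout merely triggers a poll, so a run errors out only after `TIMEOUT` seconds with no updates at all, whether or not the agent is connected.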