
Commit

Request updates periodically. Error a run if we don't receive one.
alexhsamuel committed Nov 19, 2023
1 parent 256fcaf commit 2064b09
Showing 3 changed files with 85 additions and 12 deletions.
6 changes: 5 additions & 1 deletion docs/config.rst
@@ -198,9 +198,10 @@ This configures the server.
procstar:
  agent:
    groups:
    connection:
      start_timeout: "1 min"
      reconnect_timeout: 60
      update_interval: 60
This configures how Apsis handles Procstar groups. When a Procstar instance
connects, it provides a group ID to which it belongs. Each Procstar program
@@ -219,3 +220,6 @@ If Apsis reconnects a _running_ run with a Procstar program, the
`reconnect_timeout` determines how long it waits for the Procstar instance to
reconnect. The default is 0.

While a process is running, Apsis requests an update of its results every
`update_interval`, as long as the agent is connected. The default is 0, which
disables periodic update requests.
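To make the lookup concrete, the connection settings above reach `start_server` as an ordinary nested mapping. The sketch below is illustrative only: it substitutes plain numbers for the `parse_duration` strings, and `cfg`, `conn_cfg`, and `update_interval` here are hypothetical stand-ins rather than the exact objects Apsis builds. The point it shows is the fallback behavior: a missing key defaults to 0, so update requests stay disabled unless `update_interval` is set.

# Hypothetical parsed form of the YAML above; key names follow docs/config.rst.
cfg = {
    "procstar": {
        "agent": {
            "connection": {
                "start_timeout": 60.0,       # "1 min" in the YAML example
                "reconnect_timeout": 60.0,
                "update_interval": 60.0,
            },
        },
    },
}

# These lookups mirror start_server() in agent.py below; a missing key
# defaults to 0, which disables periodic update requests.
conn_cfg = cfg["procstar"]["agent"].get("connection", {})
update_interval = float(conn_cfg.get("update_interval", 0))
print(update_interval)   # 60.0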
16 changes: 16 additions & 0 deletions jobs/test/procstar/long.yaml
@@ -0,0 +1,16 @@
params: [time]

program:
  type: procstar
  argv:
    - "/usr/bin/bash"
    - "-c"
    - |
      time={{ time }}
      echo "starting for $time sec"
      for ((i = 1; i <= $time; i++)); do
          echo "$i sec"
          sleep 1
      done
      echo "done"
75 changes: 64 additions & 11 deletions python/apsis/program/procstar/agent.py
@@ -1,8 +1,10 @@
import asyncio
import logging
from procstar import proto
import procstar.spec
from procstar.agent.exc import NoConnectionError
import procstar.agent.server
import time
import uuid

from apsis.lib.json import check_schema
@@ -76,7 +78,7 @@ def start_server(cfg):
    Awaitable that runs the server.
    """
    global SERVER
    assert SERVER is None
    assert SERVER is None, "server already created"

    # Network stuff.
    FROM_ENV = procstar.agent.server.FROM_ENV
@@ -89,13 +91,15 @@ def start_server(cfg):
    key_path = tls_cfg.get("key_path", FROM_ENV)

    # Group config.
    groups_cfg = cfg.get("groups", {})
    start_timeout = parse_duration(groups_cfg.get("start_timeout", "0"))
    rec_timeout = parse_duration(groups_cfg.get("reconnect_timeout", "0"))
    conn_cfg = cfg.get("connection", {})
    start_timeout = parse_duration(conn_cfg.get("start_timeout", "0"))
    rec_timeout = parse_duration(conn_cfg.get("reconnect_timeout", "0"))
    update_interval = parse_duration(conn_cfg.get("update_interval", "0"))

    SERVER = procstar.agent.server.Server()
    SERVER.start_timeout = start_timeout
    SERVER.reconnect_timeout = rec_timeout
    SERVER.update_interval = update_interval

    return SERVER.run_forever(
        host =host,
@@ -213,44 +217,93 @@ async def start(self, run_id, cfg):


    async def __wait(self, run_id, proc):
        # Timeout to receive a result update from the agent, before marking the
        # run as error.
        # FIXME: This is temporary, until we handle WebSocket connection and
        # disconnection, and aging out of connections, properly.
        TIMEOUT = 600

        conn = SERVER.connections[proc.conn_id]
        last_result_time = time.monotonic()

        while True:
            # Wait for the next result from the agent.
            # How far are we from timing out?
            timeout = last_result_time + TIMEOUT - time.monotonic()
            if timeout < 0:
                # We haven't received a result in too long, so mark the run as
                # error. Use output and metadata from the most recent results,
                # if available.
                if proc.results.latest is None:
                    meta = outputs = None
                else:
                    meta = _get_metadata(proc.proc_id, proc.results.latest)
                    output = proc.results.latest.fds.stdout.text.encode()
                    outputs = base.program_outputs(output)
                logging.warning(f"no result update in {TIMEOUT} s: {run_id}")
                raise base.ProgramError(
                    f"lost Procstar process after {TIMEOUT} s",
                    outputs =outputs,
                    meta =meta,
                )

            # Wait for the next result from the agent, no more than update_interval.
            if SERVER.update_interval > 0:
                wait_timeout = min(timeout, SERVER.update_interval)
            else:
                # No periodic updates configured; wait out the remaining timeout.
                wait_timeout = timeout
            logging.debug(f"waiting for result {wait_timeout} (timeout in {timeout}): {run_id}")
            try:
                result = await anext(proc.results)
                result = await asyncio.wait_for(anext(proc.results), wait_timeout)
            except asyncio.TimeoutError:
                # Didn't receive a result.
                if conn.open:
                    # Request an update.
                    # FIXME: There should be an API!
                    await conn.send(proto.ProcResultRequest(proc.proc_id))
                else:
                    # Can't request an update; the agent is not connected.
                    logging.warning(f"no connection to agent: {run_id}")
                continue
            except Exception as exc:
                raise base.ProgramError(f"procstar: {exc}")
            else:
                # Received a result update. Reset the timeout clock.
                last_result_time = time.monotonic()

            if result.state == "running":
                # Not completed yet.
                # FIXME: Do something with this!
                continue

            output = result.fds.stdout.text.encode()
            outputs = base.program_outputs(output)
            meta = _get_metadata(proc.proc_id, result)

            # Clean up the process.
            # The process is no longer running. Clean it up from the agent.
            try:
                await SERVER.delete(proc.proc_id)
            except Exception as exc:
                log.error(f"delete {proc.proc_id}: {exc}")

            # Collect results.
            output = result.fds.stdout.text.encode()
            outputs = base.program_outputs(output)
            meta = _get_metadata(proc.proc_id, result)

            if result.state == "error":
                # The process failed to start on the agent.
                raise base.ProgramError(
                    "; ".join(result.errors),
                    outputs =outputs,
                    meta =meta,
                )

            elif result.state == "terminated":
                # The process ran on the agent.
                status = result.status
                if status.exit_code == 0:
                    # The process completed successfully.
                    return base.ProgramSuccess(
                        outputs =outputs,
                        meta =meta,
                    )

                else:
                    # The process terminated unsuccessfully.
                    cause = (
                        f"exit code {status.exit_code}"
                        if status.signal is None
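To summarize the control flow that `__wait` now implements, here is a small self-contained sketch of the same pattern, decoupled from Apsis and Procstar: wait for the next result with `asyncio.wait_for`, nudge the agent for an update whenever a wait times out, and give up once no result has arrived within an overall deadline. Every name in it (`ResultStream`, `request_update`, `poll`, `produce`) is a hypothetical stand-in, not part of the Apsis or Procstar APIs.

import asyncio
import time

RESULT_TIMEOUT = 10.0    # overall deadline without any result (600 s in the commit)
UPDATE_INTERVAL = 1.0    # how often to nudge the agent for a fresh result


class ResultStream:
    # Stand-in for proc.results: an async iterator fed from a queue.
    def __init__(self):
        self.queue = asyncio.Queue()

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.queue.get()


async def request_update() -> None:
    # Stand-in for sending proto.ProcResultRequest over the agent connection.
    print("timed out waiting; requesting an update from the agent")


async def poll(results: ResultStream) -> dict:
    last_result_time = time.monotonic()
    while True:
        # How long until we would declare the process lost?
        timeout = last_result_time + RESULT_TIMEOUT - time.monotonic()
        if timeout < 0:
            raise RuntimeError(f"no result update in {RESULT_TIMEOUT} s")
        # Wait for the next result, but no longer than the update interval.
        wait_timeout = min(timeout, UPDATE_INTERVAL) if UPDATE_INTERVAL > 0 else timeout
        try:
            result = await asyncio.wait_for(anext(results), wait_timeout)
        except asyncio.TimeoutError:
            await request_update()
            continue
        last_result_time = time.monotonic()
        if result["state"] == "running":
            print("interim result:", result)
            continue
        return result


async def produce(stream: ResultStream) -> None:
    # Stand-in for the agent: interim results arrive slower than UPDATE_INTERVAL.
    for i in range(3):
        await asyncio.sleep(1.5)
        await stream.queue.put({"state": "running", "seq": i})
    await asyncio.sleep(1.5)
    await stream.queue.put({"state": "terminated", "exit_code": 0})


async def main() -> None:
    stream = ResultStream()
    producer = asyncio.create_task(produce(stream))
    print("final result:", await poll(stream))
    await producer


asyncio.run(main())

Run as-is, this prints an update request between each interim result and then the terminal result, which is roughly the behavior the new `update_interval` setting produces against a slow or quiet agent.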

