Skip to content

Commit

Permalink
feat: exit gracefully on SIGTERM, timeout, and cancellation
Browse files Browse the repository at this point in the history
* Listen for SIGTERM. Trigger `on_terminated` and `on_failure` hooks and exit.
* Wait for cancellation signal on Redis and trigger `on_cancelled` and `on_failure` hooks.
* Log message and exit (0) when a job ID can't be pulled from Redis in a set timeout.

All this early exits allow the workflow to clean-up after itself using hooks.
Status updates are pushed for each of the states.

This should prevent hanging jobs that have been cancelled by a user or terminated by K8S.
  • Loading branch information
igboyes committed Apr 28, 2022
1 parent 313e382 commit 061eecd
Show file tree
Hide file tree
Showing 15 changed files with 268 additions and 116 deletions.
4 changes: 0 additions & 4 deletions tests/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
FROM virtool/workflow-tools:1.0.0
WORKDIR /test

RUN pip install poetry

COPY ./pyproject.toml .
COPY ./poetry.lock .
RUN poetry install --no-root

COPY . .

ENTRYPOINT ["poetry", "run", "pytest"]
1 change: 0 additions & 1 deletion tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ services:
- ./coverage:/test/coverage
- /var/run/docker.sock:/var/run/docker.sock
network_mode: "host"

9 changes: 9 additions & 0 deletions tests/files/terminated_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import asyncio

from virtool_workflow import step


@step
async def waste_time():
for _ in range(10):
await asyncio.sleep(1)
3 changes: 2 additions & 1 deletion tests/integration/_fixtures/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from virtool_workflow._runtime import run_workflow
from virtool_workflow.events import Events


@pytest.fixture
Expand All @@ -19,6 +20,6 @@ async def exec_workflow(base_config: Dict[str, Any], job_id):

async def _exec_workflow(workflow, **kwargs):
base_config.update(kwargs)
await run_workflow(base_config, job_id, workflow)
await run_workflow(base_config, job_id, workflow, Events())

return _exec_workflow
123 changes: 123 additions & 0 deletions tests/integration/test_exit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import asyncio
import subprocess

import aioredis


async def test_exit_because_timeout(db, create_job, exec_workflow, job_id):
"""
Test that the runner exits if no job ID can be pulled from Redis before the timeout.
This situation does not involve a status update being sent to the server.
"""
p = subprocess.run(
[
"poetry",
"run",
"run-workflow",
"--workflow-file",
"tests/files/terminated_workflow.py",
"--redis-list-name",
"jobs_termination",
"--redis-connection-string",
"redis://localhost:6379",
"--timeout",
"5",
],
capture_output=True,
encoding="utf-8",
)

assert p.returncode == 0
assert "Waiting for a job for 5 seconds" in p.stderr
assert "Timed out while waiting for job" in p.stderr


async def test_exit_because_sigterm(db, create_job, exec_workflow, job_id, jobs_api):
job = await create_job({})

redis_list_name = f"jobs_{job['workflow']}"

redis = await aioredis.create_redis_pool("redis://localhost:6379")
await redis.rpush(redis_list_name, job_id)
redis.close()
await redis.wait_closed()

p = subprocess.Popen(
[
"poetry",
"run",
"run-workflow",
"--workflow-file",
"tests/files/terminated_workflow.py",
"--jobs-api-connection-string",
jobs_api,
"--redis-list-name",
redis_list_name,
"--redis-connection-string",
"redis://localhost:6379",
"--timeout",
"5",
]
)

await asyncio.sleep(10)

p.terminate()
p.wait()

assert p.returncode == 124

document = await db.jobs.find_one()

assert [(update["state"], update["progress"]) for update in document["status"]] == [
("waiting", 0),
("preparing", 3),
("running", 100),
("terminated", 100),
]


async def test_exit_because_cancelled(db, create_job, exec_workflow, job_id, jobs_api):
job = await create_job({})

redis_list_name = f"jobs_{job['workflow']}"

redis = await aioredis.create_redis_pool("redis://localhost:6379")
await redis.rpush(redis_list_name, job_id)

p = subprocess.Popen(
[
"poetry",
"run",
"run-workflow",
"--workflow-file",
"tests/files/terminated_workflow.py",
"--jobs-api-connection-string",
jobs_api,
"--redis-list-name",
redis_list_name,
"--redis-connection-string",
"redis://localhost:6379",
"--timeout",
"5",
]
)

await asyncio.sleep(5)

await redis.publish("channel:cancel", job_id)

redis.close()
await redis.wait_closed()

p.wait(timeout=15)

document = await db.jobs.find_one()

assert [(update["state"], update["progress"]) for update in document["status"]] == [
("waiting", 0),
("preparing", 3),
("running", 100),
("cancelled", 100),
]
25 changes: 0 additions & 25 deletions tests/integration/test_redis.py

This file was deleted.

32 changes: 19 additions & 13 deletions tests/integration/test_status.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import pytest
from asyncio import sleep

from virtool_workflow import Workflow, hooks


async def test_status_updates_without_error(db, create_job, exec_workflow, job_id):
async def test_status_updates(db, create_job, exec_workflow, job_id):
await create_job(args={})

wf = Workflow()

@wf.step
def first(job):
"""Description of First."""
assert job.status[-1]["state"] == "waiting"
...
assert job.status[-1]["state"] == "preparing"

@wf.step
def second(job):
"""Description of Second."""
...

jobs = db.get_collection("jobs")
steps_finished = 0
Expand Down Expand Up @@ -71,26 +71,32 @@ async def test_status_updates_with_error(db, create_job, exec_workflow, job_id):
def raise_error():
raise error

hook_called = False
error_hook_called = False

@hooks.on_failure(once=True)
@hooks.on_error(once=True)
async def check_error_update_sent():
nonlocal hook_called
nonlocal error_hook_called

# Wait for status to be received at virtool server
await sleep(0.1)

jobs = db.get_collection("jobs")
job = await db.get_collection("jobs").find_one({"_id": job_id})

job = await jobs.find_one({"_id": job_id})
status = job["status"][-1]

assert status["state"] == "error"
assert status["error"]["type"] == "ValueError"

hook_called = True
error_hook_called = True

with pytest.raises(ValueError):
await exec_workflow(wf)
failure_hook_called = False

@hooks.on_failure(once=True)
async def check_failure_hook_called():
nonlocal failure_hook_called
failure_hook_called = True

await exec_workflow(wf)

assert hook_called is True
assert error_hook_called is True
assert failure_hook_called is True
29 changes: 20 additions & 9 deletions virtool_workflow/_executor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
import logging
import signal
from asyncio import CancelledError
from contextlib import asynccontextmanager, contextmanager
from itertools import chain

from fixtures import FixtureScope, fixture
from virtool_workflow import Workflow
from virtool_workflow._steps import WorkflowStep
from virtool_workflow.events import Events
from virtool_workflow.execution import states
from virtool_workflow.hooks import (
on_failure,
Expand All @@ -17,14 +18,15 @@
on_success,
on_workflow_start,
on_terminated,
on_cancelled,
on_error,
)

logger = logging.getLogger(__name__)


async def execute(
workflow: Workflow,
scope: FixtureScope,
workflow: Workflow, scope: FixtureScope, events: Events
) -> FixtureScope:
"""
Execute a workflow.
Expand All @@ -37,10 +39,6 @@ async def execute(
scope["logger"] = logger
scope["workflow"] = workflow

asyncio.get_event_loop().add_signal_handler(
signal.SIGTERM, lambda *_: asyncio.create_task(on_terminated.trigger(scope))
)

await on_workflow_start.trigger(scope)

try:
Expand All @@ -51,10 +49,23 @@ async def execute(
async with run_step_with_hooks(scope, step):
logger.info(f"Running step '{step.display_name}'")
await bound_step()
except CancelledError:
if events.cancelled.is_set():
await asyncio.gather(
on_cancelled.trigger(scope),
on_failure.trigger(scope),
)
else:
if not events.terminated.is_set():
logger.warning("Workflow cancelled for unknown reason")

await asyncio.gather(
on_terminated.trigger(scope),
on_failure.trigger(scope),
)
except Exception as error:
scope["error"] = error
await on_failure.trigger(scope)
raise error
await asyncio.gather(on_error.trigger(scope), on_failure.trigger(scope))
else:
if "results" in scope:
await on_result.trigger(scope)
Expand Down
27 changes: 0 additions & 27 deletions virtool_workflow/_graceful_exit.py

This file was deleted.

Loading

0 comments on commit 061eecd

Please sign in to comment.