Skip to content

Commit

Permalink
Merge pull request #301 from alexhsamuel/fix/schedule-action-tests
Browse files Browse the repository at this point in the history
Fix/schedule action tests
  • Loading branch information
alexhsamuel authored Aug 9, 2023
2 parents b442dbe + 37048a2 commit 746a890
Show file tree
Hide file tree
Showing 17 changed files with 126 additions and 23 deletions.
4 changes: 2 additions & 2 deletions jobs/action/double.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ program:

action:
- type: apsis.actions.test.SleepThreadAction
condition:
if:
states: [success, failure, skipped]
duration: 5

- type: apsis.actions.test.SleepThreadAction
condition:
if:
states: [success, failure, skipped]
duration: 5

2 changes: 1 addition & 1 deletion jobs/action/slow thread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ program:

action:
type: apsis.actions.test.SleepThreadAction
condition:
if:
states: [success, failure, skipped]
duration: 5

4 changes: 2 additions & 2 deletions jobs/test/action/schedule.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ program:
action:
type: schedule
job_id: test/action/after
condition:
if:
states: success

action:
type: schedule
job_id: test/action/fail
condition:
if:
states: [failure, error]

4 changes: 2 additions & 2 deletions python/apsis/actions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ def condition(self):
@classmethod
def from_jso(cls, jso):
with check_schema(jso) as pop:
condition = pop("condition", Condition.from_jso, None)
condition = pop("if", Condition.from_jso, None)
return cls(condition=condition)


def to_jso(self):
jso = super().to_jso()
if self.__condition is not None:
jso["condition"] = self.__condition.to_jso()
jso["if"] = self.__condition.to_jso()
return jso


Expand Down
8 changes: 5 additions & 3 deletions python/apsis/actions/condition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from apsis.lib.json import check_schema
from apsis.lib.py import format_ctor
from apsis.states import ALL_STATES, states_from_jso, states_to_jso
from apsis.lib import py
from apsis.states import ALL_STATES, FINISHED, states_from_jso, states_to_jso

#-------------------------------------------------------------------------------

Expand All @@ -11,7 +11,7 @@ def __init__(self, *, states=None):


def __repr__(self):
return format_ctor(self, states=self.states)
return py.format_ctor(self, states=self.states)


def __call__(self, run):
Expand All @@ -34,3 +34,5 @@ def to_jso(self):



Condition.DEFAULT = Condition(states=FINISHED)

6 changes: 3 additions & 3 deletions python/apsis/actions/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class ScheduleAction(BaseAction):
"""

def __init__(self, instance, *, condition=None):
super().__init__(condition=condition)
self.job_id = instance.job_id
self.args = instance.args
self.condition = condition


async def __call__(self, apsis, run):
Expand Down Expand Up @@ -49,16 +49,16 @@ def to_jso(self):
**super().to_jso(),
"job_id" : self.job_id,
"args" : self.args,
"condition" : self.condition.to_jso()
"if" : self.condition.to_jso()
}


@classmethod
def from_jso(cls, jso):
with check_schema(jso) as pop:
condition = pop("if", Condition.from_jso, None)
job_id = pop("job_id")
args = pop("args", default={})
condition = Condition.from_jso(pop("if", default=None))
return cls(Instance(job_id, args), condition=condition)


Expand Down
6 changes: 3 additions & 3 deletions python/apsis/actions/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __repr__(self):
@classmethod
def from_jso(cls, jso):
with check_schema(jso) as pop:
condition = pop("condition", Condition.from_jso, None)
condition = pop("if", Condition.from_jso, None)
duration = pop("duration", float)
return cls(duration, condition=condition)

Expand Down Expand Up @@ -65,7 +65,7 @@ def run(self, run):
@classmethod
def from_jso(cls, jso):
with check_schema(jso) as pop:
condition = pop("condition", Condition.from_jso, None)
condition = pop("if", Condition.from_jso, None)
return cls(condition=condition)


Expand All @@ -85,7 +85,7 @@ async def __call__(self, apsis, run):
@classmethod
def from_jso(cls, jso):
with check_schema(jso) as pop:
condition = pop("condition", Condition.from_jso, None)
condition = pop("if", Condition.from_jso, None)
return cls(condition=condition)


Expand Down
8 changes: 4 additions & 4 deletions python/apsis/apsis.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,25 +409,25 @@ def __start_actions(self, run):
# Nothing to do.
return

async def wrap(run, action):
async def wrap(run, snapshot, action):
"""
Wrapper to handle exceptions from an action.
Since these are run as background tasks, we don't transition the
run to error if an action fails.
"""
try:
await action(self, run)
await action(self, snapshot)
except Exception:
self.run_log.exc(run, "action")

# The actions run in tasks and the run may transition again soon, so
# hand the actions a snapshot instead.
run = snapshot_run(self, run)
snapshot = snapshot_run(self, run)

loop = asyncio.get_event_loop()
for action in actions:
task = loop.create_task(wrap(run, action))
task = loop.create_task(wrap(run, snapshot, action))
self.__action_tasks.add(task)
task.add_done_callback(
lambda _, task=task: self.__action_tasks.remove(task))
Expand Down
6 changes: 5 additions & 1 deletion python/apsis/run_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def snapshot_run(apsis, run):
for oi in apsis.outputs.get_metadata(run.run_id)
}

return RunSnapshot(
snapshot = RunSnapshot(
run_id =run.run_id,
inst =run.inst,
state =run.state,
Expand All @@ -52,5 +52,9 @@ def snapshot_run(apsis, run):
meta =run.meta.copy(),
outputs =outputs,
)
# FIXME: expected isn't part of the API, but we need it for now so that the
# run log can log messages from the snapshot.
snapshot.expected = run.expected
return snapshot


2 changes: 1 addition & 1 deletion test/int/action/jobs/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ program:
action:
type: apsis.actions.test.SleepThreadAction
duration: 2
condition:
if:
states: success

2 changes: 1 addition & 1 deletion test/int/action/jobs/with snapshot.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ program:

action:
type: apsis.actions.test.LogAction
condition:
if:
states: success

5 changes: 5 additions & 0 deletions test/int/action/schedule-jobs/fifth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
params: [label]

program:
type: no-op

26 changes: 26 additions & 0 deletions test/int/action/schedule-jobs/first.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
params: [label]

program:
type: no-op
duration: 0.5

action:
- type: schedule
if:
states: success
job_id: second
args:
label: on-success

- type: schedule
if:
states: [failure, error]
job_id: third
args:
label: on-failure

- type: schedule
if:
states: [success, error]
job_id: fifth

3 changes: 3 additions & 0 deletions test/int/action/schedule-jobs/fourth.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
program:
type: no-op

11 changes: 11 additions & 0 deletions test/int/action/schedule-jobs/second.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
params: [label]

program:
type: no-op

action:
type: schedule
if:
states: [success, failure, error]
job_id: fourth

5 changes: 5 additions & 0 deletions test/int/action/schedule-jobs/third.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
params: [label]

program:
type: no-op

47 changes: 47 additions & 0 deletions test/int/action/test_schedule_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from contextlib import closing
from pathlib import Path
import pytest

from instance import ApsisInstance

#-------------------------------------------------------------------------------

job_dir = Path(__file__).absolute().parent / "schedule-jobs"

@pytest.fixture(scope="function")
def inst():
with closing(ApsisInstance(job_dir=job_dir)) as inst:
inst.create_db()
inst.write_cfg()
inst.start_serve()
inst.wait_for_serve()
yield inst


def test_schedule_action(inst):
client = inst.client

r1 = client.schedule("first", {"label": "foo"})["run_id"]
# There should be only one one right now.
assert { r["run_id"] for r in client.get_runs().values() } == {r1}

inst.wait_run(r1)

# first(label=foo) should have scheduled second(label=on-success).
run, = tuple(client.get_runs(job_id="second").values())
assert run["args"] == {"label": "on-success"}

# first(label=foo) should not have scheduled third.
runs = tuple(client.get_runs(job_id="third").values())
assert len(runs) == 0

# second(label=on-success) should have scheduled fourth().
runs = tuple(client.get_runs(job_id="fourth").values())
assert len(runs) == 1

# first(label=foo) should have scheduled fifth(label=foo) with automatic
# label.
run, = tuple(client.get_runs(job_id="fifth").values())
assert run["args"] == {"label": "foo"}


0 comments on commit 746a890

Please sign in to comment.