Skip to content

Commit

Permalink
Int test for LegacyRunningProgram.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexhsamuel committed Jan 18, 2025
1 parent 8cdd0db commit f98264d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
56 changes: 56 additions & 0 deletions python/apsis/program/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio

from .base import Program, ProgramRunning, ProgramSuccess
from apsis.lib.json import check_schema
from apsis.runs import template_expand

#-------------------------------------------------------------------------------

class SimpleLegacyProgram(Program):
"""
Test program that uses the *deprecated* legacy program API, namely the
`start()` and `reconnect()` methods.
"""

def __init__(self, time):
self.time = time


def to_jso(self):
return super().to_jso() | {"time": self.time}


@classmethod
def from_jso(cls, jso):
with check_schema(jso) as pop:
time = pop("time")
return cls(time)


def bind(self, args):
return type(self)(template_expand(self.time, args))


async def __finish(self):
meta = {"color": "green"}
await asyncio.sleep(float(self.time))
return ProgramSuccess(meta=meta)


async def start(self, run_id, cfg):
run_state = {
"run_id": run_id,
"secret": 42,
}
meta = {"color": "yellow"}
running = ProgramRunning(run_state, meta=meta)
return running, self.__finish()


def reconnect(self, run_id, run_state):
assert run_state["run_id"] == run_id
assert run_state["secret"] == 42
return self.__finish()



26 changes: 26 additions & 0 deletions test/int/basic/test_legacy_program.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from instance import ApsisService

#-------------------------------------------------------------------------------

def test_reconnect_legacy_program():
job = {
"program": {
"type": "apsis.program.test.SimpleLegacyProgram",
"time": 2,
}
}

with ApsisService() as svc:
run_id = svc.client.schedule_adhoc("now", job)["run_id"]
res = svc.wait_run(run_id, wait_states=("new", "scheduled", "waiting", "starting"))
assert res["state"] == "running"

svc.restart()

res = svc.client.get_run(run_id)
assert res["state"] == "running"

res = svc.wait_run(run_id)
assert res["state"] == "success"


0 comments on commit f98264d

Please sign in to comment.