-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Litestream Apsis db replica test #338
Merged
alexhsamuel
merged 11 commits into
apsis-scheduler:master
from
LudovicoRighi:litestream
Jul 23, 2024
Merged
Changes from 5 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
b3864e9
Add test for Litestream db replica
62173f4
Add Litestream test where Apsis is killed with SIGKILL
6d28f8a
Make litestream operations reusable
57eb54a
Modify `job1`
ca2d384
(Fix comment)
d2bc2aa
Remove duplicate client lines
65ff109
Assert runs succeded
8c98efc
Skip litestream tests if litestream is not available
a5d8173
Use `shutil.which` to check if `litestream` is available
1881678
Update test/int/litestream/test_replica.py
alexhsamuel 4f3ec6d
Update test/int/litestream/test_replica.py
alexhsamuel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
program: | ||
type: shell | ||
command: /usr/bin/sleep 10 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
from pathlib import Path | ||
from contextlib import closing | ||
import subprocess | ||
import os | ||
import signal | ||
from time import sleep | ||
|
||
from instance import ApsisService | ||
|
||
|
||
JOB_DIR = Path(__file__).parent / "jobs" | ||
|
||
# ------------------------------------------------------------------------------- | ||
|
||
|
||
def start_litestream(db_path, replica_path): | ||
|
||
return subprocess.Popen( | ||
[ | ||
"litestream", | ||
"replicate", | ||
db_path, | ||
f"file://{str(replica_path)}", | ||
] | ||
) | ||
|
||
|
||
def restore_litestream_db(restored_db_path, litestream_replica_path): | ||
subprocess.run( | ||
[ | ||
"litestream", | ||
"restore", | ||
"-o", | ||
str(restored_db_path), | ||
f"file://{str(litestream_replica_path)}", | ||
], | ||
check=True, | ||
) | ||
|
||
|
||
def test_replica(): | ||
""" | ||
Tests Litestream replica of the SQLite db. | ||
|
||
Steps: | ||
- start Litestream process; | ||
- start Apsis and populate its db with some data; | ||
- stop Apsis (i.e. simulating a db failure) and Litestream processes; | ||
- restore db from Litestream replica; | ||
- check that Apsis works fine with the restored db and that data initially written to the original db are there. | ||
""" | ||
|
||
with closing(ApsisService(job_dir=JOB_DIR)) as inst: | ||
|
||
inst.create_db() | ||
inst.write_cfg() | ||
|
||
# start Litestream replica | ||
litestream_replica_path = inst.tmp_dir / "litestream_replica.db" | ||
|
||
with start_litestream( | ||
inst.db_path, litestream_replica_path | ||
) as litestream_process: | ||
|
||
inst.start_serve() | ||
inst.wait_for_serve() | ||
|
||
# populate apsis db with some data | ||
client = inst.client | ||
run_ids = [] | ||
expected_states = ["success", "failure", "error", "skipped"] | ||
for state in expected_states: | ||
run_id = client.schedule("job1", {})["run_id"] | ||
inst.wait_run(run_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check that the run succeeded. |
||
if state != "success": | ||
client.mark(run_id, state) | ||
run_ids.append(run_id) | ||
|
||
# stop Apsis and Litestream | ||
inst.stop_serve() | ||
litestream_process.terminate() | ||
|
||
# restore the db from the replica | ||
restored_db_name = "restored.db" | ||
restored_db_path = inst.tmp_dir / restored_db_name | ||
|
||
restore_litestream_db(restored_db_path, litestream_replica_path) | ||
|
||
# rewrite the config to use restored db | ||
inst.db_path = restored_db_path | ||
inst.write_cfg() | ||
|
||
# restart Apsis | ||
inst.start_serve() | ||
inst.wait_for_serve() | ||
|
||
log = inst.get_log_lines() | ||
assert any(restored_db_name in l for l in log) | ||
|
||
# check runs data is accurate in the restored db | ||
client = inst.client | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You already have this above. |
||
for id, state in zip(run_ids, expected_states): | ||
assert client.get_run(id)["state"] == state | ||
|
||
# run job again to verify the db is in a good state and Apsis can read/write it | ||
new_run_id = client.schedule("job1", {})["run_id"] | ||
inst.wait_run(new_run_id) | ||
assert client.get_run(new_run_id)["state"] == "success" | ||
|
||
inst.stop_serve() | ||
|
||
|
||
def test_replica_killing_apsis_and_litestream(): | ||
""" | ||
Similar to `test_replica`, but here: | ||
- Apsis and Litestream processes are terminated with SIGKILL signal; | ||
- Apsis is killed while there is still a run in the running state; | ||
|
||
Steps: | ||
- start Litestream process; | ||
- start Apsis and schedule a run; | ||
- kill Apsis and Litestream using SIGKILL signals; | ||
- restore db from Litestream replica; | ||
- check that the run is still in running state using the restored db. | ||
""" | ||
|
||
with closing(ApsisService(job_dir=JOB_DIR)) as inst: | ||
|
||
inst.create_db() | ||
inst.write_cfg() | ||
|
||
# start Litestream replica | ||
litestream_replica_path = inst.tmp_dir / "litestream_replica.db" | ||
|
||
with start_litestream( | ||
inst.db_path, litestream_replica_path | ||
) as litestream_process: | ||
|
||
inst.start_serve() | ||
inst.wait_for_serve() | ||
|
||
client = inst.client | ||
run_id = client.schedule("job1", {})["run_id"] | ||
|
||
# stop Apsis and Litestream | ||
sleep(1) | ||
os.kill(inst.srv_proc.pid, signal.SIGKILL) | ||
sleep(1) | ||
os.kill(litestream_process.pid, signal.SIGKILL) | ||
|
||
# restore the db from the replica | ||
restored_db_name = "restored.db" | ||
restored_db_path = inst.tmp_dir / restored_db_name | ||
restore_litestream_db(restored_db_path, litestream_replica_path) | ||
|
||
# rewrite the config to use restored db | ||
inst.db_path = restored_db_path | ||
inst.write_cfg() | ||
|
||
# restart Apsis | ||
inst.srv_proc = None | ||
inst.start_serve() | ||
inst.wait_for_serve() | ||
|
||
log = inst.get_log_lines() | ||
assert any(restored_db_name in l for l in log) | ||
|
||
# check the run is still in the running state after reconnecting to the new Apsis instance | ||
client = inst.client | ||
assert client.get_run(run_id)["state"] == "running" | ||
|
||
# check run eventually completes successfully | ||
inst.wait_run(run_id) | ||
assert client.get_run(run_id)["state"] == "success" | ||
|
||
inst.stop_serve() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a
@pytest.mark.skipif
if litestream isn't available.