From b3864e947fb33207f0bd256cdbe697717b48a554 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Fri, 19 Jul 2024 10:51:49 -0400 Subject: [PATCH 01/11] Add test for Litestream db replica --- test/int/litestream/jobs/job1.yaml | 2 + test/int/litestream/test_replica.py | 94 +++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 test/int/litestream/jobs/job1.yaml create mode 100644 test/int/litestream/test_replica.py diff --git a/test/int/litestream/jobs/job1.yaml b/test/int/litestream/jobs/job1.yaml new file mode 100644 index 00000000..4b0f520d --- /dev/null +++ b/test/int/litestream/jobs/job1.yaml @@ -0,0 +1,2 @@ +program: + type: no-op diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py new file mode 100644 index 00000000..811eaaa8 --- /dev/null +++ b/test/int/litestream/test_replica.py @@ -0,0 +1,94 @@ +from pathlib import Path +from contextlib import closing +import subprocess + +from instance import ApsisService + + +JOB_DIR = Path(__file__).parent / "jobs" + +# ------------------------------------------------------------------------------- + + +def test_replica(): + """ + Tests Litestream replica of the SQLite db. + + Steps: + - start Litestream process; + - start Apsis and populate its db with some data; + - stop Apsis (i.e. simulating a db failure) and Litestream processes; + - restore db from Litestream replica; + - check that Apsis works fine with the restored db and that data initially written to the original db are there. + """ + + with closing(ApsisService(job_dir=JOB_DIR)) as inst: + + inst.create_db() + inst.write_cfg() + + # start Litestream replica + litestream_replica_path = inst.tmp_dir / "litestream_replica.db" + with subprocess.Popen( + [ + "litestream", + "replicate", + inst.db_path, + f"file://{str(litestream_replica_path)}", + ] + ) as litestream_process: + + inst.start_serve() + inst.wait_for_serve() + + # populate apsis db with some data + client = inst.client + run_ids = [] + expected_states = ["success", "failure", "error", "skipped"] + for state in expected_states: + run_id = client.schedule("job1", {})["run_id"] + inst.wait_run(run_id) + if state != "success": + client.mark(run_id, state) + run_ids.append(run_id) + + # stop Apsis and Litestream + inst.stop_serve() + litestream_process.terminate() + + # restore the db from the replica + restored_db_name = "restored.db" + restored_db_path = inst.tmp_dir / restored_db_name + subprocess.run( + [ + "litestream", + "restore", + "-o", + str(restored_db_path), + f"file://{str(litestream_replica_path)}", + ], + check=True, + ) + + # rewrite the config to use restored db + inst.db_path = restored_db_path + inst.write_cfg() + + # restart Apsis + inst.start_serve() + inst.wait_for_serve() + + log = inst.get_log_lines() + assert any(restored_db_name in l for l in log) + + # check runs data is accurate in the restored db + client = inst.client + for id, state in zip(run_ids, expected_states): + assert client.get_run(id)["state"] == state + + # run job again to verify the db is in a good state and Apsis can read/write it + new_run_id = client.schedule("job1", {})["run_id"] + inst.wait_run(new_run_id) + assert client.get_run(new_run_id)["state"] == "success" + + inst.stop_serve() From 62173f426f803067274e791253cdd8949a4e7c96 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Mon, 22 Jul 2024 14:07:31 -0400 Subject: [PATCH 02/11] Add Litestream test where Apsis is killed with SIGKILL --- test/int/litestream/test_replica.py | 83 +++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index 811eaaa8..5ebe9de8 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -1,6 +1,9 @@ from pathlib import Path from contextlib import closing import subprocess +import os +import signal +from time import sleep from instance import ApsisService @@ -92,3 +95,83 @@ def test_replica(): assert client.get_run(new_run_id)["state"] == "success" inst.stop_serve() + + +def test_replica_killing_apsis_and_litestream(): + """ + Similar to `test_replica`, but here: + - Apsis and Litestream processes are terminated with SIGKILL signal; + - Apsis is killed while there is still a run in the running state; + + Steps: + - start Litestream process; + - start Apsis and schedule a run; + - kill Apsis and Litestream using SIGKILL signals; + - stop Litestream process gracefully through SIGTERM; + - restore db from Litestream replica; + - check that the run is still in running state using the restored db. + """ + + with closing(ApsisService(job_dir=JOB_DIR)) as inst: + + inst.create_db() + inst.write_cfg() + + # start Litestream replica + litestream_replica_path = inst.tmp_dir / "litestream_replica.db" + with subprocess.Popen( + [ + "litestream", + "replicate", + inst.db_path, + f"file://{str(litestream_replica_path)}", + ] + ) as litestream_process: + + inst.start_serve() + inst.wait_for_serve() + + client = inst.client + run_id = client.schedule("job1", {})["run_id"] + + # stop Apsis and Litestream + sleep(1) + os.kill(inst.srv_proc.pid, signal.SIGKILL) + sleep(1) + os.kill(litestream_process.pid, signal.SIGKILL) + + # restore the db from the replica + restored_db_name = "restored.db" + restored_db_path = inst.tmp_dir / restored_db_name + subprocess.run( + [ + "litestream", + "restore", + "-o", + str(restored_db_path), + f"file://{str(litestream_replica_path)}", + ], + check=True, + ) + + # rewrite the config to use restored db + inst.db_path = restored_db_path + inst.write_cfg() + + # restart Apsis + inst.srv_proc = None + inst.start_serve() + inst.wait_for_serve() + + log = inst.get_log_lines() + assert any(restored_db_name in l for l in log) + + # check the run is still in the running state after reconnecting to the new Apsis instance + client = inst.client + assert client.get_run(run_id)["state"] == "running" + + # check run eventually completes successfully + inst.wait_run(run_id) + assert client.get_run(run_id)["state"] == "success" + + inst.stop_serve() From 6d28f8a5a309aae5eef010f0257bdd0296711232 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Mon, 22 Jul 2024 14:13:57 -0400 Subject: [PATCH 03/11] Make litestream operations reusable --- test/int/litestream/test_replica.py | 68 ++++++++++++++--------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index 5ebe9de8..90038050 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -13,6 +13,31 @@ # ------------------------------------------------------------------------------- +def start_litestream(db_path, replica_path): + + return subprocess.Popen( + [ + "litestream", + "replicate", + db_path, + f"file://{str(replica_path)}", + ] + ) + + +def restore_litestream_db(restored_db_path, litestream_replica_path): + subprocess.run( + [ + "litestream", + "restore", + "-o", + str(restored_db_path), + f"file://{str(litestream_replica_path)}", + ], + check=True, + ) + + def test_replica(): """ Tests Litestream replica of the SQLite db. @@ -32,13 +57,9 @@ def test_replica(): # start Litestream replica litestream_replica_path = inst.tmp_dir / "litestream_replica.db" - with subprocess.Popen( - [ - "litestream", - "replicate", - inst.db_path, - f"file://{str(litestream_replica_path)}", - ] + + with start_litestream( + inst.db_path, litestream_replica_path ) as litestream_process: inst.start_serve() @@ -62,16 +83,8 @@ def test_replica(): # restore the db from the replica restored_db_name = "restored.db" restored_db_path = inst.tmp_dir / restored_db_name - subprocess.run( - [ - "litestream", - "restore", - "-o", - str(restored_db_path), - f"file://{str(litestream_replica_path)}", - ], - check=True, - ) + + restore_litestream_db(restored_db_path, litestream_replica_path) # rewrite the config to use restored db inst.db_path = restored_db_path @@ -119,13 +132,9 @@ def test_replica_killing_apsis_and_litestream(): # start Litestream replica litestream_replica_path = inst.tmp_dir / "litestream_replica.db" - with subprocess.Popen( - [ - "litestream", - "replicate", - inst.db_path, - f"file://{str(litestream_replica_path)}", - ] + + with start_litestream( + inst.db_path, litestream_replica_path ) as litestream_process: inst.start_serve() @@ -143,16 +152,7 @@ def test_replica_killing_apsis_and_litestream(): # restore the db from the replica restored_db_name = "restored.db" restored_db_path = inst.tmp_dir / restored_db_name - subprocess.run( - [ - "litestream", - "restore", - "-o", - str(restored_db_path), - f"file://{str(litestream_replica_path)}", - ], - check=True, - ) + restore_litestream_db(restored_db_path, litestream_replica_path) # rewrite the config to use restored db inst.db_path = restored_db_path From 57eb54a15029575c5ff4db1d5dfa4f4bb6f0c301 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Mon, 22 Jul 2024 14:18:39 -0400 Subject: [PATCH 04/11] Modify `job1` --- test/int/litestream/jobs/job1.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/int/litestream/jobs/job1.yaml b/test/int/litestream/jobs/job1.yaml index 4b0f520d..9b3c8c8d 100644 --- a/test/int/litestream/jobs/job1.yaml +++ b/test/int/litestream/jobs/job1.yaml @@ -1,2 +1,3 @@ program: - type: no-op + type: shell + command: /usr/bin/sleep 10 From ca2d3843e33034a86dfb2171041539f321a56ddf Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Mon, 22 Jul 2024 14:23:46 -0400 Subject: [PATCH 05/11] (Fix comment) --- test/int/litestream/test_replica.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index 90038050..903f94ea 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -120,7 +120,6 @@ def test_replica_killing_apsis_and_litestream(): - start Litestream process; - start Apsis and schedule a run; - kill Apsis and Litestream using SIGKILL signals; - - stop Litestream process gracefully through SIGTERM; - restore db from Litestream replica; - check that the run is still in running state using the restored db. """ From d2bc2aa0ff6a9dc5c6bd93e36b25932a29aab4d5 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Tue, 23 Jul 2024 08:38:48 -0400 Subject: [PATCH 06/11] Remove duplicate client lines --- test/int/litestream/test_replica.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index 903f94ea..e6f3d0ac 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -98,7 +98,6 @@ def test_replica(): assert any(restored_db_name in l for l in log) # check runs data is accurate in the restored db - client = inst.client for id, state in zip(run_ids, expected_states): assert client.get_run(id)["state"] == state @@ -166,7 +165,6 @@ def test_replica_killing_apsis_and_litestream(): assert any(restored_db_name in l for l in log) # check the run is still in the running state after reconnecting to the new Apsis instance - client = inst.client assert client.get_run(run_id)["state"] == "running" # check run eventually completes successfully From 65ff109f31a909f21d0309c0b36d97e8bf687517 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Tue, 23 Jul 2024 08:39:12 -0400 Subject: [PATCH 07/11] Assert runs succeded --- test/int/litestream/test_replica.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index e6f3d0ac..cf6dc6bc 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -72,6 +72,7 @@ def test_replica(): for state in expected_states: run_id = client.schedule("job1", {})["run_id"] inst.wait_run(run_id) + assert client.get_run(run_id)["state"] == "success" if state != "success": client.mark(run_id, state) run_ids.append(run_id) From 8c98efc8966089763990b081a85b14c1ec13742f Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Tue, 23 Jul 2024 08:39:44 -0400 Subject: [PATCH 08/11] Skip litestream tests if litestream is not available --- test/int/litestream/test_replica.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index cf6dc6bc..afa58ac6 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -4,6 +4,7 @@ import os import signal from time import sleep +import pytest from instance import ApsisService @@ -13,6 +14,19 @@ # ------------------------------------------------------------------------------- +def is_litestream_available(): + try: + subprocess.run( + ["litestream", "version"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except (subprocess.CalledProcessError, FileNotFoundError): + return False + return True + + def start_litestream(db_path, replica_path): return subprocess.Popen( @@ -38,6 +52,7 @@ def restore_litestream_db(restored_db_path, litestream_replica_path): ) +@pytest.mark.skipif(not is_litestream_available(), reason="Litestream is not available") def test_replica(): """ Tests Litestream replica of the SQLite db. @@ -110,6 +125,7 @@ def test_replica(): inst.stop_serve() +@pytest.mark.skipif(not is_litestream_available(), reason="Litestream is not available") def test_replica_killing_apsis_and_litestream(): """ Similar to `test_replica`, but here: From a5d81734ab144e9d281bc7560d3077c8cbb8be64 Mon Sep 17 00:00:00 2001 From: Ludovico Righi Date: Tue, 23 Jul 2024 10:09:14 -0400 Subject: [PATCH 09/11] Use `shutil.which` to check if `litestream` is available --- test/int/litestream/test_replica.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index afa58ac6..0515c1d2 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -15,16 +15,7 @@ def is_litestream_available(): - try: - subprocess.run( - ["litestream", "version"], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - except (subprocess.CalledProcessError, FileNotFoundError): - return False - return True + return True if shutil.which("litestream") else False def start_litestream(db_path, replica_path): From 1881678045f4952bf58c8ed89abec40746963c1b Mon Sep 17 00:00:00 2001 From: Alex Samuel Date: Tue, 23 Jul 2024 10:53:39 -0400 Subject: [PATCH 10/11] Update test/int/litestream/test_replica.py --- test/int/litestream/test_replica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index 0515c1d2..e3bab5cd 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -15,7 +15,7 @@ def is_litestream_available(): - return True if shutil.which("litestream") else False + return shutil.which("litestream") is not None def start_litestream(db_path, replica_path): From 4f3ec6da1c1030fa49729514bc56ff04eace881b Mon Sep 17 00:00:00 2001 From: Alex Samuel Date: Tue, 23 Jul 2024 10:56:52 -0400 Subject: [PATCH 11/11] Update test/int/litestream/test_replica.py --- test/int/litestream/test_replica.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/int/litestream/test_replica.py b/test/int/litestream/test_replica.py index e3bab5cd..e905f1a3 100644 --- a/test/int/litestream/test_replica.py +++ b/test/int/litestream/test_replica.py @@ -1,10 +1,11 @@ -from pathlib import Path -from contextlib import closing -import subprocess +from contextlib import closing import os -import signal -from time import sleep +from pathlib import Path import pytest +import shutil +import signal +import subprocess +from time import sleep from instance import ApsisService