Skip to content

Commit

Permalink
Merge pull request #428 from alexhsamuel/feature/vacuum-program
Browse files Browse the repository at this point in the history
vacuum program
  • Loading branch information
alexhsamuel authored Dec 15, 2024
2 parents 0ad4333 + ba2537f commit 1094a25
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 14 deletions.
15 changes: 12 additions & 3 deletions docs/programs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ This job produces a run once a minute, which appends the stats to a dated file:
Archive
^^^^^^^

A `apsis.program.interal.archive` program moves data pertaining to older runs
out of the Apsis database file, into a separate archive file. Keeping the main
Apsis database file from growing too large can avoid performance degredation.
An `ArchiveProgram` program moves data pertaining to older runs out of the Apsis
database file, into a separate archive file. Keeping the main Apsis database
file from growing too large can avoid performance degradation.

The archive program retires a run from Apsis's memory before archiving it. The
run is no longer visible through any UI. A run that is not completed cannot be
Expand Down Expand Up @@ -235,3 +235,12 @@ columns from the main database file that contains run data. The archive file
cannot be used directly by Apsis, but may be useful for historical analysis and
forensics.


Vacuum
^^^^^^

A `VacuumProgram` run vacuums (defragments and frees unused pages from) the
Apsis database file. The program blocks Apsis while it is running; schedule it
to run only during times the scheduler is otherwise quiet.


28 changes: 17 additions & 11 deletions python/apsis/program/internal/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ..base import _InternalProgram, ProgramRunning, ProgramSuccess
from apsis.lib.json import check_schema, nkey
from apsis.lib.parse import parse_duration
from apsis.lib.timing import Timer
from apsis.runs import template_expand

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -104,9 +105,13 @@ async def wait(self, apsis):

row_counts = {}
meta = {
"run count" : 0,
"run_ids" : [],
"row counts": row_counts
"run count" : 0,
"run_ids" : [],
"row counts" : row_counts,
"time": {
"get runs" : 0,
"archive runs" : 0,
}
}

count = self.__count
Expand All @@ -115,31 +120,32 @@ async def wait(self, apsis):
count if self.__chunk_size is None
else min(count, self.__chunk_size)
)
run_ids = db.get_archive_run_ids(
before =ora.now() - self.__age,
count =chunk,
)
with Timer() as timer:
run_ids = db.get_archive_run_ids(
before =ora.now() - self.__age,
count =chunk,
)
meta["time"]["get runs"] += timer.elapsed
count -= chunk

# Make sure all runs are retired; else skip them.
run_ids = [ r for r in run_ids if apsis.run_store.retire(r) ]

if len(run_ids) > 0:
# Archive these runs.
chunk_row_counts = db.archive(self.__path, run_ids)
with Timer() as timer:
chunk_row_counts = db.archive(self.__path, run_ids)
# Accumulate metadata.
meta["run count"] += len(run_ids)
meta["run_ids"].append(run_ids)
for key, value in chunk_row_counts.items():
row_counts[key] = row_counts.get(key, 0) + value
meta["time"]["archive runs"] += timer.elapsed

if count > 0 and self.__chunk_sleep is not None:
# Yield to the event loop.
await asyncio.sleep(self.__chunk_sleep)

# Also vacuum to free space.
db.vacuum()

return ProgramSuccess(meta=meta)


Expand Down
57 changes: 57 additions & 0 deletions python/apsis/program/internal/vacuum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging

from ..base import _InternalProgram, ProgramRunning, ProgramSuccess
from apsis.lib.json import check_schema
from apsis.lib.timing import Timer

log = logging.getLogger(__name__)

#-------------------------------------------------------------------------------

class VacuumProgram(_InternalProgram):
    """
    Internal program that vacuums the Apsis database.

    The vacuum runs inside the Apsis process and blocks all other Apsis
    activity until it completes.  On success, the run's program metadata
    carries the elapsed time under the "time" key.
    """

    def __str__(self):
        return "vacuum database"


    def bind(self, args):
        # Nothing to expand from `args`; the bound program is this instance.
        return self


    def to_jso(self):
        # This program adds no fields of its own to the base class's JSO.
        return dict(super().to_jso())


    @classmethod
    def from_jso(cls, jso):
        # No fields are popped here; `check_schema` is still applied to `jso`.
        with check_schema(jso):
            pass
        return cls()


    async def start(self, run_id, apsis):
        # Report the run as running immediately, with empty metadata, and hand
        # back the coroutine that performs the actual vacuum.
        running = ProgramRunning({})
        return running, self.wait(apsis)


    async def wait(self, apsis):
        # FIXME: Private attributes.
        database = apsis._Apsis__db

        # The vacuum call is synchronous; time how long it takes.
        with Timer() as vacuum_timer:
            database.vacuum()

        return ProgramSuccess(meta={"time": vacuum_timer.elapsed})



20 changes: 20 additions & 0 deletions test/int/test_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,23 @@ def test_clean_up_jobs(tmp_path):
assert job_ids == {job_id2, archive_job_id}


def test_vacuum():
    """
    Schedules an ad hoc `VacuumProgram` run on a fresh service instance and
    checks that it succeeds and reports a positive elapsed time.
    """
    program = {
        "type": "apsis.program.internal.vacuum.VacuumProgram",
    }

    with closing(ApsisService()) as inst:
        # Bring up a service instance with a new database and config.
        inst.create_db()
        inst.write_cfg()
        inst.start_serve()
        inst.wait_for_serve()

        # Schedule the vacuum immediately and wait for the run to finish.
        scheduled = inst.client.schedule_adhoc("now", {"program": program})
        run = inst.wait_run(scheduled["run_id"])

        assert run["state"] == "success"
        # The program records its elapsed time in the run's program metadata.
        program_meta = run["meta"]["program"]
        assert program_meta["time"] > 0


1 change: 1 addition & 0 deletions test/manual/procstar/archive/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.db
7 changes: 7 additions & 0 deletions test/manual/procstar/jobs/archive.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
params: [label, age]

program:
type: apsis.program.internal.archive.ArchiveProgram
age: "{{ age }}"
path: "/home/alex/dev/apsis/test/manual/procstar/archive/{{ label }}.db"
count: 1000000
3 changes: 3 additions & 0 deletions test/manual/procstar/jobs/vacuum.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
program:
type: apsis.program.internal.vacuum.VacuumProgram

0 comments on commit 1094a25

Please sign in to comment.