ref(cleanup): use concurrency for fileblobs (#85800)
reopening #68556 

- [x] adds a new worker queue for deleting files, which uses the `concurrency` setting
- [x] waits for both queues at the end of the job

Tested that the job runs successfully locally. We do not have tests for this
functionality, so we will have to be careful and watch the relevant dashboards
when merging. A minimal sketch of the two-queue pattern is included below.
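
A minimal sketch of the two-queue worker pattern this change introduces, assuming `multiprocessing.JoinableQueue` and daemon worker processes. The queue sizes, payloads, and the `worker` function are illustrative placeholders, not the actual cleanup code.

```python
from multiprocessing import JoinableQueue, Process

STOP = "stop-worker"  # sentinel value, stands in for _STOP_WORKER


def worker(task_queue):
    # Each worker loops until it receives the stop sentinel.
    while True:
        item = task_queue.get()
        try:
            if item == STOP:
                return
            # A real worker would delete an entity chunk or a file blob here.
        finally:
            task_queue.task_done()


if __name__ == "__main__":
    concurrency = 4
    entity_queue = JoinableQueue(1000)
    file_queue = JoinableQueue(1000)
    queues = [entity_queue, file_queue]

    # One pool of `concurrency` worker processes per queue.
    pool = []
    for q in queues:
        for _ in range(concurrency):
            p = Process(target=worker, args=(q,), daemon=True)
            p.start()
            pool.append(p)

    # Producer side: enqueue work, then wait for each queue to drain.
    for i in range(10):
        entity_queue.put(("model_name", (i,)))
    entity_queue.join()

    for i in range(10):
        file_queue.put((i,))
    file_queue.join()

    # Shutdown: one sentinel per worker on each queue, then join the processes.
    for q in queues:
        for _ in range(concurrency):
            q.put(STOP)
    for p in pool:
        p.join()
```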
JoshFerge authored Feb 26, 2025
1 parent 12403cc commit aff5c96
Showing 1 changed file with 82 additions and 21 deletions.
src/sentry/runner/commands/cleanup.py (82 additions, 21 deletions)
@@ -37,9 +37,11 @@ def get_project(value: str) -> int | None:
# an identity on an object() isn't guaranteed to work between parent
# and child proc
_STOP_WORKER: Final = "91650ec271ae4b3e8a67cdc909d80f8c"
_WorkQueue: TypeAlias = (
_EntityWorkQueue: TypeAlias = (
"Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[str, tuple[int, ...]]]"
)
_FileWorkQueue: TypeAlias = "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[int]]"


API_TOKEN_TTL_IN_DAYS = 30

@@ -50,7 +52,7 @@ def debug_output(msg: str) -> None:
click.echo(msg)


def multiprocess_worker(task_queue: _WorkQueue) -> None:
def multiprocess_delete_entity_worker(task_queue: _EntityWorkQueue) -> None:
# Configure within each Process
import logging

@@ -101,6 +103,46 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
task_queue.task_done()


def multiprocess_del_file_worker(task_queue: _FileWorkQueue) -> None:
# Configure within each Process
import logging

logger = logging.getLogger("sentry.cleanup")

from sentry.runner import configure

configure()

from sentry.models.files.file import File
from sentry.models.files.fileblob import FileBlob
from sentry.models.files.fileblobindex import FileBlobIndex

while True:
j = task_queue.get()
try:
if j == _STOP_WORKER:
return

(blob_id,) = j

blob = FileBlob.objects.get(pk=blob_id)

# These checks were moved out of the query iterator and into the multiprocess task queue because they
# were bottlenecking the iteration.
if (
FileBlobIndex.objects.filter(blob=blob).exists()
or File.objects.filter(blob=blob).exists()
):
# still used
continue

blob.delete()
except Exception as e:
logger.exception(e)
finally:
task_queue.task_done()


@click.command()
@click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.")
@click.option("--project", help="Limit truncation to only entries from project.")
@@ -153,9 +195,21 @@ def cleanup(
# before we import or configure the app

pool = []
task_queue: _WorkQueue = Queue(1000)
queues: list[_EntityWorkQueue | _FileWorkQueue] = []
del_entity_task_queue: _EntityWorkQueue = Queue(1000)
queues.append(del_entity_task_queue)

for _ in range(concurrency):
p = Process(target=multiprocess_delete_entity_worker, args=(del_entity_task_queue,))
p.daemon = True
p.start()
pool.append(p)

del_file_task_queue: _FileWorkQueue = Queue(1000)
queues.append(del_file_task_queue)

for _ in range(concurrency):
p = Process(target=multiprocess_worker, args=(task_queue,))
p = Process(target=multiprocess_del_file_worker, args=(del_file_task_queue,))
p.daemon = True
p.start()
pool.append(p)
@@ -236,9 +290,9 @@ def is_filtered(model: type[Model]) -> bool:
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))
del_entity_task_queue.put((imp, chunk))

task_queue.join()
del_entity_task_queue.join()

project_deletion_query, to_delete_by_project = prepare_deletes_by_project(
project, project_id, is_filtered
@@ -266,21 +320,30 @@ def is_filtered(model: type[Model]) -> bool:
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))
del_entity_task_queue.put((imp, chunk))

task_queue.join()
del_entity_task_queue.join()

remove_file_blobs(is_filtered, silent)
remove_file_blobs(del_file_task_queue, is_filtered, silent)

finally:
if not silent:
click.echo("Beginning shutdown")
# Shut down our pool
for _ in pool:
task_queue.put(_STOP_WORKER)
for _q in queues:
for _ in range(concurrency):
_q.put(_STOP_WORKER)

if not silent:
click.echo("Waiting for pool to shutdown")

# And wait for it to drain
for p in pool:
p.join()

if not silent:
click.echo("Process pool has stopped")

if timed and start_time:
duration = int(time.time() - start_time)
metrics.timing("cleanup.duration", duration, instance=router, sample_rate=1.0)
@@ -486,7 +549,9 @@ def prepare_deletes_by_project(
return project_deletion_query, to_delete_by_project


def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool) -> None:
def remove_file_blobs(
del_file_task_queue: _FileWorkQueue, is_filtered: Callable[[type[Model]], bool], silent: bool
) -> None:
from sentry.models.file import FileBlob

# Clean up FileBlob instances which are no longer used and aren't super
@@ -495,10 +560,10 @@ def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool)
if is_filtered(FileBlob):
debug_output(">> Skipping FileBlob")
else:
cleanup_unused_files(silent)
cleanup_unused_files(del_file_task_queue, silent)


def cleanup_unused_files(quiet: bool = False) -> None:
def cleanup_unused_files(del_file_task_queue: _FileWorkQueue, quiet: bool = False) -> None:
"""
Remove FileBlob's (and thus the actual files) if they are no longer
referenced by any File.
@@ -507,9 +572,7 @@ def cleanup_unused_files(quiet: bool = False) -> None:
any blobs which are brand new and potentially in the process of being
referenced.
"""
from sentry.models.files.file import File
from sentry.models.files.fileblob import FileBlob
from sentry.models.files.fileblobindex import FileBlobIndex

if quiet:
from sentry.utils.query import RangeQuerySetWrapper
@@ -520,8 +583,6 @@
queryset = FileBlob.objects.filter(timestamp__lte=cutoff)

for blob in RangeQuerySetWrapper(queryset):
if FileBlobIndex.objects.filter(blob=blob).exists():
continue
if File.objects.filter(blob=blob).exists():
continue
blob.delete()
del_file_task_queue.put((blob.id,))

del_file_task_queue.join()
