From aff5c9671675e25a0a5185b370ba0327e1372470 Mon Sep 17 00:00:00 2001 From: Josh Ferge Date: Wed, 26 Feb 2025 10:57:48 -0500 Subject: [PATCH] ref(cleanup): use concurrency for fileblobs (#85800) reopening https://github.com/getsentry/sentry/pull/68556 - [x] adds new worker queue for deleting files which uses the `concurrency` setting - [x] wait for both queues at end of job tested the job runs successfully locally. we do not have tests on this functionality, so will have to be very careful and watch various dashboards when merging. --- src/sentry/runner/commands/cleanup.py | 103 ++++++++++++++++++++------ 1 file changed, 82 insertions(+), 21 deletions(-) diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index 7e4bc6ce989b94..c6fa8c0f54f1c1 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -37,9 +37,11 @@ def get_project(value: str) -> int | None: # an identity on an object() isn't guaranteed to work between parent # and child proc _STOP_WORKER: Final = "91650ec271ae4b3e8a67cdc909d80f8c" -_WorkQueue: TypeAlias = ( +_EntityWorkQueue: TypeAlias = ( "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[str, tuple[int, ...]]]" ) +_FileWorkQueue: TypeAlias = "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[int]]" + API_TOKEN_TTL_IN_DAYS = 30 @@ -50,7 +52,7 @@ def debug_output(msg: str) -> None: click.echo(msg) -def multiprocess_worker(task_queue: _WorkQueue) -> None: +def multiprocess_delete_entity_worker(task_queue: _EntityWorkQueue) -> None: # Configure within each Process import logging @@ -101,6 +103,46 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: task_queue.task_done() +def multiprocess_del_file_worker(task_queue: _FileWorkQueue) -> None: + # Configure within each Process + import logging + + logger = logging.getLogger("sentry.cleanup") + + from sentry.runner import configure + + configure() + + from sentry.models.files.file import File + from sentry.models.files.fileblob import FileBlob + from sentry.models.files.fileblobindex import FileBlobIndex + + while True: + j = task_queue.get() + try: + if j == _STOP_WORKER: + return + + (blob_id,) = j + + blob = FileBlob.objects.get(pk=blob_id) + + # These checks were moved out of the query iterator and into the multiprocess task queue because they + # were bottlenecking the iteration. + if ( + FileBlobIndex.objects.filter(blob=blob).exists() + or File.objects.filter(blob=blob).exists() + ): + # still used + continue + + blob.delete() + except Exception as e: + logger.exception(e) + finally: + task_queue.task_done() + + @click.command() @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.") @click.option("--project", help="Limit truncation to only entries from project.") @@ -153,9 +195,21 @@ def cleanup( # before we import or configure the app pool = [] - task_queue: _WorkQueue = Queue(1000) + queues: list[_EntityWorkQueue | _FileWorkQueue] = [] + del_entity_task_queue: _EntityWorkQueue = Queue(1000) + queues.append(del_entity_task_queue) + + for _ in range(concurrency): + p = Process(target=multiprocess_delete_entity_worker, args=(del_entity_task_queue,)) + p.daemon = True + p.start() + pool.append(p) + + del_file_task_queue: _FileWorkQueue = Queue(1000) + queues.append(del_file_task_queue) + for _ in range(concurrency): - p = Process(target=multiprocess_worker, args=(task_queue,)) + p = Process(target=multiprocess_del_file_worker, args=(del_file_task_queue,)) p.daemon = True p.start() pool.append(p) @@ -236,9 +290,9 @@ def is_filtered(model: type[Model]) -> bool: ) for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) + del_entity_task_queue.put((imp, chunk)) - task_queue.join() + del_entity_task_queue.join() project_deletion_query, to_delete_by_project = prepare_deletes_by_project( project, project_id, is_filtered @@ -266,21 +320,30 @@ def is_filtered(model: type[Model]) -> bool: ) for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) + del_entity_task_queue.put((imp, chunk)) - task_queue.join() + del_entity_task_queue.join() - remove_file_blobs(is_filtered, silent) + remove_file_blobs(del_file_task_queue, is_filtered, silent) finally: + if not silent: + click.echo("Beginning shutdown") # Shut down our pool - for _ in pool: - task_queue.put(_STOP_WORKER) + for _q in queues: + for _ in range(concurrency): + _q.put(_STOP_WORKER) + + if not silent: + click.echo("Waiting for pool to shutdown") # And wait for it to drain for p in pool: p.join() + if not silent: + click.echo("Process pool has stopped") + if timed and start_time: duration = int(time.time() - start_time) metrics.timing("cleanup.duration", duration, instance=router, sample_rate=1.0) @@ -486,7 +549,9 @@ def prepare_deletes_by_project( return project_deletion_query, to_delete_by_project -def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool) -> None: +def remove_file_blobs( + del_file_task_queue: _FileWorkQueue, is_filtered: Callable[[type[Model]], bool], silent: bool +) -> None: from sentry.models.file import FileBlob # Clean up FileBlob instances which are no longer used and aren't super @@ -495,10 +560,10 @@ def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool) if is_filtered(FileBlob): debug_output(">> Skipping FileBlob") else: - cleanup_unused_files(silent) + cleanup_unused_files(del_file_task_queue, silent) -def cleanup_unused_files(quiet: bool = False) -> None: +def cleanup_unused_files(del_file_task_queue: _FileWorkQueue, quiet: bool = False) -> None: """ Remove FileBlob's (and thus the actual files) if they are no longer referenced by any File. @@ -507,9 +572,7 @@ def cleanup_unused_files(quiet: bool = False) -> None: any blobs which are brand new and potentially in the process of being referenced. """ - from sentry.models.files.file import File from sentry.models.files.fileblob import FileBlob - from sentry.models.files.fileblobindex import FileBlobIndex if quiet: from sentry.utils.query import RangeQuerySetWrapper @@ -520,8 +583,6 @@ def cleanup_unused_files(quiet: bool = False) -> None: queryset = FileBlob.objects.filter(timestamp__lte=cutoff) for blob in RangeQuerySetWrapper(queryset): - if FileBlobIndex.objects.filter(blob=blob).exists(): - continue - if File.objects.filter(blob=blob).exists(): - continue - blob.delete() + del_file_task_queue.put((blob.id,)) + + del_file_task_queue.join()