diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py
index 7e4bc6ce989b94..c6fa8c0f54f1c1 100644
--- a/src/sentry/runner/commands/cleanup.py
+++ b/src/sentry/runner/commands/cleanup.py
@@ -37,9 +37,11 @@ def get_project(value: str) -> int | None:
 # an identity on an object() isn't guaranteed to work between parent
 # and child proc
 _STOP_WORKER: Final = "91650ec271ae4b3e8a67cdc909d80f8c"
-_WorkQueue: TypeAlias = (
+_EntityWorkQueue: TypeAlias = (
     "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[str, tuple[int, ...]]]"
 )
+_FileWorkQueue: TypeAlias = "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[int]]"
+


 API_TOKEN_TTL_IN_DAYS = 30
@@ -50,7 +52,7 @@ def debug_output(msg: str) -> None:
     click.echo(msg)


-def multiprocess_worker(task_queue: _WorkQueue) -> None:
+def multiprocess_delete_entity_worker(task_queue: _EntityWorkQueue) -> None:
     # Configure within each Process
     import logging

@@ -101,6 +103,46 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
             task_queue.task_done()


+def multiprocess_del_file_worker(task_queue: _FileWorkQueue) -> None:
+    # Configure within each Process
+    import logging
+
+    logger = logging.getLogger("sentry.cleanup")
+
+    from sentry.runner import configure
+
+    configure()
+
+    from sentry.models.files.file import File
+    from sentry.models.files.fileblob import FileBlob
+    from sentry.models.files.fileblobindex import FileBlobIndex
+
+    while True:
+        j = task_queue.get()
+        try:
+            if j == _STOP_WORKER:
+                return
+
+            (blob_id,) = j
+
+            blob = FileBlob.objects.get(pk=blob_id)
+
+            # These checks were moved out of the query iterator and into the
+            # multiprocess task queue because they were bottlenecking the iteration.
+            if (
+                FileBlobIndex.objects.filter(blob=blob).exists()
+                or File.objects.filter(blob=blob).exists()
+            ):
+                # still used
+                continue
+
+            blob.delete()
+        except Exception as e:
+            logger.exception(e)
+        finally:
+            task_queue.task_done()
+
+
 @click.command()
 @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.")
 @click.option("--project", help="Limit truncation to only entries from project.")
@@ -153,9 +195,21 @@ def cleanup(
     # before we import or configure the app

     pool = []
-    task_queue: _WorkQueue = Queue(1000)
+    queues: list[_EntityWorkQueue | _FileWorkQueue] = []
+    del_entity_task_queue: _EntityWorkQueue = Queue(1000)
+    queues.append(del_entity_task_queue)
+
+    for _ in range(concurrency):
+        p = Process(target=multiprocess_delete_entity_worker, args=(del_entity_task_queue,))
+        p.daemon = True
+        p.start()
+        pool.append(p)
+
+    del_file_task_queue: _FileWorkQueue = Queue(1000)
+    queues.append(del_file_task_queue)
+
     for _ in range(concurrency):
-        p = Process(target=multiprocess_worker, args=(task_queue,))
+        p = Process(target=multiprocess_del_file_worker, args=(del_file_task_queue,))
         p.daemon = True
         p.start()
         pool.append(p)
@@ -236,9 +290,9 @@ def is_filtered(model: type[Model]) -> bool:
             )

             for chunk in q.iterator(chunk_size=100):
-                task_queue.put((imp, chunk))
+                del_entity_task_queue.put((imp, chunk))

-            task_queue.join()
+            del_entity_task_queue.join()

         project_deletion_query, to_delete_by_project = prepare_deletes_by_project(
             project, project_id, is_filtered
@@ -266,21 +320,30 @@ def is_filtered(model: type[Model]) -> bool:
                 )

                 for chunk in q.iterator(chunk_size=100):
-                    task_queue.put((imp, chunk))
+                    del_entity_task_queue.put((imp, chunk))

-                task_queue.join()
+                del_entity_task_queue.join()

-        remove_file_blobs(is_filtered, silent)
+        remove_file_blobs(del_file_task_queue, is_filtered, silent)

     finally:
+        if not silent:
+            click.echo("Beginning shutdown")
         # Shut down our pool
-        for _ in pool:
-            task_queue.put(_STOP_WORKER)
+        for _q in queues:
+            for _ in range(concurrency):
+                _q.put(_STOP_WORKER)
+
+        if not silent:
+            click.echo("Waiting for pool to shut down")

         # And wait for it to drain
         for p in pool:
             p.join()

+        if not silent:
+            click.echo("Process pool has stopped")
+
     if timed and start_time:
         duration = int(time.time() - start_time)
         metrics.timing("cleanup.duration", duration, instance=router, sample_rate=1.0)
@@ -486,7 +549,9 @@ def prepare_deletes_by_project(
     return project_deletion_query, to_delete_by_project


-def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool) -> None:
+def remove_file_blobs(
+    del_file_task_queue: _FileWorkQueue, is_filtered: Callable[[type[Model]], bool], silent: bool
+) -> None:
     from sentry.models.file import FileBlob

     # Clean up FileBlob instances which are no longer used and aren't super
@@ -495,10 +560,10 @@ def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool)
     if is_filtered(FileBlob):
         debug_output(">> Skipping FileBlob")
     else:
-        cleanup_unused_files(silent)
+        cleanup_unused_files(del_file_task_queue, silent)


-def cleanup_unused_files(quiet: bool = False) -> None:
+def cleanup_unused_files(del_file_task_queue: _FileWorkQueue, quiet: bool = False) -> None:
     """
     Remove FileBlob's (and thus the actual files) if they are no longer
     referenced by any File.
@@ -507,9 +572,7 @@
     any blobs which are brand new and potentially in the process of being
     referenced.
     """
-    from sentry.models.files.file import File
     from sentry.models.files.fileblob import FileBlob
-    from sentry.models.files.fileblobindex import FileBlobIndex

     if quiet:
         from sentry.utils.query import RangeQuerySetWrapper
@@ -520,8 +583,6 @@
     queryset = FileBlob.objects.filter(timestamp__lte=cutoff)

     for blob in RangeQuerySetWrapper(queryset):
-        if FileBlobIndex.objects.filter(blob=blob).exists():
-            continue
-        if File.objects.filter(blob=blob).exists():
-            continue
-        blob.delete()
+        del_file_task_queue.put((blob.id,))
+
+    del_file_task_queue.join()
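
Reviewer note, not part of the diff: the shutdown path above depends on each worker consuming exactly one _STOP_WORKER sentinel, so every queue must receive `concurrency` sentinels (one per worker attached to it), and the producer must `join()` the queues before enqueueing sentinels. A minimal, self-contained sketch of that protocol follows; the queue payloads are illustrative stand-ins, not values taken from the PR.

# shutdown_sketch.py -- standalone illustration, stdlib only.
from multiprocessing import JoinableQueue, Process

_STOP_WORKER = "91650ec271ae4b3e8a67cdc909d80f8c"


def worker(task_queue) -> None:
    while True:
        job = task_queue.get()
        try:
            if job == _STOP_WORKER:
                return  # each sentinel stops exactly one worker
            print(f"processing {job!r}")  # stand-in for the real delete work
        finally:
            task_queue.task_done()


if __name__ == "__main__":
    concurrency = 4
    queues = [JoinableQueue(1000), JoinableQueue(1000)]
    pool = []
    for q in queues:
        for _ in range(concurrency):
            p = Process(target=worker, args=(q,))
            p.daemon = True
            p.start()
            pool.append(p)

    queues[0].put(("some.model.Path", (1, 2, 3)))  # entity-style payload
    queues[1].put((42,))  # file-style payload: a one-tuple holding a blob id

    for q in queues:
        q.join()  # drain in-flight work before asking workers to stop

    for q in queues:
        for _ in range(concurrency):
            q.put(_STOP_WORKER)  # concurrency sentinels per queue
    for p in pool:
        p.join()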
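One hedged observation on multiprocess_del_file_worker: FileBlob.objects.get(pk=blob_id) raises FileBlob.DoesNotExist when the row disappears between put() and get(), for example when an overlapping cleanup run has already deleted it, and the broad except then logs that as an error even though the desired end state (blob gone) was reached. A possible follow-up, sketched with the same models the diff imports; delete_blob_if_unused is a hypothetical helper name, not in the PR.

# Hedged sketch; assumes a configured Django/Sentry environment, as the worker does.
def delete_blob_if_unused(blob_id: int) -> None:
    from sentry.models.files.file import File
    from sentry.models.files.fileblob import FileBlob
    from sentry.models.files.fileblobindex import FileBlobIndex

    # first() returns None instead of raising when the row is already gone.
    blob = FileBlob.objects.filter(pk=blob_id).first()
    if blob is None:
        return  # deleted concurrently; nothing left to do
    if FileBlobIndex.objects.filter(blob=blob).exists():
        return  # still referenced through an index
    if File.objects.filter(blob=blob).exists():
        return  # still referenced directly
    blob.delete()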