ref(cleanup): use concurrency for fileblobs #85800

Merged 1 commit on Feb 26, 2025.
src/sentry/runner/commands/cleanup.py — 103 changes: 82 additions & 21 deletions
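The diff below splits the cleanup command's single worker pool into two kinds of workers: the existing entity-deletion workers and a new set of workers that delete unused FileBlob rows. The FileBlobIndex/File existence checks move out of the producer loop and into those workers, so iterating candidate blobs is no longer bottlenecked on per-row queries. As a minimal, self-contained sketch of the queue-and-sentinel pattern being applied (all names here are illustrative stand-ins, not Sentry's; the real workers call sentry.runner.configure() and go through the Django models shown in the diff):

from multiprocessing import JoinableQueue, Process

_STOP = "stop-sentinel"  # placeholder; the real code uses a fixed hex string


def del_file_worker(task_queue):
    # Pull (blob_id,) tuples until the stop sentinel arrives; always mark the
    # item done so the producer's queue.join() can return.
    while True:
        item = task_queue.get()
        try:
            if item == _STOP:
                return
            (blob_id,) = item
            print(f"would check references and delete blob {blob_id}")
        finally:
            task_queue.task_done()


if __name__ == "__main__":
    concurrency = 4
    task_queue = JoinableQueue(1000)
    pool = [
        Process(target=del_file_worker, args=(task_queue,), daemon=True)
        for _ in range(concurrency)
    ]
    for p in pool:
        p.start()

    for blob_id in range(10):  # producer: stream candidate ids, cheap to iterate
        task_queue.put((blob_id,))
    task_queue.join()  # wait until every queued item has been marked done

    for _ in pool:  # one stop sentinel per worker
        task_queue.put(_STOP)
    for p in pool:
        p.join()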
@@ -37,9 +37,11 @@ def get_project(value: str) -> int | None:
# an identity on an object() isn't guaranteed to work between parent
# and child proc
_STOP_WORKER: Final = "91650ec271ae4b3e8a67cdc909d80f8c"
-_WorkQueue: TypeAlias = (
+_EntityWorkQueue: TypeAlias = (
    "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[str, tuple[int, ...]]]"
)
+_FileWorkQueue: TypeAlias = "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[int]]"


API_TOKEN_TTL_IN_DAYS = 30

@@ -50,7 +52,7 @@ def debug_output(msg: str) -> None:
        click.echo(msg)


-def multiprocess_worker(task_queue: _WorkQueue) -> None:
+def multiprocess_delete_entity_worker(task_queue: _EntityWorkQueue) -> None:
    # Configure within each Process
    import logging

@@ -101,6 +103,46 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
            task_queue.task_done()


+def multiprocess_del_file_worker(task_queue: _FileWorkQueue) -> None:
+    # Configure within each Process
+    import logging
+
+    logger = logging.getLogger("sentry.cleanup")
+
+    from sentry.runner import configure
+
+    configure()
+
+    from sentry.models.files.file import File
+    from sentry.models.files.fileblob import FileBlob
+    from sentry.models.files.fileblobindex import FileBlobIndex
+
+    while True:
+        j = task_queue.get()
+        try:
+            if j == _STOP_WORKER:
+                return
+
+            (blob_id,) = j
+
+            blob = FileBlob.objects.get(pk=blob_id)
+
+            # These checks were moved out of the query iterator and into the multiprocess task queue because they
+            # were bottlenecking the iteration.
+            if (
+                FileBlobIndex.objects.filter(blob=blob).exists()
+                or File.objects.filter(blob=blob).exists()
+            ):
+                # still used
+                continue
+
+            blob.delete()
+        except Exception as e:
+            logger.exception(e)
+        finally:
+            task_queue.task_done()
+
+
@click.command()
@click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.")
@click.option("--project", help="Limit truncation to only entries from project.")
@@ -153,9 +195,21 @@ def cleanup(
    # before we import or configure the app

    pool = []
-    task_queue: _WorkQueue = Queue(1000)
+    queues: list[_EntityWorkQueue | _FileWorkQueue] = []
+    del_entity_task_queue: _EntityWorkQueue = Queue(1000)
+    queues.append(del_entity_task_queue)
+
+    for _ in range(concurrency):
+        p = Process(target=multiprocess_delete_entity_worker, args=(del_entity_task_queue,))
+        p.daemon = True
+        p.start()
+        pool.append(p)
+
+    del_file_task_queue: _FileWorkQueue = Queue(1000)
+    queues.append(del_file_task_queue)
+
    for _ in range(concurrency):
-        p = Process(target=multiprocess_worker, args=(task_queue,))
+        p = Process(target=multiprocess_del_file_worker, args=(del_file_task_queue,))
        p.daemon = True
        p.start()
        pool.append(p)
@@ -236,9 +290,9 @@ def is_filtered(model: type[Model]) -> bool:
                )

                for chunk in q.iterator(chunk_size=100):
-                    task_queue.put((imp, chunk))
+                    del_entity_task_queue.put((imp, chunk))

-        task_queue.join()
+        del_entity_task_queue.join()

        project_deletion_query, to_delete_by_project = prepare_deletes_by_project(
            project, project_id, is_filtered
@@ -266,21 +320,30 @@ def is_filtered(model: type[Model]) -> bool:
                )

                for chunk in q.iterator(chunk_size=100):
-                    task_queue.put((imp, chunk))
+                    del_entity_task_queue.put((imp, chunk))

-        task_queue.join()
+        del_entity_task_queue.join()

-        remove_file_blobs(is_filtered, silent)
+        remove_file_blobs(del_file_task_queue, is_filtered, silent)

    finally:
+        if not silent:
+            click.echo("Beginning shutdown")
        # Shut down our pool
-        for _ in pool:
-            task_queue.put(_STOP_WORKER)
+        for _q in queues:
+            for _ in range(concurrency):
+                _q.put(_STOP_WORKER)
+
+        if not silent:
+            click.echo("Waiting for pool to shutdown")

        # And wait for it to drain
        for p in pool:
            p.join()
+
+        if not silent:
+            click.echo("Process pool has stopped")

        if timed and start_time:
            duration = int(time.time() - start_time)
            metrics.timing("cleanup.duration", duration, instance=router, sample_rate=1.0)
@@ -486,7 +549,9 @@ def prepare_deletes_by_project(
    return project_deletion_query, to_delete_by_project


-def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool) -> None:
+def remove_file_blobs(
+    del_file_task_queue: _FileWorkQueue, is_filtered: Callable[[type[Model]], bool], silent: bool
+) -> None:
    from sentry.models.file import FileBlob

    # Clean up FileBlob instances which are no longer used and aren't super
@@ -495,10 +560,10 @@ def remove_file_blobs(is_filtered: Callable[[type[Model]], bool], silent: bool)
    if is_filtered(FileBlob):
        debug_output(">> Skipping FileBlob")
    else:
-        cleanup_unused_files(silent)
+        cleanup_unused_files(del_file_task_queue, silent)


-def cleanup_unused_files(quiet: bool = False) -> None:
+def cleanup_unused_files(del_file_task_queue: _FileWorkQueue, quiet: bool = False) -> None:
    """
    Remove FileBlob's (and thus the actual files) if they are no longer
    referenced by any File.
@@ -507,9 +572,7 @@ def cleanup_unused_files(quiet: bool = False) -> None:
    any blobs which are brand new and potentially in the process of being
    referenced.
    """
-    from sentry.models.files.file import File
    from sentry.models.files.fileblob import FileBlob
-    from sentry.models.files.fileblobindex import FileBlobIndex

    if quiet:
        from sentry.utils.query import RangeQuerySetWrapper
@@ -520,8 +583,6 @@ def cleanup_unused_files(quiet: bool = False) -> None:
    queryset = FileBlob.objects.filter(timestamp__lte=cutoff)

    for blob in RangeQuerySetWrapper(queryset):
-        if FileBlobIndex.objects.filter(blob=blob).exists():
-            continue
-        if File.objects.filter(blob=blob).exists():
-            continue
-        blob.delete()
+        del_file_task_queue.put((blob.id,))
+
+    del_file_task_queue.join()
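One design note on the shutdown path in this change: the process pool now holds workers for two different queues, and each worker blocks on exactly one queue, so the cleanup command's finally block has to push concurrency stop sentinels onto each queue (rather than one sentinel per pool member overall) before joining the processes. A hedged restatement of that ordering as a standalone helper, with placeholder names rather than the actual Sentry code:

def shutdown(queues, pool, concurrency, stop_sentinel):
    # Every worker consumes exactly one sentinel from the single queue it
    # reads, so each queue needs `concurrency` sentinels.
    for q in queues:
        for _ in range(concurrency):
            q.put(stop_sentinel)
    # Only join the processes once all sentinels are queued; otherwise some
    # workers would block forever on an empty queue.
    for p in pool:
        p.join()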