diff --git a/src/sentry/replays/consumers/recording_buffered.py b/src/sentry/replays/consumers/recording_buffered.py index 2cf217cfae20cd..d056131897ec33 100644 --- a/src/sentry/replays/consumers/recording_buffered.py +++ b/src/sentry/replays/consumers/recording_buffered.py @@ -1,11 +1,13 @@ import logging from collections.abc import Mapping +import sentry_sdk from arroyo.backends.kafka.consumer import KafkaPayload from arroyo.processing.strategies import FilterStep, RunTask, RunTaskInThreads from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory from arroyo.processing.strategies.commit import CommitOffsets from arroyo.types import Commit, Message, Partition +from django.conf import settings from sentry.filestore.gcs import GCS_RETRYABLE_ERRORS from sentry.replays.usecases.ingest import ( @@ -44,21 +46,35 @@ def create_with_partitions( def process_message(message: Message[KafkaPayload]) -> ProcessedRecordingMessage | None: - try: - return process_recording_message(parse_recording_message(message.payload.value)) - except DropSilently: - return None - except Exception: - logger.exception("Failed to process replay recording message.") - return None + with sentry_sdk.start_transaction( + name="replays.consumer.recording_buffered.process_message", + op="replays.consumer.recording_buffered", + custom_sampling_context={ + "sample_rate": getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0) + }, + ): + try: + return process_recording_message(parse_recording_message(message.payload.value)) + except DropSilently: + return None + except Exception: + logger.exception("Failed to process replay recording message.") + return None def commit_message(message: Message[ProcessedRecordingMessage]) -> None: - try: - commit_recording_message(message.payload) - return None - except GCS_RETRYABLE_ERRORS: - raise - except Exception: - logger.exception("Failed to commit replay recording message.") - return None + with sentry_sdk.start_transaction( + name="replays.consumer.recording_buffered.commit_message", + op="replays.consumer.recording_buffered", + custom_sampling_context={ + "sample_rate": getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0) + }, + ): + try: + commit_recording_message(message.payload) + return None + except GCS_RETRYABLE_ERRORS: + raise + except Exception: + logger.exception("Failed to commit replay recording message.") + return None