Skip to content

Commit

Permalink
Default to separated compute and IO
Browse files Browse the repository at this point in the history
  • Loading branch information
cmanallen committed Mar 5, 2025
1 parent 6539af9 commit 461bfce
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 144 deletions.
7 changes: 0 additions & 7 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,6 @@
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# Separate compute and IO.
register(
"replay.consumer.separate-compute-and-io-org-ids",
type=Sequence,
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# Used for internal dogfooding of a reduced timeout on rage/dead clicks.
register(
"replay.rage-click.experimental-timeout.org-id-list",
Expand Down
68 changes: 1 addition & 67 deletions src/sentry/replays/usecases/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,79 +107,13 @@ def ingest_recording(message_bytes: bytes) -> None:
):
try:
message = parse_recording_message(message_bytes)
if message.org_id in options.get("replay.consumer.separate-compute-and-io-org-ids"):
_ingest_recording_separated_io_compute(message)
else:
_ingest_recording(message)
_ingest_recording_separated_io_compute(message)
except DropSilently:
# The message couldn't be parsed for whatever reason. We shouldn't block the consumer
# so we ignore it.
pass


@sentry_sdk.trace
def _ingest_recording(message: RecordingIngestMessage) -> None:
"""Ingest recording messages."""
set_tag("org_id", message.org_id)
set_tag("project_id", message.project_id)

headers, segment_bytes = parse_headers(message.payload_with_headers, message.replay_id)
segment = decompress_segment(segment_bytes)
_report_size_metrics(len(segment.compressed), len(segment.decompressed))

# Normalize ingest data into a standardized ingest format.
segment_data = RecordingSegmentStorageMeta(
project_id=message.project_id,
replay_id=message.replay_id,
segment_id=headers["segment_id"],
retention_days=message.retention_days,
)

if message.replay_video:
# Logging org info for bigquery
logger.info(
"sentry.replays.slow_click",
extra={
"event_type": "mobile_event",
"org_id": message.org_id,
"project_id": message.project_id,
"size": len(message.replay_video),
},
)

# Record video size for COGS analysis.
metrics.incr("replays.recording_consumer.replay_video_count")
metrics.distribution(
"replays.recording_consumer.replay_video_size",
len(message.replay_video),
unit="byte",
)

dat = zlib.compress(pack(rrweb=segment.decompressed, video=message.replay_video))
storage_kv.set(make_recording_filename(segment_data), dat)

# Track combined payload size.
metrics.distribution(
"replays.recording_consumer.replay_video_event_size", len(dat), unit="byte"
)
else:
storage_kv.set(make_recording_filename(segment_data), segment.compressed)

recording_post_processor(message, headers, segment.decompressed, message.replay_event)

# The first segment records an accepted outcome. This is for billing purposes. Subsequent
# segments are not billed.
if headers["segment_id"] == 0:
track_initial_segment_event(
message.org_id,
message.project_id,
message.replay_id,
message.key_id,
message.received,
is_replay_video=message.replay_video is not None,
)


@sentry_sdk.trace
def track_initial_segment_event(
org_id: int,
Expand Down
70 changes: 0 additions & 70 deletions tests/sentry/replays/consumers/test_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from sentry.replays.models import ReplayRecordingSegment
from sentry.replays.usecases.pack import unpack
from sentry.testutils.cases import TransactionTestCase
from sentry.testutils.helpers.options import override_options


def test_multiprocessing_strategy():
Expand Down Expand Up @@ -512,72 +511,3 @@ def processing_factory(self):
max_buffer_size_in_bytes=1000,
max_buffer_time_in_seconds=1000,
)


class SeparateIOComputeRecordingTestCase(RecordingTestCase):

def test_compressed_segment_ingestion(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_compressed_segment_ingestion()

def test_event_with_replay_video(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_event_with_replay_video()

def test_event_with_replay_video_packed(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_event_with_replay_video_packed()

def test_uncompressed_segment_ingestion(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_uncompressed_segment_ingestion()

def test_invalid_json(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_json()

def test_invalid_payload_invalid_headers(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_invalid_headers()

def test_invalid_payload_invalid_unicode_codepoint(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_invalid_unicode_codepoint()

def test_invalid_payload_malformed_headers(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_malformed_headers()

def test_invalid_payload_missing_headers(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_missing_headers()

def test_invalid_payload_type(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_type()

def test_invalid_message(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_message()

0 comments on commit 461bfce

Please sign in to comment.