Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(replays): Default to separated compute and IO #86382

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,6 @@
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# Separate compute and IO.
register(
"replay.consumer.separate-compute-and-io-org-ids",
type=Sequence,
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)
# Used for internal dogfooding of a reduced timeout on rage/dead clicks.
register(
"replay.rage-click.experimental-timeout.org-id-list",
Expand Down
68 changes: 1 addition & 67 deletions src/sentry/replays/usecases/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,79 +107,13 @@ def ingest_recording(message_bytes: bytes) -> None:
):
try:
message = parse_recording_message(message_bytes)
if message.org_id in options.get("replay.consumer.separate-compute-and-io-org-ids"):
_ingest_recording_separated_io_compute(message)
else:
_ingest_recording(message)
_ingest_recording_separated_io_compute(message)
except DropSilently:
# The message couldn't be parsed for whatever reason. We shouldn't block the consumer
# so we ignore it.
pass


@sentry_sdk.trace
def _ingest_recording(message: RecordingIngestMessage) -> None:
"""Ingest recording messages."""
set_tag("org_id", message.org_id)
set_tag("project_id", message.project_id)

headers, segment_bytes = parse_headers(message.payload_with_headers, message.replay_id)
segment = decompress_segment(segment_bytes)
_report_size_metrics(len(segment.compressed), len(segment.decompressed))

# Normalize ingest data into a standardized ingest format.
segment_data = RecordingSegmentStorageMeta(
project_id=message.project_id,
replay_id=message.replay_id,
segment_id=headers["segment_id"],
retention_days=message.retention_days,
)

if message.replay_video:
# Logging org info for bigquery
logger.info(
"sentry.replays.slow_click",
extra={
"event_type": "mobile_event",
"org_id": message.org_id,
"project_id": message.project_id,
"size": len(message.replay_video),
},
)

# Record video size for COGS analysis.
metrics.incr("replays.recording_consumer.replay_video_count")
metrics.distribution(
"replays.recording_consumer.replay_video_size",
len(message.replay_video),
unit="byte",
)

dat = zlib.compress(pack(rrweb=segment.decompressed, video=message.replay_video))
storage_kv.set(make_recording_filename(segment_data), dat)

# Track combined payload size.
metrics.distribution(
"replays.recording_consumer.replay_video_event_size", len(dat), unit="byte"
)
else:
storage_kv.set(make_recording_filename(segment_data), segment.compressed)

recording_post_processor(message, headers, segment.decompressed, message.replay_event)

# The first segment records an accepted outcome. This is for billing purposes. Subsequent
# segments are not billed.
if headers["segment_id"] == 0:
track_initial_segment_event(
message.org_id,
message.project_id,
message.replay_id,
message.key_id,
message.received,
is_replay_video=message.replay_video is not None,
)


@sentry_sdk.trace
def track_initial_segment_event(
org_id: int,
Expand Down
70 changes: 0 additions & 70 deletions tests/sentry/replays/consumers/test_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from sentry.replays.models import ReplayRecordingSegment
from sentry.replays.usecases.pack import unpack
from sentry.testutils.cases import TransactionTestCase
from sentry.testutils.helpers.options import override_options


def test_multiprocessing_strategy():
Expand Down Expand Up @@ -512,72 +511,3 @@ def processing_factory(self):
max_buffer_size_in_bytes=1000,
max_buffer_time_in_seconds=1000,
)


class SeparateIOComputeRecordingTestCase(RecordingTestCase):

def test_compressed_segment_ingestion(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_compressed_segment_ingestion()

def test_event_with_replay_video(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_event_with_replay_video()

def test_event_with_replay_video_packed(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_event_with_replay_video_packed()

def test_uncompressed_segment_ingestion(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_uncompressed_segment_ingestion()

def test_invalid_json(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_json()

def test_invalid_payload_invalid_headers(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_invalid_headers()

def test_invalid_payload_invalid_unicode_codepoint(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_invalid_unicode_codepoint()

def test_invalid_payload_malformed_headers(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_malformed_headers()

def test_invalid_payload_missing_headers(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_missing_headers()

def test_invalid_payload_type(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_payload_type()

def test_invalid_message(self):
with override_options(
{"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]}
):
super().test_invalid_message()
Loading