feat(replays): Add buffered consumer implementation #85356

Draft
wants to merge 61 commits into base: master
Changes from 46 of 61 commits

Commits
b8cb413
Add buffered consumer runtime implementation
cmanallen Feb 18, 2025
d82c589
Begin adding process logic
cmanallen Feb 19, 2025
e82ce0f
Add tracing to each component of recording processing
cmanallen Feb 19, 2025
55cc3af
Delete unused function
cmanallen Feb 19, 2025
a243bdb
Report size metrics
cmanallen Feb 19, 2025
3d728b5
Merge branch 'cmanallen/replays-improve-tracing' into cmanallen/repla…
cmanallen Feb 19, 2025
890f170
Separate IO from processing
cmanallen Feb 19, 2025
2dcfaec
Add explicit return
cmanallen Feb 19, 2025
9e4e227
Merge branch 'cmanallen/replays-consumer-separate-processing-from-io'…
cmanallen Feb 19, 2025
db5fa11
Add buffer managers
cmanallen Feb 19, 2025
7a5a98c
Write FilePart rows and adopt a new subscription model
cmanallen Feb 20, 2025
44c899b
Add unit tests
cmanallen Feb 20, 2025
de76ad0
Add contextual errors
cmanallen Feb 20, 2025
abf22cf
Misc test updates
cmanallen Feb 21, 2025
2e16a01
Fully separate compute and io within the recording consumer
cmanallen Feb 24, 2025
208360f
Configure max workers
cmanallen Feb 25, 2025
13ea6ac
Merge branch 'master' into cmanallen/replays-add-separated-compute-an…
cmanallen Feb 25, 2025
fc8df9c
Remove conditional branch as its moved further up the hierarchy
cmanallen Feb 25, 2025
b0f518c
Merge branch 'cmanallen/replays-add-separated-compute-and-io' into cm…
cmanallen Feb 25, 2025
536c250
Use context manager
cmanallen Feb 25, 2025
ea571fa
Simplify buffer flushing (for now)
cmanallen Feb 25, 2025
1826ccb
Merge branch 'master' into cmanallen/replays-optimize-consumer
cmanallen Feb 27, 2025
3733886
Update tracing logic
cmanallen Feb 27, 2025
88747c1
Soften flag requirements and minor fixes
cmanallen Feb 27, 2025
74e811f
Remove buffer managers module
cmanallen Feb 27, 2025
8f8e5ce
Test clean up
cmanallen Feb 27, 2025
fac3204
Fix unit test
cmanallen Feb 27, 2025
ae59eb9
Add explicit return
cmanallen Feb 27, 2025
a6a32e5
Fix typing
cmanallen Feb 27, 2025
f1026ca
Remove unused option
cmanallen Feb 27, 2025
b363596
Reset dom_index module to align with master
cmanallen Feb 27, 2025
c656420
Update buffering run-time coverage
cmanallen Feb 27, 2025
29bb826
Update test ordering
cmanallen Feb 27, 2025
b4c6477
Add offset committing test
cmanallen Feb 27, 2025
2572955
Add docs
cmanallen Feb 27, 2025
7ca7462
More docstring fixes
cmanallen Feb 27, 2025
fb5731e
Add typing to flags and factory module
cmanallen Feb 27, 2025
17eb011
Adopt buffered strategy in callsite
cmanallen Feb 28, 2025
0c79eac
Add script for mocking recordings
cmanallen Feb 28, 2025
30ed5d6
Add handling for appending offsets when the message is not buffered
cmanallen Feb 28, 2025
52d8616
Add commit coverage
cmanallen Feb 28, 2025
b334400
Assert messages are committed regardless of if they're appended to th…
cmanallen Feb 28, 2025
8618e29
Fix typing
cmanallen Feb 28, 2025
0346a3c
Docstrings
cmanallen Feb 28, 2025
4ab0ff2
More docstrings
cmanallen Feb 28, 2025
77c2971
Yet more docstrings
cmanallen Feb 28, 2025
b956ad9
Implement declarative effect management
cmanallen Mar 3, 2025
5e2ce4f
Merge branch 'master' into cmanallen/replays-optimize-consumer
cmanallen Mar 3, 2025
fa42004
Move offset management into the runtime
cmanallen Mar 3, 2025
2215007
Fix typing
cmanallen Mar 3, 2025
4cc15d7
Remove comments on offsets
cmanallen Mar 3, 2025
4063f5d
Remove none-type messages
cmanallen Mar 3, 2025
ea64a50
Move offsets out of runtime and into platform strategy. Add support f…
cmanallen Mar 3, 2025
62d7433
Fix typing
cmanallen Mar 3, 2025
895185b
Docstrings and renames
cmanallen Mar 3, 2025
a0ea819
Update error handling and documentation
cmanallen Mar 4, 2025
ab3918c
Rename to sandbox
cmanallen Mar 4, 2025
369acde
Update test coverage
cmanallen Mar 4, 2025
85e0fcf
Remove unnecessary lambda
cmanallen Mar 4, 2025
c8d5745
Improve coveragE
cmanallen Mar 4, 2025
e349232
Sketch selective retry
cmanallen Mar 5, 2025
76 changes: 76 additions & 0 deletions bin/mock-replay-recording
@@ -0,0 +1,76 @@
#!/usr/bin/env python
"""Produce a mock replay recording message.

Helpful commands:

- Run the consumer.
  - `sentry run consumer ingest-replay-recordings --consumer-group 0`
- Check if offsets are committed correctly.
  - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0`
"""
from sentry.runner import configure

configure()
import logging
import os
import time
import uuid

import click
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")

import django

django.setup()

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger(__name__)


def get_producer() -> KafkaProducer:
    cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"]
    producer_config = get_kafka_producer_cluster_options(cluster_name)
    return KafkaProducer(build_kafka_configuration(default_config=producer_config))


RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)


@click.command()
@click.option("--organization-id", type=int, required=True, help="Organization ID")
@click.option("--project-id", type=int, required=True, help="Project ID")
def main(organization_id: int, project_id: int) -> None:
    """Produce a mock replay recording message to the INGEST_REPLAYS_RECORDINGS topic."""
    message: ReplayRecording = {
        "key_id": None,
        "org_id": organization_id,
        "payload": b'{"segment_id"',
        "project_id": project_id,
        "received": int(time.time()),
        "replay_event": None,
        "replay_id": uuid.uuid4().hex,
        "replay_video": None,
        "retention_days": 30,
        "type": "replay_recording_not_chunked",
        "version": 1,
    }

    producer = get_producer()
    topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"]
    payload = KafkaPayload(None, RECORDING_CODEC.encode(message), [])

    producer.produce(ArroyoTopic(topic), payload)
    producer.close()

    logger.info("Successfully produced message to %s", topic)


if __name__ == "__main__":
    main()
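
For a quick smoke test, a hypothetical invocation of the script above (the IDs are placeholders; use an organization and project that exist in your local install): `python bin/mock-replay-recording --organization-id 1 --project-id 1`.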
25 changes: 6 additions & 19 deletions src/sentry/consumers/__init__.py
@@ -84,24 +84,11 @@ def ingest_replay_recordings_options() -> list[click.Option]:

 def ingest_replay_recordings_buffered_options() -> list[click.Option]:
     """Return a list of ingest-replay-recordings-buffered options."""
-    options = [
-        click.Option(
-            ["--max-buffer-message-count", "max_buffer_message_count"],
-            type=int,
-            default=100,
-        ),
-        click.Option(
-            ["--max-buffer-size-in-bytes", "max_buffer_size_in_bytes"],
-            type=int,
-            default=2_500_000,
-        ),
-        click.Option(
-            ["--max-buffer-time-in-seconds", "max_buffer_time_in_seconds"],
-            type=int,
-            default=1,
-        ),
+    return [
+        click.Option(["--max-buffer-length", "max_buffer_length"], type=int, default=8),
+        click.Option(["--max-buffer-wait", "max_buffer_wait"], type=int, default=1),
+        click.Option(["--max-workers", "max_workers"], type=int, default=8),
     ]
-    return options


 def ingest_monitors_options() -> list[click.Option]:
@@ -269,8 +256,8 @@ def ingest_transactions_options() -> list[click.Option]:
     },
     "ingest-replay-recordings": {
         "topic": Topic.INGEST_REPLAYS_RECORDINGS,
-        "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory",
-        "click_options": ingest_replay_recordings_options(),
+        "strategy_factory": "sentry.replays.consumers.buffered.factory.PlatformStrategyFactory",
+        "click_options": ingest_replay_recordings_buffered_options(),
     },
     "ingest-replay-recordings-buffered": {
         "topic": Topic.INGEST_REPLAYS_RECORDINGS,
Empty file.
128 changes: 128 additions & 0 deletions src/sentry/replays/consumers/buffered/consumer.py
@@ -0,0 +1,128 @@
"""Session Replay recording consumer implementation.

To understand how the buffering works visit the `lib.py` module and inspect the source of the
buffering runtime.

This module has two parts. A processing component and a buffer flushing component. The processing
component is straight-forward. It accepts a message and performs some work on it. After it
completes it instructs the runtime to append the message to the buffer. This is abstracted by the
buffering runtime library so we just return the transformed data in this module.

The second part is the flushing of the buffer. The buffering runtime library has no idea when to
flush this buffer so it constantly asks us if it can flush. We control flushing behavior through a
stateful "BufferManager" class. If we can_flush then we do_flush. After the flush completes the
RunTime will commit the offsets.
"""

import contextlib
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import TypedDict

import sentry_sdk

from sentry.replays.consumers.buffered.lib import Model, buffering_runtime
from sentry.replays.usecases.ingest import (
    DropSilently,
    ProcessedRecordingMessage,
    commit_recording_message,
    parse_recording_message,
    process_recording_message,
    sentry_tracing,
    track_recording_metadata,
)


class Flags(TypedDict):
    max_buffer_length: int
    max_buffer_wait: int
    max_workers: int


class BufferManager:
    """Buffer manager.

    The buffer manager is a class instance whose lifetime is as long as the RunTime's. We pass
    its methods as callbacks to the Model. The state contained within the methods' instance is
    implicit and unknown to the RunTime.
    """

    def __init__(self, flags: Flags) -> None:
        self.__max_buffer_length = flags["max_buffer_length"]
        self.__max_buffer_wait = flags["max_buffer_wait"]
        self.__max_workers = flags["max_workers"]

        self.__last_flushed_at = time.time()

    def can_flush(self, model: Model[ProcessedRecordingMessage]) -> bool:
        # TODO: time.time is stateful and hard to test. We should enable the RunTime to perform
        # managed effects so we can properly test this behavior.
        return (
            len(model.buffer) >= self.__max_buffer_length
            or (time.time() - self.__max_buffer_wait) >= self.__last_flushed_at
        )

Contributor comment:
Arroyo primitives manage these kinds of concerns for you (when to flush a batch, for example). Are you sure about the idea of pushing them into the product code instead?

    def do_flush(self, model: Model[ProcessedRecordingMessage]) -> None:
        with sentry_tracing("replays.consumers.buffered.flush_buffer"):
            flush_buffer(model, max_workers=self.__max_workers)
            # TODO: time.time again. Should be declarative for testing purposes.
            self.__last_flushed_at = time.time()


@sentry_sdk.trace
def flush_buffer(model: Model[ProcessedRecordingMessage], max_workers: int) -> None:
    if len(model.buffer) == 0:
        return None

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(flush_message, message) for message in model.buffer]

        # Tasks can fail. We check the done set for any failures. We will wait for all the
        # futures to complete before running this step or eagerly run this step if any task
        # errors.
        done, _ = wait(futures, return_when=FIRST_EXCEPTION)
        for future in done:
            exc = future.exception()
            if exc is not None:
                # TODO: Why raise? Can I do something more meaningful here than reject the whole
                # batch? Raising is certainly the easiest way of handling failures...
                raise exc

    # Recording metadata is not tracked in the threadpool. This is because this function will
    # log. Logging will acquire a lock and make our threading less useful due to the speed of
    # the I/O we do in this step.
    for message in model.buffer:
        track_recording_metadata(message)

    return None


@sentry_sdk.trace
def flush_message(message: ProcessedRecordingMessage) -> None:
    with contextlib.suppress(DropSilently):
        commit_recording_message(message)


def process_message(message_bytes: bytes) -> ProcessedRecordingMessage | None:
    """Message processing function.

    Accepts an unstructured type and returns a structured one. Other than tracing the goal is to
    have no I/O here. We'll commit the I/O on flush.
    """
    with sentry_tracing("replays.consumers.buffered.process_message"):
        with contextlib.suppress(DropSilently):
            message = parse_recording_message(message_bytes)
            return process_recording_message(message)
        return None


def init(flags: Flags) -> Model[ProcessedRecordingMessage]:
    """Return the initial state of the application."""
    buffer = BufferManager(flags)
    return Model(buffer=[], can_flush=buffer.can_flush, do_flush=buffer.do_flush, offsets={})


recording_runtime = buffering_runtime(
    init_fn=init,
    process_fn=process_message,
)
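
Since `lib.py` is not part of this diff, the following is a minimal, hypothetical sketch of the control flow the module docstring above describes. `SketchModel` and `handle` are illustrative names only, not the real runtime API:

# Hypothetical sketch of the buffering runtime's control flow; not the lib.py code.
from dataclasses import dataclass, field
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass
class SketchModel(Generic[T]):
    buffer: list[T]
    can_flush: Callable[["SketchModel[T]"], bool]
    do_flush: Callable[["SketchModel[T]"], None]
    offsets: dict[int, int] = field(default_factory=dict)


def handle(model: SketchModel[T], process: Callable[[bytes], T | None], raw: bytes) -> None:
    item = process(raw)  # pure transformation, no I/O (process_message above)
    if item is not None:
        model.buffer.append(item)  # the runtime buffers the processed message
    if model.can_flush(model):  # the runtime polls the BufferManager
        model.do_flush(model)  # I/O happens here; offsets are committed afterwards
        model.buffer.clear()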
32 changes: 32 additions & 0 deletions src/sentry/replays/consumers/buffered/factory.py
@@ -0,0 +1,32 @@
"""Session Replay recording consumer strategy factory.

This module exists solely to abstract the bootstrapping process of the application and runtime in
`sentry/consumers/__init__.py`.
"""

from collections.abc import Mapping

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.types import Commit as ArroyoCommit
from arroyo.types import Partition

from sentry.replays.consumers.buffered.consumer import Flags, recording_runtime
from sentry.replays.consumers.buffered.platform import PlatformStrategy


class PlatformStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):

    def __init__(self, max_buffer_length: int, max_buffer_wait: int, max_workers: int) -> None:
        self.flags: Flags = {
            "max_buffer_length": max_buffer_length,
            "max_buffer_wait": max_buffer_wait,
            "max_workers": max_workers,
        }

    def create_with_partitions(
        self,
        commit: ArroyoCommit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        return PlatformStrategy(commit=commit, flags=self.flags, runtime=recording_runtime)

Contributor comment:
Wouldn't it be considerably simpler to model this consumer as a sequence of Arroyo operators (sketched below)?

Modeling the system this way would:

  • allow parallelism via either processes or threads without application-logic changes
  • guarantee a pipeline approach that lets the batching step keep batching new messages while the worker thread performs its work
  • hide offset management entirely from the application code
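
A rough sketch of the kind of pipeline this comment describes, assuming Arroyo's stock strategies (RunTask, BatchStep, RunTaskInThreads, CommitOffsets) keep their current names and signatures; the numeric values simply mirror the PR's CLI defaults, and the process/flush bodies are placeholders rather than the PR's code:

# Sketch only: an Arroyo-operator pipeline analogous to the buffered consumer.
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import CommitOffsets, RunTask, RunTaskInThreads
from arroyo.processing.strategies.batching import BatchStep
from arroyo.types import Commit, Message


def build_pipeline(commit: Commit):
    def process(message: Message[KafkaPayload]):
        # CPU-only work: parse and transform the recording (no I/O).
        ...

    def flush(batch):
        # I/O: store every processed recording contained in the batch.
        ...

    return RunTask(
        function=process,
        next_step=BatchStep(
            max_batch_size=8,  # plays the role of --max-buffer-length
            max_batch_time=1.0,  # plays the role of --max-buffer-wait
            next_step=RunTaskInThreads(
                processing_function=flush,
                concurrency=8,  # plays the role of --max-workers
                max_pending_futures=16,
                next_step=CommitOffsets(commit),
            ),
        ),
    )

In this shape, offsets would advance only after the flush step forwards the batch, which roughly matches the behavior the PR implements by hand through the runtime and platform strategy.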
