feat(replays): Add buffered consumer implementation #85356
Draft
cmanallen wants to merge 61 commits into master from cmanallen/replays-optimize-consumer
+1,130
−41
Commits (showing changes from 46 of 61 commits)
All commits are by cmanallen:

b8cb413 Add buffered consumer runtime implementation
d82c589 Begin adding process logic
e82ce0f Add tracing to each component of recording processing
55cc3af Delete unused function
a243bdb Report size metrics
3d728b5 Merge branch 'cmanallen/replays-improve-tracing' into cmanallen/repla…
890f170 Separate IO from processing
2dcfaec Add explicit return
9e4e227 Merge branch 'cmanallen/replays-consumer-separate-processing-from-io'…
db5fa11 Add buffer managers
7a5a98c Write FilePart rows and adopt a new subscription model
44c899b Add unit tests
de76ad0 Add contextual errors
abf22cf Misc test updates
2e16a01 Fully separate compute and io within the recording consumer
208360f Configure max workers
13ea6ac Merge branch 'master' into cmanallen/replays-add-separated-compute-an…
fc8df9c Remove conditional branch as its moved further up the hierarchy
b0f518c Merge branch 'cmanallen/replays-add-separated-compute-and-io' into cm…
536c250 Use context manager
ea571fa Simplify buffer flushing (for now)
1826ccb Merge branch 'master' into cmanallen/replays-optimize-consumer
3733886 Update tracing logic
88747c1 Soften flag requirements and minor fixes
74e811f Remove buffer managers module
8f8e5ce Test clean up
fac3204 Fix unit test
ae59eb9 Add explicit return
a6a32e5 Fix typing
f1026ca Remove unused option
b363596 Reset dom_index module to align with master
c656420 Update buffering run-time coverage
29bb826 Update test ordering
b4c6477 Add offset committing test
2572955 Add docs
7ca7462 More docstring fixes
fb5731e Add typing to flags and factory module
17eb011 Adopt buffered strategy in callsite
0c79eac Add script for mocking recordings
30ed5d6 Add handling for appending offsets when the message is not buffered
52d8616 Add commit coverage
b334400 Assert messages are committed regardless of if they're appended to th…
8618e29 Fix typing
0346a3c Docstrings
4ab0ff2 More docstrings
77c2971 Yet more docstrings
b956ad9 Implement declarative effect management
5e2ce4f Merge branch 'master' into cmanallen/replays-optimize-consumer
fa42004 Move offset management into the runtime
2215007 Fix typing
4cc15d7 Remove comments on offsets
4063f5d Remove none-type messages
ea64a50 Move offsets out of runtime and into platform strategy. Add support f…
62d7433 Fix typing
895185b Docstrings and renames
a0ea819 Update error handling and documentation
ab3918c Rename to sandbox
369acde Update test coverage
85e0fcf Remove unnecessary lambda
c8d5745 Improve coveragE
e349232 Sketch selective retry
New file: script for mocking recordings.

#!/usr/bin/env python
"""Produce a mock replay recording message for local testing.

Helpful commands:

- Run the consumer.
  - `sentry run consumer ingest-replay-recordings --consumer-group 0`
- Check if offsets are committed correctly.
  - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0`
"""
from sentry.runner import configure

configure()

import logging
import os
import time
import uuid

import click
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")

import django

django.setup()

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

logger = logging.getLogger(__name__)


def get_producer() -> KafkaProducer:
    cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"]
    producer_config = get_kafka_producer_cluster_options(cluster_name)
    return KafkaProducer(build_kafka_configuration(default_config=producer_config))


RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)


@click.command()
@click.option("--organization-id", type=int, required=True, help="Organization ID")
@click.option("--project-id", type=int, required=True, help="Project ID")
def main(organization_id: int, project_id: int) -> None:
    """Produce a mock replay recording message to the INGEST_REPLAYS_RECORDINGS topic."""
    message: ReplayRecording = {
        "key_id": None,
        "org_id": organization_id,
        "payload": b'{"segment_id"',
        "project_id": project_id,
        "received": int(time.time()),
        "replay_event": None,
        "replay_id": uuid.uuid4().hex,
        "replay_video": None,
        "retention_days": 30,
        "type": "replay_recording_not_chunked",
        "version": 1,
    }

    producer = get_producer()
    topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"]
    payload = KafkaPayload(None, RECORDING_CODEC.encode(message), [])

    producer.produce(ArroyoTopic(topic), payload)
    producer.close()

    logger.info("Successfully produced message to %s", topic)


if __name__ == "__main__":
    main()
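The mock message's `payload` (`b'{"segment_id"'`) is truncated JSON, which exercises the consumer's error-handling path rather than a happy path. A minimal self-contained check of that shape, using the stdlib `json` module in place of the `sentry_kafka_schemas` codec (the message dict below mirrors the script; everything else is illustrative only):

```python
import json
import time
import uuid

# Build the same mock message shape the script above produces. The payload
# is not valid JSON, so downstream parsing of it will fail deliberately.
message = {
    "key_id": None,
    "org_id": 1,
    "payload": b'{"segment_id"',
    "project_id": 1,
    "received": int(time.time()),
    "replay_event": None,
    "replay_id": uuid.uuid4().hex,
    "replay_video": None,
    "retention_days": 30,
    "type": "replay_recording_not_chunked",
    "version": 1,
}

# Confirm the recording payload does not parse as JSON.
try:
    json.loads(message["payload"])
    payload_is_valid_json = True
except json.JSONDecodeError:
    payload_is_valid_json = False

print(payload_is_valid_json)  # False
```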
Empty file.
"""Session Replay recording consumer implementation.

To understand how the buffering works, visit the `lib.py` module and inspect the source of the
buffering runtime.

This module has two parts: a processing component and a buffer-flushing component. The processing
component is straightforward. It accepts a message and performs some work on it. After it
completes, it instructs the runtime to append the message to the buffer. This is abstracted by
the buffering runtime library, so we just return the transformed data in this module.

The second part is the flushing of the buffer. The buffering runtime library has no idea when to
flush this buffer, so it constantly asks us if it can flush. We control flushing behavior through
a stateful "BufferManager" class. If we can_flush then we do_flush. After the flush completes,
the RunTime will commit the offsets.
"""

import contextlib
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import TypedDict

import sentry_sdk

from sentry.replays.consumers.buffered.lib import Model, buffering_runtime
from sentry.replays.usecases.ingest import (
    DropSilently,
    ProcessedRecordingMessage,
    commit_recording_message,
    parse_recording_message,
    process_recording_message,
    sentry_tracing,
    track_recording_metadata,
)


class Flags(TypedDict):
    max_buffer_length: int
    max_buffer_wait: int
    max_workers: int


class BufferManager:
    """Buffer manager.

    A BufferManager instance has a lifetime as long as the RunTime's. We pass its methods as
    callbacks to the Model. The state contained within the instance is implicit and unknown to
    the RunTime.
    """

    def __init__(self, flags: Flags) -> None:
        self.__max_buffer_length = flags["max_buffer_length"]
        self.__max_buffer_wait = flags["max_buffer_wait"]
        self.__max_workers = flags["max_workers"]

        self.__last_flushed_at = time.time()

    def can_flush(self, model: Model[ProcessedRecordingMessage]) -> bool:
        # TODO: time.time is stateful and hard to test. We should enable the RunTime to perform
        # managed effects so we can properly test this behavior.
        return (
            len(model.buffer) >= self.__max_buffer_length
            or (time.time() - self.__last_flushed_at) >= self.__max_buffer_wait
        )

    def do_flush(self, model: Model[ProcessedRecordingMessage]) -> None:
        with sentry_tracing("replays.consumers.buffered.flush_buffer"):
            flush_buffer(model, max_workers=self.__max_workers)
            # TODO: time.time again. Should be declarative for testing purposes.
            self.__last_flushed_at = time.time()


@sentry_sdk.trace
def flush_buffer(model: Model[ProcessedRecordingMessage], max_workers: int) -> None:
    if len(model.buffer) == 0:
        return None

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(flush_message, message) for message in model.buffer]

        # Tasks can fail. We check the done set for any failures. We will wait for all the
        # futures to complete before running this step or eagerly run this step if any task
        # errors.
        done, _ = wait(futures, return_when=FIRST_EXCEPTION)
        for future in done:
            exc = future.exception()
            if exc is not None:
                # TODO: Why raise? Can I do something more meaningful here than reject the whole
                # batch? Raising is certainly the easiest way of handling failures...
                raise exc

    # Recording metadata is not tracked in the threadpool. This is because this function will
    # log. Logging will acquire a lock and make our threading less useful due to the speed of
    # the I/O we do in this step.
    for message in model.buffer:
        track_recording_metadata(message)

    return None


@sentry_sdk.trace
def flush_message(message: ProcessedRecordingMessage) -> None:
    with contextlib.suppress(DropSilently):
        commit_recording_message(message)


def process_message(message_bytes: bytes) -> ProcessedRecordingMessage | None:
    """Message processing function.

    Accepts an unstructured type and returns a structured one. Other than tracing, the goal is
    to have no I/O here. We'll commit the I/O on flush.
    """
    with sentry_tracing("replays.consumers.buffered.process_message"):
        with contextlib.suppress(DropSilently):
            message = parse_recording_message(message_bytes)
            return process_recording_message(message)
        return None


def init(flags: Flags) -> Model[ProcessedRecordingMessage]:
    """Return the initial state of the application."""
    buffer = BufferManager(flags)
    return Model(buffer=[], can_flush=buffer.can_flush, do_flush=buffer.do_flush, offsets={})


recording_runtime = buffering_runtime(
    init_fn=init,
    process_fn=process_message,
)
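The TODO comments above note that calling `time.time()` directly makes `can_flush` hard to test. A minimal self-contained sketch of the same flush condition with an injectable clock (the `SketchBufferManager` name and its shape are illustrative, not the PR's actual API):

```python
import time
from typing import Callable


class SketchBufferManager:
    """Flush when the buffer is long enough OR enough time has elapsed."""

    def __init__(
        self,
        max_buffer_length: int,
        max_buffer_wait: float,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self.max_buffer_length = max_buffer_length
        self.max_buffer_wait = max_buffer_wait
        self.clock = clock
        self.last_flushed_at = clock()

    def can_flush(self, buffer: list) -> bool:
        # Same condition as the consumer: length threshold or wait threshold.
        return (
            len(buffer) >= self.max_buffer_length
            or (self.clock() - self.last_flushed_at) >= self.max_buffer_wait
        )


# With a fake clock, the time-based branch becomes deterministic.
now = [0.0]
manager = SketchBufferManager(max_buffer_length=3, max_buffer_wait=10.0, clock=lambda: now[0])

assert not manager.can_flush(["a"])        # short buffer, no time elapsed
assert manager.can_flush(["a", "b", "c"])  # length threshold reached
now[0] = 11.0
assert manager.can_flush(["a"])            # wait threshold exceeded
```

Passing the clock in as a constructor argument is one conventional answer to the "managed effects" TODO: production code keeps the `time.time` default while tests supply a controllable stand-in.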
"""Session Replay recording consumer strategy factory.

This module exists solely to abstract the bootstrapping process of the application and runtime in
`sentry/consumers/__init__.py`.
"""

from collections.abc import Mapping

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.types import Commit as ArroyoCommit
from arroyo.types import Partition

from sentry.replays.consumers.buffered.consumer import Flags, recording_runtime
from sentry.replays.consumers.buffered.platform import PlatformStrategy


class PlatformStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def __init__(self, max_buffer_length: int, max_buffer_wait: int, max_workers: int) -> None:
        self.flags: Flags = {
            "max_buffer_length": max_buffer_length,
            "max_buffer_wait": max_buffer_wait,
            "max_workers": max_workers,
        }

    def create_with_partitions(
        self,
        commit: ArroyoCommit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        return PlatformStrategy(commit=commit, flags=self.flags, runtime=recording_runtime)
Review comment:
Wouldn't it be considerably simpler to model this consumer as a sequence of these Arroyo operators:
Modeling the system this way would:
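For illustration of the reviewer's suggestion, the alternative can be approximated without Arroyo at all: accumulate messages into batches, flush each full batch, then commit the highest offset seen. The stand-in below uses plain Python; none of the names are Arroyo's actual API, which composes equivalent operators as processing strategies:

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class BatchThenFlush:
    """Plain-Python stand-in for a batch/flush/commit operator chain."""

    max_batch_size: int
    flush: Callable[[list], None]
    commit: Callable[[int], None]
    batch: list = field(default_factory=list)
    last_offset: int = -1

    def submit(self, offset: int, message: str) -> None:
        self.batch.append(message)
        self.last_offset = offset
        if len(self.batch) >= self.max_batch_size:
            # Flush the accumulated batch, then commit the highest offset seen.
            self.flush(self.batch)
            self.commit(self.last_offset)
            self.batch = []


flushed, committed = [], []
step = BatchThenFlush(max_batch_size=2, flush=flushed.append, commit=committed.append)

for offset, msg in enumerate(["a", "b", "c", "d"]):
    step.submit(offset, msg)

print(flushed)    # [['a', 'b'], ['c', 'd']]
print(committed)  # [1, 3]
```

The trade-off the thread is debating: an operator pipeline makes batching and committing the framework's concern, while the PR's RunTime approach keeps that state in product code where it can be unit-tested directly.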
Review comment:
Arroyo primitives manage these kinds of concerns for you (when to flush a batch, for example). Are you sure about the idea of pushing them into the product code instead?