diff --git a/bin/mock-replay-recording b/bin/mock-replay-recording new file mode 100755 index 00000000000000..2f2d27109ace96 --- /dev/null +++ b/bin/mock-replay-recording @@ -0,0 +1,76 @@ +#!/usr/bin/env python +""". + +Helpful commands: + + - Run the consumer. + - `sentry run consumer ingest-replay-recordings --consumer-group 0` + - Check if offsets are committed correctly. + - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0` +""" +from sentry.runner import configure + +configure() +import logging +import os +import time +import uuid + +import click +from arroyo import Topic as ArroyoTopic +from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration +from sentry_kafka_schemas.codecs import Codec +from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server") + +import django + +django.setup() + +from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition + +logger = logging.getLogger(__name__) + + +def get_producer() -> KafkaProducer: + cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"] + producer_config = get_kafka_producer_cluster_options(cluster_name) + return KafkaProducer(build_kafka_configuration(default_config=producer_config)) + + +RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS) + + +@click.command() +@click.option("--organization-id", type=int, required=True, help="Organization ID") +@click.option("--project-id", type=int, required=True, help="Project ID") +def main(organization_id: int, project_id: int) -> None: + """Produce a mock uptime result message to the INGEST_REPLAYS_RECORDINGS topic.""" + message: ReplayRecording = { + "key_id": None, + "org_id": organization_id, + "payload": b'{"segment_id"', + "project_id": project_id, + "received": int(time.time()), + "replay_event": None, + "replay_id": uuid.uuid4().hex, + "replay_video": None, + "retention_days": 30, + "type": "replay_recording_not_chunked", + "version": 1, + } + + producer = get_producer() + topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"] + payload = KafkaPayload(None, RECORDING_CODEC.encode(message), []) + + producer.produce(ArroyoTopic(topic), payload) + producer.close() + + logger.info("Successfully produced message to %s", topic) + + +if __name__ == "__main__": + main() diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index f3e5aa93484434..785a2c452877e5 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -84,24 +84,11 @@ def ingest_replay_recordings_options() -> list[click.Option]: def ingest_replay_recordings_buffered_options() -> list[click.Option]: """Return a list of ingest-replay-recordings-buffered options.""" - options = [ - click.Option( - ["--max-buffer-message-count", "max_buffer_message_count"], - type=int, - default=100, - ), - click.Option( - ["--max-buffer-size-in-bytes", "max_buffer_size_in_bytes"], - type=int, - default=2_500_000, - ), - click.Option( - ["--max-buffer-time-in-seconds", "max_buffer_time_in_seconds"], - type=int, - default=1, - ), + return [ + click.Option(["--max-buffer-length", "max_buffer_length"], type=int, default=8), + click.Option(["--max-buffer-wait", "max_buffer_wait"], type=int, default=1), + click.Option(["--max-workers", "max_workers"], type=int, default=8), ] - return options def ingest_monitors_options() -> list[click.Option]: @@ -269,8 +256,8 @@ def ingest_transactions_options() -> list[click.Option]: }, "ingest-replay-recordings": { "topic": Topic.INGEST_REPLAYS_RECORDINGS, - "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory", - "click_options": ingest_replay_recordings_options(), + "strategy_factory": "sentry.replays.consumers.buffered.factory.PlatformStrategyFactory", + "click_options": ingest_replay_recordings_buffered_options(), }, "ingest-replay-recordings-buffered": { "topic": Topic.INGEST_REPLAYS_RECORDINGS, diff --git a/src/sentry/replays/consumers/buffered/__init__.py b/src/sentry/replays/consumers/buffered/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/src/sentry/replays/consumers/buffered/consumer.py b/src/sentry/replays/consumers/buffered/consumer.py new file mode 100644 index 00000000000000..a1937aa87a62a2 --- /dev/null +++ b/src/sentry/replays/consumers/buffered/consumer.py @@ -0,0 +1,246 @@ +"""Session Replay recording consumer implementation. + +The consumer implementation follows a batching flush strategy. We accept messages, process them, +buffer them, and when some threshold is reached we flush the buffer. The batch has finished work +after the buffer is flushed so we commit with a None value. +""" + +import contextlib +import logging +import time +from concurrent.futures import ThreadPoolExecutor, wait +from dataclasses import dataclass +from typing import TypedDict + +import sentry_sdk + +from sentry.filestore.gcs import GCS_RETRYABLE_ERRORS +from sentry.replays.consumers.buffered.platform import ( + Cmd, + Commit, + Effect, + Join, + Nothing, + Poll, + RunTime, + Sub, + Task, +) +from sentry.replays.consumers.buffered.types import Result +from sentry.replays.usecases.ingest import ( + DropSilently, + ProcessedRecordingMessage, + commit_recording_message, + parse_recording_message, + process_recording_message, + track_recording_metadata, +) + +logger = logging.getLogger() + +# Types. + + +class Flags(TypedDict): + max_buffer_length: int + max_buffer_wait: int + max_workers: int + + +@dataclass +class Model: + buffer: list[ProcessedRecordingMessage] + last_flushed_at: float + max_buffer_length: int + max_buffer_wait: int + max_workers: int + + +@dataclass(frozen=True) +class Append: + """Append the item to the buffer.""" + + item: ProcessedRecordingMessage + + +@dataclass(frozen=True) +class Committed: + """The platform committed offsets. Our buffer is now completely done.""" + + +@dataclass(frozen=True) +class Flush: + """Our application hit the flush threshold and has been instructed to flush.""" + + buffer: list[ProcessedRecordingMessage] + + +@dataclass(frozen=True) +class Flushed: + """Our application successfully flushed.""" + + result: Result[float, list[bool]] + + +class Skip: + """Skip the message.""" + + +@dataclass(frozen=True) +class TryFlush: + """Instruct the application to flush the buffer if its time.""" + + now: float + + +# A "Msg" is the union of all application messages our RunTime will accept. +Msg = Append | Committed | Flush | Flushed | Skip | TryFlush + + +# State machine functions. + + +def init(flags: Flags) -> tuple[Model, Cmd[Msg, None]]: + """Initialize the state of the consumer.""" + return ( + Model( + buffer=[], + last_flushed_at=time.time(), + max_buffer_wait=flags["max_buffer_wait"], + max_workers=flags["max_workers"], + max_buffer_length=flags["max_buffer_length"], + ), + Nothing(), + ) + + +@sentry_sdk.trace +def process(model: Model, message: bytes) -> Msg: + """Process raw bytes to structured output. + + Some messages can not be parsed and their failures are known to the application. Other messages + can not be parsed and their failures are unknown to the application. In either case we don't + block ingestion for deterministic failures. We'll address the short-comings in a pull request. + + This is a good place to DLQ messages within unknown failure modes. The DLQ does not exist + currently and so is not implemented here. + """ + try: + item = process_recording_message(parse_recording_message(message)) + return Append(item=item) + except DropSilently: + return Skip() + except Exception: + logger.exception("Could not process replay recording message.") # Unmanaged effect. + return Skip() + + +def update(model: Model, msg: Msg) -> tuple[Model, Cmd[Msg, None]]: + """Grand central dispatch. + + This is the brain of the consumer. Events are processed and sent here for handling. Msgs enter + and Cmds exit this function. If the sequence of messages and commands are in a specific order a + flush event will occur and the buffer will be committed. + """ + match msg: + case Append(item=item): + model.buffer.append(item) + return (model, Effect(fun=time.time, msg=TryFlush)) + case Skip(): + return (model, Effect(fun=time.time, msg=TryFlush)) + case Committed(): + return (model, Nothing()) + case Flush(buffer=buffer): + return (model, Effect(fun=FlushBuffer(buffer, model.max_workers), msg=Flushed)) + case Flushed(result=result): + if result.is_ok: + value = result.unwrap() + model.buffer = [] + model.last_flushed_at = value + model.retries = 0 + return (model, Commit(msg=Committed(), value=None)) + else: + buffer = [item for item, error in zip(model.buffer, result.unwrap_err()) if error] + logger.info("[FLUSHED] Retrying %d/%d messages.", len(buffer), len(model.buffer)) + return (model, Task(msg=Flush(buffer=buffer))) + case TryFlush(now=now): + if can_flush(model, now): + return (model, Task(msg=Flush(buffer=model.buffer))) + else: + return (model, Nothing()) + + +def subscription(model: Model) -> list[Sub[Msg]]: + """Platform event subscriptions. + + This function registers the platform subscriptions we want to listen for. When the platform + decides its time to poll or shutdown the platform will emit those commands to the runtime and + the runtime will inform us (the application) so we can handle the situation approporiately. + """ + return [ + Join(msg=lambda: Flush(model.buffer)), + Poll(msg=lambda: TryFlush(now=time.time())), + ] + + +# Helpers. + + +def can_flush(model: Model, now: float) -> bool: + return ( + len(model.buffer) >= model.max_buffer_length + or (now - model.max_buffer_wait) >= model.last_flushed_at + ) + + +@dataclass(frozen=True) +class FlushBuffer: + buffer: list[ProcessedRecordingMessage] + max_workers: int + + def __call__(self) -> Result[float, list[bool]]: + @sentry_sdk.trace + def flush_message(message: ProcessedRecordingMessage) -> None: + with contextlib.suppress(DropSilently): + commit_recording_message(message) + + if len(self.buffer) == 0: + return Result.ok(time.time()) + + with ThreadPoolExecutor(max_workers=self.max_workers) as pool: + waiter = wait(pool.submit(flush_message, message) for message in self.buffer) + errors = [future.exception() for future in waiter.done] + + # Recording metadata is not tracked in the threadpool. This is because this function will + # log. Logging will acquire a lock and make our threading less useful due to the speed of + # the I/O we do in this step. + for message, error in zip(self.buffer, errors): + if error is None: + track_recording_metadata(message) + + errs = [] + for error in errors: + if isinstance(error, GCS_RETRYABLE_ERRORS): + errs.append(True) + elif error is None: + errs.append(False) + else: + # Unhandled exceptions are logged and do not block ingestion. + logger.error("Unhandled error in flush buffer.", exc_info=error) + errs.append(False) + + if any(errs): + return Result.err(errs) + + return Result.ok(time.time()) + + +# Consumer. + + +recording_consumer = RunTime( + init=init, + process=process, + subscription=subscription, + update=update, +) diff --git a/src/sentry/replays/consumers/buffered/factory.py b/src/sentry/replays/consumers/buffered/factory.py new file mode 100644 index 00000000000000..5cc3ba59bbecea --- /dev/null +++ b/src/sentry/replays/consumers/buffered/factory.py @@ -0,0 +1,34 @@ +"""Session Replay recording consumer strategy factory. + +This module exists solely to abstract the bootstraping process of the application and runtime in +`sentry/consumers/__init__.py`. +""" + +from collections.abc import Mapping + +from arroyo.backends.kafka.consumer import KafkaPayload +from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory +from arroyo.processing.strategies.commit import CommitOffsets +from arroyo.types import Commit, Partition + +from sentry.replays.consumers.buffered.consumer import Flags, recording_consumer +from sentry.replays.consumers.buffered.platform import PlatformStrategy + + +class PlatformStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): + + def __init__(self, max_buffer_length: int, max_buffer_wait: int, max_workers: int) -> None: + self.flags: Flags = { + "max_buffer_length": max_buffer_length, + "max_buffer_wait": max_buffer_wait, + "max_workers": max_workers, + } + + def create_with_partitions( + self, + commit: Commit, + partitions: Mapping[Partition, int], + ) -> ProcessingStrategy[KafkaPayload]: + return PlatformStrategy( + next_step=CommitOffsets(commit), flags=self.flags, runtime=recording_consumer + ) diff --git a/src/sentry/replays/consumers/buffered/platform.py b/src/sentry/replays/consumers/buffered/platform.py new file mode 100644 index 00000000000000..bf07962b3532d9 --- /dev/null +++ b/src/sentry/replays/consumers/buffered/platform.py @@ -0,0 +1,242 @@ +from collections.abc import Callable, MutableMapping +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Generic, TypeVar + +from arroyo.backends.kafka.consumer import KafkaPayload +from arroyo.processing.strategies import MessageRejected, ProcessingStrategy +from arroyo.types import FilteredPayload, Message, Partition, Value + +Output = TypeVar("Output") +Input = KafkaPayload + + +class PlatformStrategy(ProcessingStrategy[FilteredPayload | Input], Generic[Output]): + + def __init__( + self, + flags: "Flags", + runtime: "RunTime[Model, Msg, Flags, Output]", + next_step: "ProcessingStrategy[Output]", + ) -> None: + # The RunTime is made aware of the commit strategy. It will + # submit the partition, offset mapping it wants committed. + runtime.setup(flags, self._handle_next_step) + + self.__closed = False + self.__next_step = next_step + self.__offsets: MutableMapping[Partition, int] = {} + self.__runtime = runtime + + def submit(self, message: Message[FilteredPayload | KafkaPayload]) -> None: + assert not self.__closed + + if isinstance(message.payload, KafkaPayload): + self.__runtime.submit(message.payload.value) + self.__offsets.update(message.committable) + else: + self.__offsets.update(message.committable) + + def poll(self) -> None: + assert not self.__closed + + try: + self.__runtime.publish("poll") + except MessageRejected: + pass + + self.__next_step.poll() + + def close(self) -> None: + self.__closed = True + + def terminate(self) -> None: + self.__closed = True + self.__next_step.terminate() + + def join(self, timeout: float | None = None) -> None: + try: + self.__runtime.publish("join") + except MessageRejected: + pass + + self.__next_step.close() + self.__next_step.join(timeout) + + def _handle_next_step(self, value: Output) -> None: + self.__next_step.submit(Message(Value(value, self.__offsets, datetime.now()))) + + +# A Model represents the state of your application. It is a type variable and the RunTime is +# generic over it. Your state can be anything from a simple integer to a large class with many +# fields. +Model = TypeVar("Model") + +# A Msg represents the commands an application can issue to itself. These commands update the state +# and optionally issue commands to the RunTime. +Msg = TypeVar("Msg") + +# A generic type representing the structure of the flags passed to the RunTime instance. +Flags = TypeVar("Flags") + + +@dataclass(frozen=True) +class Commit(Generic[Msg, Output]): + """Instructs the RunTime to produce to the next step.""" + + msg: Msg + value: Output + + +@dataclass(frozen=True) +class Effect(Generic[Msg]): + """Instructs the RunTime to perform a managed effect. + + If the RunTime performs an effect for the application it means the RunTime can dictate if the + effect blocks the application, if the effect executes at all, or perform any additional + operations before or after the effect. This has significant implications for RunTime + performance and application testability. + """ + + fun: Callable[[], Any] + msg: Callable[[Any], Msg] + + +class Nothing: + """Instructs the RunTime to do nothing.""" + + +@dataclass(frozen=True) +class Task(Generic[Msg]): + """Instructs the RunTime to emit an application message back to the application.""" + + msg: Msg + + +# A "Cmd" is the union of all the commands an application can issue to the RunTime. The RunTime +# accepts these commands and handles them in some pre-defined way. Commands are fixed and can not +# be registered on a per application basis. +Cmd = Commit[Msg, Output] | Effect[Msg] | Nothing | Task[Msg] + + +@dataclass(frozen=True) +class Join(Generic[Msg]): + """Join subscription class. + + The platform may need to quit. When this happens the RunTime needs to know. The application + may or may not need to know. The Join subscription allows aplications to subscribe to join + events and handle them in the way they see fit. + """ + + msg: Callable[[], Msg] + name = "join" + + +@dataclass(frozen=True) +class Poll(Generic[Msg]): + """Poll subscription class. + + The platform will periodically poll the RunTime. The application may or may not subscribe to + these events and choose to act on them. + """ + + msg: Callable[[], Msg] + name = "poll" + + +# A "Sub" is the union of all the commands the Platform can issue to an application. The Platform +# will occassionally emit actions which are intercepted by the RunTime. The RunTime translates +# these actions into a set of predefined subscriptions. These subscriptions are exposed to the +# application and the developer is free to handle them in the way they see fit. +Sub = Join[Msg] | Poll[Msg] + + +class RunTime(Generic[Model, Msg, Flags, Output]): + """RunTime object. + + The RunTime is an intermediate data structure which manages communication between the platform + and the application. It formalizes state transformations and abstracts the logic of the + platform. Commands are declaratively issued rather than defined within the logic of the + application. Commands can be issued bidirectionally with "Cmd" types flowing from the + application to the platform and "Sub" types flowing from the platform to the application. + """ + + def __init__( + self, + init: Callable[[Flags], tuple[Model, Cmd[Msg, Output]]], + process: Callable[[Model, bytes], Msg], + subscription: Callable[[Model], list[Sub[Msg]]], + update: Callable[[Model, Msg], tuple[Model, Cmd[Msg, Output]]], + ) -> None: + self.init = init + self.process = process + self.subscription = subscription + self.update = update + + self._next_step: Callable[[Output], None] | None = None + self._model: Model | None = None + self._subscriptions: dict[str, Sub[Msg]] = {} + + @property + def model(self) -> Model: + assert self._model is not None + return self._model + + @property + def next_step(self) -> Callable[[Output], None]: + assert self._next_step is not None + return self._next_step + + # NOTE: Could this be a factory function that produces RunTimes instead? That way we don't need + # the assert checks on model and commit. + def setup(self, flags: Flags, next_step: Callable[[Output], None]) -> None: + self._next_step = next_step + + model, cmd = self.init(flags) + self._model = model + self._handle_cmd(cmd) + self._register_subscriptions() + + def submit(self, message: bytes) -> None: + self._handle_msg(self.process(self.model, message)) + + def publish(self, sub_name: str) -> None: + # For each new subscription event we re-register the subscribers in case anything within + # the application has changed. I.e. the model is in some new state and that means we care + # about a new subscription or don't care about an old one. + self._register_subscriptions() + + # Using the subscription's name look for the subscription in the registry. + sub = self._subscriptions.get(sub_name) + if sub is None: + return None + + # Match of the various subscription types and emit a mesasge to the RunTime. Right now + # there's no need to match. The name disambiguates enough already but in the future more + # subscriptions might do more complex things. + match sub: + case Join(msg=msg): + return self._handle_msg(msg()) + case Poll(msg=msg): + return self._handle_msg(msg()) + + def _handle_msg(self, msg: Msg) -> None: + model, cmd = self.update(self.model, msg) + self._model = model + self._handle_cmd(cmd) + + def _handle_cmd(self, cmd: Cmd[Msg, Output]) -> None: + match cmd: + case Commit(msg=msg, value=value): + self.next_step(value) + return self._handle_msg(msg) + case Effect(msg=msg, fun=fun): + return self._handle_msg(msg(fun())) + case Nothing(): + return None + case Task(msg=msg): + return self._handle_msg(msg) + + def _register_subscriptions(self) -> None: + for sub in self.subscription(self.model): + self._subscriptions[sub.name] = sub diff --git a/src/sentry/replays/consumers/buffered/types.py b/src/sentry/replays/consumers/buffered/types.py new file mode 100644 index 00000000000000..f2fde506c98632 --- /dev/null +++ b/src/sentry/replays/consumers/buffered/types.py @@ -0,0 +1,52 @@ +from typing import Generic, TypeVar + +T = TypeVar("T") # Success type +E = TypeVar("E") # Error type + + +class Result(Generic[T, E]): + """A Result type that can either contain a success value of type T or an error value of type E.""" + + def __init__(self, value: T | None = None, error: E | None = None): + if (value is None and error is None) or (value is not None and error is not None): + raise ValueError("Result must have either a value or an error, but not both") + self._value = value + self._error = error + + @property + def is_ok(self) -> bool: + """Returns True if the result contains a success value.""" + return self._value is not None + + @property + def is_err(self) -> bool: + """Returns True if the result contains an error value.""" + return self._error is not None + + def unwrap(self) -> T: + """Returns the success value if present, raises ValueError otherwise.""" + if self._value is None: + raise ValueError(f"Cannot unwrap error result: {self._error}") + return self._value + + def unwrap_err(self) -> E: + """Returns the error value if present, raises ValueError otherwise.""" + if self._error is None: + raise ValueError(f"Cannot unwrap ok result: {self._value}") + return self._error + + @classmethod + def ok(cls, value: T) -> "Result[T, E]": + """Creates a new Result with a success value.""" + return cls(value=value) + + @classmethod + def err(cls, error: E) -> "Result[T, E]": + """Creates a new Result with an error value.""" + return cls(error=error) + + def map(self, f: callable[[T], T]) -> "Result[T, E]": + """Applies a function to the success value if present.""" + if self.is_ok: + return Result.ok(f(self._value)) + return self diff --git a/src/sentry/replays/usecases/ingest/__init__.py b/src/sentry/replays/usecases/ingest/__init__.py index ea5c479318dde3..e4b7f2a02d86ee 100644 --- a/src/sentry/replays/usecases/ingest/__init__.py +++ b/src/sentry/replays/usecases/ingest/__init__.py @@ -4,12 +4,14 @@ import logging import time import zlib +from contextlib import contextmanager from datetime import datetime, timezone from typing import Any, TypedDict, cast import sentry_sdk import sentry_sdk.scope from django.conf import settings +from msgpack.exceptions import UnpackException from sentry_kafka_schemas.codecs import Codec, ValidationError from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording from sentry_sdk import set_tag @@ -91,30 +93,30 @@ class RecordingIngestMessage: replay_video: bytes | None -def ingest_recording(message_bytes: bytes) -> None: - """Ingest non-chunked recording messages.""" +@contextmanager +def sentry_tracing(name: str): + sample_rate = getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0) isolation_scope = sentry_sdk.Scope.get_isolation_scope().fork() - with sentry_sdk.scope.use_isolation_scope(isolation_scope): with sentry_sdk.start_transaction( - name="replays.consumer.process_recording", - op="replays.consumer", - custom_sampling_context={ - "sample_rate": getattr( - settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0 - ) - }, + name=name, op="replays.consumer", custom_sampling_context={"sample_rate": sample_rate} ): - try: - message = parse_recording_message(message_bytes) - if message.org_id in options.get("replay.consumer.separate-compute-and-io-org-ids"): - _ingest_recording_separated_io_compute(message) - else: - _ingest_recording(message) - except DropSilently: - # The message couldn't be parsed for whatever reason. We shouldn't block the consumer - # so we ignore it. - pass + yield + + +def ingest_recording(message_bytes: bytes) -> None: + """Ingest non-chunked recording messages.""" + with sentry_tracing("replays.consumer.process_recording"): + try: + message = parse_recording_message(message_bytes) + if message.org_id in options.get("replay.consumer.separate-compute-and-io-org-ids"): + _ingest_recording_separated_io_compute(message) + else: + _ingest_recording(message) + except DropSilently: + # The message couldn't be parsed for whatever reason. We shouldn't block the consumer + # so we ignore it. + pass @sentry_sdk.trace @@ -312,8 +314,8 @@ def recording_post_processor( def parse_recording_message(message: bytes) -> RecordingIngestMessage: try: message_dict: ReplayRecording = RECORDINGS_CODEC.decode(message) - except ValidationError: - logger.exception("Could not decode recording message.") + except (ValidationError, UnpackException, ValueError): + logger.exception("Could not decode replay recording message.") raise DropSilently() return RecordingIngestMessage( diff --git a/tests/sentry/replays/consumers/test_recording.py b/tests/sentry/replays/consumers/test_recording.py index e2958e54dd7a88..627de49ac7046d 100644 --- a/tests/sentry/replays/consumers/test_recording.py +++ b/tests/sentry/replays/consumers/test_recording.py @@ -13,6 +13,7 @@ from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording from sentry.models.organizationonboardingtask import OnboardingTask, OnboardingTaskStatus +from sentry.replays.consumers.buffered.factory import PlatformStrategyFactory from sentry.replays.consumers.recording import ProcessReplayRecordingStrategyFactory from sentry.replays.consumers.recording_buffered import ( RecordingBufferedStrategyFactory, @@ -581,3 +582,8 @@ def test_invalid_message(self): {"replay.consumer.separate-compute-and-io-org-ids": [self.organization.id]} ): super().test_invalid_message() + + +class BufferedRunTimeConsumerTestCase(RecordingTestCase): + def processing_factory(self): + return PlatformStrategyFactory(max_buffer_length=8, max_buffer_wait=1, max_workers=8) diff --git a/tests/sentry/replays/unit/consumers/__init__.py b/tests/sentry/replays/unit/consumers/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/tests/sentry/replays/unit/consumers/test_helpers.py b/tests/sentry/replays/unit/consumers/test_helpers.py new file mode 100644 index 00000000000000..1922451e5fe802 --- /dev/null +++ b/tests/sentry/replays/unit/consumers/test_helpers.py @@ -0,0 +1,45 @@ +from sentry.replays.consumers.buffered.platform import Flags, Model, Msg, Output, RunTime + + +class SandboxRunTime(RunTime[Model, Msg, Flags, Output]): + def _handle_msg(self, msg): + # The first msg returned by the submit function needs to be yielded. From this point + # onward we'll only intercept commands and we'll send msgs. + yield msg + + while True: + model, cmd = self.update(self.model, msg) + self._model = model + + # The application wants the runtime to execute this command but we're intercepting it + # and forcing the test suite to decide what msg should be produced. + msg = yield cmd + + def submit(self, message): + yield from self._handle_msg(self.process(self.model, message)) + + def publish(self, sub_name: str): + self._register_subscriptions() + + sub = self._subscriptions.get(sub_name) + if sub is None: + return None + + msg = yield sub + yield from self._handle_msg(msg) + + +class MockNextStep: + def __init__(self): + self.values = [] + + def __call__(self, value): + self.values.append(value) + + +class MockSink: + def __init__(self): + self.accepted = [] + + def accept(self, buffer): + self.accepted.extend(buffer) diff --git a/tests/sentry/replays/unit/consumers/test_recording.py b/tests/sentry/replays/unit/consumers/test_recording.py new file mode 100644 index 00000000000000..8b20c63dd68286 --- /dev/null +++ b/tests/sentry/replays/unit/consumers/test_recording.py @@ -0,0 +1,313 @@ +import time +import uuid +import zlib + +from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording + +from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.replays.consumers.buffered.consumer import ( + Append, + Committed, + Flush, + FlushBuffer, + Flushed, + Skip, + TryFlush, + init, + process, + subscription, + update, +) +from sentry.replays.consumers.buffered.platform import Commit, Effect, Nothing, Task +from sentry.replays.usecases.ingest import ProcessedRecordingMessage +from sentry.replays.usecases.ingest.event_parser import ParsedEventMeta +from tests.sentry.replays.unit.consumers.test_helpers import MockNextStep, SandboxRunTime + +RECORDINGS_CODEC = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS) + + +def test_end_to_end_message_processing(): + """End to end test of the recording consumer.""" + runtime = _make_runtime() + + message: ReplayRecording = { + "key_id": None, + "org_id": 1, + "payload": b'{"segment_id":0}\n[]', # type: ignore[typeddict-item] + "project_id": 1, + "received": int(time.time()), + "replay_event": None, + "replay_id": uuid.uuid4().hex, + "replay_video": None, + "retention_days": 30, + "type": "replay_recording_not_chunked", + "version": 1, + } + message_bytes = RECORDINGS_CODEC.encode(message) + + gen = runtime.submit(message_bytes) + + # Assert the application does not append the message to the buffer. + msg = next(gen) + assert isinstance(msg, Append) + + # Assert the application gets the current time after appending the message and then attempts to + # flush the buffer with the current time. + cmd = next(gen) + assert cmd == Effect(fun=time.time, msg=TryFlush) + assert runtime.model.buffer == [ + ProcessedRecordingMessage( + actions_event=ParsedEventMeta([], [], [], [], [], []), + filedata=zlib.compress(b"[]"), + filename=runtime.model.buffer[0].filename, + is_replay_video=False, + key_id=None, + org_id=1, + project_id=1, + received=message["received"], + recording_size_uncompressed=2, + recording_size=runtime.model.buffer[0].recording_size, + retention_days=30, + replay_id=message["replay_id"], + segment_id=0, + video_size=None, + replay_event=None, + ) + ] + + # Give the application timestamps that are too early to flush (including the one the + # application wanted to generate). + assert isinstance(gen.send(TryFlush(now=1)), Nothing) + assert isinstance(gen.send(TryFlush(now=2)), Nothing) + assert isinstance(gen.send(TryFlush(now=3)), Nothing) + assert cmd.msg(1) == TryFlush(now=1) + assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) + + # Give the application a timestamp that will trigger a flush. + cmd = gen.send(TryFlush(now=time.time() + 1)) + assert isinstance(cmd, Task) + assert isinstance(cmd.msg, Flush) + + # Assert the application triggered a buffer flush and forward the next msg to the application. + cmd = gen.send(cmd.msg) + assert isinstance(cmd, Effect) + assert cmd.fun == FlushBuffer(runtime.model) + assert len(runtime.model.buffer) == 1 + + # Assert the successful flush triggers a commit command. + cmd = gen.send(cmd.msg(1)) + assert len(runtime.model.buffer) == 0 + assert runtime.model.last_flushed_at == 1 + assert isinstance(cmd, Commit) + assert isinstance(cmd.msg, Committed) + + +def test_invalid_message_format(): + """Test message with invalid message format.""" + runtime = _make_runtime() + + # We submit a message which can't be parsed and will not be buffered. Flush is not triggered. + gen = runtime.submit(b"invalid") + + # Assert the application does not append the message to the buffer. + msg = next(gen) + assert isinstance(msg, Skip) + + # Application tries to flush. + cmd = next(gen) + assert len(runtime.model.buffer) == 0 + assert cmd == Effect(fun=time.time, msg=TryFlush) + assert cmd.msg(1) == TryFlush(now=1) + assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) + + +def test_invalid_recording_json(): + """Test message with invalid recording JSON.""" + runtime = _make_runtime() + + message: ReplayRecording = { + "key_id": None, + "org_id": 1, + "payload": b'{"segment_id":0}\n[', # type: ignore[typeddict-item] + "project_id": 1, + "received": int(time.time()), + "replay_event": None, + "replay_id": uuid.uuid4().hex, + "replay_video": None, + "retention_days": 30, + "type": "replay_recording_not_chunked", + "version": 1, + } + message_bytes = RECORDINGS_CODEC.encode(message) + + gen = runtime.submit(message_bytes) + + # Assert the application appends the message to the buffer. + msg = next(gen) + assert isinstance(msg, Append) + + # Application tries to flush. + cmd = next(gen) + assert len(runtime.model.buffer) == 1 + assert cmd == Effect(fun=time.time, msg=TryFlush) + assert cmd.msg(1) == TryFlush(now=1) + assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) + + +def test_missing_headers(): + """Test message with missing headers.""" + runtime = _make_runtime() + + message: ReplayRecording = { + "key_id": None, + "org_id": 1, + "payload": b"[]", # type: ignore[typeddict-item] + "project_id": 1, + "received": int(time.time()), + "replay_event": None, + "replay_id": uuid.uuid4().hex, + "replay_video": None, + "retention_days": 30, + "type": "replay_recording_not_chunked", + "version": 1, + } + message_bytes = RECORDINGS_CODEC.encode(message) + + gen = runtime.submit(message_bytes) + + # Assert the application does not append the message to the buffer. + msg = next(gen) + assert isinstance(msg, Skip) + + # Application tries to flush. + cmd = next(gen) + assert len(runtime.model.buffer) == 0 + assert cmd == Effect(fun=time.time, msg=TryFlush) + assert cmd.msg(1) == TryFlush(now=1) + assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) + + +# def test_flush_failure_gcs_outage(): +# ... + + +# def test_flush_random_exception(): +# ... + + +# def test_flush + + +def test_buffer_full_semantics(): + runtime = _make_runtime() + + message: ReplayRecording = { + "key_id": None, + "org_id": 1, + "payload": b'{"segment_id":0}\n[]', # type: ignore[typeddict-item] + "project_id": 1, + "received": int(time.time()), + "replay_event": None, + "replay_id": uuid.uuid4().hex, + "replay_video": None, + "retention_days": 30, + "type": "replay_recording_not_chunked", + "version": 1, + } + message_bytes = RECORDINGS_CODEC.encode(message) + + gen = runtime.submit(message_bytes) + + # Assert the application appends the message to the buffer. + msg = next(gen) + assert isinstance(msg, Append) + + # Application tries to flush. + cmd = next(gen) + assert cmd == Effect(fun=time.time, msg=TryFlush) + + # Assert the TryFlush msg produced by the runtime had no effect because the buffer was not full + # and the wait interval was not exceeded. + assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) + + # We submit another message which will be buffered and trigger a flush. + gen = runtime.submit(message_bytes) + + # Assert the application appends the message to the buffer. + msg = next(gen) + assert isinstance(msg, Append) + + # Application tries to flush. + cmd = next(gen) + assert cmd == Effect(fun=time.time, msg=TryFlush) + assert cmd.msg(1) == TryFlush(now=1) + + # Assert a flush command is triggered from the msg produced by the runtime. + cmd = gen.send(cmd.msg(cmd.fun())) + assert isinstance(cmd, Task) + assert isinstance(cmd.msg, Flush) + + # Trigger the flush and intercept the effect. + cmd = gen.send(cmd.msg) + assert isinstance(cmd, Effect) + assert cmd.fun == FlushBuffer(runtime.model) + assert cmd.msg(1) == Flushed(now=1) + + # Record a successful flush. + cmd = gen.send(cmd.msg(now=42)) + assert runtime.model.last_flushed_at == 42 + assert len(runtime.model.buffer) == 1 + assert cmd == Commit(msg=Committed(), value=None) + + +def test_buffer_timeout(): + runtime = _make_runtime() + + message: ReplayRecording = { + "key_id": None, + "org_id": 1, + "payload": b'{"segment_id":0}\n[]', # type: ignore[typeddict-item] + "project_id": 1, + "received": int(time.time()), + "replay_event": None, + "replay_id": uuid.uuid4().hex, + "replay_video": None, + "retention_days": 30, + "type": "replay_recording_not_chunked", + "version": 1, + } + message_bytes = RECORDINGS_CODEC.encode(message) + + gen = runtime.submit(message_bytes) + + # Assert the application does not append the message to the buffer. + msg = next(gen) + assert isinstance(msg, Append) + + # Application tries to flush. + cmd = next(gen) + assert cmd == Effect(fun=time.time, msg=TryFlush) + assert cmd.msg(1) == TryFlush(now=1) + + # Assert the TryFlush msg produced by the runtime had no effect because the wait interval was + # not exceeded. + assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) + + # Now we emit a new TryFlush message with a timestamp in the future. This triggers the runtime + # to flush because its flush interval has been exceeded. + cmd = gen.send(TryFlush(now=time.time() + 1)) + assert isinstance(cmd, Task) + assert isinstance(cmd.msg, Flush) + + +def _make_runtime(): + runtime = SandboxRunTime(init, process, subscription, update) + runtime.setup( + { + "max_buffer_length": 2, + "max_buffer_wait": 1, + "max_workers": 1, + }, + MockNextStep(), + ) + return runtime diff --git a/tests/sentry/replays/unit/consumers/test_runtime.py b/tests/sentry/replays/unit/consumers/test_runtime.py new file mode 100644 index 00000000000000..151141c387ed96 --- /dev/null +++ b/tests/sentry/replays/unit/consumers/test_runtime.py @@ -0,0 +1,86 @@ +from sentry.replays.consumers.buffered.platform import Join, Nothing, Poll, RunTime +from tests.sentry.replays.unit.consumers.test_helpers import MockNextStep + + +def counter_runtime() -> RunTime[int, str, None, None]: + def init(_): + return (22, None) + + def process(_model, message): + if message == b"incr": + return "incr" + elif message == b"decr": + return "decr" + else: + return "nothing" + + def update(model, msg): + if msg == "incr": + return (model + 1, Nothing()) + elif msg == "decr": + return (model - 1, Nothing()) + elif msg == "join": + return (-10, Nothing()) + elif msg == "poll": + return (99, Nothing()) + elif msg == "nothing": + return (model, Nothing()) + else: + raise ValueError("Unknown msg") + + def subscription(_): + return [ + Join(msg=lambda: "join"), + Poll(msg=lambda: "poll"), + ] + + return RunTime( + init=init, + process=process, + update=update, + subscription=subscription, + ) + + +def test_runtime_setup(): + runtime = counter_runtime() + runtime.setup(None, next_step=MockNextStep()) + assert runtime.model == 22 + + +def test_runtime_submit(): + # RunTime defaults to a start point of 22. + runtime = counter_runtime() + runtime.setup(None, next_step=MockNextStep()) + assert runtime.model == 22 + + # Two incr commands increase the count by 2. + runtime.submit(b"incr") + runtime.submit(b"incr") + assert runtime.model == 24 + + # Four decr commands decrease the count by 4. + runtime.submit(b"decr") + runtime.submit(b"decr") + runtime.submit(b"decr") + runtime.submit(b"decr") + assert runtime.model == 20 + + # Messages which the application does not understand do nothing to the model. + runtime.submit(b"other") + assert runtime.model == 20 + + +def test_runtime_publish(): + # RunTime defaults to a start point of 22. + runtime = counter_runtime() + runtime.setup(None, next_step=MockNextStep()) + assert runtime.model == 22 + + # A join event updates the model and sets it to -10. + runtime.publish("join") + assert runtime.model == -10 + + # A poll event updates the model and sets it to 99. + runtime.publish("poll") + assert runtime.model == 99