diff --git a/src/sentry/replays/consumers/buffered/consumer.py b/src/sentry/replays/consumers/buffered/consumer.py index 9794dd92b71418..a1937aa87a62a2 100644 --- a/src/sentry/replays/consumers/buffered/consumer.py +++ b/src/sentry/replays/consumers/buffered/consumer.py @@ -8,12 +8,13 @@ import contextlib import logging import time -from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait +from concurrent.futures import ThreadPoolExecutor, wait from dataclasses import dataclass from typing import TypedDict import sentry_sdk +from sentry.filestore.gcs import GCS_RETRYABLE_ERRORS from sentry.replays.consumers.buffered.platform import ( Cmd, Commit, @@ -25,6 +26,7 @@ Sub, Task, ) +from sentry.replays.consumers.buffered.types import Result from sentry.replays.usecases.ingest import ( DropSilently, ProcessedRecordingMessage, @@ -61,19 +63,23 @@ class Append: item: ProcessedRecordingMessage +@dataclass(frozen=True) class Committed: """The platform committed offsets. Our buffer is now completely done.""" +@dataclass(frozen=True) class Flush: """Our application hit the flush threshold and has been instructed to flush.""" + buffer: list[ProcessedRecordingMessage] + @dataclass(frozen=True) class Flushed: """Our application successfully flushed.""" - now: float + result: Result[float, list[bool]] class Skip: @@ -144,14 +150,24 @@ def update(model: Model, msg: Msg) -> tuple[Model, Cmd[Msg, None]]: return (model, Effect(fun=time.time, msg=TryFlush)) case Committed(): return (model, Nothing()) - case Flush(): - return (model, Effect(fun=FlushBuffer(model), msg=Flushed)) - case Flushed(now=now): - model.buffer = [] - model.last_flushed_at = now - return (model, Commit(msg=Committed(), value=None)) + case Flush(buffer=buffer): + return (model, Effect(fun=FlushBuffer(buffer, model.max_workers), msg=Flushed)) + case Flushed(result=result): + if result.is_ok: + value = result.unwrap() + model.buffer = [] + model.last_flushed_at = value + model.retries = 0 + return (model, Commit(msg=Committed(), value=None)) + else: + buffer = [item for item, error in zip(model.buffer, result.unwrap_err()) if error] + logger.info("[FLUSHED] Retrying %d/%d messages.", len(buffer), len(model.buffer)) + return (model, Task(msg=Flush(buffer=buffer))) case TryFlush(now=now): - return (model, Task(msg=Flush())) if can_flush(model, now) else (model, Nothing()) + if can_flush(model, now): + return (model, Task(msg=Flush(buffer=model.buffer))) + else: + return (model, Nothing()) def subscription(model: Model) -> list[Sub[Msg]]: @@ -162,7 +178,7 @@ def subscription(model: Model) -> list[Sub[Msg]]: the runtime will inform us (the application) so we can handle the situation approporiately. """ return [ - Join(msg=Flush), + Join(msg=lambda: Flush(model.buffer)), Poll(msg=lambda: TryFlush(now=time.time())), ] @@ -179,38 +195,44 @@ def can_flush(model: Model, now: float) -> bool: @dataclass(frozen=True) class FlushBuffer: - model: Model + buffer: list[ProcessedRecordingMessage] + max_workers: int - def __call__(self) -> float: + def __call__(self) -> Result[float, list[bool]]: @sentry_sdk.trace def flush_message(message: ProcessedRecordingMessage) -> None: with contextlib.suppress(DropSilently): commit_recording_message(message) - if len(self.model.buffer) == 0: - return time.time() + if len(self.buffer) == 0: + return Result.ok(time.time()) - with ThreadPoolExecutor(max_workers=self.model.max_workers) as pool: - futures = [pool.submit(flush_message, message) for message in self.model.buffer] - - # Tasks can fail. We check the done set for any failures. We will wait for all the - # futures to complete before running this step or eagerly run this step if any task - # errors. - done, _ = wait(futures, return_when=FIRST_EXCEPTION) - for future in done: - exc = future.exception() - # Raising preserves the existing behavior. We can address error handling in a - # follow up. - if exc is not None and not isinstance(exc, DropSilently): - raise exc + with ThreadPoolExecutor(max_workers=self.max_workers) as pool: + waiter = wait(pool.submit(flush_message, message) for message in self.buffer) + errors = [future.exception() for future in waiter.done] # Recording metadata is not tracked in the threadpool. This is because this function will # log. Logging will acquire a lock and make our threading less useful due to the speed of # the I/O we do in this step. - for message in self.model.buffer: - track_recording_metadata(message) - - return time.time() + for message, error in zip(self.buffer, errors): + if error is None: + track_recording_metadata(message) + + errs = [] + for error in errors: + if isinstance(error, GCS_RETRYABLE_ERRORS): + errs.append(True) + elif error is None: + errs.append(False) + else: + # Unhandled exceptions are logged and do not block ingestion. + logger.error("Unhandled error in flush buffer.", exc_info=error) + errs.append(False) + + if any(errs): + return Result.err(errs) + + return Result.ok(time.time()) # Consumer. diff --git a/src/sentry/replays/consumers/buffered/types.py b/src/sentry/replays/consumers/buffered/types.py new file mode 100644 index 00000000000000..f2fde506c98632 --- /dev/null +++ b/src/sentry/replays/consumers/buffered/types.py @@ -0,0 +1,52 @@ +from typing import Generic, TypeVar + +T = TypeVar("T") # Success type +E = TypeVar("E") # Error type + + +class Result(Generic[T, E]): + """A Result type that can either contain a success value of type T or an error value of type E.""" + + def __init__(self, value: T | None = None, error: E | None = None): + if (value is None and error is None) or (value is not None and error is not None): + raise ValueError("Result must have either a value or an error, but not both") + self._value = value + self._error = error + + @property + def is_ok(self) -> bool: + """Returns True if the result contains a success value.""" + return self._value is not None + + @property + def is_err(self) -> bool: + """Returns True if the result contains an error value.""" + return self._error is not None + + def unwrap(self) -> T: + """Returns the success value if present, raises ValueError otherwise.""" + if self._value is None: + raise ValueError(f"Cannot unwrap error result: {self._error}") + return self._value + + def unwrap_err(self) -> E: + """Returns the error value if present, raises ValueError otherwise.""" + if self._error is None: + raise ValueError(f"Cannot unwrap ok result: {self._value}") + return self._error + + @classmethod + def ok(cls, value: T) -> "Result[T, E]": + """Creates a new Result with a success value.""" + return cls(value=value) + + @classmethod + def err(cls, error: E) -> "Result[T, E]": + """Creates a new Result with an error value.""" + return cls(error=error) + + def map(self, f: callable[[T], T]) -> "Result[T, E]": + """Applies a function to the success value if present.""" + if self.is_ok: + return Result.ok(f(self._value)) + return self diff --git a/tests/sentry/replays/unit/consumers/test_recording.py b/tests/sentry/replays/unit/consumers/test_recording.py index 2c90d39b719113..8b20c63dd68286 100644 --- a/tests/sentry/replays/unit/consumers/test_recording.py +++ b/tests/sentry/replays/unit/consumers/test_recording.py @@ -10,6 +10,7 @@ Committed, Flush, FlushBuffer, + Flushed, Skip, TryFlush, init, @@ -186,6 +187,17 @@ def test_missing_headers(): assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing) +# def test_flush_failure_gcs_outage(): +# ... + + +# def test_flush_random_exception(): +# ... + + +# def test_flush + + def test_buffer_full_semantics(): runtime = _make_runtime() @@ -235,6 +247,18 @@ def test_buffer_full_semantics(): assert isinstance(cmd, Task) assert isinstance(cmd.msg, Flush) + # Trigger the flush and intercept the effect. + cmd = gen.send(cmd.msg) + assert isinstance(cmd, Effect) + assert cmd.fun == FlushBuffer(runtime.model) + assert cmd.msg(1) == Flushed(now=1) + + # Record a successful flush. + cmd = gen.send(cmd.msg(now=42)) + assert runtime.model.last_flushed_at == 42 + assert len(runtime.model.buffer) == 1 + assert cmd == Commit(msg=Committed(), value=None) + def test_buffer_timeout(): runtime = _make_runtime()