Skip to content

Commit

Permalink
Sketch selective retry
Browse files Browse the repository at this point in the history
  • Loading branch information
cmanallen committed Mar 5, 2025
1 parent c8d5745 commit e349232
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 31 deletions.
84 changes: 53 additions & 31 deletions src/sentry/replays/consumers/buffered/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import contextlib
import logging
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import TypedDict

import sentry_sdk

from sentry.filestore.gcs import GCS_RETRYABLE_ERRORS
from sentry.replays.consumers.buffered.platform import (
Cmd,
Commit,
Expand All @@ -25,6 +26,7 @@
Sub,
Task,
)
from sentry.replays.consumers.buffered.types import Result
from sentry.replays.usecases.ingest import (
DropSilently,
ProcessedRecordingMessage,
Expand Down Expand Up @@ -61,19 +63,23 @@ class Append:
item: ProcessedRecordingMessage


@dataclass(frozen=True)
class Committed:
    """The platform committed offsets. Our buffer is now completely done.

    This message carries no data. It is the `msg` attached to the `Commit` command and the
    update step answers it with `Nothing()`.
    """


@dataclass(frozen=True)
class Flush:
    """Our application hit the flush threshold and has been instructed to flush.

    The messages to write travel with the instruction so that a retry can re-submit only a
    subset of the model's buffer.
    """

    # Messages to be written by this flush attempt.
    buffer: list[ProcessedRecordingMessage]


@dataclass(frozen=True)
class Flushed:
    """Our application finished a flush attempt.

    The attempt may have failed: the outcome is carried in `result` rather than implied by the
    message itself. The message is constructed from the single return value of the flush
    effect, so it must have exactly one field.
    """

    # Ok: the time at which the flush completed. Err: one flag per flushed message, in buffer
    # order, where True marks a message that should be retried.
    result: Result[float, list[bool]]


class Skip:
Expand Down Expand Up @@ -144,14 +150,24 @@ def update(model: Model, msg: Msg) -> tuple[Model, Cmd[Msg, None]]:
return (model, Effect(fun=time.time, msg=TryFlush))
case Committed():
return (model, Nothing())
case Flush():
return (model, Effect(fun=FlushBuffer(model), msg=Flushed))
case Flushed(now=now):
model.buffer = []
model.last_flushed_at = now
return (model, Commit(msg=Committed(), value=None))
case Flush(buffer=buffer):
return (model, Effect(fun=FlushBuffer(buffer, model.max_workers), msg=Flushed))
case Flushed(result=result):
if result.is_ok:
value = result.unwrap()
model.buffer = []
model.last_flushed_at = value
model.retries = 0
return (model, Commit(msg=Committed(), value=None))
else:
buffer = [item for item, error in zip(model.buffer, result.unwrap_err()) if error]
logger.info("[FLUSHED] Retrying %d/%d messages.", len(buffer), len(model.buffer))
return (model, Task(msg=Flush(buffer=buffer)))
case TryFlush(now=now):
return (model, Task(msg=Flush())) if can_flush(model, now) else (model, Nothing())
if can_flush(model, now):
return (model, Task(msg=Flush(buffer=model.buffer)))
else:
return (model, Nothing())


def subscription(model: Model) -> list[Sub[Msg]]:
Expand All @@ -162,7 +178,7 @@ def subscription(model: Model) -> list[Sub[Msg]]:
the runtime will inform us (the application) so we can handle the situation appropriately.
"""
return [
Join(msg=Flush),
Join(msg=lambda: Flush(model.buffer)),
Poll(msg=lambda: TryFlush(now=time.time())),
]

Expand All @@ -179,38 +195,44 @@ def can_flush(model: Model, now: float) -> bool:

@dataclass(frozen=True)
class FlushBuffer:
    """Callable effect which writes every buffered message using a thread pool.

    Calling the instance returns:

    * `Result.ok(timestamp)` when the buffer was empty or no message failed with a retryable
      error. `timestamp` is `time.time()` taken after the work completed.
    * `Result.err(flags)` when at least one message failed with one of `GCS_RETRYABLE_ERRORS`.
      `flags` has one entry per message, in the same order as `buffer`; True marks a message
      that should be retried.
    """

    # Messages to write, in the order their outcomes are reported.
    buffer: list[ProcessedRecordingMessage]
    # Upper bound on the number of threads used for the writes.
    max_workers: int

    def __call__(self) -> Result[float, list[bool]]:
        @sentry_sdk.trace
        def flush_message(message: ProcessedRecordingMessage) -> None:
            # DropSilently is an expected outcome, not a failure.
            with contextlib.suppress(DropSilently):
                commit_recording_message(message)

        if not self.buffer:
            return Result.ok(time.time())

        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = [pool.submit(flush_message, message) for message in self.buffer]
            wait(futures)
            # Read the outcomes from the submission-ordered list. `wait` returns its `done`
            # futures as an unordered set, so iterating that set would pair errors with the
            # wrong messages below.
            errors = [future.exception() for future in futures]

        # Recording metadata is not tracked in the threadpool. This is because this function will
        # log. Logging will acquire a lock and make our threading less useful due to the speed of
        # the I/O we do in this step.
        for message, error in zip(self.buffer, errors):
            if error is None:
                track_recording_metadata(message)

        errs = []
        for error in errors:
            if isinstance(error, GCS_RETRYABLE_ERRORS):
                errs.append(True)
            elif error is None:
                errs.append(False)
            else:
                # Unhandled exceptions are logged and do not block ingestion.
                logger.error("Unhandled error in flush buffer.", exc_info=error)
                errs.append(False)

        if any(errs):
            return Result.err(errs)

        return Result.ok(time.time())


# Consumer.
Expand Down
52 changes: 52 additions & 0 deletions src/sentry/replays/consumers/buffered/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from collections.abc import Callable
from typing import Generic, TypeVar

T = TypeVar("T") # Success type
E = TypeVar("E") # Error type


class Result(Generic[T, E]):
"""A Result type that can either contain a success value of type T or an error value of type E."""

def __init__(self, value: T | None = None, error: E | None = None):
if (value is None and error is None) or (value is not None and error is not None):
raise ValueError("Result must have either a value or an error, but not both")
self._value = value
self._error = error

@property
def is_ok(self) -> bool:
"""Returns True if the result contains a success value."""
return self._value is not None

@property
def is_err(self) -> bool:
"""Returns True if the result contains an error value."""
return self._error is not None

def unwrap(self) -> T:
"""Returns the success value if present, raises ValueError otherwise."""
if self._value is None:
raise ValueError(f"Cannot unwrap error result: {self._error}")
return self._value

def unwrap_err(self) -> E:
"""Returns the error value if present, raises ValueError otherwise."""
if self._error is None:
raise ValueError(f"Cannot unwrap ok result: {self._value}")
return self._error

@classmethod
def ok(cls, value: T) -> "Result[T, E]":
"""Creates a new Result with a success value."""
return cls(value=value)

@classmethod
def err(cls, error: E) -> "Result[T, E]":
"""Creates a new Result with an error value."""
return cls(error=error)

def map(self, f: callable[[T], T]) -> "Result[T, E]":
"""Applies a function to the success value if present."""
if self.is_ok:
return Result.ok(f(self._value))
return self
24 changes: 24 additions & 0 deletions tests/sentry/replays/unit/consumers/test_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Committed,
Flush,
FlushBuffer,
Flushed,
Skip,
TryFlush,
init,
Expand Down Expand Up @@ -186,6 +187,17 @@ def test_missing_headers():
assert isinstance(gen.send(cmd.msg(cmd.fun())), Nothing)


# def test_flush_failure_gcs_outage():
# ...


# def test_flush_random_exception():
# ...


# def test_flush


def test_buffer_full_semantics():
runtime = _make_runtime()

Expand Down Expand Up @@ -235,6 +247,18 @@ def test_buffer_full_semantics():
assert isinstance(cmd, Task)
assert isinstance(cmd.msg, Flush)

# Trigger the flush and intercept the effect.
cmd = gen.send(cmd.msg)
assert isinstance(cmd, Effect)
assert cmd.fun == FlushBuffer(runtime.model)
assert cmd.msg(1) == Flushed(now=1)

# Record a successful flush.
cmd = gen.send(cmd.msg(now=42))
assert runtime.model.last_flushed_at == 42
assert len(runtime.model.buffer) == 1
assert cmd == Commit(msg=Committed(), value=None)


def test_buffer_timeout():
runtime = _make_runtime()
Expand Down

0 comments on commit e349232

Please sign in to comment.