
feat(replays): Add buffered consumer implementation #85356

Draft. Wants to merge 61 commits into base: master.
Conversation

cmanallen
Member

@cmanallen cmanallen commented Feb 18, 2025

Edit: Out of time for now. Converting to a draft. Will pick this up at a later date.


TODO description...

Partially addresses the DACI: https://www.notion.so/sentry/DACI-Session-Replay-Recording-Consumer-Stability-and-Performance-Improvements-19e8b10e4b5d80a192a1ecd46f13eebb

How it works:

  • As messages come in, they are processed and their processed results are stored on a queue.
  • When the queue fills up, the processed messages are flushed.
    • Flushing involves committing data to GCS, ClickHouse, BigQuery, and DataDog.
    • Anything I/O related.
  • Flushing happens in a thread-pool.

This closely mirrors current production behavior except processing is no longer done in the thread-pool.
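The buffering scheme described above can be sketched in a few lines. This is a hedged illustration only; `BufferedConsumer` and its method names are hypothetical, not the PR's actual API:

```python
# Hypothetical sketch: process on the main thread, buffer the results,
# flush the batch through a thread pool when the buffer fills.
from concurrent.futures import ThreadPoolExecutor


class BufferedConsumer:
    def __init__(self, max_buffer_length: int) -> None:
        self.max_buffer_length = max_buffer_length
        self.buffer: list[str] = []
        self.pool = ThreadPoolExecutor(max_workers=4)

    def submit(self, message: str) -> None:
        # Processing happens on the main thread; only the processed
        # result is buffered.
        self.buffer.append(message.upper())
        if len(self.buffer) >= self.max_buffer_length:
            self.flush()

    def flush(self) -> None:
        # I/O (GCS, ClickHouse, ...) is pushed onto the thread pool.
        batch, self.buffer = self.buffer, []
        futures = [self.pool.submit(self._flush_one, item) for item in batch]
        for future in futures:
            future.result()  # surface errors before offsets are committed

    def _flush_one(self, item: str) -> None:
        pass  # stand-in for the real I/O call
```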

This PR also introduces a new RunTime abstraction for managing state changes in the consumer, which I will document in a Notion doc.

@github-actions github-actions bot added the Scope: Backend Automatically applied to PRs that change backend components label Feb 18, 2025

codecov bot commented Feb 20, 2025

❌ 2 Tests Failed:

Tests completed: 324 | Failed: 2 | Passed: 322 | Skipped: 10
View the top 2 failed test(s) by shortest run time
tests.sentry.replays.consumers.test_recording
Stack Traces | 0s run time

.../replays/consumers/test_recording.py:16: in <module>
    from sentry.replays.consumers.buffered.factory import PlatformStrategyFactory
.../consumers/buffered/factory.py:14: in <module>
    from sentry.replays.consumers.buffered.consumer import Flags, recording_consumer
.../consumers/buffered/consumer.py:29: in <module>
    from sentry.replays.consumers.buffered.types import Result
.../consumers/buffered/types.py:7: in <module>
    class Result(Generic[T, E]):
.../consumers/buffered/types.py:48: in Result
    def map(self, f: callable[[T], T]) -> "Result[T, E]":
E   TypeError: 'builtin_function_or_method' object is not subscriptable
tests.sentry.replays.unit.consumers.test_recording
Stack Traces | 0s run time

.../unit/consumers/test_recording.py:8: in <module>
    from sentry.replays.consumers.buffered.consumer import (
.../consumers/buffered/consumer.py:29: in <module>
    from sentry.replays.consumers.buffered.types import Result
.../consumers/buffered/types.py:7: in <module>
    class Result(Generic[T, E]):
.../consumers/buffered/types.py:48: in Result
    def map(self, f: callable[[T], T]) -> "Result[T, E]":
E   TypeError: 'builtin_function_or_method' object is not subscriptable
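For context, both failures stem from annotating with the builtin `callable`, which is not subscriptable at class-definition time; the type for annotations is `Callable` from `collections.abc` (or `typing`). A minimal sketch of the fix (this `Result` body is illustrative, not the PR's actual class):

```python
from collections.abc import Callable
from typing import Generic, TypeVar

T = TypeVar("T")
E = TypeVar("E")


class Result(Generic[T, E]):
    """Minimal ok-value container; the real class in the PR has more methods."""

    def __init__(self, value: T) -> None:
        self.value = value

    # `callable[[T], T]` raises TypeError when the class body executes,
    # because the builtin is not subscriptable; `Callable[[T], T]` is the fix.
    def map(self, f: Callable[[T], T]) -> "Result[T, E]":
        return Result(f(self.value))
```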


Contributor

@fpacifici left a comment

I would advise against building a consumer with this abstraction; make it work on top of Arroyo instead.

  • Arroyo provides an abstraction level that is similar to the one provided by your PlatformStrategy. The two, though, go in fundamentally different directions. Building one on top of the other seems to create a quite complex architecture. It is quite hard to intuitively tell whether the system guarantees at-least-once delivery.
  • We will not be able to move parts of the processing to a multi-process pool in case it is needed for scale, as that would require using the RunTask abstraction.
  • As we discussed in the past, I believe the dataflow model (even the small subset provided by Arroyo) makes this kind of application simpler to understand due to the sequential nature of the pipeline.

Comment on lines 27 to 32

def create_with_partitions(
    self,
    commit: ArroyoCommit,
    partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
    return PlatformStrategy(commit=commit, flags=self.flags, runtime=recording_runtime)

Wouldn't it be considerably simpler to model this consumer as a sequence of Arroyo operators?

Modeling the system this way would:

  • allow parallelism via either processes or threads without application-logic changes
  • guarantee a pipeline approach that allows the batching step to keep batching new messages while the worker thread performs its work.
  • hide offset management entirely from the application code.
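The operator chain the comment describes can be sketched in plain Python. These classes are illustrative stand-ins only; a real pipeline would use Arroyo strategies (such as its reduce/run-task/commit steps) rather than hand-rolled ones:

```python
# Plain-Python sketch of a dataflow pipeline: each step does one job
# and forwards its output to the next step. Class names are hypothetical.
class Commit:
    """Terminal step: stands in for committing offsets."""

    def __init__(self) -> None:
        self.committed = 0

    def submit(self, batch: list[str]) -> None:
        self.committed += len(batch)


class Flush:
    """Stands in for a run-task step that performs the batch I/O."""

    def __init__(self, next_step: Commit) -> None:
        self.next_step = next_step

    def submit(self, batch: list[str]) -> None:
        # ... write the batch to GCS / ClickHouse here ...
        self.next_step.submit(batch)


class Batch:
    """Stands in for a reduce step: accumulate, then forward a full batch."""

    def __init__(self, size: int, next_step: Flush) -> None:
        self.size = size
        self.buffer: list[str] = []
        self.next_step = next_step

    def submit(self, message: str) -> None:
        self.buffer.append(message)
        if len(self.buffer) >= self.size:
            self.next_step.submit(self.buffer)
            self.buffer = []
```

The application only writes the step bodies; batching thresholds, parallelism, and offset handling stay in the platform layer.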

Comment on lines 57 to 63

def can_flush(self, model: Model[ProcessedRecordingMessage]) -> bool:
    # TODO: time.time is stateful and hard to test. We should enable the RunTime to perform
    # managed effects so we can properly test this behavior.
    return (
        len(model.buffer) >= self.__max_buffer_length
        or (time.time() - self.__max_buffer_wait) >= self.__last_flushed_at
    )
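One conventional answer to the TODO in this snippet is to inject the clock instead of calling `time.time()` directly; a hedged sketch (`FlushPolicy` and its parameter names are illustrative, not the PR's code):

```python
# Injecting the clock makes the time-based flush condition deterministic
# in tests: pass a fake clock and advance it by hand.
import time
from collections.abc import Callable


class FlushPolicy:
    def __init__(
        self,
        max_buffer_length: int,
        max_buffer_wait: float,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self.max_buffer_length = max_buffer_length
        self.max_buffer_wait = max_buffer_wait
        self.clock = clock
        self.last_flushed_at = clock()

    def can_flush(self, buffer_length: int) -> bool:
        # Flush when the buffer is full or the batch has waited too long.
        return (
            buffer_length >= self.max_buffer_length
            or self.clock() - self.last_flushed_at >= self.max_buffer_wait
        )
```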

Arroyo primitives manage this kind of concern for you (when to flush a batch, for example). Are you sure about the idea of pushing them into the product code instead?

@cmanallen
Member Author

@fpacifici Thanks for the feedback! Let me preface by saying when I opened this PR I did so for three reasons:

  1. To demonstrate the behaviors I outlined in my streaming platform proposal so you would have something tangible to reference.
  2. To explore new ideas as part of a research project. The goal is exploration so using Arroyo primitives would have gone against my goals.
  3. I'm trying to solve a real problem from a pre-determined starting point with constraints on what can change and how fast.

So there are practical concerns intermixed with educational and exploratory concerns. I've since dropped the educational component and simplified what I'm doing in the PR with this morning's commits (it mostly doesn't impact your review, but it did simplify the consumer implementation, so it might be worth a second look). Hopefully that explains why I made some of the choices I did.

Concerns Around Committing

I agree completely. They were a pain to manage and it was never necessary to manage them in application code (for my use case at least). I moved the offset handling into the RunTime where they're now managed at the platform level.

Why Not Use Arroyo

It's a research project, so we're trying something new. But, critically, this is Arroyo. It's not a full implementation, but it is an Arroyo strategy. That means I can prefix the step with any number of streaming primitives, and I could suffix the step with any number of streaming primitives (could being the operative word, because I'm currently hard-coding the commit step as the next step -- that's an easy fix so I'm ignoring this oversight).

This RunTime strategy is a generalization of all Arroyo strategies, so it's not surprising that specialized strategies exist that can solve components of this pipeline. Research aside, I do want this to go to production, and I'll mention why at the bottom.


So the three concerns I'm keying in on are:

  1. allow parallelism via either processes or threads without application-logic changes
  2. guarantee a pipeline approach that allows the batching step to keep batching new messages while the worker thread performs its work.
  3. hide offset management entirely from the application code.

Three is gone; I've removed it. One is partially solved, I think. We can prefix the RunTime step with a multi-threading/processing step. I'm not sure if the RunTime can be embedded into those steps, so that may be a shortcoming. That's an area I could look into if this were ever an important component of Arroyo.

Two is not solved, as far as I'm aware. I wrote the Buffer strategy in Arroyo, and the Reduce strategy implements the Buffer strategy. I'm not aware of how those strategies flush their buffers in a worker thread. As far as I know, they block the main thread. But if that's not the case and there is some platform magic happening, then I don't see why the RunTime strategy couldn't also have the benefit of flushing off the main thread.


I want this to go to production; why? tl;dr I can now unit-test my consumer end-to-end.

Testing is a huge concern for me. I've refactored this consumer in the past and it's led to production outages (using Arroyo streaming primitives, as it happens -- which I don't blame).

There are minor things that Arroyo does that can make testing more difficult. For example, in the Reduce strategy we call time.time(). That can make unit-testing difficult: you have to mock or just not test certain behaviors, which is not ideal. However, there's a larger problem that I'm trying to address, and that's the difficulty of testing how state is threaded through a consumer. Arroyo does not provide any facilities for this, and my PR did not have any until this morning.

One of the benefits of managing the state machine in the way I have is that I can intercept the commands being issued by my application and rewrite them. You can see in the MockRunTime class I'm using coroutines to rewrite commands in my test suite. This gives me a lot of insight into what the application is doing and the ability to redirect behavior in a way that does not require monkey patching. I can deterministically simulate all possible states and assert the outcome very cheaply.
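The interception idea reads roughly like this generator-based sketch. All names here (`app`, `run_with`, the command tuples) are hypothetical; the PR's MockRunTime is more involved:

```python
# Sketch of command interception: the application yields commands instead
# of performing effects, and a test driver decides what each command
# returns -- no monkey-patching required.
from typing import Any, Generator

Command = tuple[str, Any]  # e.g. ("fetch", key) or ("store", value)


def app() -> Generator[Command, Any, None]:
    # The application logic is pure: it describes effects as data.
    value = yield ("fetch", "key-1")
    yield ("store", value.upper())


def run_with(responses: dict[str, Any]) -> list[Command]:
    """Test driver: record every command and answer from a fixed table."""
    seen: list[Command] = []
    gen = app()
    reply: Any = None
    try:
        while True:
            command = gen.send(reply)  # send(None) starts the generator
            seen.append(command)
            reply = responses.get(command[0])
    except StopIteration:
        pass
    return seen
```

Because the driver sees every command, a test can assert the exact sequence of effects and simulate any state transition deterministically.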

My implementation isn't perfect (I haven't abstracted all state yet) but there's already been a significant uplift in what I'm capable of asserting about my software.

There are other reasons but this has already gotten too long so I'll leave it there. Let me know if I addressed your concerns well enough and thanks again for taking the time to review this!

@fpacifici
Contributor

fpacifici commented Mar 4, 2025

I want this to go to production; why? tl;dr I can now unit-test my consumer end-to-end.

This is the main reason that prompted me to express my concerns on this PR.
Replays is a high tier production service. Somebody will have to be on-call for this and guarantee its stability and scale to meet the SLOs.

Arroyo primitives have been designed over time to ensure certain failover guarantees, provide the observability needed to troubleshoot production issues, and prevent certain patterns.
In practice, SRE (soon Platform) can be on call for consumers they did not write because it is reasonably possible to troubleshoot issues from what Arroyo provides. This works as long as consumers follow the patterns provided by Arroyo.
A considerably different architecture will break those assumptions unless its failure modes and observability needs are first experienced in other environments.

Consider one of the following:

  • take the pager for it. This can be arranged with SRE.
  • test it in the sandbox environment under synthetic generated load
  • deploy this in S4S only for enough time to understand how to manage this consumer.

@cmanallen
Member Author

@fpacifici

take the pager for it

😧 I'll implement this strictly in Arroyo primitives. Thank you for your time.

Jokes aside, this is a great point: "Replays is a high tier production service. Somebody will have to be on-call for this and guarantee its stability and scale to meet the SLOs."

The consumer as it exists now could absolutely be implemented in Arroyo primitives. I'm implementing it differently because in the future I want to have state which persists between flushes and I wanted really strong unit testing. However, I can accomplish these goals through other means (using a global, using slower integration tests, using monkey-patching, etc.).

If you tell me that a pager is required for this to live in prod then I can abandon this RunTime implementation and use the Reduce step. I don't want to be a burden on SRE and I can't take on the responsibility of a pager.

test it in the sandbox environment under synthetic generated load
deploy this in S4S only for enough time to understand how to manage this consumer.

Notion doc for the sandbox? And testing in S4S first is my intention.

@cmanallen cmanallen marked this pull request as draft March 4, 2025 22:11