Skip to content

Commit

Permalink
Rename ReplayProvider to Replayer
Browse files Browse the repository at this point in the history
  • Loading branch information
tmaxmax committed Dec 26, 2024
1 parent 3a757ce commit 1925dfb
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 122 deletions.
31 changes: 18 additions & 13 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,37 @@

This file tracks changes to this project. It follows the [Keep a Changelog format](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## [0.9.0] - 2024-12-26

This is the replayer update. Oh, what is a "replayer"? It's how we call replay providers starting with this version! Anyway, besides renaming, this update removes many replaying bugs, improves performance, robustness and error handling and better defines expected behavior for `ReplayProviders`... err, `Replayers`.

More such overhauls are planned. I'm leaving it up to you to guess which comes next – the server or the client? ;)

### Removed

- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead
- `ValidReplayProvider.{TTL, AutoIDs}` – use the constructor instead
- `FiniteReplayer.{Count, AutoIDs}` – use the constructor instead.
- `ValidReplayer.{TTL, AutoIDs}` – use the constructor instead.

### Changed

- Due to a change in the internal implementation, the `FiniteReplayProvider` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped.
- `FiniteReplayProvider.GCInterval` should be set to `0` now in order to disable GC.
- Automatic ID generation for both providers does not overwrite already existing message IDs and errors instead. Ensure that your events do not have IDs when using providers configured to generate IDs.
- `ReplayProvider.Put` now returns an error instead of being required to panic. Read the method documentation for more info. `Joe` also propagates this error through `Joe.Publish`.
- Replay providers are now required to not overwrite message IDs and return errors instead. Sending unsupported messages to replay providers is a bug which should not go unnoticed. Both providers in this library now implement this behavior.
- `Joe` does not log replay provider panics to the console anymore. Handle these panics inside the replay provider itself.
- The `ReplayProvider` and related entities are renamed to just `Replayer`. `go-sse` strives to have a minimal and expressive API, and minimal and expressive names are an important step in that direction. The changelog will use the new names onwards.
- Due to a change in the internal implementation, the `FiniteReplayer` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped.
- `FiniteReplayer.GCInterval` should be set to `0` now in order to disable GC.
- Automatic ID generation for both replayers does not overwrite already existing message IDs and errors instead. Ensure that your events do not have IDs when using replayers configured to generate IDs.
- `Replayer.Put` now returns an error instead of being required to panic. Read the method documentation for more info. `Joe` also propagates this error through `Joe.Publish`.
- Replayers are now required to not overwrite message IDs and return errors instead. Sending unsupported messages to replayers is a bug which should not go unnoticed. Both replayers in this library now implement this behavior.
- `Joe` does not log replayer panics to the console anymore. Handle these panics inside the replay provider itself.

### Added

- `NewFiniteReplayProvider` constructor
- `NewValidReplayProvider` constructor
- `NewFiniteReplayer` constructor
- `NewValidReplayer` constructor
- `Connection.Buffer`

### Fixed

- `FiniteReplayProvider` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to.
- `ValidReplayProvider` was also susceptible to a similar memory leak, which is also fixed now.
- `FiniteReplayer` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayer` would store and replay more events than it was configured to.
- `ValidReplayer` was also susceptible to a similar memory leak, which is also fixed now.
- #41`sse.Session` now writes the header explicitly when upgrading.

## [0.8.0] - 2024-01-30
Expand Down
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import "github.com/tmaxmax/go-sse"
s := &sse.Server{} // zero value ready to use!
```

The `sse.Server` type also implements the `http.Handler` interface, but a server is framework-agnostic: See the [`ServeHTTP` implementation](https://github.com/tmaxmax/go-sse/blob/master/server/server.go#L136) to learn how to implement your own custom logic. It also has some additional configuration options:
The `sse.Server` type also implements the `http.Handler` interface, but a server is framework-agnostic: See the [`ServeHTTP` implementation](https://github.com/tmaxmax/go-sse/blob/master/server/server.go#L156) to learn how to implement your own custom logic. It also has some additional configuration options:

```go
s := &sse.Server{
Expand Down Expand Up @@ -99,10 +99,10 @@ import "github.com/tmaxmax/go-sse"
joe := &sse.Joe{} // the zero value is ready to use!
```

and he'll dispatch events all day! By default, he has no memory of what events he has received, but you can help him remember and replay older messages to new clients using a `ReplayProvider`:
and he'll dispatch events all day! By default, he has no memory of what events he has received, but you can help him remember and replay older messages to new clients using a `Replayer`:

```go
type ReplayProvider interface {
type Replayer interface {
// Put a new event in the provider's buffer.
// If the provider automatically adds IDs aswell,
// the returned message will also have the ID set,
Expand All @@ -113,24 +113,24 @@ type ReplayProvider interface {
}
```

`go-sse` provides two replay providers by default, which both hold the events in-memory: the `ValidReplayProvider` and `FiniteReplayProvider`. The first replays events that are valid, not expired, the second replays a finite number of the most recent events. For example:
`go-sse` provides two replayers by default, which both hold the events in-memory: the `ValidReplayer` and `FiniteReplayer`. The first replays events that are valid, not expired, the second replays a finite number of the most recent events. For example:

```go
// Let's have events expire after 5 minutes. For this example we don't enable automatic ID generation.
rp, err := sse.NewValidReplayProvider(time.Minute * 5, false)
r, err := sse.NewValidReplayer(time.Minute * 5, false)
if err != nil {
// TTL was 0 or negative.
// Useful to have this error if the value comes from a config which happens to be faulty.
}

joe = &sse.Joe{ReplayProvider: rp}
joe = &sse.Joe{Replayer: r}
```

will tell Joe to replay all valid events! Replay providers can do so much more (for example, add IDs to events automatically): read the [docs][3] on how to use the existing ones and how to implement yours.
will tell Joe to replay all valid events! Replayers can do so much more (for example, add IDs to events automatically): read the [docs][3] on how to use the existing ones and how to implement yours.

You can also implement your own replay providers: maybe you need persistent storage for your events? Or event validity is determined based on other criterias than expiry time? And if you think your replay provider may be useful to others, you are encouraged to share it!
You can also implement your own replayers: maybe you need persistent storage for your events? Or event validity is determined based on other criterias than expiry time? And if you think your replayer may be useful to others, you are encouraged to share it!

`go-sse` created the `ReplayProvider` interface mainly for `Joe`, but it encourages you to integrate it with your own `Provider` implementations, where suitable.
`go-sse` created the `Replayer` interface mainly for `Joe`, but it encourages you to integrate it with your own `Provider` implementations, where suitable.

### Publish your first event

Expand Down Expand Up @@ -161,7 +161,7 @@ data: to see you.

You can also see that `go-sse` takes care of splitting input by lines into new fields, as required by the specification.

Keep in mind that providers, such as the `ValidReplayProvider` used above, will give an error for and won't replay the events without an ID (unless, of course, they give the IDs themselves). To have our event expire, as configured, we must set an ID for the event:
Keep in mind that replayers, such as the `ValidReplayer` used above, will give an error for and won't replay the events without an ID (unless, of course, they give the IDs themselves). To have our event expire, as configured, we must set an ID for the event:

```go
m.ID = sse.ID("unique")
Expand Down Expand Up @@ -309,7 +309,7 @@ unsubscribe := conn.SubscribeToAll(func (event sse.Event) {
})
```

All `Susbcribe` methods return a function that when called tells the connection to stop calling the corresponding callback.
All `Subscribe` methods return a function that when called tells the connection to stop calling the corresponding callback.

In order to work with events, the `sse.Event` type has some fields and methods exposed:

Expand Down Expand Up @@ -442,7 +442,7 @@ Thank you for contributing!

[1]: https://github.com/cenkalti/backoff
[2]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Provider
[3]: https://pkg.go.dev/github.com/tmaxmax/go-sse#ReplayProvider
[3]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Replayer
[4]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Message
[5]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Client
[6]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Event
4 changes: 2 additions & 2 deletions cmd/complex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ const (
)

func newSSE() *sse.Server {
rp, _ := sse.NewValidReplayProvider(time.Minute*5, true)
rp, _ := sse.NewValidReplayer(time.Minute*5, true)
rp.GCInterval = time.Minute

return &sse.Server{
Provider: &sse.Joe{ReplayProvider: rp},
Provider: &sse.Joe{Replayer: rp},
Logger: logger{log.New(os.Stderr, "", 0)},
OnSession: func(s *sse.Session) (sse.Subscription, bool) {
topics := s.Req.URL.Query()["topic"]
Expand Down
62 changes: 31 additions & 31 deletions joe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"sync"
)

// A ReplayProvider is a type that can replay older published events to new subscribers.
// Replay providers use event IDs, the topics the events were published to and optionally
// the events' expiration times or any other criteria to determine which are valid for replay.
// A Replayer is a type that can replay older published events to new subscribers.
// Replayers use event IDs, the topics the events were published and optionally
// any other criteria to determine which are valid for replay.
//
// While providers can require events to have IDs beforehand, they can also set the IDs themselves,
// automatically - it's up to the implementation. Providers should not overwrite or remove any existing
// While replayers can require events to have IDs beforehand, they can also set the IDs themselves,
// automatically - it's up to the implementation. Replayers should not overwrite or remove any existing
// IDs and return an error instead.
//
// Replay providers are not required to be thread-safe - server providers are required to ensure only
// one operation is executed on the replay provider at any given time. Server providers may not execute
// replay operation concurrently with other operations, so make sure any action on the replay provider
// blocks for as little as possible. If a replay provider is thread-safe, some operations may be
// Replayers are not required to be thread-safe - server providers are required to ensure only
// one operation is executed on the replayer at any given time. Server providers may not execute
// replay operation concurrently with other operations, so make sure any action on the replayer
// blocks for as little as possible. If a replayer is thread-safe, some operations may be
// run in a separate goroutine - see the interface's method documentation.
//
// Executing actions that require waiting for a long time on I/O, such as HTTP requests or database
Expand All @@ -25,30 +25,30 @@ import (
// recommended, as long as the implementation fulfills the requirements.
//
// If not specified otherwise, the errors returned are implementation-specific.
type ReplayProvider interface {
type Replayer interface {
// Put adds a new event to the replay buffer. The Message that is returned may not have the
// same address, if the replay provider automatically sets IDs.
// same address, if the replayer automatically sets IDs.
//
// Put errors if the message couldn't be queued – if no topics are provided,
// a message without an ID is put into a ReplayProvider which does not
// automatically set IDs, or a message with an ID is put into a ReplayProvider which
// a message without an ID is put into a Replayer which does not
// automatically set IDs, or a message with an ID is put into a Replayer which
// does automatically set IDs. An error should be returned for other failures
// related to the given message. When no topics are provided, ErrNoTopic should be
// returned.
//
// The Put operation may be executed by the replay provider in another goroutine only if
// The Put operation may be executed by the replayer in another goroutine only if
// it can ensure that any Replay operation called after the Put goroutine is started
// can replay the new received message. This also requires the replay provider implementation
// can replay the new received message. This also requires the replayer implementation
// to be thread-safe.
//
// Replay providers are not required to guarantee that immediately after Put returns
// Replayers are not required to guarantee that immediately after Put returns
// the new messages can be replayed. If an error occurs internally when putting the new message
// and retrying the operation would block for too long, it can be aborted.
//
// To indicate a complete replay provider failure (i.e. the replay provider won't work after this point)
// To indicate a complete replayer failure (i.e. the replayer won't work after this point)
// a panic should be used instead of an error.
Put(message *Message, topics []string) (*Message, error)
// Replay sends to a new subscriber all the valid events received by the provider
// Replay sends to a new subscriber all the valid events received by the replayer
// since the event with the listener's ID. If the ID the listener provides
// is invalid, the provider should not replay any events.
//
Expand Down Expand Up @@ -86,10 +86,10 @@ type (
// Events are also sent synchronously to subscribers, so if a subscriber's callback blocks, the others
// have to wait.
//
// Joe optionally supports event replaying with the help of a replay provider.
// Joe optionally supports event replaying with the help of a Replayer.
//
// If the replay provider panics, the subscription for which it panicked is considered failed
// and an error is returned, and thereafter the replay provider is not used anymore – no replays
// If the replayer panics, the subscription for which it panicked is considered failed
// and an error is returned, and thereafter the replayer is not used anymore – no replays
// will be attempted for future subscriptions.
// If due to some other unexpected scenario something panics internally, Joe will remove all subscribers
// and close itself, so subscribers don't end up blocked.
Expand All @@ -104,8 +104,8 @@ type Joe struct {
closed chan struct{}
subscribers map[subscriber]Subscription

// An optional replay provider that Joe uses to resend older messages to new subscribers.
ReplayProvider ReplayProvider
// An optional replayer that Joe uses to resend older messages to new subscribers.
Replayer Replayer

initDone sync.Once
}
Expand Down Expand Up @@ -145,8 +145,8 @@ func (j *Joe) Subscribe(ctx context.Context, sub Subscription) error {
// receives each unique message once, regardless of how many topics it
// is subscribed to or to how many topics the message is published.
//
// It returns ErrNoTopic if no topics are provided, eventual ReplayProvider.Put
// errors or ErrProviderClosed. If the replay provider returns an error the
// It returns ErrNoTopic if no topics are provided, eventual Replayer.Put
// errors or ErrProviderClosed. If the replayer returns an error the
// message will still be sent but most probably it won't be replayed to
// new subscribers, depending on how the error is handled by the replay provider.
func (j *Joe) Publish(msg *Message, topics []string) error {
Expand Down Expand Up @@ -200,7 +200,7 @@ func (j *Joe) removeSubscriber(sub subscriber) {
close(sub)
}

func (j *Joe) start(replay ReplayProvider) {
func (j *Joe) start(replay Replayer) {
defer close(j.closed)
// defer closing all subscribers instead of closing them when done is closed
// so in case of a panic subscribers won't block the request goroutines forever.
Expand Down Expand Up @@ -267,13 +267,13 @@ func (j *Joe) closeSubscribers() {
}
}

func tryReplay(sub Subscription, replay *ReplayProvider) (err error) { //nolint:gocritic // intended
func tryReplay(sub Subscription, replay *Replayer) (err error) { //nolint:gocritic // intended
defer handleReplayerPanic(replay, &err)

return (*replay).Replay(sub)
}

func tryPut(msg messageWithTopics, replay *ReplayProvider) (m *Message, err error) { //nolint:gocritic // intended
func tryPut(msg messageWithTopics, replay *Replayer) (m *Message, err error) { //nolint:gocritic // intended
defer handleReplayerPanic(replay, &err)

return (*replay).Put(msg.message, msg.topics)
Expand All @@ -283,7 +283,7 @@ type replayPanic struct{}

func (replayPanic) Error() string { return "replay provider panicked" }

func handleReplayerPanic(replay *ReplayProvider, errp *error) { //nolint:gocritic // intended
func handleReplayerPanic(replay *Replayer, errp *error) { //nolint:gocritic // intended
if r := recover(); r != nil {
*replay = nil
*errp = replayPanic{}
Expand All @@ -299,9 +299,9 @@ func (j *Joe) init() {
j.closed = make(chan struct{})
j.subscribers = map[subscriber]Subscription{}

replay := j.ReplayProvider
replay := j.Replayer
if replay == nil {
replay = noopReplayProvider{}
replay = noopReplayer{}
}
go j.start(replay)
})
Expand Down
Loading

0 comments on commit 1925dfb

Please sign in to comment.