From 1925dfb466513baabc623fd91bd245a6d98c98c9 Mon Sep 17 00:00:00 2001 From: Teodor Maxim <57960185+tmaxmax@users.noreply.github.com> Date: Thu, 26 Dec 2024 17:28:33 +0200 Subject: [PATCH] Rename `ReplayProvider` to `Replayer` --- CHANGELOG.md | 31 +++++++++------- README.md | 24 ++++++------ cmd/complex/main.go | 4 +- joe.go | 62 +++++++++++++++---------------- joe_test.go | 36 +++++++++--------- replay_provider.go => replay.go | 65 ++++++++++++++++----------------- replay_test.go | 22 +++++------ server.go | 4 +- 8 files changed, 126 insertions(+), 122 deletions(-) rename replay_provider.go => replay.go (75%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 050d5e7..02b4c98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,32 +2,37 @@ This file tracks changes to this project. It follows the [Keep a Changelog format](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [0.9.0] - 2024-12-26 + +This is the replayer update. Oh, what is a "replayer"? It's how we call replay providers starting with this version! Anyway, besides renaming, this update removes many replaying bugs, improves performance, robustness and error handling and better defines expected behavior for `ReplayProviders`... err, `Replayers`. + +More such overhauls are planned. I'm leaving it up to you to guess which comes next – the server or the client? ;) ### Removed -- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead -- `ValidReplayProvider.{TTL, AutoIDs}` – use the constructor instead +- `FiniteReplayer.{Count, AutoIDs}` – use the constructor instead. +- `ValidReplayer.{TTL, AutoIDs}` – use the constructor instead. ### Changed -- Due to a change in the internal implementation, the `FiniteReplayProvider` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped. -- `FiniteReplayProvider.GCInterval` should be set to `0` now in order to disable GC. -- Automatic ID generation for both providers does not overwrite already existing message IDs and errors instead. Ensure that your events do not have IDs when using providers configured to generate IDs. -- `ReplayProvider.Put` now returns an error instead of being required to panic. Read the method documentation for more info. `Joe` also propagates this error through `Joe.Publish`. -- Replay providers are now required to not overwrite message IDs and return errors instead. Sending unsupported messages to replay providers is a bug which should not go unnoticed. Both providers in this library now implement this behavior. -- `Joe` does not log replay provider panics to the console anymore. Handle these panics inside the replay provider itself. +- The `ReplayProvider` and related entities are renamed to just `Replayer`. `go-sse` strives to have a minimal and expressive API, and minimal and expressive names are an important step in that direction. The changelog will use the new names onwards. +- Due to a change in the internal implementation, the `FiniteReplayer` is now able to replay events only if the event with the LastEventID provided by the client is still buffered. Previously if the LastEventID was that of the latest removed event, events would still be replayed. This detail added complexity to the implementation without an apparent significant win, so it was dropped. +- `FiniteReplayer.GCInterval` should be set to `0` now in order to disable GC. +- Automatic ID generation for both replayers does not overwrite already existing message IDs and errors instead. Ensure that your events do not have IDs when using replayers configured to generate IDs. +- `Replayer.Put` now returns an error instead of being required to panic. Read the method documentation for more info. `Joe` also propagates this error through `Joe.Publish`. +- Replayers are now required to not overwrite message IDs and return errors instead. Sending unsupported messages to replayers is a bug which should not go unnoticed. Both replayers in this library now implement this behavior. +- `Joe` does not log replayer panics to the console anymore. Handle these panics inside the replay provider itself. ### Added -- `NewFiniteReplayProvider` constructor -- `NewValidReplayProvider` constructor +- `NewFiniteReplayer` constructor +- `NewValidReplayer` constructor - `Connection.Buffer` ### Fixed -- `FiniteReplayProvider` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to. -- `ValidReplayProvider` was also susceptible to a similar memory leak, which is also fixed now. +- `FiniteReplayer` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayer` would store and replay more events than it was configured to. +- `ValidReplayer` was also susceptible to a similar memory leak, which is also fixed now. - #41 – `sse.Session` now writes the header explicitly when upgrading. ## [0.8.0] - 2024-01-30 diff --git a/README.md b/README.md index a297a48..1b1649a 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ import "github.com/tmaxmax/go-sse" s := &sse.Server{} // zero value ready to use! ``` -The `sse.Server` type also implements the `http.Handler` interface, but a server is framework-agnostic: See the [`ServeHTTP` implementation](https://github.com/tmaxmax/go-sse/blob/master/server/server.go#L136) to learn how to implement your own custom logic. It also has some additional configuration options: +The `sse.Server` type also implements the `http.Handler` interface, but a server is framework-agnostic: See the [`ServeHTTP` implementation](https://github.com/tmaxmax/go-sse/blob/master/server/server.go#L156) to learn how to implement your own custom logic. It also has some additional configuration options: ```go s := &sse.Server{ @@ -99,10 +99,10 @@ import "github.com/tmaxmax/go-sse" joe := &sse.Joe{} // the zero value is ready to use! ``` -and he'll dispatch events all day! By default, he has no memory of what events he has received, but you can help him remember and replay older messages to new clients using a `ReplayProvider`: +and he'll dispatch events all day! By default, he has no memory of what events he has received, but you can help him remember and replay older messages to new clients using a `Replayer`: ```go -type ReplayProvider interface { +type Replayer interface { // Put a new event in the provider's buffer. // If the provider automatically adds IDs aswell, // the returned message will also have the ID set, @@ -113,24 +113,24 @@ type ReplayProvider interface { } ``` -`go-sse` provides two replay providers by default, which both hold the events in-memory: the `ValidReplayProvider` and `FiniteReplayProvider`. The first replays events that are valid, not expired, the second replays a finite number of the most recent events. For example: +`go-sse` provides two replayers by default, which both hold the events in-memory: the `ValidReplayer` and `FiniteReplayer`. The first replays events that are valid, not expired, the second replays a finite number of the most recent events. For example: ```go // Let's have events expire after 5 minutes. For this example we don't enable automatic ID generation. -rp, err := sse.NewValidReplayProvider(time.Minute * 5, false) +r, err := sse.NewValidReplayer(time.Minute * 5, false) if err != nil { // TTL was 0 or negative. // Useful to have this error if the value comes from a config which happens to be faulty. } -joe = &sse.Joe{ReplayProvider: rp} +joe = &sse.Joe{Replayer: r} ``` -will tell Joe to replay all valid events! Replay providers can do so much more (for example, add IDs to events automatically): read the [docs][3] on how to use the existing ones and how to implement yours. +will tell Joe to replay all valid events! Replayers can do so much more (for example, add IDs to events automatically): read the [docs][3] on how to use the existing ones and how to implement yours. -You can also implement your own replay providers: maybe you need persistent storage for your events? Or event validity is determined based on other criterias than expiry time? And if you think your replay provider may be useful to others, you are encouraged to share it! +You can also implement your own replayers: maybe you need persistent storage for your events? Or event validity is determined based on other criterias than expiry time? And if you think your replayer may be useful to others, you are encouraged to share it! -`go-sse` created the `ReplayProvider` interface mainly for `Joe`, but it encourages you to integrate it with your own `Provider` implementations, where suitable. +`go-sse` created the `Replayer` interface mainly for `Joe`, but it encourages you to integrate it with your own `Provider` implementations, where suitable. ### Publish your first event @@ -161,7 +161,7 @@ data: to see you. You can also see that `go-sse` takes care of splitting input by lines into new fields, as required by the specification. -Keep in mind that providers, such as the `ValidReplayProvider` used above, will give an error for and won't replay the events without an ID (unless, of course, they give the IDs themselves). To have our event expire, as configured, we must set an ID for the event: +Keep in mind that replayers, such as the `ValidReplayer` used above, will give an error for and won't replay the events without an ID (unless, of course, they give the IDs themselves). To have our event expire, as configured, we must set an ID for the event: ```go m.ID = sse.ID("unique") @@ -309,7 +309,7 @@ unsubscribe := conn.SubscribeToAll(func (event sse.Event) { }) ``` -All `Susbcribe` methods return a function that when called tells the connection to stop calling the corresponding callback. +All `Subscribe` methods return a function that when called tells the connection to stop calling the corresponding callback. In order to work with events, the `sse.Event` type has some fields and methods exposed: @@ -442,7 +442,7 @@ Thank you for contributing! [1]: https://github.com/cenkalti/backoff [2]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Provider -[3]: https://pkg.go.dev/github.com/tmaxmax/go-sse#ReplayProvider +[3]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Replayer [4]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Message [5]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Client [6]: https://pkg.go.dev/github.com/tmaxmax/go-sse#Event diff --git a/cmd/complex/main.go b/cmd/complex/main.go index fdee4a0..9c49fdd 100644 --- a/cmd/complex/main.go +++ b/cmd/complex/main.go @@ -22,11 +22,11 @@ const ( ) func newSSE() *sse.Server { - rp, _ := sse.NewValidReplayProvider(time.Minute*5, true) + rp, _ := sse.NewValidReplayer(time.Minute*5, true) rp.GCInterval = time.Minute return &sse.Server{ - Provider: &sse.Joe{ReplayProvider: rp}, + Provider: &sse.Joe{Replayer: rp}, Logger: logger{log.New(os.Stderr, "", 0)}, OnSession: func(s *sse.Session) (sse.Subscription, bool) { topics := s.Req.URL.Query()["topic"] diff --git a/joe.go b/joe.go index 2858846..40af0ac 100644 --- a/joe.go +++ b/joe.go @@ -5,18 +5,18 @@ import ( "sync" ) -// A ReplayProvider is a type that can replay older published events to new subscribers. -// Replay providers use event IDs, the topics the events were published to and optionally -// the events' expiration times or any other criteria to determine which are valid for replay. +// A Replayer is a type that can replay older published events to new subscribers. +// Replayers use event IDs, the topics the events were published and optionally +// any other criteria to determine which are valid for replay. // -// While providers can require events to have IDs beforehand, they can also set the IDs themselves, -// automatically - it's up to the implementation. Providers should not overwrite or remove any existing +// While replayers can require events to have IDs beforehand, they can also set the IDs themselves, +// automatically - it's up to the implementation. Replayers should not overwrite or remove any existing // IDs and return an error instead. // -// Replay providers are not required to be thread-safe - server providers are required to ensure only -// one operation is executed on the replay provider at any given time. Server providers may not execute -// replay operation concurrently with other operations, so make sure any action on the replay provider -// blocks for as little as possible. If a replay provider is thread-safe, some operations may be +// Replayers are not required to be thread-safe - server providers are required to ensure only +// one operation is executed on the replayer at any given time. Server providers may not execute +// replay operation concurrently with other operations, so make sure any action on the replayer +// blocks for as little as possible. If a replayer is thread-safe, some operations may be // run in a separate goroutine - see the interface's method documentation. // // Executing actions that require waiting for a long time on I/O, such as HTTP requests or database @@ -25,30 +25,30 @@ import ( // recommended, as long as the implementation fulfills the requirements. // // If not specified otherwise, the errors returned are implementation-specific. -type ReplayProvider interface { +type Replayer interface { // Put adds a new event to the replay buffer. The Message that is returned may not have the - // same address, if the replay provider automatically sets IDs. + // same address, if the replayer automatically sets IDs. // // Put errors if the message couldn't be queued – if no topics are provided, - // a message without an ID is put into a ReplayProvider which does not - // automatically set IDs, or a message with an ID is put into a ReplayProvider which + // a message without an ID is put into a Replayer which does not + // automatically set IDs, or a message with an ID is put into a Replayer which // does automatically set IDs. An error should be returned for other failures // related to the given message. When no topics are provided, ErrNoTopic should be // returned. // - // The Put operation may be executed by the replay provider in another goroutine only if + // The Put operation may be executed by the replayer in another goroutine only if // it can ensure that any Replay operation called after the Put goroutine is started - // can replay the new received message. This also requires the replay provider implementation + // can replay the new received message. This also requires the replayer implementation // to be thread-safe. // - // Replay providers are not required to guarantee that immediately after Put returns + // Replayers are not required to guarantee that immediately after Put returns // the new messages can be replayed. If an error occurs internally when putting the new message // and retrying the operation would block for too long, it can be aborted. // - // To indicate a complete replay provider failure (i.e. the replay provider won't work after this point) + // To indicate a complete replayer failure (i.e. the replayer won't work after this point) // a panic should be used instead of an error. Put(message *Message, topics []string) (*Message, error) - // Replay sends to a new subscriber all the valid events received by the provider + // Replay sends to a new subscriber all the valid events received by the replayer // since the event with the listener's ID. If the ID the listener provides // is invalid, the provider should not replay any events. // @@ -86,10 +86,10 @@ type ( // Events are also sent synchronously to subscribers, so if a subscriber's callback blocks, the others // have to wait. // -// Joe optionally supports event replaying with the help of a replay provider. +// Joe optionally supports event replaying with the help of a Replayer. // -// If the replay provider panics, the subscription for which it panicked is considered failed -// and an error is returned, and thereafter the replay provider is not used anymore – no replays +// If the replayer panics, the subscription for which it panicked is considered failed +// and an error is returned, and thereafter the replayer is not used anymore – no replays // will be attempted for future subscriptions. // If due to some other unexpected scenario something panics internally, Joe will remove all subscribers // and close itself, so subscribers don't end up blocked. @@ -104,8 +104,8 @@ type Joe struct { closed chan struct{} subscribers map[subscriber]Subscription - // An optional replay provider that Joe uses to resend older messages to new subscribers. - ReplayProvider ReplayProvider + // An optional replayer that Joe uses to resend older messages to new subscribers. + Replayer Replayer initDone sync.Once } @@ -145,8 +145,8 @@ func (j *Joe) Subscribe(ctx context.Context, sub Subscription) error { // receives each unique message once, regardless of how many topics it // is subscribed to or to how many topics the message is published. // -// It returns ErrNoTopic if no topics are provided, eventual ReplayProvider.Put -// errors or ErrProviderClosed. If the replay provider returns an error the +// It returns ErrNoTopic if no topics are provided, eventual Replayer.Put +// errors or ErrProviderClosed. If the replayer returns an error the // message will still be sent but most probably it won't be replayed to // new subscribers, depending on how the error is handled by the replay provider. func (j *Joe) Publish(msg *Message, topics []string) error { @@ -200,7 +200,7 @@ func (j *Joe) removeSubscriber(sub subscriber) { close(sub) } -func (j *Joe) start(replay ReplayProvider) { +func (j *Joe) start(replay Replayer) { defer close(j.closed) // defer closing all subscribers instead of closing them when done is closed // so in case of a panic subscribers won't block the request goroutines forever. @@ -267,13 +267,13 @@ func (j *Joe) closeSubscribers() { } } -func tryReplay(sub Subscription, replay *ReplayProvider) (err error) { //nolint:gocritic // intended +func tryReplay(sub Subscription, replay *Replayer) (err error) { //nolint:gocritic // intended defer handleReplayerPanic(replay, &err) return (*replay).Replay(sub) } -func tryPut(msg messageWithTopics, replay *ReplayProvider) (m *Message, err error) { //nolint:gocritic // intended +func tryPut(msg messageWithTopics, replay *Replayer) (m *Message, err error) { //nolint:gocritic // intended defer handleReplayerPanic(replay, &err) return (*replay).Put(msg.message, msg.topics) @@ -283,7 +283,7 @@ type replayPanic struct{} func (replayPanic) Error() string { return "replay provider panicked" } -func handleReplayerPanic(replay *ReplayProvider, errp *error) { //nolint:gocritic // intended +func handleReplayerPanic(replay *Replayer, errp *error) { //nolint:gocritic // intended if r := recover(); r != nil { *replay = nil *errp = replayPanic{} @@ -299,9 +299,9 @@ func (j *Joe) init() { j.closed = make(chan struct{}) j.subscribers = map[subscriber]Subscription{} - replay := j.ReplayProvider + replay := j.Replayer if replay == nil { - replay = noopReplayProvider{} + replay = noopReplayer{} } go j.start(replay) }) diff --git a/joe_test.go b/joe_test.go index 311f82d..5336a6f 100644 --- a/joe_test.go +++ b/joe_test.go @@ -11,13 +11,13 @@ import ( "github.com/tmaxmax/go-sse/internal/tests" ) -type mockReplayProvider struct { +type mockReplayer struct { putc chan struct{} replayc chan struct{} shouldPanic string } -func (m *mockReplayProvider) Put(msg *sse.Message, _ []string) (*sse.Message, error) { +func (m *mockReplayer) Put(msg *sse.Message, _ []string) (*sse.Message, error) { m.putc <- struct{}{} if strings.Contains(m.shouldPanic, "put") { panic("panicked") @@ -26,7 +26,7 @@ func (m *mockReplayProvider) Put(msg *sse.Message, _ []string) (*sse.Message, er return msg, nil } -func (m *mockReplayProvider) Replay(_ sse.Subscription) error { +func (m *mockReplayer) Replay(_ sse.Subscription) error { m.replayc <- struct{}{} if strings.Contains(m.shouldPanic, "replay") { panic("panicked") @@ -35,18 +35,18 @@ func (m *mockReplayProvider) Replay(_ sse.Subscription) error { return nil } -func (m *mockReplayProvider) replays() int { +func (m *mockReplayer) replays() int { return len(m.replayc) } -func (m *mockReplayProvider) puts() int { +func (m *mockReplayer) puts() int { return len(m.putc) } -var _ sse.ReplayProvider = (*mockReplayProvider)(nil) +var _ sse.Replayer = (*mockReplayer)(nil) -func newMockReplayProvider(shouldPanic string, numExpectedCalls int) *mockReplayProvider { - return &mockReplayProvider{ +func newMockReplayer(shouldPanic string, numExpectedCalls int) *mockReplayer { + return &mockReplayer{ shouldPanic: shouldPanic, putc: make(chan struct{}, numExpectedCalls), replayc: make(chan struct{}, numExpectedCalls), @@ -73,9 +73,9 @@ func (c mockClient) Flush() error { return c(nil) } func TestJoe_Shutdown(t *testing.T) { t.Parallel() - rp := newMockReplayProvider("", 0) + rp := newMockReplayer("", 0) j := &sse.Joe{ - ReplayProvider: rp, + Replayer: rp, } tests.Equal(t, j.Shutdown(context.Background()), nil, "joe should close successfully") @@ -169,9 +169,9 @@ func newMockContext(tb testing.TB) (*mockContext, context.CancelFunc) { func TestJoe_SubscribePublish(t *testing.T) { t.Parallel() - rp := newMockReplayProvider("", 2) + rp := newMockReplayer("", 2) j := &sse.Joe{ - ReplayProvider: rp, + Replayer: rp, } defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant @@ -229,11 +229,11 @@ data: world func TestJoe_errors(t *testing.T) { t.Parallel() - fin, err := sse.NewFiniteReplayProvider(2, false) + fin, err := sse.NewFiniteReplayer(2, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") j := &sse.Joe{ - ReplayProvider: fin, + Replayer: fin, } defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant @@ -299,8 +299,8 @@ func (m *mockMessageWriter) Flush() error { func TestJoe_ReplayPanic(t *testing.T) { t.Parallel() - rp := newMockReplayProvider("replay put", 1) - j := &sse.Joe{ReplayProvider: rp} + rp := newMockReplayer("replay put", 1) + j := &sse.Joe{Replayer: rp} wr := &mockMessageWriter{msg: make(chan *sse.Message, 1)} topics := []string{sse.DefaultTopic} @@ -321,8 +321,8 @@ func TestJoe_ReplayPanic(t *testing.T) { tests.Equal(t, j.Shutdown(context.Background()), nil, "shutdown should succeed") tests.Equal(t, <-suberr, nil, "unexpected subscribe error") - rp = newMockReplayProvider("put", 1) - j = &sse.Joe{ReplayProvider: rp} + rp = newMockReplayer("put", 1) + j = &sse.Joe{Replayer: rp} go func() { suberr <- j.Subscribe(context.Background(), sse.Subscription{Client: wr, Topics: topics}) }() _, ok = <-rp.replayc diff --git a/replay_provider.go b/replay.go similarity index 75% rename from replay_provider.go rename to replay.go index fcc5c1c..6cbe015 100644 --- a/replay_provider.go +++ b/replay.go @@ -6,22 +6,22 @@ import ( "time" ) -// NewFiniteReplayProvider creates a finite replay provider with the given max +// NewFiniteReplayer creates a finite replay provider with the given max // count and auto ID behaviour. // -// Count is the maximum number of events FiniteReplayProvider should hold as +// Count is the maximum number of events FiniteReplayer should hold as // valid. It must be greater than zero. // -// AutoIDs configures FiniteReplayProvider to automatically set the IDs of +// AutoIDs configures FiniteReplayer to automatically set the IDs of // events. -func NewFiniteReplayProvider( +func NewFiniteReplayer( count int, autoIDs bool, -) (*FiniteReplayProvider, error) { +) (*FiniteReplayer, error) { if count < 2 { return nil, errors.New("count must be at least 2") } - r := &FiniteReplayProvider{} + r := &FiniteReplayer{} r.buf.buf = make([]messageWithTopics, count) if autoIDs { r.currentID = new(uint64) @@ -30,16 +30,16 @@ func NewFiniteReplayProvider( return r, nil } -// FiniteReplayProvider is a replay provider that replays at maximum a certain number of events. -// The events must have an ID unless the replay provider is configured to set IDs automatically. -type FiniteReplayProvider struct { +// FiniteReplayer is a replayer that replays at maximum a certain number of events. +// The events must have an ID unless the replayer is configured to set IDs automatically. +type FiniteReplayer struct { currentID *uint64 buf queue[messageWithTopics] } -// Put puts a message into the provider's buffer. If there are more messages than the maximum +// Put puts a message into the replayer's buffer. If there are more messages than the maximum // number, the oldest message is removed. -func (f *FiniteReplayProvider) Put(message *Message, topics []string) (*Message, error) { +func (f *FiniteReplayer) Put(message *Message, topics []string) (*Message, error) { if len(topics) == 0 { return nil, ErrNoTopic } @@ -54,9 +54,8 @@ func (f *FiniteReplayProvider) Put(message *Message, topics []string) (*Message, return message, nil } -// Replay replays the messages in the buffer to the listener. -// It doesn't take into account the messages' expiry times. -func (f *FiniteReplayProvider) Replay(subscription Subscription) error { +// Replay replays the stored messages to the listener. +func (f *FiniteReplayer) Replay(subscription Subscription) error { i := findIDInQueue(&f.buf, subscription.LastEventID, f.currentID != nil) if i < 0 { return nil @@ -78,13 +77,13 @@ func (f *FiniteReplayProvider) Replay(subscription Subscription) error { return subscription.Client.Flush() } -// ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events. +// ValidReplayer is a Replayer that replays all the buffered non-expired events. // -// The provider removes any expired events when a new event is put and after at least +// The replayer removes any expired events when a new event is put and after at least // a GCInterval period passed. // -// The events must have an ID unless the replay provider is configured to set IDs automatically. -type ValidReplayProvider struct { +// The events must have an ID unless the replayer is configured to set IDs automatically. +type ValidReplayer struct { lastGC time.Time // The function used to retrieve the current time. Defaults to time.Now. @@ -95,7 +94,7 @@ type ValidReplayProvider struct { messages queue[messageWithTopicsAndExpiry] ttl time.Duration - // After how long the ReplayProvider should attempt to clean up expired events. + // After how long the replayer should attempt to clean up expired events. // By default cleanup is done after a fourth of the TTL has passed; this means // that messages may be stored for a duration equal to 5/4*TTL. If this is not // desired, set the GC interval to a value sensible for your use case or set @@ -104,19 +103,19 @@ type ValidReplayProvider struct { GCInterval time.Duration } -// NewValidReplayProvider creates a valid replay provider with the given message +// NewValidReplayer creates a ValidReplayer with the given message // lifetime duration (time-to-live) and auto ID behavior. // // The TTL must be a positive duration. It is technically possible to use a very // big duration in order to store and replay every message put for the lifetime // of the program; this is not recommended, as memory usage becomes effectively // unbounded which might lead to a crash. -func NewValidReplayProvider(ttl time.Duration, autoIDs bool) (*ValidReplayProvider, error) { +func NewValidReplayer(ttl time.Duration, autoIDs bool) (*ValidReplayer, error) { if ttl <= 0 { return nil, errors.New("event TTL must be greater than zero") } - r := &ValidReplayProvider{ + r := &ValidReplayer{ Now: time.Now, GCInterval: ttl / 4, ttl: ttl, @@ -129,8 +128,8 @@ func NewValidReplayProvider(ttl time.Duration, autoIDs bool) (*ValidReplayProvid return r, nil } -// Put puts the message into the provider's buffer. -func (v *ValidReplayProvider) Put(message *Message, topics []string) (*Message, error) { +// Put puts the message into the replayer's buffer. +func (v *ValidReplayer) Put(message *Message, topics []string) (*Message, error) { if len(topics) == 0 { return nil, ErrNoTopic } @@ -163,16 +162,16 @@ func (v *ValidReplayProvider) Put(message *Message, topics []string) (*Message, return message, nil } -func (v *ValidReplayProvider) shouldGC(now time.Time) bool { +func (v *ValidReplayer) shouldGC(now time.Time) bool { return v.GCInterval > 0 && now.Sub(v.lastGC) >= v.GCInterval } -// GC removes all the expired messages from the provider's buffer. -func (v *ValidReplayProvider) GC() { +// GC removes all the expired messages from the replayer's buffer. +func (v *ValidReplayer) GC() { v.doGC(v.Now()) } -func (v *ValidReplayProvider) doGC(now time.Time) { +func (v *ValidReplayer) doGC(now time.Time) { for v.messages.count > 0 { e := v.messages.buf[v.messages.head] if e.exp.After(now) { @@ -192,7 +191,7 @@ func (v *ValidReplayProvider) doGC(now time.Time) { } // Replay replays all the valid messages to the listener. -func (v *ValidReplayProvider) Replay(subscription Subscription) error { +func (v *ValidReplayer) Replay(subscription Subscription) error { i := findIDInQueue(&v.messages, subscription.LastEventID, v.currentID != nil) if i < 0 { return nil @@ -381,9 +380,9 @@ type messageWithTopicsAndExpiry struct { messageWithTopics } -// noopReplayProvider is the default replay provider used if none is given. It does nothing. +// noopReplayer is the default replay provider used if none is given. It does nothing. // It is used to avoid nil checks for the provider each time it is used. -type noopReplayProvider struct{} +type noopReplayer struct{} -func (n noopReplayProvider) Put(m *Message, _ []string) (*Message, error) { return m, nil } -func (n noopReplayProvider) Replay(_ Subscription) error { return nil } +func (n noopReplayer) Put(m *Message, _ []string) (*Message, error) { return m, nil } +func (n noopReplayer) Replay(_ Subscription) error { return nil } diff --git a/replay_test.go b/replay_test.go index 546c5ee..11b85ca 100644 --- a/replay_test.go +++ b/replay_test.go @@ -10,7 +10,7 @@ import ( "github.com/tmaxmax/go-sse/internal/tests" ) -func replay(tb testing.TB, p sse.ReplayProvider, lastEventID sse.EventID, topics ...string) []*sse.Message { +func replay(tb testing.TB, p sse.Replayer, lastEventID sse.EventID, topics ...string) []*sse.Message { tb.Helper() if len(topics) == 0 { @@ -45,7 +45,7 @@ func replay(tb testing.TB, p sse.ReplayProvider, lastEventID sse.EventID, topics return replayed } -func put(tb testing.TB, p sse.ReplayProvider, msg *sse.Message, topics ...string) *sse.Message { +func put(tb testing.TB, p sse.Replayer, msg *sse.Message, topics ...string) *sse.Message { tb.Helper() if len(topics) == 0 { @@ -58,7 +58,7 @@ func put(tb testing.TB, p sse.ReplayProvider, msg *sse.Message, topics ...string return msg } -func testReplayError(tb testing.TB, p sse.ReplayProvider, tm *tests.Time) { +func testReplayError(tb testing.TB, p sse.Replayer, tm *tests.Time) { tb.Helper() tm.Reset() @@ -86,10 +86,10 @@ func TestValidReplayProvider(t *testing.T) { tm := &tests.Time{} ttl := time.Millisecond * 5 - _, err := sse.NewValidReplayProvider(0, false) + _, err := sse.NewValidReplayer(0, false) tests.Expect(t, err != nil, "replay provider cannot be created with zero or negative TTL") - p, _ := sse.NewValidReplayProvider(ttl, true) + p, _ := sse.NewValidReplayer(ttl, true) p.GCInterval = 0 p.Now = tm.Now @@ -125,7 +125,7 @@ func TestValidReplayProvider(t *testing.T) { tests.Equal(t, len(allReplayed), 2, "there should be two messages in topic 't'") tests.Equal(t, allReplayed[0].String(), "id: 6\ndata: again\n\n", "invalid message received") - tr, err := sse.NewValidReplayProvider(time.Second, false) + tr, err := sse.NewValidReplayer(time.Second, false) tests.Equal(t, err, nil, "replay provider should be created") testReplayError(t, tr, tm) @@ -134,10 +134,10 @@ func TestValidReplayProvider(t *testing.T) { func TestFiniteReplayProvider(t *testing.T) { t.Parallel() - _, err := sse.NewFiniteReplayProvider(1, false) + _, err := sse.NewFiniteReplayer(1, false) tests.Expect(t, err != nil, "should not create FiniteReplayProvider with count less than 2") - p, err := sse.NewFiniteReplayProvider(3, false) + p, err := sse.NewFiniteReplayer(3, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") tests.Equal(t, p.Replay(sse.Subscription{}), nil, "replay failed on provider without messages") @@ -162,20 +162,20 @@ func TestFiniteReplayProvider(t *testing.T) { replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0] tests.Equal(t, replayed.String(), "id: 6\ndata: again\n\n", "invalid replayed message") - idp, err := sse.NewFiniteReplayProvider(10, true) + idp, err := sse.NewFiniteReplayer(10, true) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") _, err = idp.Put(msg(t, "should error", "should not have ID"), []string{sse.DefaultTopic}) tests.Expect(t, err != nil, "messages with IDs cannot be put in an autoID replay provider") - tr, err := sse.NewFiniteReplayProvider(10, false) + tr, err := sse.NewFiniteReplayer(10, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") testReplayError(t, tr, nil) } func TestFiniteReplayProvider_allocations(t *testing.T) { - p, err := sse.NewFiniteReplayProvider(3, false) + p, err := sse.NewFiniteReplayer(3, false) tests.Equal(t, err, nil, "should create new FiniteReplayProvider") const runs = 100 diff --git a/server.go b/server.go index eb670de..a5039af 100644 --- a/server.go +++ b/server.go @@ -73,7 +73,7 @@ var ErrProviderClosed = errors.New("go-sse.server: provider is closed") // ErrNoTopic is a sentinel error returned when a Message is published without any topics. // It is not an issue to call Server.Publish without topics, because the Server will add the DefaultTopic; -// it is an error to call Provider.Publish or ReplayProvider.Put without any topics, though. +// it is an error to call Provider.Publish or Replayer.Put without any topics, though. var ErrNoTopic = errors.New("go-sse.server: no topics specified") // DefaultTopic is the identifier for the topic that is implied when no topics are specified for a Subscription @@ -109,7 +109,7 @@ type Logger interface { Log(ctx context.Context, level LogLevel, msg string, data map[string]any) } -// A Server is mostly a convenience wrapper around a provider. +// A Server is mostly a convenience wrapper around a Provider. // It implements the http.Handler interface and has some methods // for calling the underlying provider's methods. //