-
-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathjoe_test.go
336 lines (251 loc) · 8.79 KB
/
joe_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
package sse_test
import (
"context"
"errors"
"strings"
"testing"
"time"
"github.com/tmaxmax/go-sse"
"github.com/tmaxmax/go-sse/internal/tests"
)
type mockReplayer struct {
putc chan struct{}
replayc chan struct{}
shouldPanic string
}
func (m *mockReplayer) Put(msg *sse.Message, _ []string) (*sse.Message, error) {
m.putc <- struct{}{}
if strings.Contains(m.shouldPanic, "put") {
panic("panicked")
}
return msg, nil
}
func (m *mockReplayer) Replay(_ sse.Subscription) error {
m.replayc <- struct{}{}
if strings.Contains(m.shouldPanic, "replay") {
panic("panicked")
}
return nil
}
func (m *mockReplayer) replays() int {
return len(m.replayc)
}
func (m *mockReplayer) puts() int {
return len(m.putc)
}
var _ sse.Replayer = (*mockReplayer)(nil)
func newMockReplayer(shouldPanic string, numExpectedCalls int) *mockReplayer {
return &mockReplayer{
shouldPanic: shouldPanic,
putc: make(chan struct{}, numExpectedCalls),
replayc: make(chan struct{}, numExpectedCalls),
}
}
func msg(tb testing.TB, data, id string) *sse.Message {
tb.Helper()
e := &sse.Message{}
e.AppendData(data)
if id != "" {
e.ID = sse.ID(id)
}
return e
}
type mockClient func(m *sse.Message) error
func (c mockClient) Send(m *sse.Message) error { return c(m) }
func (c mockClient) Flush() error { return c(nil) }
func TestJoe_Shutdown(t *testing.T) {
t.Parallel()
rp := newMockReplayer("", 0)
j := &sse.Joe{
Replayer: rp,
}
tests.Equal(t, j.Shutdown(context.Background()), nil, "joe should close successfully")
tests.Equal(t, j.Shutdown(context.Background()), sse.ErrProviderClosed, "joe should already be closed")
tests.Equal(t, j.Subscribe(context.Background(), sse.Subscription{}), sse.ErrProviderClosed, "no operation should be allowed on closed joe")
tests.Equal(t, j.Publish(nil, nil), sse.ErrNoTopic, "parameter validation should happen first")
tests.Equal(t, j.Publish(nil, []string{sse.DefaultTopic}), sse.ErrProviderClosed, "no operation should be allowed on closed joe")
tests.Equal(t, rp.puts(), 0, "joe should not have used the replay provider")
tests.Equal(t, rp.replays(), 0, "joe should not have used the replay provider")
j = &sse.Joe{}
// trigger internal initialization, so the concurrent Shutdowns aren't serialized by the internal sync.Once.
_ = j.Publish(&sse.Message{}, []string{sse.DefaultTopic})
//nolint
tests.NotPanics(t, func() {
go j.Shutdown(context.Background())
j.Shutdown(context.Background())
}, "concurrent shutdown should work")
j = &sse.Joe{}
subctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*15)
t.Cleanup(cancel)
go j.Subscribe(subctx, sse.Subscription{ //nolint:errcheck // we don't care about this error
Topics: []string{sse.DefaultTopic},
Client: mockClient(func(m *sse.Message) error {
if m != nil {
time.Sleep(time.Millisecond * 8)
}
return nil
}),
})
time.Sleep(time.Millisecond)
_ = j.Publish(&sse.Message{}, []string{sse.DefaultTopic})
sctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*5)
t.Cleanup(cancel)
tests.ErrorIs(t, j.Shutdown(sctx), context.DeadlineExceeded, "shutdown should stop on closed context")
<-subctx.Done()
}
func subscribe(t testing.TB, p sse.Provider, ctx context.Context, topics ...string) <-chan []*sse.Message { //nolint
t.Helper()
if len(topics) == 0 {
topics = []string{sse.DefaultTopic}
}
ch := make(chan []*sse.Message, 1)
go func() {
defer close(ch)
var msgs []*sse.Message
c := mockClient(func(m *sse.Message) error {
if m != nil {
msgs = append(msgs, m)
}
return nil
})
_ = p.Subscribe(ctx, sse.Subscription{Client: c, Topics: topics})
ch <- msgs
}()
return ch
}
type mockContext struct {
context.Context
waitingOnDone chan struct{}
}
func (m *mockContext) Done() <-chan struct{} {
close(m.waitingOnDone)
return m.Context.Done()
}
func newMockContext(tb testing.TB) (*mockContext, context.CancelFunc) {
tb.Helper()
ctx, cancel := context.WithCancel(context.Background())
return &mockContext{Context: ctx, waitingOnDone: make(chan struct{})}, cancel
}
func TestJoe_SubscribePublish(t *testing.T) {
t.Parallel()
rp := newMockReplayer("", 2)
j := &sse.Joe{
Replayer: rp,
}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant
ctx, cancel := newMockContext(t)
defer cancel()
sub := subscribe(t, j, ctx)
<-ctx.waitingOnDone
tests.Equal(t, j.Publish(msg(t, "hello", ""), []string{sse.DefaultTopic}), nil, "publish should succeed")
cancel()
tests.Equal(t, j.Publish(msg(t, "world", ""), []string{sse.DefaultTopic}), nil, "publish should succeed")
msgs := <-sub
tests.Equal(t, "data: hello\n\n", msgs[0].String(), "invalid data received")
ctx2, cancel2 := newMockContext(t)
defer cancel2()
sub2 := subscribe(t, j, ctx2)
<-ctx2.waitingOnDone
tests.Equal(t, j.Shutdown(context.Background()), nil, "shutdown should succeed")
msgs = <-sub2
tests.Equal(t, len(msgs), 0, "unexpected messages received")
tests.Equal(t, rp.puts(), 2, "invalid put calls")
tests.Equal(t, rp.puts(), 2, "invalid replay calls")
}
func TestJoe_Subscribe_multipleTopics(t *testing.T) {
t.Parallel()
j := &sse.Joe{}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant
ctx, cancel := newMockContext(t)
defer cancel()
sub := subscribe(t, j, ctx, sse.DefaultTopic, "another topic")
<-ctx.waitingOnDone
_ = j.Publish(msg(t, "hello", ""), []string{sse.DefaultTopic, "another topic"})
_ = j.Publish(msg(t, "world", ""), []string{"another topic"})
_ = j.Shutdown(context.Background())
msgs := <-sub
expected := `data: hello
data: world
`
tests.Equal(t, expected, msgs[0].String()+msgs[1].String(), "unexpected data received")
}
func TestJoe_errors(t *testing.T) {
t.Parallel()
fin, err := sse.NewFiniteReplayer(2, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")
j := &sse.Joe{
Replayer: fin,
}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant
_ = j.Publish(msg(t, "hello", "0"), []string{sse.DefaultTopic})
_ = j.Publish(msg(t, "hello", "1"), []string{sse.DefaultTopic})
callErr := errors.New("artificial fail")
var called int
client := mockClient(func(m *sse.Message) error {
if m != nil {
called++
}
return callErr
})
err = j.Subscribe(context.Background(), sse.Subscription{
Client: client,
LastEventID: sse.ID("0"),
Topics: []string{sse.DefaultTopic},
})
tests.Equal(t, err, callErr, "error not received from replay")
_ = j.Publish(msg(t, "world", "2"), []string{sse.DefaultTopic})
tests.Equal(t, called, 1, "callback was called after subscribe returned")
called = 0
ctx, cancel := newMockContext(t)
defer cancel()
done := make(chan struct{})
go func() {
defer close(done)
<-ctx.waitingOnDone
_ = j.Publish(msg(t, "", "3"), []string{sse.DefaultTopic})
_ = j.Publish(msg(t, "", "4"), []string{sse.DefaultTopic})
}()
err = j.Subscribe(ctx, sse.Subscription{Client: client, Topics: []string{sse.DefaultTopic}})
tests.Equal(t, err, callErr, "error not received from send")
// Only the first event should be attempted as nothing is replayed.
tests.Equal(t, called, 1, "callback was called after subscribe returned")
<-done
}
type mockMessageWriter struct {
msg chan *sse.Message
}
func (m *mockMessageWriter) Send(msg *sse.Message) error {
m.msg <- msg
return nil
}
func (m *mockMessageWriter) Flush() error {
return nil
}
func TestJoe_ReplayPanic(t *testing.T) {
t.Parallel()
rp := newMockReplayer("replay put", 1)
j := &sse.Joe{Replayer: rp}
wr := &mockMessageWriter{msg: make(chan *sse.Message, 1)}
topics := []string{sse.DefaultTopic}
suberr := make(chan error)
go func() { suberr <- j.Subscribe(context.Background(), sse.Subscription{Client: wr, Topics: topics}) }()
_, ok := <-rp.replayc
tests.Expect(t, ok, "replay wasn't called")
msg := &sse.Message{ID: sse.ID("hello")}
tests.Equal(t, j.Publish(msg, topics), nil, "replay put should not be triggered by publishing anymore")
tests.Equal(t, (<-wr.msg).ID, msg.ID, "message was not sent to client")
go func() { _ = j.Subscribe(context.Background(), sse.Subscription{}) }()
time.Sleep(time.Millisecond)
tests.Equal(t, rp.replays(), 0, "replay was called")
tests.Equal(t, j.Shutdown(context.Background()), nil, "shutdown should succeed")
tests.Equal(t, <-suberr, nil, "unexpected subscribe error")
rp = newMockReplayer("put", 1)
j = &sse.Joe{Replayer: rp}
go func() { suberr <- j.Subscribe(context.Background(), sse.Subscription{Client: wr, Topics: topics}) }()
_, ok = <-rp.replayc
tests.Expect(t, ok, "replay was called")
tests.Equal(t, j.Publish(msg, topics), nil, "replay put error should not be propagated")
tests.Equal(t, (<-wr.msg).ID, msg.ID, "message was not sent to client")
tests.Equal(t, j.Shutdown(context.Background()), nil, "shutdown should succeed")
tests.Equal(t, <-suberr, nil, "unexpected subscribe error")
}