Skip to content

Commit

Permalink
unistore: close event stream on context cancelation (grafana#101293)
Browse files Browse the repository at this point in the history
* add tests for broacaster

* fix sql notifier not closing the stream

* fix sql notifier not closing the stream

* close sub

* fix broadcaster test

* fix broadcaster test

* suggestion
  • Loading branch information
chaudyg authored Feb 25, 2025
1 parent c525031 commit 53e91fd
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
38 changes: 38 additions & 0 deletions pkg/storage/unified/resource/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,41 @@ func TestCache(t *testing.T) {
// slice should return all values
require.Equal(t, []int{4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, c.Slice())
}

func TestBroadcaster(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan int)
input := []int{1, 2, 3}
go func() {
for _, v := range input {
ch <- v
}
}()
t.Cleanup(func() {
close(ch)
})

b, err := NewBroadcaster(ctx, func(out chan<- int) error {
go func() {
for v := range ch {
out <- v
}
}()
return nil
})
require.NoError(t, err)

sub, err := b.Subscribe(ctx)
require.NoError(t, err)

for _, expected := range input {
v, ok := <-sub
require.True(t, ok)
require.Equal(t, expected, v)
}

// cancel the context should close the stream
cancel()
_, ok := <-sub
require.False(t, ok)
}
5 changes: 1 addition & 4 deletions pkg/storage/unified/resource/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,10 +919,7 @@ func (s *server) initWatcher() error {
return err
}
go func() {
for {
// pipe all events
v := <-events

for v := range events {
if v == nil {
s.log.Error("received nil event")
continue
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/unified/sql/notifier_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (p *pollingNotifier) poller(ctx context.Context, since groupResourceRV, str

for {
select {
case <-ctx.Done():
return
case <-p.done:
return
case <-t.C:
Expand Down
37 changes: 37 additions & 0 deletions pkg/storage/unified/sql/notifier_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,41 @@ func TestPollingNotifier(t *testing.T) {
t.Fatal("timeout waiting for events channel to close")
}
})

t.Run("stops polling when context is cancelled", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())

cfg := &pollingNotifierConfig{
dialect: sqltemplate.SQLite,
pollingInterval: 10 * time.Millisecond,
watchBufferSize: 10,
log: log.NewNopLogger(),
tracer: noop.NewTracerProvider().Tracer("test"),
batchLock: &batchLock{},
listLatestRVs: func(ctx context.Context) (groupResourceRV, error) { return nil, nil },
historyPoll: func(ctx context.Context, grp string, res string, since int64) ([]*historyPollResponse, error) {
return nil, nil
},
done: make(chan struct{}),
}

notifier, err := newPollingNotifier(cfg)
require.NoError(t, err)
require.NotNil(t, notifier)

events, err := notifier.notify(ctx)
require.NoError(t, err)
require.NotNil(t, events)

cancel()

select {
case _, ok := <-events:
require.False(t, ok, "events channel should be closed")
case <-time.After(time.Second):
t.Fatal("timeout waiting for events channel to close")
}
})
}
10 changes: 8 additions & 2 deletions pkg/storage/unified/testing/storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
fn func(*testing.T, resource.StorageBackend)
}{
{TestHappyPath, runTestIntegrationBackendHappyPath},
{TestWatchWriteEvents, runTestIntegrationBackendWatchWriteEventsFromLastest},
{TestWatchWriteEvents, runTestIntegrationBackendWatchWriteEvents},
{TestList, runTestIntegrationBackendList},
{TestBlobSupport, runTestIntegrationBlobSupport},
{TestGetResourceStats, runTestIntegrationBackendGetResourceStats},
Expand Down Expand Up @@ -272,7 +272,7 @@ func runTestIntegrationBackendGetResourceStats(t *testing.T, backend resource.St
})
}

func runTestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T, backend resource.StorageBackend) {
func runTestIntegrationBackendWatchWriteEvents(t *testing.T, backend resource.StorageBackend) {
ctx := testutil.NewTestContext(t, time.Now().Add(5*time.Second))

// Create a few resources before initing the watch
Expand All @@ -287,6 +287,12 @@ func runTestIntegrationBackendWatchWriteEventsFromLastest(t *testing.T, backend
_, err = writeEvent(ctx, backend, "item2", resource.WatchEvent_ADDED)
require.NoError(t, err)
require.Equal(t, "item2", (<-stream).Key.Name)

// Should close the stream
ctx.Cancel()

_, ok := <-stream
require.False(t, ok)
}

func runTestIntegrationBackendList(t *testing.T, backend resource.StorageBackend) {
Expand Down

0 comments on commit 53e91fd

Please sign in to comment.