Skip to content

Commit

Permalink
Merge pull request #2 from andrewwormald/andreww-updatePublishLifespan
Browse files Browse the repository at this point in the history
extend publish and keep alive deadline
  • Loading branch information
andrewwormald authored Dec 20, 2023
2 parents 4a923d1 + 2aa6e62 commit e1db73f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Stream) Accept(w http.ResponseWriter, r *http.Request, channelKey strin
// set a deadline per channel.
func (s *Stream) Publish(payload string) {
for _, c := range s.channels() {
ctx, cancel := context.WithTimeout(s.ctx, time.Millisecond*200)
ctx, cancel := context.WithTimeout(s.ctx, time.Second)
err := c.Send(ctx, payload)
if err != nil {
// NoReturnErr: Allow other channels to be unaffected and close this connection
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *Stream) sendKeepAliveToClients(ctx context.Context) {
}

for _, c := range s.channels() {
ctx, cancel := context.WithTimeout(s.ctx, time.Millisecond*200)
ctx, cancel := context.WithTimeout(s.ctx, time.Second*30)
err := c.Send(ctx, "Keep-alive")
if err != nil {
// NoReturnErr: Allow other channels to be unaffected and close this connection
Expand Down
41 changes: 37 additions & 4 deletions stream_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package streamer
import (
"context"
"fmt"
"os"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -34,7 +35,15 @@ func TestStore(t *testing.T) {
s := New(ctx)
channelID := "conn_1"

c := NewChannel(ctx, nil, channelID)
c := &Channel{
mu: &sync.Mutex{},
id: channelID,
subs: make(map[string]bool),
writeBuf: make(chan string, defaultWriteBuffSize),
interrupt: make(chan os.Signal, 1),
ctx: ctx,
asyncFlush: true,
}
s.store(c)

_, exists := s.pool[channelID]
Expand All @@ -51,7 +60,15 @@ func TestStoreConcurrency(t *testing.T) {
for k := range ls {
wg.Add(1)
id := makeConnID(k)
c := NewChannel(ctx,nil, id)
c := &Channel{
mu: &sync.Mutex{},
id: id,
subs: make(map[string]bool),
writeBuf: make(chan string, defaultWriteBuffSize),
interrupt: make(chan os.Signal, 1),
ctx: ctx,
asyncFlush: true,
}

go func() {
s.store(c)
Expand All @@ -76,7 +93,15 @@ func TestRemove(t *testing.T) {
// Add
for k := range ls {
id := makeConnID(k)
c := NewChannel(ctx,nil, id)
c := &Channel{
mu: &sync.Mutex{},
id: id,
subs: make(map[string]bool),
writeBuf: make(chan string, defaultWriteBuffSize),
interrupt: make(chan os.Signal, 1),
ctx: ctx,
asyncFlush: true,
}
s.store(c)
}

Expand All @@ -91,7 +116,15 @@ func TestRemove(t *testing.T) {
for k := range ls {
wg.Add(1)
id := makeConnID(k)
c := NewChannel(ctx,nil, id)
c := &Channel{
mu: &sync.Mutex{},
id: id,
subs: make(map[string]bool),
writeBuf: make(chan string, defaultWriteBuffSize),
interrupt: make(chan os.Signal, 1),
ctx: ctx,
asyncFlush: true,
}

go func() {
s.remove(c)
Expand Down

0 comments on commit e1db73f

Please sign in to comment.