Skip to content

Commit b23b218

Browse files
committed
map based subscriber list
1 parent 4c18b61 commit b23b218

File tree

2 files changed

+36
-30
lines changed

2 files changed

+36
-30
lines changed

common/signal/pubsub/pubsub.go

+34-30
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import (
55
"time"
66

77
"v2ray.com/core/common"
8+
"v2ray.com/core/common/signal/done"
89
"v2ray.com/core/common/task"
910
)
1011

1112
type Subscriber struct {
12-
name string
13-
buffer chan interface{}
14-
removed chan struct{}
13+
buffer chan interface{}
14+
done *done.Instance
1515
}
1616

1717
func (s *Subscriber) push(msg interface{}) {
@@ -25,62 +25,66 @@ func (s *Subscriber) Wait() <-chan interface{} {
2525
return s.buffer
2626
}
2727

28-
func (s *Subscriber) Close() {
29-
close(s.removed)
28+
func (s *Subscriber) Close() error {
29+
return s.done.Close()
3030
}
3131

3232
func (s *Subscriber) IsClosed() bool {
33-
select {
34-
case <-s.removed:
35-
return true
36-
default:
37-
return false
38-
}
33+
return s.done.Done()
3934
}
4035

4136
type Service struct {
4237
sync.RWMutex
43-
subs []*Subscriber
38+
subs map[string][]*Subscriber
4439
ctask *task.Periodic
4540
}
4641

4742
func NewService() *Service {
48-
s := &Service{}
43+
s := &Service{
44+
subs: make(map[string][]*Subscriber),
45+
}
4946
s.ctask = &task.Periodic{
50-
Execute: s.cleanup,
47+
Execute: s.Cleanup,
5148
Interval: time.Second * 30,
5249
}
5350
common.Must(s.ctask.Start())
5451
return s
5552
}
5653

57-
func (s *Service) cleanup() error {
54+
// Cleanup cleans up internal caches of subscribers.
55+
// Visible for testing only.
56+
func (s *Service) Cleanup() error {
5857
s.Lock()
5958
defer s.Unlock()
6059

61-
if len(s.subs) < 16 {
62-
return nil
63-
}
64-
65-
newSub := make([]*Subscriber, 0, len(s.subs))
66-
for _, sub := range s.subs {
67-
if !sub.IsClosed() {
68-
newSub = append(newSub, sub)
60+
for name, subs := range s.subs {
61+
newSub := make([]*Subscriber, 0, len(s.subs))
62+
for _, sub := range subs {
63+
if !sub.IsClosed() {
64+
newSub = append(newSub, sub)
65+
}
66+
}
67+
if len(newSub) == 0 {
68+
delete(s.subs, name)
69+
} else {
70+
s.subs[name] = newSub
6971
}
7072
}
7173

72-
s.subs = newSub
74+
if len(s.subs) == 0 {
75+
s.subs = make(map[string][]*Subscriber)
76+
}
7377
return nil
7478
}
7579

7680
func (s *Service) Subscribe(name string) *Subscriber {
7781
sub := &Subscriber{
78-
name: name,
79-
buffer: make(chan interface{}, 16),
80-
removed: make(chan struct{}),
82+
buffer: make(chan interface{}, 16),
83+
done: done.New(),
8184
}
8285
s.Lock()
83-
s.subs = append(s.subs, sub)
86+
subs := append(s.subs[name], sub)
87+
s.subs[name] = subs
8488
s.Unlock()
8589
return sub
8690
}
@@ -89,8 +93,8 @@ func (s *Service) Publish(name string, message interface{}) {
8993
s.RLock()
9094
defer s.RUnlock()
9195

92-
for _, sub := range s.subs {
93-
if sub.name == name && !sub.IsClosed() {
96+
for _, sub := range s.subs[name] {
97+
if !sub.IsClosed() {
9498
sub.push(message)
9599
}
96100
}

common/signal/pubsub/pubsub_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ func TestPubsub(t *testing.T) {
3030
t.Fail()
3131
default:
3232
}
33+
34+
service.Cleanup()
3335
}

0 commit comments

Comments
 (0)