-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathmux_observer.go
145 lines (127 loc) · 2.68 KB
/
mux_observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package broadcast
type taggedObservation struct {
sub *subObserver
ob interface{}
}
const (
register = iota
unregister
purge
)
type taggedRegReq struct {
sub *subObserver
ch chan<- interface{}
regType int
}
// A MuxObserver multiplexes several streams of observations onto a
// single delivery goroutine.
type MuxObserver struct {
subs map[*subObserver]map[chan<- interface{}]bool
reg chan taggedRegReq
input chan taggedObservation
}
// NewMuxObserver constructs a new MuxObserver.
//
// qlen is the size of the channel buffer for observations sent into
// the mux observer and reglen is the size of the channel buffer for
// registration/unregistration events.
func NewMuxObserver(qlen, reglen int) *MuxObserver {
rv := &MuxObserver{
subs: map[*subObserver]map[chan<- interface{}]bool{},
reg: make(chan taggedRegReq, reglen),
input: make(chan taggedObservation, qlen),
}
go rv.run()
return rv
}
// Close shuts down this mux observer.
func (m *MuxObserver) Close() error {
close(m.reg)
return nil
}
func (m *MuxObserver) broadcast(to taggedObservation) {
for ch := range m.subs[to.sub] {
ch <- to.ob
}
}
func (m *MuxObserver) doReg(tr taggedRegReq) {
mm, exists := m.subs[tr.sub]
if !exists {
mm = map[chan<- interface{}]bool{}
m.subs[tr.sub] = mm
}
mm[tr.ch] = true
}
func (m *MuxObserver) doUnreg(tr taggedRegReq) {
mm, exists := m.subs[tr.sub]
if exists {
delete(mm, tr.ch)
if len(mm) == 0 {
delete(m.subs, tr.sub)
}
}
}
func (m *MuxObserver) handleReg(tr taggedRegReq) {
switch tr.regType {
case register:
m.doReg(tr)
case unregister:
m.doUnreg(tr)
case purge:
delete(m.subs, tr.sub)
}
}
func (m *MuxObserver) run() {
for {
select {
case tr, ok := <-m.reg:
if ok {
m.handleReg(tr)
} else {
return
}
default:
select {
case to := <-m.input:
m.broadcast(to)
case tr, ok := <-m.reg:
if ok {
m.handleReg(tr)
} else {
return
}
}
}
}
}
// Sub creates a new sub-broadcaster from this MuxObserver.
func (m *MuxObserver) Sub() Broadcaster {
return &subObserver{m}
}
type subObserver struct {
mo *MuxObserver
}
func (s *subObserver) Register(ch chan<- interface{}) {
s.mo.reg <- taggedRegReq{s, ch, register}
}
func (s *subObserver) Unregister(ch chan<- interface{}) {
s.mo.reg <- taggedRegReq{s, ch, unregister}
}
func (s *subObserver) Close() error {
s.mo.reg <- taggedRegReq{s, nil, purge}
return nil
}
func (s *subObserver) Submit(ob interface{}) {
s.mo.input <- taggedObservation{s, ob}
}
func (s *subObserver) TrySubmit(ob interface{}) bool {
if s == nil {
return false
}
select {
case s.mo.input <- taggedObservation{s, ob}:
return true
default:
return false
}
}