forked from dustin/go-broadcast
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadcaster.go
104 lines (89 loc) · 2.17 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
Package broadcast provides pubsub of messages over channels.
A provider has a Broadcaster into which it Submits messages and into
which subscribers Register to pick up those messages.
*/
package broadcast
// broadcaster is the channel-based implementation of Broadcaster.
type broadcaster struct {
	// input carries submitted messages to the run loop.
	input chan interface{}
	// reg and unreg carry subscriber channels that should be added to,
	// or removed from, outputs.
	reg, unreg chan chan<- interface{}
	// outputs is the set of currently registered subscriber channels.
	outputs map[chan<- interface{}]bool
}
// Broadcaster is the public entry point to a broadcaster: subscribers
// attach and detach channels, and providers submit messages.
type Broadcaster interface {
	// Register adds a channel that will receive broadcasts.
	Register(chan<- interface{})
	// Unregister removes a channel so that it no longer receives
	// broadcasts.
	Unregister(chan<- interface{})
	// Submit sends a new object to all subscribers.
	Submit(interface{})
	// TrySubmit attempts to send a new object to all subscribers without
	// blocking. It returns false if the input channel is full.
	TrySubmit(interface{}) bool
	// Close shuts this broadcaster down.
	Close() error
}
// broadcast delivers m to every channel currently in b.outputs. Each
// delivery is a plain channel send, so it waits until that subscriber's
// channel can accept the value before moving on to the next one.
func (b *broadcaster) broadcast(m interface{}) {
	for out := range b.outputs {
		out <- m
	}
}
// run is the broadcaster's event loop. It forwards submitted messages to
// all subscribers and applies register/unregister requests to b.outputs,
// one event at a time. It returns once either registration channel has
// been closed, which is what Close does.
func (b *broadcaster) run() {
	for {
		select {
		case m := <-b.input:
			b.broadcast(m)
		case ch, ok := <-b.reg:
			if !ok {
				// reg was closed: the broadcaster is shutting down.
				return
			}
			b.outputs[ch] = true
		case ch, ok := <-b.unreg:
			if !ok {
				// A closed channel is always ready to receive and yields
				// nil. Without this check the loop could keep selecting
				// this case and deleting a nil key until the closed reg
				// case happened to be chosen.
				return
			}
			delete(b.outputs, ch)
		}
	}
}
// NewBroadcaster builds a broadcaster whose input channel is buffered to
// hold buflen messages, starts its run loop in a new goroutine, and
// returns it as a Broadcaster. The registration channels are unbuffered.
func NewBroadcaster(buflen int) Broadcaster {
	b := new(broadcaster)
	b.input = make(chan interface{}, buflen)
	b.reg = make(chan chan<- interface{})
	b.unreg = make(chan chan<- interface{})
	b.outputs = make(map[chan<- interface{}]bool)

	go b.run()
	return b
}
// Register hands newch to the run loop so that it is added to the set of
// subscriber channels. The send on the registration channel blocks until
// the run loop receives it.
func (b *broadcaster) Register(newch chan<- interface{}) {
	b.reg <- newch
}
// Unregister hands newch to the run loop so that it is removed from the
// set of subscriber channels. The send on the unregistration channel
// blocks until the run loop receives it.
func (b *broadcaster) Unregister(newch chan<- interface{}) {
	b.unreg <- newch
}
// Close shuts the broadcaster down by closing both registration channels;
// the run loop returns when it observes the closed reg channel. The input
// channel is left open. Close always returns nil.
//
// Because the channels are closed unconditionally, a second Close, or a
// Register/Unregister after Close, panics.
func (b *broadcaster) Close() error {
	close(b.reg)
	close(b.unreg)

	return nil
}
// Submit queues m on the input channel for delivery to all listeners,
// blocking while the input buffer is full. Calling Submit on a nil
// *broadcaster is a no-op.
func (b *broadcaster) Submit(m interface{}) {
	if b == nil {
		return
	}
	b.input <- m
}
// TrySubmit is the non-blocking form of Submit. It returns true if m was
// placed on the input channel, and false if the send could not proceed
// immediately or if the receiver is a nil *broadcaster.
func (b *broadcaster) TrySubmit(m interface{}) bool {
	if b != nil {
		select {
		case b.input <- m:
			return true
		default:
			// Input channel cannot accept the message right now.
		}
	}
	return false
}