forked from st3fan/mijia-hub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscription.go
133 lines (107 loc) · 2.73 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
// Mostly taken from https://github.com/pdf/golifx
// This is all way too complicated for what we need. We can simplify this.
import (
"errors"
"sync"
"time"
"github.com/google/uuid"
)
// Tunables shared by every subscription.
const (
	// subscriptionChanSize is the buffer capacity used for a subscription's
	// events channel.
	subscriptionChanSize = 16

	// DefaultSubscriptionRetryInterval is the default interval at which
	// operations are retried.
	DefaultSubscriptionRetryInterval = time.Millisecond * 100

	// DefaultSubscriptionTimeout is the default duration after which
	// operations time out.
	DefaultSubscriptionTimeout = time.Second * 2
)
// Sentinel errors reported by subscriptions and their provider.
var (
	// ErrSubscriptionClosed is returned when the subscription has already
	// been closed.
	ErrSubscriptionClosed = errors.New("Connection closed")

	// ErrSubscriptionTimeout is returned when an event could not be queued
	// before the timeout elapsed.
	ErrSubscriptionTimeout = errors.New("Timed out")

	// ErrSubscriptionNotFound is returned when the provider has no record of
	// the subscription.
	ErrSubscriptionNotFound = errors.New("Not found")
)
// Subscription is one subscriber's handle on a SubscriptionProvider. Events
// handed to it are queued on a buffered channel that the subscriber drains
// through Events, and it is shut down with Close.
type Subscription struct {
	// Mutex is held by Close for the duration of the shutdown.
	sync.Mutex

	// id is the key under which the provider stores this subscription.
	id string

	// events carries the values passed to notify; Events exposes its
	// receive-only side.
	events chan interface{}

	// quit is closed by Close to mark the subscription as closed.
	quit chan struct{}

	// provider is the SubscriptionProvider that Close reports back to.
	provider *SubscriptionProvider
}
// newSubscription builds an open Subscription that belongs to provider. It
// gets a freshly generated UUID string as its id, an events channel buffered
// to subscriptionChanSize, and an unclosed quit channel. The caller is
// responsible for registering it with the provider.
func newSubscription(provider *SubscriptionProvider) *Subscription {
	sub := new(Subscription)
	sub.id = uuid.NewString()
	sub.events = make(chan interface{}, subscriptionChanSize)
	sub.quit = make(chan struct{})
	sub.provider = provider
	return sub
}
// Events returns the receive-only view of the channel on which this
// subscription's events are queued. Close closes that channel.
func (s *Subscription) Events() <-chan interface{} {
	var stream <-chan interface{} = s.events
	return stream
}
// notify queues event on the subscription's events channel, waiting at most
// DefaultSubscriptionTimeout for buffer space.
//
// It returns nil once the event has been queued, ErrSubscriptionClosed if the
// subscription has been closed, and ErrSubscriptionTimeout if no room became
// available in time.
func (s *Subscription) notify(event interface{}) (err error) {
	// Fast path: once quit is closed, events is closed too (or about to be),
	// so the send must not even be attempted.
	select {
	case <-s.quit:
		return ErrSubscriptionClosed
	default:
	}

	// Close may close s.events while we are blocked in the select below, and
	// a send on a closed channel panics. Close always closes quit before
	// events, so a panic observed while quit is closed is exactly that race
	// and is reported as ErrSubscriptionClosed. Anything else is re-raised.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		select {
		case <-s.quit:
			err = ErrSubscriptionClosed
		default:
			panic(r)
		}
	}()

	// An explicit timer (rather than time.After) is released as soon as the
	// call returns instead of lingering until it fires.
	timer := time.NewTimer(DefaultSubscriptionTimeout)
	defer timer.Stop()

	select {
	case <-s.quit:
		return ErrSubscriptionClosed
	case s.events <- event:
		return nil
	case <-timer.C:
		return ErrSubscriptionTimeout
	}
}
// Close shuts the subscription down: it closes the quit channel, then the
// events channel, and finally asks the provider to unsubscribe it, returning
// whatever the provider reports.
//
// Calling Close on a subscription that is already closed returns
// ErrSubscriptionClosed and does nothing else. The subscription's mutex is
// held for the whole call.
func (s *Subscription) Close() error {
	s.Lock()
	defer s.Unlock()

	// A closed quit channel is the marker for "already closed"; bail out
	// before closing anything a second time.
	select {
	case <-s.quit:
		return ErrSubscriptionClosed
	default:
	}

	close(s.quit)
	close(s.events)

	return s.provider.unsubscribe(s)
}
// SubscriptionProvider keeps track of a set of subscriptions and fans events
// out to them. The subscriptions map is created lazily by Subscribe, so the
// zero value can be used directly.
type SubscriptionProvider struct {
	// subscriptions holds the registered subscriptions keyed by their id.
	subscriptions map[string]*Subscription

	// RWMutex is taken by the provider's methods around access to
	// subscriptions.
	sync.RWMutex
}
// Notify sends the provided event to all subscribers.
//
// The provider's read lock is held for the whole fan-out, and subscribers are
// notified one after another in map iteration order.
func (s *SubscriptionProvider) Notify(event interface{}) {
	s.RLock()
	defer s.RUnlock()

	for _, sub := range s.subscriptions {
		// TODO: delivery errors are currently dropped; Notify has no way to
		// report them to its caller.
		_ = sub.notify(event)
	}
}
// Close closes every subscription currently registered with the provider.
// It always returns nil.
func (s *SubscriptionProvider) Close() error {
	// Snapshot the registered subscriptions under the read lock. Ranging over
	// the map without a lock would race with Subscribe, and the lock cannot
	// be held while closing because Subscription.Close calls back into
	// unsubscribe, which takes the write lock.
	s.RLock()
	pending := make([]*Subscription, 0, len(s.subscriptions))
	for _, sub := range s.subscriptions {
		pending = append(pending, sub)
	}
	s.RUnlock()

	for _, sub := range pending {
		// Best effort: the errors Subscription.Close can return
		// (ErrSubscriptionClosed, ErrSubscriptionNotFound) only mean this
		// subscription was already shut down or removed, so they must not
		// stop the remaining ones from being closed.
		// TODO What is the best strategy for reporting these?
		_ = sub.Close()
	}
	return nil
}
// unsubscribe removes subscription from the provider so that it no longer
// receives events from Notify. It returns ErrSubscriptionNotFound if the
// subscription is not registered with this provider, and nil otherwise.
func (s *SubscriptionProvider) unsubscribe(subscription *Subscription) error {
	s.Lock()
	defer s.Unlock()

	if _, ok := s.subscriptions[subscription.id]; !ok {
		return ErrSubscriptionNotFound
	}
	// Actually drop the entry. Without this, closed subscriptions pile up in
	// the map and Notify keeps trying to deliver to them.
	delete(s.subscriptions, subscription.id)
	return nil
}
// Subscribe returns a new Subscription for this provider.
//
// The subscription is registered under its id while the provider's write lock
// is held. The subscriptions map is allocated on first use.
func (s *SubscriptionProvider) Subscribe() *Subscription {
	s.Lock()
	defer s.Unlock()

	if s.subscriptions == nil {
		s.subscriptions = map[string]*Subscription{}
	}

	created := newSubscription(s)
	s.subscriptions[created.id] = created
	return created
}