-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconnection_pool.go
113 lines (99 loc) · 2.75 KB
/
connection_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package lrs
import (
"github.com/gorilla/websocket"
"sync"
)
// SocketStateMap type
type SocketStateMap map[*websocket.Conn]bool
// SocketConnectionsMap type
type SocketConnectionsMap map[*websocket.Conn]*User
// UserConnectionsMap type
type UserConnectionsMap map[uint64]SocketStateMap
// ConnectionPool is intended to keep track of all connections
type ConnectionPool struct {
Sockets SocketConnectionsMap
Users UserConnectionsMap
outbox map[*websocket.Conn]chan Frame
sync.Mutex
}
// NewConnectionPool is factory to create new pool of connections
func NewConnectionPool() *ConnectionPool {
var pool ConnectionPool
pool.Sockets = make(SocketConnectionsMap)
pool.Users = make(UserConnectionsMap)
pool.outbox = make(map[*websocket.Conn]chan Frame)
return &pool
}
// AddConnection adds connection to the pool
func (pool *ConnectionPool) AddConnection(conn *websocket.Conn) {
pool.Lock()
pool.Sockets[conn] = nil
pool.outbox[conn] = make(chan Frame)
pool.Unlock()
go pool.dispatch(conn)
}
// Authenticate binds user identifier to a connection
func (pool *ConnectionPool) Authenticate(conn *websocket.Conn, user *User) {
pool.Lock()
defer pool.Unlock()
pool.Sockets[conn] = user
if _, ok := pool.Users[user.ID]; !ok {
pool.Users[user.ID] = make(SocketStateMap)
}
pool.Users[user.ID][conn] = true
}
// Logout disassociates users and connections
func (pool *ConnectionPool) Logout(user *User) {
pool.Lock()
defer pool.Unlock()
for conn := range pool.Users[user.ID] {
pool.Sockets[conn] = nil
delete(pool.Users[pool.Sockets[conn].ID], conn)
}
}
// DropConnection closes connection
func (pool *ConnectionPool) DropConnection(conn *websocket.Conn) {
pool.Lock()
defer pool.Unlock()
if pool.Sockets[conn] != nil {
delete(pool.Users[pool.Sockets[conn].ID], conn)
}
if _, ok := pool.outbox[conn]; ok {
close(pool.outbox[conn])
delete(pool.outbox, conn)
}
delete(pool.Sockets, conn)
conn.Close()
}
// Broadcast sends a frame to all connections
func (pool *ConnectionPool) Broadcast(frame Frame, skipConn *websocket.Conn) {
for conn := range pool.Sockets {
if conn != skipConn {
pool.Write(conn, frame)
}
}
}
// Write sends a frame to a specific connection
func (pool *ConnectionPool) Write(conn *websocket.Conn, frame Frame) {
if _, ok := pool.outbox[conn]; ok {
pool.outbox[conn] <- frame
}
}
// Send delivers a frame to all user's connections
func (pool *ConnectionPool) Send(to *User, frame Frame) {
if _, ok := pool.Users[to.ID]; ok {
for conn := range pool.Users[to.ID] {
pool.Write(conn, frame)
}
}
}
// actually sends frames
func (pool *ConnectionPool) dispatch(c *websocket.Conn) {
for f := range pool.outbox[c] {
// time.Sleep(200 * time.Millisecond)
err := c.WriteJSON(&f)
if err != nil {
pool.DropConnection(c)
}
}
}