-
-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathsession.go
161 lines (138 loc) · 4.23 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package sse
import (
"errors"
"net/http"
)
// ResponseWriter is a http.ResponseWriter augmented with a Flush method.
//
// Unlike http.Flusher's Flush, this Flush returns an error. Values of this
// type are produced by getResponseWriter, which wraps writers that provide
// either Flush or FlushError.
type ResponseWriter interface {
http.ResponseWriter
// Flush flushes the response and returns any error reported while doing so.
Flush() error
}
// MessageWriter is a special kind of response writer used by providers to
// send Messages to clients.
//
// Send alone does not guarantee delivery; call Flush afterwards to make sure
// the message is sent. *Session has both of these methods.
type MessageWriter interface {
// Send sends the message to the client.
// To make sure it is sent, call Flush.
Send(m *Message) error
// Flush sends any buffered messages to the client.
Flush() error
}
// A Session is an HTTP request from an SSE client.
// Create one using the Upgrade function.
//
// Using a Session you can also access the initial HTTP request,
// get the last event ID, or write data to the client.
//
// The Content-Type header and the 200 status are written lazily, on the
// first call to Send or Flush.
type Session struct {
// The response writer for the request. Can be used to write an error response
// back to the client. Must not be used after the Session was subscribed!
Res ResponseWriter
// The initial HTTP request. Can be used to retrieve authentication data,
// topics, or data from context – a logger, for example.
Req *http.Request
// Last event ID of the client. It is unset if no ID was provided in the Last-Event-Id
// request header.
LastEventID EventID
// didUpgrade is set by doUpgrade once the Content-Type header and the 200
// status were written and flushed without error, so that this is not repeated.
didUpgrade bool
}
// Send writes the given message to the client.
//
// Before writing, it makes sure the response has been upgraded (the SSE
// Content-Type header and the 200 status written and flushed). The message
// itself is not flushed here; call Flush to make sure it is sent.
//
// It returns the upgrade error, if any, otherwise the error reported while
// writing the message, or nil on success.
func (s *Session) Send(e *Message) error {
	upgradeErr := s.doUpgrade()
	if upgradeErr != nil {
		return upgradeErr
	}

	_, writeErr := e.WriteTo(s.Res)
	if writeErr != nil {
		return writeErr
	}

	return nil
}
// Flush sends any buffered messages to the client.
//
// If the response has not been upgraded yet, Flush performs the upgrade,
// which already flushes the response; no second flush is issued in that
// case. It returns the upgrade error or the flush error, if any.
func (s *Session) Flush() error {
	upgradedBefore := s.didUpgrade

	if err := s.doUpgrade(); err != nil {
		return err
	}

	if upgradedBefore != s.didUpgrade {
		// The upgrade happened during this call and has flushed already.
		return nil
	}

	return s.Res.Flush()
}
// doUpgrade sends the SSE response preamble if that has not been done yet:
// it sets the Content-Type header, writes a 200 status and flushes the
// response. The session is marked as upgraded only when the flush succeeds;
// otherwise the flush error is returned and the mark stays unset.
func (s *Session) doUpgrade() error {
	if s.didUpgrade {
		return nil
	}

	s.Res.Header()[headerContentType] = headerContentTypeValue
	s.Res.WriteHeader(http.StatusOK)

	flushErr := s.Res.Flush()
	if flushErr != nil {
		return flushErr
	}

	s.didUpgrade = true
	return nil
}
// Upgrade upgrades an HTTP request to support server-sent events.
// It returns a Session that's used to send events to the client, or
// ErrUpgradeUnsupported if no writer capable of flushing could be obtained
// from w.
//
// The headers required by the SSE protocol are not sent by Upgrade itself;
// they are sent on the first call to the Session's Send or Flush method.
// Until then, other headers and status codes can safely be set.
//
// The Session's LastEventID is taken from the first Last-Event-Id request
// header value when that value is non-empty; otherwise it is left unset.
func Upgrade(w http.ResponseWriter, r *http.Request) (*Session, error) {
	res := getResponseWriter(w)
	if res == nil {
		return nil, ErrUpgradeUnsupported
	}

	lastID := EventID{}

	// Clients must not send empty Last-Event-Id headers:
	// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
	values := r.Header[headerLastEventID]
	if len(values) > 0 && values[0] != "" {
		// The validity flag is dropped on purpose: for an invalid ID an unset
		// ID is returned, which providers are required to ignore.
		lastID, _ = NewID(values[0])
	}

	sess := &Session{Req: r, Res: res, LastEventID: lastID}
	return sess, nil
}
// ErrUpgradeUnsupported is returned when a request can't be upgraded to support server-sent events.
// Upgrade returns it when the given http.ResponseWriter, and every writer
// reachable from it through Unwrap, provides neither FlushError nor Flush.
var ErrUpgradeUnsupported = errors.New("go-sse.server: upgrade unsupported")
// Canonicalized header keys.
// They are used to index header maps directly (not through Get/Set), so
// they have to be spelled in canonical form here.
const (
// headerLastEventID is the request header Upgrade reads the last event ID from.
headerLastEventID = "Last-Event-Id"
// headerContentType is the response header set when a session is upgraded.
headerContentType = "Content-Type"
)
// Pre-allocated header value.
// doUpgrade stores this very slice in the response header map, so it is
// shared between all upgraded responses; treat it as read-only.
var headerContentTypeValue = []string{"text/event-stream"}
// Logic below is similar to Go 1.20's ResponseController.
// We can't use that because we need to check if the request supports
// flushing messages before we subscribe it to the event stream.

// writeFlusher is a response writer that also implements http.Flusher,
// i.e. it has a Flush method that returns nothing.
type writeFlusher interface {
http.ResponseWriter
http.Flusher
}
// writeFlusherError is a response writer with a FlushError method, which
// returns an error. getResponseWriter checks for it before writeFlusher.
type writeFlusherError interface {
http.ResponseWriter
FlushError() error
}
// rwUnwrapper is a type that can hand out another http.ResponseWriter through
// Unwrap. getResponseWriter continues its search on the returned writer when
// the current one cannot flush.
type rwUnwrapper interface {
Unwrap() http.ResponseWriter
}
// getResponseWriter looks for a writer capable of flushing, starting at w
// and following Unwrap as long as the current writer can't flush itself.
//
// For each candidate, a FlushError method is preferred over a plain Flush
// method. The first match is returned wrapped as a ResponseWriter. If a
// writer is reached that offers none of FlushError, Flush or Unwrap (which
// includes a nil writer), nil is returned.
func getResponseWriter(w http.ResponseWriter) ResponseWriter {
	for {
		if withErr, ok := w.(writeFlusherError); ok {
			return flusherErrorWrapper{withErr}
		}

		if plain, ok := w.(writeFlusher); ok {
			return flusherWrapper{plain}
		}

		wrapper, ok := w.(rwUnwrapper)
		if !ok {
			return nil
		}

		w = wrapper.Unwrap()
	}
}
// flusherWrapper adds an error-returning Flush method on top of a
// writeFlusher, so that it can be used as a ResponseWriter.
type flusherWrapper struct {
writeFlusher
}
// Flush calls the embedded writer's Flush method. That method returns
// nothing, so Flush always returns nil.
func (f flusherWrapper) Flush() error {
f.writeFlusher.Flush()
return nil
}
// flusherErrorWrapper exposes the FlushError method of a writeFlusherError
// under the name Flush, so that it can be used as a ResponseWriter.
type flusherErrorWrapper struct {
writeFlusherError
}
// Flush returns the result of the embedded writer's FlushError method.
func (f flusherErrorWrapper) Flush() error { return f.FlushError() }