@@ -42,11 +42,10 @@ class SocketConnectionManager {
42
42
int _connectionAttempts = 0 ;
43
43
44
44
SocketConnectionAttempt _currentAttempt = SocketConnectionAttempt .aborted ();
45
- SocketConnectionAttempt _setNewAttempt (Duration delay) {
46
- return _currentAttempt = SocketConnectionAttempt (delay: delay);
47
- }
48
45
49
46
bool _disposed = false ;
47
+ bool get _shouldAttemptReconnection =>
48
+ ! _disposed && _pendingConnection != null ;
50
49
51
50
/// Requests to start connecting to the socket.
52
51
///
@@ -61,15 +60,15 @@ class SocketConnectionManager {
61
60
}
62
61
63
62
if (immediately) {
64
- if (_pendingConnection != null && ! _currentAttempt.done ) {
65
- _currentAttempt.completeNow ();
63
+ if (_pendingConnection != null && ! _currentAttempt.delayDone ) {
64
+ _currentAttempt.skipDelay ();
66
65
return ;
67
66
}
68
67
69
68
_stopConnecting (4002 , 'Immediate connection requested' );
70
69
_connectionAttempts = 0 ;
71
70
}
72
- _connect ();
71
+ _maybeConnect ();
73
72
}
74
73
75
74
/// Sends a message to the socket. Will start connecting to the socket if
@@ -81,7 +80,7 @@ class SocketConnectionManager {
81
80
/// If after call to [addMessage] a call to [dispose] or [stop] is made, then
82
81
/// this future will complete with an error instead.
83
82
Future <void > addMessage (String message) {
84
- return _connect ().then ((connection) => connection.send (message));
83
+ return _maybeConnect ().then ((connection) => connection.send (message));
85
84
}
86
85
87
86
/// Stops the attempts to connect, and closes the current connection if one is
@@ -113,7 +112,7 @@ class SocketConnectionManager {
113
112
_pendingConnection = null ;
114
113
115
114
// Make sure that no old attempt will begin emitting messages.
116
- if (! _currentAttempt.done ) {
115
+ if (! _currentAttempt.delayDone ) {
117
116
_currentAttempt.abort ();
118
117
}
119
118
_currentAttempt = SocketConnectionAttempt .aborted ();
@@ -123,33 +122,19 @@ class SocketConnectionManager {
123
122
}).ignore ();
124
123
}
125
124
126
- Future <_WebSocketConnection > _connect () async {
125
+ /// Establishes a new connection unless one is already available/in progress.
126
+ Future <_WebSocketConnection > _maybeConnect () {
127
127
if (_disposed) {
128
128
throw StateError ('Cannot connect: WebSocket connection manager disposed' );
129
129
}
130
130
131
- final pendingConnection =
132
- _pendingConnection ?? = _attemptConnection (_reconnectDelay ());
133
- final connection = await pendingConnection;
134
- if (_disposed) {
135
- throw StateError ('Cannot connect: WebSocket connection manager disposed' );
136
- } else if (_pendingConnection != pendingConnection) {
137
- // Something changed while waiting, try again from the start.
138
- _logger.warning ('Connection changed while connecting' );
139
- connection.close (normalClosure, 'New connection is being established' );
140
- return _connect ();
141
- } else {
142
- return connection;
143
- }
131
+ return _pendingConnection ?? _connect ();
144
132
}
145
133
146
- /// Returns a future that completes with a _WsConnection.
147
- ///
148
- /// If [completer] is null, then a new connection attempt will replace
149
- /// _pendingConnection.
134
+ /// Starts connection attempts.
150
135
///
151
- /// If [completer] is not null, then the returned future will be that
152
- /// completer's future .
136
+ /// Upon completiong, the [_pendingConnection] field will be set to the newly
137
+ /// established connection Future, and the same Future will be returned .
153
138
///
154
139
/// Can throw/complete with an exception if:
155
140
/// - during any asynchronous operation, this [SocketConnectionManager] is
@@ -161,66 +146,68 @@ class SocketConnectionManager {
161
146
/// [ConnectionInitializationException] in case the initialization of
162
147
/// connection fails. However, the reconnection will be triggered until it is
163
148
/// established, or interrupted by call to [stop] or [dispose] .
164
- Future <_WebSocketConnection > _attemptConnection (Duration delayDuration,
165
- [Completer <_WebSocketConnection >? completer]) async {
166
- _connectionAttempts++ ;
167
-
168
- final Completer <_WebSocketConnection > connectionCompleter;
169
- if (completer == null ) {
170
- connectionCompleter = Completer <_WebSocketConnection >();
171
- _pendingConnection = connectionCompleter.future;
172
- } else {
173
- connectionCompleter = completer;
149
+ Future <_WebSocketConnection > _connect () async {
150
+ if (_disposed) {
151
+ throw StateError ('Cannot connect: WebSocket connection manager disposed' );
174
152
}
175
153
176
- final attempt = _setNewAttempt (delayDuration);
177
- if (_logger.isLoggable (Level .FINE )) {
178
- _logger.fine (() {
179
- final durationString = delayDuration == Duration .zero
180
- ? 'now'
181
- : 'in ${delayDuration .inMilliseconds } milliseconds' ;
182
- return 'Triggering attempt #$_connectionAttempts (id: ${attempt .idAsString }) to connect $durationString ' ;
183
- });
154
+ final connectionCompleter = Completer <_WebSocketConnection >();
155
+ final connectionFuture = _pendingConnection = connectionCompleter.future;
156
+
157
+ while (! connectionCompleter.isCompleted) {
158
+ final delay = _reconnectDelay ();
159
+ _connectionAttempts++ ;
160
+
161
+ _WebSocketConnection ? connection;
162
+ try {
163
+ connection = await _runConnectionAttempt (delay);
164
+ } catch (error, stackTrace) {
165
+ _logger.warning ('Failed to initialize connection' , error, stackTrace);
166
+ } finally {
167
+ if (_disposed) {
168
+ // Manager was disposed while running connection attempt.
169
+ connection? .close (goingAway, 'Client disposed' );
170
+ connectionCompleter.completeError (StateError ('Client disposed' ));
171
+ } else if (connectionFuture != _pendingConnection) {
172
+ connection? .close (normalClosure, 'Closing obsolete connection' );
173
+ if (_pendingConnection == null ) {
174
+ // stop() was called during connection attempt.
175
+ connectionCompleter
176
+ .completeError (StateError ('Connection attempt aborted' ));
177
+ } else {
178
+ // _startConnecting() was called during connection attempt, return the
179
+ // new Future instead.
180
+ connectionCompleter.complete (_pendingConnection);
181
+ }
182
+ } else if (connection != null ) {
183
+ // Correctly established connection.
184
+ _logger.fine ('Established WebSocket connection' );
185
+ _connectionAttempts = 0 ;
186
+ connectionCompleter.complete (connection);
187
+ }
188
+ }
184
189
}
185
190
186
- attempt.completionFuture
187
- .then (
188
- (_) => _attemptConnectionWithCompleter (connectionCompleter, attempt),
189
- )
190
- .catchError (
191
- (error) => _logger.info (
192
- 'Pending connection attempt ${attempt .idAsString } aborted' ,
193
- ),
194
- );
195
-
196
191
return connectionCompleter.future;
197
192
}
198
193
199
- void _attemptConnectionWithCompleter (
200
- Completer <_WebSocketConnection > connectionCompleter,
201
- SocketConnectionAttempt attempt,
194
+ Future <_WebSocketConnection > _runConnectionAttempt (
195
+ Duration delay,
202
196
) async {
203
- if (_disposed) {
204
- connectionCompleter.completeError (StateError (
205
- 'WebSocket connection manager disposed during connection delay' ,
206
- ));
207
- return ;
197
+ final attempt = _currentAttempt = SocketConnectionAttempt (delay: delay);
198
+ if (_logger.isLoggable (Level .FINE )) {
199
+ _logger.fine (() {
200
+ final durationString = delay == Duration .zero
201
+ ? 'now'
202
+ : 'in ${delay .inMilliseconds } milliseconds' ;
203
+ return 'Triggering attempt #$_connectionAttempts (id: ${attempt .idAsString }) to connect $durationString ' ;
204
+ });
208
205
}
209
206
207
+ await attempt.delayFuture;
208
+
210
209
if (attempt != _currentAttempt) {
211
- // This would be odd, but I've seen enough.
212
- if (_pendingConnection != null ) {
213
- _logger.warning ('Pending connection updated while delaying' );
214
- connectionCompleter.complete (_pendingConnection! );
215
- return ;
216
- } else {
217
- // We probably don't want to connect anymore.
218
- connectionCompleter.completeError (
219
- StateError ('Connection aborted' ),
220
- StackTrace .current,
221
- );
222
- return ;
223
- }
210
+ throw StateError ('Current attempt obsoleted while delaying' );
224
211
}
225
212
226
213
try {
@@ -229,29 +216,20 @@ class SocketConnectionManager {
229
216
callbacks: _ConnectionCallbacks (attempt: attempt, manager: this ),
230
217
);
231
218
if (attempt == _currentAttempt) {
232
- connectionCompleter. complete ( connection) ;
219
+ return connection;
233
220
} else {
234
221
connection.close (normalClosure, 'Closing unnecessary connection' );
222
+ throw ConnectionInitializationException (
223
+ 'Current attempt obsoleted while delaying' ,
224
+ StackTrace .current,
225
+ );
235
226
}
236
227
} catch (error, stackTrace) {
237
- _logger.warning ('Failed to initialize connection' , error, stackTrace);
238
-
239
- _onError (error, stackTrace);
240
-
241
- // Checks just as above.
242
228
if (attempt == _currentAttempt) {
243
- _attemptConnection (_reconnectDelay (), connectionCompleter);
244
- } else if (_pendingConnection != null ) {
245
- _logger.warning (
246
- 'Pending connection updated while waiting for initialization' ,
247
- );
248
- connectionCompleter.complete (_pendingConnection);
249
- } else {
250
- connectionCompleter.completeError (
251
- StateError ('Connection aborted' ),
252
- StackTrace .current,
253
- );
229
+ _onError (error, stackTrace);
254
230
}
231
+
232
+ rethrow ;
255
233
}
256
234
}
257
235
@@ -262,7 +240,8 @@ class SocketConnectionManager {
262
240
}
263
241
}
264
242
265
- /// Wraps upstream callbacks to filter out obsolete or invalid callbacks from _WsConnection
243
+ /// Wraps upstream callbacks to filter out obsolete or invalid callbacks from
244
+ /// _WebSocketConnection.
266
245
final class _ConnectionCallbacks {
267
246
_ConnectionCallbacks ({
268
247
required this .attempt,
@@ -331,11 +310,11 @@ final class _ConnectionCallbacks {
331
310
case WebSocketClosed ():
332
311
if (_logger.isLoggable (Level .FINE )) {
333
312
_logger.fine (
334
- 'Socket closed, ${manager ._disposed ? ' not ' : '' }attempting to reconnect' ,
313
+ 'Socket closed, ${manager ._shouldAttemptReconnection ? ' not ' : '' }attempting to reconnect' ,
335
314
);
336
315
}
337
- if (! manager._disposed ) {
338
- manager._attemptConnection (manager. _reconnectDelay () );
316
+ if (! manager._shouldAttemptReconnection ) {
317
+ manager._connect ( );
339
318
}
340
319
case WebSocketReady ():
341
320
manager._connectionAttempts = 0 ;
0 commit comments