Skip to content

Commit 466875c

Browse files
committed
Restructuring lifetime
1 parent 069b56b commit 466875c

6 files changed

+242
-240
lines changed

lib/src/delayed_callback.dart

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import 'dart:async';
2+
import 'dart:math';
3+
4+
final _random = Random();
5+
6+
/// Like [Future.delayed], but allows some control of the delay before the
7+
/// callback execution.
8+
final class DelayedCallback<T> {
9+
// Created a socket connection attempt whose delayFuture will complete after
10+
// the given delay.
11+
DelayedCallback({
12+
required Duration delay,
13+
required Future<T> Function() callback,
14+
}) : _callback = callback {
15+
_delayCompleter.future
16+
.then((_) => _runCallback())
17+
.catchError(_callbackCompleter.completeError);
18+
19+
_delayTimer = Timer(delay, _delayCompleter.complete);
20+
}
21+
22+
final int _id = _random.nextInt(1 << 32);
23+
late final idAsString = _id.toRadixString(16).padLeft(8, '0');
24+
25+
late Timer _delayTimer;
26+
final _delayCompleter = Completer<void>();
27+
28+
final Future<T> Function() _callback;
29+
bool _callbackRan = false;
30+
final _callbackCompleter = Completer<T>();
31+
late final Future<T> callbackFuture = _callbackCompleter.future;
32+
33+
void _runCallback() {
34+
if (!_callbackRan) {
35+
_callbackRan = true;
36+
// This way the _callbackCompleter stays uncompleted until callback ends.
37+
_callback()
38+
.then(_callbackCompleter.complete)
39+
.catchError(_callbackCompleter.completeError);
40+
}
41+
}
42+
43+
/// Whether the delay has expired. Does not guarantee that the callback was
44+
/// executed.
45+
bool get delayDone => !_delayTimer.isActive;
46+
47+
// Immediately skips delay and executes callback. Has no effect if the delay
48+
// has expired already.
49+
void skipDelay() {
50+
if (_delayTimer.isActive) {
51+
_delayTimer.cancel();
52+
}
53+
if (!_delayCompleter.isCompleted) {
54+
_delayCompleter.complete();
55+
}
56+
}
57+
58+
// Immediately completes delay with an error. The [callbackFuture] is going
59+
// to be completed with an error. Has no effect if the delay has expired
60+
// already.
61+
void abort() {
62+
if (_delayTimer.isActive) {
63+
_delayTimer.cancel();
64+
}
65+
if (!_delayCompleter.isCompleted) {
66+
_delayCompleter.completeError(
67+
'Connection attempt $idAsString aborted.',
68+
StackTrace.current,
69+
);
70+
}
71+
}
72+
73+
@override
74+
bool operator ==(Object other) {
75+
return other is DelayedCallback && _id == other._id;
76+
}
77+
78+
@override
79+
int get hashCode => _id.hashCode;
80+
81+
@override
82+
String toString() {
83+
return 'SocketConnectionAttempt(id: $idAsString)';
84+
}
85+
}

lib/src/socket.dart

+66-49
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ class PhoenixSocket {
2929

3030
/// The factory to use to create the WebSocketChannel.
3131
WebSocketChannel Function(Uri uri)? webSocketChannelFactory,
32-
}) {
33-
_options = socketOptions ?? PhoenixSocketOptions();
34-
32+
}) : _endpoint = endpoint,
33+
_options = socketOptions ?? const PhoenixSocketOptions(),
34+
_webSocketChannelFactory =
35+
webSocketChannelFactory ?? WebSocketChannel.connect {
3536
_messageStream =
3637
_receiveStreamController.stream.map(_options.serializer.decode);
3738
_openStream =
@@ -41,20 +42,6 @@ class PhoenixSocket {
4142
_errorStream =
4243
_stateEventStreamController.stream.whereType<PhoenixSocketErrorEvent>();
4344

44-
_connectionManager = SocketConnectionManager(
45-
factory: () async {
46-
final mountPoint = await _buildMountPoint(endpoint, _options);
47-
return (webSocketChannelFactory ?? WebSocketChannel.connect)
48-
.call(mountPoint);
49-
},
50-
reconnectDelays: _options.reconnectDelays,
51-
onMessage: onSocketDataCallback,
52-
onError: (error, [stackTrace]) => _stateEventStreamController.add(
53-
PhoenixSocketErrorEvent(error: error, stacktrace: stackTrace),
54-
),
55-
onStateChange: _socketStateStream.add,
56-
);
57-
5845
_subscriptions = [
5946
_messageStream.listen(_onMessage),
6047
_openStream.listen((_) => _isOpen = true),
@@ -64,18 +51,12 @@ class PhoenixSocket {
6451
];
6552
}
6653

67-
static Future<Uri> _buildMountPoint(
68-
String endpoint,
69-
PhoenixSocketOptions options,
70-
) async {
71-
var decodedUri = Uri.parse(endpoint);
72-
final params = await options.getParams();
73-
final queryParams = decodedUri.queryParameters.entries.toList()
74-
..addAll(params.entries.toList());
75-
return decodedUri.replace(
76-
queryParameters: Map.fromEntries(queryParams),
77-
);
78-
}
54+
final String _endpoint;
55+
final PhoenixSocketOptions _options;
56+
final WebSocketChannel Function(Uri uri)? _webSocketChannelFactory;
57+
58+
/// Default duration for a connection timeout.
59+
Duration get defaultTimeout => _options.timeout;
7960

8061
final Map<String, Completer<Message>> _pendingMessages = {};
8162
final Map<String, Stream<Message>> _topicStreams = {};
@@ -92,12 +73,7 @@ class PhoenixSocket {
9273
late Stream<PhoenixSocketCloseEvent> _closeStream;
9374
late Stream<PhoenixSocketErrorEvent> _errorStream;
9475
late Stream<Message> _messageStream;
95-
late SocketConnectionManager _connectionManager;
96-
97-
late PhoenixSocketOptions _options;
98-
99-
/// Default duration for a connection timeout.
100-
Duration get defaultTimeout => _options.timeout;
76+
SocketConnectionManager? _connectionManager;
10177

10278
_StreamRouter<Message>? _router;
10379

@@ -171,9 +147,8 @@ class PhoenixSocket {
171147
/// To check whether the socket is ready for use, use [isOpen], or await on an
172148
/// event from [openStream].
173149
///
174-
/// If [immediately] is set to `true`, then if a
175-
/// connection is not established, it will attempt to connect to a socket
176-
/// without delay.
150+
/// If [immediately] is set to `true` and if a connection is not established,
151+
/// it will attempt to connect to a socket without delay.
177152
Future<void> connect({bool immediately = false}) async {
178153
if (_disposed) {
179154
throw StateError('PhoenixSocket cannot connect after being disposed.');
@@ -188,13 +163,46 @@ class PhoenixSocket {
188163
if (isOpen) {
189164
return;
190165
} else if (!_socketStateStream.hasValue || _isConnectingOrConnected) {
191-
return _connectionManager.start(immediately: immediately);
166+
await (_connectionManager ??= _createConnectionManager())
167+
.start(immediately: immediately);
192168
} else {
193-
return _reconnect(
169+
await _reconnect(
194170
normalClosure, // Any code is a good code.
195171
immediately: immediately,
196172
);
197173
}
174+
175+
assert(
176+
_stateEventStreamController.valueOrNull is! PhoenixSocketOpenEvent,
177+
'Phoenix socket should not be open at this stage',
178+
);
179+
await _openStream.first;
180+
}
181+
182+
SocketConnectionManager _createConnectionManager() {
183+
return SocketConnectionManager(
184+
factory: () async {
185+
final mountPoint = await _buildMountPoint();
186+
return (_webSocketChannelFactory ?? WebSocketChannel.connect)
187+
.call(mountPoint);
188+
},
189+
reconnectDelays: _options.reconnectDelays,
190+
onMessage: onSocketDataCallback,
191+
onError: (error, [stackTrace]) => _stateEventStreamController.add(
192+
PhoenixSocketErrorEvent(error: error, stacktrace: stackTrace),
193+
),
194+
onStateChange: _socketStateStream.add,
195+
);
196+
}
197+
198+
Future<Uri> _buildMountPoint() async {
199+
var decodedUri = Uri.parse(_endpoint);
200+
final params = await _options.getParams();
201+
final queryParams = decodedUri.queryParameters.entries.toList()
202+
..addAll(params.entries.toList());
203+
return decodedUri.replace(
204+
queryParameters: Map.fromEntries(queryParams),
205+
);
198206
}
199207

200208
/// Close the underlying connection supporting the socket.
@@ -255,19 +263,27 @@ class PhoenixSocket {
255263
throw StateError('Cannot reconnect a disposed socket');
256264
}
257265

258-
if (_socketStateStream.hasValue) {
259-
await _closeConnection(code, reason: reason);
266+
if (_connectionManager == null) {
267+
return connect(immediately: immediately);
260268
}
261-
_connectionManager.start(immediately: immediately);
269+
270+
_connectionManager!
271+
.reconnect(code, reason: reason, immediately: immediately);
262272
}
263273

264274
Future<void> _closeConnection(int code, {String? reason}) async {
265275
if (_disposed) {
266276
_logger.warning('Cannot close a disposed socket');
267277
}
268-
if (_isConnectingOrConnected) {
269-
_connectionManager.stop(code, reason);
270-
} else if (_socketStateStream.valueOrNull is! WebSocketDisconnected) {
278+
if (_connectionManager != null) {
279+
_connectionManager!.dispose(code, reason);
280+
_connectionManager = null;
281+
}
282+
if (_stateEventStreamController.valueOrNull is! PhoenixSocketCloseEvent) {
283+
_stateEventStreamController
284+
.add(PhoenixSocketCloseEvent(code: code, reason: reason));
285+
}
286+
if (_socketStateStream.valueOrNull is! WebSocketDisconnected) {
271287
await _socketStateStream
272288
.firstWhere((state) => state is WebSocketDisconnected);
273289
}
@@ -295,11 +311,12 @@ class PhoenixSocket {
295311
return (_pendingMessages[message.ref!] = Completer<Message>()).future;
296312
}
297313

298-
void _addToSink(String data) {
314+
Future<void> _addToSink(String data) async {
299315
if (_disposed) {
300-
return;
316+
throw StateError('Cannot add messages to a disposed socket');
301317
}
302-
_connectionManager.addMessage(data);
318+
319+
_connectionManager!.addMessage(data);
303320
}
304321

305322
/// CHANNELS

0 commit comments

Comments
 (0)