Skip to content

Commit 77a41bb

Browse files
committed
Refactors PhoenixSocket.sendMessage to get better control over heartbeat
1 parent 30a397e commit 77a41bb

File tree

2 files changed

+28
-26
lines changed

2 files changed

+28
-26
lines changed

lib/src/socket.dart

+18-12
Original file line numberDiff line numberDiff line change
@@ -299,24 +299,30 @@ class PhoenixSocket {
299299
/// Returns a future that completes when the reply for the sent message is
300300
/// received. If your flow awaits for the result of this future, add a timeout
301301
/// to it so that you are not stuck in case that the reply is never received.
302-
Future<Message> sendMessage(Message message) {
302+
Future<Message> sendMessage(Message message) async {
303+
return (_pendingMessages[message.ref!] = await _sendMessage(message))
304+
.future;
305+
}
306+
307+
Future<Completer<Message>> _sendMessage(Message message) async {
308+
if (_disposed) {
309+
throw StateError('Cannot add messages to a disposed socket');
310+
}
311+
303312
if (message.ref == null) {
304313
throw ArgumentError.value(
305314
message,
306315
'message',
307316
'does not contain a ref',
308317
);
309318
}
310-
_addToSink(_options.serializer.encode(message));
311-
return (_pendingMessages[message.ref!] = Completer<Message>()).future;
312-
}
313319

314-
Future<void> _addToSink(String data) async {
315-
if (_disposed) {
316-
throw StateError('Cannot add messages to a disposed socket');
320+
if (_connectionManager == null) {
321+
throw StateError('Cannot add messages to a disconnected socket');
317322
}
318323

319-
_connectionManager!.addMessage(data);
324+
await _connectionManager!.addMessage(_options.serializer.encode(message));
325+
return _pendingMessages[message.ref!] = Completer<Message>();
320326
}
321327

322328
/// CHANNELS
@@ -406,15 +412,15 @@ class PhoenixSocket {
406412

407413
try {
408414
final heartbeatMessage = Message.heartbeat(nextRef);
409-
// No await here, since the heartbeat will already be completed when it
410-
// returns, and `_pendingMessages[heartbeatRef]` won't exist.
411-
sendMessage(heartbeatMessage);
415+
// Using _sendMessages to wait for potential problems when sending message
416+
// (eg. WebSocket failure), but not until the response is reported.
417+
final heartbeatCompleter = await _sendMessage(heartbeatMessage);
412418
_logger.fine('Heartbeat ${heartbeatMessage.ref} sent');
413419
final heartbeatRef = _latestHeartbeatRef = heartbeatMessage.ref!;
414420

415421
_heartbeatTimeout = _scheduleHeartbeat(heartbeatRef);
416422

417-
await _pendingMessages[heartbeatRef]!.future;
423+
await heartbeatCompleter.future;
418424
_logger.fine('Heartbeat $heartbeatRef completed');
419425
return true;
420426
} on WebSocketChannelException catch (error, stackTrace) {

test/socket_test.dart

+10-14
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ void main() {
1919
Completer<void>(),
2020
Completer<void>(),
2121
];
22+
final streamControllers = [
23+
StreamController<String>(),
24+
StreamController<String>(),
25+
StreamController<String>(),
26+
];
2227

2328
// This code might throw asynchronous errors, prevent the test from failing
2429
// if these are expected ones (as defined in `exceptions`).
@@ -43,27 +48,18 @@ void main() {
4348
// On the third attempt, the sink add should work as expected.
4449
if (invocations < 2) {
4550
doneCompleter.complete();
51+
streamControllers[invocations].close();
4652
throw exceptions[invocations++];
4753
}
54+
streamControllers[invocations]
55+
.add(jsonEncode(Message.heartbeat('$invocations').encode()));
4856
});
4957
return sink;
5058
});
5159
when(websocket.ready).thenAnswer((_) => Future.value());
5260

53-
when(websocket.stream).thenAnswer((_) {
54-
final invocation = invocations;
55-
final controller = StreamController<String>();
56-
// Close controller when sink is done.
57-
sinkCompleters[invocation].future.then((_) {
58-
return controller.close();
59-
});
60-
if (invocation == 2) {
61-
// Automatically emit heartbeat response.
62-
controller
63-
.add(jsonEncode(Message.heartbeat('$invocation').encode()));
64-
}
65-
return controller.stream;
66-
});
61+
when(websocket.stream)
62+
.thenAnswer((_) => streamControllers[invocations].stream);
6763

6864
// Connect to the socket
6965
await phoenixSocket.connect();

0 commit comments

Comments
 (0)