Skip to content

Commit 154b027

Browse files
authored
Add more resilience over connectivity issues (braverhealth#79)
* add toxiproxy service for testing * add tests for certain connection issues * fixes * fix service name * fix logging flag
1 parent c5bf093 commit 154b027

13 files changed

+322
-58
lines changed

.github/workflows/test.yaml

+7-1
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,17 @@ jobs:
1313
runs-on: ubuntu-latest
1414

1515
services:
16-
phoenix:
16+
backend:
1717
image: braverhq/phoenix-dart-server
1818
ports:
1919
- 4001:4001
2020
- 4002:4002
21+
22+
proxy:
23+
image: ghcr.io/shopify/toxiproxy
24+
ports:
25+
- 8474:8474
26+
- 4004:4004
2127

2228
steps:
2329

example/backend/Dockerfile

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
1-
FROM bitwalker/alpine-elixir-phoenix
1+
FROM hexpm/elixir:1.15.7-erlang-26.2.2-alpine-3.19.1
2+
3+
RUN apk update && apk upgrade && \
4+
apk add --no-cache \
5+
ca-certificates \
6+
bash \
7+
curl \
8+
git
9+
10+
RUN echo America/Montreal > /etc/timezone
11+
RUN mix local.hex --force && \
12+
mix local.rebar --force
213

314
EXPOSE 4001
415

example/backend/compose.yaml

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
services:
2+
backend:
3+
build:
4+
context: .
5+
dockerfile: Dockerfile
6+
ports:
7+
- 4001:4001
8+
- 4002:4002
9+
10+
proxy:
11+
image: ghcr.io/shopify/toxiproxy
12+
ports:
13+
- 8474:8474
14+
- 4004:4004

lib/src/channel.dart

+6-2
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,16 @@ class PhoenixChannel {
278278
);
279279

280280
if (canPush) {
281+
_logger.finest(() => 'Sending out push ${pushEvent.ref}');
281282
pushEvent.send();
282283
} else {
283-
if (_state == PhoenixChannelState.closed) {
284-
throw ChannelClosedError('Can\'t push event on a closed channel');
284+
if (_state == PhoenixChannelState.closed ||
285+
_state == PhoenixChannelState.errored) {
286+
throw ChannelClosedError('Can\'t push event on a $_state channel');
285287
}
286288

289+
_logger.finest(
290+
() => 'Buffering push ${pushEvent.ref} for later send ($_state)');
287291
pushBuffer.add(pushEvent);
288292
}
289293

lib/src/events.dart

+4-2
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,15 @@ class PhoenixChannelEvent {
133133
value.startsWith(__replyEventName);
134134

135135
/// Whether the event name is a 'channel reply' event
136-
bool get isChannelReply =>
137-
value.startsWith(__chanReplyEventName);
136+
bool get isChannelReply => value.startsWith(__chanReplyEventName);
138137

139138
@override
140139
bool operator ==(Object other) =>
141140
other is PhoenixChannelEvent && other.value == value;
142141

143142
@override
144143
int get hashCode => Object.hash(runtimeType, value);
144+
145+
@override
146+
String toString() => 'PhoenixChannelEvent($value)';
145147
}

lib/src/push.dart

+5-2
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class Push {
202202

203203
_timeoutTimer ??= Timer(timeout!, () {
204204
_timeoutTimer = null;
205-
_logger.warning('Push $ref timed out');
205+
_logger.warning(() => 'Push $ref timed out');
206206
_channel.trigger(Message.timeoutFor(ref));
207207
});
208208
}
@@ -268,6 +268,7 @@ class Push {
268268
// Remove existing waiters and reset completer
269269
void cleanUp() {
270270
if (_sent) {
271+
_logger.fine('Cleaning up completer');
271272
clearReceivers();
272273
_responseCompleter = Completer();
273274
}
@@ -279,7 +280,9 @@ class Push {
279280
if (response.event == replyEvent) {
280281
trigger(PushResponse.fromMessage(response));
281282
}
282-
} else {
283+
} else if (event != PhoenixChannelEvent.join) {
284+
_logger.finest(
285+
() => "Completing with error: ${_responseCompleter.hashCode}");
283286
if (!_responseCompleter.isCompleted) {
284287
_responseCompleter.completeError(response);
285288
clearReceivers();

phoenix_socket.code-workspace

+27-12
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,29 @@
11
{
2-
"folders": [
3-
{
4-
"path": "."
5-
},
6-
{
7-
"path": "example/backend"
8-
}
9-
],
10-
"settings": {
11-
"editor.formatOnSave": true,
12-
"elixirLS.fetchDeps": true
13-
}
2+
"folders": [
3+
{
4+
"path": "."
5+
},
6+
{
7+
"path": "example/backend"
8+
}
9+
],
10+
"settings": {
11+
"editor.formatOnSave": true,
12+
"elixirLS.fetchDeps": true,
13+
"[dart]": {
14+
"editor.tabSize": 2,
15+
"editor.insertSpaces": true,
16+
"editor.detectIndentation": false,
17+
"editor.suggest.insertMode": "replace",
18+
"editor.formatOnSave": true,
19+
"editor.codeActionsOnSave": {
20+
"source.organizeImports": "explicit",
21+
"source.fixAll": "explicit"
22+
}
23+
},
24+
"dart.env": {
25+
"LOG_ALL_LEVELS": "y"
26+
},
27+
"dart.testAdditionalArgs": ["--chain-stack-traces"]
28+
}
1429
}

test/channel_integration_test.dart

+142-11
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,25 @@ import 'dart:async';
33
import 'package:phoenix_socket/phoenix_socket.dart';
44
import 'package:test/test.dart';
55

6-
import 'control.dart';
6+
import 'helpers/logging.dart';
7+
import 'helpers/proxy.dart';
78

8-
void main() {
9-
const addr = 'ws://localhost:4001/socket/websocket';
9+
Set<int> usedPorts = {};
1010

11+
void main() {
1112
group('PhoenixChannel', () {
13+
const addr = 'ws://localhost:4004/socket/websocket';
14+
15+
setUpAll(() {
16+
maybeActivateAllLogLevels();
17+
});
18+
1219
setUp(() async {
13-
await restartBackend();
20+
await prepareProxy();
21+
});
22+
23+
tearDown(() async {
24+
await destroyProxy();
1425
});
1526

1627
test('can join a channel through a socket', () async {
@@ -28,10 +39,11 @@ void main() {
2839

2940
test('can join a channel through a socket that starts closed then connects',
3041
() async {
42+
await haltThenResumeProxy();
43+
3144
final socket = PhoenixSocket(addr);
3245
final completer = Completer<void>();
3346

34-
await stopThenRestartBackend();
3547
await socket.connect();
3648

3749
socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) {
@@ -50,10 +62,10 @@ void main() {
5062

5163
await socket.connect();
5264

53-
await stopBackend();
65+
await haltProxy();
5466
final joinFuture = socket.addChannel(topic: 'channel1').join();
5567
Future.delayed(const Duration(milliseconds: 300))
56-
.then((value) => restartBackend());
68+
.then((value) => resumeProxy());
5769

5870
joinFuture.onReply('ok', (reply) {
5971
expect(reply.status, equals('ok'));
@@ -63,6 +75,32 @@ void main() {
6375
await completer.future;
6476
});
6577

78+
test(
79+
'can join a channel through a socket that gets a "peer reset" before join but reconnects',
80+
() async {
81+
final socket = PhoenixSocket(addr);
82+
final completer = Completer<void>();
83+
84+
await socket.connect();
85+
addTearDown(() {
86+
socket.close();
87+
});
88+
await resetPeer();
89+
90+
runZonedGuarded(() {
91+
final joinFuture = socket.addChannel(topic: 'channel1').join();
92+
joinFuture.onReply('ok', (reply) {
93+
expect(reply.status, equals('ok'));
94+
completer.complete();
95+
});
96+
}, (error, stack) {});
97+
98+
Future.delayed(const Duration(milliseconds: 1000))
99+
.then((value) => resetPeer(enable: false));
100+
101+
await completer.future;
102+
});
103+
66104
test('can join a channel through an unawaited socket', () async {
67105
final socket = PhoenixSocket(addr);
68106
final completer = Completer<void>();
@@ -137,22 +175,100 @@ void main() {
137175
});
138176

139177
test(
140-
'can send messages to channels that got transiently disconnected and receive a reply',
141-
() async {
178+
'can send messages to channels that got transiently '
179+
'disconnected and receive a reply', () async {
142180
final socket = PhoenixSocket(addr);
143181

144182
await socket.connect();
145183

146184
final channel1 = socket.addChannel(topic: 'channel1');
147185
await channel1.join().future;
148186

149-
await stopThenRestartBackend();
187+
await haltThenResumeProxy();
188+
await socket.openStream.first;
150189

151190
final reply = await channel1.push('hello!', {'foo': 'bar'}).future;
152191
expect(reply.status, equals('ok'));
153192
expect(reply.response, equals({'name': 'bar'}));
154193
});
155194

195+
test(
196+
'can send messages to channels that got "peer reset" '
197+
'and receive a reply', () async {
198+
final socket = PhoenixSocket(addr);
199+
200+
await socket.connect();
201+
202+
final channel1 = socket.addChannel(topic: 'channel1');
203+
await channel1.join().future;
204+
205+
await resetPeerThenResumeProxy();
206+
207+
final push = channel1.push('hello!', {'foo': 'bar'});
208+
final reply = await push.future;
209+
210+
expect(reply.status, equals('ok'));
211+
expect(reply.response, equals({'name': 'bar'}));
212+
});
213+
214+
test(
215+
'throws when sending messages to channels that got "peer reset" '
216+
'and that have not recovered yet', () async {
217+
final socket = PhoenixSocket(addr);
218+
219+
await socket.connect();
220+
221+
final channel1 = socket.addChannel(topic: 'channel1');
222+
await channel1.join().future;
223+
224+
await resetPeer();
225+
226+
final Completer<Object> errorCompleter = Completer();
227+
228+
runZonedGuarded(() async {
229+
final push = channel1.push('hello!', {'foo': 'bar'});
230+
try {
231+
await push.future;
232+
} catch (err) {
233+
errorCompleter.complete(err);
234+
}
235+
}, (error, stack) {});
236+
237+
final Object exception;
238+
expect(exception = await errorCompleter.future, isA<PhoenixException>());
239+
expect((exception as PhoenixException).socketClosed, isNotNull);
240+
});
241+
242+
test(
243+
'throws when sending messages to channels that got disconnected '
244+
'and that have not recovered yet',
245+
() async {
246+
final socket = PhoenixSocket(addr);
247+
248+
await socket.connect();
249+
250+
final channel1 = socket.addChannel(topic: 'channel1');
251+
await channel1.join().future;
252+
253+
await haltProxy();
254+
255+
final Completer<Object> errorCompleter = Completer();
256+
runZonedGuarded(() async {
257+
try {
258+
final push = channel1.push('hello!', {'foo': 'bar'});
259+
await push.future;
260+
} catch (err) {
261+
errorCompleter.complete(err);
262+
}
263+
}, (error, stack) {});
264+
265+
expect(await errorCompleter.future, isA<ChannelClosedError>());
266+
},
267+
timeout: Timeout(
268+
Duration(seconds: 5),
269+
),
270+
);
271+
156272
test('only emits reply messages that are channel replies', () async {
157273
final socket = PhoenixSocket(addr);
158274

@@ -195,6 +311,11 @@ void main() {
195311
final channel2 = socket2.addChannel(topic: 'channel3');
196312
await channel2.join().future;
197313

314+
addTearDown(() {
315+
socket1.close();
316+
socket2.close();
317+
});
318+
198319
expect(
199320
channel1.messages,
200321
emitsInOrder([
@@ -249,6 +370,11 @@ void main() {
249370
final channel2 = socket2.addChannel(topic: 'channel3');
250371
await channel2.join().future;
251372

373+
addTearDown(() {
374+
socket1.close();
375+
socket2.close();
376+
});
377+
252378
channel1.push('ping', {'from': 'socket1'});
253379

254380
expect(
@@ -278,6 +404,11 @@ void main() {
278404
final channel2 = socket2.addChannel(topic: 'channel3');
279405
await channel2.join().future;
280406

407+
addTearDown(() {
408+
socket1.close();
409+
socket2.close();
410+
});
411+
281412
channel1.push('ping', {'from': 'socket1'});
282413

283414
expect(
@@ -315,8 +446,8 @@ void main() {
315446
final socket = PhoenixSocket(addr);
316447
await socket.connect();
317448
final channel = socket.addChannel(topic: 'channel3');
318-
await channel.join().future;
319449

450+
await channel.join().future;
320451
await channel.leave().future;
321452

322453
expect(

0 commit comments

Comments
 (0)