Skip to content

Commit

Permalink
feat(socketio): notify client of transport completion
Browse files Browse the repository at this point in the history
When the async client connects to the server, it spawns a new thread to
handle the arriving messages asynchronously, which is immediately
detached with no chance of awaiting its completion.

For long-running programs (e.g. a client program that never really disconnects
from the server) this can be problematic, as unexpected stream completions
would go unnoticed. This may happen if the underlying tungstenite websocket
shuts down ('Received close frame: None') but there are no engineio/socketio
close frames. Hence, since the stream terminates, the message handling task
stops without a Close or Error event being fired.

Thus, we now fire an additional Event::Close when the stream terminates,
to signal the (potentially unexpected) close to the user.
  • Loading branch information
sirkrypt0 committed Sep 22, 2024
1 parent 64d2a13 commit 2fbdc79
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
7 changes: 7 additions & 0 deletions ci/socket-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ var callback = client => {
ack(Buffer.from([1, 2, 3]));
}
});

client.on('transport_close', data => {
console.log(['transport_close', 'Transport close received'])
// Close underlying websocket connection
client.client.conn.close();
})

client.emit('Hello from the message event!');
client.emit('test', 'Hello from the test event!');
client.emit(Buffer.from([4, 5, 6]));
Expand Down
60 changes: 58 additions & 2 deletions socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{ops::DerefMut, pin::Pin, sync::Arc};

use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use futures_util::{future::BoxFuture, stream, Stream, StreamExt};
use log::trace;
use log::{error, trace};
use rand::{thread_rng, Rng};
use serde_json::Value;
use tokio::{
Expand Down Expand Up @@ -162,7 +162,21 @@ impl Client {
drop(stream);

let should_reconnect = match *(client_clone.disconnect_reason.read().await) {
DisconnectReason::Unknown => reconnect,
DisconnectReason::Unknown => {
// If we disconnected for an unknown reason, the client might not have noticed
// the closure yet. Hence, fire a transport close event to notify it.
// We don't need to do that in the other cases, since proper server close
// and manual client close are handled explicitly.
if let Some(err) = client_clone
.callback(&Event::Close, "transport close")
.await
.err()
{
error!("Error while notifying client of transport close: {err}")
}

reconnect
}
DisconnectReason::Manual => false,
DisconnectReason::Server => reconnect_on_disconnect,
};
Expand Down Expand Up @@ -590,6 +604,7 @@ mod test {
},
error::Result,
packet::{Packet, PacketId},
Event,
Payload, TransportType,
};

Expand Down Expand Up @@ -926,6 +941,47 @@ mod test {
Ok(())
}

#[tokio::test]
async fn socket_io_transport_close() -> Result<()> {
let url = crate::test::socket_io_server();

let (tx, mut rx) = mpsc::channel(1);

let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();

let socket = ClientBuilder::new(url)
.on(Event::Connect, move |_, _| {
let cl = notify_clone.clone();
async move {
cl.notify_one();
}
.boxed()
})
.on(Event::Close, move |payload, _| {
let clone_tx = tx.clone();
async move { clone_tx.send(payload).await.unwrap() }.boxed()
})
.connect()
.await?;

// Wait until socket is connected
let connect_timeout = timeout(Duration::from_secs(1), notify.notified()).await;
assert!(connect_timeout.is_ok());

// Instruct server to close transport
let result = socket.emit("transport_close", Payload::from("")).await;
assert!(result.is_ok());

// Wait for Event::Close
let rx_timeout = timeout(Duration::from_secs(1), rx.recv()).await;
assert!(rx_timeout.is_ok());

assert_eq!(rx_timeout.unwrap(), Some(Payload::from("transport close")));

Ok(())
}

#[tokio::test]
async fn socketio_polling_integration() -> Result<()> {
let url = crate::test::socket_io_server();
Expand Down

0 comments on commit 2fbdc79

Please sign in to comment.