Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(socketio): notify client of transport completion #449

Merged
merged 2 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ci/socket-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ var callback = client => {
ack(Buffer.from([1, 2, 3]));
}
});

// This event allows the test framework to arbitrarily close the underlying connection
client.on('close_transport', data => {
console.log(['close_transport', 'Request to close transport received'])
// Close underlying websocket connection
client.client.conn.close();
})

client.emit('Hello from the message event!');
client.emit('test', 'Hello from the test event!');
client.emit(Buffer.from([4, 5, 6]));
Expand Down
69 changes: 64 additions & 5 deletions socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{ops::DerefMut, pin::Pin, sync::Arc};

use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use futures_util::{future::BoxFuture, stream, Stream, StreamExt};
use log::trace;
use log::{error, trace};
use rand::{thread_rng, Rng};
use serde_json::Value;
use tokio::{
Expand All @@ -19,7 +19,7 @@ use crate::{
asynchronous::socket::Socket as InnerSocket,
error::{Error, Result},
packet::{Packet, PacketId},
Event, Payload,
CloseReason, Event, Payload,
};

#[derive(Default)]
Expand Down Expand Up @@ -162,7 +162,21 @@ impl Client {
drop(stream);

let should_reconnect = match *(client_clone.disconnect_reason.read().await) {
DisconnectReason::Unknown => reconnect,
DisconnectReason::Unknown => {
// If we disconnected for an unknown reason, the client might not have noticed
// the closure yet. Hence, fire a transport close event to notify it.
// We don't need to do that in the other cases, since proper server close
// and manual client close are handled explicitly.
if let Some(err) = client_clone
.callback(&Event::Close, CloseReason::TransportClose.as_str())
.await
.err()
{
error!("Error while notifying client of transport close: {err}")
}

reconnect
}
DisconnectReason::Manual => false,
DisconnectReason::Server => reconnect_on_disconnect,
};
Expand Down Expand Up @@ -510,7 +524,8 @@ impl Client {
}
PacketId::Disconnect => {
*(self.disconnect_reason.write().await) = DisconnectReason::Server;
self.callback(&Event::Close, "").await?;
self.callback(&Event::Close, CloseReason::IOServerDisconnect.as_str())
.await?;
}
PacketId::ConnectError => {
self.callback(
Expand Down Expand Up @@ -590,7 +605,7 @@ mod test {
},
error::Result,
packet::{Packet, PacketId},
Payload, TransportType,
CloseReason, Event, Payload, TransportType,
};

#[tokio::test]
Expand Down Expand Up @@ -926,6 +941,50 @@ mod test {
Ok(())
}

#[tokio::test]
async fn socket_io_transport_close() -> Result<()> {
let url = crate::test::socket_io_server();

let (tx, mut rx) = mpsc::channel(1);

let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();

let socket = ClientBuilder::new(url)
.on(Event::Connect, move |_, _| {
let cl = notify_clone.clone();
async move {
cl.notify_one();
}
.boxed()
})
.on(Event::Close, move |payload, _| {
let clone_tx = tx.clone();
async move { clone_tx.send(payload).await.unwrap() }.boxed()
})
.connect()
.await?;

// Wait until socket is connected
let connect_timeout = timeout(Duration::from_secs(1), notify.notified()).await;
assert!(connect_timeout.is_ok());

// Instruct server to close transport
let result = socket.emit("close_transport", Payload::from("")).await;
assert!(result.is_ok());

// Wait for Event::Close
let rx_timeout = timeout(Duration::from_secs(1), rx.recv()).await;
assert!(rx_timeout.is_ok());

assert_eq!(
rx_timeout.unwrap(),
Some(Payload::from(CloseReason::TransportClose.as_str()))
);

Ok(())
}

#[tokio::test]
async fn socketio_polling_integration() -> Result<()> {
let url = crate::test::socket_io_server();
Expand Down
6 changes: 3 additions & 3 deletions socketio/src/client/raw_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::callback::Callback;
use crate::packet::{Packet, PacketId};
use crate::Error;
pub(crate) use crate::{event::Event, payload::Payload};
pub(crate) use crate::{event::CloseReason, event::Event, payload::Payload};
use rand::{thread_rng, Rng};
use serde_json::Value;

Expand Down Expand Up @@ -149,7 +149,7 @@ impl RawClient {
let _ = self.socket.send(disconnect_packet);
self.socket.disconnect()?;

let _ = self.callback(&Event::Close, ""); // trigger on_close
let _ = self.callback(&Event::Close, CloseReason::IOClientDisconnect.as_str()); // trigger on_close
Ok(())
}

Expand Down Expand Up @@ -372,7 +372,7 @@ impl RawClient {
self.callback(&Event::Connect, "")?;
}
PacketId::Disconnect => {
self.callback(&Event::Close, "")?;
self.callback(&Event::Close, CloseReason::IOServerDisconnect.as_str())?;
}
PacketId::ConnectError => {
self.callback(
Expand Down
35 changes: 35 additions & 0 deletions socketio/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,38 @@ impl Display for Event {
f.write_str(self.as_str())
}
}

/// A `CloseReason` is the payload of the [`Event::Close`] and specifies the reason for
/// why it was fired.
/// These are aligned with the official Socket.IO disconnect reasons, see
/// https://socket.io/docs/v4/client-socket-instance/#disconnect
#[derive(Debug, PartialEq, PartialOrd, Clone, Eq, Hash)]
pub enum CloseReason {
IOServerDisconnect,
IOClientDisconnect,
TransportClose,
}

impl CloseReason {
pub fn as_str(&self) -> &str {
match self {
// Inspired by https://github.com/socketio/socket.io/blob/d0fc72042068e7eaef448941add617f05e1ec236/packages/socket.io-client/lib/socket.ts#L865
CloseReason::IOServerDisconnect => "io server disconnect",
// Inspired by https://github.com/socketio/socket.io/blob/d0fc72042068e7eaef448941add617f05e1ec236/packages/socket.io-client/lib/socket.ts#L911
CloseReason::IOClientDisconnect => "io client disconnect",
CloseReason::TransportClose => "transport close",
}
}
}

impl From<CloseReason> for String {
fn from(event: CloseReason) -> Self {
Self::from(event.as_str())
}
}

impl Display for CloseReason {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
f.write_str(self.as_str())
}
}
2 changes: 1 addition & 1 deletion socketio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub mod asynchronous;

pub use error::Error;

pub use {event::Event, payload::Payload};
pub use {event::CloseReason, event::Event, payload::Payload};

pub use client::{ClientBuilder, RawClient, TransportType};

Expand Down
Loading