Skip to content

Commit

Permalink
Add on_reconnect callback on the async client
Browse files Browse the repository at this point in the history
Async clients that are using the reconnect feature can now provide an
optional callback that will be invoked before the server is reconnected.
You may optionally update the address or auth settings in this callback
and they will be applied when the client reconnects.

This is useful for updating auth tokens which may have expired by the
time a reconnection is required.
  • Loading branch information
rageshkrishna authored and 1c3t3a committed Mar 28, 2024
1 parent 5ae3d4d commit e863462
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 5 deletions.
46 changes: 43 additions & 3 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use url::Url;
use crate::{error::Result, Event, Payload, TransportType};

use super::{
callback::{Callback, DynAsyncAnyCallback, DynAsyncCallback},
client::Client,
callback::{
Callback, DynAsyncAnyCallback, DynAsyncCallback, DynAsyncReconnectSettingsCallback,
},
client::{Client, ReconnectSettings},
};
use crate::asynchronous::socket::Socket as InnerSocket;

Expand All @@ -21,9 +23,10 @@ use crate::asynchronous::socket::Socket as InnerSocket;
/// namespace is specified, the default namespace `/` is taken. The `connect` method
/// acts the `build` method and returns a connected [`Client`].
pub struct ClientBuilder {
address: String,
pub(crate) address: String,
pub(crate) on: HashMap<Event, Callback<DynAsyncCallback>>,
pub(crate) on_any: Option<Callback<DynAsyncAnyCallback>>,
pub(crate) on_reconnect: Option<Callback<DynAsyncReconnectSettingsCallback>>,
pub(crate) namespace: String,
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
Expand Down Expand Up @@ -82,6 +85,7 @@ impl ClientBuilder {
address: address.into(),
on: HashMap::new(),
on_any: None,
on_reconnect: None,
namespace: "/".to_owned(),
tls_config: None,
opening_headers: None,
Expand Down Expand Up @@ -192,6 +196,42 @@ impl ClientBuilder {
self
}

/// Registers a callback for reconnect events. The event handler must return
/// a [ReconnectSettings] struct with the settings that should be updated.
///
/// # Example
/// ```rust
/// use rust_socketio::{asynchronous::{ClientBuilder, ReconnectSettings}};
/// use futures_util::future::FutureExt;
/// use serde_json::json;
///
/// #[tokio::main]
/// async fn main() {
/// let client = ClientBuilder::new("http://localhost:4200/")
/// .namespace("/admin")
/// .on_reconnect(|| {
/// async {
/// let mut settings = ReconnectSettings::new();
/// settings.address("http://server?test=123");
/// settings.auth(json!({ "token": "abc" }));
/// settings
/// }.boxed()
/// })
/// .connect()
/// .await;
/// }
/// ```
pub fn on_reconnect<F>(mut self, callback: F) -> Self
where
F: for<'a> std::ops::FnMut() -> BoxFuture<'static, ReconnectSettings>
+ 'static
+ Send
+ Sync,
{
self.on_reconnect = Some(Callback::<DynAsyncReconnectSettingsCallback>::new(callback));
self
}

/// Registers a Callback for all [`crate::event::Event::Custom`] and [`crate::event::Event::Message`].
///
/// # Example
Expand Down
31 changes: 30 additions & 1 deletion socketio/src/asynchronous/client/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use crate::{Event, Payload};

use super::client::Client;
use super::client::{Client, ReconnectSettings};

/// Internal type, provides a way to store futures and return them in a boxed manner.
pub(crate) type DynAsyncCallback =
Expand All @@ -16,6 +16,9 @@ pub(crate) type DynAsyncAnyCallback = Box<
dyn for<'a> FnMut(Event, Payload, Client) -> BoxFuture<'static, ()> + 'static + Send + Sync,
>;

pub(crate) type DynAsyncReconnectSettingsCallback =
Box<dyn for<'a> FnMut() -> BoxFuture<'static, ReconnectSettings> + 'static + Send + Sync>;

pub(crate) struct Callback<T> {
inner: T,
}
Expand Down Expand Up @@ -77,3 +80,29 @@ impl Callback<DynAsyncAnyCallback> {
}
}
}

impl Deref for Callback<DynAsyncReconnectSettingsCallback> {
type Target =
dyn for<'a> FnMut() -> BoxFuture<'static, ReconnectSettings> + 'static + Sync + Send;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

impl DerefMut for Callback<DynAsyncReconnectSettingsCallback> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut()
}
}

impl Callback<DynAsyncReconnectSettingsCallback> {
pub(crate) fn new<T>(callback: T) -> Self
where
T: for<'a> FnMut() -> BoxFuture<'static, ReconnectSettings> + 'static + Sync + Send,
{
Callback {
inner: Box::new(callback),
}
}
}
41 changes: 40 additions & 1 deletion socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,33 @@ enum DisconnectReason {
Server,
}

/// Settings that can be updated before reconnecting to a server
#[derive(Default)]
pub struct ReconnectSettings {
address: Option<String>,
auth: Option<serde_json::Value>,
}

impl ReconnectSettings {
pub fn new() -> Self {
Self::default()
}

/// Sets the URL that will be used when reconnecting to the server
pub fn address<T>(&mut self, address: T) -> &mut Self
where
T: Into<String>,
{
self.address = Some(address.into());
self
}

/// Sets the authentication data that will be send in the opening request
pub fn auth(&mut self, auth: serde_json::Value) {
self.auth = Some(auth);
}
}

/// A socket which handles communication with the server. It's initialized with
/// a specific address as well as an optional namespace to connect to. If `None`
/// is given the client will connect to the default namespace `"/"`.
Expand Down Expand Up @@ -81,7 +108,19 @@ impl Client {
}

pub(crate) async fn reconnect(&mut self) -> Result<()> {
let builder = self.builder.write().await;
let mut builder = self.builder.write().await;

if let Some(config) = builder.on_reconnect.as_mut() {
let reconnect_settings = config().await;
if let Some(address) = reconnect_settings.address {
builder.address = address;
}

if let Some(auth) = reconnect_settings.auth {
self.auth = Some(auth);
}
}

let socket = builder.inner_create().await?;

// New inner socket that can be connected
Expand Down
1 change: 1 addition & 0 deletions socketio/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ mod socket;
#[cfg(feature = "async")]
pub use client::builder::ClientBuilder;
pub use client::client::Client;
pub use client::client::ReconnectSettings;

0 comments on commit e863462

Please sign in to comment.