Skip to content

Commit

Permalink
Merge pull request #68 from chainbound/fix/backoff
Browse files Browse the repository at this point in the history
Fix subscriber backoff
  • Loading branch information
mempirate authored Jan 23, 2024
2 parents ac32c93 + 6d8f2bd commit 3799474
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 30 deletions.
4 changes: 2 additions & 2 deletions msg-socket/src/pub/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::{
sync::broadcast,
task::JoinSet,
};
use tracing::debug;
use tracing::{debug, trace};

use super::{driver::PubDriver, stats::SocketStats, PubError, PubMessage, PubOptions, SocketState};
use crate::Authenticator;
Expand Down Expand Up @@ -132,7 +132,7 @@ where
if let Some(ref compressor) = self.compressor {
msg.compress(compressor.as_ref())?;

debug!(
trace!(
"Compressed message from {} to {} bytes",
len_before,
msg.payload().len(),
Expand Down
47 changes: 19 additions & 28 deletions msg-socket/src/sub/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use msg_common::{channel, Channel};
use msg_transport::Transport;
use msg_wire::{auth, pubsub};

type ConnectionResult<Io, E> = Result<(SocketAddr, Io), (SocketAddr, E)>;
type ConnectionResult<Io, E> = Result<(SocketAddr, Io), E>;

pub(crate) struct SubDriver<T: Transport> {
/// Options shared with the socket.
Expand Down Expand Up @@ -92,13 +92,8 @@ where
Ok((addr, io)) => {
this.on_connection(addr, io);
}
// If the initial connection failed, reset the publisher to try again later.
Err((addr, e)) => {
this.reset_publisher(addr);
error!(
"Error connecting to publisher, scheduling reconnect: {:?}",
e
);
Err(e) => {
error!("Error connecting to publisher: {:?}", e);
}
}

Expand Down Expand Up @@ -223,6 +218,10 @@ where
}

self.connect(endpoint);

// Also set the publisher to the disconnected state. This will make sure that if the
// initial connection attempt fails, it will be retried in `poll_publishers`.
self.reset_publisher(endpoint);
}
Command::Disconnect { endpoint } => {
if self.publishers.remove(&endpoint).is_some() {
Expand All @@ -244,7 +243,7 @@ where
let token = self.options.auth_token.clone();

self.connection_tasks.spawn(async move {
let io = connect.await.map_err(|e| (addr, e))?;
let io = connect.await?;

if let Some(token) = token {
let mut conn = Framed::new(io, auth::Codec::new_client());
Expand All @@ -253,37 +252,29 @@ where
// Send the authentication message
conn.send(auth::Message::Auth(token))
.await
.map_err(|e| (addr, T::Error::from(e)))?;
conn.flush().await.map_err(|e| (addr, T::Error::from(e)))?;
.map_err(T::Error::from)?;
conn.flush().await.map_err(T::Error::from)?;

tracing::debug!("Waiting for ACK from server...");

// Wait for the response
let ack = conn
.next()
.await
.ok_or((
addr,
io::Error::new(io::ErrorKind::UnexpectedEof, "Connection closed").into(),
.ok_or(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Connection closed",
))?
.map_err(|e| {
(
addr,
io::Error::new(io::ErrorKind::PermissionDenied, e).into(),
)
})?;
.map_err(|e| io::Error::new(io::ErrorKind::PermissionDenied, e))?;

if matches!(ack, auth::Message::Ack) {
Ok((addr, conn.into_inner()))
} else {
Err((
addr,
io::Error::new(
io::ErrorKind::PermissionDenied,
"Publisher denied connection",
)
.into(),
))
Err(io::Error::new(
io::ErrorKind::PermissionDenied,
"Publisher denied connection",
)
.into())
}
} else {
Ok((addr, io))
Expand Down

0 comments on commit 3799474

Please sign in to comment.