Skip to content

Commit

Permalink
refactor: rustfmt, format errors
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Aug 27, 2024
1 parent 906c983 commit 87d105b
Show file tree
Hide file tree
Showing 43 changed files with 291 additions and 639 deletions.
8 changes: 4 additions & 4 deletions msg-common/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ impl<S: Send + 'static, R> Channel<S, R> {
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
/// closed, or if a spurious failure happens.
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
/// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was
/// closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
Expand Down
5 changes: 1 addition & 4 deletions msg-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ pub use task::JoinMap;
/// Returns the current UNIX timestamp in microseconds.
#[inline]
pub fn unix_micros() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_micros() as u64
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_micros() as u64
}

/// Wraps the given error in a boxed future.
Expand Down
20 changes: 9 additions & 11 deletions msg-common/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::{
use tokio::task::{JoinError, JoinSet};

/// A collection of keyed tasks spawned on a Tokio runtime.
/// Hacky implementation of a join set that allows for a key to be associated with each task by having
/// the task return a tuple of (key, value).
/// Hacky implementation of a join set that allows for a key to be associated with each task by
/// having the task return a tuple of (key, value).
#[derive(Debug, Default)]
pub struct JoinMap<K, V> {
keys: HashSet<K>,
Expand All @@ -17,10 +17,7 @@ pub struct JoinMap<K, V> {
impl<K, V> JoinMap<K, V> {
/// Create a new `JoinSet`.
pub fn new() -> Self {
Self {
keys: HashSet::new(),
joinset: JoinSet::new(),
}
Self { keys: HashSet::new(), joinset: JoinSet::new() }
}

/// Returns the number of tasks currently in the `JoinSet`.
Expand Down Expand Up @@ -71,7 +68,8 @@ where

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the
/// set.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
Expand All @@ -83,11 +81,11 @@ where
/// This function returns:
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
/// The `value` is the return value of one of the tasks that completed.
/// available right now.
/// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. The
/// `value` is the return value of one of the tasks that completed.
/// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
Expand Down
82 changes: 18 additions & 64 deletions msg-sim/src/dummynet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ pub struct Pipe {
impl Pipe {
/// Creates a new pipe with the given ID. The ID must be unique.
pub fn new(id: usize) -> Self {
Self {
id,
bandwidth: None,
delay: None,
plr: None,
}
Self { id, bandwidth: None, delay: None, plr: None }
}

/// Set the bandwidth cap of the pipe in Kbps.
Expand Down Expand Up @@ -54,10 +49,7 @@ impl Pipe {
fn build_cmd(&self) -> Command {
let mut cmd = Command::new("sudo");

cmd.arg("dnctl")
.arg("pipe")
.arg(self.id.to_string())
.arg("config");
cmd.arg("dnctl").arg("pipe").arg(self.id.to_string()).arg("config");

if let Some(bandwidth) = self.bandwidth {
let bw = format!("{}Kbit/s", bandwidth);
Expand Down Expand Up @@ -87,10 +79,7 @@ impl Pipe {

fn destroy_cmd(&self) -> Command {
let mut cmd = Command::new("sudo");
cmd.arg("dnctl")
.arg("pipe")
.arg("delete")
.arg(self.id.to_string());
cmd.arg("dnctl").arg("pipe").arg("delete").arg(self.id.to_string());

cmd
}
Expand Down Expand Up @@ -169,9 +158,7 @@ impl PacketFilter {

/// Destroys the packet filter by executing the correct shell commands.
pub fn destroy(self) -> io::Result<()> {
let status = Command::new("sudo")
.args(["pfctl", "-f", "/etc/pf.conf"])
.status()?;
let status = Command::new("sudo").args(["pfctl", "-f", "/etc/pf.conf"]).status()?;

assert_status(status, "Failed to flush packet filter")?;

Expand All @@ -181,21 +168,15 @@ impl PacketFilter {

// Remove the loopback alias
let status = Command::new("sudo")
.args([
"ifconfig",
&self.loopback,
"-alias",
&self.endpoint.unwrap().to_string(),
])
.args(["ifconfig", &self.loopback, "-alias", &self.endpoint.unwrap().to_string()])
.status()?;

assert_status(status, "Failed to remove the loopback alias")?;

// Reset the MTU of the loopback interface

let status = Command::new("sudo")
.args(["ifconfig", &self.loopback, "mtu", "16384"])
.status()?;
let status =
Command::new("sudo").args(["ifconfig", &self.loopback, "mtu", "16384"]).status()?;

assert_status(status, "Failed to reset loopback MTU back to 16384")?;

Expand All @@ -207,19 +188,13 @@ impl PacketFilter {

fn create_loopback_alias(&self) -> io::Result<()> {
let status = Command::new("sudo")
.args([
"ifconfig",
&self.loopback,
"alias",
&self.endpoint.unwrap().to_string(),
])
.args(["ifconfig", &self.loopback, "alias", &self.endpoint.unwrap().to_string()])
.status()?;

assert_status(status, "Failed to create loopback alias")?;

let status = Command::new("sudo")
.args(["ifconfig", &self.loopback, "mtu", "1500"])
.status()?;
let status =
Command::new("sudo").args(["ifconfig", &self.loopback, "mtu", "1500"]).status()?;

assert_status(status, "Failed to set loopback MTU to 1500")?;

Expand All @@ -230,31 +205,18 @@ impl PacketFilter {
/// `(cat /etc/pf.conf && echo "dummynet-anchor \"msg-sim\"" &&
/// echo "anchor \"msg-sim\"") | sudo pfctl -f -`
fn load_pf_config(&self) -> io::Result<()> {
let echo_cmd = format!(
"dummynet-anchor \"{}\"\nanchor \"{}\"",
self.anchor, self.anchor
);
let echo_cmd = format!("dummynet-anchor \"{}\"\nanchor \"{}\"", self.anchor, self.anchor);

let mut cat = Command::new("cat")
.arg("/etc/pf.conf")
.stdout(Stdio::piped())
.spawn()?;
let mut cat = Command::new("cat").arg("/etc/pf.conf").stdout(Stdio::piped()).spawn()?;

let cat_stdout = cat.stdout.take().unwrap();

let mut echo = Command::new("echo")
.arg(echo_cmd)
.stdout(Stdio::piped())
.spawn()?;
let mut echo = Command::new("echo").arg(echo_cmd).stdout(Stdio::piped()).spawn()?;

let echo_stdout = echo.stdout.take().unwrap();

let mut pfctl = Command::new("sudo")
.arg("pfctl")
.arg("-f")
.arg("-")
.stdin(Stdio::piped())
.spawn()?;
let mut pfctl =
Command::new("sudo").arg("pfctl").arg("-f").arg("-").stdin(Stdio::piped()).spawn()?;

let pfctl_stdin = pfctl.stdin.as_mut().unwrap();
io::copy(&mut cat_stdout.chain(echo_stdout), pfctl_stdin)?;
Expand All @@ -277,10 +239,7 @@ impl PacketFilter {
let echo_command = format!("dummynet in from any to {} pipe {}", endpoint, pipe_id);

// Set up the echo command
let mut echo = Command::new("echo")
.arg(echo_command)
.stdout(Stdio::piped())
.spawn()?;
let mut echo = Command::new("echo").arg(echo_command).stdout(Stdio::piped()).spawn()?;

if let Some(echo_stdout) = echo.stdout.take() {
// Set up the pfctl command
Expand Down Expand Up @@ -346,10 +305,7 @@ mod tests {
let cmd = Pipe::new(1).bandwidth(10).delay(100).plr(0.1).build_cmd();
let cmd_str = cmd_to_string(&cmd);

assert_eq!(
cmd_str,
"sudo dnctl pipe 1 config bw 10Kbit/s delay 100 plr 0.1"
);
assert_eq!(cmd_str, "sudo dnctl pipe 1 config bw 10Kbit/s delay 100 plr 0.1");

let cmd = Pipe::new(2).delay(1000).plr(10.0).build_cmd();
let cmd_str = cmd_to_string(&cmd);
Expand Down Expand Up @@ -378,9 +334,7 @@ mod tests {
let pipe = Pipe::new(3).bandwidth(100).delay(300);

let endpoint = "127.0.0.2".parse().unwrap();
let pf = PacketFilter::new(pipe)
.endpoint(endpoint)
.anchor("msg-sim-test");
let pf = PacketFilter::new(pipe).endpoint(endpoint).anchor("msg-sim-test");

pf.enable().unwrap();

Expand Down
10 changes: 3 additions & 7 deletions msg-sim/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ pub struct Simulator {

impl Simulator {
pub fn new() -> Self {
Self {
active_sims: HashMap::new(),
sim_id: 1,
}
Self { active_sims: HashMap::new(), sim_id: 1 }
}

/// Starts a new simulation on the given endpoint according to the config.
Expand Down Expand Up @@ -108,9 +105,8 @@ impl Simulation {
pipe = pipe.plr(plr);
}

let mut pf = PacketFilter::new(pipe)
.anchor(format!("msg-sim-{}", self.id))
.endpoint(self.endpoint);
let mut pf =
PacketFilter::new(pipe).anchor(format!("msg-sim-{}", self.id)).endpoint(self.endpoint);

if !self.config.protocols.is_empty() {
pf = pf.protocols(self.config.protocols.clone());
Expand Down
7 changes: 1 addition & 6 deletions msg-socket/src/connection/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ pub struct ExponentialBackoff {

impl ExponentialBackoff {
pub fn new(initial: Duration, max_retries: usize) -> Self {
Self {
retry_count: 0,
max_retries,
backoff: initial,
timeout: None,
}
Self { retry_count: 0, max_retries, backoff: initial, timeout: None }
}

/// (Re)-set the timeout to the current backoff duration.
Expand Down
43 changes: 18 additions & 25 deletions msg-socket/src/pub/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub(crate) struct PubDriver<T: Transport<A>, A: Address> {
pub(super) conn_tasks: FuturesUnordered<T::Accept>,
/// A joinset of authentication tasks.
pub(super) auth_tasks: JoinSet<Result<AuthResult<T::Io, A>, PubError>>,
/// The receiver end of the message broadcast channel. The sender half is stored by [`PubSocket`](super::PubSocket).
/// The receiver end of the message broadcast channel. The sender half is stored by
/// [`PubSocket`](super::PubSocket).
pub(super) from_socket_bcast: broadcast::Receiver<PubMessage>,
}

Expand All @@ -48,7 +49,8 @@ where
let this = self.get_mut();

loop {
// First, poll the joinset of authentication tasks. If a new connection has been handled we spawn a new session for it.
// First, poll the joinset of authentication tasks. If a new connection has been handled
// we spawn a new session for it.
if let Poll::Ready(Some(Ok(auth))) = this.auth_tasks.poll_join_next(cx) {
match auth {
Ok(auth) => {
Expand All @@ -75,50 +77,48 @@ where
this.id_counter = this.id_counter.wrapping_add(1);
}
Err(e) => {
error!("Error authenticating client: {:?}", e);
error!(err = ?e, "Error authenticating client");
this.state.stats.decrement_active_clients();
}
}

continue;
}

// Then poll the incoming connection tasks. If a new connection has been accepted, spawn a new authentication task for it.
// Then poll the incoming connection tasks. If a new connection has been accepted, spawn
// a new authentication task for it.
if let Poll::Ready(Some(incoming)) = this.conn_tasks.poll_next_unpin(cx) {
match incoming {
Ok(io) => {
if let Err(e) = this.on_incoming(io) {
error!("Error accepting incoming connection: {:?}", e);
error!(err = ?e, "Error accepting incoming connection");
this.state.stats.decrement_active_clients();
}
}
Err(e) => {
error!("Error accepting incoming connection: {:?}", e);
error!(err = ?e, "Error accepting incoming connection");

// Active clients have already been incremented in the initial call to `poll_accept`,
// so we need to decrement them here.
// Active clients have already been incremented in the initial call to
// `poll_accept`, so we need to decrement them here.
this.state.stats.decrement_active_clients();
}
}

continue;
}

// Finally, poll the transport for new incoming connection futures and push them to the incoming connection tasks.
// Finally, poll the transport for new incoming connection futures and push them to the
// incoming connection tasks.
if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) {
if let Some(max) = this.options.max_clients {
if this.state.stats.active_clients() >= max {
warn!(
"Max connections reached ({}), rejecting new incoming connection",
max
);

warn!("Max connections reached ({}), rejecting incoming connection", max);
continue;
}
}

// Increment the active clients counter. If the authentication fails, this counter
// will be decremented.
// Increment the active clients counter. If the authentication fails,
// this counter will be decremented.
this.state.stats.increment_active_clients();

this.conn_tasks.push(accept);
Expand Down Expand Up @@ -179,11 +179,7 @@ where
conn.send(auth::Message::Ack).await?;
conn.flush().await?;

Ok(AuthResult {
id,
addr,
stream: conn.into_inner(),
})
Ok(AuthResult { id, addr, stream: conn.into_inner() })
});
} else {
let mut framed = Framed::new(io, pubsub::Codec::new());
Expand All @@ -204,10 +200,7 @@ where
tokio::spawn(session);

self.id_counter = self.id_counter.wrapping_add(1);
debug!(
"New connection from {:?}, session ID {}",
addr, self.id_counter
);
debug!("New connection from {:?}, session ID {}", addr, self.id_counter);
}

Ok(())
Expand Down
Loading

0 comments on commit 87d105b

Please sign in to comment.