From 99b129afc46d06121cccd1bc585bc75206f79027 Mon Sep 17 00:00:00 2001
From: nicolas <48695862+merklefruit@users.noreply.github.com>
Date: Sun, 25 Aug 2024 12:12:15 +0200
Subject: [PATCH 1/7] feat: ipc socket
---
Cargo.lock | 18 ++--
Cargo.toml | 9 +-
README.md | 36 ++++---
msg-common/src/lib.rs | 7 +-
msg-sim/src/lib.rs | 7 +-
msg-socket/Cargo.toml | 4 +-
msg-socket/src/connection/state.rs | 8 +-
msg-socket/src/lib.rs | 10 +-
msg-socket/src/pub/driver.rs | 14 +--
msg-socket/src/pub/mod.rs | 38 +++----
msg-socket/src/pub/socket.rs | 61 ++++++++---
msg-socket/src/rep/driver.rs | 48 ++++-----
msg-socket/src/rep/mod.rs | 36 +++----
msg-socket/src/rep/socket.rs | 61 ++++++++---
msg-socket/src/req/driver.rs | 24 ++---
msg-socket/src/req/socket.rs | 44 ++++++--
msg-socket/src/sub/driver.rs | 87 ++++++++--------
msg-socket/src/sub/mod.rs | 37 ++++---
msg-socket/src/sub/session.rs | 31 +++---
msg-socket/src/sub/socket.rs | 161 ++++++++++++++++++++---------
msg-socket/src/sub/stats.rs | 24 +++--
msg-socket/tests/it/pubsub.rs | 46 +++++----
msg-transport/Cargo.toml | 4 -
msg-transport/src/ipc/mod.rs | 153 +++++++++++++++++++++++++++
msg-transport/src/lib.rs | 35 +++++--
msg-transport/src/quic/config.rs | 4 +-
msg-transport/src/quic/mod.rs | 16 +--
msg-transport/src/quic/stream.rs | 2 +-
msg-transport/src/tcp/mod.rs | 3 +-
msg-wire/Cargo.toml | 1 -
msg-wire/src/lib.rs | 4 +
msg/benches/pubsub.rs | 112 ++++++++++++++++++--
msg/benches/reqrep.rs | 85 +++++++++++++--
msg/examples/durable.rs | 4 +-
msg/examples/ipc.rs | 44 ++++++++
msg/examples/pubsub.rs | 6 +-
msg/examples/pubsub_auth.rs | 6 +-
msg/examples/pubsub_compression.rs | 6 +-
msg/examples/quic_vs_tcp.rs | 8 +-
msg/examples/reqrep.rs | 4 +-
msg/examples/reqrep_auth.rs | 4 +-
msg/examples/reqrep_compression.rs | 4 +-
msg/src/lib.rs | 3 +
43 files changed, 948 insertions(+), 371 deletions(-)
create mode 100644 msg-transport/src/ipc/mod.rs
create mode 100644 msg/examples/ipc.rs
diff --git a/Cargo.lock b/Cargo.lock
index d949ef4..f190edd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -713,7 +713,7 @@ dependencies = [
[[package]]
name = "msg"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"bytes",
"criterion",
@@ -733,7 +733,7 @@ dependencies = [
[[package]]
name = "msg-common"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"futures",
"tokio",
@@ -742,16 +742,15 @@ dependencies = [
[[package]]
name = "msg-sim"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"pnet",
]
[[package]]
name = "msg-socket"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
- "async-trait",
"bytes",
"futures",
"msg-common",
@@ -771,27 +770,23 @@ dependencies = [
[[package]]
name = "msg-transport"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"async-trait",
- "bytes",
"futures",
"msg-common",
- "msg-wire",
"quinn",
- "rand",
"rcgen",
"rustls",
"thiserror",
"tokio",
- "tokio-util",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "msg-wire"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"bytes",
"flate2",
@@ -800,7 +795,6 @@ dependencies = [
"snap",
"thiserror",
"tokio-util",
- "tracing",
"zstd",
]
diff --git a/Cargo.toml b/Cargo.toml
index 54913e1..35960fc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,9 +10,9 @@ members = [
resolver = "2"
[workspace.package]
-version = "0.1.1"
+version = "0.1.2"
edition = "2021"
-rust-version = "1.70" # Remember to update .clippy.toml and README.md
+rust-version = "1.70"
license = "MIT"
description = "A flexible and lightweight messaging library for distributed systems"
authors = ["Jonas Bostoen", "Nicolas Racchi"]
@@ -35,7 +35,6 @@ futures = "0.3"
tokio-stream = { version = "0.1", features = ["sync"] }
parking_lot = "0.12"
-
# general
bytes = "1"
thiserror = "1"
@@ -43,9 +42,9 @@ tracing = "0.1"
rustc-hash = "1"
rand = "0.8"
-# NETWORKING
+# networking
quinn = "0.10"
-# rustls needs to be the same version as the one used by quinn
+# (rustls needs to be the same version as the one used by quinn)
rustls = { version = "0.21", features = ["quic", "dangerous_configuration"] }
rcgen = "0.12"
diff --git a/README.md b/README.md
index 030122d..34e98e7 100644
--- a/README.md
+++ b/README.md
@@ -18,31 +18,37 @@
`msg-rs` is a messaging library that was inspired by projects like [ZeroMQ](https://zeromq.org/) and [Nanomsg](https://nanomsg.org/).
It was built because we needed a Rust-native messaging library like those above.
-> MSG is still in ALPHA and is not ready for production use.
-
## Documentation
-The [MSG-RS Book][book] contains detailed information on how to use the library.
+The 📖 [MSG-RS Book][book] contains detailed information on how to use the library.
## Features
-- [ ] Multiple socket types
+- Multiple socket types
- [x] Request/Reply
- [x] Publish/Subscribe
+- Pluggable transport layers
+ - [x] TCP
+ - [x] QUIC
+ - [x] IPC
+- Useful stats: latency, throughput, packet drops
+- Durable IO abstraction (built-in retries and reconnections)
+- Custom wire protocol with support for authentication and compression
+- Network simulation mode with dummynet & pfctl
+- Extensive benchmarks
+- Integration tests
+
+
## MSRV
@@ -59,6 +65,12 @@ Additionally, you can reach out to us on [Discord][discord] if you have any ques
This project is licensed under the Open Source [MIT license][mit-license].
+## Disclaimer
+
+
+This software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.
+
+
[book]: https://chainbound.github.io/msg-rs/
diff --git a/msg-common/src/lib.rs b/msg-common/src/lib.rs
index 1f2aedf..f7e4b94 100644
--- a/msg-common/src/lib.rs
+++ b/msg-common/src/lib.rs
@@ -1,11 +1,14 @@
-use futures::future::BoxFuture;
+#![doc(issue_tracker_base_url = "https://github.com/chainbound/msg-rs/issues/")]
+#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
+#![cfg_attr(not(test), warn(unused_crate_dependencies))]
+
use std::{
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};
-use futures::{Sink, SinkExt, Stream};
+use futures::{future::BoxFuture, Sink, SinkExt, Stream};
use tokio::sync::mpsc::{
self,
error::{TryRecvError, TrySendError},
diff --git a/msg-sim/src/lib.rs b/msg-sim/src/lib.rs
index 84c66d6..d95450f 100644
--- a/msg-sim/src/lib.rs
+++ b/msg-sim/src/lib.rs
@@ -1,8 +1,11 @@
-use std::{collections::HashMap, io, net::IpAddr, time::Duration};
+#![doc(issue_tracker_base_url = "https://github.com/chainbound/msg-rs/issues/")]
+#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
+#![cfg_attr(not(test), warn(unused_crate_dependencies))]
-pub use protocol::Protocol;
+use std::{collections::HashMap, io, net::IpAddr, time::Duration};
mod protocol;
+pub use protocol::Protocol;
#[cfg(target_os = "macos")]
pub mod dummynet;
diff --git a/msg-socket/Cargo.toml b/msg-socket/Cargo.toml
index 22a7a3c..2378ac6 100644
--- a/msg-socket/Cargo.toml
+++ b/msg-socket/Cargo.toml
@@ -23,12 +23,12 @@ tokio-util.workspace = true
thiserror.workspace = true
rustc-hash.workspace = true
tracing.workspace = true
-async-trait.workspace = true
tokio-stream.workspace = true
-rand.workspace = true
parking_lot.workspace = true
[dev-dependencies]
+rand.workspace = true
+
msg-sim.workspace = true
tracing-subscriber = "0.3"
diff --git a/msg-socket/src/connection/state.rs b/msg-socket/src/connection/state.rs
index de5a5f6..c3f7814 100644
--- a/msg-socket/src/connection/state.rs
+++ b/msg-socket/src/connection/state.rs
@@ -1,4 +1,4 @@
-use std::net::SocketAddr;
+use msg_transport::Address;
use super::Backoff;
@@ -6,20 +6,20 @@ use super::Backoff;
///
/// * `C` is the channel type, which is used to send and receive generic messages.
/// * `B` is the backoff type, used to control the backoff state for inactive connections.
-pub enum ConnectionState {
+pub enum ConnectionState {
Active {
/// Channel to control the underlying connection. This is used to send
/// and receive any kind of message in any direction.
channel: C,
},
Inactive {
- addr: SocketAddr,
+ addr: A,
/// The current backoff state for inactive connections.
backoff: B,
},
}
-impl ConnectionState {
+impl ConnectionState {
/// Returns `true` if the connection is active.
#[allow(unused)]
pub fn is_active(&self) -> bool {
diff --git a/msg-socket/src/lib.rs b/msg-socket/src/lib.rs
index 901e3d8..1705d11 100644
--- a/msg-socket/src/lib.rs
+++ b/msg-socket/src/lib.rs
@@ -1,4 +1,8 @@
-use std::net::SocketAddr;
+#![doc(issue_tracker_base_url = "https://github.com/chainbound/msg-rs/issues/")]
+#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
+#![cfg_attr(not(test), warn(unused_crate_dependencies))]
+
+use msg_transport::Address;
use tokio::io::{AsyncRead, AsyncWrite};
#[path = "pub/mod.rs"]
@@ -36,8 +40,8 @@ pub trait Authenticator: Send + Sync + Unpin + 'static {
fn authenticate(&self, id: &Bytes) -> bool;
}
-pub(crate) struct AuthResult {
+pub(crate) struct AuthResult {
id: Bytes,
- addr: SocketAddr,
+ addr: A,
stream: S,
}
diff --git a/msg-socket/src/pub/driver.rs b/msg-socket/src/pub/driver.rs
index 5789828..e27f0d2 100644
--- a/msg-socket/src/pub/driver.rs
+++ b/msg-socket/src/pub/driver.rs
@@ -1,10 +1,11 @@
-use futures::{stream::FuturesUnordered, Future, SinkExt, StreamExt};
use std::{
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
+
+use futures::{stream::FuturesUnordered, Future, SinkExt, StreamExt};
use tokio::{sync::broadcast, task::JoinSet};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, warn};
@@ -16,6 +17,7 @@ use crate::{AuthResult, Authenticator};
use msg_transport::{PeerAddress, Transport};
use msg_wire::{auth, pubsub};
+#[allow(clippy::type_complexity)]
pub(crate) struct PubDriver {
/// Session ID counter.
pub(super) id_counter: u32,
@@ -30,7 +32,7 @@ pub(crate) struct PubDriver {
/// A set of pending incoming connections, represented by [`Transport::Accept`].
pub(super) conn_tasks: FuturesUnordered,
/// A joinset of authentication tasks.
- pub(super) auth_tasks: JoinSet, PubError>>,
+ pub(super) auth_tasks: JoinSet, PubError>>,
/// The receiver end of the message broadcast channel. The sender half is stored by [`PubSocket`](super::PubSocket).
pub(super) from_socket_bcast: broadcast::Receiver,
}
@@ -50,7 +52,7 @@ where
match auth {
Ok(auth) => {
// Run custom authenticator
- debug!("Authentication passed for {:?} ({})", auth.id, auth.addr);
+ debug!("Authentication passed for {:?} ({:?})", auth.id, auth.addr);
let mut framed = Framed::new(auth.stream, pubsub::Codec::new());
framed.set_backpressure_boundary(this.options.backpressure_boundary);
@@ -137,12 +139,12 @@ where
fn on_incoming(&mut self, io: T::Io) -> Result<(), io::Error> {
let addr = io.peer_addr()?;
- info!("New connection from {}", addr);
+ info!("New connection from {:?}", addr);
// If authentication is enabled, start the authentication process
if let Some(ref auth) = self.auth {
let authenticator = Arc::clone(auth);
- debug!("New connection from {}, authenticating", addr);
+ debug!("New connection from {:?}, authenticating", addr);
self.auth_tasks.spawn(async move {
let mut conn = Framed::new(io, auth::Codec::new_server());
@@ -201,7 +203,7 @@ where
self.id_counter = self.id_counter.wrapping_add(1);
debug!(
- "New connection from {}, session ID {}",
+ "New connection from {:?}, session ID {}",
addr, self.id_counter
);
}
diff --git a/msg-socket/src/pub/mod.rs b/msg-socket/src/pub/mod.rs
index ef291d2..37f56d5 100644
--- a/msg-socket/src/pub/mod.rs
+++ b/msg-socket/src/pub/mod.rs
@@ -192,10 +192,10 @@ mod tests {
let mut sub_socket = SubSocket::with_options(Tcp::default(), SubOptions::default());
- pub_socket.bind("0.0.0.0:0").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:0").await.unwrap();
let addr = pub_socket.local_addr().unwrap();
- sub_socket.connect(addr).await.unwrap();
+ sub_socket.connect_socket(addr).await.unwrap();
sub_socket.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -221,10 +221,10 @@ mod tests {
SubOptions::default().auth_token(Bytes::from("client1")),
);
- pub_socket.bind("0.0.0.0:0").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:0").await.unwrap();
let addr = pub_socket.local_addr().unwrap();
- sub_socket.connect(addr).await.unwrap();
+ sub_socket.connect_socket(addr).await.unwrap();
sub_socket.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -250,10 +250,10 @@ mod tests {
SubOptions::default().auth_token(Bytes::from("client1")),
);
- pub_socket.bind("0.0.0.0:0").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:0").await.unwrap();
let addr = pub_socket.local_addr().unwrap();
- sub_socket.connect(addr).await.unwrap();
+ sub_socket.connect_socket(addr).await.unwrap();
sub_socket.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -278,11 +278,11 @@ mod tests {
let mut sub2 = SubSocket::new(Tcp::default());
- pub_socket.bind("0.0.0.0:0").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:0").await.unwrap();
let addr = pub_socket.local_addr().unwrap();
- sub1.connect(addr).await.unwrap();
- sub2.connect(addr).await.unwrap();
+ sub1.connect_socket(addr).await.unwrap();
+ sub2.connect_socket(addr).await.unwrap();
sub1.subscribe("HELLO".to_string()).await.unwrap();
sub2.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -313,11 +313,11 @@ mod tests {
let mut sub2 = SubSocket::new(Tcp::default());
- pub_socket.bind("0.0.0.0:0").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:0").await.unwrap();
let addr = pub_socket.local_addr().unwrap();
- sub1.connect(addr).await.unwrap();
- sub2.connect(addr).await.unwrap();
+ sub1.connect_socket(addr).await.unwrap();
+ sub2.connect_socket(addr).await.unwrap();
sub1.subscribe("HELLO".to_string()).await.unwrap();
sub2.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
@@ -349,11 +349,11 @@ mod tests {
let mut sub_socket = SubSocket::new(Tcp::default());
// Try to connect and subscribe before the publisher is up
- sub_socket.connect("0.0.0.0:6662").await.unwrap();
+ sub_socket.connect_socket("0.0.0.0:6662").await.unwrap();
sub_socket.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
- pub_socket.bind("0.0.0.0:6662").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:6662").await.unwrap();
tokio::time::sleep(Duration::from_millis(2000)).await;
pub_socket
@@ -376,11 +376,11 @@ mod tests {
let mut sub_socket = SubSocket::new(Quic::default());
// Try to connect and subscribe before the publisher is up
- sub_socket.connect("0.0.0.0:6662").await.unwrap();
+ sub_socket.connect_socket("0.0.0.0:6662").await.unwrap();
sub_socket.subscribe("HELLO".to_string()).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
- pub_socket.bind("0.0.0.0:6662").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:6662").await.unwrap();
tokio::time::sleep(Duration::from_millis(2000)).await;
pub_socket
@@ -401,7 +401,7 @@ mod tests {
let mut pub_socket =
PubSocket::with_options(Tcp::default(), PubOptions::default().max_clients(1));
- pub_socket.bind("0.0.0.0:0").await.unwrap();
+ pub_socket.bind_socket("0.0.0.0:0").await.unwrap();
let mut sub1 = SubSocket::::with_options(Tcp::default(), SubOptions::default());
@@ -409,10 +409,10 @@ mod tests {
let addr = pub_socket.local_addr().unwrap();
- sub1.connect(addr).await.unwrap();
+ sub1.connect_socket(addr).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(pub_socket.stats().active_clients(), 1);
- sub2.connect(addr).await.unwrap();
+ sub2.connect_socket(addr).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(pub_socket.stats().active_clients(), 1);
}
diff --git a/msg-socket/src/pub/socket.rs b/msg-socket/src/pub/socket.rs
index 6a0bbb8..991da56 100644
--- a/msg-socket/src/pub/socket.rs
+++ b/msg-socket/src/pub/socket.rs
@@ -1,12 +1,12 @@
use bytes::Bytes;
use futures::stream::FuturesUnordered;
-use std::{io, net::SocketAddr, sync::Arc};
+use std::{io, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::{
net::{lookup_host, ToSocketAddrs},
sync::broadcast,
task::JoinSet,
};
-use tracing::{debug, trace};
+use tracing::{debug, trace, warn};
use super::{driver::PubDriver, stats::SocketStats, PubError, PubMessage, PubOptions, SocketState};
use crate::Authenticator;
@@ -32,7 +32,42 @@ pub struct PubSocket {
// complicates the API a lot. We can always change this later for perf reasons.
compressor: Option>,
/// The local address this socket is bound to.
- local_addr: Option,
+ local_addr: Option,
+}
+
+impl PubSocket
+where
+ T: Transport + Send + Unpin + 'static,
+ T::Addr: ToSocketAddrs,
+{
+}
+
+impl PubSocket
+where
+ T: Transport + Send + Unpin + 'static,
+{
+ /// Binds the socket to the given socket addres
+ ///
+ /// This method is only available for transports that support [`SocketAddr`] as address type,
+ /// like [`Tcp`](msg_transport::tcp::Tcp) and [`Quic`](msg_transport::quic::Quic).
+ pub async fn bind_socket(&mut self, addr: impl ToSocketAddrs) -> Result<(), PubError> {
+ let addrs = lookup_host(addr).await?;
+ self.try_bind(addrs.collect()).await
+ }
+}
+
+impl PubSocket
+where
+ T: Transport + Send + Unpin + 'static,
+{
+ /// Binds the socket to the given path.
+ ///
+ /// This method is only available for transports that support [`PathBuf`] as address type,
+ /// like [`Ipc`](msg_transport::ipc::Ipc).
+ pub async fn bind_path(&mut self, path: impl AsRef) -> Result<(), PubError> {
+ let addr = path.as_ref().clone();
+ self.try_bind(vec![addr]).await
+ }
}
impl PubSocket
@@ -69,8 +104,10 @@ where
self
}
- /// Binds the socket to the given address. This spawns the socket driver task.
- pub async fn bind(&mut self, addr: A) -> Result<(), PubError> {
+ /// Binds the socket to the given addresses in order until one succeeds.
+ ///
+ /// This also spawns the socket driver task.
+ pub async fn try_bind(&mut self, addresses: Vec) -> Result<(), PubError> {
let (to_sessions_bcast, from_socket_bcast) =
broadcast::channel(self.options.session_buffer_size);
@@ -79,13 +116,11 @@ where
.take()
.expect("Transport has been moved already");
- let addrs = lookup_host(addr).await?;
-
- for addr in addrs {
- match transport.bind(addr).await {
+ for addr in addresses {
+ match transport.bind(addr.clone()).await {
Ok(_) => break,
Err(e) => {
- tracing::warn!("Failed to bind to {}, trying next address: {}", addr, e);
+ warn!("Failed to bind to {:?}, trying next address: {}", addr, e);
continue;
}
}
@@ -98,7 +133,7 @@ where
)));
};
- tracing::debug!("Listening on {}", local_addr);
+ debug!("Listening on {:?}", local_addr);
let backend = PubDriver {
id_counter: 0,
@@ -192,7 +227,7 @@ where
}
/// Returns the local address this socket is bound to. `None` if the socket is not bound.
- pub fn local_addr(&self) -> Option {
- self.local_addr
+ pub fn local_addr(&self) -> Option<&T::Addr> {
+ self.local_addr.as_ref()
}
}
diff --git a/msg-socket/src/rep/driver.rs b/msg-socket/src/rep/driver.rs
index c0d06c4..9899f16 100644
--- a/msg-socket/src/rep/driver.rs
+++ b/msg-socket/src/rep/driver.rs
@@ -1,13 +1,13 @@
-use bytes::Bytes;
-use futures::{stream::FuturesUnordered, Future, FutureExt, SinkExt, Stream, StreamExt};
use std::{
collections::VecDeque,
io,
- net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
+
+use bytes::Bytes;
+use futures::{stream::FuturesUnordered, Future, FutureExt, SinkExt, Stream, StreamExt};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{mpsc, oneshot},
@@ -15,26 +15,28 @@ use tokio::{
};
use tokio_stream::{StreamMap, StreamNotifyClose};
use tokio_util::codec::Framed;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info, trace, warn};
use crate::{rep::SocketState, AuthResult, Authenticator, PubError, RepOptions, Request};
-use msg_transport::{PeerAddress, Transport};
+
+use msg_transport::{Address, PeerAddress, Transport};
use msg_wire::{
auth,
compression::{try_decompress_payload, Compressor},
reqrep,
};
-pub(crate) struct PeerState {
+pub(crate) struct PeerState {
pending_requests: FuturesUnordered,
conn: Framed,
- addr: SocketAddr,
+ addr: A,
egress_queue: VecDeque,
state: Arc,
should_flush: bool,
compressor: Option>,
}
+#[allow(clippy::type_complexity)]
pub(crate) struct RepDriver {
/// The server transport used to accept incoming connections.
pub(crate) transport: T,
@@ -44,9 +46,9 @@ pub(crate) struct RepDriver {
/// Options shared with socket.
pub(crate) options: Arc,
/// [`StreamMap`] of connected peers. The key is the peer's address.
- pub(crate) peer_states: StreamMap>>,
+ pub(crate) peer_states: StreamMap>>,
/// Sender to the socket front-end. Used to notify the socket of incoming requests.
- pub(crate) to_socket: mpsc::Sender,
+ pub(crate) to_socket: mpsc::Sender>,
/// Optional connection authenticator.
pub(crate) auth: Option>,
/// Optional message compressor. This is shared with the socket to keep
@@ -55,7 +57,7 @@ pub(crate) struct RepDriver {
/// A set of pending incoming connections, represented by [`Transport::Accept`].
pub(super) conn_tasks: FuturesUnordered,
/// A joinset of authentication tasks.
- pub(crate) auth_tasks: JoinSet, PubError>>,
+ pub(crate) auth_tasks: JoinSet, PubError>>,
}
impl Future for RepDriver
@@ -71,7 +73,7 @@ where
if let Poll::Ready(Some((peer, msg))) = this.peer_states.poll_next_unpin(cx) {
match msg {
Some(Ok(mut request)) => {
- debug!("Received request from peer {}", peer);
+ debug!("Received request from peer {:?}", peer);
let size = request.msg().len();
@@ -88,10 +90,10 @@ where
let _ = this.to_socket.try_send(request);
}
Some(Err(e)) => {
- error!("Error receiving message from peer {}: {:?}", peer, e);
+ error!("Error receiving message from peer {:?}: {:?}", peer, e);
}
None => {
- warn!("Peer {} disconnected", peer);
+ warn!("Peer {:?} disconnected", peer);
this.state.stats.decrement_active_clients();
}
}
@@ -103,10 +105,10 @@ where
match auth {
Ok(auth) => {
// Run custom authenticator
- tracing::info!("Authentication passed for {:?} ({})", auth.id, auth.addr);
+ info!("Authentication passed for {:?} ({:?})", auth.id, auth.addr);
this.peer_states.insert(
- auth.addr,
+ auth.addr.clone(),
StreamNotifyClose::new(PeerState {
pending_requests: FuturesUnordered::new(),
conn: Framed::new(auth.stream, reqrep::Codec::new()),
@@ -183,12 +185,12 @@ where
fn on_incoming(&mut self, io: T::Io) -> Result<(), io::Error> {
let addr = io.peer_addr()?;
- info!("New connection from {}", addr);
+ info!("New connection from {:?}", addr);
// If authentication is enabled, start the authentication process
if let Some(ref auth) = self.auth {
let authenticator = Arc::clone(auth);
- debug!("New connection from {}, authenticating", addr);
+ debug!("New connection from {:?}, authenticating", addr);
self.auth_tasks.spawn(async move {
let mut conn = Framed::new(io, auth::Codec::new_server());
@@ -229,7 +231,7 @@ where
});
} else {
self.peer_states.insert(
- addr,
+ addr.clone(),
StreamNotifyClose::new(PeerState {
pending_requests: FuturesUnordered::new(),
conn: Framed::new(io, reqrep::Codec::new()),
@@ -246,8 +248,8 @@ where
}
}
-impl Stream for PeerState {
- type Item = Result;
+impl Stream for PeerState {
+ type Item = Result, PubError>;
/// Advances the state of the peer.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll