Skip to content

Commit

Permalink
Merge pull request #76 from chainbound/feat/ipc
Browse files Browse the repository at this point in the history
feat: IPC transport
  • Loading branch information
merklefruit authored Aug 27, 2024
2 parents ed424a0 + 6c9a9a8 commit 4dd4341
Show file tree
Hide file tree
Showing 36 changed files with 981 additions and 395 deletions.
18 changes: 6 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ members = [
resolver = "2"

[workspace.package]
version = "0.1.1"
version = "0.1.2"
edition = "2021"
rust-version = "1.70" # Remember to update .clippy.toml and README.md
rust-version = "1.70"
license = "MIT"
description = "A flexible and lightweight messaging library for distributed systems"
authors = ["Jonas Bostoen", "Nicolas Racchi"]
Expand All @@ -35,17 +35,16 @@ futures = "0.3"
tokio-stream = { version = "0.1", features = ["sync"] }
parking_lot = "0.12"


# general
bytes = "1"
thiserror = "1"
tracing = "0.1"
rustc-hash = "1"
rand = "0.8"

# NETWORKING
# networking
quinn = "0.10"
# rustls needs to be the same version as the one used by quinn
# (rustls needs to be the same version as the one used by quinn)
rustls = { version = "0.21", features = ["quic", "dangerous_configuration"] }
rcgen = "0.12"

Expand Down
36 changes: 24 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,37 @@
`msg-rs` is a messaging library that was inspired by projects like [ZeroMQ](https://zeromq.org/) and [Nanomsg](https://nanomsg.org/).
It was built because we needed a Rust-native messaging library like those above.

> MSG is still in ALPHA and is not ready for production use.
## Documentation

The [MSG-RS Book][book] contains detailed information on how to use the library.
The 📖 [MSG-RS Book][book] contains detailed information on how to use the library.

## Features

- [ ] Multiple socket types
- Multiple socket types
- [x] Request/Reply
- [x] Publish/Subscribe
- Pluggable transport layers
- [x] TCP
- [x] QUIC
- [x] IPC
- Useful stats: latency, throughput, packet drops
- Durable IO abstraction (built-in retries and reconnections)
- Custom wire protocol with support for authentication and compression
- Network simulation mode with dummynet & pfctl
- Extensive benchmarks
- Integration tests

<!-- TODO:
- Socket types
- [ ] Channel
- [ ] Push/Pull
- [ ] Survey/Respond
- [ ] Stats (RTT, throughput, packet drops etc.)
- [x] Request/Reply basic stats
- [ ] Queuing
- [ ] Pluggable transport layer
- [x] TCP
- Queuing
- Transport layers
- [ ] TLS
- [ ] IPC
- [ ] UDP
- [ ] Inproc
- [x] Durable IO abstraction (built-in retries and reconnections)
- [ ] Simulation modes with [Turmoil](https://github.com/tokio-rs/turmoil)
-->

## MSRV

Expand All @@ -59,6 +65,12 @@ Additionally, you can reach out to us on [Discord][discord] if you have any ques

This project is licensed under the Open Source [MIT license][mit-license].

## Disclaimer

<sub>
This software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.
</sub>

<!-- Links -->

[book]: https://chainbound.github.io/msg-rs/
Expand Down
7 changes: 5 additions & 2 deletions msg-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use futures::future::BoxFuture;
#![doc(issue_tracker_base_url = "https://github.com/chainbound/msg-rs/issues/")]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use std::{
pin::Pin,
task::{Context, Poll},
time::SystemTime,
};

use futures::{Sink, SinkExt, Stream};
use futures::{future::BoxFuture, Sink, SinkExt, Stream};
use tokio::sync::mpsc::{
self,
error::{TryRecvError, TrySendError},
Expand Down
6 changes: 4 additions & 2 deletions msg-sim/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{collections::HashMap, io, net::IpAddr, time::Duration};
#![doc(issue_tracker_base_url = "https://github.com/chainbound/msg-rs/issues/")]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

pub use protocol::Protocol;
use std::{collections::HashMap, io, net::IpAddr, time::Duration};

mod protocol;
pub use protocol::Protocol;

#[cfg(target_os = "macos")]
pub mod dummynet;
Expand Down
4 changes: 2 additions & 2 deletions msg-socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ tokio-util.workspace = true
thiserror.workspace = true
rustc-hash.workspace = true
tracing.workspace = true
async-trait.workspace = true
tokio-stream.workspace = true
rand.workspace = true
parking_lot.workspace = true

[dev-dependencies]
rand.workspace = true

msg-sim.workspace = true

tracing-subscriber = "0.3"
8 changes: 4 additions & 4 deletions msg-socket/src/connection/state.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
use std::net::SocketAddr;
use msg_transport::Address;

use super::Backoff;

/// Abstraction to represent the state of a connection.
///
/// * `C` is the channel type, which is used to send and receive generic messages.
/// * `B` is the backoff type, used to control the backoff state for inactive connections.
pub enum ConnectionState<C, B> {
pub enum ConnectionState<C, B, A: Address> {
Active {
/// Channel to control the underlying connection. This is used to send
/// and receive any kind of message in any direction.
channel: C,
},
Inactive {
addr: SocketAddr,
addr: A,
/// The current backoff state for inactive connections.
backoff: B,
},
}

impl<C, B: Backoff> ConnectionState<C, B> {
impl<C, B: Backoff, A: Address> ConnectionState<C, B, A> {
/// Returns `true` if the connection is active.
#[allow(unused)]
pub fn is_active(&self) -> bool {
Expand Down
10 changes: 7 additions & 3 deletions msg-socket/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::net::SocketAddr;
#![doc(issue_tracker_base_url = "https://github.com/chainbound/msg-rs/issues/")]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

use msg_transport::Address;
use tokio::io::{AsyncRead, AsyncWrite};

#[path = "pub/mod.rs"]
Expand Down Expand Up @@ -36,8 +40,8 @@ pub trait Authenticator: Send + Sync + Unpin + 'static {
fn authenticate(&self, id: &Bytes) -> bool;
}

pub(crate) struct AuthResult<S: AsyncRead + AsyncWrite> {
pub(crate) struct AuthResult<S: AsyncRead + AsyncWrite, A: Address> {
id: Bytes,
addr: SocketAddr,
addr: A,
stream: S,
}
28 changes: 16 additions & 12 deletions msg-socket/src/pub/driver.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use futures::{stream::FuturesUnordered, Future, SinkExt, StreamExt};
use std::{
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures::{stream::FuturesUnordered, Future, SinkExt, StreamExt};
use tokio::{sync::broadcast, task::JoinSet};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, warn};
Expand All @@ -13,10 +14,11 @@ use super::{
session::SubscriberSession, trie::PrefixTrie, PubError, PubMessage, PubOptions, SocketState,
};
use crate::{AuthResult, Authenticator};
use msg_transport::{PeerAddress, Transport};
use msg_transport::{Address, PeerAddress, Transport};
use msg_wire::{auth, pubsub};

pub(crate) struct PubDriver<T: Transport> {
#[allow(clippy::type_complexity)]
pub(crate) struct PubDriver<T: Transport<A>, A: Address> {
/// Session ID counter.
pub(super) id_counter: u32,
/// The server transport used to accept incoming connections.
Expand All @@ -30,14 +32,15 @@ pub(crate) struct PubDriver<T: Transport> {
/// A set of pending incoming connections, represented by [`Transport::Accept`].
pub(super) conn_tasks: FuturesUnordered<T::Accept>,
/// A joinset of authentication tasks.
pub(super) auth_tasks: JoinSet<Result<AuthResult<T::Io>, PubError>>,
pub(super) auth_tasks: JoinSet<Result<AuthResult<T::Io, A>, PubError>>,
/// The receiver end of the message broadcast channel. The sender half is stored by [`PubSocket`](super::PubSocket).
pub(super) from_socket_bcast: broadcast::Receiver<PubMessage>,
}

impl<T> Future for PubDriver<T>
impl<T, A> Future for PubDriver<T, A>
where
T: Transport + Unpin + 'static,
T: Transport<A> + Unpin + 'static,
A: Address,
{
type Output = Result<(), PubError>;

Expand All @@ -50,7 +53,7 @@ where
match auth {
Ok(auth) => {
// Run custom authenticator
debug!("Authentication passed for {:?} ({})", auth.id, auth.addr);
debug!("Authentication passed for {:?} ({:?})", auth.id, auth.addr);

let mut framed = Framed::new(auth.stream, pubsub::Codec::new());
framed.set_backpressure_boundary(this.options.backpressure_boundary);
Expand Down Expand Up @@ -128,21 +131,22 @@ where
}
}

impl<T> PubDriver<T>
impl<T, A> PubDriver<T, A>
where
T: Transport + Unpin + 'static,
T: Transport<A> + Unpin + 'static,
A: Address,
{
/// Handles an incoming connection. If this returns an error, the active connections counter
/// should be decremented.
fn on_incoming(&mut self, io: T::Io) -> Result<(), io::Error> {
let addr = io.peer_addr()?;

info!("New connection from {}", addr);
info!("New connection from {:?}", addr);

// If authentication is enabled, start the authentication process
if let Some(ref auth) = self.auth {
let authenticator = Arc::clone(auth);
debug!("New connection from {}, authenticating", addr);
debug!("New connection from {:?}, authenticating", addr);
self.auth_tasks.spawn(async move {
let mut conn = Framed::new(io, auth::Codec::new_server());

Expand Down Expand Up @@ -201,7 +205,7 @@ where

self.id_counter = self.id_counter.wrapping_add(1);
debug!(
"New connection from {}, session ID {}",
"New connection from {:?}, session ID {}",
addr, self.id_counter
);
}
Expand Down
4 changes: 2 additions & 2 deletions msg-socket/src/pub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ mod tests {

pub_socket.bind("0.0.0.0:0").await.unwrap();

let mut sub1 = SubSocket::<Tcp>::with_options(Tcp::default(), SubOptions::default());
let mut sub1 = SubSocket::with_options(Tcp::default(), SubOptions::default());

let mut sub2 = SubSocket::<Tcp>::with_options(Tcp::default(), SubOptions::default());
let mut sub2 = SubSocket::with_options(Tcp::default(), SubOptions::default());

let addr = pub_socket.local_addr().unwrap();

Expand Down
Loading

0 comments on commit 4dd4341

Please sign in to comment.