Skip to content

Commit

Permalink
Change example to echo protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Dec 30, 2023
1 parent 0e029c3 commit ab9284e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
2 changes: 2 additions & 0 deletions examples/stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
This example shows the usage of the `stream::Behaviour`.
As a counter-part to the `request_response::Behaviour`, the `stream::Behaviour` allows users to write stream-oriented protocols whilst having minimal interaction with the `Swarm`.

In this showcase, we implement an echo protocol: All incoming data is echoed back to the dialer, until the stream is closed.

## Usage

To run the example, follow these steps:
Expand Down
77 changes: 45 additions & 32 deletions examples/stream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use anyhow::{Context, Result};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol};
use libp2p_stream as stream;
use rand::RngCore;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

const PROTOCOL: StreamProtocol = StreamProtocol::new("/my-ping-protocol"); // TODO: Change to echo protocol.
const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo");

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -34,7 +35,7 @@ async fn main() -> Result<()> {

swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?;

let mut incoming_streams = swarm.behaviour_mut().accept(PROTOCOL).unwrap();
let mut incoming_streams = swarm.behaviour_mut().accept(ECHO_PROTOCOL).unwrap();

// Deal with incoming streams.
// Spawning a dedicated task is just one way of doing this.
Expand All @@ -47,12 +48,15 @@ async fn main() -> Result<()> {
// Each task needs memory meaning an aggressive remote peer may force you OOM this way.

while let Some((peer, stream)) = incoming_streams.next().await {
if let Err(e) = inbound_ping(stream).await {
tracing::warn!(%peer, "Ping protocol failed: {e}");
continue;
}

tracing::info!(%peer, "Handled inbound ping!");
match echo(stream).await {
Ok(n) => {
tracing::info!(%peer, "Echoed {n} bytes!");
}
Err(e) => {
tracing::warn!(%peer, "Echo failed: {e}");
continue;
}
};
}
});

Expand All @@ -66,7 +70,7 @@ async fn main() -> Result<()> {

tokio::spawn(connection_handler(
peer_id,
swarm.behaviour().new_control(PROTOCOL),
swarm.behaviour().new_control(ECHO_PROTOCOL),
));
}

Expand All @@ -84,15 +88,15 @@ async fn main() -> Result<()> {
}
}

/// A very simple, `async fn`-based connection handler for our custom ping protocol.
/// A very simple, `async fn`-based connection handler for our custom echo protocol.
async fn connection_handler(peer: PeerId, mut control: stream::Control) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between pings.
tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos.

let stream = match control.open_stream(peer).await {
Ok(stream) => stream,
Err(stream::OpenStreamError::UnsupportedProtocol) => {
tracing::info!(%peer, %PROTOCOL, "Peer does not support protocol");
tracing::info!(%peer, %ECHO_PROTOCOL, "Peer does not support protocol");
return;
}
Err(stream::OpenStreamError::Io(e)) => {
Expand All @@ -103,38 +107,47 @@ async fn connection_handler(peer: PeerId, mut control: stream::Control) {
}
};

if let Err(e) = outbound_ping(stream).await {
tracing::warn!(%peer, "Ping protocol failed: {e}");
if let Err(e) = send(stream).await {
tracing::warn!(%peer, "Echo protocol failed: {e}");
continue;
}

tracing::info!(%peer, "Successful ping!")
tracing::info!(%peer, "Echo complete!")
}
}

async fn inbound_ping(mut stream: Stream) -> io::Result<()> {
let mut ping = [0u8; 32];
stream.read_exact(&mut ping).await?;
stream.write_all(&ping).await?;
stream.close().await?;
async fn echo(mut stream: Stream) -> io::Result<usize> {
let mut total = 0;

Ok(())
let mut buf = [0u8; 100];

loop {
let read = stream.read(&mut buf).await?;
if read == 0 {
return Ok(total);
}

total += read;
stream.write_all(&buf[..read]).await?;
}
}

async fn outbound_ping(mut stream: Stream) -> io::Result<()> {
let ping = rand::random::<[u8; 32]>();
stream.write_all(&ping).await?;
stream.close().await?;
async fn send(mut stream: Stream) -> io::Result<()> {
let num_bytes = rand::random::<usize>() % 1000;

let mut pong = [0u8; 32];
stream.read_exact(&mut pong).await?;
let mut bytes = vec![0; num_bytes];
rand::thread_rng().fill_bytes(&mut bytes);

if ping != pong {
return Err(io::Error::new(
io::ErrorKind::Other,
"ping payload mismatch",
));
stream.write_all(&bytes).await?;

let mut buf = vec![0; num_bytes];
stream.read_exact(&mut buf).await?;

if bytes != buf {
return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo"));
}

stream.close().await?;

Ok(())
}

0 comments on commit ab9284e

Please sign in to comment.