diff --git a/examples/stream/README.md b/examples/stream/README.md index e7fa99e08b2..8437a5ea21e 100644 --- a/examples/stream/README.md +++ b/examples/stream/README.md @@ -3,6 +3,8 @@ This example shows the usage of the `stream::Behaviour`. As a counter-part to the `request_response::Behaviour`, the `stream::Behaviour` allows users to write stream-oriented protocols whilst having minimal interaction with the `Swarm`. +In this showcase, we implement an echo protocol: All incoming data is echoed back to the dialer, until the stream is closed. + ## Usage To run the example, follow these steps: diff --git a/examples/stream/src/main.rs b/examples/stream/src/main.rs index debe764d521..6ff5a971abb 100644 --- a/examples/stream/src/main.rs +++ b/examples/stream/src/main.rs @@ -4,10 +4,11 @@ use anyhow::{Context, Result}; use futures::{AsyncReadExt, AsyncWriteExt, StreamExt}; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Stream, StreamProtocol}; use libp2p_stream as stream; +use rand::RngCore; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; -const PROTOCOL: StreamProtocol = StreamProtocol::new("/my-ping-protocol"); // TODO: Change to echo protocol. +const ECHO_PROTOCOL: StreamProtocol = StreamProtocol::new("/echo"); #[tokio::main] async fn main() -> Result<()> { @@ -34,7 +35,7 @@ async fn main() -> Result<()> { swarm.listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse()?)?; - let mut incoming_streams = swarm.behaviour_mut().accept(PROTOCOL).unwrap(); + let mut incoming_streams = swarm.behaviour_mut().accept(ECHO_PROTOCOL).unwrap(); // Deal with incoming streams. // Spawning a dedicated task is just one way of doing this. @@ -47,12 +48,15 @@ async fn main() -> Result<()> { // Each task needs memory meaning an aggressive remote peer may force you OOM this way. while let Some((peer, stream)) = incoming_streams.next().await { - if let Err(e) = inbound_ping(stream).await { - tracing::warn!(%peer, "Ping protocol failed: {e}"); - continue; - } - - tracing::info!(%peer, "Handled inbound ping!"); + match echo(stream).await { + Ok(n) => { + tracing::info!(%peer, "Echoed {n} bytes!"); + } + Err(e) => { + tracing::warn!(%peer, "Echo failed: {e}"); + continue; + } + }; } }); @@ -66,7 +70,7 @@ async fn main() -> Result<()> { tokio::spawn(connection_handler( peer_id, - swarm.behaviour().new_control(PROTOCOL), + swarm.behaviour().new_control(ECHO_PROTOCOL), )); } @@ -84,15 +88,15 @@ async fn main() -> Result<()> { } } -/// A very simple, `async fn`-based connection handler for our custom ping protocol. +/// A very simple, `async fn`-based connection handler for our custom echo protocol. async fn connection_handler(peer: PeerId, mut control: stream::Control) { loop { - tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between pings. + tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos. let stream = match control.open_stream(peer).await { Ok(stream) => stream, Err(stream::OpenStreamError::UnsupportedProtocol) => { - tracing::info!(%peer, %PROTOCOL, "Peer does not support protocol"); + tracing::info!(%peer, %ECHO_PROTOCOL, "Peer does not support protocol"); return; } Err(stream::OpenStreamError::Io(e)) => { @@ -103,38 +107,47 @@ async fn connection_handler(peer: PeerId, mut control: stream::Control) { } }; - if let Err(e) = outbound_ping(stream).await { - tracing::warn!(%peer, "Ping protocol failed: {e}"); + if let Err(e) = send(stream).await { + tracing::warn!(%peer, "Echo protocol failed: {e}"); continue; } - tracing::info!(%peer, "Successful ping!") + tracing::info!(%peer, "Echo complete!") } } -async fn inbound_ping(mut stream: Stream) -> io::Result<()> { - let mut ping = [0u8; 32]; - stream.read_exact(&mut ping).await?; - stream.write_all(&ping).await?; - stream.close().await?; +async fn echo(mut stream: Stream) -> io::Result { + let mut total = 0; - Ok(()) + let mut buf = [0u8; 100]; + + loop { + let read = stream.read(&mut buf).await?; + if read == 0 { + return Ok(total); + } + + total += read; + stream.write_all(&buf[..read]).await?; + } } -async fn outbound_ping(mut stream: Stream) -> io::Result<()> { - let ping = rand::random::<[u8; 32]>(); - stream.write_all(&ping).await?; - stream.close().await?; +async fn send(mut stream: Stream) -> io::Result<()> { + let num_bytes = rand::random::() % 1000; - let mut pong = [0u8; 32]; - stream.read_exact(&mut pong).await?; + let mut bytes = vec![0; num_bytes]; + rand::thread_rng().fill_bytes(&mut bytes); - if ping != pong { - return Err(io::Error::new( - io::ErrorKind::Other, - "ping payload mismatch", - )); + stream.write_all(&bytes).await?; + + let mut buf = vec![0; num_bytes]; + stream.read_exact(&mut buf).await?; + + if bytes != buf { + return Err(io::Error::new(io::ErrorKind::Other, "incorrect echo")); } + stream.close().await?; + Ok(()) }