diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index d471fed6d46b..d2fcbca653b1 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -39,7 +39,7 @@ name = "modules_triple_cross_join" required-features = [ "debugging" ] [[example]] -name = "echoserver_websocket" +name = "chat_websocket" required-features = [ "websocket" ] [dependencies] diff --git a/hydroflow/examples/echoserver_websocket/main.rs b/hydroflow/examples/chat_websocket/main.rs similarity index 80% rename from hydroflow/examples/echoserver_websocket/main.rs rename to hydroflow/examples/chat_websocket/main.rs index 17b4d7343fd2..9d18a25e32cd 100644 --- a/hydroflow/examples/echoserver_websocket/main.rs +++ b/hydroflow/examples/chat_websocket/main.rs @@ -40,9 +40,12 @@ async fn main() { inbound_chan[0] -> for_each(|(msg, addr): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), msg, addr)); - // Echo back the Echo messages with updated timestamp - inbound_chan[1] - -> map(|(msg, addr)| (msg, addr) ) -> dest_sink(outbound); + clients = inbound_chan[1] -> map(|(_msg, addr)| addr) -> unique::<'static>(); + messages = inbound_chan[2] -> map(|(msg, _addr)| msg); + + messages -> [0]cj; + clients -> [1]cj; + cj = cross_join::<'tick, 'static>() -> inspect(|msg| println!("SEND {:?}", msg)) -> dest_sink(outbound); }; // run the server diff --git a/hydroflow/examples/echoserver/main.rs b/hydroflow/examples/echoserver/main.rs index 7de8f8cae6bc..d483385f251b 100644 --- a/hydroflow/examples/echoserver/main.rs +++ b/hydroflow/examples/echoserver/main.rs @@ -35,7 +35,7 @@ async fn main() { .unwrap_or_else(|| ipv4_resolve("localhost:0").unwrap()); // allocate `outbound` sink and `inbound` stream - let (outbound, inbound, addr) = bind_udp_bytes(addr).await.unwrap(); + let (outbound, inbound, addr) = bind_udp_bytes(addr).await; println!("Listening on {:?}", addr); match opts.role { diff --git a/hydroflow/src/util/websocket.rs b/hydroflow/src/util/websocket.rs index b3c25c6c5238..410628e7fbe4 100644 --- a/hydroflow/src/util/websocket.rs +++ b/hydroflow/src/util/websocket.rs @@ -3,15 +3,25 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::pin::pin; use std::rc::Rc; + use futures::{SinkExt, StreamExt}; -use tokio::net::{TcpListener}; +use tokio::net::TcpListener; use tokio::task::spawn_local; use tokio_tungstenite::tungstenite::{Error, Message}; + use crate::util::unsync::mpsc::{Receiver, Sender}; use crate::util::unsync_channel; - -pub async fn bind_websocket(endpoint: SocketAddr) -> Result<(Sender<(Message, SocketAddr)>, Receiver>, SocketAddr), std::io::Error>{ +pub async fn bind_websocket( + endpoint: SocketAddr, +) -> Result< + ( + Sender<(Message, SocketAddr)>, + Receiver>, + SocketAddr, + ), + std::io::Error, +> { let listener = TcpListener::bind(endpoint).await.unwrap(); let bound_endpoint = listener.local_addr()?; @@ -70,9 +80,7 @@ pub async fn bind_websocket(endpoint: SocketAddr) -> Result<(Sender<(Message, So } }); } - }); Ok((tx_egress, rx_ingress, bound_endpoint)) } -