From 5d68cd7b14a12377626ec0458c91cac476a81095 Mon Sep 17 00:00:00 2001 From: Rohit Kulshreshtha Date: Wed, 27 Mar 2024 10:50:32 -0700 Subject: [PATCH] Some improvements to the hydroflow template (#14) 1. Default server address, so you don't have to copy an address around when starting multiple clients to test a full star network configuration. 2. Help messages for all the arguments. 3. More hand-holding documentation, mostly condensed from the [first networking chapter](https://hydro.run/docs/hydroflow/quickstart/example_7_echo_server) 4. Updated README.md to provide more runnable commands. --- .github/workflows/ci.yml | 4 +- README.md | 121 ++++++++++++++++++--------------------- src/client.rs | 72 ++++++++++++++++------- src/helpers.rs | 15 ++--- src/main.rs | 78 ++++++++++++++----------- src/protocol.rs | 11 +++- src/server.rs | 67 ++++++++++++++++------ 7 files changed, 216 insertions(+), 152 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c4f8f2e..c8c560b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,8 +82,8 @@ jobs: echo "$ECHO_STRING" >client-input - "${{ runner.temp }}/generated/target/debug/hydroflow-template" --role server --addr 127.0.0.100:2048 & - "${{ runner.temp }}/generated/target/debug/hydroflow-template" --role client --server-addr 127.0.0.100:2048 client-output & + "${{ runner.temp }}/generated/target/debug/hydroflow-template" --role server --address 127.0.0.100:2048 & + "${{ runner.temp }}/generated/target/debug/hydroflow-template" --role client --address 127.0.0.100:2048 client-output & sleep 1 diff --git a/README.md b/README.md index 95c6d8e..6013a39 100644 --- a/README.md +++ b/README.md @@ -1,86 +1,47 @@ ## Getting Started -This is a template for a Rust project that uses [Hydroflow](http://github.com/hydro-project/hydroflow) for distributed services. 
To install, simply run +This is a template for a Rust project that uses [Hydroflow](http://github.com/hydro-project/hydroflow) for +distributed services. It implements a simple echo server and client over UDP. + +## Using the Template ```bash cargo generate hydro-project/hydroflow-template ``` -You will be prompted to name your project. Once the command completes, you can cd into the project and build the template. + +You will be prompted to name your project. Once the command completes, you can `cd` into the project and build the +template. + ```bash cd cargo build ``` -## Introducing the Template Application -The template application is a Hydroflow-centric, long-running distributed application (service). As -shipped, it assumes that there are distinct "roles" -(classes of service) that can be launched via the same executable. -By default, the application provides a `server` and `client` role; these are easily overridden in the `Opts` struct in `src/main.rs`. -Command line arguments allow you to launch with a specific role (`--role`) and an address to bind to (`--addr`). -If you don't wish to choose an address or port number, you can simply omit the `--addr` argument and the service -will bind to a random port on localhost. If you want separate executables, you can use this template multiple times to generate -a separate project for each executable. - -By default, the application also allows you to optionally specify the address of a remote server (`--server-addr`) and -a type of dataflow graph to be emitted (`--graph`); -these can be removed from the `Opts` struct in `src/main.rs` if they are not needed. - -For testing its usually helpful to run multiple instances in separate terminals. -Once your code seems to be working correctly, the [hydroplane](https://github.com/hydro-project/hydroplane) project -provides a framework for launching and managing multiple instances of a service either locally or in a distributed environment. 
- -### Launching the Unmodified Template Application -The provided code implements an echo server and client. To run it unmodified, open 2 terminals. - -In one terminal run the server like so: +## Running the Template +The server can be run in one terminal and one or more clients can be run in separate terminals. +### Server ```console -% cargo run -- --role server --addr localhost:12346 +% cargo run -- --role server ``` -In another terminal run a client like so: +### Client +You can run multiple instances of the client by running the following command: ```console -% cargo run -- --role client --server-addr localhost:12346 +% cargo run -- --role client ``` -The client listens on stdin, and sends (newline-delimited) messages that it receives to the server. -The client also prints any messages it receives to stdout. -Meanwhile, the server waits for messages, which it echoes back to the sender and prints out locally. -The application also includes an optional command-line argument to print out a dataflow graph of the hydroflow code. -Adding the `--graph ` flag to the end of the command lines above will print out a node-and-edge diagram of the chosen service. -Supported values for `` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html). - -## Structure of the Template Project -The `src` directory contains the following files: - -```txt -src/main.rs - This file contains the `main` function, which handles command-line arguments and launches the appropriate service. -src/protocol.rs - This file contains the `Message` enum that defines the messages that can be sent between instances. -src/helpers.rs - This file contains helper functions that are invoked from Hydroflow code in multiple services -src/.rs - The code for a service with the given role. Default files are provided for `server` and `client`. 
+## Viewing Help +```console +cargo run -- --help ``` -The `src/main.rs` file is where the command-line arguments are parsed and the appropriate service is launched. -It also contains the `Opts` struct, which uses the [clap](https://docs.rs/clap/latest/clap/) crate to -specify the command-line arguments that are accepted by the application. -It is possible to change the command-line arguments by modifying the `Opts` struct. -Before launching the service, the `main` function binds to the specified address and prints out the address that was bound to. - -The `src/protocol` file contains the enum `Message`, which can include messages with very different structures. -The `Message` must provide the `Serialize` and `Deserialize` traits, which are used by the [serde](https://docs.serde.rs/serde/) crate. -In the application, the `Message` enum includes an `Echo` message that has a (`String`) payload and timestamp; it also includes -`Heartbeat` and `HeartbeatAck` messages that carry no information other than their type. (Messages are delivered with the sender -address attached, so these empty message types can be useful.) - -The `src/helpers.rs` file contains any helper functions that are needed. In our example this is just a function to print -a dataflow graph representation of the hydroflow code to stdout. - -Each service file comes with a skeleton Hydroflow spec that provides an inbound communication channel and an outbound communication channel, both -bound to the specified address and port. The channels are named `inbound_chan` and `outbound_chan`, and are accessed in hydroflow code using the -`source_serde` and `sink_serde` operators respectively. The single address/port pair is sufficient in general to support multiple -different `Message` types across multiple services and instances. Upon receipt, messages are handled by the appropriate code using hydroflow's `demux` -operator. 
It is also possible to open more channels to segregate traffic to different IP addresses or ports, simply by copying the -patterns that define and use the `inbound` and `outbound` channels. +## Template Project Structure +The `src` directory contains the following files: -Each service file also includes code to generate the dataflow graph for the service, if the `--graph` flag is provided on the command line. -The ASCII spec for the graph is printed to stdout on launch. +| File | Description | +|---------------|--------------------------------------------------------------------------------------------------------------------------------------| +| `main.rs` | Contains `main` entry-point function for both client and server. Performs command-line argument parsing. | +| `protocol.rs` | Contains the `Message` enum that defines the messages that can be sent between instances. | +| `.rs` | Contains the service for the given role. Example implementations and skeletal hydroflow spec are provided for `server` and `client`. | +| `helpers.rs` | Contains helper functions that are invoked from Hydroflow code in multiple services. | ## Communication Patterns No particular communication pattern is assumed by Hydroflow. The unmodified template application is designed to be used in a "star topology": @@ -96,7 +57,35 @@ In our experience, when starting a Hydroflow project we recommend a four-step ap 2. **Messages**: Define the basic message types that services will send to each other (in the `Message` enum in `src/protocol.rs`). 3. **Print Received Messages**: Utilize the template logic at each service that prints out messages received. 4. **Exercise Sending Patterns**: Make sure the right messages get to the right recipients! Write simple logic to send out messages in all the message patterns you expect to see (in the `src/.rs` files). -5. **Service Programming**: Begin writing the actual logic for each service, with plenty of `for_each(|m| println!("{:?}", m))` operators +5. 
**Service Programming**: Begin writing the actual logic for each service, with plenty of `inspect(|m| println!("{:?}", m))` operators peppered throughout! Have fun! + +## Print a Dataflow Graph +The client and server can optionally print out a dataflow graph of their hydroflow code. + +### Mermaid +#### Server +Run the following command and view the messages received by the server on stdout. +```console +% cargo run -- --role server --graph mermaid +``` + +#### Client +Run the following command and type in the messages to send to the server. When the server responds, the echoed message +will be printed on stdout. +```console +% cargo run -- --role client --graph mermaid +``` + +### Dot +#### Server +```console +% cargo run -- --role server --graph dot +``` + +#### Client +```console +% cargo run -- --role client --graph dot +``` \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 5c622e3..ab5ea2a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,41 +1,69 @@ use crate::helpers::print_graph; use crate::protocol::Message; -use crate::Opts; +use crate::{Opts, DEFAULT_SERVER_ADDRESS}; use chrono::prelude::*; use hydroflow::hydroflow_syntax; -use hydroflow::util::{UdpSink, UdpStream}; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; use std::net::SocketAddr; -pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) { - // server_addr is required for client - let server_addr = match opts.server_addr { - Some(addr) => { - println!("Connecting to server at {:?}", addr); - addr - } - None => panic!("Client requires a server address"), - }; - println!("Client live!"); +/// Runs the client. The client is a long-running process that reads stdin, and sends messages that +/// it receives to the server. The client also prints any messages it receives to stdout. +pub(crate) async fn run_client(opts: Opts) { + // Client listens on a port picked by the OS. 
+ let client_addr = ipv4_resolve("localhost:0").unwrap(); + + // Use the server address that was provided in the command-line arguments, or use the default + // if one was not provided. + let server_addr = opts + .address + .unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap()); + + // Bind a client-side socket to the requested address and port. The OS will allocate a port and + // the actual port used will be available in `allocated_client_addr`. + // + // `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it + // to receive messages. + // + // bind_udp_bytes is an async function, so we need to await it. + let (outbound, inbound, allocated_client_addr) = bind_udp_bytes(client_addr).await; + println!( + "Client is live! Listening on {:?} and talking to server on {:?}", + allocated_client_addr, server_addr + ); + + // The skeletal hydroflow spec for a client. let mut flow = hydroflow_syntax! { - // Define shared inbound and outbound channels - inbound_chan = source_stream_serde(inbound) -> map(|udp_msg| udp_msg.unwrap()) /* -> tee() */; // commented out since we only use this once in the client template - outbound_chan = // union() -> // commented out since we only use this once in the client template - dest_sink_serde(outbound); + // Whenever a serialized message is received by the application from a particular address, + // a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream. + // + // `source_stream_serde` deserializes the payload into a + // (deserialized_payload, address_of_sender) pair. + inbound_chan = source_stream_serde(inbound) + -> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic. + + // Mirrors the inbound process on the outbound side. + // `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message` + // using `serde`, converting it to a (serialized_payload, address_of_receiver) pair. 
+ // `outbound` transmits the serialized_payload to the address. + outbound_chan = dest_sink_serde(outbound); - // Print all messages for debugging purposes - inbound_chan[1] + // Print all messages for debugging purposes. + inbound_chan -> for_each(|(m, a): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a)); - // take stdin and send to server as an Message::Echo - source_stdin() -> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) ) - -> outbound_chan; + // Consume input from stdin and send to server as Message::Echo + source_stdin() // A stream of lines from stdin. + -> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) ) + -> outbound_chan; // Send it to the server }; + // If a graph was requested to be printed, print it. if let Some(graph) = opts.graph { - print_graph(&flow, graph); + print_graph(&flow, graph, opts.write_config); } + // Run the client. This is an async function, so we need to await it. flow.run_async().await; } diff --git a/src/helpers.rs b/src/helpers.rs index 9b7aa73..5826487 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,16 +1,9 @@ -use crate::GraphType; +use hydroflow::lang::graph::{WriteConfig, WriteGraphType}; use hydroflow::scheduled::graph::Hydroflow; -pub fn print_graph(flow: &Hydroflow, graph: GraphType) { - let meta_graph = flow +pub fn print_graph(flow: &Hydroflow, graph: WriteGraphType, write_config: Option<WriteConfig>) { + let serde_graph = flow .meta_graph() .expect("No graph found, maybe failed to parse."); - match graph { - GraphType::Mermaid => { - println!("{}", meta_graph.to_mermaid(&Default::default())); - } - GraphType::Dot => { - println!("{}", meta_graph.to_dot(&Default::default())) - } - } + serde_graph.open_graph(graph, write_config).unwrap(); } diff --git a/src/main.rs b/src/main.rs index 6229837..17328ce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,8 @@ use clap::{Parser, ValueEnum}; use client::run_client; +use 
hydroflow::lang::graph::{WriteConfig, WriteGraphType}; use hydroflow::tokio; -use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use hydroflow::util::ipv4_resolve; use server::run_server; use std::net::SocketAddr; @@ -10,51 +11,62 @@ mod helpers; mod protocol; mod server; -#[derive(Clone, ValueEnum, Debug)] -enum Role { - Client, - Server, -} - -#[derive(Clone, ValueEnum, Debug)] -pub enum GraphType { - Mermaid, - Dot, -} - +/// A simple echo server & client generated using the Hydroflow template. The lines starting with +/// `///` contain the message that appears when you run the compiled binary with the '--help' +/// arguments, so feel free to change it to whatever makes sense for your application. +/// +/// See https://docs.rs/clap/latest/clap/ for more information. #[derive(Parser, Debug)] struct Opts { - #[clap(value_enum, long)] - role: Role, - // #[clap(long)] - #[clap(long, value_parser = ipv4_resolve)] - addr: Option<SocketAddr>, - // #[clap(long)] + // The `Opts` structure contains the command line arguments accepted by the application and can + // be modified to suit your requirements. Refer to the clap crate documentation for more + // information. + /// The role this application process should assume. The example in the template provides two + /// roles: server and client. The server echoes whatever message the clients send to it. + #[clap(value_enum, long)] // value_enum => parse as enum. long => "--role" instead of "-r". + role: Role, // This is a mandatory argument. + + /// The server's network address. The server listens on this address. The client sends messages + /// to this address. #[clap(long, value_parser = ipv4_resolve)] - server_addr: Option<SocketAddr>, - #[clap(value_enum, long)] - graph: Option<GraphType>, + // value_parser => parse "ip:port" using ipv4_resolve + address: Option<SocketAddr>, // Since this is an Option, it is an optional argument. 
+ + /// If specified, a graph representation of the Hydroflow flow used by the program will be + /// printed to the console in the specified format. This parameter can be removed if your + /// application doesn't need this functionality. + #[clap(long)] + graph: Option<WriteGraphType>, + + #[clap(flatten)] + write_config: Option<WriteConfig>, } #[hydroflow::main] +/// This is the main entry-point for both `Client` and `Server`. async fn main() { - // parse command line arguments + // Parse command line arguments let opts = Opts::parse(); - // if no addr was provided, we ask the OS to assign a local port by passing in "localhost:0" - let addr = opts - .addr - .unwrap_or_else(|| ipv4_resolve("localhost:0").unwrap()); - - // allocate `outbound` sink and `inbound` stream - let (outbound, inbound, addr) = bind_udp_bytes(addr).await; - println!("Listening on {:?}", addr); + // Run the server or the client based on the role provided in the command-line arguments. match opts.role { Role::Server => { - run_server(outbound, inbound, opts).await; + run_server(opts).await; } Role::Client => { - run_client(outbound, inbound, opts).await; + run_client(opts).await; } } } + +/// A running application can assume one of these roles. The launched application process assumes +/// one of these roles, based on the `--role` parameter passed in as a command line argument. +#[derive(Clone, ValueEnum, Debug)] +enum Role { + Client, + Server, +} + +/// The default server address & port on which the server listens for incoming messages. Clients +/// send messages to this address & port. +pub const DEFAULT_SERVER_ADDRESS: &str = "localhost:54321"; diff --git a/src/protocol.rs b/src/protocol.rs index d482f88..7d3052c 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,9 +1,18 @@ use chrono::prelude::*; use serde::{Deserialize, Serialize}; +/// Contains all the messages that can be exchanged between application instances. The `Serialize` +/// and `Deserialize` traits allow for serialization by the `serde` crate. 
#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] pub enum Message { - Echo { payload: String, ts: DateTime<Utc> }, + /// Echo message contains a string payload, and a timestamp at which the message was + /// constructed. + Echo { + payload: String, + ts: DateTime<Utc>, + }, + + /// Heartbeat messages carry no information other than their type. Heartbeat, HeartbeatAck, } diff --git a/src/server.rs b/src/server.rs index 2cadeed..7624a61 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,39 +1,71 @@ use crate::helpers::print_graph; use crate::protocol::Message; +use crate::DEFAULT_SERVER_ADDRESS; use chrono::prelude::*; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; -use hydroflow::util::{UdpSink, UdpStream}; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; use std::net::SocketAddr; -pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: crate::Opts) { - println!("Server live!"); +/// Runs the server. The server is a long-running process that listens for messages and echoes +/// them back to the client. +pub(crate) async fn run_server(opts: crate::Opts) { + // If a server address & port are provided as command-line inputs, use those, else use the + // default. + let server_address = opts + .address + .unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap()); + println!("Starting server on {:?}", server_address); + + // Bind a server-side socket to the requested address and port. If "0" was provided as the port, the + // OS will allocate a port and the actual port used will be available in `actual_server_addr`. + // + // `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it + // to receive messages. + // + // This is an async function, so we need to await it. + let (outbound, inbound, actual_server_addr) = bind_udp_bytes(server_address).await; + + println!("Server is live! Listening on {:?}", actual_server_addr); + + // The skeletal hydroflow spec for a server. 
let mut flow: Hydroflow = hydroflow_syntax! { - // Define shared inbound and outbound channels - inbound_chan = source_stream_serde(inbound) -> map(|udp_msg| udp_msg.unwrap()) -> tee(); - outbound_chan = union() -> dest_sink_serde(outbound); - // Print all messages for debugging purposes - inbound_chan[1] - -> for_each(|(m, a): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a)); + // Whenever a serialized message is received by the application from a particular address, + // a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream. + // + // `source_stream_serde` deserializes the payload into a + // (deserialized_payload, address_of_sender) pair. + inbound_chan = source_stream_serde(inbound) // `source_stream_serde` deserializes the payload + -> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic. + + // Mirrors the inbound process on the outbound side. + // `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message` + // using `serde`, converting it to a (serialized_payload, address_of_receiver) pair. + // `outbound` transmits the serialized_payload to the address. + outbound_chan = union() -> dest_sink_serde(outbound); // Demux and destructure the inbound messages into separate streams - inbound_demuxed = inbound_chan[0] - -> demux(|(msg, addr), var_args!(echo, heartbeat, errs)| - match msg { + inbound_demuxed = inbound_chan + -> inspect(|(m, a): &(Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a)) // For debugging purposes. + -> demux(|(msg, addr), var_args!(echo, heartbeat, errs)| + match msg { Message::Echo {payload, ..} => echo.give((payload, addr)), Message::Heartbeat => heartbeat.give(addr), _ => errs.give((msg, addr)), } ); - // Echo back the Echo messages with updated timestamp + // Echo a response back to the sender of the echo request. 
inbound_demuxed[echo] - -> map(|(payload, addr)| (Message::Echo { payload, ts: Utc::now() }, addr) ) -> [0]outbound_chan; + -> map(|(payload, sender_addr)| (Message::Echo { payload, ts: Utc::now() }, sender_addr) ) + -> [0]outbound_chan; // Respond to Heartbeat messages - inbound_demuxed[heartbeat] -> map(|addr| (Message::HeartbeatAck, addr)) -> [1]outbound_chan; + inbound_demuxed[heartbeat] + -> map(|addr| (Message::HeartbeatAck, addr)) + -> [1]outbound_chan; // Print unexpected messages inbound_demuxed[errs] @@ -41,10 +73,11 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: crat }; + // If a graph was requested to be printed, print it. if let Some(graph) = opts.graph { - print_graph(&flow, graph); + print_graph(&flow, graph, opts.write_config); } - // run the server + // Run the server. This is an async function, so we need to await it. flow.run_async().await; }