This repository has been archived by the owner on Jul 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Some improvements to the hydroflow template (#14)
1. Default server address, so you don't have to copy an address around when starting multiple clients to test a full star network configuration. 2. Help messages for all the arguments. 3. More hand-holding documentation, mostly condensed from the [first networking chapter](https://hydro.run/docs/hydroflow/quickstart/example_7_echo_server) 4. Updated README.md to provide more runnable commands.
- Loading branch information
1 parent
ea17247
commit 5d68cd7
Showing
7 changed files
with
216 additions
and
152 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,41 +1,69 @@ | ||
use crate::helpers::print_graph; | ||
use crate::protocol::Message; | ||
use crate::Opts; | ||
use crate::{Opts, DEFAULT_SERVER_ADDRESS}; | ||
use chrono::prelude::*; | ||
use hydroflow::hydroflow_syntax; | ||
use hydroflow::util::{UdpSink, UdpStream}; | ||
use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; | ||
use std::net::SocketAddr; | ||
|
||
pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) { | ||
// server_addr is required for client | ||
let server_addr = match opts.server_addr { | ||
Some(addr) => { | ||
println!("Connecting to server at {:?}", addr); | ||
addr | ||
} | ||
None => panic!("Client requires a server address"), | ||
}; | ||
println!("Client live!"); | ||
/// Runs the client. The client is a long-running process that reads stdin, and sends messages that | ||
/// it receives to the server. The client also prints any messages it receives to stdout. | ||
pub(crate) async fn run_client(opts: Opts) { | ||
// Client listens on a port picked by the OS. | ||
let client_addr = ipv4_resolve("localhost:0").unwrap(); | ||
|
||
// Use the server address that was provided in the command-line arguments, or use the default | ||
// if one was not provided. | ||
let server_addr = opts | ||
.address | ||
.unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap()); | ||
|
||
// Bind a client-side socket to the requested address and port. The OS will allocate a port and | ||
// the actual port used will be available in `actual_client_addr`. | ||
// | ||
// `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it | ||
// to receive messages. | ||
// | ||
// bind_udp_bytes is an async function, so we need to await it. | ||
let (outbound, inbound, allocated_client_addr) = bind_udp_bytes(client_addr).await; | ||
|
||
println!( | ||
"Client is live! Listening on {:?} and talking to server on {:?}", | ||
allocated_client_addr, server_addr | ||
); | ||
|
||
// The skeletal hydroflow spec for a client. | ||
let mut flow = hydroflow_syntax! { | ||
// Define shared inbound and outbound channels | ||
inbound_chan = source_stream_serde(inbound) -> map(|udp_msg| udp_msg.unwrap()) /* -> tee() */; // commented out since we only use this once in the client template | ||
|
||
outbound_chan = // union() -> // commented out since we only use this once in the client template | ||
dest_sink_serde(outbound); | ||
// Whenever a serialized message is received by the application from a particular address, | ||
// a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream. | ||
// | ||
// `source_stream_serde` deserializes the payload into a | ||
// (deserialized_payload, address_of_sender) pair. | ||
inbound_chan = source_stream_serde(inbound) | ||
-> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic. | ||
|
||
// Mirrors the inbound process on the outbound side. | ||
// `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message` | ||
// using `serde`, converting it to a (serialized_payload, address_of_receiver) pair. | ||
// `outbound` transmits the serialized_payload to the address. | ||
outbound_chan = dest_sink_serde(outbound); | ||
|
||
// Print all messages for debugging purposes | ||
inbound_chan[1] | ||
// Print all messages for debugging purposes. | ||
inbound_chan | ||
-> for_each(|(m, a): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a)); | ||
|
||
// take stdin and send to server as an Message::Echo | ||
source_stdin() -> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) ) | ||
-> outbound_chan; | ||
// Consume input from stdin and send to server as Message::Echo | ||
source_stdin() // A stream of lines from stdin. | ||
-> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) ) | ||
-> outbound_chan; // Send it to the server | ||
}; | ||
|
||
// If a graph was requested to be printed, print it. | ||
if let Some(graph) = opts.graph { | ||
print_graph(&flow, graph); | ||
print_graph(&flow, graph, opts.write_config); | ||
} | ||
|
||
// Run the client. This is an async function, so we need to await it. | ||
flow.run_async().await; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,9 @@ | ||
use crate::GraphType; | ||
use hydroflow::lang::graph::{WriteConfig, WriteGraphType}; | ||
use hydroflow::scheduled::graph::Hydroflow; | ||
|
||
pub fn print_graph(flow: &Hydroflow, graph: GraphType) { | ||
let meta_graph = flow | ||
pub fn print_graph(flow: &Hydroflow, graph: WriteGraphType, write_config: Option<WriteConfig>) { | ||
let serde_graph = flow | ||
.meta_graph() | ||
.expect("No graph found, maybe failed to parse."); | ||
match graph { | ||
GraphType::Mermaid => { | ||
println!("{}", meta_graph.to_mermaid(&Default::default())); | ||
} | ||
GraphType::Dot => { | ||
println!("{}", meta_graph.to_dot(&Default::default())) | ||
} | ||
} | ||
serde_graph.open_graph(graph, write_config).unwrap(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,18 @@ | ||
use chrono::prelude::*; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
/// Contains all the messages that can be exchanged between application instances. The `Serialize` | ||
/// and `Deserialize` traits allow for serialization by the `serde` crate. | ||
#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] | ||
pub enum Message { | ||
Echo { payload: String, ts: DateTime<Utc> }, | ||
/// Echo message contains a string payload, and a timestamp at which the message was | ||
/// constructed. | ||
Echo { | ||
payload: String, | ||
ts: DateTime<Utc>, | ||
}, | ||
|
||
/// Heartbeat messages carry no information other than their type. | ||
Heartbeat, | ||
HeartbeatAck, | ||
} |
Oops, something went wrong.