From 98da8bf8466c77a27e90437ff8f415629229ec00 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 2 Jan 2024 13:10:03 -0800 Subject: [PATCH] docs(hydroflow_plus): add docs for using clusters --- docs/docs/hydroflow_plus/clusters.mdx | 140 +++++++++++++++++++++++ docs/docs/hydroflow_plus/distributed.mdx | 14 +-- docs/docs/hydroflow_plus/structure.mdx | 21 +++- 3 files changed, 163 insertions(+), 12 deletions(-) create mode 100644 docs/docs/hydroflow_plus/clusters.mdx diff --git a/docs/docs/hydroflow_plus/clusters.mdx b/docs/docs/hydroflow_plus/clusters.mdx new file mode 100644 index 000000000000..37caeacea207 --- /dev/null +++ b/docs/docs/hydroflow_plus/clusters.mdx @@ -0,0 +1,140 @@ +--- +sidebar_position: 4 +--- + +# Node Clusters +So far, we have looked at distributed systems where there is a single node running each piece of the compute graph -- **compute parallelism** (like pipelining). However, we can also use Hydroflow+ to run the same computation on multiple nodes -- achieving **data parallelism** (like replication and partitioning). This is done by creating a **cluster** of nodes that all run the same subgraph. + +## Creating Clusters +Just like we use `NodeBuilder` to create nodes, we use `ClusterBuilder` to create clusters. We can then use the `graph.cluster(builder)` method to instantiate a cluster in our graph. Let's create a simple application where a leader node broadcasts data to a cluster of workers. + +We start with the standard architecture, with a flow graph and a runtime entrypoint, but now take a cluster builder in addition to a node builder. + +:::tip + +If you have been following along with the Hydroflow+ template, you'll now need to declare a new module for this example. Create a new file at `flow/src/broadcast.rs` and add the following to `flow/src/lib.rs`: + +```rust title="flow/src/lib.rs" +pub mod broadcast; +``` + +::: + + +```rust title="flow/src/broadcast.rs" +use hydroflow_plus::*; +use hydroflow_plus::node::*; +use stageleft::*; + +pub fn broadcast<'a, D: Deploy<'a>>( + graph: &'a GraphBuilder<'a, D>, + node_builder: &impl NodeBuilder<'a, D>, + cluster_builder: &impl ClusterBuilder<'a, D> +) { + let leader = graph.node(node_builder); + let workers = graph.cluster(cluster_builder); +} +``` + +## Broadcasting Data +When sending data between individual nodes, we used the `send_bincode` operator. When sending data from a node to a cluster, we can use the `broadcast_bincode` operator instead. + +```rust +let data = leader.source_iter(q!(0..10)); +data + .broadcast_bincode(&workers) + .for_each(q!(|n| println!("{}", n))); +``` + +The `Stream` returned by `broadcast_bincode` represents the data received on _each_ node in the cluster. Because all nodes in a cluster run the exact same computation, we can then use the `for_each` operator directly on that stream to print the data on each node. + +## Deploying Graphs with Clusters +To deploy this application, we must set up the runtime entrypoint and the Hydro Deploy configuration. The entrypoint looks similar to before, but now uses the CLI data to instantiate the cluster as well. + +```rust title="flow/src/broadcast.rs" +use hydroflow_plus::util::cli::HydroCLI; +use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; + +#[stageleft::entry] +pub fn broadcast_runtime<'a>( + graph: &'a GraphBuilder<'a, CLIRuntime>, + cli: RuntimeData<&'a HydroCLI>, +) -> impl Quoted<'a, Hydroflow<'a>> { + broadcast(graph, &cli, &cli); + graph.build(q!(cli.meta.subgraph_id)) +} +``` + +Our binary (`src/bin/broadcast.rs`) looks similar to before: + +```rust title="flow/src/bin/broadcast.rs" +#[tokio::main] +async fn main() { + let ports = hydroflow_plus::util::cli::init().await; + + hydroflow_plus::util::cli::launch_flow( + flow::broadcast::broadcast_runtime!(&ports) + ).await; +} +``` + +Finally, our deployment script (`examples/broadcast.rs`) instantiates multiple services for the leader node and the workers. Because we are sharing the deployment across multiple builders, we wrap it in a `RefCell`. Since this script defines the physical deployment, we explicitly instantiate multiple services for the cluster builder, returning a `Vec` of services. We also set a display name for each service so that we can tell them apart in the logs. + +```rust title="flow/examples/broadcast.rs" +use std::cell::RefCell; + +use hydro_deploy::{Deployment, HydroflowCrate}; +use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, CLIDeployClusterBuilder}; + +#[tokio::main] +async fn main() { + let deployment = RefCell::new(Deployment::new()); + let localhost = deployment.borrow_mut().Localhost(); + + let builder = hydroflow_plus::GraphBuilder::new(); + flow::broadcast::broadcast( + &builder, + &CLIDeployNodeBuilder::new(|| { + let mut deployment = deployment.borrow_mut(); + deployment.add_service( + HydroflowCrate::new(".", localhost.clone()) + .bin("broadcast") + .profile(profile) + .display_name("leader"), + ) + }), + &CLIDeployClusterBuilder::new(|| { + let mut deployment = deployment.borrow_mut(); + (0..2) + .map(|idx| { + deployment.add_service( + HydroflowCrate::new(".", localhost.clone()) + .bin("broadcast") + .profile(profile) + .display_name(format!("worker/{}", idx)), + ) + }) + .collect() + }), + ); + + let mut deployment = deployment.into_inner(); + + deployment.deploy().await.unwrap(); + + deployment.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap() +} +``` + +If we run this script, we should see the following output: + +```bash +$ cargo run -p flow --example broadcast +[worker/0] 0 +[worker/1] 0 +[worker/0] 1 +[worker/1] 1 +... +``` diff --git a/docs/docs/hydroflow_plus/distributed.mdx b/docs/docs/hydroflow_plus/distributed.mdx index 82ba347716d3..5d980c017939 100644 --- a/docs/docs/hydroflow_plus/distributed.mdx +++ b/docs/docs/hydroflow_plus/distributed.mdx @@ -5,7 +5,7 @@ sidebar_position: 3 # Distributed Hydroflow+ Continuing from our previous example, we will now look at how to extend our program to run on multiple nodes. Recall that our previous flow graph looked like this: -```rust +```rust title="flow/src/first_ten.rs" use hydroflow_plus::*; use hydroflow_plus::node::*; use stageleft::*; @@ -22,7 +22,7 @@ pub fn first_ten<'a, D: LocalDeploy<'a>>( ## The Flow Graph Let's extend this example to print the numbers on a separate node. First, we need to specify that our flow graph will involve the network. We do this by replacing the `LocalDeploy<'a>` trait bound with the general `Deploy<'a>`. Then, we can use the `node_builder` to create a second node: -```rust +```rust title="flow/src/first_ten_distributed.rs" use hydroflow_plus::*; use hydroflow_plus::node::*; use stageleft::*; @@ -48,7 +48,7 @@ numbers ## The Runtime Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it. -```rust +```rust title="flow/src/first_ten_distributed.rs" use hydroflow_plus::util::cli::HydroCLI; use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; @@ -64,7 +64,7 @@ pub fn first_ten_distributed_runtime<'a>( The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates the CLI and reads the node ID from the command line arguments: -```rust +```rust title="flow/src/bin/first_ten_distributed.rs" #[tokio::main] async fn main() { hydroflow_plus::util::cli::launch( @@ -76,7 +76,7 @@ async fn main() { ## The Deployment Finally, we need to deploy our dataflow with the appropriate network topology. We achieve this by using [Hydro Deploy](../deploy/index.md). Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file `examples/first_ten_distributed.rs` with the following contents: -```rust +```rust title="flow/examples/first_ten_distributed.rs" use hydro_deploy::{Deployment, HydroflowCrate}; use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; @@ -86,7 +86,7 @@ async fn main() { let localhost = deployment.Localhost(); let builder = hydroflow_plus::GraphBuilder::new(); - hydroflow_plus_test::first_ten::first_ten_distributed( + flow::first_ten::first_ten_distributed( &builder, &CLIDeployNodeBuilder::new(|| { deployment.add_service( @@ -110,7 +110,7 @@ Most importantly, we specify a `CLIDeployNodeBuilder`, which takes a closure tha We can then run our distributed dataflow with: ```bash -$ cargo run --example first_ten_distributed +$ cargo run -p flow --example first_ten_distributed [service/1] 0 [service/1] 1 [service/1] 2 diff --git a/docs/docs/hydroflow_plus/structure.mdx b/docs/docs/hydroflow_plus/structure.mdx index 8f09ccb5531b..fca57d2fdf86 100644 --- a/docs/docs/hydroflow_plus/structure.mdx +++ b/docs/docs/hydroflow_plus/structure.mdx @@ -10,9 +10,20 @@ Hydroflow+ programs require special structure to support code generation and dis Let's look a minimal example of a Hydroflow+ program. We'll start with a simple flow graph that prints out the first 10 natural numbers. First, we'll define the **flow graph**. +:::tip + +We recommend using the Hydroflow+ template to get started with a new project. The template comes with a pre-configured build system and the following example pre-implemented. + +```bash +$ cargo install cargo-generate +$ cargo generate hydro-project/hydroflow-plus-template +``` + +::: + ## The Flow Graph -```rust +```rust title="flow/src/first_ten.rs" use hydroflow_plus::*; use hydroflow_plus::node::*; use stageleft::*; @@ -51,7 +62,7 @@ Next, we need to instantiate our flow graph into a runnable Rust binary. We do t To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes the graph being built and returns a generated Hydroflow program: -```rust +```rust title="flow/src/first_ten.rs" #[stageleft::entry] pub fn first_ten_runtime<'a>( graph: &'a GraphBuilder<'a, SingleGraph> @@ -63,17 +74,17 @@ pub fn first_ten_runtime<'a>( Because we are using a `SingleGraph` deployment, we can use the `build_single` method to generate a runnable Hydroflow program. Stageleft entries are usable as macros from other programs. In our case, we will instantiate our entrypoint from the Rust binary for our dataflow. We can create a new file `src/bin/first_ten.rs` with the following contents: -```rust +```rust title="flow/src/bin/first_ten.rs" #[tokio::main] async fn main() { - flow::first_ten_runtime!().run_async().await; + flow::first_ten::first_ten_runtime!().run_async().await; } ``` We can now run this binary to see the output of our dataflow: ```bash -$ cargo run --bin first_ten +$ cargo run -p flow --bin first_ten 0 1 2