Skip to content

Commit

Permalink
docs(hydroflow_plus): add docs for using clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Jan 2, 2024
1 parent 2addaed commit 98da8bf
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 12 deletions.
140 changes: 140 additions & 0 deletions docs/docs/hydroflow_plus/clusters.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
---
sidebar_position: 4
---

# Node Clusters
So far, we have looked at distributed systems where there is a single node running each piece of the compute graph -- **compute parallelism** (like pipelining). However, we can also use Hydroflow+ to run the same computation on multiple nodes -- achieving **data parallelism** (like replication and partitioning). This is done by creating a **cluster** of nodes that all run the same subgraph.

## Creating Clusters
Just like we use `NodeBuilder` to create nodes, we use `ClusterBuilder` to create clusters. We can then use the `graph.cluster(builder)` method to instantiate a cluster in our graph. Let's create a simple application where a leader node broadcasts data to a cluster of workers.

We start with the standard architecture, with a flow graph and a runtime entrypoint, but now take a cluster builder in addition to a node builder.

:::tip

If you have been following along with the Hydroflow+ template, you'll now need to declare a new module for this example. Create a new file at `flow/src/broadcast.rs` and add the following to `flow/src/lib.rs`:

```rust title="flow/src/lib.rs"
pub mod broadcast;
```

:::


```rust title="flow/src/broadcast.rs"
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;

pub fn broadcast<'a, D: Deploy<'a>>(
graph: &'a GraphBuilder<'a, D>,
node_builder: &impl NodeBuilder<'a, D>,
cluster_builder: &impl ClusterBuilder<'a, D>
) {
let leader = graph.node(node_builder);
let workers = graph.cluster(cluster_builder);
}
```

## Broadcasting Data
When sending data between individual nodes, we used the `send_bincode` operator. When sending data from a node to a cluster, we can use the `broadcast_bincode` operator instead.

```rust
let data = leader.source_iter(q!(0..10));
data
.broadcast_bincode(&workers)
.for_each(q!(|n| println!("{}", n)));
```

The `Stream` returned by `broadcast_bincode` represents the data received on _each_ node in the cluster. Because all nodes in a cluster run the exact same computation, we can then use the `for_each` operator directly on that stream to print the data on each node.

## Deploying Graphs with Clusters
To deploy this application, we must set up the runtime entrypoint and the Hydro Deploy configuration. The entrypoint looks similar to before, but now uses the CLI data to instantiate the cluster as well.

```rust title="flow/src/broadcast.rs"
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn broadcast_runtime<'a>(
graph: &'a GraphBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
broadcast(graph, &cli, &cli);
graph.build(q!(cli.meta.subgraph_id))
}
```

Our binary (`src/bin/broadcast.rs`) looks similar to before:

```rust title="flow/src/bin/broadcast.rs"
#[tokio::main]
async fn main() {
let ports = hydroflow_plus::util::cli::init().await;

hydroflow_plus::util::cli::launch_flow(
flow::broadcast::broadcast_runtime!(&ports)
).await;
}
```

Finally, our deployment script (`examples/broadcast.rs`) instantiates multiple services for the leader node and the workers. Because we are sharing the deployment across multiple builders, we wrap it in a `RefCell`. Since this script defines the physical deployment, we explicitly instantiate multiple services for the cluster builder, returning a `Vec` of services. We also set a display name for each service so that we can tell them apart in the logs.

```rust title="flow/examples/broadcast.rs"
use std::cell::RefCell;

use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, CLIDeployClusterBuilder};

#[tokio::main]
async fn main() {
let deployment = RefCell::new(Deployment::new());
let localhost = deployment.borrow_mut().Localhost();

let builder = hydroflow_plus::GraphBuilder::new();
flow::broadcast::broadcast(
&builder,
&CLIDeployNodeBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("broadcast")
.profile(profile)
.display_name("leader"),
)
}),
&CLIDeployClusterBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
(0..2)
.map(|idx| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("broadcast")
.profile(profile)
.display_name(format!("worker/{}", idx)),
)
})
.collect()
}),
);

let mut deployment = deployment.into_inner();

deployment.deploy().await.unwrap();

deployment.start().await.unwrap();

tokio::signal::ctrl_c().await.unwrap()
}
```

If we run this script, we should see the following output:

```bash
$ cargo run -p flow --example broadcast
[worker/0] 0
[worker/1] 0
[worker/0] 1
[worker/1] 1
...
```
14 changes: 7 additions & 7 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ sidebar_position: 3
# Distributed Hydroflow+
Continuing from our previous example, we will now look at how to extend our program to run on multiple nodes. Recall that our previous flow graph looked like this:

```rust
```rust title="flow/src/first_ten.rs"
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;
Expand All @@ -22,7 +22,7 @@ pub fn first_ten<'a, D: LocalDeploy<'a>>(

## The Flow Graph
Let's extend this example to print the numbers on a separate node. First, we need to specify that our flow graph will involve the network. We do this by replacing the `LocalDeploy<'a>` trait bound with the general `Deploy<'a>`. Then, we can use the `node_builder` to create a second node:
```rust
```rust title="flow/src/first_ten_distributed.rs"
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;
Expand All @@ -48,7 +48,7 @@ numbers
## The Runtime
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with `build_single`. Instead, we must take the subgraph ID as a runtime parameter (`subgraph_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it.

```rust
```rust title="flow/src/first_ten_distributed.rs"
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

Expand All @@ -64,7 +64,7 @@ pub fn first_ten_distributed_runtime<'a>(

The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates the CLI and reads the node ID from the command line arguments:

```rust
```rust title="flow/src/bin/first_ten_distributed.rs"
#[tokio::main]
async fn main() {
hydroflow_plus::util::cli::launch(
Expand All @@ -76,7 +76,7 @@ async fn main() {
## The Deployment
Finally, we need to deploy our dataflow with the appropriate network topology. We achieve this by using [Hydro Deploy](../deploy/index.md). Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file `examples/first_ten_distributed.rs` with the following contents:

```rust
```rust title="flow/examples/first_ten_distributed.rs"
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::CLIDeployNodeBuilder;

Expand All @@ -86,7 +86,7 @@ async fn main() {
let localhost = deployment.Localhost();

let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
flow::first_ten::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
Expand All @@ -110,7 +110,7 @@ Most importantly, we specify a `CLIDeployNodeBuilder`, which takes a closure tha
We can then run our distributed dataflow with:

```bash
$ cargo run --example first_ten_distributed
$ cargo run -p flow --example first_ten_distributed
[service/1] 0
[service/1] 1
[service/1] 2
Expand Down
21 changes: 16 additions & 5 deletions docs/docs/hydroflow_plus/structure.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@ Hydroflow+ programs require special structure to support code generation and dis

Let's look a minimal example of a Hydroflow+ program. We'll start with a simple flow graph that prints out the first 10 natural numbers. First, we'll define the **flow graph**.

:::tip

We recommend using the Hydroflow+ template to get started with a new project. The template comes with a pre-configured build system and the following example pre-implemented.

```bash
$ cargo install cargo-generate
$ cargo generate hydro-project/hydroflow-plus-template
```

:::


## The Flow Graph
```rust
```rust title="flow/src/first_ten.rs"
use hydroflow_plus::*;
use hydroflow_plus::node::*;
use stageleft::*;
Expand Down Expand Up @@ -51,7 +62,7 @@ Next, we need to instantiate our flow graph into a runnable Rust binary. We do t

To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes the graph being built and returns a generated Hydroflow program:

```rust
```rust title="flow/src/first_ten.rs"
#[stageleft::entry]
pub fn first_ten_runtime<'a>(
graph: &'a GraphBuilder<'a, SingleGraph>
Expand All @@ -63,17 +74,17 @@ pub fn first_ten_runtime<'a>(

Because we are using a `SingleGraph` deployment, we can use the `build_single` method to generate a runnable Hydroflow program. Stageleft entries are usable as macros from other programs. In our case, we will instantiate our entrypoint from the Rust binary for our dataflow. We can create a new file `src/bin/first_ten.rs` with the following contents:

```rust
```rust title="flow/src/bin/first_ten.rs"
#[tokio::main]
async fn main() {
flow::first_ten_runtime!().run_async().await;
flow::first_ten::first_ten_runtime!().run_async().await;
}
```

We can now run this binary to see the output of our dataflow:

```bash
$ cargo run --bin first_ten
$ cargo run -p flow --bin first_ten
0
1
2
Expand Down

0 comments on commit 98da8bf

Please sign in to comment.