Skip to content

Commit

Permalink
docs(hydro_lang): add initial docs for clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Jan 28, 2025
1 parent 3288290 commit 53ab25f
Showing 1 changed file with 123 additions and 2 deletions.
125 changes: 123 additions & 2 deletions docs/docs/hydro/locations/clusters.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,129 @@ sidebar_position: 1
---

# Clusters
:::caution
When building scalable distributed systems in Hydro, you'll often need to use **clusters**, which represent groups of threads all running the _same_ piece of your program (Single-Program-Multiple-Data). They can be used to implement scale-out systems using techniques such as sharding or replication. Unlike processes, the number of threads in a cluster does not need to be static, and can be chosen during deployment.

The Hydro documentation is currently under active development! This page is a placeholder for future content.
Like when creating a process, you can pass in a type parameter to a cluster to distinguish it from other clusters. For example, you can create a cluster with a marker of `Worker` to represent a worker in a distributed system:

```rust,no_run
# use hydro_lang::*;
struct Worker {}
let flow = FlowBuilder::new();
let workers: Cluster<Worker> = flow.cluster::<Worker>();
```

We can then instantiate a live collection on the cluster using the same APIs as for processes. For example, we can create a stream of integers on the worker cluster. If we launch this program, **each** member of the cluster will create a stream containing the elements 1, 2, 3, and 4:

```rust,no_run
# use hydro_lang::*;
# struct Worker {}
# let flow = FlowBuilder::new();
# let workers: Cluster<Worker> = flow.cluster::<Worker>();
let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
```

## Networking
When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a single stream of `(ID, Data)` tuples where the ID uniquely identifies which member of the cluster the data came from. For example, we can send a stream from the worker cluster to another process using the `send_bincode` method:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 1), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 1), (ClusterId::<Worker>(3), 1)
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 1)", "(ClusterId::<()>(1), 1)", "(ClusterId::<()>(2), 1)", "(ClusterId::<()>(3), 1)"]);
# }));
```

In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. For example, we can send a stream from a process to the worker cluster using the `send_bincode` method:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (ClusterId::from_raw(x), x)))
.send_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 0), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 2), (ClusterId::<Worker>(3), 3)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 0)", "(ClusterId::<()>(1), 1)", "(ClusterId::<()>(2), 2)", "(ClusterId::<()>(3), 3)"]);
# }));
```

## Broadcasting and Membership Lists
A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast_bincode`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 123), (ClusterId::<Worker>(1), 123), (ClusterId::<Worker>(2), 123), (ClusterId::<Worker>(3), 123)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 123)", "(ClusterId::<()>(1), 123)", "(ClusterId::<()>(2), 123)", "(ClusterId::<()>(3), 123)"]);
# }));
```

Under the hood, the `broadcast_bincode` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `members` method on a cluster to get a value that can be used inside `q!(...)` blocks:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
# // do nothing on each worker
# workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
let cluster_members = workers.members();
let members_stream: Stream<ClusterId<_>, Process<_>, _> = p1
.source_iter(q!(cluster_members /* : &[ClusterId<Worker>] */))
.cloned();
members_stream.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// ClusterId::<Worker>(0), ClusterId::<Worker>(1), ClusterId::<Worker>(2), ClusterId::<Worker>(3)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["ClusterId::<()>(0)", "ClusterId::<()>(1)", "ClusterId::<()>(2)", "ClusterId::<()>(3)"]);
# }));
```

## Self-Identification
:::warning

The Hydro documentation is currently under active development! This is a placeholder for future content.

:::

0 comments on commit 53ab25f

Please sign in to comment.