docs(hydro_lang): add initial docs for clusters
shadaj committed Jan 29, 2025
1 parent e043ced commit a5acf02
Showing 3 changed files with 204 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/docs/hydro/live-collections/determinism.md
@@ -3,7 +3,7 @@ sidebar_position: 1
---

# Eventual Determinism
Most programs are strong guarantees on **determinism**, the property that when provided the same inputs, the outputs of the program are always the same. Even when the inputs and outputs are live collections, we can focus on the _eventual_ state of the collection (as if we froze the input and waited until the output stops changing).
Most programs have strong guarantees on **determinism**, the property that when provided the same inputs, the outputs of the program are always the same. Even when the inputs and outputs are live collections, we can focus on the _eventual_ state of the collection (as if we froze the input and waited until the output stops changing).

:::info

188 changes: 186 additions & 2 deletions docs/docs/hydro/locations/clusters.md
@@ -3,8 +3,192 @@ sidebar_position: 1
---

# Clusters
:::caution
When building scalable distributed systems in Hydro, you'll often need to use **clusters**, which represent groups of threads all running the _same_ piece of your program (Single-Program-Multiple-Data). They can be used to implement scale-out systems using techniques such as sharding or replication. Unlike processes, the number of threads in a cluster does not need to be static, and can be chosen during deployment.

The Hydro documentation is currently under active development! This page is a placeholder for future content.
As with processes, you can pass a type parameter when creating a cluster to distinguish it from other clusters. For example, you can create a cluster with a marker of `Worker` to represent the workers in a distributed system:

```rust,no_run
# use hydro_lang::*;
struct Worker {}
let flow = FlowBuilder::new();
let workers: Cluster<Worker> = flow.cluster::<Worker>();
```

We can then instantiate a live collection on the cluster using the same APIs as for processes. For example, we can create a stream of integers on the worker cluster. If we launch this program, **each** member of the cluster will create a stream containing the elements 1, 2, 3, and 4:

```rust,no_run
# use hydro_lang::*;
# struct Worker {}
# let flow = FlowBuilder::new();
# let workers: Cluster<Worker> = flow.cluster::<Worker>();
let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
```

## Networking
When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a single stream of `(ID, Data)` tuples where the ID uniquely identifies which member of the cluster the data came from. For example, we can send a stream from the worker cluster to another process using the `send_bincode` method:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 1), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 1), (ClusterId::<Worker>(3), 1)
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 1)", "(ClusterId::<()>(1), 1)", "(ClusterId::<()>(2), 1)", "(ClusterId::<()>(3), 1)"]);
# }));
```

:::tip

If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_interleaved` method instead, which will drop the IDs at the receiver:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode_interleaved(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["1", "1", "1", "1"]);
# }));
```

:::
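
To make the receive-side semantics concrete, here is a minimal sketch in plain Rust using only the standard library. It is a model of the behavior described above, not the Hydro API: the function names `gather_tagged` and `gather_interleaved` are hypothetical, member IDs are modeled as plain `u32` values, and cluster members are modeled as threads sharing a channel.

```rust
use std::sync::mpsc;
use std::thread;

/// Plain-Rust model (not the Hydro API): each of `members` threads sends its
/// local `items`, and the receiver sees `(member_id, item)` pairs.
fn gather_tagged(members: u32, items: Vec<i32>) -> Vec<(u32, i32)> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..members)
        .map(|id| {
            let tx = tx.clone();
            let items = items.clone();
            thread::spawn(move || {
                for item in items {
                    // Tag every element with the ID of the member sending it.
                    tx.send((id, item)).unwrap();
                }
            })
        })
        .collect();
    // Drop the original sender so the channel closes once all members finish.
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }
    let mut received: Vec<(u32, i32)> = rx.iter().collect();
    // Arrival order across members is not deterministic, so sort for display.
    received.sort();
    received
}

/// Model of the interleaved variant: same data, with the sender IDs dropped.
fn gather_interleaved(members: u32, items: Vec<i32>) -> Vec<i32> {
    gather_tagged(members, items)
        .into_iter()
        .map(|(_, item)| item)
        .collect()
}
```

With four members each holding `vec![1]`, `gather_tagged(4, vec![1])` yields `[(0, 1), (1, 1), (2, 1), (3, 1)]` and `gather_interleaved(4, vec![1])` yields `[1, 1, 1, 1]`, matching the comments in the two examples above.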

In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. For example, we can send a stream from a process to the worker cluster using the `send_bincode` method:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
    .map(q!(|x| (ClusterId::from_raw(x), x)))
    .send_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 0), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 2), (ClusterId::<Worker>(3), 3)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 0)", "(ClusterId::<()>(1), 1)", "(ClusterId::<()>(2), 2)", "(ClusterId::<()>(3), 3)"]);
# }));
```
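
The addressing step can also be pictured without any Hydro machinery. The sketch below is a plain-Rust model, not the Hydro API: `route_to_members` and `address_by_value` are hypothetical names, and member IDs are plain `u32` values. It shows each `(ID, Data)` tuple landing only in the inbox of the member it names.

```rust
use std::collections::BTreeMap;

/// Plain-Rust model (not the Hydro API): deliver each `(member_id, data)`
/// pair to the inbox of the member it is addressed to.
fn route_to_members(messages: Vec<(u32, u32)>) -> BTreeMap<u32, Vec<u32>> {
    let mut inboxes: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
    for (member_id, data) in messages {
        // Only the addressed member sees this element.
        inboxes.entry(member_id).or_default().push(data);
    }
    inboxes
}

/// Mirrors the `map` step in the example above: element `x` goes to member `x`.
fn address_by_value(numbers: Vec<u32>) -> Vec<(u32, u32)> {
    numbers.into_iter().map(|x| (x, x)).collect()
}
```

Routing `address_by_value(vec![0, 1, 2, 3])` gives member 0 the inbox `[0]`, member 1 the inbox `[1]`, and so on. Note that this model only creates inboxes for members that are actually addressed.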

## Broadcasting and Membership Lists
A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast_bincode`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 123), (ClusterId::<Worker>(1), 123), (ClusterId::<Worker>(2), 123), (ClusterId::<Worker>(3), 123)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 123)", "(ClusterId::<()>(1), 123)", "(ClusterId::<()>(2), 123)", "(ClusterId::<()>(3), 123)"]);
# }));
```

:::warning

The current broadcast implementation assumes a static configuration where members cannot be added or removed at runtime. This will change once Hydro supports dynamically scaled clusters.

:::

Under the hood, the `broadcast_bincode` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `members` method on a cluster to get a value that can be used inside `q!(...)` blocks:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
# // do nothing on each worker
# workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
let cluster_members = workers.members();
let members_stream: Stream<ClusterId<_>, Process<_>, _> = p1
    .source_iter(q!(cluster_members /* : &[ClusterId<Worker>] */))
    .cloned();
members_stream.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// ClusterId::<Worker>(0), ClusterId::<Worker>(1), ClusterId::<Worker>(2), ClusterId::<Worker>(3)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["ClusterId::<()>(0)", "ClusterId::<()>(1)", "ClusterId::<()>(2)", "ClusterId::<()>(3)"]);
# }));
```
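
Putting the two ideas in this section together, a broadcast over a fixed membership list amounts to pairing every data element with every member ID. The following is a minimal plain-Rust sketch of that idea, not Hydro's implementation: `broadcast` is a hypothetical helper and member IDs are modeled as `u32`.

```rust
/// Plain-Rust model (not the Hydro API): pair every data element with every
/// member ID from a fixed membership list.
fn broadcast(members: &[u32], data: &[i32]) -> Vec<(u32, i32)> {
    data.iter()
        .flat_map(|&x| members.iter().map(move |&id| (id, x)))
        .collect()
}
```

For example, `broadcast(&[0, 1, 2, 3], &[123])` produces `[(0, 123), (1, 123), (2, 123), (3, 123)]`. Because the list is read once, the model shares the static-membership assumption called out in the warning above.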

## Self-Identification
In some programs, it may be necessary for cluster members to know their own ID (for example, to construct a ballot in Paxos). In Hydro, this can be achieved by using the `CLUSTER_SELF_ID` constant, which can be used inside `q!(...)` blocks to get the current cluster member's ID:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
let workers: Cluster<()> = flow.cluster::<()>();
let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
    .filter(q!(|x| x.raw_id % 2 == 0))
    .map(q!(|x| format!("hello from {}", x.raw_id)))
    .send_bincode_interleaved(&process)
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..2 {
# results.push(stream.next().await.unwrap());
# }
# results.sort();
# assert_eq!(results, vec!["hello from 0", "hello from 2"]);
# }));
```

:::info

You can only use `CLUSTER_SELF_ID` in code that will run on a `Cluster<_>`, such as when calling `Stream::map` when that stream is on a cluster. If you try to use it in code that will run on a `Process<_>`, you'll get a compile-time error:

```compile_fail
# use hydro_lang::*;
# let flow = FlowBuilder::new();
let process: Process<()> = flow.process::<()>();
process.source_iter(q!([CLUSTER_SELF_ID]));
// error[E0277]: the trait bound `ClusterSelfId<'_>: FreeVariableWithContext<hydro_lang::Process<'_>>` is not satisfied
```

:::
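
As a rough mental model of self-identification, every member runs the same code and differs only in the ID it observes. The sketch below uses plain Rust threads rather than Hydro: `hello_from_even_members` is a hypothetical name, and the `self_id` variable merely plays the role that `CLUSTER_SELF_ID` plays in the example above.

```rust
use std::thread;

/// Plain-Rust model (not the Hydro API): every member runs the same closure,
/// parameterized only by its own ID, and even-numbered members say hello.
fn hello_from_even_members(members: u32) -> Vec<String> {
    let handles: Vec<_> = (0..members)
        .map(|self_id| {
            thread::spawn(move || {
                // `self_id` stands in for CLUSTER_SELF_ID in this model.
                if self_id % 2 == 0 {
                    Some(format!("hello from {}", self_id))
                } else {
                    None
                }
            })
        })
        .collect();
    let mut greetings: Vec<String> = handles
        .into_iter()
        .filter_map(|handle| handle.join().unwrap())
        .collect();
    // Sort so the result does not depend on which member finished first.
    greetings.sort();
    greetings
}
```

With four members, `hello_from_even_members(4)` returns `["hello from 0", "hello from 2"]`, the same two elements the Hydro example expects.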
25 changes: 17 additions & 8 deletions docs/src/theme/prism-include-languages.js
@@ -1,10 +1,10 @@
import siteConfig from '@generated/docusaurus.config';
import siteConfig from "@generated/docusaurus.config";
export default function prismIncludeLanguages(PrismObject) {
const {
themeConfig: {prism},
themeConfig: { prism },
} = siteConfig;
const {additionalLanguages} = prism;
const { additionalLanguages } = prism;

const PrismBefore = globalThis.Prism;
globalThis.Prism = PrismObject;
additionalLanguages.forEach((lang) => {
@@ -13,17 +13,26 @@ export default function prismIncludeLanguages(PrismObject) {
});
Prism.languages["rust,ignore"] = Prism.languages.rust;
Prism.languages["rust,no_run"] = Prism.languages.rust;
Prism.languages["compile_fail"] = Prism.languages.rust;

const origTokenize = PrismObject.tokenize;
PrismObject.hooks.add("after-tokenize", function(env) {
if (env.language === "rust" || env.language === "rust,ignore" || env.language === "rust,no_run") {
let code = env.code.split("\n").filter(line => !line.startsWith("# ")).join("\n");
PrismObject.hooks.add("after-tokenize", function (env) {
if (
env.language === "rust" ||
env.language === "rust,ignore" ||
env.language === "rust,no_run" ||
env.language === "compile_fail"
) {
let code = env.code
.split("\n")
.filter((line) => !line.startsWith("# "))
.join("\n");
env.tokens = origTokenize(code, env.grammar);
}
});

delete globalThis.Prism;
if (typeof PrismBefore !== 'undefined') {
if (typeof PrismBefore !== "undefined") {
globalThis.Prism = PrismObject;
}
}
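
For readers more at home in Rust than in JavaScript, the filtering step inside the `after-tokenize` hook can be restated as follows. This is an illustrative rendering only: `strip_hidden_lines` is a hypothetical name, and the function reproduces just the `startsWith("# ")` filter, not the Prism re-tokenization.

```rust
/// Rust rendering of the filter in the hook above: drop every line that
/// starts with "# " and keep the rest in order.
fn strip_hidden_lines(code: &str) -> String {
    code.split('\n')
        .filter(|line| !line.starts_with("# "))
        .collect::<Vec<_>>()
        .join("\n")
}
```

Under this rule a line consisting of a bare `#`, or an attribute such as `#[derive(Debug)]`, is kept, because neither starts with `#` followed by a space.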
