feat(hydroflow_plus, docs): improve quickstart ergonomics (#1553)
shadaj authored Nov 8, 2024
1 parent 9107841 commit baedf23
Showing 37 changed files with 214 additions and 205 deletions.
28 changes: 5 additions & 23 deletions docs/docs/hydroflow_plus/index.mdx
@@ -3,32 +3,14 @@ sidebar_position: 0
---

# Introduction
Hydroflow+ layers a high-level Rust API over the Hydroflow IR, making it possible to write dataflow programs that span multiple processes with straightline, functional Rust code. Hydroflow+ is built on top of [Stageleft](./stageleft.mdx), which allows Hydroflow+ to emit regular Hydroflow programs that are compiled into efficient Rust binaries. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs on a cluster.
Hydroflow+ is a high-level distributed streaming framework for Rust powered by the [Hydroflow runtime](../hydroflow/index.mdx). Unlike traditional architectures such as actors or RPCs, Hydroflow+ offers _choreographic_ APIs, where expressions and functions can describe computation that takes place across many locations. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs to the cloud.

Hydroflow+ uses a two-stage compilation approach. HF+ programs are standard Rust programs, which first run on the developer's laptop to generate a _deployment plan_. This plan is then compiled into individual binaries for each machine in the distributed system (enabling zero-overhead abstractions), which are then deployed to the cloud using the generated plan along with specifications of cloud resources.

Hydroflow+ has been used to write a variety of high-performance distributed systems, including implementations of classic distributed protocols such as two-phase commit and Paxos. Work is ongoing to develop a distributed systems standard library that will offer these protocols and more as reusable components.

:::caution

The docs for Hydroflow+ are still a work in progress. If you have any questions or run into bugs, please file an issue on the [Hydroflow GitHub repository](https://github.com/hydro-project/hydroflow).

:::

The main logic of Hydroflow+ programs manipulates **streams**, which capture infinite ordered sequences of elements. Streams are transformed using classic functional operators such as `map`, `filter`, and `fold`, as well as relational operators such as `join`. To build **distributed** dataflow programs, Hydroflow+ also introduces the concept of **processes**, which capture _where_ a stream is being processed.
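These stream operators have the same shape as Rust's standard iterator combinators. As a rough single-process analogy (plain `std` iterators, not the Hydroflow+ stream API), the operators compose like this:

```rust
// Illustrative only: a plain-Rust iterator analogue of the functional
// operators named above. Hydroflow+ applies the same style to distributed,
// potentially unbounded streams rather than finite in-memory iterators.
fn main() {
    let result = (0..10)
        .map(|x| x * 2)             // transform each element
        .filter(|x| x % 3 == 0)     // keep only multiples of three: 0, 6, 12, 18
        .fold(0, |acc, x| acc + x); // reduce to a single value
    println!("{result}"); // prints 36
}
```

The key difference is that in Hydroflow+ these pipelines describe computation placed at a specific process, rather than executing eagerly on local data.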

## Setup
Hydroflow+ requires a particular workspace setup, as any crate that uses Hydroflow+ must have a supporting macro crate to drive the code generation. To get started, we recommend using the Hydroflow+ template.

```bash
#shell-command-next-line
cargo install cargo-generate
#shell-command-next-line
cargo generate hydro-project/hydroflow template/hydroflow_plus
```

`cd` into the generated folder, ensure the correct nightly version of Rust is installed, and test the generated project:
```bash
#shell-command-next-line
cd <my-project>
#shell-command-next-line
rustup update
#shell-command-next-line
cargo test
```
6 changes: 2 additions & 4 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
@@ -23,7 +23,6 @@ pub mod broadcast;

```rust title="src/broadcast.rs"
use hydroflow_plus::*;
use stageleft::*;

pub struct Leader {}
pub struct Workers {}
@@ -69,10 +68,9 @@ async fn main() {
let (leader, workers) = flow::broadcast::broadcast(&builder);

flow.with_default_optimize()
.with_process(&leader, TrybuildHost::new(deployment.Localhost()))
.with_process(&leader, deployment.Localhost())
.with_cluster(&workers, (0..2)
.map(|idx| TrybuildHost::new(deployment.Localhost()))
.collect::<Vec<_>>()
.map(|idx| deployment.Localhost())
)
.deploy(&mut deployment);

45 changes: 45 additions & 0 deletions docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx
@@ -0,0 +1,45 @@
---
sidebar_position: 1
---

import CodeBlock from '@theme/CodeBlock';
import firstTenSrc from '!!raw-loader!../../../../template/hydroflow_plus/src/first_ten.rs';
import firstTenExampleSrc from '!!raw-loader!../../../../template/hydroflow_plus/examples/first_ten.rs';
import { getLines, extractOutput } from '../../../src/util';

# Your First Dataflow
Let's look at a minimal example of a Hydroflow+ program. We'll start with a simple dataflow that prints out the first 10 natural numbers.

:::tip

We recommend using the Hydroflow+ template to get started with a new project:

```bash
#shell-command-next-line
cargo install cargo-generate
#shell-command-next-line
cargo generate gh:hydro-project/hydroflow template/hydroflow_plus
```

:::

## Writing a Dataflow

Hydroflow+ programs are _explicit_ about where computation takes place, so our dataflow program takes a single `&Process` parameter, which is a handle to the single machine our program will run on. We can use this handle to materialize a stream using `source_iter` (which emits values from a provided collection), and then print out the values using `for_each`.

<CodeBlock language="rust" title="src/first_ten.rs">{getLines(firstTenSrc, 1, 7)}</CodeBlock>

You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. This is because Hydroflow+ uses a two-stage compilation process, where the first stage generates a deployment plan that is then compiled to individual binaries for each machine in the distributed system. The `q!` macro is used to mark Rust code that will be executed in the second stage ("runtime" code). This generally includes snippets of Rust code that are used to define static sources of data or closures that transform them.
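Putting these pieces together, the staged function might look like the following sketch (an illustrative reconstruction of `src/first_ten.rs` based on the description above, not the verified template source):

```rust
use hydroflow_plus::*;

// Sketch: the function body runs at staging time to build the dataflow
// graph, while the `q!`-quoted snippets are the runtime code that will be
// compiled into each machine's binary.
pub fn first_ten(process: &Process) {
    process
        .source_iter(q!(0..10))               // quoted data source
        .for_each(q!(|n| println!("{}", n))); // quoted sink closure
}
```

Note how only the pieces inside `q!` ever execute on the deployed machine; everything else is "compile-time" plan construction.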

## Running the Dataflow
Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment.

<CodeBlock language="rust">{getLines(firstTenExampleSrc, 1, 17)}</CodeBlock>

First, we initialize a new [Hydro Deploy](../../deploy/index.md) deployment with `Deployment::new()`. Then, we create a `FlowBuilder` which will store the entire dataflow program and manage its compilation.

To get the `&Process` we provide to `first_ten`, we can call `flow.process()`. After the dataflow has been created, we optimize it using `flow.with_default_optimize()`. Then, we map our virtual `Process` to a physical deployment target using `flow.with_process` (in this case we deploy to localhost).

Finally, we call `flow.deploy(&mut deployment)` to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the `_nodes` variable to prevent them from being dropped. Then, we can start the dataflow program and block until `Ctrl-C` using `deployment.run_ctrl_c()`.
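The deployment driver described in this section might be assembled as in the sketch below (assumed shape; `flow_template` stands in for the generated crate's name, and exact APIs follow the docs text rather than a verified template file):

```rust
use hydro_deploy::Deployment;
use hydroflow_plus::FlowBuilder;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();   // Hydro Deploy state
    let flow = FlowBuilder::new();            // stores the dataflow program
    let process = flow.process();             // virtual location handle
    flow_template::first_ten(&process);       // build the dataflow

    // Map the virtual process to a physical host and provision binaries.
    let _nodes = flow
        .with_default_optimize()
        .with_process(&process, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();   // run until Ctrl-C
}
```

Holding the result in `_nodes` keeps the instantiated machines alive for the duration of `run_ctrl_c()`, matching the dropping behavior described above.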

In the next section, we will look at how to distribute this program across multiple processes.
6 changes: 2 additions & 4 deletions docs/docs/hydroflow_plus/quickstart/index.mdx
@@ -1,7 +1,7 @@
# Quickstart
In this tutorial, we'll walk through the basics of Hydroflow+ by building a simple dataflow that prints out the first 10 natural numbers. We'll start with a single process, then pipeline the computation, and finally distribute it across a cluster.

To get started with a new project, we'll use the Hydroflow+ template. The template comes with a pre-configured build system and an implementation of the following examples.
To get started with a new project, we'll use the Hydroflow+ template. The template comes with a simple distributed program.

```bash
#shell-command-next-line
@@ -13,10 +13,8 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus
cd my-example-project
```

After `cd`ing into the generated folder, ensure the correct nightly version of Rust is installed and test the generated project:
After `cd`ing into the generated folder, we can run tests for the included sample:
```bash
#shell-command-next-line
rustup update
#shell-command-next-line
cargo test
```
58 changes: 0 additions & 58 deletions docs/docs/hydroflow_plus/quickstart/structure.mdx

This file was deleted.

4 changes: 2 additions & 2 deletions hydroflow_plus/src/builder/built.rs
@@ -5,7 +5,7 @@ use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph};

use super::compiled::HfCompiled;
use super::deploy::{DeployFlow, DeployResult};
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, LocalDeploy, ProcessSpec};
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};

@@ -109,7 +109,7 @@ impl<'a> BuiltFlow<'a> {
pub fn with_process<P, D: LocalDeploy<'a>>(
self,
process: &Process<P>,
spec: impl ProcessSpec<'a, D>,
spec: impl IntoProcessSpec<'a, D>,
) -> DeployFlow<'a, D> {
self.into_deploy().with_process(process, spec)
}
14 changes: 10 additions & 4 deletions hydroflow_plus/src/builder/deploy.rs
@@ -12,7 +12,7 @@ use stageleft::Quoted;

use super::built::build_inner;
use super::compiled::HfCompiled;
use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort};
use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy, Node, RegisterPort};
use crate::ir::HfPlusLeaf;
use crate::location::external_process::{
ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
@@ -39,10 +39,16 @@ impl<'a, D: LocalDeploy<'a>> Drop for DeployFlow<'a, D> {
}

impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> {
pub fn with_process<P>(mut self, process: &Process<P>, spec: impl ProcessSpec<'a, D>) -> Self {
pub fn with_process<P>(
mut self,
process: &Process<P>,
spec: impl IntoProcessSpec<'a, D>,
) -> Self {
let tag_name = std::any::type_name::<P>().to_string();
self.nodes
.insert(process.id, spec.build(process.id, &tag_name));
self.nodes.insert(
process.id,
spec.into_process_spec().build(process.id, &tag_name),
);
self
}

68 changes: 61 additions & 7 deletions hydroflow_plus/src/deploy/deploy_graph.rs
@@ -26,7 +26,7 @@ use trybuild_internals_api::path;

use super::deploy_runtime::*;
use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort};
use crate::futures::SinkExt;
use crate::lang::graph::HydroflowGraph;

@@ -409,6 +409,32 @@ pub struct TrybuildHost {
pub cluster_idx: Option<usize>,
}

impl From<Arc<dyn Host>> for TrybuildHost {
fn from(host: Arc<dyn Host>) -> Self {
Self {
host,
display_name: None,
rustflags: None,
tracing: None,
name_hint: None,
cluster_idx: None,
}
}
}

impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
fn from(host: Arc<H>) -> Self {
Self {
host,
display_name: None,
rustflags: None,
tracing: None,
name_hint: None,
cluster_idx: None,
}
}
}

impl TrybuildHost {
pub fn new(host: Arc<dyn Host>) -> Self {
Self {
@@ -455,10 +481,25 @@ impl TrybuildHost {
}
}

impl From<Arc<dyn Host>> for TrybuildHost {
fn from(h: Arc<dyn Host>) -> Self {
Self {
host: h,
impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
type ProcessSpec = TrybuildHost;
fn into_process_spec(self) -> TrybuildHost {
TrybuildHost {
host: self,
display_name: None,
rustflags: None,
tracing: None,
name_hint: None,
cluster_idx: None,
}
}
}

impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
type ProcessSpec = TrybuildHost;
fn into_process_spec(self) -> TrybuildHost {
TrybuildHost {
host: self,
display_name: None,
rustflags: None,
tracing: None,
@@ -586,6 +627,18 @@ impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
}
}

impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
fn build(self, _id: usize, _name_hint: &str) -> DeployExternal {
DeployExternal {
next_port: Rc::new(RefCell::new(0)),
host: self,
underlying: Rc::new(RefCell::new(None)),
allocated_ports: Rc::new(RefCell::new(HashMap::new())),
client_ports: Rc::new(RefCell::new(HashMap::new())),
}
}
}

pub enum CrateOrTrybuild {
Crate(HydroflowCrate),
Trybuild(TrybuildHost),
@@ -799,7 +852,7 @@ impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
}
}

impl ClusterSpec<'_, HydroDeploy> for Vec<TrybuildHost> {
impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
fn build(self, id: usize, name_hint: &str) -> DeployCluster {
let name_hint = format!("{} (cluster {id})", name_hint);
DeployCluster {
@@ -808,7 +861,8 @@ impl ClusterSpec<'_, HydroDeploy> for Vec<TrybuildHost> {
cluster_spec: Rc::new(RefCell::new(Some(
self.into_iter()
.enumerate()
.map(|(idx, mut b)| {
.map(|(idx, b)| {
let mut b = b.into();
b.name_hint = Some(name_hint.clone());
b.cluster_idx = Some(idx);
CrateOrTrybuild::Trybuild(b)
12 changes: 12 additions & 0 deletions hydroflow_plus/src/deploy/mod.rs
@@ -208,6 +208,18 @@ pub trait ProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
fn build(self, id: usize, name_hint: &str) -> D::Process;
}

pub trait IntoProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> {
type ProcessSpec: ProcessSpec<'a, D>;
fn into_process_spec(self) -> Self::ProcessSpec;
}

impl<'a, D: LocalDeploy<'a> + ?Sized, T: ProcessSpec<'a, D>> IntoProcessSpec<'a, D> for T {
type ProcessSpec = T;
fn into_process_spec(self) -> Self::ProcessSpec {
self
}
}

pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> {
fn build(self, id: usize, name_hint: &str) -> D::Cluster;
}
1 change: 1 addition & 0 deletions hydroflow_plus/src/lib.rs
@@ -4,6 +4,7 @@ stageleft::stageleft_no_entry_crate!();

pub use hydroflow::scheduled::graph::Hydroflow;
pub use hydroflow::*;
pub use stageleft::*;

pub mod runtime_support {
pub use bincode;
2 changes: 1 addition & 1 deletion hydroflow_plus/src/location/process.rs
@@ -3,7 +3,7 @@ use std::marker::PhantomData;
use super::{Location, LocationId};
use crate::builder::FlowState;

pub struct Process<'a, P> {
pub struct Process<'a, P = ()> {
pub(crate) id: usize,
pub(crate) flow_state: FlowState,
pub(crate) _phantom: PhantomData<&'a &'a mut P>,