diff --git a/docs/docs/hydroflow_plus/index.mdx b/docs/docs/hydroflow_plus/index.mdx index c1c4d1805d32..77a9ee57362c 100644 --- a/docs/docs/hydroflow_plus/index.mdx +++ b/docs/docs/hydroflow_plus/index.mdx @@ -3,32 +3,14 @@ sidebar_position: 0 --- # Introduction -Hydroflow+ layers a high-level Rust API over the Hydroflow IR, making it possible to write dataflow programs that span multiple processes with straightline, functional Rust code. Hydroflow+ is built on top of [Stageleft](./stageleft.mdx), which allows Hydroflow+ to emit regular Hydroflow programs that are compiled into efficient Rust binaries. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs on a cluster. +Hydroflow+ is a high-level distributed streaming framework for Rust powered by the [Hydroflow runtime](../hydroflow/index.mdx). Unlike traditional architectures such as actors or RPCs, Hydroflow+ offers _choreographic_ APIs, where expressions and functions can describe computation that takes place across many locations. It also integrates with [Hydro Deploy](../deploy/index.md) to make it easy to deploy and run Hydroflow+ programs to the cloud. + +Hydroflow+ uses a two-stage compilation approach. HF+ programs are standard Rust programs, which first run on the developer's laptop to generate a _deployment plan_. This plan is then compiled to individual binaries for each machine in the distributed system (enabling zero-overhead abstractions), and are then deployed to the cloud using the generated plan along with specifications of cloud resources. + +Hydroflow+ has been used to write a variety of high-performance distributed system, including implementations of classic distributed protocols such as two-phase commit and Paxos. Work is ongoing to develop a distributed systems standard library that will offer these protocols and more as reusable components. :::caution The docs for Hydroflow+ are still a work in progress. If you have any questions or run into bugs, please file an issue on the [Hydroflow GitHub repository](https://github.com/hydro-project/hydroflow). ::: - -The main logic of Hydroflow+ programs manipulates **streams**, which capture infinite ordered sequences of elements. Streams are transformed using classic functional operators such as `map`, `filter`, and `fold`, as well as relational operators such as `join`. To build **distributed** dataflow programs, Hydroflow+ also introduces the concept of **processes**, which capture _where_ a stream is being processed. - -## Setup -Hydroflow+ requires a particular workspace setup, as any crate that uses Hydroflow+ must have an supporting macro crate to drive the code generation. To get started, we recommend using the Hydroflow+ template. - -```bash -#shell-command-next-line -cargo install cargo-generate -#shell-command-next-line -cargo generate hydro-project/hydroflow template/hydroflow_plus -``` - -`cd` into the generated folder, ensure the correct nightly version of rust is installed, and test the generated project: -```bash -#shell-command-next-line -cd -#shell-command-next-line -rustup update -#shell-command-next-line -cargo test -``` diff --git a/docs/docs/hydroflow_plus/quickstart/clusters.mdx b/docs/docs/hydroflow_plus/quickstart/clusters.mdx index 4155c13e68a0..77c1dba62e65 100644 --- a/docs/docs/hydroflow_plus/quickstart/clusters.mdx +++ b/docs/docs/hydroflow_plus/quickstart/clusters.mdx @@ -23,7 +23,6 @@ pub mod broadcast; ```rust title="src/broadcast.rs" use hydroflow_plus::*; -use stageleft::*; pub struct Leader {} pub struct Workers {} @@ -69,10 +68,9 @@ async fn main() { let (leader, workers) = flow::broadcast::broadcast(&builder); flow.with_default_optimize() - .with_process(&leader, TrybuildHost::new(deployment.Localhost())) + .with_process(&leader, deployment.Localhost()) .with_cluster(&workers, (0..2) - .map(|idx| TrybuildHost::new(deployment.Localhost())) - .collect::>() + .map(|idx| deployment.Localhost()) ) .deploy(&mut deployment); diff --git a/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx new file mode 100644 index 000000000000..a1699536f434 --- /dev/null +++ b/docs/docs/hydroflow_plus/quickstart/first-dataflow.mdx @@ -0,0 +1,45 @@ +--- +sidebar_position: 1 +--- + +import CodeBlock from '@theme/CodeBlock'; +import firstTenSrc from '!!raw-loader!../../../../template/hydroflow_plus/src/first_ten.rs'; +import firstTenExampleSrc from '!!raw-loader!../../../../template/hydroflow_plus/examples/first_ten.rs'; +import { getLines, extractOutput } from '../../../src/util'; + +# Your First Dataflow +Let's look a minimal example of a Hydroflow+ program. We'll start with a simple dataflow that prints out the first 10 natural numbers. + +:::tip + +We recommend using the Hydroflow+ template to get started with a new project: + +```bash +#shell-command-next-line +cargo install cargo-generate +#shell-command-next-line +cargo generate gh:hydro-project/hydroflow template/hydroflow_plus +``` + +::: + +## Writing a Dataflow + +Hydroflow+ programs are _explicit_ about where computation takes place. So our dataflow program takes a single `&Process` parameter which is a handle to the single machine our program will run on. We can use this handle to materialize a stream using `source_iter` (which emits values from a provided collection), and then print out the values using `for_each`. + +{getLines(firstTenSrc, 1, 7)} + +You'll notice that the arguments to `source_iter` and `for_each` are wrapped in `q!` macros. This is because Hydroflow+ uses a two-stage compilation process, where the first stage generates a deployment plan that is then compiled to individual binaries for each machine in the distributed system. The `q!` macro is used to mark Rust code that will be executed in the second stage ("runtime" code). This generally includes snippets of Rust code that are used to define static sources of data or closures that transform them. + +## Running the Dataflow +Next, let's launch the dataflow program we just wrote. To do this, we'll need to write a bit more code in `examples/first_ten.rs` to configure our deployment. + +{getLines(firstTenExampleSrc, 1, 17)} + +First, we initialize a new [Hydro Deploy](../../deploy/index.md) deployment with `Deployment::new()`. Then, we create a `FlowBuilder` which will store the entire dataflow program and manage its compilation. + +To get the `&Process` we provide to `first_ten`, we can call `flow.process()`. After the dataflow has been created, we optimize it using `flow.with_default_optimize()`. Then, we map our virtual `Process` to a physical deployment target using `flow.with_process` (in this case we deploy to localhost). + +Finally, we call `flow.deploy(&mut deployment)` to provision the dataflow program on the target machine. This returns a struct with handles to the instantiated machines, which we must store in the `_nodes` variable to prevent them from being dropped. Then, we can start the dataflow program and block until `Ctrl-C` using `deployment.run_ctrl_c()`. + +In the next section, we will look at how to distribute this program across multiple processes. diff --git a/docs/docs/hydroflow_plus/quickstart/index.mdx b/docs/docs/hydroflow_plus/quickstart/index.mdx index d366e2f61046..29868a1c7503 100644 --- a/docs/docs/hydroflow_plus/quickstart/index.mdx +++ b/docs/docs/hydroflow_plus/quickstart/index.mdx @@ -1,7 +1,7 @@ # Quickstart In this tutorial, we'll walk through the basics of Hydroflow+ by building a simple dataflow that prints out the first 10 natural numbers. We'll start with a single process, then pipeline the computation, and finally distribute it across a cluster. -To get started with a new project, we'll use the Hydroflow+ template. The template comes with a pre-configured build system and an implementation of the following examples. +To get started with a new project, we'll use the Hydroflow+ template. The template comes with a simple distributed program. ```bash #shell-command-next-line @@ -13,10 +13,8 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus cd my-example-project ``` -After `cd`ing into the generated folder, ensure the correct nightly version of rust is installed and test the generated project: +After `cd`ing into the generated folder, we can run tests for the included sample: ```bash #shell-command-next-line -rustup update -#shell-command-next-line cargo test ``` \ No newline at end of file diff --git a/docs/docs/hydroflow_plus/quickstart/structure.mdx b/docs/docs/hydroflow_plus/quickstart/structure.mdx deleted file mode 100644 index 54c13f1588a0..000000000000 --- a/docs/docs/hydroflow_plus/quickstart/structure.mdx +++ /dev/null @@ -1,58 +0,0 @@ ---- -sidebar_position: 1 ---- -import CodeBlock from '@theme/CodeBlock'; -import firstTenSrc from '!!raw-loader!../../../../template/hydroflow_plus/src/first_ten_distributed.rs'; -import { getLines, extractOutput } from '../../../src/util'; - -# Your First Dataflow -Hydroflow+ programs require special structure to support code generation and distributed deployments. There are two main components of a Hydroflow+ program: -- The **flow graph** describes the dataflow logic of the program. -- The **deployment** describes how to map the flow graph to a physical deployment. - -:::tip - -We recommend using the Hydroflow+ template to get started with a new project. The template comes with a pre-configured build system and the following example pre-implemented. - -```bash -#shell-command-next-line -cargo install cargo-generate -#shell-command-next-line -cargo generate gh:hydro-project/hydroflow template/hydroflow_plus -``` - -`cd` into the generated folder, ensure the correct nightly version of rust is installed, and test the generated project: -```bash -#shell-command-next-line -cd -#shell-command-next-line -rustup update -#shell-command-next-line -cargo test -``` - -::: - - -Let's look a minimal example of a Hydroflow+ program. We'll start with a simple flow graph that prints out the first 10 natural numbers. First, we'll define the **flow graph**. - - -## The Flow Graph - -{getLines(firstTenSrc, 1, 17)} - -To build a Hydroflow+ application, we need to define a dataflow that spans multiple processes. The `FlowBuilder` parameter captures the global dataflow, and we can instantiate processes to define boundaries between distributed logic. When defining a process, we also pass in a type parameter to a "tag" that identifies the process. When transforming streams, the Rust type system will guarantee that we are operating on streams on the same process. - -{getLines(firstTenSrc, 8, 9)} - -Now, we can build out the dataflow to run on this process. Every dataflow starts at a source that is bound to a specific process. First, we instantiate a stream that emits the first 10 natural numbers. - -{getLines(firstTenSrc, 11)} - -In Hydroflow+, whenever there are snippets of Rust code passed to operators (like `source_iter`, `map`, or `for_each`), we use the `q!` macro to mark them. For example, we may use Rust snippets to define static sources of data or closures that transform them. - -To print out these numbers, we can use the `for_each` operator (note that the body of `for_each` is a closure wrapped in `q!`): - -{getLines(firstTenSrc, 12, 14)} - -In the next section, we will look at how to deploy this program to run on multiple processs. diff --git a/hydroflow_plus/src/builder/built.rs b/hydroflow_plus/src/builder/built.rs index ee196b3f3016..9beb37b6da4a 100644 --- a/hydroflow_plus/src/builder/built.rs +++ b/hydroflow_plus/src/builder/built.rs @@ -5,7 +5,7 @@ use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph}; use super::compiled::HfCompiled; use super::deploy::{DeployFlow, DeployResult}; -use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, LocalDeploy, ProcessSpec}; +use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy}; use crate::ir::HfPlusLeaf; use crate::location::{Cluster, ExternalProcess, Process}; @@ -109,7 +109,7 @@ impl<'a> BuiltFlow<'a> { pub fn with_process>( self, process: &Process

, - spec: impl ProcessSpec<'a, D>, + spec: impl IntoProcessSpec<'a, D>, ) -> DeployFlow<'a, D> { self.into_deploy().with_process(process, spec) } diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index e441b76eaed6..8cdaf5b9e1b9 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -12,7 +12,7 @@ use stageleft::Quoted; use super::built::build_inner; use super::compiled::HfCompiled; -use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort}; +use crate::deploy::{ExternalSpec, IntoProcessSpec, LocalDeploy, Node, RegisterPort}; use crate::ir::HfPlusLeaf; use crate::location::external_process::{ ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort, @@ -39,10 +39,16 @@ impl<'a, D: LocalDeploy<'a>> Drop for DeployFlow<'a, D> { } impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { - pub fn with_process

(mut self, process: &Process

, spec: impl ProcessSpec<'a, D>) -> Self { + pub fn with_process

( + mut self, + process: &Process

, + spec: impl IntoProcessSpec<'a, D>, + ) -> Self { let tag_name = std::any::type_name::

().to_string(); - self.nodes - .insert(process.id, spec.build(process.id, &tag_name)); + self.nodes.insert( + process.id, + spec.into_process_spec().build(process.id, &tag_name), + ); self } diff --git a/hydroflow_plus/src/deploy/deploy_graph.rs b/hydroflow_plus/src/deploy/deploy_graph.rs index 218fbda3f5b5..ba5a91ee4a76 100644 --- a/hydroflow_plus/src/deploy/deploy_graph.rs +++ b/hydroflow_plus/src/deploy/deploy_graph.rs @@ -26,7 +26,7 @@ use trybuild_internals_api::path; use super::deploy_runtime::*; use super::trybuild::{compile_graph_trybuild, create_trybuild}; -use super::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort}; +use super::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort}; use crate::futures::SinkExt; use crate::lang::graph::HydroflowGraph; @@ -409,6 +409,32 @@ pub struct TrybuildHost { pub cluster_idx: Option, } +impl From> for TrybuildHost { + fn from(host: Arc) -> Self { + Self { + host, + display_name: None, + rustflags: None, + tracing: None, + name_hint: None, + cluster_idx: None, + } + } +} + +impl From> for TrybuildHost { + fn from(host: Arc) -> Self { + Self { + host, + display_name: None, + rustflags: None, + tracing: None, + name_hint: None, + cluster_idx: None, + } + } +} + impl TrybuildHost { pub fn new(host: Arc) -> Self { Self { @@ -455,10 +481,25 @@ impl TrybuildHost { } } -impl From> for TrybuildHost { - fn from(h: Arc) -> Self { - Self { - host: h, +impl IntoProcessSpec<'_, HydroDeploy> for Arc { + type ProcessSpec = TrybuildHost; + fn into_process_spec(self) -> TrybuildHost { + TrybuildHost { + host: self, + display_name: None, + rustflags: None, + tracing: None, + name_hint: None, + cluster_idx: None, + } + } +} + +impl IntoProcessSpec<'_, HydroDeploy> for Arc { + type ProcessSpec = TrybuildHost; + fn into_process_spec(self) -> TrybuildHost { + TrybuildHost { + host: self, display_name: None, rustflags: None, tracing: None, @@ -586,6 +627,18 @@ impl ExternalSpec<'_, HydroDeploy> for Arc { } } +impl ExternalSpec<'_, HydroDeploy> for Arc { + fn build(self, _id: usize, _name_hint: &str) -> DeployExternal { + DeployExternal { + next_port: Rc::new(RefCell::new(0)), + host: self, + underlying: Rc::new(RefCell::new(None)), + allocated_ports: Rc::new(RefCell::new(HashMap::new())), + client_ports: Rc::new(RefCell::new(HashMap::new())), + } + } +} + pub enum CrateOrTrybuild { Crate(HydroflowCrate), Trybuild(TrybuildHost), @@ -799,7 +852,7 @@ impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec { } } -impl ClusterSpec<'_, HydroDeploy> for Vec { +impl, I: IntoIterator> ClusterSpec<'_, HydroDeploy> for I { fn build(self, id: usize, name_hint: &str) -> DeployCluster { let name_hint = format!("{} (cluster {id})", name_hint); DeployCluster { @@ -808,7 +861,8 @@ impl ClusterSpec<'_, HydroDeploy> for Vec { cluster_spec: Rc::new(RefCell::new(Some( self.into_iter() .enumerate() - .map(|(idx, mut b)| { + .map(|(idx, b)| { + let mut b = b.into(); b.name_hint = Some(name_hint.clone()); b.cluster_idx = Some(idx); CrateOrTrybuild::Trybuild(b) diff --git a/hydroflow_plus/src/deploy/mod.rs b/hydroflow_plus/src/deploy/mod.rs index 995f2f0f69cc..d447bd23722a 100644 --- a/hydroflow_plus/src/deploy/mod.rs +++ b/hydroflow_plus/src/deploy/mod.rs @@ -208,6 +208,18 @@ pub trait ProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> { fn build(self, id: usize, name_hint: &str) -> D::Process; } +pub trait IntoProcessSpec<'a, D: LocalDeploy<'a> + ?Sized> { + type ProcessSpec: ProcessSpec<'a, D>; + fn into_process_spec(self) -> Self::ProcessSpec; +} + +impl<'a, D: LocalDeploy<'a> + ?Sized, T: ProcessSpec<'a, D>> IntoProcessSpec<'a, D> for T { + type ProcessSpec = T; + fn into_process_spec(self) -> Self::ProcessSpec { + self + } +} + pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> { fn build(self, id: usize, name_hint: &str) -> D::Cluster; } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index 2e7f322ec1d8..b71cc879b8e1 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -4,6 +4,7 @@ stageleft::stageleft_no_entry_crate!(); pub use hydroflow::scheduled::graph::Hydroflow; pub use hydroflow::*; +pub use stageleft::*; pub mod runtime_support { pub use bincode; diff --git a/hydroflow_plus/src/location/process.rs b/hydroflow_plus/src/location/process.rs index 301d7ce6c2a8..fe28157ff768 100644 --- a/hydroflow_plus/src/location/process.rs +++ b/hydroflow_plus/src/location/process.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use super::{Location, LocationId}; use crate::builder::FlowState; -pub struct Process<'a, P> { +pub struct Process<'a, P = ()> { pub(crate) id: usize, pub(crate) flow_state: FlowState, pub(crate) _phantom: PhantomData<&'a &'a mut P>, diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index f85eb32aa2dc..81c8f1ec0573 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -832,14 +832,11 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { #[cfg(test)] mod tests { - use std::sync::Arc; - - use hydro_deploy::{Deployment, Host}; + use hydro_deploy::Deployment; use hydroflow::futures::StreamExt; use serde::{Deserialize, Serialize}; use stageleft::q; - use crate::deploy::TrybuildHost; use crate::location::Location; use crate::FlowBuilder; @@ -868,9 +865,9 @@ mod tests { let nodes = flow .with_default_optimize() - .with_process(&first_node, TrybuildHost::new(deployment.Localhost())) - .with_process(&second_node, TrybuildHost::new(deployment.Localhost())) - .with_external(&external, deployment.Localhost() as Arc) + .with_process(&first_node, deployment.Localhost()) + .with_process(&second_node, deployment.Localhost()) + .with_external(&external, deployment.Localhost()) .deploy(&mut deployment); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test/examples/compute_pi.rs b/hydroflow_plus_test/examples/compute_pi.rs index 37c02da439f3..e7844e1c2c4e 100644 --- a/hydroflow_plus_test/examples/compute_pi.rs +++ b/hydroflow_plus_test/examples/compute_pi.rs @@ -49,9 +49,7 @@ async fn main() { ) .with_cluster( &cluster, - (0..8) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + (0..8).map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .deploy(&mut deployment); diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index 6716c6fa52b4..ddbcd5c83e6d 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -52,7 +52,7 @@ async fn main() { &p2, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), ) - .with_external(&external_process, deployment.Localhost() as Arc) + .with_external(&external_process, deployment.Localhost()) .deploy(&mut deployment); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test/examples/map_reduce.rs b/hydroflow_plus_test/examples/map_reduce.rs index c18d9c59bb52..3ccc8ea48969 100644 --- a/hydroflow_plus_test/examples/map_reduce.rs +++ b/hydroflow_plus_test/examples/map_reduce.rs @@ -48,9 +48,7 @@ async fn main() { ) .with_cluster( &cluster, - (0..2) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + (0..2).map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .deploy(&mut deployment); diff --git a/hydroflow_plus_test/examples/paxos.rs b/hydroflow_plus_test/examples/paxos.rs index 31c34cb39beb..06efd212cdf3 100644 --- a/hydroflow_plus_test/examples/paxos.rs +++ b/hydroflow_plus_test/examples/paxos.rs @@ -60,26 +60,22 @@ async fn main() { .with_cluster( &proposers, (0..f + 1) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .with_cluster( &acceptors, (0..2 * f + 1) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .with_cluster( &clients, (0..num_clients) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .with_cluster( &replicas, (0..f + 1) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .deploy(&mut deployment); diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index 56d8db5a9746..d92c77c4dd4d 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -68,21 +68,19 @@ async fn main() { ) .with_cluster( &cluster, - (0..8) - .map(|idx| { - TrybuildHost::new(create_host(&mut deployment)) - .rustflags(rustflags) - .tracing( - TracingOptions::builder() - .perf_raw_outfile(format!("cluster{}.perf.data", idx)) - .dtrace_outfile(format!("cluster{}.leader.stacks", idx)) - .fold_outfile(format!("cluster{}.data.folded", idx)) - .flamegraph_outfile(format!("cluster{}.svg", idx)) - .frequency(128) - .build(), - ) - }) - .collect::>(), + (0..8).map(|idx| { + TrybuildHost::new(create_host(&mut deployment)) + .rustflags(rustflags) + .tracing( + TracingOptions::builder() + .perf_raw_outfile(format!("cluster{}.perf.data", idx)) + .dtrace_outfile(format!("cluster{}.leader.stacks", idx)) + .fold_outfile(format!("cluster{}.data.folded", idx)) + .flamegraph_outfile(format!("cluster{}.svg", idx)) + .frequency(128) + .build(), + ) + }), ) .deploy(&mut deployment); deployment.run_ctrl_c().await.unwrap(); diff --git a/hydroflow_plus_test/examples/simple_cluster.rs b/hydroflow_plus_test/examples/simple_cluster.rs index e1c22b702f45..8fb08e5b6a7d 100644 --- a/hydroflow_plus_test/examples/simple_cluster.rs +++ b/hydroflow_plus_test/examples/simple_cluster.rs @@ -49,9 +49,7 @@ async fn main() { ) .with_cluster( &cluster, - (0..2) - .map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)) - .collect::>(), + (0..2).map(|_| TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags)), ) .deploy(&mut deployment); deployment.run_ctrl_c().await.unwrap(); diff --git a/hydroflow_plus_test/examples/two_pc.rs b/hydroflow_plus_test/examples/two_pc.rs index df946a70613c..23f7a6b64239 100644 --- a/hydroflow_plus_test/examples/two_pc.rs +++ b/hydroflow_plus_test/examples/two_pc.rs @@ -19,9 +19,7 @@ async fn main() { .with_process(&coordinator, TrybuildHost::new(deployment.Localhost())) .with_cluster( &participants, - (0..num_participants) - .map(|_| TrybuildHost::new(deployment.Localhost())) - .collect::>(), + (0..num_participants).map(|_| TrybuildHost::new(deployment.Localhost())), ) .with_process(&client, TrybuildHost::new(deployment.Localhost())) .deploy(&mut deployment); diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index 4f4010a3d5fc..fbbf2a4d2671 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -1,7 +1,6 @@ use std::time::Duration; use hydroflow_plus::*; -use stageleft::*; pub struct Worker {} pub struct Leader {} diff --git a/hydroflow_plus_test/src/cluster/many_to_many.rs b/hydroflow_plus_test/src/cluster/many_to_many.rs index 90adbf03f09b..91cda6f10ca1 100644 --- a/hydroflow_plus_test/src/cluster/many_to_many.rs +++ b/hydroflow_plus_test/src/cluster/many_to_many.rs @@ -1,5 +1,4 @@ use hydroflow_plus::*; -use stageleft::*; pub fn many_to_many<'a>(flow: &FlowBuilder<'a>) -> Cluster<'a, ()> { let cluster = flow.cluster(); @@ -14,7 +13,7 @@ pub fn many_to_many<'a>(flow: &FlowBuilder<'a>) -> Cluster<'a, ()> { #[cfg(test)] mod tests { use hydro_deploy::Deployment; - use hydroflow_plus::deploy::{DeployCrateWrapper, TrybuildHost}; + use hydroflow_plus::deploy::DeployCrateWrapper; #[tokio::test] async fn many_to_many() { @@ -27,12 +26,7 @@ mod tests { insta::assert_debug_snapshot!(built.ir()); let nodes = built - .with_cluster( - &cluster, - (0..2) - .map(|_| TrybuildHost::new(deployment.Localhost())) - .collect::>(), - ) + .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost())) .deploy(&mut deployment); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index 192403e423b1..cf9e558c3daa 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -1,5 +1,4 @@ use hydroflow_plus::*; -use stageleft::*; pub struct Leader {} pub struct Worker {} diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 85b9e4c06940..26ac4a535675 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -5,7 +5,6 @@ use std::time::Duration; use hydroflow_plus::*; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use stageleft::*; use tokio::time::Instant; pub struct Proposer {} diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index e3db1458a809..a651eebe02c8 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -3,7 +3,6 @@ use std::rc::Rc; use std::time::{Duration, SystemTime}; use hydroflow_plus::*; -use stageleft::*; use super::paxos::{Acceptor, Proposer}; use super::paxos_kv::{paxos_kv, KvPayload, Replica}; diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 9aeef0d9b7c4..42048ff104ac 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -5,7 +5,6 @@ use std::hash::Hash; use hydroflow_plus::*; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use stageleft::*; use super::paxos::{paxos_core, Acceptor, Proposer}; diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs index f3c7062b348d..074230631b69 100644 --- a/hydroflow_plus_test/src/cluster/simple_cluster.rs +++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs @@ -1,5 +1,4 @@ use hydroflow_plus::*; -use stageleft::*; pub fn decouple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Cluster<'a, ()>, Cluster<'a, ()>) { let cluster1 = flow.cluster(); @@ -55,7 +54,7 @@ pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<' #[cfg(test)] mod tests { use hydro_deploy::Deployment; - use hydroflow_plus::deploy::{DeployCrateWrapper, TrybuildHost}; + use hydroflow_plus::deploy::DeployCrateWrapper; #[tokio::test] async fn simple_cluster() { @@ -68,13 +67,8 @@ mod tests { insta::assert_debug_snapshot!(built.ir()); let nodes = built - .with_process(&node, TrybuildHost::new(deployment.Localhost())) - .with_cluster( - &cluster, - (0..2) - .map(|_| TrybuildHost::new(deployment.Localhost())) - .collect::>(), - ) + .with_process(&node, deployment.Localhost()) + .with_cluster(&cluster, (0..2).map(|_| deployment.Localhost())) .deploy(&mut deployment); deployment.deploy().await.unwrap(); @@ -128,8 +122,8 @@ mod tests { let built = builder.with_default_optimize(); let nodes = built - .with_process(&process1, TrybuildHost::new(deployment.Localhost())) - .with_process(&process2, TrybuildHost::new(deployment.Localhost())) + .with_process(&process1, deployment.Localhost()) + .with_process(&process2, deployment.Localhost()) .deploy(&mut deployment); deployment.deploy().await.unwrap(); @@ -150,18 +144,8 @@ mod tests { let built = builder.with_default_optimize(); let nodes = built - .with_cluster( - &cluster1, - (0..3) - .map(|_| TrybuildHost::new(deployment.Localhost())) - .collect::>(), - ) - .with_cluster( - &cluster2, - (0..3) - .map(|_| TrybuildHost::new(deployment.Localhost())) - .collect::>(), - ) + .with_cluster(&cluster1, (0..3).map(|_| deployment.Localhost())) + .with_cluster(&cluster2, (0..3).map(|_| deployment.Localhost())) .deploy(&mut deployment); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index 5e11100ffc95..79e400ae38a2 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -1,5 +1,4 @@ use hydroflow_plus::*; -use stageleft::*; // if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side. // diff --git a/hydroflow_plus_test/src/distributed/first_ten.rs b/hydroflow_plus_test/src/distributed/first_ten.rs index cead08979018..aacce28f290f 100644 --- a/hydroflow_plus_test/src/distributed/first_ten.rs +++ b/hydroflow_plus_test/src/distributed/first_ten.rs @@ -2,7 +2,6 @@ use hydroflow_plus::*; use location::external_process::ExternalBincodeSink; use location::ExternalProcess; use serde::{Deserialize, Serialize}; -use stageleft::*; #[derive(Serialize, Deserialize)] struct SendOverNetwork { @@ -44,11 +43,9 @@ pub fn first_ten_distributed<'a>( #[cfg(test)] mod tests { - use std::sync::Arc; - use futures::SinkExt; - use hydro_deploy::{Deployment, Host}; - use hydroflow_plus::deploy::{DeployCrateWrapper, TrybuildHost}; + use hydro_deploy::Deployment; + use hydroflow_plus::deploy::DeployCrateWrapper; #[tokio::test] async fn first_ten_distributed() { @@ -63,9 +60,9 @@ mod tests { insta::assert_debug_snapshot!(built.ir()); let nodes = built - .with_process(&first_node, TrybuildHost::new(deployment.Localhost())) - .with_process(&second_node, TrybuildHost::new(deployment.Localhost())) - .with_external(&external_process, deployment.Localhost() as Arc) + .with_process(&first_node, deployment.Localhost()) + .with_process(&second_node, deployment.Localhost()) + .with_external(&external_process, deployment.Localhost()) .deploy(&mut deployment); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index 97b850714e45..c0cc525e647e 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -2,7 +2,6 @@ use std::time::Duration; use hydroflow_plus::deploy::SingleProcessGraph; use hydroflow_plus::*; -use stageleft::*; pub fn compute_pi<'a>(flow: &FlowBuilder<'a>, batch_size: RuntimeData) -> Process<'a, ()> { let process = flow.process(); diff --git a/hydroflow_plus_test_local/src/local/first_ten.rs b/hydroflow_plus_test_local/src/local/first_ten.rs index 8da36aa823c6..20c674a691e4 100644 --- a/hydroflow_plus_test_local/src/local/first_ten.rs +++ b/hydroflow_plus_test_local/src/local/first_ten.rs @@ -1,6 +1,5 @@ use hydroflow_plus::deploy::SingleProcessGraph; use hydroflow_plus::*; -use stageleft::*; pub fn first_ten(flow: &FlowBuilder) { let process = flow.process::<()>(); diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index 97f48660e852..928a9157c2d1 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -2,7 +2,6 @@ use hydroflow_plus::deploy::SingleProcessGraph; use hydroflow_plus::tokio::sync::mpsc::UnboundedSender; use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::*; -use stageleft::*; #[stageleft::entry] pub fn graph_reachability<'a>( diff --git a/template/hydroflow_plus/README.md b/template/hydroflow_plus/README.md index d999d91e6a8f..a4f1058c359d 100644 --- a/template/hydroflow_plus/README.md +++ b/template/hydroflow_plus/README.md @@ -7,16 +7,15 @@ cargo generate gh:hydro-project/hydroflow template/hydroflow_plus cd ``` -After `cd`ing into the workspace, ensure the correct nightly version of rust is installed: -```bash -rustup update -``` +After `cd`ing into the workspace, run the sample tests Then test the project: ```bash cargo test ``` +To learn more about the template, see the [Hydroflow+ Quickstart](https://hydro.run/docs/hydroflow_plus/quickstart/first-dataflow). + ## Project Structure The template includes a sample program `first_ten_distributed`. diff --git a/template/hydroflow_plus/examples/first_ten.rs b/template/hydroflow_plus/examples/first_ten.rs new file mode 100644 index 000000000000..ada07db93fbf --- /dev/null +++ b/template/hydroflow_plus/examples/first_ten.rs @@ -0,0 +1,17 @@ +use hydro_deploy::Deployment; + +#[tokio::main] +async fn main() { + let mut deployment = Deployment::new(); + + let flow = hydroflow_plus::FlowBuilder::new(); + let process = flow.process(); + hydroflow_plus_template::first_ten::first_ten(&process); + + let _nodes = flow + .with_default_optimize() + .with_process(&process, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.run_ctrl_c().await.unwrap(); +} diff --git a/template/hydroflow_plus/examples/first_ten_distributed.rs b/template/hydroflow_plus/examples/first_ten_distributed.rs index 6cae922a757d..e75bdbb1ecd6 100644 --- a/template/hydroflow_plus/examples/first_ten_distributed.rs +++ b/template/hydroflow_plus/examples/first_ten_distributed.rs @@ -1,5 +1,4 @@ use hydro_deploy::Deployment; -use hydroflow_plus::deploy::TrybuildHost; #[tokio::main] async fn main() { @@ -10,8 +9,8 @@ async fn main() { let _nodes = flow .with_default_optimize() - .with_process(&p1, TrybuildHost::new(deployment.Localhost())) - .with_process(&p2, TrybuildHost::new(deployment.Localhost())) + .with_process(&p1, deployment.Localhost()) + .with_process(&p2, deployment.Localhost()) .deploy(&mut deployment); deployment.run_ctrl_c().await.unwrap(); diff --git a/template/hydroflow_plus/src/first_ten.rs b/template/hydroflow_plus/src/first_ten.rs new file mode 100644 index 000000000000..86fe2feb7528 --- /dev/null +++ b/template/hydroflow_plus/src/first_ten.rs @@ -0,0 +1,7 @@ +use hydroflow_plus::*; + +pub fn first_ten(process: &Process) { + process + .source_iter(q!(0..10)) + .for_each(q!(|n| println!("{}", n))); +} diff --git a/template/hydroflow_plus/src/first_ten_distributed.rs b/template/hydroflow_plus/src/first_ten_distributed.rs index 5fb1c4b48fff..c4d8d1cbcccc 100644 --- a/template/hydroflow_plus/src/first_ten_distributed.rs +++ b/template/hydroflow_plus/src/first_ten_distributed.rs @@ -1,5 +1,4 @@ use hydroflow_plus::*; -use stageleft::*; pub struct P1 {} pub struct P2 {} @@ -19,7 +18,7 @@ pub fn first_ten_distributed<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, P1>, Pr #[cfg(test)] mod tests { use hydro_deploy::Deployment; - use hydroflow_plus::deploy::{DeployCrateWrapper, TrybuildHost}; + use hydroflow_plus::deploy::DeployCrateWrapper; use hydroflow_plus::futures::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -33,8 +32,8 @@ mod tests { let nodes = flow .with_default_optimize() - .with_process(&p1, TrybuildHost::new(localhost.clone())) - .with_process(&p2, TrybuildHost::new(localhost.clone())) + .with_process(&p1, localhost.clone()) + .with_process(&p2, localhost.clone()) .deploy(&mut deployment); deployment.deploy().await.unwrap(); diff --git a/template/hydroflow_plus/src/lib.rs b/template/hydroflow_plus/src/lib.rs index 17e53184ca7e..37c22f9c1c6b 100644 --- a/template/hydroflow_plus/src/lib.rs +++ b/template/hydroflow_plus/src/lib.rs @@ -1,3 +1,4 @@ stageleft::stageleft_no_entry_crate!(); +pub mod first_ten; pub mod first_ten_distributed;