Skip to content

Commit

Permalink
feat(hydroflow): support building graphs for symmetric clusters in Hydroflow+
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Dec 19, 2023
1 parent 9e27582 commit f541dbe
Show file tree
Hide file tree
Showing 41 changed files with 1,333 additions and 287 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use stageleft::*;

pub fn first_ten<'a, D: HfDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
node_builder: &impl HfNodeBuilder<'a, D>
) {
let node = graph.node(node_builder);
let numbers = node.source_iter(q!(0..10));
Expand All @@ -29,7 +29,7 @@ use stageleft::*;

pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
node_builder: &impl HfNodeBuilder<'a, D>
) {
let node = graph.node(node_builder);
let second_node = graph.node(node_builder);
Expand All @@ -46,19 +46,19 @@ numbers
```

## The Runtime
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with hardcoding node 0 as before. Instead, we must take the node ID as a runtime parameter (`node_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it (via `CLIRuntimeNodeBuilder`).
Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with hardcoding node 0 as before. Instead, we must take the node ID as a runtime parameter (`node_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it.

```rust
use hydroflow::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder};
use hydroflow_plus_cli_integration::CLIRuntime;

#[stageleft::entry]
pub fn first_ten_distributed_runtime<'a>(
graph: &'a HfBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI>,
node_id: RuntimeData<usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = first_ten_distributed(graph, &mut CLIRuntimeNodeBuilder::new(cli));
let _ = first_ten_distributed(graph, &cli);
graph.build(node_id)
}
```
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn main() {
let builder = hydroflow_plus::HfBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
&builder,
&mut CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|id| {
deployment.HydroflowCrate(
".",
localhost.clone(),
Expand All @@ -106,6 +106,7 @@ async fn main() {
)
}),
);
builder.wire(); // sets up network connections

deployment.deploy().await.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions docs/docs/hydroflow_plus/structure.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use stageleft::*;

pub fn first_ten<'a, D: HfDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
node_builder: &impl HfNodeBuilder<'a, D>
) {}
```

Expand All @@ -28,7 +28,7 @@ The `graph` variable gives us access to the builder API for constructing the flo
```rust
pub fn first_ten<'a, D: HfDeploy<'a>>(
graph: &'a HfBuilder<'a, D>,
node_builder: &mut impl HfNodeBuilder<'a, D>
node_builder: &impl HfNodeBuilder<'a, D>
) {
let node = graph.node(node_builder);
}
Expand Down Expand Up @@ -56,7 +56,7 @@ To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes th
pub fn first_ten_runtime<'a>(
graph: &'a HfBuilder<'a, SingleGraph>
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(graph, &mut ());
first_ten(graph, &());
graph.build(q!(0))
}
```
Expand Down
18 changes: 16 additions & 2 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::{anyhow, bail, Result};
use async_channel::Receiver;
use async_trait::async_trait;
use futures_core::Future;
use hydroflow_cli_integration::ServerPort;
use hydroflow_cli_integration::{ServerBindConfig, ServerPort};
use tokio::sync::RwLock;

use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath};
Expand All @@ -33,6 +33,8 @@ pub struct HydroflowCrate {
display_id: Option<String>,
external_ports: Vec<u16>,

meta: Option<String>,

target_type: HostTargetType,

/// Configuration for the ports this service will connect to as a client.
Expand Down Expand Up @@ -81,6 +83,7 @@ impl HydroflowCrate {
display_id,
target_type,
external_ports,
meta: None,
port_to_server: HashMap::new(),
port_to_bind: HashMap::new(),
built_binary: Arc::new(async_once_cell::OnceCell::new()),
Expand All @@ -91,6 +94,14 @@ impl HydroflowCrate {
}
}

/// Stores the metadata string for this crate.
///
/// The value is kept in `self.meta` and may only be assigned once.
///
/// # Panics
///
/// Panics with `"meta already set"` if metadata was stored by an earlier call.
pub fn set_meta(&mut self, meta: String) {
    // Guard first: a second assignment is treated as a caller bug rather than
    // silently overwriting the previous value.
    assert!(self.meta.is_none(), "meta already set");
    self.meta = Some(meta);
}

pub fn get_port(
&self,
name: String,
Expand Down Expand Up @@ -194,6 +205,8 @@ impl HydroflowCrate {
}
}

/// Configuration payload serialized to JSON for a launched binary: the bind
/// configuration for each port, keyed by port name, paired with the crate's
/// optional `meta` string.
type InitConfig = (HashMap<String, ServerBindConfig>, Option<String>);

#[async_trait]
impl Service for HydroflowCrate {
fn collect_resources(&mut self, _resource_batch: &mut ResourceBatch) {
Expand Down Expand Up @@ -276,7 +289,8 @@ impl Service for HydroflowCrate {
bind_config.insert(port_name.clone(), launched_host.server_config(bind_type));
}

let formatted_bind_config = serde_json::to_string(&bind_config).unwrap();
let formatted_bind_config =
serde_json::to_string::<InitConfig>(&(bind_config, self.meta.clone())).unwrap();

// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.write().await.cli_stdout().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let vote_to_participant_port = ports
.port("vote_to_participant")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let vote_to_participant_source = ports
.port("vote_to_participant")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let broadcast_recv = ports
.port("broadcast")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_sender/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let broadcast_port = ports
.port("broadcast")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_vote_leader/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let to_replica_port = ports
.port("to_replica")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let to_replica_source = ports
.port("to_replica")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/empty_program/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[hydroflow::main]
async fn main() {
let _ = hydroflow::util::cli::init().await;
let _ = hydroflow::util::cli::init::<()>().await;
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource};

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/tagged_stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged};

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedTagged<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ChatMessage {

#[hydroflow::main]
async fn main() {
let ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init::<()>().await;

let from_peer = ports
.port("from_peer")
Expand Down
15 changes: 10 additions & 5 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::cell::RefCell;
use std::collections::HashMap;

pub use hydroflow_cli_integration::*;
use serde::de::DeserializeOwned;

use crate::scheduled::graph::Hydroflow;

Expand All @@ -27,11 +28,12 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
}
}

pub struct HydroCLI {
pub struct HydroCLI<T> {
ports: RefCell<HashMap<String, ServerOrBound>>,
pub meta: Option<T>,
}

impl HydroCLI {
impl<T> HydroCLI<T> {
pub fn port(&self, name: &str) -> ServerOrBound {
self.ports
.try_borrow_mut()
Expand All @@ -41,17 +43,19 @@ impl HydroCLI {
}
}

pub async fn init() -> HydroCLI {
/// Shape of the JSON line that `init` reads from stdin: the bind configuration
/// for each port, keyed by name, paired with an optional string that `init`
/// itself parses as JSON to produce `HydroCLI::meta`.
type InitConfig = (HashMap<String, ServerBindConfig>, Option<String>);

pub async fn init<T: DeserializeOwned>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();

let bind_config = serde_json::from_str::<HashMap<String, ServerBindConfig>>(trimmed).unwrap();
let bind_config = serde_json::from_str::<InitConfig>(trimmed).unwrap();

// config telling other services how to connect to me
let mut bind_results: HashMap<String, ServerPort> = HashMap::new();
let mut binds = HashMap::new();
for (name, config) in bind_config {
for (name, config) in bind_config.0 {
let bound = config.bind().await;
bind_results.insert(name.clone(), bound.sink_port());
binds.insert(name.clone(), bound);
Expand Down Expand Up @@ -84,5 +88,6 @@ pub async fn init() -> HydroCLI {

HydroCLI {
ports: RefCell::new(all_connected),
meta: bind_config.1.map(|s| serde_json::from_str(&s).unwrap()),
}
}
Loading

0 comments on commit f541dbe

Please sign in to comment.