Skip to content

Commit

Permalink
feat(hydroflow_plus): pass subgraph ID through deploy metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Dec 21, 2023
1 parent 2d2e970 commit c02ead5
Show file tree
Hide file tree
Showing 20 changed files with 82 additions and 96 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};
pub fn first_ten_distributed_runtime<'a>(
graph: &'a GraphBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
subgraph_id: RuntimeData<usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = first_ten_distributed(graph, &cli);
graph.build(subgraph_id)
graph.build(q!(cli.meta.subgraph_id))
}
```

Expand All @@ -68,12 +67,11 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates
```rust
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = flow::first_ten_distributed_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(
flow::first_ten_distributed_runtime!(&ports)
).await;
}
```

Expand All @@ -92,12 +90,11 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("first_ten_distributed")
.profile("dev")
.args(vec![id.to_string()]),
.profile("dev"),
)
}),
);
Expand Down
5 changes: 3 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_channel::Receiver;
use async_trait::async_trait;
use futures_core::Future;
use hydroflow_cli_integration::{InitConfig, ServerPort};
use serde::Serialize;
use tokio::sync::RwLock;

use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath};
Expand Down Expand Up @@ -202,12 +203,12 @@ impl HydroflowCrateService {
}
}

pub fn update_meta(&mut self, meta: String) {
pub fn update_meta<T: Serialize>(&mut self, meta: T) {
if self.launched_binary.is_some() {
panic!("Cannot update meta after binary has been launched")
}

self.meta = Some(meta);
self.meta = Some(serde_json::to_string(&meta).unwrap());
}

pub fn get_port(
Expand Down
1 change: 0 additions & 1 deletion hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ hydroflow_plus = { path = "../../hydroflow_plus", version = "^0.5.0", features =
syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] }
tokio = { version = "1.16", features = [ "full" ] }
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"

hydro_deploy = { path = "../core", version = "^0.5.0", optional = true }
async-channel = { version = "1.8.0", optional = true }
Expand Down
39 changes: 23 additions & 16 deletions hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;

Expand Down Expand Up @@ -26,7 +27,7 @@ pub struct CLIDeploy {}
impl<'a> Deploy<'a> for CLIDeploy {
type Node = CLIDeployNode<'a>;
type Cluster = CLIDeployCluster<'a>;
type Meta = HydroflowPlusMeta;
type Meta = HashMap<usize, Vec<u32>>;
type RuntimeID = ();
type NodePort = CLIDeployPort<CLIDeployNode<'a>>;
type ClusterPort = CLIDeployPort<CLIDeployCluster<'a>>;
Expand Down Expand Up @@ -96,7 +97,7 @@ impl<'a> CLIDeployPort<CLIDeployNode<'a>> {

impl<'a> HfNode<'a> for CLIDeployNode<'a> {
type Port = CLIDeployPort<Self>;
type Meta = HydroflowPlusMeta;
type Meta = HashMap<usize, Vec<u32>>;

fn id(&self) -> usize {
self.id
Expand All @@ -118,7 +119,10 @@ impl<'a> HfNode<'a> for CLIDeployNode<'a> {

fn update_meta(&mut self, meta: &Self::Meta) {
let mut n = self.underlying.try_write().unwrap();
n.update_meta(serde_json::to_string(&meta).unwrap());
n.update_meta(HydroflowPlusMeta {
clusters: meta.clone(),
subgraph_id: self.id,
});
}
}

Expand All @@ -143,7 +147,7 @@ pub struct CLIDeployCluster<'a> {

impl<'a> HfNode<'a> for CLIDeployCluster<'a> {
type Port = CLIDeployPort<Self>;
type Meta = HydroflowPlusMeta;
type Meta = HashMap<usize, Vec<u32>>;

fn id(&self) -> usize {
self.id
Expand All @@ -164,10 +168,14 @@ impl<'a> HfNode<'a> for CLIDeployCluster<'a> {
}

fn update_meta(&mut self, meta: &Self::Meta) {
let json_meta = serde_json::to_string(&meta).unwrap();
let meta = HydroflowPlusMeta {
clusters: meta.clone(),
subgraph_id: self.id,
};

self.nodes.iter().for_each(|n| {
let mut n = n.underlying.try_write().unwrap();
n.update_meta(json_meta.clone());
n.update_meta(&meta);
});
}
}
Expand Down Expand Up @@ -351,12 +359,12 @@ impl<'a> HfSendManyToMany<'a, CLIDeployCluster<'a>> for CLIDeployCluster<'a> {
}
}

type CrateBuilder<'a> = dyn FnMut(usize) -> Arc<RwLock<HydroflowCrateService>> + 'a;
type CrateBuilder<'a> = dyn FnMut() -> Arc<RwLock<HydroflowCrateService>> + 'a;

pub struct CLIDeployNodeBuilder<'a>(RefCell<Box<CrateBuilder<'a>>>);

impl<'a> CLIDeployNodeBuilder<'a> {
pub fn new<F: FnMut(usize) -> Arc<RwLock<HydroflowCrateService>> + 'a>(f: F) -> Self {
pub fn new<F: FnMut() -> Arc<RwLock<HydroflowCrateService>> + 'a>(f: F) -> Self {
Self(RefCell::new(Box::new(f)))
}
}
Expand All @@ -366,23 +374,23 @@ impl<'a: 'b, 'b> NodeBuilder<'a, CLIDeploy> for CLIDeployNodeBuilder<'b> {
&self,
id: usize,
builder: &'a GraphBuilder<'a, CLIDeploy>,
_meta: &mut HydroflowPlusMeta,
_meta: &mut HashMap<usize, Vec<u32>>,
) -> CLIDeployNode<'a> {
CLIDeployNode {
id,
builder,
next_port: Rc::new(RefCell::new(0)),
underlying: (self.0.borrow_mut())(id),
underlying: (self.0.borrow_mut())(),
}
}
}

type ClusterBuilderFn<'a> = dyn FnMut(usize) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a;
type ClusterBuilderFn<'a> = dyn FnMut() -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a;

pub struct CLIDeployClusterBuilder<'a>(RefCell<Box<ClusterBuilderFn<'a>>>);

impl<'a> CLIDeployClusterBuilder<'a> {
pub fn new<F: FnMut(usize) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a>(f: F) -> Self {
pub fn new<F: FnMut() -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a>(f: F) -> Self {
Self(RefCell::new(Box::new(f)))
}
}
Expand All @@ -392,11 +400,10 @@ impl<'a: 'b, 'b> ClusterBuilder<'a, CLIDeploy> for CLIDeployClusterBuilder<'b> {
&self,
id: usize,
builder: &'a GraphBuilder<'a, CLIDeploy>,
meta: &mut HydroflowPlusMeta,
meta: &mut HashMap<usize, Vec<u32>>,
) -> CLIDeployCluster<'a> {
let cluster_nodes = (self.0.borrow_mut())(id);
meta.clusters
.insert(id, (0..(cluster_nodes.len() as u32)).collect());
let cluster_nodes = (self.0.borrow_mut())();
meta.insert(id, (0..(cluster_nodes.len() as u32)).collect());

CLIDeployCluster {
id,
Expand Down
1 change: 1 addition & 0 deletions hydro_deploy/hydroflow_plus_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ use serde::{Deserialize, Serialize};
#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
pub clusters: HashMap<usize, Vec<u32>>,
pub subgraph_id: usize,
}
2 changes: 1 addition & 1 deletion hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a> HfCluster<'a> for CLIRuntimeCluster<'a> {
fn ids(&self) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
let cli = self.cli;
let self_id = self.id;
q!(cli.meta.as_ref().unwrap().clusters.get(&self_id).unwrap())
q!(cli.meta.clusters.get(&self_id).unwrap())
}
}

Expand Down
11 changes: 7 additions & 4 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
}
}

pub struct HydroCLI<T> {
pub struct HydroCLI<T = Option<()>> {
ports: RefCell<HashMap<String, ServerOrBound>>,
pub meta: Option<T>,
pub meta: T,
}

impl<T> HydroCLI<T> {
Expand All @@ -43,7 +43,7 @@ impl<T> HydroCLI<T> {
}
}

pub async fn init<T: DeserializeOwned>() -> HydroCLI<T> {
pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();
Expand Down Expand Up @@ -86,6 +86,9 @@ pub async fn init<T: DeserializeOwned>() -> HydroCLI<T> {

HydroCLI {
ports: RefCell::new(all_connected),
meta: bind_config.1.map(|s| serde_json::from_str(&s).unwrap()),
meta: bind_config
.1
.map(|b| serde_json::from_str(&b).unwrap())
.unwrap_or_default(),
}
}
5 changes: 2 additions & 3 deletions hydroflow_plus_test/examples/first_ten_distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host.clone())
.bin("first_ten_distributed")
.profile(profile)
.args(vec![id.to_string()]),
.profile(profile),
)
}),
);
Expand Down
6 changes: 2 additions & 4 deletions hydroflow_plus_test/examples/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,17 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::cluster::map_reduce(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host.clone())
.bin("map_reduce")
.profile(profile)
.args(vec![id.to_string()])
.display_name("leader"),
)
}),
&CLIDeployClusterBuilder::new(|id| {
&CLIDeployClusterBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
(0..2)
.map(|idx| {
Expand All @@ -62,7 +61,6 @@ async fn main() {
HydroflowCrate::new(".", host.clone())
.bin("map_reduce")
.profile(profile)
.args(vec![id.to_string()])
.display_name(format!("cluster/{}", idx)),
)
})
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
let (source_zero_port, _, _) = hydroflow_plus_test::networked::networked_basic(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("networked_basic")
.profile("dev")
.args(vec![id.to_string()]),
.profile("dev"),
)
}),
);
Expand Down
10 changes: 4 additions & 6 deletions hydroflow_plus_test/examples/simple_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,24 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::cluster::simple_cluster(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host)
.bin("simple_cluster")
.profile(profile)
.args(vec![id.to_string()]),
.profile(profile),
)
}),
&CLIDeployClusterBuilder::new(|id| {
&CLIDeployClusterBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
(0..2)
.map(|_| {
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host)
.bin("simple_cluster")
.profile(profile)
.args(vec![id.to_string()]),
.profile(profile),
)
})
.collect()
Expand Down
9 changes: 4 additions & 5 deletions hydroflow_plus_test/src/bin/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined =
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports),
)
.await;
}
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/many_to_many.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::cluster::many_to_many_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports))
.await;
}
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ extern crate alloc;
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::cluster::map_reduce_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports))
.await;
}
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/bin/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::networked::networked_basic_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!(
&ports
))
.await;
}
Loading

0 comments on commit c02ead5

Please sign in to comment.