From ecc202b566593daa4b3b07b6b25ae6c5c646ebea Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Thu, 21 Dec 2023 16:46:48 -0800 Subject: [PATCH] feat(hydroflow_plus): pass subgraph ID through deploy metadata --- Cargo.lock | 1 - docs/docs/hydroflow_plus/distributed.mdx | 15 +++---- hydro_deploy/core/src/hydroflow_crate/mod.rs | 5 ++- .../hydroflow_plus_cli_integration/Cargo.toml | 1 - .../src/deploy.rs | 39 +++++++++++-------- .../hydroflow_plus_cli_integration/src/lib.rs | 1 + .../src/runtime.rs | 2 +- hydroflow/src/util/cli.rs | 11 ++++-- .../examples/first_ten_distributed.rs | 5 +-- hydroflow_plus_test/examples/map_reduce.rs | 6 +-- .../examples/networked_basic.rs | 5 +-- .../examples/simple_cluster.rs | 10 ++--- .../src/bin/first_ten_distributed.rs | 9 ++--- hydroflow_plus_test/src/bin/many_to_many.rs | 6 +-- hydroflow_plus_test/src/bin/map_reduce.rs | 6 +-- .../src/bin/networked_basic.rs | 8 ++-- hydroflow_plus_test/src/bin/simple_cluster.rs | 8 ++-- hydroflow_plus_test/src/cluster.rs | 24 +++++------- hydroflow_plus_test/src/first_ten.rs | 8 ++-- hydroflow_plus_test/src/networked.rs | 8 ++-- 20 files changed, 82 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03ac39793f99..87d386f8f7e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1503,7 +1503,6 @@ dependencies = [ "hydro_deploy", "hydroflow_plus", "serde", - "serde_json", "stageleft", "stageleft_tool", "syn 2.0.14", diff --git a/docs/docs/hydroflow_plus/distributed.mdx b/docs/docs/hydroflow_plus/distributed.mdx index c190648816f9..430b9673e386 100644 --- a/docs/docs/hydroflow_plus/distributed.mdx +++ b/docs/docs/hydroflow_plus/distributed.mdx @@ -56,10 +56,9 @@ use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; pub fn first_ten_distributed_runtime<'a>( graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - subgraph_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = first_ten_distributed(graph, &cli); - graph.build(subgraph_id) + graph.build(q!(cli.meta.subgraph_id)) } ``` @@ -68,12 +67,11 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates ```rust #[tokio::main] async fn main() { - let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = flow::first_ten_distributed_runtime!(&ports, subgraph_id); - - hydroflow::util::cli::launch_flow(joined).await; + hydroflow::util::cli::launch_flow( + flow::first_ten_distributed_runtime!(&ports) + ).await; } ``` @@ -92,12 +90,11 @@ async fn main() { let builder = hydroflow_plus::GraphBuilder::new(); hydroflow_plus_test::first_ten::first_ten_distributed( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { deployment.add_service( HydroflowCrate::new(".", localhost.clone()) .bin("first_ten_distributed") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }), ); diff --git a/hydro_deploy/core/src/hydroflow_crate/mod.rs b/hydro_deploy/core/src/hydroflow_crate/mod.rs index 9b3f538dc9d3..b2f71a28ba57 100644 --- a/hydro_deploy/core/src/hydroflow_crate/mod.rs +++ b/hydro_deploy/core/src/hydroflow_crate/mod.rs @@ -8,6 +8,7 @@ use async_channel::Receiver; use async_trait::async_trait; use futures_core::Future; use hydroflow_cli_integration::{InitConfig, ServerPort}; +use serde::Serialize; use tokio::sync::RwLock; use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath}; @@ -202,12 +203,12 @@ impl HydroflowCrateService { } } - pub fn update_meta(&mut self, meta: String) { + pub fn update_meta(&mut self, meta: T) { if self.launched_binary.is_some() { panic!("Cannot update meta after binary has been launched") } - self.meta = Some(meta); + self.meta = Some(serde_json::to_string(&meta).unwrap()); } pub fn get_port( diff --git a/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml b/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml index 9356520c1240..567eb3e2ba41 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml +++ b/hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml @@ -14,7 +14,6 @@ hydroflow_plus = { path = "../../hydroflow_plus", version = "^0.5.0", features = syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } tokio = { version = "1.16", features = [ "full" ] } serde = { version = "1", features = [ "derive" ] } -serde_json = "1" hydro_deploy = { path = "../core", version = "^0.5.0", optional = true } async-channel = { version = "1.8.0", optional = true } diff --git a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs index e825685c782b..1605fcd7661e 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs +++ b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs @@ -1,4 +1,5 @@ use std::cell::RefCell; +use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; @@ -26,7 +27,7 @@ pub struct CLIDeploy {} impl<'a> Deploy<'a> for CLIDeploy { type Node = CLIDeployNode<'a>; type Cluster = CLIDeployCluster<'a>; - type Meta = HydroflowPlusMeta; + type Meta = HashMap>; type RuntimeID = (); type NodePort = CLIDeployPort>; type ClusterPort = CLIDeployPort>; @@ -96,7 +97,7 @@ impl<'a> CLIDeployPort> { impl<'a> HfNode<'a> for CLIDeployNode<'a> { type Port = CLIDeployPort; - type Meta = HydroflowPlusMeta; + type Meta = HashMap>; fn id(&self) -> usize { self.id @@ -118,7 +119,10 @@ impl<'a> HfNode<'a> for CLIDeployNode<'a> { fn update_meta(&mut self, meta: &Self::Meta) { let mut n = self.underlying.try_write().unwrap(); - n.update_meta(serde_json::to_string(&meta).unwrap()); + n.update_meta(HydroflowPlusMeta { + clusters: meta.clone(), + subgraph_id: self.id, + }); } } @@ -143,7 +147,7 @@ pub struct CLIDeployCluster<'a> { impl<'a> HfNode<'a> for CLIDeployCluster<'a> { type Port = CLIDeployPort; - type Meta = HydroflowPlusMeta; + type Meta = HashMap>; fn id(&self) -> usize { self.id @@ -164,10 +168,14 @@ impl<'a> HfNode<'a> for CLIDeployCluster<'a> { } fn update_meta(&mut self, meta: &Self::Meta) { - let json_meta = serde_json::to_string(&meta).unwrap(); + let meta = HydroflowPlusMeta { + clusters: meta.clone(), + subgraph_id: self.id, + }; + self.nodes.iter().for_each(|n| { let mut n = n.underlying.try_write().unwrap(); - n.update_meta(json_meta.clone()); + n.update_meta(&meta); }); } } @@ -351,12 +359,12 @@ impl<'a> HfSendManyToMany<'a, CLIDeployCluster<'a>> for CLIDeployCluster<'a> { } } -type CrateBuilder<'a> = dyn FnMut(usize) -> Arc> + 'a; +type CrateBuilder<'a> = dyn FnMut() -> Arc> + 'a; pub struct CLIDeployNodeBuilder<'a>(RefCell>>); impl<'a> CLIDeployNodeBuilder<'a> { - pub fn new Arc> + 'a>(f: F) -> Self { + pub fn new Arc> + 'a>(f: F) -> Self { Self(RefCell::new(Box::new(f))) } } @@ -366,23 +374,23 @@ impl<'a: 'b, 'b> NodeBuilder<'a, CLIDeploy> for CLIDeployNodeBuilder<'b> { &self, id: usize, builder: &'a GraphBuilder<'a, CLIDeploy>, - _meta: &mut HydroflowPlusMeta, + _meta: &mut HashMap>, ) -> CLIDeployNode<'a> { CLIDeployNode { id, builder, next_port: Rc::new(RefCell::new(0)), - underlying: (self.0.borrow_mut())(id), + underlying: (self.0.borrow_mut())(), } } } -type ClusterBuilderFn<'a> = dyn FnMut(usize) -> Vec>> + 'a; +type ClusterBuilderFn<'a> = dyn FnMut() -> Vec>> + 'a; pub struct CLIDeployClusterBuilder<'a>(RefCell>>); impl<'a> CLIDeployClusterBuilder<'a> { - pub fn new Vec>> + 'a>(f: F) -> Self { + pub fn new Vec>> + 'a>(f: F) -> Self { Self(RefCell::new(Box::new(f))) } } @@ -392,11 +400,10 @@ impl<'a: 'b, 'b> ClusterBuilder<'a, CLIDeploy> for CLIDeployClusterBuilder<'b> { &self, id: usize, builder: &'a GraphBuilder<'a, CLIDeploy>, - meta: &mut HydroflowPlusMeta, + meta: &mut HashMap>, ) -> CLIDeployCluster<'a> { - let cluster_nodes = (self.0.borrow_mut())(id); - meta.clusters - .insert(id, (0..(cluster_nodes.len() as u32)).collect()); + let cluster_nodes = (self.0.borrow_mut())(); + meta.insert(id, (0..(cluster_nodes.len() as u32)).collect()); CLIDeployCluster { id, diff --git a/hydro_deploy/hydroflow_plus_cli_integration/src/lib.rs b/hydro_deploy/hydroflow_plus_cli_integration/src/lib.rs index 097bfa042000..37b9043fcc9b 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/src/lib.rs +++ b/hydro_deploy/hydroflow_plus_cli_integration/src/lib.rs @@ -15,4 +15,5 @@ use serde::{Deserialize, Serialize}; #[derive(Default, Serialize, Deserialize)] pub struct HydroflowPlusMeta { pub clusters: HashMap>, + pub subgraph_id: usize, } diff --git a/hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs b/hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs index 72084d324637..570ee78b8583 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs +++ b/hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs @@ -88,7 +88,7 @@ impl<'a> HfCluster<'a> for CLIRuntimeCluster<'a> { fn ids(&self) -> impl Quoted<'a, &'a Vec> + Copy + 'a { let cli = self.cli; let self_id = self.id; - q!(cli.meta.as_ref().unwrap().clusters.get(&self_id).unwrap()) + q!(cli.meta.clusters.get(&self_id).unwrap()) } } diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index 8ec17c3fa186..b634a5505220 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -28,9 +28,9 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) { } } -pub struct HydroCLI { +pub struct HydroCLI> { ports: RefCell>, - pub meta: Option, + pub meta: T, } impl HydroCLI { @@ -43,7 +43,7 @@ impl HydroCLI { } } -pub async fn init() -> HydroCLI { +pub async fn init() -> HydroCLI { let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); let trimmed = input.trim(); @@ -86,6 +86,9 @@ pub async fn init() -> HydroCLI { HydroCLI { ports: RefCell::new(all_connected), - meta: bind_config.1.map(|s| serde_json::from_str(&s).unwrap()), + meta: bind_config + .1 + .map(|b| serde_json::from_str(&b).unwrap()) + .unwrap_or_default(), } } diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index e8bfd05598f5..15ae80ee1c95 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -41,13 +41,12 @@ async fn main() { let builder = hydroflow_plus::GraphBuilder::new(); hydroflow_plus_test::first_ten::first_ten_distributed( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { let host = create_host(&mut deployment); deployment.add_service( HydroflowCrate::new(".", host.clone()) .bin("first_ten_distributed") - .profile(profile) - .args(vec![id.to_string()]), + .profile(profile), ) }), ); diff --git a/hydroflow_plus_test/examples/map_reduce.rs b/hydroflow_plus_test/examples/map_reduce.rs index cea962029caa..fbb2546135d1 100644 --- a/hydroflow_plus_test/examples/map_reduce.rs +++ b/hydroflow_plus_test/examples/map_reduce.rs @@ -42,18 +42,17 @@ async fn main() { let builder = hydroflow_plus::GraphBuilder::new(); hydroflow_plus_test::cluster::map_reduce( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { let mut deployment = deployment.borrow_mut(); let host = create_host(&mut deployment); deployment.add_service( HydroflowCrate::new(".", host.clone()) .bin("map_reduce") .profile(profile) - .args(vec![id.to_string()]) .display_name("leader"), ) }), - &CLIDeployClusterBuilder::new(|id| { + &CLIDeployClusterBuilder::new(|| { let mut deployment = deployment.borrow_mut(); (0..2) .map(|idx| { @@ -62,7 +61,6 @@ async fn main() { HydroflowCrate::new(".", host.clone()) .bin("map_reduce") .profile(profile) - .args(vec![id.to_string()]) .display_name(format!("cluster/{}", idx)), ) }) diff --git a/hydroflow_plus_test/examples/networked_basic.rs b/hydroflow_plus_test/examples/networked_basic.rs index f8ad6bfb7545..d6de39692656 100644 --- a/hydroflow_plus_test/examples/networked_basic.rs +++ b/hydroflow_plus_test/examples/networked_basic.rs @@ -11,12 +11,11 @@ async fn main() { let builder = hydroflow_plus::GraphBuilder::new(); let (source_zero_port, _, _) = hydroflow_plus_test::networked::networked_basic( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { deployment.add_service( HydroflowCrate::new(".", localhost.clone()) .bin("networked_basic") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }), ); diff --git a/hydroflow_plus_test/examples/simple_cluster.rs b/hydroflow_plus_test/examples/simple_cluster.rs index 9d19a0873d01..359b2ef1bf38 100644 --- a/hydroflow_plus_test/examples/simple_cluster.rs +++ b/hydroflow_plus_test/examples/simple_cluster.rs @@ -42,17 +42,16 @@ async fn main() { let builder = hydroflow_plus::GraphBuilder::new(); hydroflow_plus_test::cluster::simple_cluster( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { let mut deployment = deployment.borrow_mut(); let host = create_host(&mut deployment); deployment.add_service( HydroflowCrate::new(".", host) .bin("simple_cluster") - .profile(profile) - .args(vec![id.to_string()]), + .profile(profile), ) }), - &CLIDeployClusterBuilder::new(|id| { + &CLIDeployClusterBuilder::new(|| { let mut deployment = deployment.borrow_mut(); (0..2) .map(|_| { @@ -60,8 +59,7 @@ async fn main() { deployment.add_service( HydroflowCrate::new(".", host) .bin("simple_cluster") - .profile(profile) - .args(vec![id.to_string()]), + .profile(profile), ) }) .collect() diff --git a/hydroflow_plus_test/src/bin/first_ten_distributed.rs b/hydroflow_plus_test/src/bin/first_ten_distributed.rs index 776b94a8015e..6ec405c55018 100644 --- a/hydroflow_plus_test/src/bin/first_ten_distributed.rs +++ b/hydroflow_plus_test/src/bin/first_ten_distributed.rs @@ -1,11 +1,10 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = - hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports, subgraph_id); - - hydroflow::util::cli::launch_flow(joined).await; + hydroflow::util::cli::launch_flow( + hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports), + ) + .await; } diff --git a/hydroflow_plus_test/src/bin/many_to_many.rs b/hydroflow_plus_test/src/bin/many_to_many.rs index 91ba7fd97b96..4fe217b6e0d6 100644 --- a/hydroflow_plus_test/src/bin/many_to_many.rs +++ b/hydroflow_plus_test/src/bin/many_to_many.rs @@ -1,10 +1,8 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = hydroflow_plus_test::cluster::many_to_many_runtime!(&ports, subgraph_id); - - hydroflow::util::cli::launch_flow(joined).await; + hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports)) + .await; } diff --git a/hydroflow_plus_test/src/bin/map_reduce.rs b/hydroflow_plus_test/src/bin/map_reduce.rs index a41e9a7696ec..17526ca37efe 100644 --- a/hydroflow_plus_test/src/bin/map_reduce.rs +++ b/hydroflow_plus_test/src/bin/map_reduce.rs @@ -4,10 +4,8 @@ extern crate alloc; // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = hydroflow_plus_test::cluster::map_reduce_runtime!(&ports, subgraph_id); - - hydroflow::util::cli::launch_flow(joined).await; + hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports)) + .await; } diff --git a/hydroflow_plus_test/src/bin/networked_basic.rs b/hydroflow_plus_test/src/bin/networked_basic.rs index 0a00c8345dc1..5e85fc102eb6 100644 --- a/hydroflow_plus_test/src/bin/networked_basic.rs +++ b/hydroflow_plus_test/src/bin/networked_basic.rs @@ -1,10 +1,10 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = hydroflow_plus_test::networked::networked_basic_runtime!(&ports, subgraph_id); - - hydroflow::util::cli::launch_flow(joined).await; + hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!( + &ports + )) + .await; } diff --git a/hydroflow_plus_test/src/bin/simple_cluster.rs b/hydroflow_plus_test/src/bin/simple_cluster.rs index d74d3b48d789..8851839da7d4 100644 --- a/hydroflow_plus_test/src/bin/simple_cluster.rs +++ b/hydroflow_plus_test/src/bin/simple_cluster.rs @@ -1,10 +1,10 @@ // cannot use hydroflow::main because connect_local_blocking causes a deadlock #[tokio::main] async fn main() { - let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = hydroflow_plus_test::cluster::simple_cluster_runtime!(&ports, subgraph_id); - - hydroflow::util::cli::launch_flow(joined).await; + hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::simple_cluster_runtime!( + &ports + )) + .await; } diff --git a/hydroflow_plus_test/src/cluster.rs b/hydroflow_plus_test/src/cluster.rs index f0ba4e223843..b4c9bbb1f728 100644 --- a/hydroflow_plus_test/src/cluster.rs +++ b/hydroflow_plus_test/src/cluster.rs @@ -75,30 +75,27 @@ use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; pub fn simple_cluster_runtime<'a>( graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - subgraph_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = simple_cluster(graph, &cli, &cli); - graph.build(subgraph_id) + graph.build(q!(cli.meta.subgraph_id)) } #[stageleft::entry] pub fn many_to_many_runtime<'a>( graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - subgraph_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = many_to_many(graph, &cli); - graph.build(subgraph_id) + graph.build(q!(cli.meta.subgraph_id)) } #[stageleft::entry] pub fn map_reduce_runtime<'a>( graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - subgraph_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = map_reduce(graph, &cli, &cli); - graph.build(subgraph_id) + graph.build(q!(cli.meta.subgraph_id)) } #[stageleft::runtime] @@ -121,22 +118,20 @@ mod tests { let builder = hydroflow_plus::GraphBuilder::new(); let (node, cluster) = super::simple_cluster( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { deployment.borrow_mut().add_service( HydroflowCrate::new(".", localhost.clone()) .bin("simple_cluster") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }), - &CLIDeployClusterBuilder::new(|id| { + &CLIDeployClusterBuilder::new(|| { (0..2) .map(|_| { deployment.borrow_mut().add_service( HydroflowCrate::new(".", localhost.clone()) .bin("simple_cluster") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }) .collect() @@ -192,14 +187,13 @@ mod tests { let builder = hydroflow_plus::GraphBuilder::new(); let cluster = super::many_to_many( &builder, - &CLIDeployClusterBuilder::new(|id| { + &CLIDeployClusterBuilder::new(|| { (0..2) .map(|_| { deployment.borrow_mut().add_service( HydroflowCrate::new(".", localhost.clone()) .bin("many_to_many") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }) .collect() diff --git a/hydroflow_plus_test/src/first_ten.rs b/hydroflow_plus_test/src/first_ten.rs index 8440fa1cc8d2..e5028718689d 100644 --- a/hydroflow_plus_test/src/first_ten.rs +++ b/hydroflow_plus_test/src/first_ten.rs @@ -41,10 +41,9 @@ use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; pub fn first_ten_distributed_runtime<'a>( graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - subgraph_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = first_ten_distributed(graph, &cli); - graph.build(subgraph_id) + graph.build(q!(cli.meta.subgraph_id)) } #[stageleft::runtime] @@ -63,12 +62,11 @@ mod tests { let builder = hydroflow_plus::GraphBuilder::new(); let second_node = super::first_ten_distributed( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { deployment.add_service( HydroflowCrate::new(".", localhost.clone()) .bin("first_ten_distributed") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }), ); diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs index 4ae51c7c17e1..b549376d883f 100644 --- a/hydroflow_plus_test/src/networked.rs +++ b/hydroflow_plus_test/src/networked.rs @@ -32,10 +32,9 @@ pub fn networked_basic<'a, D: Deploy<'a>>( pub fn networked_basic_runtime<'a>( graph: &'a GraphBuilder<'a, CLIRuntime>, cli: RuntimeData<&'a HydroCLI>, - subgraph_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { let _ = networked_basic(graph, &cli); - graph.build(subgraph_id) + graph.build(q!(cli.meta.subgraph_id)) } #[stageleft::runtime] @@ -56,12 +55,11 @@ mod tests { let builder = hydroflow_plus::GraphBuilder::new(); let (source_zero_port, _, node_one) = super::networked_basic( &builder, - &CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|| { deployment.add_service( HydroflowCrate::new(".", localhost.clone()) .bin("networked_basic") - .profile("dev") - .args(vec![id.to_string()]), + .profile("dev"), ) }), );