From d429e6d0c2571da5838c0b764d2d2e7a6bcb47e4 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Mon, 27 Jan 2025 21:43:33 -0800 Subject: [PATCH] feat(hydro_lang): provide APIs for blanket-deploying locations This makes it easy to implement patterns like deploying everything to localhost. --- hydro_lang/src/builder/built.rs | 55 ++++++++++++++++++----- hydro_lang/src/builder/deploy.rs | 56 +++++++++++++++++++++--- hydro_lang/src/builder/mod.rs | 50 +++++++++++++++++---- hydro_lang/src/deploy/in_memory_graph.rs | 8 ++++ hydro_lang/src/deploy/mod.rs | 4 ++ hydro_lang/src/ir.rs | 4 +- hydro_lang/src/test_util.rs | 32 ++++++++++++++ 7 files changed, 183 insertions(+), 26 deletions(-) diff --git a/hydro_lang/src/builder/built.rs b/hydro_lang/src/builder/built.rs index e9a8bdca3f9..42f84242439 100644 --- a/hydro_lang/src/builder/built.rs +++ b/hydro_lang/src/builder/built.rs @@ -12,8 +12,9 @@ use crate::staging_util::Invariant; pub struct BuiltFlow<'a> { pub(super) ir: Vec, - pub(super) processes: Vec, - pub(super) clusters: Vec, + pub(super) process_id_name: Vec<(usize, String)>, + pub(super) cluster_id_name: Vec<(usize, String)>, + pub(super) external_id_name: Vec<(usize, String)>, pub(super) used: bool, pub(super) _phantom: Invariant<'a>, @@ -54,8 +55,9 @@ impl<'a> BuiltFlow<'a> { self.used = true; BuiltFlow { ir: f(std::mem::take(&mut self.ir)), - processes: std::mem::take(&mut self.processes), - clusters: std::mem::take(&mut self.clusters), + process_id_name: std::mem::take(&mut self.process_id_name), + cluster_id_name: std::mem::take(&mut self.cluster_id_name), + external_id_name: std::mem::take(&mut self.external_id_name), used: false, _phantom: PhantomData, } @@ -69,18 +71,27 @@ impl<'a> BuiltFlow<'a> { fn into_deploy>(mut self) -> DeployFlow<'a, D> { self.used = true; let processes = if D::has_trivial_node() { - self.processes + self.process_id_name .iter() - .map(|id| (*id, D::trivial_process(*id))) + .map(|id| (id.0, D::trivial_process(id.0))) .collect() } else { HashMap::new() }; let clusters = if D::has_trivial_node() { - self.clusters + self.cluster_id_name .iter() - .map(|id| (*id, D::trivial_cluster(*id))) + .map(|id| (id.0, D::trivial_cluster(id.0))) + .collect() + } else { + HashMap::new() + }; + + let externals = if D::has_trivial_node() { + self.external_id_name + .iter() + .map(|id| (id.0, D::trivial_external(id.0))) .collect() } else { HashMap::new() @@ -88,9 +99,12 @@ impl<'a> BuiltFlow<'a> { DeployFlow { ir: std::mem::take(&mut self.ir), - nodes: processes, + processes, + process_id_name: std::mem::take(&mut self.process_id_name), clusters, - externals: HashMap::new(), + cluster_id_name: std::mem::take(&mut self.cluster_id_name), + externals, + external_id_name: std::mem::take(&mut self.external_id_name), used: false, _phantom: PhantomData, } @@ -104,6 +118,13 @@ impl<'a> BuiltFlow<'a> { self.into_deploy().with_process(process, spec) } + pub fn with_remaining_processes, S: IntoProcessSpec<'a, D> + 'a>( + self, + spec: impl Fn() -> S, + ) -> DeployFlow<'a, D> { + self.into_deploy().with_remaining_processes(spec) + } + pub fn with_external>( self, process: &ExternalProcess

, @@ -112,6 +133,13 @@ impl<'a> BuiltFlow<'a> { self.into_deploy().with_external(process, spec) } + pub fn with_remaining_externals, S: ExternalSpec<'a, D> + 'a>( + self, + spec: impl Fn() -> S, + ) -> DeployFlow<'a, D> { + self.into_deploy().with_remaining_externals(spec) + } + pub fn with_cluster>( self, cluster: &Cluster, @@ -120,6 +148,13 @@ impl<'a> BuiltFlow<'a> { self.into_deploy().with_cluster(cluster, spec) } + pub fn with_remaining_clusters, S: ClusterSpec<'a, D> + 'a>( + self, + spec: impl Fn() -> S, + ) -> DeployFlow<'a, D> { + self.into_deploy().with_remaining_clusters(spec) + } + pub fn compile>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> { self.into_deploy::().compile(env) } diff --git a/hydro_lang/src/builder/deploy.rs b/hydro_lang/src/builder/deploy.rs index 0b3c71890e4..748dbf3b9df 100644 --- a/hydro_lang/src/builder/deploy.rs +++ b/hydro_lang/src/builder/deploy.rs @@ -25,9 +25,19 @@ use crate::staging_util::Invariant; pub struct DeployFlow<'a, D: LocalDeploy<'a>> { pub(super) ir: Vec, - pub(super) nodes: HashMap, + + /// Deployed instances of each process in the flow + pub(super) processes: HashMap, + + /// Lists all the processes that were created in the flow, same ID as `processes` + /// but with the type name of the tag. + pub(super) process_id_name: Vec<(usize, String)>, + pub(super) externals: HashMap, + pub(super) external_id_name: Vec<(usize, String)>, + pub(super) clusters: HashMap, + pub(super) cluster_id_name: Vec<(usize, String)>, pub(super) used: bool, pub(super) _phantom: Invariant<'a, D>, @@ -52,13 +62,25 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { spec: impl IntoProcessSpec<'a, D>, ) -> Self { let tag_name = std::any::type_name::

().to_string(); - self.nodes.insert( + self.processes.insert( process.id, spec.into_process_spec().build(process.id, &tag_name), ); self } + pub fn with_remaining_processes + 'a>( + mut self, + spec: impl Fn() -> S, + ) -> Self { + for (id, name) in &self.process_id_name { + self.processes + .insert(*id, spec().into_process_spec().build(*id, name)); + } + + self + } + pub fn with_external

( mut self, process: &ExternalProcess

, @@ -70,6 +92,17 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { self } + pub fn with_remaining_externals + 'a>( + mut self, + spec: impl Fn() -> S, + ) -> Self { + for (id, name) in &self.external_id_name { + self.externals.insert(*id, spec().build(*id, name)); + } + + self + } + pub fn with_cluster(mut self, cluster: &Cluster, spec: impl ClusterSpec<'a, D>) -> Self { let tag_name = std::any::type_name::().to_string(); self.clusters @@ -77,6 +110,17 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { self } + pub fn with_remaining_clusters + 'a>( + mut self, + spec: impl Fn() -> S, + ) -> Self { + for (id, name) in &self.cluster_id_name { + self.clusters.insert(*id, spec().build(*id, name)); + } + + self + } + pub fn compile_no_network(mut self) -> CompiledFlow<'a, D::GraphId> { self.used = true; @@ -99,7 +143,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { leaf.compile_network::( env, &mut seen_tees, - &self.nodes, + &self.processes, &self.clusters, &self.externals, ) @@ -130,7 +174,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { let #self_id_ident = #self_id_expr; }); - for other_location in self.nodes.keys().chain(self.clusters.keys()) { + for other_location in self.processes.keys().chain(self.clusters.keys()) { let other_id_ident = syn::Ident::new( &format!("__hydro_lang_cluster_ids_{}", c_id), Span::call_site(), @@ -160,7 +204,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { leaf.compile_network::( &(), &mut seen_tees_instantiate, - &self.nodes, + &self.processes, &self.clusters, &self.externals, ) @@ -172,7 +216,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { let mut meta = D::Meta::default(); let (mut processes, mut clusters, mut externals) = ( - std::mem::take(&mut self.nodes) + std::mem::take(&mut self.processes) .into_iter() .filter_map(|(node_id, node)| { if let Some(ir) = compiled.remove(&node_id) { diff --git a/hydro_lang/src/builder/mod.rs b/hydro_lang/src/builder/mod.rs index 894fb99a714..6aa6859cd86 100644 --- a/hydro_lang/src/builder/mod.rs +++ b/hydro_lang/src/builder/mod.rs @@ -1,3 +1,4 @@ +use std::any::type_name; use std::cell::RefCell; use std::collections::HashMap; use std::marker::PhantomData; @@ -44,8 +45,9 @@ pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has pub struct FlowBuilder<'a> { flow_state: FlowState, - nodes: RefCell>, - clusters: RefCell>, + processes: RefCell>, + clusters: RefCell>, + externals: RefCell>, next_node_id: RefCell, @@ -87,8 +89,9 @@ impl<'a> FlowBuilder<'a> { cycle_counts: HashMap::new(), next_clock_id: 0, })), - nodes: RefCell::new(vec![]), + processes: RefCell::new(vec![]), clusters: RefCell::new(vec![]), + externals: RefCell::new(vec![]), next_node_id: RefCell::new(0), finalized: false, _phantom: PhantomData, @@ -101,8 +104,9 @@ impl<'a> FlowBuilder<'a> { built::BuiltFlow { ir: self.flow_state.borrow_mut().leaves.take().unwrap(), - processes: self.nodes.replace(vec![]), - clusters: self.clusters.replace(vec![]), + process_id_name: self.processes.replace(vec![]), + cluster_id_name: self.clusters.replace(vec![]), + external_id_name: self.externals.replace(vec![]), used: false, _phantom: PhantomData, } @@ -130,7 +134,9 @@ impl<'a> FlowBuilder<'a> { let id = *next_node_id; *next_node_id += 1; - self.nodes.borrow_mut().push(id); + self.processes + .borrow_mut() + .push((id, type_name::

().to_string())); Process { id, @@ -144,7 +150,9 @@ impl<'a> FlowBuilder<'a> { let id = *next_node_id; *next_node_id += 1; - self.nodes.borrow_mut().push(id); + self.externals + .borrow_mut() + .push((id, type_name::

().to_string())); ExternalProcess { id, @@ -158,7 +166,9 @@ impl<'a> FlowBuilder<'a> { let id = *next_node_id; *next_node_id += 1; - self.clusters.borrow_mut().push(id); + self.clusters + .borrow_mut() + .push((id, type_name::().to_string())); Cluster { id, @@ -176,6 +186,14 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_process(process, spec) } + #[cfg(feature = "build")] + pub fn with_remaining_processes, S: IntoProcessSpec<'a, D> + 'a>( + self, + spec: impl Fn() -> S, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_remaining_processes(spec) + } + #[cfg(feature = "build")] pub fn with_external>( self, @@ -185,6 +203,14 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_external(process, spec) } + #[cfg(feature = "build")] + pub fn with_remaining_externals, S: ExternalSpec<'a, D> + 'a>( + self, + spec: impl Fn() -> S, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_remaining_externals(spec) + } + #[cfg(feature = "build")] pub fn with_cluster>( self, @@ -194,6 +220,14 @@ impl<'a> FlowBuilder<'a> { self.with_default_optimize().with_cluster(cluster, spec) } + #[cfg(feature = "build")] + pub fn with_remaining_clusters, S: ClusterSpec<'a, D> + 'a>( + self, + spec: impl Fn() -> S, + ) -> DeployFlow<'a, D> { + self.with_default_optimize().with_remaining_clusters(spec) + } + #[cfg(feature = "build")] pub fn compile>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> { self.with_default_optimize::().compile(env) diff --git a/hydro_lang/src/deploy/in_memory_graph.rs b/hydro_lang/src/deploy/in_memory_graph.rs index e9af6c9f9bc..b6e5d6b3ab9 100644 --- a/hydro_lang/src/deploy/in_memory_graph.rs +++ b/hydro_lang/src/deploy/in_memory_graph.rs @@ -22,6 +22,10 @@ impl LocalDeploy<'_> for SingleProcessGraph { fn trivial_cluster(_id: usize) -> Self::Cluster { SingleNode {} } + + fn trivial_external(_id: usize) -> Self::ExternalProcess { + SingleNode {} + } } impl ProcessSpec<'_, SingleProcessGraph> for () { @@ -74,6 +78,10 @@ impl LocalDeploy<'_> for MultiGraph { fn trivial_cluster(_id: usize) -> Self::Cluster { MultiNode {} } + + fn trivial_external(_id: usize) -> Self::ExternalProcess { + MultiNode {} + } } impl ProcessSpec<'_, MultiGraph> for () { diff --git a/hydro_lang/src/deploy/mod.rs b/hydro_lang/src/deploy/mod.rs index 3e76af6ffa5..5ec1e175c83 100644 --- a/hydro_lang/src/deploy/mod.rs +++ b/hydro_lang/src/deploy/mod.rs @@ -50,6 +50,10 @@ pub trait LocalDeploy<'a> { fn trivial_cluster(_id: usize) -> Self::Cluster { panic!("No trivial cluster") } + + fn trivial_external(_id: usize) -> Self::ExternalProcess { + panic!("No trivial external") + } } pub trait Deploy<'a> { diff --git a/hydro_lang/src/ir.rs b/hydro_lang/src/ir.rs index b1a68f19960..2ffd432d0f2 100644 --- a/hydro_lang/src/ir.rs +++ b/hydro_lang/src/ir.rs @@ -95,13 +95,13 @@ impl HydroLeaf { self, compile_env: &D::CompileEnv, seen_tees: &mut SeenTees, - nodes: &HashMap, + processes: &HashMap, clusters: &HashMap, externals: &HashMap, ) -> HydroLeaf { self.transform_children( |n, s| { - n.compile_network::(compile_env, s, nodes, clusters, externals); + n.compile_network::(compile_env, s, processes, clusters, externals); }, seen_tees, ) diff --git a/hydro_lang/src/test_util.rs b/hydro_lang/src/test_util.rs index f1712848ea5..e862e3bcc38 100644 --- a/hydro_lang/src/test_util.rs +++ b/hydro_lang/src/test_util.rs @@ -7,6 +7,38 @@ use serde::Serialize; use crate::{FlowBuilder, Process, Stream, Unbounded}; +pub async fn multi_location_test< + 'a, + O: Serialize + DeserializeOwned + 'static, + C: Future, + OutOrder, +>( + thunk: impl FnOnce( + &FlowBuilder<'a>, + &Process<'a, ()>, + ) -> Stream, Unbounded, OutOrder>, + check: impl FnOnce(Pin>>) -> C, +) { + let mut deployment = hydro_deploy::Deployment::new(); + let flow = FlowBuilder::new(); + let process = flow.process::<()>(); + let external = flow.external_process::<()>(); + let out = thunk(&flow, &process); + let out_port = out.send_bincode_external(&external); + let nodes = flow + .with_remaining_processes(|| deployment.Localhost()) + .with_remaining_clusters(|| vec![deployment.Localhost(); 4]) + .with_external(&external, deployment.Localhost()) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let external_out = nodes.connect_source_bincode(out_port).await; + deployment.start().await.unwrap(); + + check(external_out).await; +} + pub async fn stream_transform_test< 'a, O: Serialize + DeserializeOwned + 'static,