diff --git a/Cargo.lock b/Cargo.lock index 90df0f745286..4f96c0d84eb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1499,8 +1499,10 @@ dependencies = [ "async-channel", "hydro_cli", "hydroflow_plus", - "proc-macro-crate", + "serde", + "serde_json", "stageleft", + "stageleft_tool", "syn 2.0.14", "tokio", ] @@ -1509,6 +1511,7 @@ dependencies = [ name = "hydroflow_plus_test" version = "0.0.0" dependencies = [ + "futures", "hydro_cli", "hydroflow", "hydroflow_plus", diff --git a/docs/docs/hydroflow_plus/distributed.mdx b/docs/docs/hydroflow_plus/distributed.mdx index f5a9a3416bb9..12e793e3e3aa 100644 --- a/docs/docs/hydroflow_plus/distributed.mdx +++ b/docs/docs/hydroflow_plus/distributed.mdx @@ -12,7 +12,7 @@ use stageleft::*; pub fn first_ten<'a, D: HfDeploy<'a>>( graph: &'a HfBuilder<'a, D>, - node_builder: &mut impl HfNodeBuilder<'a, D> + node_builder: &impl HfNodeBuilder<'a, D> ) { let node = graph.node(node_builder); let numbers = node.source_iter(q!(0..10)); @@ -29,7 +29,7 @@ use stageleft::*; pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>( graph: &'a HfBuilder<'a, D>, - node_builder: &mut impl HfNodeBuilder<'a, D> + node_builder: &impl HfNodeBuilder<'a, D> ) { let node = graph.node(node_builder); let second_node = graph.node(node_builder); @@ -46,11 +46,11 @@ numbers ``` ## The Runtime -Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with hardcoding node 0 as before. Instead, we must take the node ID as a runtime parameter (`node_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it (via `CLIRuntimeNodeBuilder`). +Now that our graph spans multiple nodes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with hardcoding node 0 as before. Instead, we must take the node ID as a runtime parameter (`node_id`) to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that nodes can look up their network connections and instantiate the flow graph with access to it. ```rust use hydroflow::util::cli::HydroCLI; -use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder}; +use hydroflow_plus_cli_integration::CLIRuntime; #[stageleft::entry] pub fn first_ten_distributed_runtime<'a>( @@ -58,7 +58,7 @@ pub fn first_ten_distributed_runtime<'a>( cli: RuntimeData<&'a HydroCLI>, node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let _ = first_ten_distributed(graph, &mut CLIRuntimeNodeBuilder::new(cli)); + let _ = first_ten_distributed(graph, &cli); graph.build(node_id) } ``` @@ -92,7 +92,7 @@ async fn main() { let builder = hydroflow_plus::HfBuilder::new(); hydroflow_plus_test::first_ten::first_ten_distributed( &builder, - &mut CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|id| { deployment.HydroflowCrate( ".", localhost.clone(), @@ -106,6 +106,7 @@ async fn main() { ) }), ); + builder.wire(); // sets up network connections deployment.deploy().await.unwrap(); diff --git a/docs/docs/hydroflow_plus/structure.mdx b/docs/docs/hydroflow_plus/structure.mdx index 3650c3128a53..9a85f5215fbd 100644 --- a/docs/docs/hydroflow_plus/structure.mdx +++ b/docs/docs/hydroflow_plus/structure.mdx @@ -19,7 +19,7 @@ use stageleft::*; pub fn first_ten<'a, D: HfDeploy<'a>>( graph: &'a HfBuilder<'a, D>, - node_builder: &mut impl HfNodeBuilder<'a, D> + node_builder: &impl HfNodeBuilder<'a, D> ) {} ``` @@ -28,7 +28,7 @@ The `graph` variable gives us access to the builder API for constructing the flo ```rust pub fn first_ten<'a, D: HfDeploy<'a>>( graph: &'a HfBuilder<'a, D>, - node_builder: &mut impl HfNodeBuilder<'a, D> + node_builder: &impl HfNodeBuilder<'a, D> ) { let node = graph.node(node_builder); } @@ -56,7 +56,7 @@ To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes th pub fn first_ten_runtime<'a>( graph: &'a HfBuilder<'a, SingleGraph> ) -> impl Quoted<'a, Hydroflow<'a>> { - first_ten(graph, &mut ()); + first_ten(graph, &()); graph.build(q!(0)) } ``` diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs index 4e7cb021ca68..369e5422c54b 100644 --- a/hydro_cli/src/core/hydroflow_crate/mod.rs +++ b/hydro_cli/src/core/hydroflow_crate/mod.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, bail, Result}; use async_channel::Receiver; use async_trait::async_trait; use futures_core::Future; -use hydroflow_cli_integration::ServerPort; +use hydroflow_cli_integration::{ServerBindConfig, ServerPort}; use tokio::sync::RwLock; use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath}; @@ -33,6 +33,8 @@ pub struct HydroflowCrate { display_id: Option, external_ports: Vec, + meta: Option, + target_type: HostTargetType, /// Configuration for the ports this service will connect to as a client. @@ -81,6 +83,7 @@ impl HydroflowCrate { display_id, target_type, external_ports, + meta: None, port_to_server: HashMap::new(), port_to_bind: HashMap::new(), built_binary: Arc::new(async_once_cell::OnceCell::new()), @@ -91,6 +94,14 @@ impl HydroflowCrate { } } + pub fn set_meta(&mut self, meta: String) { + if self.meta.is_some() { + panic!("meta already set"); + } + + self.meta = Some(meta); + } + pub fn get_port( &self, name: String, @@ -194,6 +205,8 @@ impl HydroflowCrate { } } +type InitConfig = (HashMap, Option); + #[async_trait] impl Service for HydroflowCrate { fn collect_resources(&mut self, _resource_batch: &mut ResourceBatch) { @@ -276,7 +289,8 @@ impl Service for HydroflowCrate { bind_config.insert(port_name.clone(), launched_host.server_config(bind_type)); } - let formatted_bind_config = serde_json::to_string(&bind_config).unwrap(); + let formatted_bind_config = + serde_json::to_string::(&(bind_config, self.meta.clone())).unwrap(); // request stdout before sending config so we don't miss the "ready" response let stdout_receiver = binary.write().await.cli_stdout().await; diff --git a/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs b/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs index 095999b50395..055df4f205af 100644 --- a/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs +++ b/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let vote_to_participant_port = ports .port("vote_to_participant") .connect::>() diff --git a/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs b/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs index 72b425b6118c..a96a325262cd 100644 --- a/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs +++ b/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let vote_to_participant_source = ports .port("vote_to_participant") .connect::() diff --git a/hydro_cli_examples/examples/dedalus_receiver/main.rs b/hydro_cli_examples/examples/dedalus_receiver/main.rs index bbbdd0ed5508..a1fb8f98d827 100644 --- a/hydro_cli_examples/examples/dedalus_receiver/main.rs +++ b/hydro_cli_examples/examples/dedalus_receiver/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let broadcast_recv = ports .port("broadcast") .connect::() diff --git a/hydro_cli_examples/examples/dedalus_sender/main.rs b/hydro_cli_examples/examples/dedalus_sender/main.rs index 48a9c04109dd..34f38194f54f 100644 --- a/hydro_cli_examples/examples/dedalus_sender/main.rs +++ b/hydro_cli_examples/examples/dedalus_sender/main.rs @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let broadcast_port = ports .port("broadcast") .connect::>() diff --git a/hydro_cli_examples/examples/dedalus_vote_leader/main.rs b/hydro_cli_examples/examples/dedalus_vote_leader/main.rs index e8a1879e4c9e..22ae1b349c31 100644 --- a/hydro_cli_examples/examples/dedalus_vote_leader/main.rs +++ b/hydro_cli_examples/examples/dedalus_vote_leader/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let to_replica_port = ports .port("to_replica") .connect::>() diff --git a/hydro_cli_examples/examples/dedalus_vote_participant/main.rs b/hydro_cli_examples/examples/dedalus_vote_participant/main.rs index b34ae218ea3e..262be0dfd7ef 100644 --- a/hydro_cli_examples/examples/dedalus_vote_participant/main.rs +++ b/hydro_cli_examples/examples/dedalus_vote_participant/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let to_replica_source = ports .port("to_replica") .connect::() diff --git a/hydro_cli_examples/examples/empty_program/main.rs b/hydro_cli_examples/examples/empty_program/main.rs index 445fb46c2204..0794176236f5 100644 --- a/hydro_cli_examples/examples/empty_program/main.rs +++ b/hydro_cli_examples/examples/empty_program/main.rs @@ -1,6 +1,6 @@ #[hydroflow::main] async fn main() { - let _ = hydroflow::util::cli::init().await; + let _ = hydroflow::util::cli::init::<()>().await; loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } diff --git a/hydro_cli_examples/examples/stdout_receiver/main.rs b/hydro_cli_examples/examples/stdout_receiver/main.rs index 5ee8fd4f6f96..3a575a8854cd 100644 --- a/hydro_cli_examples/examples/stdout_receiver/main.rs +++ b/hydro_cli_examples/examples/stdout_receiver/main.rs @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource}; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let echo_recv = ports .port("echo") .connect::() diff --git a/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs b/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs index 4ce51e27e21b..4859c59e4584 100644 --- a/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs +++ b/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged}; #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let echo_recv = ports .port("echo") .connect::>() diff --git a/hydro_cli_examples/examples/ws_chat_server/main.rs b/hydro_cli_examples/examples/ws_chat_server/main.rs index ec0b9d40da88..fc8ac3b4d787 100644 --- a/hydro_cli_examples/examples/ws_chat_server/main.rs +++ b/hydro_cli_examples/examples/ws_chat_server/main.rs @@ -29,7 +29,7 @@ struct ChatMessage { #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let from_peer = ports .port("from_peer") diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index fbfc07246060..fb0516482bc6 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -4,6 +4,7 @@ use std::cell::RefCell; use std::collections::HashMap; pub use hydroflow_cli_integration::*; +use serde::de::DeserializeOwned; use crate::scheduled::graph::Hydroflow; @@ -27,11 +28,12 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) { } } -pub struct HydroCLI { +pub struct HydroCLI { ports: RefCell>, + pub meta: Option, } -impl HydroCLI { +impl HydroCLI { pub fn port(&self, name: &str) -> ServerOrBound { self.ports .try_borrow_mut() @@ -41,17 +43,19 @@ impl HydroCLI { } } -pub async fn init() -> HydroCLI { +type InitConfig = (HashMap, Option); + +pub async fn init() -> HydroCLI { let mut input = String::new(); std::io::stdin().read_line(&mut input).unwrap(); let trimmed = input.trim(); - let bind_config = serde_json::from_str::>(trimmed).unwrap(); + let bind_config = serde_json::from_str::(trimmed).unwrap(); // config telling other services how to connect to me let mut bind_results: HashMap = HashMap::new(); let mut binds = HashMap::new(); - for (name, config) in bind_config { + for (name, config) in bind_config.0 { let bound = config.bind().await; bind_results.insert(name.clone(), bound.sink_port()); binds.insert(name.clone(), bound); @@ -84,5 +88,6 @@ pub async fn init() -> HydroCLI { HydroCLI { ports: RefCell::new(all_connected), + meta: bind_config.1.map(|s| serde_json::from_str(&s).unwrap()), } } diff --git a/hydroflow_plus/src/builder.rs b/hydroflow_plus/src/builder.rs index dc9f5a864663..0f9660ee1f45 100644 --- a/hydroflow_plus/src/builder.rs +++ b/hydroflow_plus/src/builder.rs @@ -1,6 +1,7 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::marker::PhantomData; +use std::ops::Deref; use hydroflow_lang::graph::{ eliminate_extra_unions_tees, partition_graph, propegate_flow_props, FlatGraphBuilder, @@ -10,7 +11,7 @@ use quote::quote; use stageleft::{Quoted, QuotedContext}; use syn::parse_quote; -use crate::node::{HfDeploy, HfNodeBuilder}; +use crate::node::{HfClusterBuilder, HfDeploy, HfNode, HfNodeBuilder}; use crate::{HfBuilt, RuntimeContext}; pub type Builders = RefCell>>; @@ -19,6 +20,8 @@ pub struct HfBuilder<'a, D: HfDeploy<'a> + ?Sized> { pub(crate) next_id: RefCell, pub(crate) builders: Builders, nodes: RefCell>, + clusters: RefCell>, + pub meta: RefCell>, next_node_id: RefCell, _phantom: PhantomData<&'a mut &'a ()>, } @@ -36,6 +39,8 @@ impl<'a, D: HfDeploy<'a>> HfBuilder<'a, D> { next_id: RefCell::new(0), builders: RefCell::new(Some(Default::default())), nodes: RefCell::new(Vec::new()), + clusters: RefCell::new(Vec::new()), + meta: RefCell::new(Default::default()), next_node_id: RefCell::new(0), _phantom: PhantomData, } @@ -45,7 +50,7 @@ impl<'a, D: HfDeploy<'a>> HfBuilder<'a, D> { (&self.next_id, &self.builders) } - pub fn node(&'a self, builder: &mut impl HfNodeBuilder<'a, D>) -> D::Node { + pub fn node(&'a self, builder: &impl HfNodeBuilder<'a, D>) -> D::Node { let mut next_node_id = self.next_node_id.borrow_mut(); let id = *next_node_id; *next_node_id += 1; @@ -55,7 +60,52 @@ impl<'a, D: HfDeploy<'a>> HfBuilder<'a, D> { node } + pub fn cluster(&'a self, builder: &impl HfClusterBuilder<'a, D>) -> D::Cluster { + let mut next_node_id = self.next_node_id.borrow_mut(); + let id = *next_node_id; + *next_node_id += 1; + + let cluster = builder.build(id, self); + self.clusters.borrow_mut().push(cluster.clone()); + cluster + } + + pub fn runtime_context(&self) -> RuntimeContext<'a> { + RuntimeContext { + _phantom: PhantomData, + } + } +} + +impl<'a, D: HfDeploy<'a, RuntimeID = ()>> HfBuilder<'a, D> { + pub fn wire(&self) { + let meta_borrow = self.meta.borrow(); + let meta = meta_borrow.deref(); + self.nodes + .borrow_mut() + .iter_mut() + .for_each(|n| n.build(meta)); + self.clusters + .borrow_mut() + .iter_mut() + .for_each(|n| n.build(meta)); + } +} + +impl<'a, D: HfDeploy<'a, RuntimeID = usize>> HfBuilder<'a, D> { pub fn build(&self, id: impl Quoted<'a, usize>) -> HfBuilt<'a> { + let meta_borrow = self.meta.borrow(); + let meta = meta_borrow.deref(); + self.nodes + .borrow_mut() + .iter_mut() + .for_each(|n| n.build(meta)); + self.clusters + .borrow_mut() + .iter_mut() + .for_each(|n| n.build(meta)); + drop(meta_borrow); + let builders = self.builders.borrow_mut().take().unwrap(); let mut conditioned_tokens = None; @@ -113,10 +163,4 @@ impl<'a, D: HfDeploy<'a>> HfBuilder<'a, D> { _phantom: PhantomData, } } - - pub fn runtime_context(&self) -> RuntimeContext<'a> { - RuntimeContext { - _phantom: PhantomData, - } - } } diff --git a/hydroflow_plus/src/node/graphs.rs b/hydroflow_plus/src/node/graphs.rs index 7f22f1a0ad50..7582a0c1986f 100644 --- a/hydroflow_plus/src/node/graphs.rs +++ b/hydroflow_plus/src/node/graphs.rs @@ -1,8 +1,8 @@ use std::cell::RefCell; -use hydroflow_lang::parse::Pipeline; +use stageleft::{Quoted, RuntimeData}; -use super::{HfDeploy, HfNode, HfNodeBuilder}; +use super::{HfCluster, HfDeploy, HfNode, HfNodeBuilder}; use crate::builder::Builders; use crate::HfBuilder; @@ -10,10 +10,13 @@ pub struct SingleGraph {} impl<'a> HfDeploy<'a> for SingleGraph { type Node = SingleNode<'a>; + type Cluster = SingleNode<'a>; + type Meta = (); + type RuntimeID = usize; } impl<'a> HfNodeBuilder<'a, SingleGraph> for () { - fn build(&mut self, _id: usize, builder: &'a HfBuilder<'a, SingleGraph>) -> SingleNode<'a> { + fn build(&self, _id: usize, builder: &'a HfBuilder<'a, SingleGraph>) -> SingleNode<'a> { SingleNode { builder } } } @@ -25,6 +28,7 @@ pub struct SingleNode<'a> { impl<'a> HfNode<'a> for SingleNode<'a> { type Port = (); + type Meta = (); fn id(&self) -> usize { 0 @@ -38,12 +42,14 @@ impl<'a> HfNode<'a> for SingleNode<'a> { panic!(); } - fn gen_source_statement(&self, _port: &()) -> Pipeline { - panic!(); - } + fn build(&mut self, _meta: &Option) {} +} - fn gen_sink_statement(&self, _port: &()) -> Pipeline { +impl<'a> HfCluster<'a> for SingleNode<'a> { + fn ids(&self) -> impl Quoted<'a, &'a Vec> { panic!(); + #[allow(unreachable_code)] + RuntimeData::new("") } } @@ -51,10 +57,13 @@ pub struct MultiGraph {} impl<'a> HfDeploy<'a> for MultiGraph { type Node = MultiNode<'a>; + type Cluster = MultiNode<'a>; + type Meta = (); + type RuntimeID = usize; } impl<'a> HfNodeBuilder<'a, MultiGraph> for () { - fn build(&mut self, id: usize, builder: &'a HfBuilder<'a, MultiGraph>) -> MultiNode<'a> { + fn build(&self, id: usize, builder: &'a HfBuilder<'a, MultiGraph>) -> MultiNode<'a> { MultiNode { builder, id } } } @@ -67,6 +76,7 @@ pub struct MultiNode<'a> { impl<'a> HfNode<'a> for MultiNode<'a> { type Port = (); + type Meta = (); fn id(&self) -> usize { self.id @@ -80,11 +90,13 @@ impl<'a> HfNode<'a> for MultiNode<'a> { panic!(); } - fn gen_source_statement(&self, _port: &()) -> Pipeline { - panic!(); - } + fn build(&mut self, _meta: &Option) {} +} - fn gen_sink_statement(&self, _port: &()) -> Pipeline { +impl<'a> HfCluster<'a> for MultiNode<'a> { + fn ids(&self) -> impl Quoted<'a, &'a Vec> { panic!(); + #[allow(unreachable_code)] + RuntimeData::new("") } } diff --git a/hydroflow_plus/src/node/mod.rs b/hydroflow_plus/src/node/mod.rs index aed0c818485e..15ce7e73af65 100644 --- a/hydroflow_plus/src/node/mod.rs +++ b/hydroflow_plus/src/node/mod.rs @@ -16,35 +16,67 @@ mod graphs; pub use graphs::*; pub trait HfDeploy<'a> { - type Node: HfNode<'a>; + type Node: HfNode<'a, Meta = Self::Meta>; + type Cluster: HfNode<'a, Meta = Self::Meta> + HfCluster<'a>; + type Meta; + type RuntimeID; } -pub trait HfNetworkedDeploy<'a>: HfDeploy<'a, Node = Self::NetworkedNode> { - type NetworkedNode: HfNode<'a, Port = Self::Port> + HfSendTo<'a, Self::NetworkedNode>; - type Port; +pub trait HfNetworkedDeploy<'a>: + HfDeploy<'a, Node = Self::NetworkedNode, Cluster = Self::NetworkedCluster> +{ + type NetworkedNode: HfNode<'a, Port = Self::NodePort> + + HfSendTo<'a, Self::NetworkedNode> + + HfDemuxTo<'a, Self::NetworkedCluster>; + type NetworkedCluster: HfNode<'a, Port = Self::ClusterPort> + HfCluster<'a>; + type NodePort; + type ClusterPort; } -impl<'a, T: HfDeploy<'a, Node = N>, N: HfSendTo<'a, N>> HfNetworkedDeploy<'a> for T { +impl< + 'a, + T: HfDeploy<'a, Node = N, Cluster = C>, + N: HfNode<'a> + HfSendTo<'a, N> + HfDemuxTo<'a, C>, + C: HfNode<'a> + HfCluster<'a>, + > HfNetworkedDeploy<'a> for T +{ type NetworkedNode = N; - type Port = N::Port; + type NetworkedCluster = C; + type NodePort = N::Port; + type ClusterPort = C::Port; } pub trait HfNodeBuilder<'a, D: HfDeploy<'a> + ?Sized> { - fn build(&mut self, id: usize, builder: &'a HfBuilder<'a, D>) -> D::Node; + fn build(&self, id: usize, builder: &'a HfBuilder<'a, D>) -> D::Node; +} + +pub trait HfClusterBuilder<'a, D: HfDeploy<'a> + ?Sized> { + fn build(&self, id: usize, builder: &'a HfBuilder<'a, D>) -> D::Cluster; } pub trait HfSendTo<'a, O: HfNode<'a>>: HfNode<'a> { fn send_to(&self, other: &O, source_port: &Self::Port, recipient_port: &O::Port); + + fn gen_sink_statement(&self, port: &Self::Port) -> Pipeline; + fn gen_source_statement(other: &O, port: &O::Port) -> Pipeline; +} + +pub trait HfDemuxTo<'a, O: HfNode<'a>>: HfNode<'a> { + fn demux_to(&self, other: &O, source_port: &Self::Port, recipient_port: &O::Port); + + fn gen_sink_statement(&self, port: &Self::Port) -> Pipeline; + fn gen_source_statement(other: &O, port: &O::Port) -> Pipeline; } pub trait HfNode<'a>: Clone { type Port; + type Meta; fn id(&self) -> usize; fn graph_builder(&self) -> (&'a RefCell, &'a Builders); fn next_port(&self) -> Self::Port; - fn gen_source_statement(&self, port: &Self::Port) -> Pipeline; - fn gen_sink_statement(&self, port: &Self::Port) -> Pipeline; + + fn build(&mut self, meta: &Option); fn source_stream + Unpin>( &self, @@ -81,7 +113,10 @@ pub trait HfNode<'a>: Clone { } } - fn source_external(&self) -> (Self::Port, HfStream<'a, Result, Self>) { + fn source_external(&self) -> (Self::Port, HfStream<'a, Result, Self>) + where + Self: HfSendTo<'a, Self>, + { let (next_id_cell, builders) = self.graph_builder(); let next_id = { @@ -93,7 +128,7 @@ pub trait HfNode<'a>: Clone { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let port = self.next_port(); - let source_pipeline = self.gen_source_statement(&port); + let source_pipeline = Self::gen_source_statement(self, &port); builders .borrow_mut() @@ -191,3 +226,7 @@ pub trait HfNode<'a>: Clone { ) } } + +pub trait HfCluster<'a> { + fn ids(&self) -> impl Quoted<'a, &'a Vec>; +} diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 8444e3ed9d19..a9a5be118522 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -13,7 +13,7 @@ use stageleft::{IntoQuotedMut, Quoted}; use syn::parse_quote; use crate::builder::Builders; -use crate::node::{HfNode, HfSendTo}; +use crate::node::{HfDemuxTo, HfNode, HfSendTo}; pub struct HfStream<'a, T, N: HfNode<'a>> { pub(crate) ident: syn::Ident, @@ -425,7 +425,7 @@ impl<'a, N: HfNode<'a>> HfStream<'a, Bytes, N> { let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); let recipient_port_name = other.next_port(); - let recipient_source = other.gen_source_statement(&recipient_port_name); + let recipient_source = N::gen_source_statement(other, &recipient_port_name); builders .entry(other.id()) @@ -491,7 +491,7 @@ impl<'a, T: Serialize + DeserializeOwned, N: HfNode<'a>> HfStream<'a, T, N> { let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); let recipient_port_name = other.next_port(); - let recipient_source = other.gen_source_statement(&recipient_port_name); + let recipient_source = N::gen_source_statement(other, &recipient_port_name); builders .entry(other.id()) @@ -514,6 +514,130 @@ impl<'a, T: Serialize + DeserializeOwned, N: HfNode<'a>> HfStream<'a, T, N> { } } +impl<'a, N: HfNode<'a>> HfStream<'a, (u32, Bytes), N> { + pub fn demux_bytes>( + &self, + other: &N2, + ) -> HfStream<'a, Result, N2> + where + N: HfDemuxTo<'a, N2>, + { + let self_ident = &self.ident; + + let mut builders_borrowed = self.builders.borrow_mut(); + let builders = builders_borrowed.as_mut().unwrap(); + + let source_name = self.node.next_port(); + let self_sink = self.node.gen_sink_statement(&source_name); + + builders + .entry(self.node.id()) + .or_default() + .add_statement(parse_quote! { + #self_ident -> #self_sink; + }); + + let recipient_next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); + + let recipient_port_name = other.next_port(); + let recipient_source = N::gen_source_statement(other, &recipient_port_name); + + builders + .entry(other.id()) + .or_default() + .add_statement(parse_quote! { + #ident = #recipient_source -> tee(); + }); + + self.node + .demux_to(other, &source_name, &recipient_port_name); + + HfStream { + ident, + node: other.clone(), + next_id: self.next_id, + builders: self.builders, + _phantom: PhantomData, + } + } +} + +impl<'a, T: Serialize + DeserializeOwned, N: HfNode<'a>> HfStream<'a, (u32, T), N> { + pub fn demux_bincode>(&self, other: &N2) -> HfStream<'a, T, N2> + where + N: HfDemuxTo<'a, N2>, + { + let self_ident = &self.ident; + + let mut builders_borrowed = self.builders.borrow_mut(); + let builders = builders_borrowed.as_mut().unwrap(); + + let source_name = self.node.next_port(); + let self_sink = self.node.gen_sink_statement(&source_name); + + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + // TODO(shadaj): this may fail when instantiated in an environment with different deps + let t_type: syn::Type = syn::parse_str(std::any::type_name::()).unwrap(); + + builders + .entry(self.node.id()) + .or_default() + .add_statement(parse_quote! { + #self_ident -> map(|(id, data): (u32, _)| { + (id, #root::runtime_support::bincode::serialize::<#t_type>(&data).unwrap().into()) + }) -> #self_sink; + }); + + let recipient_next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); + + let recipient_port_name = other.next_port(); + let recipient_source = N::gen_source_statement(other, &recipient_port_name); + + builders + .entry(other.id()) + .or_default() + .add_statement(parse_quote! { + #ident = #recipient_source -> map(|b| { + #root::runtime_support::bincode::deserialize::<#t_type>(&b.unwrap()).unwrap() + }) -> tee(); + }); + + self.node + .demux_to(other, &source_name, &recipient_port_name); + + HfStream { + ident, + node: other.clone(), + next_id: self.next_id, + builders: self.builders, + _phantom: PhantomData, + } + } +} + impl<'a, K, V1, N: HfNode<'a>> HfStream<'a, (K, V1), N> { pub fn join(&self, n: &HfStream<'a, (K, V2), N>) -> HfStream<'a, (K, (V1, V2)), N> where diff --git a/hydroflow_plus_cli_integration/Cargo.toml b/hydroflow_plus_cli_integration/Cargo.toml index c8ed720998f2..c10fd44b413e 100644 --- a/hydroflow_plus_cli_integration/Cargo.toml +++ b/hydroflow_plus_cli_integration/Cargo.toml @@ -6,13 +6,18 @@ edition = "2021" [features] default = [] -deploy = [ "hydro_cli" ] +deploy = [ "hydro_cli", "async-channel" ] [dependencies] stageleft = { path = "../stageleft", version = "^0.1.0" } hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0", features = [ "cli_integration" ] } -proc-macro-crate = "1.1.0" syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] } -hydro_cli = { path = "../hydro_cli", version = "^0.5.0", optional = true } tokio = { version = "1.16", features = [ "full" ] } -async-channel = "1.8.0" +serde = { version = "1", features = [ "derive" ] } +serde_json = "1" + +hydro_cli = { path = "../hydro_cli", version = "^0.5.0", optional = true } +async-channel = { version = "1.8.0", optional = true } + +[build-dependencies] +stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" } diff --git a/hydroflow_plus_cli_integration/build.rs b/hydroflow_plus_cli_integration/build.rs new file mode 100644 index 000000000000..99775c3c7daa --- /dev/null +++ b/hydroflow_plus_cli_integration/build.rs @@ -0,0 +1,3 @@ +fn main() { + stageleft_tool::gen_final!(); +} diff --git a/hydroflow_plus_cli_integration/src/deploy.rs b/hydroflow_plus_cli_integration/src/deploy.rs index ea2a5c9239fe..8e6c08fa1e14 100644 --- a/hydroflow_plus_cli_integration/src/deploy.rs +++ b/hydroflow_plus_cli_integration/src/deploy.rs @@ -1,33 +1,37 @@ use std::cell::RefCell; +use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; use async_channel::Receiver; use hydro_cli::core::custom_service::CustomClientPort; -use hydro_cli::core::hydroflow_crate::ports::HydroflowSource; +use hydro_cli::core::hydroflow_crate::ports::{DemuxSink, HydroflowSink, HydroflowSource}; use hydro_cli::core::{Deployment, Host, HydroflowCrate}; use hydroflow_plus::builder::Builders; -use hydroflow_plus::node::{HfDeploy, HfNode, HfNodeBuilder, HfSendTo}; +use hydroflow_plus::node::{ + HfCluster, HfClusterBuilder, HfDemuxTo, HfDeploy, HfNode, HfNodeBuilder, HfSendTo, +}; use hydroflow_plus::HfBuilder; use stageleft::internal::syn::parse_quote; +use stageleft::q; use tokio::sync::RwLock; +use super::HydroflowPlusMeta; + pub struct CLIDeploy {} impl<'a> HfDeploy<'a> for CLIDeploy { type Node = CLIDeployNode<'a>; + type Cluster = CLIDeployCluster<'a>; + type Meta = HydroflowPlusMeta; + type RuntimeID = (); } -#[derive(Clone)] -pub struct CLIDeployNode<'a> { - id: usize, - builder: &'a HfBuilder<'a, CLIDeploy>, - next_port: Rc>, - underlying: Arc>, -} +pub trait DeployCrateWrapper { + fn underlying(&self) -> Arc>; -impl<'a> CLIDeployNode<'a> { - pub async fn create_sender( + #[allow(async_fn_in_trait)] + async fn create_sender( &self, port: &str, deployment: &mut Deployment, @@ -36,30 +40,46 @@ impl<'a> CLIDeployNode<'a> { let sender_service = deployment.CustomService(on.clone(), vec![]); let mut sender_port = sender_service.read().await.declare_client(&sender_service); let mut recipient = self - .underlying + .underlying() .read() .await - .get_port(port.to_string(), &self.underlying); + .get_port(port.to_string(), &self.underlying()); sender_port.send_to(&mut recipient); sender_port } - pub async fn stdout(&self) -> Receiver { - self.underlying.read().await.stdout().await + #[allow(async_fn_in_trait)] + async fn stdout(&self) -> Receiver { + self.underlying().read().await.stdout().await } - pub async fn stderr(&self) -> Receiver { - self.underlying.read().await.stderr().await + #[allow(async_fn_in_trait)] + async fn stderr(&self) -> Receiver { + self.underlying().read().await.stderr().await + } +} + +#[derive(Clone)] +pub struct CLIDeployNode<'a> { + id: usize, + builder: &'a HfBuilder<'a, CLIDeploy>, + next_port: Rc>, + underlying: Arc>, +} + +impl<'a> DeployCrateWrapper for CLIDeployNode<'a> { + fn underlying(&self) -> Arc> { + self.underlying.clone() } } -pub struct CLIDeployPort<'a> { - node: CLIDeployNode<'a>, +pub struct CLIDeployPort { + node: N, port: String, } -impl<'a> CLIDeployPort<'a> { +impl<'a> CLIDeployPort> { pub async fn create_sender( &self, deployment: &mut Deployment, @@ -70,7 +90,8 @@ impl<'a> CLIDeployPort<'a> { } impl<'a> HfNode<'a> for CLIDeployNode<'a> { - type Port = CLIDeployPort<'a>; + type Port = CLIDeployPort; + type Meta = HydroflowPlusMeta; fn id(&self) -> usize { self.id @@ -80,7 +101,7 @@ impl<'a> HfNode<'a> for CLIDeployNode<'a> { self.builder.builder_components() } - fn next_port(&self) -> CLIDeployPort<'a> { + fn next_port(&self) -> CLIDeployPort { let next_port = *self.next_port.borrow(); *self.next_port.borrow_mut() += 1; @@ -90,12 +111,68 @@ impl<'a> HfNode<'a> for CLIDeployNode<'a> { } } - fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline { - parse_quote!(null()) + fn build(&mut self, meta: &Option) { + if let Some(meta) = meta { + let mut n = self.underlying.try_write().unwrap(); + n.set_meta(serde_json::to_string(&meta).unwrap()); + } } +} - fn gen_source_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline { - parse_quote!(null()) +#[derive(Clone)] +pub struct DeployClusterNode { + underlying: Arc>, +} + +impl DeployCrateWrapper for DeployClusterNode { + fn underlying(&self) -> Arc> { + self.underlying.clone() + } +} + +#[derive(Clone)] +pub struct CLIDeployCluster<'a> { + id: usize, + builder: &'a HfBuilder<'a, CLIDeploy>, + next_port: Rc>, + pub nodes: Vec, +} + +impl<'a> HfNode<'a> for CLIDeployCluster<'a> { + type Port = CLIDeployPort; + type Meta = HydroflowPlusMeta; + + fn id(&self) -> usize { + self.id + } + + fn graph_builder(&self) -> (&'a RefCell, &'a Builders) { + self.builder.builder_components() + } + + fn next_port(&self) -> CLIDeployPort { + let next_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + + CLIDeployPort { + node: self.clone(), + port: format!("port_{}", next_port), + } + } + + fn build(&mut self, meta: &Option) { + if let Some(meta) = meta { + self.nodes.iter().for_each(|n| { + let mut n = n.underlying.try_write().unwrap(); + n.set_meta(serde_json::to_string(&meta).unwrap()); + }); + } + } +} + +impl<'a> HfCluster<'a> for CLIDeployCluster<'a> { + fn ids(&self) -> impl stageleft::Quoted<'a, &'a Vec> { + q!(panic!()) } } @@ -103,8 +180,8 @@ impl<'a> HfSendTo<'a, CLIDeployNode<'a>> for CLIDeployNode<'a> { fn send_to( &self, other: &CLIDeployNode<'a>, - source_port: &CLIDeployPort<'a>, - recipient_port: &CLIDeployPort<'a>, + source_port: &CLIDeployPort>, + recipient_port: &CLIDeployPort>, ) { let mut source_port = self .underlying @@ -120,23 +197,115 @@ impl<'a> HfSendTo<'a, CLIDeployNode<'a>> for CLIDeployNode<'a> { source_port.send_to(&mut recipient_port); } + + fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline { + parse_quote!(null()) + } + + fn gen_source_statement( + _other: &CLIDeployNode<'a>, + _port: &Self::Port, + ) -> hydroflow_plus::lang::parse::Pipeline { + parse_quote!(null()) + } } -pub struct CLIDeployNodeBuilder<'a>(Box Arc> + 'a>); +impl<'a> HfDemuxTo<'a, CLIDeployCluster<'a>> for CLIDeployNode<'a> { + fn demux_to( + &self, + other: &CLIDeployCluster<'a>, + source_port: &CLIDeployPort>, + recipient_port: &CLIDeployPort>, + ) { + let mut source_port = self + .underlying + .try_read() + .unwrap() + .get_port(source_port.port.clone(), &self.underlying); + + let mut recipient_port = DemuxSink { + demux: other + .nodes + .iter() + .enumerate() + .map(|(id, c)| { + let n = c.underlying.try_read().unwrap(); + ( + id as u32, + Arc::new(RwLock::new( + n.get_port(recipient_port.port.clone(), &c.underlying), + )) as Arc>, + ) + }) + .collect(), + }; + + source_port.send_to(&mut recipient_port); + } + + fn gen_sink_statement(&self, _port: &Self::Port) -> hydroflow_plus::lang::parse::Pipeline { + parse_quote!(null()) + } + + fn gen_source_statement( + _other: &CLIDeployCluster<'a>, + _port: &CLIDeployPort>, + ) -> hydroflow_plus::lang::parse::Pipeline { + parse_quote!(null()) + } +} + +type CrateBuilder<'a> = dyn FnMut(usize) -> Arc> + 'a; + +pub struct CLIDeployNodeBuilder<'a>(RefCell>>); impl<'a> CLIDeployNodeBuilder<'a> { pub fn new Arc> + 'a>(f: F) -> Self { - Self(Box::new(f)) + Self(RefCell::new(Box::new(f))) } } impl<'a: 'b, 'b> HfNodeBuilder<'a, CLIDeploy> for CLIDeployNodeBuilder<'b> { - fn build(&mut self, id: usize, builder: &'a HfBuilder<'a, CLIDeploy>) -> CLIDeployNode<'a> { + fn build(&self, id: usize, builder: &'a HfBuilder<'a, CLIDeploy>) -> CLIDeployNode<'a> { CLIDeployNode { id, builder, next_port: Rc::new(RefCell::new(0)), - underlying: (self.0)(id), + underlying: (self.0.borrow_mut())(id), + } + } +} + +type ClusterBuilder<'a> = dyn FnMut(usize) -> Vec>> + 'a; + +pub struct CLIDeployClusterBuilder<'a>(RefCell>>); + +impl<'a> CLIDeployClusterBuilder<'a> { + pub fn new Vec>> + 'a>(f: F) -> Self { + Self(RefCell::new(Box::new(f))) + } +} + +impl<'a: 'b, 'b> HfClusterBuilder<'a, CLIDeploy> for CLIDeployClusterBuilder<'b> { + fn build(&self, id: usize, builder: &'a HfBuilder<'a, CLIDeploy>) -> CLIDeployCluster<'a> { + let cluster_nodes = (self.0.borrow_mut())(id); + builder + .meta + .borrow_mut() + .get_or_insert(HydroflowPlusMeta { + clusters: HashMap::new(), + }) + .clusters + .insert(id, (0..(cluster_nodes.len() as u32)).collect()); + + CLIDeployCluster { + id, + builder, + next_port: Rc::new(RefCell::new(0)), + nodes: cluster_nodes + .into_iter() + .map(|u| DeployClusterNode { underlying: u }) + .collect(), } } } diff --git a/hydroflow_plus_cli_integration/src/lib.rs b/hydroflow_plus_cli_integration/src/lib.rs index d0ae1e5e7178..149dded68375 100644 --- a/hydroflow_plus_cli_integration/src/lib.rs +++ b/hydroflow_plus_cli_integration/src/lib.rs @@ -1,4 +1,8 @@ +stageleft::stageleft_no_entry_crate!(); + mod runtime; +use std::collections::HashMap; + pub use runtime::*; #[cfg(feature = "deploy")] @@ -6,3 +10,9 @@ mod deploy; #[cfg(feature = "deploy")] pub use deploy::*; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct HydroflowPlusMeta { + pub clusters: HashMap>, +} diff --git a/hydroflow_plus_cli_integration/src/runtime.rs b/hydroflow_plus_cli_integration/src/runtime.rs index b9da8b0949dc..0fdd301c0aee 100644 --- a/hydroflow_plus_cli_integration/src/runtime.rs +++ b/hydroflow_plus_cli_integration/src/runtime.rs @@ -2,17 +2,25 @@ use std::cell::RefCell; use std::rc::Rc; use hydroflow_plus::lang::parse::Pipeline; -use hydroflow_plus::node::{HfDeploy, HfNode, HfNodeBuilder, HfSendTo}; -use hydroflow_plus::util::cli::HydroCLI; +use hydroflow_plus::node::{ + HfCluster, HfClusterBuilder, HfDemuxTo, HfDeploy, HfNode, HfNodeBuilder, HfSendTo, +}; +use hydroflow_plus::util::cli::{ + ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, HydroCLI, +}; use hydroflow_plus::HfBuilder; -use stageleft::internal::{quote, Span}; -use stageleft::{Quoted, RuntimeData}; +use stageleft::{q, Quoted, RuntimeData}; use syn::parse_quote; +use super::HydroflowPlusMeta; + pub struct CLIRuntime {} impl<'a> HfDeploy<'a> for CLIRuntime { type Node = CLIRuntimeNode<'a>; + type Cluster = CLIRuntimeCluster<'a>; + type Meta = String; + type RuntimeID = usize; } #[derive(Clone)] @@ -20,11 +28,12 @@ pub struct CLIRuntimeNode<'a> { id: usize, builder: &'a HfBuilder<'a, CLIRuntime>, next_port: Rc>, - cli: RuntimeData<&'a HydroCLI>, + cli: RuntimeData<&'a HydroCLI>, } impl<'a> HfNode<'a> for CLIRuntimeNode<'a> { type Port = String; + type Meta = String; fn id(&self) -> usize { self.id @@ -40,78 +49,140 @@ impl<'a> HfNode<'a> for CLIRuntimeNode<'a> { format!("port_{}", next_send_port) } - fn gen_source_statement(&self, port: &String) -> Pipeline { - let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") - .expect("hydroflow_plus should be present in `Cargo.toml`"); - let root = match hydroflow_crate { - proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, - proc_macro_crate::FoundCrate::Name(name) => { - let ident = syn::Ident::new(&name, Span::call_site()); - quote! { #ident } - } - }; - - let self_cli_splice = self.cli.splice(); - parse_quote! { - source_stream({ - use #root::util::cli::ConnectedSource; - #self_cli_splice - .port(#port) - .connect_local_blocking::<#root::util::cli::ConnectedDirect>() - .into_source() - }) - } + fn build(&mut self, _meta: &Option) {} +} + +#[derive(Clone)] +pub struct CLIRuntimeCluster<'a> { + id: usize, + builder: &'a HfBuilder<'a, CLIRuntime>, + next_port: Rc>, + cli: RuntimeData<&'a HydroCLI>, +} + +impl<'a> HfNode<'a> for CLIRuntimeCluster<'a> { + type Port = String; + type Meta = String; + + fn id(&self) -> usize { + self.id } - fn gen_sink_statement(&self, port: &String) -> Pipeline { - let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") - .expect("hydroflow_plus should be present in `Cargo.toml`"); - let root = match hydroflow_crate { - proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, - proc_macro_crate::FoundCrate::Name(name) => { - let ident = syn::Ident::new(&name, Span::call_site()); - quote! { #ident } - } - }; - - let self_cli_splice = self.cli.splice(); - parse_quote! { - dest_sink({ - use #root::util::cli::ConnectedSink; - #self_cli_splice - .port(#port) - .connect_local_blocking::<#root::util::cli::ConnectedDirect>() - .into_sink() - }) - } + fn graph_builder(&self) -> (&'a RefCell, &'a hydroflow_plus::builder::Builders) { + self.builder.builder_components() + } + + fn next_port(&self) -> String { + let next_send_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + format!("port_{}", next_send_port) + } + + fn build(&mut self, _meta: &Option) {} +} + +impl<'a> HfCluster<'a> for CLIRuntimeCluster<'a> { + fn ids(&self) -> impl Quoted<'a, &'a Vec> { + let cli = self.cli; + let self_id = self.id; + q!(cli.meta.as_ref().unwrap().clusters.get(&self_id).unwrap()) } } impl<'a> HfSendTo<'a, CLIRuntimeNode<'a>> for CLIRuntimeNode<'a> { fn send_to(&self, _other: &CLIRuntimeNode, _source_port: &String, _recipient_port: &String) {} + + fn gen_sink_statement(&self, port: &String) -> Pipeline { + let self_cli = self.cli; + let port = port.as_str(); + let sink_quote = q!({ + self_cli + .port(port) + .connect_local_blocking::() + .into_sink() + }) + .splice(); + + parse_quote!(dest_sink(#sink_quote)) + } + + fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> Pipeline { + let self_cli = other.cli; + let port = port.as_str(); + let source_quote = q!({ + self_cli + .port(port) + .connect_local_blocking::() + .into_source() + }) + .splice(); + + parse_quote!(source_stream(#source_quote)) + } } -pub struct CLIRuntimeNodeBuilder<'a> { - cli: RuntimeData<&'a HydroCLI>, +impl<'a> HfDemuxTo<'a, CLIRuntimeCluster<'a>> for CLIRuntimeNode<'a> { + fn demux_to( + &self, + _other: &CLIRuntimeCluster, + _source_port: &String, + _recipient_port: &String, + ) { + } + + fn gen_sink_statement(&self, port: &String) -> Pipeline { + let self_cli = self.cli; + let port = port.as_str(); + + let sink_quote = q!({ + self_cli + .port(port) + .connect_local_blocking::>() + .into_sink() + }) + .splice(); + + parse_quote!(dest_sink(#sink_quote)) + } + + fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> Pipeline { + let self_cli = other.cli; + let port = port.as_str(); + + let source_quote = q!({ + self_cli + .port(port) + .connect_local_blocking::() + .into_source() + }) + .splice(); + + parse_quote!(source_stream(#source_quote)) + } } -impl CLIRuntimeNodeBuilder<'_> { - pub fn new(cli: RuntimeData<&HydroCLI>) -> CLIRuntimeNodeBuilder { - CLIRuntimeNodeBuilder { cli } +impl<'cli> HfNodeBuilder<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI> { + fn build(&self, id: usize, builder: &'cli HfBuilder<'cli, CLIRuntime>) -> CLIRuntimeNode<'cli> { + CLIRuntimeNode { + id, + builder, + next_port: Rc::new(RefCell::new(0)), + cli: *self, + } } } -impl<'cli> HfNodeBuilder<'cli, CLIRuntime> for CLIRuntimeNodeBuilder<'cli> { +impl<'cli> HfClusterBuilder<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI> { fn build( - &mut self, + &self, id: usize, builder: &'cli HfBuilder<'cli, CLIRuntime>, - ) -> CLIRuntimeNode<'cli> { - CLIRuntimeNode { + ) -> CLIRuntimeCluster<'cli> { + CLIRuntimeCluster { id, builder, next_port: Rc::new(RefCell::new(0)), - cli: self.cli, + cli: *self, } } } diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index 194e5e23eaf2..6fa7e628bd27 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -20,3 +20,4 @@ stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" } insta = "1.7.1" hydro_cli = { path = "../hydro_cli", version = "^0.5.0" } hydroflow_plus_cli_integration = { path = "../hydroflow_plus_cli_integration", version = "^0.5.0", features = [ "deploy" ] } +futures = "0.3" diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index 6923611991e8..8cd34d9e0bac 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -41,7 +41,7 @@ async fn main() { let builder = hydroflow_plus::HfBuilder::new(); hydroflow_plus_test::first_ten::first_ten_distributed( &builder, - &mut CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|id| { let host = create_host(&mut deployment); deployment.HydroflowCrate( ".", @@ -56,6 +56,7 @@ async fn main() { ) }), ); + builder.wire(); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test/examples/networked_basic.rs b/hydroflow_plus_test/examples/networked_basic.rs index 54ad96a91afe..a1f175db0c12 100644 --- a/hydroflow_plus_test/examples/networked_basic.rs +++ b/hydroflow_plus_test/examples/networked_basic.rs @@ -11,7 +11,7 @@ async fn main() { let builder = hydroflow_plus::HfBuilder::new(); let (source_zero_port, _, _) = hydroflow_plus_test::networked::networked_basic( &builder, - &mut CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|id| { deployment.HydroflowCrate( ".", localhost.clone(), @@ -25,6 +25,7 @@ async fn main() { ) }), ); + builder.wire(); let port_to_zero = source_zero_port .create_sender(&mut deployment, &localhost) diff --git a/hydroflow_plus_test/examples/simple_cluster.rs b/hydroflow_plus_test/examples/simple_cluster.rs new file mode 100644 index 000000000000..5472b6c8adca --- /dev/null +++ b/hydroflow_plus_test/examples/simple_cluster.rs @@ -0,0 +1,89 @@ +use std::cell::RefCell; +use std::sync::Arc; + +use hydro_cli::core::gcp::GCPNetwork; +use hydro_cli::core::{Deployment, Host}; +use hydroflow_plus_cli_integration::{CLIDeployClusterBuilder, CLIDeployNodeBuilder}; +use tokio::sync::RwLock; + +type HostCreator = Box Arc>>; + +// run with no args for localhost, with `gcp ` for GCP +#[tokio::main] +async fn main() { + let deployment = RefCell::new(Deployment::new()); + let host_arg = std::env::args().nth(1).unwrap_or_default(); + + let (create_host, profile): (HostCreator, Option) = if host_arg == *"gcp" { + let project = std::env::args().nth(2).unwrap(); + let network = Arc::new(RwLock::new(GCPNetwork::new(&project, None))); + + ( + Box::new(move |deployment| -> Arc> { + deployment.GCPComputeEngineHost( + &project, + "e2-micro", + "debian-cloud/debian-11", + "us-west1-a", + network.clone(), + None, + ) + }), + None, + ) + } else { + let localhost = deployment.borrow_mut().Localhost(); + ( + Box::new(move |_| -> Arc> { localhost.clone() }), + Some("dev".to_string()), + ) + }; + + let builder = hydroflow_plus::HfBuilder::new(); + hydroflow_plus_test::cluster::simple_cluster( + &builder, + &CLIDeployNodeBuilder::new(|id| { + let mut deployment = deployment.borrow_mut(); + let host = create_host(&mut deployment); + deployment.HydroflowCrate( + ".", + host.clone(), + Some("simple_cluster".into()), + None, + profile.clone(), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + &CLIDeployClusterBuilder::new(|id| { + let mut deployment = deployment.borrow_mut(); + (0..2) + .map(|_| { + let host = create_host(&mut deployment); + deployment.HydroflowCrate( + ".", + host.clone(), + Some("simple_cluster".into()), + None, + profile.clone(), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }) + .collect() + }), + ); + builder.wire(); + + let mut deployment = deployment.into_inner(); + + deployment.deploy().await.unwrap(); + + deployment.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap() +} diff --git a/hydroflow_plus_test/src/bin/simple_cluster.rs b/hydroflow_plus_test/src/bin/simple_cluster.rs new file mode 100644 index 000000000000..3df3182e9c6a --- /dev/null +++ b/hydroflow_plus_test/src/bin/simple_cluster.rs @@ -0,0 +1,10 @@ +// cannot use hydroflow::main because connect_local_blocking causes a deadlock +#[tokio::main] +async fn main() { + let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let ports = hydroflow::util::cli::init().await; + + let joined = hydroflow_plus_test::cluster::simple_cluster_runtime!(&ports, node_id); + + hydroflow::util::cli::launch_flow(joined).await; +} diff --git a/hydroflow_plus_test/src/cluster.rs b/hydroflow_plus_test/src/cluster.rs new file mode 100644 index 000000000000..1174aac33565 --- /dev/null +++ b/hydroflow_plus_test/src/cluster.rs @@ -0,0 +1,111 @@ +use hydroflow_plus::node::*; +use hydroflow_plus::*; +use stageleft::*; + +pub fn simple_cluster<'a, D: HfNetworkedDeploy<'a>>( + graph: &'a HfBuilder<'a, D>, + node_builder: &impl HfNodeBuilder<'a, D>, + cluster_builder: &impl HfClusterBuilder<'a, D>, +) -> D::Cluster { + let node = graph.node(node_builder); + let cluster = graph.cluster(cluster_builder); + + let numbers = node.source_iter(q!(0..5)); + let ids = node.source_iter(cluster.ids()).map(q!(|&id| id)); + + ids.cross_product(&numbers) + .map(q!(|(id, n)| (id, (id, n)))) + .demux_bincode(&cluster) + .for_each(q!(|n| println!("received: {:?}", n))); + + cluster +} + +use hydroflow::util::cli::HydroCLI; +use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; + +#[stageleft::entry] +pub fn simple_cluster_runtime<'a>( + graph: &'a HfBuilder<'a, CLIRuntime>, + cli: RuntimeData<&'a HydroCLI>, + node_id: RuntimeData, +) -> impl Quoted<'a, Hydroflow<'a>> { + let _ = simple_cluster(graph, &cli, &cli); + graph.build(node_id) +} + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use std::time::Duration; + + use hydro_cli::core::Deployment; + use hydroflow::lattices::cc_traits::Iter; + use hydroflow_plus_cli_integration::{ + CLIDeployClusterBuilder, CLIDeployNodeBuilder, DeployCrateWrapper, + }; + + #[tokio::test] + async fn simple_cluster() { + let deployment = RefCell::new(Deployment::new()); + let localhost = deployment.borrow_mut().Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + let cluster = super::simple_cluster( + &builder, + &CLIDeployNodeBuilder::new(|id| { + deployment.borrow_mut().HydroflowCrate( + ".", + localhost.clone(), + Some("simple_cluster".into()), + None, + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + &CLIDeployClusterBuilder::new(|id| { + (0..2) + .map(|_| { + deployment.borrow_mut().HydroflowCrate( + ".", + localhost.clone(), + Some("simple_cluster".into()), + None, + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }) + .collect() + }), + ); + builder.wire(); + + let mut deployment = deployment.into_inner(); + + deployment.deploy().await.unwrap(); + + let cluster_stdouts = + futures::future::join_all(cluster.nodes.iter().map(|node| node.stdout())).await; + + deployment.start().await.unwrap(); + + for (i, stdout) in cluster_stdouts.into_iter().enumerate() { + for j in 0..5 { + assert_eq!( + tokio::time::timeout(Duration::from_secs(1), stdout.recv()) + .await + .unwrap() + .unwrap(), + format!("received: ({}, {})", i, j) + ); + } + } + } +} diff --git a/hydroflow_plus_test/src/first_ten.rs b/hydroflow_plus_test/src/first_ten.rs index 0071dadc5659..efc614fa43b5 100644 --- a/hydroflow_plus_test/src/first_ten.rs +++ b/hydroflow_plus_test/src/first_ten.rs @@ -21,7 +21,7 @@ pub fn first_ten_runtime<'a>( pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>( graph: &'a HfBuilder<'a, D>, - node_builder: &mut impl HfNodeBuilder<'a, D>, + node_builder: &impl HfNodeBuilder<'a, D>, ) -> D::Node { let node = graph.node(node_builder); let second_node = graph.node(node_builder); @@ -35,15 +35,15 @@ pub fn first_ten_distributed<'a, D: HfNetworkedDeploy<'a>>( } use hydroflow::util::cli::HydroCLI; -use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder}; +use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; #[stageleft::entry] pub fn first_ten_distributed_runtime<'a>( graph: &'a HfBuilder<'a, CLIRuntime>, - cli: RuntimeData<&'a HydroCLI>, + cli: RuntimeData<&'a HydroCLI>, node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let _ = first_ten_distributed(graph, &mut CLIRuntimeNodeBuilder::new(cli)); + let _ = first_ten_distributed(graph, &cli); graph.build(node_id) } @@ -53,7 +53,7 @@ mod tests { use std::time::Duration; use hydro_cli::core::Deployment; - use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper}; #[tokio::test] async fn first_ten_distributed() { @@ -63,7 +63,7 @@ mod tests { let builder = hydroflow_plus::HfBuilder::new(); let second_node = super::first_ten_distributed( &builder, - &mut CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|id| { deployment.HydroflowCrate( ".", localhost.clone(), @@ -77,6 +77,7 @@ mod tests { ) }), ); + builder.wire(); deployment.deploy().await.unwrap(); diff --git a/hydroflow_plus_test/src/lib.rs b/hydroflow_plus_test/src/lib.rs index 32d0a4046fa0..c4ae218b001b 100644 --- a/hydroflow_plus_test/src/lib.rs +++ b/hydroflow_plus_test/src/lib.rs @@ -8,6 +8,7 @@ use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::HfBuilder; use stageleft::{q, Quoted, RuntimeData}; +pub mod cluster; pub mod first_ten; pub mod networked; @@ -19,8 +20,8 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( send_twice: bool, node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let node_zero = graph.node(&mut ()); - let node_one = graph.node(&mut ()); + let node_zero = graph.node(&()); + let node_one = graph.node(&()); let source = node_zero.source_stream(input_stream); let map1 = source.map(q!(|v| (v + 1, ()))); @@ -54,7 +55,7 @@ pub fn chat_app<'a>( output: RuntimeData<&'a UnboundedSender<(u32, String)>>, replay_messages: bool, ) -> impl Quoted<'a, Hydroflow<'a>> { - let node = graph.node(&mut ()); + let node = graph.node(&()); let users = node.source_stream(users_stream).persist(); let mut messages = node.source_stream(messages); @@ -81,7 +82,7 @@ pub fn graph_reachability<'a>( edges: RuntimeData>, reached_out: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { - let node = graph.node(&mut ()); + let node = graph.node(&()); let roots = node.source_stream(roots); let edges = node.source_stream(edges); @@ -108,7 +109,7 @@ pub fn count_elems<'a, T: 'a>( input_stream: RuntimeData>, output: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { - let node = graph.node(&mut ()); + let node = graph.node(&()); let source = node.source_stream(input_stream); let count = source.map(q!(|_| 1)).fold(q!(|| 0), q!(|a, b| *a += b)); diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs index 8d27e45a7e39..2a0486b0d4a5 100644 --- a/hydroflow_plus_test/src/networked.rs +++ b/hydroflow_plus_test/src/networked.rs @@ -3,13 +3,13 @@ use hydroflow::util::cli::HydroCLI; use hydroflow_plus::node::{HfNetworkedDeploy, HfNode, HfNodeBuilder}; use hydroflow_plus::scheduled::graph::Hydroflow; use hydroflow_plus::HfBuilder; -use hydroflow_plus_cli_integration::{CLIRuntime, CLIRuntimeNodeBuilder}; +use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta}; use stageleft::{q, Quoted, RuntimeData}; pub fn networked_basic<'a, D: HfNetworkedDeploy<'a>>( graph: &'a HfBuilder<'a, D>, - node_builder: &mut impl HfNodeBuilder<'a, D>, -) -> (D::Port, D::Node, D::Node) { + node_builder: &impl HfNodeBuilder<'a, D>, +) -> (D::NodePort, D::Node, D::Node) { let node_zero = graph.node(node_builder); let node_one = graph.node(node_builder); @@ -31,10 +31,10 @@ pub fn networked_basic<'a, D: HfNetworkedDeploy<'a>>( #[stageleft::entry] pub fn networked_basic_runtime<'a>( graph: &'a HfBuilder<'a, CLIRuntime>, - cli: RuntimeData<&'a HydroCLI>, + cli: RuntimeData<&'a HydroCLI>, node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let _ = networked_basic(graph, &mut CLIRuntimeNodeBuilder::new(cli)); + let _ = networked_basic(graph, &cli); graph.build(node_id) } @@ -46,7 +46,7 @@ mod tests { use hydro_cli::core::Deployment; use hydroflow::futures::SinkExt; use hydroflow::util::cli::ConnectedSink; - use hydroflow_plus_cli_integration::CLIDeployNodeBuilder; + use hydroflow_plus_cli_integration::{CLIDeployNodeBuilder, DeployCrateWrapper}; #[tokio::test] async fn networked_basic() { @@ -56,7 +56,7 @@ mod tests { let builder = hydroflow_plus::HfBuilder::new(); let (source_zero_port, _, node_one) = super::networked_basic( &builder, - &mut CLIDeployNodeBuilder::new(|id| { + &CLIDeployNodeBuilder::new(|id| { deployment.HydroflowCrate( ".", localhost.clone(), @@ -70,6 +70,7 @@ mod tests { ) }), ); + builder.wire(); let port_to_zero = source_zero_port .create_sender(&mut deployment, &localhost) diff --git a/stageleft/src/runtime_support.rs b/stageleft/src/runtime_support.rs index c7a79026b213..748dae0477ba 100644 --- a/stageleft/src/runtime_support.rs +++ b/stageleft/src/runtime_support.rs @@ -90,6 +90,12 @@ macro_rules! impl_free_variable_from_literal_numeric { impl_free_variable_from_literal_numeric!(i8, i16, i32, i64, i128, isize); impl_free_variable_from_literal_numeric!(u8, u16, u32, u64, u128, usize); +impl FreeVariable<&str> for &str { + fn to_tokens(self) -> (Option, Option) { + (None, Some(quote!(#self))) + } +} + pub struct Import { module_path: &'static str, crate_name: &'static str, diff --git a/topolotree/src/latency_measure.rs b/topolotree/src/latency_measure.rs index d95de1b36eea..a5f691f62c6d 100644 --- a/topolotree/src/latency_measure.rs +++ b/topolotree/src/latency_measure.rs @@ -16,7 +16,7 @@ use protocol::*; #[tokio::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let mut start_node = ports .port("increment_start_node") .connect::() diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 17b9c18c5214..92d51e58d2ad 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -216,7 +216,7 @@ async fn main() { let _self_id: u32 = args.next().unwrap().parse().unwrap(); let neighbors: Vec = args.map(|x| x.parse().unwrap()).collect(); - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let input_recv = ports .port("from_peer") diff --git a/topolotree/src/pn.rs b/topolotree/src/pn.rs index 4011174e3345..269d22c6b2c5 100644 --- a/topolotree/src/pn.rs +++ b/topolotree/src/pn.rs @@ -23,7 +23,7 @@ enum GossipOrIncrement { #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let my_id: Vec = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap(); let my_id = my_id[0]; diff --git a/topolotree/src/pn_delta.rs b/topolotree/src/pn_delta.rs index 95b25a37977e..136b6cb2159c 100644 --- a/topolotree/src/pn_delta.rs +++ b/topolotree/src/pn_delta.rs @@ -23,7 +23,7 @@ type NextStateType = (u64, bool, Rc, Vec)>>); #[hydroflow::main] async fn main() { - let ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init::<()>().await; let my_id: Vec = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap(); let my_id = my_id[0];