diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d57533eedcc2..738605a130cf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -227,6 +227,16 @@ jobs: cd hydro_cli source .venv/bin/activate cd python_tests + cd ../.. + pip install -r requirements.txt + RUST_BACKTRACE=1 pytest *.py + + - name: Run Hydroflow+ Python tests + run: | + ulimit -c unlimited + cd hydro_cli + source .venv/bin/activate + cd ../hydroflow_plus_test/python_tests pip install -r requirements.txt RUST_BACKTRACE=1 pytest *.py diff --git a/Cargo.lock b/Cargo.lock index e96b680f00b2..c10dd202a585 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1494,6 +1494,7 @@ dependencies = [ name = "hydroflow_plus_test" version = "0.0.0" dependencies = [ + "hydroflow", "hydroflow_plus", "hydroflow_plus_test_macro", "insta", @@ -1501,17 +1502,20 @@ dependencies = [ "serde", "stageleft", "stageleft_tool", + "tokio", ] [[package]] name = "hydroflow_plus_test_macro" version = "0.0.0" dependencies = [ + "hydroflow", "hydroflow_plus", "regex", "serde", "stageleft", "stageleft_tool", + "tokio", ] [[package]] diff --git a/docs/docs/deploy/your-first-deploy.md b/docs/docs/deploy/your-first-deploy.md index fde31740c6b1..c4498abaebc8 100644 --- a/docs/docs/deploy/your-first-deploy.md +++ b/docs/docs/deploy/your-first-deploy.md @@ -29,7 +29,7 @@ Let's open up `src/main.rs` in the generated project and write a new `main` func ```rust #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; } ``` @@ -72,7 +72,7 @@ use hydroflow::hydroflow_syntax; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let input_recv = ports .port("input") diff --git a/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs b/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs index 4e0bf8ea8769..095999b50395 100644 --- a/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs +++ b/hydro_cli_examples/examples/dedalus_2pc_coordinator/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let vote_to_participant_port = ports .port("vote_to_participant") .connect::>() diff --git a/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs b/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs index 457f5856503f..72b425b6118c 100644 --- a/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs +++ b/hydro_cli_examples/examples/dedalus_2pc_participant/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let vote_to_participant_source = ports .port("vote_to_participant") .connect::() diff --git a/hydro_cli_examples/examples/dedalus_receiver/main.rs b/hydro_cli_examples/examples/dedalus_receiver/main.rs index 42b8fc3c06cf..bbbdd0ed5508 100644 --- a/hydro_cli_examples/examples/dedalus_receiver/main.rs +++ b/hydro_cli_examples/examples/dedalus_receiver/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let broadcast_recv = ports .port("broadcast") .connect::() diff --git a/hydro_cli_examples/examples/dedalus_sender/main.rs b/hydro_cli_examples/examples/dedalus_sender/main.rs index 40748bed345f..48a9c04109dd 100644 --- a/hydro_cli_examples/examples/dedalus_sender/main.rs +++ b/hydro_cli_examples/examples/dedalus_sender/main.rs @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let broadcast_port = ports .port("broadcast") .connect::>() diff --git a/hydro_cli_examples/examples/dedalus_vote_leader/main.rs b/hydro_cli_examples/examples/dedalus_vote_leader/main.rs index 9306a50b2acc..e8a1879e4c9e 100644 --- a/hydro_cli_examples/examples/dedalus_vote_leader/main.rs +++ b/hydro_cli_examples/examples/dedalus_vote_leader/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let to_replica_port = ports .port("to_replica") .connect::>() diff --git a/hydro_cli_examples/examples/dedalus_vote_participant/main.rs b/hydro_cli_examples/examples/dedalus_vote_participant/main.rs index 6d4fcf9a8de8..b34ae218ea3e 100644 --- a/hydro_cli_examples/examples/dedalus_vote_participant/main.rs +++ b/hydro_cli_examples/examples/dedalus_vote_participant/main.rs @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let to_replica_source = ports .port("to_replica") .connect::() diff --git a/hydro_cli_examples/examples/stdout_receiver/main.rs b/hydro_cli_examples/examples/stdout_receiver/main.rs index 5dcb79d3d0e2..5ee8fd4f6f96 100644 --- a/hydro_cli_examples/examples/stdout_receiver/main.rs +++ b/hydro_cli_examples/examples/stdout_receiver/main.rs @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource}; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let echo_recv = ports .port("echo") .connect::() diff --git a/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs b/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs index 4b2691f2478f..4ce51e27e21b 100644 --- a/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs +++ b/hydro_cli_examples/examples/tagged_stdout_receiver/main.rs @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged}; #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let echo_recv = ports .port("echo") .connect::>() diff --git a/hydro_cli_examples/examples/ws_chat_server/main.rs b/hydro_cli_examples/examples/ws_chat_server/main.rs index 5e301b7c7292..ec0b9d40da88 100644 --- a/hydro_cli_examples/examples/ws_chat_server/main.rs +++ b/hydro_cli_examples/examples/ws_chat_server/main.rs @@ -29,7 +29,7 @@ struct ChatMessage { #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let from_peer = ports .port("from_peer") diff --git a/hydroflow/src/scheduled/context.rs b/hydroflow/src/scheduled/context.rs index 47c92385fbf8..d6957f9e1542 100644 --- a/hydroflow/src/scheduled/context.rs +++ b/hydroflow/src/scheduled/context.rs @@ -3,6 +3,7 @@ use std::any::Any; use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; use instant::Instant; use tokio::sync::mpsc::UnboundedSender; @@ -37,6 +38,8 @@ pub struct Context { /// meaningless. pub(crate) subgraph_id: SubgraphId, + pub(crate) tasks_to_spawn: Vec + 'static>>>, + /// Join handles for spawned tasks. pub(crate) task_join_handles: Vec>, } @@ -156,12 +159,18 @@ impl Context { } /// Spawns an async task on the internal Tokio executor. - pub fn spawn_task(&mut self, future: Fut) + pub fn request_task(&mut self, future: Fut) where Fut: Future + 'static, { - self.task_join_handles - .push(tokio::task::spawn_local(future)); + self.tasks_to_spawn.push(Box::pin(future)); + } + + /// Launches all tasks requested with [`Self::request_task`]. + pub fn spawn_tasks(&mut self) { + for task in self.tasks_to_spawn.drain(..) { + self.task_join_handles.push(tokio::task::spawn_local(task)); + } } /// Aborts all tasks spawned with [`Self::spawn_task`]. diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs index 5ca3d03ccb3e..6bb16372e6bc 100644 --- a/hydroflow/src/scheduled/graph.rs +++ b/hydroflow/src/scheduled/graph.rs @@ -61,6 +61,7 @@ impl<'a> Default for Hydroflow<'a> { subgraph_id: SubgraphId(0), + tasks_to_spawn: Vec::new(), task_join_handles: Vec::new(), }; Self { @@ -351,6 +352,7 @@ impl<'a> Hydroflow<'a> { /// TODO(mingwei): Currently blockes forever, no notion of "completion." #[tracing::instrument(level = "trace", skip(self), ret)] pub async fn run_async(&mut self) -> Option { + self.context.spawn_tasks(); loop { // Run any work which is immediately available. self.run_available_async().await; @@ -681,12 +683,12 @@ impl<'a> Hydroflow<'a> { } impl<'a> Hydroflow<'a> { - /// Alias for [`Context::spawn_task`]. - pub fn spawn_task(&mut self, future: Fut) + /// Alias for [`Context::request_task`]. + pub fn request_task(&mut self, future: Fut) where Fut: Future + 'static, { - self.context.spawn_task(future); + self.context.request_task(future); } /// Alias for [`Context::abort_tasks`]. diff --git a/hydroflow/src/util/cli.rs b/hydroflow/src/util/cli.rs index d14647b9b406..fbfc07246060 100644 --- a/hydroflow/src/util/cli.rs +++ b/hydroflow/src/util/cli.rs @@ -1,5 +1,6 @@ #![allow(missing_docs)] // TODO(mingwei) +use std::cell::RefCell; use std::collections::HashMap; pub use hydroflow_cli_integration::*; @@ -15,19 +16,28 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) { stop.0.send(()).unwrap(); }); + let local_set = tokio::task::LocalSet::new(); + let flow = local_set.run_until(async move { + flow.run_async().await; + }); + tokio::select! { _ = stop.1 => {}, - _ = flow.run_async() => {} + _ = flow => {} } } pub struct HydroCLI { - ports: HashMap, + ports: RefCell>, } impl HydroCLI { - pub fn port(&mut self, name: &str) -> ServerOrBound { - self.ports.remove(name).unwrap() + pub fn port(&self, name: &str) -> ServerOrBound { + self.ports + .try_borrow_mut() + .unwrap() + .remove(name) + .unwrap_or_else(|| panic!("port {} not found", name)) } } @@ -73,6 +83,6 @@ pub async fn init() -> HydroCLI { println!("ack start"); HydroCLI { - ports: all_connected, + ports: RefCell::new(all_connected), } } diff --git a/hydroflow_cli_integration/src/lib.rs b/hydroflow_cli_integration/src/lib.rs index 6fd864f86176..8841c5157f4c 100644 --- a/hydroflow_cli_integration/src/lib.rs +++ b/hydroflow_cli_integration/src/lib.rs @@ -175,6 +175,12 @@ impl ServerOrBound { T::from_defn(self).await } + pub fn connect_local_blocking(self) -> T { + let handle = tokio::runtime::Handle::current(); + let _guard = handle.enter(); + futures::executor::block_on(T::from_defn(self)) + } + pub async fn accept_tcp(&mut self) -> TcpStream { if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self { handle.recv().await.unwrap().unwrap() diff --git a/hydroflow_lang/src/graph/ops/dest_sink.rs b/hydroflow_lang/src/graph/ops/dest_sink.rs index 45eb0ff9e722..a01c82842589 100644 --- a/hydroflow_lang/src/graph/ops/dest_sink.rs +++ b/hydroflow_lang/src/graph/ops/dest_sink.rs @@ -139,7 +139,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints { } } #hydroflow - .spawn_task(sink_feed_flush(#recv_ident, #sink_arg)); + .request_task(sink_feed_flush(#recv_ident, #sink_arg)); } }; diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index b8123fff5976..c127cdde8d3c 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -1,6 +1,7 @@ use std::hash::Hash; use std::marker::PhantomData; +use hydroflow::futures::Sink; use proc_macro2::Span; use stageleft::{IntoQuotedMut, Quoted}; use syn::parse_quote; @@ -332,6 +333,22 @@ impl<'a, T> HfStream<'a, T> { #ident = #self_ident -> for_each(#f); }); } + + pub fn dest_sink + 'a>(&self, sink: impl Quoted<'a, S>) { + let sink = sink.splice(); + let self_ident = &self.ident; + + self.graph + .builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.node_id) + .or_default() + .add_statement(parse_quote! { + #self_ident -> dest_sink(#sink); + }); + } } impl<'a, K, V1> HfStream<'a, (K, V1)> { diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index 71b3b900235c..c5e701fbce09 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -5,7 +5,9 @@ version = "0.0.0" edition = "2021" [dependencies] +hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] } hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +tokio = { version = "1.16", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.1.0" } regex = "1" serde = "1" diff --git a/hydroflow_plus_test/examples/networked_basic.rs b/hydroflow_plus_test/examples/networked_basic.rs new file mode 100644 index 000000000000..1f18b163b070 --- /dev/null +++ b/hydroflow_plus_test/examples/networked_basic.rs @@ -0,0 +1,12 @@ +use hydroflow_plus_test::*; + +// cannot use hydroflow::main because connect_local_blocking causes a deadlock +#[tokio::main] +async fn main() { + let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let ports = hydroflow::util::cli::init().await; + + let joined = hydroflow_plus_test::networked::networked_basic!(&ports, node_id); + + hydroflow::util::cli::launch_flow(joined).await; +} diff --git a/hydroflow_plus_test/python_tests/basic.py b/hydroflow_plus_test/python_tests/basic.py new file mode 100644 index 000000000000..56f01eef7501 --- /dev/null +++ b/hydroflow_plus_test/python_tests/basic.py @@ -0,0 +1,48 @@ +from codecs import decode +import json +from pathlib import Path +import pytest +import hydro + +@pytest.mark.asyncio +async def test_networked_basic(): + deployment = hydro.Deployment() + localhost_machine = deployment.Localhost() + + sender = deployment.CustomService( + external_ports=[], + on=localhost_machine.client_only(), + ) + + program_zero = deployment.HydroflowCrate( + src=str((Path(__file__).parent.parent).absolute()), + args=["0"], + example="networked_basic", + profile="dev", + on=localhost_machine + ) + + program_one = deployment.HydroflowCrate( + src=str((Path(__file__).parent.parent).absolute()), + args=["1"], + example="networked_basic", + profile="dev", + on=localhost_machine + ) + + sender_port = sender.client_port() + sender_port.send_to(program_zero.ports.node_zero_input) + + program_zero.ports.node_zero_output.send_to(program_one.ports.node_one_input) + + await deployment.deploy() + + receiver_out = await program_one.stdout() + connection = await (await sender_port.server_port()).into_sink() + + await deployment.start() + await connection.send(bytes("hi!", "utf-8")) + + async for log in receiver_out: + assert log == "node one received: \"hi!\"" + break diff --git a/hydroflow_plus_test/python_tests/requirements.txt b/hydroflow_plus_test/python_tests/requirements.txt new file mode 100644 index 000000000000..ee4ba018603b --- /dev/null +++ b/hydroflow_plus_test/python_tests/requirements.txt @@ -0,0 +1,2 @@ +pytest +pytest-asyncio diff --git a/hydroflow_plus_test/src/lib.rs b/hydroflow_plus_test/src/lib.rs index 95d340c3672d..d532634f1de4 100644 --- a/hydroflow_plus_test/src/lib.rs +++ b/hydroflow_plus_test/src/lib.rs @@ -7,6 +7,8 @@ use hydroflow_plus::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow_plus::HfBuilder; use stageleft::{q, Quoted, RuntimeData}; +pub mod networked; + #[stageleft::entry(UnboundedReceiverStream)] pub fn teed_join<'a, S: Stream + Unpin + 'a>( graph: &'a HfBuilder<'a>, diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs new file mode 100644 index 000000000000..dc425af49406 --- /dev/null +++ b/hydroflow_plus_test/src/networked.rs @@ -0,0 +1,47 @@ +use hydroflow::bytes::BytesMut; +use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource, HydroCLI}; +use hydroflow_plus::scheduled::graph::Hydroflow; +use hydroflow_plus::HfBuilder; +use stageleft::{q, Quoted, RuntimeData}; + +#[stageleft::entry] +pub fn networked_basic<'a>( + graph: &'a HfBuilder<'a>, + cli: RuntimeData<&'a HydroCLI>, + node_id: RuntimeData, +) -> impl Quoted<'a, Hydroflow<'a>> { + let source_zero = graph.source_stream( + 0, + q!({ + cli.port("node_zero_input") + .connect_local_blocking::() + .into_source() + }), + ); + + source_zero + .map(q!(|v: Result| v.unwrap().freeze())) + .dest_sink(q!({ + cli.port("node_zero_output") + .connect_local_blocking::() + .into_sink() + })); + + let source_one = graph.source_stream( + 1, + q!({ + cli.port("node_one_input") + .connect_local_blocking::() + .into_source() + }), + ); + + source_one.for_each(q!(|v: Result| { + println!( + "node one received: {:?}", + std::str::from_utf8(&v.unwrap()).unwrap() + ); + })); + + graph.build(node_id) +} diff --git a/hydroflow_plus_test_macro/Cargo.toml b/hydroflow_plus_test_macro/Cargo.toml index ac6efb37802d..c3ff206a97a8 100644 --- a/hydroflow_plus_test_macro/Cargo.toml +++ b/hydroflow_plus_test_macro/Cargo.toml @@ -13,7 +13,9 @@ default = ["macro"] macro = [] [dependencies] +hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] } hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +tokio = { version = "1.16", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.1.0" } regex = "1" serde = "1" diff --git a/stageleft_macro/src/lib.rs b/stageleft_macro/src/lib.rs index b3645f1385c1..975e9b09aaab 100644 --- a/stageleft_macro/src/lib.rs +++ b/stageleft_macro/src/lib.rs @@ -296,7 +296,6 @@ pub fn entry( }; let input_contents = input - .block .to_token_stream() .to_string() .chars() diff --git a/stageleft_tool/src/lib.rs b/stageleft_tool/src/lib.rs index 5e114756c5ca..35776fc64a59 100644 --- a/stageleft_tool/src/lib.rs +++ b/stageleft_tool/src/lib.rs @@ -32,8 +32,9 @@ impl<'a> Visit<'a> for GenMacroVistor { if is_entry { let cur_path = &self.current_mod; - let contents = i - .block + let mut i_cloned = i.clone(); + i_cloned.attrs = vec![]; + let contents = i_cloned .to_token_stream() .to_string() .chars() diff --git a/topolotree/src/latency_measure.rs b/topolotree/src/latency_measure.rs index 6384207ae4e3..d95de1b36eea 100644 --- a/topolotree/src/latency_measure.rs +++ b/topolotree/src/latency_measure.rs @@ -16,7 +16,7 @@ use protocol::*; #[tokio::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let mut start_node = ports .port("increment_start_node") .connect::() diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 7908f18de948..17b9c18c5214 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -216,7 +216,7 @@ async fn main() { let _self_id: u32 = args.next().unwrap().parse().unwrap(); let neighbors: Vec = args.map(|x| x.parse().unwrap()).collect(); - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let input_recv = ports .port("from_peer") diff --git a/topolotree/src/pn.rs b/topolotree/src/pn.rs index d43b7a6ba3e9..4011174e3345 100644 --- a/topolotree/src/pn.rs +++ b/topolotree/src/pn.rs @@ -23,7 +23,7 @@ enum GossipOrIncrement { #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let my_id: Vec = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap(); let my_id = my_id[0]; diff --git a/topolotree/src/pn_delta.rs b/topolotree/src/pn_delta.rs index 9e363b607737..95b25a37977e 100644 --- a/topolotree/src/pn_delta.rs +++ b/topolotree/src/pn_delta.rs @@ -23,7 +23,7 @@ type NextStateType = (u64, bool, Rc, Vec)>>); #[hydroflow::main] async fn main() { - let mut ports = hydroflow::util::cli::init().await; + let ports = hydroflow::util::cli::init().await; let my_id: Vec = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap(); let my_id = my_id[0];