Skip to content

Commit

Permalink
Merge d16e9f2 into sapling-pr-archive-shadaj
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Dec 12, 2023
2 parents 88c6dd0 + d16e9f2 commit d39ca21
Show file tree
Hide file tree
Showing 31 changed files with 203 additions and 30 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ jobs:
cd hydro_cli
source .venv/bin/activate
cd python_tests
cd ../..
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
- name: Run Hydroflow+ Python tests
run: |
ulimit -c unlimited
cd hydro_cli
source .venv/bin/activate
cd ../hydroflow_plus_test/python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/docs/deploy/your-first-deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Let's open up `src/main.rs` in the generated project and write a new `main` func
```rust
#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
}
```

Expand Down Expand Up @@ -72,7 +72,7 @@ use hydroflow::hydroflow_syntax;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let input_recv = ports
.port("input")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_port = ports
.port("vote_to_participant")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_source = ports
.port("vote_to_participant")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_recv = ports
.port("broadcast")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_sender/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_port = ports
.port("broadcast")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_vote_leader/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_port = ports
.port("to_replica")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_source = ports
.port("to_replica")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/tagged_stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedTagged<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ChatMessage {

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let from_peer = ports
.port("from_peer")
Expand Down
15 changes: 12 additions & 3 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;

use instant::Instant;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -37,6 +38,8 @@ pub struct Context {
/// meaningless.
pub(crate) subgraph_id: SubgraphId,

pub(crate) tasks_to_spawn: Vec<Pin<Box<dyn Future<Output = ()> + 'static>>>,

/// Join handles for spawned tasks.
pub(crate) task_join_handles: Vec<JoinHandle<()>>,
}
Expand Down Expand Up @@ -156,12 +159,18 @@ impl Context {
}

/// Spawns an async task on the internal Tokio executor.
pub fn spawn_task<Fut>(&mut self, future: Fut)
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.task_join_handles
.push(tokio::task::spawn_local(future));
self.tasks_to_spawn.push(Box::pin(future));
}

/// Launches all tasks requested with [`Self::request_task`].
pub fn spawn_tasks(&mut self) {
for task in self.tasks_to_spawn.drain(..) {
self.task_join_handles.push(tokio::task::spawn_local(task));
}
}

/// Aborts all tasks spawned with [`Self::spawn_task`].
Expand Down
8 changes: 5 additions & 3 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl<'a> Default for Hydroflow<'a> {

subgraph_id: SubgraphId(0),

tasks_to_spawn: Vec::new(),
task_join_handles: Vec::new(),
};
Self {
Expand Down Expand Up @@ -351,6 +352,7 @@ impl<'a> Hydroflow<'a> {
/// TODO(mingwei): Currently blockes forever, no notion of "completion."
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn run_async(&mut self) -> Option<Never> {
self.context.spawn_tasks();
loop {
// Run any work which is immediately available.
self.run_available_async().await;
Expand Down Expand Up @@ -681,12 +683,12 @@ impl<'a> Hydroflow<'a> {
}

impl<'a> Hydroflow<'a> {
/// Alias for [`Context::spawn_task`].
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Alias for [`Context::request_task`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.context.spawn_task(future);
self.context.request_task(future);
}

/// Alias for [`Context::abort_tasks`].
Expand Down
20 changes: 15 additions & 5 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(missing_docs)] // TODO(mingwei)

use std::cell::RefCell;
use std::collections::HashMap;

pub use hydroflow_cli_integration::*;
Expand All @@ -15,19 +16,28 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
stop.0.send(()).unwrap();
});

let local_set = tokio::task::LocalSet::new();
let flow = local_set.run_until(async move {
flow.run_async().await;
});

tokio::select! {
_ = stop.1 => {},
_ = flow.run_async() => {}
_ = flow => {}
}
}

pub struct HydroCLI {
ports: HashMap<String, ServerOrBound>,
ports: RefCell<HashMap<String, ServerOrBound>>,
}

impl HydroCLI {
pub fn port(&mut self, name: &str) -> ServerOrBound {
self.ports.remove(name).unwrap()
pub fn port(&self, name: &str) -> ServerOrBound {
self.ports
.try_borrow_mut()
.unwrap()
.remove(name)
.unwrap_or_else(|| panic!("port {} not found", name))
}
}

Expand Down Expand Up @@ -73,6 +83,6 @@ pub async fn init() -> HydroCLI {
println!("ack start");

HydroCLI {
ports: all_connected,
ports: RefCell::new(all_connected),
}
}
6 changes: 6 additions & 0 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ impl ServerOrBound {
T::from_defn(self).await
}

pub fn connect_local_blocking<T: Connected>(self) -> T {
let handle = tokio::runtime::Handle::current();
let _guard = handle.enter();
futures::executor::block_on(T::from_defn(self))
}

pub async fn accept_tcp(&mut self) -> TcpStream {
if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self {
handle.recv().await.unwrap().unwrap()
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_lang/src/graph/ops/dest_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
}
}
#hydroflow
.spawn_task(sink_feed_flush(#recv_ident, #sink_arg));
.request_task(sink_feed_flush(#recv_ident, #sink_arg));
}
};

Expand Down
17 changes: 17 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::hash::Hash;
use std::marker::PhantomData;

use hydroflow::futures::Sink;
use proc_macro2::Span;
use stageleft::{IntoQuotedMut, Quoted};
use syn::parse_quote;
Expand Down Expand Up @@ -332,6 +333,22 @@ impl<'a, T> HfStream<'a, T> {
#ident = #self_ident -> for_each(#f);
});
}

pub fn dest_sink<S: Unpin + Sink<T> + 'a>(&self, sink: impl Quoted<'a, S>) {
let sink = sink.splice();
let self_ident = &self.ident;

self.graph
.builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(self.node_id)
.or_default()
.add_statement(parse_quote! {
#self_ident -> dest_sink(#sink);
});
}
}

impl<'a, K, V1> HfStream<'a, (K, V1)> {
Expand Down
2 changes: 2 additions & 0 deletions hydroflow_plus_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ version = "0.0.0"
edition = "2021"

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
regex = "1"
serde = "1"
Expand Down
12 changes: 12 additions & 0 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use hydroflow_plus_test::*;

// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::networked::networked_basic!(&ports, node_id);

hydroflow::util::cli::launch_flow(joined).await;
}
48 changes: 48 additions & 0 deletions hydroflow_plus_test/python_tests/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from codecs import decode
import json
from pathlib import Path
import pytest
import hydro

@pytest.mark.asyncio
async def test_networked_basic():
deployment = hydro.Deployment()
localhost_machine = deployment.Localhost()

sender = deployment.CustomService(
external_ports=[],
on=localhost_machine.client_only(),
)

program_zero = deployment.HydroflowCrate(
src=str((Path(__file__).parent.parent).absolute()),
args=["0"],
example="networked_basic",
profile="dev",
on=localhost_machine
)

program_one = deployment.HydroflowCrate(
src=str((Path(__file__).parent.parent).absolute()),
args=["1"],
example="networked_basic",
profile="dev",
on=localhost_machine
)

sender_port = sender.client_port()
sender_port.send_to(program_zero.ports.node_zero_input)

program_zero.ports.node_zero_output.send_to(program_one.ports.node_one_input)

await deployment.deploy()

receiver_out = await program_one.stdout()
connection = await (await sender_port.server_port()).into_sink()

await deployment.start()
await connection.send(bytes("hi!", "utf-8"))

async for log in receiver_out:
assert log == "node one received: \"hi!\""
break
2 changes: 2 additions & 0 deletions hydroflow_plus_test/python_tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pytest
pytest-asyncio
Loading

0 comments on commit d39ca21

Please sign in to comment.