Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_plus): pass subgraph ID through deploy metadata #996

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions docs/docs/hydroflow_plus/distributed.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};
pub fn first_ten_distributed_runtime<'a>(
graph: &'a GraphBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
subgraph_id: RuntimeData<usize>,
) -> impl Quoted<'a, Hydroflow<'a>> {
let _ = first_ten_distributed(graph, &cli);
graph.build(subgraph_id)
graph.build(q!(cli.meta.subgraph_id))
}
```

Expand All @@ -68,12 +67,11 @@ The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates
```rust
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = flow::first_ten_distributed_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(
flow::first_ten_distributed_runtime!(&ports)
).await;
}
```

Expand All @@ -92,12 +90,11 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("first_ten_distributed")
.profile("dev")
.args(vec![id.to_string()]),
.profile("dev"),
)
}),
);
Expand Down
5 changes: 3 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_channel::Receiver;
use async_trait::async_trait;
use futures_core::Future;
use hydroflow_cli_integration::{InitConfig, ServerPort};
use serde::Serialize;
use tokio::sync::RwLock;

use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath};
Expand Down Expand Up @@ -202,12 +203,12 @@ impl HydroflowCrateService {
}
}

pub fn update_meta(&mut self, meta: String) {
pub fn update_meta<T: Serialize>(&mut self, meta: T) {
if self.launched_binary.is_some() {
panic!("Cannot update meta after binary has been launched")
}

self.meta = Some(meta);
self.meta = Some(serde_json::to_string(&meta).unwrap());
}

pub fn get_port(
Expand Down
1 change: 0 additions & 1 deletion hydro_deploy/hydroflow_plus_cli_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ hydroflow_plus = { path = "../../hydroflow_plus", version = "^0.5.0", features =
syn = { version = "2.0.0", features = [ "parsing", "extra-traits" ] }
tokio = { version = "1.16", features = [ "full" ] }
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"

hydro_deploy = { path = "../core", version = "^0.5.0", optional = true }
async-channel = { version = "1.8.0", optional = true }
Expand Down
39 changes: 23 additions & 16 deletions hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;

Expand Down Expand Up @@ -26,7 +27,7 @@ pub struct CLIDeploy {}
impl<'a> Deploy<'a> for CLIDeploy {
type Node = CLIDeployNode<'a>;
type Cluster = CLIDeployCluster<'a>;
type Meta = HydroflowPlusMeta;
type Meta = HashMap<usize, Vec<u32>>;
type RuntimeID = ();
type NodePort = CLIDeployPort<CLIDeployNode<'a>>;
type ClusterPort = CLIDeployPort<CLIDeployCluster<'a>>;
Expand Down Expand Up @@ -96,7 +97,7 @@ impl<'a> CLIDeployPort<CLIDeployNode<'a>> {

impl<'a> HfNode<'a> for CLIDeployNode<'a> {
type Port = CLIDeployPort<Self>;
type Meta = HydroflowPlusMeta;
type Meta = HashMap<usize, Vec<u32>>;

fn id(&self) -> usize {
self.id
Expand All @@ -118,7 +119,10 @@ impl<'a> HfNode<'a> for CLIDeployNode<'a> {

fn update_meta(&mut self, meta: &Self::Meta) {
let mut n = self.underlying.try_write().unwrap();
n.update_meta(serde_json::to_string(&meta).unwrap());
n.update_meta(HydroflowPlusMeta {
clusters: meta.clone(),
subgraph_id: self.id,
});
}
}

Expand All @@ -143,7 +147,7 @@ pub struct CLIDeployCluster<'a> {

impl<'a> HfNode<'a> for CLIDeployCluster<'a> {
type Port = CLIDeployPort<Self>;
type Meta = HydroflowPlusMeta;
type Meta = HashMap<usize, Vec<u32>>;

fn id(&self) -> usize {
self.id
Expand All @@ -164,10 +168,14 @@ impl<'a> HfNode<'a> for CLIDeployCluster<'a> {
}

fn update_meta(&mut self, meta: &Self::Meta) {
let json_meta = serde_json::to_string(&meta).unwrap();
let meta = HydroflowPlusMeta {
clusters: meta.clone(),
subgraph_id: self.id,
};

self.nodes.iter().for_each(|n| {
let mut n = n.underlying.try_write().unwrap();
n.update_meta(json_meta.clone());
n.update_meta(&meta);
});
}
}
Expand Down Expand Up @@ -351,12 +359,12 @@ impl<'a> HfSendManyToMany<'a, CLIDeployCluster<'a>> for CLIDeployCluster<'a> {
}
}

type CrateBuilder<'a> = dyn FnMut(usize) -> Arc<RwLock<HydroflowCrateService>> + 'a;
type CrateBuilder<'a> = dyn FnMut() -> Arc<RwLock<HydroflowCrateService>> + 'a;

pub struct CLIDeployNodeBuilder<'a>(RefCell<Box<CrateBuilder<'a>>>);

impl<'a> CLIDeployNodeBuilder<'a> {
pub fn new<F: FnMut(usize) -> Arc<RwLock<HydroflowCrateService>> + 'a>(f: F) -> Self {
pub fn new<F: FnMut() -> Arc<RwLock<HydroflowCrateService>> + 'a>(f: F) -> Self {
Self(RefCell::new(Box::new(f)))
}
}
Expand All @@ -366,23 +374,23 @@ impl<'a: 'b, 'b> NodeBuilder<'a, CLIDeploy> for CLIDeployNodeBuilder<'b> {
&self,
id: usize,
builder: &'a GraphBuilder<'a, CLIDeploy>,
_meta: &mut HydroflowPlusMeta,
_meta: &mut HashMap<usize, Vec<u32>>,
) -> CLIDeployNode<'a> {
CLIDeployNode {
id,
builder,
next_port: Rc::new(RefCell::new(0)),
underlying: (self.0.borrow_mut())(id),
underlying: (self.0.borrow_mut())(),
}
}
}

type ClusterBuilderFn<'a> = dyn FnMut(usize) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a;
type ClusterBuilderFn<'a> = dyn FnMut() -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a;

pub struct CLIDeployClusterBuilder<'a>(RefCell<Box<ClusterBuilderFn<'a>>>);

impl<'a> CLIDeployClusterBuilder<'a> {
pub fn new<F: FnMut(usize) -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a>(f: F) -> Self {
pub fn new<F: FnMut() -> Vec<Arc<RwLock<HydroflowCrateService>>> + 'a>(f: F) -> Self {
Self(RefCell::new(Box::new(f)))
}
}
Expand All @@ -392,11 +400,10 @@ impl<'a: 'b, 'b> ClusterBuilder<'a, CLIDeploy> for CLIDeployClusterBuilder<'b> {
&self,
id: usize,
builder: &'a GraphBuilder<'a, CLIDeploy>,
meta: &mut HydroflowPlusMeta,
meta: &mut HashMap<usize, Vec<u32>>,
) -> CLIDeployCluster<'a> {
let cluster_nodes = (self.0.borrow_mut())(id);
meta.clusters
.insert(id, (0..(cluster_nodes.len() as u32)).collect());
let cluster_nodes = (self.0.borrow_mut())();
meta.insert(id, (0..(cluster_nodes.len() as u32)).collect());

CLIDeployCluster {
id,
Expand Down
1 change: 1 addition & 0 deletions hydro_deploy/hydroflow_plus_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ use serde::{Deserialize, Serialize};
#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
pub clusters: HashMap<usize, Vec<u32>>,
pub subgraph_id: usize,
}
2 changes: 1 addition & 1 deletion hydro_deploy/hydroflow_plus_cli_integration/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<'a> HfCluster<'a> for CLIRuntimeCluster<'a> {
fn ids(&self) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
let cli = self.cli;
let self_id = self.id;
q!(cli.meta.as_ref().unwrap().clusters.get(&self_id).unwrap())
q!(cli.meta.clusters.get(&self_id).unwrap())
}
}

Expand Down
11 changes: 7 additions & 4 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
}
}

pub struct HydroCLI<T> {
pub struct HydroCLI<T = Option<()>> {
ports: RefCell<HashMap<String, ServerOrBound>>,
pub meta: Option<T>,
pub meta: T,
}

impl<T> HydroCLI<T> {
Expand All @@ -43,7 +43,7 @@ impl<T> HydroCLI<T> {
}
}

pub async fn init<T: DeserializeOwned>() -> HydroCLI<T> {
pub async fn init<T: DeserializeOwned + Default>() -> HydroCLI<T> {
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let trimmed = input.trim();
Expand Down Expand Up @@ -86,6 +86,9 @@ pub async fn init<T: DeserializeOwned>() -> HydroCLI<T> {

HydroCLI {
ports: RefCell::new(all_connected),
meta: bind_config.1.map(|s| serde_json::from_str(&s).unwrap()),
meta: bind_config
.1
.map(|b| serde_json::from_str(&b).unwrap())
.unwrap_or_default(),
}
}
5 changes: 2 additions & 3 deletions hydroflow_plus_test/examples/first_ten_distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::first_ten::first_ten_distributed(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host.clone())
.bin("first_ten_distributed")
.profile(profile)
.args(vec![id.to_string()]),
.profile(profile),
)
}),
);
Expand Down
6 changes: 2 additions & 4 deletions hydroflow_plus_test/examples/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,17 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::cluster::map_reduce(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host.clone())
.bin("map_reduce")
.profile(profile)
.args(vec![id.to_string()])
.display_name("leader"),
)
}),
&CLIDeployClusterBuilder::new(|id| {
&CLIDeployClusterBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
(0..2)
.map(|idx| {
Expand All @@ -62,7 +61,6 @@ async fn main() {
HydroflowCrate::new(".", host.clone())
.bin("map_reduce")
.profile(profile)
.args(vec![id.to_string()])
.display_name(format!("cluster/{}", idx)),
)
})
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
let (source_zero_port, _, _) = hydroflow_plus_test::networked::networked_basic(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("networked_basic")
.profile("dev")
.args(vec![id.to_string()]),
.profile("dev"),
)
}),
);
Expand Down
10 changes: 4 additions & 6 deletions hydroflow_plus_test/examples/simple_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,24 @@ async fn main() {
let builder = hydroflow_plus::GraphBuilder::new();
hydroflow_plus_test::cluster::simple_cluster(
&builder,
&CLIDeployNodeBuilder::new(|id| {
&CLIDeployNodeBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host)
.bin("simple_cluster")
.profile(profile)
.args(vec![id.to_string()]),
.profile(profile),
)
}),
&CLIDeployClusterBuilder::new(|id| {
&CLIDeployClusterBuilder::new(|| {
let mut deployment = deployment.borrow_mut();
(0..2)
.map(|_| {
let host = create_host(&mut deployment);
deployment.add_service(
HydroflowCrate::new(".", host)
.bin("simple_cluster")
.profile(profile)
.args(vec![id.to_string()]),
.profile(profile),
)
})
.collect()
Expand Down
9 changes: 4 additions & 5 deletions hydroflow_plus_test/src/bin/first_ten_distributed.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined =
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(
hydroflow_plus_test::first_ten::first_ten_distributed_runtime!(&ports),
)
.await;
}
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/many_to_many.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::cluster::many_to_many_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::many_to_many_runtime!(&ports))
.await;
}
6 changes: 2 additions & 4 deletions hydroflow_plus_test/src/bin/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ extern crate alloc;
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::cluster::map_reduce_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(hydroflow_plus_test::cluster::map_reduce_runtime!(&ports))
.await;
}
8 changes: 4 additions & 4 deletions hydroflow_plus_test/src/bin/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let subgraph_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::networked::networked_basic_runtime!(&ports, subgraph_id);

hydroflow::util::cli::launch_flow(joined).await;
hydroflow::util::cli::launch_flow(hydroflow_plus_test::networked::networked_basic_runtime!(
&ports
))
.await;
}
Loading
Loading