diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs
index 11390a274675..cab890d1dd90 100644
--- a/hydro_cli/src/core/deployment.rs
+++ b/hydro_cli/src/core/deployment.rs
@@ -106,15 +106,18 @@ impl Deployment {
             .collect::<Vec<_>>();
         self.services = active_services;
 
-        let all_services_start =
-            self.services
-                .iter()
-                .map(|service: &Weak<RwLock<dyn Service>>| async {
-                    service.upgrade().unwrap().write().await.start().await?;
-                    Ok(()) as Result<()>
-                });
-
-        futures::future::try_join_all(all_services_start).await?;
+        progress::ProgressTracker::with_group("start", None, || {
+            let all_services_start =
+                self.services
+                    .iter()
+                    .map(|service: &Weak<RwLock<dyn Service>>| async {
+                        service.upgrade().unwrap().write().await.start().await?;
+                        Ok(()) as Result<()>
+                    });
+
+            futures::future::try_join_all(all_services_start)
+        })
+        .await?;
 
         Ok(())
     }
diff --git a/hydro_cli/src/core/hydroflow_crate/ports.rs b/hydro_cli/src/core/hydroflow_crate/ports.rs
index ba279bc44104..5794eb1f0b32 100644
--- a/hydro_cli/src/core/hydroflow_crate/ports.rs
+++ b/hydro_cli/src/core/hydroflow_crate/ports.rs
@@ -363,7 +363,7 @@ impl HydroflowSink for HydroflowPortConfig {
                 merge.push(bind_type);
                 merge.len() - 1
             } else {
-                panic!("Expected a merge connection definition")
+                panic!("Expected a merge connection definition")
             };
 
             ServerConfig::MergeSelect(Box::new(base_config), merge_index)
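Context for the deployment.rs hunk above: `ProgressTracker::with_group` takes a closure that builds the batch of start futures and returns the joined future, so every service startup is awaited (and reported) under one named "start" group. A minimal self-contained sketch of the same shape; the `with_group` helper below is a hypothetical stand-in for illustration, not the hydro_cli tracker:

```rust
// Sketch of the "run a batch of fallible futures under a named group" pattern.
// `with_group` here is an illustrative stand-in, not the hydro_cli API.
use futures::future::try_join_all;

async fn with_group<F, Fut, T>(name: &str, body: F) -> T
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = T>,
{
    println!("[group start] {name}");
    let result = body().await;
    println!("[group end] {name}");
    result
}

fn main() {
    let services = vec!["svc-a", "svc-b", "svc-c"];
    let outcome: Result<Vec<()>, String> =
        futures::executor::block_on(with_group("start", || {
            // Build all the start futures first, then await them together,
            // mirroring the try_join_all call in the diff.
            let starts = services.iter().map(|s| async move {
                println!("starting {s}");
                Ok::<(), String>(())
            });
            try_join_all(starts)
        }));
    outcome.unwrap();
}
```

Building the futures inside the closure keeps the group open for exactly as long as the joined future runs, which mirrors why the diff moves `try_join_all` inside the closure and awaits the group once.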
diff --git a/hydro_cli_examples/examples/pn_counter/main.rs b/hydro_cli_examples/examples/pn_counter/main.rs
deleted file mode 100644
index 7e2caf00d143..000000000000
--- a/hydro_cli_examples/examples/pn_counter/main.rs
+++ /dev/null
@@ -1,164 +0,0 @@
-use std::cell::RefCell;
-use std::collections::{HashMap, HashSet};
-use std::ops::Deref;
-use std::rc::Rc;
-
-use hydroflow::serde::{Deserialize, Serialize};
-use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource};
-use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
-use hydroflow::{hydroflow_syntax, tokio};
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
-
-type NextStateType = (u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-enum GossipOrIncrement {
-    Gossip(Vec<NextStateType>),
-    Increment(u64, i32),
-}
-
-#[hydroflow::main]
-async fn main() {
-    let mut ports = hydroflow::util::cli::init().await;
-
-    let my_id: Vec<usize> = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap();
-    let my_id = my_id[0];
-    let num_replicas: Vec<usize> = serde_json::from_str(&std::env::args().nth(2).unwrap()).unwrap();
-    let num_replicas = num_replicas[0];
-
-    let increment_requests = ports
-        .port("increment_requests")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let query_responses = ports
-        .port("query_responses")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_sink();
-
-    let to_peer = ports
-        .port("to_peer")
-        .connect::<ConnectedDemux<ConnectedDirect>>()
-        .await
-        .into_sink();
-
-    let from_peer = ports
-        .port("from_peer")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let f1 = async move {
-        #[cfg(target_os = "linux")]
-        loop {
-            let x = procinfo::pid::stat_self().unwrap();
-            let bytes = x.rss * 1024 * 4;
-            println!("memory,{}", bytes);
-            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-        }
-    };
-
-    let df = hydroflow_syntax! {
-        next_state = union()
-            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashSet::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashSet<_>, _), goi| {
-                if context.current_tick() != *last_tick {
-                    modified_tweets.clear();
-                }
-
-                match goi {
-                    GossipOrIncrement::Gossip(gossip) => {
-                        for (counter_id, gossip_rc) in gossip.iter() {
-                            let gossip_borrowed = gossip_rc.as_ref().borrow();
-                            let (pos, neg) = gossip_borrowed.deref();
-                            let cur_value = cur_state.entry(*counter_id).or_insert(Rc::new(RefCell::new((
-                                vec![0; num_replicas], vec![0; num_replicas]
-                            ))));
-                            let mut cur_value = cur_value.as_ref().borrow_mut();
-
-                            for i in 0..num_replicas {
-                                if pos[i] > cur_value.0[i] {
-                                    cur_value.0[i] = pos[i];
-                                    modified_tweets.insert(*counter_id);
-                                }
-
-                                if neg[i] > cur_value.1[i] {
-                                    cur_value.1[i] = neg[i];
-                                    modified_tweets.insert(*counter_id);
-                                }
-                            }
-                        }
-                    }
-                    GossipOrIncrement::Increment(counter_id, delta) => {
-                        let cur_value = cur_state.entry(counter_id).or_insert(Rc::new(RefCell::new((
-                            vec![0; num_replicas], vec![0; num_replicas]
-                        ))));
-                        let mut cur_value = cur_value.as_ref().borrow_mut();
-
-                        if delta > 0 {
-                            cur_value.0[my_id] += delta as u32;
-                        } else {
-                            cur_value.1[my_id] += (-delta) as u32;
-                        }
-
-                        modified_tweets.insert(counter_id);
-                    }
-                }
-
-                *last_tick = context.current_tick();
-            })
-            -> filter(|(_, _, tick)| *tick == context.current_tick())
-            -> filter(|(_, modified_tweets, _)| !modified_tweets.is_empty())
-            -> map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, state.get(t).unwrap().clone())).collect::<Vec<_>>())
-            -> tee();
-
-        source_stream(from_peer)
-            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap()).unwrap())
-            -> next_state;
-
-        source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
-            -> next_state;
-
-        all_peers = source_iter(0..num_replicas)
-            -> filter(|x| *x != my_id);
-
-        all_peers -> [0] broadcaster;
-        next_state -> [1] broadcaster;
-        broadcaster = cross_join::<'static, 'tick>()
-            -> map(|(peer, state)| {
-                (peer as u32, serialize_to_bytes(GossipOrIncrement::Gossip(state)))
-            })
-            -> dest_sink(to_peer);
-
-        next_state
-            -> flat_map(|a: Vec<NextStateType>| {
-                a.into_iter().map(|(k, rc_array)| {
-                    let rc_borrowed = rc_array.as_ref().borrow();
-                    let (pos, neg) = rc_borrowed.deref();
-                    (k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
-                }).collect::<Vec<_>>()
-            })
-            -> map(serialize_to_bytes::<(u64, i32)>)
-            -> dest_sink(query_responses);
-    };
-
-    // initial memory
-    #[cfg(target_os = "linux")]
-    {
-        let x = procinfo::pid::stat_self().unwrap();
-        let bytes = x.rss * 1024 * 4;
-        println!("memory,{}", bytes);
-    }
-
-    let f1_handle = tokio::spawn(f1);
-    hydroflow::util::cli::launch_flow(df).await;
-    f1_handle.abort();
-}
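Both of the deleted PN-counter examples (this file and pn_counter_delta below) rely on the usual PN-counter CRDT semantics: each replica owns one slot in a grow vector and one in a shrink vector, gossip merges element-wise by max, and the visible value is sum(pos) - sum(neg). A standalone sketch of those assumed semantics (illustrative, not the deleted code itself):

```rust
// Minimal PN-counter sketch mirroring the semantics the deleted examples
// rely on (per-replica grow/shrink slots, element-wise-max merge).
#[derive(Clone, Debug)]
struct PnCounter {
    pos: Vec<u32>, // per-replica increment totals
    neg: Vec<u32>, // per-replica decrement totals
}

impl PnCounter {
    fn new(num_replicas: usize) -> Self {
        Self { pos: vec![0; num_replicas], neg: vec![0; num_replicas] }
    }

    fn apply(&mut self, my_id: usize, delta: i32) {
        if delta > 0 {
            self.pos[my_id] += delta as u32;
        } else {
            self.neg[my_id] += (-delta) as u32;
        }
    }

    // Merge is element-wise max, so gossip is idempotent and commutative.
    fn merge_from(&mut self, other: &PnCounter) {
        for i in 0..self.pos.len() {
            self.pos[i] = self.pos[i].max(other.pos[i]);
            self.neg[i] = self.neg[i].max(other.neg[i]);
        }
    }

    fn value(&self) -> i64 {
        self.pos.iter().map(|v| *v as i64).sum::<i64>()
            - self.neg.iter().map(|v| *v as i64).sum::<i64>()
    }
}

fn main() {
    let (mut a, mut b) = (PnCounter::new(2), PnCounter::new(2));
    a.apply(0, 5);
    b.apply(1, -2);
    a.merge_from(&b);
    b.merge_from(&a);
    assert_eq!(a.value(), 3);
    assert_eq!(b.value(), 3);
    println!("converged value = {}", a.value());
}
```

Because merge is an element-wise max, duplicated or reordered gossip does not change the converged value, which is what lets the examples re-broadcast every modified counter on each tick.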
diff --git a/hydro_cli_examples/examples/pn_counter_delta/main.rs b/hydro_cli_examples/examples/pn_counter_delta/main.rs
deleted file mode 100644
index aba606c56db6..000000000000
--- a/hydro_cli_examples/examples/pn_counter_delta/main.rs
+++ /dev/null
@@ -1,165 +0,0 @@
-use std::cell::RefCell;
-use std::collections::HashMap;
-use std::ops::Deref;
-use std::rc::Rc;
-
-use hydroflow::serde::{Deserialize, Serialize};
-use hydroflow::util::cli::{ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource};
-use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
-use hydroflow::{hydroflow_syntax, tokio};
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-enum GossipOrIncrement {
-    Gossip(Vec<(u64, (usize, u32, u32))>),
-    Increment(u64, i32),
-}
-
-type NextStateType = (u64, bool, Rc<RefCell<(Vec<u32>, Vec<u32>)>>);
-
-#[hydroflow::main]
-async fn main() {
-    let mut ports = hydroflow::util::cli::init().await;
-
-    let my_id: Vec<usize> = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap();
-    let my_id = my_id[0];
-    let num_replicas: Vec<usize> = serde_json::from_str(&std::env::args().nth(2).unwrap()).unwrap();
-    let num_replicas = num_replicas[0];
-
-    let increment_requests = ports
-        .port("increment_requests")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let query_responses = ports
-        .port("query_responses")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_sink();
-
-    let to_peer = ports
-        .port("to_peer")
-        .connect::<ConnectedDemux<ConnectedDirect>>()
-        .await
-        .into_sink();
-
-    let from_peer = ports
-        .port("from_peer")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let f1 = async move {
-        #[cfg(target_os = "linux")]
-        loop {
-            let x = procinfo::pid::stat_self().unwrap();
-            let bytes = x.rss * 1024 * 4;
-            println!("memory,{}", bytes);
-            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-        }
-    };
-
-    let df = hydroflow_syntax! {
-        next_state = union()
-            -> fold::<'static>(|| (HashMap::<u64, Rc<RefCell<(Vec<u32>, Vec<u32>)>>>::new(), HashMap::new(), 0), |(cur_state, modified_tweets, last_tick): &mut (HashMap<_, _>, HashMap<_, _>, _), goi| {
-                if context.current_tick() != *last_tick {
-                    modified_tweets.clear();
-                }
-
-                match goi {
-                    GossipOrIncrement::Gossip(gossip) => {
-                        for (counter_id, (gossip_i, gossip_pos, gossip_neg)) in gossip.iter() {
-                            let gossip_i = *gossip_i;
-                            let cur_value = cur_state.entry(*counter_id).or_insert(Rc::new(RefCell::new((
-                                vec![0; num_replicas], vec![0; num_replicas]
-                            ))));
-                            let mut cur_value = cur_value.as_ref().borrow_mut();
-
-                            if *gossip_pos > cur_value.0[gossip_i] {
-                                cur_value.0[gossip_i] = *gossip_pos;
-                                modified_tweets.entry(*counter_id).or_insert(false);
-                            }
-
-                            if *gossip_neg > cur_value.1[gossip_i] {
-                                cur_value.1[gossip_i] = *gossip_neg;
-                                modified_tweets.entry(*counter_id).or_insert(false);
-                            }
-                        }
-                    }
-                    GossipOrIncrement::Increment(counter_id, delta) => {
-                        let cur_value = cur_state.entry(counter_id).or_insert(Rc::new(RefCell::new((
-                            vec![0; num_replicas], vec![0; num_replicas]
-                        ))));
-                        let mut cur_value = cur_value.as_ref().borrow_mut();
-
-                        if delta > 0 {
-                            cur_value.0[my_id] += delta as u32;
-                        } else {
-                            cur_value.1[my_id] += (-delta) as u32;
-                        }
-
-                        *modified_tweets.entry(counter_id).or_insert(false) |= true;
-                    }
-                }
-
-                *last_tick = context.current_tick();
-            })
-            -> filter(|(_, _, tick)| *tick == context.current_tick())
-            -> filter(|(_, modified_tweets, _)| !modified_tweets.is_empty())
-            -> map(|(state, modified_tweets, _)| modified_tweets.iter().map(|(t, is_local)| (*t, *is_local, state.get(t).unwrap().clone())).collect::<Vec<_>>())
-            -> tee();
-
-        source_stream(from_peer)
-            -> map(|x| deserialize_from_bytes::<GossipOrIncrement>(&x.unwrap()).unwrap())
-            -> next_state;
-
-        source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|t| GossipOrIncrement::Increment(t.tweet_id, t.likes))
-            -> next_state;
-
-        all_peers = source_iter(0..num_replicas)
-            -> filter(|x| *x != my_id);
-
-        all_peers -> [0] broadcaster;
-        next_state -> [1] broadcaster;
-        broadcaster = cross_join::<'static, 'tick>()
-            -> map(|(peer, state): (_, Vec<NextStateType>)| {
-                (peer as u32, state.iter().filter(|t| t.1).map(|(k, _, v)| (*k, (my_id, v.as_ref().borrow().0[my_id], v.as_ref().borrow().1[my_id]))).collect())
-            })
-            -> filter(|(_, gossip): &(_, Vec<_>)| !gossip.is_empty())
-            -> map(|(peer, gossip): (_, _)| {
-                (peer, serialize_to_bytes(GossipOrIncrement::Gossip(gossip)))
-            })
-            -> dest_sink(to_peer);
-
-        next_state
-            -> flat_map(|a: Vec<NextStateType>| {
-                a.into_iter().map(|(k, _, rc_array)| {
-                    let rc_borrowed = rc_array.as_ref().borrow();
-                    let (pos, neg) = rc_borrowed.deref();
-                    (k, pos.iter().sum::<u32>() as i32 - neg.iter().sum::<u32>() as i32)
-                }).collect::<Vec<_>>()
-            })
-            -> map(serialize_to_bytes::<(u64, i32)>)
-            -> dest_sink(query_responses);
-    };
-
-    // initial memory
-    #[cfg(target_os = "linux")]
-    {
-        let x = procinfo::pid::stat_self().unwrap();
-        let bytes = x.rss * 1024 * 4;
-        println!("memory,{}", bytes);
-    }
-
-    let f1_handle = tokio::spawn(f1);
-    hydroflow::util::cli::launch_flow(df).await;
-    f1_handle.abort();
-}
diff --git a/hydro_cli_examples/examples/topolotree/main.rs b/hydro_cli_examples/examples/topolotree/main.rs
deleted file mode 100644
index 1725d7bdf9c5..000000000000
--- a/hydro_cli_examples/examples/topolotree/main.rs
+++ /dev/null
@@ -1,352 +0,0 @@
-use std::collections::{HashMap, HashSet};
-use std::hash::{Hash, Hasher};
-
-use hydroflow::serde::{Deserialize, Serialize};
-use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
-use hydroflow::util::{deserialize_from_bytes, serialize_to_bytes};
-use hydroflow::{hydroflow_syntax, tokio};
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-struct IncrementRequest {
-    tweet_id: u64,
-    likes: i32,
-}
-
-#[derive(Serialize, Deserialize, Default, Copy, Clone, Debug)]
-struct TimestampedValue<T> {
-    pub value: T,
-    pub timestamp: u32,
-}
-
-impl<T> TimestampedValue<T> {
-    pub fn merge_from(&mut self, other: TimestampedValue<T>) -> bool {
-        if other.timestamp > self.timestamp {
-            self.value = other.value;
-            self.timestamp = other.timestamp;
-            true
-        } else {
-            false
-        }
-    }
-
-    pub fn update(&mut self, updater: impl Fn(&T) -> T) {
-        self.value = updater(&self.value);
-        self.timestamp += 1;
-    }
-}
-
-impl<T> PartialEq for TimestampedValue<T> {
-    fn eq(&self, other: &Self) -> bool {
-        self.timestamp == other.timestamp
-    }
-}
-
-impl<T> Eq for TimestampedValue<T> {}
-
-impl<T> Hash for TimestampedValue<T> {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        self.timestamp.hash(state);
-    }
-}
-
-#[hydroflow::main]
-async fn main() {
-    let mut ports = hydroflow::util::cli::init().await;
-    let increment_requests = ports
-        .port("increment_requests")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let query_responses = ports
-        .port("query_responses")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_sink();
-
-    let to_parent = ports
-        .port("to_parent")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_sink();
-
-    let from_parent = ports
-        .port("from_parent")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let to_left = ports
-        .port("to_left")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_sink();
-
-    let from_left = ports
-        .port("from_left")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let to_right = ports
-        .port("to_right")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_sink();
-
-    let from_right = ports
-        .port("from_right")
-        .connect::<ConnectedDirect>()
-        .await
-        .into_source();
-
-    let f1 = async move {
-        #[cfg(target_os = "linux")]
-        loop {
-            let x = procinfo::pid::stat_self().unwrap();
-            let bytes = x.rss * 1024 * 4;
-            println!("memory,{}", bytes);
-            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-        }
-    };
-
-    type UpdateType = (u64, i32);
-
-    #[allow(clippy::type_complexity)]
-    let df = hydroflow_syntax! {
-        from_parent = source_stream(from_parent)
-            -> map(|x| deserialize_from_bytes::<Vec<(u64, TimestampedValue<i32>)>>(x.unwrap()).unwrap())
-            -> fold::<'static>(
-                || (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, _), req: Vec<(u64, TimestampedValue<i32>)>| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    for (k, v) in req {
-                        let updated = if let Some(e) = prev.get_mut(&k) {
-                            e.merge_from(v)
-                        } else {
-                            prev.insert(k, v);
-                            true
-                        };
-
-                        if updated {
-                            modified_tweets.insert(k);
-                        }
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, tick)| *tick == context.current_tick())
-            -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::<Vec<_>>())
-            -> tee();
-
-        from_left = source_stream(from_left)
-            -> map(|x| deserialize_from_bytes::<Vec<(u64, TimestampedValue<i32>)>>(x.unwrap()).unwrap())
-            -> fold::<'static>(
-                || (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, _), req: Vec<(u64, TimestampedValue<i32>)>| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    for (k, v) in req {
-                        let updated = if let Some(e) = prev.get_mut(&k) {
-                            e.merge_from(v)
-                        } else {
-                            prev.insert(k, v);
-                            true
-                        };
-
-                        if updated {
-                            modified_tweets.insert(k);
-                        }
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, tick)| *tick == context.current_tick())
-            -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::<Vec<_>>())
-            -> tee();
-
-        from_right = source_stream(from_right)
-            -> map(|x| deserialize_from_bytes::<Vec<(u64, TimestampedValue<i32>)>>(x.unwrap()).unwrap())
-            -> fold::<'static>(
-                || (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, _), req: Vec<(u64, TimestampedValue<i32>)>| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    for (k, v) in req {
-                        let updated = if let Some(e) = prev.get_mut(&k) {
-                            e.merge_from(v)
-                        } else {
-                            prev.insert(k, v);
-                            true
-                        };
-
-                        if updated {
-                            modified_tweets.insert(k);
-                        }
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, tick)| *tick == context.current_tick())
-            -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::<Vec<_>>())
-            -> tee();
-
-        from_local = source_stream(increment_requests)
-            -> map(|x| deserialize_from_bytes::<IncrementRequest>(&x.unwrap()).unwrap())
-            -> map(|x| (x.tweet_id, x.likes))
-            -> fold::<'static>(
-                || (HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(prev, modified_tweets, prev_tick): &mut (HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), req: UpdateType| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    prev.entry(req.0).or_default().update(|v| v + req.1);
-                    modified_tweets.insert(req.0);
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, tick)| *tick == context.current_tick())
-            -> flat_map(|(state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect::<Vec<_>>())
-            -> tee();
-
-        to_right = union();
-
-        from_parent -> map(|v| (0, v)) -> to_right;
-        from_left -> map(|v| (1, v)) -> to_right;
-        from_local -> map(|v| (2, v)) -> to_right;
-
-        to_right
-            -> fold::<'static>(
-                || (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    let updated = each_source[source_i].entry(key).or_default().merge_from(v);
-
-                    if updated {
-                        acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum());
-                        modified_tweets.insert(key);
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, _, tick)| *tick == context.current_tick())
-            -> map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect())
-            -> map(serialize_to_bytes::<Vec<(u64, TimestampedValue<i32>)>>)
-            -> dest_sink(to_right);
-
-        to_left = union();
-
-        from_parent -> map(|v| (0, v)) -> to_left;
-        from_right -> map(|v| (1, v)) -> to_left;
-        from_local -> map(|v| (2, v)) -> to_left;
-
-        to_left
-            -> fold::<'static>(
-                || (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    let updated = each_source[source_i].entry(key).or_default().merge_from(v);
-
-                    if updated {
-                        acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum());
-                        modified_tweets.insert(key);
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, _, tick)| *tick == context.current_tick())
-            -> map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect())
-            -> map(serialize_to_bytes::<Vec<(u64, TimestampedValue<i32>)>>)
-            -> dest_sink(to_left);
-
-        to_parent = union();
-
-        from_right -> map(|v| (0, v)) -> to_parent;
-        from_left -> map(|v| (1, v)) -> to_parent;
-        from_local -> map(|v| (2, v)) -> to_parent;
-
-        to_parent
-            -> fold::<'static>(
-                || (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 3], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    let updated = each_source[source_i].entry(key).or_default().merge_from(v);
-
-                    if updated {
-                        acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum());
-                        modified_tweets.insert(key);
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, _, tick)| *tick == context.current_tick())
-            -> map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, *state.get(t).unwrap())).collect())
-            -> map(serialize_to_bytes::<Vec<(u64, TimestampedValue<i32>)>>)
-            -> dest_sink(to_parent);
-
-        to_query = union();
-
-        from_parent -> map(|v| (0, v)) -> to_query;
-        from_left -> map(|v| (1, v)) -> to_query;
-        from_right -> map(|v| (2, v)) -> to_query;
-        from_local -> map(|v| (3, v)) -> to_query;
-
-        to_query
-            -> fold::<'static>(
-                || (vec![HashMap::<u64, TimestampedValue<i32>>::new(); 4], HashMap::<u64, TimestampedValue<i32>>::new(), HashSet::new(), 0),
-                |(each_source, acc_source, modified_tweets, prev_tick): &mut (Vec<HashMap<u64, TimestampedValue<i32>>>, HashMap<_, TimestampedValue<i32>>, HashSet<_>, usize), (source_i, (key, v)): (usize, _)| {
-                    if *prev_tick != context.current_tick() {
-                        modified_tweets.clear();
-                    }
-
-                    let updated = each_source[source_i].entry(key).or_default().merge_from(v);
-
-                    if updated {
-                        acc_source.entry(key).or_default().update(|_| each_source.iter().map(|s| s.get(&key).map(|t| t.value).unwrap_or_default()).sum());
-                        modified_tweets.insert(key);
-                    }
-
-                    *prev_tick = context.current_tick();
-                }
-            )
-            -> filter(|(_, _, _, tick)| *tick == context.current_tick())
-            -> flat_map(|(_, state, modified_tweets, _)| modified_tweets.iter().map(|t| (*t, state.get(t).unwrap().value)).collect::<Vec<_>>())
-            -> map(serialize_to_bytes::<(u64, i32)>)
-            -> dest_sink(query_responses);
-    };
-
-    // initial memory
-    #[cfg(target_os = "linux")]
-    {
-        let x = procinfo::pid::stat_self().unwrap();
-        let bytes = x.rss * 1024 * 4;
-        println!("memory,{}", bytes);
-    }
-
-    let f1_handle = tokio::spawn(f1);
-    hydroflow::util::cli::launch_flow(df).await;
-    f1_handle.abort();
-}
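The deleted topolotree example above is built around the `TimestampedValue` last-writer-wins cell: a remote merge only wins with a strictly newer timestamp, and every local update bumps the timestamp. A standalone illustration of that behavior, trimmed to the two methods the dataflow uses:

```rust
// Last-writer-wins cell in the style of the deleted TimestampedValue:
// merge_from only accepts strictly newer timestamps; local updates bump
// the timestamp. (Standalone sketch for illustration.)
#[derive(Default, Copy, Clone, Debug)]
struct TimestampedValue<T> {
    value: T,
    timestamp: u32,
}

impl<T> TimestampedValue<T> {
    fn merge_from(&mut self, other: TimestampedValue<T>) -> bool {
        if other.timestamp > self.timestamp {
            self.value = other.value;
            self.timestamp = other.timestamp;
            true
        } else {
            false
        }
    }

    fn update(&mut self, updater: impl Fn(&T) -> T) {
        self.value = updater(&self.value);
        self.timestamp += 1;
    }
}

fn main() {
    let mut local: TimestampedValue<i32> = Default::default();
    local.update(|v| v + 10); // timestamp becomes 1
    let stale = TimestampedValue { value: 3, timestamp: 1 };
    assert!(!local.merge_from(stale)); // equal timestamp: rejected
    let fresh = TimestampedValue { value: 7, timestamp: 2 };
    assert!(local.merge_from(fresh));
    assert_eq!(local.value, 7);
    println!("value = {}, timestamp = {}", local.value, local.timestamp);
}
```

Equal timestamps are rejected by `merge_from`, so replaying the same gossip is a no-op, consistent with the idempotence test kept in topolotree/src/tests.rs below.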
diff --git a/topolotree/src/latency_measure.rs b/topolotree/src/latency_measure.rs
index afc2fe86ee4c..37b9b09ccf1f 100644
--- a/topolotree/src/latency_measure.rs
+++ b/topolotree/src/latency_measure.rs
@@ -28,8 +28,9 @@ async fn main() {
         .await
         .into_source();
 
-    let num_clients: Vec<usize> = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap();
-    let num_clients = num_clients[0];
+    let num_clients: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
+    let partition_n: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
+    let keys_per_partition: u64 = std::env::args().nth(3).unwrap().parse().unwrap();
 
     let atomic_counter = Arc::new(AtomicU64::new(0));
     let atomic_borrow = atomic_counter.clone();
@@ -71,10 +72,15 @@ async fn main() {
             #[cfg(debug_assertions)]
             let mut count_tracker = HashMap::new();
 
+            let mut next_base: u64 = 0;
+
             loop {
-                let id = ((rand::random::<u64>() % 1024) / (num_clients as u64))
-                    * (num_clients as u64)
-                    + (i as u64);
+                let id = ((((next_base % keys_per_partition)
+                    + (partition_n * keys_per_partition))
+                    / (num_clients))
+                    * num_clients)
+                    + i;
+                next_base += 1;
                 let increment = rand::random::<bool>();
                 let change = if increment { 1 } else { -1 };
                 let start = Instant::now();
@@ -102,7 +108,12 @@ async fn main() {
             let updated =
                 deserialize_from_bytes::<QueryResponse>(end_node.next().await.unwrap().unwrap())
                     .unwrap();
-            if queues[(updated.key % (num_clients as u64)) as usize]
+
+            if updated.key / keys_per_partition != partition_n {
+                continue;
+            }
+
+            if queues[(updated.key % num_clients) as usize]
                 .send(updated.value)
                 .is_err()
             {
diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs
index 4f9e4540b7b8..fb1506bb2b93 100644
--- a/topolotree/src/main.rs
+++ b/topolotree/src/main.rs
@@ -2,7 +2,7 @@ mod tests;
 
 use std::cell::RefCell;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{Debug, Display};
 use std::io;
 use std::rc::Rc;
@@ -33,7 +33,8 @@ impl Display for NodeID {
 type PostNeighborJoin = (((u64, Option<NodeID>), (i64, usize)), NodeID);
 
 fn run_topolotree(
-    neighbors: Vec<u32>,
+    self_id: u32,
+    init_neighbors: Vec<u32>,
     input_recv: impl Stream<Item = Result<(u32, BytesMut), io::Error>> + Unpin + 'static,
     increment_requests: impl Stream<Item = Result<BytesMut, io::Error>> + Unpin + 'static,
     output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>,
@@ -58,17 +59,32 @@ fn run_topolotree(
         parsed_input = source_stream(input_recv)
             -> map(Result::unwrap)
             -> map(|(src, x)| (NodeID(src), deserialize_from_bytes::<TopolotreeMessage>(&x).unwrap()))
-            -> demux(|(src, msg), var_args!(payload, ping, pong)| {
+            -> demux(|(src, msg), var_args!(payload, ping, pong, neighbor_of_neighbor)| {
                 match msg {
                     TopolotreeMessage::Payload(p) => payload.give((src, p)),
                     TopolotreeMessage::Ping() => ping.give((src, ())),
                     TopolotreeMessage::Pong() => pong.give((src, ())),
+                    TopolotreeMessage::NeighborOfNeighbor(its_neighbor, add) => neighbor_of_neighbor.give((src, (NodeID(its_neighbor), add)))
                 }
             });
 
         from_neighbors = parsed_input[payload];
         pings = parsed_input[ping] -> tee();
         pongs = parsed_input[pong] -> tee();
+        neighbor_of_neighbor_ops = parsed_input[neighbor_of_neighbor] -> tee();
+
+        neighbor_of_neighbor =
+            neighbor_of_neighbor_ops
+            -> map(|(src, (neighbor, add))| (src, (neighbor, add)))
+            -> fold_keyed::<'static>(HashSet::new, |acc: &mut HashSet<NodeID>, (neighbor, add)| {
+                if add {
+                    acc.insert(neighbor);
+                } else {
+                    acc.remove(&neighbor);
+                }
+            })
+            -> flat_map(|(src, acc)| acc.into_iter().map(move |neighbor| (src, neighbor)))
+            -> tee();
 
         pings -> map(|(src, _)| (src, TopolotreeMessage::Pong())) -> output;
@@ -79,19 +95,41 @@ fn run_topolotree(
             -> map(|(src, _)| (src, TopolotreeMessage::Ping()))
             -> output;
 
-        pongs -> dead_neighbors;
-        pings -> dead_neighbors;
-        new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_neighbors; // fake pong
-        dead_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| {
+        pongs -> dead_maybe_neighbors;
+        pings -> dead_maybe_neighbors;
+        new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_maybe_neighbors; // fake pong
+        dead_maybe_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| {
            *acc = Instant::now();
         }) -> filter_map(|(node_id, acc)| {
-            if acc.elapsed().as_secs() > 5 {
+            if acc.elapsed().as_secs() >= 5 {
                 Some(node_id)
             } else {
                 None
             }
-        }) -> tee();
+        })
+        -> map(|n| (n, ()))
+        -> [0]dead_neighbors;
+
+        neighbors -> map(|n| (n, ())) -> [1]dead_neighbors;
+        dead_neighbors = join()
+            -> map(|(n, _)| n)
+            -> tee();
+
+        // TODO(shadaj): only remove when we get an ack from the new leader
+        dead_neighbors -> removed_neighbors;
+
+        dead_neighbors -> map(|n| (n, ())) -> [0]min_neighbor_of_dead_neighbor;
+        neighbor_of_neighbor -> [1]min_neighbor_of_dead_neighbor;
+        min_neighbor_of_dead_neighbor = join()
+            -> map(|(dead, ((), neighbor))| (dead, neighbor))
+            -> filter(|(_, neighbor)| neighbor.0 != self_id)
+            -> reduce_keyed(|acc: &mut NodeID, n: NodeID| {
+                if n.0 < acc.0 {
+                    *acc = n;
+                }
+            })
+            -> tee();
 
         from_neighbors
             -> map(|(src, payload): (NodeID, Payload<i64>)| ((payload.key, src), (payload.key, payload.contents)))
@@ -102,7 +140,12 @@ fn run_topolotree(
                     acc.1 = context.current_tick();
                 }
             })
-            -> map(|((key, src), (payload, change_tick))| ((key, Some(src)), (payload.data, change_tick)))
+            -> map(|((key, src), (payload, change_tick))| (src, ((key, Some(src)), (payload.data, change_tick))))
+            -> [1]from_actual_neighbors;
+
+        neighbors -> map(|n| (n, ())) -> [0]from_actual_neighbors;
+        from_actual_neighbors = join()
+            -> map(|(_, (_, payload))| payload)
             -> from_neighbors_or_local;
 
         local_value = source_stream(increment_requests)
@@ -121,19 +164,29 @@ fn run_topolotree(
         from_neighbors_or_local = union() -> tee();
         from_neighbors_or_local -> [0]all_neighbor_data;
 
-        new_neighbors = source_iter(neighbors)
+        new_neighbors = source_iter(init_neighbors)
             -> map(NodeID)
             -> tee();
 
-        new_neighbors
-            -> persist()
-            -> [pos]neighbors;
-        dead_neighbors -> [neg]neighbors;
-        neighbors = difference()
+        new_neighbors -> map(|n| (n, true)) -> neighbors;
+        removed_neighbors = map(|n| (n, false)) -> neighbors;
+        neighbors = union()
+            -> map(|(neighbor, add)| (neighbor, !add))
+            -> sort_by_key(|(_, remove)| remove) // process adds first
+            -> fold::<'static>(vec![], |acc: &mut Vec<NodeID>, (neighbor, remove): (NodeID, bool)| {
+                if remove {
+                    acc.retain(|x| *x != neighbor);
+                } else {
+                    acc.push(neighbor);
+                }
+            })
+            -> flatten()
             -> tee();
 
         neighbors -> [1]all_neighbor_data;
 
+        // broadcast_neighbors = cross_join() TODO
+
         query_result = from_neighbors_or_local
            -> map(|((key, _), payload): ((u64, _), (i64, usize))| {
                 (key, payload)
@@ -183,7 +236,12 @@ fn run_topolotree(
 
 #[hydroflow::main]
 async fn main() {
     let args: Vec<String> = std::env::args().skip(1).collect();
-    let neighbors: Vec<u32> = args.into_iter().map(|x| x.parse().unwrap()).collect();
+    let self_id: u32 = args[0].parse().unwrap();
+    let neighbors: Vec<u32> = args
+        .into_iter()
+        .skip(1)
+        .map(|x| x.parse().unwrap())
+        .collect();
 
     let mut ports = hydroflow::util::cli::init().await;
 
@@ -228,7 +286,14 @@ async fn main() {
         }
     });
 
-    let flow = run_topolotree(neighbors, input_recv, operations_send, chan_tx, query_tx);
+    let flow = run_topolotree(
+        self_id,
+        neighbors,
+        input_recv,
+        operations_send,
+        chan_tx,
+        query_tx,
+    );
 
     let f1 = async move {
         #[cfg(target_os = "linux")]
diff --git a/topolotree/src/protocol.rs b/topolotree/src/protocol.rs
index d8a480f59c5d..4ff9aeca6e48 100644
--- a/topolotree/src/protocol.rs
+++ b/topolotree/src/protocol.rs
@@ -13,6 +13,7 @@ pub enum TopolotreeMessage {
     Payload(Payload<i64>),
     Ping(),
     Pong(),
+    NeighborOfNeighbor(u32, bool),
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
diff --git a/topolotree/src/tests.rs b/topolotree/src/tests.rs
index ac4112deff96..8f998ce8c3ef 100644
--- a/topolotree/src/tests.rs
+++ b/topolotree/src/tests.rs
@@ -69,6 +69,7 @@ async fn simple_payload_test() {
     simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }))).unwrap();
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -109,6 +110,7 @@ async fn idempotence_test() {
     };
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -149,6 +151,7 @@ async fn backwards_in_time_test() {
     };
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -189,6 +192,7 @@ async fn multiple_input_sources_test() {
     };
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -224,6 +228,7 @@ async fn operations_across_ticks() {
     let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -285,6 +290,7 @@ async fn operations_multiple_keys() {
     let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -370,6 +376,7 @@ async fn gossip_multiple_keys() {
     let (query_send, mut query_recv) = unbounded_channel::<Bytes>();
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -438,6 +445,7 @@ async fn ping_pongs() {
     let (query_send, mut _query_recv) = unbounded_channel::<Bytes>();
 
     let mut flow = run_topolotree(
+        0,
         neighbors,
         input_recv,
         operations_rx,
@@ -468,3 +476,99 @@ async fn ping_pongs() {
         (1, TopolotreeMessage::Ping())
     ]));
 }
+
+#[hydroflow::test(start_paused = true)]
+async fn no_gossip_if_dead() {
+    let neighbors: Vec<u32> = vec![1, 2, 3];
+
+    let (mut _operations_tx, operations_rx) = unbounded_channel::<Result<BytesMut, io::Error>>();
+    let (mut input_send, input_recv) = unbounded_channel::<Result<(u32, BytesMut), io::Error>>();
+    let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>();
+    let (query_send, mut _query_recv) = unbounded_channel::<Bytes>();
+
+    let mut flow = run_topolotree(
+        0,
+        neighbors,
+        input_recv,
+        operations_rx,
+        output_send,
+        query_send,
+    );
+
+    #[rustfmt::skip]
+    {
+        simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 0, data: 5 } }))).unwrap();
+        simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { 
timestamp: 0, data: 5 } }))).unwrap(); + }; + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, TopolotreeMessage::Ping()), + (2, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 5 } })), + (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 5 } })), + (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 10 } })), + ])); + + tokio::time::advance(Duration::from_millis(500)).await; + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, TopolotreeMessage::Pong())).unwrap(); + simulate_input(&mut input_send, (2, TopolotreeMessage::Pong())).unwrap(); + simulate_input(&mut input_send, (3, TopolotreeMessage::Pong())).unwrap(); + }; + + flow.run_tick(); + + tokio::time::advance(Duration::from_millis(500)).await; + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }))).unwrap(); + simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }))).unwrap(); + }; + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, TopolotreeMessage::Ping()), + (2, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 6 } })), + (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 6 } })), + (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 12 } })), + ])); + + tokio::time::advance(Duration::from_secs(5)).await; + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, TopolotreeMessage::Pong())).unwrap(); + simulate_input(&mut input_send, (3, TopolotreeMessage::Pong())).unwrap(); + simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 3, data: 7 } }))).unwrap(); + }; + + flow.run_tick(); + + // at this point, node 2 should be marked as dead + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, TopolotreeMessage::Ping()), + (1, TopolotreeMessage::Ping()), + (1, TopolotreeMessage::Ping()), + (1, TopolotreeMessage::Ping()), + (1, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Ping()), + (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 5, data: 7 } })), + ])); +} diff --git a/topolotree/topolotree_latency.hydro.py b/topolotree/topolotree_latency.hydro.py index cda984202fde..cfd445445f5f 100644 --- a/topolotree/topolotree_latency.hydro.py +++ b/topolotree/topolotree_latency.hydro.py @@ -48,6 +48,7 @@ async def run_experiment( tree_arg, depth_arg, clients_arg, + partitions_arg, is_gcp, gcp_vpc, ): @@ -58,6 +59,8 @@ async def run_experiment( num_clients = int(clients_arg) + num_partitions = int(partitions_arg) + print(f"Launching benchmark with protocol {tree_arg}, {num_replicas} replicas, and {num_clients} clients") currently_deployed = [] @@ -90,7 +93,7 @@ def create_machine(): ), profile=profile, bin="topolotree", - args=[str(neighbor) 
for neighbor in neighbors[i]], + args=([str(i)] + [str(neighbor) for neighbor in neighbors[i]]), on=create_machine(), ) if is_tree else deployment.HydroflowCrate( src=str( @@ -118,42 +121,45 @@ def create_machine(): if is_tree: leaves = get_leaves_in_binary_tree(list(range(num_replicas))) - source = cluster[leaves[0]] - dest = cluster[leaves[-1]] + sources = list(cluster[i] for i in leaves[: len(leaves) // 2])[:num_partitions] + dests = list(cluster[i] for i in leaves[len(leaves) // 2 :])[:num_partitions] else: - source = cluster[0] - dest = cluster[-1] + sources = list(cluster[i] for i in list(range(num_replicas))[: num_replicas // 2])[:num_partitions] + dests = list(cluster[i] for i in list(range(num_replicas))[num_replicas // 2 :])[:num_partitions] for node in all_nodes: - if node is not dest: - node.ports.query_responses.send_to(hydro.null()) - if node is not source: + if node not in sources: hydro.null().send_to(node.ports.increment_requests) + if node not in dests: + node.ports.query_responses.send_to(hydro.null()) - latency_measurer = deployment.HydroflowCrate( + drivers = [deployment.HydroflowCrate( src=str(Path(__file__).parent.absolute()), profile=profile, bin="latency_measure", - args=[json.dumps([num_clients])], + args=[str(num_clients), str(i), str(1024)], on=create_machine(), - ) + ) for i in range(num_partitions)] - latency_measurer.ports.increment_start_node.send_to( - source.ports.increment_requests.merge() - ) - dest.ports.query_responses.send_to(latency_measurer.ports.end_node_query) + for i in range(num_partitions): + drivers[i].ports.increment_start_node.send_to( + sources[i].ports.increment_requests + ) + dests[i].ports.query_responses.send_to(drivers[i].ports.end_node_query) await deployment.deploy() print("Deployed!") - latency = [] + latency_per_driver = [[] for _ in range(num_partitions)] memory_per_node = [[] for _ in range(num_replicas)] - throughput_raw = [] - - throughput = [] + throughput_raw_per_driver = [[] for _ in range(num_partitions)] + throughput_per_driver = [[] for _ in range(num_partitions)] - latency_stdout = await latency_measurer.stdout() + latency_streams_with_index = [ + stream.map(await node.stdout(), lambda x, i=i: (i, x)) + for i, node in enumerate(drivers) + ] memories_streams_with_index = [ stream.map(await node.stdout(), lambda x, i=i: (i, x)) @@ -174,28 +180,26 @@ async def memory_plotter(): async def latency_plotter(): try: - async for line in latency_stdout: - line_split = line.split(",") - if line_split[0] == "throughput": - count = int(line_split[1]) - period = float(line_split[2]) - throughput_raw.append([count, period]) - throughput.append(count / period) - elif line_split[0] == "latency": - number = int(line_split[1]) # microseconds - latency.append(number) + async with stream.merge(*latency_streams_with_index).stream() as merged: + async for (driver_idx, line) in merged: + line_split = line.split(",") + if line_split[0] == "throughput": + count = int(line_split[1]) + period = float(line_split[2]) + throughput_raw_per_driver[driver_idx].append([count, period]) + throughput_per_driver[driver_idx].append(count / period) + elif line_split[0] == "latency": + number = int(line_split[1]) # microseconds + latency_per_driver[driver_idx].append(number) except asyncio.CancelledError: return latency_plotter_task = asyncio.create_task(latency_plotter()) await deployment.start() - print("Started! Please wait 30 seconds to collect data.") + print("Started! 
Please wait 15 seconds to collect data.") - await asyncio.sleep(30) - - await latency_measurer.stop() - await asyncio.gather(*[node.stop() for node in all_nodes]) + await asyncio.sleep(15) memory_plotter_task.cancel() await memory_plotter_task @@ -203,6 +207,9 @@ async def latency_plotter(): latency_plotter_task.cancel() await latency_plotter_task + await asyncio.gather(*[node.stop() for node in drivers]) + await asyncio.gather(*[node.stop() for node in all_nodes]) + def summarize(v, kind): print("mean = ", np.mean(v)) print("std = ", np.std(v)) @@ -230,21 +237,25 @@ def summarize(v, kind): summaries_file.write(str(np.percentile(v, 1))) summaries_file.flush() - print("latency:") - summarize(latency, "latency") + all_latencies = [] + all_throughputs = [] + all_throughputs_raw = [] + for i in range(num_partitions): + all_latencies += latency_per_driver[i] + all_throughputs += throughput_per_driver[i] + all_throughputs_raw += throughput_raw_per_driver[i] - print("throughput:") - summarize(throughput, "throughput") + print("latency:") + summarize(all_latencies, "latency") - init_memory = [memory[0] for memory in memory_per_node] - print("init memory:") - summarize(init_memory, "init_memory") + print("throughput per driver:") + summarize(all_throughputs, "throughput_per_driver") - final_memory = [memory[-1] for memory in memory_per_node] - print("final memory:") - summarize(final_memory, "final_memory") + memory_delta = [memory[-1] - memory[0] for memory in memory_per_node] + print("memory delta:") + summarize(memory_delta, "memory_delta") - pd.DataFrame(latency).to_csv( + pd.DataFrame(all_latencies).to_csv( "latency_" + tree_arg + "_tree_depth_" @@ -257,8 +268,8 @@ def summarize(v, kind): index=False, header=["latency"], ) - pd.DataFrame(throughput_raw).to_csv( - "throughput_" + pd.DataFrame(all_throughputs_raw).to_csv( + "throughput_per_driver_" + tree_arg + "_tree_depth_" + str(tree_depth) @@ -270,8 +281,8 @@ def summarize(v, kind): index=False, header=["count", "period"], ) - pd.DataFrame(init_memory).to_csv( - "init_memory_" + pd.DataFrame(memory_delta).to_csv( + "memory_delta_" + tree_arg + "_tree_depth_" + str(tree_depth) @@ -281,20 +292,7 @@ def summarize(v, kind): + experiment_id + ".csv", index=False, - header=["memory"], - ) - pd.DataFrame(final_memory).to_csv( - "final_memory_" - + tree_arg - + "_tree_depth_" - + str(tree_depth) - + "_num_clients_" - + str(num_clients) - + "_" - + experiment_id - + ".csv", - index=False, - header=["memory"], + header=["memory_delta"], ) for machine in currently_deployed: @@ -328,19 +326,21 @@ async def main(args): for depth_arg in args[2].split(","): for tree_arg in args[1].split(","): for num_clients_arg in args[3].split(","): - await run_experiment( - deployment, - localhost, - "dev" if args[0] == "local" else None, - pool, - experiment_id, - summaries_file, - tree_arg, - depth_arg, - num_clients_arg, - args[0] == "gcp", - network, - ) + for num_partitions_arg in args[4].split(","): + await run_experiment( + deployment, + localhost, + "dev" if args[0] == "local" else None, + pool, + experiment_id, + summaries_file, + tree_arg, + depth_arg, + num_clients_arg, + num_partitions_arg, + args[0] == "gcp", + network, + ) summaries_file.close()
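For reference, the id formula introduced in latency_measure.rs keeps every generated key inside this driver's partition (`id / keys_per_partition == partition_n`) and pins it to client `i`'s response queue (`id % num_clients == i`), matching the two routing checks on the receive side. A standalone check of both properties under made-up parameters (1024 keys per partition, as passed via `str(1024)` in the deploy script above):

```rust
// Standalone check of the key-assignment formula from latency_measure.rs:
// each partition owns the key range [partition_n * keys_per_partition,
// (partition_n + 1) * keys_per_partition), and rounding down to a multiple
// of num_clients before adding i pins the key to client i's response queue.
fn main() {
    let num_clients: u64 = 4;
    let partition_n: u64 = 2; // hypothetical partition index
    let keys_per_partition: u64 = 1024;

    for i in 0..num_clients {
        let mut next_base: u64 = 0;
        for _ in 0..10_000 {
            let id = ((((next_base % keys_per_partition)
                + (partition_n * keys_per_partition))
                / num_clients)
                * num_clients)
                + i;
            next_base += 1;

            // The receive side routes on these two properties:
            assert_eq!(id / keys_per_partition, partition_n);
            assert_eq!(id % num_clients, i);
        }
    }
    println!("all generated keys stay in partition {partition_n}");
}
```

With `keys_per_partition` a multiple of `num_clients`, rounding down to a multiple of `num_clients` never escapes the partition's key range, so both asserts hold for every `next_base`.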