Skip to content

Commit

Permalink
feat(topolotree): implement core fault tolerance protocol
Browse files (browse the repository at this point in the history)
  • Loading branch information
shadaj committed Nov 30, 2023
1 parent 2e7e4cb commit c4b85db
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 111 deletions.
21 changes: 12 additions & 9 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,18 @@ impl Deployment {
.collect::<Vec<_>>();
self.services = active_services;

let all_services_start =
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await?;
Ok(()) as Result<()>
});

futures::future::try_join_all(all_services_start).await?;
progress::ProgressTracker::with_group("start", None, || {
let all_services_start =
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await?;
Ok(()) as Result<()>
});

futures::future::try_join_all(all_services_start)
})
.await?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion hydro_cli/src/core/hydroflow_crate/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl HydroflowSink for HydroflowPortConfig {
merge.push(bind_type);
merge.len() - 1
} else {
panic!()
panic!("Expected a merge connection definition")
};

ServerConfig::MergeSelect(Box::new(base_config), merge_index)
Expand Down
23 changes: 17 additions & 6 deletions topolotree/src/latency_measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ async fn main() {
.await
.into_source();

let num_clients: Vec<usize> = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap();
let num_clients = num_clients[0];
let num_clients: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
let partition_n: u64 = std::env::args().nth(2).unwrap().parse().unwrap();
let keys_per_partition: u64 = std::env::args().nth(3).unwrap().parse().unwrap();

let atomic_counter = Arc::new(AtomicU64::new(0));
let atomic_borrow = atomic_counter.clone();
Expand Down Expand Up @@ -71,10 +72,15 @@ async fn main() {
#[cfg(debug_assertions)]
let mut count_tracker = HashMap::new();

let mut next_base: u64 = 0;

loop {
let id = ((rand::random::<u64>() % 1024) / (num_clients as u64))
* (num_clients as u64)
+ (i as u64);
let id = ((((next_base % keys_per_partition)
+ (partition_n * keys_per_partition))
/ (num_clients))
* num_clients)
+ i;
next_base += 1;
let increment = rand::random::<bool>();
let change = if increment { 1 } else { -1 };
let start = Instant::now();
Expand Down Expand Up @@ -102,7 +108,12 @@ async fn main() {
let updated =
deserialize_from_bytes::<QueryResponse>(end_node.next().await.unwrap().unwrap())
.unwrap();
if queues[(updated.key % (num_clients as u64)) as usize]

if updated.key / keys_per_partition != partition_n {
continue;
}

if queues[(updated.key % num_clients) as usize]
.send(updated.value)
.is_err()
{
Expand Down
101 changes: 83 additions & 18 deletions topolotree/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod tests;

use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::io;
use std::rc::Rc;
Expand Down Expand Up @@ -33,7 +33,8 @@ impl Display for NodeID {
type PostNeighborJoin = (((u64, Option<NodeID>), (i64, usize)), NodeID);

fn run_topolotree(
neighbors: Vec<u32>,
self_id: u32,
init_neighbors: Vec<u32>,
input_recv: impl Stream<Item = Result<(u32, BytesMut), io::Error>> + Unpin + 'static,
increment_requests: impl Stream<Item = Result<BytesMut, io::Error>> + Unpin + 'static,
output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>,
Expand All @@ -58,17 +59,32 @@ fn run_topolotree(
parsed_input = source_stream(input_recv)
-> map(Result::unwrap)
-> map(|(src, x)| (NodeID(src), deserialize_from_bytes::<TopolotreeMessage>(&x).unwrap()))
-> demux(|(src, msg), var_args!(payload, ping, pong)| {
-> demux(|(src, msg), var_args!(payload, ping, pong, neighbor_of_neighbor)| {
match msg {
TopolotreeMessage::Payload(p) => payload.give((src, p)),
TopolotreeMessage::Ping() => ping.give((src, ())),
TopolotreeMessage::Pong() => pong.give((src, ())),
TopolotreeMessage::NeighborOfNeighbor(its_neighbor, add) => neighbor_of_neighbor.give((src, (NodeID(its_neighbor), add)))
}
});

from_neighbors = parsed_input[payload];
pings = parsed_input[ping] -> tee();
pongs = parsed_input[pong] -> tee();

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Docs (rustdoc)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 73 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.
neighbor_of_neighbor_ops = parsed_input[neighbor_of_neighbor] -> tee();

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Docs (rustdoc)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 74 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

neighbor_of_neighbor =
neighbor_of_neighbor_ops
-> map(|(src, (neighbor, add))| (src, (neighbor, add)))
-> fold_keyed::<'static>(HashSet::new, |acc: &mut HashSet<NodeID>, (neighbor, add)| {
if add {
acc.insert(neighbor);
} else {
acc.remove(&neighbor);
}
})
-> flat_map(|(src, acc)| acc.into_iter().map(move |neighbor| (src, neighbor)))
-> tee();

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Docs (rustdoc)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

Check warning on line 87 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

`tee` should have at least 2 output(s), actually has 1.

pings -> map(|(src, _)| (src, TopolotreeMessage::Pong())) -> output;

Expand All @@ -79,19 +95,41 @@ fn run_topolotree(
-> map(|(src, _)| (src, TopolotreeMessage::Ping()))
-> output;

pongs -> dead_neighbors;
pings -> dead_neighbors;
new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_neighbors; // fake pong
dead_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| {
pongs -> dead_maybe_neighbors;
pings -> dead_maybe_neighbors;
new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_maybe_neighbors; // fake pong
dead_maybe_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| {
*acc = Instant::now();
})
-> filter_map(|(node_id, acc)| {
if acc.elapsed().as_secs() > 5 {
if acc.elapsed().as_secs() >= 5 {
Some(node_id)
} else {
None
}
}) -> tee();
})
-> map(|n| (n, ()))
-> [0]dead_neighbors;

neighbors -> map(|n| (n, ())) -> [1]dead_neighbors;
dead_neighbors = join()
-> map(|(n, _)| n)
-> tee();

// TODO(shadaj): only remove when we get an ack from the new leader
dead_neighbors -> removed_neighbors;

dead_neighbors -> map(|n| (n, ())) -> [0]min_neighbor_of_dead_neighbor;
neighbor_of_neighbor -> [1]min_neighbor_of_dead_neighbor;
min_neighbor_of_dead_neighbor = join()
-> map(|(dead, ((), neighbor))| (dead, neighbor))
-> filter(|(_, neighbor)| neighbor.0 != self_id)
-> reduce_keyed(|acc: &mut NodeID, n: NodeID| {
if n.0 < acc.0 {
*acc = n;
}
})
-> tee();

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Docs (rustdoc)

`tee` should have at least 2 output(s), actually has 0.

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 0.

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

`tee` should have at least 2 output(s), actually has 0.

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

`tee` should have at least 2 output(s), actually has 0.

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

`tee` should have at least 2 output(s), actually has 0.

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

`tee` should have at least 2 output(s), actually has 0.

Check warning on line 132 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

`tee` should have at least 2 output(s), actually has 0.

from_neighbors
-> map(|(src, payload): (NodeID, Payload<i64>)| ((payload.key, src), (payload.key, payload.contents)))
Expand All @@ -102,7 +140,12 @@ fn run_topolotree(
acc.1 = context.current_tick();
}
})
-> map(|((key, src), (payload, change_tick))| ((key, Some(src)), (payload.data, change_tick)))
-> map(|((key, src), (payload, change_tick))| (src, ((key, Some(src)), (payload.data, change_tick))))
-> [1]from_actual_neighbors;

neighbors -> map(|n| (n, ())) -> [0]from_actual_neighbors;
from_actual_neighbors = join()
-> map(|(_, (_, payload))| payload)
-> from_neighbors_or_local;

local_value = source_stream(increment_requests)
Expand All @@ -121,19 +164,29 @@ fn run_topolotree(
from_neighbors_or_local = union() -> tee();
from_neighbors_or_local -> [0]all_neighbor_data;

new_neighbors = source_iter(neighbors)
new_neighbors = source_iter(init_neighbors)
-> map(NodeID)
-> tee();

new_neighbors
-> persist()
-> [pos]neighbors;
dead_neighbors -> [neg]neighbors;
neighbors = difference()
new_neighbors -> map(|n| (n, true)) -> neighbors;
removed_neighbors = map(|n| (n, false)) -> neighbors;
neighbors = union()
-> map(|(neighbor, add)| (neighbor, !add))
-> sort_by_key(|(_, remove)| remove) // process adds first
-> fold::<'static>(|| vec![], |acc: &mut Vec<NodeID>, (neighbor, remove): (NodeID, bool)| {

Check failure on line 176 in topolotree/src/main.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

redundant closure
if remove {
acc.retain(|x| *x != neighbor);
} else {
acc.push(neighbor);
}
})
-> flatten()
-> tee();

neighbors -> [1]all_neighbor_data;

// broadcast_neighbors = cross_join() TODO

query_result = from_neighbors_or_local
-> map(|((key, _), payload): ((u64, _), (i64, usize))| {
(key, payload)
Expand Down Expand Up @@ -183,7 +236,12 @@ fn run_topolotree(
#[hydroflow::main]
async fn main() {
let args: Vec<String> = std::env::args().skip(1).collect();
let neighbors: Vec<u32> = args.into_iter().map(|x| x.parse().unwrap()).collect();
let self_id: u32 = args[0].parse().unwrap();
let neighbors: Vec<u32> = args
.into_iter()
.skip(1)
.map(|x| x.parse().unwrap())
.collect();

let mut ports = hydroflow::util::cli::init().await;

Expand Down Expand Up @@ -228,7 +286,14 @@ async fn main() {
}
});

let flow = run_topolotree(neighbors, input_recv, operations_send, chan_tx, query_tx);
let flow = run_topolotree(
self_id,
neighbors,
input_recv,
operations_send,
chan_tx,
query_tx,
);

let f1 = async move {
#[cfg(target_os = "linux")]
Expand Down
1 change: 1 addition & 0 deletions topolotree/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum TopolotreeMessage {
Payload(Payload<i64>),
Ping(),
Pong(),
NeighborOfNeighbor(u32, bool),
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down
Loading

0 comments on commit c4b85db

Please sign in to comment.