refactor(paxos): extract leader election notification (#1514)
shadaj authored Oct 31, 2024
1 parent 871dc01 commit 57a2b81
Showing 3 changed files with 142 additions and 178 deletions.
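The change moves the "leader elected" notification into paxos_core itself: the proposer now detects the tick on which it first becomes leader and emits a unit value, so callers no longer need the winning Ballot or the LeaderElected trait. The sketch below is assembled from the hunks that follow; it uses only operators that appear in the diff (continue_unless, defer_tick, map, all_ticks) and is not standalone-compilable outside a hydroflow_plus flow.

    // `p_is_leader` is present only on ticks where this proposer believes it
    // is the leader; `defer_tick()` shifts that signal by one tick, so
    // `continue_unless` keeps only the first tick of a leadership span.
    let just_became_leader = p_is_leader
        .clone()
        .continue_unless(p_is_leader.clone().defer_tick());

    // Notify clients exactly once per election, with a unit value instead of
    // the winning Ballot.
    let p_to_clients_new_leader_elected = just_became_leader
        .clone()
        .map(q!(move |_| ()))
        .all_ticks();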
74 changes: 28 additions & 46 deletions hydroflow_plus_test/src/cluster/paxos.rs
@@ -8,8 +8,6 @@ use serde::{Deserialize, Serialize};
use stageleft::*;
use tokio::time::Instant;

use super::paxos_bench::LeaderElected;

pub struct Proposer {}
pub struct Acceptor {}

@@ -25,12 +23,6 @@ pub struct Ballot {
pub proposer_id: ClusterId<Proposer>,
}

impl LeaderElected for Ballot {
fn leader_id(&self) -> ClusterId<Proposer> {
self.proposer_id
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
struct P1a {
ballot: Ballot,
@@ -85,7 +77,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
) -> (
Cluster<'a, Proposer>,
Cluster<'a, Acceptor>,
Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
let proposers = flow.cluster::<Proposer>();
@@ -115,29 +107,37 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())),
);

let just_became_leader = p_is_leader
.clone()
.continue_unless(p_is_leader.clone().defer_tick());

// Tell clients that leader election has completed and they can begin sending messages
let p_to_clients_new_leader_elected = just_became_leader
.clone()
.map(q!(move |_| ())) // Only tell the clients once when leader election concludes
.all_ticks();

let (p_log_to_try_commit, p_max_slot, p_log_holes) =
recommit_after_leader_election(&proposers, p_relevant_p1bs, p_ballot_num.clone(), f);

let p_log_to_recommit = p_log_to_try_commit
.union(p_log_holes)
.continue_unless(p_is_leader.clone().defer_tick())
.continue_if(p_is_leader.clone()); // Only resend p1b stuff once the moment we become leader.
.continue_if(just_became_leader); // Only resend p1b stuff once the moment we become leader.

let c_to_proposers = c_to_proposers(&proposers);

let (p_to_clients_new_leader_elected, p_to_replicas, a_log, a_to_proposers_p2b) =
sequence_payload(
&proposers,
&acceptors,
c_to_proposers,
r_to_acceptors_checkpoint,
p_ballot_num,
p_is_leader,
p_max_slot,
p_log_to_recommit,
f,
a_max_ballot,
);
let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload(
&proposers,
&acceptors,
c_to_proposers,
r_to_acceptors_checkpoint,
p_ballot_num,
p_is_leader,
p_max_slot,
p_log_to_recommit,
f,
a_max_ballot,
);

a_log_complete_cycle.complete(a_log);
a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b);
@@ -550,13 +550,11 @@ fn sequence_payload<'a, P: PaxosPayload, R>(

a_max_ballot: Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
) -> (
Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>,
Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick, Cluster<'a, Acceptor>>,
Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
let p_id = proposers.self_id();
let (p_next_slot, p_to_acceptors_p2a) = p_p2a(
let p_to_acceptors_p2a = p_p2a(
proposers,
p_max_slot,
c_to_proposers,
@@ -566,13 +564,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
acceptors,
);

// Tell clients that leader election has completed and they can begin sending messages
let p_to_clients_new_leader_elected = p_is_leader.clone()
.continue_unless(p_next_slot)
.cross_singleton(p_ballot_num)
.map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, proposer_id: p_id})) // Only tell the clients once when leader election concludes
.all_ticks();

// Acceptors.
let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors);

@@ -589,12 +580,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
// End tell clients that leader election has completed
let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f);

(
p_to_clients_new_leader_elected,
p_to_replicas,
a_log,
a_to_proposers_p2b,
)
(p_to_replicas, a_log, a_to_proposers_p2b)
}

#[derive(Clone)]
@@ -604,7 +590,6 @@ enum CheckpointOrP2a<P> {
}

// Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_p2a<'a, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
p_max_slot: Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
@@ -613,10 +598,7 @@ fn p_p2a<'a, P: PaxosPayload>(
p_log_to_recommit: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
p_is_leader: Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
acceptors: &Cluster<'a, Acceptor>,
) -> (
Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
Stream<P2a<P>, Unbounded, NoTick, Cluster<'a, Acceptor>>,
) {
) -> Stream<P2a<P>, Unbounded, NoTick, Cluster<'a, Acceptor>> {
let p_id = proposers.self_id();
let (p_next_slot_complete_cycle, p_next_slot) =
proposers.tick_cycle::<Optional<i32, _, _, _>>();
@@ -657,7 +639,7 @@ fn p_p2a<'a, P: PaxosPayload>(
.continue_if(p_is_leader.clone());

p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot);
(p_next_slot, p_to_acceptors_p2a)
p_to_acceptors_p2a
}

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
25 changes: 14 additions & 11 deletions hydroflow_plus_test/src/cluster/paxos_bench.rs
@@ -7,11 +7,7 @@ use hydroflow_plus::*;
use serde::{Deserialize, Serialize};
use stageleft::*;

use super::paxos::{paxos_core, Acceptor, Ballot, PaxosPayload, Proposer};

pub trait LeaderElected: Ord + Clone {
fn leader_id(&self) -> ClusterId<Proposer>;
}
use super::paxos::{paxos_core, Acceptor, PaxosPayload, Proposer};

pub struct Replica {}

@@ -79,7 +75,9 @@ pub fn paxos_bench<'a>(

c_to_proposers_complete_cycle.complete(bench_client(
&clients,
p_to_clients_new_leader_elected.broadcast_bincode_interleaved(&clients),
p_to_clients_new_leader_elected
.broadcast_bincode(&clients)
.map(q!(|(leader_id, _)| leader_id)),
r_new_processed_payloads.send_bincode(&clients),
num_clients_per_node,
median_latency_window_size,
@@ -111,7 +109,7 @@ fn paxos_with_replica<'a>(
) -> (
Cluster<'a, Proposer>,
Cluster<'a, Acceptor>,
Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(ClusterId<Client>, ReplicaPayload), Unbounded, NoTick, Cluster<'a, Replica>>,
) {
let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) =
@@ -251,9 +249,14 @@ pub fn replica<'a>(
}

// Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed.
fn bench_client<'a, B: LeaderElected + std::fmt::Debug>(
fn bench_client<'a>(
clients: &Cluster<'a, Client>,
p_to_clients_leader_elected: Stream<B, Unbounded, NoTick, Cluster<'a, Client>>,
p_to_clients_leader_elected: Stream<
ClusterId<Proposer>,
Unbounded,
NoTick,
Cluster<'a, Client>,
>,
r_to_clients_payload_applied: Stream<
(ClusterId<Replica>, ReplicaPayload),
Unbounded,
@@ -280,7 +283,7 @@ fn bench_client<'a, B: LeaderElected + std::fmt::Debug>(
.clone()
.flat_map(q!(move |leader_ballot| (0..num_clients_per_node).map(
move |i| (
leader_ballot.leader_id(),
leader_ballot,
ClientPayload {
key: i as u32,
value: c_id.raw_id.to_string()
@@ -320,7 +323,7 @@ fn bench_client<'a, B: LeaderElected + std::fmt::Debug>(
.clone()
.cross_singleton(c_max_leader_ballot.clone().latest_tick())
.map(q!(move |(key, leader_ballot)| (
leader_ballot.leader_id(),
leader_ballot,
ClientPayload {
key,
value: c_id.raw_id.to_string()
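With the notification stream carrying () rather than Ballot, clients recover the leader's identity from the network envelope: broadcast_bincode tags each delivered item with the sending proposer's ClusterId, which bench_client then pairs with every generated payload. A minimal sketch of that consumption path, using only calls that appear in the hunks above (the leader_ids binding is illustrative; the diff passes the mapped stream directly into bench_client):

    // The unit notification is broadcast to the client cluster; the envelope
    // carries the sender, so neither the Ballot nor the LeaderElected trait is
    // needed to learn who the leader is.
    let leader_ids = p_to_clients_new_leader_elected
        .broadcast_bincode(&clients)            // items arrive as (ClusterId<Proposer>, ())
        .map(q!(|(leader_id, _)| leader_id));   // keep just the sender's id

    // bench_client now takes Stream<ClusterId<Proposer>, ...> and uses that id
    // directly when addressing requests to the current leader.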