refactor(hydro_test): make logic for sending payloads to current leader reusable (#1703)

shadaj authored Feb 8, 2025
1 parent f3c4590 commit 4059b7b
Showing 5 changed files with 137 additions and 79 deletions.
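
In brief, this commit pulls the "route payloads to the current Paxos leader" logic out of a free function and behind a new PaxosLike trait, so that paxos_bench no longer constructs its own Proposer/Acceptor clusters. Callers now own those clusters and pass paxos_bench a replica count plus a closure that builds any PaxosLike implementation from the replica-checkpoint stream. The sketch below is a condensed illustration of the new call shape, assembled from the example and test diffs that follow; it is not itself part of the commit, the function name wire_paxos_bench is illustrative, and the literal argument values are simply those used in the updated paxos_ir test.

    use hydro_test::cluster::paxos::{CorePaxos, PaxosConfig};

    fn wire_paxos_bench(builder: &hydro_lang::FlowBuilder<'_>) {
        // The caller now owns the proposer and acceptor clusters...
        let proposers = builder.cluster();
        let acceptors = builder.cluster();

        // ...and hands paxos_bench a closure that turns the replica-checkpoint stream
        // into a PaxosLike implementation, plus the replica count used for commit
        // quorums (previously derived inside paxos_bench from paxos_config.f + 1).
        let (_clients, _replicas) = hydro_test::cluster::paxos_bench::paxos_bench(
            builder,
            1, // num_clients_per_node
            1, // median_latency_window_size
            1, // checkpoint_frequency
            2, // replica_count
            |replica_checkpoint| CorePaxos {
                proposers: proposers.clone(),
                acceptors: acceptors.clone(),
                replica_checkpoint: replica_checkpoint.broadcast_bincode(&acceptors),
                paxos_config: PaxosConfig {
                    f: 1,
                    i_am_leader_send_timeout: 1,
                    i_am_leader_check_timeout: 1,
                    i_am_leader_check_timeout_delay_multiplier: 1,
                },
            },
        );
    }
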
23 changes: 16 additions & 7 deletions hydro_test/examples/paxos.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use hydro_deploy::gcp::GcpNetwork;
use hydro_deploy::{Deployment, Host};
use hydro_lang::deploy::TrybuildHost;
use hydro_test::cluster::paxos::PaxosConfig;
use hydro_test::cluster::paxos::{CorePaxos, PaxosConfig};
use tokio::sync::RwLock;

type HostCreator = Box<dyn Fn(&mut Deployment) -> Arc<dyn Host>>;
@@ -42,16 +42,25 @@ async fn main() {
let i_am_leader_check_timeout = 10; // Sec
let i_am_leader_check_timeout_delay_multiplier = 15;

let (proposers, acceptors, clients, replicas) = hydro_test::cluster::paxos_bench::paxos_bench(
let proposers = builder.cluster();
let acceptors = builder.cluster();

let (clients, replicas) = hydro_test::cluster::paxos_bench::paxos_bench(
&builder,
num_clients_per_node,
median_latency_window_size,
checkpoint_frequency,
PaxosConfig {
f,
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
f + 1,
|replica_checkpoint| CorePaxos {
proposers: proposers.clone(),
acceptors: acceptors.clone(),
replica_checkpoint: replica_checkpoint.broadcast_bincode(&acceptors),
paxos_config: PaxosConfig {
f,
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
},
},
);

44 changes: 44 additions & 0 deletions hydro_test/src/cluster/paxos.rs
@@ -9,6 +9,9 @@ use hydro_std::request_response::join_responses;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

use super::kv_replica::Replica;
use super::paxos_with_client::PaxosLike;

pub struct Proposer {}
pub struct Acceptor {}

@@ -60,6 +63,47 @@ struct P2a<P> {
value: Option<P>, // might be a re-committed hole
}

pub struct CorePaxos<'a> {
pub proposers: Cluster<'a, Proposer>,
pub acceptors: Cluster<'a, Acceptor>,
pub replica_checkpoint:
Stream<(ClusterId<Replica>, usize), Cluster<'a, Acceptor>, Unbounded, NoOrder>,
pub paxos_config: PaxosConfig,
}

impl<'a> PaxosLike<'a> for CorePaxos<'a> {
type Leader = Proposer;
type Ballot = Ballot;

fn leaders(&self) -> &Cluster<'a, Proposer> {
&self.proposers
}

fn get_ballot_leader<L: Location<'a>>(
ballot: Optional<Self::Ballot, L, Unbounded>,
) -> Optional<ClusterId<Self::Leader>, L, Unbounded> {
ballot.map(q!(|ballot| ballot.proposer_id))
}

unsafe fn build<P: PaxosPayload>(
self,
with_ballot: impl FnOnce(
Stream<Ballot, Cluster<'a, Proposer>, Unbounded>,
) -> Stream<P, Cluster<'a, Proposer>, Unbounded>,
) -> Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder> {
unsafe {
paxos_core(
&self.proposers,
&self.acceptors,
self.replica_checkpoint,
with_ballot,
self.paxos_config,
)
.1
}
}
}

/// Implements the core Paxos algorithm, which uses a cluster of proposers and acceptors
/// to sequence payloads being sent to the proposers.
///
52 changes: 21 additions & 31 deletions hydro_test/src/cluster/paxos_bench.rs
@@ -3,23 +3,16 @@ use hydro_std::quorum::collect_quorum;

use super::bench_client::{bench_client, Client};
use super::kv_replica::{kv_replica, KvPayload, Replica};
use super::paxos::{Acceptor, PaxosConfig, Proposer};
use super::paxos_with_client::paxos_with_client;
use super::paxos_with_client::PaxosLike;

pub fn paxos_bench<'a>(
pub fn paxos_bench<'a, Paxos: PaxosLike<'a>>(
flow: &FlowBuilder<'a>,
num_clients_per_node: usize,
median_latency_window_size: usize, /* How many latencies to keep in the window for calculating the median */
checkpoint_frequency: usize, // How many sequence numbers to commit before checkpointing
paxos_config: PaxosConfig,
) -> (
Cluster<'a, Proposer>,
Cluster<'a, Acceptor>,
Cluster<'a, Client>,
Cluster<'a, Replica>,
) {
let proposers = flow.cluster::<Proposer>();
let acceptors = flow.cluster::<Acceptor>();
replica_count: usize,
create_paxos: impl FnOnce(Stream<usize, Cluster<'a, Replica>, Unbounded>) -> Paxos,
) -> (Cluster<'a, Client>, Cluster<'a, Replica>) {
let clients = flow.cluster::<Client>();
let replicas = flow.cluster::<Replica>();

@@ -35,19 +28,14 @@ pub fn paxos_bench<'a>(
let (replica_checkpoint_complete, replica_checkpoint) =
replicas.forward_ref::<Stream<_, _, _>>();

let paxos = create_paxos(replica_checkpoint);

let sequenced_payloads = unsafe {
// SAFETY: clients "own" certain keys, so interleaving elements from clients will not affect
// the order of writes to the same key

// TODO(shadaj): we should retry when a payload is dropped due to stale leader
paxos_with_client(
&proposers,
&acceptors,
&clients,
payloads,
replica_checkpoint.broadcast_bincode(&acceptors),
paxos_config,
)
paxos.with_client(&clients, payloads)
};

let sequenced_to_replicas = sequenced_payloads.broadcast_bincode_interleaved(&replicas);
@@ -68,8 +56,8 @@ pub fn paxos_bench<'a>(
// we only mark a transaction as committed when all replicas have applied it
collect_quorum::<_, _, _, ()>(
c_received_payloads.atomic(&clients.tick()),
paxos_config.f + 1,
paxos_config.f + 1,
replica_count,
replica_count,
)
.0
.end_atomic()
@@ -78,31 +66,33 @@ pub fn paxos_bench<'a>(
median_latency_window_size,
);

(proposers, acceptors, clients, replicas)
(clients, replicas)
}

#[cfg(test)]
mod tests {
use hydro_lang::deploy::DeployRuntime;
use stageleft::RuntimeData;

use crate::cluster::paxos::PaxosConfig;
use crate::cluster::paxos::{CorePaxos, PaxosConfig};

#[test]
fn paxos_ir() {
let builder = hydro_lang::FlowBuilder::new();
let _ = super::paxos_bench(
&builder,
1,
1,
1,
PaxosConfig {
let proposers = builder.cluster();
let acceptors = builder.cluster();

let _ = super::paxos_bench(&builder, 1, 1, 1, 2, |replica_checkpoint| CorePaxos {
proposers,
acceptors: acceptors.clone(),
replica_checkpoint: replica_checkpoint.broadcast_bincode(&acceptors),
paxos_config: PaxosConfig {
f: 1,
i_am_leader_send_timeout: 1,
i_am_leader_check_timeout: 1,
i_am_leader_check_timeout_delay_multiplier: 1,
},
);
});
let built = builder.with_default_optimize::<DeployRuntime>();

hydro_lang::ir::dbg_dedup_tee(|| {
95 changes: 55 additions & 40 deletions hydro_test/src/cluster/paxos_with_client.rs
@@ -1,40 +1,57 @@
use std::fmt::Debug;

use hydro_lang::*;
use serde::de::DeserializeOwned;
use serde::Serialize;

use super::paxos::PaxosPayload;

pub trait PaxosLike<'a>: Sized {
type Leader: 'a;
type Ballot: Clone + Ord + Debug + Serialize + DeserializeOwned;

fn leaders(&self) -> &Cluster<'a, Self::Leader>;

fn get_ballot_leader<L: Location<'a>>(
ballot: Optional<Self::Ballot, L, Unbounded>,
) -> Optional<ClusterId<Self::Leader>, L, Unbounded>;

/// # Safety
/// During leader-reelection, the latest known leader may be stale, which may
/// result in non-deterministic dropping of payloads.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
unsafe fn build<P: PaxosPayload>(
self,
payload_generator: impl FnOnce(
Stream<Self::Ballot, Cluster<'a, Self::Leader>, Unbounded>,
) -> Stream<P, Cluster<'a, Self::Leader>, Unbounded>,
) -> Stream<(usize, Option<P>), Cluster<'a, Self::Leader>, Unbounded, NoOrder>;

/// # Safety
/// During leader-reelection, the latest known leader may be stale, which may
/// result in non-deterministic dropping of payloads.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
unsafe fn with_client<C: 'a, P: PaxosPayload>(
self,
clients: &Cluster<'a, C>,
payloads: Stream<P, Cluster<'a, C>, Unbounded>,
) -> Stream<(usize, Option<P>), Cluster<'a, Self::Leader>, Unbounded, NoOrder> {
unsafe {
// SAFETY: Non-deterministic leader notifications are handled in `cur_leader_id`. We do not
// care about the order in which key writes are processed, which is the non-determinism in
// `sequenced_payloads`.
let leaders = self.leaders().clone();

use super::paxos::{paxos_core, Acceptor, Ballot, PaxosConfig, PaxosPayload, Proposer};

/// Wraps the core Paxos algorithm with logic to send payloads from clients to the current
/// leader.
///
/// # Safety
/// Clients may send payloads to a stale leader if the leader changes between the time the
/// payload is sent and the time it is processed. This will result in the payload being dropped.
/// Payloads sent from multiple clients may be interleaved in a non-deterministic order.
pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
clients: &Cluster<'a, C>,
payloads: Stream<P, Cluster<'a, C>, Unbounded>,
replica_checkpoint: Stream<(ClusterId<R>, usize), Cluster<'a, Acceptor>, Unbounded, NoOrder>,
paxos_config: PaxosConfig,
) -> Stream<(usize, Option<P>), Cluster<'a, Proposer>, Unbounded, NoOrder> {
unsafe {
// SAFETY: Non-deterministic leader notifications are handled in `cur_leader_id`. We do not
// care about the order in which key writes are processed, which is the non-determinism in
// `sequenced_payloads`.

paxos_core(
proposers,
acceptors,
replica_checkpoint,
|new_leader_elected| {
let cur_leader_id = new_leader_elected
.broadcast_bincode_interleaved(clients)
.inspect(q!(|ballot| println!(
"Client notified that leader was elected: {:?}",
ballot
)))
.max()
.map(q!(|ballot: Ballot| ballot.proposer_id));
self.build(move |new_leader_elected| {
let cur_leader_id = Self::get_ballot_leader(
new_leader_elected
.broadcast_bincode_interleaved(clients)
.inspect(q!(|ballot| println!(
"Client notified that leader was elected: {:?}",
ballot
)))
.max(),
);

let payloads_at_proposer = {
// SAFETY: the risk here is that we send a batch of requests
@@ -59,17 +76,15 @@ pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>(
all_payloads.cross_singleton(latest_leader).all_ticks()
}
.map(q!(move |(payload, leader_id)| (leader_id, payload)))
.send_bincode_anonymous(proposers);
.send_bincode_anonymous(&leaders);

let payloads_at_proposer = {
// SAFETY: documented non-determinism in interleaving of client payloads
payloads_at_proposer.assume_ordering()
};

payloads_at_proposer
},
paxos_config,
)
.1
})
}
}
}
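
Design note: with_client is a provided method on the new PaxosLike trait, so any implementation that supplies leaders(), get_ballot_leader(), and build() inherits the client-to-current-leader routing that paxos_with_client previously hard-coded against the Proposer/Acceptor clusters. A minimal usage sketch, assuming paxos is any PaxosLike value (for example the CorePaxos constructed in examples/paxos.rs) and clients/payloads are built as in paxos_bench.rs; this snippet is illustrative and not part of the commit:

    let sequenced_payloads = unsafe {
        // SAFETY: as documented on `with_client`, a payload routed to a stale leader
        // during re-election may be dropped non-deterministically.
        paxos.with_client(&clients, payloads)
    };
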
2 changes: 1 addition & 1 deletion (generated Paxos IR snapshot)
@@ -1570,7 +1570,7 @@ expression: built.ir()
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , () > ({ use hydro_lang :: __staged :: stream :: * ; | count , _ | * count += 1 }),
input: Tee {
inner: <tee 15>: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < hydro_test :: cluster :: paxos :: Ballot , hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos_with_client :: * ; | ballot : Ballot | ballot . proposer_id }),
f: stageleft :: runtime_support :: fn1_type_hint :: < hydro_test :: cluster :: paxos :: Ballot , hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ballot | ballot . proposer_id }),
input: Reduce {
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydro_test :: cluster :: paxos :: Ballot , hydro_test :: cluster :: paxos :: Ballot , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
input: Persist {