diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 5af3d1a62fdc..2f704830447d 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -8,8 +8,6 @@ use serde::{Deserialize, Serialize}; use stageleft::*; use tokio::time::Instant; -use super::paxos_bench::LeaderElected; - pub struct Proposer {} pub struct Acceptor {} @@ -25,12 +23,6 @@ pub struct Ballot { pub proposer_id: ClusterId, } -impl LeaderElected for Ballot { - fn leader_id(&self) -> ClusterId { - self.proposer_id - } -} - #[derive(Serialize, Deserialize, Clone, Debug)] struct P1a { ballot: Ballot, @@ -85,7 +77,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( ) -> ( Cluster<'a, Proposer>, Cluster<'a, Acceptor>, - Stream>, + Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, ) { let proposers = flow.cluster::(); @@ -115,29 +107,37 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())), ); + let just_became_leader = p_is_leader + .clone() + .continue_unless(p_is_leader.clone().defer_tick()); + + // Tell clients that leader election has completed and they can begin sending messages + let p_to_clients_new_leader_elected = just_became_leader + .clone() + .map(q!(move |_| ())) // Only tell the clients once when leader election concludes + .all_ticks(); + let (p_log_to_try_commit, p_max_slot, p_log_holes) = recommit_after_leader_election(&proposers, p_relevant_p1bs, p_ballot_num.clone(), f); let p_log_to_recommit = p_log_to_try_commit .union(p_log_holes) - .continue_unless(p_is_leader.clone().defer_tick()) - .continue_if(p_is_leader.clone()); // Only resend p1b stuff once the moment we become leader. + .continue_if(just_became_leader); // Only resend p1b stuff once the moment we become leader. let c_to_proposers = c_to_proposers(&proposers); - let (p_to_clients_new_leader_elected, p_to_replicas, a_log, a_to_proposers_p2b) = - sequence_payload( - &proposers, - &acceptors, - c_to_proposers, - r_to_acceptors_checkpoint, - p_ballot_num, - p_is_leader, - p_max_slot, - p_log_to_recommit, - f, - a_max_ballot, - ); + let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload( + &proposers, + &acceptors, + c_to_proposers, + r_to_acceptors_checkpoint, + p_ballot_num, + p_is_leader, + p_max_slot, + p_log_to_recommit, + f, + a_max_ballot, + ); a_log_complete_cycle.complete(a_log); a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b); @@ -550,13 +550,11 @@ fn sequence_payload<'a, P: PaxosPayload, R>( a_max_ballot: Singleton>, ) -> ( - Stream>, Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, ) { - let p_id = proposers.self_id(); - let (p_next_slot, p_to_acceptors_p2a) = p_p2a( + let p_to_acceptors_p2a = p_p2a( proposers, p_max_slot, c_to_proposers, @@ -566,13 +564,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>( acceptors, ); - // Tell clients that leader election has completed and they can begin sending messages - let p_to_clients_new_leader_elected = p_is_leader.clone() - .continue_unless(p_next_slot) - .cross_singleton(p_ballot_num) - .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, proposer_id: p_id})) // Only tell the clients once when leader election concludes - .all_ticks(); - // Acceptors. let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors); @@ -589,12 +580,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>( // End tell clients that leader election has completed let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f); - ( - p_to_clients_new_leader_elected, - p_to_replicas, - a_log, - a_to_proposers_p2b, - ) + (p_to_replicas, a_log, a_to_proposers_p2b) } #[derive(Clone)] @@ -604,7 +590,6 @@ enum CheckpointOrP2a

{ } // Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors. -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_p2a<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, p_max_slot: Optional>, @@ -613,10 +598,7 @@ fn p_p2a<'a, P: PaxosPayload>( p_log_to_recommit: Stream, Bounded, Tick, Cluster<'a, Proposer>>, p_is_leader: Optional>, acceptors: &Cluster<'a, Acceptor>, -) -> ( - Optional>, - Stream, Unbounded, NoTick, Cluster<'a, Acceptor>>, -) { +) -> Stream, Unbounded, NoTick, Cluster<'a, Acceptor>> { let p_id = proposers.self_id(); let (p_next_slot_complete_cycle, p_next_slot) = proposers.tick_cycle::>(); @@ -657,7 +639,7 @@ fn p_p2a<'a, P: PaxosPayload>( .continue_if(p_is_leader.clone()); p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot); - (p_next_slot, p_to_acceptors_p2a) + p_to_acceptors_p2a } #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 37cfc078305d..897727de986d 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -7,11 +7,7 @@ use hydroflow_plus::*; use serde::{Deserialize, Serialize}; use stageleft::*; -use super::paxos::{paxos_core, Acceptor, Ballot, PaxosPayload, Proposer}; - -pub trait LeaderElected: Ord + Clone { - fn leader_id(&self) -> ClusterId; -} +use super::paxos::{paxos_core, Acceptor, PaxosPayload, Proposer}; pub struct Replica {} @@ -79,7 +75,9 @@ pub fn paxos_bench<'a>( c_to_proposers_complete_cycle.complete(bench_client( &clients, - p_to_clients_new_leader_elected.broadcast_bincode_interleaved(&clients), + p_to_clients_new_leader_elected + .broadcast_bincode(&clients) + .map(q!(|(leader_id, _)| leader_id)), r_new_processed_payloads.send_bincode(&clients), num_clients_per_node, median_latency_window_size, @@ -111,7 +109,7 @@ fn paxos_with_replica<'a>( ) -> ( Cluster<'a, Proposer>, Cluster<'a, Acceptor>, - Stream>, + Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, Stream<(ClusterId, ReplicaPayload), Unbounded, NoTick, Cluster<'a, Replica>>, ) { let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = @@ -251,9 +249,14 @@ pub fn replica<'a>( } // Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed. -fn bench_client<'a, B: LeaderElected + std::fmt::Debug>( +fn bench_client<'a>( clients: &Cluster<'a, Client>, - p_to_clients_leader_elected: Stream>, + p_to_clients_leader_elected: Stream< + ClusterId, + Unbounded, + NoTick, + Cluster<'a, Client>, + >, r_to_clients_payload_applied: Stream< (ClusterId, ReplicaPayload), Unbounded, @@ -280,7 +283,7 @@ fn bench_client<'a, B: LeaderElected + std::fmt::Debug>( .clone() .flat_map(q!(move |leader_ballot| (0..num_clients_per_node).map( move |i| ( - leader_ballot.leader_id(), + leader_ballot, ClientPayload { key: i as u32, value: c_id.raw_id.to_string() @@ -320,7 +323,7 @@ fn bench_client<'a, B: LeaderElected + std::fmt::Debug>( .clone() .cross_singleton(c_max_leader_ballot.clone().latest_tick()) .map(q!(move |(key, leader_ballot)| ( - leader_ballot.leader_id(), + leader_ballot, ClientPayload { key, value: c_id.raw_id.to_string() diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 169fe4a64112..c2f43c08201f 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -709,65 +709,67 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Union( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), + Union( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), + input: CrossSingleton( + Tee { + inner: , }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }), - input: CrossSingleton( - Difference( - FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), - input: Tee { - inner: , - }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), - input: Tee { - inner: , - }, - }, - ), - Tee { - inner: , - }, - ), + Tee { + inner: , }, ), - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: DeferTick( - Tee { - inner: , - }, - ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }), + input: CrossSingleton( + Difference( + FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), + input: Tee { + inner: , + }, }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + input: Tee { + inner: , + }, + }, + ), + Tee { + inner: , }, - }, - ), - }, + ), + }, + ), Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), input: Tee { - inner: , + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: , + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: DeferTick( + Tee { + inner: , + }, + ), + }, + }, + }, + ), + }, }, }, ), @@ -813,7 +815,7 @@ expression: built.ir() }, }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { inner: , @@ -841,7 +843,7 @@ expression: built.ir() }, }, Tee { - inner: , + inner: , }, ), }, @@ -882,7 +884,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: : ReduceKeyed { + inner: : ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), input: Persist( Network { @@ -938,7 +940,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -989,10 +991,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { - inner: : Sort( + inner: : Sort( Union( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -1058,12 +1060,12 @@ expression: built.ir() ), }, Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | - 1 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Union( CycleSource { @@ -1100,23 +1102,23 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . key , payload . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), input: Persist( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1137,7 +1139,7 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Union( @@ -1166,7 +1168,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1181,7 +1183,7 @@ expression: built.ir() 1, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1194,7 +1196,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Union( + inner: : Union( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), input: Network { @@ -1230,7 +1232,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . value . parse :: < u32 > () . unwrap ()) , payload) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1246,13 +1248,13 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1273,7 +1275,7 @@ expression: built.ir() input: Union( Union( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_2, }, @@ -1285,17 +1287,17 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < std :: time :: SystemTime , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let num_clients_per_node = 1usize ; move | now | (0 .. num_clients_per_node) . map (move | virtual_id | (virtual_id , now)) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: : Delta( + inner: : Delta( Tee { - inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + inner: : Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot | println ! ("Client notified that leader was elected: {:?}" , ballot) }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot | println ! ("Client notified that leader was elected: {:?}" , ballot) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , ()) , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (leader_id , _) | leader_id }), input: Network { from_location: Cluster( 2, @@ -1310,7 +1312,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , ()) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < () > (& data) . unwrap () . into ()) }", ], }, ), @@ -1321,41 +1323,18 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < () > (& b) . unwrap ()) }", ], }, ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , u32) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (_is_leader , ballot_num) | Ballot { num : ballot_num , proposer_id : p_id } }), - input: CrossSingleton( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: , - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, - }, - }, - }, - ), - }, - Tee { - inner: , - }, - ), + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use crate :: __staged :: cluster :: paxos :: * ; move | _ | () }), + input: Tee { + inner: , + }, }, }, }, @@ -1370,10 +1349,10 @@ expression: built.ir() }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1386,7 +1365,7 @@ expression: built.ir() input: CrossSingleton( CrossSingleton( Tee { - inner: : Source { + inner: : Source { source: Interval( { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) }, ), @@ -1404,10 +1383,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1415,7 +1394,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: , + inner: , }, }, ), @@ -1437,7 +1416,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1448,7 +1427,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1460,7 +1439,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1478,19 +1457,19 @@ expression: built.ir() ), input: Union( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot . leader_id () , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), input: Tee { - inner: , + inner: , }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot . leader_id () , ClientPayload { key , value : c_id . raw_id . to_string () }) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot , ClientPayload { key , value : c_id . raw_id . to_string () }) }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), },