From 0a5abab3dac224c9591bcdd837d07c6e5c2773c6 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Sun, 3 Nov 2024 14:54:58 -0800 Subject: [PATCH] refactor(paxos): make Paxos-KV generic (#1517) --- hydroflow_plus/src/persist_pullup.rs | 8 + hydroflow_plus_test/src/cluster/paxos.rs | 73 +-- .../src/cluster/paxos_bench.rs | 25 +- hydroflow_plus_test/src/cluster/paxos_kv.rs | 72 +-- ...cluster__paxos_bench__tests__paxos_ir.snap | 424 +++++++++--------- 5 files changed, 317 insertions(+), 285 deletions(-) diff --git a/hydroflow_plus/src/persist_pullup.rs b/hydroflow_plus/src/persist_pullup.rs index 007e4b7b2e7e..caafcb13836a 100644 --- a/hydroflow_plus/src/persist_pullup.rs +++ b/hydroflow_plus/src/persist_pullup.rs @@ -43,6 +43,14 @@ fn persist_pullup_node( input: behind_persist, })), + HfPlusNode::FilterMap { + f, + input: box HfPlusNode::Persist(behind_persist), + } => HfPlusNode::Persist(Box::new(HfPlusNode::FilterMap { + f, + input: behind_persist, + })), + HfPlusNode::FlatMap { f, input: box HfPlusNode::Persist(behind_persist), diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 05dcb8382f53..2c5b8ce2914e 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -11,10 +11,8 @@ use tokio::time::Instant; pub struct Proposer {} pub struct Acceptor {} -pub trait PaxosPayload: - Serialize + DeserializeOwned + PartialEq + Eq + Default + Clone + Debug -{ -} +pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {} +impl PaxosPayload for T {} #[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Hash)] pub struct Ballot { @@ -31,7 +29,7 @@ struct P1a { #[derive(Serialize, Deserialize, Clone, Debug)] struct LogValue

{ ballot: Ballot, - value: P, + value: Option

, // might be a hole } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -45,7 +43,7 @@ struct P1b { struct P2a

{ ballot: Ballot, slot: i32, - value: P, + value: Option

, // might be a re-committed hole } #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] @@ -53,7 +51,7 @@ struct P2b

{ ballot: Ballot, max_ballot: Ballot, slot: i32, - value: P, + value: Option

, // might be a hole } #[expect( @@ -77,7 +75,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( i_am_leader_check_timeout_delay_multiplier: usize, ) -> ( Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, - Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, + Stream<(i32, Option

), Unbounded, NoTick, Cluster<'a, Proposer>>, ) { proposers .source_iter(q!(["Proposers say hello"])) @@ -456,28 +454,33 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( let p_p1b_highest_entries_and_count = p_relevant_p1bs .flat_map(q!(|p1b| p1b.accepted.into_iter())) // Convert HashMap log back to stream - .fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, proposer_id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| { - let same_values = new_entry.value == curr_entry.1.value; - let higher_ballot = new_entry.ballot > curr_entry.1.ballot; - // Increment count if the values are the same - if same_values { - curr_entry.0 += 1; - } - // Replace the ballot with the largest one - if higher_ballot { - curr_entry.1.ballot = new_entry.ballot; - // Replace the value with the one from the largest ballot, if necessary - if !same_values { - curr_entry.0 = 1; - curr_entry.1.value = new_entry.value; + .fold_keyed::<(usize, Option>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| { + if let Some(curr_entry_payload) = &mut curr_entry.1 { + let same_values = new_entry.value == curr_entry_payload.value; + let higher_ballot = new_entry.ballot > curr_entry_payload.ballot; + // Increment count if the values are the same + if same_values { + curr_entry.0 += 1; } + // Replace the ballot with the largest one + if higher_ballot { + curr_entry_payload.ballot = new_entry.ballot; + // Replace the value with the one from the largest ballot, if necessary + if !same_values { + curr_entry.0 = 1; + curr_entry_payload.value = new_entry.value; + } + } + } else { + *curr_entry = (1, Some(new_entry)); } })); let p_log_to_try_commit = p_p1b_highest_entries_and_count .clone() .cross_singleton(p_ballot_num.clone()) - .filter_map(q!( - move |((slot, (count, entry)), ballot_num)| if count <= f as u32 { + .filter_map(q!(move |((slot, (count, entry)), ballot_num)| { + let entry = entry.unwrap(); + if count <= f { Some(P2a { ballot: Ballot { num: ballot_num, @@ -489,7 +492,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( } else { None } - )); + })); let p_max_slot = p_p1b_highest_entries_and_count .clone() .map(q!(|(slot, _)| slot)) @@ -508,7 +511,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( proposer_id: p_id }, slot, - value: Default::default() + value: None })); (p_log_to_try_commit, p_max_slot, p_log_holes) } @@ -538,7 +541,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>( a_max_ballot: Singleton>, ) -> ( - Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, + Stream<(i32, Option

), Unbounded, NoTick, Cluster<'a, Proposer>>, Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, ) { @@ -563,7 +566,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>( f, ); - // End tell clients that leader election has completed let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f); (p_to_replicas, a_log, a_to_proposers_p2b) @@ -602,7 +604,11 @@ fn p_p2a<'a, P: PaxosPayload>( // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) .cross_singleton(p_ballot_num.clone()) // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) - .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload })); + .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { + ballot: Ballot { num: ballot_num, proposer_id: p_id }, + slot: next_slot + index as i32, + value: Some(payload) + })); // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); let p_to_acceptors_p2a = p_log_to_recommit .union(p_indexed_payloads.clone()) @@ -738,7 +744,7 @@ fn p_p2b<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, f: usize, -) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> { +) -> Stream<(i32, Option

), Unbounded, NoTick, Cluster<'a, Proposer>> { let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle(); let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle(); let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs); @@ -751,12 +757,13 @@ fn p_p2b<'a, P: PaxosPayload>( None })) .fold_keyed( - q!(|| (0, Default::default())), + q!(|| (0, None)), q!(|accum, value| { accum.0 += 1; - accum.1 = value; + accum.1 = Some(value); }), - ); + ) + .map(q!(|(k, (count, v))| (k, (count, v.unwrap())))); let p_p2b_quorum_reached = p_count_matching_p2bs .clone() diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 2332a176597c..baba72b2dd2f 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -6,7 +6,7 @@ use hydroflow_plus::*; use stageleft::*; use super::paxos::{Acceptor, Proposer}; -use super::paxos_kv::{paxos_kv, KvPayload, Replica, SequencedKv}; +use super::paxos_kv::{paxos_kv, KvPayload, Replica}; pub struct Client {} @@ -58,10 +58,7 @@ pub fn paxos_bench<'a>( .map(q!(|(leader_id, _)| leader_id)), ); processed_payloads - .map(q!(|payload| ( - ClusterId::from_raw(payload.kv.value.parse::().unwrap()), - payload - ))) + .map(q!(|payload| (payload.value, payload))) .send_bincode(&clients) }, num_clients_per_node, @@ -82,9 +79,14 @@ fn bench_client<'a>( Cluster<'a, Client>, >, transaction_cycle: impl FnOnce( - Stream<(ClusterId, KvPayload), Unbounded, NoTick, Cluster<'a, Client>>, + Stream< + (ClusterId, KvPayload>), + Unbounded, + NoTick, + Cluster<'a, Client>, + >, ) -> Stream< - (ClusterId, SequencedKv), + (ClusterId, KvPayload>), Unbounded, NoTick, Cluster<'a, Client>, @@ -112,7 +114,7 @@ fn bench_client<'a>( leader_ballot, KvPayload { key: i as u32, - value: c_id.raw_id.to_string() + value: c_id } ) ))); @@ -126,7 +128,7 @@ fn bench_client<'a>( let c_received_payloads = transaction_results .tick_batch() .map(q!(|(sender, replica_payload)| ( - replica_payload.kv.key, + replica_payload.key, sender ))) .union(c_pending_quorum_payloads); @@ -154,10 +156,7 @@ fn bench_client<'a>( .cross_singleton(current_leader.clone().latest_tick()) .map(q!(move |(key, cur_leader)| ( cur_leader, - KvPayload { - key, - value: c_id.raw_id.to_string() - } + KvPayload { key, value: c_id } ))); c_to_proposers_complete_cycle.complete( c_new_payloads_when_leader_elected diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index ba9a40e73ef4..597401290001 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -1,35 +1,45 @@ use std::collections::HashMap; +use std::fmt::Debug; +use std::hash::Hash; use hydroflow_plus::*; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use stageleft::*; -use super::paxos::{paxos_core, Acceptor, PaxosPayload, Proposer}; +use super::paxos::{paxos_core, Acceptor, Proposer}; pub struct Replica {} -#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)] -pub struct KvPayload { - pub key: u32, - pub value: String, -} +pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {} +impl KvKey for K {} -impl Default for KvPayload { - fn default() -> Self { - Self { - key: 0, - value: "".to_string(), - } - } -} +pub trait KvValue: Serialize + DeserializeOwned + Eq + Clone + Debug {} +impl KvValue for V {} -impl PaxosPayload for KvPayload {} +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +pub struct KvPayload { + pub key: K, + pub value: V, +} -#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)] -pub struct SequencedKv { +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +pub struct SequencedKv { // Note: Important that seq is the first member of the struct for sorting pub seq: i32, - pub kv: KvPayload, + pub kv: Option>, +} + +impl Ord for SequencedKv { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.seq.cmp(&other.seq) + } +} + +impl PartialOrd for SequencedKv { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } } #[expect( @@ -37,11 +47,11 @@ pub struct SequencedKv { clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -pub fn paxos_kv<'a>( +pub fn paxos_kv<'a, K: KvKey, V: KvValue>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, replicas: &Cluster<'a, Replica>, - c_to_proposers: Stream>, + c_to_proposers: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, f: usize, i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, @@ -49,7 +59,7 @@ pub fn paxos_kv<'a>( checkpoint_frequency: usize, ) -> ( Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, - Stream>, + Stream, Unbounded, NoTick, Cluster<'a, Replica>>, ) { let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = replicas.forward_ref::>(); @@ -80,13 +90,13 @@ pub fn paxos_kv<'a>( // Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing. #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -pub fn replica<'a>( +pub fn replica<'a, K: KvKey, V: KvValue>( replicas: &Cluster<'a, Replica>, - p_to_replicas: Stream>, + p_to_replicas: Stream, Unbounded, NoTick, Cluster<'a, Replica>>, checkpoint_frequency: usize, ) -> ( Stream>, - Stream>, + Stream, Unbounded, NoTick, Cluster<'a, Replica>>, ) { let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replicas.tick_cycle(); // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); @@ -139,10 +149,10 @@ pub fn replica<'a>( let r_kv_store = r_processable_payloads .clone() .persist() // Optimization: all_ticks() + fold() = fold, where the state of the previous fold is saved and persisted values are deleted. - .fold(q!(|| (HashMap::::new(), -1)), q!(|state, payload| { - let kv_store = &mut state.0; - let last_seq = &mut state.1; - kv_store.insert(payload.kv.key, payload.kv.value); + .fold(q!(|| (HashMap::new(), -1)), q!(|(kv_store, last_seq), payload| { + if let Some(kv) = payload.kv { + kv_store.insert(kv.key, kv.value); + } debug_assert!(payload.seq == *last_seq + 1, "Hole in log between seq {} and {}", *last_seq, payload.seq); *last_seq = payload.seq; // println!("Replica kv store: {:?}", kv_store); @@ -172,6 +182,8 @@ pub fn replica<'a>( r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone()); // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value. - let r_to_clients = p_to_replicas; - (r_checkpoint_seq_new.all_ticks(), r_to_clients) + ( + r_checkpoint_seq_new.all_ticks(), + p_to_replicas.filter_map(q!(|t| t.kv)), + ) } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 6103176aa6d9..711533647bc3 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -44,7 +44,7 @@ expression: built.ir() Union( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), input: CycleSource { ident: Ident { sym: cycle_1, @@ -55,7 +55,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . max_ballot }), input: CycleSource { ident: Ident { sym: cycle_0, @@ -211,7 +211,7 @@ expression: built.ir() ), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 1, @@ -226,7 +226,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > (& data) . unwrap () . into ()) }", ], }, ), @@ -237,13 +237,13 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > (& b) . unwrap ()) }", ], }, ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , log) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , log) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), input: CrossSingleton( CrossSingleton( Tee { @@ -378,7 +378,7 @@ expression: built.ir() }, ), Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log . clone () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log . clone () }), input: CycleSource { ident: Ident { sym: cycle_0, @@ -409,16 +409,16 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), input: CrossSingleton( Persist( Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), input: Tee { inner: , }, @@ -475,13 +475,13 @@ expression: built.ir() inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), input: Tee { inner: , }, @@ -531,15 +531,15 @@ expression: built.ir() Tee { inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : Some (payload) } }), input: CrossSingleton( CrossSingleton( Enumerate( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 2, @@ -554,7 +554,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", ], }, ), @@ -565,7 +565,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", ], }, ), @@ -619,194 +619,197 @@ expression: built.ir() input: DeferTick( Difference( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), input: Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < (i32 , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), - input: Tee { - inner: : Union( - Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 1, - ), - from_key: None, - to_location: Cluster( - 0, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& data) . unwrap () . into ()) }", - ], - }, + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (k , (count , v)) | (k , (count , v . unwrap ())) }), + input: FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = Some (value) ; } }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), + input: Tee { + inner: : Union( + Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 1, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 0, ), - ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }), - input: CrossSingleton( - Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 0, - ), - from_key: None, - to_location: Cluster( - 1, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& data) . unwrap () . into ()) }", - ], - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }), + input: CrossSingleton( + Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 0, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 1, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Union( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Union( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }), - input: CrossSingleton( - Difference( - FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), - input: Tee { - inner: , - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Union( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Union( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | { let entry = entry . unwrap () ; if count <= f { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } } }), + input: CrossSingleton( + Tee { + inner: , }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), - input: Tee { - inner: , - }, + Tee { + inner: , }, ), - Tee { - inner: , - }, - ), - }, - ), - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : None } }), input: CrossSingleton( - Tee { - inner: , - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: DeferTick( - Tee { - inner: , - }, - ), + Difference( + FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), + input: Tee { + inner: , + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + input: Tee { + inner: , }, }, + ), + Tee { + inner: , }, ), }, + ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: , + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: DeferTick( + Tee { + inner: , + }, + ), + }, + }, + }, + ), + }, + }, }, - }, - ), - }, - Tee { - inner: , + ), + }, + Tee { + inner: , + }, + ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: Tee { + inner: , + }, }, ), - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: Tee { - inner: , - }, - }, - ), + }, }, }, }, }, - }, - Tee { - inner: , - }, - ), + Tee { + inner: , + }, + ), + }, }, }, }, - }, - CycleSource { - ident: Ident { - sym: cycle_7, + CycleSource { + ident: Ident { + sym: cycle_7, + }, + location_kind: Cluster( + 0, + ), }, - location_kind: Cluster( - 0, - ), - }, - ), + ), + }, }, }, }, @@ -816,7 +819,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { inner: , }, @@ -834,10 +837,10 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), input: AntiJoin( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), input: Tee { inner: , }, @@ -857,12 +860,12 @@ expression: built.ir() 1, ), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } CheckpointOrP2a :: P2a (p2a) => { if p2a . slot > * prev_checkpoint && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } CheckpointOrP2a :: P2a (p2a) => { if p2a . slot > * prev_checkpoint && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), input: Persist( Union( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), input: CrossSingleton( Tee { inner: , @@ -873,7 +876,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), input: Delta( Union( Reduce { @@ -986,16 +989,16 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { inner: : Sort( Union( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 0, @@ -1010,7 +1013,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", ], }, ), @@ -1021,15 +1024,15 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", ], }, ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (slot , kv) | SequencedKv { seq : slot , kv } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (slot , kv) | SequencedKv { seq : slot , kv } }), input: AntiJoin( Tee { inner: , @@ -1062,7 +1065,7 @@ expression: built.ir() Tee { inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | - 1 }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), input: CrossSingleton( Tee { inner: , @@ -1103,16 +1106,16 @@ expression: built.ir() input: DeferTick( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . kv . key , payload . kv . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: new () , - 1) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (kv_store , last_seq) , payload | { if let Some (kv) = payload . kv { kv_store . insert (kv . key , kv . value) ; } debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), input: Persist( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { inner: , @@ -1249,7 +1252,7 @@ expression: built.ir() Tee { inner: : Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . kv . key , sender) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), input: Network { from_location: Cluster( 3, @@ -1264,7 +1267,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", ], }, ), @@ -1275,15 +1278,18 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", ], }, ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . kv . value . parse :: < u32 > () . unwrap ()) , payload) }), - input: Tee { - inner: , + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . value , payload) }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | t | t . kv }), + input: Tee { + inner: , + }, }, }, }, @@ -1322,7 +1328,7 @@ expression: built.ir() ), input: Union( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , KvPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , KvPayload { key : i as u32 , value : c_id })) }), input: Tee { inner: : Delta( Tee { @@ -1347,7 +1353,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (key , cur_leader) | (cur_leader , KvPayload { key , value : c_id . raw_id . to_string () }) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (key , cur_leader) | (cur_leader , KvPayload { key , value : c_id }) }), input: CrossSingleton( Tee { inner: ,