diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index 5b9fabc52428..a15cea44dd66 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -24,6 +24,9 @@ pub struct FlowStateInner { /// Counters for generating identifiers for cycles. pub(crate) cycle_counts: HashMap, + + /// Counters for clock IDs. + pub(crate) next_clock_id: usize, } pub type FlowState = Rc>; @@ -73,6 +76,7 @@ impl<'a> FlowBuilder<'a> { leaves: Some(vec![]), next_external_out: 0, cycle_counts: HashMap::new(), + next_clock_id: 0, })), nodes: RefCell::new(vec![]), clusters: RefCell::new(vec![]), diff --git a/hydroflow_plus/src/ir.rs b/hydroflow_plus/src/ir.rs index 557eb8076e65..8f18bf552c86 100644 --- a/hydroflow_plus/src/ir.rs +++ b/hydroflow_plus/src/ir.rs @@ -186,9 +186,10 @@ impl HfPlusLeaf { let (input_ident, input_location_id) = input.emit(graph_builders, built_tees, next_stmt_id); - let location_id = match location_kind { + let location_id = match location_kind.root() { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), LocationId::ExternalProcess(_) => panic!(), }; @@ -586,6 +587,7 @@ impl<'a> HfPlusNode { let location_id = match location_kind { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), LocationId::ExternalProcess(id) => id, }; @@ -635,9 +637,10 @@ impl<'a> HfPlusNode { ident, location_kind, } => { - let location_id = match location_kind { + let location_id = match location_kind.root() { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), LocationId::ExternalProcess(_) => panic!(), }; @@ -1161,6 +1164,7 @@ impl<'a> HfPlusNode { let to_id = match to_location { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), LocationId::ExternalProcess(id) => id, }; @@ -1355,6 +1359,8 @@ fn instantiate_network<'a, D: Deploy<'a> + 'a>( (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => { todo!("NYI") } + (LocationId::Tick(_, _), _) => panic!(), + (_, LocationId::Tick(_, _)) => panic!(), }; (sink, source, connect_fn) } diff --git a/hydroflow_plus/src/location/mod.rs b/hydroflow_plus/src/location/mod.rs index 8bd3f0ea5a32..d8dd16750e8d 100644 --- a/hydroflow_plus/src/location/mod.rs +++ b/hydroflow_plus/src/location/mod.rs @@ -8,12 +8,9 @@ use proc_macro2::Span; use stageleft::{q, Quoted}; use super::builder::FlowState; -use crate::cycle::{ - CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, HfCycle, HfForwardRef, - TickCycle, -}; +use crate::cycle::{CycleCollection, ForwardRef, HfForwardRef}; use crate::ir::{HfPlusNode, HfPlusSource}; -use crate::{Bounded, Optional, Singleton, Stream, Unbounded}; +use crate::{Singleton, Stream, Unbounded}; pub mod external_process; pub use external_process::ExternalProcess; @@ -30,18 +27,29 @@ pub use can_send::CanSend; pub mod tick; pub use tick::{NoTick, Tick}; -#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[derive(PartialEq, Eq, Clone, Debug)] pub enum LocationId { Process(usize), Cluster(usize), + Tick(usize, Box), ExternalProcess(usize), } impl LocationId { + pub fn root(&self) -> &LocationId { + match self { + LocationId::Process(_) => self, + LocationId::Cluster(_) => self, + LocationId::Tick(_, id) => id.root(), + LocationId::ExternalProcess(_) => self, + } + } + pub fn raw_id(&self) -> usize { match self { LocationId::Process(id) => *id, LocationId::Cluster(id) => *id, + LocationId::Tick(_, _) => panic!("cannot get raw id for tick"), LocationId::ExternalProcess(id) => *id, } } @@ -58,11 +66,16 @@ pub trait Location<'a>: Clone { fn is_top_level() -> bool; - fn nest(&self) -> Tick + fn tick(&self) -> Tick where Self: NoTick, { - Tick { l: self.clone() } + let next_id = self.flow_state().borrow_mut().next_clock_id; + self.flow_state().borrow_mut().next_clock_id += 1; + Tick { + id: next_id, + l: self.clone(), + } } fn spin(&self) -> Stream<(), Unbounded, Self> @@ -78,19 +91,6 @@ pub trait Location<'a>: Clone { ) } - fn spin_batch( - &self, - batch_size: impl Quoted<'a, usize> + Copy + 'a, - ) -> Stream<(), Bounded, Tick> - where - Self: Sized + NoTick, - { - self.spin() - .flat_map(q!(move |_| 0..batch_size)) - .map(q!(|_| ())) - .tick_batch() - } - fn source_stream + Unpin>( &self, e: impl Quoted<'a, E>, @@ -153,35 +153,6 @@ pub trait Location<'a>: Clone { ) } - fn singleton_each_tick( - &self, - e: impl Quoted<'a, T>, - ) -> Singleton> - where - Self: Sized + NoTick, - { - self.singleton(e).latest_tick() - } - - fn singleton_first_tick( - &self, - e: impl Quoted<'a, T>, - ) -> Optional> - where - Self: Sized + NoTick, - { - let e_arr = q!([e]); - let e = e_arr.splice_untyped(); - - Optional::new( - self.clone().nest(), - HfPlusNode::Source { - source: HfPlusSource::Iter(e.into()), - location_kind: self.id(), - }, - ) - } - fn source_interval( &self, interval: impl Quoted<'a, Duration> + Copy + 'a, @@ -217,6 +188,7 @@ pub trait Location<'a>: Clone { let on_id = match self.id() { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), LocationId::ExternalProcess(_) => panic!(), }; @@ -238,103 +210,4 @@ pub trait Location<'a>: Clone { S::create_source(ident, self.clone()), ) } - - fn tick_forward_ref>>( - &self, - ) -> (HfForwardRef<'a, S>, S) - where - Self: NoTick, - { - let next_id = { - let on_id = match self.id() { - LocationId::Process(id) => id, - LocationId::Cluster(id) => id, - LocationId::ExternalProcess(_) => panic!(), - }; - - let mut flow_state = self.flow_state().borrow_mut(); - let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); - - let id = *next_id_entry; - *next_id_entry += 1; - id - }; - - let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); - - ( - HfForwardRef { - ident: ident.clone(), - _phantom: PhantomData, - }, - S::create_source(ident, self.nest().clone()), - ) - } - - fn tick_cycle> + DeferTick>( - &self, - ) -> (HfCycle<'a, S>, S) - where - Self: NoTick, - { - let next_id = { - let on_id = match self.id() { - LocationId::Process(id) => id, - LocationId::Cluster(id) => id, - LocationId::ExternalProcess(_) => panic!(), - }; - - let mut flow_state = self.flow_state().borrow_mut(); - let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); - - let id = *next_id_entry; - *next_id_entry += 1; - id - }; - - let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); - - ( - HfCycle { - ident: ident.clone(), - _phantom: PhantomData, - }, - S::create_source(ident, self.nest().clone()), - ) - } - - fn tick_cycle_with_initial< - S: CycleCollectionWithInitial<'a, TickCycle, Location = Tick> + DeferTick, - >( - &self, - initial: S, - ) -> (HfCycle<'a, S>, S) - where - Self: NoTick, - { - let next_id = { - let on_id = match self.id() { - LocationId::Process(id) => id, - LocationId::Cluster(id) => id, - LocationId::ExternalProcess(_) => panic!(), - }; - - let mut flow_state = self.flow_state().borrow_mut(); - let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); - - let id = *next_id_entry; - *next_id_entry += 1; - id - }; - - let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); - - ( - HfCycle { - ident: ident.clone(), - _phantom: PhantomData, - }, - S::create_source(ident, initial, self.nest().clone()), - ) - } } diff --git a/hydroflow_plus/src/location/tick.rs b/hydroflow_plus/src/location/tick.rs index acdf26e72756..895ae45559ad 100644 --- a/hydroflow_plus/src/location/tick.rs +++ b/hydroflow_plus/src/location/tick.rs @@ -1,5 +1,16 @@ +use std::marker::PhantomData; + +use proc_macro2::Span; +use stageleft::{q, Quoted}; + use super::{Cluster, Location, LocationId, Process}; use crate::builder::FlowState; +use crate::cycle::{ + CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, HfCycle, HfForwardRef, + TickCycle, +}; +use crate::ir::{HfPlusNode, HfPlusSource}; +use crate::{Bounded, Optional, Singleton, Stream}; pub trait NoTick {} impl NoTick for Process<'_, T> {} @@ -8,18 +19,13 @@ impl NoTick for Cluster<'_, T> {} /// Marks the stream as being inside the single global clock domain. #[derive(Clone)] pub struct Tick { + pub(crate) id: usize, pub(crate) l: L, } -impl<'a, L: Location<'a>> Tick { - pub fn outer(&self) -> &L { - &self.l - } -} - impl<'a, L: Location<'a>> Location<'a> for Tick { fn id(&self) -> LocationId { - self.l.id() + LocationId::Tick(self.id, Box::new(self.l.id())) } fn flow_state(&self) -> &FlowState { @@ -30,3 +36,151 @@ impl<'a, L: Location<'a>> Location<'a> for Tick { false } } + +impl<'a, L: Location<'a>> Tick { + pub fn outer(&self) -> &L { + &self.l + } + + pub fn spin_batch( + &self, + batch_size: impl Quoted<'a, usize> + Copy + 'a, + ) -> Stream<(), Bounded, Self> + where + L: NoTick, + { + self.l + .spin() + .flat_map(q!(move |_| 0..batch_size)) + .map(q!(|_| ())) + .tick_batch(self) + } + + pub fn singleton(&self, e: impl Quoted<'a, T>) -> Singleton + where + L: NoTick, + { + self.outer().singleton(e).latest_tick(self) + } + + pub fn singleton_first_tick( + &self, + e: impl Quoted<'a, T>, + ) -> Optional + where + L: NoTick, + { + let e_arr = q!([e]); + let e = e_arr.splice_untyped(); + + Optional::new( + self.clone(), + HfPlusNode::Source { + source: HfPlusSource::Iter(e.into()), + location_kind: self.l.id(), + }, + ) + } + + pub fn forward_ref>( + &self, + ) -> (HfForwardRef<'a, S>, S) + where + L: NoTick, + { + let next_id = { + let on_id = match self.l.id() { + LocationId::Process(id) => id, + LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), + LocationId::ExternalProcess(_) => panic!(), + }; + + let mut flow_state = self.flow_state().borrow_mut(); + let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); + + let id = *next_id_entry; + *next_id_entry += 1; + id + }; + + let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); + + ( + HfForwardRef { + ident: ident.clone(), + _phantom: PhantomData, + }, + S::create_source(ident, self.clone()), + ) + } + + pub fn cycle + DeferTick>( + &self, + ) -> (HfCycle<'a, S>, S) + where + L: NoTick, + { + let next_id = { + let on_id = match self.l.id() { + LocationId::Process(id) => id, + LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), + LocationId::ExternalProcess(_) => panic!(), + }; + + let mut flow_state = self.flow_state().borrow_mut(); + let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); + + let id = *next_id_entry; + *next_id_entry += 1; + id + }; + + let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); + + ( + HfCycle { + ident: ident.clone(), + _phantom: PhantomData, + }, + S::create_source(ident, self.clone()), + ) + } + + pub fn cycle_with_initial< + S: CycleCollectionWithInitial<'a, TickCycle, Location = Self> + DeferTick, + >( + &self, + initial: S, + ) -> (HfCycle<'a, S>, S) + where + L: NoTick, + { + let next_id = { + let on_id = match self.l.id() { + LocationId::Process(id) => id, + LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), + LocationId::ExternalProcess(_) => panic!(), + }; + + let mut flow_state = self.flow_state().borrow_mut(); + let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); + + let id = *next_id_entry; + *next_id_entry += 1; + id + }; + + let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); + + ( + HfCycle { + ident: ident.clone(), + _phantom: PhantomData, + }, + S::create_source(ident, initial, self.clone()), + ) + } +} diff --git a/hydroflow_plus/src/optional.rs b/hydroflow_plus/src/optional.rs index 71eab96cbd0f..7ca84bf6be1f 100644 --- a/hydroflow_plus/src/optional.rs +++ b/hydroflow_plus/src/optional.rs @@ -299,7 +299,7 @@ impl<'a, T, W, N: Location<'a>> Optional { let none: syn::Expr = parse_quote!([::std::option::Option::None]); let core_ir = HfPlusNode::Persist(Box::new(HfPlusNode::Source { source: HfPlusSource::Iter(none.into()), - location_kind: self.location.id(), + location_kind: self.location.id().root().clone(), })); let none_singleton = if N::is_top_level() { @@ -330,27 +330,28 @@ impl<'a, T, N: Location<'a>> Optional { } impl<'a, T, B, N: Location<'a> + NoTick> Optional { - pub fn latest_tick(self) -> Optional> { + pub fn latest_tick(self, tick: &Tick) -> Optional> { Optional::new( - self.location.nest(), + tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } pub fn tick_samples(self) -> Stream { - self.latest_tick().all_ticks() + let tick = self.location.tick(); + self.latest_tick(&tick).all_ticks() } pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, ) -> Stream { - let samples = self.location.source_interval(interval).tick_batch(); + let samples = self.location.source_interval(interval); + let tick = self.location.tick(); - self.latest_tick() - .continue_if(samples.first()) - .latest() - .tick_samples() + self.latest_tick(&tick) + .continue_if(samples.tick_batch(&tick).first()) + .all_ticks() } } diff --git a/hydroflow_plus/src/rewrites/persist_pullup.rs b/hydroflow_plus/src/rewrites/persist_pullup.rs index caafcb13836a..50ead7f9e2f4 100644 --- a/hydroflow_plus/src/rewrites/persist_pullup.rs +++ b/hydroflow_plus/src/rewrites/persist_pullup.rs @@ -167,7 +167,8 @@ mod tests { let flow = crate::builder::FlowBuilder::new(); let process = flow.process::<()>(); - let before_tee = process.source_iter(q!(0..10)).tick_batch().persist(); + let tick = process.tick(); + let before_tee = process.source_iter(q!(0..10)).tick_batch(&tick).persist(); before_tee .clone() diff --git a/hydroflow_plus/src/rewrites/properties.rs b/hydroflow_plus/src/rewrites/properties.rs index ca557c0ef67c..ecb6bcbe42a4 100644 --- a/hydroflow_plus/src/rewrites/properties.rs +++ b/hydroflow_plus/src/rewrites/properties.rs @@ -104,6 +104,7 @@ mod tests { let mut database = PropertyDatabase::default(); let process = flow.process::<()>(); + let tick = process.tick(); let counter_func = q!(|count: &mut i32, _| *count += 1); let _ = database.add_commutative_tag(counter_func); @@ -111,7 +112,7 @@ mod tests { process .source_iter(q!(vec![])) .map(q!(|string: String| (string, ()))) - .tick_batch() + .tick_batch(&tick) .fold_keyed(q!(|| 0), counter_func) .all_ticks() .for_each(q!(|(string, count)| println!("{}: {}", string, count))); diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index 7271cc4a0854..b153a0947152 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -227,27 +227,28 @@ impl<'a, T, N: Location<'a>> Singleton { } impl<'a, T, B, N: Location<'a> + NoTick> Singleton { - pub fn latest_tick(self) -> Singleton> { + pub fn latest_tick(self, tick: &Tick) -> Singleton> { Singleton::new( - self.location.nest(), + tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } pub fn tick_samples(self) -> Stream { - self.latest_tick().all_ticks() + let tick = self.location.tick(); + self.latest_tick(&tick).all_ticks() } pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, ) -> Stream { - let samples = self.location.source_interval(interval).tick_batch(); + let samples = self.location.source_interval(interval); + let tick = self.location.tick(); - self.latest_tick() - .continue_if(samples.first()) - .latest() - .tick_samples() + self.latest_tick(&tick) + .continue_if(samples.tick_batch(&tick).first()) + .all_ticks() } } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 065635d2d9f7..3f1e1803cc94 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -467,26 +467,29 @@ impl<'a, K: Eq + Hash, V, N: Location<'a>> Stream<(K, V), Bounded, Tick> { } impl<'a, T, W, N: Location<'a> + NoTick> Stream { - pub fn tick_batch(self) -> Stream> { + pub fn tick_batch(self, tick: &Tick) -> Stream> { Stream::new( - self.location.nest(), + tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_prefix(self) -> Stream> + pub fn tick_prefix(self, tick: &Tick) -> Stream> where T: Clone, { - self.tick_batch().persist() + self.tick_batch(tick).persist() } pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, ) -> Stream { - let samples = self.location.source_interval(interval).tick_batch(); - self.tick_batch().continue_if(samples.first()).all_ticks() + let samples = self.location.source_interval(interval); + let tick = self.location.tick(); + self.tick_batch(&tick) + .continue_if(samples.tick_batch(&tick).first()) + .all_ticks() } pub fn for_each(self, f: impl IntoQuotedMut<'a, F>) { diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index 106c39fecfdc..4f4010a3d5fc 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -14,6 +14,7 @@ pub fn compute_pi<'a>( let process = flow.process(); let trials = cluster + .tick() .spin_batch(q!(batch_size)) .map(q!(|_| rand::random::<(f64, f64)>())) .map(q!(|(x, y)| x * x + y * y < 1.0)) diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index 95696d5cbcf0..192403e423b1 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -14,7 +14,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' let all_ids_vec = cluster.members(); let words_partitioned = words - .tick_batch() + .tick_batch(&process.tick()) .enumerate() .map(q!(|(i, w)| ( ClusterId::from_raw((i % all_ids_vec.len()) as u32), @@ -25,7 +25,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' words_partitioned .send_bincode(&cluster) .map(q!(|string| (string, ()))) - .tick_batch() + .tick_batch(&cluster.tick()) .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) .inspect(q!(|(string, count)| println!( "partition count: {} - {}", @@ -33,7 +33,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' ))) .all_ticks() .send_bincode_interleaved(&process) - .tick_batch() + .tick_batch(&process.tick()) .persist() .reduce_keyed(q!(|total, count| *total += count)) .all_ticks() diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 170046d6fdbb..a5d14f794063 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -80,15 +80,20 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( .source_iter(q!(["Acceptors say hello"])) .for_each(q!(|s| println!("{}", s))); + let proposer_tick = proposers.tick(); + let acceptor_tick = acceptors.tick(); + let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) = proposers.forward_ref::, _, _>>(); let (a_log_complete_cycle, a_log_forward_reference) = - acceptors - .tick_forward_ref::, HashMap>), _, _>>(); + acceptor_tick + .forward_ref::, HashMap>), _, _>>(); let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( proposers, acceptors, + &proposer_tick, + &acceptor_tick, f, i_am_leader_send_timeout, i_am_leader_check_timeout, @@ -117,6 +122,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload( proposers, acceptors, + &proposer_tick, + &acceptor_tick, c_to_proposers, r_to_acceptors_checkpoint, p_ballot_num, @@ -141,6 +148,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, + proposer_tick: &Tick>, + acceptor_tick: &Tick>, f: usize, i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, @@ -158,7 +167,7 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) = proposers.forward_ref::>(); let (p_is_leader_complete_cycle, p_is_leader_forward_ref) = - proposers.tick_forward_ref::>(); + proposer_tick.forward_ref::>(); // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b))); // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot))); // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload))); @@ -169,11 +178,15 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( p_received_p2b_ballots, p_to_proposers_i_am_leader_forward_ref, ); - let (p_ballot_num, p_has_largest_ballot) = - p_ballot_calc(proposers, p_received_max_ballot.latest_tick()); + let (p_ballot_num, p_has_largest_ballot) = p_ballot_calc( + proposers, + proposer_tick, + p_received_max_ballot.latest_tick(proposer_tick), + ); let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat( proposers, + proposer_tick, p_is_leader_forward_ref, p_ballot_num.clone(), i_am_leader_send_timeout, @@ -191,11 +204,12 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( ); let (a_max_ballot, a_to_proposers_p1b) = - acceptor_p1(acceptors, p_to_acceptors_p1a, a_log, proposers); + acceptor_p1(acceptor_tick, p_to_acceptors_p1a, a_log, proposers); a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b.clone()); let (p_is_leader, p_relevant_p1bs) = p_p1b( proposers, + proposer_tick, a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))), p_ballot_num.clone(), p_has_largest_ballot, @@ -227,6 +241,7 @@ fn p_max_ballot<'a>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_ballot_calc<'a>( proposers: &Cluster<'a, Proposer>, + proposer_tick: &Tick>, p_received_max_ballot: Singleton>>, ) -> ( Singleton>>, @@ -234,7 +249,7 @@ fn p_ballot_calc<'a>( ) { let p_id = proposers.self_id(); let (p_ballot_num_complete_cycle, p_ballot_num) = - proposers.tick_cycle_with_initial(proposers.singleton_each_tick(q!(0))); + proposer_tick.cycle_with_initial(proposer_tick.singleton(q!(0))); let p_new_ballot_num = p_received_max_ballot .clone() @@ -269,6 +284,7 @@ fn p_ballot_calc<'a>( } fn p_leader_expired<'a>( + proposer_tick: &Tick>, p_to_proposers_i_am_leader: Stream>, p_is_leader: Optional>>, i_am_leader_check_timeout: u64, // How often to check if heartbeat expired @@ -282,7 +298,7 @@ fn p_leader_expired<'a>( ); p_latest_received_i_am_leader - .latest_tick() + .latest_tick(proposer_tick) .continue_unless(p_is_leader) .filter(q!(move |latest_received_i_am_leader| { if let Some(latest_received_i_am_leader) = latest_received_i_am_leader { @@ -297,6 +313,7 @@ fn p_leader_expired<'a>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_leader_heartbeat<'a>( proposers: &Cluster<'a, Proposer>, + proposer_tick: &Tick>, p_is_leader: Optional>>, p_ballot_num: Singleton>>, i_am_leader_send_timeout: u64, // How often to heartbeat @@ -319,6 +336,7 @@ fn p_leader_heartbeat<'a>( .broadcast_bincode_interleaved(proposers); let p_leader_expired = p_leader_expired( + proposer_tick, p_to_proposers_i_am_leader.clone(), p_is_leader, i_am_leader_check_timeout, @@ -333,7 +351,7 @@ fn p_leader_heartbeat<'a>( )), q!(Duration::from_secs(i_am_leader_check_timeout)), ) - .tick_batch() + .tick_batch(proposer_tick) .first(), ); (p_to_proposers_i_am_leader, p_trigger_election) @@ -363,7 +381,7 @@ fn p_p1a<'a>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( - acceptors: &Cluster<'a, Acceptor>, + acceptor_tick: &Tick>, p_to_acceptors_p1a: Stream>, a_log: Singleton>>, proposers: &Cluster<'a, Proposer>, @@ -371,14 +389,14 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( Singleton>>, Stream, Unbounded, Cluster<'a, Proposer>>, ) { - let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch(); + let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch(acceptor_tick); let a_max_ballot = p_to_acceptors_p1a .clone() .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a))) .persist() .map(q!(|p1a| p1a.ballot)) .max() - .unwrap_or(acceptors.singleton_each_tick(q!(Ballot { + .unwrap_or(acceptor_tick.singleton(q!(Ballot { num: 0, proposer_id: ClusterId::from_raw(0) }))); @@ -405,6 +423,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( proposers: &Cluster<'a, Proposer>, + proposer_tick: &Tick>, a_to_proposers_p1b: Stream, Unbounded, Cluster<'a, Proposer>>, p_ballot_num: Singleton>>, p_has_largest_ballot: Optional<(Ballot, u32), Bounded, Tick>>, @@ -415,7 +434,7 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( ) { let p_id = proposers.self_id(); let p_relevant_p1bs = a_to_proposers_p1b - .tick_prefix() + .tick_prefix(proposer_tick) // NOTE: because `p_ballot_num` grows monotonically across ticks, we could garbage gollect // but we don't do that here since leader election is a rare event .cross_singleton(p_ballot_num.clone()) @@ -520,6 +539,8 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( fn sequence_payload<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, + proposer_tick: &Tick>, + acceptor_tick: &Tick>, c_to_proposers: Stream>, r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, @@ -538,6 +559,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>( ) { let p_to_acceptors_p2a = p_p2a( proposers, + proposer_tick, p_max_slot, c_to_proposers, p_ballot_num.clone(), @@ -549,6 +571,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>( // Acceptors. // p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a))); let (a_log, a_to_proposers_p2b) = acceptor_p2( + acceptor_tick, a_max_ballot.clone(), p_to_acceptors_p2a, r_to_acceptors_checkpoint, @@ -556,7 +579,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>( f, ); - let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f); + let p_to_replicas = p_p2b(proposer_tick, a_to_proposers_p2b.clone(), f); (p_to_replicas, a_log, a_to_proposers_p2b) } @@ -568,8 +591,10 @@ enum CheckpointOrP2a

{ } // Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors. +#[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")] fn p_p2a<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, + proposer_tick: &Tick>, p_max_slot: Optional>>, c_to_proposers: Stream>, p_ballot_num: Singleton>>, @@ -578,16 +603,16 @@ fn p_p2a<'a, P: PaxosPayload>( acceptors: &Cluster<'a, Acceptor>, ) -> Stream, Unbounded, Cluster<'a, Acceptor>> { let p_id = proposers.self_id(); - let (p_next_slot_complete_cycle, p_next_slot) = proposers.tick_cycle::>(); + let (p_next_slot_complete_cycle, p_next_slot) = proposer_tick.cycle::>(); let p_next_slot_after_reconciling_p1bs = p_max_slot .map(q!(|max_slot| max_slot + 1)) - .unwrap_or(proposers.singleton_each_tick(q!(0))) + .unwrap_or(proposer_tick.singleton(q!(0))) // .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot))) .continue_unless(p_next_slot.clone()); // Send p2as let p_indexed_payloads = c_to_proposers - .tick_batch() + .tick_batch(proposer_tick) .enumerate() .cross_singleton(p_next_slot.clone()) // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) @@ -623,6 +648,7 @@ fn p_p2a<'a, P: PaxosPayload>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn acceptor_p2<'a, P: PaxosPayload, R>( + acceptor_tick: &Tick>, a_max_ballot: Singleton>>, p_to_acceptors_p2a: Stream, Unbounded, Cluster<'a, Acceptor>>, r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, @@ -632,17 +658,16 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( Singleton<(Option, HashMap>), Bounded, Tick>>, Stream, Unbounded, Cluster<'a, Proposer>>, ) { - let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(); + let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(acceptor_tick); // Get the latest checkpoint sequence per replica - let a_checkpoint_largest_seqs = - r_to_acceptors_checkpoint - .tick_prefix() - .reduce_keyed(q!(|curr_seq, seq| { - if seq > *curr_seq { - *curr_seq = seq; - } - })); + let a_checkpoint_largest_seqs = r_to_acceptors_checkpoint + .tick_prefix(acceptor_tick) + .reduce_keyed(q!(|curr_seq, seq| { + if seq > *curr_seq { + *curr_seq = seq; + } + })); let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs.clone().count().filter_map(q!( move |num_received| if num_received == f + 1 { Some(true) @@ -721,13 +746,15 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( } fn p_p2b<'a, P: PaxosPayload>( - proposers: &Cluster<'a, Proposer>, + proposer_tick: &Tick>, a_to_proposers_p2b: Stream, Unbounded, Cluster<'a, Proposer>>, f: usize, ) -> Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>> { - let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle(); - let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle(); - let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs); + let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposer_tick.cycle(); + let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposer_tick.cycle(); + let p_p2b = a_to_proposers_p2b + .tick_batch(proposer_tick) + .union(p_persisted_p2bs); let p_count_matching_p2bs = p_p2b .clone() .filter_map(q!(|p2b| if p2b.ballot == p2b.max_ballot { diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 0af5e6ce350a..e9c59925b274 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -88,6 +88,7 @@ fn bench_client<'a>( median_latency_window_size: usize, f: usize, ) { + let client_tick = clients.tick(); let c_id = clients.self_id(); // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload))); // Only keep the latest leader @@ -97,7 +98,7 @@ fn bench_client<'a>( ballot ))) .max(); - let c_new_leader_ballot = current_leader.clone().latest_tick().delta(); + let c_new_leader_ballot = current_leader.clone().latest_tick(&client_tick).delta(); // Whenever the leader changes, make all clients send a message let c_new_payloads_when_leader_elected = c_new_leader_ballot @@ -116,10 +117,9 @@ fn bench_client<'a>( let transaction_results = transaction_cycle(c_to_proposers); // Whenever replicas confirm that a payload was committed, collected it and wait for a quorum - let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = - clients.tick_cycle(); + let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = client_tick.cycle(); let c_received_payloads = transaction_results - .tick_batch() + .tick_batch(&client_tick) .map(q!(|(sender, replica_payload)| ( replica_payload.key, sender @@ -146,7 +146,7 @@ fn bench_client<'a>( // Whenever all replicas confirm that a payload was committed, send another payload let c_new_payloads_when_committed = c_received_quorum_payloads .clone() - .cross_singleton(current_leader.clone().latest_tick()) + .cross_singleton(current_leader.clone().latest_tick(&client_tick)) .map(q!(move |(key, cur_leader)| ( cur_leader, KvPayload { key, value: c_id } @@ -159,7 +159,7 @@ fn bench_client<'a>( // Track statistics let (c_timers_complete_cycle, c_timers) = - clients.tick_cycle::>(); + client_tick.cycle::>(); let c_new_timers_when_leader_elected = c_new_leader_ballot .map(q!(|_| SystemTime::now())) .flat_map(q!( @@ -181,7 +181,7 @@ fn bench_client<'a>( let c_stats_output_timer = clients .source_interval(q!(Duration::from_secs(1))) - .tick_batch() + .tick_batch(&client_tick) .first(); let c_latency_reset = c_stats_output_timer.clone().map(q!(|_| None)).defer_tick(); @@ -243,7 +243,7 @@ fn bench_client<'a>( c_latencies .zip(c_throughput) - .latest_tick() + .latest_tick(&client_tick) .continue_if(c_stats_output_timer) .all_ticks() .for_each(q!(move |(latencies, throughput)| { diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 320643a4befb..840e30cc9af7 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -98,15 +98,17 @@ pub fn replica<'a, K: KvKey, V: KvValue>( Stream>, Stream, Unbounded, Cluster<'a, Replica>>, ) { - let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replicas.tick_cycle(); + let replica_tick = replicas.tick(); + + let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick.cycle(); // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); let r_sorted_payloads = p_to_replicas - .tick_batch() + .tick_batch(&replica_tick) .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); // Create a cycle since we'll use this seq before we define it let (r_highest_seq_complete_cycle, r_highest_seq) = - replicas.tick_cycle::>(); + replica_tick.cycle::>(); // Find highest the sequence number of any payload that can be processed in this tick. This is the payload right before a hole. let r_highest_seq_processable_payload = r_sorted_payloads .clone() @@ -160,7 +162,7 @@ pub fn replica<'a, K: KvKey, V: KvValue>( // Send checkpoints to the acceptors when we've processed enough payloads let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = - replicas.tick_cycle::>(); + replica_tick.cycle::>(); let r_max_checkpointed_seq = r_checkpointed_seqs.persist().max().into_singleton(); let r_checkpoint_seq_new = r_max_checkpointed_seq diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs index 771a7ed1c97b..f3c7062b348d 100644 --- a/hydroflow_plus_test/src/cluster/simple_cluster.rs +++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs @@ -40,7 +40,7 @@ pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<' ids.cross_product(numbers) .map(q!(|(id, n)| (id, (id, n)))) .send_bincode(&cluster) - .tick_batch() + .tick_batch(&cluster.tick()) .inspect(q!(move |n| println!( "cluster received: {:?} (self cluster id: {})", n, cluster_self_id diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap index eb4e28cb096b..a9bac21e29da 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap @@ -51,9 +51,9 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , (f64 , f64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | _ | rand :: random :: < (f64 , f64) > () }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: location :: * ; | _ | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: location :: tick :: * ; | _ | () }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: ops :: Range < usize > > ({ use hydroflow_plus :: __staged :: location :: * ; let batch_size = { use crate :: __staged :: cluster :: compute_pi :: * ; let batch_size = 8192usize ; batch_size } ; move | _ | 0 .. batch_size }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: ops :: Range < usize > > ({ use hydroflow_plus :: __staged :: location :: tick :: * ; let batch_size = { use crate :: __staged :: cluster :: compute_pi :: * ; let batch_size = 8192usize ; batch_size } ; move | _ | 0 .. batch_size }), input: Source { source: Spin, location_kind: Cluster( diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap index 41e4f54f9479..0df52aa01c95 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_0.snap @@ -3,8 +3,8 @@ source: hydroflow_plus_test/src/cluster/compute_pi.rs expression: ir.surface_syntax_string() --- 1v1 = spin (); -2v1 = flat_map (stageleft :: runtime_support :: fn1_type_hint :: < () , std :: ops :: Range < usize > > ({ use hydroflow_plus :: __staged :: location :: * ; let batch_size = { use crate :: __staged :: cluster :: compute_pi :: * ; let batch_size = 8192usize ; batch_size } ; move | _ | 0 .. batch_size })); -3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: location :: * ; | _ | () })); +2v1 = flat_map (stageleft :: runtime_support :: fn1_type_hint :: < () , std :: ops :: Range < usize > > ({ use hydroflow_plus :: __staged :: location :: tick :: * ; let batch_size = { use crate :: __staged :: cluster :: compute_pi :: * ; let batch_size = 8192usize ; batch_size } ; move | _ | 0 .. batch_size })); +3v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: location :: tick :: * ; | _ | () })); 4v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < () , (f64 , f64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | _ | rand :: random :: < (f64 , f64) > () })); 5v1 = map (stageleft :: runtime_support :: fn1_type_hint :: < (f64 , f64) , bool > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (x , y) | x * x + y * y < 1.0 })); 6v1 = fold :: < 'tick > (stageleft :: runtime_support :: fn0_type_hint :: < (u64 , u64) > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | | (0u64 , 0u64) }) , stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u64 , u64) , bool , () > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , sample_inside | { if sample_inside { * inside += 1 ; } * total += 1 ; } })); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 2b9f3f5cf572..df35cf749539 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -29,8 +29,11 @@ expression: built.ir() ident: Ident { sym: cycle_4, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), input: DeferTick( Map { @@ -95,8 +98,11 @@ expression: built.ir() ident: Ident { sym: cycle_4, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), }, Persist( @@ -175,8 +181,11 @@ expression: built.ir() ident: Ident { sym: cycle_3, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), }, }, @@ -383,8 +392,11 @@ expression: built.ir() ident: Ident { sym: cycle_0, }, - location_kind: Cluster( - 1, + location_kind: Tick( + 2, + Cluster( + 1, + ), ), }, }, @@ -398,8 +410,11 @@ expression: built.ir() ident: Ident { sym: cycle_3, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), input: Tee { inner: : Map { @@ -457,8 +472,11 @@ expression: built.ir() ident: Ident { sym: cycle_5, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), input: DeferTick( Map { @@ -515,8 +533,11 @@ expression: built.ir() ident: Ident { sym: cycle_5, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), }, }, @@ -613,8 +634,11 @@ expression: built.ir() ident: Ident { sym: cycle_6, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), input: DeferTick( Difference( @@ -804,8 +828,11 @@ expression: built.ir() ident: Ident { sym: cycle_7, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), }, ), @@ -832,8 +859,11 @@ expression: built.ir() ident: Ident { sym: cycle_7, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), input: DeferTick( Map { @@ -856,8 +886,11 @@ expression: built.ir() ident: Ident { sym: cycle_0, }, - location_kind: Cluster( - 1, + location_kind: Tick( + 2, + Cluster( + 1, + ), ), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: cluster :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), @@ -972,8 +1005,11 @@ expression: built.ir() ident: Ident { sym: cycle_1, }, - location_kind: Cluster( - 3, + location_kind: Tick( + 4, + Cluster( + 3, + ), ), input: DeferTick( Map { @@ -1028,8 +1064,11 @@ expression: built.ir() ident: Ident { sym: cycle_6, }, - location_kind: Cluster( - 0, + location_kind: Tick( + 1, + Cluster( + 0, + ), ), }, ), @@ -1041,8 +1080,11 @@ expression: built.ir() ident: Ident { sym: cycle_1, }, - location_kind: Cluster( - 3, + location_kind: Tick( + 4, + Cluster( + 3, + ), ), }, ), @@ -1065,8 +1107,11 @@ expression: built.ir() ident: Ident { sym: cycle_2, }, - location_kind: Cluster( - 3, + location_kind: Tick( + 4, + Cluster( + 3, + ), ), }, }, @@ -1094,8 +1139,11 @@ expression: built.ir() ident: Ident { sym: cycle_2, }, - location_kind: Cluster( - 3, + location_kind: Tick( + 4, + Cluster( + 3, + ), ), input: DeferTick( Tee { @@ -1131,8 +1179,11 @@ expression: built.ir() ident: Ident { sym: cycle_3, }, - location_kind: Cluster( - 3, + location_kind: Tick( + 4, + Cluster( + 3, + ), ), input: DeferTick( Tee { @@ -1149,8 +1200,11 @@ expression: built.ir() ident: Ident { sym: cycle_3, }, - location_kind: Cluster( - 3, + location_kind: Tick( + 4, + Cluster( + 3, + ), ), }, ), @@ -1241,8 +1295,11 @@ expression: built.ir() ident: Ident { sym: cycle_2, }, - location_kind: Cluster( - 2, + location_kind: Tick( + 0, + Cluster( + 2, + ), ), input: DeferTick( AntiJoin( @@ -1295,8 +1352,11 @@ expression: built.ir() ident: Ident { sym: cycle_2, }, - location_kind: Cluster( - 2, + location_kind: Tick( + 0, + Cluster( + 2, + ), ), }, ), @@ -1366,8 +1426,11 @@ expression: built.ir() ident: Ident { sym: cycle_3, }, - location_kind: Cluster( - 2, + location_kind: Tick( + 0, + Cluster( + 2, + ), ), input: DeferTick( ReduceKeyed { @@ -1379,8 +1442,11 @@ expression: built.ir() ident: Ident { sym: cycle_3, }, - location_kind: Cluster( - 2, + location_kind: Tick( + 0, + Cluster( + 2, + ), ), }, }, diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index 42f944cb419f..5e11100ffc95 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -69,7 +69,7 @@ pub fn two_pc<'a>( .map(q!(|(id, (t, _reply))| (t, id))) // fold_keyed: 1 input stream of type (K, V1), 1 output stream of type (K, V2). // The output will have one tuple for each distinct K, with an accumulated value of type V2. - .tick_batch().fold_keyed(q!(|| 0), q!(|old: &mut u32, _| *old += 1)).filter_map(q!(move |(t, count)| { + .tick_batch(&coordinator.tick()).fold_keyed(q!(|| 0), q!(|old: &mut u32, _| *old += 1)).filter_map(q!(move |(t, count)| { // here I set the participant to 3. If want more or less participant, fix line 26 of examples/broadcast.rs if count == num_participants { Some(t) diff --git a/hydroflow_plus_test_local/src/local/chat_app.rs b/hydroflow_plus_test_local/src/local/chat_app.rs index 6ede979ea63f..dc87f8a680c1 100644 --- a/hydroflow_plus_test_local/src/local/chat_app.rs +++ b/hydroflow_plus_test_local/src/local/chat_app.rs @@ -13,13 +13,17 @@ pub fn chat_app<'a>( replay_messages: bool, ) -> impl Quoted<'a, Hydroflow<'a>> { let process = flow.process::<()>(); + let tick = process.tick(); - let users = process.source_stream(users_stream).tick_batch().persist(); + let users = process + .source_stream(users_stream) + .tick_batch(&tick) + .persist(); let messages = process.source_stream(messages); let messages = if replay_messages { - messages.tick_batch().persist() + messages.tick_batch(&tick).persist() } else { - messages.tick_batch() + messages.tick_batch(&tick) }; // do this after the persist to test pullup diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index af4882fa2cfa..97b850714e45 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -6,8 +6,9 @@ use stageleft::*; pub fn compute_pi<'a>(flow: &FlowBuilder<'a>, batch_size: RuntimeData) -> Process<'a, ()> { let process = flow.process(); + let tick = process.tick(); - let trials = process + let trials = tick .spin_batch(q!(batch_size)) .map(q!(|_| rand::random::<(f64, f64)>())) .map(q!(|(x, y)| x * x + y * y < 1.0)) diff --git a/hydroflow_plus_test_local/src/local/count_elems.rs b/hydroflow_plus_test_local/src/local/count_elems.rs index 04818fed51ad..de967ca18bc0 100644 --- a/hydroflow_plus_test_local/src/local/count_elems.rs +++ b/hydroflow_plus_test_local/src/local/count_elems.rs @@ -10,11 +10,12 @@ pub fn count_elems_generic<'a, T: 'a>( output: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { let process = flow.process::<()>(); + let tick = process.tick(); let source = process.source_stream(input_stream); let count = source .map(q!(|_| 1)) - .tick_batch() + .tick_batch(&tick) .fold(q!(|| 0), q!(|a, b| *a += b)) .all_ticks(); diff --git a/hydroflow_plus_test_local/src/local/negation.rs b/hydroflow_plus_test_local/src/local/negation.rs index 9fb75b678198..667bec86c572 100644 --- a/hydroflow_plus_test_local/src/local/negation.rs +++ b/hydroflow_plus_test_local/src/local/negation.rs @@ -11,13 +11,14 @@ pub fn test_difference<'a>( persist2: bool, ) -> impl Quoted<'a, Hydroflow<'a>> { let process = flow.process::<()>(); + let tick = process.tick(); - let mut source = process.source_iter(q!(0..5)).tick_batch(); + let mut source = process.source_iter(q!(0..5)).tick_batch(&tick); if persist1 { source = source.persist(); } - let mut source2 = process.source_iter(q!(3..6)).tick_batch(); + let mut source2 = process.source_iter(q!(3..6)).tick_batch(&tick); if persist2 { source2 = source2.persist(); } @@ -38,16 +39,17 @@ pub fn test_anti_join<'a>( persist2: bool, ) -> impl Quoted<'a, Hydroflow<'a>> { let process = flow.process::<()>(); + let tick = process.tick(); let mut source = process .source_iter(q!(0..5)) .map(q!(|v| (v, v))) - .tick_batch(); + .tick_batch(&tick); if persist1 { source = source.persist(); } - let mut source2 = process.source_iter(q!(3..6)).tick_batch(); + let mut source2 = process.source_iter(q!(3..6)).tick_batch(&tick); if persist2 { source2 = source2.persist(); } diff --git a/hydroflow_plus_test_local/src/local/teed_join.rs b/hydroflow_plus_test_local/src/local/teed_join.rs index 951ad9608874..3d1283528af9 100644 --- a/hydroflow_plus_test_local/src/local/teed_join.rs +++ b/hydroflow_plus_test_local/src/local/teed_join.rs @@ -18,8 +18,9 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( ) -> impl Quoted<'a, Hydroflow<'a>> { let node_zero = flow.process::(); let node_one = flow.process::(); + let n0_tick = node_zero.tick(); - let source = node_zero.source_stream(input_stream).tick_batch(); + let source = node_zero.source_stream(input_stream).tick_batch(&n0_tick); let map1 = source.clone().map(q!(|v| (v + 1, ()))); let map2 = source.map(q!(|v| (v - 1, ())));