diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index f374d5761bb9..7c6c9cb142a3 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -176,6 +176,13 @@ impl<'a, T, W, C, N: Location<'a>> Stream { ) } + pub fn flatten(self) -> Stream + where + T: IntoIterator, + { + self.flat_map(q!(|d| d)) + } + pub fn filter bool + 'a>( self, f: impl IntoQuotedMut<'a, F>, diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 897727de986d..a1a0ec690594 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -370,6 +370,7 @@ fn bench_client<'a>( ))) .union(c_latency_reset.into_stream()) .all_ticks() + .flatten() .fold( // Create window with ring buffer using vec + wraparound index // TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size @@ -378,31 +379,20 @@ fn bench_client<'a>( median_latency_window_size ))), 0usize, - false )), - q!(move |(latencies, write_index, has_any_value), latency| { + q!(move |(latencies, write_index), latency| { let mut latencies_mut = latencies.borrow_mut(); - if let Some(latency) = latency { - // Insert into latencies - if let Some(prev_latency) = latencies_mut.get_mut(*write_index) { - *prev_latency = latency; - } else { - latencies_mut.push(latency); - } - *has_any_value = true; - // Increment write index and wrap around - *write_index += 1; - if *write_index == median_latency_window_size { - *write_index = 0; - } + if *write_index < latencies_mut.len() { + latencies_mut[*write_index] = latency; } else { - // reset latencies - latencies_mut.clear(); - *write_index = 0; - *has_any_value = false; + latencies_mut.push(latency); } + // Increment write index and wrap around + *write_index = (*write_index + 1) % median_latency_window_size; }), - ); + ) + .map(q!(|(latencies, _)| latencies)); + let c_throughput_new_batch = c_received_quorum_payloads .clone() .count() @@ -419,37 +409,30 @@ fn bench_client<'a>( .union(c_throughput_reset) .all_ticks() .fold( - q!(|| (0, 0)), - q!(|(total, num_ticks), (batch_size, reset)| { + q!(|| 0), + q!(|total, (batch_size, reset)| { if reset { *total = 0; - *num_ticks = 0; } else { - *total += batch_size as u32; - *num_ticks += 1; + *total += batch_size; } }), ); - c_stats_output_timer - .cross_singleton(c_latencies) + c_latencies .cross_singleton(c_throughput) - .tick_samples() - .for_each(q!(move |( - (_, (latencies, _write_index, has_any_value)), - (throughput, num_ticks), - )| { + .latest_tick() + .continue_if(c_stats_output_timer.latest_tick()) + .all_ticks() + .for_each(q!(move |(latencies, throughput)| { let mut latencies_mut = latencies.borrow_mut(); - let median_latency = if has_any_value { - let (_, median, _) = - latencies_mut.select_nth_unstable(median_latency_window_size / 2); - *median - } else { - 0 - }; - println!("Median latency: {}ms", median_latency as f64 / 1000.0); + if latencies_mut.len() > 0 { + let middle_idx = latencies_mut.len() / 2; + let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx); + println!("Median latency: {}ms", (*median) as f64 / 1000.0); + } + println!("Throughput: {} requests/s", throughput); - println!("Num ticks per second: {}", num_ticks); })); // End track statistics c_to_proposers diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index c2f43c08201f..f1f81755fdd1 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -1361,92 +1361,104 @@ expression: built.ir() ), }, ForEach { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((() , (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize , bool)) , (u32 , i32)) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size = 1usize ; move | ((_ , (latencies , _write_index , has_any_value)) , (throughput , num_ticks) ,) | { let mut latencies_mut = latencies . borrow_mut () ; let median_latency = if has_any_value { let (_ , median , _) = latencies_mut . select_nth_unstable (median_latency_window_size / 2) ; * median } else { 0 } ; println ! ("Median latency: {}ms" , median_latency as f64 / 1000.0) ; println ! ("Throughput: {} requests/s" , throughput) ; println ! ("Num ticks per second: {}" , num_ticks) ; } }), - input: CrossSingleton( - CrossSingleton( - Tee { - inner: : Source { - source: Interval( - { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) }, - ), - location_kind: Cluster( - 0, - ), - }, - }, - Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size = 1usize ; move | | (Rc :: new (RefCell :: new (Vec :: < u128 > :: with_capacity (median_latency_window_size))) , 0usize , false) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize , bool) , core :: option :: Option < u128 > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size = 1usize ; move | (latencies , write_index , has_any_value) , latency | { let mut latencies_mut = latencies . borrow_mut () ; if let Some (latency) = latency { if let Some (prev_latency) = latencies_mut . get_mut (* write_index) { * prev_latency = latency ; } else { latencies_mut . push (latency) ; } * has_any_value = true ; * write_index += 1 ; if * write_index == median_latency_window_size { * write_index = 0 ; } } else { latencies_mut . clear () ; * write_index = 0 ; * has_any_value = false ; } } }), - input: Persist( - Union( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), - input: Join( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - DeferTick( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), - input: Tee { - inner: , - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; move | (latencies , throughput) | { let mut latencies_mut = latencies . borrow_mut () ; if latencies_mut . len () > 0 { let middle_idx = latencies_mut . len () / 2 ; let (_ , median , _) = latencies_mut . select_nth_unstable (middle_idx) ; println ! ("Median latency: {}ms" , (* median) as f64 / 1000.0) ; } println ! ("Throughput: {} requests/s" , throughput) ; } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , ()) , (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + CrossSingleton( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (latencies , _) | latencies }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size = 1usize ; move | | (Rc :: new (RefCell :: new (Vec :: < u128 > :: with_capacity (median_latency_window_size))) , 0usize ,) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < u128 > > > , usize) , u128 , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let median_latency_window_size = 1usize ; move | (latencies , write_index) , latency | { let mut latencies_mut = latencies . borrow_mut () ; if * write_index < latencies_mut . len () { latencies_mut [* write_index] = latency ; } else { latencies_mut . push (latency) ; } * write_index = (* write_index + 1) % median_latency_window_size ; } }), + input: Persist( + FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < u128 > , core :: option :: Option < u128 > > ({ use hydroflow_plus :: __staged :: stream :: * ; | d | d }), + input: Union( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), + input: Join( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, + DeferTick( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), + input: Tee { + inner: : Source { + source: Interval( + { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) }, + ), + location_kind: Cluster( + 0, + ), + }, + }, + }, + ), + ), }, ), - ), - ), - }, - ), - Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , i32) , (usize , bool) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (total , num_ticks) , (batch_size , reset) | { if reset { * total = 0 ; * num_ticks = 0 ; } else { * total += batch_size as u32 ; * num_ticks += 1 ; } } }), - input: Persist( - Union( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | batch_size | (batch_size , false) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { + }, + }, + Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (usize , bool) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | total , (batch_size , reset) | { if reset { * total = 0 ; } else { * total += batch_size ; } } }), + input: Persist( + Union( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | batch_size | (batch_size , false) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , + }, + }, }, }, + ), + }, + }, + DeferTick( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), + input: Tee { + inner: , }, }, ), - }, - }, - DeferTick( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), - input: Tee { - inner: , - }, - }, + ), ), - ), + }, ), - }, - ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: , + }, + }, + ), + }, }, CycleSink { ident: Ident {