From 8f94f14544cd844b02f30ddb222be436cb5f6656 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Wed, 29 Jan 2025 11:11:54 -0800 Subject: [PATCH] refactor(hydro_lang)!: rename timestamp to atomic and provide batching shortcuts --- .../live-collections/bounded-unbounded.md | 4 +- docs/docs/hydro/ticks-atomicity/atomicity.md | 2 +- hydro_lang/Cargo.toml | 2 +- hydro_lang/src/lib.rs | 8 +- hydro_lang/src/location/mod.rs | 2 +- hydro_lang/src/location/tick.rs | 33 ++--- hydro_lang/src/optional.rs | 76 ++++++---- hydro_lang/src/rewrites/persist_pullup.rs | 8 +- hydro_lang/src/rewrites/properties.rs | 3 +- hydro_lang/src/singleton.rs | 117 +++++++-------- hydro_lang/src/stream.rs | 139 ++++++++---------- hydro_std/src/quorum.rs | 20 +-- hydro_std/src/request_response.rs | 6 +- hydro_test/src/cluster/bench_client.rs | 14 +- hydro_test/src/cluster/kv_replica.rs | 16 +- hydro_test/src/cluster/map_reduce.rs | 5 +- hydro_test/src/cluster/paxos.rs | 60 +++----- hydro_test/src/cluster/paxos_bench.rs | 4 +- hydro_test/src/cluster/paxos_with_client.rs | 4 +- ...cluster__paxos_bench__tests__paxos_ir.snap | 63 +++----- hydro_test/src/cluster/two_pc.rs | 2 +- hydro_test_local/src/local/chat_app.rs | 9 +- hydro_test_local/src/local/compute_pi.rs | 3 +- hydro_test_local/src/local/count_elems.rs | 2 +- .../src/local/graph_reachability.rs | 7 +- hydro_test_local/src/local/negation.rs | 18 +-- hydro_test_local/src/local/teed_join.rs | 5 +- stageleft/src/type_name.rs | 2 + 28 files changed, 275 insertions(+), 359 deletions(-) diff --git a/docs/docs/hydro/live-collections/bounded-unbounded.md b/docs/docs/hydro/live-collections/bounded-unbounded.md index a60180fc3d2..6e62674e785 100644 --- a/docs/docs/hydro/live-collections/bounded-unbounded.md +++ b/docs/docs/hydro/live-collections/bounded-unbounded.md @@ -18,7 +18,7 @@ In some cases, you may need to convert between bounded and unbounded collections # let tick = process.tick(); # let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); let input: Stream<_, _, Bounded> = // ... -# unsafe { numbers.timestamped(&tick).tick_batch() }; +# unsafe { numbers.tick_batch(&tick) }; let unbounded: Stream<_, _, Unbounded> = input.into(); ``` @@ -43,6 +43,6 @@ let unbounded_input = // ... # process.source_iter(q!(vec![1, 2, 3, 4])); let tick = process.tick(); let batch: Stream<_, _, Bounded> = unsafe { - unbounded_input.timestamped(&tick).tick_batch() + unbounded_input.tick_batch(&tick) }; ``` diff --git a/docs/docs/hydro/ticks-atomicity/atomicity.md b/docs/docs/hydro/ticks-atomicity/atomicity.md index 3c45ab26f99..4be9a9e692c 100644 --- a/docs/docs/hydro/ticks-atomicity/atomicity.md +++ b/docs/docs/hydro/ticks-atomicity/atomicity.md @@ -2,7 +2,7 @@ sidebar_position: 3 --- -# Atomicity and Timestamps +# Atomicity :::caution The Hydro documentation is currently under active development! This page is a placeholder for future content. 
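The two documentation snippets above now call the `tick_batch(&tick)` shortcut directly on an unbounded stream instead of going through `timestamped(&tick)`. A minimal end-to-end sketch of that pattern, assuming the `hydro_lang` prelude and a `FlowBuilder` as in the crate's doc tests (the function and variable names here are illustrative and not part of this diff):

use hydro_lang::*;

fn batching_example<'a>(flow: &FlowBuilder<'a>) {
    let process = flow.process::<()>();
    let tick = process.tick();
    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));

    // Unbounded -> Bounded: take one batch per tick with the new shortcut.
    let batch: Stream<_, _, Bounded> = unsafe {
        // SAFETY: batch boundaries are non-deterministic, as the docs above note.
        numbers.tick_batch(&tick)
    };

    // Bounded -> Unbounded: the weaker guarantee is always safe to assume.
    let _unbounded: Stream<_, _, Unbounded> = batch.into();
}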
diff --git a/hydro_lang/Cargo.toml b/hydro_lang/Cargo.toml index c102a46d01c..1a7fb564221 100644 --- a/hydro_lang/Cargo.toml +++ b/hydro_lang/Cargo.toml @@ -40,7 +40,6 @@ syn = { version = "2.0.46", features = [ "parsing", "extra-traits", "visit-mut" tokio = { version = "1.29.0", features = [ "full" ] } toml = { version = "0.8.0", optional = true } trybuild-internals-api = { version = "1.0.99", optional = true } -ctor = "0.2" [build-dependencies] stageleft_tool = { path = "../stageleft_tool", version = "^0.5.0" } @@ -48,6 +47,7 @@ stageleft_tool = { path = "../stageleft_tool", version = "^0.5.0" } [dev-dependencies] async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] } hydro_deploy = { path = "../hydro_deploy/core", version = "^0.11.0" } +ctor = "0.2" insta = "1.39" tokio-test = "0.4.4" trybuild = "1" diff --git a/hydro_lang/src/lib.rs b/hydro_lang/src/lib.rs index aaad68cb902..47c2a98fd38 100644 --- a/hydro_lang/src/lib.rs +++ b/hydro_lang/src/lib.rs @@ -25,7 +25,7 @@ pub use optional::Optional; pub mod location; pub use location::cluster::CLUSTER_SELF_ID; -pub use location::{Cluster, ClusterId, ExternalProcess, Location, Process, Tick, Timestamped}; +pub use location::{Atomic, Cluster, ClusterId, ExternalProcess, Location, Process, Tick}; #[cfg(feature = "build")] pub mod deploy; @@ -46,12 +46,6 @@ mod staging_util; #[cfg(feature = "deploy")] pub mod test_util; -#[ctor::ctor] -fn add_private_reexports() { - stageleft::add_private_reexport(vec!["tokio", "time", "instant"], vec!["tokio", "time"]); - stageleft::add_private_reexport(vec!["bytes", "bytes"], vec!["bytes"]); -} - #[stageleft::runtime] #[cfg(test)] mod test_init { diff --git a/hydro_lang/src/location/mod.rs b/hydro_lang/src/location/mod.rs index f8902476ed3..4ec66acb5cb 100644 --- a/hydro_lang/src/location/mod.rs +++ b/hydro_lang/src/location/mod.rs @@ -25,7 +25,7 @@ pub mod can_send; pub use can_send::CanSend; pub mod tick; -pub use tick::{NoTick, Tick, Timestamped}; +pub use tick::{Atomic, NoTick, Tick}; #[derive(PartialEq, Eq, Clone, Debug, Hash)] pub enum LocationId { diff --git a/hydro_lang/src/location/tick.rs b/hydro_lang/src/location/tick.rs index a5f6f7a90c7..094cd075f81 100644 --- a/hydro_lang/src/location/tick.rs +++ b/hydro_lang/src/location/tick.rs @@ -21,20 +21,20 @@ impl NoTick for Process<'_, T> {} impl NoTick for Cluster<'_, T> {} #[sealed] -pub trait NoTimestamp {} +pub trait NoAtomic {} #[sealed] -impl NoTimestamp for Process<'_, T> {} +impl NoAtomic for Process<'_, T> {} #[sealed] -impl NoTimestamp for Cluster<'_, T> {} +impl NoAtomic for Cluster<'_, T> {} #[sealed] -impl<'a, L: Location<'a>> NoTimestamp for Tick {} +impl<'a, L: Location<'a>> NoAtomic for Tick {} #[derive(Clone)] -pub struct Timestamped { +pub struct Atomic { pub(crate) tick: Tick, } -impl<'a, L: Location<'a>> Location<'a> for Timestamped { +impl<'a, L: Location<'a>> Location<'a> for Atomic { type Root = L::Root; fn root(&self) -> Self::Root { @@ -55,7 +55,7 @@ impl<'a, L: Location<'a>> Location<'a> for Timestamped { } #[sealed] -impl NoTick for Timestamped {} +impl NoTick for Atomic {} /// Marks the stream as being inside the single global clock domain. 
#[derive(Clone)] @@ -102,19 +102,18 @@ impl<'a, L: Location<'a>> Tick { batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a, ) -> Stream<(), Self, Bounded> where - L: NoTick + NoTimestamp, + L: NoTick + NoAtomic, { let out = self .l .spin() .flat_map_ordered(q!(move |_| 0..batch_size)) - .map(q!(|_| ())) - .timestamped(self); + .map(q!(|_| ())); unsafe { // SAFETY: at runtime, `spin` produces a single value per tick, // so each batch is guaranteed to be the same size. - out.tick_batch() + out.tick_batch(self) } } @@ -123,11 +122,11 @@ impl<'a, L: Location<'a>> Tick { e: impl QuotedWithContext<'a, T, L>, ) -> Singleton where - L: NoTick, + L: NoTick + NoAtomic, { unsafe { // SAFETY: a top-level singleton produces the same value each tick - self.outer().singleton(e).timestamped(self).latest_tick() + self.outer().singleton(e).latest_tick(self) } } @@ -136,7 +135,7 @@ impl<'a, L: Location<'a>> Tick { e: impl QuotedWithContext<'a, T, Tick>, ) -> Optional where - L: NoTick, + L: NoTick + NoAtomic, { let e_arr = q!([e]); let e = e_arr.splice_untyped_ctx(self); @@ -185,9 +184,7 @@ impl<'a, L: Location<'a>> Tick { ) } - pub fn forward_ref_timestamped< - S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped>, - >( + pub fn forward_ref_atomic>>( &self, ) -> (ForwardRef<'a, S>, S) { let next_id = { @@ -214,7 +211,7 @@ impl<'a, L: Location<'a>> Tick { expected_location: self.id(), _phantom: PhantomData, }, - S::create_source(ident, Timestamped { tick: self.clone() }), + S::create_source(ident, Atomic { tick: self.clone() }), ) } diff --git a/hydro_lang/src/optional.rs b/hydro_lang/src/optional.rs index b75ae8843f5..77fc852b648 100644 --- a/hydro_lang/src/optional.rs +++ b/hydro_lang/src/optional.rs @@ -9,7 +9,7 @@ use syn::parse_quote; use crate::builder::FLOW_USED_MESSAGE; use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker}; use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode}; -use crate::location::tick::{NoTimestamp, Timestamped}; +use crate::location::tick::{Atomic, NoAtomic}; use crate::location::{check_matching_location, LocationId, NoTick}; use crate::singleton::ZipResult; use crate::stream::NoOrder; @@ -220,7 +220,7 @@ impl<'a, T, L: Location<'a>, B> Optional { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let optional = tick.optional_first_tick(q!(1)); - /// optional.map(q!(|v| v + 1)).all_ticks().drop_timestamp() + /// optional.map(q!(|v| v + 1)).all_ticks() /// # }, |mut stream| async move { /// // 2 /// # assert_eq!(stream.next().await.unwrap(), 2); @@ -475,10 +475,10 @@ impl<'a, T, L: Location<'a>> Optional { } } -impl<'a, T, L: Location<'a> + NoTick, B> Optional, B> { - /// Given a tick, returns a optional value corresponding to a snapshot of the optional - /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all - /// relevant data that contributed to the snapshot at tick `t`. +impl<'a, T, L: Location<'a> + NoTick, B> Optional, B> { + /// Returns an optional value corresponding to the latest snapshot of the optional + /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include + /// at least all relevant data that contributed to the snapshot at tick `t`. 
/// /// # Safety /// Because this picks a snapshot of a optional whose value is continuously changing, @@ -494,17 +494,26 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional, B> { ) } - pub fn drop_timestamp(self) -> Optional { + pub fn end_atomic(self) -> Optional { Optional::new(self.location.tick.l, self.ir_node.into_inner()) } } -impl<'a, T, L: Location<'a> + NoTick, B> Optional { - pub fn timestamped(self, tick: &Tick) -> Optional, B> { - Optional::new( - Timestamped { tick: tick.clone() }, - self.ir_node.into_inner(), - ) +impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Optional { + pub fn atomic(self, tick: &Tick) -> Optional, B> { + Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner()) + } + + /// Given a tick, returns a optional value corresponding to a snapshot of the optional + /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all + /// relevant data that contributed to the snapshot at tick `t`. + /// + /// # Safety + /// Because this picks a snapshot of a optional whose value is continuously changing, + /// the output optional has a non-deterministic value since the snapshot can be at an + /// arbitrary point in time. + pub unsafe fn latest_tick(self, tick: &Tick) -> Optional, Bounded> { + unsafe { self.atomic(tick).latest_tick() } } /// Eagerly samples the optional as fast as possible, returning a stream of snapshots @@ -519,10 +528,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional { unsafe { // SAFETY: source of intentional non-determinism - self.timestamped(&tick) - .latest_tick() - .all_ticks() - .drop_timestamp() + self.latest_tick(&tick).all_ticks() } } @@ -540,7 +546,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional { interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, ) -> Stream where - L: NoTimestamp, + L: NoAtomic, { let samples = unsafe { // SAFETY: source of intentional non-determinism @@ -550,19 +556,27 @@ impl<'a, T, L: Location<'a> + NoTick, B> Optional { unsafe { // SAFETY: source of intentional non-determinism - self.timestamped(&tick) - .latest_tick() - .continue_if(samples.timestamped(&tick).tick_batch().first()) + self.latest_tick(&tick) + .continue_if(samples.tick_batch(&tick).first()) .all_ticks() - .drop_timestamp() } } } impl<'a, T, L: Location<'a>> Optional, Bounded> { - pub fn all_ticks(self) -> Stream, Unbounded> { + pub fn all_ticks(self) -> Stream { Stream::new( - Timestamped { + self.location.outer().clone(), + HydroNode::Persist { + inner: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + pub fn all_ticks_atomic(self) -> Stream, Unbounded> { + Stream::new( + Atomic { tick: self.location.clone(), }, HydroNode::Persist { @@ -572,9 +586,19 @@ impl<'a, T, L: Location<'a>> Optional, Bounded> { ) } - pub fn latest(self) -> Optional, Unbounded> { + pub fn latest(self) -> Optional { + Optional::new( + self.location.outer().clone(), + HydroNode::Persist { + inner: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + pub fn latest_atomic(self) -> Optional, Unbounded> { Optional::new( - Timestamped { + Atomic { tick: self.location.clone(), }, HydroNode::Persist { diff --git a/hydro_lang/src/rewrites/persist_pullup.rs b/hydro_lang/src/rewrites/persist_pullup.rs index a4429d7a067..9c97f062a9b 100644 --- a/hydro_lang/src/rewrites/persist_pullup.rs +++ b/hydro_lang/src/rewrites/persist_pullup.rs @@ -228,13 +228,7 @@ mod tests { let process = flow.process::<()>(); let tick 
= process.tick(); - let before_tee = unsafe { - process - .source_iter(q!(0..10)) - .timestamped(&tick) - .tick_batch() - .persist() - }; + let before_tee = unsafe { process.source_iter(q!(0..10)).tick_batch(&tick).persist() }; before_tee .clone() diff --git a/hydro_lang/src/rewrites/properties.rs b/hydro_lang/src/rewrites/properties.rs index e6516d0ef87..66fe0318ab9 100644 --- a/hydro_lang/src/rewrites/properties.rs +++ b/hydro_lang/src/rewrites/properties.rs @@ -121,8 +121,7 @@ mod tests { process .source_iter(q!(vec![])) .map(q!(|string: String| (string, ()))) - .timestamped(&tick) - .tick_batch() + .tick_batch(&tick) } .fold_keyed(q!(|| 0), counter_func) .all_ticks() diff --git a/hydro_lang/src/singleton.rs b/hydro_lang/src/singleton.rs index c5e8d00dea7..8bb57f3f30a 100644 --- a/hydro_lang/src/singleton.rs +++ b/hydro_lang/src/singleton.rs @@ -11,7 +11,7 @@ use crate::cycle::{ TickCycleMarker, }; use crate::ir::{HydroLeaf, HydroNode, TeeNode}; -use crate::location::tick::{NoTimestamp, Timestamped}; +use crate::location::tick::{Atomic, NoAtomic}; use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick}; use crate::{Bounded, Optional, Stream, Unbounded}; @@ -350,10 +350,10 @@ impl<'a, T, L: Location<'a>, B> Singleton { } } -impl<'a, T, L: Location<'a> + NoTick, B> Singleton, B> { - /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton - /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all - /// relevant data that contributed to the snapshot at tick `t`. +impl<'a, T, L: Location<'a> + NoTick, B> Singleton, B> { + /// Returns a singleton value corresponding to the latest snapshot of the singleton + /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include + /// at least all relevant data that contributed to the snapshot at tick `t`. /// /// # Safety /// Because this picks a snapshot of a singleton whose value is continuously changing, @@ -369,17 +369,29 @@ impl<'a, T, L: Location<'a> + NoTick, B> Singleton, B> { ) } - pub fn drop_timestamp(self) -> Optional { + pub fn end_atomic(self) -> Optional { Optional::new(self.location.tick.l, self.ir_node.into_inner()) } } -impl<'a, T, L: Location<'a> + NoTick, B> Singleton { - pub fn timestamped(self, tick: &Tick) -> Singleton, B> { - Singleton::new( - Timestamped { tick: tick.clone() }, - self.ir_node.into_inner(), - ) +impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B> Singleton { + pub fn atomic(self, tick: &Tick) -> Singleton, B> { + Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner()) + } + + /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton + /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all + /// relevant data that contributed to the snapshot at tick `t`. + /// + /// # Safety + /// Because this picks a snapshot of a singleton whose value is continuously changing, + /// the output singleton has a non-deterministic value since the snapshot can be at an + /// arbitrary point in time. 
+ pub unsafe fn latest_tick(self, tick: &Tick) -> Singleton, Bounded> + where + L: NoTick, + { + unsafe { self.atomic(tick).latest_tick() } } /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots @@ -394,10 +406,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Singleton { unsafe { // SAFETY: source of intentional non-determinism - self.timestamped(&tick) - .latest_tick() - .all_ticks() - .drop_timestamp() + self.latest_tick(&tick).all_ticks() } } @@ -415,7 +424,7 @@ impl<'a, T, L: Location<'a> + NoTick, B> Singleton { interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, ) -> Stream where - L: NoTimestamp, + L: NoAtomic, { let samples = unsafe { // SAFETY: source of intentional non-determinism @@ -425,19 +434,27 @@ impl<'a, T, L: Location<'a> + NoTick, B> Singleton { unsafe { // SAFETY: source of intentional non-determinism - self.timestamped(&tick) - .latest_tick() - .continue_if(samples.timestamped(&tick).tick_batch().first()) + self.latest_tick(&tick) + .continue_if(samples.tick_batch(&tick).first()) .all_ticks() - .drop_timestamp() } } } impl<'a, T, L: Location<'a>> Singleton, Bounded> { - pub fn all_ticks(self) -> Stream, Unbounded> { + pub fn all_ticks(self) -> Stream { Stream::new( - Timestamped { + self.location.outer().clone(), + HydroNode::Persist { + inner: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + pub fn all_ticks_atomic(self) -> Stream, Unbounded> { + Stream::new( + Atomic { tick: self.location.clone(), }, HydroNode::Persist { @@ -447,9 +464,19 @@ impl<'a, T, L: Location<'a>> Singleton, Bounded> { ) } - pub fn latest(self) -> Singleton, Unbounded> { + pub fn latest(self) -> Singleton { + Singleton::new( + self.location.outer().clone(), + HydroNode::Persist { + inner: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + pub fn latest_atomic(self) -> Singleton, Unbounded> { Singleton::new( - Timestamped { + Atomic { tick: self.location.clone(), }, HydroNode::Persist { @@ -505,46 +532,6 @@ pub trait ZipResult<'a, Other> { fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out; } -impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton, B>> - for Singleton, B> -{ - type Out = Singleton<(T, U), Timestamped, B>; - type ElementType = (T, U); - type Location = Timestamped; - - fn other_location(other: &Singleton, B>) -> Timestamped { - other.location.clone() - } - - fn other_ir_node(other: Singleton, B>) -> HydroNode { - other.ir_node.into_inner() - } - - fn make(location: Timestamped, ir_node: HydroNode) -> Self::Out { - Singleton::new(location, ir_node) - } -} - -impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional, B>> - for Singleton, B> -{ - type Out = Optional<(T, U), Timestamped, B>; - type ElementType = (T, U); - type Location = Timestamped; - - fn other_location(other: &Optional, B>) -> Timestamped { - other.location.clone() - } - - fn other_ir_node(other: Optional, B>) -> HydroNode { - other.ir_node.into_inner() - } - - fn make(location: Timestamped, ir_node: HydroNode) -> Self::Out { - Optional::new(location, ir_node) - } -} - impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton, B>> for Singleton, B> { diff --git a/hydro_lang/src/stream.rs b/hydro_lang/src/stream.rs index 0d902f3cf66..6cc9f9210f6 100644 --- a/hydro_lang/src/stream.rs +++ b/hydro_lang/src/stream.rs @@ -16,7 +16,7 @@ use crate::builder::FLOW_USED_MESSAGE; use crate::cycle::{CycleCollection, 
CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker}; use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode, TeeNode}; use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort}; -use crate::location::tick::{NoTimestamp, Timestamped}; +use crate::location::tick::{Atomic, NoAtomic}; use crate::location::{ check_matching_location, CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, }; @@ -478,11 +478,10 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// let batch = unsafe { /// process /// .source_iter(q!(vec![1, 2, 3, 4])) - /// .timestamped(&tick) - /// .tick_batch() + /// .tick_batch(&tick) /// }; /// let count = batch.clone().count(); - /// batch.cross_singleton(count).all_ticks().drop_timestamp() + /// batch.cross_singleton(count).all_ticks() /// # }, |mut stream| async move { /// // (1, 4), (2, 4), (3, 4), (4, 4) /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] { @@ -664,11 +663,10 @@ where /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { numbers.tick_batch(&tick) }; /// batch /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x)) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // 10 /// # assert_eq!(stream.next().await.unwrap(), 10); @@ -716,11 +714,10 @@ where /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { numbers.tick_batch(&tick) }; /// batch /// .reduce_commutative(q!(|curr, new| *curr += new)) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // 10 /// # assert_eq!(stream.next().await.unwrap(), 10); @@ -757,8 +754,8 @@ where /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.max().all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.max().all_ticks() /// # }, |mut stream| async move { /// // 4 /// # assert_eq!(stream.next().await.unwrap(), 4); @@ -786,8 +783,8 @@ where /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.max_by_key(q!(|x| -x)).all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.max_by_key(q!(|x| -x)).all_ticks() /// # }, |mut stream| async move { /// // 1 /// # assert_eq!(stream.next().await.unwrap(), 1); @@ -834,8 +831,8 @@ where /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.min().all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.min().all_ticks() /// # }, |mut stream| async move { /// // 1 /// # assert_eq!(stream.next().await.unwrap(), 1); @@ -861,8 +858,8 @@ where /// # 
tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.count().all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.count().all_ticks() /// # }, |mut stream| async move { /// // 4 /// # assert_eq!(stream.next().await.unwrap(), 4); @@ -915,8 +912,8 @@ impl<'a, T, L: Location<'a>, B> Stream { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.first().all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.first().all_ticks() /// # }, |mut stream| async move { /// // 1 /// # assert_eq!(stream.next().await.unwrap(), 1); @@ -939,8 +936,8 @@ impl<'a, T, L: Location<'a>, B> Stream { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.last().all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.last().all_ticks() /// # }, |mut stream| async move { /// // 4 /// # assert_eq!(stream.next().await.unwrap(), 4); @@ -964,11 +961,10 @@ impl<'a, T, L: Location<'a>, B> Stream { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"])); - /// let batch = unsafe { words.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { words.tick_batch(&tick) }; /// batch /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x))) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // "HELLOWORLD" /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD"); @@ -1016,12 +1012,11 @@ impl<'a, T, L: Location<'a>, B> Stream { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"])); - /// let batch = unsafe { words.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { words.tick_batch(&tick) }; /// batch /// .map(q!(|x| x.to_string())) /// .reduce(q!(|curr, new| curr.push_str(&new))) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // "HELLOWORLD" /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD"); @@ -1049,7 +1044,7 @@ impl<'a, T, L: Location<'a>, B> Stream { } } -impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, O> Stream { +impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O> Stream { /// Produces a new stream that interleaves the elements of the two input streams. /// The result has [`NoOrder`] because the order of interleaving is not guaranteed. 
/// @@ -1075,17 +1070,10 @@ impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, O> Stream() - .chain( - other - .timestamped(&tick) - .tick_batch() - .assume_ordering::(), - ) + .chain(other.tick_batch(&tick).assume_ordering::()) .all_ticks() - .drop_timestamp() .assume_ordering() } } @@ -1106,8 +1094,8 @@ impl<'a, T, L: Location<'a>, Order> Stream { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch.sort().all_ticks().drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.sort().all_ticks() /// # }, |mut stream| async move { /// // 1, 2, 3, 4 /// # for w in (1..5) { @@ -1144,13 +1132,8 @@ impl<'a, T, L: Location<'a>, Order> Stream { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch - /// .clone() - /// .map(q!(|x| x + 1)) - /// .chain(batch) - /// .all_ticks() - /// .drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks() /// # }, |mut stream| async move { /// // 2, 3, 4, 5, 1, 2, 3, 4 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] { @@ -1233,11 +1216,10 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick, Bounded> { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { numbers.tick_batch(&tick) }; /// batch /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x)) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // (1, 5), (2, 7) /// # assert_eq!(stream.next().await.unwrap(), (1, 5)); @@ -1279,11 +1261,8 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick, Bounded> { /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; - /// batch - /// .reduce_keyed(q!(|acc, x| *acc += x)) - /// .all_ticks() - /// .drop_timestamp() + /// let batch = unsafe { numbers.tick_batch(&tick) }; + /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks() /// # }, |mut stream| async move { /// // (1, 5), (2, 7) /// # assert_eq!(stream.next().await.unwrap(), (1, 5)); @@ -1324,11 +1303,10 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounde /// # tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { numbers.tick_batch(&tick) }; /// batch /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x)) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // (1, 5), (2, 7) /// # assert_eq!(stream.next().await.unwrap(), (1, 5)); @@ -1375,11 +1353,10 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounde /// # 
tokio_test::block_on(test_util::stream_transform_test(|process| { /// let tick = process.tick(); /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)])); - /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; + /// let batch = unsafe { numbers.tick_batch(&tick) }; /// batch /// .reduce_keyed_commutative(q!(|acc, x| *acc += x)) /// .all_ticks() - /// .drop_timestamp() /// # }, |mut stream| async move { /// // (1, 5), (2, 7) /// # assert_eq!(stream.next().await.unwrap(), (1, 5)); @@ -1403,10 +1380,10 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounde } } -impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream, B, Order> { - /// Given a tick, returns a stream corresponding to a batch of elements for that tick. - /// These batches are guaranteed to be contiguous across ticks and preserve the order - /// of the input. +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream, B, Order> { + /// Returns a stream corresponding to the latest batch of elements being atomically + /// processed. These batches are guaranteed to be contiguous across ticks and preserve + /// the order of the input. /// /// # Safety /// The batch boundaries are non-deterministic and may change across executions. @@ -1420,21 +1397,28 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream, B, Ord ) } - pub fn drop_timestamp(self) -> Stream { + pub fn end_atomic(self) -> Stream { Stream::new(self.location.tick.l, self.ir_node.into_inner()) } - pub fn timestamp_source(&self) -> Tick { + pub fn atomic_source(&self) -> Tick { self.location.tick.clone() } } -impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream { - pub fn timestamped(self, tick: &Tick) -> Stream, B, Order> { - Stream::new( - Timestamped { tick: tick.clone() }, - self.ir_node.into_inner(), - ) +impl<'a, T, L: Location<'a> + NoTick + NoAtomic, B, Order> Stream { + pub fn atomic(self, tick: &Tick) -> Stream, B, Order> { + Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner()) + } + + /// Given a tick, returns a stream corresponding to a batch of elements segmented by + /// that tick. These batches are guaranteed to be contiguous across ticks and preserve + /// the order of the input. + /// + /// # Safety + /// The batch boundaries are non-deterministic and may change across executions. 
+ pub unsafe fn tick_batch(self, tick: &Tick) -> Stream, Bounded, Order> { + unsafe { self.atomic(tick).tick_batch() } } /// Given a time interval, returns a stream corresponding to samples taken from the @@ -1457,11 +1441,9 @@ impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream + NoTick + NoTimestamp, B, Order> Stream + NoTick + NoTimestamp, B, Order> Stream + NoTick, B, Order> Stream { } impl<'a, T, L: Location<'a>, Order> Stream, Bounded, Order> { - pub fn all_ticks(self) -> Stream, Unbounded, Order> { + pub fn all_ticks(self) -> Stream { + Stream::new( + self.location.outer().clone(), + HydroNode::Persist { + inner: Box::new(self.ir_node.into_inner()), + metadata: self.location.new_node_metadata::(), + }, + ) + } + + pub fn all_ticks_atomic(self) -> Stream, Unbounded, Order> { Stream::new( - Timestamped { + Atomic { tick: self.location.clone(), }, HydroNode::Persist { diff --git a/hydro_std/src/quorum.rs b/hydro_std/src/quorum.rs index beae281c90b..4b7466e3ea8 100644 --- a/hydro_std/src/quorum.rs +++ b/hydro_std/src/quorum.rs @@ -12,14 +12,14 @@ pub fn collect_quorum_with_response< V: Clone, E: Clone, >( - responses: Stream<(K, Result), Timestamped, Unbounded, Order>, + responses: Stream<(K, Result), Atomic, Unbounded, Order>, min: usize, max: usize, ) -> ( - Stream<(K, V), Timestamped, Unbounded, Order>, - Stream<(K, E), Timestamped, Unbounded, Order>, + Stream<(K, V), Atomic, Unbounded, Order>, + Stream<(K, E), Atomic, Unbounded, Order>, ) { - let tick = responses.timestamp_source(); + let tick = responses.atomic_source(); let (not_all_complete_cycle, not_all) = tick.cycle::>(); let current_responses = not_all.chain(unsafe { @@ -91,7 +91,7 @@ pub fn collect_quorum_with_response< Ok(v) => Some((key, v)), Err(_) => None, })) - .all_ticks(), + .all_ticks_atomic(), responses.filter_map(q!(move |(key, res)| match res { Ok(_) => None, Err(e) => Some((key, e)), @@ -101,14 +101,14 @@ pub fn collect_quorum_with_response< #[expect(clippy::type_complexity, reason = "stream types with ordering")] pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>( - responses: Stream<(K, Result<(), E>), Timestamped, Unbounded, Order>, + responses: Stream<(K, Result<(), E>), Atomic, Unbounded, Order>, min: usize, max: usize, ) -> ( - Stream, Unbounded, Order>, - Stream<(K, E), Timestamped, Unbounded, Order>, + Stream, Unbounded, Order>, + Stream<(K, E), Atomic, Unbounded, Order>, ) { - let tick = responses.timestamp_source(); + let tick = responses.atomic_source(); let (not_all_complete_cycle, not_all) = tick.cycle::>(); let current_responses = not_all.chain(unsafe { @@ -169,7 +169,7 @@ pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, }; ( - just_reached_quorum.all_ticks(), + just_reached_quorum.all_ticks_atomic(), responses.filter_map(q!(move |(key, res)| match res { Ok(_) => None, Err(e) => Some((key, e)), diff --git a/hydro_std/src/request_response.rs b/hydro_std/src/request_response.rs index 7532fd88996..65328a03caa 100644 --- a/hydro_std/src/request_response.rs +++ b/hydro_std/src/request_response.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use hydro_lang::*; use location::NoTick; -type JoinResponses = Stream<(K, (M, V)), Timestamped, Unbounded, NoOrder>; +type JoinResponses = Stream<(K, (M, V)), Atomic, Unbounded, NoOrder>; /// Given an incoming stream of request-response responses, joins with metadata generated /// at request time that is stored in-memory. 
@@ -13,7 +13,7 @@ type JoinResponses = Stream<(K, (M, V)), Timestamped, Unbounded, /// key, same for the metadata stream. pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>( tick: &Tick, - responses: Stream<(K, V), Timestamped, Unbounded, NoOrder>, + responses: Stream<(K, V), Atomic, Unbounded, NoOrder>, metadata: Stream<(K, M), Tick, Bounded, NoOrder>, ) -> JoinResponses { let (remaining_to_join_complete_cycle, remaining_to_join) = @@ -38,5 +38,5 @@ pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location< remaining_to_join_complete_cycle .complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key)))); - joined_this_tick.all_ticks() + joined_this_tick.all_ticks_atomic() } diff --git a/hydro_test/src/cluster/bench_client.rs b/hydro_test/src/cluster/bench_client.rs index d573a9d2c1f..f95e60a21e9 100644 --- a/hydro_test/src/cluster/bench_client.rs +++ b/hydro_test/src/cluster/bench_client.rs @@ -35,9 +35,7 @@ pub fn bench_client<'a>( // across *different* keys, we are safe because delaying a transaction result for a key // will only affect when the next request for that key is emitted with respect to other // keys - transaction_cycle(c_to_proposers) - .timestamped(&client_tick) - .tick_batch() + transaction_cycle(c_to_proposers).tick_batch(&client_tick) }; // Whenever all replicas confirm that a payload was committed, send another payload @@ -52,8 +50,7 @@ pub fn bench_client<'a>( // across keys c_new_payloads_when_committed.assume_ordering::() }) - .all_ticks() - .drop_timestamp(), + .all_ticks(), ); // Track statistics @@ -82,8 +79,7 @@ pub fn bench_client<'a>( // SAFETY: intentionally sampling statistics clients .source_interval(q!(Duration::from_secs(1))) - .timestamped(&client_tick) - .tick_batch() + .tick_batch(&client_tick) } .first(); @@ -146,7 +142,9 @@ pub fn bench_client<'a>( unsafe { // SAFETY: intentionally sampling statistics - c_latencies.zip(c_throughput).latest_tick() + c_latencies + .latest_tick(&client_tick) + .zip(c_throughput.latest_tick(&client_tick)) } .continue_if(c_stats_output_timer) .all_ticks() diff --git a/hydro_test/src/cluster/kv_replica.rs b/hydro_test/src/cluster/kv_replica.rs index 946a618182b..1aa941e07df 100644 --- a/hydro_test/src/cluster/kv_replica.rs +++ b/hydro_test/src/cluster/kv_replica.rs @@ -51,9 +51,10 @@ pub fn kv_replica<'a, K: KvKey, V: KvValue>( Stream, Unbounded>, Stream, Cluster<'a, Replica>, Unbounded>, ) { - let p_to_replicas = p_to_replicas - .into() - .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv })); + let p_to_replicas: Stream, Cluster<'a, Replica>, Unbounded, NoOrder> = + p_to_replicas + .into() + .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv })); let replica_tick = replicas.tick(); @@ -62,9 +63,7 @@ pub fn kv_replica<'a, K: KvKey, V: KvValue>( let r_sorted_payloads = unsafe { // SAFETY: because we fill slots one-by-one, we can safely batch // because non-determinism is resolved when we sort by slots - p_to_replicas - .timestamped(&replica_tick) - .tick_batch() + p_to_replicas.tick_batch(&replica_tick) } .chain(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); @@ -145,8 +144,5 @@ pub fn kv_replica<'a, K: KvKey, V: KvValue>( let r_to_clients = r_processable_payloads .filter_map(q!(|payload| payload.kv)) .all_ticks(); - ( - r_checkpoint_seq_new.all_ticks().drop_timestamp(), - r_to_clients.drop_timestamp(), - ) + (r_checkpoint_seq_new.all_ticks(), r_to_clients) } diff --git 
a/hydro_test/src/cluster/map_reduce.rs b/hydro_test/src/cluster/map_reduce.rs index 819df581958..64a58bcb230 100644 --- a/hydro_test/src/cluster/map_reduce.rs +++ b/hydro_test/src/cluster/map_reduce.rs @@ -17,7 +17,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' let batches = unsafe { // SAFETY: addition is associative so we can batch reduce - partitioned_words.timestamped(&cluster.tick()).tick_batch() + partitioned_words.tick_batch(&cluster.tick()) } .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) .inspect(q!(|(string, count)| println!( @@ -30,8 +30,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' unsafe { // SAFETY: addition is associative so we can batch reduce batches - .timestamped(&process.tick()) - .tick_batch() + .tick_batch(&process.tick()) .persist() .reduce_keyed_commutative(q!(|total, count| *total += count)) } diff --git a/hydro_test/src/cluster/paxos.rs b/hydro_test/src/cluster/paxos.rs index 2d40915ea9f..93cb6b90fa0 100644 --- a/hydro_test/src/cluster/paxos.rs +++ b/hydro_test/src/cluster/paxos.rs @@ -144,8 +144,7 @@ pub unsafe fn paxos_core<'a, P: PaxosPayload, R>( just_became_leader .clone() .then(p_ballot.clone()) - .all_ticks() - .drop_timestamp(), + .all_ticks(), ); let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe { @@ -178,10 +177,7 @@ pub unsafe fn paxos_core<'a, P: PaxosPayload, R>( ( // Only tell the clients once when leader election concludes - just_became_leader - .then(p_ballot) - .all_ticks() - .drop_timestamp(), + just_became_leader.then(p_ballot).all_ticks(), p_to_replicas, ) } @@ -230,9 +226,7 @@ unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( let (p_ballot, p_has_largest_ballot) = p_ballot_calc(proposer_tick, unsafe { // SAFETY: A stale max ballot might result in us failing to become the leader, but which proposer // becomes the leader is non-deterministic anyway. - p_received_max_ballot - .timestamped(proposer_tick) - .latest_tick() + p_received_max_ballot.latest_tick(proposer_tick) }); let (p_to_proposers_i_am_leader, p_trigger_election) = unsafe { @@ -263,7 +257,7 @@ unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( // SAFETY: Non-deterministic batching may result in different payloads being rejected // by an acceptor if the payload is batched with another payload with larger ballot. // But as documented, payloads may be non-deterministically dropped during leader election. 
- p_to_acceptors_p1a.timestamped(acceptor_tick).tick_batch() + p_to_acceptors_p1a.tick_batch(acceptor_tick) }, a_log, proposers, @@ -277,7 +271,7 @@ unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( f, ); p_is_leader_complete_cycle.complete(p_is_leader.clone()); - p1b_fail_complete.complete(fail_ballots.drop_timestamp()); + p1b_fail_complete.complete(fail_ballots.end_atomic()); (p_ballot, p_is_leader, p_accepted_values, a_max_ballot) } @@ -349,7 +343,6 @@ unsafe fn p_leader_heartbeat<'a>( .clone() .then(p_ballot) .latest() - .drop_timestamp() .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) } .broadcast_bincode_interleaved(proposers); @@ -361,8 +354,7 @@ unsafe fn p_leader_heartbeat<'a>( p_to_proposers_i_am_leader .clone() .timeout(q!(Duration::from_secs(i_am_leader_check_timeout))) - .timestamped(proposer_tick) - .latest_tick() + .latest_tick(proposer_tick) .continue_unless(p_is_leader) }; @@ -381,8 +373,7 @@ unsafe fn p_leader_heartbeat<'a>( )), q!(Duration::from_secs(i_am_leader_check_timeout)), ) - .timestamped(proposer_tick) - .tick_batch() + .tick_batch(proposer_tick) .first(), ) }; @@ -446,13 +437,10 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( ) -> ( Optional<(), Tick>, Bounded>, Stream<(Option, P), Tick>, Bounded, NoOrder>, - Stream>, Unbounded, NoOrder>, + Stream>, Unbounded, NoOrder>, ) { - let (quorums, fails) = collect_quorum_with_response( - a_to_proposers_p1b.timestamped(proposer_tick), - f + 1, - 2 * f + 1, - ); + let (quorums, fails) = + collect_quorum_with_response(a_to_proposers_p1b.atomic(proposer_tick), f + 1, 2 * f + 1); let p_received_quorum_of_p1bs = unsafe { // SAFETY: All the values for a quorum will be emitted in a single batch, @@ -614,7 +602,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( Stream<(usize, Option
<P>
), Cluster<'a, Proposer>, Unbounded, NoOrder>, Singleton< (Option, HashMap>), - Timestamped>, + Atomic>, Unbounded, >, Stream, Unbounded, NoOrder>, @@ -629,8 +617,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( // in which payloads are committed when the leader is changing, which is documented at // the function level. c_to_proposers - .timestamped(proposer_tick) - .tick_batch() + .tick_batch(proposer_tick) .continue_if(p_is_leader.clone()) }); @@ -642,7 +629,7 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( ))) .chain(p_log_to_recommit.map(q!(|p2a| ((p2a.slot, p2a.ballot), p2a.value)))) .continue_if(p_is_leader) - .all_ticks(); + .all_ticks_atomic(); let (a_log, a_to_proposers_p2b) = acceptor_p2( acceptor_tick, @@ -661,11 +648,8 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( ); // TOOD: only persist if we are the leader - let (quorums, fails) = collect_quorum( - a_to_proposers_p2b.timestamped(proposer_tick), - f + 1, - 2 * f + 1, - ); + let (quorums, fails) = + collect_quorum(a_to_proposers_p2b.atomic(proposer_tick), f + 1, 2 * f + 1); let p_to_replicas = join_responses(proposer_tick, quorums.map(q!(|k| (k, ()))), unsafe { // SAFETY: The metadata will always be generated before we get a quorum @@ -676,9 +660,9 @@ unsafe fn sequence_payload<'a, P: PaxosPayload, R>( ( p_to_replicas .map(q!(|((slot, _ballot), (value, _))| (slot, value))) - .drop_timestamp(), + .end_atomic(), a_log, - fails.map(q!(|(_, ballot)| ballot)).drop_timestamp(), + fails.map(q!(|(_, ballot)| ballot)).end_atomic(), ) } @@ -735,7 +719,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( ) -> ( Singleton< (Option, HashMap>), - Timestamped>, + Atomic>, Unbounded, >, Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>, @@ -745,7 +729,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( // a confirmation to the proposer. Because we use `persist()` on these // messages before folding into the log, non-deterministic batch boundaries // will not affect the eventual log state. - p_to_acceptors_p2a.timestamped(acceptor_tick).tick_batch() + p_to_acceptors_p2a.tick_batch(acceptor_tick) }; // Get the latest checkpoint sequence per replica @@ -754,9 +738,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( // that do not need to be saved (because the data is at all replicas). This affects // the logs that will be collected during a leader re-election, but eventually the // same checkpoint will arrive at acceptors and those slots will be eventually deleted. 
- r_to_acceptors_checkpoint - .timestamped(acceptor_tick) - .tick_batch() + r_to_acceptors_checkpoint.tick_batch(acceptor_tick) } .persist() .reduce_keyed_commutative(q!(|curr_seq, seq| { @@ -792,7 +774,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( )); let a_log = a_p2as_to_place_in_log .chain(a_new_checkpoint.into_stream()) - .all_ticks() + .all_ticks_atomic() .fold_commutative( q!(|| (None, HashMap::new())), q!(|(prev_checkpoint, log), checkpoint_or_p2a| { diff --git a/hydro_test/src/cluster/paxos_bench.rs b/hydro_test/src/cluster/paxos_bench.rs index 1c06048491d..f5a6c062c2d 100644 --- a/hydro_test/src/cluster/paxos_bench.rs +++ b/hydro_test/src/cluster/paxos_bench.rs @@ -67,12 +67,12 @@ pub fn paxos_bench<'a>( // we only mark a transaction as committed when all replicas have applied it collect_quorum::<_, _, _, ()>( - c_received_payloads.timestamped(&clients.tick()), + c_received_payloads.atomic(&clients.tick()), paxos_config.f + 1, paxos_config.f + 1, ) .0 - .drop_timestamp() + .end_atomic() }, num_clients_per_node, median_latency_window_size, diff --git a/hydro_test/src/cluster/paxos_with_client.rs b/hydro_test/src/cluster/paxos_with_client.rs index f850ed088e3..029815fb083 100644 --- a/hydro_test/src/cluster/paxos_with_client.rs +++ b/hydro_test/src/cluster/paxos_with_client.rs @@ -43,9 +43,9 @@ pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>( // is documented non-determinism. let client_tick = clients.tick(); - let payload_batch = payloads.timestamped(&client_tick).tick_batch(); + let payload_batch = payloads.tick_batch(&client_tick); - let latest_leader = cur_leader_id.timestamped(&client_tick).latest_tick(); + let latest_leader = cur_leader_id.latest_tick(&client_tick); let (unsent_payloads_complete, unsent_payloads) = client_tick.cycle::>(); diff --git a/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap index c3b6cbc7bbf..b67ba161b10 100644 --- a/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydro_test/src/cluster/snapshots/hydro_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -778,11 +778,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 1, - Cluster( - 0, - ), + location_kind: Cluster( + 0, ), output_type: Some( hydro_test :: cluster :: paxos :: Ballot, @@ -790,11 +787,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 1, - Cluster( - 0, - ), + location_kind: Cluster( + 0, ), output_type: Some( (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Acceptor > , hydro_test :: cluster :: paxos :: Ballot), @@ -1949,11 +1943,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 7, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos :: Proposer > , hydro_test :: cluster :: kv_replica :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: bench_client :: Client > , u32) >), @@ -4727,11 +4718,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 0, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( core :: time :: Duration, @@ -4739,11 +4727,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - 
location_kind: Tick( - 0, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( core :: time :: Duration, @@ -4751,11 +4736,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 0, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize), @@ -4763,11 +4745,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 0, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > >, @@ -4930,11 +4909,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 0, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( usize, @@ -4942,11 +4918,8 @@ expression: built.ir() }, }, metadata: HydroNodeMetadata { - location_kind: Tick( - 0, - Cluster( - 2, - ), + location_kind: Cluster( + 2, ), output_type: Some( (std :: rc :: Rc < core :: cell :: RefCell < std :: vec :: Vec < core :: time :: Duration > > > , usize), diff --git a/hydro_test/src/cluster/two_pc.rs b/hydro_test/src/cluster/two_pc.rs index 2f1f998efd3..45a8bde254b 100644 --- a/hydro_test/src/cluster/two_pc.rs +++ b/hydro_test/src/cluster/two_pc.rs @@ -61,7 +61,7 @@ pub fn two_pc<'a>( t, if reply == "commit" { Ok(()) } else { Err(id) } ))) - .timestamped(&coordinator_tick), + .atomic(&coordinator_tick), num_participants as usize, num_participants as usize, ); diff --git a/hydro_test_local/src/local/chat_app.rs b/hydro_test_local/src/local/chat_app.rs index de4391d96ce..4dcdc99fe51 100644 --- a/hydro_test_local/src/local/chat_app.rs +++ b/hydro_test_local/src/local/chat_app.rs @@ -19,23 +19,20 @@ pub fn chat_app<'a>( let users = unsafe { // SAFETY: intentionally non-deterministic to not send messaged // to users that joined after the message was sent - process - .source_stream(users_stream) - .timestamped(&tick) - .tick_batch() + process.source_stream(users_stream).tick_batch(&tick) } .persist(); let messages = process.source_stream(messages); let messages = if replay_messages { unsafe { // SAFETY: see above - messages.timestamped(&tick).tick_batch() + messages.tick_batch(&tick) } .persist() } else { unsafe { // SAFETY: see above - messages.timestamped(&tick).tick_batch() + messages.tick_batch(&tick) } }; diff --git a/hydro_test_local/src/local/compute_pi.rs b/hydro_test_local/src/local/compute_pi.rs index 0d900514d61..18b99fc8f95 100644 --- a/hydro_test_local/src/local/compute_pi.rs +++ b/hydro_test_local/src/local/compute_pi.rs @@ -23,8 +23,7 @@ pub fn compute_pi<'a>(flow: &FlowBuilder<'a>, batch_size: RuntimeData) -> *total += 1; }), ) - .all_ticks() - .drop_timestamp(); + .all_ticks(); let estimate = trials.reduce(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; diff --git a/hydro_test_local/src/local/count_elems.rs b/hydro_test_local/src/local/count_elems.rs index 48c51b803cb..66c3012be05 100644 --- a/hydro_test_local/src/local/count_elems.rs +++ b/hydro_test_local/src/local/count_elems.rs @@ -16,7 +16,7 @@ pub fn count_elems_generic<'a, T: 'a>( let source = process.source_stream(input_stream); let count = unsafe { // SAFETY: intentionally using ticks - source.map(q!(|_| 1)).timestamped(&tick).tick_batch() + source.map(q!(|_| 1)).tick_batch(&tick) } .fold(q!(|| 0), q!(|a, b| *a += b)) .all_ticks(); diff --git 
a/hydro_test_local/src/local/graph_reachability.rs b/hydro_test_local/src/local/graph_reachability.rs index 2728c55bf6d..996ff39b760 100644 --- a/hydro_test_local/src/local/graph_reachability.rs +++ b/hydro_test_local/src/local/graph_reachability.rs @@ -22,17 +22,14 @@ pub fn graph_reachability<'a>( let reached = unsafe { // SAFETY: roots can be inserted on any tick because we are fixpointing - roots - .timestamped(&reachability_tick) - .tick_batch() - .chain(reached_cycle) + roots.tick_batch(&reachability_tick).chain(reached_cycle) }; let reachable = reached .clone() .map(q!(|r| (r, ()))) .join(unsafe { // SAFETY: edges can be inserted on any tick because we are fixpointing - edges.timestamped(&reachability_tick).tick_batch().persist() + edges.tick_batch(&reachability_tick).persist() }) .map(q!(|(_from, (_, to))| to)); set_reached_cycle.complete_next_tick(reached.clone().chain(reachable)); diff --git a/hydro_test_local/src/local/negation.rs b/hydro_test_local/src/local/negation.rs index 779480e9882..6cdf4b29c8b 100644 --- a/hydro_test_local/src/local/negation.rs +++ b/hydro_test_local/src/local/negation.rs @@ -16,10 +16,7 @@ pub fn test_difference<'a>( let mut source = unsafe { // SAFETY: intentionally using ticks - process - .source_iter(q!(0..5)) - .timestamped(&tick) - .tick_batch() + process.source_iter(q!(0..5)).tick_batch(&tick) }; if persist1 { source = source.persist(); @@ -27,10 +24,7 @@ pub fn test_difference<'a>( let mut source2 = unsafe { // SAFETY: intentionally using ticks - process - .source_iter(q!(3..6)) - .timestamped(&tick) - .tick_batch() + process.source_iter(q!(3..6)).tick_batch(&tick) }; if persist2 { source2 = source2.persist(); @@ -58,8 +52,7 @@ pub fn test_anti_join<'a>( process .source_iter(q!(0..5)) .map(q!(|v| (v, v))) - .timestamped(&tick) - .tick_batch() + .tick_batch(&tick) }; if persist1 { source = source.persist(); @@ -67,10 +60,7 @@ pub fn test_anti_join<'a>( let mut source2 = unsafe { // SAFETY: intentionally using ticks - process - .source_iter(q!(3..6)) - .timestamped(&tick) - .tick_batch() + process.source_iter(q!(3..6)).tick_batch(&tick) }; if persist2 { source2 = source2.persist(); diff --git a/hydro_test_local/src/local/teed_join.rs b/hydro_test_local/src/local/teed_join.rs index 37fab33411d..9f868ce8285 100644 --- a/hydro_test_local/src/local/teed_join.rs +++ b/hydro_test_local/src/local/teed_join.rs @@ -23,10 +23,7 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( let source = unsafe { // SAFETY: intentionally using ticks - node_zero - .source_stream(input_stream) - .timestamped(&n0_tick) - .tick_batch() + node_zero.source_stream(input_stream).tick_batch(&n0_tick) }; let map1 = source.clone().map(q!(|v| (v + 1, ()))); let map2 = source.map(q!(|v| (v - 1, ()))); diff --git a/stageleft/src/type_name.rs b/stageleft/src/type_name.rs index dc02417d44c..19a4a011740 100644 --- a/stageleft/src/type_name.rs +++ b/stageleft/src/type_name.rs @@ -22,6 +22,8 @@ static PRIVATE_REEXPORTS: ReexportsSet = LazyLock::new(|| { vec!["std", "collections", "hash_set"], ), (vec!["std", "vec", "into_iter"], vec!["std", "vec"]), + (vec!["tokio", "time", "instant"], vec!["tokio", "time"]), + (vec!["bytes", "bytes"], vec!["bytes"]), ]) });
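Taken together, the renames in this patch map the old timestamp-based API onto the atomic one: `timestamped(&tick)` becomes `atomic(&tick)`, `drop_timestamp()` becomes `end_atomic()`, and `tick_batch`/`latest_tick` gain shortcuts that take the tick directly. A hedged summary sketch, mirroring the doc tests changed above (names are illustrative, not part of the diff):

use hydro_lang::*;

fn rename_summary<'a>(flow: &FlowBuilder<'a>) {
    let process = flow.process::<()>();
    let tick = process.tick();
    let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));

    // Old: numbers.timestamped(&tick).tick_batch(); the new shortcut batches directly.
    let batch = unsafe {
        // SAFETY: non-deterministic batch boundaries, as before.
        numbers.tick_batch(&tick)
    };

    // Old: .all_ticks().drop_timestamp(); `all_ticks()` now returns to the outer
    // location, while `all_ticks_atomic()` keeps the result in `Atomic<_>`.
    // Singletons and optionals gain the analogous `latest_tick(&tick)` shortcut.
    let _sums = batch
        .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
        .all_ticks();

    // The explicit atomic domain is still available when a computation must stay
    // tied to one tick's clock: enter with `.atomic(&tick)`, exit with `.end_atomic()`.
    let atomic = process.source_iter(q!(vec![1, 2, 3])).atomic(&tick);
    let _back = atomic.end_atomic();
}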