diff --git a/docs/docs/hydro/live-collections/streams.md b/docs/docs/hydro/live-collections/streams.md index 310991b02d3..fce32bbbf0b 100644 --- a/docs/docs/hydro/live-collections/streams.md +++ b/docs/docs/hydro/live-collections/streams.md @@ -3,7 +3,7 @@ sidebar_position: 2 --- # Streams -Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections, a feed of API requests, or even time-based intervals. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream). +Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections or a feed of API requests. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream). 
Streams have several type parameters: - `T`: the type of elements in the stream @@ -13,14 +13,14 @@ Streams have several type parameters: - This type parameter is _optional_; by default the order is deterministic ## Creating a Stream -The simplest way to create a stream is to use the [`source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter) method on a location, which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it: +The simplest way to create a stream is to use [`Location::source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter), which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it: ```rust # use hydro_lang::*; # use dfir_rs::futures::StreamExt; # tokio_test::block_on(test_util::multi_location_test(|flow, p_out| { let process = flow.process::<()>(); -let numbers: Stream<_, Process<_>, _> = process +let numbers: Stream<_, Process<_>, Unbounded> = process .source_iter(q!(vec![1, 2, 3])) .map(q!(|x| x + 1)); // 2, 3, 4 @@ -32,16 +32,16 @@ let numbers: Stream<_, Process<_>, _> = process # })); ``` -Streams also can be sent over the network to form distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. 
For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization: +Streams can also be sent over the network to participate in distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization: ```rust # use hydro_lang::*; # use dfir_rs::futures::StreamExt; # tokio_test::block_on(test_util::multi_location_test(|flow, p_out| { let p1 = flow.process::<()>(); -let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![1, 2, 3])); +let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3])); let p2 = flow.process::<()>(); -let on_p2: Stream<_, Process<_>, _> = numbers.send_bincode(&p2); +let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2); // 1, 2, 3 # on_p2.send_bincode(&p_out) # }, |mut stream| async move { @@ -62,41 +62,41 @@ If we send a stream from a cluster to a process, the return type will be a strea # use hydro_lang::*; # let flow = FlowBuilder::new(); let workers: Cluster<()> = flow.cluster::<()>(); -let numbers: Stream<_, Cluster<_>, _, TotalOrder> = +let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> = workers.source_iter(q!(vec![1, 2, 3])); let process: Process<()> = flow.process::<()>(); -let on_p2: Stream<_, Process<_>, _, NoOrder> = +let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> = numbers.send_bincode(&process); ``` The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
-A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the aggregation may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams. The error is a bit misleading, but the key part is that `fold` is not available on `NoOrder` streams: +A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the result may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams (note that the error is a bit misleading due to the Rust compiler attempting to apply `Iterator` methods): ```compile_fail # use hydro_lang::*; # let flow = FlowBuilder::new(); let workers: Cluster<()> = flow.cluster::<()>(); let process: Process<()> = flow.process::<()>(); -let all_words: Stream<_, Process<_>, _, NoOrder> = workers +let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers .source_iter(q!(vec!["hello", "world"])) .map(q!(|x| x.to_string())) - .send_bincode_interleaved(&process); + .send_bincode_anonymous(&process); let words_concat = all_words .fold(q!(|| "".to_string()), q!(|acc, x| *acc += &x)); -// ^^^^ `hydro_lang::Stream<&str, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator +// ^^^^ error: `hydro_lang::Stream<String, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator ``` :::tip -We use `send_bincode_interleaved` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
+We use `send_bincode_anonymous` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details. -You'll notice that we aggregated an **asynchronously** updated stream, so the result is a `Singleton` live collection. For more details on the semantics of singletons, including how they are updated when new inputs arrive, see [Singletons and Optionals](./singletons-optionals.md). +Running an aggregation (`fold`, `reduce`) converts a `Stream` into a `Singleton`, as we see in the type signature here. The `Singleton` type is still "live" in the sense of a [Live Collection](./index.md), so updates to the `Stream` input cause updates to the `Singleton` output. See [Singletons and Optionals](./singletons-optionals.md) for more information. ::: -To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the aggregation function to be commutative (and therefore immune to non-deterministic ordering): +To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the provided closure to be commutative (and therefore immune to non-deterministic ordering): ```rust,no_run # use hydro_lang::*; @@ -107,11 +107,17 @@ To perform an aggregation with an unordered stream, you must use [`fold_commutat # let all_words: Stream<_, Process<_>, _, NoOrder> = workers # .source_iter(q!(vec!["hello", "world"])) # .map(q!(|x| x.to_string())) -# .send_bincode_interleaved(&process); +# .send_bincode_anonymous(&process); let words_count = all_words .fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1)); ``` +:::danger + +Developers are responsible for the commutativity of the closure they pass into `*_commutative` methods. 
In the future, commutativity checks will be automatically provided by the compiler (via tools like [Kani](https://github.com/model-checking/kani)). + +::: + ## Bounded and Unbounded Streams :::caution diff --git a/docs/docs/hydro/locations/clusters.md b/docs/docs/hydro/locations/clusters.md index a644effd20e..9eeb278f1a2 100644 --- a/docs/docs/hydro/locations/clusters.md +++ b/docs/docs/hydro/locations/clusters.md @@ -49,7 +49,7 @@ numbers.send_bincode(&process) :::tip -If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_interleaved` method instead, which will drop the IDs at the receiver: +If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_anonymous` method instead, which will drop the IDs at the receiver: ```rust # use hydro_lang::*; @@ -57,7 +57,7 @@ If you do not need to know _which_ member of the cluster the data came from, you # tokio_test::block_on(test_util::multi_location_test(|flow, process| { # let workers: Cluster<()> = flow.cluster::<()>(); let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1])); -numbers.send_bincode_interleaved(&process) +numbers.send_bincode_anonymous(&process) # }, |mut stream| async move { // if there are 4 members in the cluster, we should receive 4 elements // 1, 1, 1, 1 @@ -166,7 +166,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID])); self_id_stream .filter(q!(|x| x.raw_id % 2 == 0)) .map(q!(|x| format!("hello from {}", x.raw_id))) - .send_bincode_interleaved(&process) + .send_bincode_anonymous(&process) // if there are 4 members in the cluster, we should receive 2 elements // "hello from 0", "hello from 2" # }, |mut stream| async move { diff --git a/docs/docs/hydro/quickstart/clusters.mdx b/docs/docs/hydro/quickstart/clusters.mdx index 17772809b74..815e200d051 100644 --- a/docs/docs/hydro/quickstart/clusters.mdx +++ b/docs/docs/hydro/quickstart/clusters.mdx @@ -32,7 +32,7 @@ On each 
cluster member, we will then do some work to transform the data (using ` {getLines(firstTenClusterSrc, 10, 11)} -Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_interleaved` is available as a helper. +Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_anonymous`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_anonymous` is available as a helper. {getLines(firstTenClusterSrc, 12, 14)} diff --git a/hydro_lang/src/stream.rs b/hydro_lang/src/stream.rs index 3cb2092601f..f40704d68d8 100644 --- a/hydro_lang/src/stream.rs +++ b/hydro_lang/src/stream.rs @@ -1926,7 +1926,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { } } - pub fn send_bincode_interleaved, Tag, CoreType>( + pub fn send_bincode_anonymous, Tag, CoreType>( self, other: &L2, ) -> Stream diff --git a/hydro_std/src/compartmentalize.rs b/hydro_std/src/compartmentalize.rs index 83be82d239f..f8cf821230a 100644 --- a/hydro_std/src/compartmentalize.rs +++ b/hydro_std/src/compartmentalize.rs @@ -40,7 +40,7 @@ impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order> Min = NoOrder, >, { - self.map(dist_policy).send_bincode_interleaved(other) + self.map(dist_policy).send_bincode_anonymous(other) } } @@ -78,7 +78,7 @@ impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order> ClusterId::from_raw(CLUSTER_SELF_ID.raw_id), b.clone() ))) - .send_bincode_interleaved(other); + .send_bincode_anonymous(other); unsafe { // SAFETY: this is safe because we are mapping clusters 1:1 diff --git a/hydro_test/src/cluster/compute_pi.rs b/hydro_test/src/cluster/compute_pi.rs index ae649e7c00a..553a9db2555 100644 --- 
a/hydro_test/src/cluster/compute_pi.rs +++ b/hydro_test/src/cluster/compute_pi.rs @@ -30,7 +30,7 @@ pub fn compute_pi<'a>( .all_ticks(); let estimate = trials - .send_bincode_interleaved(&process) + .send_bincode_anonymous(&process) .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; *total += total_batch; diff --git a/hydro_test/src/cluster/map_reduce.rs b/hydro_test/src/cluster/map_reduce.rs index 64a58bcb230..3645731ee9e 100644 --- a/hydro_test/src/cluster/map_reduce.rs +++ b/hydro_test/src/cluster/map_reduce.rs @@ -25,7 +25,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' string, count ))) .all_ticks() - .send_bincode_interleaved(&process); + .send_bincode_anonymous(&process); unsafe { // SAFETY: addition is associative so we can batch reduce diff --git a/hydro_test/src/cluster/paxos.rs b/hydro_test/src/cluster/paxos.rs index 93cb6b90fa0..fd5205d365b 100644 --- a/hydro_test/src/cluster/paxos.rs +++ b/hydro_test/src/cluster/paxos.rs @@ -417,7 +417,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( ) ))) .all_ticks() - .send_bincode_interleaved(proposers), + .send_bincode_anonymous(proposers), ) } @@ -826,6 +826,6 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( ) ))) .all_ticks() - .send_bincode_interleaved(proposers); + .send_bincode_anonymous(proposers); (a_log, a_to_proposers_p2b) } diff --git a/hydro_test/src/cluster/paxos_bench.rs b/hydro_test/src/cluster/paxos_bench.rs index f5a6c062c2d..77762643284 100644 --- a/hydro_test/src/cluster/paxos_bench.rs +++ b/hydro_test/src/cluster/paxos_bench.rs @@ -63,7 +63,7 @@ pub fn paxos_bench<'a>( payload.value.0, ((payload.key, payload.value.1), Ok(())) ))) - .send_bincode_interleaved(&clients); + .send_bincode_anonymous(&clients); // we only mark a transaction as committed when all replicas have applied it collect_quorum::<_, _, _, ()>( diff --git a/hydro_test/src/cluster/paxos_with_client.rs 
b/hydro_test/src/cluster/paxos_with_client.rs index 029815fb083..6d3a0281d0f 100644 --- a/hydro_test/src/cluster/paxos_with_client.rs +++ b/hydro_test/src/cluster/paxos_with_client.rs @@ -59,7 +59,7 @@ pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>( all_payloads.cross_singleton(latest_leader).all_ticks() } .map(q!(move |(payload, leader_id)| (leader_id, payload))) - .send_bincode_interleaved(proposers); + .send_bincode_anonymous(proposers); let payloads_at_proposer = { // SAFETY: documented non-determinism in interleaving of client payloads diff --git a/template/hydro/src/first_ten_cluster.rs b/template/hydro/src/first_ten_cluster.rs index 4c94a41415e..570b0173083 100644 --- a/template/hydro/src/first_ten_cluster.rs +++ b/template/hydro/src/first_ten_cluster.rs @@ -9,7 +9,7 @@ pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a, .round_robin_bincode(workers) // : Stream, ...> .map(q!(|n| n * 2)) // : Stream, ...> .inspect(q!(|n| println!("{}", n))) // : Stream, ...> - .send_bincode_interleaved(leader) // : Stream, ...> + .send_bincode_anonymous(leader) // : Stream, ...> .for_each(q!(|n| println!("{}", n))); }