Commit

refactor(hydro_lang)!: rename _interleaved to _anonymous (#1695)
Also address docs feedback for streams.
shadaj authored Feb 3, 2025
1 parent 2fc071e commit 41e5bb9
Showing 11 changed files with 36 additions and 30 deletions.
38 changes: 22 additions & 16 deletions docs/docs/hydro/live-collections/streams.md
@@ -3,7 +3,7 @@ sidebar_position: 2
---

# Streams
Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections, a feed of API requests, or even time-based intervals. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream).
Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections or a feed of API requests. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream).

Streams have several type parameters:
- `T`: the type of elements in the stream
@@ -13,14 +13,14 @@ Streams have several type parameters:
- This type parameter is _optional_; by default the order is deterministic

## Creating a Stream
The simplest way to create a stream is to use the [`source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter) method on a location, which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:
The simplest way to create a stream is to use [`Location::source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter), which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p_out| {
let process = flow.process::<()>();
let numbers: Stream<_, Process<_>, _> = process
let numbers: Stream<_, Process<_>, Unbounded> = process
.source_iter(q!(vec![1, 2, 3]))
.map(q!(|x| x + 1));
// 2, 3, 4
@@ -32,16 +32,16 @@ let numbers: Stream<_, Process<_>, _> = process
# }));
```

Streams also can be sent over the network to form distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization:
Streams also can be sent over the network to participate in distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p_out| {
let p1 = flow.process::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![1, 2, 3]));
let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
let p2 = flow.process::<()>();
let on_p2: Stream<_, Process<_>, _> = numbers.send_bincode(&p2);
let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
// 1, 2, 3
# on_p2.send_bincode(&p_out)
# }, |mut stream| async move {
@@ -62,41 +62,41 @@ If we send a stream from a cluster to a process, the return type will be a strea
# use hydro_lang::*;
# let flow = FlowBuilder::new();
let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _, TotalOrder> =
let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> =
workers.source_iter(q!(vec![1, 2, 3]));
let process: Process<()> = flow.process::<()>();
let on_p2: Stream<_, Process<_>, _, NoOrder> =
let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
numbers.send_bincode(&process);
```

The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.

A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the aggregation may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams. The error is a bit misleading, but the key part is that `fold` is not available on `NoOrder` streams:
A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the result may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams (note that the error is a bit misleading due to the Rust compiler attempting to apply `Iterator` methods):

```compile_fail
# use hydro_lang::*;
# let flow = FlowBuilder::new();
let workers: Cluster<()> = flow.cluster::<()>();
let process: Process<()> = flow.process::<()>();
let all_words: Stream<_, Process<_>, _, NoOrder> = workers
let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
.source_iter(q!(vec!["hello", "world"]))
.map(q!(|x| x.to_string()))
.send_bincode_interleaved(&process);
.send_bincode_anonymous(&process);
let words_concat = all_words
.fold(q!(|| "".to_string()), q!(|acc, x| acc += x));
// ^^^^ `hydro_lang::Stream<&str, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
// ^^^^ error: `hydro_lang::Stream<String, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
```

:::tip

We use `send_bincode_interleaved` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
We use `send_bincode_anonymous` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.

You'll notice that we aggregated an **asynchronously** updated stream, so the result is a `Singleton` live collection. For more details on the semantics of singletons, including how they are updated when new inputs arrive, see [Singletons and Optionals](./singletons-optionals.md).
Running an aggregation (`fold`, `reduce`) converts a `Stream` into a `Singleton`, as we see in the type signature here. The `Singleton` type is still "live" in the sense of a [Live Collection](./index.md), so updates to the `Stream` input cause updates to the `Singleton` output. See [Singletons and Optionals](./singletons-optionals.md) for more information.

:::

To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the aggregation function to be commutative (and therefore immune to non-deterministic ordering):
To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the provided closure to be commutative (and therefore immune to non-deterministic ordering):

```rust,no_run
# use hydro_lang::*;
@@ -107,11 +107,17 @@ To perform an aggregation with an unordered stream, you must use [`fold_commutat
# let all_words: Stream<_, Process<_>, _, NoOrder> = workers
# .source_iter(q!(vec!["hello", "world"]))
# .map(q!(|x| x.to_string()))
# .send_bincode_interleaved(&process);
# .send_bincode_anonymous(&process);
let words_count = all_words
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1));
```

:::danger

Developers are responsible for the commutativity of the closure they pass into `*_commutative` methods. In the future, commutativity checks will be automatically provided by the compiler (via tools like [Kani](https://github.com/model-checking/kani)).

:::

## Bounded and Unbounded Streams

:::caution
6 changes: 3 additions & 3 deletions docs/docs/hydro/locations/clusters.md
@@ -49,15 +49,15 @@ numbers.send_bincode(&process)

:::tip

If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_interleaved` method instead, which will drop the IDs at the receiver:
If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_anonymous` method instead, which will drop the IDs at the receiver:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode_interleaved(&process)
numbers.send_bincode_anonymous(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
@@ -166,7 +166,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
.filter(q!(|x| x.raw_id % 2 == 0))
.map(q!(|x| format!("hello from {}", x.raw_id)))
.send_bincode_interleaved(&process)
.send_bincode_anonymous(&process)
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
# }, |mut stream| async move {
2 changes: 1 addition & 1 deletion docs/docs/hydro/quickstart/clusters.mdx
@@ -32,7 +32,7 @@ On each cluster member, we will then do some work to transform the data (using `

<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 10, 11)}</CodeBlock>

Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_interleaved` is available as a helper.
Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_anonymous`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_anonymous` is available as a helper.

<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 12, 14)}</CodeBlock>

2 changes: 1 addition & 1 deletion hydro_lang/src/stream.rs
@@ -1926,7 +1926,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
}
}

pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>(
pub fn send_bincode_anonymous<L2: Location<'a>, Tag, CoreType>(
self,
other: &L2,
) -> Stream<CoreType, L2, Unbounded, Order::Min>
4 changes: 2 additions & 2 deletions hydro_std/src/compartmentalize.rs
@@ -40,7 +40,7 @@ impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
Min = NoOrder,
>,
{
self.map(dist_policy).send_bincode_interleaved(other)
self.map(dist_policy).send_bincode_anonymous(other)
}
}

@@ -78,7 +78,7 @@ impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order>
ClusterId::from_raw(CLUSTER_SELF_ID.raw_id),
b.clone()
)))
.send_bincode_interleaved(other);
.send_bincode_anonymous(other);

unsafe {
// SAFETY: this is safe because we are mapping clusters 1:1
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/compute_pi.rs
@@ -30,7 +30,7 @@ pub fn compute_pi<'a>(
.all_ticks();

let estimate = trials
.send_bincode_interleaved(&process)
.send_bincode_anonymous(&process)
.reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
*inside += inside_batch;
*total += total_batch;
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/map_reduce.rs
@@ -25,7 +25,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'
string, count
)))
.all_ticks()
.send_bincode_interleaved(&process);
.send_bincode_anonymous(&process);

unsafe {
// SAFETY: addition is associative so we can batch reduce
4 changes: 2 additions & 2 deletions hydro_test/src/cluster/paxos.rs
@@ -417,7 +417,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
)
)))
.all_ticks()
.send_bincode_interleaved(proposers),
.send_bincode_anonymous(proposers),
)
}

@@ -826,6 +826,6 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
)
)))
.all_ticks()
.send_bincode_interleaved(proposers);
.send_bincode_anonymous(proposers);
(a_log, a_to_proposers_p2b)
}
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/paxos_bench.rs
@@ -63,7 +63,7 @@ pub fn paxos_bench<'a>(
payload.value.0,
((payload.key, payload.value.1), Ok(()))
)))
.send_bincode_interleaved(&clients);
.send_bincode_anonymous(&clients);

// we only mark a transaction as committed when all replicas have applied it
collect_quorum::<_, _, _, ()>(
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/paxos_with_client.rs
@@ -59,7 +59,7 @@ pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>(
all_payloads.cross_singleton(latest_leader).all_ticks()
}
.map(q!(move |(payload, leader_id)| (leader_id, payload)))
.send_bincode_interleaved(proposers);
.send_bincode_anonymous(proposers);

let payloads_at_proposer = {
// SAFETY: documented non-determinism in interleaving of client payloads
2 changes: 1 addition & 1 deletion template/hydro/src/first_ten_cluster.rs
@@ -9,7 +9,7 @@ pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a,
.round_robin_bincode(workers) // : Stream<i32, Cluster<Worker>, ...>
.map(q!(|n| n * 2)) // : Stream<i32, Cluster<Worker>, ...>
.inspect(q!(|n| println!("{}", n))) // : Stream<i32, Cluster<Worker>, ...>
.send_bincode_interleaved(leader) // : Stream<i32, Process<Leader>, ...>
.send_bincode_anonymous(leader) // : Stream<i32, Process<Leader>, ...>
.for_each(q!(|n| println!("{}", n)));
}

