docs(hydro_lang): initial docs for streams #1694

Merged
merged 1 commit into from
Feb 1, 2025

Member

@shadaj shadaj commented Jan 31, 2025

No description provided.


cloudflare-workers-and-pages bot commented Jan 31, 2025

Deploying hydroflow with Cloudflare Pages

Latest commit: 586d1f3
Status: ✅  Deploy successful!
Preview URL: https://be99446a.hydroflow.pages.dev
Branch Preview URL: https://pr1694.hydroflow.pages.dev

Collaborator

@jhellerstein jhellerstein left a comment

Approved for speed, but please address comments.

docs/docs/hydro/live-collections/streams.md
- `T`: the type of elements in the stream
- `L`: the location the stream is on (see [Locations](../locations/index.md))
- `B`: indicates whether the stream is [bounded or unbounded](./bounded-unbounded.md)
- `Order`: indicates whether the elements in the stream have a deterministic order or not
Collaborator

Why is Order a word and the rest are just a character?

Member

I support all type parameters being full words

- This type parameter is _optional_; by default the order is deterministic

## Creating a Stream
The simplest way to create a stream is to use the [`source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter) method on a location, which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:
Collaborator

What does it mean to "use a method on a location"? That's a bit opaque.

Collaborator

Maybe instead say "source_iter converts any Rust iterator over type T into a Stream<T, L, Bounded, TotalOrder>, where the Location L is ...." and forward ref the TotalOrder part to the section below.

Collaborator

Are we able to assert that all Rust iterators generate `Bounded` streams? What about `repeat()` and the like?
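To make the `repeat()` concern concrete, here is a minimal std-only sketch. It does not use `hydro_lang` and says nothing about how `source_iter` treats such inputs; the helpers `has_upper_bound` and `first_n` are hypothetical names introduced only for illustration. It shows that a Rust iterator can be endless, and that `size_hint` is only a hint about finiteness, not a guarantee.

```rust
use std::iter;

/// Reports whether the iterator advertises an upper bound on its length.
/// `size_hint` is advisory only, so this is a hint, not a proof of finiteness.
fn has_upper_bound<I: Iterator>(it: &I) -> bool {
    it.size_hint().1.is_some()
}

/// Collects at most `n` items, which forces a finite result even when the
/// source would otherwise yield items forever.
fn first_n<I: IntoIterator<Item = u32>>(source: I, n: usize) -> Vec<u32> {
    source.into_iter().take(n).collect()
}

fn main() {
    // A `Vec` iterator knows its exact length.
    let finite = vec![1, 2, 3].into_iter();
    println!("{}", has_upper_bound(&finite)); // true

    // `iter::repeat` never returns `None`, so it reports no upper bound.
    let endless = iter::repeat(7u32);
    println!("{}", has_upper_bound(&endless)); // false

    // Calling `collect` on `endless` directly would never finish; `take` caps it.
    println!("{:?}", first_n(endless, 3)); // [7, 7, 7]
}
```

Since both values implement `IntoIterator`, the type alone does not distinguish the finite case from the endless one.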

docs/docs/hydro/live-collections/streams.md
## Stream Ordering and Determinism
When sending a stream over the network, there are certain situations in which the order of messages will not be deterministic for the receiver. For example, when sending streams from a cluster to a process, delays will cause messages from different cluster members to be interleaved in a non-deterministic order.
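The interleaving described here can be illustrated without any networking. The sketch below is an analogy only, assuming threads as stand-ins for cluster members and a `std::sync::mpsc` channel as a stand-in for the network; `collect_from_members` is a hypothetical helper and not part of `hydro_lang`. Each sender's own messages arrive in order, but the merged order can differ from run to run.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `members` sender threads that each send `per_member` numbered
/// messages to a single receiver, then returns them in arrival order.
fn collect_from_members(members: u32, per_member: u32) -> Vec<(u32, u32)> {
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = (0..members)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                for seq in 0..per_member {
                    // Tag each message with the sender's id and a sequence number.
                    tx.send((id, seq)).expect("receiver is still alive");
                }
            })
        })
        .collect();

    // Drop the original sender so the channel closes once all workers finish.
    drop(tx);

    // Arrival order depends on thread scheduling, so it is not deterministic.
    let received: Vec<(u32, u32)> = rx.iter().collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    received
}

fn main() {
    // The exact interleaving printed here may change between runs.
    println!("{:?}", collect_from_members(3, 4));
}
```

What stays fixed across runs is the set of messages and each sender's relative order; only the interleaving across senders varies.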

To track this behavior, streams have an `Order` type parameter that indicates whether the elements in the stream will have a deterministic order ([`TotalOrder`](pathname:///rustdoc/hydro_lang/stream/struct.TotalOrder)) or not ([`NoOrder`](pathname:///rustdoc/hydro_lang/stream/struct.NoOrder)). When the type parameter is omitted, it defaults to `TotalOrder` for brevity.
Collaborator

I don't think that last sentence is quite accurate due to type inference. In some cases we infer NoOrder without a type parameter being provided by the developer.

More generally, in the absence of assertions from elsewhere shouldn't we make a conservative inference of NoOrder?
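The distinction between a default type parameter and inference can be sketched in plain Rust. This uses a hypothetical stand-in `Stream` type with made-up helpers (`ordered`, `interleaved`, `order_name`), not the real `hydro_lang` definitions. The default applies where the type is written out with the parameter omitted, while an unannotated `let` binding simply takes whatever type the expression produces.

```rust
use std::marker::PhantomData;

// Marker types standing in for the two orderings (illustrative only).
struct TotalOrder;
struct NoOrder;

trait OrderName {
    const NAME: &'static str;
}
impl OrderName for TotalOrder {
    const NAME: &'static str = "TotalOrder";
}
impl OrderName for NoOrder {
    const NAME: &'static str = "NoOrder";
}

// Hypothetical stand-in type with a defaulted `Order` parameter.
struct Stream<T, Order = TotalOrder> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

impl<T, Order: OrderName> Stream<T, Order> {
    fn order_name(&self) -> &'static str {
        Order::NAME
    }

    fn len(&self) -> usize {
        self.items.len()
    }
}

// `Stream<u32>` is written without `Order`, so the default `TotalOrder` applies.
fn ordered(items: Vec<u32>) -> Stream<u32> {
    Stream { items, _order: PhantomData }
}

// This signature names `NoOrder` explicitly.
fn interleaved(items: Vec<u32>) -> Stream<u32, NoOrder> {
    Stream { items, _order: PhantomData }
}

fn main() {
    let a = ordered(vec![1, 2]);
    // No annotation here: the binding takes the function's return type.
    let b = interleaved(vec![1, 2]);
    println!("{} ({} items)", a.order_name(), a.len()); // TotalOrder (2 items)
    println!("{} ({} items)", b.order_name(), b.len()); // NoOrder (2 items)
}
```

In this sketch the caller never writes `NoOrder`, yet `b` carries it, which is the situation the comment describes.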


The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
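One common way to get this kind of compile-time gating in Rust is a marker type parameter with methods implemented only for some markers. The following is a minimal sketch of that pattern using a hypothetical `SketchStream` type; it is not how `hydro_lang` is actually implemented, only an illustration of why `last` can be missing on an unordered stream while `map` is always present.

```rust
use std::marker::PhantomData;

// Marker types for the ordering guarantee (illustrative only).
struct TotalOrder;
struct NoOrder;

// Hypothetical stand-in for a stream tagged with its ordering.
struct SketchStream<T, Order> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

// Methods in this block exist for every ordering.
impl<T, Order> SketchStream<T, Order> {
    fn new(items: Vec<T>) -> Self {
        SketchStream { items, _order: PhantomData }
    }

    // `map` keeps whatever ordering marker the input had.
    fn map<U>(self, f: impl Fn(T) -> U) -> SketchStream<U, Order> {
        SketchStream {
            items: self.items.into_iter().map(f).collect(),
            _order: PhantomData,
        }
    }

    fn len(&self) -> usize {
        self.items.len()
    }
}

// Methods in this block exist only when the order is known to be total.
impl<T> SketchStream<T, TotalOrder> {
    fn last(self) -> Option<T> {
        self.items.into_iter().last()
    }
}

fn main() {
    let ordered: SketchStream<u32, TotalOrder> = SketchStream::new(vec![1, 2, 3]);
    println!("{:?}", ordered.map(|x| x * 10).last()); // Some(30)

    let unordered: SketchStream<u32, NoOrder> = SketchStream::new(vec![1, 2, 3]);
    let doubled = unordered.map(|x| x * 2);
    println!("{}", doubled.len()); // 3
    // doubled.last(); // would not compile: `last` is not defined for `NoOrder`
}
```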

A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the aggregation may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams. The error is a bit misleading, but the key part is that `fold` is not available on `NoOrder` streams:
Collaborator

"the aggregation" --> "the outcome of applying the Rust code in the closure"

Collaborator

The confusing error code is unfortunate. Are there examples where we have better error codes?

And is this error code an issue we should file?


:::tip

We use `send_bincode_interleaved` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
Collaborator

I think a better name for this method is send_bincode_anonymous. There's nothing here about interleaving per se, it's just about whether the sender identity is attached.

This has been bugging me in the demos.

docs/docs/hydro/live-collections/streams.md

:::

To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the aggregation function to be commutative (and therefore immune to non-deterministic ordering):
Collaborator

"requires the aggregation function to be" --> "which asserts that the provided closure is"

Two things here:

  1. I'd avoid saying "aggregation", it feels like a SQL term that isn't so Rusty?
  2. Clarify that this is a programmer assertion. Maybe add an alert that says "In future, commutativity checks will be automatically provided by the compiler, but for now developers are responsible for the commutativity of the closure they pass into *_commutative methods."
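To show why this has to be a programmer assertion, here is a small std-only sketch. It does not call `fold_commutative`; the helper `fold_in_order` is a hypothetical name introduced for illustration. The same elements are folded in two arrival orders: addition gives the same result either way, while an order-sensitive closure does not, and nothing in the closure's type distinguishes the two.

```rust
/// Folds `items` left to right with `f`, starting from `init`.
fn fold_in_order(items: &[i64], init: i64, f: impl Fn(i64, i64) -> i64) -> i64 {
    items.iter().fold(init, |acc, &x| f(acc, x))
}

fn main() {
    // The same three elements in two possible arrival orders.
    let arrival_a = [1, 2, 3];
    let arrival_b = [3, 1, 2];

    // Addition is insensitive to arrival order.
    println!("{}", fold_in_order(&arrival_a, 0, |acc, x| acc + x)); // 6
    println!("{}", fold_in_order(&arrival_b, 0, |acc, x| acc + x)); // 6

    // Appending a decimal digit depends on arrival order.
    println!("{}", fold_in_order(&arrival_a, 0, |acc, x| acc * 10 + x)); // 123
    println!("{}", fold_in_order(&arrival_b, 0, |acc, x| acc * 10 + x)); // 312
}
```

Both closures have the same signature, so without an automated check it is up to the developer to confirm the result does not depend on arrival order.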

@shadaj shadaj merged commit be4b6e3 into main Feb 1, 2025
16 checks passed
@shadaj shadaj deleted the pr1694 branch February 1, 2025 02:25