diff --git a/docs/docs/hydroflow/concepts/cyclic_flows.md b/docs/docs/hydroflow/concepts/cyclic_flows.md new file mode 100644 index 000000000000..e8ac836d52a4 --- /dev/null +++ b/docs/docs/hydroflow/concepts/cyclic_flows.md @@ -0,0 +1,49 @@ +--- +sidebar_position: 2 +--- + +import CodeBlock from '@theme/CodeBlock'; +import exampleCode from '!!raw-loader!../../../../hydroflow/examples/example_5_reachability.rs'; +import exampleCode2 from '!!raw-loader!../../../../hydroflow/examples/example_naturals.rs'; +import { getLines, extractOutput, extractMermaid } from '../../../src/util'; + +# Dataflow Cycles and Fixpoints +Many dataflow libraries only support acyclic flow graphs (DAGs); Hydroflow goes further and supports cycles. Hydroflow's semantics for cyclic flows are based on the formal foundations of recursive queries in the [Datalog](https://en.wikipedia.org/wiki/Datalog) language, which also influenced the design of recursive query features in SQL. + +The basic pattern for cycles in Hydroflow looks something like this: +``` + base = source_() -> ... -> [base]cycle; + cycle = union() + -> ... + -> [next]cycle; +``` +That is, we can trace a cycle of operators in the graph, where one operator is a `union()` that accepts two inputs, one of which is the "back edge" that closes the cycle. + +For a concrete example, we can revisit the code in the [Graph Reachability](../quickstart/example_5_reachability.mdx) quickstart program: + +{getLines(exampleCode, 7, 22)} + +The cycle in that program matches our rough pattern as follows: +``` + origin = source_iter(vec![0]) -> [base]reached_vertices; + reached_vertices = union() -> map(...) + -> [0]my_join_tee + -> ... + -> [next]reached_vertices; +``` + +How should we think about a cycle like this? Intuitively, we can think of the cycle beginning to compute on the data from `base` that comes in via `[0]cycle`. In the Graph Reachability example, this base data corresponds to the origin vertex, `0`. By joining [0] with the `stream_of_edges`, we generate neighbors (1 hop away) and pass them back into the cycle. When one of these is joined again with `stream_of_edges` we get a neighbor of a neighbor (2 hops away). When one of *these* is joined with `stream_of_edges` we get a vertex 3 hops away, and so on. + +If you prefer to think about this program as logic, it represents [Mathematical Induction](https://en.wikipedia.org/wiki/Mathematical_induction) via dataflow: the data from `base` going into `[0]cycle` (i.e. the origin vertex, 0 hops away) is like a "base case", and the data going into `[1]cycle` represents the "induction step" (a vertex *k+1* hops away). (A graph with multiple cycles represents multiple induction, which is a relatively uncommon design pattern in both mathematics and Hydroflow!) + +When does this process end? As with most Hydroflow questions, the answer is not in terms of control flow, but rather in terms of dataflow: *the cycle terminates when it produces no new data*, a notion called a [fixpoint](https://en.wikipedia.org/wiki/Fixed_point_(mathematics)). Our graph reachability example, it terminates when there are no new vertices to visit. Note that the `[join()](../syntax/surface_ops_gen#join)` operator is defined over the *sets* of inputs on each side, and sets +by definition do not contain duplicate values. This prevents the Reachability dataflow from regenerating the same value multiple times. + +Like many looping constructs, it is possible to write a cyclic Hydroflow program that never ``terminates``, in the sense that it produces an unbounded stream of data. If we use `[join_multiset()](../syntax/surface_ops_gen#join_multiset)` instead of `[join()](../syntax/surface_ops_gen#join)` in our Reachability dataflow, the call to `flow.run_available()` never terminates, because each time the same vertex is visited, new data is generated! + +A simpler example of a non-terminating cycle is the following, which specifies the natural numbers: + +{exampleCode2} + +Like any sufficiently powerful language, Hydroflow cannot guarantee that your programs terminate. If you're debugging a non-terminating Hydroflow program, it's a good idea to identify the dataflow cycles and insert an +`[inspect()](../syntax/surface_ops_gen#inspect)` operator along an edge of the cycle to see if it's generating unbounded amounts of duplicated data. You can use the `[unique()](../syntax/surface_ops_gen#unique)` operator to ameliorate the problem. \ No newline at end of file diff --git a/docs/docs/hydroflow/concepts/debugging.md b/docs/docs/hydroflow/concepts/debugging.md index 083d1ec6ba05..1b1fc22b6d6f 100644 --- a/docs/docs/hydroflow/concepts/debugging.md +++ b/docs/docs/hydroflow/concepts/debugging.md @@ -1,5 +1,5 @@ --- -sidebar_position: 4 +sidebar_position: 5 --- # Debugging diff --git a/docs/docs/hydroflow/concepts/distributed_time.md b/docs/docs/hydroflow/concepts/distributed_time.md index e410612535a9..82f5e79c4075 100644 --- a/docs/docs/hydroflow/concepts/distributed_time.md +++ b/docs/docs/hydroflow/concepts/distributed_time.md @@ -1,5 +1,5 @@ --- -sidebar_position: 3 +sidebar_position: 4 --- # Distributed Time diff --git a/docs/docs/hydroflow/concepts/error_handling.md b/docs/docs/hydroflow/concepts/error_handling.md index 7c28c29142e7..3ec805991294 100644 --- a/docs/docs/hydroflow/concepts/error_handling.md +++ b/docs/docs/hydroflow/concepts/error_handling.md @@ -1,5 +1,5 @@ --- -sidebar_position: 5 +sidebar_position: 6 --- # Error Handling diff --git a/docs/docs/hydroflow/concepts/stratification.md b/docs/docs/hydroflow/concepts/stratification.md index 4d1042c810fc..b06018b3bbb3 100644 --- a/docs/docs/hydroflow/concepts/stratification.md +++ b/docs/docs/hydroflow/concepts/stratification.md @@ -1,5 +1,5 @@ --- -sidebar_position: 2 +sidebar_position: 3 --- # Streaming, Blocking and Stratification diff --git a/docs/docs/hydroflow/index.mdx b/docs/docs/hydroflow/index.mdx index f66b9e519099..1cf7a673b1de 100644 --- a/docs/docs/hydroflow/index.mdx +++ b/docs/docs/hydroflow/index.mdx @@ -11,7 +11,6 @@ import HydroflowDocs from '../../../hydroflow/README.md' ## This Book This book will teach you how to set up your environment to get started with Hydroflow, and how to program in the Hydroflow surface syntax. -Keep in mind that Hydroflow is under active development and is constantly -changing. However the code in this book is tested with the Hydroflow library so should always be up-to-date. +Keep in mind that Hydroflow is under active development. However the code in this book is tested with the Hydroflow library so should always be up-to-date. If you have any questions, feel free to [create an issue on Github](https://github.com/hydro-project/hydroflow/issues/new). diff --git a/docs/docs/hydroflow/quickstart/example_1_simplest.mdx b/docs/docs/hydroflow/quickstart/example_1_simplest.mdx index e378d92e4e55..980ad8f6f62d 100644 --- a/docs/docs/hydroflow/quickstart/example_1_simplest.mdx +++ b/docs/docs/hydroflow/quickstart/example_1_simplest.mdx @@ -62,12 +62,17 @@ item passed in. The Hydroflow surface syntax is merely a *specification*; it does not actually do anything until we run it. -We run the flow from within Rust via the [`run_available()` method](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_available). +We can run this flow from within Rust via the [`run_available()` method](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_available). + {getLines(exampleCode, 8)} + Note that `run_available()` runs the Hydroflow graph until no more work is immediately available. In this example flow, running the graph drains the iterator completely, so no more work will *ever* be available. In future examples we will use external inputs such as -network ingress, in which case more work might appear at any time. +network ingress, in which case more work might appear at any time. In those examples we may need a different method than `run_available()`, +e.g. the [`run_async()`](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_async) method, +which we'll see +in [the EchoServer example](./example_7_echo_server). ### A Note on Project Structure The template project is intended to be a starting point for your own Hydroflow project, and you can add files and directories as you see fit. The only requirement is that the `src/main.rs` file exists and contains a `main()` function. diff --git a/docs/docs/hydroflow/quickstart/example_2_simple.mdx b/docs/docs/hydroflow/quickstart/example_2_simple.mdx index 9be2e31ae766..5e8cf286b711 100644 --- a/docs/docs/hydroflow/quickstart/example_2_simple.mdx +++ b/docs/docs/hydroflow/quickstart/example_2_simple.mdx @@ -61,6 +61,12 @@ Replace the contents of `src/main.rs` with the following: {exampleCode2} +Here the `filter_map` operator takes a map closure that returns a Rust [`Option`](https://doc.rust-lang.org/std/option/enum.Option.html). +If the value is `Some(...)`, it is passed to the output; if it is `None` it is filtered. + +The `flat_map` operator takes a map closure that generates a collection type (in this case a `Vec`) +which is flattened. + Results: {extractOutput(exampleOutput2)} diff --git a/docs/docs/hydroflow/quickstart/example_3_stream.mdx b/docs/docs/hydroflow/quickstart/example_3_stream.mdx index 9a806d9bd976..8ec6856c8766 100644 --- a/docs/docs/hydroflow/quickstart/example_3_stream.mdx +++ b/docs/docs/hydroflow/quickstart/example_3_stream.mdx @@ -12,7 +12,7 @@ import { getLines, extractOutput } from '../../../src/util'; > - the [`source_stream`](../syntax/surface_ops_gen.md#source_stream) operator that brings channel input into Hydroflow > - Rust syntax to programmatically send data to a (local) channel -In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the [`source_iter`](../syntax/surface_ops_gen.md#source_iter) operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same machine and sent into the channel programmatically via Rust. +In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the [`source_iter`](../syntax/surface_ops_gen.md#source_iter) operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same thread and sent into the channel programmatically via Rust. For discussion, we start with a skeleton much like before: diff --git a/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx b/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx index f9de47a80de4..2f755c35131b 100644 --- a/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx +++ b/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx @@ -14,7 +14,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util'; > * The [`unique`](../syntax/surface_ops_gen.md#unique) operator for removing duplicates from a stream > * Visualizing hydroflow code via `flow.meta_graph().expect(...).to_mermaid()` -So far all the operators we've used have one input and one output and therefore +So far, all the operators we've used have one input and one output and therefore create a linear flow of operators. Let's now take a look at a Hydroflow program containing an operator which has multiple inputs; in the following examples we'll extend this to multiple outputs. @@ -67,7 +67,7 @@ Run the program and focus on the last three lines of output, which come from `fl That looks right: the edges we "sent" into the flow that start at `0` are `(0, 1)` and `(0, 3)`, so the nodes reachable from `0` in 0 or 1 hops are `0, 1, 3`. -> Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here is producing a `set` of nodes, so the order in which they are printed out is not specified. The [`sort_by_key`](../syntax/surface_ops_gen.md#sort_by_key) operator can be used to sort the output of a flow. +> Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here uses the [`join()`](../syntax/surface_ops_gen.md#join) operator, which deals in `sets` of data items, so the order in which a `join()`'s output items are generated is not specified or guaranteed. The [`sort_by_key`](../syntax/surface_ops_gen.md#sort_by_key) operator can always be used to sort the output of a flow if needed. ## Examining the Hydroflow Code In the code, we want to start out with the origin vertex, `0`, diff --git a/docs/docs/hydroflow/quickstart/example_5_reachability.mdx b/docs/docs/hydroflow/quickstart/example_5_reachability.mdx index e9834ff8c8e3..364ff45c4f19 100644 --- a/docs/docs/hydroflow/quickstart/example_5_reachability.mdx +++ b/docs/docs/hydroflow/quickstart/example_5_reachability.mdx @@ -11,8 +11,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util'; > * Implementing a recursive algorithm (graph reachability) via cyclic dataflow > * Operators to union data from multiple inputs ([`union`](../syntax/surface_ops_gen.md#union)), and send data to multiple outputs ([`tee`](../syntax/surface_ops_gen.md#tee)) > * Indexing multi-output operators by appending a bracket expression -> * An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum. - +> * A first example of a cyclic (recursive) flow and the concept of fixpoint. To expand from graph neighbors to graph reachability, we want to find vertices that are connected not just to `origin`, but also to vertices reachable *transitively* from `origin`. Said differently, a vertex is reachable from `origin` if it is @@ -67,19 +66,16 @@ We route the `origin` vertex into it as one input right away: {getLines(exampleCode, 8, 12)} -Note the square-bracket syntax for differentiating the multiple inputs to `union()` -is the same as that of `join()` (except that union can have an unbounded number of inputs, -whereas `join()` is defined to only have two.) +Note the square-bracket syntax for assigning index names to the multiple inputs to `union()`; this is similar +to the indexes for `join()`, except that (a) union can have an arbitrary number of inputs, (b) the index names can be arbitrary strings, and (c) the indexes are optional can be omitted entirely. (By contrast, recall that +`join()` is defined to take 2 required input indexes, `[0]` and `[1]`). The only reason to assign index names to the inputs of `union()` is for labeling edges in the generated (e.g. Mermaid) graphs. -Now, `join()` is defined to only have one output. In our program, we want to copy the joined -output to two places: to the original `for_each` from above to print output, and *also* -back to the `union` operator we called `reached_vertices`. We feed the `join()` output +The next group of statements lays out the join of `reached_vertices` and the `stream_of_edges`. The `join()` operator is defined to only have one output, but in our program, we need its output twice: once to feed the original `for_each` from above to print output, and also to feed +back to the `union` operator that we called `reached_vertices`. We pass the `join()` output through a `flat_map()` as before, and then we feed the result into a [`tee()`](../syntax/surface_ops_gen.md#tee) operator, which is the mirror image of `union()`: instead of merging many inputs to one output, -it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and -given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of union: we *append* -an output index in square brackets to the `tee` or variable. In this example we have -`my_join_tee[0] ->` and `my_join_tee[1] ->`. +it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and separate copy is given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of the inputs to union: we can (optionally) *append* +an arbitrary output index name in square brackets to the `tee` or variable. In this example we have `my_join_tee[cycle] ->` and `my_join_tee[print] ->`. Finally, we process the output of the `join` as passed through the `tee`. One branch pushes reached vertices back up into the `reached_vertices` variable (which begins with a `union`), while the other @@ -87,9 +83,6 @@ prints out all the reached vertices as in the simple program. {getLines(exampleCode, 14, 17)} -Note the syntax for differentiating the *outputs* of a `tee()` is symmetric to that of `union()`, -showing up to the right of the variable rather than the left. - Below is the diagram rendered by [mermaid](https://mermaid-js.github.io/) showing the structure of the full flow: @@ -97,4 +90,7 @@ the structure of the full flow: This is similar to the flow for graph neighbors, but has a few more operators that make it look more complex. In particular, it includes the `union` and `tee` operators, and a cycle-forming back-edge. -There is also an auto-generated `handoff` operator that enforces the rule that a push producer and a pull consumer must be separated by a `handoff`. +There is also an auto-generated `handoff` operator that enforces the rule that a push producer and a pull consumer must be separated by a `handoff` (see the [Architecture section](../architecture/handoffs)). + +# Cyclic Dataflow +Many dataflow and workflow systems are restricted to acyclic graphs (DAGs), but Hydroflow supports cycles, as we see in this example. \ No newline at end of file diff --git a/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx b/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx index 20697fb7b606..18a1cdbcd5d2 100644 --- a/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx +++ b/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx @@ -11,6 +11,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util'; > * Extending a program with additional downstream logic. > * Hydroflow's ([`difference`](../syntax/surface_ops_gen.md#difference)) operator > * A first exposure to the concepts of _strata_ and _ticks_ +> * An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum. Our next example builds on the previous by finding vertices that are _not_ reachable. To do this, we need to capture the set `all_vertices`, and use a [difference](../syntax/surface_ops_gen.md#difference) operator to form the difference between that set of vertices and `reachable_vertices`. @@ -74,16 +75,17 @@ in order, one at a time, ensuring all values are computed before moving on to the next stratum. Between strata we see a _handoff_, which logically buffers the output of the first stratum, and delineates the separation of execution between the 2 strata. +If you look carefully, you'll see two subgraphs labeled with `stratum 0`. The reason that stratum 0 was broken into subgraphs has nothing to do with +correctness, but rather the way that Hydroflow graphs are compiled and scheduled (as +discussed in the section on [In-Out Trees](../architecture/in-out_trees). We need not concern ourselves with this detail other than to look carefully at the `stratum` labels on the grey boxes in our Mermaid diagrams. + All the subgraphs labeled `stratum 0` are run first to completion, and then all the subgraphs labeled `stratum 1` are run. This captures the requirements of the `difference` operator: it has to wait for its full negative input before it can start producing output. Note how the `difference` operator has two inputs (labeled `pos` and `neg`), and only the `neg` input shows up as blocking (with the bold edge ending in a ball). -Meanwhile, note stratum 0 has a recursive loop, and stratum 1 that computes `difference`, with the blocking input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are found, before moving on to compute the unreached vertices. +In this Mermaid graph, note that stratum 0 has a recursive loop back through `my_join`, and `tee`s off output to the `difference` operator in stratum 1 via the handoff and the blocking `neg` input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are passed to the handoff (a [fixpoint](../concepts/cyclic_flows)), before moving on to compute the unreached vertices via stratum 1. -After all strata are run, Hydroflow returns to the stratum 0; this begins the next _tick_. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on [time](../concepts/life_and_times.md). +After all strata are run, Hydroflow returns to stratum 0; this begins the next _tick_. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on [time](../concepts/life_and_times.md). -If you look carefully, you'll see two subgraphs labeled with `stratum 0`. The reason that stratum 0 was broken into subgraphs has nothing to do with -correctness, but rather the way that Hydroflow graphs are compiled and scheduled, as -discussed in the chapter on [Architecture](../architecture/index.mdx). diff --git a/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx b/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx index 4f100b22ef23..48ef12b3e315 100644 --- a/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx +++ b/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx @@ -31,23 +31,9 @@ cargo generate hydro-project/hydroflow-template ``` Then change directory into the resulting project. +The Hydroflow template project provides *this example* as its default, so there's no code for us to change. The `README.md` for the template project is a good place to start. It contains a brief overview of the project structure, and how to build and run the example. Here we'll spend more time learning from the code. -## Hydroflow project structure -The Hydroflow template project auto-generates this example for us. If you prefer, you can find the source in the `examples/echo_server` directory of the Hydroflow repository. - -The directory structure encouraged by the template is as follows: -```txt -project/README.md # documentation -project/Cargo.toml # package and dependency info -project/src/main.rs # main function -project/src/protocol.rs # message types exchanged between roles -project/src/helpers.rs # helper functions used by all roles -project/src/.rs # service definition for role A (e.g. server) -project/src/.rs # service definition for role B (e.g. client) -``` -In the default example, the roles we use are `Client` and `Server`, but you can imagine different roles depending on the structure of your service or application. - ### `main.rs` We start with a `main` function that parses command-line options, and invokes the appropriate role-specific service. @@ -61,7 +47,7 @@ Following that, we use Rust's [`clap`](https://docs.rs/clap/latest/clap/) (Comma This sets up 3 command-line flags: `role`, `addr`, and `server_addr`. Note how the `addr` and `server_addr` flags are made optional via wrapping in a Rust `Option`; by contrast, the `role` option is required. The `clap` crate will parse the command-line options and populate the `Opts` struct with the values. `clap` handles parsing the command line strings into the associated Rust types -- the `value_parser` attribute tells `clap` to use Hydroflow's `ipv4_resolve` helper function to parse a string like "127.0.0.1:6552" into a `SocketAddr`. -This brings us to the `main` function itself. It is prefaced by a `#[hydroflow::main]` attribute, which is a macro that sets up the tokio runtime for Hydroflow. This is necessary because Hydroflow uses the tokio runtime for asynchronous execution as a service. +This brings us to the `main` function itself. It is prefaced by a `#[hydroflow::main]` attribute, which is a macro that sets up the tokio runtime for Hydroflow. It is also an async function. This is necessary because Hydroflow uses the tokio runtime for asynchronous execution as a service. {getLines(main, 29, 40)} diff --git a/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx b/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx index 06d3c8c6c52e..00a1efd8c09a 100644 --- a/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx +++ b/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx @@ -53,18 +53,21 @@ To follow along, replace the contents of `src/server.rs` with the code below: {getLines(server, 1, 24)} -After a short prelude, we have the Hydroflow code near the top of `run_server()`. It begins by defining `outbound_chan` as a `union`d destination sink for network messages. Then we get to the +After a short prelude, we have the Hydroflow code near the top of `run_server()`. It begins by defining `outbound_chan` as a `union`ed destination sink for network messages. Then we get to the more interesting `inbound_chan` definition. The `inbound` channel is a source stream that will carry many -types of `Message`s. We use the [`demux`](../syntax/surface_ops_gen.md#demux) operator to partition the stream objects into three channels. The `clients` channel +types of `Message`s. +We first use a `map` operator to `unwrap` the Rust `Result` type that comes from deserializing the input +from `source_stream_serde`. +Then we use the [`demux`](../syntax/surface_ops_gen.md#demux) operator to partition the stream objects into three channels. The `clients` channel will carry the addresses of clients that have connected to the server. The `msgs` channel will carry the `ChatMsg` messages that clients send to the server. The `errs` channel will carry any other messages that clients send to the server. Note the structure of the `demux` operator: it takes a closure on -`(Message, SocketAddr)` pairs, and a variadic tuple (`var_args!`) of output channel names—in this case `clients`, `msgs`, and `errs`. The closure is basically a big +`(Message, SocketAddr)` pairs, and a variadic tuple (`var_args!`) of the output channel names—in this case `clients`, `msgs`, and `errs`. The closure is basically a big Rust pattern [`match`](https://doc.rust-lang.org/book/ch06-02-match.html), with one arm for each output channel name given in the variadic tuple. Note -that the different output channels can have different-typed messages! Note also that we destructure the incoming `Message` types into tuples of fields. (If we didn't we'd either have to write boilerplate code for each message type in every downstream pipeline, or face Rust's dreaded [refutable pattern](https://doc.rust-lang.org/book/ch18-02-refutability.html) error!) +that each output channel can have its own message type! Note also that we destructure the incoming `Message` types into component fields. (If we didn't we'd have to write boilerplate code to handle every possible `Message` type in every downstream pipeline!) The remainder of the server consists of two independent pipelines, the code to print out the flow graph, and the code to run the flow graph. To follow along, paste the following into the bottom of your `src/server.rs` file: @@ -152,7 +155,7 @@ To follow along, start by replacing the contents of `src/client.rs` with the fol {getLines(client, 1, 27)} This brings us to the `run_client` function. As in `run_server` we begin by ensuring the server address -is supplied. We then have the hydroflow code starting with a standard pattern of a `union`d `outbound_chan`, +is supplied. We then have the hydroflow code starting with a standard pattern of a `union`ed `outbound_chan`, and a `demux`ed `inbound_chan`. The client handles only two inbound `Message` types: `Message::ConnectResponse` and `Message::ChatMsg`. Paste the following to the bottom of `src/client.rs`: @@ -275,7 +278,7 @@ May 31, 5:12:40 alice: Is there anyone home? Now start client "bob" in terminal 3, and notice how he instantly receives the backlog of Alice's messages from the server's `cross_join`. (The messages may not be printed in the same order as they were timestamped! The `cross_join` operator is not guaranteed to preserve order, nor -is the udp network. Fixing these issues requires extra client logic that we leave as an exercise to the reader.) +is the udp network. Fixing these issues requires extra client logic (perhaps using the [`sort()`](../syntax/surface_ops_gen#sort) operator) that we leave as an exercise to the reader.) ```console #shell-command-next-line cargo run -- --name "bob" --role client --server-addr 127.0.0.1:12347 diff --git a/docs/docs/hydroflow/quickstart/setup.md b/docs/docs/hydroflow/quickstart/setup.md index 08b76be74e07..f3a2c477688d 100644 --- a/docs/docs/hydroflow/quickstart/setup.md +++ b/docs/docs/hydroflow/quickstart/setup.md @@ -47,9 +47,7 @@ cargo install cargo-generate ## VS Code Setup We recommend using VS Code with the `rust-analyzer` extension (and NOT the -`Rust` extension). To enable the pre-release version of `rust-analyzer` -(required by some new nightly syntax we use, at least until 2022-03-14), click -the "Switch to Pre-Release Version" button next to the uninstall button. +`Rust` extension). ## Setting up a Hydroflow Project The easiest way to get started with Hydroflow is to begin with a template project. @@ -122,9 +120,10 @@ will provide inline type and error messages, code completion, etc. To work with the repository, it's best to start with an "example", found in the [`hydroflow/examples` folder](https://github.com/hydro-project/hydroflow/tree/main/hydroflow/examples). -These examples are included via the [`hydroflow/Cargo.toml` file](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/Cargo.toml), -so make sure to add your example there if you create a new one. The simplest -example is the [`echo server`](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/examples/echoserver/main.rs). +The simplest example is the +['hello world'](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/examples/hello_world/main.rs) example; +the simplest example with networking is the +[`echo server`](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/examples/echoserver/main.rs). The Hydroflow repository is set up as a [workspace](https://doc.rust-lang.org/book/ch14-03-cargo-workspaces.html), i.e. a repo containing a bunch of separate packages, `hydroflow` is just the diff --git a/docs/docs/hydroflow/syntax/surface_data.mdx b/docs/docs/hydroflow/syntax/surface_data.mdx index 6e6d3a18e421..66a27b4c99dd 100644 --- a/docs/docs/hydroflow/syntax/surface_data.mdx +++ b/docs/docs/hydroflow/syntax/surface_data.mdx @@ -6,20 +6,27 @@ import exampleCode from '!!raw-loader!../../../../hydroflow/examples/example_syn # Data Sources and Sinks in Rust Any useful flow requires us to define sources of data, either generated computationally or received from -and outside environment via I/O. +an outside environment via I/O. -## `source_iter` -A flow can receive data from a Rust collection object via the `source_iter` operator, which takes the +## One-time Iterator Sources +A flow can receive data from a Rust collection object via the [`source_iter()`](./surface_ops_gen.md#source_iter) operator, which takes the iterable collection as an argument and passes the items down the flow. For example, here we iterate through a vector of `usize` items and push them down the flow: ```rust,ignore source_iter(vec![0, 1]) -> ... ``` -The Hello, World example above uses this construct. +The Hello, World example above uses this construct. -## `source_stream` +The [`source_file()`](./surface_ops_gen.md#source_file) and [`source_json()`](./surface_ops_gen.md#source_json) operators are similar, but read from a specified file. + +All of these operators output the contents of their collection only once, during the first tick. +To output every tick, consider feeding results into a [`persist()`](./surface_ops_gen.md#persist) +operator. + +## Streaming Sources More commonly, a flow should handle external data coming in asynchronously from a [_Tokio_ runtime](https://tokio.rs/tokio/tutorial). -One way to do this is with _channels_ that allow Rust code to send data into the Hydroflow inputs. + +One way to do this is with _channels_ that allow Rust code to send data into Hydroflow via the [`source_stream()`](./surface_ops_gen.md#source_stream) operator. The code below creates a channel for data of (Rust) type `(usize, usize)`: ```rust,ignore let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); @@ -30,7 +37,7 @@ it explicitly as follows: ```rust,ignore input_send.send((0, 1)).unwrap() ``` -And in our Hydroflow syntax we can receive the data from the channel using the `source_stream` syntax and +And in our Hydroflow syntax we can receive the data from the channel using the [`source_stream()`](./surface_ops_gen.md#source_stream) operator and pass it along a flow: ```rust,ignore source_stream(input_recv) -> ... @@ -41,4 +48,19 @@ in from outside the flow: {exampleCode} -TODO: add the remaining sources. \ No newline at end of file +Sometimes we want to trigger activity based on timing, not data. To achieve this, we can use the [`source_interval()`](./surface_ops_gen.md#source_interval) operator, which takes a `Duration` `d` as an argument, and outputs a Tokio time Instant after every `d` units of time pass. + +## Destinations +As duals to our data source operators, we also have data destination operators. The dest operators you'll likely use +most often are [`dest_sink()`](./surface_ops_gen.md#dest_sink) and [`dest_file()`](./surface_ops_gen.md#dest_file). They are fairly +straightforward, so the best source for further information is the documentation you can find by following the links on the operator names above. + +## SerDe: Network Serialization and Deserialization +One of the mechanical annoyances of networked systems is the need to convert data to wire format ("serialization") and convert it back from wire format to data ("deserialization"), +also known as "SerDe". +This can be done with `map` functions, but we provide a convenience source/sink pair that does serde and networking for you. +The source side, [`source_serde()`](./surface_ops_gen.md#source_serde) generates tuples of the type `(T, SocketAddr)`, +where the first field is a deserialized item of type `T`, and the second field is the address of the sender of the item. +The dest side, [`source_serde()`](./surface_ops_gen.md#source_serde), takes in tuples of type `(T, SocketAddr)`, +where the first field is an item of type `T` to be serialized, and the second field is a destination address. + diff --git a/docs/docs/hydroflow/syntax/surface_flows.md b/docs/docs/hydroflow/syntax/surface_flows.md index 2f56d55a9703..1d55b2c756de 100644 --- a/docs/docs/hydroflow/syntax/surface_flows.md +++ b/docs/docs/hydroflow/syntax/surface_flows.md @@ -23,15 +23,15 @@ data, making the program more understandable. ## Operators with Multiple Ports Some operators have more than one input _port_ that can be referenced by `->`. For example [`union`](./surface_ops_gen.md#union) -unions the contents of many flows, so it can have an abitrary number of input ports. Some operators have multiple outputs, notably [`tee`](./surface_ops_gen.md#tee), -which has an arbitrary number of outputs. +unions the contents of many flows, so it can have an abitrary number of input ports. Some operators have multiple outputs; [`tee`](./surface_ops_gen.md#tee) and [`demux`](./surface_ops_gen.md#demux) +have an arbitrary number of outputs. In the syntax, we optionally distinguish input ports via an _indexing prefix_ string -in square brackets before the name (e.g. `[0]my_union` and `[1]my_union`). Binary operators --- -those with two distinct input ports --- require indexing prefixes, and require them to be `0` and `1`. -Operators with arbitrary numbers of inputs ([`union`](./surface_ops_gen.md#union)) and outputs +in square brackets before the name (e.g. `[0]my_union` and `[1]my_union`). Most operators with a fixed number of input ports\ require specific indexing prefixes to +distinguish the inputs. For example, the inputs to [`join`](./surface_ops_gen.md#join) must be `[0]` and `[1]`; the inputs to [`difference`](./surface_ops_gen.md#difference) must be `[pos]` and `[neg]`. +Operators with an arbitrary number of inputs ([`union`](./surface_ops_gen.md#union)) and outputs ([`demux`](./surface_ops_gen.md#demux), [`tee`](./surface_ops_gen.md#tee)) -allow for arbitrary strings, which can make code and dataflow graphs more readable and understandable +allow you to choose arbitrary strings, which help you make your code and dataflow graphs more readable and understandable (e.g. `my_tee[print]` and `my_tee[continue]`). Here is an example that tees one flow into two, handles each separately, and then unions them to print out the contents in both lowercase and uppercase: diff --git a/hydroflow/README.md b/hydroflow/README.md index 1337aa592b97..8aa98d34cd24 100644 --- a/hydroflow/README.md +++ b/hydroflow/README.md @@ -1,16 +1,23 @@ # Hydroflow -[Hydroflow](https://github.com/hydro-project/hydroflow) is a compiler for low-latency -dataflow programs, written in Rust. Hydroflow is the runtime library for the +[Hydroflow](https://github.com/hydro-project/hydroflow) is a small language and compiler for low-latency +dataflow programs, written in Rust. Hydroflow serves as the runtime library for the [Hydro language stack](https://hydro.run/docs/hydroflow/ecosystem), which is under development as a complete compiler stack for distributed programming languages. -Hydroflow is designed with two goals in mind: -- Expert developers can program Hydroflow directly to build components in a distributed system. -- Higher levels of the Hydro stack will offer friendlier languages with more abstractions, and treat Hydroflow as a compiler target. +Hydroflow is designed with two use cases in mind: + - Expert developers can program directly in the Hydroflow language to build individual components that can interoperate in a distributed program or service. + - Higher levels of the Hydro stack will offer higher-level abstractions and DSLs, and treat Hydroflow as a compiler target. -Hydroflow provides a DSL—the *surface syntax*—embedded in Rust, which compiles to high-efficiency machine code. +Hydroflow is targeted at supporting the following unique features: + 1. A type system that helps developers reason about progress and consistency guarantees in a distributed program. This includes an emphasis on [lattice types](https://hydro.run/docs/hydroflow/lattices_crate/) that can allow for consistent outcomes in the face of network messages that may be interleaved, reordered, batched, and resent. + 2. A [dataflow programming model](https://hydro.run/docs/hydroflow/syntax/surface_flows), capturing the message- and data-driven nature of many distributed services. + 3. Extremely low-latency handling of asynchronously-arriving data/messages, via aggressive exploitation of Rust's [monomorphization](https://rustc-dev-guide.rust-lang.org/backend/monomorph.html) techniques. + 4. Dataflow optimizations, both to optimize single-node Hydroflow flows, and to enable distributed optimizations across multiple flows. + +Hydroflow's language—the Hydroflow *surface syntax*—is embedded in Rust, which compiles Hydroflow code to high-efficiency machine code. As the lowest level of the Hydro stack, Hydroflow requires some knowledge of Rust to use. -Check out the [Hydroflow Playground](https://hydro.run/playground) to see Hydroflow's surface syntax in action! -Or read the [Hydroflow Book docs](https://hydro.run/docs/hydroflow/#this-book) to get started. +The most recent release of the [Hydroflow Book docs](https://hydro.run/docs/hydroflow/#this-book) are online, providing documentation and numerous annotated examples. + +You can also check out the [Hydroflow Playground](https://hydro.run/playground) to see Hydroflow's surface syntax in action! diff --git a/hydroflow/examples/chat/client.rs b/hydroflow/examples/chat/client.rs index 19a76eecf93c..b3ba7c05e331 100644 --- a/hydroflow/examples/chat/client.rs +++ b/hydroflow/examples/chat/client.rs @@ -48,15 +48,15 @@ pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts source_iter([()]) -> map(|_m| (Message::ConnectRequest, server_addr)) -> [0]outbound_chan; // take stdin and send to server as a msg - // the cross_join serves to buffer msgs until the connection request is acked - msg_send = cross_join() -> map(|(msg, _)| (msg, server_addr)) -> [1]outbound_chan; + // the batch serves to buffer msgs until the connection request is acked + msg_send = batch() -> map(|msg| (msg, server_addr)) -> [1]outbound_chan; lines = source_stdin() -> map(|l| Message::ChatMsg { nickname: opts.name.clone(), message: l.unwrap(), ts: Utc::now()}) - -> [0]msg_send; - inbound_chan[acks] -> [1]msg_send; + -> [input]msg_send; + inbound_chan[acks] -> persist() -> [signal]msg_send; // receive and print messages inbound_chan[msgs] -> for_each(pretty_print_msg); diff --git a/hydroflow/examples/chat/main.rs b/hydroflow/examples/chat/main.rs index 2b12c1cba19f..6219fe205278 100644 --- a/hydroflow/examples/chat/main.rs +++ b/hydroflow/examples/chat/main.rs @@ -66,7 +66,7 @@ fn test() { let (_server, _, mut server_output) = run_cargo_example( "chat", - "--role server --name server --addr 127.0.0.100:2050", + "--role server --name server --addr 127.0.0.100:12347", ); let mut server_output_so_far = String::new(); @@ -78,12 +78,12 @@ fn test() { let (_client1, mut client1_input, mut client1_output) = run_cargo_example( "chat", - "--role client --name client1 --server-addr 127.0.0.100:2050", + "--role client --name client1 --server-addr 127.0.0.100:12347", ); let (_client2, _, mut client2_output) = run_cargo_example( "chat", - "--role client --name client2 --server-addr 127.0.0.100:2050", + "--role client --name client2 --server-addr 127.0.0.100:12347", ); let mut client1_output_so_far = String::new(); diff --git a/hydroflow/examples/chat/server.rs b/hydroflow/examples/chat/server.rs index b898eb19c2b0..eb9d311e035a 100644 --- a/hydroflow/examples/chat/server.rs +++ b/hydroflow/examples/chat/server.rs @@ -13,13 +13,13 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts outbound_chan = union() -> dest_sink_serde(outbound); inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap) - -> demux(|(msg, addr), var_args!(clients, msgs, errs)| + -> demux(|(msg, addr), var_args!(clients, msgs, errs)| match msg { Message::ConnectRequest => clients.give(addr), Message::ChatMsg {..} => msgs.give(msg), _ => errs.give(msg), } - ); + ); clients = inbound_chan[clients] -> tee(); inbound_chan[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m)); @@ -27,9 +27,9 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts clients[0] -> map(|addr| (Message::ConnectResponse, addr)) -> [0]outbound_chan; // Pipeline 2: Broadcast messages to all clients - broadcast = cross_join() -> [1]outbound_chan; inbound_chan[msgs] -> [0]broadcast; clients[1] -> [1]broadcast; + broadcast = cross_join() -> [1]outbound_chan; }; if let Some(graph) = opts.graph { diff --git a/hydroflow/examples/example_5_reachability.rs b/hydroflow/examples/example_5_reachability.rs index 749ce0a2d635..ab8be9e61c52 100644 --- a/hydroflow/examples/example_5_reachability.rs +++ b/hydroflow/examples/example_5_reachability.rs @@ -8,17 +8,20 @@ pub fn main() { // inputs: the origin vertex (vertex 0) and stream of input edges origin = source_iter(vec![0]); stream_of_edges = source_stream(edges_recv); - origin -> [0]reached_vertices; - reached_vertices = union(); // the join reached_vertices -> map(|v| (v, ())) -> [0]my_join_tee; stream_of_edges -> [1]my_join_tee; my_join_tee = join() -> flat_map(|(src, ((), dst))| [src, dst]) -> tee(); - // the loop and the output - my_join_tee[0] -> [1]reached_vertices; - my_join_tee[1] -> unique() -> for_each(|x| println!("Reached: {}", x)); + // the cycle: my_join_tee gets data from reached_vertices + // and provides data back to reached_vertices! + origin -> [base]reached_vertices; + my_join_tee -> [cycle]reached_vertices; + reached_vertices = union(); + + // the output + my_join_tee[print] -> unique() -> for_each(|x| println!("Reached: {}", x)); }; println!( @@ -33,5 +36,6 @@ pub fn main() { edges_send.send((1, 2)).unwrap(); edges_send.send((0, 3)).unwrap(); edges_send.send((0, 3)).unwrap(); + edges_send.send((4, 0)).unwrap(); flow.run_available(); } diff --git a/hydroflow/examples/example_6_unreachability.rs b/hydroflow/examples/example_6_unreachability.rs index 6fd8a2100dbf..ae3238fb7d1a 100644 --- a/hydroflow/examples/example_6_unreachability.rs +++ b/hydroflow/examples/example_6_unreachability.rs @@ -5,25 +5,27 @@ pub fn main() { let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow_syntax! { + // inputs: the origin vertex (vertex 0) and stream of input edges origin = source_iter(vec![0]); stream_of_edges = source_stream(pairs_recv) -> tee(); - reached_vertices = union()->tee(); - origin -> [0]reached_vertices; // the join for reachable vertices - my_join = join() -> flat_map(|(src, ((), dst))| [src, dst]); reached_vertices[0] -> map(|v| (v, ())) -> [0]my_join; stream_of_edges[1] -> [1]my_join; + my_join = join() -> flat_map(|(src, ((), dst))| [src, dst]); - // the loop - my_join -> [1]reached_vertices; + // the cycle: my_join gets data from reached_vertices + // and provides data back to reached_vertices! + origin -> [base]reached_vertices; + my_join -> [cycle]reached_vertices; + reached_vertices = union()->tee(); - // the difference all_vertices - reached_vertices + // the difference: all_vertices - reached_vertices all_vertices = stream_of_edges[0] -> flat_map(|(src, dst)| [src, dst]) -> tee(); - unreached_vertices = difference(); all_vertices[0] -> [pos]unreached_vertices; reached_vertices[1] -> [neg]unreached_vertices; + unreached_vertices = difference(); // the output all_vertices[1] -> unique() -> for_each(|v| println!("Received vertex: {}", v)); diff --git a/hydroflow/examples/example_naturals.rs b/hydroflow/examples/example_naturals.rs new file mode 100644 index 000000000000..e9d723709757 --- /dev/null +++ b/hydroflow/examples/example_naturals.rs @@ -0,0 +1,14 @@ +use hydroflow::hydroflow_syntax; + +pub fn main() { + let mut _flow = hydroflow_syntax! { + base = source_iter(vec![1]) -> cycle; + cycle = union() + -> map(|i| i + 1) + -> inspect(|i| println!("{}", i)) + -> cycle; + }; + + // Let's not run this -- it will go forever! + // flow.run_available(); +} diff --git a/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap b/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap index c80718351812..72b40ef16a24 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap @@ -10,34 +10,34 @@ linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; subgraph sg_1v1 ["sg_1v1 stratum 0"] 1v1[\"(1v1) source_iter(vec![0])"/]:::pullClass 2v1[\"(2v1) source_stream(edges_recv)"/]:::pullClass - 3v1[\"(3v1) union()"/]:::pullClass - 4v1[\"(4v1) map(|v| (v, ()))"/]:::pullClass - 5v1[\"(5v1) join()"/]:::pullClass - 6v1[\"(6v1) flat_map(|(src, ((), dst))| [src, dst])"/]:::pullClass - 7v1[/"(7v1) tee()"\]:::pushClass + 7v1[\"(7v1) union()"/]:::pullClass + 3v1[\"(3v1) map(|v| (v, ()))"/]:::pullClass + 4v1[\"(4v1) join()"/]:::pullClass + 5v1[\"(5v1) flat_map(|(src, ((), dst))| [src, dst])"/]:::pullClass + 6v1[/"(6v1) tee()"\]:::pushClass 8v1[/"(8v1) unique()"\]:::pushClass 9v1[/"(9v1) for_each(|x| println!("Reached: {}", x))"\]:::pushClass 10v1["(10v1) handoff"]:::otherClass - 10v1--1--->3v1 - 1v1--0--->3v1 - 2v1--1--->5v1 - 3v1--->4v1 - 4v1--0--->5v1 + 10v1--cycle--->7v1 + 1v1--base--->7v1 + 2v1--1--->4v1 + 7v1--->3v1 + 3v1--0--->4v1 + 4v1--->5v1 5v1--->6v1 - 6v1--->7v1 - 7v1--0--->10v1 - 7v1--1--->8v1 + 6v1--->10v1 + 6v1--print--->8v1 8v1--->9v1 subgraph sg_1v1_var_my_join_tee ["var my_join_tee"] + 4v1 5v1 6v1 - 7v1 end subgraph sg_1v1_var_origin ["var origin"] 1v1 end subgraph sg_1v1_var_reached_vertices ["var reached_vertices"] - 3v1 + 7v1 end subgraph sg_1v1_var_stream_of_edges ["var stream_of_edges"] 2v1 diff --git a/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap b/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap index b1cda51e5c06..8b1ebb7ed9a0 100644 --- a/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap +++ b/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap @@ -9,29 +9,29 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; subgraph sg_1v1 ["sg_1v1 stratum 0"] 1v1[\"(1v1) source_iter(vec![0])"/]:::pullClass - 8v1[\"(8v1) map(|v| (v, ()))"/]:::pullClass - 6v1[\"(6v1) join()"/]:::pullClass - 7v1[\"(7v1) flat_map(|(src, ((), dst))| [src, dst])"/]:::pullClass - 4v1[\"(4v1) union()"/]:::pullClass - 5v1[/"(5v1) tee()"\]:::pushClass + 4v1[\"(4v1) map(|v| (v, ()))"/]:::pullClass + 5v1[\"(5v1) join()"/]:::pullClass + 6v1[\"(6v1) flat_map(|(src, ((), dst))| [src, dst])"/]:::pullClass + 7v1[\"(7v1) union()"/]:::pullClass + 8v1[/"(8v1) tee()"\]:::pushClass 15v1["(15v1) handoff"]:::otherClass - 15v1--->8v1 - 1v1--0--->4v1 - 8v1--0--->6v1 - 6v1--->7v1 - 7v1--1--->4v1 - 4v1--->5v1 - 5v1--0--->15v1 + 15v1--->4v1 + 1v1--base--->7v1 + 4v1--0--->5v1 + 5v1--->6v1 + 6v1--cycle--->7v1 + 7v1--->8v1 + 8v1--0--->15v1 subgraph sg_1v1_var_my_join ["var my_join"] + 5v1 6v1 - 7v1 end subgraph sg_1v1_var_origin ["var origin"] 1v1 end subgraph sg_1v1_var_reached_vertices ["var reached_vertices"] - 4v1 - 5v1 + 7v1 + 8v1 end end subgraph sg_2v1 ["sg_2v1 stratum 0"] @@ -64,10 +64,10 @@ subgraph sg_3v1 ["sg_3v1 stratum 1"] end end 3v1--1--->16v1 -5v1--1--->18v1 +8v1--1--->18v1 10v1--0--->17v1 16v1["(16v1) handoff"]:::otherClass -16v1--1--->6v1 +16v1--1--->5v1 17v1["(17v1) handoff"]:::otherClass 17v1--pos--->11v1 18v1["(18v1) handoff"]:::otherClass diff --git a/hydroflow/tests/snapshots/surface_examples__example_naturals.snap b/hydroflow/tests/snapshots/surface_examples__example_naturals.snap new file mode 100644 index 000000000000..01a15b13afb4 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_examples__example_naturals.snap @@ -0,0 +1,5 @@ +--- +source: hydroflow/tests/surface_examples.rs +expression: output +--- + diff --git a/hydroflow_lang/src/graph/ops/join.rs b/hydroflow_lang/src/graph/ops/join.rs index 7d69935dcb68..86b4f71f17c2 100644 --- a/hydroflow_lang/src/graph/ops/join.rs +++ b/hydroflow_lang/src/graph/ops/join.rs @@ -43,6 +43,10 @@ use crate::graph::{OpInstGenerics, OperatorInstance}; /// // etc. /// ``` /// +/// `join` is defined to treat its inputs as *sets*, meaning that it +/// eliminates duplicated values in its inputs. If you do not want +/// duplicates eliminated, use the [`join_multiset`](#join_multiset) operator. +/// /// ### Examples /// /// ```rustbook diff --git a/hydroflow_lang/src/graph/ops/join_multiset.rs b/hydroflow_lang/src/graph/ops/join_multiset.rs index b01de28de545..b9c6ec3019cc 100644 --- a/hydroflow_lang/src/graph/ops/join_multiset.rs +++ b/hydroflow_lang/src/graph/ops/join_multiset.rs @@ -10,6 +10,9 @@ use crate::graph::{OpInstGenerics, OperatorInstance}; /// /// This operator is equivalent to `join` except that the LHS and RHS are collected into multisets rather than sets before joining. /// +/// If you want +/// duplicates eliminated from the inputs, use the [`join`](#join) operator. +/// /// For example: /// ```hydroflow /// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();