Skip to content

Commit

Permalink
feat(hydroflow_lang): allow reduce() to be referenceable as a singl…
Browse files Browse the repository at this point in the history
…eton, fix docs and bugs (#1150)

* fixed bug: accumulator closures could have return values, which would be ignored
* updated docs
  • Loading branch information
MingweiSamuel authored Apr 9, 2024
1 parent 82b3030 commit 5679bfb
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 116 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"];
n1v1 [label="(n1v1) source_iter(1..=10)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter(3..=5)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) reduce(|a, b| *a = std::cmp::max(*a, b))", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) filter(|&value| { value <= max_of_stream2.unwrap_or(0) })", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) for_each(|x| max_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n9v1
n5v1 -> n6v1
n4v1 -> n5v1
n1v1 -> n4v1
n7v1 -> n8v1
n3v1 -> n7v1
n9v1 -> n3v1 [color=red]
n3v1 -> n4v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n2v1
subgraph "cluster_sg_1v1_var_stream2" {
label="var stream2"
n2v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 2"
n1v1
n4v1
n5v1
n6v1
subgraph "cluster_sg_2v1_var_filtered_stream1" {
label="var filtered_stream1"
n4v1
n5v1
n6v1
}
subgraph "cluster_sg_2v1_var_stream1" {
label="var stream1"
n1v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 1"
n3v1
n7v1
n8v1
subgraph "cluster_sg_3v1_var_max_of_stream2" {
label="var max_of_stream2"
n3v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(1..=10)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter(3..=5)</code>"/]:::pullClass
3v1[\"(3v1) <code>reduce(|a, b| *a = std::cmp::max(*a, b))</code>"/]:::pullClass
4v1[\"(4v1) <code>filter(|&amp;value| { value &lt;= max_of_stream2.unwrap_or(0) })</code>"/]:::pullClass
5v1[\"(5v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| filter_send.send(x).unwrap())</code>"\]:::pushClass
7v1[\"(7v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
8v1[/"(8v1) <code>for_each(|x| max_send.send(x).unwrap())</code>"\]:::pushClass
9v1["(9v1) <code>handoff</code>"]:::otherClass
2v1-->9v1
5v1-->6v1
4v1-->5v1
1v1-->4v1
7v1-->8v1
3v1-->7v1
9v1--x3v1; linkStyle 6 stroke:red
3v1--x4v1; linkStyle 7 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
2v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 2"]
1v1
4v1
5v1
6v1
subgraph sg_2v1_var_filtered_stream1 ["var <tt>filtered_stream1</tt>"]
4v1
5v1
6v1
end
subgraph sg_2v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
3v1
7v1
8v1
subgraph sg_3v1_var_max_of_stream2 ["var <tt>max_of_stream2</tt>"]
3v1
end
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter(1..=10)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter(3..=5)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) reduce(|a, b| *a = std::cmp::max(*a, b))", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) filter(|&value| { value <= max_of_stream2.unwrap_or(0) })", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n7v1
n5v1 -> n6v1
n4v1 -> n5v1
n1v1 -> n4v1
n7v1 -> n3v1 [color=red]
n3v1 -> n4v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n2v1
subgraph "cluster_sg_1v1_var_stream2" {
label="var stream2"
n2v1
}
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 1"
n3v1
subgraph "cluster_sg_2v1_var_max_of_stream2" {
label="var max_of_stream2"
n3v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 2"
n1v1
n4v1
n5v1
n6v1
subgraph "cluster_sg_3v1_var_filtered_stream1" {
label="var filtered_stream1"
n4v1
n5v1
n6v1
}
subgraph "cluster_sg_3v1_var_stream1" {
label="var stream1"
n1v1
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(1..=10)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter(3..=5)</code>"/]:::pullClass
3v1[/"(3v1) <code>reduce(|a, b| *a = std::cmp::max(*a, b))</code>"\]:::pushClass
4v1[\"(4v1) <code>filter(|&amp;value| { value &lt;= max_of_stream2.unwrap_or(0) })</code>"/]:::pullClass
5v1[\"(5v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| filter_send.send(x).unwrap())</code>"\]:::pushClass
7v1["(7v1) <code>handoff</code>"]:::otherClass
2v1-->7v1
5v1-->6v1
4v1-->5v1
1v1-->4v1
7v1--x3v1; linkStyle 4 stroke:red
3v1--x4v1; linkStyle 5 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
2v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 1"]
3v1
subgraph sg_2v1_var_max_of_stream2 ["var <tt>max_of_stream2</tt>"]
3v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 2"]
1v1
4v1
5v1
6v1
subgraph sg_3v1_var_filtered_stream1 ["var <tt>filtered_stream1</tt>"]
4v1
5v1
6v1
end
subgraph sg_3v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
end
end

2 changes: 1 addition & 1 deletion hydroflow/tests/surface_lattice_fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test_fold_loop() {
-> map(Max::new)
-> folder;
folder = union()
-> fold::<'static>(|| Max::<u8>::new(0), |accum, x| accum.merge(x))
-> fold::<'static>(|| Max::<u8>::new(0), |accum, x| { accum.merge(x); })
-> map(|x| Max::<u8>::new(x.into_reveal() + 1))
-> filter(|x| !x.is_top())
-> tee();
Expand Down
62 changes: 62 additions & 0 deletions hydroflow/tests/surface_singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,65 @@ pub fn test_fold_singleton_push() {
&*collect_ready::<Vec<_>, _>(&mut filter_recv)
);
}

#[multiplatform_test]
pub fn test_reduce_singleton() {
let (filter_send, mut filter_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();
let (max_send, mut max_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(1..=10);
stream2 = source_iter(3..=5);
max_of_stream2 = stream2 -> reduce(|a, b| *a = std::cmp::max(*a, b));

filtered_stream1 = stream1
-> filter(|&value| {
// This is not monotonic.
value <= #max_of_stream2.unwrap_or(0)
})
-> map(|x| (context.current_tick(), x))
-> for_each(|x| filter_send.send(x).unwrap());

max_of_stream2
-> map(|x| (context.current_tick(), x))
-> for_each(|x| max_send.send(x).unwrap());
};

assert_graphvis_snapshots!(df);

df.run_available();

assert_eq!(
&[(0, 1), (0, 2), (0, 3), (0, 4), (0, 5)],
&*collect_ready::<Vec<_>, _>(&mut filter_recv)
);
assert_eq!(&[(0, 5)], &*collect_ready::<Vec<_>, _>(&mut max_recv));
}

#[multiplatform_test]
pub fn test_reduce_singleton_push() {
let (filter_send, mut filter_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

let mut df = hydroflow::hydroflow_syntax! {
stream1 = source_iter(1..=10);
stream2 = source_iter(3..=5);
max_of_stream2 = stream2 -> reduce(|a, b| *a = std::cmp::max(*a, b));

filtered_stream1 = stream1
-> filter(|&value| {
// This is not monotonic.
value <= #max_of_stream2.unwrap_or(0)
})
-> map(|x| (context.current_tick(), x))
-> for_each(|x| filter_send.send(x).unwrap());
};

assert_graphvis_snapshots!(df);

df.run_available();

assert_eq!(
&[(0, 1), (0, 2), (0, 3), (0, 4), (0, 5)],
&*collect_ready::<Vec<_>, _>(&mut filter_recv)
);
}
44 changes: 23 additions & 21 deletions hydroflow_lang/src/graph/ops/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@ use crate::diagnostic::{Diagnostic, Level};

/// > 1 input stream, 1 output stream
///
/// > Arguments: two arguments, both closures. The first closure is used to create the initial value for the accumulator, and the second is used to combine new values with the existing accumulator.
/// The second closure takes two two arguments: an 'accumulator', and an element.
/// > Arguments: two arguments, both closures. The first closure is used to create the initial
/// value for the accumulator, and the second is used to combine new items with the existing
/// accumulator value. The second closure takes two two arguments: an `&mut Accum` accumulated
/// value, and an `Item`.
///
/// Akin to Rust's built-in fold operator, except that it takes the accumulator by `&mut` instead of by value. Folds every element into an accumulator by applying a closure,
/// returning the final result.
/// Akin to Rust's built-in [`fold`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold)
/// operator, except that it takes the accumulator by `&mut` instead of by value. Folds every item
/// into an accumulator by applying a closure, returning the final result.
///
/// > Note: The closure has access to the [`context` object](surface_flows.md#the-context-object).
/// > Note: The closures have access to the [`context` object](surface_flows.md#the-context-object).
///
/// `fold` can also be provided with one generic lifetime persistence argument, either
/// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected
/// within the same tick. With `'static`, values will be remembered across ticks and will be
/// aggregated with pairs arriving in later ticks. When not explicitly specified persistence
/// defaults to `'tick`.
/// `'tick` or `'static`, to specify how data persists. With `'tick`, Items will only be collected
/// within the same tick. With `'static`, the accumulated value will be remembered across ticks and
/// will be aggregated with items arriving in later ticks. When not explicitly specified
/// persistence defaults to `'tick`.
///
/// ```hydroflow
/// // should print `Reassembled vector [1,2,3,4,5]`
Expand Down Expand Up @@ -74,14 +77,6 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
[a] => a,
_ => unreachable!(),
};

let input = &inputs[0];
let init = &arguments[0];
let func = &arguments[1];
let initializer_func_ident = wc.make_ident("initializer_func");
let accumulator_ident = wc.make_ident("accumulator");
let iterator_item_ident = wc.make_ident("iterator_item");

if Persistence::Mutable == persistence {
diagnostics.push(Diagnostic::spanned(
op_span,
Expand All @@ -91,6 +86,13 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
return Err(());
}

let input = &inputs[0];
let init = &arguments[0];
let func = &arguments[1];
let initializer_func_ident = wc.make_ident("initializer_func");
let accumulator_ident = wc.make_ident("accumulator");
let iterator_item_ident = wc.make_ident("iterator_item");

let tick_reset_code = if Persistence::Tick == persistence {
quote_spanned! {op_span=>
// Reset the value to the initializer fn if it is a new tick.
Expand All @@ -106,12 +108,12 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
};
let iterator_foreach = quote_spanned! {op_span=>
#[inline(always)]
fn call_comb_type<Accum, Item, Out>(
fn call_comb_type<Accum, Item>(
accum: &mut Accum,
item: Item,
func: impl Fn(&mut Accum, Item) -> Out
) -> Out {
(func)(accum, item)
func: impl Fn(&mut Accum, Item),
) {
(func)(accum, item);
}
#[allow(clippy::redundant_closure_call)]
call_comb_type(&mut *#accumulator_ident, #iterator_item_ident, #func);
Expand Down
Loading

0 comments on commit 5679bfb

Please sign in to comment.