Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: loop { scheduler #1692

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() {
use hydroflow::{var_expr, var_args};
let mut df = hydroflow::scheduled::graph::Dfir::new();
df.__assign_meta_graph(
"{\"nodes\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":null,\"version\":2},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":{\"Handoff\":{}},\"version\":3},{\"value\":{\"Operator\":\"source_stream (ints)\"},\"version\":1},{\"value\":{\"Operator\":\"for_each (| v | result . send (v) . unwrap ())\"},\"version\":1},{\"value\":{\"Operator\":\"map (| row : (_ , _ ,) | ((row . 0 % 2 ,) , (row . 1 ,)))\"},\"version\":1},{\"value\":{\"Operator\":\"fold_keyed :: < 'tick , (_ ,) , (Option < _ > ,) > (| | (None ,) , | old : & mut (Option < _ > ,) , val : (_ ,) | { old . 0 = if let Some (prev) = old . 0 . take () { Some (prev + val . 0) } else { Some (val . 0) } ; })\"},\"version\":1},{\"value\":{\"Operator\":\"map (| (g , a) : ((_ ,) , _) | (g . 0 , a . 0 . unwrap () ,))\"},\"version\":1}],\"graph\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1}],\"version\":3},{\"value\":[{\"idx\":6,\"version\":3},{\"idx\":10,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":9,\"version\":1},{\"idx\":6,\"version\":3}],\"version\":3},{\"value\":[{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":3}],\"ports\":[{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":1},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3}],\"node_loops\":[{\"value\":null,\"version\":0}],\"loop_nodes\":[{\"value\":null,\"version\":0}],\"loop_parent\":[{\"value\":null,\"version\":0}],\"loop_children\":[{\"value\":null,\"version\":0}],\"node_subgraph\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1}],\"subgraph_nodes\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":1}],\"subgraph_stratum\":[{\"value\":null,\"version\":0},{\"value\":1,\"version\":1},{\"value\":0,\"version\":1}],\"node_singleton_references\":[{\"value\":null,\"version\":0},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1}],\"node_varnames\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"ints_insert\",\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"result_insert\",\"version\":1}],\"subgraph_laziness\":[{\"value\":null,\"version\":0}]}",
"{\"nodes\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":null,\"version\":2},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":{\"Handoff\":{}},\"version\":3},{\"value\":{\"Operator\":\"source_stream (ints)\"},\"version\":1},{\"value\":{\"Operator\":\"for_each (| v | result . send (v) . unwrap ())\"},\"version\":1},{\"value\":{\"Operator\":\"map (| row : (_ , _ ,) | ((row . 0 % 2 ,) , (row . 1 ,)))\"},\"version\":1},{\"value\":{\"Operator\":\"fold_keyed :: < 'tick , (_ ,) , (Option < _ > ,) > (| | (None ,) , | old : & mut (Option < _ > ,) , val : (_ ,) | { old . 0 = if let Some (prev) = old . 0 . take () { Some (prev + val . 0) } else { Some (val . 0) } ; })\"},\"version\":1},{\"value\":{\"Operator\":\"map (| (g , a) : ((_ ,) , _) | (g . 0 , a . 0 . unwrap () ,))\"},\"version\":1}],\"graph\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1}],\"version\":3},{\"value\":[{\"idx\":6,\"version\":3},{\"idx\":10,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":9,\"version\":1},{\"idx\":6,\"version\":3}],\"version\":3},{\"value\":[{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":3}],\"ports\":[{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":1},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3}],\"node_loops\":[{\"value\":null,\"version\":0}],\"loop_nodes\":[{\"value\":null,\"version\":0}],\"loop_parent\":[{\"value\":null,\"version\":0}],\"root_loops\":[],\"loop_children\":[{\"value\":null,\"version\":0}],\"node_subgraph\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1}],\"subgraph_nodes\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":1}],\"subgraph_stratum\":[{\"value\":null,\"version\":0},{\"value\":1,\"version\":1},{\"value\":0,\"version\":1}],\"node_singleton_references\":[{\"value\":null,\"version\":0},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1},{\"value\":[],\"version\":1}],\"node_varnames\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"ints_insert\",\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"result_insert\",\"version\":1}],\"subgraph_laziness\":[{\"value\":null,\"version\":0}]}",
);
df.__assign_diagnostics("[]");
let (hoff_6v3_send, hoff_6v3_recv) = df
Expand Down Expand Up @@ -56,12 +56,13 @@ fn main() {
>::default(),
),
);
df.add_subgraph_stratified(
df.add_subgraph_full(
"Subgraph GraphSubgraphId(2v1)",
0,
var_expr!(),
var_expr!(hoff_6v3_send),
false,
None,
move |context, var_args!(), var_args!(hoff_6v3_send)| {
let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(|
v|
Expand Down Expand Up @@ -206,12 +207,13 @@ fn main() {
pivot_run_sg_2v1(op_9v1, hoff_6v3_send);
},
);
df.add_subgraph_stratified(
df.add_subgraph_full(
"Subgraph GraphSubgraphId(1v1)",
1,
var_expr!(hoff_6v3_recv),
var_expr!(),
false,
None,
move |context, var_args!(hoff_6v3_recv), var_args!()| {
let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap();
let hoff_6v3_recv = hoff_6v3_recv.drain(..);
Expand Down

Large diffs are not rendered by default.

Loading
Loading