Skip to content

Commit

Permalink
refactor(hydroflow_plus): move HfCompiled and friends to a module (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Nov 6, 2024
1 parent 9f74405 commit e9d05bf
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 155 deletions.
2 changes: 1 addition & 1 deletion hydroflow_plus/src/builder/built.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::marker::PhantomData;

use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph};

use super::compiled::HfCompiled;
use super::deploy::{DeployFlow, DeployResult};
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, LocalDeploy, ProcessSpec};
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::HfCompiled;

pub struct BuiltFlow<'a> {
pub(super) ir: Vec<HfPlusLeaf>,
Expand Down
123 changes: 123 additions & 0 deletions hydroflow_plus/src/builder/compiled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::collections::BTreeMap;
use std::marker::PhantomData;

use hydroflow::scheduled::graph::Hydroflow;
use hydroflow_lang::graph::{partition_graph, HydroflowGraph};
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;
use stageleft::Quoted;

pub struct HfCompiled<'a, ID> {
pub(super) hydroflow_ir: BTreeMap<usize, HydroflowGraph>,
pub(super) extra_stmts: BTreeMap<usize, Vec<syn::Stmt>>,
pub(super) _phantom: PhantomData<&'a mut &'a ID>,
}

impl<ID> HfCompiled<'_, ID> {
pub fn hydroflow_ir(&self) -> &BTreeMap<usize, HydroflowGraph> {
&self.hydroflow_ir
}

pub fn take_ir(self) -> BTreeMap<usize, HydroflowGraph> {
self.hydroflow_ir
}
}

impl<'a> HfCompiled<'a, usize> {
pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> HfBuiltWithId<'a> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, proc_macro2::Span::call_site());
quote! { #ident }
}
};

let mut conditioned_tokens = None;
for (subgraph_id, flat_graph) in self.hydroflow_ir {
let partitioned_graph =
partition_graph(flat_graph).expect("Failed to partition (cycle detected).");

let mut diagnostics = Vec::new();
let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics);
let my_extra_stmts = self
.extra_stmts
.get(&subgraph_id)
.cloned()
.unwrap_or_default();

if let Some(conditioned_tokens) = conditioned_tokens.as_mut() {
*conditioned_tokens = syn::parse_quote! {
#conditioned_tokens else if __given_id == #subgraph_id {
#(#my_extra_stmts)*
#tokens
}
};
} else {
conditioned_tokens = Some(syn::parse_quote! {
if __given_id == #subgraph_id {
#(#my_extra_stmts)*
#tokens
}
});
}
}

let conditioned_tokens: TokenStream = conditioned_tokens.unwrap();
let id = id.splice_untyped();
HfBuiltWithId {
tokens: syn::parse_quote!({
let __given_id = #id;
#conditioned_tokens else {
panic!("Invalid node id: {}", __given_id);
}
}),
_phantom: PhantomData,
}
}
}

impl<'a> Quoted<'a, Hydroflow<'a>> for HfCompiled<'a, ()> {}

impl<'a> FreeVariable<Hydroflow<'a>> for HfCompiled<'a, ()> {
fn to_tokens(mut self) -> (Option<TokenStream>, Option<TokenStream>) {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, proc_macro2::Span::call_site());
quote! { #ident }
}
};

if self.hydroflow_ir.len() != 1 {
panic!("Expected exactly one subgraph in the Hydroflow IR");
}

let flat_graph = self.hydroflow_ir.remove(&0).unwrap();
let partitioned_graph =
partition_graph(flat_graph).expect("Failed to partition (cycle detected).");

let mut diagnostics = Vec::new();
let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics);

(None, Some(tokens))
}
}

pub struct HfBuiltWithId<'a> {
tokens: TokenStream,
_phantom: PhantomData<&'a mut &'a ()>,
}

impl<'a> Quoted<'a, Hydroflow<'a>> for HfBuiltWithId<'a> {}

impl<'a> FreeVariable<Hydroflow<'a>> for HfBuiltWithId<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(self.tokens))
}
}
3 changes: 2 additions & 1 deletion hydroflow_plus/src/builder/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use serde::Serialize;
use stageleft::Quoted;

use super::built::build_inner;
use super::compiled::HfCompiled;
use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort};
use crate::ir::HfPlusLeaf;
use crate::location::external_process::{
ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
};
use crate::location::{ExternalProcess, Location, LocationId};
use crate::{Cluster, ClusterSpec, Deploy, HfCompiled, Process, ProcessSpec};
use crate::{Cluster, ClusterSpec, Deploy, Process, ProcessSpec};

pub struct DeployFlow<'a, D: LocalDeploy<'a>> {
pub(super) ir: Vec<HfPlusLeaf>,
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::location::{Cluster, ExternalProcess, Process};
use crate::RuntimeContext;

pub mod built;
pub mod compiled;
pub mod deploy;

pub struct FlowStateInner {
Expand Down Expand Up @@ -151,8 +152,6 @@ impl<'a> FlowBuilder<'a> {
}

pub fn runtime_context(&self) -> RuntimeContext<'a> {
RuntimeContext {
_phantom: PhantomData,
}
RuntimeContext::new()
}
}
153 changes: 3 additions & 150 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,16 @@

stageleft::stageleft_no_entry_crate!();

use std::collections::BTreeMap;
use std::marker::PhantomData;

use hydroflow::scheduled::context::Context;
pub use hydroflow::scheduled::graph::Hydroflow;
pub use hydroflow::*;
use lang::graph::{partition_graph, HydroflowGraph};
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;
use stageleft::Quoted;

pub mod runtime_support {
pub use bincode;
}

pub mod runtime_context;
pub use runtime_context::RuntimeContext;

pub mod stream;
pub use stream::{Bounded, Stream, Unbounded};

Expand Down Expand Up @@ -47,147 +41,6 @@ pub mod properties;

mod staging_util;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
}

impl RuntimeContext<'_> {
pub fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}

impl Copy for RuntimeContext<'_> {}

impl Default for RuntimeContext<'_> {
fn default() -> Self {
Self::new()
}
}

impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(quote!(&context)))
}
}

pub struct HfCompiled<'a, ID> {
hydroflow_ir: BTreeMap<usize, HydroflowGraph>,
extra_stmts: BTreeMap<usize, Vec<syn::Stmt>>,
_phantom: PhantomData<&'a mut &'a ID>,
}

impl<ID> HfCompiled<'_, ID> {
pub fn hydroflow_ir(&self) -> &BTreeMap<usize, HydroflowGraph> {
&self.hydroflow_ir
}

pub fn take_ir(self) -> BTreeMap<usize, HydroflowGraph> {
self.hydroflow_ir
}
}

impl<'a> HfCompiled<'a, usize> {
pub fn with_dynamic_id(self, id: impl Quoted<'a, usize>) -> HfBuiltWithId<'a> {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, proc_macro2::Span::call_site());
quote! { #ident }
}
};

let mut conditioned_tokens = None;
for (subgraph_id, flat_graph) in self.hydroflow_ir {
let partitioned_graph =
partition_graph(flat_graph).expect("Failed to partition (cycle detected).");

let mut diagnostics = Vec::new();
let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics);
let my_extra_stmts = self
.extra_stmts
.get(&subgraph_id)
.cloned()
.unwrap_or_default();

if let Some(conditioned_tokens) = conditioned_tokens.as_mut() {
*conditioned_tokens = syn::parse_quote! {
#conditioned_tokens else if __given_id == #subgraph_id {
#(#my_extra_stmts)*
#tokens
}
};
} else {
conditioned_tokens = Some(syn::parse_quote! {
if __given_id == #subgraph_id {
#(#my_extra_stmts)*
#tokens
}
});
}
}

let conditioned_tokens: TokenStream = conditioned_tokens.unwrap();
let id = id.splice_untyped();
HfBuiltWithId {
tokens: syn::parse_quote!({
let __given_id = #id;
#conditioned_tokens else {
panic!("Invalid node id: {}", __given_id);
}
}),
_phantom: PhantomData,
}
}
}

impl<'a> Quoted<'a, Hydroflow<'a>> for HfCompiled<'a, ()> {}

impl<'a> FreeVariable<Hydroflow<'a>> for HfCompiled<'a, ()> {
fn to_tokens(mut self) -> (Option<TokenStream>, Option<TokenStream>) {
let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, proc_macro2::Span::call_site());
quote! { #ident }
}
};

if self.hydroflow_ir.len() != 1 {
panic!("Expected exactly one subgraph in the Hydroflow IR");
}

let flat_graph = self.hydroflow_ir.remove(&0).unwrap();
let partitioned_graph =
partition_graph(flat_graph).expect("Failed to partition (cycle detected).");

let mut diagnostics = Vec::new();
let tokens = partitioned_graph.as_code(&root, true, quote::quote!(), &mut diagnostics);

(None, Some(tokens))
}
}

pub struct HfBuiltWithId<'a> {
tokens: TokenStream,
_phantom: PhantomData<&'a mut &'a ()>,
}

impl<'a> Quoted<'a, Hydroflow<'a>> for HfBuiltWithId<'a> {}

impl<'a> FreeVariable<Hydroflow<'a>> for HfBuiltWithId<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(self.tokens))
}
}

#[stageleft::runtime]
#[cfg(test)]
mod tests {
Expand Down
33 changes: 33 additions & 0 deletions hydroflow_plus/src/runtime_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::marker::PhantomData;

use hydroflow::scheduled::context::Context;
use proc_macro2::TokenStream;
use quote::quote;
use stageleft::runtime_support::FreeVariable;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
}

impl RuntimeContext<'_> {
pub fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}

impl Copy for RuntimeContext<'_> {}

impl Default for RuntimeContext<'_> {
fn default() -> Self {
Self::new()
}
}

impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(quote!(&context)))
}
}

0 comments on commit e9d05bf

Please sign in to comment.