Skip to content

Commit

Permalink
feat(hydroflow_plus)!: add an explicit API for creating tick contexts (
Browse files Browse the repository at this point in the history
…#1550)

Previously, each location had a (semantic) global clock that drives
ticks, and so all streams in a tick domain were all in the same atomic
block. For future optimizations, we'd like developers to be able to
place streams on the same location into different clocks to eliminate
synchronization between them, which in turn would allow the computations
in those separate clocks to be potentially decoupled across machines.
  • Loading branch information
shadaj authored Nov 8, 2024
1 parent b43e6c7 commit 5d5209b
Show file tree
Hide file tree
Showing 24 changed files with 437 additions and 288 deletions.
4 changes: 4 additions & 0 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct FlowStateInner {

/// Counters for generating identifiers for cycles.
pub(crate) cycle_counts: HashMap<usize, usize>,

/// Counters for clock IDs.
pub(crate) next_clock_id: usize,
}

pub type FlowState = Rc<RefCell<FlowStateInner>>;
Expand Down Expand Up @@ -73,6 +76,7 @@ impl<'a> FlowBuilder<'a> {
leaves: Some(vec![]),
next_external_out: 0,
cycle_counts: HashMap::new(),
next_clock_id: 0,
})),
nodes: RefCell::new(vec![]),
clusters: RefCell::new(vec![]),
Expand Down
10 changes: 8 additions & 2 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ impl HfPlusLeaf {
let (input_ident, input_location_id) =
input.emit(graph_builders, built_tees, next_stmt_id);

let location_id = match location_kind {
let location_id = match location_kind.root() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::Tick(_, _) => panic!(),
LocationId::ExternalProcess(_) => panic!(),
};

Expand Down Expand Up @@ -586,6 +587,7 @@ impl<'a> HfPlusNode {
let location_id = match location_kind {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::Tick(_, _) => panic!(),
LocationId::ExternalProcess(id) => id,
};

Expand Down Expand Up @@ -635,9 +637,10 @@ impl<'a> HfPlusNode {
ident,
location_kind,
} => {
let location_id = match location_kind {
let location_id = match location_kind.root() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::Tick(_, _) => panic!(),
LocationId::ExternalProcess(_) => panic!(),
};

Expand Down Expand Up @@ -1161,6 +1164,7 @@ impl<'a> HfPlusNode {
let to_id = match to_location {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::Tick(_, _) => panic!(),
LocationId::ExternalProcess(id) => id,
};

Expand Down Expand Up @@ -1355,6 +1359,8 @@ fn instantiate_network<'a, D: Deploy<'a> + 'a>(
(LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
todo!("NYI")
}
(LocationId::Tick(_, _), _) => panic!(),
(_, LocationId::Tick(_, _)) => panic!(),
};
(sink, source, connect_fn)
}
171 changes: 22 additions & 149 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ use proc_macro2::Span;
use stageleft::{q, Quoted};

use super::builder::FlowState;
use crate::cycle::{
CycleCollection, CycleCollectionWithInitial, DeferTick, ForwardRef, HfCycle, HfForwardRef,
TickCycle,
};
use crate::cycle::{CycleCollection, ForwardRef, HfForwardRef};
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::{Bounded, Optional, Singleton, Stream, Unbounded};
use crate::{Singleton, Stream, Unbounded};

pub mod external_process;
pub use external_process::ExternalProcess;
Expand All @@ -30,18 +27,29 @@ pub use can_send::CanSend;
pub mod tick;
pub use tick::{NoTick, Tick};

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum LocationId {
Process(usize),
Cluster(usize),
Tick(usize, Box<LocationId>),
ExternalProcess(usize),
}

impl LocationId {
pub fn root(&self) -> &LocationId {
match self {
LocationId::Process(_) => self,
LocationId::Cluster(_) => self,
LocationId::Tick(_, id) => id.root(),
LocationId::ExternalProcess(_) => self,
}
}

pub fn raw_id(&self) -> usize {
match self {
LocationId::Process(id) => *id,
LocationId::Cluster(id) => *id,
LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
LocationId::ExternalProcess(id) => *id,
}
}
Expand All @@ -58,11 +66,16 @@ pub trait Location<'a>: Clone {

fn is_top_level() -> bool;

fn nest(&self) -> Tick<Self>
fn tick(&self) -> Tick<Self>
where
Self: NoTick,
{
Tick { l: self.clone() }
let next_id = self.flow_state().borrow_mut().next_clock_id;
self.flow_state().borrow_mut().next_clock_id += 1;
Tick {
id: next_id,
l: self.clone(),
}
}

fn spin(&self) -> Stream<(), Unbounded, Self>
Expand All @@ -78,19 +91,6 @@ pub trait Location<'a>: Clone {
)
}

fn spin_batch(
&self,
batch_size: impl Quoted<'a, usize> + Copy + 'a,
) -> Stream<(), Bounded, Tick<Self>>
where
Self: Sized + NoTick,
{
self.spin()
.flat_map(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch()
}

fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
&self,
e: impl Quoted<'a, E>,
Expand Down Expand Up @@ -153,35 +153,6 @@ pub trait Location<'a>: Clone {
)
}

fn singleton_each_tick<T: Clone>(
&self,
e: impl Quoted<'a, T>,
) -> Singleton<T, Bounded, Tick<Self>>
where
Self: Sized + NoTick,
{
self.singleton(e).latest_tick()
}

fn singleton_first_tick<T: Clone>(
&self,
e: impl Quoted<'a, T>,
) -> Optional<T, Bounded, Tick<Self>>
where
Self: Sized + NoTick,
{
let e_arr = q!([e]);
let e = e_arr.splice_untyped();

Optional::new(
self.clone().nest(),
HfPlusNode::Source {
source: HfPlusSource::Iter(e.into()),
location_kind: self.id(),
},
)
}

fn source_interval(
&self,
interval: impl Quoted<'a, Duration> + Copy + 'a,
Expand Down Expand Up @@ -217,6 +188,7 @@ pub trait Location<'a>: Clone {
let on_id = match self.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::Tick(_, _) => panic!(),
LocationId::ExternalProcess(_) => panic!(),
};

Expand All @@ -238,103 +210,4 @@ pub trait Location<'a>: Clone {
S::create_source(ident, self.clone()),
)
}

fn tick_forward_ref<S: CycleCollection<'a, ForwardRef, Location = Tick<Self>>>(
&self,
) -> (HfForwardRef<'a, S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::ExternalProcess(_) => panic!(),
};

let mut flow_state = self.flow_state().borrow_mut();
let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfForwardRef {
ident: ident.clone(),
_phantom: PhantomData,
},
S::create_source(ident, self.nest().clone()),
)
}

fn tick_cycle<S: CycleCollection<'a, TickCycle, Location = Tick<Self>> + DeferTick>(
&self,
) -> (HfCycle<'a, S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::ExternalProcess(_) => panic!(),
};

let mut flow_state = self.flow_state().borrow_mut();
let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfCycle {
ident: ident.clone(),
_phantom: PhantomData,
},
S::create_source(ident, self.nest().clone()),
)
}

fn tick_cycle_with_initial<
S: CycleCollectionWithInitial<'a, TickCycle, Location = Tick<Self>> + DeferTick,
>(
&self,
initial: S,
) -> (HfCycle<'a, S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
LocationId::ExternalProcess(_) => panic!(),
};

let mut flow_state = self.flow_state().borrow_mut();
let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfCycle {
ident: ident.clone(),
_phantom: PhantomData,
},
S::create_source(ident, initial, self.nest().clone()),
)
}
}
Loading

0 comments on commit 5d5209b

Please sign in to comment.