Skip to content

Commit

Permalink
docs(hydro_lang): document APIs for Stream (#1689)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhellerstein authored Jan 30, 2025
1 parent 070b6e0 commit d2a1f38
Showing 1 changed file with 172 additions and 8 deletions.
180 changes: 172 additions & 8 deletions hydro_lang/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
/// let words = process.source_iter(q!(vec!["hello", "world"]));
/// words.map(q!(|x| x.to_uppercase()))
/// # }, |mut stream| async move {
/// // HELLO, WORLD
/// # for w in vec!["HELLO", "WORLD"] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
Expand Down Expand Up @@ -481,7 +480,7 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
/// .timestamped(&tick)
/// .tick_batch()
/// };
/// let count = batch.clone().count();
/// let count = batch.clone().count(); // `count()` returns a singleton
/// batch.cross_singleton(count).all_ticks().drop_timestamp()
/// # }, |mut stream| async move {
/// // (1, 4), (2, 4), (3, 4), (4, 4)
Expand Down Expand Up @@ -515,14 +514,29 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
.map(q!(|(d, _signal)| d))
}

/// Allow this stream through if the other stream is empty, otherwise the output is empty.
/// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, Order> {
self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
}

/// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
/// tupled pairs.
pub fn cross_product<O>(self, other: Stream<O, L, B, Order>) -> Stream<(T, O), L, B, Order>
/// tupled pairs in a non-deterministic order.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use std::collections::HashSet;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
/// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
/// stream1.cross_product(stream2)
/// # }, |mut stream| async move {
/// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
/// # stream.map(|i| assert!(expected.contains(&i)));
/// # }));
pub fn cross_product<O>(self, other: Stream<O, L, B, Order>) -> Stream<(T, O), L, B, NoOrder>
where
T: Clone,
O: Clone,
Expand All @@ -541,6 +555,19 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {

/// Takes one stream as input and filters out any duplicate occurrences. The output
/// contains all unique values from the input.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
/// # }, |mut stream| async move {
/// # for w in vec![1, 2, 3, 4] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
pub fn unique(self) -> Stream<T, L, B, Order>
where
T: Eq + Hash,
Expand All @@ -558,6 +585,30 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
///
/// The `other` stream must be [`Bounded`], since this function will wait until
/// all its elements are available before producing any output.
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let stream = unsafe {
/// process
/// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
/// .timestamped(&tick)
/// .tick_batch()
/// };
/// let batch = unsafe {
/// process
/// .source_iter(q!(vec![1, 2]))
/// .timestamped(&tick)
/// .tick_batch()
/// };
/// stream.filter_not_in(batch).all_ticks().drop_timestamp()
/// # }, |mut stream| async move {
/// # for w in vec![3, 4] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
pub fn filter_not_in<O2>(self, other: Stream<T, L, Bounded, O2>) -> Stream<T, L, Bounded, Order>
where
T: Eq + Hash,
Expand All @@ -577,6 +628,21 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
/// An operator which allows you to "inspect" each element of a stream without
/// modifying it. The closure `f` is called on a reference to each item. This is
/// mainly useful for debugging, and should not be used to generate side-effects.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let nums = process.source_iter(q!(vec![1, 2]));
/// // prints "1 * 10 = 10" and "2 * 10 = 20"
/// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
/// # }, |mut stream| async move {
/// # for w in vec![1, 2] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// ```
pub fn inspect<F: Fn(&T) + 'a>(
self,
f: impl IntoQuotedMut<'a, F, L>,
Expand Down Expand Up @@ -618,6 +684,31 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order> {
/// This function is used as an escape hatch, and any mistakes in the
/// provided ordering guarantee will propagate into the guarantees
/// for the rest of the program.
///
/// # Example
/// # TODO: more sensible code after Shadaj merges
/// ```rust
/// # use hydro_lang::*;
/// # use std::collections::HashSet;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let nums = process.source_iter(q!({
/// let now = std::time::SystemTime::now();
/// match now.elapsed().unwrap().as_secs() % 2 {
/// 0 => vec![5, 4, 3, 2, 1],
/// _ => vec![1, 2, 3, 4, 5],
/// }
/// .into_iter()
/// }));
/// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic
/// let stream = unsafe { nums.assume_ordering::<NoOrder>() };
/// stream
/// # }, |mut stream| async move {
/// # for w in vec![1, 2, 3, 4, 5] {
/// # assert!((1..=5).contains(&stream.next().await.unwrap()));
/// # }
/// # }));
/// ```
pub unsafe fn assume_ordering<O>(self) -> Stream<T, L, B, O> {
Stream::new(self.location, self.ir_node.into_inner())
}
Expand Down Expand Up @@ -651,7 +742,7 @@ impl<'a, T, L: Location<'a>, B, Order> Stream<T, L, B, Order>
where
Order: MinOrder<NoOrder, Min = NoOrder>,
{
/// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
/// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
/// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
/// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
///
Expand Down Expand Up @@ -874,6 +965,23 @@ where
}

impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder> {
/// Returns a stream with the current count tupled with each element in the input stream.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| {
/// let tick = process.tick();
/// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
/// numbers.enumerate()
/// # }, |mut stream| async move {
/// // (0, 1), (1, 2), (2, 3), (3, 4)
/// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
/// ```
pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> {
if L::is_top_level() {
Stream::new(
Expand Down Expand Up @@ -1002,7 +1110,7 @@ impl<'a, T, L: Location<'a>, B> Stream<T, L, B, TotalOrder> {
Singleton::new(self.location, core)
}

/// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
/// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
/// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
/// until the first element in the input arrives.
///
Expand Down Expand Up @@ -1178,6 +1286,22 @@ impl<'a, T, L: Location<'a>, Order> Stream<T, L, Bounded, Order> {
impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> {
/// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
/// by equi-joining the two streams on the key attribute `K`.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use std::collections::HashSet;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
/// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
/// stream1.join(stream2)
/// # }, |mut stream| async move {
/// // (1, ('a', 'x')), (2, ('b', 'y'))
/// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
/// # stream.map(|i| assert!(expected.contains(&i)));
/// # }));
pub fn join<V2, O2>(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder>
where
K: Eq + Hash,
Expand All @@ -1194,10 +1318,35 @@ impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> {
)
}

/// Given two streams of pairs `(K, V1)` and `(K, V2)`,
/// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
/// computes the anti-join of the items in the input -- i.e. returns
/// unique items in the first input that do not have a matching key
/// in the second input.
///
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let stream = unsafe {
/// process
/// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
/// .timestamped(&tick)
/// .tick_batch()
/// };
/// let batch = unsafe {
/// process
/// .source_iter(q!(vec![1, 2]))
/// .timestamped(&tick)
/// .tick_batch()
/// };
/// stream.anti_join(batch).all_ticks().drop_timestamp()
/// # }, |mut stream| async move {
/// # for w in vec![(3, 'c'), (4, 'd')] {
/// # assert_eq!(stream.next().await.unwrap(), w);
/// # }
/// # }));
pub fn anti_join<O2>(self, n: Stream<K, L, Bounded, O2>) -> Stream<(K, V1), L, B, Order>
where
K: Eq + Hash,
Expand Down Expand Up @@ -1355,6 +1504,21 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick<L>, Bounde
}

/// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
/// # Example
/// ```rust
/// # use hydro_lang::*;
/// # use dfir_rs::futures::StreamExt;
/// # tokio_test::block_on(test_util::stream_transform_test(|process| {
/// let tick = process.tick();
/// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
/// let batch = unsafe { numbers.timestamped(&tick).tick_batch() };
/// batch.keys().all_ticks().drop_timestamp()
/// # }, |mut stream| async move {
/// // 1, 2
/// # assert_eq!(stream.next().await.unwrap(), 1);
/// # assert_eq!(stream.next().await.unwrap(), 2);
/// # }));
/// ```
pub fn keys(self) -> Stream<K, Tick<L>, Bounded, Order> {
self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {}))
.map(q!(|(k, _)| k))
Expand Down

0 comments on commit d2a1f38

Please sign in to comment.