diff --git a/hydro_lang/src/stream.rs b/hydro_lang/src/stream.rs index 0d902f3cf66..9e749831b55 100644 --- a/hydro_lang/src/stream.rs +++ b/hydro_lang/src/stream.rs @@ -244,7 +244,6 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// let words = process.source_iter(q!(vec!["hello", "world"])); /// words.map(q!(|x| x.to_uppercase())) /// # }, |mut stream| async move { - /// // HELLO, WORLD /// # for w in vec!["HELLO", "WORLD"] { /// # assert_eq!(stream.next().await.unwrap(), w); /// # } @@ -481,7 +480,7 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// .timestamped(&tick) /// .tick_batch() /// }; - /// let count = batch.clone().count(); + /// let count = batch.clone().count(); // `count()` returns a singleton /// batch.cross_singleton(count).all_ticks().drop_timestamp() /// # }, |mut stream| async move { /// // (1, 4), (2, 4), (3, 4), (4, 4) @@ -515,14 +514,29 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { .map(q!(|(d, _signal)| d)) } - /// Allow this stream through if the other stream is empty, otherwise the output is empty. + /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty. pub fn continue_unless(self, other: Optional) -> Stream { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all - /// tupled pairs. - pub fn cross_product(self, other: Stream) -> Stream<(T, O), L, B, Order> + /// tupled pairs in a non-deterministic order. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use std::collections::HashSet; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c'])); + /// let stream2 = process.source_iter(q!(vec![1, 2, 3])); + /// stream1.cross_product(stream2) + /// # }, |mut stream| async move { + /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]); + /// # stream.map(|i| assert!(expected.contains(&i))); + /// # })); + pub fn cross_product(self, other: Stream) -> Stream<(T, O), L, B, NoOrder> where T: Clone, O: Clone, @@ -541,6 +555,19 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// Takes one stream as input and filters out any duplicate occurrences. The output /// contains all unique values from the input. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique() + /// # }, |mut stream| async move { + /// # for w in vec![1, 2, 3, 4] { + /// # assert_eq!(stream.next().await.unwrap(), w); + /// # } + /// # })); pub fn unique(self) -> Stream where T: Eq + Hash, @@ -558,6 +585,30 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// /// The `other` stream must be [`Bounded`], since this function will wait until /// all its elements are available before producing any output. + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// let stream = unsafe { + /// process + /// .source_iter(q!(vec![ 1, 2, 3, 4 ])) + /// .timestamped(&tick) + /// .tick_batch() + /// }; + /// let batch = unsafe { + /// process + /// .source_iter(q!(vec![1, 2])) + /// .timestamped(&tick) + /// .tick_batch() + /// }; + /// stream.filter_not_in(batch).all_ticks().drop_timestamp() + /// # }, |mut stream| async move { + /// # for w in vec![3, 4] { + /// # assert_eq!(stream.next().await.unwrap(), w); + /// # } + /// # })); pub fn filter_not_in(self, other: Stream) -> Stream where T: Eq + Hash, @@ -577,6 +628,21 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// An operator which allows you to "inspect" each element of a stream without /// modifying it. The closure `f` is called on a reference to each item. This is /// mainly useful for debugging, and should not be used to generate side-effects. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let nums = process.source_iter(q!(vec![1, 2])); + /// // prints "1 * 10 = 10" and "2 * 10 = 20" + /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10))) + /// # }, |mut stream| async move { + /// # for w in vec![1, 2] { + /// # assert_eq!(stream.next().await.unwrap(), w); + /// # } + /// # })); + /// ``` pub fn inspect( self, f: impl IntoQuotedMut<'a, F, L>, @@ -618,6 +684,31 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { /// This function is used as an escape hatch, and any mistakes in the /// provided ordering guarantee will propagate into the guarantees /// for the rest of the program. + /// + /// # Example + /// # TODO: more sensible code after Shadaj merges + /// ```rust + /// # use hydro_lang::*; + /// # use std::collections::HashSet; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let nums = process.source_iter(q!({ + /// let now = std::time::SystemTime::now(); + /// match now.elapsed().unwrap().as_secs() % 2 { + /// 0 => vec![5, 4, 3, 2, 1], + /// _ => vec![1, 2, 3, 4, 5], + /// } + /// .into_iter() + /// })); + /// // despite being generated by `source_iter`, the order of `nums` across runs is non-deterministic + /// let stream = unsafe { nums.assume_ordering::() }; + /// stream + /// # }, |mut stream| async move { + /// # for w in vec![1, 2, 3, 4, 5] { + /// # assert!((1..=5).contains(&stream.next().await.unwrap())); + /// # } + /// # })); + /// ``` pub unsafe fn assume_ordering(self) -> Stream { Stream::new(self.location, self.ir_node.into_inner()) } @@ -651,7 +742,7 @@ impl<'a, T, L: Location<'a>, B, Order> Stream where Order: MinOrder, { - /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value, + /// Combines elements of the stream into a [`Singleton`], by starting with an initial value, /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream. /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place. /// @@ -874,6 +965,23 @@ where } impl<'a, T, L: Location<'a>, B> Stream { + /// Returns a stream with the current count tupled with each element in the input stream. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder>(|process| { + /// let tick = process.tick(); + /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4])); + /// numbers.enumerate() + /// # }, |mut stream| async move { + /// // (0, 1), (1, 2), (2, 3), (3, 4) + /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] { + /// # assert_eq!(stream.next().await.unwrap(), w); + /// # } + /// # })); + /// ``` pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder> { if L::is_top_level() { Stream::new( @@ -1002,7 +1110,7 @@ impl<'a, T, L: Location<'a>, B> Stream { Singleton::new(self.location, core) } - /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream, + /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream, /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty /// until the first element in the input arrives. /// @@ -1178,6 +1286,22 @@ impl<'a, T, L: Location<'a>, Order> Stream { impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> { /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))` /// by equi-joining the two streams on the key attribute `K`. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use std::collections::HashSet; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')])); + /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')])); + /// stream1.join(stream2) + /// # }, |mut stream| async move { + /// // (1, ('a', 'x')), (2, ('b', 'y')) + /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]); + /// # stream.map(|i| assert!(expected.contains(&i))); + /// # })); pub fn join(self, n: Stream<(K, V2), L, B, O2>) -> Stream<(K, (V1, V2)), L, B, NoOrder> where K: Eq + Hash, @@ -1194,10 +1318,35 @@ impl<'a, K, V1, L: Location<'a>, B, Order> Stream<(K, V1), L, B, Order> { ) } - /// Given two streams of pairs `(K, V1)` and `(K, V2)`, + /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`, /// computes the anti-join of the items in the input -- i.e. returns /// unique items in the first input that do not have a matching key /// in the second input. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// let stream = unsafe { + /// process + /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ])) + /// .timestamped(&tick) + /// .tick_batch() + /// }; + /// let batch = unsafe { + /// process + /// .source_iter(q!(vec![1, 2])) + /// .timestamped(&tick) + /// .tick_batch() + /// }; + /// stream.anti_join(batch).all_ticks().drop_timestamp() + /// # }, |mut stream| async move { + /// # for w in vec![(3, 'c'), (4, 'd')] { + /// # assert_eq!(stream.next().await.unwrap(), w); + /// # } + /// # })); pub fn anti_join(self, n: Stream) -> Stream<(K, V1), L, B, Order> where K: Eq + Hash, @@ -1355,6 +1504,21 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounde } /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`. + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)])); + /// let batch = unsafe { numbers.timestamped(&tick).tick_batch() }; + /// batch.keys().all_ticks().drop_timestamp() + /// # }, |mut stream| async move { + /// // 1, 2 + /// # assert_eq!(stream.next().await.unwrap(), 1); + /// # assert_eq!(stream.next().await.unwrap(), 2); + /// # })); + /// ``` pub fn keys(self) -> Stream, Bounded, Order> { self.fold_keyed_commutative(q!(|| ()), q!(|_, _| {})) .map(q!(|(k, _)| k))