diff --git a/README-CN.md b/README-CN.md index f43b6bf..1c461a4 100644 --- a/README-CN.md +++ b/README-CN.md @@ -32,6 +32,7 @@ 部分条目如下: - [`AnyRes`](https://docs.rs/est/latest/est/result/type.AnyRes.html) - [`collections::MapExt::replace_key()`](https://docs.rs/est/latest/est/collections/trait.MapExt.html#tymethod.replace_key) +- [`future::FutureExt::with_cancel_signal()`](https://docs.rs/est/latest/est/future/trait.FutureExt.html#tymethod.with_cancel_signal) - [`sync::once`](https://docs.rs/est/latest/est/sync/once/index.html) - [`task::CloseAndWait::close_and_wait()`](https://docs.rs/est/latest/est/task/trait.CloseAndWait.html#tymethod.close_and_wait) - [`task::TaskId`](https://docs.rs/est/latest/est/task/struct.TaskId.html) diff --git a/README.md b/README.md index 8c88539..c3a4ce8 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ Latest version: [v0.7.1](https://github.com/opensound-org/est/releases/tag/v0.7. Some of the items are as follows: - [`AnyRes`](https://docs.rs/est/latest/est/result/type.AnyRes.html) - [`collections::MapExt::replace_key()`](https://docs.rs/est/latest/est/collections/trait.MapExt.html#tymethod.replace_key) +- [`future::FutureExt::with_cancel_signal()`](https://docs.rs/est/latest/est/future/trait.FutureExt.html#tymethod.with_cancel_signal) - [`sync::once`](https://docs.rs/est/latest/est/sync/once/index.html) - [`task::CloseAndWait::close_and_wait()`](https://docs.rs/est/latest/est/task/trait.CloseAndWait.html#tymethod.close_and_wait) - [`task::TaskId`](https://docs.rs/est/latest/est/task/struct.TaskId.html) diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..986c73c --- /dev/null +++ b/src/future.rs @@ -0,0 +1,98 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// A `Future` that can `select` whether a `Future` is successfully completed or cancelled +/// by a cancellation signal. +/// +/// Use [`FutureExt::with_cancel_signal`] to construct. +/// +/// If the execution of the original `Future` is completed, `.await` will resolve to `Ok` +/// with the `Output` of the original `Future`; otherwise (if cancelled by the cancellation +/// signal halfway), `.await` will resolve to `Err` with the `Output` of the cancellation +/// signal `Future`. +/// +/// Note: This `Future` will acquire the ownership of these two `Future`s, and [`Box::pin`] +/// them. This allows the two `Future`s to be arbitrary, including those that are not +/// [`Unpin`] (such as those generated by the `async block`). +#[derive(Debug)] +pub struct WithCancelSignal { + future: Pin>, + cancel: Pin>, +} + +impl Future for WithCancelSignal +where + F: Future, + C: Future, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(o) = Pin::new(&mut self.future).poll(cx) { + return Poll::Ready(Ok(o)); + } + + if let Poll::Ready(o) = Pin::new(&mut self.cancel).poll(cx) { + return Poll::Ready(Err(o)); + } + + Poll::Pending + } +} + +/// [`Future`] extension trait. +/// +/// This trait has been implemented for all [`Sized`] `Future`s. +pub trait FutureExt: Future + Sized { + /// Construct a [`WithCancelSignal`] Future that can be selected and resolved to + /// [`Result`] according to "whether it is cancelled by a cancellation signal". + /// + /// # Example + /// + /// ``` + /// use est::future::FutureExt; + /// use std::time::Duration; + /// use tokio::time::sleep; + /// + /// #[tokio::main] + /// async fn main() { + /// let future = sleep(Duration::from_millis(100)); + /// let cancel = sleep(Duration::from_millis(50)); + /// assert!(future.with_cancel_signal(cancel).await.is_err()); + /// + /// let future = sleep(Duration::from_millis(100)); + /// let cancel = sleep(Duration::from_millis(200)); + /// assert!(future.with_cancel_signal(cancel).await.is_ok()); + /// } + /// ``` + fn with_cancel_signal(self, cancel: C) -> WithCancelSignal { + WithCancelSignal { + future: Box::pin(self), + cancel: Box::pin(cancel), + } + } +} + +impl FutureExt for T {} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn with_cancel_signal() { + use std::time::Duration; + use tokio::time::sleep; + + let cancel = async move { sleep(Duration::from_millis(100)).await }; + let future = async move { sleep(Duration::from_millis(200)).await }; + assert!(future.with_cancel_signal(cancel).await.is_err()); + + let cancel = async move { sleep(Duration::from_millis(100)).await }; + let future = async move { sleep(Duration::from_millis(50)).await }; + assert!(future.with_cancel_signal(cancel).await.is_ok()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 91d7988..5a4f0c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ /// Extensions to the [`std::collections`](https://doc.rust-lang.org/stable/std/collections/index.html) module. pub mod collections; +/// Extensions to the [`std::future`](https://doc.rust-lang.org/stable/std/future/index.html) module. +pub mod future; /// Extensions to the [`std::result`](https://doc.rust-lang.org/stable/std/result/index.html) module. pub mod result; /// Extensions to the [`std::sync`](https://doc.rust-lang.org/stable/std/sync/index.html) &