From 23a22f8aa27d82d88fc0584dfda70a3595d04e7b Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 27 Feb 2025 12:24:38 +0000 Subject: [PATCH] make the instance's wait method async Signed-off-by: Jorge Prendes --- README.md | 8 +- .../src/sandbox/async_utils.rs | 17 +- .../src/sandbox/instance.rs | 13 +- .../src/sandbox/shim/instance_data.rs | 18 +-- .../src/sandbox/shim/local.rs | 11 +- .../src/sandbox/shim/local/tests.rs | 4 +- .../containerd-shim-wasm/src/sandbox/sync.rs | 151 +++++++----------- .../src/sys/unix/container/instance.rs | 12 +- .../src/sys/windows/container/instance.rs | 4 +- crates/containerd-shim-wasm/src/testing.rs | 6 +- 10 files changed, 95 insertions(+), 149 deletions(-) diff --git a/README.md b/README.md index f5d00cfe9..a9aaf3e01 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,9 @@ pub trait Instance { /// Delete any reference to the instance /// This is called after the instance has exited. fn delete(&self) -> Result<(), Error>; - /// Wait for the instance to exit - /// The waiter is used to send the exit code and time back to the caller - /// Ideally this would just be a blocking call with a normal result, however - /// because of how this is called from a thread it causes issues with lifetimes of the trait implementer. - fn wait(&self, waiter: &Wait) -> Result<(), Error>; + /// Waits for the instance to finish and returns its exit code + /// This is an async call. + async fn wait(&self) -> (u32, DateTime); } ``` diff --git a/crates/containerd-shim-wasm/src/sandbox/async_utils.rs b/crates/containerd-shim-wasm/src/sandbox/async_utils.rs index c638432a4..ed7a3855a 100644 --- a/crates/containerd-shim-wasm/src/sandbox/async_utils.rs +++ b/crates/containerd-shim-wasm/src/sandbox/async_utils.rs @@ -1,25 +1,24 @@ #![cfg_attr(windows, allow(dead_code))] // this is currently used only for linux use std::future::Future; +use std::sync::LazyLock; -thread_local! { - // A thread local runtime that can be used to run futures to completion. 
- // It is a current_thread runtime so that it doesn't spawn new threads. - // It is thread local as different threads might want to run futures concurrently. - static RUNTIME: tokio::runtime::Runtime = { - tokio::runtime::Builder::new_current_thread() +// A global, lazily initialized runtime that can be used to run futures to completion. +// It is a multi_thread runtime, so it spawns its own worker threads. +// It is shared by all threads, which can block on futures concurrently. +static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() - }; -} +}); pub trait AmbientRuntime: Future { fn block_on(self) -> Self::Output where Self: Sized, { - RUNTIME.with(|runtime| runtime.block_on(self)) + RUNTIME.block_on(self) } } diff --git a/crates/containerd-shim-wasm/src/sandbox/instance.rs b/crates/containerd-shim-wasm/src/sandbox/instance.rs index a74b1e3e9..10186156b 100644 --- a/crates/containerd-shim-wasm/src/sandbox/instance.rs +++ b/crates/containerd-shim-wasm/src/sandbox/instance.rs @@ -1,7 +1,6 @@ //! Abstractions for running/managing a wasm/wasi instance. use std::path::{Path, PathBuf}; -use std::time::Duration; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -136,14 +135,6 @@ pub trait Instance: 'static { fn delete(&self) -> Result<(), Error>; /// Waits for the instance to finish and returns its exit code - /// This is a blocking call. - #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Info"))] - fn wait(&self) -> (u32, DateTime<Utc>) { - self.wait_timeout(None).unwrap() - } - - /// Waits for the instance to finish and returns its exit code - /// Returns None if the timeout is reached before the instance has finished. - /// This is a blocking call. - fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)>; + /// This is an async call.
+ fn wait(&self) -> impl Future<Output = (u32, DateTime<Utc>)> + Send; } diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs b/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs index 5fa76b6b6..d2a27f9b4 100644 --- a/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs +++ b/crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs @@ -1,5 +1,4 @@ use std::sync::{OnceLock, RwLock}; -use std::time::Duration; use chrono::{DateTime, Utc}; @@ -80,23 +79,10 @@ impl InstanceData { } #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))] - pub fn wait(&self) -> (u32, DateTime<Utc>) { - let res = self.instance.wait(); + pub async fn wait(&self) -> (u32, DateTime<Utc>) { + let res = self.instance.wait().await; let mut s = self.state.write().unwrap(); *s = TaskState::Exited; res } - - #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))] - pub fn wait_timeout( - &self, - t: impl Into<Option<Duration>> + std::fmt::Debug, - ) -> Option<(u32, DateTime<Utc>)> { - let res = self.instance.wait_timeout(t); - if res.is_some() { - let mut s = self.state.write().unwrap(); - *s = TaskState::Exited; - } - res - } } diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/local.rs b/crates/containerd-shim-wasm/src/sandbox/shim/local.rs index b54edf87e..cd958899a 100644 --- a/crates/containerd-shim-wasm/src/sandbox/shim/local.rs +++ b/crates/containerd-shim-wasm/src/sandbox/shim/local.rs @@ -4,6 +4,7 @@ use std::ops::Not; use std::path::Path; use std::sync::{Arc, RwLock}; use std::thread; +#[cfg(feature = "opentelemetry")] use std::time::Duration; use anyhow::{Context as AnyhowContext, ensure}; @@ -18,6 +19,7 @@ use containerd_shim::protos::shim::shim_ttrpc::Task; use containerd_shim::protos::types::task::Status; use containerd_shim::util::IntoOption; use containerd_shim::{DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult}; +use futures::FutureExt as _; use log::debug; use oci_spec::runtime::Spec; use prost::Message; @@ -28,6 +30,7 @@ use
tracing_opentelemetry::OpenTelemetrySpanExt as _; #[cfg(feature = "opentelemetry")] use super::otel::extract_context; +use crate::sandbox::async_utils::AmbientRuntime as _; use crate::sandbox::instance::{Instance, InstanceConfig}; use crate::sandbox::shim::events::{EventSender, RemoteEventSender, ToTimestamp}; use crate::sandbox::shim::instance_data::InstanceData; @@ -254,7 +257,7 @@ impl Local { thread::Builder::new() .name(format!("{id}-wait")) .spawn(move || { - let (exit_code, timestamp) = i.wait(); + let (exit_code, timestamp) = i.wait().block_on(); events.send(TaskExit { container_id: id.clone(), exit_status: exit_code, @@ -295,7 +298,7 @@ impl Local { i.delete()?; let pid = i.pid().unwrap_or_default(); - let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip(); + let (exit_code, timestamp) = i.wait().now_or_never().unzip(); let timestamp = timestamp.map(ToTimestamp::to_timestamp); self.instances.write().unwrap().remove(req.id()); @@ -323,7 +326,7 @@ impl Local { } let i = self.get_instance(req.id())?; - let (exit_code, timestamp) = i.wait(); + let (exit_code, timestamp) = i.wait().block_on(); debug!("wait finishes"); Ok(WaitResponse { @@ -341,7 +344,7 @@ impl Local { let i = self.get_instance(req.id())?; let pid = i.pid(); - let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip(); + let (exit_code, timestamp) = i.wait().now_or_never().unzip(); let timestamp = timestamp.map(ToTimestamp::to_timestamp); let status = if pid.is_none() { diff --git a/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs b/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs index 23d95ae14..16630c04c 100644 --- a/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs +++ b/crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs @@ -39,8 +39,8 @@ impl Instance for InstanceStub { fn delete(&self) -> Result<(), Error> { Ok(()) } - fn wait_timeout(&self, t: impl Into>) -> Option<(u32, DateTime)> { - self.exit_code.wait_timeout(t).copied() + 
async fn wait(&self) -> (u32, DateTime) { + *self.exit_code.wait().await } } diff --git a/crates/containerd-shim-wasm/src/sandbox/sync.rs b/crates/containerd-shim-wasm/src/sandbox/sync.rs index c968d1ab8..d50e0b0ce 100644 --- a/crates/containerd-shim-wasm/src/sandbox/sync.rs +++ b/crates/containerd-shim-wasm/src/sandbox/sync.rs @@ -1,8 +1,9 @@ //! Synchronization primitives (e.g. `WaitableCell`) for the sandbox. use std::cell::OnceCell; -use std::sync::{Arc, Condvar, Mutex}; -use std::time::Duration; +use std::sync::Arc; + +use tokio::sync::Notify; /// A cell where we can wait (with timeout) for /// a value to be set @@ -11,16 +12,8 @@ pub struct WaitableCell { } struct WaitableCellImpl { - // Ideally we would just use a OnceLock, but it doesn't - // have the `wait` and `wait_timeout` methods, so we use - // a Condvar + Mutex pair instead. - // We can't guard the OnceCell **inside** the Mutex as - // that would produce ownership problems with returning - // `&T`. This is because the mutex doesn't know that we - // won't mutate the OnceCell once it's set. - mutex: Mutex<()>, - cvar: Condvar, cell: OnceCell, + notify: Notify, } // this is safe because access to cell guarded by the mutex @@ -31,9 +24,8 @@ impl Default for WaitableCell { fn default() -> Self { Self { inner: Arc::new(WaitableCellImpl { - mutex: Mutex::new(()), - cvar: Condvar::new(), - cell: OnceCell::new(), + cell: OnceCell::default(), + notify: Notify::new(), }), } } @@ -55,32 +47,34 @@ impl WaitableCell { /// Sets a value to the WaitableCell. /// This method has no effect if the WaitableCell already has a value. pub fn set(&self, val: impl Into) -> Result<(), T> { - let val = val.into(); - let _guard = self.inner.mutex.lock().unwrap(); - let res = self.inner.cell.set(val); - self.inner.cvar.notify_all(); - res + self.inner.cell.set(val.into())?; + self.inner.notify.notify_waiters(); + Ok(()) } /// If the `WaitableCell` is empty when this guard is dropped, the cell will be set to result of `f`. 
/// ``` /// # use containerd_shim_wasm::sandbox::sync::WaitableCell; + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// let cell = WaitableCell::::new(); /// { /// let _guard = cell.set_guard_with(|| 42); /// } - /// assert_eq!(&42, cell.wait()); + /// assert_eq!(&42, cell.wait().await); + /// # }) /// ``` /// /// The operation is a no-op if the cell conbtains a value before the guard is dropped. /// ``` /// # use containerd_shim_wasm::sandbox::sync::WaitableCell; + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { /// let cell = WaitableCell::::new(); /// { /// let _guard = cell.set_guard_with(|| 42); /// let _ = cell.set(24); /// } - /// assert_eq!(&24, cell.wait()); + /// assert_eq!(&24, cell.wait().await); + /// # }) /// ``` /// /// The function `f` will always be called, regardless of whether the `WaitableCell` has a value or not. @@ -92,29 +86,15 @@ impl WaitableCell { } /// Wait for the WaitableCell to be set a value. - pub fn wait(&self) -> &T { - let value = self.wait_timeout(None); - // safe because we waited with timeout `None` - unsafe { value.unwrap_unchecked() } - } - - /// Wait for the WaitableCell to be set a value, with timeout. - /// Returns None if the timeout is reached with no value. 
- pub fn wait_timeout(&self, timeout: impl Into<Option<Duration>>) -> Option<&T> { - let timeout = timeout.into(); - let cvar = &self.inner.cvar; - let guard = self.inner.mutex.lock().unwrap(); - let _guard = match timeout { - None => cvar - .wait_while(guard, |_| self.inner.cell.get().is_none()) - .unwrap(), - Some(Duration::ZERO) => guard, - Some(dur) => cvar - .wait_timeout_while(guard, dur, |_| self.inner.cell.get().is_none()) - .map(|(guard, _)| guard) - .unwrap(), - }; - self.inner.cell.get() + pub async fn wait(&self) -> &T { + let notified = self.inner.notify.notified(); + if let Some(val) = self.inner.cell.get() { + return val; + } + notified.await; + // safe because we've been notified, which can only happen + // after a call to self.inner.cell.set(..) + unsafe { self.inner.cell.get().unwrap_unchecked() } } } @@ -134,95 +114,86 @@ impl, F: FnOnce() -> R> Drop for WaitableCellSetGuard { #[cfg(test)] mod test { - use std::thread::{sleep, spawn}; use std::time::Duration; - use super::WaitableCell; - - #[test] - fn basic() { - let cell = WaitableCell::::new(); - cell.set(42).unwrap(); - assert_eq!(&42, cell.wait()); - } + use futures::FutureExt; + use tokio::time::{sleep, timeout}; - #[test] - fn basic_timeout_zero() { - let cell = WaitableCell::::new(); - cell.set(42).unwrap(); - assert_eq!(Some(&42), cell.wait_timeout(Duration::ZERO)); - } + use super::WaitableCell; - #[test] - fn basic_timeout_1ms() { + #[tokio::test] + async fn basic() { let cell = WaitableCell::::new(); cell.set(42).unwrap(); - assert_eq!(Some(&42), cell.wait_timeout(Duration::from_secs(1))); + assert_eq!(&42, cell.wait().await); } - #[test] - fn basic_timeout_none() { + #[tokio::test] + async fn basic_timeout_zero() { let cell = WaitableCell::::new(); cell.set(42).unwrap(); - assert_eq!(Some(&42), cell.wait_timeout(None)); + assert_eq!(Some(&42), cell.wait().now_or_never()); } - #[test] - fn unset_timeout_zero() { + #[tokio::test] + async fn unset_timeout_zero() { let cell = WaitableCell::::new(); -
assert_eq!(None, cell.wait_timeout(Duration::ZERO)); + assert_eq!(None, cell.wait().now_or_never()); } - #[test] - fn unset_timeout_1ms() { + #[tokio::test] + async fn unset_timeout_1ms() { let cell = WaitableCell::::new(); - assert_eq!(None, cell.wait_timeout(Duration::from_millis(1))); + assert_eq!( + None, + timeout(Duration::from_millis(1), cell.wait()).await.ok() + ); } - #[test] - fn clone() { + #[tokio::test] + async fn clone() { let cell = WaitableCell::::new(); let cloned = cell.clone(); let _ = cloned.set(42); - assert_eq!(&42, cell.wait()); + assert_eq!(&42, cell.wait().await); } - #[test] - fn basic_threaded() { + #[tokio::test] + async fn basic_threaded() { let cell = WaitableCell::::new(); - { + tokio::spawn({ let cell = cell.clone(); - spawn(move || { - sleep(Duration::from_millis(1)); + async move { + sleep(Duration::from_millis(1)).await; let _ = cell.set(42); - }); - } - assert_eq!(&42, cell.wait()); + } + }); + assert_eq!(&42, cell.wait().await); } - #[test] - fn basic_double_set() { + #[tokio::test] + async fn basic_double_set() { let cell = WaitableCell::::new(); assert_eq!(Ok(()), cell.set(42)); assert_eq!(Err(24), cell.set(24)); } - #[test] - fn guard() { + #[tokio::test] + async fn guard() { let cell = WaitableCell::::new(); { let _guard = cell.set_guard_with(|| 42); } - assert_eq!(&42, cell.wait()); + assert_eq!(&42, cell.wait().await); } - #[test] - fn guard_no_op() { + #[tokio::test] + async fn guard_no_op() { let cell = WaitableCell::::new(); { let _guard = cell.set_guard_with(|| 42); let _ = cell.set(24); } - assert_eq!(&24, cell.wait()); + assert_eq!(&24, cell.wait().await); } } diff --git a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs index 341d51df7..6d6202a6e 100644 --- a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs +++ b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs @@ -1,7 +1,6 @@ use std::marker::PhantomData; use 
std::path::Path; use std::thread; -use std::time::Duration; use chrono::{DateTime, Utc}; use libcontainer::container::builder::ContainerBuilder; @@ -147,12 +146,9 @@ impl SandboxInstance for Instance { /// Waits for the instance to finish and returns its exit code /// Returns None if the timeout is reached before the instance has finished. - /// This is a blocking call. - #[cfg_attr( - feature = "tracing", - tracing::instrument(skip(self, t), level = "Info") - )] - fn wait_timeout(&self, t: impl Into>) -> Option<(u32, DateTime)> { - self.exit_code.wait_timeout(t).copied() + /// This is an async call. + #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Info"))] + async fn wait(&self) -> (u32, DateTime) { + *self.exit_code.wait().await } } diff --git a/crates/containerd-shim-wasm/src/sys/windows/container/instance.rs b/crates/containerd-shim-wasm/src/sys/windows/container/instance.rs index 491368213..486009923 100644 --- a/crates/containerd-shim-wasm/src/sys/windows/container/instance.rs +++ b/crates/containerd-shim-wasm/src/sys/windows/container/instance.rs @@ -35,8 +35,8 @@ impl SandboxInstance for Instance { /// Waits for the instance to finish and returns its exit code /// Returns None if the timeout is reached before the instance has finished. - /// This is a blocking call. - fn wait_timeout(&self, _t: impl Into>) -> Option<(u32, DateTime)> { + /// This is an async call. 
+ async fn wait(&self) -> (u32, DateTime) { todo!(); } } diff --git a/crates/containerd-shim-wasm/src/testing.rs b/crates/containerd-shim-wasm/src/testing.rs index 3d3643b44..0b58dcda7 100644 --- a/crates/containerd-shim-wasm/src/testing.rs +++ b/crates/containerd-shim-wasm/src/testing.rs @@ -18,7 +18,9 @@ use oci_spec::runtime::{ LinuxBuilder, LinuxNamespace, LinuxNamespaceType, ProcessBuilder, RootBuilder, SpecBuilder, get_default_namespaces, }; +use tokio::time::timeout; +use crate::sandbox::async_utils::AmbientRuntime; use crate::sandbox::{Instance, InstanceConfig}; pub const TEST_NAMESPACE: &str = "runwasi-test"; @@ -269,9 +271,9 @@ where Ok(self) } - pub fn wait(&self, timeout: Duration) -> Result<(u32, String, String)> { + pub fn wait(&self, t: Duration) -> Result<(u32, String, String)> { log::info!("waiting wasi test"); - let (status, _) = match self.instance.wait_timeout(timeout) { + let (status, _) = match timeout(t, self.instance.wait()).block_on().ok() { Some(res) => res, None => { self.instance.kill(SIGKILL)?;