Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async for the instance's wait method #885

Merged
merged 2 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ pub trait Instance {
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// Wait for the instance to exit
/// The waiter is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
/// Waits for the instance to finish and returns its exit code
/// This is an async call.
async fn wait(&self) -> (u32, DateTime<Utc>);
}
```

Expand Down
4 changes: 2 additions & 2 deletions crates/containerd-shim-wasm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ impl Instance for MyInstance {
Ok(())
}

fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
Some((0, Utc::now()))
async fn wait(&self) -> (u32, DateTime<Utc>) {
(0, Utc::now())
}
}
```
Expand Down
37 changes: 28 additions & 9 deletions crates/containerd-shim-wasm/src/sandbox/async_utils.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,44 @@
#![cfg_attr(windows, allow(dead_code))] // this is currently used only for linux

use std::future::Future;
use std::sync::LazyLock;
use std::time::Duration;

thread_local! {
// A thread local runtime that can be used to run futures to completion.
// It is a current_thread runtime so that it doesn't spawn new threads.
// It is thread local as different threads might want to run futures concurrently.
static RUNTIME: tokio::runtime::Runtime = {
tokio::runtime::Builder::new_current_thread()
use tokio::task::JoinHandle;
use tokio::time::timeout;

// A thread local runtime that can be used to run futures to completion.
// It is a current_thread runtime so that it doesn't spawn new threads.
// It is thread local as different threads might want to run futures concurrently.
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
};
}
});

pub trait AmbientRuntime: Future {
fn block_on(self) -> Self::Output
where
Self: Sized,
{
RUNTIME.with(|runtime| runtime.block_on(self))
RUNTIME.block_on(self)
}

#[allow(dead_code)] // used in tests and with the testing feature
async fn with_timeout(self, t: Duration) -> Option<Self::Output>
where
Self: Sized,
{
timeout(t, self).await.ok()
}

fn spawn(self) -> JoinHandle<Self::Output>
where
Self: Sized + Send + 'static,
Self::Output: Send + 'static,
{
RUNTIME.spawn(self)
}
}

Expand Down
13 changes: 2 additions & 11 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Abstractions for running/managing a wasm/wasi instance.

use std::path::PathBuf;
use std::time::Duration;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -55,14 +54,6 @@ pub trait Instance: 'static {
fn delete(&self) -> Result<(), Error>;

/// Waits for the instance to finish and returns its exit code
/// This is a blocking call.
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Info"))]
fn wait(&self) -> (u32, DateTime<Utc>) {
self.wait_timeout(None).unwrap()
}

/// Waits for the instance to finish and returns its exit code
/// Returns None if the timeout is reached before the instance has finished.
/// This is a blocking call.
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)>;
/// This is an async call.
fn wait(&self) -> impl Future<Output = (u32, DateTime<Utc>)> + Send;
}
4 changes: 2 additions & 2 deletions crates/containerd-shim-wasm/src/sandbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
//! Ok(())
//! }
//!
//! fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
//! Some((0, Utc::now()))
//! async fn wait(&self) -> (u32, DateTime<Utc>) {
//! (0, Utc::now())
//! }
//! }
//! ```
Expand Down
18 changes: 2 additions & 16 deletions crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::sync::{OnceLock, RwLock};
use std::time::Duration;

use chrono::{DateTime, Utc};

Expand Down Expand Up @@ -75,23 +74,10 @@ impl<T: Instance> InstanceData<T> {
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn wait(&self) -> (u32, DateTime<Utc>) {
let res = self.instance.wait();
pub async fn wait(&self) -> (u32, DateTime<Utc>) {
let res = self.instance.wait().await;
let mut s = self.state.write().unwrap();
*s = TaskState::Exited;
res
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn wait_timeout(
&self,
t: impl Into<Option<Duration>> + std::fmt::Debug,
) -> Option<(u32, DateTime<Utc>)> {
let res = self.instance.wait_timeout(t);
if res.is_some() {
let mut s = self.state.write().unwrap();
*s = TaskState::Exited;
}
res
}
}
38 changes: 19 additions & 19 deletions crates/containerd-shim-wasm/src/sandbox/shim/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::ops::Not;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::thread;
#[cfg(feature = "opentelemetry")]
use std::time::Duration;

use anyhow::{Context as AnyhowContext, ensure};
use anyhow::ensure;
use containerd_shim::api::{
ConnectRequest, ConnectResponse, CreateTaskRequest, CreateTaskResponse, DeleteRequest, Empty,
KillRequest, ShutdownRequest, StartRequest, StartResponse, StateRequest, StateResponse,
Expand All @@ -18,6 +19,7 @@ use containerd_shim::protos::shim::shim_ttrpc::Task;
use containerd_shim::protos::types::task::Status;
use containerd_shim::util::IntoOption;
use containerd_shim::{DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult};
use futures::FutureExt as _;
use log::debug;
use oci_spec::runtime::Spec;
use prost::Message;
Expand All @@ -28,6 +30,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt as _;

#[cfg(feature = "opentelemetry")]
use super::otel::extract_context;
use crate::sandbox::async_utils::AmbientRuntime as _;
use crate::sandbox::instance::{Instance, InstanceConfig};
use crate::sandbox::shim::events::{EventSender, RemoteEventSender, ToTimestamp};
use crate::sandbox::shim::instance_data::InstanceData;
Expand Down Expand Up @@ -249,21 +252,18 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {

let id = req.id().to_string();

thread::Builder::new()
.name(format!("{id}-wait"))
.spawn(move || {
let (exit_code, timestamp) = i.wait();
events.send(TaskExit {
container_id: id.clone(),
exit_status: exit_code,
exited_at: Some(timestamp.to_timestamp()).into(),
pid,
id,
..Default::default()
});
})
.context("could not spawn thread to wait exit")
.map_err(Error::from)?;
async move {
let (exit_code, timestamp) = i.wait().await;
events.send(TaskExit {
container_id: id.clone(),
exit_status: exit_code,
exited_at: Some(timestamp.to_timestamp()).into(),
pid,
id,
..Default::default()
});
}
.spawn();

debug!("started: {:?}", req);

Expand Down Expand Up @@ -293,7 +293,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
i.delete()?;

let pid = i.pid().unwrap_or_default();
let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip();
let (exit_code, timestamp) = i.wait().now_or_never().unzip();
let timestamp = timestamp.map(ToTimestamp::to_timestamp);

self.instances.write().unwrap().remove(req.id());
Expand Down Expand Up @@ -321,7 +321,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
}

let i = self.get_instance(req.id())?;
let (exit_code, timestamp) = i.wait();
let (exit_code, timestamp) = i.wait().block_on();

debug!("wait finishes");
Ok(WaitResponse {
Expand All @@ -339,7 +339,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {

let i = self.get_instance(req.id())?;
let pid = i.pid();
let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip();
let (exit_code, timestamp) = i.wait().now_or_never().unzip();
let timestamp = timestamp.map(ToTimestamp::to_timestamp);

let status = if pid.is_none() {
Expand Down
4 changes: 2 additions & 2 deletions crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ impl Instance for InstanceStub {
fn delete(&self) -> Result<(), Error> {
Ok(())
}
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
self.exit_code.wait_timeout(t).copied()
async fn wait(&self) -> (u32, DateTime<Utc>) {
*self.exit_code.wait().await
}
}

Expand Down
Loading
Loading