Skip to content

Commit

Permalink
make the instance's wait method async
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Prendes <jorge.prendes@gmail.com>
  • Loading branch information
jprendes committed Feb 27, 2025
1 parent 3240088 commit 23a22f8
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 149 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ pub trait Instance {
/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
/// Wait for the instance to exit
/// The waiter is used to send the exit code and time back to the caller
/// Ideally this would just be a blocking call with a normal result, however
/// because of how this is called from a thread it causes issues with lifetimes of the trait implementer.
fn wait(&self, waiter: &Wait) -> Result<(), Error>;
/// Waits for the instance to finish and returns its exit code
/// This is an async call.
async fn wait(&self) -> (u32, DateTime<Utc>);
}
```

Expand Down
17 changes: 8 additions & 9 deletions crates/containerd-shim-wasm/src/sandbox/async_utils.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
#![cfg_attr(windows, allow(dead_code))] // this is currently used only for linux

use std::future::Future;
use std::sync::LazyLock;

thread_local! {
// A thread local runtime that can be used to run futures to completion.
// It is a current_thread runtime so that it doesn't spawn new threads.
// It is thread local as different threads might want to run futures concurrently.
static RUNTIME: tokio::runtime::Runtime = {
tokio::runtime::Builder::new_current_thread()
// A thread local runtime that can be used to run futures to completion.
// It is a current_thread runtime so that it doesn't spawn new threads.
// It is thread local as different threads might want to run futures concurrently.
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
};
}
});

pub trait AmbientRuntime: Future {
fn block_on(self) -> Self::Output
where
Self: Sized,
{
RUNTIME.with(|runtime| runtime.block_on(self))
RUNTIME.block_on(self)
}
}

Expand Down
13 changes: 2 additions & 11 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Abstractions for running/managing a wasm/wasi instance.
use std::path::{Path, PathBuf};
use std::time::Duration;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -136,14 +135,6 @@ pub trait Instance: 'static {
fn delete(&self) -> Result<(), Error>;

/// Waits for the instance to finish and returns its exit code
/// This is a blocking call.
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Info"))]
fn wait(&self) -> (u32, DateTime<Utc>) {
self.wait_timeout(None).unwrap()
}

/// Waits for the instance to finish and returns its exit code
/// Returns None if the timeout is reached before the instance has finished.
/// This is a blocking call.
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)>;
/// This is an async call.
fn wait(&self) -> impl Future<Output = (u32, DateTime<Utc>)> + Send;
}
18 changes: 2 additions & 16 deletions crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::sync::{OnceLock, RwLock};
use std::time::Duration;

use chrono::{DateTime, Utc};

Expand Down Expand Up @@ -80,23 +79,10 @@ impl<T: Instance> InstanceData<T> {
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn wait(&self) -> (u32, DateTime<Utc>) {
let res = self.instance.wait();
pub async fn wait(&self) -> (u32, DateTime<Utc>) {
let res = self.instance.wait().await;
let mut s = self.state.write().unwrap();
*s = TaskState::Exited;
res
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn wait_timeout(
&self,
t: impl Into<Option<Duration>> + std::fmt::Debug,
) -> Option<(u32, DateTime<Utc>)> {
let res = self.instance.wait_timeout(t);
if res.is_some() {
let mut s = self.state.write().unwrap();
*s = TaskState::Exited;
}
res
}
}
11 changes: 7 additions & 4 deletions crates/containerd-shim-wasm/src/sandbox/shim/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::ops::Not;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::thread;
#[cfg(feature = "opentelemetry")]
use std::time::Duration;

use anyhow::{Context as AnyhowContext, ensure};
Expand All @@ -18,6 +19,7 @@ use containerd_shim::protos::shim::shim_ttrpc::Task;
use containerd_shim::protos::types::task::Status;
use containerd_shim::util::IntoOption;
use containerd_shim::{DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult};
use futures::FutureExt as _;
use log::debug;
use oci_spec::runtime::Spec;
use prost::Message;
Expand All @@ -28,6 +30,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt as _;

#[cfg(feature = "opentelemetry")]
use super::otel::extract_context;
use crate::sandbox::async_utils::AmbientRuntime as _;
use crate::sandbox::instance::{Instance, InstanceConfig};
use crate::sandbox::shim::events::{EventSender, RemoteEventSender, ToTimestamp};
use crate::sandbox::shim::instance_data::InstanceData;
Expand Down Expand Up @@ -254,7 +257,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
thread::Builder::new()
.name(format!("{id}-wait"))
.spawn(move || {
let (exit_code, timestamp) = i.wait();
let (exit_code, timestamp) = i.wait().block_on();
events.send(TaskExit {
container_id: id.clone(),
exit_status: exit_code,
Expand Down Expand Up @@ -295,7 +298,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
i.delete()?;

let pid = i.pid().unwrap_or_default();
let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip();
let (exit_code, timestamp) = i.wait().now_or_never().unzip();
let timestamp = timestamp.map(ToTimestamp::to_timestamp);

self.instances.write().unwrap().remove(req.id());
Expand Down Expand Up @@ -323,7 +326,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
}

let i = self.get_instance(req.id())?;
let (exit_code, timestamp) = i.wait();
let (exit_code, timestamp) = i.wait().block_on();

debug!("wait finishes");
Ok(WaitResponse {
Expand All @@ -341,7 +344,7 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {

let i = self.get_instance(req.id())?;
let pid = i.pid();
let (exit_code, timestamp) = i.wait_timeout(Duration::ZERO).unzip();
let (exit_code, timestamp) = i.wait().now_or_never().unzip();
let timestamp = timestamp.map(ToTimestamp::to_timestamp);

let status = if pid.is_none() {
Expand Down
4 changes: 2 additions & 2 deletions crates/containerd-shim-wasm/src/sandbox/shim/local/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ impl Instance for InstanceStub {
fn delete(&self) -> Result<(), Error> {
Ok(())
}
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
self.exit_code.wait_timeout(t).copied()
async fn wait(&self) -> (u32, DateTime<Utc>) {
*self.exit_code.wait().await
}
}

Expand Down
Loading

0 comments on commit 23a22f8

Please sign in to comment.