Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further asyncify the codebase #890

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ serde_json = { workspace = true }
tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
wat = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
futures = { version = "0.3.30" }
wasmparser = { version = "0.226.0" }
tokio-stream = { version = "0.1" }
sha256 = { workspace = true }
serde_bytes = "0.11"
prost = "0.13"
toml = "0.8"
trait-variant = "0.1"
tokio-async-drop = "0.1"

# tracing
# note: it's important to keep the version of tracing in sync with tracing-subscriber
Expand Down
8 changes: 4 additions & 4 deletions crates/containerd-shim-wasm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,19 @@ struct MyInstance {
}

impl Instance for MyInstance {
fn new(id: String, cfg: &InstanceConfig) -> Result<Self, Error> {
async fn new(id: String, cfg: &InstanceConfig) -> Result<Self, Error> {
Ok(MyInstance { engine: MyEngine })
}

fn start(&self) -> Result<u32, Error> {
async fn start(&self) -> Result<u32, Error> {
Ok(1)
}

fn kill(&self, signal: u32) -> Result<(), Error> {
async fn kill(&self, signal: u32) -> Result<(), Error> {
Ok(())
}

fn delete(&self) -> Result<(), Error> {
async fn delete(&self) -> Result<(), Error> {
Ok(())
}

Expand Down
12 changes: 7 additions & 5 deletions crates/containerd-shim-wasm/src/sandbox/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub mod r#impl {
pub use git_version::git_version;
}

use super::async_utils::AmbientRuntime as _;
pub use crate::{revision, version};

/// Get the crate version from Cargo.toml.
Expand Down Expand Up @@ -171,15 +172,16 @@ pub fn shim_main<'a, I>(
#[cfg(feature = "opentelemetry")]
if otel_traces_enabled() {
// opentelemetry uses tokio, so we need to initialize a runtime
use tokio::runtime::Runtime;
let rt = Runtime::new().unwrap();
rt.block_on(async {
async {
let otlp_config = OtlpConfig::build_from_env().expect("Failed to build OtelConfig.");
let _guard = otlp_config
.init()
.expect("Failed to initialize OpenTelemetry.");
shim_main_inner::<I>(name, version, revision, shim_version, config);
});
tokio::task::block_in_place(move || {
shim_main_inner::<I>(name, version, revision, shim_version, config);
});
}
.block_on();
} else {
shim_main_inner::<I>(name, version, revision, shim_version, config);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/containerd-shim-wasm/src/sandbox/containerd/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use containerd_client::services::v1::DeleteRequest;
use containerd_client::services::v1::leases_client::LeasesClient;
use containerd_client::tonic::transport::Channel;
use containerd_client::{tonic, with_namespace};
use tokio_async_drop::tokio_async_drop;
use tonic::Request;

// Adds lease info to grpc header
Expand Down Expand Up @@ -74,7 +75,7 @@ impl LeaseGuardInner {
impl Drop for LeaseGuard {
fn drop(&mut self) {
let inner = self.inner.take().unwrap();
tokio::spawn(async move {
tokio_async_drop!({
match inner.release().await {
Ok(()) => log::info!("removed lease"),
Err(err) => log::warn!("error removing lease: {err}"),
Expand Down
11 changes: 6 additions & 5 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,26 @@ pub struct InstanceConfig {
/// Instance is a trait that gets implemented by consumers of this library.
/// This trait requires that any type implementing it is `'static`, similar to `std::any::Any`.
/// This means that the type cannot contain a non-`'static` reference.
#[trait_variant::make(Send)]
pub trait Instance: 'static {
/// Create a new instance
fn new(id: String, cfg: &InstanceConfig) -> Result<Self, Error>
async fn new(id: String, cfg: &InstanceConfig) -> Result<Self, Error>
where
Self: Sized;

/// Start the instance
/// The returned value should be a unique ID (such as a PID) for the instance.
/// Nothing internally should be using this ID, but it is returned to containerd where a user may want to use it.
fn start(&self) -> Result<u32, Error>;
async fn start(&self) -> Result<u32, Error>;

/// Send a signal to the instance
fn kill(&self, signal: u32) -> Result<(), Error>;
async fn kill(&self, signal: u32) -> Result<(), Error>;

/// Delete any reference to the instance
/// This is called after the instance has exited.
fn delete(&self) -> Result<(), Error>;
async fn delete(&self) -> Result<(), Error>;

/// Waits for the instance to finish and returns its exit code
/// This is an async call.
fn wait(&self) -> impl Future<Output = (u32, DateTime<Utc>)> + Send;
async fn wait(&self) -> (u32, DateTime<Utc>);
}
8 changes: 4 additions & 4 deletions crates/containerd-shim-wasm/src/sandbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@
//! }
//!
//! impl Instance for MyInstance {
//! fn new(id: String, cfg: &InstanceConfig) -> Result<Self, Error> {
//! async fn new(id: String, cfg: &InstanceConfig) -> Result<Self, Error> {
//! Ok(MyInstance { engine: MyEngine })
//! }
//!
//! fn start(&self) -> Result<u32, Error> {
//! async fn start(&self) -> Result<u32, Error> {
//! Ok(1)
//! }
//!
//! fn kill(&self, signal: u32) -> Result<(), Error> {
//! async fn kill(&self, signal: u32) -> Result<(), Error> {
//! Ok(())
//! }
//!
//! fn delete(&self) -> Result<(), Error> {
//! async fn delete(&self) -> Result<(), Error> {
//! Ok(())
//! }
//!
Expand Down
11 changes: 6 additions & 5 deletions crates/containerd-shim-wasm/src/sandbox/shim/cli.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use std::env::current_dir;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use chrono::Utc;
use containerd_shim::error::Error as ShimError;
use containerd_shim::publisher::RemotePublisher;
use containerd_shim::util::write_address;
use containerd_shim::{self as shim, ExitSignal, api};
use containerd_shim::{self as shim, api};
use oci_spec::runtime::Spec;
use shim::Flags;

use crate::sandbox::async_utils::AmbientRuntime as _;
use crate::sandbox::instance::Instance;
use crate::sandbox::shim::events::{RemoteEventSender, ToTimestamp};
use crate::sandbox::shim::local::Local;
use crate::sandbox::sync::WaitableCell;

/// Cli implements the containerd-shim cli interface using `Local<T>` as the task service.
pub struct Cli<T: Instance + Sync + Send> {
namespace: String,
containerd_address: String,
exit: Arc<ExitSignal>,
exit: WaitableCell<()>,
_id: String,
_phantom: PhantomData<T>,
}
Expand Down Expand Up @@ -48,7 +49,7 @@ where
Cli {
namespace: args.namespace.to_string(),
containerd_address: args.address.clone(),
exit: Arc::default(),
exit: WaitableCell::new(),
_id: args.id.to_string(),
_phantom: PhantomData,
}
Expand Down Expand Up @@ -77,7 +78,7 @@ where

#[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))]
fn wait(&mut self) {
self.exit.wait();
self.exit.wait().block_on();
}

#[cfg_attr(
Expand Down
34 changes: 18 additions & 16 deletions crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
use std::sync::{OnceLock, RwLock};

use chrono::{DateTime, Utc};
use tokio::sync::{OnceCell, RwLock};

use crate::sandbox::shim::task_state::TaskState;
use crate::sandbox::{Instance, InstanceConfig, Result};

pub(super) struct InstanceData<T: Instance> {
pub instance: T,
pub config: InstanceConfig,
pid: OnceLock<u32>,
pid: OnceCell<u32>,
state: RwLock<TaskState>,
}

impl<T: Instance> InstanceData<T> {
#[cfg_attr(feature = "tracing", tracing::instrument(level = "Debug"))]
pub fn new(id: impl AsRef<str> + std::fmt::Debug, config: InstanceConfig) -> Result<Self> {
pub async fn new(
id: impl AsRef<str> + std::fmt::Debug,
config: InstanceConfig,
) -> Result<Self> {
let id = id.as_ref().to_string();
let instance = T::new(id, &config)?;
let instance = T::new(id, &config).await?;
Ok(Self {
instance,
config,
pid: OnceLock::default(),
pid: OnceCell::default(),
state: RwLock::new(TaskState::Created),
})
}
Expand All @@ -31,11 +33,11 @@ impl<T: Instance> InstanceData<T> {
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn start(&self) -> Result<u32> {
let mut s = self.state.write().unwrap();
pub async fn start(&self) -> Result<u32> {
let mut s = self.state.write().await;
s.start()?;

let res = self.instance.start();
let res = self.instance.start().await;

// These state transitions are always `Ok(())` because
// we hold the lock since `s.start()`
Expand All @@ -51,19 +53,19 @@ impl<T: Instance> InstanceData<T> {
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn kill(&self, signal: u32) -> Result<()> {
let mut s = self.state.write().unwrap();
pub async fn kill(&self, signal: u32) -> Result<()> {
let mut s = self.state.write().await;
s.kill()?;

self.instance.kill(signal)
self.instance.kill(signal).await
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub fn delete(&self) -> Result<()> {
let mut s = self.state.write().unwrap();
pub async fn delete(&self) -> Result<()> {
let mut s = self.state.write().await;
s.delete()?;

let res = self.instance.delete();
let res = self.instance.delete().await;

if res.is_err() {
// Always `Ok(())` because we hold the lock since `s.delete()`
Expand All @@ -76,7 +78,7 @@ impl<T: Instance> InstanceData<T> {
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
pub async fn wait(&self) -> (u32, DateTime<Utc>) {
let res = self.instance.wait().await;
let mut s = self.state.write().unwrap();
let mut s = self.state.write().await;
*s = TaskState::Exited;
res
}
Expand Down
Loading
Loading