Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handling of cgroup driver setting #864

Merged
merged 3 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ wasmparser = { version = "0.225.0" }
tokio-stream = { version = "0.1" }
sha256 = { workspace = true }
serde_bytes = "0.11"
prost = "0.13"
toml = "0.8"

# tracing
# note: it's important to keep the version of tracing in sync with tracing-subscriber
Expand Down
14 changes: 14 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct InstanceConfig {
namespace: String,
/// GRPC address back to main containerd
containerd_address: String,
/// Enables systemd cgroup.
systemd_cgroup: bool,
}

impl InstanceConfig {
Expand All @@ -33,6 +35,7 @@ impl InstanceConfig {
Self {
namespace,
containerd_address,
systemd_cgroup: true,
stdin: PathBuf::default(),
stdout: PathBuf::default(),
stderr: PathBuf::default(),
Expand Down Expand Up @@ -93,6 +96,17 @@ impl InstanceConfig {
pub fn get_containerd_address(&self) -> String {
self.containerd_address.clone()
}

/// set the systemd cgroup for the instance
pub fn set_systemd_cgroup(&mut self, systemd_cgroup: bool) -> &mut Self {
self.systemd_cgroup = systemd_cgroup;
self
}

/// get the systemd cgroup for the instance
pub fn get_systemd_cgroup(&self) -> bool {
self.systemd_cgroup
}
}

/// Represents a WASI module(s).
Expand Down
49 changes: 47 additions & 2 deletions crates/containerd-shim-wasm/src/sandbox/shim/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

use anyhow::Context as AnyhowContext;
use anyhow::{Context as AnyhowContext, ensure};
use containerd_shim::api::{
ConnectRequest, ConnectResponse, CreateTaskRequest, CreateTaskResponse, DeleteRequest, Empty,
KillRequest, ShutdownRequest, StartRequest, StartResponse, StateRequest, StateResponse,
Expand All @@ -20,6 +20,9 @@ use containerd_shim::util::IntoOption;
use containerd_shim::{DeleteResponse, ExitSignal, TtrpcContext, TtrpcResult};
use log::debug;
use oci_spec::runtime::Spec;
use prost::Message;
use protobuf::well_known_types::any::Any;
use serde::Deserialize;
#[cfg(feature = "opentelemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt as _;

Expand All @@ -34,6 +37,43 @@ use crate::sys::metrics::get_metrics;
#[cfg(test)]
mod tests;

#[derive(Message, Clone, PartialEq)]
struct Options {
#[prost(string)]
type_url: String,
#[prost(string)]
config_path: String,
#[prost(string)]
config_body: String,
}

#[derive(Deserialize, Default, Clone, PartialEq)]
struct Config {
#[serde(alias = "SystemdCgroup")]
systemd_cgroup: bool,
}

impl Config {
fn get_from_options(options: Option<&Any>) -> anyhow::Result<Self> {
let Some(opts) = options else {
return Ok(Default::default());
};

ensure!(
opts.type_url == "runtimeoptions.v1.Options",
"Invalid options type {}",
opts.type_url
);

let opts = Options::decode(opts.value.as_slice())?;

let config = toml::from_str(opts.config_body.as_str())
.map_err(|err| Error::InvalidArgument(format!("invalid shim options: {err}")))?;

Ok(config)
}
}

type LocalInstances<T> = RwLock<HashMap<String, Arc<InstanceData<T>>>>;

/// Local implements the Task service for a containerd shim.
Expand Down Expand Up @@ -99,6 +139,10 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self), level = "Debug"))]
fn task_create(&self, req: CreateTaskRequest) -> Result<CreateTaskResponse> {
let config = Config::get_from_options(req.options.as_ref())
.map_err(|err| Error::InvalidArgument(format!("invalid shim options: {err}")))?;
let systemd_cgroup = config.systemd_cgroup;

if !req.checkpoint().is_empty() || !req.parent_checkpoint().is_empty() {
return Err(ShimError::Unimplemented("checkpoint is not supported".to_string()).into());
}
Expand Down Expand Up @@ -147,7 +191,8 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
cfg.set_bundle(&req.bundle)
.set_stdin(&req.stdin)
.set_stdout(&req.stdout)
.set_stderr(&req.stderr);
.set_stderr(&req.stderr)
.set_systemd_cgroup(systemd_cgroup);

// Check if this is a cri container
let instance = InstanceData::new(req.id(), cfg)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ impl<E: Engine + Default> SandboxInstance for Instance<E> {
if let Ok(f) = open(cfg.get_stderr()) {
builder = builder.with_stderr(f);
}
let systemd_cgroup = cfg.get_systemd_cgroup();

let container = builder
.as_init(&bundle)
.as_sibling(true)
.with_systemd(false)
.with_systemd(systemd_cgroup)
.build()?;

Ok(container)
Expand Down
24 changes: 19 additions & 5 deletions crates/stress-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@ async fn main_impl() -> Result<()> {
let client = containerd::Client::default().await?;

if cli.args.is_empty() {
if cli.image == "ghcr.io/containerd/runwasi/wasi-demo-app:latest" {
cli.args = vec!["/wasi-demo-app.wasm".into(), "echo".into(), "hello".into()];
} else {
cli.args = client.entrypoint(&cli.image).await?;
}
cli.args = default_entrypoint(&client, &cli.image).await?;
}

if cli.containerd {
Expand Down Expand Up @@ -286,3 +282,21 @@ async fn run_stress_test(cli: Cli, c8d: impl Containerd) -> Result<()> {
}
Ok(())
}

async fn default_entrypoint(
client: &containerd::Client,
image: impl AsRef<str>,
) -> Result<Vec<String>> {
let image = image.as_ref();
let name = image.split_once(':').map(|p| p.0).unwrap_or(image);
let args = match name {
"ghcr.io/containerd/runwasi/wasi-demo-app" => {
vec!["/wasi-demo-app.wasm".into(), "echo".into(), "hello".into()]
}
"ghcr.io/containerd/runwasi/wasi-demo-oci" => {
vec!["/wasi-demo-oci.wasm".into(), "echo".into(), "hello".into()]
}
_ => client.entrypoint(image).await?,
};
Ok(args)
}
60 changes: 28 additions & 32 deletions crates/stress-test/src/mocks/shim.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::Path;

use anyhow::{Result, ensure};
use anyhow::{Result, bail, ensure};
use log::info;
use oci_spec::runtime::SpecBuilder;
use serde::Deserialize;
Expand Down Expand Up @@ -47,43 +47,39 @@ impl Shim {
info!("Starting shim");

let pid = std::process::id();
let binary = binary.as_ref();
let start_shim = || {
Command::new(binary)
.args([
"-namespace",
&format!("shim-benchmark-{pid}"),
"-id",
&format!("shim-benchmark-{pid}"),
"-address",
"/run/containerd/containerd.sock",
"start",
])
.env("TTRPC_ADDRESS", &socket)
.current_dir(dir.path())
.output()
};

// Start the shim twice. It seems that with task v3 the first run on the
// shim does not return immediately, but rather blocks. Running the shim
// twice we make sure that at least one of them will return immediately.
let output1 = Command::new(binary.as_ref())
.args([
"-namespace",
&format!("shim-benchmark-{pid}"),
"-id",
&format!("shim-benchmark-{pid}"),
"-address",
"/run/containerd/containerd.sock",
"start",
])
.env("TTRPC_ADDRESS", &socket)
.current_dir(dir.path())
.output();

let output2 = Command::new(binary.as_ref())
.args([
"-namespace",
&format!("shim-benchmark-{pid}"),
"-id",
&format!("shim-benchmark-{pid}"),
"-address",
"/run/containerd/containerd.sock",
"start",
])
.env("TTRPC_ADDRESS", &socket)
.current_dir(dir.path())
.output();

let output = tokio::select! {
o = output1 => o,
o = output2 => o
// However, the shims may race and one of them would exit with an error
// status, but by that time we can run a third and get the output.
tokio::select! {
o = start_shim() => o,
o = start_shim() => o,
}?;

let output = start_shim().await?;

if !output.status.success() {
bail!("failed to start shim: {output:?}");
}

let mut address = String::from_utf8(output.stdout)?.trim().to_owned();
if address.starts_with("{") {
#[derive(Deserialize)]
Expand Down
Loading