From c285b546f4a70637667c3b7033b52a1763b73f00 Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 20 Feb 2025 19:32:38 +0000 Subject: [PATCH 1/6] [stress-test] Add support for task v3 API simplify protos Signed-off-by: Jorge Prendes --- Cargo.lock | 12 +-- crates/stress-test/Cargo.toml | 2 +- .../containerd/api/runtime/task/v3/shim.proto | 48 +++++++++++ crates/stress-test/src/mocks/mod.rs | 1 + crates/stress-test/src/mocks/shim.rs | 55 ++++++++++--- crates/stress-test/src/mocks/task.rs | 28 ++----- crates/stress-test/src/mocks/task_client.rs | 81 +++++++++++++++++++ crates/stress-test/src/protos.rs | 1 + 8 files changed, 189 insertions(+), 39 deletions(-) create mode 100644 crates/stress-test/protos/github.com/containerd/containerd/api/runtime/task/v3/shim.proto create mode 100644 crates/stress-test/src/mocks/task_client.rs diff --git a/Cargo.lock b/Cargo.lock index 7ac2797b7..88c83d0a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5925,9 +5925,9 @@ dependencies = [ [[package]] name = "trapeze" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a2da2a61bcb6db06b2b1e1665a7b7aec6f83e384b544ccf47a039c84b69946f" +checksum = "35508cc3b25208044772edae64f1aa282e08da46c6a906a37df9cdb02d459daf" dependencies = [ "anyhow", "async-stream", @@ -5949,18 +5949,18 @@ dependencies = [ [[package]] name = "trapeze-codegen" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d6f3bfa2fda8c8f5d417fca8030109f5f1b67e91b020bdc34d61a55dcf158f4" +checksum = "d14d87105c38be204d509e997c5ba53f05bf5f61957d2b4578220aa51eac79a8" dependencies = [ "prost-build 0.13.3", ] [[package]] name = "trapeze-macros" -version = "0.7.5" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b629c7aad077270c530523c46be16dfba42224824b57b539f9e2674d7e937b2c" +checksum = "e2a9fd22b2ca0a1103f1cab12fb29becc283be120b3a36581efbc3f9e31044fc" dependencies = [ "anyhow", "proc-macro2", diff --git a/crates/stress-test/Cargo.toml b/crates/stress-test/Cargo.toml index fdcc3223f..76b59035a 100644 --- a/crates/stress-test/Cargo.toml +++ b/crates/stress-test/Cargo.toml @@ -9,7 +9,7 @@ homepage.workspace = true [dependencies] anyhow = { workspace = true } -trapeze = "0.7" +trapeze = "0.7.6" prost = "0.13" prost-types = "0.13" tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs", "process", "signal"] } diff --git a/crates/stress-test/protos/github.com/containerd/containerd/api/runtime/task/v3/shim.proto b/crates/stress-test/protos/github.com/containerd/containerd/api/runtime/task/v3/shim.proto new file mode 100644 index 000000000..eb38ee1c2 --- /dev/null +++ b/crates/stress-test/protos/github.com/containerd/containerd/api/runtime/task/v3/shim.proto @@ -0,0 +1,48 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +package containerd.task.v3; + +import "google/protobuf/empty.proto"; +import "github.com/containerd/containerd/api/runtime/task/v2/shim.proto"; + +option go_package = "github.com/containerd/containerd/api/runtime/task/v3;task"; + +// Shim service is launched for each container and is responsible for owning the IO +// for the container and its additional processes. The shim is also the parent of +// each container and allows reattaching to the IO and receiving the exit status +// for the container processes. +service Task { + rpc State(containerd.task.v2.StateRequest) returns (containerd.task.v2.StateResponse); + rpc Create(containerd.task.v2.CreateTaskRequest) returns (containerd.task.v2.CreateTaskResponse); + rpc Start(containerd.task.v2.StartRequest) returns (containerd.task.v2.StartResponse); + rpc Delete(containerd.task.v2.DeleteRequest) returns (containerd.task.v2.DeleteResponse); + rpc Pids(containerd.task.v2.PidsRequest) returns (containerd.task.v2.PidsResponse); + rpc Pause(containerd.task.v2.PauseRequest) returns (google.protobuf.Empty); + rpc Resume(containerd.task.v2.ResumeRequest) returns (google.protobuf.Empty); + rpc Checkpoint(containerd.task.v2.CheckpointTaskRequest) returns (google.protobuf.Empty); + rpc Kill(containerd.task.v2.KillRequest) returns (google.protobuf.Empty); + rpc Exec(containerd.task.v2.ExecProcessRequest) returns (google.protobuf.Empty); + rpc ResizePty(containerd.task.v2.ResizePtyRequest) returns (google.protobuf.Empty); + rpc CloseIO(containerd.task.v2.CloseIORequest) returns (google.protobuf.Empty); + rpc Update(containerd.task.v2.UpdateTaskRequest) returns (google.protobuf.Empty); + rpc Wait(containerd.task.v2.WaitRequest) returns (containerd.task.v2.WaitResponse); + rpc Stats(containerd.task.v2.StatsRequest) returns (containerd.task.v2.StatsResponse); + rpc Connect(containerd.task.v2.ConnectRequest) returns (containerd.task.v2.ConnectResponse); + rpc Shutdown(containerd.task.v2.ShutdownRequest) returns (google.protobuf.Empty); +} diff --git a/crates/stress-test/src/mocks/mod.rs b/crates/stress-test/src/mocks/mod.rs index 5e44b44e1..4c9d27ea4 100644 --- a/crates/stress-test/src/mocks/mod.rs +++ b/crates/stress-test/src/mocks/mod.rs @@ -1,6 +1,7 @@ mod containerd; mod shim; mod task; +mod task_client; pub use containerd::Containerd; pub use shim::Shim; diff --git a/crates/stress-test/src/mocks/shim.rs b/crates/stress-test/src/mocks/shim.rs index 5255246a9..673308e70 100644 --- a/crates/stress-test/src/mocks/shim.rs +++ b/crates/stress-test/src/mocks/shim.rs @@ -1,21 +1,22 @@ use std::path::Path; -use anyhow::Result; +use anyhow::{Result, ensure}; use log::info; use oci_spec::runtime::SpecBuilder; +use serde::Deserialize; use tempfile::{TempDir, tempdir_in}; use tokio::fs::{canonicalize, symlink}; use tokio::process::Command; use tokio_async_drop::tokio_async_drop; -use trapeze::Client; use super::Task; use crate::containerd; -use crate::protos::containerd::task::v2::{ShutdownRequest, Task as _}; +use crate::mocks::task_client::TaskClient; +use crate::protos::containerd::task::v2::ShutdownRequest; pub struct Shim { dir: TempDir, - client: Client, + client: TaskClient, containerd: containerd::Client, } @@ -46,7 +47,11 @@ impl Shim { info!("Starting shim"); let pid = std::process::id(); - let output = Command::new(binary.as_ref()) + + // Start the shim twice. It seems that with task v3 the first run on the + // shim does not return immediately, but rather blocks. Running the shim + // twice we make sure that at least one of them will return immediately. + let output1 = Command::new(binary.as_ref()) .args([ "-namespace", &format!("shim-benchmark-{pid}"), @@ -56,16 +61,44 @@ impl Shim { "/run/containerd/containerd.sock", "start", ]) - .process_group(0) - .env("TTRPC_ADDRESS", socket) + .env("TTRPC_ADDRESS", &socket) .current_dir(dir.path()) - .output() - .await?; + .output(); - let address = String::from_utf8(output.stdout)?.trim().to_owned(); + let output2 = Command::new(binary.as_ref()) + .args([ + "-namespace", + &format!("shim-benchmark-{pid}"), + "-id", + &format!("shim-benchmark-{pid}"), + "-address", + "/run/containerd/containerd.sock", + "start", + ]) + .env("TTRPC_ADDRESS", &socket) + .current_dir(dir.path()) + .output(); + + let output = tokio::select! { + o = output1 => o, + o = output2 => o + }?; + + let mut address = String::from_utf8(output.stdout)?.trim().to_owned(); + if address.starts_with("{") { + #[derive(Deserialize)] + struct Address { + address: String, + protocol: String, + } + + let parsed: Address = serde_json::from_str(&address)?; + ensure!(parsed.protocol == "ttrpc"); + address = parsed.address; + } info!("Connecting to {address}"); - let client = Client::connect(address).await?; + let client = TaskClient::connect(address).await?; Ok(Shim { dir, diff --git a/crates/stress-test/src/mocks/task.rs b/crates/stress-test/src/mocks/task.rs index f87004e24..8e1a30843 100644 --- a/crates/stress-test/src/mocks/task.rs +++ b/crates/stress-test/src/mocks/task.rs @@ -1,18 +1,15 @@ use std::path::{Path, PathBuf}; use anyhow::Result; -use log::info; use nix::NixPath; use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder, UserBuilder}; use tempfile::{TempDir, tempdir_in}; use tokio::fs::{canonicalize, create_dir_all, write}; use tokio_async_drop::tokio_async_drop; -use trapeze::Client; +use super::task_client::TaskClient; use crate::containerd; -use crate::protos::containerd::task::v2::{ - CreateTaskRequest, DeleteRequest, StartRequest, Task as _, WaitRequest, -}; +use crate::protos::containerd::task::v2::*; use crate::protos::containerd::types::Mount; use crate::traits::Task as _; use crate::utils::{RunOnce, make_task_id}; @@ -20,7 +17,7 @@ use crate::utils::{RunOnce, make_task_id}; pub struct Task { id: String, dir: TempDir, - client: Client, + client: TaskClient, mounts: Vec, deleted: RunOnce, unmounted: RunOnce, @@ -32,7 +29,7 @@ impl Task { scratch: impl AsRef, image: String, args: impl IntoIterator, - client: Client, + client: TaskClient, ) -> Result { let id = make_task_id(); let mounts = containerd.get_mounts(&id, &image).await?; @@ -95,8 +92,7 @@ impl crate::traits::Task for Task { let stdout = stdout.to_string_lossy().into_owned(); - let res = self - .client + self.client .create(CreateTaskRequest { id: self.id.clone(), bundle: self.dir.path().to_string_lossy().into_owned(), @@ -106,36 +102,26 @@ impl crate::traits::Task for Task { }) .await?; - info!("create returned {res:?}"); - Ok(()) } async fn start(&self) -> Result<()> { - let res = self - .client + self.client .start(StartRequest { id: self.id.clone(), ..Default::default() }) .await?; - - info!("start returned {res:?}"); - Ok(()) } async fn wait(&self) -> Result<()> { - let res = self - .client + self.client .wait(WaitRequest { id: self.id.clone(), ..Default::default() }) .await?; - - info!("wait returned {res:?}"); - Ok(()) } diff --git a/crates/stress-test/src/mocks/task_client.rs b/crates/stress-test/src/mocks/task_client.rs new file mode 100644 index 000000000..fccc8c9ba --- /dev/null +++ b/crates/stress-test/src/mocks/task_client.rs @@ -0,0 +1,81 @@ +use anyhow::{Result, bail}; +use trapeze::{Client, Code}; + +use crate::protos::containerd::task::v2::*; +use crate::protos::containerd::task::v3::Task as TaskV3; + +#[derive(Clone, Copy)] +enum Version { + V2, + V3, +} + +#[derive(Clone)] +pub struct TaskClient { + client: Client, + version: Version, +} + +macro_rules! multiplex { + ($obj:ident.$method:ident ( $req:ident ) $($rest:tt)*) => {{ + match $obj.version { + Version::V2 => { + trapeze::as_client!(&$obj.client: Task) + .$method($req) + .await + } + Version::V3 => { + trapeze::as_client!(&$obj.client: TaskV3) + .$method($req) + .await + } + } + Ok(()) + }}; +} + +impl TaskClient { + pub async fn connect(address: impl AsRef) -> Result { + let client = Client::connect(address).await?; + + let version = 'v: { + let task = trapeze::as_client!(&client: Task); + let Err(status) = task.delete(DeleteRequest::default()).await else { + bail!("unexpected shim response") + }; + if status.code() != Code::Unimplemented { + break 'v Version::V2; + } + let task = trapeze::as_client!(&client: TaskV3); + let Err(status) = task.delete(DeleteRequest::default()).await else { + bail!("unexpected shim response") + }; + if status.code() != Code::Unimplemented { + break 'v Version::V3; + } + bail!("unknown task service version") + }; + + Ok(Self { version, client }) + } + + pub async fn shutdown(&self, req: ShutdownRequest) -> trapeze::Result<()> { + multiplex!(self.shutdown(req)) + } + + pub async fn create(&self, req: CreateTaskRequest) -> trapeze::Result { + multiplex!(self.create(req)) + } + + pub async fn start(&self, req: StartRequest) -> trapeze::Result { + multiplex!(self.start(req)) + } + + pub async fn wait(&self, req: WaitRequest) -> trapeze::Result { + multiplex!(self.wait(req)) + } + + pub async fn delete(&self, req: DeleteRequest) -> trapeze::Result { + multiplex!(self.delete(req)) + } +} diff --git a/crates/stress-test/src/protos.rs b/crates/stress-test/src/protos.rs index 0c1b0210e..7486d2c3a 100644 --- a/crates/stress-test/src/protos.rs +++ b/crates/stress-test/src/protos.rs @@ -1,6 +1,7 @@ trapeze::include_protos!( [ "protos/github.com/containerd/containerd/api/runtime/task/v2/shim.proto", + "protos/github.com/containerd/containerd/api/runtime/task/v3/shim.proto", "protos/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", ], ["protos"] From dd72da964ea89312ab953bd3749331f8b9f774de Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 20 Feb 2025 19:33:01 +0000 Subject: [PATCH 2/6] [stress-test] Add support for index manifest Signed-off-by: Jorge Prendes --- crates/stress-test/src/containerd/client.rs | 26 ++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/crates/stress-test/src/containerd/client.rs b/crates/stress-test/src/containerd/client.rs index c6492c0f3..f4cb318ea 100644 --- a/crates/stress-test/src/containerd/client.rs +++ b/crates/stress-test/src/containerd/client.rs @@ -21,7 +21,7 @@ use containerd_client::services::v1::{ }; use containerd_client::types::Mount; use humantime::format_rfc3339; -use oci_spec::image::{ImageConfiguration, ImageManifest}; +use oci_spec::image::{Arch, ImageConfiguration, ImageIndex, ImageManifest}; use oci_spec::runtime::Spec; use prost_types::Any; use tokio_async_drop::tokio_async_drop as async_drop; @@ -125,9 +125,29 @@ impl ClientInner { let request = self.with_metadata(request); let response = client.get(request).await?.into_inner(); let image = response.image.context("Could not find image")?; - let descriptor = image.target.context("Could not find image descriptor")?; - let manifest = self.read_content(&descriptor.digest).await?; + let mut manifest = self.read_content(&descriptor.digest).await?; + + // If this is a multiplatform image, the manifest will be an index manifest + // rather than an image manifest. + if let Ok(index) = ImageIndex::from_reader(Cursor::new(&manifest)) { + let descriptor = index + .manifests() + .iter() + .find(|m| { + let Some(platform) = m.platform() else { + return false; + }; + match platform.architecture() { + Arch::Amd64 => cfg!(target_arch = "x86_64"), + Arch::ARM64 => cfg!(target_arch = "aarch64"), + _ => false, + } + }) + .context("host platform not supported")?; + manifest = self.read_content(descriptor.digest()).await?; + } + let manifest = ImageManifest::from_reader(Cursor::new(manifest))?; let config = self.read_content(manifest.config().digest()).await?; let config = ImageConfiguration::from_reader(Cursor::new(config))?; From 3bc1021f5891623022687c86734ef34ffbcf29e1 Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 20 Feb 2025 16:37:26 +0000 Subject: [PATCH 3/6] [stress-test] fix how chain id is computed Signed-off-by: Jorge Prendes --- Cargo.lock | 1 + crates/stress-test/Cargo.toml | 1 + crates/stress-test/src/containerd/client.rs | 29 ++++++++++++++++++--- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88c83d0a0..2a1a67623 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5227,6 +5227,7 @@ dependencies = [ "prost-types 0.13.5", "serde", "serde_json", + "sha256", "tempfile", "tokio", "tokio-async-drop", diff --git a/crates/stress-test/Cargo.toml b/crates/stress-test/Cargo.toml index 76b59035a..5745e9c2c 100644 --- a/crates/stress-test/Cargo.toml +++ b/crates/stress-test/Cargo.toml @@ -27,6 +27,7 @@ tonic = "0.12" serde_json = { workspace = true } serde = { workspace = true } futures = "0.3" +sha256 = { workspace = true } [package.metadata.cargo-machete] diff --git a/crates/stress-test/src/containerd/client.rs b/crates/stress-test/src/containerd/client.rs index f4cb318ea..560290c05 100644 --- a/crates/stress-test/src/containerd/client.rs +++ b/crates/stress-test/src/containerd/client.rs @@ -158,12 +158,12 @@ impl ClientInner { pub(crate) async fn snapshot( &self, id: String, - diffs: Vec, + parent: String, ) -> Result> { let mut client = SnapshotsClient::new(self.channel.clone()); let request = PrepareSnapshotRequest { key: id, - parent: diffs.join(" "), + parent, snapshotter: String::from("overlayfs"), ..Default::default() }; @@ -325,7 +325,8 @@ impl Client { ) -> Result> { let config = self.0.image_config(image.into()).await?; let diffs = config.rootfs().diff_ids().clone(); - let mounts = self.0.snapshot(id.into(), diffs).await?; + let chain_id = chain_id(diffs); + let mounts = self.0.snapshot(id.into(), chain_id).await?; Ok(mounts) } @@ -385,3 +386,25 @@ impl Clone for Client { Self(self.0.clone()) } } + +fn chain_id(digests: Vec) -> String { + let mut chain_id = digests.get(0).cloned().unwrap_or_default(); + for digest in digests.iter().skip(1) { + chain_id = sha256::digest(format!("{chain_id} {digest}")); + chain_id = format!("sha256:{chain_id}"); + } + chain_id +} + +#[cfg(test)] +mod test { + #[test] + fn chain_id_smoke() { + let diffs = vec![ + String::from("sha256:6f60b56fd4d6a01ebc6ee4133eb429a00c327acc869a0c6083f0e4bc784d8d07"), + String::from("sha256:4d851d7c3ef9a3cb8c6553806846038c3c81498e1f6d6dc60bb03291f223b99a"), + ]; + let chain_id = super::chain_id(diffs); + assert_eq!(chain_id, "sha256:0f8505411d5fe958101c5e6b6e31c61262a05f7aff548bf7742ff1ad24d6bf88"); + } +} From e9fe1df3d1c586f07384e8a1d7ba61c510dafb5d Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 20 Feb 2025 18:51:47 +0000 Subject: [PATCH 4/6] [stress-test] fail and print stdout/stderr if container exit status is not 0 Signed-off-by: Jorge Prendes --- crates/stress-test/src/containerd/client.rs | 15 +++++---- crates/stress-test/src/containerd/task.rs | 31 +++++++++-------- crates/stress-test/src/main.rs | 9 ++--- crates/stress-test/src/mocks/task.rs | 37 +++++++++++++-------- crates/stress-test/src/mocks/task_client.rs | 1 - crates/stress-test/src/traits.rs | 2 +- 6 files changed, 52 insertions(+), 43 deletions(-) diff --git a/crates/stress-test/src/containerd/client.rs b/crates/stress-test/src/containerd/client.rs index 560290c05..aea7dcc78 100644 --- a/crates/stress-test/src/containerd/client.rs +++ b/crates/stress-test/src/containerd/client.rs @@ -211,6 +211,7 @@ impl ClientInner { container_id: impl Into, mounts: impl Into>, stdout: impl Into, + stderr: impl Into, ) -> Result<()> { let mut client = TasksClient::new(self.channel.clone()); @@ -218,6 +219,7 @@ impl ClientInner { container_id: container_id.into(), rootfs: mounts.into(), stdout: stdout.into(), + stderr: stderr.into(), ..Default::default() }; let request = self.with_metadata(request); @@ -241,7 +243,7 @@ impl ClientInner { Ok(()) } - async fn wait_task(&self, container_id: impl Into) -> Result<()> { + async fn wait_task(&self, container_id: impl Into) -> Result { let mut client = TasksClient::new(self.channel.clone()); let request = WaitRequest { @@ -250,9 +252,9 @@ impl ClientInner { }; let request = self.with_metadata(request); - client.wait(request).await?; - - Ok(()) + let response = client.wait(request).await?; + let status = response.into_inner().exit_status; + Ok(status) } async fn kill_task(&self, container_id: impl Into) -> Result<()> { @@ -356,15 +358,16 @@ impl Client { container_id: impl Into, mounts: impl Into>, stdout: impl Into, + stderr: impl Into, ) -> Result<()> { - self.0.create_task(container_id, mounts, stdout).await + self.0.create_task(container_id, mounts, stdout, stderr).await } pub async fn start_task(&self, container_id: impl Into) -> Result<()> { self.0.start_task(container_id).await } - pub async fn wait_task(&self, container_id: impl Into) -> Result<()> { + pub async fn wait_task(&self, container_id: impl Into) -> Result { self.0.wait_task(container_id).await } diff --git a/crates/stress-test/src/containerd/task.rs b/crates/stress-test/src/containerd/task.rs index 4e5f33222..714d487a7 100644 --- a/crates/stress-test/src/containerd/task.rs +++ b/crates/stress-test/src/containerd/task.rs @@ -1,9 +1,7 @@ -use std::path::PathBuf; - -use anyhow::Result; +use anyhow::{ensure, Result}; use containerd_client::types::Mount; use oci_spec::runtime::{ProcessBuilder, RootBuilder, Spec, SpecBuilder, UserBuilder}; -use tokio::fs::canonicalize; +use tempfile::{tempdir, TempDir}; use tokio_async_drop::tokio_async_drop; use super::Client; @@ -19,6 +17,7 @@ pub struct Task { spec: Spec, task_deleted: RunOnce, container_deleted: RunOnce, + dir: TempDir, } impl Task { @@ -71,28 +70,28 @@ impl Task { spec, task_deleted: RunOnce::new(), container_deleted: RunOnce::new(), + dir: tempdir()?, }) } } impl crate::traits::Task for Task { - async fn create(&self, verbose: bool) -> Result<()> { - let stdout = if !verbose { - PathBuf::new() - } else if let Ok(stdout) = canonicalize("/proc/self/fd/1").await { - stdout - } else { - PathBuf::new() - }; + async fn create(&self) -> Result<()> { + let stdout = self.dir.path().join("stdout"); + let stderr = self.dir.path().join("stderr"); + + let _ = std::fs::write(&stdout, ""); + let _ = std::fs::write(&stderr, ""); let stdout = stdout.to_string_lossy().into_owned(); + let stderr = stderr.to_string_lossy().into_owned(); self.containerd .create_container(&self.id, &self.image, &self.runtime, self.spec.clone()) .await?; self.containerd - .create_task(&self.id, &self.mounts[..], stdout) + .create_task(&self.id, &self.mounts[..], stdout, stderr) .await?; Ok(()) @@ -103,7 +102,11 @@ impl crate::traits::Task for Task { } async fn wait(&self) -> Result<()> { - self.containerd.wait_task(&self.id).await + let status = self.containerd.wait_task(&self.id).await?; + let stdout = std::fs::read_to_string(self.dir.path().join("stdout")).unwrap_or_default(); + let stderr = std::fs::read_to_string(self.dir.path().join("stderr")).unwrap_or_default(); + ensure!(status == 0, "Exit status {status}, stdout: {stdout:?}, stderr: {stderr:?}"); + Ok(()) } async fn delete(&self) -> Result<()> { diff --git a/crates/stress-test/src/main.rs b/crates/stress-test/src/main.rs index 550786e37..38b5fe61c 100644 --- a/crates/stress-test/src/main.rs +++ b/crates/stress-test/src/main.rs @@ -52,10 +52,6 @@ struct Cli { /// Show the shim logs in stderr verbose: bool, - #[arg(short('O'), long)] - /// Show the container output in stdout - container_output: bool, - #[arg(short, long, default_value("1"))] /// Number of tasks to create and start concurrently [0 = no limit] parallel: usize, @@ -124,7 +120,6 @@ async fn run_stress_test(cli: Cli, c8d: impl Containerd) -> Result<()> { let Cli { containerd, shim: shim_path, - container_output, parallel, count, timeout, @@ -141,7 +136,7 @@ async fn run_stress_test(cli: Cli, c8d: impl Containerd) -> Result<()> { // create a "pause" container to keep the shim running let pause = shim.task(&image, &args).await?; - pause.create(false).await?; + pause.create().await?; let permits = if parallel == 0 { count } else { parallel }; let semaphore = Arc::new(Semaphore::new(permits)); @@ -169,7 +164,7 @@ async fn run_stress_test(cli: Cli, c8d: impl Containerd) -> Result<()> { let permit = semaphore.acquire_owned().await?; let _ = start.set(Instant::now()); - task.create(container_output).await?; + task.create().await?; task.start().await?; // release the concurrency slot diff --git a/crates/stress-test/src/mocks/task.rs b/crates/stress-test/src/mocks/task.rs index 8e1a30843..6003a1107 100644 --- a/crates/stress-test/src/mocks/task.rs +++ b/crates/stress-test/src/mocks/task.rs @@ -1,10 +1,10 @@ use std::path::{Path, PathBuf}; -use anyhow::Result; +use anyhow::{ensure, Result}; use nix::NixPath; use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder, UserBuilder}; -use tempfile::{TempDir, tempdir_in}; -use tokio::fs::{canonicalize, create_dir_all, write}; +use tempfile::{tempdir_in, TempDir}; +use tokio::fs::{create_dir_all, write}; use tokio_async_drop::tokio_async_drop; use super::task_client::TaskClient; @@ -81,22 +81,19 @@ impl Task { } impl crate::traits::Task for Task { - async fn create(&self, verbose: bool) -> Result<()> { - let stdout = if !verbose { - PathBuf::new() - } else if let Ok(stdout) = canonicalize("/proc/self/fd/1").await { - stdout - } else { - PathBuf::new() - }; + async fn create(&self) -> Result<()> { + let stdout = self.dir.path().join("stdout"); + let stderr = self.dir.path().join("stderr"); - let stdout = stdout.to_string_lossy().into_owned(); + let _ = std::fs::write(&stdout, ""); + let _ = std::fs::write(&stderr, ""); self.client .create(CreateTaskRequest { id: self.id.clone(), bundle: self.dir.path().to_string_lossy().into_owned(), - stdout, + stdout: stdout.to_string_lossy().into_owned(), + stderr: stderr.to_string_lossy().into_owned(), rootfs: self.mounts.clone(), ..Default::default() }) @@ -116,12 +113,24 @@ impl crate::traits::Task for Task { } async fn wait(&self) -> Result<()> { +<<<<<<< HEAD self.client +======= + let status = self + .client +>>>>>>> babd163 ([stress-test] fail and print stdout/stderr if container exit status is not 0) .wait(WaitRequest { id: self.id.clone(), ..Default::default() }) - .await?; + .await? + .exit_status; + let stdout = std::fs::read_to_string(self.dir.path().join("stdout")).unwrap_or_default(); + let stderr = std::fs::read_to_string(self.dir.path().join("stderr")).unwrap_or_default(); + ensure!( + status == 0, + "Exit status {status}, stdout: {stdout:?}, stderr: {stderr:?}" + ); Ok(()) } diff --git a/crates/stress-test/src/mocks/task_client.rs b/crates/stress-test/src/mocks/task_client.rs index fccc8c9ba..967ceed6f 100644 --- a/crates/stress-test/src/mocks/task_client.rs +++ b/crates/stress-test/src/mocks/task_client.rs @@ -30,7 +30,6 @@ macro_rules! multiplex { .await } } - Ok(()) }}; } diff --git a/crates/stress-test/src/traits.rs b/crates/stress-test/src/traits.rs index 711f1a37d..4b51d1d9c 100644 --- a/crates/stress-test/src/traits.rs +++ b/crates/stress-test/src/traits.rs @@ -20,7 +20,7 @@ pub trait Shim { #[trait_variant::make(Send)] pub trait Task { - async fn create(&self, verbose: bool) -> Result<()>; + async fn create(&self) -> Result<()>; async fn start(&self) -> Result<()>; async fn wait(&self) -> Result<()>; async fn delete(&self) -> Result<()>; From 41db82d13f969646bce91aa8576446733fa1f307 Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 20 Feb 2025 19:03:28 +0000 Subject: [PATCH 5/6] [stress-test] use image entrypoint no args specified in cli Signed-off-by: Jorge Prendes --- crates/stress-test/src/containerd/containerd.rs | 6 ++---- crates/stress-test/src/containerd/task.rs | 8 +------- crates/stress-test/src/main.rs | 17 +++++++++++++---- crates/stress-test/src/mocks/containerd.rs | 5 ++--- crates/stress-test/src/mocks/task.rs | 9 +-------- 5 files changed, 19 insertions(+), 26 deletions(-) diff --git a/crates/stress-test/src/containerd/containerd.rs b/crates/stress-test/src/containerd/containerd.rs index 5e94b4e9c..8c01eebb3 100644 --- a/crates/stress-test/src/containerd/containerd.rs +++ b/crates/stress-test/src/containerd/containerd.rs @@ -9,10 +9,8 @@ pub struct Containerd { } impl Containerd { - pub async fn new() -> Result { - Ok(Self { - containerd: Client::default().await?, - }) + pub async fn new(client: Client) -> Result { + Ok(Self { containerd: client }) } } diff --git a/crates/stress-test/src/containerd/task.rs b/crates/stress-test/src/containerd/task.rs index 714d487a7..43a04a6c2 100644 --- a/crates/stress-test/src/containerd/task.rs +++ b/crates/stress-test/src/containerd/task.rs @@ -31,15 +31,9 @@ impl Task { let runtime = runtime.into(); let id = make_task_id(); - let entrypoint = containerd.entrypoint(&image).await?; let mounts = containerd.get_mounts(&id, &image).await?; - let mut args: Vec<_> = args.into_iter().map(|arg| arg.into()).collect(); - if args.is_empty() { - args = entrypoint; - } else if let Some(argv0) = entrypoint.into_iter().next() { - args.insert(0, argv0); - } + let args: Vec<_> = args.into_iter().map(|arg| arg.into()).collect(); let process = ProcessBuilder::default() .user(UserBuilder::default().build().unwrap()) diff --git a/crates/stress-test/src/main.rs b/crates/stress-test/src/main.rs index 38b5fe61c..cee5f9e8b 100644 --- a/crates/stress-test/src/main.rs +++ b/crates/stress-test/src/main.rs @@ -79,7 +79,6 @@ struct Cli { /// Path to the shim binary shim: PathBuf, - #[clap(default_values = ["echo", "hello"])] /// Arguments to pass to the image args: Vec, } @@ -95,13 +94,23 @@ async fn main() -> Result<()> { async fn main_impl() -> Result<()> { env_logger::try_init()?; - let cli = Cli::parse(); + let mut cli = Cli::parse(); + + let client = containerd::Client::default().await?; + + if cli.args.is_empty() { + if cli.image == "ghcr.io/containerd/runwasi/wasi-demo-app:latest" { + cli.args = vec!["/wasi-demo-app.wasm".into(), "echo".into(), "hello".into()]; + } else { + cli.args = client.entrypoint(&cli.image).await?; + } + } if cli.containerd { - let containerd = containerd::Containerd::new().await?; + let containerd = containerd::Containerd::new(client).await?; run_stress_test(cli, containerd).await } else { - let containerd = mocks::Containerd::new(cli.verbose).await?; + let containerd = mocks::Containerd::new(client, cli.verbose).await?; run_stress_test(cli, containerd).await } } diff --git a/crates/stress-test/src/mocks/containerd.rs b/crates/stress-test/src/mocks/containerd.rs index 743f00b01..9e36c3dad 100644 --- a/crates/stress-test/src/mocks/containerd.rs +++ b/crates/stress-test/src/mocks/containerd.rs @@ -25,10 +25,9 @@ pub struct Containerd { } impl Containerd { - pub async fn new(verbose: bool) -> Result { + pub async fn new(client: containerd::Client, verbose: bool) -> Result { let dir = tempdir()?; let socket = dir.path().join("containerd.sock.ttrpc"); - let containerd = containerd::Client::default().await?; let _server = Server::new() .register(service!(EventsService: Events)) @@ -39,7 +38,7 @@ impl Containerd { dir, _server, verbose, - containerd, + containerd: client, }) } } diff --git a/crates/stress-test/src/mocks/task.rs b/crates/stress-test/src/mocks/task.rs index 6003a1107..fee8c3043 100644 --- a/crates/stress-test/src/mocks/task.rs +++ b/crates/stress-test/src/mocks/task.rs @@ -35,14 +35,7 @@ impl Task { let mounts = containerd.get_mounts(&id, &image).await?; let mounts = map_mounts(mounts); - let entrypoint = containerd.entrypoint(&image).await?; - - let mut args: Vec<_> = args.into_iter().map(|arg| arg.into()).collect(); - if args.is_empty() { - args = entrypoint; - } else if let Some(argv0) = entrypoint.into_iter().next() { - args.insert(0, argv0); - } + let args: Vec<_> = args.into_iter().map(|arg| arg.into()).collect(); let process = ProcessBuilder::default() .user(UserBuilder::default().build().unwrap()) From 04aed358b625b2d1fe0e9b4896ee770f7a66d958 Mon Sep 17 00:00:00 2001 From: Jorge Prendes Date: Thu, 20 Feb 2025 19:03:59 +0000 Subject: [PATCH 6/6] [stress-test] make rootfs writable Signed-off-by: Jorge Prendes Signed-off-by: Jiaxiao (mossaka) Zhou --- crates/stress-test/src/containerd/client.rs | 9 +++++++-- crates/stress-test/src/containerd/task.rs | 9 ++++++--- crates/stress-test/src/mocks/task.rs | 13 ++++++------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/crates/stress-test/src/containerd/client.rs b/crates/stress-test/src/containerd/client.rs index aea7dcc78..055903fd8 100644 --- a/crates/stress-test/src/containerd/client.rs +++ b/crates/stress-test/src/containerd/client.rs @@ -360,7 +360,9 @@ impl Client { stdout: impl Into, stderr: impl Into, ) -> Result<()> { - self.0.create_task(container_id, mounts, stdout, stderr).await + self.0 + .create_task(container_id, mounts, stdout, stderr) + .await } pub async fn start_task(&self, container_id: impl Into) -> Result<()> { @@ -408,6 +410,9 @@ mod test { String::from("sha256:4d851d7c3ef9a3cb8c6553806846038c3c81498e1f6d6dc60bb03291f223b99a"), ]; let chain_id = super::chain_id(diffs); - assert_eq!(chain_id, "sha256:0f8505411d5fe958101c5e6b6e31c61262a05f7aff548bf7742ff1ad24d6bf88"); + assert_eq!( + chain_id, + "sha256:0f8505411d5fe958101c5e6b6e31c61262a05f7aff548bf7742ff1ad24d6bf88" + ); } } diff --git a/crates/stress-test/src/containerd/task.rs b/crates/stress-test/src/containerd/task.rs index 43a04a6c2..58bb9c038 100644 --- a/crates/stress-test/src/containerd/task.rs +++ b/crates/stress-test/src/containerd/task.rs @@ -1,7 +1,7 @@ -use anyhow::{ensure, Result}; +use anyhow::{Result, ensure}; use containerd_client::types::Mount; use oci_spec::runtime::{ProcessBuilder, RootBuilder, Spec, SpecBuilder, UserBuilder}; -use tempfile::{tempdir, TempDir}; +use tempfile::{TempDir, tempdir}; use tokio_async_drop::tokio_async_drop; use super::Client; @@ -99,7 +99,10 @@ impl crate::traits::Task for Task { let status = self.containerd.wait_task(&self.id).await?; let stdout = std::fs::read_to_string(self.dir.path().join("stdout")).unwrap_or_default(); let stderr = std::fs::read_to_string(self.dir.path().join("stderr")).unwrap_or_default(); - ensure!(status == 0, "Exit status {status}, stdout: {stdout:?}, stderr: {stderr:?}"); + ensure!( + status == 0, + "Exit status {status}, stdout: {stdout:?}, stderr: {stderr:?}" + ); Ok(()) } diff --git a/crates/stress-test/src/mocks/task.rs b/crates/stress-test/src/mocks/task.rs index fee8c3043..eeb29807f 100644 --- a/crates/stress-test/src/mocks/task.rs +++ b/crates/stress-test/src/mocks/task.rs @@ -1,9 +1,9 @@ use std::path::{Path, PathBuf}; -use anyhow::{ensure, Result}; +use anyhow::{Result, ensure}; use nix::NixPath; use oci_spec::runtime::{ProcessBuilder, RootBuilder, SpecBuilder, UserBuilder}; -use tempfile::{tempdir_in, TempDir}; +use tempfile::{TempDir, tempdir_in}; use tokio::fs::{create_dir_all, write}; use tokio_async_drop::tokio_async_drop; @@ -48,7 +48,10 @@ impl Task { format!("sandbox-{}", std::process::id()), )]; - let root = RootBuilder::default().path("rootfs").build()?; + let root = RootBuilder::default() + .path("rootfs") + .readonly(false) + .build()?; let spec = SpecBuilder::default() .version("1.1.0") @@ -106,12 +109,8 @@ impl crate::traits::Task for Task { } async fn wait(&self) -> Result<()> { -<<<<<<< HEAD - self.client -======= let status = self .client ->>>>>>> babd163 ([stress-test] fail and print stdout/stderr if container exit status is not 0) .wait(WaitRequest { id: self.id.clone(), ..Default::default()