Skip to content

Commit

Permalink
feat(otel): add otel collector to the core crate
Browse files Browse the repository at this point in the history
this commit adds otel collector APIs and a new opentelemetry feature to the wasm shim

Signed-off-by: jiaxiao zhou <jiazho@microsoft.com>
  • Loading branch information
Mossaka committed May 6, 2024
1 parent 685da5c commit 8a3f861
Show file tree
Hide file tree
Showing 15 changed files with 444 additions and 37 deletions.
290 changes: 283 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ homepage = "https://github.com/containerd/runwasi"
anyhow = "1.0"
cap-std = "1.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
containerd-shim = "0.7.1"
containerd-shim-wasm = { path = "crates/containerd-shim-wasm", version = "0.5.0" }
containerd-shim = { path = "../rust-extensions-fork/crates/shim" }
containerd-shim-wasm = { path = "crates/containerd-shim-wasm" }
containerd-shim-wasm-test-modules = { path = "crates/containerd-shim-wasm-test-modules", version = "0.4.0"}
oci-tar-builder = { path = "crates/oci-tar-builder", version = "0.4.0" }
crossbeam = { version = "0.8.4", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ test-oci-tar-builder:
.PHONY: install install-%
install: $(RUNTIMES:%=install-%);

install-%: build-%
install-%:
mkdir -p $(PREFIX)/bin
$(INSTALL) $(TARGET_DIR)/$(TARGET)/$(OPT_PROFILE)/containerd-shim-$*-v1 $(PREFIX)/bin/
$(LN) ./containerd-shim-$*-v1 $(PREFIX)/bin/containerd-shim-$*d-v1
Expand Down
48 changes: 41 additions & 7 deletions crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,46 @@ tempfile = { workspace = true, optional = true }
thiserror = { workspace = true }
ttrpc = { workspace = true }
wat = { workspace = true }
tokio = { version = "1.37.0", features = [ "full" ] }
tokio = { version = "1.37.0", features = ["full"] }
futures = { version = "0.3.30" }
wasmparser = "0.206.0"
tokio-stream = { version = "0.1" }
prost-types = "0.12" # should match version in containerd-shim
prost-types = "0.12" # should match version in containerd-shim
sha256 = { workspace = true }
tracing = { workspace = true }

# opentelemetry
opentelemetry = { version = "0.22", features = ["trace"], optional = true }
opentelemetry-otlp = { version = "0.15.0", features = [
"tonic",
], optional = true }
opentelemetry_sdk = { version = "0.22", features = [
"rt-tokio",
], optional = true }
tracing-subscriber = { version = "0.3", features = [
"env-filter",
], optional = true }
tracing-opentelemetry = { version = "0.23", optional = true }


[target.'cfg(unix)'.dependencies]
caps = "0.5"
# this must match the version pulled by libcontainer
dbus = { version = "0", features = ["vendored"] }
libcontainer = { workspace = true, features = ["libseccomp", "systemd", "v1", "v2"]}
libcontainer = { workspace = true, features = [
"libseccomp",
"systemd",
"v1",
"v2",
] }
nix = { workspace = true, features = ["sched", "mount"] }
containerd-client = "0.5.0"

[target.'cfg(windows)'.dependencies]
windows-sys = { workspace = true, features = ["Win32_Foundation", "Win32_Storage_FileSystem"] }
windows-sys = { workspace = true, features = [
"Win32_Foundation",
"Win32_Storage_FileSystem",
] }

[build-dependencies]
ttrpc-codegen = { version = "0.4.2" }
Expand All @@ -56,8 +78,20 @@ ttrpc-codegen = { version = "0.4.2" }
containerd-shim-wasm-test-modules = { workspace = true }
env_logger = { workspace = true }
tempfile = { workspace = true }
oci-tar-builder = { workspace = true}
rand= "0.8"
oci-tar-builder = { workspace = true }
rand = "0.8"

[features]
testing = ["dep:containerd-shim-wasm-test-modules", "dep:env_logger", "dep:tempfile", "dep:oci-tar-builder"]
testing = [
"dep:containerd-shim-wasm-test-modules",
"dep:env_logger",
"dep:tempfile",
"dep:oci-tar-builder",
]
opentelemetry = [
"dep:opentelemetry",
"dep:opentelemetry-otlp",
"dep:opentelemetry_sdk",
"dep:tracing-subscriber",
"dep:tracing-opentelemetry",
]
56 changes: 53 additions & 3 deletions crates/containerd-shim-wasm/src/sandbox/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ use std::sync::mpsc::channel;
use std::sync::Arc;

use containerd_shim::{parse, run, Config};
use tracing::{instrument, Span};
use opentelemetry::global::{set_text_map_propagator, shutdown_tracer_provider};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing::instrument;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Registry};
use ttrpc::Server;

use crate::sandbox::manager::Shim;
use crate::sandbox::shim::Local;
use crate::sandbox::shim::{init_tracer, Local};
use crate::sandbox::{Instance, ManagerService, ShimCli};
use crate::services::sandbox_ttrpc::{create_manager, Manager};

const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";

pub mod r#impl {
pub use git_version::git_version;
}
Expand All @@ -37,7 +43,47 @@ macro_rules! revision {
};
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
/// Main entry point for the shim with OpenTelemetry tracing.
///
/// It parses the `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable to determine
/// if the shim should be started with OpenTelemetry tracing.
///
/// If the environment variable is not set, the shim will be started without tracing.
/// If the environment variable is empty, the shim will be started without tracing.
#[cfg(feature = "opentelemetry")]
pub fn shim_main_with_otel<'a, I>(
name: &str,
version: &str,
revision: impl Into<Option<&'a str>>,
shim_version: impl Into<Option<&'a str>>,
config: Option<Config>,
) where
I: 'static + Instance + Sync + Send,
I::Engine: Default,
{
let otel_endpoint = std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT);
if otel_endpoint.is_err() || otel_endpoint.clone().unwrap().is_empty() {
shim_main::<I>(name, version, revision, shim_version, config);
} else {
let tracer =
init_tracer(&otel_endpoint.unwrap(), name).expect("Failed to initialize tracer.");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
set_text_map_propagator(TraceContextPropagator::new());

// Create an environment filter
let filter = EnvFilter::try_new("info,h2=off") // Set default level to `info` and exclude all traces from `h2`
.expect("Invalid filter directive");

let subscriber = Registry::default().with(telemetry).with(filter);

tracing::subscriber::set_global_default(subscriber)
.expect("setting default subscriber failed");
shim_main::<I>(name, version, revision, shim_version, config);
shutdown_tracer_provider();
}
}

#[instrument(skip_all, level = "Info")]
pub fn shim_main<'a, I>(
name: &str,
version: &str,
Expand All @@ -49,6 +95,7 @@ pub fn shim_main<'a, I>(
I::Engine: Default,
{
let os_args: Vec<_> = std::env::args_os().collect();

let flags = parse(&os_args[1..]).unwrap();
let argv0 = PathBuf::from(&os_args[0]);
let argv0 = argv0.file_stem().unwrap_or_default().to_string_lossy();
Expand All @@ -62,6 +109,7 @@ pub fn shim_main<'a, I>(

std::process::exit(0);
}

let shim_version = shim_version.into().unwrap_or("v1");

let lower_name = name.to_lowercase();
Expand Down Expand Up @@ -95,7 +143,9 @@ pub fn shim_main<'a, I>(
}
_ => {
eprintln!("error: unrecognized binary name, expected one of {shim_cli}, {shim_client}, or {shim_daemon}.");
shutdown_tracer_provider();
std::process::exit(1);
}
}
shutdown_tracer_provider();
}
2 changes: 1 addition & 1 deletion crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub trait Instance: 'static {

/// Waits for the instance to finish and retunrs its exit code
/// This is a blocking call.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
#[instrument(skip(self), parent = Span::current(), level= "Info")]
fn wait(&self) -> (u32, DateTime<Utc>) {
self.wait_timeout(None).unwrap()
}
Expand Down
20 changes: 12 additions & 8 deletions crates/containerd-shim-wasm/src/sandbox/shim/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tracing::{instrument, Span};
use crate::sandbox::instance::Instance;
use crate::sandbox::shim::events::{RemoteEventSender, ToTimestamp};
use crate::sandbox::shim::local::Local;
use crate::sys::networking::setup_namespaces;

/// Cli implements the containerd-shim cli interface using `Local<T>` as the task service.
pub struct Cli<T: Instance + Sync + Send> {
Expand Down Expand Up @@ -46,7 +45,7 @@ where
{
type T = Local<I>;

#[instrument(skip_all, parent = Span::current(), level= "Info")]
#[instrument(skip_all, level = "Info")]
fn new(_runtime_id: &str, args: &Flags, _config: &mut shim::Config) -> Self {
Cli {
engine: Default::default(),
Expand All @@ -71,22 +70,27 @@ where
.and_then(|a| a.get("io.kubernetes.cri.sandbox-id"))
.unwrap_or(&id);

setup_namespaces(&spec)
.map_err(|e| shim::Error::Other(format!("failed to setup namespaces: {}", e)))?;
let otel_endpoint = std::env::var_os("OTEL_EXPORTER_OTLP_ENDPOINT")
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| "".to_string());

let (_child, address) = shim::spawn(opts, grouping, vec![])?;
let (_child, address) = shim::spawn(
opts,
grouping,
vec![("OTEL_EXPORTER_OTLP_ENDPOINT", &otel_endpoint)],
)?;

write_address(&address)?;

Ok(address)
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
#[instrument(skip_all, level = "Info")]
fn wait(&mut self) {
self.exit.wait();
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
#[instrument(skip_all, level = "Info")]
fn create_task_service(&self, publisher: RemotePublisher) -> Self::T {
let events = RemoteEventSender::new(&self.namespace, publisher);
let exit = self.exit.clone();
Expand All @@ -100,7 +104,7 @@ where
)
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
#[instrument(skip_all, level = "Info")]
fn delete_shim(&mut self) -> shim::Result<api::DeleteResponse> {
Ok(api::DeleteResponse {
exit_status: 137,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<I: Instance> Instance for InstanceOption<I> {
}
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
#[instrument(skip(self, t), parent = Span::current(), level= "Info")]
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
match self {
Self::Instance(i) => i.wait_timeout(t),
Expand Down
2 changes: 0 additions & 2 deletions crates/containerd-shim-wasm/src/sandbox/shim/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ impl<T: Instance + Send + Sync, E: EventSender> Local<T, E> {
.context("could not spawn thread to wait exit")
.map_err(Error::from)?;

debug!("started: {:?}", req);

Ok(StartResponse {
pid,
..Default::default()
Expand Down
3 changes: 3 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/shim/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ mod events;
mod instance_data;
mod instance_option;
mod local;
mod otel;
mod task_state;

pub use cli::Cli;
pub(crate) use local::Local;
#[cfg(feature = "opentelemetry")]
pub use otel::init_tracer;
31 changes: 31 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/shim/otel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use opentelemetry::trace::TraceError;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};

/// Initialize a new OpenTelemetry tracer with the OTLP exporter.
/// The `otel_endpoint` is the endpoint passed down from the
/// environment variable `OTEL_EXPORTER_OTLP_ENDPOINT` from Containerd.
///
/// The `name` is the name of the service that will be used as a resource.
///
/// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#configuration-options
pub fn init_tracer(
otel_endpoint: &str,
name: &str,
) -> Result<opentelemetry_sdk::trace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otel_endpoint),
)
.with_trace_config(
sdktrace::config().with_resource(Resource::new(vec![KeyValue::new(
"service.name",
name.to_string(),
)])),
)
.install_batch(runtime::Tokio)
}
7 changes: 7 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ impl<T> WaitableCell<T> {
let timeout = timeout.into();
let cvar = &self.inner.cvar;
let guard = self.inner.mutex.lock().unwrap();

let span = tracing::span!(tracing::Level::INFO, "wait_timeout");
let _enter = span.enter();

let _guard = match timeout {
None => cvar
.wait_while(guard, |_| self.inner.cell.get().is_none())
Expand All @@ -110,6 +114,9 @@ impl<T> WaitableCell<T> {
.map(|(guard, _)| guard)
.unwrap(),
};

let _re_enter = span.enter();

self.inner.cell.get()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl<E: Engine> SandboxInstance for Instance<E> {
/// Waits for the instance to finish and retunrs its exit code
/// Returns None if the timeout is reached before the instance has finished.
/// This is a blocking call.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
#[instrument(skip(self, t), parent = Span::current(), level= "Info")]
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
self.exit_code.wait_timeout(t).copied()
}
Expand Down
5 changes: 4 additions & 1 deletion crates/containerd-shim-wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition.workspace = true
[dependencies]
anyhow = { workspace = true }
containerd-shim = { workspace = true }
containerd-shim-wasm = { workspace = true }
containerd-shim-wasm = { workspace = true, features = ["opentelemetry"] }
log = { workspace = true }
oci-spec = { workspace = true, features = ["runtime"] }
ttrpc = { workspace = true }
Expand All @@ -33,6 +33,9 @@ wasmtime = { version = "17.0", default-features = false, features = [
wasmtime-wasi = { version = "17.0", features = ["exit"] }
wasi-common = "17.0"

# OpenTelemetry dependencies
tokio = { version = "1.35", features = ["full"] }

[dev-dependencies]
containerd-shim-wasm = { workspace = true, features = ["testing"] }
serial_test = { workspace = true }
Expand Down
7 changes: 4 additions & 3 deletions crates/containerd-shim-wasmtime/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use containerd_shim_wasm::sandbox::cli::{revision, shim_main, version};
use containerd_shim_wasm::sandbox::cli::{revision, shim_main_with_otel, version};
use containerd_shim_wasmtime::WasmtimeInstance;

fn main() {
shim_main::<WasmtimeInstance>("wasmtime", version!(), revision!(), "v1", None);
#[tokio::main]
async fn main() {
shim_main_with_otel::<WasmtimeInstance>("wasmtime", version!(), revision!(), "v1", None);
}

0 comments on commit 8a3f861

Please sign in to comment.