From cb54cbd309cbc17e3fa60302c0cd6a73f81217f7 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Fri, 23 Feb 2024 06:10:03 +0000 Subject: [PATCH] connect to host and xpu. Signed-off-by: Klaus Ma --- Cargo.lock | 2 + shim/Cargo.toml | 1 + shim/src/cfg/mod.rs | 11 ++-- shim/src/cri/image.rs | 54 ++++++++++++++++---- shim/src/cri/runtime.rs | 97 +++++++++++++++++++++++------------- shim/src/main.rs | 30 ++++++++--- shim/systemd/chariot.service | 4 ++ 7 files changed, 143 insertions(+), 56 deletions(-) create mode 100644 shim/systemd/chariot.service diff --git a/Cargo.lock b/Cargo.lock index 9feb54b..2b49f96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,6 +241,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", + "tower", "tracing", "tracing-subscriber", ] @@ -1412,6 +1413,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/shim/Cargo.toml b/shim/Cargo.toml index d76d443..fdbd353 100644 --- a/shim/Cargo.toml +++ b/shim/Cargo.toml @@ -19,6 +19,7 @@ prost = "0.12" prost-types = "0.12" prost-build = "0.12" async-trait = "0.1" +tower = "0.4" [build-dependencies] tonic-build = { workspace = true } diff --git a/shim/src/cfg/mod.rs b/shim/src/cfg/mod.rs index 52d676a..20fa20d 100644 --- a/shim/src/cfg/mod.rs +++ b/shim/src/cfg/mod.rs @@ -17,10 +17,11 @@ use clap::Parser; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] pub struct Options { - // The address of Unix socket for Chariot shim. - // #[arg(short, long, default_value=cri::DEFAULT_UNIX_SOCKET)] - // pub address: String, /// The address of CRI server in XPU. - #[arg(short, long)] - pub xpu_address: String, + #[arg(long)] + pub xpu_cri: String, + + /// The address of CRI server in host. + #[arg(long)] + pub host_cri: String, } diff --git a/shim/src/cri/image.rs b/shim/src/cri/image.rs index bf0d18a..40faef3 100644 --- a/shim/src/cri/image.rs +++ b/shim/src/cri/image.rs @@ -11,7 +11,10 @@ See the License for the specific language governing permissions and limitations under the License. */ +use tokio::net::UnixStream; use tonic::transport::Channel; +use tonic::transport::{Endpoint, Uri}; +use tower::service_fn; use tracing::info; use self::crirpc::image_service_client::ImageServiceClient; @@ -26,30 +29,59 @@ use crate::common::ChariotError; pub struct ImageShim { pub xpu_client: ImageServiceClient, + pub host_client: ImageServiceClient, } impl ImageShim { - pub async fn connect(cri_addr: String) -> Result { - let mut image_client = ImageServiceClient::connect(cri_addr) + pub async fn connect(host_cri: String, xpu_cri: String) -> Result { + let mut xpu_client = ImageServiceClient::connect(xpu_cri) .await .map_err(|e| ChariotError::NetworkError(e.to_string()))?; - let resp = image_client + // Log XPU image FS info + let resp = xpu_client .image_fs_info(ImageFsInfoRequest {}) .await .map_err(|e| ChariotError::CriError(e.to_string()))?; let fs_info = resp.into_inner(); for fs in fs_info.container_filesystems { - info!("Container FS: {:?}", fs.fs_id.map(|i| i.mountpoint),); + info!("XPU container FS: {:?}", fs.fs_id.map(|i| i.mountpoint),); } for fs in fs_info.image_filesystems { - info!("Image FS: {:?}", fs.fs_id.map(|i| i.mountpoint),); + info!("XPU image FS: {:?}", fs.fs_id.map(|i| i.mountpoint),); + } + + let channel = Endpoint::try_from("http://[::]:50051") + .map_err(|e| ChariotError::NetworkError(e.to_string()))? + .connect_with_connector(service_fn(move |_: Uri| { + let host_path = host_cri.clone(); + UnixStream::connect(host_path) + })) + .await + .map_err(|e| ChariotError::NetworkError(e.to_string()))?; + + let mut host_client = ImageServiceClient::new(channel); + + // Log host image FS info + let resp = host_client + .image_fs_info(ImageFsInfoRequest {}) + .await + .map_err(|e| ChariotError::CriError(e.to_string()))?; + let fs_info = resp.into_inner(); + + for fs in fs_info.container_filesystems { + info!("Host container FS: {:?}", fs.fs_id.map(|i| i.mountpoint),); + } + + for fs in fs_info.image_filesystems { + info!("Host image FS: {:?}", fs.fs_id.map(|i| i.mountpoint),); } Ok(ImageShim { - xpu_client: image_client, + xpu_client, + host_client, }) } } @@ -60,7 +92,7 @@ impl crirpc::image_service_server::ImageService for ImageShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_images(request).await } @@ -69,7 +101,7 @@ impl crirpc::image_service_server::ImageService for ImageShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.image_status(request).await } @@ -78,7 +110,7 @@ impl crirpc::image_service_server::ImageService for ImageShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.pull_image(request).await } @@ -87,7 +119,7 @@ impl crirpc::image_service_server::ImageService for ImageShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.remove_image(request).await } @@ -96,7 +128,7 @@ impl crirpc::image_service_server::ImageService for ImageShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.image_fs_info(request).await } diff --git a/shim/src/cri/runtime.rs b/shim/src/cri/runtime.rs index 6808647..7a271fb 100644 --- a/shim/src/cri/runtime.rs +++ b/shim/src/cri/runtime.rs @@ -13,7 +13,10 @@ limitations under the License. use std::pin::Pin; use futures::Stream; +use tokio::net::UnixStream; use tonic::transport::Channel; +use tonic::transport::{Endpoint, Uri}; +use tower::service_fn; use tracing::info; use self::crirpc::runtime_service_client::RuntimeServiceClient; @@ -41,28 +44,54 @@ use crate::common::ChariotError; pub struct RuntimeShim { pub xpu_client: RuntimeServiceClient, + pub host_client: RuntimeServiceClient, } impl RuntimeShim { - pub async fn connect(cri_addr: String) -> Result { - let mut runtime_client = RuntimeServiceClient::connect(cri_addr) + pub async fn connect(host_cri: String, xpu_cri: String) -> Result { + let mut xpu_client = RuntimeServiceClient::connect(xpu_cri) .await .map_err(|e| ChariotError::NetworkError(e.to_string()))?; + let channel = Endpoint::try_from("http://[::]:50051") + .map_err(|e| ChariotError::NetworkError(e.to_string()))? + .connect_with_connector(service_fn(move |_: Uri| { + let host_path = host_cri.clone(); + UnixStream::connect(host_path) + })) + .await + .map_err(|e| ChariotError::NetworkError(e.to_string()))?; + let mut host_client = RuntimeServiceClient::new(channel); + let request = crirpc::VersionRequest { version: "*".to_string(), }; - let version = runtime_client - .version(request) + let version = xpu_client + .version(request.clone()) + .await + .map_err(|e| ChariotError::CriError(e.to_string()))?; + let resp = version.into_inner(); + + info!( + "XPU runtime: {}/{}", + resp.runtime_name, resp.runtime_version + ); + + let version = host_client + .version(request.clone()) .await .map_err(|e| ChariotError::CriError(e.to_string()))?; let resp = version.into_inner(); - info!("Runtime: {}/{}", resp.runtime_name, resp.runtime_version); + info!( + "Host runtime: {}/{}", + resp.runtime_name, resp.runtime_version + ); Ok(RuntimeShim { - xpu_client: runtime_client, + xpu_client, + host_client, }) } } @@ -73,7 +102,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.version(request).await } @@ -82,7 +111,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.run_pod_sandbox(request).await } @@ -91,7 +120,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.stop_pod_sandbox(request).await } @@ -100,7 +129,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.remove_pod_sandbox(request).await } @@ -109,7 +138,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.pod_sandbox_status(request).await } @@ -118,7 +147,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_pod_sandbox(request).await } @@ -127,7 +156,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.create_container(request).await } @@ -136,7 +165,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.start_container(request).await } @@ -145,7 +174,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.stop_container(request).await } @@ -154,7 +183,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.remove_container(request).await } @@ -163,7 +192,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_containers(request).await } @@ -172,7 +201,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.container_status(request).await } @@ -181,7 +210,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.update_container_resources(request).await } @@ -190,7 +219,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.reopen_container_log(request).await } @@ -199,7 +228,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.exec_sync(request).await } @@ -208,7 +237,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.exec(request).await } @@ -217,7 +246,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.attach(request).await } @@ -226,7 +255,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.port_forward(request).await } @@ -235,7 +264,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.container_stats(request).await } @@ -244,7 +273,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_container_stats(request).await } @@ -253,7 +282,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.pod_sandbox_stats(request).await } @@ -262,7 +291,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_pod_sandbox_stats(request).await } @@ -271,7 +300,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.update_runtime_config(request).await } @@ -280,7 +309,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.status(request).await } @@ -289,7 +318,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.checkpoint_container(request).await } @@ -309,7 +338,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_metric_descriptors(request).await } @@ -318,7 +347,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.list_pod_sandbox_metrics(request).await } @@ -327,7 +356,7 @@ impl crirpc::runtime_service_server::RuntimeService for RuntimeShim { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let mut client = self.xpu_client.clone(); + let mut client = self.host_client.clone(); client.runtime_config(request).await } diff --git a/shim/src/main.rs b/shim/src/main.rs index 94ec6c0..9885f62 100644 --- a/shim/src/main.rs +++ b/shim/src/main.rs @@ -26,22 +26,40 @@ use tokio::net::UnixListener; #[cfg(unix)] use tokio_stream::wrappers::UnixListenerStream; use tonic::transport::Server; +use tracing::info; +use crate::cri::image::ImageShim; +use crate::cri::runtime::RuntimeShim; use crate::rpc::cri::image_service_server::ImageServiceServer; use crate::rpc::cri::runtime_service_server::RuntimeServiceServer; #[cfg(unix)] #[tokio::main] async fn main() -> Result<(), Box> { - // construct a subscriber that prints formatted traces to stdout - let subscriber = tracing_subscriber::FmtSubscriber::new(); - // use that subscriber to process traces emitted after this point - tracing::subscriber::set_global_default(subscriber)?; + let log_level = match std::env::var("CHARIOT_LOG") { + Ok(level) => match level.as_str() { + "debug" => tracing::Level::DEBUG, + "info" => tracing::Level::INFO, + "error" => tracing::Level::ERROR, + "warn" => tracing::Level::WARN, + _ => tracing::Level::INFO, + }, + Err(_) => tracing::Level::INFO, + }; + + tracing_subscriber::fmt() + .with_max_level(log_level) + .with_target(false) + .init(); let args = cfg::Options::parse(); - let image_svc = cri::image::ImageShim::connect(args.xpu_address.clone()).await?; - let runtime_svc = cri::runtime::RuntimeShim::connect(args.xpu_address.clone()).await?; + info!("Chariot start the CRI listener at {}", cri::DEFAULT_UNIX_SOCKET); + info!("Connecting to Host CRI at {}", args.host_cri.clone()); + info!("Connecting to XPU CRI at {}", args.xpu_cri.clone()); + + let image_svc = ImageShim::connect(args.host_cri.clone(), args.xpu_cri.clone()).await?; + let runtime_svc = RuntimeShim::connect(args.host_cri.clone(), args.xpu_cri.clone()).await?; // TODO(k82cn): use the address from args. fs::create_dir_all(cri::DEFAULT_UNIX_SOCKET_DIR)?; diff --git a/shim/systemd/chariot.service b/shim/systemd/chariot.service new file mode 100644 index 0000000..ada8bec --- /dev/null +++ b/shim/systemd/chariot.service @@ -0,0 +1,4 @@ +[Service] +Environment="CHARIOT_LOG=info" +# EnvironmentFile= +ExecStart=/usr/bin/chariot-shim --host-cri /run/containerd/containerd.sock --xpu-cri http://192.168.100.2:9090 \ No newline at end of file