diff --git a/Cargo.lock b/Cargo.lock index 86b5f27..37b9cbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,7 +36,7 @@ dependencies = [ "getrandom 0.2.15", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -312,9 +312,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" [[package]] name = "cap-fs-ext" @@ -364,7 +364,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce977bea95e49cc352bf8253719d872d27486e56f91b5491e20a827ab2c1a16" dependencies = [ "ambient-authority", - "rand", + "rand 0.8.5", ] [[package]] @@ -395,9 +395,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.11" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4730490333d58093109dc02c23174c3f4d490998c3fed3cc8e82d57afedb9cf" +checksum = "755717a7de9ec452bf7f3f1a3099085deabd7f2962b861dae91ecd7a365903d2" dependencies = [ "jobserver", "libc", @@ -426,9 +426,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.27" +version = "4.5.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "769b0145982b4b48713e01ec42d61614425f27b7058bda7180a3a41f30104796" +checksum = "3e77c3243bd94243c03672cb5154667347c457ca271254724f9f393aee1c05ff" dependencies = [ "clap_builder", "clap_derive", @@ -448,9 +448,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.24" +version = "4.5.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" +checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -951,12 +951,14 @@ dependencies = [ "clap", "common", "env_logger", + "hyper-util", "lazy_static", "log", "prost", "rpc", "tokio", "tonic", + "tower 0.5.2", "uuid", "wasmtime", "wasmtime-wasi", @@ -965,6 +967,16 @@ dependencies = [ [[package]] name = "flame-service" version = "0.1.0" +dependencies = [ + "bytes", + "prost", + "thiserror", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower 0.4.13", +] [[package]] name = "flame-session-manager" @@ -1034,6 +1046,7 @@ dependencies = [ "common", "env_logger", "flame-client", + "flame-service", "futures", "gethostname", "indicatif", @@ -1798,6 +1811,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.169" @@ -1994,7 +2013,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -2064,9 +2083,9 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openssl" -version = "0.10.69" +version = "0.10.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5e534d133a060a3c19daec1eb3e98ec6f4685978834f2dbadfe2ec215bab64e" +checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" dependencies = [ "bitflags", "cfg-if", @@ -2096,9 +2115,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" -version = "0.9.104" +version = "0.9.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +checksum = "8b22d5b84be05a8d6947c7cb71f7c849aa0f112acd4bf51c2a7c1c988ac0a9dc" dependencies = [ "cc", "libc", @@ -2162,18 +2181,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" +checksum = "dfe2e71e1471fe07709406bf725f710b02927c9c54b2b5b2ec0e8087d97c327d" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" +checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67" dependencies = [ "proc-macro2", "quote", @@ -2231,7 +2250,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -2330,8 +2349,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.0", + "zerocopy 0.8.16", ] [[package]] @@ -2341,7 +2371,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.0", ] [[package]] @@ -2353,6 +2393,16 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" +dependencies = [ + "getrandom 0.3.1", + "zerocopy 0.8.16", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2459,7 +2509,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core", + "rand_core 0.6.4", "signature", "spki", "subtle", @@ -2647,7 +2697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -2836,7 +2886,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "serde", "sha1", @@ -2875,7 +2925,7 @@ dependencies = [ "md-5", "memchr", "once_cell", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2", @@ -3220,7 +3270,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -3394,20 +3444,20 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.12.1" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b" +checksum = "ced87ca4be083373936a67f8de945faa23b6b42384bd5b64434850802c6dccd0" dependencies = [ - "getrandom 0.2.15", - "rand", + "getrandom 0.3.1", + "rand 0.9.0", "uuid-macro-internal", ] [[package]] name = "uuid-macro-internal" -version = "1.12.1" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8a86d88347b61a0e17b9908a67efcc594130830bf1045653784358dd023e294" +checksum = "d28dd23acb5f2fa7bd2155ab70b960e770596b3bb6395119b40476c3655dfba4" dependencies = [ "proc-macro2", "quote", @@ -3568,12 +3618,12 @@ dependencies = [ [[package]] name = "wasm-encoder" -version = "0.224.0" +version = "0.225.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7249cf8cb0c6b9cb42bce90c0a5feb276fbf963fa385ff3d818ab3d90818ed6" +checksum = "6f7eac0445cac73bcf09e6a97f83248d64356dccf9f2b100199769b6b42464e5" dependencies = [ - "leb128", - "wasmparser 0.224.0", + "leb128fmt", + "wasmparser 0.225.0", ] [[package]] @@ -3599,9 +3649,9 @@ dependencies = [ [[package]] name = "wasmparser" -version = "0.224.0" +version = "0.225.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65881a664fdd43646b647bb27bf186ab09c05bf56779d40aed4c6dce47d423f5" +checksum = "36e5456165f81e64cb9908a0fe9b9d852c2c74582aa3fe2be3c2da57f937d3ae" dependencies = [ "bitflags", "indexmap 2.7.1", @@ -3971,24 +4021,24 @@ dependencies = [ [[package]] name = "wast" -version = "224.0.0" +version = "225.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d722a51e62b669d17e5a9f6bc8ec210178b37d869114355aa46989686c5c6391" +checksum = "c61496027ff707f9fa9e0b22c34ec163eb7adb1070df565e32a9180a76e4300b" dependencies = [ "bumpalo", - "leb128", + "leb128fmt", "memchr", "unicode-width", - "wasm-encoder 0.224.0", + "wasm-encoder 0.225.0", ] [[package]] name = "wat" -version = "1.224.0" +version = "1.225.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71dece6a7dd5bcbcf8d256606c7fb3faa36286d46bf3f98185407719a5ceede2" +checksum = "89e72a33942234fd0794bcdac30e43b448de3187512414267678e511c6755f11" dependencies = [ - "wast 224.0.0", + "wast 225.0.0", ] [[package]] @@ -4339,7 +4389,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b8c07a70861ce02bad1607b5753ecb2501f67847b9f9ada7c160fff0ec6300c" +dependencies = [ + "zerocopy-derive 0.8.16", ] [[package]] @@ -4353,6 +4412,17 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5226bc9a9a9836e7428936cde76bb6b22feea1a8bfdbc0d241136e4d13417e25" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 80d9207..fda3476 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ members = [ "session_manager", "executor_manager", "rpc", - "sdk/service", - "sdk/client", + "sdk/rust/service", + "sdk/rust/client", ] [workspace.dependencies] diff --git a/Makefile b/Makefile index 8a603a8..06b39e8 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ ci-image: sudo docker build -t xflops/flame-console -f docker/Dockerfile.console . update_protos: - cp rpc/protos/frontend.proto sdk/client/protos - cp rpc/protos/types.proto sdk/client/protos - cp rpc/protos/shim.proto sdk/service/protos - cp rpc/protos/types.proto sdk/service/protos + cp rpc/protos/frontend.proto sdk/rust/client/protos + cp rpc/protos/types.proto sdk/rust/client/protos + cp rpc/protos/shim.proto sdk/rust/service/protos + cp rpc/protos/types.proto sdk/rust/service/protos diff --git a/common/src/apis.rs b/common/src/apis.rs index 7910a48..41cea18 100644 --- a/common/src/apis.rs +++ b/common/src/apis.rs @@ -49,6 +49,7 @@ pub enum Shim { Stdio = 1, Wasm = 2, Shell = 3, + Grpc = 4, } #[derive(Clone, Debug, Default)] @@ -153,15 +154,15 @@ pub struct Executor { #[derive(Clone, Debug)] pub struct TaskContext { - pub id: String, - pub ssn_id: String, + pub task_id: String, + pub session_id: String, pub input: Option, pub output: Option, } #[derive(Clone, Debug)] pub struct SessionContext { - pub ssn_id: String, + pub session_id: String, pub application: ApplicationContext, pub slots: i32, pub common_data: Option, @@ -243,8 +244,8 @@ impl TryFrom for TaskContext { .ok_or(FlameError::InvalidConfig("spec".to_string()))?; Ok(TaskContext { - id: metadata.id, - ssn_id: spec.session_id.to_string(), + task_id: metadata.id.clone(), + session_id: spec.session_id.to_string(), input: spec.input.map(TaskInput::from), output: spec.output.map(TaskOutput::from), }) @@ -273,6 +274,37 @@ impl TryFrom for ApplicationContext { } } +impl From for rpc::TaskContext { + fn from(ctx: TaskContext) -> Self { + Self { + task_id: ctx.task_id.clone(), + session_id: ctx.session_id.clone(), + input: ctx.input.map(|d| d.into()), + } + } +} + +impl From for rpc::SessionContext { + fn from(ctx: SessionContext) -> Self { + Self { + session_id: ctx.session_id.clone(), + application: Some(ctx.application.into()), + common_data: ctx.common_data.map(|d| d.into()), + } + } +} + +impl From for rpc::ApplicationContext { + fn from(ctx: ApplicationContext) -> Self { + Self { + name: ctx.name.clone(), + url: ctx.url.clone(), + shim: ctx.shim.into(), + command: ctx.command.clone(), + } + } +} + impl TryFrom for SessionContext { type Error = FlameError; @@ -295,7 +327,7 @@ impl TryFrom for SessionContext { let application = ApplicationContext::try_from(app)?; Ok(SessionContext { - ssn_id: metadata.id, + session_id: metadata.id, application, slots: spec.slots, common_data: spec.common_data.map(CommonData::from), @@ -559,6 +591,7 @@ impl From for Shim { rpc::Shim::Stdio => Self::Stdio, rpc::Shim::Wasm => Self::Wasm, rpc::Shim::Shell => Self::Shell, + rpc::Shim::Grpc => Self::Grpc, } } } @@ -570,6 +603,7 @@ impl From for rpc::Shim { Shim::Stdio => Self::Stdio, Shim::Wasm => Self::Wasm, Shim::Shell => Self::Shell, + Shim::Grpc => Self::Grpc, } } } diff --git a/executor_manager/Cargo.toml b/executor_manager/Cargo.toml index 5670efb..e3f3dcb 100644 --- a/executor_manager/Cargo.toml +++ b/executor_manager/Cargo.toml @@ -16,6 +16,8 @@ log = { workspace = true } async-trait = { workspace = true } clap = { workspace = true } prost = { workspace = true } +tower = "0.5" +hyper-util = "0.1" bytes = "1" chrono = "0.4" diff --git a/executor_manager/src/shims/grpc_shim.rs b/executor_manager/src/shims/grpc_shim.rs index ce56a11..cde3511 100644 --- a/executor_manager/src/shims/grpc_shim.rs +++ b/executor_manager/src/shims/grpc_shim.rs @@ -12,42 +12,114 @@ limitations under the License. */ use std::sync::Arc; +use std::{thread, time}; use async_trait::async_trait; +use hyper_util::rt::TokioIo; +use tokio::net::UnixStream; use tokio::sync::Mutex; +use tonic::transport::Channel; +use tonic::transport::{Endpoint, Uri}; +use tonic::Request; +use tower::service_fn; + +use ::rpc::flame as rpc; +use rpc::grpc_shim_client::GrpcShimClient; +use rpc::EmptyRequest; use crate::shims::{Shim, ShimPtr}; use common::apis::{ApplicationContext, SessionContext, TaskContext, TaskOutput}; use common::FlameError; -#[derive(Clone)] pub struct GrpcShim { session_context: Option, + client: GrpcShimClient, + child: tokio::process::Child, } +const FLAME_SOCKET_PATH: &str = "FLAME_SOCKET_PATH"; + impl GrpcShim { - pub fn new_ptr(_: &ApplicationContext) -> ShimPtr { - // TODO: launch service based on application context. - Arc::new(Mutex::new(Self { + pub async fn new_ptr(app_ctx: &ApplicationContext) -> Result { + let socket_path = format!("/tmp/flame-shim-{}.sock", uuid::Uuid::new_v4().simple()); + std::env::set_var(FLAME_SOCKET_PATH, socket_path.clone()); + + // Spawn child process + let mut cmd = tokio::process::Command::new(&app_ctx.command.clone().unwrap()); + cmd.env(FLAME_SOCKET_PATH, &socket_path).kill_on_drop(true); + + let child = cmd + .env(FLAME_SOCKET_PATH, &socket_path) + .spawn() + .map_err(|e| FlameError::InvalidConfig(e.to_string()))?; + + let channel = Endpoint::try_from("http://[::]:50051") + .map_err(|e| FlameError::Network(e.to_string()))? + .connect_with_connector(service_fn(|_: Uri| async { + // Connect to a Uds socket + let path = std::env::var(FLAME_SOCKET_PATH).ok().unwrap(); + Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?)) + })) + .await + .map_err(|e| FlameError::Network(e.to_string()))?; + + let mut client = GrpcShimClient::new(channel); + + let mut connected = false; + for i in 1..10 { + let resp = client + .readiness(Request::new(EmptyRequest::default())) + .await; + if resp.is_ok() { + connected = true; + break; + } + // sleep 1s + let ten_millis = time::Duration::from_secs(1); + thread::sleep(ten_millis); + } + + if !connected { + return Err(FlameError::InvalidConfig( + "failed to connect to service".to_string(), + )); + } + + Ok(Arc::new(Mutex::new(Self { session_context: None, - })) + client, + child, + }))) } } #[async_trait] impl Shim for GrpcShim { async fn on_session_enter(&mut self, ctx: &SessionContext) -> Result<(), FlameError> { - todo!() + let req = Request::new(rpc::SessionContext::from(ctx.clone())); + self.client.on_session_enter(req).await?; + Ok(()) } async fn on_task_invoke( &mut self, ctx: &TaskContext, ) -> Result, FlameError> { - todo!() + let req = Request::new(rpc::TaskContext::from(ctx.clone())); + let resp = self.client.on_task_invoke(req).await?; + let output = resp.into_inner(); + + Ok(output.data.map(|d| d.into())) } async fn on_session_leave(&mut self) -> Result<(), FlameError> { - todo!() + let _ = self + .client + .on_session_leave(Request::new(EmptyRequest::default())) + .await?; + + self.child.kill(); + + Ok(()) } } diff --git a/executor_manager/src/shims/log_shim.rs b/executor_manager/src/shims/log_shim.rs index 71c121f..7f76cde 100644 --- a/executor_manager/src/shims/log_shim.rs +++ b/executor_manager/src/shims/log_shim.rs @@ -38,7 +38,7 @@ impl Shim for LogShim { async fn on_session_enter(&mut self, ctx: &SessionContext) -> Result<(), FlameError> { log::info!( "on_session_enter: Session: <{}>, Application: <{}>, Slots: <{}>", - ctx.ssn_id, + ctx.session_id, ctx.application.name, ctx.slots ); @@ -53,8 +53,8 @@ impl Shim for LogShim { ) -> Result, FlameError> { log::info!( "on_task_invoke: Task: <{}>, Session: <{}>", - ctx.id, - ctx.ssn_id + ctx.task_id, + ctx.session_id ); Ok(None) } @@ -67,7 +67,7 @@ impl Shim for LogShim { Some(ctx) => { log::info!( "on_session_leave: Session: <{}>, Application: <{}>, Slots: <{}>", - ctx.ssn_id, + ctx.session_id, ctx.application.name, ctx.slots ); diff --git a/executor_manager/src/shims/mod.rs b/executor_manager/src/shims/mod.rs index 92bafce..74488fe 100644 --- a/executor_manager/src/shims/mod.rs +++ b/executor_manager/src/shims/mod.rs @@ -11,21 +11,22 @@ See the License for the specific language governing permissions and limitations under the License. */ +mod grpc_shim; mod log_shim; +mod shell_shim; mod stdio_shim; mod wasm_shim; -mod shell_shim; -mod grpc_shim; use std::sync::Arc; use async_trait::async_trait; +use grpc_shim::GrpcShim; use tokio::sync::Mutex; use self::log_shim::LogShim; +use self::shell_shim::ShellShim; use self::stdio_shim::StdioShim; use self::wasm_shim::WasmShim; -use self::shell_shim::ShellShim; use common::apis::{ApplicationContext, SessionContext, Shim as ShimType, TaskContext, TaskOutput}; @@ -38,6 +39,7 @@ pub async fn from(app: &ApplicationContext) -> Result { ShimType::Stdio => Ok(StdioShim::new_ptr(app)), ShimType::Wasm => Ok(WasmShim::new_ptr(app).await?), ShimType::Shell => Ok(ShellShim::new_ptr(app)), + ShimType::Grpc => Ok(GrpcShim::new_ptr(app).await?), _ => Ok(LogShim::new_ptr(app)), } } diff --git a/executor_manager/src/shims/shell_shim.rs b/executor_manager/src/shims/shell_shim.rs index 57712d6..798fde0 100644 --- a/executor_manager/src/shims/shell_shim.rs +++ b/executor_manager/src/shims/shell_shim.rs @@ -54,9 +54,12 @@ impl Shim for ShellShim { &mut self, ctx: &TaskContext, ) -> Result, FlameError> { - let input = ctx.input.clone().ok_or(FlameError::Uninitialized(String::from( - "task input is empty", - )))?; + let input = ctx + .input + .clone() + .ok_or(FlameError::Uninitialized(String::from( + "task input is empty", + )))?; let mut cmd = String::from_utf8(input.to_ascii_lowercase()) .map_err(|e| FlameError::Uninitialized(format!("task input is invalid: {}", e)))?; @@ -86,8 +89,8 @@ impl Shim for ShellShim { .stdout(Stdio::piped()) // TODO: add working dir // .current_dir(&self.application.working_directory) - .env(FLAME_TASK_ID, &ctx.id) - .env(FLAME_SESSION_ID, &ctx.ssn_id) + .env(FLAME_TASK_ID, &ctx.task_id) + .env(FLAME_SESSION_ID, &ctx.session_id) .spawn() .map_err(|_| FlameError::Internal("failed to start subprocess".to_string()))?; diff --git a/executor_manager/src/shims/stdio_shim.rs b/executor_manager/src/shims/stdio_shim.rs index 48983cb..3728a98 100644 --- a/executor_manager/src/shims/stdio_shim.rs +++ b/executor_manager/src/shims/stdio_shim.rs @@ -85,8 +85,8 @@ impl Shim for StdioShim { .stdout(Stdio::piped()) // TODO: add working dir // .current_dir(&self.application.working_directory) - .env(FLAME_TASK_ID, &ctx.id) - .env(FLAME_SESSION_ID, &ctx.ssn_id) + .env(FLAME_TASK_ID, &ctx.task_id) + .env(FLAME_SESSION_ID, &ctx.session_id) .spawn() .map_err(|_| FlameError::Internal("failed to start subprocess".to_string()))?; diff --git a/executor_manager/src/shims/wasm_shim.rs b/executor_manager/src/shims/wasm_shim.rs index e46aa2c..5b00503 100644 --- a/executor_manager/src/shims/wasm_shim.rs +++ b/executor_manager/src/shims/wasm_shim.rs @@ -79,7 +79,7 @@ impl Shim for WasmShim { ctx: &apis::SessionContext, ) -> Result<(), common::FlameError> { let ssn_ctx = service::SessionContext { - session_id: ctx.ssn_id.clone(), + session_id: ctx.session_id.clone(), common_data: ctx.common_data.clone().map(apis::CommonData::into), }; @@ -101,8 +101,8 @@ impl Shim for WasmShim { ctx: &apis::TaskContext, ) -> Result, common::FlameError> { let task_ctx = service::TaskContext { - session_id: ctx.ssn_id.clone(), - task_id: ctx.id.clone(), + session_id: ctx.session_id.clone(), + task_id: ctx.task_id.clone(), }; let output = self @@ -122,7 +122,7 @@ impl Shim for WasmShim { async fn on_session_leave(&mut self) -> Result<(), common::FlameError> { let ssn_ctx = service::SessionContext { - session_id: self.session_context.clone().unwrap().ssn_id.clone(), + session_id: self.session_context.clone().unwrap().session_id.clone(), common_data: None, }; diff --git a/executor_manager/src/states/bound.rs b/executor_manager/src/states/bound.rs index cf3e9ea..573da14 100644 --- a/executor_manager/src/states/bound.rs +++ b/executor_manager/src/states/bound.rs @@ -49,7 +49,7 @@ impl State for BoundState { let (ssn_id, task_id) = { let task = &self.executor.task.clone().unwrap(); - (task.ssn_id.clone(), task.id.clone()) + (task.session_id.clone(), task.task_id.clone()) }; log::debug!("Complete task <{}/{}>", ssn_id, task_id) } diff --git a/executor_manager/src/states/idle.rs b/executor_manager/src/states/idle.rs index 51961a0..f1a4f8b 100644 --- a/executor_manager/src/states/idle.rs +++ b/executor_manager/src/states/idle.rs @@ -48,7 +48,7 @@ impl State for IdleState { log::debug!( "Executor <{}> was bound to <{}>.", &self.executor.id.clone(), - &ssn.ssn_id.clone() + &ssn.session_id.clone() ); Ok(self.executor.clone()) diff --git a/flmctl/Cargo.toml b/flmctl/Cargo.toml index 95c9f49..424d776 100644 --- a/flmctl/Cargo.toml +++ b/flmctl/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -flame-client = { path = "../sdk/client" } +flame-client = { path = "../sdk/rust/client" } common = { path = "../common" } tokio = { workspace = true } diff --git a/flmexec/Cargo.toml b/flmexec/Cargo.toml index 1edc8d3..4d4ee92 100644 --- a/flmexec/Cargo.toml +++ b/flmexec/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -flame-client = { path = "../sdk/client" } +flame-client = { path = "../sdk/rust/client" } rpc = { path = "../rpc" } common = { path = "../common" } diff --git a/flmexec/src/main.rs b/flmexec/src/main.rs index 6e89c00..ef1dadb 100644 --- a/flmexec/src/main.rs +++ b/flmexec/src/main.rs @@ -72,7 +72,7 @@ async fn main() -> Result<(), Box> { let mut tasks = vec![]; let tasks_creations_start_time = Local::now(); - let info = Arc::new(Mutex::new(ExecInfo{})); + let info = Arc::new(Mutex::new(ExecInfo {})); for _ in 0..task_num { tasks.push(ssn.run_task(Some(cli.command.clone().into()), info.clone())); @@ -95,13 +95,16 @@ async fn main() -> Result<(), Box> { Ok(()) } -struct ExecInfo { -} +struct ExecInfo {} impl flame::TaskInformer for ExecInfo { fn on_update(&mut self, task: flame::Task) { if task.is_completed() { - println!("Task {:<10}: {:?}", task.id, task.output.unwrap_or_default()); + println!( + "Task {:<10}: {:?}", + task.id, + task.output.unwrap_or_default() + ); } } diff --git a/flmping/Cargo.toml b/flmping/Cargo.toml index 8c68151..3e8e6ff 100644 --- a/flmping/Cargo.toml +++ b/flmping/Cargo.toml @@ -6,7 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -flame-client = { path = "../sdk/client" } +flame-client = { path = "../sdk/rust/client" } +flame-service = { path = "../sdk/rust/service" } + rpc = { path = "../rpc" } common = { path = "../common" } diff --git a/flmping/src/service.rs b/flmping/src/service.rs index a24f1fe..54660bd 100644 --- a/flmping/src/service.rs +++ b/flmping/src/service.rs @@ -11,18 +11,36 @@ See the License for the specific language governing permissions and limitations under the License. */ -use std::env::{self, VarError}; - use gethostname::gethostname; -const FLAME_TASK_ID: &str = "FLAME_TASK_ID"; -const FLAME_SESSION_ID: &str = "FLAME_SESSION_ID"; +use flame_service::{self as flame, FlameError, SessionContext, TaskContext, TaskOutput}; + +#[derive(Clone)] +pub struct FlmpingService {} + +#[tonic::async_trait] +impl flame::FlameService for FlmpingService { + async fn on_session_enter(&self, _: SessionContext) -> Result<(), FlameError> { + Ok(()) + } -pub fn main() -> Result<(), VarError> { - let ssn_id = env::var(FLAME_SESSION_ID)?; - let task_id = env::var(FLAME_TASK_ID)?; + async fn on_task_invoke(&self, ctx: TaskContext) -> Result, FlameError> { + Ok(Some(TaskOutput::from(format!( + "Task <{}/{}> is executed on <{:?}>", + ctx.session_id, + ctx.task_id, + gethostname(), + )))) + } + + async fn on_session_leave(&self) -> Result<(), FlameError> { + Ok(()) + } +} - print!("Execute <{ssn_id}/{task_id}> on host <{:?}>", gethostname()); +#[tokio::main] +async fn main() -> Result<(), Box> { + flame_service::run(FlmpingService {}).await?; Ok(()) } diff --git a/rpc/build.rs b/rpc/build.rs index 51f9f25..4aea303 100644 --- a/rpc/build.rs +++ b/rpc/build.rs @@ -25,6 +25,7 @@ fn main() -> Result<(), Box> { "protos/types.proto", "protos/frontend.proto", "protos/backend.proto", + "protos/shim.proto", ], &["protos"], )?; diff --git a/rpc/protos/shim.proto b/rpc/protos/shim.proto index 48e6247..1f4142a 100644 --- a/rpc/protos/shim.proto +++ b/rpc/protos/shim.proto @@ -6,25 +6,28 @@ package flame; message ApplicationContext { string name = 1; + Shim shim = 2; + optional string url = 3; + optional string command = 4; } message SessionContext { - string id = 1; + string session_id = 1; ApplicationContext application = 2; - optional string common_data = 3; + optional bytes common_data = 3; } message TaskContext { - string id = 1; - SessionContext session = 2; - optional string input = 4; + string task_id = 1; + string session_id = 2; + optional bytes input = 4; } message TaskOutput { - optional string data = 1; + optional bytes data = 1; } -service Shim{ +service GrpcShim{ rpc Readiness(EmptyRequest) returns (Result) {} rpc OnSessionEnter(SessionContext) returns (Result) {} rpc OnTaskInvoke(TaskContext) returns (TaskOutput) {} diff --git a/rpc/protos/types.proto b/rpc/protos/types.proto index 8ed3d94..9646cd8 100644 --- a/rpc/protos/types.proto +++ b/rpc/protos/types.proto @@ -69,6 +69,7 @@ enum Shim { Stdio = 1; Wasm = 2; Shell = 3; + Grpc = 4; } enum ApplicationState { diff --git a/sdk/client/Cargo.toml b/sdk/rust/client/Cargo.toml similarity index 100% rename from sdk/client/Cargo.toml rename to sdk/rust/client/Cargo.toml diff --git a/sdk/client/build.rs b/sdk/rust/client/build.rs similarity index 100% rename from sdk/client/build.rs rename to sdk/rust/client/build.rs diff --git a/sdk/client/protos/frontend.proto b/sdk/rust/client/protos/frontend.proto similarity index 100% rename from sdk/client/protos/frontend.proto rename to sdk/rust/client/protos/frontend.proto diff --git a/sdk/client/protos/types.proto b/sdk/rust/client/protos/types.proto similarity index 98% rename from sdk/client/protos/types.proto rename to sdk/rust/client/protos/types.proto index 2f19f37..9646cd8 100644 --- a/sdk/client/protos/types.proto +++ b/sdk/rust/client/protos/types.proto @@ -69,6 +69,7 @@ enum Shim { Stdio = 1; Wasm = 2; Shell = 3; + Grpc = 4; } enum ApplicationState { @@ -118,11 +119,6 @@ message Executor { ExecutorStatus status = 3; } -message Result { - int32 return_code = 1; - optional string message = 2; -} - message SessionList { repeated Session sessions = 1; } @@ -130,3 +126,11 @@ message SessionList { message ApplicationList { repeated Application applications = 1; } + +message Result { + int32 return_code = 1; + optional string message = 2; +} + +message EmptyRequest { +} diff --git a/sdk/client/src/lib.rs b/sdk/rust/client/src/lib.rs similarity index 98% rename from sdk/client/src/lib.rs rename to sdk/rust/client/src/lib.rs index 045065a..cffb3c6 100644 --- a/sdk/client/src/lib.rs +++ b/sdk/rust/client/src/lib.rs @@ -56,6 +56,13 @@ macro_rules! lock_ptr { }; } +#[macro_export] +macro_rules! new_ptr { + ( $mutex_arc:expr ) => { + Arc::new(Mutex::new($mutex_arc)) + }; +} + pub async fn connect(addr: &str) -> Result { let endpoint = Endpoint::from_shared(addr.to_string()) .map_err(|_| FlameError::InvalidConfig("invalid address".to_string()))?; @@ -109,6 +116,7 @@ pub enum Shim { Stdio = 1, Wasm = 2, Shell = 3, + Grpc = 4, } #[derive(Clone)] @@ -160,7 +168,6 @@ pub struct Task { } pub type TaskInformerPtr = Arc>; -pub type TaskResultPtr = Arc>>; pub trait TaskInformer: Send + Sync + 'static { fn on_update(&mut self, task: Task); @@ -392,6 +399,7 @@ impl From for Shim { rpc::Shim::Stdio => Shim::Stdio, rpc::Shim::Wasm => Shim::Wasm, rpc::Shim::Shell => Shim::Shell, + rpc::Shim::Grpc => Shim::Grpc, } } } diff --git a/sdk/client/src/trace.rs b/sdk/rust/client/src/trace.rs similarity index 100% rename from sdk/client/src/trace.rs rename to sdk/rust/client/src/trace.rs diff --git a/sdk/client/tests/integration_test.rs b/sdk/rust/client/tests/integration_test.rs similarity index 95% rename from sdk/client/tests/integration_test.rs rename to sdk/rust/client/tests/integration_test.rs index 955cc28..ad30029 100644 --- a/sdk/client/tests/integration_test.rs +++ b/sdk/rust/client/tests/integration_test.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex}; use futures::future::try_join_all; -use self::flame::{lock_ptr, Task, TaskInformer, TaskState}; +use self::flame::{lock_ptr, new_ptr, Task, TaskInformer, TaskState}; use flame_client as flame; use self::flame::{FlameError, SessionAttributes, SessionState}; @@ -97,11 +97,11 @@ async fn test_create_session_with_tasks() -> Result<(), FlameError> { assert_eq!(ssn.state, SessionState::Open); - let informer = Arc::new(Mutex::new(DefaultTaskInformer { + let informer = new_ptr!(DefaultTaskInformer { succeed: 0, failed: 0, error: 0, - })); + }); let task_num = 100; let mut tasks = vec![]; @@ -137,11 +137,11 @@ async fn test_create_multiple_sessions_with_tasks() -> Result<(), FlameError> { let ssn_2 = conn.create_session(&ssn_attr).await?; assert_eq!(ssn_2.state, SessionState::Open); - let informer = Arc::new(Mutex::new(DefaultTaskInformer { + let informer = new_ptr!(DefaultTaskInformer { succeed: 0, failed: 0, error: 0, - })); + }); let task_num = 100; let mut tasks = vec![]; diff --git a/sdk/rust/service/Cargo.toml b/sdk/rust/service/Cargo.toml new file mode 100644 index 0000000..16367bb --- /dev/null +++ b/sdk/rust/service/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "flame-service" +version = "0.1.0" +edition = "2021" +build = "build.rs" + +[dependencies] +tower = "0.4" +prost = { workspace = true, features = ["derive"] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tokio-stream = "0.1" +tonic.workspace = true +thiserror = "1.0" +bytes = "1" + +[build-dependencies] +tonic-build = { workspace = true } diff --git a/sdk/rust/service/build.rs b/sdk/rust/service/build.rs new file mode 100644 index 0000000..3b2388a --- /dev/null +++ b/sdk/rust/service/build.rs @@ -0,0 +1,20 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +fn main() -> Result<(), Box> { + tonic_build::configure().compile_protos( + &["protos/shim.proto"], + &["protos/"], // Proto file location + )?; + Ok(()) +} diff --git a/sdk/rust/service/protos/shim.proto b/sdk/rust/service/protos/shim.proto new file mode 100644 index 0000000..1f4142a --- /dev/null +++ b/sdk/rust/service/protos/shim.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +import "types.proto"; + +package flame; + +message ApplicationContext { + string name = 1; + Shim shim = 2; + optional string url = 3; + optional string command = 4; +} + +message SessionContext { + string session_id = 1; + ApplicationContext application = 2; + optional bytes common_data = 3; +} + +message TaskContext { + string task_id = 1; + string session_id = 2; + optional bytes input = 4; +} + +message TaskOutput { + optional bytes data = 1; +} + +service GrpcShim{ + rpc Readiness(EmptyRequest) returns (Result) {} + rpc OnSessionEnter(SessionContext) returns (Result) {} + rpc OnTaskInvoke(TaskContext) returns (TaskOutput) {} + rpc OnSessionLeave(EmptyRequest) returns (Result) {} +} diff --git a/sdk/service/protos/types.proto b/sdk/rust/service/protos/types.proto similarity index 98% rename from sdk/service/protos/types.proto rename to sdk/rust/service/protos/types.proto index 2f19f37..9646cd8 100644 --- a/sdk/service/protos/types.proto +++ b/sdk/rust/service/protos/types.proto @@ -69,6 +69,7 @@ enum Shim { Stdio = 1; Wasm = 2; Shell = 3; + Grpc = 4; } enum ApplicationState { @@ -118,11 +119,6 @@ message Executor { ExecutorStatus status = 3; } -message Result { - int32 return_code = 1; - optional string message = 2; -} - message SessionList { repeated Session sessions = 1; } @@ -130,3 +126,11 @@ message SessionList { message ApplicationList { repeated Application applications = 1; } + +message Result { + int32 return_code = 1; + optional string message = 2; +} + +message EmptyRequest { +} diff --git a/sdk/rust/service/src/lib.rs b/sdk/rust/service/src/lib.rs new file mode 100644 index 0000000..8bad4e2 --- /dev/null +++ b/sdk/rust/service/src/lib.rs @@ -0,0 +1,186 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use std::sync::Arc; + +use bytes::Bytes; +use thiserror::Error; +use tokio::net::UnixListener; +use tokio_stream::wrappers::UnixListenerStream; +use tonic::{transport::Server, Request, Response, Status}; + +mod flame { + tonic::include_proto!("flame"); +} + +use self::rpc::grpc_shim_server::{GrpcShim, GrpcShimServer}; +use crate::flame as rpc; + +type Message = Bytes; +pub type TaskInput = Message; +pub type TaskOutput = Message; +pub type CommonData = Message; + +const FLAME_SOCKET_PATH: &str = "FLAME_SOCKET_PATH"; + +#[derive(Error, Debug, Clone)] +pub enum FlameError { + #[error("'{0}' not found")] + NotFound(String), + + #[error("'{0}'")] + Internal(String), + + #[error("'{0}'")] + Network(String), + + #[error("'{0}'")] + InvalidConfig(String), +} + +pub struct ApplicationContext { + pub name: String, + pub url: Option, + pub command: Option, +} + +pub struct SessionContext { + pub session_id: String, + pub application: ApplicationContext, + pub common_data: Option, +} + +pub struct TaskContext { + pub task_id: String, + pub session_id: String, + pub input: Option, +} + +#[tonic::async_trait] +pub trait FlameService: Send + Sync + 'static { + async fn on_session_enter(&self, _: SessionContext) -> Result<(), FlameError>; + async fn on_task_invoke(&self, _: TaskContext) -> Result, FlameError>; + async fn on_session_leave(&self) -> Result<(), FlameError>; +} + +pub type FlameServicePtr = Arc; + +struct ShimService { + service: FlameServicePtr, +} + +#[tonic::async_trait] +impl GrpcShim for ShimService { + async fn readiness( + &self, + _: Request, + ) -> Result, Status> { + Ok(Response::new(rpc::Result { + return_code: 0, + message: None, + })) + } + + async fn on_session_enter( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + self.service + .on_session_enter(SessionContext::from(req)) + .await?; + + Ok(Response::new(rpc::Result { + return_code: 0, + message: None, + })) + } + + async fn on_task_invoke( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + let data = self.service.on_task_invoke(TaskContext::from(req)).await?; + + Ok(Response::new(rpc::TaskOutput { + data: data.map(|d| d.into()), + })) + } + + async fn on_session_leave( + &self, + _: Request, + ) -> Result, Status> { + self.service.on_session_leave().await?; + + Ok(Response::new(rpc::Result { + return_code: 0, + message: None, + })) + } +} + +pub async fn run(service: impl FlameService) -> Result<(), Box> { + let shim_service = ShimService { + service: Arc::new(service), + }; + + let path = std::env::var(FLAME_SOCKET_PATH)?; + + let uds = UnixListener::bind(path)?; + let uds_stream = UnixListenerStream::new(uds); + + Server::builder() + .add_service(GrpcShimServer::new(shim_service)) + .serve_with_incoming(uds_stream) + .await?; + + Ok(()) +} + +impl From for Status { + fn from(e: FlameError) -> Self { + Status::internal(e.to_string()) + } +} + +impl From for ApplicationContext { + fn from(ctx: rpc::ApplicationContext) -> Self { + Self { + name: ctx.name.clone(), + url: ctx.url.clone(), + command: ctx.command.clone(), + } + } +} + +impl From for SessionContext { + fn from(ctx: rpc::SessionContext) -> Self { + SessionContext { + session_id: ctx.session_id.clone(), + application: ctx.application.map(ApplicationContext::from).unwrap(), + common_data: ctx.common_data.map(|data| data.into()), + } + } +} + +impl From for TaskContext { + fn from(ctx: rpc::TaskContext) -> Self { + TaskContext { + task_id: ctx.task_id.clone(), + session_id: ctx.session_id.clone(), + input: ctx.input.map(|data| data.into()), + } + } +} diff --git a/sdk/service/Cargo.toml b/sdk/service/Cargo.toml deleted file mode 100644 index 91a614c..0000000 --- a/sdk/service/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "flame-service" -version = "0.1.0" -edition = "2021" - -[dependencies] diff --git a/sdk/service/protos/shim.proto b/sdk/service/protos/shim.proto deleted file mode 100644 index af85883..0000000 --- a/sdk/service/protos/shim.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -import "types.proto"; - -package flame; - -message SessionContext { - -} - -service Shim{ - rpc RegisterApplication(RegisterApplicationRequest) returns (Result) {} -} \ No newline at end of file diff --git a/sdk/service/src/lib.rs b/sdk/service/src/lib.rs deleted file mode 100644 index b93cf3f..0000000 --- a/sdk/service/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} diff --git a/session_manager/migrations/sqlite/20250115120000_app.sql b/session_manager/migrations/sqlite/20250115120000_app.sql index 62d0c0c..375dc8d 100644 --- a/session_manager/migrations/sqlite/20250115120000_app.sql +++ b/session_manager/migrations/sqlite/20250115120000_app.sql @@ -12,7 +12,7 @@ CREATE TABLE IF NOT EXISTS applications ( ); INSERT OR IGNORE INTO applications (name, command, shim, creation_time, state) - VALUES ('flmping', '/usr/local/flame/bin/flmping-service', 1, strftime ('%s', 'now'), 0); + VALUES ('flmping', '/usr/local/flame/bin/flmping-service', 4, strftime ('%s', 'now'), 0); INSERT OR IGNORE INTO applications (name, shim, creation_time, state) VALUES ('flmexec', 3, strftime ('%s', 'now'), 0); INSERT OR IGNORE INTO applications (name, shim, creation_time, state) diff --git a/session_manager/src/apiserver/backend.rs b/session_manager/src/apiserver/backend.rs index e3c5932..020908c 100644 --- a/session_manager/src/apiserver/backend.rs +++ b/session_manager/src/apiserver/backend.rs @@ -49,7 +49,9 @@ impl Backend for Flame { state: apis::ExecutorState::Idle, }; - self.controller.register_executor(&e).map_err(Status::from)?; + self.controller + .register_executor(&e) + .map_err(Status::from)?; Ok(Response::new(rpc::Result::default())) } @@ -96,7 +98,9 @@ impl Backend for Flame { trace_fn!("Backend::bind_executor_completed"); let req = req.into_inner(); - self.controller.bind_session_completed(req.executor_id).await?; + self.controller + .bind_session_completed(req.executor_id) + .await?; Ok(Response::new(rpc::Result::default())) } diff --git a/session_manager/src/apiserver/mod.rs b/session_manager/src/apiserver/mod.rs index 0c2a829..9679f5a 100644 --- a/session_manager/src/apiserver/mod.rs +++ b/session_manager/src/apiserver/mod.rs @@ -26,13 +26,11 @@ mod backend; mod frontend; pub struct Flame { - controller: ControllerPtr + controller: ControllerPtr, } pub fn new(controller: ControllerPtr) -> Arc { - Arc::new(ApiserverRunner { - controller - }) + Arc::new(ApiserverRunner { controller }) } struct ApiserverRunner { diff --git a/session_manager/src/controller/mod.rs b/session_manager/src/controller/mod.rs index 05fee11..e658046 100644 --- a/session_manager/src/controller/mod.rs +++ b/session_manager/src/controller/mod.rs @@ -57,7 +57,7 @@ impl Controller { } pub async fn delete_session(&self, id: SessionID) -> Result { - self.storage.delete_session(id).await + self.storage.delete_session(id).await } pub fn list_session(&self) -> Result, FlameError> { @@ -76,8 +76,6 @@ impl Controller { self.storage.get_task(ssn_id, id) } - - pub async fn update_task( &self, ssn: SessionPtr, @@ -91,7 +89,7 @@ impl Controller { pub fn register_executor(&self, e: &Executor) -> Result<(), FlameError> { self.storage.register_executor(e) } - + pub fn snapshot(&self) -> Result { self.storage.snapshot() } diff --git a/session_manager/src/scheduler/ctx.rs b/session_manager/src/scheduler/ctx.rs index bb424ef..9f5ae28 100644 --- a/session_manager/src/scheduler/ctx.rs +++ b/session_manager/src/scheduler/ctx.rs @@ -74,7 +74,9 @@ impl Context { exec: ExecutorInfoPtr, ssn: SessionInfoPtr, ) -> Result<(), FlameError> { - self.controller.bind_session(exec.id.clone(), ssn.id).await?; + self.controller + .bind_session(exec.id.clone(), ssn.id) + .await?; self.plugins.on_session_bind(ssn)?; self.snapshot .update_executor_state(exec.clone(), ExecutorState::Binding)?; diff --git a/session_manager/src/storage/engine/mod.rs b/session_manager/src/storage/engine/mod.rs index 4163445..d4a1c0d 100644 --- a/session_manager/src/storage/engine/mod.rs +++ b/session_manager/src/storage/engine/mod.rs @@ -17,7 +17,8 @@ use async_trait::async_trait; use crate::FlameError; use common::apis::{ - Application, ApplicationID, CommonData, Session, SessionID, Task, TaskGID, TaskInput, TaskOutput, TaskState + Application, ApplicationID, CommonData, Session, SessionID, Task, TaskGID, TaskInput, + TaskOutput, TaskState, }; mod sqlite; @@ -48,7 +49,12 @@ pub trait Engine: Send + Sync + 'static { async fn get_task(&self, gid: TaskGID) -> Result; async fn retry_task(&self, gid: TaskGID) -> Result; async fn delete_task(&self, gid: TaskGID) -> Result; - async fn update_task(&self, gid: TaskGID, state: TaskState, output: Option) -> Result; + async fn update_task( + &self, + gid: TaskGID, + state: TaskState, + output: Option, + ) -> Result; async fn find_tasks(&self, ssn_id: SessionID) -> Result, FlameError>; } diff --git a/session_manager/src/storage/engine/sqlite.rs b/session_manager/src/storage/engine/sqlite.rs index 7c5d261..7f24892 100644 --- a/session_manager/src/storage/engine/sqlite.rs +++ b/session_manager/src/storage/engine/sqlite.rs @@ -21,7 +21,8 @@ use sqlx::{migrate::MigrateDatabase, FromRow, Sqlite, SqlitePool}; use crate::FlameError; use common::apis::{ - Application, ApplicationID, ApplicationState, CommonData, Session, SessionID, SessionState, SessionStatus, Shim, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskState + Application, ApplicationID, ApplicationState, CommonData, Session, SessionID, SessionState, + SessionStatus, Shim, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskState, }; use crate::storage::engine::{Engine, EnginePtr}; @@ -372,7 +373,12 @@ impl Engine for SqliteEngine { task.try_into() } - async fn update_task(&self, gid: TaskGID, state: TaskState, output: Option) -> Result { + async fn update_task( + &self, + gid: TaskGID, + state: TaskState, + output: Option, + ) -> Result { let mut tx = self .pool .begin() @@ -384,8 +390,7 @@ impl Engine for SqliteEngine { _ => None, }; let output: Option> = output.map(Bytes::into); - let sql = - r#"UPDATE tasks SET state=?, completion_time=?, output=? WHERE id=? AND ssn_id=? RETURNING *"#; + let sql = r#"UPDATE tasks SET state=?, completion_time=?, output=? WHERE id=? AND ssn_id=? RETURNING *"#; let task: TaskDao = sqlx::query_as(sql) .bind::(state.into()) .bind(completion_time)