diff --git a/Cargo.lock b/Cargo.lock index 314e187..0a19828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1016,6 +1016,9 @@ dependencies = [ "env_logger", "flame-client", "log", + "serde", + "serde_derive", + "serde_yaml", "sqlx", "tokio", "tonic", @@ -1053,7 +1056,6 @@ dependencies = [ "gethostname", "indicatif", "log", - "rpc", "tokio", "tonic", ] diff --git a/common/src/apis.rs b/common/src/apis.rs index 41cea18..645c22f 100644 --- a/common/src/apis.rs +++ b/common/src/apis.rs @@ -65,6 +65,16 @@ pub struct Application { pub working_directory: String, } +#[derive(Clone, Debug, Default)] +pub struct ApplicationAttributes { + pub shim: Shim, + pub url: Option, + pub command: Option, + pub arguments: Vec, + pub environments: Vec, + pub working_directory: String, +} + #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, strum_macros::Display)] pub enum SessionState { #[default] @@ -481,6 +491,19 @@ impl From<&Application> for rpc::Application { } } +impl From for ApplicationAttributes { + fn from(spec: rpc::ApplicationSpec) -> Self { + Self { + shim: spec.shim().into(), + url: spec.url.clone(), + command: spec.command.clone(), + arguments: spec.arguments.clone(), + environments: spec.environments.clone(), + working_directory: spec.working_directory.clone().unwrap_or_default(), + } + } +} + impl From for rpc::ApplicationState { fn from(s: ApplicationState) -> Self { match s { diff --git a/flmctl/Cargo.toml b/flmctl/Cargo.toml index 424d776..36aa8e8 100644 --- a/flmctl/Cargo.toml +++ b/flmctl/Cargo.toml @@ -18,4 +18,8 @@ sqlx = { workspace = true } clap = { version = "4.1", features = ["derive"] } chrono = "0.4" -url = {version = "2.5"} \ No newline at end of file +url = {version = "2.5"} + +serde = "1.0" +serde_yaml = "0.9" +serde_derive = "1.0" \ No newline at end of file diff --git a/flmctl/src/list.rs b/flmctl/src/list.rs index 47413af..8f01ec3 100644 --- a/flmctl/src/list.rs +++ b/flmctl/src/list.rs @@ -45,10 +45,10 @@ async fn list_application(conn: Connection) -> Result<(), Box> { println!( "{:<15}{:<15}{:<15}{:<15}{:<30}", app.name, - app.shim.to_string(), + app.attributes.shim.to_string(), app.state.to_string(), app.creation_time.format("%T"), - app.command.clone().unwrap_or("-".to_string()) + app.attributes.command.clone().unwrap_or("-".to_string()) ); } diff --git a/flmctl/src/main.rs b/flmctl/src/main.rs index dfa7780..370f4f0 100644 --- a/flmctl/src/main.rs +++ b/flmctl/src/main.rs @@ -20,6 +20,8 @@ mod create; mod helper; mod list; mod migrate; +mod register; +mod unregister; mod view; #[derive(Parser)] @@ -77,6 +79,18 @@ enum Commands { #[arg(short, long)] sql: String, }, + /// Register an application + Register { + /// The yaml file of the application + #[arg(short, long)] + file: String, + }, + /// Unregister the application from Flame + Unregister { + /// The name of the application + #[arg(short, long)] + name: String, + }, } #[tokio::main] @@ -97,6 +111,8 @@ async fn main() -> Result<(), Box> { Some(Commands::Create { app, slots }) => create::run(&ctx, app, slots).await?, Some(Commands::View { session }) => view::run(&ctx, session).await?, Some(Commands::Migrate { url, sql }) => migrate::run(&ctx, url, sql).await?, + Some(Commands::Register { file }) => register::run(&ctx, file).await?, + Some(Commands::Unregister { name }) => unregister::run(&ctx, name).await?, _ => helper::run().await?, }; diff --git a/flmctl/src/register.rs b/flmctl/src/register.rs new file mode 100644 index 0000000..60ad389 --- /dev/null +++ b/flmctl/src/register.rs @@ -0,0 +1,88 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use std::{fs, path::Path}; + +use serde_derive::{Deserialize, Serialize}; + +use common::ctx::FlameContext; +use flame_client::{self as flame, ApplicationAttributes, FlameError, Shim}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MetadataYaml { + pub name: String, +} +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SpecYaml { + pub shim: Option, + pub url: Option, + pub command: Option, + pub arguments: Option>, + pub environments: Option>, + pub working_directory: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ApplicationYaml { + pub metadata: MetadataYaml, + pub spec: SpecYaml, +} + +pub async fn run(ctx: &FlameContext, path: &String) -> Result<(), FlameError> { + if !Path::new(&path).is_file() { + return Err(FlameError::InvalidConfig(format!( + "<{}> is not a file", + path + ))); + } + + let contents = + fs::read_to_string(path.clone()).map_err(|e| FlameError::Internal(e.to_string()))?; + let app: ApplicationYaml = + serde_yaml::from_str(&contents).map_err(|e| FlameError::Internal(e.to_string()))?; + + let app_attr = ApplicationAttributes::try_from(&app)?; + + let conn = flame::connect(&ctx.endpoint).await?; + + conn.register_application(app.metadata.name, app_attr) + .await?; + Ok(()) +} + +impl TryFrom<&ApplicationYaml> for ApplicationAttributes { + type Error = FlameError; + + fn try_from(yaml: &ApplicationYaml) -> Result { + let shim = match yaml + .spec + .shim + .clone() + .unwrap_or(String::from("grpc")) + .to_lowercase() + .as_str() + { + "grpc" => Ok(Shim::Grpc), + _ => Err(FlameError::InvalidConfig("unsupported shim".to_string())), + }?; + + Ok(Self { + shim, + url: yaml.spec.url.clone(), + command: yaml.spec.command.clone(), + arguments: yaml.spec.arguments.clone().unwrap_or_default(), + environments: yaml.spec.environments.clone().unwrap_or_default(), + working_directory: yaml.spec.working_directory.clone(), + }) + } +} diff --git a/flmctl/src/unregister.rs b/flmctl/src/unregister.rs new file mode 100644 index 0000000..986146c --- /dev/null +++ b/flmctl/src/unregister.rs @@ -0,0 +1,20 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use std::error::Error; + +use common::ctx::FlameContext; + +pub async fn run(_: &FlameContext, _: &String) -> Result<(), Box> { + todo!() +} diff --git a/flmping/Cargo.toml b/flmping/Cargo.toml index 3e8e6ff..a419c79 100644 --- a/flmping/Cargo.toml +++ b/flmping/Cargo.toml @@ -9,7 +9,6 @@ edition = "2021" flame-client = { path = "../sdk/rust/client" } flame-service = { path = "../sdk/rust/service" } -rpc = { path = "../rpc" } common = { path = "../common" } tokio = { workspace = true } diff --git a/sdk/rust/client/src/lib.rs b/sdk/rust/client/src/lib.rs index cffb3c6..c05b5b8 100644 --- a/sdk/rust/client/src/lib.rs +++ b/sdk/rust/client/src/lib.rs @@ -15,18 +15,21 @@ use std::sync::{Arc, Mutex}; use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; + use futures::TryFutureExt; use prost::Enumeration; use thiserror::Error; use tokio_stream::StreamExt; use tonic::transport::Channel; use tonic::transport::Endpoint; +use tonic::Request; use tonic::Status; use self::rpc::frontend_client::FrontendClient as FlameFrontendClient; use self::rpc::{ - CloseSessionRequest, CreateSessionRequest, CreateTaskRequest, GetTaskRequest, - ListApplicationRequest, ListSessionRequest, SessionSpec, TaskSpec, WatchTaskRequest, + ApplicationSpec, CloseSessionRequest, CreateSessionRequest, CreateTaskRequest, GetTaskRequest, + ListApplicationRequest, ListSessionRequest, RegisterApplicationRequest, SessionSpec, TaskSpec, + WatchTaskRequest, }; use crate::flame as rpc; use crate::trace::TraceFn; @@ -131,12 +134,24 @@ pub struct SessionAttributes { pub common_data: Option, } +#[derive(Clone)] +pub struct ApplicationAttributes { + pub shim: Shim, + + pub url: Option, + pub command: Option, + pub arguments: Vec, + pub environments: Vec, + pub working_directory: Option, +} + #[derive(Clone)] pub struct Application { pub name: ApplicationID, + + pub attributes: ApplicationAttributes, + pub state: ApplicationState, - pub shim: Shim, - pub command: Option, pub creation_time: DateTime, } @@ -214,6 +229,30 @@ impl Connection { .collect()) } + pub async fn register_application( + &self, + name: String, + app: ApplicationAttributes, + ) -> Result<(), FlameError> { + let mut client = FlameClient::new(self.channel.clone()); + + let req = RegisterApplicationRequest { + name, + application: Some(ApplicationSpec::from(app)), + }; + + let res = client + .register_application(Request::new(req)) + .await? + .into_inner(); + + if res.return_code < 0 { + Err(FlameError::Network(res.message.unwrap_or_default())) + } else { + Ok(()) + } + } + pub async fn list_application(&self) -> Result, FlameError> { let mut client = FlameClient::new(self.channel.clone()); let app_list = client.list_application(ListApplicationRequest {}).await?; @@ -384,14 +423,39 @@ impl From<&rpc::Application> for Application { Self { name: metadata.name, - shim: Shim::from(spec.shim()), + attributes: ApplicationAttributes::from(spec), state: ApplicationState::from(status.state()), - command: spec.command.clone(), creation_time, } } } +impl From for ApplicationSpec { + fn from(app: ApplicationAttributes) -> Self { + Self { + shim: app.shim.into(), + url: app.url.clone(), + command: app.command.clone(), + arguments: app.arguments.clone(), + environments: app.environments.clone(), + working_directory: app.working_directory.clone(), + } + } +} + +impl From for ApplicationAttributes { + fn from(app: ApplicationSpec) -> Self { + Self { + shim: app.shim().into(), + url: app.url.clone(), + command: app.command.clone(), + arguments: app.arguments.clone(), + environments: app.environments.clone(), + working_directory: app.working_directory.clone(), + } + } +} + impl From for Shim { fn from(shim: rpc::Shim) -> Self { match shim { diff --git a/session_manager/src/apiserver/frontend.rs b/session_manager/src/apiserver/frontend.rs index eb3b200..01e9e13 100644 --- a/session_manager/src/apiserver/frontend.rs +++ b/session_manager/src/apiserver/frontend.rs @@ -17,6 +17,7 @@ use ::rpc::flame::{ UnregisterApplicationRequest, UpdateApplicationRequest, }; use async_trait::async_trait; +use common::apis::ApplicationAttributes; use futures::Stream; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -30,7 +31,7 @@ use self::rpc::{ }; use rpc::flame as rpc; -use common::apis; +use common::{apis, FlameError}; use common::{trace::TraceFn, trace_fn}; use crate::apiserver::Flame; @@ -43,7 +44,27 @@ impl Frontend for Flame { &self, req: Request, ) -> Result, Status> { - todo!() + trace_fn!("Frontend::register_application"); + + let req = req.into_inner(); + let spec = req.application.ok_or(FlameError::InvalidConfig( + "applilcation spec is missed".to_string(), + ))?; + let res = self + .controller + .register_application(req.name, ApplicationAttributes::from(spec)) + .await; + + match res { + Ok(..) => Ok(Response::new(rpc::Result { + return_code: 0, + message: None, + })), + Err(e) => Ok(Response::new(rpc::Result { + return_code: -1, + message: Some(e.to_string()), + })), + } } async fn unregister_application( &self, diff --git a/session_manager/src/controller/mod.rs b/session_manager/src/controller/mod.rs index e658046..f6a0117 100644 --- a/session_manager/src/controller/mod.rs +++ b/session_manager/src/controller/mod.rs @@ -17,8 +17,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use common::apis::{ - Application, ApplicationID, CommonData, Executor, ExecutorID, ExecutorPtr, Session, SessionID, - SessionPtr, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskPtr, TaskState, + Application, ApplicationAttributes, ApplicationID, CommonData, Executor, ExecutorID, + ExecutorPtr, Session, SessionID, SessionPtr, Task, TaskGID, TaskID, TaskInput, TaskOutput, + TaskPtr, TaskState, }; use common::{lock_ptr, trace::TraceFn, trace_fn, FlameError}; @@ -98,6 +99,14 @@ impl Controller { self.storage.get_application(id).await } + pub async fn register_application( + &self, + name: String, + attr: ApplicationAttributes, + ) -> Result<(), FlameError> { + self.storage.register_application(name, attr).await + } + pub async fn list_application(&self) -> Result, FlameError> { self.storage.list_application().await } diff --git a/session_manager/src/storage/engine/mod.rs b/session_manager/src/storage/engine/mod.rs index d4a1c0d..2471506 100644 --- a/session_manager/src/storage/engine/mod.rs +++ b/session_manager/src/storage/engine/mod.rs @@ -17,8 +17,8 @@ use async_trait::async_trait; use crate::FlameError; use common::apis::{ - Application, ApplicationID, CommonData, Session, SessionID, Task, TaskGID, TaskInput, - TaskOutput, TaskState, + Application, ApplicationAttributes, ApplicationID, CommonData, Session, SessionID, Task, + TaskGID, TaskInput, TaskOutput, TaskState, }; mod sqlite; @@ -27,6 +27,11 @@ pub type EnginePtr = Arc; #[async_trait] pub trait Engine: Send + Sync + 'static { + async fn register_application( + &self, + name: String, + attr: ApplicationAttributes, + ) -> Result<(), FlameError>; async fn get_application(&self, id: ApplicationID) -> Result; async fn find_application(&self) -> Result, FlameError>; diff --git a/session_manager/src/storage/engine/sqlite.rs b/session_manager/src/storage/engine/sqlite.rs index 7f24892..fe4b36a 100644 --- a/session_manager/src/storage/engine/sqlite.rs +++ b/session_manager/src/storage/engine/sqlite.rs @@ -20,9 +20,14 @@ use chrono::{DateTime, Utc}; use sqlx::{migrate::MigrateDatabase, FromRow, Sqlite, SqlitePool}; use crate::FlameError; -use common::apis::{ - Application, ApplicationID, ApplicationState, CommonData, Session, SessionID, SessionState, - SessionStatus, Shim, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskState, +use common::{ + apis::{ + Application, ApplicationAttributes, ApplicationID, ApplicationState, CommonData, Session, + SessionID, SessionState, SessionStatus, Shim, Task, TaskGID, TaskID, TaskInput, TaskOutput, + TaskState, + }, + trace::TraceFn, + trace_fn, }; use crate::storage::engine::{Engine, EnginePtr}; @@ -98,6 +103,35 @@ impl SqliteEngine { #[async_trait] impl Engine for SqliteEngine { + async fn register_application( + &self, + name: String, + attr: ApplicationAttributes, + ) -> Result<(), FlameError> { + trace_fn!("Sqlite::register_application"); + + let mut tx = self + .pool + .begin() + .await + .map_err(|e| FlameError::Storage(format!("failed to begin TX: {e}")))?; + + let sql = "INSERT INTO applications (name, shim, command, creation_time, state) VALUES (?, ?, ?, strftime ('%s', 'now'), 0) RETURNING *"; + let app: ApplicationDao = sqlx::query_as(sql) + .bind(name) + .bind::(attr.shim.into()) + .bind(attr.command) + .fetch_one(&mut *tx) + .await + .map_err(|e| FlameError::Storage(format!("failed to execute SQL: {e}")))?; + + tx.commit() + .await + .map_err(|e| FlameError::Storage(format!("failed to commit TX: {e}")))?; + + Ok(()) + } + async fn get_application(&self, id: ApplicationID) -> Result { let mut tx = self .pool diff --git a/session_manager/src/storage/mod.rs b/session_manager/src/storage/mod.rs index eab5cb1..2f33676 100644 --- a/session_manager/src/storage/mod.rs +++ b/session_manager/src/storage/mod.rs @@ -16,8 +16,9 @@ use std::ops::Deref; use std::sync::Arc; use common::apis::{ - Application, ApplicationID, CommonData, Executor, ExecutorID, ExecutorPtr, Session, SessionID, - SessionPtr, SessionState, Task, TaskGID, TaskID, TaskInput, TaskOutput, TaskPtr, TaskState, + Application, ApplicationAttributes, ApplicationID, CommonData, Executor, ExecutorID, + ExecutorPtr, Session, SessionID, SessionPtr, SessionState, Task, TaskGID, TaskID, TaskInput, + TaskOutput, TaskPtr, TaskState, }; use common::ptr::{self, MutexPtr}; use common::{lock_ptr, FlameError}; @@ -199,6 +200,14 @@ impl Storage { self.engine.get_application(id).await } + pub async fn register_application( + &self, + name: String, + attr: ApplicationAttributes, + ) -> Result<(), FlameError> { + self.engine.register_application(name, attr).await + } + pub async fn list_application(&self) -> Result, FlameError> { self.engine.find_application().await }