diff --git a/Cargo.lock b/Cargo.lock index db33a99..ddd5f0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2164,6 +2164,21 @@ dependencies = [ "indexmap 2.7.1", ] +[[package]] +name = "pi" +version = "0.1.0" +dependencies = [ + "clap", + "env_logger", + "flame-rs", + "futures", + "log", + "rand 0.9.0", + "rand_distr", + "tokio", + "tonic", +] + [[package]] name = "pin-project" version = "1.1.9" @@ -2388,6 +2403,16 @@ dependencies = [ "zerocopy 0.8.17", ] +[[package]] +name = "rand_distr" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc3b5afe4c995c44540865b8ca5c52e6a59fa362da96c5d30886930ddc8da1c" +dependencies = [ + "num-traits", + "rand 0.9.0", +] + [[package]] name = "rayon" version = "1.10.0" diff --git a/Cargo.toml b/Cargo.toml index dd259bd..2fd10be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "executor_manager", "rpc", "sdk/rust", + "examples/pi", ] [workspace.dependencies] diff --git a/examples/pi/Cargo.toml b/examples/pi/Cargo.toml new file mode 100644 index 0000000..fb042fe --- /dev/null +++ b/examples/pi/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "pi" +version = "0.1.0" +edition = "2021" + +[dependencies] +flame-rs = { path = "../../sdk/rust" } +tokio = { workspace = true } +tonic = { workspace = true } +env_logger = { workspace = true } +log = { workspace = true } +rand_distr = "*" +rand="*" +futures = "0.3" +clap = { version = "4.1", features = ["derive"] } + +[[bin]] +name = "pi-service" +path = "src/service.rs" + +[[bin]] +name = "pi" +path = "src/client.rs" \ No newline at end of file diff --git a/examples/pi/pi-app.yaml b/examples/pi/pi-app.yaml new file mode 100644 index 0000000..d3872d2 --- /dev/null +++ b/examples/pi/pi-app.yaml @@ -0,0 +1,5 @@ +metadata: + name: pi-app +spec: + shim: grpc + command: /usr/local/flame/bin/pi-service diff --git a/examples/pi/src/client.rs b/examples/pi/src/client.rs new file mode 100644 index 0000000..9c31e9b --- /dev/null +++ b/examples/pi/src/client.rs @@ -0,0 +1,111 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use std::error::Error; +use std::sync::{Arc, Mutex}; + +mod util; + +use flame_rs::apis::{FlameContext, FlameError, TaskInput}; +use flame_rs::client::{SessionAttributes, Task, TaskInformer}; +use flame_rs::{self as flame, lock_ptr, new_ptr}; + +use clap::Parser; +use futures::future::try_join_all; + +#[derive(Parser)] +#[command(name = "pi")] +#[command(author = "Klaus Ma ")] +#[command(version = "0.1.0")] +#[command(about = "Flame Pi Example", long_about = None)] +struct Cli { + #[arg(short, long)] + app: Option, + #[arg(short, long)] + slots: Option, + #[arg(long)] + task_num: Option, + #[arg(long)] + task_input: Option, +} + +const DEFAULT_APP: &str = "pi-app"; +const DEFAULT_SLOTS: i32 = 1; +const DEFAULT_TASK_NUM: u32 = 10; +const DEFAULT_TASK_INPUT: u32 = 10000; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + let cli = Cli::parse(); + + let app = cli.app.unwrap_or(DEFAULT_APP.to_string()); + let slots = cli.slots.unwrap_or(DEFAULT_SLOTS); + let task_num = cli.task_num.unwrap_or(DEFAULT_TASK_NUM); + let task_input = cli.task_input.unwrap_or(DEFAULT_TASK_INPUT); + + let ctx = FlameContext::from_file(None)?; + + let conn = flame::client::connect(&ctx.endpoint).await?; + let ssn = conn + .create_session(&SessionAttributes { + application: app, + slots, + common_data: None, + }) + .await?; + + let informer = new_ptr!(PiInfo { area: 0 }); + let mut tasks = vec![]; + for _ in 0..task_num { + let task_input = util::u32_to_bytes(task_input); + let task = ssn.run_task(Some(TaskInput::from(task_input)), informer.clone()); + tasks.push(task); + } + + try_join_all(tasks).await?; + + { + // Get the number of points in the circle. + let informer = lock_ptr!(informer)?; + let pi = 4_f64 * informer.area as f64 / ((task_num as f64) * (task_input as f64)); + + println!( + "pi = 4*({}/{}) = {}", + informer.area, + task_num * task_input, + pi + ); + } + + ssn.close().await?; + + Ok(()) +} + +pub struct PiInfo { + pub area: u64, +} + +impl TaskInformer for PiInfo { + fn on_update(&mut self, task: Task) { + if let Some(output) = task.output { + let output = util::bytes_to_u32(output.to_vec()).ok().unwrap_or_default(); + self.area += output as u64; + } + } + + fn on_error(&mut self, _: FlameError) { + print!("Got an error") + } +} diff --git a/examples/pi/src/service.rs b/examples/pi/src/service.rs new file mode 100644 index 0000000..da6ce5d --- /dev/null +++ b/examples/pi/src/service.rs @@ -0,0 +1,68 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +mod util; + +use rand::distr::{Distribution, Uniform}; + +use flame_rs::{ + self as flame, + apis::{FlameError, TaskOutput}, + service::{SessionContext, TaskContext}, +}; + +#[derive(Clone)] +pub struct PiService {} + +#[tonic::async_trait] +impl flame::service::FlameService for PiService { + async fn on_session_enter(&self, _: SessionContext) -> Result<(), FlameError> { + Ok(()) + } + + async fn on_task_invoke(&self, ctx: TaskContext) -> Result, FlameError> { + let mut rng = rand::rng(); + let die = Uniform::try_from(0.0..1.0).unwrap(); + + let input = ctx.input.unwrap_or(util::zero_u32()); + let total = util::bytes_to_u32(input.to_vec())?; + let mut sum = 0u32; + + for _ in 0..total { + let x: f64 = die.sample(&mut rng); + let y: f64 = die.sample(&mut rng); + let dist = (x * x + y * y).sqrt(); + + if dist <= 1.0 { + sum += 1; + } + } + + Ok(Some(TaskOutput::from(util::u32_to_bytes(sum)))) + } + + async fn on_session_leave(&self) -> Result<(), FlameError> { + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + flame::service::run(PiService {}).await?; + + log::debug!("FlmpingService was stopped."); + + Ok(()) +} diff --git a/examples/pi/src/util.rs b/examples/pi/src/util.rs new file mode 100644 index 0000000..4689d0f --- /dev/null +++ b/examples/pi/src/util.rs @@ -0,0 +1,33 @@ +/* +Copyright 2025 The Flame Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use flame_rs::apis::{FlameError, TaskOutput}; + +pub fn zero_u32() -> TaskOutput { + TaskOutput::from(vec![0u8, 0u8, 0u8, 0u8]) +} + +pub fn u32_to_bytes(i: u32) -> Vec { + i.to_be_bytes().to_vec() +} + +pub fn bytes_to_u32(v: Vec) -> Result { + if v.len() != 4 { + return Err(FlameError::InvalidConfig( + "Vec must contain exactly 4 bytes".to_string(), + )); + } + + let byte_array: [u8; 4] = v.try_into().unwrap(); + Ok(u32::from_be_bytes(byte_array)) +}