From c0c3d6a4d4476c1df51d69185539c653438dcaca Mon Sep 17 00:00:00 2001 From: Pia Date: Mon, 15 Apr 2024 11:09:10 +0900 Subject: [PATCH] feat: serialize mock job --- Cargo.toml | 13 ++++++++++++- crates/common/Cargo.toml | 3 ++- crates/common/src/job.rs | 14 +++++++++++++- crates/delegator/src/main.rs | 15 +++++++++++++-- crates/executor/src/main.rs | 24 +++++++++++++++++++++++- 5 files changed, 63 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 23d6cb4..95f1bc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ license-file = "LICENSE" [workspace.dependencies] async-process = "2.2.0" async-stream = "0.3.5" +bincode = "1.3" cairo-felt = "0.9.1" cairo-proof-parser = { git = "https://github.com/Okm165/cairo-proof-parser", rev = "97a04bbee07330311b38d6f4cecfed3acb237626" } futures = "0.3.30" @@ -28,7 +29,17 @@ futures-core = "0.3.30" futures-util = "0.3.30" hex = "0.4.3" itertools = "0.12.1" -libp2p = { version = "0.53.2", features = ["tokio","gossipsub","kad","mdns","noise","macros","tcp","yamux","quic"]} +libp2p = { version = "0.53.2", features = [ + "tokio", + "gossipsub", + "kad", + "mdns", + "noise", + "macros", + "tcp", + "yamux", + "quic", +] } num-bigint = "0.4.4" serde = "1.0.197" serde_json = "1.0.115" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 6d964e3..c7baea8 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -8,10 +8,11 @@ license-file.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bincode.workspace = true cairo-felt.workspace = true hex.workspace = true libp2p.workspace = true num-bigint.workspace = true serde_json.workspace = true serde.workspace = true -thiserror.workspace = true \ No newline at end of file +thiserror.workspace = true diff --git a/crates/common/src/job.rs b/crates/common/src/job.rs index 90c6c99..6b9b0b8 100644 --- a/crates/common/src/job.rs +++ b/crates/common/src/job.rs @@ -3,7 +3,9 @@ use std::{ hash::{DefaultHasher, Hash, Hasher}, }; -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +use serde::{Deserialize, Serialize}; + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] pub struct Job { pub reward: u32, pub num_of_steps: u32, @@ -20,3 +22,13 @@ impl Display for Job { write!(f, "{}", hex::encode(hasher.finish().to_be_bytes())) } } + +impl Job { + pub fn serialize_job(&self) -> Vec { + bincode::serialize(self).unwrap() + } + + pub fn deserialize_job(serialized_job: &[u8]) -> Self { + bincode::deserialize(serialized_job).unwrap() + } +} diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index 08723aa..f527308 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -1,4 +1,5 @@ use futures_util::StreamExt; +use sharp_p2p_common::job::Job; use sharp_p2p_common::network::Network; use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; use sharp_p2p_peer::registry::RegistryHandler; @@ -36,8 +37,18 @@ async fn main() -> Result<(), Box> { loop { tokio::select! { - Ok(Some(line)) = stdin.next_line() => { - send_topic_tx.send(line.as_bytes().to_vec()).await?; + Ok(Some(_)) = stdin.next_line() => { + // TODO: Turn this into a real job generation + let job = Job { + reward: 100, + num_of_steps: 10, + private_input: vec![1, 2, 3], + public_input: vec![4, 5, 6], + cpu_air_prover_config: vec![7, 8, 9], + cpu_air_params: vec![10, 11, 12], + }; + let serialized_job = (job).serialize_job(); + send_topic_tx.send(serialized_job).await?; }, Some(event) = message_stream.next() => { info!("{:?}", event); diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 8660dc8..bfff9b6 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -1,4 +1,6 @@ use futures_util::StreamExt; +use libp2p::gossipsub::Event; +use sharp_p2p_common::job::Job; use sharp_p2p_common::network::Network; use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; use sharp_p2p_peer::registry::RegistryHandler; @@ -40,7 +42,27 @@ async fn main() -> Result<(), Box> { send_topic_tx.send(line.as_bytes().to_vec()).await?; }, Some(event) = message_stream.next() => { - info!("{:?}", event); + match event { + Event::Message { message, .. } => { + // Received a new-job message from the network + if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::NewJob).into() { + let deserialized_job = Job::deserialize_job(&message.data); + info!("Received a new job: {:?}", deserialized_job); + } + // Received a picked-job message from the network + if message.topic == gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob).into() { + + info!("Received a picked job: {:?}", message); + } + }, + Event::Subscribed { peer_id, topic } => { + info!("{} subscribed to the topic {}", peer_id.to_string(), topic.to_string()); + }, + Event::Unsubscribed { peer_id, topic }=> { + info!("{} unsubscribed to the topic {}", peer_id.to_string(), topic.to_string()); + }, + _ => {} + } }, Some(Ok(event_vec)) = event_stream.next() => { info!("{:?}", event_vec);