diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..f4ab6e5 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[env] +RUST_LOG = "info" \ No newline at end of file diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index dc14300..08723aa 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -4,8 +4,9 @@ use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; use sharp_p2p_peer::registry::RegistryHandler; use sharp_p2p_peer::swarm::SwarmRunner; use std::error::Error; +use tokio::io::{stdin, AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; -use tracing::debug; +use tracing::info; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -16,25 +17,33 @@ async fn main() -> Result<(), Box> { let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); // 2. Generate topic - let topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); - let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut swarm_runner = + SwarmRunner::new(&p2p_local_keypair, &[new_job_topic.to_owned(), picked_job_topic])?; let mut registry_handler = RegistryHandler::new( "https://starknet-sepolia.public.blastapi.io", "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", ); - let (_send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); - let mut message_stream = swarm_runner.run(topic, send_topic_rx); + let (send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); + let mut message_stream = swarm_runner.run(new_job_topic, send_topic_rx); let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]); + // Read full lines from stdin + let mut stdin = BufReader::new(stdin()).lines(); + loop { tokio::select! { + Ok(Some(line)) = stdin.next_line() => { + send_topic_tx.send(line.as_bytes().to_vec()).await?; + }, Some(event) = message_stream.next() => { - debug!("{:?}", event); + info!("{:?}", event); }, Some(Ok(event_vec)) = event_stream.next() => { - debug!("{:?}", event_vec); + info!("{:?}", event_vec); }, else => break } diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 2709a79..8660dc8 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -4,8 +4,9 @@ use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic}; use sharp_p2p_peer::registry::RegistryHandler; use sharp_p2p_peer::swarm::SwarmRunner; use std::error::Error; +use tokio::io::{stdin, AsyncBufReadExt, BufReader}; use tokio::sync::mpsc; -use tracing::debug; +use tracing::info; use tracing_subscriber::EnvFilter; #[tokio::main] @@ -16,25 +17,33 @@ async fn main() -> Result<(), Box> { let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519(); // 2. Generate topic - let topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); + let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob); + let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob); - let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?; + let mut swarm_runner = + SwarmRunner::new(&p2p_local_keypair, &[new_job_topic, picked_job_topic.to_owned()])?; let mut registry_handler = RegistryHandler::new( "https://starknet-sepolia.public.blastapi.io", "0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b", ); - let (_send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); - let mut message_stream = swarm_runner.run(topic, send_topic_rx); + let (send_topic_tx, send_topic_rx) = mpsc::channel::>(1000); + let mut message_stream = swarm_runner.run(picked_job_topic, send_topic_rx); let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]); + // Read full lines from stdin + let mut stdin = BufReader::new(stdin()).lines(); + loop { tokio::select! { + Ok(Some(line)) = stdin.next_line() => { + send_topic_tx.send(line.as_bytes().to_vec()).await?; + }, Some(event) = message_stream.next() => { - debug!("{:?}", event); + info!("{:?}", event); }, Some(Ok(event_vec)) = event_stream.next() => { - debug!("{:?}", event_vec); + info!("{:?}", event_vec); }, else => break } diff --git a/crates/peer/src/swarm.rs b/crates/peer/src/swarm.rs index aea6100..52ee51a 100644 --- a/crates/peer/src/swarm.rs +++ b/crates/peer/src/swarm.rs @@ -7,6 +7,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; use libp2p::{mdns, noise, tcp, yamux, Swarm, SwarmBuilder}; use std::error::Error; use std::pin::Pin; +use std::time::Duration; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; @@ -25,46 +26,43 @@ pub struct SwarmRunner { impl SwarmRunner { pub fn new( p2p_local_keypair: &Keypair, - subscribe_topic: &IdentTopic, + subscribe_topics: &[IdentTopic], ) -> Result> { let mdns = mdns::tokio::Behaviour::new( mdns::Config::default(), p2p_local_keypair.public().to_peer_id(), )?; - let gossipsub = Self::init_gossip(p2p_local_keypair, subscribe_topic)?; + let gossipsub = Self::init_gossip(p2p_local_keypair)?; let behaviour = PeerBehaviour { gossipsub, mdns }; let local_keypair = p2p_local_keypair.clone(); let mut swarm = SwarmBuilder::with_existing_identity(local_keypair) .with_tokio() .with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)? .with_quic() - .with_behaviour(|_| behaviour) - .expect("Moving behaviour doesn't fail") + .with_behaviour(|_| behaviour)? + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); + for topic in subscribe_topics { + swarm.behaviour_mut().gossipsub.subscribe(topic)?; + } + swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(SwarmRunner { swarm, cancellation_token: CancellationToken::new() }) } - fn init_gossip( - p2p_local_keypair: &Keypair, - subscribe_topic: &IdentTopic, - ) -> Result> { + fn init_gossip(p2p_local_keypair: &Keypair) -> Result> { let message_authenticity = gossipsub::MessageAuthenticity::Signed(p2p_local_keypair.clone()); + let config = gossipsub::ConfigBuilder::default() + .heartbeat_interval(Duration::from_secs(10)) .validation_mode(gossipsub::ValidationMode::Strict) - .validate_messages() - .build() - .unwrap(); - let mut gossipsub: gossipsub::Behaviour = - gossipsub::Behaviour::new(message_authenticity, config).unwrap(); - - gossipsub.subscribe(subscribe_topic)?; + .build()?; - Ok(gossipsub) + Ok(gossipsub::Behaviour::new(message_authenticity, config)?) } pub fn run(