Skip to content

Commit

Permalink
gossip fix stdin testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Apr 14, 2024
1 parent d2633d5 commit acb9ac1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 30 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[env]
RUST_LOG = "info"
23 changes: 16 additions & 7 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic};
use sharp_p2p_peer::registry::RegistryHandler;
use sharp_p2p_peer::swarm::SwarmRunner;
use std::error::Error;
use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::info;
use tracing_subscriber::EnvFilter;

#[tokio::main]
Expand All @@ -16,25 +17,33 @@ async fn main() -> Result<(), Box<dyn Error>> {
let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519();

// 2. Generate topic
let topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob);
let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob);
let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob);

let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?;
let mut swarm_runner =
SwarmRunner::new(&p2p_local_keypair, &[new_job_topic.to_owned(), picked_job_topic])?;
let mut registry_handler = RegistryHandler::new(
"https://starknet-sepolia.public.blastapi.io",
"0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b",
);

let (_send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
let mut message_stream = swarm_runner.run(topic, send_topic_rx);
let (send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
let mut message_stream = swarm_runner.run(new_job_topic, send_topic_rx);
let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]);

// Read full lines from stdin
let mut stdin = BufReader::new(stdin()).lines();

loop {
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {
send_topic_tx.send(line.as_bytes().to_vec()).await?;
},
Some(event) = message_stream.next() => {
debug!("{:?}", event);
info!("{:?}", event);
},
Some(Ok(event_vec)) = event_stream.next() => {
debug!("{:?}", event_vec);
info!("{:?}", event_vec);
},
else => break
}
Expand Down
23 changes: 16 additions & 7 deletions crates/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use sharp_p2p_common::topic::{gossipsub_ident_topic, Topic};
use sharp_p2p_peer::registry::RegistryHandler;
use sharp_p2p_peer::swarm::SwarmRunner;
use std::error::Error;
use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use tracing::debug;
use tracing::info;
use tracing_subscriber::EnvFilter;

#[tokio::main]
Expand All @@ -16,25 +17,33 @@ async fn main() -> Result<(), Box<dyn Error>> {
let p2p_local_keypair = libp2p::identity::Keypair::generate_ed25519();

// 2. Generate topic
let topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob);
let new_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::NewJob);
let picked_job_topic = gossipsub_ident_topic(Network::Sepolia, Topic::PickedJob);

let mut swarm_runner = SwarmRunner::new(&p2p_local_keypair, &topic)?;
let mut swarm_runner =
SwarmRunner::new(&p2p_local_keypair, &[new_job_topic, picked_job_topic.to_owned()])?;
let mut registry_handler = RegistryHandler::new(
"https://starknet-sepolia.public.blastapi.io",
"0xcdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b",
);

let (_send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
let mut message_stream = swarm_runner.run(topic, send_topic_rx);
let (send_topic_tx, send_topic_rx) = mpsc::channel::<Vec<u8>>(1000);
let mut message_stream = swarm_runner.run(picked_job_topic, send_topic_rx);
let mut event_stream = registry_handler.subscribe_events(vec!["0x0".to_string()]);

// Read full lines from stdin
let mut stdin = BufReader::new(stdin()).lines();

loop {
tokio::select! {
Ok(Some(line)) = stdin.next_line() => {
send_topic_tx.send(line.as_bytes().to_vec()).await?;
},
Some(event) = message_stream.next() => {
debug!("{:?}", event);
info!("{:?}", event);
},
Some(Ok(event_vec)) = event_stream.next() => {
debug!("{:?}", event_vec);
info!("{:?}", event_vec);
},
else => break
}
Expand Down
30 changes: 14 additions & 16 deletions crates/peer/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{mdns, noise, tcp, yamux, Swarm, SwarmBuilder};
use std::error::Error;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
Expand All @@ -25,46 +26,43 @@ pub struct SwarmRunner {
impl SwarmRunner {
pub fn new(
p2p_local_keypair: &Keypair,
subscribe_topic: &IdentTopic,
subscribe_topics: &[IdentTopic],
) -> Result<Self, Box<dyn Error>> {
let mdns = mdns::tokio::Behaviour::new(
mdns::Config::default(),
p2p_local_keypair.public().to_peer_id(),
)?;
let gossipsub = Self::init_gossip(p2p_local_keypair, subscribe_topic)?;
let gossipsub = Self::init_gossip(p2p_local_keypair)?;
let behaviour = PeerBehaviour { gossipsub, mdns };
let local_keypair = p2p_local_keypair.clone();
let mut swarm = SwarmBuilder::with_existing_identity(local_keypair)
.with_tokio()
.with_tcp(tcp::Config::default(), noise::Config::new, yamux::Config::default)?
.with_quic()
.with_behaviour(|_| behaviour)
.expect("Moving behaviour doesn't fail")
.with_behaviour(|_| behaviour)?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

for topic in subscribe_topics {
swarm.behaviour_mut().gossipsub.subscribe(topic)?;
}

swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

Ok(SwarmRunner { swarm, cancellation_token: CancellationToken::new() })
}

fn init_gossip(
p2p_local_keypair: &Keypair,
subscribe_topic: &IdentTopic,
) -> Result<gossipsub::Behaviour, Box<dyn Error>> {
fn init_gossip(p2p_local_keypair: &Keypair) -> Result<gossipsub::Behaviour, Box<dyn Error>> {
let message_authenticity =
gossipsub::MessageAuthenticity::Signed(p2p_local_keypair.clone());

let config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
.validation_mode(gossipsub::ValidationMode::Strict)
.validate_messages()
.build()
.unwrap();
let mut gossipsub: gossipsub::Behaviour =
gossipsub::Behaviour::new(message_authenticity, config).unwrap();

gossipsub.subscribe(subscribe_topic)?;
.build()?;

Ok(gossipsub)
Ok(gossipsub::Behaviour::new(message_authenticity, config)?)
}

pub fn run(
Expand Down

0 comments on commit acb9ac1

Please sign in to comment.