diff --git a/Cargo.toml b/Cargo.toml index 2a28df6..9346623 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ hyper = { version = "1.0", features = [] } hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] } tower = { version = "0.4", features = ["util"] } tower-http = { version = "0.5", features = ["timeout", "trace", "cors"] } +clap = { version = "4.0", features = ["derive"] } zetina-common = { path = "crates/common" } zetina-compiler = { path = "crates/compiler" } diff --git a/crates/delegator/Cargo.toml b/crates/delegator/Cargo.toml index 487c7aa..4aae6ad 100644 --- a/crates/delegator/Cargo.toml +++ b/crates/delegator/Cargo.toml @@ -31,4 +31,5 @@ axum.workspace = true hyper.workspace = true tower.workspace = true hyper-util.workspace = true -tower-http.workspace = true \ No newline at end of file +tower-http.workspace = true +clap.workspace = true \ No newline at end of file diff --git a/crates/delegator/src/main.rs b/crates/delegator/src/main.rs index 021ca01..dd38886 100644 --- a/crates/delegator/src/main.rs +++ b/crates/delegator/src/main.rs @@ -7,6 +7,7 @@ use axum::{ routing::{get, post}, Router, }; +use clap::Parser; use delegator::Delegator; use libp2p::gossipsub; use std::time::Duration; @@ -30,14 +31,26 @@ use zetina_common::{ topic::{gossipsub_ident_topic, Topic}, }; +#[derive(Parser)] +struct Cli { + /// The private key as a hex string + #[arg(short, long)] + private_key: String, + + #[arg(short, long)] + dial_addresses: Vec, +} + #[tokio::main] async fn main() -> Result<(), Box> { let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); + // Parse command line arguments + let cli = Cli::parse(); + // TODO: common setup in node initiate binary let network = Network::Sepolia; - let private_key = - hex::decode("018ef9563461ec2d88236d59039babf44c97d8bf6200d01d81170f1f60a78f32")?; + let private_key = hex::decode(cli.private_key)?; let node_account = NodeAccount::new(private_key); @@ -55,6 +68,7 @@ async fn main() -> Result<(), Box> { let (job_topic_tx, job_topic_rx) = mpsc::channel::>(100); SwarmRunner::new( + cli.dial_addresses, &p2p_keypair, vec![job_topic.to_owned(), picked_job_topic, finished_job_topic], vec![(job_topic, job_topic_rx)], diff --git a/crates/delegator/src/swarm.rs b/crates/delegator/src/swarm.rs index db59d5f..4ad78c2 100644 --- a/crates/delegator/src/swarm.rs +++ b/crates/delegator/src/swarm.rs @@ -2,12 +2,13 @@ use futures::StreamExt; use libp2p::gossipsub::{self, IdentTopic}; use libp2p::identity::Keypair; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; -use libp2p::{noise, tcp, yamux, SwarmBuilder}; +use libp2p::{noise, tcp, yamux, Multiaddr, SwarmBuilder}; use std::error::Error; +use std::str::FromStr; use std::time::Duration; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use zetina_common::graceful_shutdown::shutdown_signal; #[derive(NetworkBehaviour)] @@ -21,6 +22,7 @@ pub struct SwarmRunner { impl SwarmRunner { pub fn new( + dial_addresses: Vec, p2p_local_keypair: &Keypair, subscribe_topics: Vec, mut transmit_topics: Vec<(IdentTopic, mpsc::Receiver>)>, @@ -45,6 +47,13 @@ impl SwarmRunner { swarm.behaviour_mut().gossipsub.subscribe(&topic)?; } + // Reach out to other nodes if specified + for to_dial in dial_addresses { + let addr: Multiaddr = Multiaddr::from_str(&to_dial)?; + swarm.dial(addr)?; + info!("Dialed {to_dial:?}") + } + swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/5679".parse()?)?; diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 6aa7259..e011888 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -32,4 +32,5 @@ axum.workspace = true hyper.workspace = true tower.workspace = true hyper-util.workspace = true -tower-http.workspace = true \ No newline at end of file +tower-http.workspace = true +clap.workspace = true \ No newline at end of file diff --git a/crates/executor/src/main.rs b/crates/executor/src/main.rs index 546797b..ff467a4 100644 --- a/crates/executor/src/main.rs +++ b/crates/executor/src/main.rs @@ -2,6 +2,7 @@ pub mod executor; pub mod swarm; use axum::Router; +use clap::Parser; use executor::Executor; use libp2p::gossipsub; use std::time::Duration; @@ -18,10 +19,23 @@ use zetina_common::{ use zetina_prover::stone_prover::StoneProver; use zetina_runner::cairo_runner::CairoRunner; +#[derive(Parser)] +struct Cli { + /// The private key as a hex string + #[arg(short, long)] + private_key: String, + + #[arg(short, long)] + dial_addresses: Vec, +} + #[tokio::main] async fn main() -> Result<(), Box> { let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init(); + // Parse command line arguments + let cli = Cli::parse(); + let ws_root = std::path::PathBuf::from( std::env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR env not present"), ) @@ -30,8 +44,7 @@ async fn main() -> Result<(), Box> { // TODO: common setup in node initiate binary let network = Network::Sepolia; - let private_key = - hex::decode("07c7a41c77c7a3b19e7c77485854fc88b09ed7041361595920009f81236d55d2")?; + let private_key = hex::decode(cli.private_key)?; let node_account = NodeAccount::new(private_key); @@ -45,6 +58,7 @@ async fn main() -> Result<(), Box> { let (finished_job_topic_tx, finished_job_topic_rx) = mpsc::channel::>(1000); SwarmRunner::new( + cli.dial_addresses, node_account.get_keypair(), vec![new_job_topic, picked_job_topic.to_owned(), finished_job_topic.to_owned()], vec![ diff --git a/crates/executor/src/swarm.rs b/crates/executor/src/swarm.rs index 4344865..89bd5f2 100644 --- a/crates/executor/src/swarm.rs +++ b/crates/executor/src/swarm.rs @@ -2,12 +2,13 @@ use futures::StreamExt; use libp2p::gossipsub::{self, IdentTopic}; use libp2p::identity::Keypair; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; -use libp2p::{noise, tcp, yamux, SwarmBuilder}; +use libp2p::{noise, tcp, yamux, Multiaddr, SwarmBuilder}; use std::error::Error; +use std::str::FromStr; use std::time::Duration; use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use zetina_common::graceful_shutdown::shutdown_signal; #[derive(NetworkBehaviour)] @@ -21,6 +22,7 @@ pub struct SwarmRunner { impl SwarmRunner { pub fn new( + dial_addresses: Vec, p2p_local_keypair: &Keypair, subscribe_topics: Vec, mut transmit_topics: Vec<(IdentTopic, mpsc::Receiver>)>, @@ -48,6 +50,13 @@ impl SwarmRunner { swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/5679".parse()?)?; + // Reach out to other nodes if specified + for to_dial in dial_addresses { + let addr: Multiaddr = Multiaddr::from_str(&to_dial)?; + swarm.dial(addr)?; + info!("Dialed {to_dial:?}") + } + Ok(SwarmRunner { handle: Some(tokio::spawn(async move { // TODO make it nicer solution, extensible not manual!