Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Static addresses #23

Merged
merged 24 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Scarb.lock
.snfoundry_cache/
stone-prover
bootloader*.json
*.dockerfile
*docker-compose*.yaml

# Ignore Python-specific files
*.pyc
Expand All @@ -29,4 +31,4 @@ build/
.dockerignore.template
.dockerignore.generated
.dockerignore.custom
.dockerignore.local
.dockerignore.local
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ hyper = { version = "1.0", features = [] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.5", features = ["timeout", "trace", "cors"] }
clap = { version = "4.0", features = ["derive"] }

zetina-common = { path = "crates/common" }
zetina-compiler = { path = "crates/compiler" }
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/job_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where
// add random wait to simulate network overhead
let random = {
let mut rng = rand::thread_rng();
rng.gen_range(0..1000)
rng.gen_range(0..2000)
};
tokio::time::sleep(std::time::Duration::from_millis(random)).await;
self.ordered_set.pop_last()
Expand Down
36 changes: 5 additions & 31 deletions crates/common/src/node_account.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,32 @@
use starknet::{
accounts::{ConnectedAccount, ExecutionEncoding, SingleOwnerAccount},
core::types::FieldElement,
providers::Provider,
signers::{LocalWallet, SigningKey, VerifyingKey},
signers::{SigningKey, VerifyingKey},
};

use crate::network::Network;

pub struct NodeAccount<P>
where
P: Provider + Sync + Send + 'static,
{
pub struct NodeAccount {
/// Key pair for the p2p network.
/// This represents the identity of the node in the network.
p2p_keypair: libp2p::identity::Keypair,
/// The account for the StarkNet network.
/// This account is used to interact with the Registry contract.
account: SingleOwnerAccount<P, LocalWallet>,
signing_key: SigningKey,
}

impl<P> NodeAccount<P>
where
P: Provider + Sync + Send + 'static,
{
pub fn new(private_key: Vec<u8>, address: Vec<u8>, network: Network, provider: P) -> Self {
impl NodeAccount {
pub fn new(private_key: Vec<u8>) -> Self {
let secret_key = libp2p::identity::ecdsa::SecretKey::try_from_bytes(private_key.as_slice())
.expect("Failed to create secret key from private key.");
let p2p_keypair =
libp2p::identity::Keypair::from(libp2p::identity::ecdsa::Keypair::from(secret_key));
let signing_key = SigningKey::from_secret_scalar(
FieldElement::from_byte_slice_be(private_key.as_slice()).unwrap(),
);
let signer = LocalWallet::from(signing_key.clone());
let address = FieldElement::from_byte_slice_be(address.as_slice()).unwrap();
let network = network.to_field_element();
let account =
SingleOwnerAccount::new(provider, signer, address, network, ExecutionEncoding::New);

Self { p2p_keypair, account, signing_key }
Self { p2p_keypair, signing_key }
}

pub fn get_keypair(&self) -> &libp2p::identity::Keypair {
&self.p2p_keypair
}

pub fn get_account(&self) -> &SingleOwnerAccount<P, LocalWallet> {
&self.account
}

pub fn get_provider(&self) -> &P {
self.account.provider()
}

pub fn get_signing_key(&self) -> &SigningKey {
&self.signing_key
}
Expand Down
4 changes: 2 additions & 2 deletions crates/delegator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ libp2p.workspace = true
serde.workspace = true
serde_json.workspace = true
zetina-common.workspace = true
zetina-compiler.workspace = true
zetina-peer.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
Expand All @@ -31,4 +30,5 @@ axum.workspace = true
hyper.workspace = true
tower.workspace = true
hyper-util.workspace = true
tower-http.workspace = true
tower-http.workspace = true
clap.workspace = true
29 changes: 17 additions & 12 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use axum::{
routing::{get, post},
Router,
};
use clap::Parser;
use delegator::Delegator;
use libp2p::gossipsub;
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Url};
use std::time::Duration;
use swarm::SwarmRunner;
use tokio::{
Expand All @@ -31,24 +31,28 @@ use zetina_common::{
topic::{gossipsub_ident_topic, Topic},
};

#[derive(Parser)]
struct Cli {
/// The private key as a hex string
#[arg(short, long)]
private_key: String,

#[arg(short, long)]
dial_addresses: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();

// Parse command line arguments
let cli = Cli::parse();

// TODO: common setup in node initiate binary
let network = Network::Sepolia;
let private_key =
hex::decode("018ef9563461ec2d88236d59039babf44c97d8bf6200d01d81170f1f60a78f32")?;
let account_address =
hex::decode("cdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b")?;
let url = "https://starknet-sepolia.public.blastapi.io";
let private_key = hex::decode(cli.private_key)?;

let node_account = NodeAccount::new(
private_key,
account_address,
network,
JsonRpcClient::new(HttpTransport::new(Url::parse(url)?)),
);
let node_account = NodeAccount::new(private_key);

// Generate topic
let job_topic = gossipsub_ident_topic(network, Topic::NewJob);
Expand All @@ -62,6 +66,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (job_topic_tx, job_topic_rx) = mpsc::channel::<Vec<u8>>(100);

SwarmRunner::new(
cli.dial_addresses,
node_account.get_keypair(),
vec![job_topic.to_owned(), picked_job_topic, finished_job_topic],
vec![(job_topic, job_topic_rx)],
Expand Down
40 changes: 20 additions & 20 deletions crates/delegator/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ use futures::StreamExt;
use libp2p::gossipsub::{self, IdentTopic};
use libp2p::identity::Keypair;
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{mdns, noise, tcp, yamux, SwarmBuilder};
use libp2p::{noise, tcp, yamux, Multiaddr, SwarmBuilder};
use std::error::Error;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, error};
use tracing::{debug, error, info};
use zetina_common::graceful_shutdown::shutdown_signal;

#[derive(NetworkBehaviour)]
pub struct PeerBehaviour {
gossipsub: gossipsub::Behaviour,
mdns: mdns::tokio::Behaviour,
}

pub struct SwarmRunner {
Expand All @@ -22,17 +22,14 @@ pub struct SwarmRunner {

impl SwarmRunner {
pub fn new(
dial_addresses: Vec<String>,
p2p_local_keypair: &Keypair,
subscribe_topics: Vec<IdentTopic>,
mut transmit_topics: Vec<(IdentTopic, mpsc::Receiver<Vec<u8>>)>,
swarm_events_tx: mpsc::Sender<gossipsub::Event>,
) -> Result<Self, Box<dyn Error>> {
let mdns = mdns::tokio::Behaviour::new(
mdns::Config::default(),
p2p_local_keypair.public().to_peer_id(),
)?;
let gossipsub = Self::init_gossip(p2p_local_keypair)?;
let behaviour = PeerBehaviour { gossipsub, mdns };
let behaviour = PeerBehaviour { gossipsub };
let local_keypair = p2p_local_keypair.clone();
let mut swarm = SwarmBuilder::with_existing_identity(local_keypair)
.with_tokio()
Expand All @@ -50,6 +47,13 @@ impl SwarmRunner {
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
}

// Reach out to other nodes if specified
for to_dial in dial_addresses {
let addr: Multiaddr = Multiaddr::from_str(&to_dial)?;
swarm.dial(addr)?;
info!("Dialed {to_dial:?}")
}

swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/5679".parse()?)?;

Expand All @@ -66,18 +70,6 @@ impl SwarmRunner {
}
},
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discovered a new peer: {peer_id}");
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discover peer has expired: {peer_id}");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source,
message_id,
Expand All @@ -87,6 +79,14 @@ impl SwarmRunner {
},
SwarmEvent::NewListenAddr { address, .. } => {
debug!("Local node is listening on {address}");
},
SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, .. } => {
info!{"ConnectionEstablished: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established};
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, .. } => {
info!{"ConnectionClosed: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established};
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
_ => {}
},
Expand Down
3 changes: 2 additions & 1 deletion crates/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ axum.workspace = true
hyper.workspace = true
tower.workspace = true
hyper-util.workspace = true
tower-http.workspace = true
tower-http.workspace = true
clap.workspace = true
62 changes: 57 additions & 5 deletions crates/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,64 @@ impl Executor {
&& !job_record.is_empty()
{
if let Some(job) = job_record.take_job().await {
let serialized_job = serde_json::to_string(&job)?;
picked_job_topic_tx.send(serialized_job.into()).await?;
info!("Sent picked job event: {}", hash!(&job));
let mut flag = false;
if let Ok(event) = events_rx.try_recv() {
match event {
Event::Message { message, .. } => {
// Received a new-job message from the network
if message.topic
== gossipsub_ident_topic(
Network::Sepolia,
Topic::NewJob,
)
.into()
{
let job: Job = serde_json::from_slice(&message.data)?;
info!("Received a new job event: {}", hash!(&job));
job_record.register_job(job);
}
// Received a picked-job message from the network
if message.topic
== gossipsub_ident_topic(
Network::Sepolia,
Topic::PickedJob,
)
.into()
{
let job_removed: Job =
serde_json::from_slice(&message.data)?;
info!("Received picked job event: {}", hash!(&job));
job_record.remove_job(&job_removed);
if hash!(job_removed) == hash!(job) {
flag = true;
}
}
}
Event::Subscribed { peer_id, topic } => {
info!(
"{} subscribed to the topic {}",
peer_id.to_string(),
topic.to_string()
);
}
Event::Unsubscribed { peer_id, topic } => {
info!(
"{} unsubscribed to the topic {}",
peer_id.to_string(),
topic.to_string()
);
}
_ => {}
}
};
if flag == false {
let serialized_job = serde_json::to_string(&job)?;
picked_job_topic_tx.send(serialized_job.into()).await?;
info!("Sent picked job event: {}", hash!(&job));

info!("Scheduled run of job: {}", hash!(&job));
runner_scheduler.push(runner.run(job)?);
info!("Scheduled run of job: {}", hash!(&job));
runner_scheduler.push(runner.run(job)?);
}
}
}
}
Expand Down
Loading
Loading