Merge pull request #23 from iosis-tech/static_addresses
Static addresses
Okm165 authored Jul 14, 2024
2 parents 242ca2f + 0d8bcda commit fa2cf1f
Showing 35 changed files with 720 additions and 177 deletions.
4 changes: 3 additions & 1 deletion .dockerignore
@@ -13,6 +13,8 @@ Scarb.lock
.snfoundry_cache/
stone-prover
bootloader*.json
*.dockerfile
*docker-compose*.yaml

# Ignore Python-specific files
*.pyc
@@ -29,4 +31,4 @@ build/
.dockerignore.template
.dockerignore.generated
.dockerignore.custom
.dockerignore.local
.dockerignore.local
1 change: 1 addition & 0 deletions Cargo.toml
@@ -70,6 +70,7 @@ hyper = { version = "1.0", features = [] }
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.5", features = ["timeout", "trace", "cors"] }
clap = { version = "4.0", features = ["derive"] }

zetina-common = { path = "crates/common" }
zetina-compiler = { path = "crates/compiler" }
2 changes: 1 addition & 1 deletion crates/common/src/job_record.rs
@@ -26,7 +26,7 @@ where
// add random wait to simulate network overhead
let random = {
let mut rng = rand::thread_rng();
rng.gen_range(0..1000)
rng.gen_range(0..2000)
};
tokio::time::sleep(std::time::Duration::from_millis(random)).await;
self.ordered_set.pop_last()
36 changes: 5 additions & 31 deletions crates/common/src/node_account.rs
@@ -1,58 +1,32 @@
use starknet::{
accounts::{ConnectedAccount, ExecutionEncoding, SingleOwnerAccount},
core::types::FieldElement,
providers::Provider,
signers::{LocalWallet, SigningKey, VerifyingKey},
signers::{SigningKey, VerifyingKey},
};

use crate::network::Network;

pub struct NodeAccount<P>
where
P: Provider + Sync + Send + 'static,
{
pub struct NodeAccount {
/// Key pair for the p2p network.
/// This represents the identity of the node in the network.
p2p_keypair: libp2p::identity::Keypair,
/// The account for the StarkNet network.
/// This account is used to interact with the Registry contract.
account: SingleOwnerAccount<P, LocalWallet>,
signing_key: SigningKey,
}

impl<P> NodeAccount<P>
where
P: Provider + Sync + Send + 'static,
{
pub fn new(private_key: Vec<u8>, address: Vec<u8>, network: Network, provider: P) -> Self {
impl NodeAccount {
pub fn new(private_key: Vec<u8>) -> Self {
let secret_key = libp2p::identity::ecdsa::SecretKey::try_from_bytes(private_key.as_slice())
.expect("Failed to create secret key from private key.");
let p2p_keypair =
libp2p::identity::Keypair::from(libp2p::identity::ecdsa::Keypair::from(secret_key));
let signing_key = SigningKey::from_secret_scalar(
FieldElement::from_byte_slice_be(private_key.as_slice()).unwrap(),
);
let signer = LocalWallet::from(signing_key.clone());
let address = FieldElement::from_byte_slice_be(address.as_slice()).unwrap();
let network = network.to_field_element();
let account =
SingleOwnerAccount::new(provider, signer, address, network, ExecutionEncoding::New);

Self { p2p_keypair, account, signing_key }
Self { p2p_keypair, signing_key }
}

pub fn get_keypair(&self) -> &libp2p::identity::Keypair {
&self.p2p_keypair
}

pub fn get_account(&self) -> &SingleOwnerAccount<P, LocalWallet> {
&self.account
}

pub fn get_provider(&self) -> &P {
self.account.provider()
}

pub fn get_signing_key(&self) -> &SigningKey {
&self.signing_key
}
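With the StarkNet provider, wallet, and account wiring stripped out, `NodeAccount` is now built from the raw private-key bytes alone. A minimal sketch of the new call site, assuming the struct is exposed as `zetina_common::node_account::NodeAccount`; the key value shown is the one that used to be hardcoded in the delegator and is only illustrative:

```rust
use zetina_common::node_account::NodeAccount;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hex-decoded ECDSA private key; the binaries now receive this via a CLI flag.
    let private_key =
        hex::decode("018ef9563461ec2d88236d59039babf44c97d8bf6200d01d81170f1f60a78f32")?;
    let node_account = NodeAccount::new(private_key);

    // Only the p2p identity and the StarkNet signing key remain on the struct.
    let _p2p_keypair = node_account.get_keypair();
    let _signing_key = node_account.get_signing_key();
    Ok(())
}
```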
4 changes: 2 additions & 2 deletions crates/delegator/Cargo.toml
@@ -17,7 +17,6 @@ libp2p.workspace = true
serde.workspace = true
serde_json.workspace = true
zetina-common.workspace = true
zetina-compiler.workspace = true
zetina-peer.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
@@ -31,4 +30,5 @@ axum.workspace = true
hyper.workspace = true
tower.workspace = true
hyper-util.workspace = true
tower-http.workspace = true
tower-http.workspace = true
clap.workspace = true
29 changes: 17 additions & 12 deletions crates/delegator/src/main.rs
@@ -7,9 +7,9 @@ use axum::{
routing::{get, post},
Router,
};
use clap::Parser;
use delegator::Delegator;
use libp2p::gossipsub;
use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient, Url};
use std::time::Duration;
use swarm::SwarmRunner;
use tokio::{
@@ -31,24 +31,28 @@ use zetina_common::{
topic::{gossipsub_ident_topic, Topic},
};

#[derive(Parser)]
struct Cli {
/// The private key as a hex string
#[arg(short, long)]
private_key: String,

#[arg(short, long)]
dial_addresses: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt().with_env_filter(EnvFilter::from_default_env()).try_init();

// Parse command line arguments
let cli = Cli::parse();

// TODO: common setup in node initiate binary
let network = Network::Sepolia;
let private_key =
hex::decode("018ef9563461ec2d88236d59039babf44c97d8bf6200d01d81170f1f60a78f32")?;
let account_address =
hex::decode("cdd51fbc4e008f4ef807eaf26f5043521ef5931bbb1e04032a25bd845d286b")?;
let url = "https://starknet-sepolia.public.blastapi.io";
let private_key = hex::decode(cli.private_key)?;

let node_account = NodeAccount::new(
private_key,
account_address,
network,
JsonRpcClient::new(HttpTransport::new(Url::parse(url)?)),
);
let node_account = NodeAccount::new(private_key);

// Generate topic
let job_topic = gossipsub_ident_topic(network, Topic::NewJob);
@@ -62,6 +66,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (job_topic_tx, job_topic_rx) = mpsc::channel::<Vec<u8>>(100);

SwarmRunner::new(
cli.dial_addresses,
node_account.get_keypair(),
vec![job_topic.to_owned(), picked_job_topic, finished_job_topic],
vec![(job_topic, job_topic_rx)],
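The hardcoded Sepolia key, account address, and RPC URL are replaced by clap-derived flags. A sketch of how the new `Cli` maps onto a command line; the struct is repeated here only for demonstration, and the multiaddr values are made up rather than taken from this diff:

```rust
use clap::Parser;

// Mirror of the delegator's Cli struct from this commit, for demonstration only.
#[derive(Parser)]
struct Cli {
    /// The private key as a hex string
    #[arg(short, long)]
    private_key: String,

    #[arg(short, long)]
    dial_addresses: Vec<String>,
}

fn main() {
    // Equivalent to: delegator -p <hex key> -d /ip4/10.0.0.2/tcp/5679 -d /ip4/10.0.0.3/tcp/5679
    let cli = Cli::parse_from([
        "delegator",
        "--private-key",
        "018ef9563461ec2d88236d59039babf44c97d8bf6200d01d81170f1f60a78f32",
        "--dial-addresses",
        "/ip4/10.0.0.2/tcp/5679",
        "--dial-addresses",
        "/ip4/10.0.0.3/tcp/5679",
    ]);
    assert_eq!(cli.private_key.len(), 64);
    assert_eq!(cli.dial_addresses.len(), 2);
}
```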
40 changes: 20 additions & 20 deletions crates/delegator/src/swarm.rs
@@ -2,18 +2,18 @@ use futures::StreamExt;
use libp2p::gossipsub::{self, IdentTopic};
use libp2p::identity::Keypair;
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{mdns, noise, tcp, yamux, SwarmBuilder};
use libp2p::{noise, tcp, yamux, Multiaddr, SwarmBuilder};
use std::error::Error;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, error};
use tracing::{debug, error, info};
use zetina_common::graceful_shutdown::shutdown_signal;

#[derive(NetworkBehaviour)]
pub struct PeerBehaviour {
gossipsub: gossipsub::Behaviour,
mdns: mdns::tokio::Behaviour,
}

pub struct SwarmRunner {
@@ -22,17 +22,14 @@ pub struct SwarmRunner {

impl SwarmRunner {
pub fn new(
dial_addresses: Vec<String>,
p2p_local_keypair: &Keypair,
subscribe_topics: Vec<IdentTopic>,
mut transmit_topics: Vec<(IdentTopic, mpsc::Receiver<Vec<u8>>)>,
swarm_events_tx: mpsc::Sender<gossipsub::Event>,
) -> Result<Self, Box<dyn Error>> {
let mdns = mdns::tokio::Behaviour::new(
mdns::Config::default(),
p2p_local_keypair.public().to_peer_id(),
)?;
let gossipsub = Self::init_gossip(p2p_local_keypair)?;
let behaviour = PeerBehaviour { gossipsub, mdns };
let behaviour = PeerBehaviour { gossipsub };
let local_keypair = p2p_local_keypair.clone();
let mut swarm = SwarmBuilder::with_existing_identity(local_keypair)
.with_tokio()
@@ -50,6 +47,13 @@ impl SwarmRunner {
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
}

// Reach out to other nodes if specified
for to_dial in dial_addresses {
let addr: Multiaddr = Multiaddr::from_str(&to_dial)?;
swarm.dial(addr)?;
info!("Dialed {to_dial:?}")
}

swarm.listen_on("/ip4/0.0.0.0/udp/5678/quic-v1".parse()?)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/5679".parse()?)?;

@@ -66,18 +70,6 @@ impl SwarmRunner {
}
},
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discovered a new peer: {peer_id}");
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(PeerBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, _multiaddr) in list {
debug!("mDNS discover peer has expired: {peer_id}");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source,
message_id,
Expand All @@ -87,6 +79,14 @@ impl SwarmRunner {
},
SwarmEvent::NewListenAddr { address, .. } => {
debug!("Local node is listening on {address}");
},
SwarmEvent::ConnectionEstablished { peer_id, connection_id, num_established, .. } => {
info!{"ConnectionEstablished: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established};
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, connection_id, num_established, .. } => {
info!{"ConnectionClosed: peer_id {}, connection_id {}, num_established {}", peer_id, connection_id, num_established};
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
_ => {}
},
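Discovery switches from mDNS to static dialing: the swarm dials the multiaddrs passed in at startup and then tracks gossipsub explicit peers through `ConnectionEstablished` / `ConnectionClosed` instead of mDNS events. A standalone sketch of the dial loop, generic over any `NetworkBehaviour`, with illustrative addresses:

```rust
use libp2p::{swarm::NetworkBehaviour, Multiaddr, Swarm};
use std::str::FromStr;

// Sketch: dial a fixed list of peers instead of waiting for mDNS discovery.
pub fn dial_static_addresses<B: NetworkBehaviour>(
    swarm: &mut Swarm<B>,
    dial_addresses: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
    for to_dial in dial_addresses {
        let addr = Multiaddr::from_str(to_dial)?; // e.g. "/ip4/10.0.0.2/tcp/5679"
        swarm.dial(addr)?;
        tracing::info!("Dialed {to_dial:?}");
    }
    Ok(())
}
```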
3 changes: 2 additions & 1 deletion crates/executor/Cargo.toml
@@ -32,4 +32,5 @@ axum.workspace = true
hyper.workspace = true
tower.workspace = true
hyper-util.workspace = true
tower-http.workspace = true
tower-http.workspace = true
clap.workspace = true
62 changes: 57 additions & 5 deletions crates/executor/src/executor.rs
@@ -92,12 +92,64 @@ impl Executor {
&& !job_record.is_empty()
{
if let Some(job) = job_record.take_job().await {
let serialized_job = serde_json::to_string(&job)?;
picked_job_topic_tx.send(serialized_job.into()).await?;
info!("Sent picked job event: {}", hash!(&job));
let mut flag = false;
if let Ok(event) = events_rx.try_recv() {
match event {
Event::Message { message, .. } => {
// Received a new-job message from the network
if message.topic
== gossipsub_ident_topic(
Network::Sepolia,
Topic::NewJob,
)
.into()
{
let job: Job = serde_json::from_slice(&message.data)?;
info!("Received a new job event: {}", hash!(&job));
job_record.register_job(job);
}
// Received a picked-job message from the network
if message.topic
== gossipsub_ident_topic(
Network::Sepolia,
Topic::PickedJob,
)
.into()
{
let job_removed: Job =
serde_json::from_slice(&message.data)?;
info!("Received picked job event: {}", hash!(&job));
job_record.remove_job(&job_removed);
if hash!(job_removed) == hash!(job) {
flag = true;
}
}
}
Event::Subscribed { peer_id, topic } => {
info!(
"{} subscribed to the topic {}",
peer_id.to_string(),
topic.to_string()
);
}
Event::Unsubscribed { peer_id, topic } => {
info!(
"{} unsubscribed to the topic {}",
peer_id.to_string(),
topic.to_string()
);
}
_ => {}
}
};
if flag == false {
let serialized_job = serde_json::to_string(&job)?;
picked_job_topic_tx.send(serialized_job.into()).await?;
info!("Sent picked job event: {}", hash!(&job));

info!("Scheduled run of job: {}", hash!(&job));
runner_scheduler.push(runner.run(job)?);
info!("Scheduled run of job: {}", hash!(&job));
runner_scheduler.push(runner.run(job)?);
}
}
}
}
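Instead of immediately claiming every job it pops, the executor now peeks at the pending gossipsub events and skips a job whose hash was already announced on the PickedJob topic by another peer. A self-contained sketch of that guard with simplified stand-in types; the real code compares `hash!` values and also re-registers incoming NewJob messages:

```rust
use tokio::sync::mpsc;

// Stand-in types; the crate uses Job plus gossipsub events instead.
#[derive(PartialEq)]
struct JobId(u64);

#[allow(dead_code)]
enum PeerEvent {
    PickedJob(JobId),
    Other,
}

async fn should_run(job: &JobId, events_rx: &mut mpsc::Receiver<PeerEvent>) -> bool {
    // Mirrors the executor's try_recv: inspect at most one queued event per decision.
    if let Ok(PeerEvent::PickedJob(picked)) = events_rx.try_recv() {
        if picked == *job {
            // Another peer already announced this job; skip it rather than run it twice.
            return false;
        }
    }
    true
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    tx.send(PeerEvent::PickedJob(JobId(42))).await.unwrap();
    assert!(!should_run(&JobId(42), &mut rx).await); // already claimed elsewhere
    assert!(should_run(&JobId(7), &mut rx).await); // no competing claim queued
}
```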