From e518987b82966ef4f180d80aaeccd260fe39c2f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lancelot=20de=20Ferri=C3=A8re?= Date: Wed, 16 Oct 2024 11:07:44 +0200 Subject: [PATCH 1/3] Fix run_indexer CLI argument --- src/bin/node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/node.rs b/src/bin/node.rs index 6fadb8b54..fc857a1c9 100644 --- a/src/bin/node.rs +++ b/src/bin/node.rs @@ -32,7 +32,7 @@ pub struct Args { #[arg(long, default_value = None)] pub data_directory: Option, - #[arg(long, action = clap::ArgAction::SetTrue)] + #[arg(long)] pub run_indexer: Option, #[arg(long, default_value = "config.ron")] From 848dc3dbda0a036584b4b2cca1d0cf71bdc1a81e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lancelot=20de=20Ferri=C3=A8re?= Date: Tue, 15 Oct 2024 15:24:19 +0200 Subject: [PATCH 2/3] Rework e2e tests to make it easier to run processes + include PG --- Cargo.lock | 68 +++++----- Cargo.toml | 1 + src/consensus.rs | 2 +- src/indexer.rs | 1 + src/p2p.rs | 5 +- tests/node1/config.ron => src/tools/node1.ron | 0 tests/node2/config.ron => src/tools/node2.ron | 0 src/utils/conf.rs | 10 +- src/utils/conf_defaults.ron | 5 +- tests/e2e_test.rs | 121 +++++++++--------- tests/test_helpers.rs | 107 +++++++++++----- 11 files changed, 176 insertions(+), 144 deletions(-) rename tests/node1/config.ron => src/tools/node1.ron (100%) rename tests/node2/config.ron => src/tools/node2.ron (100%) diff --git a/Cargo.lock b/Cargo.lock index e45ca9070..c5ffdb3c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -760,9 +760,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" dependencies = [ "bytemuck_derive", ] @@ -887,9 +887,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.28" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -2247,9 +2247,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb94a0ffd3f3ee755c20f7d8752f45cac88605a4dcf808abcff72873296ec7b" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -2892,9 +2892,9 @@ checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "pathdiff" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" +checksum = "d61c5ce1153ab5b689d0c074c4e7fc613e942dfb7dd9eea5ab202d2ad91fe361" [[package]] name = "pem-rfc7468" @@ -2913,9 +2913,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.13" +version = "2.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9" +checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442" dependencies = [ "memchr", "thiserror", @@ -2924,9 +2924,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.13" +version = "2.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d3a6e3394ec80feb3b6393c725571754c6188490265c61aaf260810d6b95aa0" +checksum = "d214365f632b123a47fd913301e14c946c61d1c183ee245fa76eb752e59a02dd" dependencies = [ "pest", "pest_generator", @@ -2934,9 +2934,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.13" +version = "2.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94429506bde1ca69d1b5601962c73f4172ab4726571a59ea95931218cb0e930e" +checksum = "eb55586734301717aea2ac313f50b2eb8f60d2fc3dc01d190eefa2e625f60c4e" dependencies = [ "pest", "pest_meta", @@ -2947,9 +2947,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.13" +version = "2.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac8a071862e93690b6e34e9a5fb8e33ff3734473ac0245b27232222c4906a33f" +checksum = "b75da2a70cf4d9cb76833c990ac9cd3923c9a8905a8929789ce347c84564d03d" dependencies = [ "once_cell", "pest", @@ -3797,9 +3797,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" @@ -3814,9 +3814,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "rusty-fork" @@ -5205,9 +5205,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef073ced962d62984fb38a36e5fdc1a2b23c9e0e1fa0689bb97afa4202ef6887" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -5216,9 +5216,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4bfab14ef75323f4eb75fa52ee0a3fb59611977fd3240da19b2cf36ff85030e" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -5231,9 +5231,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65471f79c1022ffa5291d33520cbbb53b7687b01c2f8e83b57d102eed7ed479d" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -5243,9 +5243,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7bec9830f60924d9ceb3ef99d55c155be8afa76954edffbb5936ff4509474e7" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5253,9 +5253,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c74f6e152a76a2ad448e223b0fc0b6b5747649c3d769cc6bf45737bf97d0ed6" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -5266,9 +5266,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42f6c679374623f295a8623adfe63d9284091245c3504bde47c17a3ce2777d9" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "wasm-streams" @@ -5285,9 +5285,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44188d185b5bdcae1052d08bcbcf9091a5524038d4572cc4f4f2bb9d5554ddd9" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 0807eb8b5..a83115e09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ test-log = { version = "0.2.16", features = [ ], default-features = false } tokio-stream = "0.1.16" tempfile = "3.13.0" +testcontainers-modules = { version = "0.11.2", features = ["postgres"] } [features] dhat = ["dep:dhat"] diff --git a/src/consensus.rs b/src/consensus.rs index 2c9cfd553..2765fab15 100644 --- a/src/consensus.rs +++ b/src/consensus.rs @@ -619,7 +619,7 @@ impl Consensus { fn send_candidacy(&mut self) -> Result<()> { let candidacy = ValidatorCandidacy { pubkey: self.crypto.validator_pubkey().clone(), - peer_address: self.config.peer_addr(), + peer_address: self.config.host.clone(), }; info!( "📝 Sending candidacy message to be part of consensus. {}", diff --git a/src/indexer.rs b/src/indexer.rs index 9e126cd22..b03ed62ff 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -52,6 +52,7 @@ impl Module for Indexer { let inner = PgPoolOptions::new() .max_connections(20) + .acquire_timeout(std::time::Duration::from_secs(1)) .connect(&ctx.config.database_url) .await .context("Failed to connect to the database")?; diff --git a/src/p2p.rs b/src/p2p.rs index 85ae9e7f1..f87a288f0 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -114,9 +114,8 @@ impl P2P { self.spawn_peer(peer); } - let listener = TcpListener::bind(self.config.addr()).await?; - let (addr, port) = self.config.addr(); - info!("p2p listening on {}:{}", addr, port); + let listener = TcpListener::bind(&self.config.host).await?; + info!("p2p listening on {}", listener.local_addr()?); handle_messages! { on_bus self.bus_client, diff --git a/tests/node1/config.ron b/src/tools/node1.ron similarity index 100% rename from tests/node1/config.ron rename to src/tools/node1.ron diff --git a/tests/node2/config.ron b/src/tools/node2.ron similarity index 100% rename from tests/node2/config.ron rename to src/tools/node2.ron diff --git a/src/utils/conf.rs b/src/utils/conf.rs index 5dfecb390..128229bfa 100644 --- a/src/utils/conf.rs +++ b/src/utils/conf.rs @@ -21,9 +21,8 @@ pub type SharedConf = Arc; #[derive(Serialize, Deserialize, Clone, Debug, Default)] pub struct Conf { - port: u16, - host: String, pub id: String, + pub host: String, pub peers: Vec, pub storage: Storage, pub consensus: Consensus, @@ -36,13 +35,6 @@ pub struct Conf { } impl Conf { - pub fn addr(&self) -> (&str, u16) { - (&self.host, self.port) - } - pub fn peer_addr(&self) -> String { - format!("{}:{}", self.host, self.port) - } - pub fn new( config_file: Option, data_directory: Option, diff --git a/src/utils/conf_defaults.ron b/src/utils/conf_defaults.ron index bef6fdbc5..0b0a4c6b8 100644 --- a/src/utils/conf_defaults.ron +++ b/src/utils/conf_defaults.ron @@ -1,14 +1,13 @@ Config( id: "single-node", - port: 1231, - host: "127.0.0.1", + host: "127.0.0.1:1231", peers: [], storage: Storage( interval: 10 ), rest: "127.0.0.1:4321", data_directory: "data_node1", - database_url: "postgres://postgres:password@localhost:5432/postgres", + database_url: "postgres://postgres:postgres@localhost:5432/postgres", consensus: Consensus ( slot_duration: 0, ), diff --git a/tests/e2e_test.rs b/tests/e2e_test.rs index 0b8a2a4d2..61ddcf1db 100644 --- a/tests/e2e_test.rs +++ b/tests/e2e_test.rs @@ -1,5 +1,5 @@ +use assertables::assert_ok; use hyle::{ - consensus::{Consensus, ConsensusStore}, indexer::model::ContractDb, model::{ Blob, BlobData, BlobReference, BlobTransaction, ContractName, Identity, ProofTransaction, @@ -7,21 +7,15 @@ use hyle::{ }, node_state::model::Contract, rest::client::ApiHttpClient, - utils::modules::Module, }; use reqwest::{Client, Url}; -use std::{ - fs::{self, File}, - io::Read, - path::Path, - time, -}; -use test_helpers::NodeType; +use std::{fs::File, io::Read, time}; +use test_helpers::ConfMaker; use tokio::time::sleep; mod test_helpers; -use anyhow::Result; +use anyhow::{Context, Result}; pub fn load_encoded_receipt_from_file(path: &str) -> Vec { let mut file = File::open(path).expect("Failed to open proof file"); @@ -101,7 +95,7 @@ async fn verify_contract_state(client: &ApiHttpClient) -> Result<()> { } async fn register_test_contracts(client: &ApiHttpClient) -> Result<()> { - assert!(client + assert_ok!(client .send_tx_register_contract(&RegisterContractTransaction { owner: "test".to_string(), verifier: "test".to_string(), @@ -109,11 +103,10 @@ async fn register_test_contracts(client: &ApiHttpClient) -> Result<()> { state_digest: StateDigest(vec![0, 1, 2, 3]), contract_name: ContractName("c1".to_string()), }) - .await? - .status() - .is_success()); + .await + .and_then(|response| response.error_for_status().context("registering contract"))); - assert!(client + assert_ok!(client .send_tx_register_contract(&RegisterContractTransaction { owner: "test".to_string(), verifier: "test".to_string(), @@ -121,9 +114,8 @@ async fn register_test_contracts(client: &ApiHttpClient) -> Result<()> { state_digest: StateDigest(vec![0, 1, 2, 3]), contract_name: ContractName("c2".to_string()), }) - .await? - .status() - .is_success()); + .await + .and_then(|response| response.error_for_status().context("registering contract"))); Ok(()) } @@ -143,13 +135,14 @@ async fn send_test_blobs_and_proofs(client: &ApiHttpClient) -> Result<()> { }, ], }) - .await?; + .await + .and_then(|response| response.error_for_status().context("sending tx")); - assert!(blob_response.status().is_success()); + assert_ok!(blob_response); - let blob_tx_hash = blob_response.json::().await?; + let blob_tx_hash = blob_response.unwrap().json::().await?; - assert!(client + assert_ok!(client .send_tx_proof(&ProofTransaction { blobs_references: vec![ BlobReference { @@ -165,18 +158,20 @@ async fn send_test_blobs_and_proofs(client: &ApiHttpClient) -> Result<()> { ], proof: vec![5, 5] }) - .await? - .status() - .is_success()); + .await + .and_then(|response| response.error_for_status().context("sending tx"))); Ok(()) } async fn verify_test_contract_state(client: &ApiHttpClient) -> Result<()> { - let response = client.get_contract(&ContractName("c1".to_string())).await?; - assert!(response.status().is_success(), "{}", response.status()); + let response = client + .get_contract(&ContractName("c1".to_string())) + .await + .and_then(|response| response.error_for_status().context("Getting contract")); + assert_ok!(response); - let contract = response.json::().await?; + let contract = response.unwrap().json::().await?; assert_eq!(contract.state.0, vec![4, 5, 6]); Ok(()) @@ -185,41 +180,61 @@ async fn verify_test_contract_state(client: &ApiHttpClient) -> Result<()> { async fn verify_indexer(client: &ApiHttpClient) -> Result<()> { let response = client .get_indexer_contract(&ContractName("c1".to_string())) - .await?; - assert!(response.status().is_success(), "{}", response.status()); + .await + .and_then(|response| response.error_for_status().context("Getting contract")); + assert_ok!(response); - let contract = response.json::().await?; + let contract = response.unwrap().json::().await?; // The indexer returns the initial state assert_eq!(contract.state_digest, vec![0, 1, 2, 3]); Ok(()) } +use testcontainers_modules::{ + postgres::Postgres, + testcontainers::core::IntoContainerPort, + testcontainers::{runners::AsyncRunner, ImageExt}, +}; + #[tokio::test] async fn e2e() -> Result<()> { tracing_subscriber::fmt::init(); - // FIXME: use tmp dir - let path_node1 = Path::new("tests/node1"); - let path_node2 = Path::new("tests/node2"); + // Start postgres DB with default settings for the indexer. + let _pg = Postgres::default() + .with_mapped_port(5432, 5432.tcp()) + .start() + .await + .unwrap(); - // Clean created files - _ = fs::remove_dir_all(path_node1.join("data_node1")); - _ = fs::remove_dir_all(path_node2.join("data_node2")); + let mut conf_maker = ConfMaker::default(); // Start 2 nodes - let node1 = test_helpers::TestNode::new(path_node1, NodeType::Node, "6668"); - let node2 = test_helpers::TestNode::new(path_node2, NodeType::Node, "6669"); + let node1 = test_helpers::TestProcess::new("node", conf_maker.build()) + .log("info") + .start(); + // Wait for node to properly spin up + sleep(time::Duration::from_secs(1)).await; + let mut node2_conf = conf_maker.build(); + node2_conf.peers = vec![node1.conf.host.clone()]; + let node2 = test_helpers::TestProcess::new("node", node2_conf) + .log("error") + .start(); // Wait for node to properly spin up - sleep(time::Duration::from_secs(2)).await; + sleep(time::Duration::from_secs(5)).await; // Start indexer - let indexer = test_helpers::TestNode::new(path_node1, NodeType::Indexer, "6670"); + let mut indexer_conf = conf_maker.build(); + indexer_conf.da_address = node2.conf.da_address.clone(); + let indexer = test_helpers::TestProcess::new("indexer", indexer_conf) + .log("error") + .start(); // Request something on node1 to be sure it's alive and working let client_node1 = ApiHttpClient { - url: Url::parse("http://localhost:4321").unwrap(), + url: Url::parse(&format!("http://{}", &node1.conf.rest)).unwrap(), reqwest_client: Client::new(), }; @@ -231,37 +246,19 @@ async fn e2e() -> Result<()> { send_blobs_and_proofs(&client_node1).await?; // Wait for some slots to be finished - sleep(time::Duration::from_secs(15)).await; + sleep(time::Duration::from_secs(10)).await; verify_test_contract_state(&client_node1).await?; verify_contract_state(&client_node1).await?; // Check that the indexer did index things let client_indexer = ApiHttpClient { - url: Url::parse("http://localhost:5544").unwrap(), + url: Url::parse(&format!("http://{}", &indexer.conf.rest)).unwrap(), reqwest_client: Client::new(), }; verify_indexer(&client_indexer).await?; - // Stop all processes - drop(indexer); - drop(node1); - drop(node2); - - // Check that some blocks has been produced - let node1_consensus: ConsensusStore = - Consensus::load_from_disk_or_default(path_node1.join("data_node1/consensus.bin").as_path()); - let node2_consensus: ConsensusStore = - Consensus::load_from_disk_or_default(path_node2.join("data_node2/consensus.bin").as_path()); - assert!(!node1_consensus.blocks.is_empty()); - assert!(!node2_consensus.blocks.is_empty()); - // FIXME: check that created blocks are the same. - - // Clean created files - fs::remove_dir_all(path_node1.join("data_node1")).expect("file cleaning failed"); - fs::remove_dir_all(path_node2.join("data_node2")).expect("file cleaning failed"); - //TODO: compare blocks from node1 and node2 Ok(()) diff --git a/tests/test_helpers.rs b/tests/test_helpers.rs index dc1e39030..3f7065901 100644 --- a/tests/test_helpers.rs +++ b/tests/test_helpers.rs @@ -1,50 +1,93 @@ use assert_cmd::prelude::*; -use std::{ - path::Path, - process::{Child, Command}, -}; +use hyle::utils::conf::Conf; +use std::process::{Child, Command}; +use tempfile::TempDir; -pub struct TestNode { - child: Child, +#[derive(Default)] +pub struct ConfMaker(u16); + +impl ConfMaker { + pub fn build(&mut self) -> Conf { + let defaults = Conf::new(None, None, None).unwrap(); + self.0 += 1; + Conf { + id: format!("node-{}", self.0), + host: format!("localhost:{}", 3000 + self.0), + da_address: format!("localhost:{}", 4000 + self.0), + rest: format!("localhost:{}", 5000 + self.0), + run_indexer: false, // disable indexer by default to avoid needed PG + ..defaults.clone() + } + } } -pub enum NodeType { - Node, +enum TestProcessState { + Command(Command), + Child(Child), +} + +pub struct TestProcess { + pub conf: Conf, #[allow(dead_code)] - Client, - Indexer, + pub dir: TempDir, + state: TestProcessState, } -impl TestNode { - // Create a new process that spins up a node or a client - pub fn new(config_path: &Path, node_type: NodeType, console_bind_port: &str) -> Self { - let mut cargo_bin = Command::cargo_bin(match node_type { - NodeType::Node => "node", - NodeType::Client => "client", - NodeType::Indexer => "indexer", - }) - .unwrap(); - let mut cmd = cargo_bin.current_dir(config_path); - match node_type { - NodeType::Client => cmd = cmd.arg("send").arg("blob").arg("data/tx1_blob.ron"), - NodeType::Indexer => cmd = cmd.env("HYLE_REST", "127.0.0.1:5544"), - _ => (), - } +impl TestProcess { + pub fn new(command: &str, mut conf: Conf) -> Self { + let mut cargo_bin = Command::cargo_bin(command).unwrap(); + + // Create a temporary directory for the node + let tmpdir = tempfile::Builder::new().prefix("hyle").tempdir().unwrap(); + let cmd = cargo_bin.current_dir(&tmpdir); - // When spinning up multiple node, they need to use different ports for tracing + conf.data_directory = tmpdir.path().to_path_buf(); + // Serialize the configuration to a file + let conf_file = tmpdir.path().join("config.ron"); + ron::ser::to_writer(std::fs::File::create(&conf_file).unwrap(), &conf).unwrap(); + + let console_port: u16 = conf.da_address.split(':').last().unwrap().parse().unwrap(); cmd.env( "TOKIO_CONSOLE_BIND", - format!("127.0.0.1:{}", console_bind_port), + format!("127.0.0.1:{}", console_port + 10000), ); - let child = cmd.spawn().expect("Failed to start node"); - TestNode { child } + Self { + conf, + dir: tmpdir, + state: TestProcessState::Command(cargo_bin), + } + } + + pub fn log(mut self, level: &str) -> Self { + if let TestProcessState::Command(cmd) = &mut self.state { + cmd.env("RUST_LOG", level); + }; + self + } + + pub fn start(mut self) -> Self { + self.state = match &mut self.state { + TestProcessState::Command(cmd) => { + println!("Starting process: {:?}", cmd); + TestProcessState::Child(cmd.spawn().unwrap()) + } + TestProcessState::Child(child) => { + panic!("Process already started: {:?}", child.id()); + } + }; + self } } // Drop implem to be sure that process is well stopped -impl Drop for TestNode { +impl Drop for TestProcess { fn drop(&mut self) { - let _ = self.child.kill(); // Kill child process if still active - let _ = self.child.wait(); // Wait for end of process + match &mut self.state { + TestProcessState::Command(_) => (), + TestProcessState::Child(child) => { + child.kill().unwrap(); + child.wait().unwrap(); + } + } } } From d320bfabf59d25e965b4c07a64fa3e5e42e3f5ff Mon Sep 17 00:00:00 2001 From: Alex Boussinet Date: Wed, 16 Oct 2024 16:33:26 +0200 Subject: [PATCH 3/3] feat cut (#211) * avoid re-sending sync requests keep track of previous sent sync requests in order to avoid resending one. * improve data_proposal handling do not make empty data proposals ignore empty data proposals do not make a batch with already used Cars * rename DataProposal.inner to txs * storage refactoring * implement data dissemination cut * implement data dissemination cut * rename Cut to CutWithTxs. prepare Cut without txs * remove previous_sync_requests logic * reduce mempool log details --- src/consensus.rs | 42 ++-- src/mempool.rs | 197 +++++++++--------- src/mempool/storage.rs | 445 ++++++++++++++++++++++++----------------- src/model.rs | 2 +- 4 files changed, 378 insertions(+), 308 deletions(-) diff --git a/src/consensus.rs b/src/consensus.rs index 2765fab15..10a8bfec1 100644 --- a/src/consensus.rs +++ b/src/consensus.rs @@ -17,7 +17,7 @@ use tracing::{debug, info, warn}; use crate::{ bus::{bus_client, command_response::Query, BusMessage, SharedMessageBus}, handle_messages, - mempool::{Batch, BatchInfo, MempoolCommand, MempoolEvent, MempoolResponse}, + mempool::{CutWithTxs, Cut, MempoolCommand, MempoolEvent, MempoolResponse}, model::{ get_current_timestamp, Block, BlockHash, BlockHeight, Hashable, Transaction, TransactionData, ValidatorPublicKey, @@ -71,7 +71,7 @@ pub enum ConsensusCommand { pub enum ConsensusEvent { CommitBlock { validators: Vec, - batch_info: BatchInfo, + cut_lanes: Cut, block: Block, }, } @@ -117,7 +117,7 @@ pub struct ConsensusProposal { slot: Slot, view: u64, next_leader: u64, - batch_info: BatchInfo, + cut_lanes: Cut, previous_consensus_proposal_hash: ConsensusProposalHash, previous_commit_quorum_certificate: QuorumCertificate, block: Block, // FIXME: Block ou cut ? @@ -149,7 +149,7 @@ pub struct ConsensusStore { buffered_invalid_proposals: HashMap, // FIXME: pub is here for testing pub blocks: Vec, - pending_batches: Vec, + pending_cuts: Vec, } pub struct Consensus { @@ -201,17 +201,17 @@ impl Consensus { .collect(); // Create block to-be-proposed - let batch = if self.pending_batches.is_empty() { - Batch::default() + let cut = if self.pending_cuts.is_empty() { + CutWithTxs::default() } else { - self.pending_batches.remove(0) + self.pending_cuts.remove(0) }; let block = Block { parent_hash, height: parent_height + 1, timestamp: get_current_timestamp(), new_bonded_validators, - txs: batch.txs, + txs: cut.txs, }; let validators = self.bft_round_state.staking.bonded(); @@ -221,7 +221,7 @@ impl Consensus { slot: self.bft_round_state.slot, view: self.bft_round_state.view, next_leader: (self.bft_round_state.leader_index + 1) % validators.len() as u64, - batch_info: batch.info, + cut_lanes: cut.tips, previous_consensus_proposal_hash, previous_commit_quorum_certificate, validators, @@ -271,7 +271,7 @@ impl Consensus { slot: self.bft_round_state.slot, view: self.bft_round_state.view, next_leader: 1, - batch_info: BatchInfo::new(self.crypto.validator_pubkey().clone()), + cut_lanes: Cut::default(), previous_consensus_proposal_hash: ConsensusProposalHash(vec![]), previous_commit_quorum_certificate: QuorumCertificate::default(), validators, @@ -300,7 +300,7 @@ impl Consensus { .bus .send(ConsensusEvent::CommitBlock { validators: self.bft_round_state.consensus_proposal.validators.clone(), - batch_info: self.bft_round_state.consensus_proposal.batch_info.clone(), + cut_lanes: self.bft_round_state.consensus_proposal.cut_lanes.clone(), block: self.bft_round_state.consensus_proposal.block.clone(), }) .context("Failed to send ConsensusEvent::CommitBlock msg on the bus")?; @@ -1223,10 +1223,10 @@ impl Consensus { fn handle_command(&mut self, msg: ConsensusCommand) -> Result<()> { match msg { ConsensusCommand::SingleNodeBlockGeneration(block_number) => { - let batch = if self.pending_batches.is_empty() { - Batch::default() + let cut = if self.pending_cuts.is_empty() { + CutWithTxs::default() } else { - self.pending_batches.remove(0) + self.pending_cuts.remove(0) }; let parent_hash: String = rand::Rng::sample_iter(rand::thread_rng(), &rand::distributions::Alphanumeric) @@ -1238,13 +1238,13 @@ impl Consensus { height: BlockHeight(block_number), timestamp: get_current_timestamp(), new_bonded_validators: vec![], - txs: batch.txs, + txs: cut.txs, }; _ = self .bus .send(ConsensusEvent::CommitBlock { validators: self.bft_round_state.consensus_proposal.validators.clone(), - batch_info: batch.info, + cut_lanes: cut.tips, block: block.clone(), }) .context("Failed to send ConsensusEvent::CommitBlock msg on the bus")?; @@ -1270,13 +1270,9 @@ impl Consensus { async fn handle_mempool_event(&mut self, msg: MempoolEvent) -> Result<()> { match msg { - MempoolEvent::LatestBatch(batch) => { - debug!( - "Received batch from {} with txs: {:?} pos {} parent {:?}", - batch.info.validator, batch.txs, batch.info.tip.pos, batch.info.tip.parent, - ); - self.pending_batches.push(batch); - + MempoolEvent::NewCut(cut) => { + debug!("Received a new cut"); + self.pending_cuts.push(cut); Ok(()) } } diff --git a/src/mempool.rs b/src/mempool.rs index 95c7d3f2d..94553ca06 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -19,10 +19,12 @@ use bincode::{Decode, Encode}; use metrics::MempoolMetrics; use serde::{Deserialize, Serialize}; use std::{collections::HashSet, sync::Arc}; +use storage::ProposalVerdict; use tracing::{debug, error, info, warn}; mod metrics; mod storage; +pub use storage::{Cut, CutWithTxs}; bus_client! { struct MempoolBusClient { @@ -82,7 +84,7 @@ pub struct Batch { #[derive(Debug, Clone, Deserialize, Serialize)] pub enum MempoolEvent { - LatestBatch(Batch), + NewCut(CutWithTxs), } impl BusMessage for MempoolEvent {} @@ -142,11 +144,11 @@ impl Mempool { match event { ConsensusEvent::CommitBlock { validators, - batch_info, - block, + cut_lanes, + .. } => { self.validators = validators; - self.storage.update_lanes_after_commit(batch_info, block); + self.storage.update_lanes_after_commit(cut_lanes); } } } @@ -184,7 +186,7 @@ impl Mempool { self.metrics.signature_error("mempool"); warn!("Invalid signature for message {:?}", msg); } - Err(e) => error!("Error while checking signed message: {}", e), + Err(e) => error!("Error while checking signed message: {:?}", e), } } @@ -205,7 +207,7 @@ impl Mempool { validator: &ValidatorPublicKey, missing_cars: Vec, ) -> Result<()> { - info!("{} SyncReply from sender {validator}", self.storage.id); + info!("{} SyncReply from validator {validator}", self.storage.id); debug!( "{} adding {} missing cars to lane {validator}", @@ -232,7 +234,10 @@ impl Mempool { data_proposal: DataProposal, last_index: Option, ) { - info!("{} SyncRequest received from sender {validator} for last_index {:?} with data proposal {} \n{}", self.storage.id, last_index, data_proposal, self.storage); + info!( + "{} SyncRequest received from validator {validator} for last_index {:?}", + self.storage.id, last_index + ); let missing_cars = self.storage.get_missing_cars(last_index, &data_proposal); debug!("Missing cars on {} are {:?}", validator, missing_cars); @@ -248,14 +253,15 @@ impl Mempool { } async fn on_data_vote(&mut self, validator: &ValidatorPublicKey, data_proposal: DataProposal) { - debug!("Vote received from sender {}", validator); - match self.storage.add_data_vote(validator, &data_proposal) { - Some(_) => { - debug!("{} DataVote from {}", self.storage.id, validator) - } - None => { - error!("{} unexpected DataVote from {}", self.storage.id, validator) - } + debug!("Vote received from validator {}", validator); + if self + .storage + .new_data_vote(validator, &data_proposal) + .is_some() + { + debug!("{} DataVote from {}", self.storage.id, validator) + } else { + error!("{} unexpected DataVote from {}", self.storage.id, validator) } } @@ -264,99 +270,86 @@ impl Mempool { validator: &ValidatorPublicKey, data_proposal: DataProposal, ) -> Result<()> { - if self.storage.has_data_proposal(validator, &data_proposal) - || self.storage.append_data_proposal(validator, &data_proposal) - { - // Normal case, we receive a proposal we already have the parent in store - self.send_vote(validator, data_proposal).await - } else { - //We dont have the parent, so we push the data proposal in the waiting room and craft a sync demand - self.storage - .push_data_proposal_into_waiting_room(validator, data_proposal.clone()); - - let last_available_index = self.storage.get_last_data_index(validator); + if data_proposal.txs.is_empty() { + warn!( + "received empty data proposal from {}, ignoring...", + validator + ); + return Ok(()); + } + match self.storage.new_data_proposal(validator, &data_proposal)? { + ProposalVerdict::Vote => { + // Normal case, we receive a proposal we already have the parent in store + self.send_vote(validator, data_proposal).await + } + ProposalVerdict::Wait(last_index) => { + //We dont have the parent, so we craft a sync demand + debug!("Emitting sync request with local state {} last_available_index {:?} and data_proposal {}", self.storage, last_index, data_proposal); - debug!("Emitting sync request with local state {} last_available_index {:?} and data_proposal {}", self.storage, last_available_index, data_proposal); + self.send_sync_request(validator, data_proposal, last_index) + .await + } + } + } - self.send_sync_request(validator, data_proposal, last_available_index) - .await + async fn try_data_proposal(&mut self, poa: Option>) { + let pending_txs = self.storage.flush_pending_txs(); + if pending_txs.is_empty() { + return; + } + let tip_id = self.storage.add_data_to_local_lane(pending_txs.clone()); + let data_proposal = DataProposal { + txs: pending_txs, + pos: tip_id, + parent: if tip_id == 1 { None } else { Some(tip_id - 1) }, + parent_poa: poa, + }; + + if let Err(e) = self.broadcast_data_proposal(data_proposal).await { + error!("{:?}", e); } } async fn check_data_proposal(&mut self) { - match self.storage.tip_data() { - Some((tip, txs)) => { - let nb_validators = self.validators.len(); - if tip.votes.len() > nb_validators / 3 { - let requests = self.storage.flush_pending_txs(); - // Create tx chunk and broadcast it - let tip_id = self.storage.add_data_to_local_lane(requests.clone()); - - let data_proposal = DataProposal { - inner: requests, - pos: tip_id, - parent: if tip_id == 1 { None } else { Some(tip_id - 1) }, - parent_poa: Some(tip.votes.clone()), - }; - - if let Err(e) = self.broadcast_data_proposal(data_proposal).await { - error!("{:?}", e); - } - - let txs_len = txs.len(); - if let Err(e) = self - .bus - .send(MempoolEvent::LatestBatch(Batch { - info: BatchInfo { - validator: self.crypto.validator_pubkey().clone(), - tip, - }, - txs, - })) - .context("Cannot send message over channel") - { - error!("{:?}", e); - } else { - self.metrics.add_batch(); - self.metrics.snapshot_batched_tx(txs_len); - } - } else { - // No PoA means we rebroadcast the data proposal for non present voters - let only_for = HashSet::from_iter( - self.validators - .iter() - .filter(|pubkey| !tip.votes.contains(pubkey)) - .cloned(), - ); - if let Err(e) = self.broadcast_data_proposal_only_for( - only_for, - DataProposal { - inner: txs, - pos: tip.pos, - parent: tip.parent, - parent_poa: None, // TODO: fetch parent votes - }, - ) { - error!("{:?}", e); - } - } + if self.storage.tip_already_used() { + return; + } + if let Some(cut) = self.storage.try_a_new_cut(self.validators.len()) { + let poa = self.storage.tip_poa(); + self.try_data_proposal(poa).await; + let total_txs = cut.txs.len(); + if let Err(e) = self + .bus + .send(MempoolEvent::NewCut(cut)) + .context("Cannot send message over channel") + { + error!("{:?}", e); + } else { + self.metrics.add_batch(); + self.metrics.snapshot_batched_tx(total_txs); } - None => { - // Genesis create a mono tx chunk and broadcast it - let pending_txs = self.storage.flush_pending_txs(); - let tip_id = self.storage.add_data_to_local_lane(pending_txs.clone()); - - let data_proposal = DataProposal { - inner: pending_txs, - pos: tip_id, - parent: None, - parent_poa: None, - }; - - if let Err(e) = self.broadcast_data_proposal(data_proposal).await { - error!("{:?}", e) - } + } else if let Some((tip, txs)) = self.storage.tip_data() { + // No PoA means we rebroadcast the data proposal for non present voters + let only_for = HashSet::from_iter( + self.validators + .iter() + .filter(|pubkey| !tip.poa.contains(pubkey)) + .cloned(), + ); + if let Err(e) = self.broadcast_data_proposal_only_for( + only_for, + DataProposal { + txs, + pos: tip.pos, + parent: tip.parent, + parent_poa: None, // TODO: fetch parent votes + }, + ) { + error!("{:?}", e); } + } else { + // Genesis create a mono tx chunk and broadcast it + self.try_data_proposal(None).await; } } @@ -380,6 +373,9 @@ impl Mempool { } async fn broadcast_data_proposal(&mut self, data_proposal: DataProposal) -> Result<()> { + if self.validators.is_empty() { + return Ok(()); + } self.metrics .add_broadcasted_data_proposal("blob".to_string()); _ = self @@ -446,6 +442,7 @@ impl Mempool { validator: &ValidatorPublicKey, cars: Vec, ) -> Result<()> { + // cleanup previously tracked sent sync request self.metrics.add_sent_sync_reply("blob".to_string()); _ = self .bus diff --git a/src/mempool/storage.rs b/src/mempool/storage.rs index 88a1c3f55..9362fb34c 100644 --- a/src/mempool/storage.rs +++ b/src/mempool/storage.rs @@ -1,23 +1,28 @@ +use anyhow::{bail, Result}; use bincode::{Decode, Encode}; use serde::{Deserialize, Serialize}; use std::{ - collections::{HashMap, HashSet}, + cmp::Ordering, + collections::{BTreeMap, HashMap, HashSet}, fmt::Display, hash::Hash, vec, }; use tracing::{debug, error, warn}; -use crate::{ - mempool::BatchInfo, - model::{Block, Transaction, ValidatorPublicKey}, -}; +use crate::model::{Transaction, ValidatorPublicKey}; #[derive(Debug, Clone, Encode, Decode, Default, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct TipData { pub pos: usize, pub parent: Option, - pub votes: Vec, + pub poa: Vec, +} + +#[derive(Debug, Clone)] +pub enum ProposalVerdict { + Wait(Option), + Vote, } #[derive(Debug, Clone)] @@ -40,6 +45,44 @@ impl Display for InMemoryStorage { } } +pub type Cut = BTreeMap>; + +#[derive(Debug, Default, Clone, Deserialize, Serialize, Encode, Decode)] +pub struct CutWithTxs { + pub tips: Cut, + pub txs: Vec, +} + +impl CutWithTxs { + fn extend_from_lane( + &mut self, + validator: &ValidatorPublicKey, + lane: &mut Lane, + txs: &mut HashSet, + ) { + self.tips.insert( + validator.clone(), + lane.cars.last().and_then(|car| { + if car.used_in_cut { + None + } else { + txs.extend(car.txs.clone()); + Some(CutCar { + id: car.id, + parent: car.parent, + }) + } + }), + ); + } +} + +#[derive(Debug, Default, Clone, Deserialize, Serialize, Encode, Decode, PartialEq, Eq, Hash)] +pub struct CutCar { + pub id: usize, + pub parent: Option, +} + impl InMemoryStorage { pub fn new(id: ValidatorPublicKey) -> InMemoryStorage { InMemoryStorage { @@ -53,24 +96,29 @@ impl InMemoryStorage { } } - fn collect_lane( - lane: &mut Lane, - validator: &ValidatorPublicKey, - batch_info: &BatchInfo, - txs: &Vec, - ) { - if validator == &batch_info.validator { - if let Some(i) = lane.cars.iter().position(|c| c.id == batch_info.tip.pos) { - // anything prior to last_pos can be collected - lane.cars.drain(0..i); + pub fn tip_poa(&self) -> Option> { + self.lane + .current() + .map(|car| car.poa.clone().drain().collect()) + } + + pub fn try_a_new_cut(&mut self, nb_validators: usize) -> Option { + if let Some(car) = self.lane.current() { + if car.poa.len() > nb_validators / 3 { + let mut txs = HashSet::new(); + let mut cut = CutWithTxs { + txs: Vec::new(), + tips: BTreeMap::new(), + }; + cut.extend_from_lane(&self.id, &mut self.lane, &mut txs); + for (validator, lane) in self.other_lanes.iter_mut() { + cut.extend_from_lane(validator, lane, &mut txs); + } + cut.txs.extend(txs); + return Some(cut); } - } else if let Some(i) = lane - .cars - .iter() - .position(|c| c.id == batch_info.tip.pos && &c.txs == txs) - { - lane.cars.drain(0..i); } + None } // Called after receiving a transaction, before broadcasting a dataproposal @@ -83,100 +131,122 @@ impl InMemoryStorage { id: current_id, parent: tip_id, txs, - votes: HashSet::from([self.id.clone()]), + poa: HashSet::from([self.id.clone()]), + used_in_cut: false, }); current_id } - // Called by the initial proposal sender to aggregate votes - pub fn add_data_vote( + // Called by the initial proposal validator to aggregate votes + pub fn new_data_vote( &mut self, - sender: &ValidatorPublicKey, + validator: &ValidatorPublicKey, data_proposal: &DataProposal, - ) -> Option> { + ) -> Option<()> { let car = self .lane .cars .iter_mut() - .find(|c| c.id == data_proposal.pos && c.txs == data_proposal.inner); + .find(|c| c.id == data_proposal.pos && c.txs == data_proposal.txs); match car { None => { warn!( - "Vote for Car that does not exist! ({sender}) data_proposal: {} / lane: {}", - data_proposal, self.lane + "Vote for Car that does not exist! ({validator}) / lane: {}", + self.lane ); None } - Some(v) => { - v.votes.insert(sender.clone()); - Some(v.votes.clone()) + Some(c) => { + if c.poa.contains(validator) { + warn!("{} already voted for data proposal", validator); + None + } else { + c.poa.insert(validator.clone()); + Some(()) + } } } } - // Called when a data proposal is received try to bind it to the previous tip (true) or fails (false) - pub fn append_data_proposal( + pub fn new_data_proposal( &mut self, - sender: &ValidatorPublicKey, + validator: &ValidatorPublicKey, data_proposal: &DataProposal, - ) -> bool { - let lane = self.other_lanes.entry(sender.clone()).or_default(); - let tip = lane.current_mut(); + ) -> Result { + if !self.has_parent_data_proposal(validator, data_proposal) { + self.push_data_proposal_into_waiting_room(validator, data_proposal.clone()); + return Ok(ProposalVerdict::Wait(data_proposal.parent)); + } + if let (Some(parent), Some(parent_poa)) = (data_proposal.parent, &data_proposal.parent_poa) + { + self.update_parent_poa(validator, parent, parent_poa) + } + if self.has_data_proposal(validator, data_proposal) { + bail!("we already have voted for {}'s data_proposal", validator); + } + self.add_data_proposal(validator, data_proposal); + Ok(ProposalVerdict::Vote) + } - let mut tip_id: Option = None; + fn add_data_proposal(&mut self, validator: &ValidatorPublicKey, data_proposal: &DataProposal) { + let lane = self.other_lanes.entry(validator.clone()).or_default(); + let tip_id = lane.current().map(|car| car.id); + lane.cars.push(Car { + id: tip_id.unwrap_or(0) + 1, + parent: tip_id, + txs: data_proposal.txs.clone(), + poa: HashSet::from([self.id.clone(), validator.clone()]), + used_in_cut: false, + }); + } - // merge parent votes - if let Some(p) = tip { - tip_id = Some(p.id); - if let Some(v) = data_proposal.parent_poa.as_ref() { - p.votes.reserve(v.len()); - for v in v.iter() { - p.votes.insert(v.clone()); - } - } + fn has_parent_data_proposal( + &mut self, + validator: &ValidatorPublicKey, + data_proposal: &DataProposal, + ) -> bool { + let lane = self.other_lanes.entry(validator.clone()).or_default(); + let tip = lane.current_mut(); + if let Some(parent) = data_proposal.parent { + tip.map(|car| car.id == parent).unwrap_or_default() + } else { + true } + } - if data_proposal.parent == tip_id { - lane.cars.push(Car { - id: tip_id.unwrap_or(0) + 1, - parent: tip_id, - txs: data_proposal.inner.clone(), - votes: HashSet::from([self.id.clone(), sender.clone()]), - }); - true - } else { - false + fn update_parent_poa( + &mut self, + validator: &ValidatorPublicKey, + parent: usize, + parent_poa: &[ValidatorPublicKey], + ) { + let lane = self.other_lanes.entry(validator.clone()).or_default(); + if let Some(tip) = lane.current_mut() { + if tip.id == parent { + tip.poa.extend(parent_poa.iter().cloned()); + } } } - pub fn has_data_proposal( + fn has_data_proposal( &mut self, - sender: &ValidatorPublicKey, + validator: &ValidatorPublicKey, data_proposal: &DataProposal, ) -> bool { - let found = self - .other_lanes - .entry(sender.clone()) + self.other_lanes + .entry(validator.clone()) .or_default() .cars .iter() - .any(|c| c.id == data_proposal.pos && c.txs == data_proposal.inner); - - if !found { - debug!( - "Data proposal {} not found in lane {}", - data_proposal, self.lane - ); - } - - found + .any(|c| c.id == data_proposal.pos && c.txs == data_proposal.txs) } - pub fn get_last_data_index(&self, sender: &ValidatorPublicKey) -> Option { + #[cfg(test)] + fn get_last_data_index(&self, validator: &ValidatorPublicKey) -> Option { self.other_lanes - .get(sender) + .get(validator) .and_then(|lane| lane.current()) .map(|c| c.id) } @@ -190,17 +260,17 @@ impl InMemoryStorage { .lane .cars .iter() - .find(|c| c.id == data_proposal.pos && c.txs == data_proposal.inner); + .find(|c| c.id == data_proposal.pos && c.txs == data_proposal.txs); match car { None => { error!( - "data proposal does exist locally as a car! data_proposal: {} / lane: {}", - data_proposal, self.lane + "data proposal does exist locally as a car! lane: {}", + self.lane ); None } - Some(v) => { + Some(c) => { //Normally last_index must be < current_car since we are on the lane reference (that must have more data than others) match last_index { // Nothing on the lane, we send everything, up to the data proposal id/pos @@ -209,22 +279,22 @@ impl InMemoryStorage { .cars .clone() .into_iter() - .take_while(|c| c.id != v.id) + .take_while(|car| car.id != c.id) .collect(), ), // If there is an index, two cases // - it matches the current tip, in this case we don't send any more data // - it does not match, we send the diff Some(last_index_usize) => { - if last_index_usize == v.id { + if last_index_usize == c.id { None } else { debug!( "Trying to compute diff between {} and last_index {}", - v, last_index_usize + c, last_index_usize ); let mut missing_cars: Vec = vec![]; - let mut current_car = v; + let mut current_car = c; loop { current_car = self.lane.cars.get(current_car.parent.unwrap() - 1).unwrap(); @@ -246,16 +316,16 @@ impl InMemoryStorage { } } - pub fn get_waiting_proposals(&mut self, sender: &ValidatorPublicKey) -> Vec { - match self.other_lanes.get_mut(sender) { + pub fn get_waiting_proposals(&mut self, validator: &ValidatorPublicKey) -> Vec { + match self.other_lanes.get_mut(validator) { Some(sl) => sl.waiting.drain().collect(), None => vec![], } } - // Updates local view other lane matching the sender with sent cars - pub fn add_missing_cars(&mut self, sender: &ValidatorPublicKey, cars: Vec) { - let lane = self.other_lanes.entry(sender.clone()).or_default(); + // Updates local view other lane matching the validator with sent cars + pub fn add_missing_cars(&mut self, validator: &ValidatorPublicKey, cars: Vec) { + let lane = self.other_lanes.entry(validator.clone()).or_default(); let mut ordered_cars = cars; ordered_cars.sort_by_key(|car| car.id); @@ -275,11 +345,11 @@ impl InMemoryStorage { // Called when validate return pub fn push_data_proposal_into_waiting_room( &mut self, - sender: &ValidatorPublicKey, + validator: &ValidatorPublicKey, data_proposal: DataProposal, ) { self.other_lanes - .entry(sender.clone()) + .entry(validator.clone()) .or_default() .waiting .insert(data_proposal); @@ -296,21 +366,45 @@ impl InMemoryStorage { TipData { pos: car.id, parent: car.parent, - votes: car.votes.clone().into_iter().collect(), + poa: car.poa.clone().into_iter().collect(), }, car.txs.clone(), ) }) } + pub fn tip_already_used(&self) -> bool { + self.lane + .current() + .map(|car| car.used_in_cut) + .unwrap_or_default() + } + pub fn flush_pending_txs(&mut self) -> Vec { self.pending_txs.drain(0..).collect() } - pub fn update_lanes_after_commit(&mut self, batch_info: BatchInfo, block: Block) { - Self::collect_lane(&mut self.lane, &self.id, &batch_info, &block.txs); - for (v, lane) in self.other_lanes.iter_mut() { - Self::collect_lane(lane, v, &batch_info, &block.txs); + fn collect_old_used_cars(cars: &mut Vec, some_tip: &Option) { + if let Some(tip) = some_tip { + cars.retain_mut(|car| match car.id.cmp(&tip.id) { + Ordering::Less => false, + Ordering::Equal => { + car.used_in_cut = true; + true + } + Ordering::Greater => true, + }); + } + } + + pub fn update_lanes_after_commit(&mut self, lanes: Cut) { + if let Some(tip) = lanes.get(&self.id) { + Self::collect_old_used_cars(&mut self.lane.cars, tip); + } + for (validator, lane) in self.other_lanes.iter_mut() { + if let Some(tip) = lanes.get(validator) { + Self::collect_old_used_cars(&mut lane.cars, tip); + } } } } @@ -320,17 +414,19 @@ pub struct Car { id: usize, parent: Option, txs: Vec, - pub votes: HashSet, + pub poa: HashSet, + used_in_cut: bool, } impl Display for Car { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "[{}/{:?}/{}v]", + "[{}/{:?}/{}v] (used: {})", self.id, self.txs.first(), - self.votes.len() + self.poa.len(), + self.used_in_cut, ) } } @@ -368,7 +464,7 @@ impl Lane { } #[cfg(test)] - pub fn size(&self) -> usize { + fn size(&self) -> usize { self.cars.len() } } @@ -378,7 +474,7 @@ pub struct DataProposal { pub pos: usize, pub parent: Option, pub parent_poa: Option>, - pub inner: Vec, + pub txs: Vec, } impl Display for DataProposal { @@ -388,7 +484,7 @@ impl Display for DataProposal { "{{ {:?} <- [{}/{:?}] }}", self.parent, self.pos, - self.inner.first() + self.txs.first() ) } } @@ -398,18 +494,13 @@ mod tests { use std::collections::HashSet; use crate::{ - mempool::{ - storage::{Car, DataProposal, InMemoryStorage}, - BatchInfo, - }, + mempool::storage::{Car, DataProposal, InMemoryStorage}, model::{ - Blob, BlobData, BlobTransaction, Block, BlockHash, BlockHeight, ContractName, Identity, - Transaction, TransactionData, ValidatorPublicKey, + Blob, BlobData, BlobTransaction, ContractName, Identity, Transaction, TransactionData, + ValidatorPublicKey, }, }; - use super::TipData; - fn make_tx(inner_tx: &'static str) -> Transaction { Transaction { version: 1, @@ -429,42 +520,42 @@ mod tests { let pubkey3 = ValidatorPublicKey(vec![3]); let mut store = InMemoryStorage::new(pubkey3.clone()); - store.append_data_proposal( + store.add_data_proposal( &pubkey2, &DataProposal { pos: 1, parent: None, - inner: vec![make_tx("test1")], + txs: vec![make_tx("test1")], parent_poa: None, }, ); - store.append_data_proposal( + store.add_data_proposal( &pubkey2, &DataProposal { pos: 2, parent: Some(1), - inner: vec![make_tx("test2")], + txs: vec![make_tx("test2")], parent_poa: None, }, ); - store.append_data_proposal( + store.add_data_proposal( &pubkey2, &DataProposal { pos: 3, parent: Some(2), - inner: vec![make_tx("test3")], + txs: vec![make_tx("test3")], parent_poa: None, }, ); - store.append_data_proposal( + store.add_data_proposal( &pubkey2, &DataProposal { pos: 4, parent: Some(3), - inner: vec![make_tx("test4")], + txs: vec![make_tx("test4")], parent_poa: None, }, ); @@ -484,7 +575,8 @@ mod tests { id: 1, parent: None, txs: vec![make_tx("test1")], - votes: HashSet::from([pubkey3.clone(), pubkey2.clone()]), + poa: HashSet::from([pubkey3.clone(), pubkey2.clone()]), + used_in_cut: false, } ); @@ -493,7 +585,7 @@ mod tests { &DataProposal { pos: 4, parent: Some(3), - inner: vec![make_tx("test4")], + txs: vec![make_tx("test4")], parent_poa: None, }, ); @@ -517,35 +609,35 @@ mod tests { store.add_data_to_local_lane(txs.clone()); let data_proposal = DataProposal { - inner: txs, + txs, pos: 1, parent: None, parent_poa: None, }; - store.append_data_proposal(&pubkey2, &data_proposal); + store.add_data_proposal(&pubkey2, &data_proposal); assert!(store.has_data_proposal(&pubkey2, &data_proposal)); assert_eq!(store.get_last_data_index(&pubkey2), Some(1)); - store.append_data_proposal(&pubkey1, &data_proposal); + store.add_data_proposal(&pubkey1, &data_proposal); assert!(store.has_data_proposal(&pubkey1, &data_proposal)); assert_eq!(store.get_last_data_index(&pubkey1), Some(1)); let some_tip = store.tip_data(); assert!(some_tip.is_some()); let (tip, txs) = some_tip.unwrap(); - assert_eq!(tip.votes.len(), 1); + assert_eq!(tip.poa.len(), 1); assert_eq!(txs.len(), 4); - store.add_data_vote(&pubkey2, &data_proposal); - store.add_data_vote(&pubkey1, &data_proposal); + store.new_data_vote(&pubkey2, &data_proposal); + store.new_data_vote(&pubkey1, &data_proposal); let some_tip = store.tip_data(); assert!(some_tip.is_some()); let tip = some_tip.unwrap().0; - assert_eq!(tip.votes.len(), 3); - assert!(&tip.votes.contains(&pubkey3)); - assert!(&tip.votes.contains(&pubkey1)); - assert!(&tip.votes.contains(&pubkey2)); + assert_eq!(tip.poa.len(), 3); + assert!(&tip.poa.contains(&pubkey3)); + assert!(&tip.poa.contains(&pubkey1)); + assert!(&tip.poa.contains(&pubkey2)); } #[test] @@ -555,75 +647,53 @@ mod tests { let mut store = InMemoryStorage::new(pubkey3.clone()); let data_proposal1 = DataProposal { - inner: vec![make_tx("test1"), make_tx("test2"), make_tx("test3")], + txs: vec![make_tx("test1"), make_tx("test2"), make_tx("test3")], pos: 1, parent: None, parent_poa: None, }; let data_proposal2 = DataProposal { - inner: vec![make_tx("test4"), make_tx("test5"), make_tx("test6")], - pos: 2, - parent: Some(1), + txs: vec![make_tx("test4"), make_tx("test5"), make_tx("test6")], + pos: 1, + parent: None, parent_poa: Some(vec![pubkey3.clone(), pubkey2.clone()]), }; let data_proposal3 = DataProposal { - inner: vec![make_tx("test7"), make_tx("test8"), make_tx("test9")], - pos: 3, - parent: Some(2), - parent_poa: Some(vec![pubkey3.clone(), pubkey2.clone()]), - }; - - let data_proposal4 = DataProposal { - inner: vec![make_tx("testA"), make_tx("testB"), make_tx("testC")], - pos: 4, - parent: Some(3), + txs: vec![make_tx("test7"), make_tx("test8"), make_tx("test9")], + pos: 2, + parent: Some(1), parent_poa: Some(vec![pubkey3.clone(), pubkey2.clone()]), }; - store.add_data_to_local_lane(data_proposal1.inner.clone()); - store.add_data_to_local_lane(data_proposal2.inner.clone()); - store.add_data_to_local_lane(data_proposal3.inner.clone()); - store.add_data_to_local_lane(data_proposal4.inner.clone()); - - store.append_data_proposal(&pubkey2, &data_proposal1); - store.append_data_proposal(&pubkey2, &data_proposal2); - store.append_data_proposal(&pubkey2, &data_proposal3); - store.append_data_proposal(&pubkey2, &data_proposal4); + store.add_data_to_local_lane(data_proposal1.txs.clone()); + store.new_data_vote(&pubkey2, &data_proposal1); + store + .new_data_proposal(&pubkey2, &data_proposal2) + .expect("add proposal 2"); + store + .new_data_proposal(&pubkey2, &data_proposal3) + .expect("add proposal 3"); - let batch_info = BatchInfo { - tip: TipData { - pos: 4, - parent: Some(3), - votes: vec![pubkey3.clone(), pubkey2.clone()], - }, - validator: pubkey3.clone(), - }; - let block = Block { - parent_hash: BlockHash { - inner: vec![4, 5, 6], - }, - height: BlockHeight(42), - timestamp: 1234, - txs: vec![make_tx("testA"), make_tx("testB"), make_tx("testC")], - new_bonded_validators: vec![pubkey3.clone(), pubkey2.clone()], - }; + assert_eq!(store.lane.size(), 1); + assert_eq!(store.other_lanes.get(&pubkey2).map(|l| l.size()), Some(2)); - assert_eq!(store.lane.size(), 4); - assert_eq!(store.other_lanes.get(&pubkey2).map(|l| l.size()), Some(4)); + let cut = store.try_a_new_cut(2); + assert!(cut.is_some()); + assert_eq!(cut.as_ref().map(|cut| cut.txs.len()), Some(6)); - store.update_lanes_after_commit(batch_info, block); + store.update_lanes_after_commit(cut.unwrap().tips); assert_eq!(store.lane.size(), 1); assert_eq!(store.other_lanes.get(&pubkey2).map(|l| l.size()), Some(1)); - assert_eq!(store.lane.current().map(|c| c.id), Some(4)); + assert_eq!(store.lane.current().map(|c| c.id), Some(1)); assert_eq!( store .other_lanes .get(&pubkey2) .and_then(|l| l.current().map(|c| c.id)), - Some(4) + Some(2) ); } @@ -646,13 +716,15 @@ mod tests { make_tx("test3"), make_tx("test4"), ], - votes: HashSet::from([pubkey1.clone(), pubkey2.clone()]), + poa: HashSet::from([pubkey1.clone(), pubkey2.clone()]), + used_in_cut: false, }, Car { id: 2, parent: Some(1), txs: vec![make_tx("test5"), make_tx("test6"), make_tx("test7")], - votes: HashSet::from([pubkey1.clone(), pubkey2.clone()]), + poa: HashSet::from([pubkey1.clone(), pubkey2.clone()]), + used_in_cut: false, }, ], ); @@ -675,7 +747,7 @@ mod tests { &DataProposal { pos: 4, parent: Some(3), - inner: vec![make_tx("test_local4")], + txs: vec![make_tx("test_local4")], parent_poa: None, }, ); @@ -687,13 +759,15 @@ mod tests { id: 2, parent: Some(1), txs: vec![make_tx("test_local2")], - votes: HashSet::from_iter(vec![pubkey3.clone()]) + poa: HashSet::from_iter(vec![pubkey3.clone()]), + used_in_cut: false, }, Car { id: 3, parent: Some(2), txs: vec![make_tx("test_local3")], - votes: HashSet::from_iter(vec![pubkey3.clone()]) + poa: HashSet::from_iter(vec![pubkey3.clone()]), + used_in_cut: false, } ]) ); @@ -703,7 +777,7 @@ mod tests { &DataProposal { pos: 4, parent: Some(3), - inner: vec![make_tx("test_local4")], + txs: vec![make_tx("test_local4")], parent_poa: None, }, ); @@ -715,19 +789,22 @@ mod tests { id: 1, parent: None, txs: vec![make_tx("test_local")], - votes: HashSet::from_iter(vec![pubkey3.clone()]) + poa: HashSet::from_iter(vec![pubkey3.clone()]), + used_in_cut: false, }, Car { id: 2, parent: Some(1), txs: vec![make_tx("test_local2")], - votes: HashSet::from_iter(vec![pubkey3.clone()]) + poa: HashSet::from_iter(vec![pubkey3.clone()]), + used_in_cut: false, }, Car { id: 3, parent: Some(2), txs: vec![make_tx("test_local3")], - votes: HashSet::from_iter(vec![pubkey3.clone()]) + poa: HashSet::from_iter(vec![pubkey3.clone()]), + used_in_cut: false, } ]) ); diff --git a/src/model.rs b/src/model.rs index 3162ef248..c522f605e 100644 --- a/src/model.rs +++ b/src/model.rs @@ -436,7 +436,7 @@ impl Add for BlockHeight { } } -#[derive(Clone, Encode, Decode, Default, Eq, PartialEq, Hash)] +#[derive(Clone, Encode, Decode, Default, Eq, PartialEq, Hash, PartialOrd, Ord)] pub struct ValidatorPublicKey(pub Vec); impl std::fmt::Debug for ValidatorPublicKey {