diff --git a/base_layer/core/src/base_node/proto/rpc.proto b/base_layer/core/src/base_node/proto/rpc.proto index bbc6aea8b31..2e7d08443f2 100644 --- a/base_layer/core/src/base_node/proto/rpc.proto +++ b/base_layer/core/src/base_node/proto/rpc.proto @@ -57,16 +57,22 @@ message SyncKernelsRequest { } message SyncUtxosRequest { + // Start header hash to sync UTXOs from bytes start_header_hash = 1; + // End header hash to sync UTXOs to bytes end_header_hash = 2; -} -message SyncUtxosResponse { - tari.types.TransactionOutput output = 1; - bytes mined_header = 2; + // Indicate if spent UTXOs should be included, typically not for the initial sync + bool include_spent_txos = 3; } -message PrunedOutput { - bytes hash = 1; +message SyncUtxosResponse { + oneof txo { + // The unspent transaction output + tari.types.TransactionOutput output = 1; + // If the TXO is spent, the commitment bytes are returned + bytes commitment = 2; + } + bytes mined_header = 3; } message SyncUtxosByBlockRequest { diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs index 4f1a40ff891..6aff7e45102 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/error.rs @@ -30,6 +30,7 @@ use tari_comms::{ }; use tari_crypto::errors::RangeProofError; use tari_mmr::{error::MerkleMountainRangeError, sparse_merkle_tree::SMTError}; +use tari_utilities::ByteArrayError; use thiserror::Error; use tokio::task; @@ -97,6 +98,14 @@ pub enum HorizonSyncError { PeerNotFound, #[error("Sparse Merkle Tree error: {0}")] SMTError(#[from] SMTError), + #[error("ByteArrayError error: {0}")] + ByteArrayError(String), +} + +impl From<ByteArrayError> for HorizonSyncError { + fn from(e: ByteArrayError) -> Self { + HorizonSyncError::ByteArrayError(e.to_string()) + } } impl From for HorizonSyncError { @@ -142,7 +151,8 @@ impl HorizonSyncError { err @ HorizonSyncError::ConversionError(_)
| err @ HorizonSyncError::MerkleMountainRangeError(_) | err @ HorizonSyncError::FixedHashSizeError(_) | - err @ HorizonSyncError::TransactionError(_) => Some(BanReason { + err @ HorizonSyncError::TransactionError(_) | + err @ HorizonSyncError::ByteArrayError(_) => Some(BanReason { reason: format!("{}", err), ban_duration: BanPeriod::Long, }), diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 3a4a528841e..1340d4e09f1 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -50,7 +50,7 @@ use crate::{ chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree}, common::{rolling_avg::RollingAverageTime, BanPeriod}, consensus::ConsensusManager, - proto::base_node::{SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse}, + proto::base_node::{sync_utxos_response::Txo, SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse}, transactions::transaction_components::{ transaction_output::batch_verify_range_proofs, TransactionKernel, @@ -129,7 +129,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { target: LOG_TARGET, "Preparing database for horizon sync to height #{}", self.horizon_sync_height ); - let header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| { + let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| { ChainStorageError::ValueNotFound { entity: "Header", field: "height", @@ -139,7 +139,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let mut latency_increases_counter = 0; loop { - match self.sync(&header).await { + match self.sync(&to_header).await { Ok(()) => return Ok(()), Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => { // If we don't have many sync peers to select from, return the 
listening state and see if we can get @@ -167,7 +167,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } } - async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> { + async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>(); info!( target: LOG_TARGET, @@ -176,7 +176,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { ); let mut latency_counter = 0usize; for node_id in sync_peer_node_ids { - match self.connect_and_attempt_sync(&node_id, header).await { + match self.connect_and_attempt_sync(&node_id, to_header).await { Ok(_) => return Ok(()), // Try another peer Err(err) => { @@ -213,7 +213,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { async fn connect_and_attempt_sync( &mut self, node_id: &NodeId, - header: &BlockHeader, + to_header: &BlockHeader, ) -> Result<(), HorizonSyncError> { let peer_index = self .get_sync_peer_index(node_id) @@ -250,7 +250,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency); let sync_peer = self.sync_peers[peer_index].clone(); - self.begin_sync(sync_peer.clone(), &mut client, header).await?; + self.begin_sync(sync_peer.clone(), &mut client, to_header).await?; self.finalize_horizon_sync(&sync_peer).await?; Ok(()) @@ -328,7 +328,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { "Requesting kernels from {} to {} ({} remaining)", local_num_kernels, remote_num_kernels, - remote_num_kernels - local_num_kernels, + remote_num_kernels.saturating_sub(local_num_kernels), ); let latency = client.get_last_request_latency(); @@ -374,7 +374,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { } txn.insert_kernel_via_horizon_sync(kernel,
*current_header.hash(), mmr_position); - if mmr_position == current_header.header().kernel_mmr_size - 1 { + if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) { let num_kernels = kernel_hashes.len(); debug!( target: LOG_TARGET, @@ -425,9 +425,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { num_kernels, mmr_position + 1, end, - end - (mmr_position + 1) + end.saturating_sub(mmr_position + 1) ); - if mmr_position < end - 1 { + if mmr_position < end.saturating_sub(1) { current_header = db.fetch_chain_header(current_header.height() + 1).await?; } } @@ -483,42 +483,39 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let remote_num_outputs = to_header.output_smt_size; self.num_outputs = remote_num_outputs; + let db = self.db().clone(); + + let tip_header = db.fetch_tip_header().await?; + let local_num_outputs = tip_header.header().output_smt_size; let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs { - current: 0, + current: local_num_outputs, total: self.num_outputs, sync_peer: sync_peer.clone(), }); self.hooks.call_on_progress_horizon_hooks(info); - debug!( - target: LOG_TARGET, - "Requesting outputs from {}", - remote_num_outputs, - ); - let db = self.db().clone(); - - let end = remote_num_outputs; - let end_hash = to_header.hash(); - let start_hash = db.fetch_chain_header(1).await?; - let gen_block = db.fetch_chain_header(0).await?; - let latency = client.get_last_request_latency(); debug!( target: LOG_TARGET, - "Initiating output sync with peer `{}` (latency = {}ms)", + "Initiating output sync with peer `{}`, requesting {} outputs, tip_header height `{}`, last_chain_header height `{}` (latency = {}ms)", sync_peer.node_id(), - latency.unwrap_or_default().as_millis() + remote_num_outputs.saturating_sub(local_num_outputs), + tip_header.height(), + db.fetch_last_chain_header().await?.height(), + latency.unwrap_or_default().as_millis(), ); 
+ let start_chain_header = db.fetch_chain_header(tip_header.height() + 1).await?; + let initial_sync = tip_header.header().height == 0; let req = SyncUtxosRequest { - start_header_hash: start_hash.hash().to_vec(), - end_header_hash: end_hash.to_vec(), + start_header_hash: start_chain_header.hash().to_vec(), + end_header_hash: to_header.hash().to_vec(), + include_spent_txos: !initial_sync, }; - let mut output_stream = client.sync_utxos(req).await?; let mut txn = db.write_transaction(); - let mut utxo_counter = gen_block.header().output_smt_size; + let mut utxo_counter = local_num_outputs; let timer = Instant::now(); let mut output_smt = db.fetch_tip_smt().await?; let mut last_sync_timer = Instant::now(); @@ -528,16 +525,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let latency = last_sync_timer.elapsed(); avg_latency.add_sample(latency); let res: SyncUtxosResponse = response?; - utxo_counter += 1; - if utxo_counter > end { - return Err(HorizonSyncError::IncorrectResponse( - "Peer sent too many outputs".to_string(), - )); - } - let output = res - .output - .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?; let output_header = FixedHash::try_from(res.mined_header) .map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?; let current_header = self @@ -548,28 +536,67 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into()) })?; - let constants = self.rules.consensus_constants(current_header.height).clone(); - let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?; - trace!( + let proto_output = res + .txo + .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?; + match proto_output { + Txo::Output(output) => { + utxo_counter += 1; + if initial_sync && utxo_counter 
> remote_num_outputs { + return Err(HorizonSyncError::IncorrectResponse( + "Peer sent too many outputs".to_string(), + )); + } + + let constants = self.rules.consensus_constants(current_header.height).clone(); + let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?; + debug!( target: LOG_TARGET, - "UTXO {} received from sync peer", + "UTXO {} received from sync peer ({} of {})", output.hash(), - ); - helpers::check_tari_script_byte_size(&output.script, constants.max_script_byte_size())?; - - batch_verify_range_proofs(&self.prover, &[&output])?; - let smt_key = NodeKey::try_from(output.commitment.as_bytes())?; - let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?; - output_smt.insert(smt_key, smt_node)?; - txn.insert_output_via_horizon_sync( - output, - current_header.hash(), - current_header.height, - current_header.timestamp.as_u64(), - ); + utxo_counter.saturating_sub(local_num_outputs), + if initial_sync { + remote_num_outputs.saturating_sub(local_num_outputs).to_string() + } else { + "?".to_string() + }, + ); + helpers::check_tari_script_byte_size(&output.script, constants.max_script_byte_size())?; + + batch_verify_range_proofs(&self.prover, &[&output])?; + let smt_key = NodeKey::try_from(output.commitment.as_bytes())?; + let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?; + output_smt.insert(smt_key, smt_node)?; + txn.insert_output_via_horizon_sync( + output, + current_header.hash(), + current_header.height, + current_header.timestamp.as_u64(), + ); - // we have checked the range proof, and we have checked that the linked to header exists. - txn.commit().await?; + // We have checked the range proof, and we have checked that the linked to header exists. 
+ txn.commit().await?; + }, + Txo::Commitment(commitment_bytes) => { + if initial_sync { + return Err(HorizonSyncError::IncorrectResponse( + "Peer sent deleted output hash during initial sync".to_string(), + )); + } + debug!( + target: LOG_TARGET, + "STXO hash {} received from sync peer ({} of ?)", + self.db().fetch_unspent_output_hash_by_commitment(Commitment::from_canonical_bytes( + commitment_bytes.as_slice(), + )?).await?.ok_or(HorizonSyncError::IncorrectResponse( + "Peer sent unknown commitment hash".into(), + ))?, + utxo_counter.saturating_sub(local_num_outputs), + ); + let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?; + output_smt.delete(&smt_key)?; + }, + } if utxo_counter % 100 == 0 { let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs { @@ -583,7 +610,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { sync_peer.add_sample(last_sync_timer.elapsed()); last_sync_timer = Instant::now(); } - if utxo_counter != end { + if initial_sync && utxo_counter != remote_num_outputs { return Err(HorizonSyncError::IncorrectResponse( "Peer did not send enough outputs".to_string(), )); @@ -591,7 +618,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { debug!( target: LOG_TARGET, "finished syncing UTXOs: {} downloaded in {:.2?}", - end, + remote_num_outputs.saturating_sub(local_num_outputs), timer.elapsed() ); let root = FixedHash::try_from(output_smt.hash().as_slice())?; @@ -693,7 +720,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { curr_header.height(), curr_header.header().kernel_mmr_size, prev_kernel_mmr, - curr_header.header().kernel_mmr_size - 1 + curr_header.header().kernel_mmr_size.saturating_sub(1) ); trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len()); diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index 
8b03e476a49..ddf4baaa244 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -28,7 +28,7 @@ use tari_comms::{ protocol::rpc::{Request, RpcStatus, RpcStatusResultExt}, utils, }; -use tari_utilities::hex::Hex; +use tari_utilities::{hex::Hex, ByteArray}; use tokio::{sync::mpsc, task}; #[cfg(feature = "metrics")] @@ -36,6 +36,7 @@ use crate::base_node::metrics; use crate::{ blocks::BlockHeader, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, + proto, proto::base_node::{SyncUtxosRequest, SyncUtxosResponse}, }; @@ -70,7 +71,7 @@ where B: BlockchainBackend + 'static .fetch_header_by_block_hash(start_hash) .await .rpc_status_internal_error(LOG_TARGET)? - .ok_or_else(|| RpcStatus::not_found("Start header hash is was not found"))?; + .ok_or_else(|| RpcStatus::not_found("Start header hash was not found"))?; let end_hash = msg .end_header_hash @@ -83,7 +84,7 @@ where B: BlockchainBackend + 'static .fetch_header_by_block_hash(end_hash) .await .rpc_status_internal_error(LOG_TARGET)? 
- .ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?; + .ok_or_else(|| RpcStatus::not_found("End header hash was not found"))?; if start_header.height > end_header.height { return Err(RpcStatus::bad_request(&format!( "Start header height({}) cannot be greater than the end header height({})", @@ -91,12 +92,17 @@ where B: BlockchainBackend + 'static ))); } + let include_spent_txos = msg.include_spent_txos; + task::spawn(async move { debug!( target: LOG_TARGET, "Starting UTXO stream for peer '{}'", self.peer_node_id ); - if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await { + if let Err(err) = self + .start_streaming(&mut tx, start_header, end_header, include_spent_txos) + .await + { debug!( target: LOG_TARGET, "UTXO stream errored for peer '{}': {}", self.peer_node_id, err @@ -120,61 +126,65 @@ where B: BlockchainBackend + 'static tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>, mut current_header: BlockHeader, end_header: BlockHeader, + include_spent_txos: bool, ) -> Result<(), RpcStatus> { debug!( target: LOG_TARGET, - "Starting stream task with current_header: {}, end_header: {},", + "Starting stream task with current_header: {}, end_header: {}, include spent TXOs '{}'", current_header.hash().to_hex(), end_header.hash().to_hex(), + include_spent_txos, ); loop { let timer = Instant::now(); let current_header_hash = current_header.hash(); - debug!( target: LOG_TARGET, - "current header = {} ({})", + "Streaming TXO(s) for block #{} ({})", current_header.height, current_header_hash.to_hex() ); - if tx.is_closed() { debug!( target: LOG_TARGET, - "Peer '{}' exited UTXO sync session early", self.peer_node_id + "Peer '{}' exited TXO sync session early", self.peer_node_id ); break; } let outputs_with_statuses = self .db - .fetch_outputs_in_block_with_spend_state(current_header.hash(), Some(end_header.hash())) + .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(end_header.hash())) .await .rpc_status_internal_error(LOG_TARGET)?; -
debug!( - target: LOG_TARGET, - "Streaming UTXO(s) for block #{}.", - current_header.height, - ); if tx.is_closed() { debug!( target: LOG_TARGET, - "Peer '{}' exited UTXO sync session early", self.peer_node_id + "Peer '{}' exited TXO sync session early", self.peer_node_id ); break; } - let utxos = outputs_with_statuses + let outputs = outputs_with_statuses .into_iter() .filter_map(|(output, spent)| { - // We only send unspent utxos - if spent { + if spent && include_spent_txos { + debug!(target: LOG_TARGET, "Spent TXO '{}' to peer", output.commitment.to_hex()); + Some(Ok(SyncUtxosResponse { + txo: Some(proto::base_node::sync_utxos_response::Txo::Commitment( + output.commitment.as_bytes().to_vec(), + )), + mined_header: current_header_hash.to_vec(), + })) + } else if spent { + debug!(target: LOG_TARGET, "Spent TXO '{}' not sent to peer", output.commitment.to_hex()); None } else { + debug!(target: LOG_TARGET, "Unspent TXO '{}' to peer", output.commitment.to_hex()); match output.try_into() { Ok(tx_ouput) => Some(Ok(SyncUtxosResponse { - output: Some(tx_ouput), - mined_header: current_header.hash().to_vec(), + txo: Some(proto::base_node::sync_utxos_response::Txo::Output(tx_ouput)), + mined_header: current_header_hash.to_vec(), })), Err(err) => Some(Err(err)), } @@ -185,16 +195,85 @@ where B: BlockchainBackend + 'static .into_iter() .map(Ok); + let txos = if include_spent_txos { + let inputs_in_block = self + .db + .fetch_inputs_in_block(current_header_hash) + .await + .rpc_status_internal_error(LOG_TARGET)?; + debug!( + target: LOG_TARGET, + "Found {} inputs in block #{} '{}'", inputs_in_block.len(), + current_header.height, + current_header_hash + ); + if tx.is_closed() { + debug!( + target: LOG_TARGET, + "Peer '{}' exited TXO sync session early", self.peer_node_id + ); + break; + } + + let mut inputs = Vec::with_capacity(inputs_in_block.len()); + for input in inputs_in_block { + let input_commitment = match self.db.fetch_output(input.output_hash()).await { + 
Ok(Some(o)) => o.output.commitment, + Ok(None) => { + return Err(RpcStatus::general(&format!( + "Mined info for input '{}' not found", + input.output_hash().to_hex() + ))) + }, + Err(e) => { + return Err(RpcStatus::general(&format!( + "Input '{}' not found ({})", + input.output_hash().to_hex(), + e + ))) + }, + }; + debug!(target: LOG_TARGET, "Spent TXO '{}' to peer", input_commitment.to_hex()); + inputs.push(Ok(SyncUtxosResponse { + txo: Some(proto::base_node::sync_utxos_response::Txo::Commitment( + input_commitment.as_bytes().to_vec(), + )), + mined_header: current_header_hash.to_vec(), + })); + } + let inputs = inputs + .into_iter() + .collect::<Result<Vec<_>, String>>() + .map_err(|err| RpcStatus::bad_request(&err))? + .into_iter() + .map(Ok); + debug!( + target: LOG_TARGET, + "Adding {} inputs in response for block #{} '{}'", inputs.len(), + current_header.height, + current_header_hash + ); + + outputs + .chain(inputs) + .collect::<Result<Vec<_>, _>>() + .map_err(|err| RpcStatus::bad_request(&err))? + .into_iter() + .map(Ok) + } else { + outputs + }; + // Ensure task stops if the peer prematurely stops their RPC session - let utxos_len = utxos.len(); - if utils::mpsc::send_all(tx, utxos).await.is_err() { + let txos_len = txos.len(); + if utils::mpsc::send_all(tx, txos).await.is_err() { break; } debug!( target: LOG_TARGET, - "Streamed {} utxos in {:.2?} (including stream backpressure)", - utxos_len, + "Streamed {} TXOs in {:.2?} (including stream backpressure)", + txos_len, timer.elapsed() ); @@ -217,7 +296,7 @@ where B: BlockchainBackend + 'static debug!( target: LOG_TARGET, - "UTXO sync completed to Header hash = {}", + "TXO sync completed to Header hash = {}", current_header.hash().to_hex() ); diff --git a/base_layer/core/src/base_node/sync/rpc/tests.rs b/base_layer/core/src/base_node/sync/rpc/tests.rs index 817229ad441..1330061778b 100644 --- a/base_layer/core/src/base_node/sync/rpc/tests.rs +++ b/base_layer/core/src/base_node/sync/rpc/tests.rs @@ -126,6 +126,7 @@ mod sync_utxos { let
msg = SyncUtxosRequest { start_header_hash: gen_block_hash.to_vec(), end_header_hash: vec![0; 32], + include_spent_txos: false, }; let req = rpc_request_mock.request_with_context(Default::default(), msg); let err = service.sync_utxos(req).await.unwrap_err(); @@ -140,6 +141,7 @@ mod sync_utxos { let msg = SyncUtxosRequest { start_header_hash: vec![0; 32], end_header_hash: gb.hash().to_vec(), + include_spent_txos: false, }; let req = rpc_request_mock.request_with_context(Default::default(), msg); let err = service.sync_utxos(req).await.unwrap_err(); diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index e108dae80a3..f7c18a1859a 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -26,7 +26,7 @@ use primitive_types::U256; use rand::{rngs::OsRng, RngCore}; use tari_common_types::{ chain_metadata::ChainMetadata, - types::{BlockHash, Commitment, FixedHash, HashOutput, PublicKey, Signature}, + types::{BlockHash, Commitment, HashOutput, PublicKey, Signature}, }; use tari_utilities::epoch_time::EpochTime; @@ -59,9 +59,10 @@ use crate::{ }, common::rolling_vec::RollingVec, proof_of_work::{PowAlgorithm, TargetDifficultyWindow}, - transactions::transaction_components::{TransactionKernel, TransactionOutput}, + transactions::transaction_components::{TransactionInput, TransactionKernel, TransactionOutput}, OutputSmt, }; + const LOG_TARGET: &str = "c::bn::async_db"; fn trace_log(name: &str, f: F) -> R @@ -154,15 +155,21 @@ impl AsyncBlockchainDb { //---------------------------------- TXO --------------------------------------------// + make_async_fn!(fetch_output(output_hash: HashOutput) -> Option, "fetch_output"); + + make_async_fn!(fetch_unspent_output_hash_by_commitment(commitment: Commitment) -> Option, "fetch_unspent_output_by_commitment"); + make_async_fn!(fetch_outputs_with_spend_status_at_tip(hashes: Vec) -> Vec>, "fetch_outputs_with_spend_status_at_tip"); 
make_async_fn!(fetch_outputs_mined_info(hashes: Vec) -> Vec>, "fetch_outputs_mined_info"); make_async_fn!(fetch_inputs_mined_info(hashes: Vec) -> Vec>, "fetch_inputs_mined_info"); - make_async_fn!(fetch_outputs_in_block_with_spend_state(hash: HashOutput, spend_header: Option) -> Vec<(TransactionOutput, bool)>, "fetch_outputs_in_block_with_spend_state"); + make_async_fn!(fetch_outputs_in_block_with_spend_state(header_hash: HashOutput, spend_status_at_header: Option) -> Vec<(TransactionOutput, bool)>, "fetch_outputs_in_block_with_spend_state"); + + make_async_fn!(fetch_outputs_in_block(header_hash: HashOutput) -> Vec, "fetch_outputs_in_block"); - make_async_fn!(fetch_outputs_in_block(hash: HashOutput) -> Vec, "fetch_outputs_in_block"); + make_async_fn!(fetch_inputs_in_block(header_hash: HashOutput) -> Vec, "fetch_inputs_in_block"); make_async_fn!(utxo_count() -> usize, "utxo_count"); diff --git a/base_layer/core/src/chain_storage/blockchain_backend.rs b/base_layer/core/src/chain_storage/blockchain_backend.rs index d291a136a66..895982a371d 100644 --- a/base_layer/core/src/chain_storage/blockchain_backend.rs +++ b/base_layer/core/src/chain_storage/blockchain_backend.rs @@ -3,7 +3,7 @@ use tari_common_types::{ chain_metadata::ChainMetadata, - types::{Commitment, FixedHash, HashOutput, PublicKey, Signature}, + types::{Commitment, HashOutput, PublicKey, Signature}, }; use super::TemplateRegistrationEntry; @@ -91,7 +91,7 @@ pub trait BlockchainBackend: Send + Sync { fn fetch_outputs_in_block_with_spend_state( &self, header_hash: &HashOutput, - spend_status_at_header: Option, + spend_status_at_header: Option, ) -> Result, ChainStorageError>; /// Fetch a specific output. 
Returns the output diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index fffa20a2759..21a22e54b88 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -383,12 +383,18 @@ where B: BlockchainBackend db.fetch_chain_metadata() } - pub fn fetch_unspent_output_by_commitment( + /// Returns a copy of the current blockchain database metadata + pub fn fetch_output(&self, output_hash: HashOutput) -> Result, ChainStorageError> { + let db = self.db_read_access()?; + db.fetch_output(&output_hash) + } + + pub fn fetch_unspent_output_hash_by_commitment( &self, - commitment: &Commitment, + commitment: Commitment, ) -> Result, ChainStorageError> { let db = self.db_read_access()?; - db.fetch_unspent_output_hash_by_commitment(commitment) + db.fetch_unspent_output_hash_by_commitment(&commitment) } /// Return a list of matching utxos, with each being `None` if not found. 
If found, the transaction @@ -456,16 +462,21 @@ where B: BlockchainBackend pub fn fetch_outputs_in_block_with_spend_state( &self, - hash: HashOutput, - spend_status_at_header: Option, + header_hash: HashOutput, + spend_status_at_header: Option, ) -> Result, ChainStorageError> { let db = self.db_read_access()?; - db.fetch_outputs_in_block_with_spend_state(&hash, spend_status_at_header) + db.fetch_outputs_in_block_with_spend_state(&header_hash, spend_status_at_header) + } + + pub fn fetch_outputs_in_block(&self, header_hash: HashOutput) -> Result, ChainStorageError> { + let db = self.db_read_access()?; + db.fetch_outputs_in_block(&header_hash) } - pub fn fetch_outputs_in_block(&self, hash: HashOutput) -> Result, ChainStorageError> { + pub fn fetch_inputs_in_block(&self, header_hash: HashOutput) -> Result, ChainStorageError> { let db = self.db_read_access()?; - db.fetch_outputs_in_block(&hash) + db.fetch_inputs_in_block(&header_hash) } /// Returns the number of UTXOs in the current unspent set diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 34496f45010..6d4693186d1 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -1830,7 +1830,7 @@ impl BlockchainBackend for LMDBDatabase { fn fetch_outputs_in_block_with_spend_state( &self, header_hash: &HashOutput, - spend_status_at_header: Option, + spend_status_at_header: Option, ) -> Result, ChainStorageError> { let txn = self.read_transaction()?; diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index d60d6cfa035..87338619ea1 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -32,7 +32,7 @@ use tari_common::configuration::Network; use tari_common_types::{ chain_metadata::ChainMetadata, tari_address::TariAddress, - types::{Commitment, FixedHash, 
HashOutput, PublicKey, Signature}, + types::{Commitment, HashOutput, PublicKey, Signature}, }; use tari_storage::lmdb_store::LMDBConfig; use tari_test_utils::paths::create_temporary_data_path; @@ -277,7 +277,7 @@ impl BlockchainBackend for TempDatabase { fn fetch_outputs_in_block_with_spend_state( &self, header_hash: &HashOutput, - spend_status_at_header: Option<FixedHash>, + spend_status_at_header: Option<HashOutput>, ) -> Result<Vec<(TransactionOutput, bool)>, ChainStorageError> { self.db .as_ref() diff --git a/base_layer/core/tests/helpers/block_builders.rs b/base_layer/core/tests/helpers/block_builders.rs index b4f11bc64b8..1c9f038df1a 100644 --- a/base_layer/core/tests/helpers/block_builders.rs +++ b/base_layer/core/tests/helpers/block_builders.rs @@ -191,6 +191,7 @@ fn update_genesis_block_mmr_roots(template: NewBlockTemplate) -> Result( consensus: &ConsensusManager, achieved_difficulty: Difficulty, key_manager: &MemoryDbKeyManager, -) -> Result<ChainBlock, ChainStorageError> { +) -> Result<(ChainBlock, WalletOutput), ChainStorageError> { append_block_with_coinbase(db, prev_block, txns, consensus, achieved_difficulty, key_manager) .await - .map(|(b, _)| b) + .map(|(b, wo)| (b, wo)) } /// Create a new block with the provided transactions and add a coinbase output.
The new MMR roots are calculated, and @@ -577,7 +578,7 @@ pub async fn construct_chained_blocks( let mut prev_block = block0; let mut blocks = Vec::new(); for _i in 0..n { - let block = append_block(db, &prev_block, vec![], consensus, Difficulty::min(), key_manager) + let (block, _) = append_block(db, &prev_block, vec![], consensus, Difficulty::min(), key_manager) .await .unwrap(); prev_block = block.clone(); diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 98702db9d8b..6bf2d1d1a60 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -41,7 +41,7 @@ use tari_core::{ LocalNodeCommsInterface, StateMachineHandle, }, - chain_storage::{BlockchainDatabase, Validators}, + chain_storage::{BlockchainDatabase, BlockchainDatabaseConfig, Validators}, consensus::{ConsensusManager, ConsensusManagerBuilder, NetworkConsensus}, mempool::{ service::{LocalMempoolService, MempoolHandle}, @@ -52,7 +52,7 @@ use tari_core::{ OutboundMempoolServiceInterface, }, proof_of_work::randomx_factory::RandomXFactory, - test_helpers::blockchain::{create_store_with_consensus_and_validators, TempDatabase}, + test_helpers::blockchain::{create_store_with_consensus_and_validators_and_config, TempDatabase}, validation::{ mocks::MockValidator, transaction::TransactionChainLinkedValidator, @@ -186,7 +186,11 @@ impl BaseNodeBuilder { /// Build the test base node and start its services. 
#[allow(clippy::redundant_closure)] - pub async fn start(self, data_path: &str) -> (NodeInterfaces, ConsensusManager) { + pub async fn start( + self, + data_path: &str, + blockchain_db_config: BlockchainDatabaseConfig, + ) -> (NodeInterfaces, ConsensusManager) { let validators = self.validators.unwrap_or_else(|| { Validators::new( MockValidator::new(true), @@ -198,7 +202,11 @@ impl BaseNodeBuilder { let consensus_manager = self .consensus_manager .unwrap_or_else(|| ConsensusManagerBuilder::new(network).build().unwrap()); - let blockchain_db = create_store_with_consensus_and_validators(consensus_manager.clone(), validators); + let blockchain_db = create_store_with_consensus_and_validators_and_config( + consensus_manager.clone(), + validators, + blockchain_db_config, + ); let mempool_validator = TransactionChainLinkedValidator::new(blockchain_db.clone(), consensus_manager.clone()); let mempool = Mempool::new( self.mempool_config.unwrap_or_default(), @@ -244,13 +252,13 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface let (alice_node, consensus_manager) = BaseNodeBuilder::new(network.into()) .with_node_identity(alice_node_identity.clone()) .with_peers(vec![bob_node_identity.clone()]) - .start(data_path) + .start(data_path, BlockchainDatabaseConfig::default()) .await; let (bob_node, consensus_manager) = BaseNodeBuilder::new(network.into()) .with_node_identity(bob_node_identity) .with_peers(vec![alice_node_identity]) .with_consensus_manager(consensus_manager) - .start(data_path) + .start(data_path, BlockchainDatabaseConfig::default()) .await; wait_until_online(&[&alice_node, &bob_node]).await; @@ -263,6 +271,8 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface pub async fn create_network_with_2_base_nodes_with_config>( mempool_service_config: MempoolServiceConfig, liveness_service_config: LivenessConfig, + local_blockchain_db_config: BlockchainDatabaseConfig, + peer_blockchain_db_config: 
BlockchainDatabaseConfig, p2p_config: P2pConfig, consensus_manager: ConsensusManager, data_path: P, @@ -270,27 +280,33 @@ pub async fn create_network_with_2_base_nodes_with_config>( let alice_node_identity = random_node_identity(); let bob_node_identity = random_node_identity(); let network = Network::LocalNet; - let (alice_node, consensus_manager) = BaseNodeBuilder::new(network.into()) + let (local_node, consensus_manager) = BaseNodeBuilder::new(network.into()) .with_node_identity(alice_node_identity.clone()) .with_mempool_service_config(mempool_service_config.clone()) .with_liveness_service_config(liveness_service_config.clone()) .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) - .start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap()) + .start( + data_path.as_ref().join("alice").as_os_str().to_str().unwrap(), + local_blockchain_db_config, + ) .await; - let (bob_node, consensus_manager) = BaseNodeBuilder::new(network.into()) + let (peer_node, consensus_manager) = BaseNodeBuilder::new(network.into()) .with_node_identity(bob_node_identity) .with_peers(vec![alice_node_identity]) .with_mempool_service_config(mempool_service_config) .with_liveness_service_config(liveness_service_config) .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) - .start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap()) + .start( + data_path.as_ref().join("bob").as_os_str().to_str().unwrap(), + peer_blockchain_db_config, + ) .await; - wait_until_online(&[&alice_node, &bob_node]).await; + wait_until_online(&[&local_node, &peer_node]).await; - (alice_node, bob_node, consensus_manager) + (local_node, peer_node, consensus_manager) } // Creates a network with three Base Nodes where each node in the network knows the other nodes in the network. 
@@ -303,6 +319,10 @@ pub async fn create_network_with_3_base_nodes( create_network_with_3_base_nodes_with_config( MempoolServiceConfig::default(), LivenessConfig::default(), + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + P2pConfig::default(), consensus_manager, data_path, ) @@ -314,6 +334,10 @@ pub async fn create_network_with_3_base_nodes( pub async fn create_network_with_3_base_nodes_with_config>( mempool_service_config: MempoolServiceConfig, liveness_service_config: LivenessConfig, + alice_blockchain_db_config: BlockchainDatabaseConfig, + bob_blockchain_db_config: BlockchainDatabaseConfig, + carol_blockchain_db_config: BlockchainDatabaseConfig, + p2p_config: P2pConfig, consensus_manager: ConsensusManager, data_path: P, ) -> (NodeInterfaces, NodeInterfaces, NodeInterfaces, ConsensusManager) { @@ -332,24 +356,36 @@ pub async fn create_network_with_3_base_nodes_with_config>( .with_node_identity(carol_node_identity.clone()) .with_mempool_service_config(mempool_service_config.clone()) .with_liveness_service_config(liveness_service_config.clone()) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) - .start(data_path.as_ref().join("carol").as_os_str().to_str().unwrap()) + .start( + data_path.as_ref().join("carol").as_os_str().to_str().unwrap(), + carol_blockchain_db_config, + ) .await; let (bob_node, consensus_manager) = BaseNodeBuilder::new(network.into()) .with_node_identity(bob_node_identity.clone()) .with_peers(vec![carol_node_identity.clone()]) .with_mempool_service_config(mempool_service_config.clone()) .with_liveness_service_config(liveness_service_config.clone()) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) - .start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap()) + .start( + data_path.as_ref().join("bob").as_os_str().to_str().unwrap(), + bob_blockchain_db_config, + ) .await; let (alice_node, consensus_manager)
= BaseNodeBuilder::new(network.into()) .with_node_identity(alice_node_identity) .with_peers(vec![bob_node_identity, carol_node_identity]) .with_mempool_service_config(mempool_service_config) .with_liveness_service_config(liveness_service_config) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) - .start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap()) + .start( + data_path.as_ref().join("alice").as_os_str().to_str().unwrap(), + alice_blockchain_db_config, + ) .await; wait_until_online(&[&alice_node, &bob_node, &carol_node]).await; diff --git a/base_layer/core/tests/helpers/sync.rs b/base_layer/core/tests/helpers/sync.rs index c3af8050318..346e215cc20 100644 --- a/base_layer/core/tests/helpers/sync.rs +++ b/base_layer/core/tests/helpers/sync.rs @@ -28,19 +28,22 @@ use tari_comms::peer_manager::NodeId; use tari_core::{ base_node::{ chain_metadata_service::PeerChainMetadata, - state_machine_service::states::{BlockSync, HeaderSyncState, StateEvent, StatusInfo}, + state_machine_service::states::{BlockSync, HeaderSyncState, HorizonStateSync, StateEvent, StatusInfo}, sync::SyncPeer, BaseNodeStateMachine, BaseNodeStateMachineConfig, SyncValidators, }, blocks::ChainBlock, - chain_storage::DbTransaction, + chain_storage::{BlockchainDatabaseConfig, DbTransaction}, consensus::{ConsensusConstantsBuilder, ConsensusManager, ConsensusManagerBuilder}, mempool::MempoolServiceConfig, proof_of_work::{randomx_factory::RandomXFactory, Difficulty}, test_helpers::blockchain::TempDatabase, - transactions::key_manager::{create_memory_db_key_manager, MemoryDbKeyManager}, + transactions::{ + key_manager::{create_memory_db_key_manager, MemoryDbKeyManager}, + transaction_components::{Transaction, WalletOutput}, + }, validation::mocks::MockValidator, }; use tari_p2p::{services::liveness::LivenessConfig, P2pConfig}; @@ -91,13 +94,32 @@ pub async fn sync_blocks_execute( block_sync.next_event(state_machine).await } -pub async fn
create_network_with_local_and_peer_nodes() -> ( +pub fn initialize_horizon_sync(peer_node_interfaces: &NodeInterfaces) -> HorizonStateSync { + HorizonStateSync::from(vec![SyncPeer::from(PeerChainMetadata::new( + peer_node_interfaces.node_identity.node_id().clone(), + peer_node_interfaces.blockchain_db.get_chain_metadata().unwrap(), + None, + ))]) +} + +pub async fn horizon_sync_execute( + state_machine: &mut BaseNodeStateMachine, + horizon_sync: &mut HorizonStateSync, +) -> StateEvent { + horizon_sync.next_event(state_machine).await +} + +pub async fn create_network_with_local_and_peer_nodes( + local_blockchain_db_config: BlockchainDatabaseConfig, + peer_blockchain_db_config: BlockchainDatabaseConfig, +) -> ( BaseNodeStateMachine, NodeInterfaces, NodeInterfaces, ChainBlock, ConsensusManager, MemoryDbKeyManager, + WalletOutput, ) { let network = Network::LocalNet; let temp_dir = tempdir().unwrap(); @@ -105,7 +127,7 @@ pub async fn create_network_with_local_and_peer_nodes() -> ( let consensus_constants = ConsensusConstantsBuilder::new(network) .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) .build(); - let (initial_block, _) = create_genesis_block(&consensus_constants, &key_manager).await; + let (initial_block, coinbase_wallet_output) = create_genesis_block(&consensus_constants, &key_manager).await; let consensus_manager = ConsensusManagerBuilder::new(network) .add_consensus_constants(consensus_constants) .with_block(initial_block.clone()) @@ -117,6 +139,8 @@ pub async fn create_network_with_local_and_peer_nodes() -> ( auto_ping_interval: Some(Duration::from_millis(100)), ..Default::default() }, + local_blockchain_db_config, + peer_blockchain_db_config, P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), @@ -149,6 +173,7 @@ pub async fn create_network_with_local_and_peer_nodes() -> ( initial_block, consensus_manager, key_manager, + coinbase_wallet_output, ) } @@ -248,38 +273,49 @@ pub fn add_some_existing_blocks(blocks: 
&[ChainBlock], node: &NodeInterfaces) { } } -// Return blocks added, including the start block +// Return blocks and coinbases added, including the start block and coinbase pub async fn create_and_add_some_blocks( node: &NodeInterfaces, start_block: &ChainBlock, + start_coinbase: &WalletOutput, number_of_blocks: usize, consensus_manager: &ConsensusManager, key_manager: &MemoryDbKeyManager, difficulties: &[u64], -) -> Vec { - if number_of_blocks != difficulties.len() { + transactions: &Option>>, +) -> (Vec, Vec) { + let transactions = if let Some(val) = transactions { + val.clone() + } else { + vec![vec![]; number_of_blocks] + }; + if number_of_blocks != difficulties.len() || number_of_blocks != transactions.len() { panic!( - "Number of blocks ({}) and difficulties length ({}) must be equal", + "Number of blocks ({}), transactions length ({}) and difficulties length ({}) must be equal", number_of_blocks, + transactions.len(), difficulties.len() ); } let mut blocks = vec![start_block.clone()]; + let mut coinbases = vec![start_coinbase.clone()]; let mut prev_block = start_block.clone(); - for item in difficulties.iter().take(number_of_blocks) { - prev_block = append_block( + for (item, txns) in difficulties.iter().zip(transactions.iter()) { + let (new_block, coinbase) = append_block( &node.blockchain_db, &prev_block, - vec![], + txns.clone(), consensus_manager, Difficulty::from_u64(*item).unwrap(), key_manager, ) .await .unwrap(); - blocks.push(prev_block.clone()); + prev_block = new_block.clone(); + blocks.push(new_block.clone()); + coinbases.push(coinbase.clone()); } - blocks + (blocks, coinbases) } // We give some time for the peer to be banned as it is an async process diff --git a/base_layer/core/tests/tests/base_node_rpc.rs b/base_layer/core/tests/tests/base_node_rpc.rs index ec11eff7c3d..3d0f25bc4df 100644 --- a/base_layer/core/tests/tests/base_node_rpc.rs +++ b/base_layer/core/tests/tests/base_node_rpc.rs @@ -41,6 +41,7 @@ use tari_core::{ 
sync::rpc::BaseNodeSyncRpcService, }, blocks::ChainBlock, + chain_storage::BlockchainDatabaseConfig, consensus::{ConsensusConstantsBuilder, ConsensusManager, ConsensusManagerBuilder, NetworkConsensus}, proto::{ base_node::{FetchMatchingUtxos, Signatures as SignaturesProto, SyncUtxosByBlockRequest}, @@ -94,7 +95,7 @@ async fn setup() -> ( .unwrap(); let (mut base_node, _consensus_manager) = BaseNodeBuilder::new(network) .with_consensus_manager(consensus_manager.clone()) - .start(temp_dir.path().to_str().unwrap()) + .start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; base_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, diff --git a/base_layer/core/tests/tests/block_sync.rs b/base_layer/core/tests/tests/block_sync.rs index 9011a4b2765..fecbfdb7749 100644 --- a/base_layer/core/tests/tests/block_sync.rs +++ b/base_layer/core/tests/tests/block_sync.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use tari_core::base_node::state_machine_service::states::StateEvent; +use tari_core::{base_node::state_machine_service::states::StateEvent, chain_storage::BlockchainDatabaseConfig}; use crate::helpers::{sync, sync::WhatToDelete}; @@ -29,12 +29,32 @@ async fn test_block_sync_happy_path() { // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add some block to Bob's chain - let _bob_blocks = - sync::create_and_add_some_blocks(&bob_node, &initial_block, 5, &consensus_manager, &key_manager, &[3; 5]).await; + let (_blocks, _coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + &initial_coinbase, + 5, + &consensus_manager, + &key_manager, + &[3; 5], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5); // Alice attempts header sync @@ -78,17 +98,30 @@ async fn test_block_sync_peer_supplies_no_blocks_with_ban() { // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add some block to Bob's chain - let blocks = 
sync::create_and_add_some_blocks( + let (blocks, _coinbases) = sync::create_and_add_some_blocks( &bob_node, &initial_block, + &initial_coinbase, 10, &consensus_manager, &key_manager, &[3; 10], + &None, ) .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10); @@ -129,17 +162,30 @@ async fn test_block_sync_peer_supplies_not_all_blocks_with_ban() { // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add some block to Bob's chain - let blocks = sync::create_and_add_some_blocks( + let (blocks, _coinbases) = sync::create_and_add_some_blocks( &bob_node, &initial_block, + &initial_coinbase, 10, &consensus_manager, &key_manager, &[3; 10], + &None, ) .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10); diff --git a/base_layer/core/tests/tests/header_sync.rs b/base_layer/core/tests/tests/header_sync.rs index 5745f241259..3f90f9af940 100644 --- a/base_layer/core/tests/tests/header_sync.rs +++ b/base_layer/core/tests/tests/header_sync.rs @@ -20,7 +20,10 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use tari_core::base_node::{state_machine_service::states::StateEvent, sync::HeaderSyncStatus}; +use tari_core::{ + base_node::{state_machine_service::states::StateEvent, sync::HeaderSyncStatus}, + chain_storage::BlockchainDatabaseConfig, +}; use crate::helpers::{sync, sync::WhatToDelete}; @@ -30,12 +33,32 @@ async fn test_header_sync_happy_path() { // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add 1 block to Bob's chain - let bob_blocks = - sync::create_and_add_some_blocks(&bob_node, &initial_block, 1, &consensus_manager, &key_manager, &[3]).await; + let (bob_blocks, bob_coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + &initial_coinbase, + 1, + &consensus_manager, + &key_manager, + &[3], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 1); // Alice attempts header sync, still on the genesys block, headers will be lagging @@ -74,8 +97,17 @@ async fn test_header_sync_happy_path() { } // Bob adds another block - let _bob_blocks = - sync::create_and_add_some_blocks(&bob_node, &bob_blocks[1], 1, &consensus_manager, &key_manager, &[3]).await; + let (_blocks, _coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &bob_blocks[1], + &bob_coinbases[1], + 1, + &consensus_manager, + &key_manager, + &[3], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 2); // Alice attempts header sync, still on the genesys block, headers will be lagging @@ -102,25 +134,60 @@ async fn 
test_header_sync_with_fork_happy_path() { // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add 1 block to Bob's chain - let bob_blocks = - sync::create_and_add_some_blocks(&bob_node, &initial_block, 1, &consensus_manager, &key_manager, &[3]).await; + let (bob_blocks, bob_coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + &initial_coinbase, + 1, + &consensus_manager, + &key_manager, + &[3], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 1); // Bob adds another block - let bob_blocks = - sync::create_and_add_some_blocks(&bob_node, &bob_blocks[1], 1, &consensus_manager, &key_manager, &[3]).await; + let (bob_blocks, bob_coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &bob_blocks[1], + &bob_coinbases[1], + 1, + &consensus_manager, + &key_manager, + &[3], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 2); // Alice adds 3 (different) blocks, with POW on par with Bob's chain, but with greater height - let _alice_blocks = - sync::create_and_add_some_blocks(&alice_node, &initial_block, 3, &consensus_manager, &key_manager, &[ - 3, 2, 1, - ]) - .await; + let _alice_blocks = sync::create_and_add_some_blocks( + &alice_node, + &initial_block, + &initial_coinbase, + 3, + &consensus_manager, + &key_manager, + &[3, 2, 1], + &None, + ) + .await; assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 3); assert_eq!( alice_node @@ -148,8 +215,17 @@ async fn 
test_header_sync_with_fork_happy_path() { assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); // Bob adds more blocks and draws ahead of Alice - let _bob_blocks = - sync::create_and_add_some_blocks(&bob_node, &bob_blocks[1], 2, &consensus_manager, &key_manager, &[3; 2]).await; + let _blocks = sync::create_and_add_some_blocks( + &bob_node, + &bob_blocks[1], + &bob_coinbases[1], + 2, + &consensus_manager, + &key_manager, + &[3; 2], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 4); // Alice attempts header sync to Bob's chain with higher POW, headers will be lagging with reorg steps @@ -176,17 +252,30 @@ async fn test_header_sync_uneven_headers_and_blocks_happy_path() { // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add blocks and headers to Bob's chain, with more headers than blocks - let blocks = sync::create_and_add_some_blocks( + let (blocks, _coinbases) = sync::create_and_add_some_blocks( &bob_node, &initial_block, + &initial_coinbase, 10, &consensus_manager, &key_manager, &[3; 10], + &None, ) .await; sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node); @@ -224,17 +313,30 @@ async fn test_header_sync_uneven_headers_and_blocks_peer_lies_about_pow_no_ban() // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, 
key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add blocks and headers to Bob's chain, with more headers than blocks - let blocks = sync::create_and_add_some_blocks( + let (blocks, _coinbases) = sync::create_and_add_some_blocks( &bob_node, &initial_block, + &initial_coinbase, 10, &consensus_manager, &key_manager, &[3; 10], + &None, ) .await; sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node); @@ -287,12 +389,32 @@ async fn test_header_sync_even_headers_and_blocks_peer_lies_about_pow_with_ban() // env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add blocks and headers to Bob's chain - let blocks = - sync::create_and_add_some_blocks(&bob_node, &initial_block, 6, &consensus_manager, &key_manager, &[3; 6]).await; + let (blocks, _coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + &initial_coinbase, + 6, + &consensus_manager, + &key_manager, + &[3; 6], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 6); assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 6); @@ -333,12 +455,32 @@ async fn test_header_sync_even_headers_and_blocks_peer_metadata_improve_with_reo // 
env_logger::init(); // Set `$env:RUST_LOG = "trace"` // Create the network with Alice node and Bob node - let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = - sync::create_network_with_local_and_peer_nodes().await; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + ) + .await; // Add blocks and headers to Bob's chain - let blocks = - sync::create_and_add_some_blocks(&bob_node, &initial_block, 6, &consensus_manager, &key_manager, &[3; 6]).await; + let (blocks, coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + &initial_coinbase, + 6, + &consensus_manager, + &key_manager, + &[3; 6], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 6); assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 6); @@ -351,8 +493,17 @@ async fn test_header_sync_even_headers_and_blocks_peer_metadata_improve_with_reo let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); // Bob's chain will reorg with improved metadata sync::delete_some_blocks_and_headers(&blocks[4..=6], WhatToDelete::Blocks, &bob_node); - let _blocks = - sync::create_and_add_some_blocks(&bob_node, &blocks[4], 3, &consensus_manager, &key_manager, &[3; 3]).await; + let _blocks = sync::create_and_add_some_blocks( + &bob_node, + &blocks[4], + &coinbases[4], + 3, + &consensus_manager, + &key_manager, + &[3; 3], + &None, + ) + .await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 7); assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 7); let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; diff --git a/base_layer/core/tests/tests/horizon_sync.rs 
b/base_layer/core/tests/tests/horizon_sync.rs new file mode 100644 index 00000000000..cef38029c40 --- /dev/null +++ b/base_layer/core/tests/tests/horizon_sync.rs @@ -0,0 +1,257 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +use std::sync::Arc; + +use tari_core::{ + base_node::state_machine_service::states::StateEvent, + chain_storage::BlockchainDatabaseConfig, + transactions::{tari_amount::T, test_helpers::schema_to_transaction}, + txn_schema, +}; + +use crate::helpers::{sync, sync::WhatToDelete}; + +#[allow(clippy::too_many_lines)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_horizon_sync_to_archival_node_happy_path() { + //` cargo test --release --test core_integration_tests + //` tests::horizon_sync::test_horizon_sync_to_archival_node_happy_path > .\target\output.txt 2>&1 + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + // Create the network with Alice node and Bob node + let pruning_horizon = 5; + let ( + mut alice_state_machine, + alice_node, + bob_node, + initial_block, + consensus_manager, + key_manager, + initial_coinbase, + ) = sync::create_network_with_local_and_peer_nodes( + BlockchainDatabaseConfig { + orphan_storage_capacity: 5, + pruning_horizon, + pruning_interval: 5, + track_reorgs: false, + cleanup_orphans_at_startup: false, + }, + BlockchainDatabaseConfig::default(), + ) + .await; + println!( + "Genesis block - outputs: {}, output SMT size: {}", + initial_block.block().body.outputs().len(), + initial_block.header().output_smt_size + ); + + // Create a blockchain with many blocks + let (blocks_a, coinbases_a) = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + &initial_coinbase, + 20, + &consensus_manager, + &key_manager, + &[3; 20], + &None, + ) + .await; + // Add a transaction to spend some coinbase outputs + let schema = txn_schema!( + from: vec![coinbases_a[0].clone(), coinbases_a[1].clone(), coinbases_a[2].clone(), coinbases_a[3].clone(), coinbases_a[4].clone()], + to: vec![10 * T; 20] + ); + let (txns, _outputs) = schema_to_transaction(&[schema], &key_manager).await; + let mut txns_block_21 = vec![vec![]; 20]; + txns_block_21[0] = txns + .into_iter() + .map(|t| Arc::try_unwrap(t).unwrap())
.collect::>(); + // Expand the blockchain with the spend transaction + let (blocks_b, _coinbases) = sync::create_and_add_some_blocks( + &bob_node, + &blocks_a[20], + &coinbases_a[20], + 20, + &consensus_manager, + &key_manager, + &[3; 20], + &Some(txns_block_21), + ) + .await; + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 40); + assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 40); + let blocks = [&blocks_a[..], &blocks_b[1..]].concat(); + + // Now rewind Bob's chain to height 10 + sync::delete_some_blocks_and_headers(&blocks[10..=40], WhatToDelete::BlocksAndHeaders, &bob_node); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10); + assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 10); + + // 1. Alice attempts horizon sync without having done header sync + println!("\n1. Alice attempts horizon sync without having done header sync\n"); + let mut horizon_sync = sync::initialize_horizon_sync(&bob_node); + let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await; + println!("Event: {:?}", event); + assert_eq!(event, StateEvent::HorizonStateSynchronized); + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 0); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 2. Alice does header sync (to height 10) + println!("\n2. Alice does header sync (to height 10)\n"); + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; + match event.clone() { + StateEvent::HeadersSynchronized(..) 
=> { + // Good, headers are synced + assert_eq!(alice_node.blockchain_db.fetch_last_header().unwrap().height, 10); + }, + _ => panic!("Expected HeadersSynchronized event"), + } + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 3. Alice attempts horizon sync after header sync (to height 5) + println!("\n3. Alice attempts horizon sync after header sync (to height 5)\n"); + let mut horizon_sync = sync::initialize_horizon_sync(&bob_node); + let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await; + println!("Event: {:?}", event); + assert_eq!(event, StateEvent::HorizonStateSynchronized); + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height - pruning_horizon + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 4. Alice attempts horizon sync again without any change in the blockchain + println!("\n4. Alice attempts horizon sync again without any change in the blockchain\n"); + let mut horizon_sync = sync::initialize_horizon_sync(&bob_node); + let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await; + println!("Event: {:?}", event); + assert_eq!(event, StateEvent::HorizonStateSynchronized); + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height - pruning_horizon + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // Give Bob some more blocks + sync::add_some_existing_blocks(&blocks[11..=20], &bob_node); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 20); + assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 20); + + // 5. 
Alice does header sync to the new height (to height 20) + println!("\n5. Alice does header sync to the new height (to height 20)\n"); + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; + match event.clone() { + StateEvent::HeadersSynchronized(..) => { + // Good, headers are synced + assert_eq!(alice_node.blockchain_db.fetch_last_header().unwrap().height, 20); + }, + _ => panic!("Expected HeadersSynchronized event"), + } + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 6. Alice attempts horizon sync to the new pruning height (to height 15) + println!("\n6. Alice attempts horizon sync to the new pruning height (to height 15)\n"); + let mut horizon_sync = sync::initialize_horizon_sync(&bob_node); + let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await; + println!("Event: {:?}", event); + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height - pruning_horizon + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 7. Alice attempts block sync to the tip (to height 20) + println!("\n7. 
Alice attempts block sync to the tip (to height 20)\n"); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + match event { + StateEvent::BlocksSynchronized => { + // Good, blocks are synced + }, + _ => panic!("Expected BlocksSynchronized event"), + } + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // Give Bob some more blocks (containing the block with the spend transaction at height 21) + sync::add_some_existing_blocks(&blocks[21..=40], &bob_node); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 40); + assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 40); + + // 8. Alice does header sync to the new height (to height 40) + println!("\n8. Alice does header sync to the new height (to height 40)\n"); + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; + match event.clone() { + StateEvent::HeadersSynchronized(..) => { + // Good, headers are synced + assert_eq!(alice_node.blockchain_db.fetch_last_header().unwrap().height, 40); + }, + _ => panic!("Expected HeadersSynchronized event"), + } + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 9. Alice attempts horizon sync to the new pruning height (to height 35) + println!("\n9. 
Alice attempts horizon sync to the new pruning height (to height 35)\n"); + let mut horizon_sync = sync::initialize_horizon_sync(&bob_node); + let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await; + println!("Event: {:?}", event); + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height - pruning_horizon + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); + + // 10. Alice attempts block sync to the tip (to height 40) + println!("\n10. Alice attempts block sync to the tip (to height 40)\n"); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + match event { + StateEvent::BlocksSynchronized => { + // Good, blocks are synced + }, + _ => panic!("Expected BlocksSynchronized event"), + } + assert_eq!( + alice_node.blockchain_db.get_height().unwrap(), + alice_node.blockchain_db.fetch_last_header().unwrap().height + ); + // Bob will not be banned + assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); +} diff --git a/base_layer/core/tests/tests/mempool.rs b/base_layer/core/tests/tests/mempool.rs index 0bb1d7a6d37..ab07ebad9ce 100644 --- a/base_layer/core/tests/tests/mempool.rs +++ b/base_layer/core/tests/tests/mempool.rs @@ -28,6 +28,7 @@ use tari_common_types::types::{Commitment, PrivateKey, PublicKey, Signature}; use tari_comms_dht::domain_message::OutboundDomainMessage; use tari_core::{ base_node::state_machine_service::states::{ListeningInfo, StateInfo, StatusInfo}, + chain_storage::BlockchainDatabaseConfig, consensus::{ConsensusConstantsBuilder, ConsensusManager}, mempool::{Mempool, MempoolConfig, MempoolServiceConfig, TxStorageResponse}, proof_of_work::Difficulty, @@ -1057,6 +1058,10 @@ async fn receive_and_propagate_transaction() { 
create_network_with_3_base_nodes_with_config( MempoolServiceConfig::default(), LivenessConfig::default(), + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), ) @@ -1725,6 +1730,8 @@ async fn block_event_and_reorg_event_handling() { let (mut alice, mut bob, consensus_manager) = create_network_with_2_base_nodes_with_config( MempoolServiceConfig::default(), LivenessConfig::default(), + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), diff --git a/base_layer/core/tests/tests/mod.rs b/base_layer/core/tests/tests/mod.rs index e36b646680d..5e3ade249b6 100644 --- a/base_layer/core/tests/tests/mod.rs +++ b/base_layer/core/tests/tests/mod.rs @@ -27,6 +27,7 @@ mod base_node_rpc; mod block_sync; mod block_validation; mod header_sync; +mod horizon_sync; mod mempool; mod node_comms_interface; mod node_service; diff --git a/base_layer/core/tests/tests/node_comms_interface.rs b/base_layer/core/tests/tests/node_comms_interface.rs index 5f025723561..fc9bfd3f584 100644 --- a/base_layer/core/tests/tests/node_comms_interface.rs +++ b/base_layer/core/tests/tests/node_comms_interface.rs @@ -464,7 +464,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { randomx_factory, ); - let block1 = append_block( + let (block1, _) = append_block( &store, &block0, vec![], @@ -474,7 +474,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { ) .await .unwrap(); - let block2 = append_block( + let (block2, _) = append_block( &store, &block1, vec![], @@ -484,7 +484,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { ) .await .unwrap(); - let block3 = append_block( + let (block3, _) = append_block( &store, &block2, vec![], @@ -494,7 +494,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { ) .await .unwrap(); - let block4 = 
append_block( + let (block4, _) = append_block( &store, &block3, vec![], @@ -504,7 +504,7 @@ async fn inbound_fetch_blocks_before_horizon_height() { ) .await .unwrap(); - let _block5 = append_block( + let (_block5, _) = append_block( &store, &block4, vec![], diff --git a/base_layer/core/tests/tests/node_service.rs b/base_layer/core/tests/tests/node_service.rs index 9877d99b445..253b8d678cc 100644 --- a/base_layer/core/tests/tests/node_service.rs +++ b/base_layer/core/tests/tests/node_service.rs @@ -31,6 +31,7 @@ use tari_core::{ state_machine_service::states::{ListeningInfo, StateInfo, StatusInfo}, }, blocks::{ChainBlock, NewBlock}, + chain_storage::BlockchainDatabaseConfig, consensus::{ConsensusConstantsBuilder, ConsensusManager, ConsensusManagerBuilder, NetworkConsensus}, mempool::TxStorageResponse, proof_of_work::{randomx_factory::RandomXFactory, Difficulty, PowAlgorithm}, @@ -104,25 +105,37 @@ async fn propagate_and_forward_many_valid_blocks() { let (mut alice_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(alice_node_identity.clone()) .with_consensus_manager(rules) - .start(temp_dir.path().join("alice").to_str().unwrap()) + .start( + temp_dir.path().join("alice").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut bob_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(bob_node_identity.clone()) .with_peers(vec![alice_node_identity]) .with_consensus_manager(rules) - .start(temp_dir.path().join("bob").to_str().unwrap()) + .start( + temp_dir.path().join("bob").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut carol_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(carol_node_identity.clone()) .with_peers(vec![bob_node_identity.clone()]) .with_consensus_manager(rules) - .start(temp_dir.path().join("carol").to_str().unwrap()) + .start( + temp_dir.path().join("carol").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut 
dan_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(dan_node_identity) .with_peers(vec![carol_node_identity, bob_node_identity]) .with_consensus_manager(rules) - .start(temp_dir.path().join("dan").to_str().unwrap()) + .start( + temp_dir.path().join("dan").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await; @@ -166,7 +179,8 @@ async fn propagate_and_forward_many_valid_blocks() { &key_manager, ) .await - .unwrap(), + .unwrap() + .0, ); blocks .extend(construct_chained_blocks(&alice_node.blockchain_db, blocks[0].clone(), &rules, 5, &key_manager).await); @@ -210,6 +224,7 @@ async fn propagate_and_forward_many_valid_blocks() { static EMISSION: [u64; 2] = [10, 10]; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[allow(clippy::too_many_lines)] async fn propagate_and_forward_invalid_block_hash() { // Alice will propagate a "made up" block hash to Bob, Bob will request the block from Alice. Alice will not be able // to provide the block and so Bob will not propagate the hash further to Carol. 
@@ -234,19 +249,28 @@ async fn propagate_and_forward_invalid_block_hash() { let (mut alice_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(alice_node_identity.clone()) .with_consensus_manager(rules) - .start(temp_dir.path().join("alice").to_str().unwrap()) + .start( + temp_dir.path().join("alice").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut bob_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(bob_node_identity.clone()) .with_peers(vec![alice_node_identity]) .with_consensus_manager(rules) - .start(temp_dir.path().join("bob").to_str().unwrap()) + .start( + temp_dir.path().join("bob").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut carol_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(carol_node_identity) .with_peers(vec![bob_node_identity]) .with_consensus_manager(rules) - .start(temp_dir.path().join("carol").to_str().unwrap()) + .start( + temp_dir.path().join("carol").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; wait_until_online(&[&alice_node, &bob_node, &carol_node]).await; @@ -276,7 +300,7 @@ async fn propagate_and_forward_invalid_block_hash() { ) .await; let txs = txs.into_iter().map(|tx| (*tx).clone()).collect(); - let block1 = append_block( + let (block1, _) = append_block( &alice_node.blockchain_db, &block0, txs, @@ -361,7 +385,10 @@ async fn propagate_and_forward_invalid_block() { let (mut dan_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(dan_node_identity.clone()) .with_consensus_manager(rules) - .start(temp_dir.path().join("dan").to_str().unwrap()) + .start( + temp_dir.path().join("dan").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut carol_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(carol_node_identity.clone()) @@ -372,20 +399,29 @@ async fn propagate_and_forward_invalid_block() { mock_validator.clone(), 
stateless_block_validator.clone(), ) - .start(temp_dir.path().join("carol").to_str().unwrap()) + .start( + temp_dir.path().join("carol").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut bob_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(bob_node_identity.clone()) .with_peers(vec![dan_node_identity]) .with_consensus_manager(rules) .with_validators(mock_validator.clone(), mock_validator, stateless_block_validator) - .start(temp_dir.path().join("bob").to_str().unwrap()) + .start( + temp_dir.path().join("bob").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; let (mut alice_node, rules) = BaseNodeBuilder::new(network.into()) .with_node_identity(alice_node_identity) .with_peers(vec![bob_node_identity, carol_node_identity]) .with_consensus_manager(rules) - .start(temp_dir.path().join("alice").to_str().unwrap()) + .start( + temp_dir.path().join("alice").to_str().unwrap(), + BlockchainDatabaseConfig::default(), + ) .await; alice_node @@ -423,7 +459,7 @@ async fn propagate_and_forward_invalid_block() { // This is a valid block, however Bob, Carol and Dan's block validator is set to always reject the block // after fetching it. 
- let block1 = append_block( + let (block1, _) = append_block( &alice_node.blockchain_db, &block0, vec![], @@ -485,14 +521,14 @@ async fn local_get_metadata() { let network = Network::LocalNet; let key_manager = create_memory_db_key_manager(); let (mut node, consensus_manager) = BaseNodeBuilder::new(network.into()) - .start(temp_dir.path().to_str().unwrap()) + .start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; let db = &node.blockchain_db; let block0 = db.fetch_block(0, true).unwrap().try_into_chain_block().unwrap(); - let block1 = append_block(db, &block0, vec![], &consensus_manager, Difficulty::min(), &key_manager) + let (block1, _) = append_block(db, &block0, vec![], &consensus_manager, Difficulty::min(), &key_manager) .await .unwrap(); - let block2 = append_block(db, &block1, vec![], &consensus_manager, Difficulty::min(), &key_manager) + let (block2, _) = append_block(db, &block1, vec![], &consensus_manager, Difficulty::min(), &key_manager) .await .unwrap(); @@ -517,7 +553,7 @@ async fn local_get_new_block_template_and_get_new_block() { .unwrap(); let (mut node, _rules) = BaseNodeBuilder::new(network.into()) .with_consensus_manager(rules) - .start(temp_dir.path().to_str().unwrap()) + .start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; let schema = [ @@ -566,7 +602,7 @@ async fn local_get_new_block_with_zero_conf() { HeaderFullValidator::new(rules.clone(), difficulty_calculator), BlockBodyInternalConsistencyValidator::new(rules, true, factories.clone()), ) - .start(temp_dir.path().to_str().unwrap()) + .start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; let (tx01, tx01_out) = spend_utxos( @@ -652,7 +688,7 @@ async fn local_get_new_block_with_combined_transaction() { HeaderFullValidator::new(rules.clone(), difficulty_calculator), BlockBodyInternalConsistencyValidator::new(rules, true, factories.clone()), ) - .start(temp_dir.path().to_str().unwrap()) + 
.start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; let (tx01, tx01_out) = spend_utxos( @@ -718,7 +754,7 @@ async fn local_submit_block() { let network = Network::LocalNet; let key_manager = create_memory_db_key_manager(); let (mut node, consensus_manager) = BaseNodeBuilder::new(network.into()) - .start(temp_dir.path().to_str().unwrap()) + .start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; let db = &node.blockchain_db; diff --git a/base_layer/core/tests/tests/node_state_machine.rs b/base_layer/core/tests/tests/node_state_machine.rs index 55e68c79de8..2cb66ffaabf 100644 --- a/base_layer/core/tests/tests/node_state_machine.rs +++ b/base_layer/core/tests/tests/node_state_machine.rs @@ -36,6 +36,7 @@ use tari_core::{ }, SyncValidators, }, + chain_storage::BlockchainDatabaseConfig, consensus::{ConsensusConstantsBuilder, ConsensusManagerBuilder}, mempool::MempoolServiceConfig, proof_of_work::{randomx_factory::RandomXFactory, Difficulty}, @@ -87,6 +88,8 @@ async fn test_listening_lagging() { auto_ping_interval: Some(Duration::from_millis(100)), ..Default::default() }, + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), @@ -117,7 +120,7 @@ async fn test_listening_lagging() { let mut bob_local_nci = bob_node.local_nci; // Bob Block 1 - no block event - let prev_block = append_block( + let (prev_block, _) = append_block( &bob_db, &prev_block, vec![], @@ -163,6 +166,10 @@ async fn test_listening_initial_fallen_behind() { auto_ping_interval: Some(Duration::from_millis(100)), ..Default::default() }, + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + BlockchainDatabaseConfig::default(), + P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), ) @@ -173,7 +180,7 @@ async fn test_listening_initial_fallen_behind() { let mut bob_local_nci = bob_node.local_nci; // 
Bob Block 1 - no block event - let prev_block = append_block( + let (prev_block, _) = append_block( &bob_db, &gen_block, vec![], @@ -196,7 +203,7 @@ async fn test_listening_initial_fallen_behind() { let mut charlie_local_nci = charlie_node.local_nci; // charlie Block 1 - no block event - let prev_block = append_block( + let (prev_block, _) = append_block( &charlie_db, &gen_block, vec![], @@ -256,7 +263,7 @@ async fn test_listening_initial_fallen_behind() { async fn test_event_channel() { let temp_dir = tempdir().unwrap(); let (node, consensus_manager) = BaseNodeBuilder::new(Network::Esmeralda.into()) - .start(temp_dir.path().to_str().unwrap()) + .start(temp_dir.path().to_str().unwrap(), BlockchainDatabaseConfig::default()) .await; // let shutdown = Shutdown::new(); let db = create_test_blockchain_db(); diff --git a/base_layer/mmr/src/sparse_merkle_tree/tree.rs b/base_layer/mmr/src/sparse_merkle_tree/tree.rs index caa2b38102f..922b4396b42 100644 --- a/base_layer/mmr/src/sparse_merkle_tree/tree.rs +++ b/base_layer/mmr/src/sparse_merkle_tree/tree.rs @@ -229,7 +229,7 @@ impl> SparseMerkleTree { Ok(result) } - /// Update and existing node at location `key` in the tree, or, if the key does not exist, insert a new node at + /// Update an existing node at location `key` in the tree, or, if the key does not exist, insert a new node at /// location `key` instead. Returns `Ok(UpdateResult::Updated)` if the node was updated, or /// `Ok(UpdateResult::Inserted)` if the node was inserted. ///