Add horizon sync tests
Added integration-level horizon sync tests
hansieodendaal committed Jan 12, 2024
1 parent c337c5e commit 1fa90d3
Showing 12 changed files with 454 additions and 85 deletions.
@@ -129,7 +129,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
target: LOG_TARGET,
"Preparing database for horizon sync to height #{}", self.horizon_sync_height
);
- let header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
+ let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
ChainStorageError::ValueNotFound {
entity: "Header",
field: "height",
@@ -139,7 +139,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

let mut latency_increases_counter = 0;
loop {
- match self.sync(&header).await {
+ match self.sync(&to_header).await {
Ok(()) => return Ok(()),
Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
// If we don't have many sync peers to select from, return the listening state and see if we can get
@@ -167,7 +167,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
}

- async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> {
+ async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
info!(
target: LOG_TARGET,
Expand All @@ -176,7 +176,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
);
let mut latency_counter = 0usize;
for node_id in sync_peer_node_ids {
- match self.connect_and_attempt_sync(&node_id, header).await {
+ match self.connect_and_attempt_sync(&node_id, to_header).await {
Ok(_) => return Ok(()),
// Try another peer
Err(err) => {
@@ -213,7 +213,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn connect_and_attempt_sync(
&mut self,
node_id: &NodeId,
- header: &BlockHeader,
+ to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
let peer_index = self
.get_sync_peer_index(node_id)
@@ -250,7 +250,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
let sync_peer = self.sync_peers[peer_index].clone();

- self.begin_sync(sync_peer.clone(), &mut client, header).await?;
+ self.begin_sync(sync_peer.clone(), &mut client, to_header).await?;
self.finalize_horizon_sync(&sync_peer).await?;

Ok(())
@@ -328,7 +328,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
"Requesting kernels from {} to {} ({} remaining)",
local_num_kernels,
remote_num_kernels,
- remote_num_kernels - local_num_kernels,
+ remote_num_kernels.saturating_sub(local_num_kernels),
);

let latency = client.get_last_request_latency();
@@ -374,7 +374,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}

txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
- if mmr_position == current_header.header().kernel_mmr_size - 1 {
+ if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
let num_kernels = kernel_hashes.len();
debug!(
target: LOG_TARGET,
@@ -425,9 +425,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
num_kernels,
mmr_position + 1,
end,
- end - (mmr_position + 1)
+ end.saturating_sub(mmr_position + 1)
);
- if mmr_position < end - 1 {
+ if mmr_position < end.saturating_sub(1) {
current_header = db.fetch_chain_header(current_header.height() + 1).await?;
}
}
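A note on the arithmetic changes in the hunks above: raw `u64` subtraction panics on underflow in debug builds and silently wraps in release builds, so a peer reporting fewer kernels than expected could crash the sync or corrupt its progress accounting; `saturating_sub` clamps at zero instead. A minimal self-contained sketch of the difference:

```rust
fn main() {
    let local_num_kernels: u64 = 10;
    let remote_num_kernels: u64 = 7;

    // `remote_num_kernels - local_num_kernels` would underflow here: it
    // panics in debug builds and wraps to u64::MAX - 2 in release builds.
    // `saturating_sub` clamps the result at zero instead.
    assert_eq!(remote_num_kernels.saturating_sub(local_num_kernels), 0);

    // When no underflow occurs it behaves like plain subtraction.
    assert_eq!(local_num_kernels.saturating_sub(remote_num_kernels), 3);
}
```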
@@ -483,42 +483,46 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let remote_num_outputs = to_header.output_smt_size;
self.num_outputs = remote_num_outputs;

+ let db = self.db().clone();
+
+ let tip_header = db.fetch_tip_header().await?;
+ let local_num_outputs = tip_header.header().output_smt_size;
let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
- current: 0,
+ current: local_num_outputs,
total: self.num_outputs,
sync_peer: sync_peer.clone(),
});
self.hooks.call_on_progress_horizon_hooks(info);

- debug!(
- target: LOG_TARGET,
- "Requesting outputs from {}",
- remote_num_outputs,
- );
- let db = self.db().clone();

- let end = remote_num_outputs;
- let end_hash = to_header.hash();
- let start_hash = db.fetch_chain_header(1).await?;
- let gen_block = db.fetch_chain_header(0).await?;

let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating output sync with peer `{}` (latency = {}ms)",
"Initiating output sync with peer `{}`, requesting {} outputs (latency = {}ms)",
sync_peer.node_id(),
+ remote_num_outputs.saturating_sub(local_num_outputs),
latency.unwrap_or_default().as_millis()
);

+ // let start_chain_header = db.fetch_chain_header(1).await?;
+ debug!(
+ target: LOG_TARGET,
+ "synchronize_outputs: tip_header height {}",
+ tip_header.height(),
+ );
+ debug!(
+ target: LOG_TARGET,
+ "synchronize_outputs: last_chain_header height {}",
+ db.fetch_last_chain_header().await?.height(),
+ );
+ let start_chain_header = db.fetch_chain_header(tip_header.height() + 1).await?;
let req = SyncUtxosRequest {
- start_header_hash: start_hash.hash().to_vec(),
- end_header_hash: end_hash.to_vec(),
+ start_header_hash: start_chain_header.hash().to_vec(),
+ end_header_hash: to_header.hash().to_vec(),
};

let mut output_stream = client.sync_utxos(req).await?;

let mut txn = db.write_transaction();
- let mut utxo_counter = gen_block.header().output_smt_size;
+ let mut utxo_counter = local_num_outputs;
let timer = Instant::now();
let mut output_smt = db.fetch_tip_smt().await?;
let mut last_sync_timer = Instant::now();
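The rewritten block above moves the start of output sync from genesis to the local tip: the progress counter now starts at the tip header's `output_smt_size`, and the `SyncUtxosRequest` spans only the headers from tip + 1 to the horizon header. A small sketch of the resume-window arithmetic; the helper name is illustrative, not from the codebase:

```rust
/// Illustrative helper: how many outputs remain to fetch when horizon sync
/// resumes from the local tip instead of restarting from genesis.
fn outputs_remaining(local_tip_smt_size: u64, horizon_smt_size: u64) -> u64 {
    horizon_smt_size.saturating_sub(local_tip_smt_size)
}

fn main() {
    // A node holding 1_000 outputs syncing to a horizon header that reports
    // 1_250 only needs the 250 it is missing.
    assert_eq!(outputs_remaining(1_000, 1_250), 250);
    // A node already at or past the horizon has nothing left to fetch.
    assert_eq!(outputs_remaining(1_300, 1_250), 0);
}
```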
@@ -530,7 +534,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let res: SyncUtxosResponse = response?;
utxo_counter += 1;

- if utxo_counter > end {
+ if utxo_counter > remote_num_outputs {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent too many outputs".to_string(),
));
@@ -551,9 +555,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let constants = self.rules.consensus_constants(current_header.height).clone();
let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
trace!(
- target: LOG_TARGET,
- "UTXO {} received from sync peer",
- output.hash(),
+ target: LOG_TARGET,
+ "UTXO {} received from sync peer ({} of {})",
+ output.hash(),
+ utxo_counter.saturating_sub(local_num_outputs),
+ remote_num_outputs.saturating_sub(local_num_outputs),
);
helpers::check_tari_script_byte_size(&output.script, constants.max_script_byte_size())?;

@@ -583,15 +589,15 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
sync_peer.add_sample(last_sync_timer.elapsed());
last_sync_timer = Instant::now();
}
- if utxo_counter != end {
+ if utxo_counter != remote_num_outputs {
return Err(HorizonSyncError::IncorrectResponse(
"Peer did not send enough outputs".to_string(),
));
}
debug!(
target: LOG_TARGET,
"finished syncing UTXOs: {} downloaded in {:.2?}",
- end,
+ remote_num_outputs.saturating_sub(local_num_outputs),
timer.elapsed()
);
let root = FixedHash::try_from(output_smt.hash().as_slice())?;
@@ -693,7 +699,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
curr_header.height(),
curr_header.header().kernel_mmr_size,
prev_kernel_mmr,
- curr_header.header().kernel_mmr_size - 1
+ curr_header.header().kernel_mmr_size.saturating_sub(1)
);

trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
1 change: 1 addition & 0 deletions base_layer/core/tests/helpers/block_builders.rs
@@ -191,6 +191,7 @@ fn update_genesis_block_mmr_roots(template: NewBlockTemplate) -> Result<Block, C
let smt_node = ValueHash::try_from(output.smt_hash(header.height).as_slice())?;
mmr.insert(smt_key, smt_node).unwrap();
}
+ header.output_smt_size = body.outputs().len() as u64;

header.output_mr = FixedHash::try_from(mmr.hash().as_slice()).unwrap();
Ok(Block { header, body })
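The single added line keeps the genesis header's `output_smt_size` consistent with the number of outputs actually placed in the body, so counters that read it (such as the resume logic above) start from the correct baseline. A minimal sketch of the invariant, using stand-in types rather than the real tari_core structs:

```rust
struct Header {
    output_smt_size: u64,
}

struct Body {
    outputs: Vec<()>, // stand-in for Vec<TransactionOutput>
}

/// Mirrors the added line: header metadata must agree with the body.
fn set_output_smt_size(header: &mut Header, body: &Body) {
    header.output_smt_size = body.outputs.len() as u64;
}

fn main() {
    let body = Body { outputs: vec![(), ()] };
    let mut header = Header { output_smt_size: 0 };
    set_output_smt_size(&mut header, &body);
    assert_eq!(header.output_smt_size, 2);
}
```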
66 changes: 51 additions & 15 deletions base_layer/core/tests/helpers/nodes.rs
@@ -41,7 +41,7 @@ use tari_core::{
LocalNodeCommsInterface,
StateMachineHandle,
},
- chain_storage::{BlockchainDatabase, Validators},
+ chain_storage::{BlockchainDatabase, BlockchainDatabaseConfig, Validators},
consensus::{ConsensusManager, ConsensusManagerBuilder, NetworkConsensus},
mempool::{
service::{LocalMempoolService, MempoolHandle},
@@ -52,7 +52,7 @@ use tari_core::{
OutboundMempoolServiceInterface,
},
proof_of_work::randomx_factory::RandomXFactory,
- test_helpers::blockchain::{create_store_with_consensus_and_validators, TempDatabase},
+ test_helpers::blockchain::{create_store_with_consensus_and_validators_and_config, TempDatabase},
validation::{
mocks::MockValidator,
transaction::TransactionChainLinkedValidator,
@@ -186,7 +186,11 @@ impl BaseNodeBuilder {

/// Build the test base node and start its services.
#[allow(clippy::redundant_closure)]
- pub async fn start(self, data_path: &str) -> (NodeInterfaces, ConsensusManager) {
+ pub async fn start(
+ self,
+ data_path: &str,
+ blockchain_db_config: BlockchainDatabaseConfig,
+ ) -> (NodeInterfaces, ConsensusManager) {
let validators = self.validators.unwrap_or_else(|| {
Validators::new(
MockValidator::new(true),
@@ -198,7 +202,11 @@ impl BaseNodeBuilder {
let consensus_manager = self
.consensus_manager
.unwrap_or_else(|| ConsensusManagerBuilder::new(network).build().unwrap());
- let blockchain_db = create_store_with_consensus_and_validators(consensus_manager.clone(), validators);
+ let blockchain_db = create_store_with_consensus_and_validators_and_config(
+ consensus_manager.clone(),
+ validators,
+ blockchain_db_config,
+ );
let mempool_validator = TransactionChainLinkedValidator::new(blockchain_db.clone(), consensus_manager.clone());
let mempool = Mempool::new(
self.mempool_config.unwrap_or_default(),
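With the extra `blockchain_db_config` parameter on `start`, each test can start a node against its own database configuration. A sketch of a possible caller, assuming the imports already used in this file plus `tempfile`; treating `pruning_horizon` as a public field of `BlockchainDatabaseConfig` is an assumption, not something this diff confirms:

```rust
#[tokio::test]
async fn node_starts_with_custom_db_config() {
    let temp_dir = tempfile::tempdir().unwrap();
    let (node, _consensus_manager) = BaseNodeBuilder::new(Network::LocalNet.into())
        .start(
            temp_dir.path().to_str().unwrap(),
            BlockchainDatabaseConfig {
                pruning_horizon: 100, // assumed field: keep only recent blocks
                ..Default::default()
            },
        )
        .await;
    let _ = node; // the node is now running against the custom DB config
}
```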
@@ -244,13 +252,13 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface
let (alice_node, consensus_manager) = BaseNodeBuilder::new(network.into())
.with_node_identity(alice_node_identity.clone())
.with_peers(vec![bob_node_identity.clone()])
- .start(data_path)
+ .start(data_path, BlockchainDatabaseConfig::default())
.await;
let (bob_node, consensus_manager) = BaseNodeBuilder::new(network.into())
.with_node_identity(bob_node_identity)
.with_peers(vec![alice_node_identity])
.with_consensus_manager(consensus_manager)
- .start(data_path)
+ .start(data_path, BlockchainDatabaseConfig::default())
.await;

wait_until_online(&[&alice_node, &bob_node]).await;
@@ -263,34 +271,42 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface
pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
mempool_service_config: MempoolServiceConfig,
liveness_service_config: LivenessConfig,
+ local_blockchain_db_config: BlockchainDatabaseConfig,
+ peer_blockchain_db_config: BlockchainDatabaseConfig,
p2p_config: P2pConfig,
consensus_manager: ConsensusManager,
data_path: P,
) -> (NodeInterfaces, NodeInterfaces, ConsensusManager) {
let alice_node_identity = random_node_identity();
let bob_node_identity = random_node_identity();
let network = Network::LocalNet;
- let (alice_node, consensus_manager) = BaseNodeBuilder::new(network.into())
+ let (local_node, consensus_manager) = BaseNodeBuilder::new(network.into())
.with_node_identity(alice_node_identity.clone())
.with_mempool_service_config(mempool_service_config.clone())
.with_liveness_service_config(liveness_service_config.clone())
.with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap())
.start(
data_path.as_ref().join("alice").as_os_str().to_str().unwrap(),
local_blockchain_db_config,
)
.await;
- let (bob_node, consensus_manager) = BaseNodeBuilder::new(network.into())
+ let (peer_node, consensus_manager) = BaseNodeBuilder::new(network.into())
.with_node_identity(bob_node_identity)
.with_peers(vec![alice_node_identity])
.with_mempool_service_config(mempool_service_config)
.with_liveness_service_config(liveness_service_config)
.with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap())
.start(
data_path.as_ref().join("bob").as_os_str().to_str().unwrap(),
peer_blockchain_db_config,
)
.await;

- wait_until_online(&[&alice_node, &bob_node]).await;
+ wait_until_online(&[&local_node, &peer_node]).await;

- (alice_node, bob_node, consensus_manager)
+ (local_node, peer_node, consensus_manager)
}
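Splitting the configuration into `local_blockchain_db_config` and `peer_blockchain_db_config` is what enables the integration-level horizon sync tests this commit adds: the local node can run pruned while the peer it syncs from stays archival. A hedged sketch of how a test might call this helper, with `pruning_horizon` again an assumed field name:

```rust
// Inside an async test, with `consensus_manager` and `temp_dir` prepared as
// in the other helpers in this file:
let (local_node, peer_node, _consensus_manager) = create_network_with_2_base_nodes_with_config(
    MempoolServiceConfig::default(),
    LivenessConfig::default(),
    // The local node prunes, so it must horizon-sync its state from the peer...
    BlockchainDatabaseConfig {
        pruning_horizon: 10, // assumed field name
        ..Default::default()
    },
    // ...while the peer keeps full history and can serve the sync.
    BlockchainDatabaseConfig::default(),
    P2pConfig::default(),
    consensus_manager,
    temp_dir.path(),
)
.await;
```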

// Creates a network with three Base Nodes where each node in the network knows the other nodes in the network.
@@ -303,6 +319,10 @@ pub async fn create_network_with_3_base_nodes(
create_network_with_3_base_nodes_with_config(
MempoolServiceConfig::default(),
LivenessConfig::default(),
+ BlockchainDatabaseConfig::default(),
+ BlockchainDatabaseConfig::default(),
+ BlockchainDatabaseConfig::default(),
+ P2pConfig::default(),
consensus_manager,
data_path,
)
@@ -314,6 +334,10 @@ pub async fn create_network_with_3_base_nodes(
pub async fn create_network_with_3_base_nodes_with_config<P: AsRef<Path>>(
mempool_service_config: MempoolServiceConfig,
liveness_service_config: LivenessConfig,
+ alice_blockchain_db_config: BlockchainDatabaseConfig,
+ bob_blockchain_db_config: BlockchainDatabaseConfig,
+ carol_blockchain_db_config: BlockchainDatabaseConfig,
+ p2p_config: P2pConfig,
consensus_manager: ConsensusManager,
data_path: P,
) -> (NodeInterfaces, NodeInterfaces, NodeInterfaces, ConsensusManager) {
@@ -332,24 +356,36 @@ pub async fn create_network_with_3_base_nodes_with_config<P: AsRef<Path>>(
.with_node_identity(carol_node_identity.clone())
.with_mempool_service_config(mempool_service_config.clone())
.with_liveness_service_config(liveness_service_config.clone())
+ .with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("carol").as_os_str().to_str().unwrap())
.start(
data_path.as_ref().join("carol").as_os_str().to_str().unwrap(),
alice_blockchain_db_config,
)
.await;
let (bob_node, consensus_manager) = BaseNodeBuilder::new(network.into())
.with_node_identity(bob_node_identity.clone())
.with_peers(vec![carol_node_identity.clone()])
.with_mempool_service_config(mempool_service_config.clone())
.with_liveness_service_config(liveness_service_config.clone())
+ .with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap())
.start(
data_path.as_ref().join("bob").as_os_str().to_str().unwrap(),
bob_blockchain_db_config,
)
.await;
let (alice_node, consensus_manager) = BaseNodeBuilder::new(network.into())
.with_node_identity(alice_node_identity)
.with_peers(vec![bob_node_identity, carol_node_identity])
.with_mempool_service_config(mempool_service_config)
.with_liveness_service_config(liveness_service_config)
+ .with_p2p_config(p2p_config.clone())
.with_consensus_manager(consensus_manager)
.start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap())
.start(
data_path.as_ref().join("alice").as_os_str().to_str().unwrap(),
carol_blockchain_db_config,
)
.await;

wait_until_online(&[&alice_node, &bob_node, &carol_node]).await;
Expand Down
(Diffs for the remaining changed files were not loaded.)
