Skip to content

Commit

Permalink
Fix horizon sync
Browse files Browse the repository at this point in the history
- Added integration-level horizon sync unit tests
- Fixed horizon sync: initial, re-sync, re-sync after being offline
  • Loading branch information
hansieodendaal committed Jan 17, 2024
1 parent c7d2d88 commit 43fd36b
Show file tree
Hide file tree
Showing 23 changed files with 943 additions and 222 deletions.
18 changes: 12 additions & 6 deletions base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,22 @@ message SyncKernelsRequest {
}

message SyncUtxosRequest {
// Start header hash to sync UTXOs from
bytes start_header_hash = 1;
// End header hash to sync UTXOs to
bytes end_header_hash = 2;
}
message SyncUtxosResponse {
tari.types.TransactionOutput output = 1;
bytes mined_header = 2;
// Indicate if spent UTXOs should be included, typically not for the initial sync
bool include_spent_txos = 3;
}

message PrunedOutput {
bytes hash = 1;
// A single streamed item of the UTXO sync: either a full unspent output or,
// for spent TXOs, just the commitment identifying the output to remove.
message SyncUtxosResponse {
// Exactly one of these fields is set per streamed response
oneof txo {
// The unspent transaction output
tari.types.TransactionOutput output = 1;
// If the TXO is spent, the commitment bytes are returned
bytes commitment = 2;
}
// Header hash of the block in which this TXO was mined
bytes mined_header = 3;
}

message SyncUtxosByBlockRequest {
Expand Down
12 changes: 11 additions & 1 deletion base_layer/core/src/base_node/sync/horizon_state_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tari_comms::{
};
use tari_crypto::errors::RangeProofError;
use tari_mmr::{error::MerkleMountainRangeError, sparse_merkle_tree::SMTError};
use tari_utilities::ByteArrayError;
use thiserror::Error;
use tokio::task;

Expand Down Expand Up @@ -97,6 +98,14 @@ pub enum HorizonSyncError {
PeerNotFound,
#[error("Sparse Merkle Tree error: {0}")]
SMTError(#[from] SMTError),
#[error("ByteArrayError error: {0}")]
ByteArrayError(String),
}

impl From<ByteArrayError> for HorizonSyncError {
fn from(e: ByteArrayError) -> Self {
HorizonSyncError::ByteArrayError(e.to_string())
}
}

impl From<TryFromIntError> for HorizonSyncError {
Expand Down Expand Up @@ -142,7 +151,8 @@ impl HorizonSyncError {
err @ HorizonSyncError::ConversionError(_) |
err @ HorizonSyncError::MerkleMountainRangeError(_) |
err @ HorizonSyncError::FixedHashSizeError(_) |
err @ HorizonSyncError::TransactionError(_) => Some(BanReason {
err @ HorizonSyncError::TransactionError(_) |
err @ HorizonSyncError::ByteArrayError(_) => Some(BanReason {
reason: format!("{}", err),
ban_duration: BanPeriod::Long,
}),
Expand Down
149 changes: 88 additions & 61 deletions base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree},
common::{rolling_avg::RollingAverageTime, BanPeriod},
consensus::ConsensusManager,
proto::base_node::{SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse},
proto::base_node::{sync_utxos_response::Txo, SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse},
transactions::transaction_components::{
transaction_output::batch_verify_range_proofs,
TransactionKernel,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
target: LOG_TARGET,
"Preparing database for horizon sync to height #{}", self.horizon_sync_height
);
let header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
ChainStorageError::ValueNotFound {
entity: "Header",
field: "height",
Expand All @@ -139,7 +139,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

let mut latency_increases_counter = 0;
loop {
match self.sync(&header).await {
match self.sync(&to_header).await {
Ok(()) => return Ok(()),
Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
// If we don't have many sync peers to select from, return the listening state and see if we can get
Expand Down Expand Up @@ -167,7 +167,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}
}

async fn sync(&mut self, header: &BlockHeader) -> Result<(), HorizonSyncError> {
async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
info!(
target: LOG_TARGET,
Expand All @@ -176,7 +176,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
);
let mut latency_counter = 0usize;
for node_id in sync_peer_node_ids {
match self.connect_and_attempt_sync(&node_id, header).await {
match self.connect_and_attempt_sync(&node_id, to_header).await {
Ok(_) => return Ok(()),
// Try another peer
Err(err) => {
Expand Down Expand Up @@ -213,7 +213,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn connect_and_attempt_sync(
&mut self,
node_id: &NodeId,
header: &BlockHeader,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
let peer_index = self
.get_sync_peer_index(node_id)
Expand Down Expand Up @@ -250,7 +250,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
let sync_peer = self.sync_peers[peer_index].clone();

self.begin_sync(sync_peer.clone(), &mut client, header).await?;
self.begin_sync(sync_peer.clone(), &mut client, to_header).await?;
self.finalize_horizon_sync(&sync_peer).await?;

Ok(())
Expand Down Expand Up @@ -328,7 +328,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
"Requesting kernels from {} to {} ({} remaining)",
local_num_kernels,
remote_num_kernels,
remote_num_kernels - local_num_kernels,
remote_num_kernels.saturating_sub(local_num_kernels),
);

let latency = client.get_last_request_latency();
Expand Down Expand Up @@ -374,7 +374,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}

txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
if mmr_position == current_header.header().kernel_mmr_size - 1 {
if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
let num_kernels = kernel_hashes.len();
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -425,9 +425,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
num_kernels,
mmr_position + 1,
end,
end - (mmr_position + 1)
end.saturating_sub(mmr_position + 1)
);
if mmr_position < end - 1 {
if mmr_position < end.saturating_sub(1) {
current_header = db.fetch_chain_header(current_header.height() + 1).await?;
}
}
Expand Down Expand Up @@ -483,42 +483,39 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let remote_num_outputs = to_header.output_smt_size;
self.num_outputs = remote_num_outputs;

let db = self.db().clone();

let tip_header = db.fetch_tip_header().await?;
let local_num_outputs = tip_header.header().output_smt_size;
let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
current: 0,
current: local_num_outputs,
total: self.num_outputs,
sync_peer: sync_peer.clone(),
});
self.hooks.call_on_progress_horizon_hooks(info);

debug!(
target: LOG_TARGET,
"Requesting outputs from {}",
remote_num_outputs,
);
let db = self.db().clone();

let end = remote_num_outputs;
let end_hash = to_header.hash();
let start_hash = db.fetch_chain_header(1).await?;
let gen_block = db.fetch_chain_header(0).await?;

let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
"Initiating output sync with peer `{}` (latency = {}ms)",
"Initiating output sync with peer `{}`, requesting {} outputs, tip_header height `{}`, last_chain_header height `{}` (latency = {}ms)",
sync_peer.node_id(),
latency.unwrap_or_default().as_millis()
remote_num_outputs.saturating_sub(local_num_outputs),
tip_header.height(),
db.fetch_last_chain_header().await?.height(),
latency.unwrap_or_default().as_millis(),
);

let start_chain_header = db.fetch_chain_header(tip_header.height() + 1).await?;
let initial_sync = tip_header.header().height == 0;
let req = SyncUtxosRequest {
start_header_hash: start_hash.hash().to_vec(),
end_header_hash: end_hash.to_vec(),
start_header_hash: start_chain_header.hash().to_vec(),
end_header_hash: to_header.hash().to_vec(),
include_spent_txos: !initial_sync,
};

let mut output_stream = client.sync_utxos(req).await?;

let mut txn = db.write_transaction();
let mut utxo_counter = gen_block.header().output_smt_size;
let mut utxo_counter = local_num_outputs;
let timer = Instant::now();
let mut output_smt = db.fetch_tip_smt().await?;
let mut last_sync_timer = Instant::now();
Expand All @@ -528,16 +525,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let res: SyncUtxosResponse = response?;
utxo_counter += 1;

if utxo_counter > end {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent too many outputs".to_string(),
));
}
let output = res
.output
.ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?;
let output_header = FixedHash::try_from(res.mined_header)
.map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?;
let current_header = self
Expand All @@ -548,28 +536,67 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into())
})?;

let constants = self.rules.consensus_constants(current_header.height).clone();
let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
trace!(
let proto_output = res
.txo
.ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?;
match proto_output {
Txo::Output(output) => {
utxo_counter += 1;
if initial_sync && utxo_counter > remote_num_outputs {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent too many outputs".to_string(),
));
}

let constants = self.rules.consensus_constants(current_header.height).clone();
let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
debug!(
target: LOG_TARGET,
"UTXO {} received from sync peer",
"UTXO {} received from sync peer ({} of {})",
output.hash(),
);
helpers::check_tari_script_byte_size(&output.script, constants.max_script_byte_size())?;

batch_verify_range_proofs(&self.prover, &[&output])?;
let smt_key = NodeKey::try_from(output.commitment.as_bytes())?;
let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?;
output_smt.insert(smt_key, smt_node)?;
txn.insert_output_via_horizon_sync(
output,
current_header.hash(),
current_header.height,
current_header.timestamp.as_u64(),
);
utxo_counter.saturating_sub(local_num_outputs),
if initial_sync {
remote_num_outputs.saturating_sub(local_num_outputs).to_string()
} else {
"?".to_string()
},
);
helpers::check_tari_script_byte_size(&output.script, constants.max_script_byte_size())?;

batch_verify_range_proofs(&self.prover, &[&output])?;
let smt_key = NodeKey::try_from(output.commitment.as_bytes())?;
let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?;
output_smt.insert(smt_key, smt_node)?;
txn.insert_output_via_horizon_sync(
output,
current_header.hash(),
current_header.height,
current_header.timestamp.as_u64(),
);

// we have checked the range proof, and we have checked that the linked to header exists.
txn.commit().await?;
// We have checked the range proof, and we have checked that the linked to header exists.
txn.commit().await?;
},
Txo::Commitment(commitment_bytes) => {
if initial_sync {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent deleted output hash during initial sync".to_string(),
));
}
debug!(
target: LOG_TARGET,
"STXO hash {} received from sync peer ({} of ?)",
self.db().fetch_unspent_output_hash_by_commitment(Commitment::from_canonical_bytes(
commitment_bytes.as_slice(),
)?).await?.ok_or(HorizonSyncError::IncorrectResponse(
"Peer sent unknown commitment hash".into(),
))?,
utxo_counter.saturating_sub(local_num_outputs),
);
let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?;
output_smt.delete(&smt_key)?;
},
}

if utxo_counter % 100 == 0 {
let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
Expand All @@ -583,15 +610,15 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
sync_peer.add_sample(last_sync_timer.elapsed());
last_sync_timer = Instant::now();
}
if utxo_counter != end {
if initial_sync && utxo_counter != remote_num_outputs {
return Err(HorizonSyncError::IncorrectResponse(
"Peer did not send enough outputs".to_string(),
));
}
debug!(
target: LOG_TARGET,
"finished syncing UTXOs: {} downloaded in {:.2?}",
end,
remote_num_outputs.saturating_sub(local_num_outputs),
timer.elapsed()
);
let root = FixedHash::try_from(output_smt.hash().as_slice())?;
Expand Down Expand Up @@ -693,7 +720,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
curr_header.height(),
curr_header.header().kernel_mmr_size,
prev_kernel_mmr,
curr_header.header().kernel_mmr_size - 1
curr_header.header().kernel_mmr_size.saturating_sub(1)
);

trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
Expand Down
Loading

0 comments on commit 43fd36b

Please sign in to comment.