
Commit

Fix horizon sync
- Added integration-level horizon sync unit tests
- Fixed horizon sync: initial sync, re-sync, and re-sync after being offline
- Added logic to detect genesis block outputs being spent
hansieodendaal committed Jan 19, 2024
1 parent 7a54cf2 commit 9bae457
Showing 23 changed files with 1,215 additions and 420 deletions.
16 changes: 10 additions & 6 deletions base_layer/core/src/base_node/proto/rpc.proto
@@ -57,16 +57,20 @@ message SyncKernelsRequest {
}

message SyncUtxosRequest {
// Start header hash to sync UTXOs from
bytes start_header_hash = 1;
// End header hash to sync UTXOs to
bytes end_header_hash = 2;
}
message SyncUtxosResponse {
tari.types.TransactionOutput output = 1;
bytes mined_header = 2;
}

message PrunedOutput {
bytes hash = 1;
message SyncUtxosResponse {
oneof txo {
// The unspent transaction output
tari.types.TransactionOutput output = 1;
// If the TXO is spent, the commitment bytes are returned
bytes commitment = 2;
}
bytes mined_header = 3;
}

message SyncUtxosByBlockRequest {
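For orientation, here is a sketch of the Rust shape that prost codegen is expected to produce for the new `SyncUtxosResponse`; it mirrors the proto above but is an assumption, not the crate's actual generated module.

```rust
// Assumed prost-generated shape for the new SyncUtxosResponse (sketch only;
// real module paths differ, e.g. the output type lives under tari.types).
pub struct TransactionOutput { /* proto fields elided */ }

pub struct SyncUtxosResponse {
    /// Exactly one `oneof txo` variant, or None if the peer sent nothing
    pub txo: Option<sync_utxos_response::Txo>,
    /// Hash of the header the TXO was mined under (field 3)
    pub mined_header: Vec<u8>,
}

pub mod sync_utxos_response {
    /// Mirrors `oneof txo` in rpc.proto
    pub enum Txo {
        /// Field 1: the full unspent transaction output
        Output(super::TransactionOutput),
        /// Field 2: commitment bytes identifying a spent output
        Commitment(Vec<u8>),
    }
}
```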
12 changes: 11 additions & 1 deletion base_layer/core/src/base_node/sync/horizon_state_sync/error.rs
@@ -30,6 +30,7 @@ use tari_comms::{
};
use tari_crypto::errors::RangeProofError;
use tari_mmr::{error::MerkleMountainRangeError, sparse_merkle_tree::SMTError};
use tari_utilities::ByteArrayError;
use thiserror::Error;
use tokio::task;

@@ -97,6 +98,14 @@ pub enum HorizonSyncError {
PeerNotFound,
#[error("Sparse Merkle Tree error: {0}")]
SMTError(#[from] SMTError),
#[error("ByteArrayError error: {0}")]
ByteArrayError(String),
}

impl From<ByteArrayError> for HorizonSyncError {
fn from(e: ByteArrayError) -> Self {
HorizonSyncError::ByteArrayError(e.to_string())
}
}

impl From<TryFromIntError> for HorizonSyncError {
@@ -142,7 +151,8 @@ impl HorizonSyncError {
err @ HorizonSyncError::ConversionError(_) |
err @ HorizonSyncError::MerkleMountainRangeError(_) |
err @ HorizonSyncError::FixedHashSizeError(_) |
err @ HorizonSyncError::TransactionError(_) => Some(BanReason {
err @ HorizonSyncError::TransactionError(_) |
err @ HorizonSyncError::ByteArrayError(_) => Some(BanReason {
reason: format!("{}", err),
ban_duration: BanPeriod::Long,
}),
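With the new `From<ByteArrayError>` impl, byte-array parsing failures encountered during horizon sync can be propagated with `?`. A minimal illustrative sketch; the `from_canonical_bytes` call and the function name are assumptions:

```rust
// Illustrative only: ByteArrayError now converts into HorizonSyncError via `?`.
// Assumes HorizonSyncError (above) is in scope; the parsing call is an assumption.
use tari_common_types::types::Commitment;
use tari_utilities::ByteArray;

fn commitment_from_sync_bytes(bytes: &[u8]) -> Result<Commitment, HorizonSyncError> {
    // from_canonical_bytes returns Result<_, ByteArrayError>; `?` maps the error
    // into HorizonSyncError::ByteArrayError through the new From impl
    Ok(Commitment::from_canonical_bytes(bytes)?)
}
```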
208 changes: 132 additions & 76 deletions base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs

Large diffs are not rendered by default.

137 changes: 93 additions & 44 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
@@ -20,23 +20,28 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{convert::TryInto, sync::Arc, time::Instant};
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
time::Instant,
};

use log::*;
use tari_comms::{
peer_manager::NodeId,
protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
utils,
};
use tari_utilities::hex::Hex;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::{sync::mpsc, task};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
proto::base_node::{SyncUtxosRequest, SyncUtxosResponse},
proto,
proto::base_node::{sync_utxos_response::Txo, SyncUtxosRequest, SyncUtxosResponse},
};

const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";
@@ -70,7 +75,7 @@ where B: BlockchainBackend + 'static
.fetch_header_by_block_hash(start_hash)
.await
.rpc_status_internal_error(LOG_TARGET)?
.ok_or_else(|| RpcStatus::not_found("Start header hash is was not found"))?;
.ok_or_else(|| RpcStatus::not_found("Start header hash was not found"))?;

let end_hash = msg
.end_header_hash
@@ -83,7 +88,7 @@
.fetch_header_by_block_hash(end_hash)
.await
.rpc_status_internal_error(LOG_TARGET)?
.ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?;
.ok_or_else(|| RpcStatus::not_found("End header hash was not found"))?;
if start_header.height > end_header.height {
return Err(RpcStatus::bad_request(&format!(
"Start header height({}) cannot be greater than the end header height({})",
@@ -123,78 +128,122 @@
) -> Result<(), RpcStatus> {
debug!(
target: LOG_TARGET,
"Starting stream task with current_header: {}, end_header: {},",
"Starting stream task with current_header: {}, end_header: {}",
current_header.hash().to_hex(),
end_header.hash().to_hex(),
);
loop {
let timer = Instant::now();
let current_header_hash = current_header.hash();

debug!(
target: LOG_TARGET,
"current header = {} ({})",
"Streaming TXO(s) for block #{} ({})",
current_header.height,
current_header_hash.to_hex()
);

if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
break;
}

let outputs_with_statuses = self
.db
.fetch_outputs_in_block_with_spend_state(current_header.hash(), Some(end_header.hash()))
.fetch_outputs_in_block_with_spend_state(current_header_hash, Some(end_header.hash()))
.await
.rpc_status_internal_error(LOG_TARGET)?;
if tx.is_closed() {
debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
break;
}

let mut outputs = Vec::with_capacity(outputs_with_statuses.len());
for (output, spent) in outputs_with_statuses {
if !spent {
match proto::types::TransactionOutput::try_from(output.clone()) {
Ok(tx_ouput) => {
trace!(
target: LOG_TARGET,
"Unspent TXO (commitment '{}') to peer",
output.commitment.to_hex()
);
outputs.push(Ok(SyncUtxosResponse {
txo: Some(Txo::Output(tx_ouput)),
mined_header: current_header_hash.to_vec(),
}));
},
Err(e) => {
return Err(RpcStatus::general(&format!(
"Output '{}' RPC conversion error ({})",
output.hash().to_hex(),
e
)))
},
}
}
}
debug!(
target: LOG_TARGET,
"Streaming UTXO(s) for block #{}.",
"Adding {} outputs in response for block #{} '{}'", outputs.len(),
current_header.height,
current_header_hash
);

let inputs_in_block = self
.db
.fetch_inputs_in_block(current_header_hash)
.await
.rpc_status_internal_error(LOG_TARGET)?;
if tx.is_closed() {
debug!(
target: LOG_TARGET,
"Peer '{}' exited UTXO sync session early", self.peer_node_id
);
debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
break;
}

let utxos = outputs_with_statuses
.into_iter()
.filter_map(|(output, spent)| {
// We only send unspent utxos
if spent {
None
} else {
match output.try_into() {
Ok(tx_ouput) => Some(Ok(SyncUtxosResponse {
output: Some(tx_ouput),
mined_header: current_header.hash().to_vec(),
})),
Err(err) => Some(Err(err)),
}
}
})
.collect::<Result<Vec<SyncUtxosResponse>, String>>()
.map_err(|err| RpcStatus::bad_request(&err))?
.into_iter()
.map(Ok);
let mut inputs = Vec::with_capacity(inputs_in_block.len());
for input in inputs_in_block {
let input_commitment = match self.db.fetch_output(input.output_hash()).await {
Ok(Some(o)) => o.output.commitment,
Ok(None) => {
return Err(RpcStatus::general(&format!(
"Mined info for input '{}' not found",
input.output_hash().to_hex()
)))
},
Err(e) => {
return Err(RpcStatus::general(&format!(
"Input '{}' not found ({})",
input.output_hash().to_hex(),
e
)))
},
};
trace!(target: LOG_TARGET, "Spent TXO (commitment '{}') to peer", input_commitment.to_hex());
inputs.push(Ok(SyncUtxosResponse {
txo: Some(Txo::Commitment(input_commitment.as_bytes().to_vec())),
mined_header: current_header_hash.to_vec(),
}));
}
debug!(
target: LOG_TARGET,
"Adding {} inputs in response for block #{} '{}'", inputs.len(),
current_header.height,
current_header_hash
);

let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
txos.append(&mut outputs);
txos.append(&mut inputs);
let txos = txos.into_iter();

// Ensure task stops if the peer prematurely stops their RPC session
let utxos_len = utxos.len();
if utils::mpsc::send_all(tx, utxos).await.is_err() {
let txos_len = txos.len();
if utils::mpsc::send_all(tx, txos).await.is_err() {
break;
}

debug!(
target: LOG_TARGET,
"Streamed {} utxos in {:.2?} (including stream backpressure)",
utxos_len,
"Streamed {} TXOs in {:.2?} (including stream backpressure)",
txos_len,
timer.elapsed()
);

@@ -217,7 +266,7 @@

debug!(
target: LOG_TARGET,
"UTXO sync completed to Header hash = {}",
"TXO sync completed to Header hash = {}",
current_header.hash().to_hex()
);

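For context, a rough sketch of how a syncing peer might consume this stream, handling the two `Txo` variants the task emits per block (unspent outputs first, then spent commitments); the stream plumbing and error type are assumptions, and this is not the actual synchronizer code:

```rust
// Rough client-side sketch (not the actual horizon synchronizer).
// Assumes SyncUtxosResponse and Txo from the generated proto module are in scope.
use futures::StreamExt;

async fn apply_txo_stream<S>(mut stream: S) -> Result<(), String>
where
    S: futures::Stream<Item = Result<SyncUtxosResponse, String>> + Unpin,
{
    while let Some(msg) = stream.next().await {
        let msg = msg?;
        match msg.txo {
            // Unspent output: add it to the local output set for msg.mined_header
            Some(Txo::Output(_output)) => { /* insert into UTXO db / output SMT */ },
            // Spent output: prune the matching commitment from the local database
            Some(Txo::Commitment(_commitment_bytes)) => { /* prune by commitment */ },
            // An empty oneof is a protocol violation worth banning the peer for
            None => return Err("peer sent an empty SyncUtxosResponse".to_string()),
        }
    }
    Ok(())
}
```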
20 changes: 16 additions & 4 deletions base_layer/core/src/chain_storage/async_db.rs
@@ -26,7 +26,7 @@ use primitive_types::U256;
use rand::{rngs::OsRng, RngCore};
use tari_common_types::{
chain_metadata::ChainMetadata,
types::{BlockHash, Commitment, FixedHash, HashOutput, PublicKey, Signature},
types::{BlockHash, Commitment, HashOutput, PublicKey, Signature},
};
use tari_utilities::epoch_time::EpochTime;

@@ -59,9 +59,10 @@ use crate::{
},
common::rolling_vec::RollingVec,
proof_of_work::{PowAlgorithm, TargetDifficultyWindow},
transactions::transaction_components::{TransactionKernel, TransactionOutput},
transactions::transaction_components::{TransactionInput, TransactionKernel, TransactionOutput},
OutputSmt,
};

const LOG_TARGET: &str = "c::bn::async_db";

fn trace_log<F, R>(name: &str, f: F) -> R
@@ -154,15 +155,21 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

//---------------------------------- TXO --------------------------------------------//

make_async_fn!(fetch_output(output_hash: HashOutput) -> Option<OutputMinedInfo>, "fetch_output");

make_async_fn!(fetch_unspent_output_hash_by_commitment(commitment: Commitment) -> Option<HashOutput>, "fetch_unspent_output_by_commitment");

make_async_fn!(fetch_outputs_with_spend_status_at_tip(hashes: Vec<HashOutput>) -> Vec<Option<(TransactionOutput, bool)>>, "fetch_outputs_with_spend_status_at_tip");

make_async_fn!(fetch_outputs_mined_info(hashes: Vec<HashOutput>) -> Vec<Option<OutputMinedInfo>>, "fetch_outputs_mined_info");

make_async_fn!(fetch_inputs_mined_info(hashes: Vec<HashOutput>) -> Vec<Option<InputMinedInfo>>, "fetch_inputs_mined_info");

make_async_fn!(fetch_outputs_in_block_with_spend_state(hash: HashOutput, spend_header: Option<FixedHash>) -> Vec<(TransactionOutput, bool)>, "fetch_outputs_in_block_with_spend_state");
make_async_fn!(fetch_outputs_in_block_with_spend_state(header_hash: HashOutput, spend_status_at_header: Option<HashOutput>) -> Vec<(TransactionOutput, bool)>, "fetch_outputs_in_block_with_spend_state");

make_async_fn!(fetch_outputs_in_block(header_hash: HashOutput) -> Vec<TransactionOutput>, "fetch_outputs_in_block");

make_async_fn!(fetch_outputs_in_block(hash: HashOutput) -> Vec<TransactionOutput>, "fetch_outputs_in_block");
make_async_fn!(fetch_inputs_in_block(header_hash: HashOutput) -> Vec<TransactionInput>, "fetch_inputs_in_block");

make_async_fn!(utxo_count() -> usize, "utxo_count");

@@ -350,6 +357,11 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> {
self
}

pub fn prune_output_from_all_dbs(&mut self, output_hash: HashOutput, commitment: Commitment) -> &mut Self {
self.transaction.prune_output_from_all_dbs(output_hash, commitment);
self
}

pub fn update_block_accumulated_data_via_horizon_sync(
&mut self,
header_hash: HashOutput,
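A hedged sketch of the intended call pattern for the new `prune_output_from_all_dbs` transaction helper when a spent commitment arrives during horizon sync; the wrapper function, imports, and names are illustrative assumptions:

```rust
// Sketch only: shows the call pattern for the new helper, not production code.
use tari_common_types::types::{Commitment, HashOutput};

use crate::chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError};

async fn prune_spent_output<B: BlockchainBackend + 'static>(
    db: &AsyncBlockchainDb<B>,
    output_hash: HashOutput,
    commitment: Commitment,
) -> Result<(), ChainStorageError> {
    // Queue the prune in a write transaction and commit it atomically
    let mut txn = db.write_transaction();
    txn.prune_output_from_all_dbs(output_hash, commitment);
    txn.commit().await
}
```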
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
@@ -3,7 +3,7 @@

use tari_common_types::{
chain_metadata::ChainMetadata,
types::{Commitment, FixedHash, HashOutput, PublicKey, Signature},
types::{Commitment, HashOutput, PublicKey, Signature},
};

use super::TemplateRegistrationEntry;
@@ -91,7 +91,7 @@ pub trait BlockchainBackend: Send + Sync {
fn fetch_outputs_in_block_with_spend_state(
&self,
header_hash: &HashOutput,
spend_status_at_header: Option<FixedHash>,
spend_status_at_header: Option<HashOutput>,
) -> Result<Vec<(TransactionOutput, bool)>, ChainStorageError>;

/// Fetch a specific output. Returns the output
