From d00c76c5a764d3fb922062a0d8b78a476c0e329a Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 15 Jan 2024 18:20:50 +0200 Subject: [PATCH] wip --- base_layer/core/src/base_node/proto/rpc.proto | 4 +-- .../sync/horizon_state_sync/synchronizer.rs | 32 ++++++++++++++----- .../src/base_node/sync/rpc/sync_utxos_task.rs | 19 ++++++++--- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/base_layer/core/src/base_node/proto/rpc.proto b/base_layer/core/src/base_node/proto/rpc.proto index 42905d8ed3..ed28cd24b3 100644 --- a/base_layer/core/src/base_node/proto/rpc.proto +++ b/base_layer/core/src/base_node/proto/rpc.proto @@ -61,8 +61,8 @@ message SyncUtxosRequest { bytes start_header_hash = 1; // End header hash to sync UTXOs to bytes end_header_hash = 2; - // Indicate if pruned UTXOs should be included, typically not for the initial sync - bool include_pruned_utxos = 3; + // Indicate if spent UTXOs should be included, typically not for the initial sync + bool include_spent_utxos = 3; } message SyncUtxosResponse { diff --git a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs index 4ed70f9f90..a7ffecc64d 100644 --- a/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs @@ -525,13 +525,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { let latency = last_sync_timer.elapsed(); avg_latency.add_sample(latency); let res: SyncUtxosResponse = response?; - utxo_counter += 1; - - if initial_sync && utxo_counter > remote_num_outputs { - return Err(HorizonSyncError::IncorrectResponse( - "Peer sent too many outputs".to_string(), - )); - } let output_header = FixedHash::try_from(res.mined_header) .map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?; @@ -548,6 +541,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?; match proto_output { Utxo::Output(output) => { + utxo_counter += 1; + if initial_sync && utxo_counter > remote_num_outputs { + return Err(HorizonSyncError::IncorrectResponse( + "Peer sent too many outputs".to_string(), + )); + } + let constants = self.rules.consensus_constants(current_header.height).clone(); let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?; trace!( @@ -578,6 +578,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { txn.commit().await?; }, Utxo::Commitment(commitment_bytes) => { + if initial_sync { + return Err(HorizonSyncError::IncorrectResponse( + "Peer sent deleted output hash during initial sync".to_string(), + )); + } let output_hash = self .db() .fetch_unspent_output_hash_by_commitment(Commitment::from_canonical_bytes( @@ -592,9 +597,20 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { .fetch_output(output_hash) .await? .ok_or(HorizonSyncError::ChainStorageError( - ChainStorageError::CorruptedDatabase("Peer sent unknown commitment hash".into()), + ChainStorageError::CorruptedDatabase(format!("UTXO not found by its hash {}", output_hash)), ))? .output; + trace!( + target: LOG_TARGET, + "Spent UTXO hash {} received from sync peer ({} of {})", + output.hash(), + utxo_counter.saturating_sub(local_num_outputs), + if initial_sync { + remote_num_outputs.saturating_sub(local_num_outputs).to_string() + } else { + "n/a".to_string() + }, + ); let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?; output_smt.delete(&smt_key)?; }, diff --git a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs index db051f6822..4338b51896 100644 --- a/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs +++ b/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs @@ -92,7 +92,7 @@ where B: BlockchainBackend + 'static ))); } - let include_pruned_utxos = msg.include_pruned_utxos; + let include_spent_utxos = msg.include_spent_utxos; task::spawn(async move { debug!( @@ -100,7 +100,7 @@ where B: BlockchainBackend + 'static "Starting UTXO stream for peer '{}'", self.peer_node_id ); if let Err(err) = self - .start_streaming(&mut tx, start_header, end_header, include_pruned_utxos) + .start_streaming(&mut tx, start_header, end_header, include_spent_utxos) .await { debug!( @@ -126,13 +126,14 @@ where B: BlockchainBackend + 'static tx: &mut mpsc::Sender>, mut current_header: BlockHeader, end_header: BlockHeader, - include_pruned_utxos: bool, + include_spent_utxos: bool, ) -> Result<(), RpcStatus> { debug!( target: LOG_TARGET, - "Starting stream task with current_header: {}, end_header: {},", + "Starting stream task with current_header: {}, end_header: {}, include spent UTXOs '{}'", current_header.hash().to_hex(), end_header.hash().to_hex(), + include_spent_utxos, ); loop { let timer = Instant::now(); @@ -174,7 +175,11 @@ where B: BlockchainBackend + 'static let utxos = outputs_with_statuses .into_iter() .filter_map(|(output, spent)| { - if spent && include_pruned_utxos { + if spent && include_spent_utxos { + debug!( + target: LOG_TARGET, + "Spent TXO '{}' to peer", output.commitment.to_hex() + ); Some(Ok(SyncUtxosResponse { utxo: Some(proto::base_node::sync_utxos_response::Utxo::Commitment( output.commitment.as_bytes().to_vec(), @@ -185,6 +190,10 @@ where B: BlockchainBackend + 'static None } else { match output.try_into() { + debug!( + target: LOG_TARGET, + "Unspent TXO '{}' to peer", output.commitment.to_hex() + ); Ok(tx_ouput) => Some(Ok(SyncUtxosResponse { utxo: Some(proto::base_node::sync_utxos_response::Utxo::Output(tx_ouput)), mined_header: current_header.hash().to_vec(),