Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Jan 15, 2024
1 parent 0ed8576 commit d00c76c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 15 deletions.
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ message SyncUtxosRequest {
bytes start_header_hash = 1;
// End header hash to sync UTXOs to
bytes end_header_hash = 2;
// Indicate if pruned UTXOs should be included, typically not for the initial sync
bool include_pruned_utxos = 3;
// Indicate if spent UTXOs should be included, typically not for the initial sync
bool include_spent_utxos = 3;
}

message SyncUtxosResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,6 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let res: SyncUtxosResponse = response?;
utxo_counter += 1;

if initial_sync && utxo_counter > remote_num_outputs {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent too many outputs".to_string(),
));
}

let output_header = FixedHash::try_from(res.mined_header)
.map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?;
Expand All @@ -548,6 +541,13 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?;
match proto_output {
Utxo::Output(output) => {
utxo_counter += 1;
if initial_sync && utxo_counter > remote_num_outputs {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent too many outputs".to_string(),
));
}

let constants = self.rules.consensus_constants(current_header.height).clone();
let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
trace!(
Expand Down Expand Up @@ -578,6 +578,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
txn.commit().await?;
},
Utxo::Commitment(commitment_bytes) => {
if initial_sync {
return Err(HorizonSyncError::IncorrectResponse(
"Peer sent deleted output hash during initial sync".to_string(),
));
}
let output_hash = self
.db()
.fetch_unspent_output_hash_by_commitment(Commitment::from_canonical_bytes(
Expand All @@ -592,9 +597,20 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.fetch_output(output_hash)
.await?
.ok_or(HorizonSyncError::ChainStorageError(
ChainStorageError::CorruptedDatabase("Peer sent unknown commitment hash".into()),
ChainStorageError::CorruptedDatabase(format!("UTXO not found by its hash {}", output_hash)),
))?
.output;
trace!(
target: LOG_TARGET,
"Spent UTXO hash {} received from sync peer ({} of {})",
output.hash(),
utxo_counter.saturating_sub(local_num_outputs),
if initial_sync {
remote_num_outputs.saturating_sub(local_num_outputs).to_string()
} else {
"n/a".to_string()
},
);
let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?;
output_smt.delete(&smt_key)?;
},
Expand Down
19 changes: 14 additions & 5 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ where B: BlockchainBackend + 'static
)));
}

let include_pruned_utxos = msg.include_pruned_utxos;
let include_spent_utxos = msg.include_spent_utxos;

task::spawn(async move {
debug!(
target: LOG_TARGET,
"Starting UTXO stream for peer '{}'", self.peer_node_id
);
if let Err(err) = self
.start_streaming(&mut tx, start_header, end_header, include_pruned_utxos)
.start_streaming(&mut tx, start_header, end_header, include_spent_utxos)
.await
{
debug!(
Expand All @@ -126,13 +126,14 @@ where B: BlockchainBackend + 'static
tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
mut current_header: BlockHeader,
end_header: BlockHeader,
include_pruned_utxos: bool,
include_spent_utxos: bool,
) -> Result<(), RpcStatus> {
debug!(
target: LOG_TARGET,
"Starting stream task with current_header: {}, end_header: {},",
"Starting stream task with current_header: {}, end_header: {}, include spent UTXOs '{}'",
current_header.hash().to_hex(),
end_header.hash().to_hex(),
include_spent_utxos,
);
loop {
let timer = Instant::now();
Expand Down Expand Up @@ -174,7 +175,11 @@ where B: BlockchainBackend + 'static
let utxos = outputs_with_statuses
.into_iter()
.filter_map(|(output, spent)| {
if spent && include_pruned_utxos {
if spent && include_spent_utxos {
debug!(
target: LOG_TARGET,
"Spent TXO '{}' to peer", output.commitment.to_hex()
);
Some(Ok(SyncUtxosResponse {
utxo: Some(proto::base_node::sync_utxos_response::Utxo::Commitment(
output.commitment.as_bytes().to_vec(),
Expand All @@ -185,6 +190,10 @@ where B: BlockchainBackend + 'static
None
} else {
match output.try_into() {
debug!(
target: LOG_TARGET,
"Unspent TXO '{}' to peer", output.commitment.to_hex()
);
Ok(tx_ouput) => Some(Ok(SyncUtxosResponse {
utxo: Some(proto::base_node::sync_utxos_response::Utxo::Output(tx_ouput)),
mined_header: current_header.hash().to_vec(),
Expand Down

0 comments on commit d00c76c

Please sign in to comment.