Skip to content

Commit

Permalink
fix: validation trigger (#6784)
Browse files Browse the repository at this point in the history
Description
---
Lets validation triggers not on new block, but rather on scanning
complete of new block

Motivation and Context
---
We should not trigger a new validation after a block has been detected,
but rather scanned
  • Loading branch information
SWvheerden authored Feb 5, 2025
1 parent 3ad35af commit 2698a58
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 70 deletions.
3 changes: 3 additions & 0 deletions base_layer/wallet/src/output_manager_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::{
service::OutputManagerService,
storage::database::{OutputManagerBackend, OutputManagerDatabase},
},
utxo_scanner_service::handle::UtxoScannerHandle,
};

/// The maximum number of transaction inputs that can be created in a single transaction, slightly less than the maximum
Expand Down Expand Up @@ -124,6 +125,7 @@ where
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();
let connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let key_manager = handles.expect_handle::<TKeyManagerInterface>();
let utxo_scanner_handle = handles.expect_handle::<UtxoScannerHandle>();

let service = OutputManagerService::new(
config,
Expand All @@ -137,6 +139,7 @@ where
network,
connectivity,
key_manager,
utxo_scanner_handle,
)
.await
.expect("Could not initialize Output Manager Service")
Expand Down
12 changes: 8 additions & 4 deletions base_layer/wallet/src/output_manager_service/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ use tari_common_types::tari_address::TariAddress;
use tari_core::{consensus::ConsensusConstants, transactions::CryptoFactories};
use tari_shutdown::ShutdownSignal;

use crate::output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerEventSender,
storage::database::OutputManagerDatabase,
use crate::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerEventSender,
storage::database::OutputManagerDatabase,
},
utxo_scanner_service::handle::UtxoScannerHandle,
};

/// This struct is a collection of the common resources that a async task in the service requires.
Expand All @@ -43,4 +46,5 @@ pub(crate) struct OutputManagerResources<TBackend, TWalletConnectivity, TKeyMana
pub shutdown_signal: ShutdownSignal,
pub interactive_tari_address: TariAddress,
pub one_sided_tari_address: TariAddress,
pub utxo_scanner_handle: UtxoScannerHandle,
}
44 changes: 33 additions & 11 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use crate::{
tasks::TxoValidationTask,
TRANSACTION_INPUTS_LIMIT,
},
utxo_scanner_service::handle::{UtxoScannerEvent, UtxoScannerHandle},
};

const LOG_TARGET: &str = "wallet::output_manager_service";
Expand Down Expand Up @@ -154,6 +155,7 @@ where
network: Network,
connectivity: TWalletConnectivity,
key_manager: TKeyManagerInterface,
utxo_scanner_handle: UtxoScannerHandle,
) -> Result<Self, OutputManagerError> {
let view_key = key_manager.get_view_key().await?;
let spend_key = key_manager.get_spend_key().await?;
Expand Down Expand Up @@ -182,6 +184,7 @@ where
shutdown_signal,
one_sided_tari_address,
interactive_tari_address,
utxo_scanner_handle,
};

Ok(Self {
Expand All @@ -205,6 +208,8 @@ where

let mut base_node_service_event_stream = self.base_node_service.get_event_stream();

let mut utxo_scanner_events = self.resources.utxo_scanner_handle.get_event_receiver();

debug!(target: LOG_TARGET, "Output Manager Service started");
// Outputs marked as shorttermencumbered are not yet stored as transactions in the TMS, so lets clear them
self.resources.db.clear_short_term_encumberances()?;
Expand All @@ -216,6 +221,12 @@ where
Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e),
}
},
event = utxo_scanner_events.recv() => {
match event {
Ok(msg) => self.handle_utxo_scanner_service_event(msg),
Err(e) => debug!(target: LOG_TARGET, "Lagging read on utxo scanner event broadcast channel: {}", e),
}
},
Some(request_context) = request_stream.next() => {
trace!(target: LOG_TARGET, "Handling Service API Request");
let (request, reply_tx) = request_context.split();
Expand Down Expand Up @@ -556,6 +567,23 @@ where
.map(OutputManagerResponse::ClaimHtlcTransaction)
}

fn handle_utxo_scanner_service_event(&mut self, event: UtxoScannerEvent) {
match event {
UtxoScannerEvent::ConnectingToBaseNode(_node_id) => {},
UtxoScannerEvent::ConnectedToBaseNode(_node_id, _duration) => {},
UtxoScannerEvent::ConnectionFailedToBaseNode { .. } => {},
UtxoScannerEvent::ScanningRoundFailed { .. } => {},
UtxoScannerEvent::Progress { .. } => {},
UtxoScannerEvent::Completed { .. } => {
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
},
UtxoScannerEvent::ScanningFailed => {},
}
}

fn handle_base_node_service_event(&mut self, event: Arc<BaseNodeEvent>) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(_state) => {
Expand All @@ -566,10 +594,6 @@ where
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
self.last_seen_tip_height = Some(height);
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
},
}
}
Expand All @@ -594,7 +618,7 @@ where
let mut base_node_watch = self.resources.connectivity.get_current_base_node_watcher();
let event_publisher = self.resources.event_publisher.clone();
let validation_in_progress = self.validation_in_progress.clone();
let mut base_node_service_event_stream = self.base_node_service.get_event_stream();
let mut utxo_scanner_service_event_stream = self.resources.utxo_scanner_handle.get_event_receiver();
tokio::spawn(async move {
// Note: We do not want the validation task to be queued
let mut _lock = match validation_in_progress.try_lock() {
Expand Down Expand Up @@ -657,12 +681,10 @@ where
is shutting down", id);
return;
},
event = base_node_service_event_stream.recv() => {
if let Ok(bn_event) = event {
if let BaseNodeEvent::NewBlockDetected(_hash, _height) = *bn_event {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id);
continue 'outer;
}
event = utxo_scanner_service_event_stream.recv() => {
if let Ok(UtxoScannerEvent::Completed{..}) = event {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id);
continue 'outer;
}
}
_ = base_node_watch.changed() => {
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/transaction_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use crate::{
service::TransactionService,
storage::database::{TransactionBackend, TransactionDatabase},
},
utxo_scanner_service::handle::UtxoScannerHandle,
};

pub mod config;
Expand Down Expand Up @@ -243,6 +244,7 @@ where
let core_key_manager_service = handles.expect_handle::<TKeyManagerInterface>();
let connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();
let utxo_scanner_handle = handles.expect_handle::<UtxoScannerHandle>();

let result = TransactionService::new(
config,
Expand All @@ -266,6 +268,7 @@ where
handles.get_shutdown_signal(),
base_node_service_handle,
wallet_type,
utxo_scanner_handle,
)
.await
.expect("Could not initialize Transaction Manager Service")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::{

const LOG_TARGET: &str = "wallet::transaction_service::protocols::validation_protocol";

#[derive(Clone)]
pub struct TransactionValidationProtocol<TTransactionBackend, TWalletConnectivity> {
operation_id: OperationId,
db: TransactionDatabase<TTransactionBackend>,
Expand Down
85 changes: 60 additions & 25 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ use crate::{
utc::utc_duration_since,
},
util::watch::Watch,
utxo_scanner_service::RECOVERY_KEY,
utxo_scanner_service::{
handle::{UtxoScannerEvent, UtxoScannerHandle},
RECOVERY_KEY,
},
OperationId,
};

Expand Down Expand Up @@ -261,6 +264,7 @@ where
shutdown_signal: ShutdownSignal,
base_node_service: BaseNodeServiceHandle,
wallet_type: Arc<WalletType>,
utxo_scanner_handle: UtxoScannerHandle,
) -> Result<Self, TransactionServiceError> {
// Collect the resources that all protocols will need so that they can be neatly cloned as the protocols are
// spawned.
Expand Down Expand Up @@ -295,6 +299,7 @@ where
shutdown_signal,
consensus_manager: consensus_manager.clone(),
wallet_type,
utxo_scanner_handle,
};
let power_mode = PowerMode::default();
let timeout = match power_mode {
Expand Down Expand Up @@ -388,6 +393,7 @@ where

let mut base_node_service_event_stream = self.base_node_service.get_event_stream();
let mut output_manager_event_stream = self.resources.output_manager_service.get_event_stream();
let mut utxo_scanner_events = self.resources.utxo_scanner_handle.get_event_receiver();

debug!(target: LOG_TARGET, "Transaction Service started");
loop {
Expand All @@ -401,10 +407,16 @@ where
// Base Node Monitoring Service event
event = base_node_service_event_stream.recv() => {
match event {
Ok(msg) => self.handle_base_node_service_event(msg, &mut transaction_validation_protocol_handles).await,
Ok(msg) => self.handle_base_node_service_event(msg).await,
Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e),
};
},
event = utxo_scanner_events.recv() => {
match event {
Ok(msg) => self.handle_utxo_scanner_service_event(msg, &mut transaction_validation_protocol_handles).await,
Err(e) => debug!(target: LOG_TARGET, "Lagging read on utxo scanner event broadcast channel: {}", e),
}
},
//Incoming request
Some(request_context) = request_stream.next() => {
let start = Instant::now();
Expand Down Expand Up @@ -1040,28 +1052,40 @@ where
});
}

async fn handle_base_node_service_event(
&mut self,
event: Arc<BaseNodeEvent>,
transaction_validation_join_handles: &mut FuturesUnordered<
JoinHandle<Result<OperationId, TransactionServiceProtocolError<OperationId>>>,
>,
) {
async fn handle_base_node_service_event(&mut self, event: Arc<BaseNodeEvent>) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(_state) => {
trace!(target: LOG_TARGET, "Received BaseNodeStateChanged event, but igoring",);
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
self.last_seen_tip_height = Some(height);
},
}
}

async fn handle_utxo_scanner_service_event(
&mut self,
event: UtxoScannerEvent,
transaction_validation_join_handles: &mut FuturesUnordered<
JoinHandle<Result<OperationId, TransactionServiceProtocolError<OperationId>>>,
>,
) {
match event {
UtxoScannerEvent::ConnectingToBaseNode(_node_id) => {},
UtxoScannerEvent::ConnectedToBaseNode(_node_id, _duration) => {},
UtxoScannerEvent::ConnectionFailedToBaseNode { .. } => {},
UtxoScannerEvent::ScanningRoundFailed { .. } => {},
UtxoScannerEvent::Progress { .. } => {},
UtxoScannerEvent::Completed { .. } => {
let _operation_id = self
.start_transaction_validation_protocol(transaction_validation_join_handles)
.await
.map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});

self.last_seen_tip_height = Some(height);
},
UtxoScannerEvent::ScanningFailed => {},
}
}

Expand Down Expand Up @@ -3383,6 +3407,9 @@ where

let mut base_node_watch = self.connectivity().get_current_base_node_watcher();
let validation_in_progress = self.validation_in_progress.clone();

let mut utxo_scanner_service_event_stream = self.resources.utxo_scanner_handle.get_event_receiver();

let join_handle = tokio::spawn(async move {
let mut _lock = validation_in_progress.try_lock().map_err(|_| {
debug!(
Expand All @@ -3391,20 +3418,27 @@ where
);
TransactionServiceProtocolError::new(id, TransactionServiceError::TransactionValidationInProgress)
})?;
let exec_fut = protocol.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
return result;
},
_ = base_node_watch.changed() => {
if let Some(selected_peer) = base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer().node_id != current_base_node {
debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol");
return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged {
task_name: "transaction validation_protocol",
}));
'outer: loop {
let local_run = protocol.clone();
let exec_fut = local_run.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
return result;
},
event = utxo_scanner_service_event_stream.recv() => {
if let Ok(UtxoScannerEvent::Completed{..}) = event {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id);
continue 'outer;
}
}
_ = base_node_watch.changed() => {
if let Some(selected_peer) = base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer().node_id != current_base_node {
debug!(target: LOG_TARGET, "Base node changed, restarting transaction validation protocol");
continue 'outer;
}
}
}
}
Expand Down Expand Up @@ -3843,6 +3877,7 @@ pub struct TransactionServiceResources<TBackend, TWalletConnectivity, TKeyManage
pub config: TransactionServiceConfig,
pub shutdown_signal: ShutdownSignal,
pub wallet_type: Arc<WalletType>,
pub utxo_scanner_handle: UtxoScannerHandle,
}

#[derive(Default, Clone, Copy)]
Expand Down
Loading

0 comments on commit 2698a58

Please sign in to comment.