From 2698a58ab0cd66b63ded602d6b0f605db5981e8c Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Wed, 5 Feb 2025 16:20:03 +0200 Subject: [PATCH] fix: validation trigger (#6784) Description --- Lets validation triggers not on new block, but rather on scanning complete of new block Motivation and Context --- We should not trigger a new validation after a block has been detected, but rather scanned --- .../wallet/src/output_manager_service/mod.rs | 3 + .../src/output_manager_service/resources.rs | 12 ++- .../src/output_manager_service/service.rs | 44 +++++++--- .../wallet/src/transaction_service/mod.rs | 3 + .../transaction_validation_protocol.rs | 1 + .../wallet/src/transaction_service/service.rs | 85 +++++++++++++------ .../output_manager_service_tests/service.rs | 45 ++++------ .../transaction_service_tests/service.rs | 27 +++++- .../transaction_protocols.rs | 8 ++ 9 files changed, 158 insertions(+), 70 deletions(-) diff --git a/base_layer/wallet/src/output_manager_service/mod.rs b/base_layer/wallet/src/output_manager_service/mod.rs index 24d16bb230..37db44ef45 100644 --- a/base_layer/wallet/src/output_manager_service/mod.rs +++ b/base_layer/wallet/src/output_manager_service/mod.rs @@ -62,6 +62,7 @@ use crate::{ service::OutputManagerService, storage::database::{OutputManagerBackend, OutputManagerDatabase}, }, + utxo_scanner_service::handle::UtxoScannerHandle, }; /// The maximum number of transaction inputs that can be created in a single transaction, slightly less than the maximum @@ -124,6 +125,7 @@ where let base_node_service_handle = handles.expect_handle::(); let connectivity = handles.expect_handle::(); let key_manager = handles.expect_handle::(); + let utxo_scanner_handle = handles.expect_handle::(); let service = OutputManagerService::new( config, @@ -137,6 +139,7 @@ where network, connectivity, key_manager, + utxo_scanner_handle, ) .await .expect("Could not initialize Output Manager Service") diff --git a/base_layer/wallet/src/output_manager_service/resources.rs b/base_layer/wallet/src/output_manager_service/resources.rs index bbac2b117a..9e5ed77445 100644 --- a/base_layer/wallet/src/output_manager_service/resources.rs +++ b/base_layer/wallet/src/output_manager_service/resources.rs @@ -24,10 +24,13 @@ use tari_common_types::tari_address::TariAddress; use tari_core::{consensus::ConsensusConstants, transactions::CryptoFactories}; use tari_shutdown::ShutdownSignal; -use crate::output_manager_service::{ - config::OutputManagerServiceConfig, - handle::OutputManagerEventSender, - storage::database::OutputManagerDatabase, +use crate::{ + output_manager_service::{ + config::OutputManagerServiceConfig, + handle::OutputManagerEventSender, + storage::database::OutputManagerDatabase, + }, + utxo_scanner_service::handle::UtxoScannerHandle, }; /// This struct is a collection of the common resources that a async task in the service requires. @@ -43,4 +46,5 @@ pub(crate) struct OutputManagerResources Result { let view_key = key_manager.get_view_key().await?; let spend_key = key_manager.get_spend_key().await?; @@ -182,6 +184,7 @@ where shutdown_signal, one_sided_tari_address, interactive_tari_address, + utxo_scanner_handle, }; Ok(Self { @@ -205,6 +208,8 @@ where let mut base_node_service_event_stream = self.base_node_service.get_event_stream(); + let mut utxo_scanner_events = self.resources.utxo_scanner_handle.get_event_receiver(); + debug!(target: LOG_TARGET, "Output Manager Service started"); // Outputs marked as shorttermencumbered are not yet stored as transactions in the TMS, so lets clear them self.resources.db.clear_short_term_encumberances()?; @@ -216,6 +221,12 @@ where Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e), } }, + event = utxo_scanner_events.recv() => { + match event { + Ok(msg) => self.handle_utxo_scanner_service_event(msg), + Err(e) => debug!(target: LOG_TARGET, "Lagging read on utxo scanner event broadcast channel: {}", e), + } + }, Some(request_context) = request_stream.next() => { trace!(target: LOG_TARGET, "Handling Service API Request"); let (request, reply_tx) = request_context.split(); @@ -556,6 +567,23 @@ where .map(OutputManagerResponse::ClaimHtlcTransaction) } + fn handle_utxo_scanner_service_event(&mut self, event: UtxoScannerEvent) { + match event { + UtxoScannerEvent::ConnectingToBaseNode(_node_id) => {}, + UtxoScannerEvent::ConnectedToBaseNode(_node_id, _duration) => {}, + UtxoScannerEvent::ConnectionFailedToBaseNode { .. } => {}, + UtxoScannerEvent::ScanningRoundFailed { .. } => {}, + UtxoScannerEvent::Progress { .. } => {}, + UtxoScannerEvent::Completed { .. } => { + let _id = self.validate_outputs().map_err(|e| { + warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); + e + }); + }, + UtxoScannerEvent::ScanningFailed => {}, + } + } + fn handle_base_node_service_event(&mut self, event: Arc) { match (*event).clone() { BaseNodeEvent::BaseNodeStateChanged(_state) => { @@ -566,10 +594,6 @@ where }, BaseNodeEvent::NewBlockDetected(_hash, height) => { self.last_seen_tip_height = Some(height); - let _id = self.validate_outputs().map_err(|e| { - warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); - e - }); }, } } @@ -594,7 +618,7 @@ where let mut base_node_watch = self.resources.connectivity.get_current_base_node_watcher(); let event_publisher = self.resources.event_publisher.clone(); let validation_in_progress = self.validation_in_progress.clone(); - let mut base_node_service_event_stream = self.base_node_service.get_event_stream(); + let mut utxo_scanner_service_event_stream = self.resources.utxo_scanner_handle.get_event_receiver(); tokio::spawn(async move { // Note: We do not want the validation task to be queued let mut _lock = match validation_in_progress.try_lock() { @@ -657,12 +681,10 @@ where is shutting down", id); return; }, - event = base_node_service_event_stream.recv() => { - if let Ok(bn_event) = event { - if let BaseNodeEvent::NewBlockDetected(_hash, _height) = *bn_event { - debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id); - continue 'outer; - } + event = utxo_scanner_service_event_stream.recv() => { + if let Ok(UtxoScannerEvent::Completed{..}) = event { + debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id); + continue 'outer; } } _ = base_node_watch.changed() => { diff --git a/base_layer/wallet/src/transaction_service/mod.rs b/base_layer/wallet/src/transaction_service/mod.rs index f86c0bc3bb..c440db0833 100644 --- a/base_layer/wallet/src/transaction_service/mod.rs +++ b/base_layer/wallet/src/transaction_service/mod.rs @@ -63,6 +63,7 @@ use crate::{ service::TransactionService, storage::database::{TransactionBackend, TransactionDatabase}, }, + utxo_scanner_service::handle::UtxoScannerHandle, }; pub mod config; @@ -243,6 +244,7 @@ where let core_key_manager_service = handles.expect_handle::(); let connectivity = handles.expect_handle::(); let base_node_service_handle = handles.expect_handle::(); + let utxo_scanner_handle = handles.expect_handle::(); let result = TransactionService::new( config, @@ -266,6 +268,7 @@ where handles.get_shutdown_signal(), base_node_service_handle, wallet_type, + utxo_scanner_handle, ) .await .expect("Could not initialize Transaction Manager Service") diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs index 8c3357a8a9..c79d8054d7 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs @@ -58,6 +58,7 @@ use crate::{ const LOG_TARGET: &str = "wallet::transaction_service::protocols::validation_protocol"; +#[derive(Clone)] pub struct TransactionValidationProtocol { operation_id: OperationId, db: TransactionDatabase, diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index 4aadfcabf8..09e95ae8ec 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -145,7 +145,10 @@ use crate::{ utc::utc_duration_since, }, util::watch::Watch, - utxo_scanner_service::RECOVERY_KEY, + utxo_scanner_service::{ + handle::{UtxoScannerEvent, UtxoScannerHandle}, + RECOVERY_KEY, + }, OperationId, }; @@ -261,6 +264,7 @@ where shutdown_signal: ShutdownSignal, base_node_service: BaseNodeServiceHandle, wallet_type: Arc, + utxo_scanner_handle: UtxoScannerHandle, ) -> Result { // Collect the resources that all protocols will need so that they can be neatly cloned as the protocols are // spawned. @@ -295,6 +299,7 @@ where shutdown_signal, consensus_manager: consensus_manager.clone(), wallet_type, + utxo_scanner_handle, }; let power_mode = PowerMode::default(); let timeout = match power_mode { @@ -388,6 +393,7 @@ where let mut base_node_service_event_stream = self.base_node_service.get_event_stream(); let mut output_manager_event_stream = self.resources.output_manager_service.get_event_stream(); + let mut utxo_scanner_events = self.resources.utxo_scanner_handle.get_event_receiver(); debug!(target: LOG_TARGET, "Transaction Service started"); loop { @@ -401,10 +407,16 @@ where // Base Node Monitoring Service event event = base_node_service_event_stream.recv() => { match event { - Ok(msg) => self.handle_base_node_service_event(msg, &mut transaction_validation_protocol_handles).await, + Ok(msg) => self.handle_base_node_service_event(msg).await, Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e), }; }, + event = utxo_scanner_events.recv() => { + match event { + Ok(msg) => self.handle_utxo_scanner_service_event(msg, &mut transaction_validation_protocol_handles).await, + Err(e) => debug!(target: LOG_TARGET, "Lagging read on utxo scanner event broadcast channel: {}", e), + } + }, //Incoming request Some(request_context) = request_stream.next() => { let start = Instant::now(); @@ -1040,18 +1052,31 @@ where }); } - async fn handle_base_node_service_event( - &mut self, - event: Arc, - transaction_validation_join_handles: &mut FuturesUnordered< - JoinHandle>>, - >, - ) { + async fn handle_base_node_service_event(&mut self, event: Arc) { match (*event).clone() { BaseNodeEvent::BaseNodeStateChanged(_state) => { trace!(target: LOG_TARGET, "Received BaseNodeStateChanged event, but igoring",); }, BaseNodeEvent::NewBlockDetected(_hash, height) => { + self.last_seen_tip_height = Some(height); + }, + } + } + + async fn handle_utxo_scanner_service_event( + &mut self, + event: UtxoScannerEvent, + transaction_validation_join_handles: &mut FuturesUnordered< + JoinHandle>>, + >, + ) { + match event { + UtxoScannerEvent::ConnectingToBaseNode(_node_id) => {}, + UtxoScannerEvent::ConnectedToBaseNode(_node_id, _duration) => {}, + UtxoScannerEvent::ConnectionFailedToBaseNode { .. } => {}, + UtxoScannerEvent::ScanningRoundFailed { .. } => {}, + UtxoScannerEvent::Progress { .. } => {}, + UtxoScannerEvent::Completed { .. } => { let _operation_id = self .start_transaction_validation_protocol(transaction_validation_join_handles) .await @@ -1059,9 +1084,8 @@ where warn!(target: LOG_TARGET, "Error validating txos: {:?}", e); e }); - - self.last_seen_tip_height = Some(height); }, + UtxoScannerEvent::ScanningFailed => {}, } } @@ -3383,6 +3407,9 @@ where let mut base_node_watch = self.connectivity().get_current_base_node_watcher(); let validation_in_progress = self.validation_in_progress.clone(); + + let mut utxo_scanner_service_event_stream = self.resources.utxo_scanner_handle.get_event_receiver(); + let join_handle = tokio::spawn(async move { let mut _lock = validation_in_progress.try_lock().map_err(|_| { debug!( @@ -3391,20 +3418,27 @@ where ); TransactionServiceProtocolError::new(id, TransactionServiceError::TransactionValidationInProgress) })?; - let exec_fut = protocol.execute(); - tokio::pin!(exec_fut); - loop { - tokio::select! { - result = &mut exec_fut => { - return result; - }, - _ = base_node_watch.changed() => { - if let Some(selected_peer) = base_node_watch.borrow().as_ref() { - if selected_peer.get_current_peer().node_id != current_base_node { - debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol"); - return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged { - task_name: "transaction validation_protocol", - })); + 'outer: loop { + let local_run = protocol.clone(); + let exec_fut = local_run.execute(); + tokio::pin!(exec_fut); + loop { + tokio::select! { + result = &mut exec_fut => { + return result; + }, + event = utxo_scanner_service_event_stream.recv() => { + if let Ok(UtxoScannerEvent::Completed{..}) = event { + debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id); + continue 'outer; + } + } + _ = base_node_watch.changed() => { + if let Some(selected_peer) = base_node_watch.borrow().as_ref() { + if selected_peer.get_current_peer().node_id != current_base_node { + debug!(target: LOG_TARGET, "Base node changed, restarting transaction validation protocol"); + continue 'outer; + } } } } @@ -3843,6 +3877,7 @@ pub struct TransactionServiceResources, + pub utxo_scanner_handle: UtxoScannerHandle, } #[derive(Default, Clone, Copy)] diff --git a/base_layer/wallet/tests/output_manager_service_tests/service.rs b/base_layer/wallet/tests/output_manager_service_tests/service.rs index 4ac0e4eb92..c806aaeebe 100644 --- a/base_layer/wallet/tests/output_manager_service_tests/service.rs +++ b/base_layer/wallet/tests/output_manager_service_tests/service.rs @@ -40,6 +40,8 @@ use minotari_wallet::{ }, test_utils::create_consensus_constants, transaction_service::handle::TransactionServiceHandle, + util::watch::Watch, + utxo_scanner_service::handle::UtxoScannerHandle, }; use rand::{rngs::OsRng, RngCore}; use tari_common::configuration::Network; @@ -92,7 +94,6 @@ use crate::support::{ data::get_temp_sqlite_database_connection, utils::{make_input, make_input_with_features}, }; - fn default_features_and_scripts_size_byte_size() -> std::io::Result { Ok(TransactionWeight::latest().round_up_features_and_scripts_size( OutputFeatures::default().get_serialized_size()? + TariScript::default().get_serialized_size()?, @@ -107,7 +108,7 @@ struct TestOmsService { pub mock_rpc_service: MockRpcServer>, pub node_id: Arc, pub base_node_wallet_rpc_mock_state: BaseNodeWalletRpcMockState, - pub node_event: broadcast::Sender>, + pub _node_event: broadcast::Sender>, pub key_manager_handle: MemoryDbKeyManager, } @@ -162,6 +163,12 @@ async fn setup_output_manager_service( let key_manager = create_memory_db_key_manager().unwrap(); + let (event_sender, _) = broadcast::channel(200); + let recovery_message_watch = Watch::new("unset".to_string()); + let one_sided_message_watch = Watch::new("unset".to_string()); + + let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch); + let output_manager_service = OutputManagerService::new( OutputManagerServiceConfig { ..Default::default() }, oms_request_receiver, @@ -174,6 +181,7 @@ async fn setup_output_manager_service( Network::LocalNet, wallet_connectivity_mock.clone(), key_manager.clone(), + scanner_handle, ) .await .unwrap(); @@ -189,7 +197,7 @@ async fn setup_output_manager_service( mock_rpc_service: mock_server, node_id: server_node_identity, base_node_wallet_rpc_mock_state: rpc_service_state, - node_event: event_publisher_bns, + _node_event: event_publisher_bns, key_manager_handle: key_manager, } } @@ -226,6 +234,10 @@ pub async fn setup_oms_with_bn_state( task::spawn(mock_base_node_service.run()); let connectivity = create_wallet_connectivity_mock(); let key_manager = create_memory_db_key_manager().unwrap(); + let (event_sender, _) = broadcast::channel(200); + let recovery_message_watch = Watch::new("unset".to_string()); + let one_sided_message_watch = Watch::new("unset".to_string()); + let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch); let output_manager_service = OutputManagerService::new( OutputManagerServiceConfig { ..Default::default() }, oms_request_receiver, @@ -238,6 +250,7 @@ pub async fn setup_oms_with_bn_state( Network::LocalNet, connectivity, key_manager.clone(), + scanner_handle, ) .await .unwrap(); @@ -1822,31 +1835,7 @@ async fn test_txo_validation() { oms.base_node_wallet_rpc_mock_state .set_query_deleted_response(query_deleted_response.clone()); - // Trigger validation through a base_node_service event - oms.node_event - .send(Arc::new(BaseNodeEvent::NewBlockDetected( - (*block5_header_reorg.hash()).into(), - 5, - ))) - .unwrap(); - - let _result = oms - .base_node_wallet_rpc_mock_state - .wait_pop_get_header_by_height_calls(2, Duration::from_secs(60)) - .await - .unwrap(); - - let _utxo_query_calls = oms - .base_node_wallet_rpc_mock_state - .wait_pop_utxo_query_calls(1, Duration::from_secs(60)) - .await - .unwrap(); - - let _query_deleted_calls = oms - .base_node_wallet_rpc_mock_state - .wait_pop_query_deleted(1, Duration::from_secs(60)) - .await - .unwrap(); + oms.output_manager_handle.validate_txos().await.unwrap(); // This is needed on a fast computer, otherwise the balance have not been updated correctly yet with the next // step diff --git a/base_layer/wallet/tests/transaction_service_tests/service.rs b/base_layer/wallet/tests/transaction_service_tests/service.rs index 13e1c94b43..39bda9494f 100644 --- a/base_layer/wallet/tests/transaction_service_tests/service.rs +++ b/base_layer/wallet/tests/transaction_service_tests/service.rs @@ -81,6 +81,8 @@ use minotari_wallet::{ }, TransactionServiceInitializer, }, + util::watch::Watch, + utxo_scanner_service::{handle::UtxoScannerHandle, initializer::UtxoScannerServiceInitializer}, }; use prost::Message; use rand::{rngs::OsRng, RngCore}; @@ -266,12 +268,20 @@ async fn setup_transaction_service>( node_identity.clone(), Network::LocalNet, consensus_manager, - factories, + factories.clone(), db.clone(), wallet_type, )) - .add_initializer(BaseNodeServiceInitializer::new(BaseNodeServiceConfig::default(), db)) + .add_initializer(BaseNodeServiceInitializer::new( + BaseNodeServiceConfig::default(), + db.clone(), + )) .add_initializer(WalletConnectivityInitializer::new(BaseNodeServiceConfig::default())) + .add_initializer(UtxoScannerServiceInitializer::<_, MemoryDbKeyManager>::new( + db, + factories.clone(), + Network::LocalNet, + )) .build() .await .unwrap(); @@ -389,6 +399,12 @@ async fn setup_transaction_service_no_comms( let ts_db = TransactionDatabase::new(ts_service_db.clone()); let key_manager = create_memory_db_key_manager().unwrap(); let oms_db = OutputManagerDatabase::new(OutputManagerSqliteDatabase::new(db_connection)); + let (event_sender, _) = broadcast::channel(200); + let recovery_message_watch = Watch::new("unset".to_string()); + let one_sided_message_watch = Watch::new("unset".to_string()); + + let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch); + let output_manager_service = OutputManagerService::new( OutputManagerServiceConfig::default(), oms_request_receiver, @@ -401,6 +417,7 @@ async fn setup_transaction_service_no_comms( Network::LocalNet, wallet_connectivity_service_mock.clone(), key_manager.clone(), + scanner_handle, ) .await .unwrap(); @@ -421,6 +438,11 @@ async fn setup_transaction_service_no_comms( max_tx_query_batch_size: 2, ..Default::default() }); + let (event_sender, _) = broadcast::channel(200); + let recovery_message_watch = Watch::new("unset".to_string()); + let one_sided_message_watch = Watch::new("unset".to_string()); + + let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch); let ts_service = TransactionService::new( test_config, ts_db.clone(), @@ -443,6 +465,7 @@ async fn setup_transaction_service_no_comms( shutdown.to_signal(), base_node_service_handle, key_manager.get_wallet_type().await, + scanner_handle, ) .await .unwrap(); diff --git a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs index 58257c6c9d..eacb0200d2 100644 --- a/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service_tests/transaction_protocols.rs @@ -48,6 +48,7 @@ use minotari_wallet::{ }, }, util::watch::Watch, + utxo_scanner_service::handle::UtxoScannerHandle, }; use rand::{rngs::OsRng, RngCore}; use tari_common::configuration::Network; @@ -178,6 +179,12 @@ pub async fn setup() -> ( let interactive_tari_address = TariAddress::new_dual_address(view_key.pub_key, spend_key.pub_key, network, interactive_features); let wallet_type = core_key_manager_service_handle.get_wallet_type().await; + let (event_sender, _) = broadcast::channel(200); + let recovery_message_watch = Watch::new("unset".to_string()); + let one_sided_message_watch = Watch::new("unset".to_string()); + + let utxo_scanner_handle = + UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch); let resources = TransactionServiceResources { db, output_manager_service: output_manager_service_handle, @@ -197,6 +204,7 @@ pub async fn setup() -> ( }, shutdown_signal: shutdown.to_signal(), wallet_type, + utxo_scanner_handle, }; (