Skip to content

Commit

Permalink
fix validation
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden committed Feb 5, 2025
1 parent f5365ca commit d7497ae
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 30 deletions.
3 changes: 3 additions & 0 deletions base_layer/wallet/src/output_manager_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::{
service::OutputManagerService,
storage::database::{OutputManagerBackend, OutputManagerDatabase},
},
utxo_scanner_service::handle::UtxoScannerHandle,
};

/// The maximum number of transaction inputs that can be created in a single transaction, slightly less than the maximum
Expand Down Expand Up @@ -124,6 +125,7 @@ where
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();
let connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let key_manager = handles.expect_handle::<TKeyManagerInterface>();
let utxo_scanner_handle = handles.expect_handle::<UtxoScannerHandle>();

let service = OutputManagerService::new(
config,
Expand All @@ -137,6 +139,7 @@ where
network,
connectivity,
key_manager,
utxo_scanner_handle,
)
.await
.expect("Could not initialize Output Manager Service")
Expand Down
12 changes: 8 additions & 4 deletions base_layer/wallet/src/output_manager_service/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ use tari_common_types::tari_address::TariAddress;
use tari_core::{consensus::ConsensusConstants, transactions::CryptoFactories};
use tari_shutdown::ShutdownSignal;

use crate::output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerEventSender,
storage::database::OutputManagerDatabase,
use crate::{
output_manager_service::{
config::OutputManagerServiceConfig,
handle::OutputManagerEventSender,
storage::database::OutputManagerDatabase,
},
utxo_scanner_service::handle::UtxoScannerHandle,
};

/// This struct is a collection of the common resources that a async task in the service requires.
Expand All @@ -43,4 +46,5 @@ pub(crate) struct OutputManagerResources<TBackend, TWalletConnectivity, TKeyMana
pub shutdown_signal: ShutdownSignal,
pub interactive_tari_address: TariAddress,
pub one_sided_tari_address: TariAddress,
pub utxo_scanner_handle: UtxoScannerHandle,
}
44 changes: 33 additions & 11 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use crate::{
tasks::TxoValidationTask,
TRANSACTION_INPUTS_LIMIT,
},
utxo_scanner_service::handle::{UtxoScannerEvent, UtxoScannerHandle},
};

const LOG_TARGET: &str = "wallet::output_manager_service";
Expand Down Expand Up @@ -154,6 +155,7 @@ where
network: Network,
connectivity: TWalletConnectivity,
key_manager: TKeyManagerInterface,
utxo_scanner_handle: UtxoScannerHandle,
) -> Result<Self, OutputManagerError> {
let view_key = key_manager.get_view_key().await?;
let spend_key = key_manager.get_spend_key().await?;
Expand Down Expand Up @@ -182,6 +184,7 @@ where
shutdown_signal,
one_sided_tari_address,
interactive_tari_address,
utxo_scanner_handle,
};

Ok(Self {
Expand All @@ -205,6 +208,8 @@ where

let mut base_node_service_event_stream = self.base_node_service.get_event_stream();

let mut utxo_scanner_events = self.resources.utxo_scanner_handle.get_event_receiver();

debug!(target: LOG_TARGET, "Output Manager Service started");
// Outputs marked as shorttermencumbered are not yet stored as transactions in the TMS, so lets clear them
self.resources.db.clear_short_term_encumberances()?;
Expand All @@ -216,6 +221,12 @@ where
Err(e) => debug!(target: LOG_TARGET, "Lagging read on base node event broadcast channel: {}", e),
}
},
event = utxo_scanner_events.recv() => {
match event {
Ok(msg) => self.handle_utxo_scanner_service_event(msg),
Err(e) => debug!(target: LOG_TARGET, "Lagging read on utxo scanner event broadcast channel: {}", e),
}
},
Some(request_context) = request_stream.next() => {
trace!(target: LOG_TARGET, "Handling Service API Request");
let (request, reply_tx) = request_context.split();
Expand Down Expand Up @@ -556,6 +567,23 @@ where
.map(OutputManagerResponse::ClaimHtlcTransaction)
}

fn handle_utxo_scanner_service_event(&mut self, event: UtxoScannerEvent) {
match event {
UtxoScannerEvent::ConnectingToBaseNode(_node_id) => {},
UtxoScannerEvent::ConnectedToBaseNode(_node_id, _duration) => {},
UtxoScannerEvent::ConnectionFailedToBaseNode { .. } => {},
UtxoScannerEvent::ScanningRoundFailed { .. } => {},
UtxoScannerEvent::Progress { .. } => {},
UtxoScannerEvent::Completed { .. } => {
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
},
UtxoScannerEvent::ScanningFailed => {},
}
}

fn handle_base_node_service_event(&mut self, event: Arc<BaseNodeEvent>) {
match (*event).clone() {
BaseNodeEvent::BaseNodeStateChanged(_state) => {
Expand All @@ -566,10 +594,6 @@ where
},
BaseNodeEvent::NewBlockDetected(_hash, height) => {
self.last_seen_tip_height = Some(height);
let _id = self.validate_outputs().map_err(|e| {
warn!(target: LOG_TARGET, "Error validating txos: {:?}", e);
e
});
},
}
}
Expand All @@ -594,7 +618,7 @@ where
let mut base_node_watch = self.resources.connectivity.get_current_base_node_watcher();
let event_publisher = self.resources.event_publisher.clone();
let validation_in_progress = self.validation_in_progress.clone();
let mut base_node_service_event_stream = self.base_node_service.get_event_stream();
let mut utxo_scanner_service_event_stream = self.resources.utxo_scanner_handle.get_event_receiver();
tokio::spawn(async move {
// Note: We do not want the validation task to be queued
let mut _lock = match validation_in_progress.try_lock() {
Expand Down Expand Up @@ -657,12 +681,10 @@ where
is shutting down", id);
return;
},
event = base_node_service_event_stream.recv() => {
if let Ok(bn_event) = event {
if let BaseNodeEvent::NewBlockDetected(_hash, _height) = *bn_event {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id);
continue 'outer;
}
event = utxo_scanner_service_event_stream.recv() => {
if let Ok(UtxoScannerEvent::Completed{..}) = event {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id);
continue 'outer;
}
}
_ = base_node_watch.changed() => {
Expand Down
3 changes: 3 additions & 0 deletions base_layer/wallet/src/transaction_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use crate::{
service::TransactionService,
storage::database::{TransactionBackend, TransactionDatabase},
},
utxo_scanner_service::handle::UtxoScannerHandle,
};

pub mod config;
Expand Down Expand Up @@ -243,6 +244,7 @@ where
let core_key_manager_service = handles.expect_handle::<TKeyManagerInterface>();
let connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();
let utxo_scanner_handle = handles.expect_handle::<UtxoScannerHandle>();

let result = TransactionService::new(
config,
Expand All @@ -266,6 +268,7 @@ where
handles.get_shutdown_signal(),
base_node_service_handle,
wallet_type,
utxo_scanner_handle,
)
.await
.expect("Could not initialize Transaction Manager Service")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::{

const LOG_TARGET: &str = "wallet::transaction_service::protocols::validation_protocol";

#[derive(Clone)]
pub struct TransactionValidationProtocol<TTransactionBackend, TWalletConnectivity> {
operation_id: OperationId,
db: TransactionDatabase<TTransactionBackend>,
Expand Down
46 changes: 31 additions & 15 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ use crate::{
utc::utc_duration_since,
},
util::watch::Watch,
utxo_scanner_service::RECOVERY_KEY,
utxo_scanner_service::{
handle::{UtxoScannerEvent, UtxoScannerHandle},
RECOVERY_KEY,
},
OperationId,
};

Expand Down Expand Up @@ -261,6 +264,7 @@ where
shutdown_signal: ShutdownSignal,
base_node_service: BaseNodeServiceHandle,
wallet_type: Arc<WalletType>,
utxo_scanner_handle: UtxoScannerHandle,
) -> Result<Self, TransactionServiceError> {
// Collect the resources that all protocols will need so that they can be neatly cloned as the protocols are
// spawned.
Expand Down Expand Up @@ -295,6 +299,7 @@ where
shutdown_signal,
consensus_manager: consensus_manager.clone(),
wallet_type,
utxo_scanner_handle,
};
let power_mode = PowerMode::default();
let timeout = match power_mode {
Expand Down Expand Up @@ -3383,6 +3388,9 @@ where

let mut base_node_watch = self.connectivity().get_current_base_node_watcher();
let validation_in_progress = self.validation_in_progress.clone();

let mut utxo_scanner_service_event_stream = self.resources.utxo_scanner_handle.get_event_receiver();

let join_handle = tokio::spawn(async move {
let mut _lock = validation_in_progress.try_lock().map_err(|_| {
debug!(
Expand All @@ -3391,20 +3399,27 @@ where
);
TransactionServiceProtocolError::new(id, TransactionServiceError::TransactionValidationInProgress)
})?;
let exec_fut = protocol.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
return result;
},
_ = base_node_watch.changed() => {
if let Some(selected_peer) = base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer().node_id != current_base_node {
debug!(target: LOG_TARGET, "Base node changed, exiting transaction validation protocol");
return Err(TransactionServiceProtocolError::new(id, TransactionServiceError::BaseNodeChanged {
task_name: "transaction validation_protocol",
}));
'outer: loop {
let local_run = protocol.clone();
let exec_fut = local_run.execute();
tokio::pin!(exec_fut);
loop {
tokio::select! {
result = &mut exec_fut => {
return result;
},
event = utxo_scanner_service_event_stream.recv() => {
if let Ok(UtxoScannerEvent::Completed{..}) = event {
debug!(target: LOG_TARGET, "TXO Validation Protocol (Id: {}) resetting because base node height changed", id);
continue 'outer;
}
}
_ = base_node_watch.changed() => {
if let Some(selected_peer) = base_node_watch.borrow().as_ref() {
if selected_peer.get_current_peer().node_id != current_base_node {
debug!(target: LOG_TARGET, "Base node changed, restarting transaction validation protocol");
continue 'outer;
}
}
}
}
Expand Down Expand Up @@ -3843,6 +3858,7 @@ pub struct TransactionServiceResources<TBackend, TWalletConnectivity, TKeyManage
pub config: TransactionServiceConfig,
pub shutdown_signal: ShutdownSignal,
pub wallet_type: Arc<WalletType>,
pub utxo_scanner_handle: UtxoScannerHandle,
}

#[derive(Default, Clone, Copy)]
Expand Down
14 changes: 14 additions & 0 deletions base_layer/wallet/tests/output_manager_service_tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use minotari_wallet::{
},
test_utils::create_consensus_constants,
transaction_service::handle::TransactionServiceHandle,
util::watch::Watch,
utxo_scanner_service::handle::UtxoScannerHandle,
};
use rand::{rngs::OsRng, RngCore};
use tari_common::configuration::Network;
Expand Down Expand Up @@ -162,6 +164,12 @@ async fn setup_output_manager_service<T: OutputManagerBackend + 'static>(

let key_manager = create_memory_db_key_manager().unwrap();

let (event_sender, _) = broadcast::channel(200);
let recovery_message_watch = Watch::new("unset".to_string());
let one_sided_message_watch = Watch::new("unset".to_string());

let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch);

let output_manager_service = OutputManagerService::new(
OutputManagerServiceConfig { ..Default::default() },
oms_request_receiver,
Expand All @@ -174,6 +182,7 @@ async fn setup_output_manager_service<T: OutputManagerBackend + 'static>(
Network::LocalNet,
wallet_connectivity_mock.clone(),
key_manager.clone(),
scanner_handle,
)
.await
.unwrap();
Expand Down Expand Up @@ -226,6 +235,10 @@ pub async fn setup_oms_with_bn_state<T: OutputManagerBackend + 'static>(
task::spawn(mock_base_node_service.run());
let connectivity = create_wallet_connectivity_mock();
let key_manager = create_memory_db_key_manager().unwrap();
let (event_sender, _) = broadcast::channel(200);
let recovery_message_watch = Watch::new("unset".to_string());
let one_sided_message_watch = Watch::new("unset".to_string());
let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch);
let output_manager_service = OutputManagerService::new(
OutputManagerServiceConfig { ..Default::default() },
oms_request_receiver,
Expand All @@ -238,6 +251,7 @@ pub async fn setup_oms_with_bn_state<T: OutputManagerBackend + 'static>(
Network::LocalNet,
connectivity,
key_manager.clone(),
scanner_handle,
)
.await
.unwrap();
Expand Down
15 changes: 15 additions & 0 deletions base_layer/wallet/tests/transaction_service_tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ use minotari_wallet::{
},
TransactionServiceInitializer,
},
util::watch::Watch,
utxo_scanner_service::handle::UtxoScannerHandle,
};
use prost::Message;
use rand::{rngs::OsRng, RngCore};
Expand Down Expand Up @@ -389,6 +391,12 @@ async fn setup_transaction_service_no_comms(
let ts_db = TransactionDatabase::new(ts_service_db.clone());
let key_manager = create_memory_db_key_manager().unwrap();
let oms_db = OutputManagerDatabase::new(OutputManagerSqliteDatabase::new(db_connection));
let (event_sender, _) = broadcast::channel(200);
let recovery_message_watch = Watch::new("unset".to_string());
let one_sided_message_watch = Watch::new("unset".to_string());

let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch);

let output_manager_service = OutputManagerService::new(
OutputManagerServiceConfig::default(),
oms_request_receiver,
Expand All @@ -401,6 +409,7 @@ async fn setup_transaction_service_no_comms(
Network::LocalNet,
wallet_connectivity_service_mock.clone(),
key_manager.clone(),
scanner_handle,
)
.await
.unwrap();
Expand All @@ -421,6 +430,11 @@ async fn setup_transaction_service_no_comms(
max_tx_query_batch_size: 2,
..Default::default()
});
let (event_sender, _) = broadcast::channel(200);
let recovery_message_watch = Watch::new("unset".to_string());
let one_sided_message_watch = Watch::new("unset".to_string());

let scanner_handle = UtxoScannerHandle::new(event_sender.clone(), one_sided_message_watch, recovery_message_watch);
let ts_service = TransactionService::new(
test_config,
ts_db.clone(),
Expand All @@ -443,6 +457,7 @@ async fn setup_transaction_service_no_comms(
shutdown.to_signal(),
base_node_service_handle,
key_manager.get_wallet_type().await,
scanner_handle,
)
.await
.unwrap();
Expand Down
Loading

0 comments on commit d7497ae

Please sign in to comment.