Skip to content

Commit

Permalink
WIP: return Vec for transaction_service tx methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mmrrnn committed Jan 15, 2025
1 parent d06e9fe commit 1c1eb0e
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 141 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ version = "1.9.2-pre.0"
edition = "2018"

[dependencies]
indexmap = "2.7.0"
tari_common = { path = "../../common", version = "1.9.2-pre.0" }
tari_common_sqlite = { path = "../../common_sqlite", version = "1.9.2-pre.0" }
tari_common_types = { path = "../../base_layer/common_types", version = "1.9.2-pre.0" }
Expand Down
24 changes: 12 additions & 12 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::{
};

use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use tari_common_types::{
burnt_proof::BurntProof,
tari_address::TariAddress,
Expand Down Expand Up @@ -418,9 +417,9 @@ pub enum TransactionServiceResponse {
template_registration: Box<CodeTemplateRegistration>,
},
TransactionCancelled,
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
CompletedTransactions(IndexMap<TxId, CompletedTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
BaseNodePublicKeySet,
UtxoImported(TxId),
Expand Down Expand Up @@ -912,9 +911,10 @@ impl TransactionServiceHandle {
}
}

///////////////////////////////
pub async fn get_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingInboundTransactions)
Expand All @@ -927,7 +927,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingInboundTransactions)
Expand All @@ -940,7 +940,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingOutboundTransactions)
Expand All @@ -953,7 +953,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingOutboundTransactions)
Expand All @@ -964,9 +964,7 @@ impl TransactionServiceHandle {
}
}

pub async fn get_completed_transactions(
&mut self,
) -> Result<IndexMap<TxId, CompletedTransaction>, TransactionServiceError> {
pub async fn get_completed_transactions(&mut self) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCompletedTransactions)
Expand All @@ -979,7 +977,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_completed_transactions(
&mut self,
) -> Result<IndexMap<TxId, CompletedTransaction>, TransactionServiceError> {
) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledCompletedTransactions)
Expand All @@ -990,6 +988,8 @@ impl TransactionServiceHandle {
}
}

///////////////////////////

pub async fn get_completed_transaction(
&mut self,
tx_id: TxId,
Expand Down
18 changes: 9 additions & 9 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2750,42 +2750,42 @@ where
>,
) -> Result<(), TransactionServiceError> {
let outbound_txs = self.db.get_pending_outbound_transactions()?;
for (tx_id, tx) in outbound_txs {
for tx in outbound_txs {
let (sender_protocol, stage) = if tx.send_count > 0 {
(None, TransactionSendProtocolStage::WaitForReply)
} else {
(Some(tx.sender_protocol), TransactionSendProtocolStage::Queued)
};
let (not_yet_pending, queued) = (
!self.pending_transaction_reply_senders.contains_key(&tx_id),
!self.pending_transaction_reply_senders.contains_key(&tx.tx_id),
stage == TransactionSendProtocolStage::Queued,
);

if not_yet_pending {
debug!(
target: LOG_TARGET,
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx_id
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx.tx_id
);
} else if queued {
debug!(
target: LOG_TARGET,
"Retry sending queued Pending Outbound Transaction TxId: {}", tx_id
"Retry sending queued Pending Outbound Transaction TxId: {}", tx.tx_id
);
let _sender = self.pending_transaction_reply_senders.remove(&tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx_id);
let _sender = self.pending_transaction_reply_senders.remove(&tx.tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx.tx_id);
} else {
// dont care
}

if not_yet_pending || queued {
let (tx_reply_sender, tx_reply_receiver) = mpsc::channel(100);
let (cancellation_sender, cancellation_receiver) = oneshot::channel();
self.pending_transaction_reply_senders.insert(tx_id, tx_reply_sender);
self.pending_transaction_reply_senders.insert(tx.tx_id, tx_reply_sender);
self.send_transaction_cancellation_senders
.insert(tx_id, cancellation_sender);
.insert(tx.tx_id, cancellation_sender);

let protocol = TransactionSendProtocol::new(
tx_id,
tx.tx_id,
self.resources.clone(),
tx_reply_receiver,
cancellation_receiver,
Expand Down
32 changes: 12 additions & 20 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::HashMap,
fmt,
fmt::{Display, Error, Formatter},
sync::Arc,
};

use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use log::*;
use tari_common_types::{
tari_address::TariAddress,
Expand Down Expand Up @@ -246,9 +244,9 @@ pub enum DbValue {
PendingOutboundTransaction(Box<OutboundTransaction>),
PendingInboundTransaction(Box<InboundTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
CompletedTransactions(IndexMap<TxId, CompletedTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
WalletTransaction(Box<WalletTransaction>),
}

Expand Down Expand Up @@ -509,22 +507,20 @@ where T: TransactionBackend + 'static
Ok(*t)
}

pub fn get_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
pub fn get_pending_inbound_transactions(&self) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(true)
}

fn get_pending_inbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingInboundTransactions
} else {
Expand All @@ -545,22 +541,20 @@ where T: TransactionBackend + 'static
Ok(t)
}

pub fn get_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
pub fn get_pending_outbound_transactions(&self) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(true)
}

fn get_pending_outbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingOutboundTransactions
} else {
Expand Down Expand Up @@ -589,13 +583,11 @@ where T: TransactionBackend + 'static
Ok(address)
}

pub fn get_completed_transactions(&self) -> Result<IndexMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(false)
}

pub fn get_cancelled_completed_transactions(
&self,
) -> Result<IndexMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_cancelled_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(true)
}

Expand All @@ -621,7 +613,7 @@ where T: TransactionBackend + 'static
fn get_completed_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<IndexMap<TxId, CompletedTransaction>, TransactionStorageError> {
) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledCompletedTransactions
} else {
Expand Down
48 changes: 16 additions & 32 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
sync::{Arc, RwLock},
};
Expand All @@ -48,7 +47,6 @@ use tari_utilities::{hex::Hex, ByteArray, Hidden};
use thiserror::Error;
use tokio::time::Instant;
use zeroize::Zeroize;
use indexmap::IndexMap;

use crate::{
schema::{completed_transactions, inbound_transactions, outbound_transactions},
Expand Down Expand Up @@ -267,67 +265,49 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
None
},
DbKey::PendingOutboundTransactions => {
let mut result = HashMap::new();
let mut result = Vec::new();
for o in OutboundTransactionSql::index_by_cancelled(&mut conn, false)? {
result.insert(
(o.tx_id as u64).into(),
OutboundTransaction::try_from(o.clone(), &cipher)?,
);
result.push(OutboundTransaction::try_from(o.clone(), &cipher)?);
}

Some(DbValue::PendingOutboundTransactions(result))
},
DbKey::PendingInboundTransactions => {
let mut result = HashMap::new();
let mut result = Vec::new();
for i in InboundTransactionSql::index_by_cancelled(&mut conn, false)? {
result.insert(
(i.tx_id as u64).into(),
InboundTransaction::try_from((i).clone(), &cipher)?,
);
result.push(InboundTransaction::try_from((i).clone(), &cipher)?);
}

Some(DbValue::PendingInboundTransactions(result))
},
DbKey::CompletedTransactions => {
let mut result = IndexMap::new();
let mut result = Vec::new();
for c in CompletedTransactionSql::index_by_cancelled(&mut conn, false)? {
result.insert(
(c.tx_id as u64).into(),
CompletedTransaction::try_from((c).clone(), &cipher)?,
);
result.push(CompletedTransaction::try_from((c).clone(), &cipher)?);
}

Some(DbValue::CompletedTransactions(result))
},
DbKey::CancelledPendingOutboundTransactions => {
let mut result = HashMap::new();
let mut result = Vec::new();
for o in OutboundTransactionSql::index_by_cancelled(&mut conn, true)? {
result.insert(
(o.tx_id as u64).into(),
OutboundTransaction::try_from((o).clone(), &cipher)?,
);
result.push(OutboundTransaction::try_from((o).clone(), &cipher)?);
}

Some(DbValue::PendingOutboundTransactions(result))
},
DbKey::CancelledPendingInboundTransactions => {
let mut result = HashMap::new();
let mut result = Vec::new();
for i in InboundTransactionSql::index_by_cancelled(&mut conn, true)? {
result.insert(
(i.tx_id as u64).into(),
InboundTransaction::try_from(i.clone(), &cipher)?,
);
result.push(InboundTransaction::try_from(i.clone(), &cipher)?);
}

Some(DbValue::PendingInboundTransactions(result))
},
DbKey::CancelledCompletedTransactions => {
let mut result = IndexMap::new();
let mut result = Vec::new();
for c in CompletedTransactionSql::index_by_cancelled(&mut conn, true)? {
result.insert(
(c.tx_id as u64).into(),
CompletedTransaction::try_from((c).clone(), &cipher)?,
);
result.push(CompletedTransaction::try_from((c).clone(), &cipher)?);
}

Some(DbValue::CompletedTransactions(result))
Expand Down Expand Up @@ -1193,6 +1173,8 @@ impl InboundTransactionSql {
) -> Result<Vec<InboundTransactionSql>, TransactionStorageError> {
Ok(inbound_transactions::table
.filter(inbound_transactions::cancelled.eq(i32::from(cancelled)))
// QUESTION(A): Should we order by timestamp or last_send_timestamp?
.order_by(inbound_transactions::timestamp.desc())
.load::<InboundTransactionSql>(conn)?)
}

Expand Down Expand Up @@ -1457,6 +1439,8 @@ impl OutboundTransactionSql {
) -> Result<Vec<OutboundTransactionSql>, TransactionStorageError> {
Ok(outbound_transactions::table
.filter(outbound_transactions::cancelled.eq(i32::from(cancelled)))
// QUESTION(A): Should we order by timestamp or last_send_timestamp?
.order_by(outbound_transactions::timestamp.desc())
.load::<OutboundTransactionSql>(conn)?)
}

Expand Down
Loading

0 comments on commit 1c1eb0e

Please sign in to comment.