diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs index 3ad85db0b..433928a89 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind_rpc.rs @@ -30,6 +30,8 @@ use std::sync::Arc; pub struct BitcoindRpcClient { rpc_client: Arc, latest_mempool_timestamp: AtomicU64, + mempool_entries_cache: tokio::sync::Mutex>, + mempool_txs_cache: tokio::sync::Mutex>, } impl BitcoindRpcClient { @@ -42,7 +44,9 @@ impl BitcoindRpcClient { let latest_mempool_timestamp = AtomicU64::new(0); - Self { rpc_client, latest_mempool_timestamp } + let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new()); + let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new()); + Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache } } pub(crate) fn rpc_client(&self) -> Arc { @@ -160,16 +164,25 @@ impl BitcoindRpcClient { } } - pub(crate) async fn get_mempool_entries(&self) -> std::io::Result> { + pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> { let mempool_txids = self.get_raw_mempool().await?; - let mut mempool_entries = Vec::with_capacity(mempool_txids.len()); + + let mut mempool_entries_cache = self.mempool_entries_cache.lock().await; + mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid)); + for txid in mempool_txids { - // Push any entries that haven't been dropped since `getrawmempool` + if mempool_entries_cache.contains_key(&txid) { + continue; + } + if let Some(entry) = self.get_mempool_entry(txid).await? { - mempool_entries.push(entry); + mempool_entries_cache.insert(txid, entry.clone()); } } - Ok(mempool_entries) + + mempool_entries_cache.shrink_to_fit(); + + Ok(()) } /// Get mempool transactions, alongside their first-seen unix timestamps. @@ -183,10 +196,14 @@ impl BitcoindRpcClient { let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed); let mut latest_time = prev_mempool_time; - let mempool_entries = self.get_mempool_entries().await?; - let mut txs_to_emit = Vec::new(); + self.update_mempool_entries_cache().await?; + + let mempool_entries_cache = self.mempool_entries_cache.lock().await; + let mut mempool_txs_cache = self.mempool_txs_cache.lock().await; + mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid)); - for entry in mempool_entries { + let mut txs_to_emit = Vec::with_capacity(mempool_entries_cache.len()); + for (txid, entry) in mempool_entries_cache.iter() { if entry.time > latest_time { latest_time = entry.time; } @@ -202,8 +219,14 @@ impl BitcoindRpcClient { continue; } + if let Some((cached_tx, cached_time)) = mempool_txs_cache.get(txid) { + txs_to_emit.push((cached_tx.clone(), *cached_time)); + continue; + } + match self.get_raw_transaction(&entry.txid).await { Ok(Some(tx)) => { + mempool_txs_cache.insert(entry.txid, (tx.clone(), entry.time)); txs_to_emit.push((tx, entry.time)); }, Ok(None) => {