Skip to content

Commit

Permalink
Cache mempool entries and transactions to avoid retransmissions
Browse files Browse the repository at this point in the history
Previously, we would retransmit entry and transaction data whenever
polling for mempool data. Here we introduce simple caches for both, so
most data is only retrieved in bulk on the first iteration after
startup.
  • Loading branch information
tnull committed Feb 11, 2025
1 parent 11008ee commit 9fad38e
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use std::sync::Arc;
pub struct BitcoindRpcClient {
rpc_client: Arc<RpcClient>,
latest_mempool_timestamp: AtomicU64,
mempool_entries_cache: tokio::sync::Mutex<HashMap<Txid, MempoolEntry>>,
mempool_txs_cache: tokio::sync::Mutex<HashMap<Txid, (Transaction, u64)>>,
}

impl BitcoindRpcClient {
Expand All @@ -42,7 +44,9 @@ impl BitcoindRpcClient {

let latest_mempool_timestamp = AtomicU64::new(0);

Self { rpc_client, latest_mempool_timestamp }
let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new());
let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new());
Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
}

pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
Expand Down Expand Up @@ -160,16 +164,25 @@ impl BitcoindRpcClient {
}
}

pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> {
let mempool_txids = self.get_raw_mempool().await?;
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());

let mut mempool_entries_cache = self.mempool_entries_cache.lock().await;
mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid));

for txid in mempool_txids {
// Push any entries that haven't been dropped since `getrawmempool`
if mempool_entries_cache.contains_key(&txid) {
continue;
}

if let Some(entry) = self.get_mempool_entry(txid).await? {
mempool_entries.push(entry);
mempool_entries_cache.insert(txid, entry.clone());
}
}
Ok(mempool_entries)

mempool_entries_cache.shrink_to_fit();

Ok(())
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
Expand All @@ -183,10 +196,14 @@ impl BitcoindRpcClient {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mut latest_time = prev_mempool_time;

let mempool_entries = self.get_mempool_entries().await?;
let mut txs_to_emit = Vec::new();
self.update_mempool_entries_cache().await?;

let mempool_entries_cache = self.mempool_entries_cache.lock().await;
let mut mempool_txs_cache = self.mempool_txs_cache.lock().await;
mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid));

for entry in mempool_entries {
let mut txs_to_emit = Vec::with_capacity(mempool_entries_cache.len());
for (txid, entry) in mempool_entries_cache.iter() {
if entry.time > latest_time {
latest_time = entry.time;
}
Expand All @@ -202,8 +219,14 @@ impl BitcoindRpcClient {
continue;
}

if let Some((cached_tx, cached_time)) = mempool_txs_cache.get(txid) {
txs_to_emit.push((cached_tx.clone(), *cached_time));
continue;
}

match self.get_raw_transaction(&entry.txid).await {
Ok(Some(tx)) => {
mempool_txs_cache.insert(entry.txid, (tx.clone(), entry.time));
txs_to_emit.push((tx, entry.time));
},
Ok(None) => {
Expand Down

0 comments on commit 9fad38e

Please sign in to comment.