From 3ff21568f21a68f893e64dcba8436ad909d3a71f Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Fri, 26 Sep 2025 22:03:11 +0300 Subject: [PATCH 1/2] Support partial mempool synchronization Prior to this change, the local electrs mempool was only updated with new transactions once we were able to get a complete snapshot of every transaction in bitcoind's mempool, which could require multiple rounds of attempts if any transactions are replaced (or otherwise evicted) between querying for the mempool txids and querying for the transactions themselves. PR #89 made each round more efficient, but obtaining a complete snapshot can still potentially take many rounds the more RBF is used, with each round taking longer the larger the mempool gets (just listing the mempool txids to find the delta becomes costly). With this change, the local mempool is instead updated with whatever transactions we are able to get, without waiting for a fully consistent snapshot - which should improve performance and reduce latency. Making partial updates requires some extra care on the electrs side to ensure mempool consistency (i.e., no double-spends or missing input txos), since we can no longer fully rely on bitcoind validating consistency for us. Specifically: - If any transactions were RBF'd and gone by the time we queried for them, we explicitly check for transactions that depend on the missing transactions (or their descendants) and remove them too. - Mempool evictions must be processed first before any additions, even if the replacing transactions aren't available. This ensures double-spend conflicts are not possible. --- src/daemon.rs | 2 +- src/new_index/mempool.rs | 172 +++++++++++++++++++++------------------ 2 files changed, 94 insertions(+), 80 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 9b4bb6478..211f7d94d 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -639,7 +639,7 @@ impl Daemon { /// Fetch the given transactions in parallel over multiple threads and RPC connections, /// ignoring any missing ones and returning whatever is available. #[trace] - pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { + pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5; let params_list: Vec = txids diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 69e46199a..a4db05d91 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -516,99 +516,113 @@ impl Mempool { daemon: &Daemon, tip: &BlockHash, ) -> Result { - let _timer = mempool - .read() - .unwrap() - .latency - .with_label_values(&["update"]) - .start_timer(); + let (_timer, count) = { + let mempool = mempool.read().unwrap(); + let timer = mempool.latency.with_label_values(&["update"]).start_timer(); + (timer, mempool.count.clone()) + }; - // Continuously attempt to fetch mempool transactions until we're able to get them in full - let mut fetched_txs = HashMap::::new(); - let mut indexed_txids = mempool.read().unwrap().txids_set(); - loop { - // Get bitcoind's current list of mempool txids - let all_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - - // Remove evicted mempool transactions - mempool - .write() - .unwrap() - .remove(indexed_txids.difference(&all_txids).collect()); - - indexed_txids.retain(|txid| all_txids.contains(txid)); - fetched_txs.retain(|txid, _| all_txids.contains(txid)); - - // Fetch missing transactions from bitcoind - let new_txids = all_txids - .iter() - .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid)) - .collect::>(); - if new_txids.is_empty() { - break; - } - debug!( - "mempool with total {} txs, {} fetched, {} missing", - all_txids.len(), - indexed_txids.len() + fetched_txs.len(), - new_txids.len() - ); + // Get bitcoind's current list of mempool txids + let bitcoind_txids = daemon + .getmempooltxids() + .chain_err(|| "failed to update mempool from daemon")?; - { - let mempool = mempool.read().unwrap(); - - mempool - .count - .with_label_values(&["all_txs"]) - .set(all_txids.len() as f64); - mempool - .count - .with_label_values(&["fetched_txs"]) - .set((indexed_txids.len() + fetched_txs.len()) as f64); - mempool - .count - .with_label_values(&["missing_txs"]) - .set(new_txids.len() as f64); - } + // Get the list of mempool txids in the local mempool view + let indexed_txids = mempool.read().unwrap().txids_set(); - let new_txs = daemon.gettransactions_available(&new_txids)?; + // Remove evicted mempool transactions from the local mempool view + let evicted_txids = indexed_txids + .difference(&bitcoind_txids) + .collect::>(); + if !evicted_txids.is_empty() { + mempool.write().unwrap().remove(evicted_txids); + } // avoids acquiring a lock when there are no evictions - // Abort if the chain tip moved while fetching transactions - if daemon.getbestblockhash()? != *tip { - warn!("chain tip moved while updating mempool"); - return Ok(false); - } + // Find transactions available in bitcoind's mempool but not indexed locally + let new_txids = bitcoind_txids + .iter() + .filter(|&txid| !indexed_txids.contains(txid)) + .collect::>(); + + debug!( + "mempool with total {} txs, {} indexed locally, {} to fetch", + bitcoind_txids.len(), + indexed_txids.len(), + new_txids.len() + ); + count + .with_label_values(&["all_txs"]) + .set(bitcoind_txids.len() as f64); + count + .with_label_values(&["indexed_txs"]) + .set(indexed_txids.len() as f64); + count + .with_label_values(&["missing_txs"]) + .set(new_txids.len() as f64); + + if new_txids.is_empty() { + return Ok(true); + } + + // Fetch missing transactions from bitcoind + let mut fetched_txs = daemon.gettransactions_available(&new_txids)?; + + // Abort if the chain tip moved while fetching transactions + if daemon.getbestblockhash()? != *tip { + warn!("chain tip moved while updating mempool"); + return Ok(false); + } - let fetched_count = new_txs.len(); - fetched_txs.extend(new_txs); + // Find which transactions were requested but are no longer available in bitcoind's mempool, + // typically due to Replace-By-Fee (or mempool eviction for some other reason) taking place + // between querying for the mempool txids and querying for the transactions themselves. + let mut replaced_txids: HashSet<_> = new_txids + .into_iter() + .filter(|txid| !fetched_txs.contains_key(*txid)) + .cloned() + .collect(); - // Retry if any transactions were evicted form the mempool before we managed to get them - if fetched_count != new_txids.len() { - warn!( - "failed to fetch {} mempool txs, retrying...", - new_txids.len() - fetched_count - ); - let missing_txids: Vec<_> = new_txids + if replaced_txids.is_empty() { + trace!("fetched complete mempool snapshot"); + } else { + warn!( + "could not to fetch {} replaced/evicted mempool transactions: {:?}", + replaced_txids.len(), + replaced_txids.iter().take(10).collect::>() + ); + } + + // If we were unable to get a complete consistent snapshot of the bitcoind mempool, + // detect and remove any transactions that spend from the missing (replaced) transactions + // or any of their descendants. This is necessary because it could be possible to fetch the + // child tx successfully before the parent is replaced, but miss the replaced parent tx. + while !replaced_txids.is_empty() { + let mut descendants_txids = HashSet::new(); + fetched_txs.retain(|txid, tx| { + let parent_was_replaced = tx + .input .iter() - .filter(|txid| !fetched_txs.contains_key(**txid)) - .take(10) - .collect(); - warn!("missing mempool txids: {:?} (capped at 10)", missing_txids); - } else { - break; - } + .any(|txin| replaced_txids.contains(&txin.previous_output.txid)); + if parent_was_replaced { + descendants_txids.insert(*txid); + } + !parent_was_replaced + }); + trace!( + "detected {} replaced mempool descendants", + descendants_txids.len() + ); + replaced_txids = descendants_txids; } // Add fetched transactions to our view of the mempool - { + trace!("indexing {} new mempool transactions", fetched_txs.len()); + if !fetched_txs.is_empty() { let mut mempool = mempool.write().unwrap(); mempool.add(fetched_txs)?; - mempool - .count + count .with_label_values(&["txs"]) .set(mempool.txstore.len() as f64); From c3ce3a96b0434cab16457b4bd62ac20277ddd423 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sun, 19 Oct 2025 18:20:59 +0300 Subject: [PATCH 2/2] Refactor to use HashSet::difference() --- src/new_index/mempool.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index a4db05d91..c5788e683 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -540,8 +540,7 @@ impl Mempool { // Find transactions available in bitcoind's mempool but not indexed locally let new_txids = bitcoind_txids - .iter() - .filter(|&txid| !indexed_txids.contains(txid)) + .difference(&indexed_txids) .collect::>(); debug!(