From ee681a581e1532c06c9a58ae17d5f2740e552321 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 22 Apr 2026 13:47:53 +0200 Subject: [PATCH] fix(rpc): clean up eth state cache reorg entries --- crates/rpc/rpc-eth-types/src/cache/mod.rs | 110 ++++++++++++++++++++-- 1 file changed, 100 insertions(+), 10 deletions(-) diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 820576f9483..c4b487b15df 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -344,17 +344,9 @@ where } /// Removes transaction index entries for a reorged block. - /// - /// Only removes entries that still point to this block, preserving mappings for transactions - /// that were re-mined in a new canonical block. fn remove_block_transactions(&mut self, block: &RecoveredBlock) { - let block_hash = block.hash(); for tx in block.body().transactions() { - if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) && - *mapped_hash == block_hash - { - self.tx_hash_index.remove(tx.tx_hash()); - } + self.tx_hash_index.remove(tx.tx_hash()); } } @@ -421,6 +413,15 @@ where } } + fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult) { + if let Some(queued) = self.headers_cache.remove(&block_hash) { + // send the response to queued senders + for tx in queued { + let _ = tx.send(res.clone()); + } + } + } + /// Shrinks the queues but leaves some space for the next requests fn shrink_queues(&mut self) { let min_capacity = 2; @@ -597,9 +598,12 @@ where } CacheAction::RemoveReorgedChain { chain_change } => { for block in chain_change.blocks { + let block_hash = block.hash(); + let header = block.clone_header(); // Remove transaction index entries for reorged blocks this.remove_block_transactions(&block); - this.on_reorg_block(block.hash(), Ok(Some(block))); + this.on_reorg_block(block_hash, Ok(Some(block))); + this.on_reorg_header(block_hash, Ok(header)); } for block_receipts in chain_change.receipts { @@ -825,3 +829,89 @@ pub async fn cache_new_blocks_task( eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change }); } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy_consensus::Header; + use alloy_primitives::{Address, Signature}; + use reth_ethereum_primitives::{ + Block, BlockBody, EthPrimitives, Transaction, TransactionSigned, + }; + use reth_primitives_traits::RecoveredBlock; + use reth_storage_api::noop::NoopProvider; + + fn test_service() -> EthStateCacheService { + let (_cache, service) = EthStateCache::::create( + NoopProvider::default(), + Runtime::test(), + 4, + 4, + 4, + 1, + 16, + ); + service + } + + fn test_block() -> RecoveredBlock { + RecoveredBlock::new_unhashed( + Block { + header: Header { number: 1, ..Default::default() }, + body: BlockBody { + transactions: vec![TransactionSigned::new_unhashed( + Transaction::Legacy(Default::default()), + Signature::test_signature(), + )], + ..Default::default() + }, + }, + vec![Address::ZERO], + ) + } + + #[test] + fn reorg_evicts_cached_headers() { + let mut service = test_service(); + let block_hash = B256::repeat_byte(0x11); + + assert!(service + .headers_cache + .insert(block_hash, Header { number: 42, ..Default::default() })); + assert!(service.headers_cache.get(&block_hash).is_some()); + + service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() })); + + assert!(service.headers_cache.get(&block_hash).is_none()); + } + + #[test] + fn reorg_forwards_header_to_queued_requests() { + let mut service = test_service(); + let block_hash = B256::repeat_byte(0x22); + let (response_tx, mut response_rx) = oneshot::channel(); + let header = Header { number: 7, ..Default::default() }; + + assert!(service.headers_cache.queue(block_hash, response_tx)); + + service.on_reorg_header(block_hash, Ok(header)); + + let header = + response_rx.try_recv().expect("queued header response").expect("header result"); + + assert_eq!(header.number, 7); + } + + #[test] + fn reorg_removes_tx_hash_index_entries_unconditionally() { + let mut service = test_service(); + let block = test_block(); + let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash(); + + service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0)); + + service.remove_block_transactions(&block); + + assert!(service.tx_hash_index.get(&tx_hash).is_none()); + } +}