Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 100 additions & 10 deletions crates/rpc/rpc-eth-types/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,9 @@ where
}

/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
self.tx_hash_index.remove(tx.tx_hash());
}
}

Expand Down Expand Up @@ -421,6 +413,15 @@ where
}
}

fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
if let Some(queued) = self.headers_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
}
}
}

/// Shrinks the queues but leaves some space for the next requests
fn shrink_queues(&mut self) {
let min_capacity = 2;
Expand Down Expand Up @@ -597,9 +598,12 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
let block_hash = block.hash();
let header = block.clone_header();
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
this.on_reorg_block(block_hash, Ok(Some(block)));
this.on_reorg_header(block_hash, Ok(header));
}

for block_receipts in chain_change.receipts {
Expand Down Expand Up @@ -825,3 +829,89 @@ pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_consensus::Header;
use alloy_primitives::{Address, Signature};
use reth_ethereum_primitives::{
Block, BlockBody, EthPrimitives, Transaction, TransactionSigned,
};
use reth_primitives_traits::RecoveredBlock;
use reth_storage_api::noop::NoopProvider;

fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
let (_cache, service) = EthStateCache::<EthPrimitives>::create(
NoopProvider::default(),
Runtime::test(),
4,
4,
4,
1,
16,
);
service
}

fn test_block() -> RecoveredBlock<Block> {
RecoveredBlock::new_unhashed(
Block {
header: Header { number: 1, ..Default::default() },
body: BlockBody {
transactions: vec![TransactionSigned::new_unhashed(
Transaction::Legacy(Default::default()),
Signature::test_signature(),
)],
..Default::default()
},
},
vec![Address::ZERO],
)
}

#[test]
fn reorg_evicts_cached_headers() {
let mut service = test_service();
let block_hash = B256::repeat_byte(0x11);

assert!(service
.headers_cache
.insert(block_hash, Header { number: 42, ..Default::default() }));
assert!(service.headers_cache.get(&block_hash).is_some());

service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));

assert!(service.headers_cache.get(&block_hash).is_none());
}

#[test]
fn reorg_forwards_header_to_queued_requests() {
let mut service = test_service();
let block_hash = B256::repeat_byte(0x22);
let (response_tx, mut response_rx) = oneshot::channel();
let header = Header { number: 7, ..Default::default() };

assert!(service.headers_cache.queue(block_hash, response_tx));

service.on_reorg_header(block_hash, Ok(header));

let header =
response_rx.try_recv().expect("queued header response").expect("header result");

assert_eq!(header.number, 7);
}

#[test]
fn reorg_removes_tx_hash_index_entries_unconditionally() {
let mut service = test_service();
let block = test_block();
let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();

service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));

service.remove_block_transactions(&block);

assert!(service.tx_hash_index.get(&tx_hash).is_none());
}
}
Loading