diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index 216f0d6c0ca..394083c2ae4 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -239,6 +239,7 @@ impl AccountHistory { Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory, { use reth_provider::PruneShardOutcome; + use std::collections::hash_map::Entry; // Unlike MDBX path, we don't divide the limit by 2 because RocksDB path only prunes // history shards (no separate changeset table to delete from). The changesets are in @@ -267,10 +268,24 @@ impl AccountHistory { break; } let (block_number, changeset) = result?; - highest_deleted_accounts.insert(changeset.address, block_number); + + // Always track scan progress (even if this address repeats) last_changeset_pruned_block = Some(block_number); changesets_processed += 1; - limiter.increment_deleted_entries_count(); + + // Only charge the limiter for NEW unique addresses (RocksDB shard work) + match highest_deleted_accounts.entry(changeset.address) { + Entry::Vacant(v) => { + v.insert(block_number); + limiter.increment_deleted_entries_count(); + } + Entry::Occupied(mut o) => { + // keep highest block seen for this address + if block_number > *o.get() { + o.insert(block_number); + } + } + } } trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files"); @@ -919,4 +934,160 @@ mod tests { let final_changesets = db.table::().unwrap(); assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned"); } + + /// Regression test for RETH-292: RocksDB pruning limiter counts changesets scanned + /// instead of RocksDB shards modified. 
+ /// + /// The issue: With ~7k changesets/block and 20k limit, only 2-3 blocks can be + /// processed because the limiter increments per changeset scanned from static files, + /// not per RocksDB shard actually deleted/updated. + /// + /// This test demonstrates that: + /// 1. Many changesets can map to few unique addresses (heavy repetition) + /// 2. The limiter should count unique addresses (RocksDB work), not changesets + /// 3. With proper accounting, more blocks can be processed per run + #[cfg(all(unix, feature = "rocksdb"))] + #[test] + fn reth_292_limiter_counts_changesets_not_shards() { + use alloy_primitives::{Address, U256}; + use reth_db_api::models::ShardedKey; + use reth_primitives_traits::Account; + use reth_provider::RocksDBProviderFactory; + use reth_testing_utils::generators::ChangeSet; + + let db = TestStageDB::default(); + let mut rng = generators::rng(); + + // Create 10 blocks + let num_blocks = 10u64; + let blocks = random_block_range( + &mut rng, + 0..=(num_blocks - 1), + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); + + // Create changesets with HIGH REPETITION: 1000 changesets per block, + // but only 10 unique addresses total. This simulates contracts that are + // touched many times per block (e.g., DEX routers, token contracts). + let changesets_per_block = 1000usize; + let unique_addresses = 10usize; + let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None }; + + // Pre-generate the 10 unique addresses + let addresses: Vec<Address>
= + (0..unique_addresses).map(|i| Address::with_last_byte(i as u8)).collect(); + + let mut changesets: Vec<ChangeSet> = Vec::new(); + for _ in 0..num_blocks { + let mut block_changesets = Vec::new(); + for i in 0..changesets_per_block { + // Cycle through the same 10 addresses repeatedly + let addr = addresses[i % unique_addresses]; + block_changesets.push((addr, account, vec![])); + } + changesets.push(block_changesets); + } + + let total_changesets: usize = changesets.iter().map(|c| c.len()).sum(); + assert_eq!(total_changesets, 10_000, "10 blocks × 1000 changesets = 10,000 total"); + + db.insert_changesets_to_static_files(changesets.clone(), None) + .expect("insert changesets to static files"); + + // Build account_blocks map - note: only 10 unique addresses! + let mut account_blocks: BTreeMap<Address, Vec<u64>> = BTreeMap::new(); + for (block, changeset) in changesets.iter().enumerate() { + for (address, _, _) in changeset { + account_blocks.entry(*address).or_default().push(block as u64); + } + } + assert_eq!(account_blocks.len(), unique_addresses, "Only 10 unique addresses"); + + // Insert history shards into RocksDB + let rocksdb = db.factory.rocksdb_provider(); + let mut batch = rocksdb.batch(); + for (address, block_numbers) in &account_blocks { + let mut sorted = block_numbers.clone(); + sorted.sort(); + sorted.dedup(); + if sorted.is_empty() { + continue; + } + let shard = BlockNumberList::new_pre_sorted(sorted.iter().copied()); + batch + .put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard) + .unwrap(); + } + batch.commit().unwrap(); + + db.factory.set_storage_settings_cache( + StorageSettings::default() + .with_account_changesets_in_static_files(true) + .with_account_history_in_rocksdb(true), + ); + + let to_block = num_blocks - 1; + let prune_mode = PruneMode::Before(to_block); + let segment = AccountHistory::new(prune_mode); + + // Set a limit that SHOULD allow processing all blocks if we count correctly: + // - 10 unique addresses = 10 RocksDB shard operations + // - But 10,000
changesets scanned + // + // With limit = 100: + // - If counting RocksDB shards: 10 << 100, should process all blocks + // - If counting changesets: 10,000 >> 100, stops after ~1 block + let limit = 100usize; + let limiter = PruneLimiter::default().set_deleted_entries_limit(limit); + let input = PruneInput { previous_checkpoint: None, to_block, limiter }; + + let provider = db.factory.database_provider_rw().unwrap(); + let result = segment.prune(&provider, input).unwrap(); + provider.commit().expect("commit"); + + let blocks_processed = result.checkpoint.as_ref().and_then(|c| c.block_number).unwrap_or(0); + + // CURRENT BEHAVIOR (BUG): With limit=100 and 1000 changesets/block, + // the limiter stops after scanning ~100 changesets (< 1 block). + // + // EXPECTED BEHAVIOR (FIX): With limit=100 and only 10 unique addresses, + // the limiter should allow processing all 10 blocks since we only + // touch 10 RocksDB shards. + + assert!( + result.progress.is_finished(), + "RETH-292 FIX VERIFIED: With limit={} and {} changesets/block but only {} unique addresses, \ + pruning should finish because limiter counts unique addresses (RocksDB shard work), not changesets. \ + Checkpoint block: {}. 
Processed {} entries.", + limit, + changesets_per_block, + unique_addresses, + blocks_processed, + result.pruned, + ); + + // Log the behavior for clarity + eprintln!( + "\n=== RETH-292 Fix Verified ===\n\ + Limit: {}\n\ + Changesets per block: {}\n\ + Unique addresses (RocksDB shards): {}\n\ + Blocks processed: {}\n\ + Pruned count reported: {}\n\ + Finished: {}\n\ + \n\ + FIX: Limiter now counts {} unique addresses (shards), not {} changesets.\n\ + All {} blocks processed as expected.\n", + limit, + changesets_per_block, + unique_addresses, + blocks_processed, + result.pruned, + result.progress.is_finished(), + unique_addresses, + total_changesets, + num_blocks + ); + } } diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index 47c5ab1364a..f7f8911133a 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -243,6 +243,7 @@ impl StorageHistory { Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory, { use reth_provider::PruneShardOutcome; + use std::collections::hash_map::Entry; let mut limiter = input.limiter; @@ -270,10 +271,25 @@ impl StorageHistory { let (block_address, entry) = result?; let block_number = block_address.block_number(); let address = block_address.address(); - highest_deleted_storages.insert((address, entry.key), block_number); + + // Always track scan progress (even if this (address, slot) repeats) last_changeset_pruned_block = Some(block_number); changesets_processed += 1; - limiter.increment_deleted_entries_count(); + + let key = (address, entry.key); + + // Only charge the limiter for NEW unique (address, slot) pairs (RocksDB shard work) + match highest_deleted_storages.entry(key) { + Entry::Vacant(v) => { + v.insert(block_number); + limiter.increment_deleted_entries_count(); + } + Entry::Occupied(mut o) => { + if block_number > *o.get() { + o.insert(block_number); + } 
+ } + } } trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files"); diff --git a/scripts/patch-alloy.sh b/scripts/patch-alloy.sh deleted file mode 100755 index be6017581b3..00000000000 --- a/scripts/patch-alloy.sh +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/env bash -# Patches alloy dependencies in Cargo.toml for testing breaking changes. -# -# Usage: -# ./scripts/patch-alloy.sh [--alloy ] [--evm ] [--op ] -# -# Examples: -# ./scripts/patch-alloy.sh --alloy main -# ./scripts/patch-alloy.sh --alloy feat/new-api --evm main -# ./scripts/patch-alloy.sh --alloy main --evm main --op main - -set -euo pipefail - -ALLOY_BRANCH="" -ALLOY_EVM_BRANCH="" -OP_ALLOY_BRANCH="" - -while [[ $# -gt 0 ]]; do - case $1 in - --alloy) - ALLOY_BRANCH="$2" - shift 2 - ;; - --evm) - ALLOY_EVM_BRANCH="$2" - shift 2 - ;; - --op) - OP_ALLOY_BRANCH="$2" - shift 2 - ;; - -h|--help) - echo "Usage: $0 [--alloy ] [--evm ] [--op ]" - echo "" - echo "Options:" - echo " --alloy Patch alloy-rs/alloy crates" - echo " --evm Patch alloy-rs/evm crates (alloy-evm, alloy-op-evm)" - echo " --op Patch alloy-rs/op-alloy crates" - exit 0 - ;; - *) - echo "Unknown option: $1" - exit 1 - ;; - esac -done - -if [[ -z "$ALLOY_BRANCH" && -z "$ALLOY_EVM_BRANCH" && -z "$OP_ALLOY_BRANCH" ]]; then - echo "Error: At least one of --alloy, --evm, or --op must be specified" - exit 1 -fi - -CARGO_TOML="${CARGO_TOML:-Cargo.toml}" - -echo "Patching $CARGO_TOML..." 
-echo "" >> "$CARGO_TOML" - -if [[ -n "$ALLOY_BRANCH" ]]; then - echo "Patching alloy-rs/alloy with branch: $ALLOY_BRANCH" - cat >> "$CARGO_TOML" << EOF -# Patched by patch-alloy.sh -alloy-consensus = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-contract = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-eips = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-genesis = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-json-rpc = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-network = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-network-primitives = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-provider = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-admin = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-debug = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-eth = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-mev = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-rpc-types-txpool = { git = 
"https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-serde = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-signer = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-signer-local = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-transport = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-transport-ipc = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", branch = "$ALLOY_BRANCH" } -EOF -fi - -if [[ -n "$ALLOY_EVM_BRANCH" ]]; then - echo "Patching alloy-rs/evm with branch: $ALLOY_EVM_BRANCH" - cat >> "$CARGO_TOML" << EOF -alloy-evm = { git = "https://github.com/alloy-rs/evm", branch = "$ALLOY_EVM_BRANCH" } -alloy-op-evm = { git = "https://github.com/alloy-rs/evm", branch = "$ALLOY_EVM_BRANCH" } -EOF -fi - -if [[ -n "$OP_ALLOY_BRANCH" ]]; then - echo "Patching alloy-rs/op-alloy with branch: $OP_ALLOY_BRANCH" - cat >> "$CARGO_TOML" << EOF -op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", branch = "$OP_ALLOY_BRANCH" } -op-alloy-network = { git = "https://github.com/alloy-rs/op-alloy", branch = "$OP_ALLOY_BRANCH" } -op-alloy-rpc-types = { git = "https://github.com/alloy-rs/op-alloy", branch = "$OP_ALLOY_BRANCH" } -op-alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/op-alloy", branch = "$OP_ALLOY_BRANCH" } -op-alloy-rpc-jsonrpsee = { git = "https://github.com/alloy-rs/op-alloy", branch = "$OP_ALLOY_BRANCH" } -EOF -fi - -echo "Done. Patches appended to $CARGO_TOML" -echo "" -echo "Run 'cargo check --workspace --all-features' to verify compilation." -echo "Run 'git checkout $CARGO_TOML' to revert changes."