175 changes: 173 additions & 2 deletions crates/prune/prune/src/segments/user/account_history.rs
@@ -239,6 +239,7 @@ impl AccountHistory {
Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
use std::collections::hash_map::Entry;

// Unlike the MDBX path, we don't divide the limit by 2 because the RocksDB path only
// prunes history shards (no separate changeset table to delete from). The changesets are in
@@ -267,10 +268,24 @@ impl AccountHistory {
break;
}
let (block_number, changeset) = result?;

// Always track scan progress (even if this address repeats)
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;

// Only charge the limiter for NEW unique addresses (RocksDB shard work)
match highest_deleted_accounts.entry(changeset.address) {
Entry::Vacant(v) => {
v.insert(block_number);
limiter.increment_deleted_entries_count();
}
Entry::Occupied(mut o) => {
// keep highest block seen for this address
if block_number > *o.get() {
o.insert(block_number);
}
}
}
}
trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned account changesets from static files");

@@ -919,4 +934,160 @@ mod tests {
let final_changesets = db.table::<tables::AccountChangeSets>().unwrap();
assert!(final_changesets.is_empty(), "All changesets up to block 10 should be pruned");
}

/// Regression test for RETH-292: RocksDB pruning limiter counts changesets scanned
/// instead of RocksDB shards modified.
///
/// The issue: with ~7k changesets/block and a 20k limit, only 2-3 blocks can be
/// processed per run, because the limiter increments per changeset scanned from
/// static files, not per RocksDB shard actually deleted or updated.
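/// (At that rate, 20,000 / 7,000 ≈ 2.9, so the scan budget runs out inside the third block.)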
///
/// This test demonstrates that:
/// 1. Many changesets can map to few unique addresses (heavy repetition)
/// 2. The limiter should count unique addresses (RocksDB work), not changesets
/// 3. With proper accounting, more blocks can be processed per run
#[cfg(all(unix, feature = "rocksdb"))]
#[test]
fn reth_292_limiter_counts_changesets_not_shards() {
use alloy_primitives::{Address, U256};
use reth_db_api::models::ShardedKey;
use reth_primitives_traits::Account;
use reth_provider::RocksDBProviderFactory;
use reth_testing_utils::generators::ChangeSet;

let db = TestStageDB::default();
let mut rng = generators::rng();

// Create 10 blocks
let num_blocks = 10u64;
let blocks = random_block_range(
&mut rng,
0..=(num_blocks - 1),
BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
);
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

// Create changesets with HIGH REPETITION: 1000 changesets per block,
// but only 10 unique addresses total. This simulates contracts that are
// touched many times per block (e.g., DEX routers, token contracts).
let changesets_per_block = 1000usize;
let unique_addresses = 10usize;
let account = Account { nonce: 1, balance: U256::from(100), bytecode_hash: None };

// Pre-generate the 10 unique addresses
let addresses: Vec<Address> =
(0..unique_addresses).map(|i| Address::with_last_byte(i as u8)).collect();
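// Address::with_last_byte(i) yields ten distinct, deterministic addresses (last byte 0x00 through 0x09).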

let mut changesets: Vec<ChangeSet> = Vec::new();
for _ in 0..num_blocks {
let mut block_changesets = Vec::new();
for i in 0..changesets_per_block {
// Cycle through the same 10 addresses repeatedly
let addr = addresses[i % unique_addresses];
block_changesets.push((addr, account, vec![]));
}
changesets.push(block_changesets);
}

let total_changesets: usize = changesets.iter().map(|c| c.len()).sum();
assert_eq!(total_changesets, 10_000, "10 blocks × 1000 changesets = 10,000 total");

db.insert_changesets_to_static_files(changesets.clone(), None)
.expect("insert changesets to static files");

// Build account_blocks map - note: only 10 unique addresses!
let mut account_blocks: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block, changeset) in changesets.iter().enumerate() {
for (address, _, _) in changeset {
account_blocks.entry(*address).or_default().push(block as u64);
}
}
assert_eq!(account_blocks.len(), unique_addresses, "Only 10 unique addresses");

// Insert history shards into RocksDB
let rocksdb = db.factory.rocksdb_provider();
let mut batch = rocksdb.batch();
for (address, block_numbers) in &account_blocks {
let mut sorted = block_numbers.clone();
sorted.sort();
sorted.dedup();
if sorted.is_empty() {
continue;
}
let shard = BlockNumberList::new_pre_sorted(sorted.iter().copied());
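// A highest block number of u64::MAX marks the open (most recent) shard for an address,
// matching reth's history sharding convention.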
batch
.put::<tables::AccountsHistory>(ShardedKey::new(*address, u64::MAX), &shard)
.unwrap();
}
batch.commit().unwrap();

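// Route account changesets to static files and account history to RocksDB so the
// RocksDB prune path under test is the one exercised.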
db.factory.set_storage_settings_cache(
StorageSettings::default()
.with_account_changesets_in_static_files(true)
.with_account_history_in_rocksdb(true),
);

let to_block = num_blocks - 1;
let prune_mode = PruneMode::Before(to_block);
let segment = AccountHistory::new(prune_mode);

// Set a limit that SHOULD allow processing all blocks if we count correctly:
// - 10 unique addresses = 10 RocksDB shard operations
// - But 10,000 changesets scanned
//
// With limit = 100:
// - If counting RocksDB shards: 10 << 100, should process all blocks
// - If counting changesets: 10,000 >> 100, stops after ~1 block
let limit = 100usize;
let limiter = PruneLimiter::default().set_deleted_entries_limit(limit);
let input = PruneInput { previous_checkpoint: None, to_block, limiter };

let provider = db.factory.database_provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
provider.commit().expect("commit");

let checkpoint_block = result.checkpoint.as_ref().and_then(|c| c.block_number).unwrap_or(0);

// CURRENT BEHAVIOR (BUG): With limit=100 and 1000 changesets/block,
// the limiter stops after scanning ~100 changesets (< 1 block).
//
// EXPECTED BEHAVIOR (FIX): With limit=100 and only 10 unique addresses,
// the limiter should allow processing all 10 blocks since we only
// touch 10 RocksDB shards.

assert!(
result.progress.is_finished(),
"RETH-292 FIX VERIFIED: With limit={} and {} changesets/block but only {} unique addresses, \
pruning should finish because limiter counts unique addresses (RocksDB shard work), not changesets. \
Checkpoint block: {}. Processed {} entries.",
limit,
changesets_per_block,
unique_addresses,
checkpoint_block,
result.pruned,
);

// Log the behavior for clarity
eprintln!(
"\n=== RETH-292 Fix Verified ===\n\
Limit: {}\n\
Changesets per block: {}\n\
Unique addresses (RocksDB shards): {}\n\
Checkpoint block: {}\n\
Pruned count reported: {}\n\
Finished: {}\n\
\n\
FIX: Limiter now counts {} unique addresses (shards), not {} changesets.\n\
All {} blocks processed as expected.\n",
limit,
changesets_per_block,
unique_addresses,
checkpoint_block,
result.pruned,
result.progress.is_finished(),
unique_addresses,
total_changesets,
num_blocks
);
}
}
20 changes: 18 additions & 2 deletions crates/prune/prune/src/segments/user/storage_history.rs
@@ -243,6 +243,7 @@ impl StorageHistory {
Provider: DBProvider + StaticFileProviderFactory + RocksDBProviderFactory,
{
use reth_provider::PruneShardOutcome;
use std::collections::hash_map::Entry;

let mut limiter = input.limiter;

@@ -270,10 +271,25 @@ impl StorageHistory {
let (block_address, entry) = result?;
let block_number = block_address.block_number();
let address = block_address.address();

// Always track scan progress (even if this (address, slot) repeats)
last_changeset_pruned_block = Some(block_number);
changesets_processed += 1;

let key = (address, entry.key);

// Only charge the limiter for NEW unique (address, slot) pairs (RocksDB shard work)
match highest_deleted_storages.entry(key) {
Entry::Vacant(v) => {
v.insert(block_number);
limiter.increment_deleted_entries_count();
}
Entry::Occupied(mut o) => {
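// keep highest block seen for this (address, slot) pair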
if block_number > *o.get() {
o.insert(block_number);
}
}
}
}

trace!(target: "pruner", processed = %changesets_processed, %done, "Scanned storage changesets from static files");
114 changes: 0 additions & 114 deletions scripts/patch-alloy.sh

This file was deleted.