Skip to content
Closed
237 changes: 234 additions & 3 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
StaticFileProviderFactory, StaticFileWriter, StorageSettingsCache,
};
use reth_prune::{PrunerError, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
Expand All @@ -32,6 +34,21 @@ pub struct PersistenceResult {
pub commit_duration: Option<Duration>,
}

/// A deferred changeset prune from a `RemoveBlocksAbove` operation.
///
/// Changeset static file truncation is deferred to avoid truncating memory-mapped files while
/// concurrent readers may still hold handles. The prune is applied at the start of the next
/// `SaveBlocks`, after waiting for all MDBX readers from the reorg era to drain.
#[derive(Debug)]
struct DeferredChangesetPrune {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's please not add private types to the top of the file :)

/// The block number to prune changesets to.
target_block: u64,
/// MDBX txn ID recorded after the `RemoveBlocksAbove` commit. All readers with a txn ID
/// strictly below this value must complete before the prune can be applied safely, as
/// they hold a pre-unwind MDBX snapshot and may reference stale changeset data.
committed_txn_id: Option<u64>,
}

/// Writes parts of reth's in memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
Expand Down Expand Up @@ -60,6 +77,12 @@ where
/// Pending safe block number to be committed with the next block save.
/// This avoids triggering a separate fsync for each safe block update.
pending_safe_block: Option<u64>,
/// Deferred changeset prune from a previous `RemoveBlocksAbove` operation.
///
/// During reorgs, changeset static file truncation is deferred to the next `SaveBlocks`
/// commit to avoid truncating files while concurrent readers (payload builders, RPC) may
/// still hold stale memory-mapped file handles.
deferred_changeset_prune: Option<DeferredChangesetPrune>,
}

impl<N> PersistenceService<N>
Expand All @@ -81,6 +104,7 @@ where
sync_metrics_tx,
pending_finalized_block: None,
pending_safe_block: None,
deferred_changeset_prune: None,
}
}
}
Expand Down Expand Up @@ -128,7 +152,7 @@ where

#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
fn on_remove_blocks_above(
&self,
&mut self,
new_tip_num: u64,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
Expand All @@ -137,8 +161,36 @@ where

let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
provider_rw.remove_block_and_execution_above(new_tip_num)?;

// Defer changeset static file truncation to the next SaveBlocks commit.
//
// `remove_block_and_execution_above` queues prune strategies on the changeset SF
// writers. If we let commit() execute them now, it would truncate the underlying
// files while concurrent readers (payload builders, RPC) may still hold stale
// memory-mapped handles — causing a panic or SIGBUS.
//
// By taking the strategies here and re-applying them later in `on_save_blocks`,
// we ensure the truncation only happens when no reader needs the old data:
// after the reorg completes and the next batch of blocks is persisted.
if self.provider.cached_storage_settings().storage_v2 &&
let Some(target) = self.provider.static_file_provider().take_changeset_prunes()
{
let prev_target = self.deferred_changeset_prune.as_ref().map(|d| d.target_block);
self.deferred_changeset_prune = Some(DeferredChangesetPrune {
target_block: prev_target.map_or(target, |prev| prev.min(target)),
// txn ID will be set after commit below
committed_txn_id: None,
});
}

provider_rw.commit()?;

// Record the MDBX txn ID after commit so we can later verify all readers from
// this era have completed before applying the deferred changeset prune.
if let Some(deferred) = &mut self.deferred_changeset_prune {
deferred.committed_txn_id = self.provider.db_ref().last_txnid();
}

debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
Expand All @@ -160,6 +212,35 @@ where

let start_time = Instant::now();

// Apply any deferred changeset prunes from a previous RemoveBlocksAbove.
// Wait for all MDBX readers from the reorg era to complete before truncating,
// so no reader can observe a stale memory-mapped file.
if let Some(deferred) = self.deferred_changeset_prune.take() {
if let Some(prune_txn) = deferred.committed_txn_id {
while self
.provider
.db_ref()
.oldest_reader_txnid()
.is_some_and(|oldest| oldest < prune_txn)
Comment on lines +223 to +224

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this perhaps too relaxed?

Would this also include RPC-related transactions?
There's a risk that an `eth_getProof` or similar transaction will stay open for a while.
Should we introduce a counter, so that after 3 blocks or so we always do this?

{
debug!(
target: "engine::persistence",
target_block = deferred.target_block,
prune_txn,
oldest_reader = ?self.provider.db_ref().oldest_reader_txnid(),
"Waiting for stale readers to drain before changeset prune"
);
std::thread::sleep(std::time::Duration::from_millis(10));
}
}

debug!(target: "engine::persistence", target_block = deferred.target_block, "Applying deferred changeset prunes");
let sf = self.provider.static_file_provider();
sf.requeue_changeset_prunes(deferred.target_block)?;
sf.commit()?;
debug!(target: "engine::persistence", target_block = deferred.target_block, "Applied deferred changeset prunes");
}

if let Some(last) = last_block {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
Expand Down Expand Up @@ -374,13 +455,35 @@ impl Drop for ServiceGuard {
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::B256;
use alloy_consensus::Header;
use alloy_primitives::{Address, B256, U256};
use reth_chain_state::test_utils::TestBlockBuilder;
use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult};
use reth_exex_types::FinishedExExHeight;
use reth_provider::test_utils::create_test_provider_factory;
use reth_primitives_traits::{SealedBlock, SealedHeader};
use reth_provider::{
test_utils::create_test_provider_factory, ChangeSetReader, HeaderProvider, StorageSettings,
StorageSettingsCache,
};
use reth_prune::Pruner;
use reth_revm::db::BundleState;
use revm_state::AccountInfo;
use std::sync::Arc;
use tokio::sync::mpsc::unbounded_channel;

/// Spawns a persistence service for tests on top of the caller-supplied provider factory.
///
/// Unlike a helper that creates its own factory, this lets the test keep a clone of `provider`
/// and inspect storage directly while the service is running.
fn persistence_handle_with_factory(
    provider: ProviderFactory<reth_provider::test_utils::MockNodeTypesWithDB>,
) -> PersistenceHandle<EthPrimitives> {
    // The receiving half of the metrics channel is unused; it is dropped when this
    // function returns.
    let (metrics_tx, _metrics_rx) = unbounded_channel();

    // The pruner watches an ExEx height channel that starts at `NoExExs`. The sending half
    // is likewise only kept alive until the end of this function.
    let (_exex_height_tx, exex_height_rx) =
        tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
    let pruner = Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, exex_height_rx);

    PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, metrics_tx)
}

fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
let provider = create_test_provider_factory();

Expand Down Expand Up @@ -509,4 +612,132 @@ mod tests {
let expected: Vec<u64> = (15..25).collect();
assert_eq!(entries, expected, "new entries 20..25 must survive pruning");
}

/// Verifies that changeset static file truncation is deferred during reorgs.
///
/// Headers are truncated immediately by `RemoveBlocksAbove`, but changeset prunes are
/// deferred to the next `SaveBlocks` to avoid truncating files while concurrent readers
/// (payload builders, RPC) may still hold memory-mapped handles.
///
/// Flow: persist genesis and blocks 1..=3, unwind to block 1, check that header 2 is gone
/// while the block 2 account changeset is still readable, then send an empty `save_blocks`
/// and check that the block 2 changeset has been pruned.
#[test]
fn test_deferred_changeset_prune_on_reorg() {
let factory = create_test_provider_factory();
// The deferral in `on_remove_blocks_above` is only taken when `storage_v2` is set on the
// cached storage settings, which `StorageSettings::v2()` is expected to enable.
factory.set_storage_settings_cache(StorageSettings::v2());

// -- Phase 1: Persist genesis + blocks 1..3 with account state changes --

// Genesis is sealed with an explicit zero hash so that block 1 below can link to it by
// using `B256::ZERO` as its parent hash.
let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
SealedHeader::new(
Header { number: 0, difficulty: U256::from(1), ..Default::default() },
B256::ZERO,
),
Default::default(),
);
let genesis_executed = ExecutedBlock::new(
Arc::new(genesis.try_recover().unwrap()),
Arc::new(BlockExecutionOutput {
result: BlockExecutionResult {
receipts: vec![],
requests: Default::default(),
gas_used: 0,
blob_gas_used: 0,
},
state: Default::default(),
}),
Default::default(),
);

// Genesis is written directly through the factory, before the persistence service is
// spawned.
let provider_rw = factory.provider_rw().unwrap();
provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
provider_rw.commit().unwrap();

// Build blocks 1..3 with account changes so changesets are written to SF
let mut blocks = Vec::new();
let mut parent_hash = B256::ZERO;
for block_num in 1..=3u64 {
// Each block touches its own address, so every block has its own changeset entry.
let address = Address::with_last_byte(block_num as u8);
// NOTE(review): the `Some(None)` revert presumably records that the account did not
// exist before this block — confirm against the `BundleState` builder docs.
let bundle = BundleState::builder(block_num..=block_num)
.state_present_account_info(
address,
AccountInfo {
nonce: block_num,
balance: U256::from(block_num * 100),
..Default::default()
},
)
.revert_account_info(block_num, address, Some(None))
.build();

let header = Header {
number: block_num,
parent_hash,
difficulty: U256::from(1),
..Default::default()
};
let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
header,
Default::default(),
);
// Chain the next block onto this one.
parent_hash = block.hash();

blocks.push(ExecutedBlock::new(
Arc::new(block.try_recover().unwrap()),
Arc::new(BlockExecutionOutput {
result: BlockExecutionResult {
receipts: vec![],
requests: Default::default(),
gas_used: 0,
blob_gas_used: 0,
},
state: bundle,
}),
Default::default(),
));
}

// Blocks 1..=3 go through the persistence service; wait for its reply before reading.
let handle = persistence_handle_with_factory(factory.clone());
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(blocks, tx).unwrap();
rx.recv_timeout(std::time::Duration::from_secs(10)).expect("save_blocks timed out");

// -- Phase 2: Simulate builder getting a reader before reorg --

// NOTE(review): this read completes before the reorg is requested and only the provider
// `sf` is kept across it, so the test checks that the prune is deferred rather than that a
// reader held open across the truncation stays valid — confirm this is the intended scope.
let sf = factory.static_file_provider();

// Builder reads block 2 changesets — this should work
let changesets_block2 = sf.account_block_changeset(2).unwrap();
assert!(!changesets_block2.is_empty(), "block 2 should have changesets before reorg");

// -- Phase 3: Reorg via RemoveBlocksAbove(1) --

let (tx, rx) = crossbeam_channel::bounded(1);
handle.remove_blocks_above(1, tx).unwrap();
rx.recv_timeout(std::time::Duration::from_secs(10)).expect("remove_blocks timed out");

// -- Phase 4: Headers are truncated immediately, but changesets are deferred --

assert!(
sf.header_by_number(2).unwrap().is_none(),
"header 2 should be gone after reorg (truncated immediately)"
);

let changesets_after_reorg = sf.account_block_changeset(2).unwrap();
assert!(
!changesets_after_reorg.is_empty(),
"block 2 changesets should still be readable after reorg (prune deferred)"
);

// -- Phase 5: Next save_blocks applies the deferred prune --

// An empty batch is enough: the deferred prune is applied at the start of `SaveBlocks`
// handling, before the service looks at the blocks to save.
// NOTE(review): the test holds no long-lived read transaction, so the reader-drain wait is
// presumably a no-op here and the wait loop itself is not exercised.
let (tx, rx) = crossbeam_channel::bounded(1);
handle.save_blocks(vec![], tx).unwrap();
rx.recv_timeout(std::time::Duration::from_secs(10)).expect("save_blocks timed out");

// -- Phase 6: Block 2 changesets should now be gone --

let changesets_after_prune = sf.account_block_changeset(2).unwrap();
assert!(
changesets_after_prune.is_empty(),
"block 2 changesets should be gone after deferred prune applied"
);
}
}
32 changes: 32 additions & 0 deletions crates/storage/db-api/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ pub trait Database: Send + Sync + Debug {
Ok(res)
}

/// Returns the transaction ID of the oldest active reader, if available.
///
/// Used to check whether stale readers from a previous write transaction have completed.
/// MDBX-backed implementations derive this from the environment's `mi_latter_reader_txnid`
/// field, which tracks the oldest reader still pinning old pages.
///
/// Returns `None` if no readers are active or the backend does not support this query.
/// The default implementation always returns `None`, so callers must treat `None` as
/// "unknown" rather than as proof that no reader exists.
fn oldest_reader_txnid(&self) -> Option<u64> {
None
}

/// Returns the ID of the most recently committed transaction, if available.
///
/// The default implementation always returns `None`; backends that can report a
/// transaction ID override this method.
fn last_txnid(&self) -> Option<u64> {
None
}

/// Takes a function and passes a write-read transaction into it, making sure it's committed in
/// the end of the execution.
fn update<T, F>(&self, f: F) -> Result<T, DatabaseError>
Expand Down Expand Up @@ -69,6 +85,14 @@ impl<DB: Database> Database for Arc<DB> {
fn path(&self) -> PathBuf {
<DB as Database>::path(self)
}

// Delegates to the `DB` held inside the `Arc`, so a backend-specific override is not
// shadowed by the trait's default `None`.
fn oldest_reader_txnid(&self) -> Option<u64> {
<DB as Database>::oldest_reader_txnid(self)
}

// Delegates to the `DB` held inside the `Arc`, so a backend-specific override is not
// shadowed by the trait's default `None`.
fn last_txnid(&self) -> Option<u64> {
<DB as Database>::last_txnid(self)
}
}

impl<DB: Database> Database for &DB {
Expand All @@ -86,4 +110,12 @@ impl<DB: Database> Database for &DB {
fn path(&self) -> PathBuf {
<DB as Database>::path(self)
}

// Delegates to the referenced `DB`, so a backend-specific override is not shadowed by the
// trait's default `None`.
fn oldest_reader_txnid(&self) -> Option<u64> {
<DB as Database>::oldest_reader_txnid(self)
}

// Delegates to the referenced `DB`, so a backend-specific override is not shadowed by the
// trait's default `None`.
fn last_txnid(&self) -> Option<u64> {
<DB as Database>::last_txnid(self)
}
}
20 changes: 20 additions & 0 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,26 @@ impl Database for DatabaseEnv {
fn path(&self) -> PathBuf {
self.path.clone()
}

/// Reads `latter_reader_txnid` from the environment info.
///
/// Yields `None` when the info query fails or when the reported ID is `0`.
fn oldest_reader_txnid(&self) -> Option<u64> {
    self.inner.info().ok().map(|info| info.latter_reader_txnid()).filter(|&id| id != 0)
}

/// Reads `last_txnid` from the environment info and widens it to `u64`.
///
/// Yields `None` when the info query fails or when the reported ID is `0`.
fn last_txnid(&self) -> Option<u64> {
    let env_info = self.inner.info().ok()?;
    match env_info.last_txnid() {
        0 => None,
        id => Some(id as u64),
    }
}
}

impl DatabaseMetrics for DatabaseEnv {
Expand Down
Loading
Loading