Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions crates/cli/commands/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use reth_provider::{
providers::{BlockchainProvider, NodeTypesForProvider, StaticFileProvider},
ProviderFactory, StaticFileProviderFactory,
};
use reth_stages::{sets::DefaultStages, Pipeline};
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
Expand Down Expand Up @@ -126,6 +126,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
where
C: ChainSpecParser<ChainSpec = N::ChainSpec>,
{
let has_receipt_pruning = config.prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
let prune_modes =
config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default();
let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::new(
Expand All @@ -136,8 +137,9 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
.with_prune_modes(prune_modes.clone());

// Check for consistency between database and static files.
if let Some(unwind_target) =
factory.static_file_provider().check_consistency(&factory.provider()?)?
if let Some(unwind_target) = factory
.static_file_provider()
.check_consistency(&factory.provider()?, has_receipt_pruning)?
{
if factory.db_ref().is_read_only()? {
warn!(target: "reth::cli", ?unwind_target, "Inconsistent storage. Restart node to heal.");
Expand All @@ -148,7 +150,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {
// instead.
assert_ne!(
unwind_target,
0,
PipelineTarget::Unwind(0),
"A static file <> database inconsistency was found that would trigger an unwind to block 0"
);

Expand All @@ -173,7 +175,7 @@ impl<C: ChainSpecParser> EnvironmentArgs<C> {

// Move all applicable data from database to static files.
pipeline.move_to_static_files()?;
pipeline.unwind(unwind_target, None)?;
pipeline.unwind(unwind_target.unwind_target().expect("should exist"), None)?;
}

Ok(factory)
Expand Down
144 changes: 136 additions & 8 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitStorageError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_engine_local::MiningMode;
use reth_evm::ConfigureEvm;
use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
use reth_exex::ExExManagerHandle;
use reth_fs_util as fs;
use reth_network_p2p::headers::client::HeadersClient;
Expand All @@ -65,19 +67,25 @@ use reth_node_metrics::{
};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory, ProviderResult,
StaticFileProviderFactory,
BlockHashReader, BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory,
ProviderResult, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{stages::EraImportSource, MetricEvent};
use reth_stages::{
sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
StageId,
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, thread::available_parallelism};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot, watch,
};

use futures::{future::Either, stream, Stream, StreamExt};
use reth_node_ethstats::EthStatsService;
Expand Down Expand Up @@ -458,13 +466,70 @@ where
N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
{
Ok(ProviderFactory::new(
let factory = ProviderFactory::new(
self.right().clone(),
self.chain_spec(),
StaticFileProvider::read_write(self.data_dir().static_files())?,
)
.with_prune_modes(self.prune_modes())
.with_static_files_metrics())
.with_static_files_metrics();

let has_receipt_pruning =
self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());

// Check for consistency between database and static files. If it fails, it unwinds to
// the first block that's consistent between database and static files.
if let Some(unwind_target) = factory
.static_file_provider()
.check_consistency(&factory.provider()?, has_receipt_pruning)?
{
// Highly unlikely to happen, and given its destructive nature, it's better to panic
// instead.
assert_ne!(
unwind_target,
PipelineTarget::Unwind(0),
"A static file <> database inconsistency was found that would trigger an unwind to block 0"
);

info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

// Builds an unwind-only pipeline
let pipeline = PipelineBuilder::default()
.add_stages(DefaultStages::new(
factory.clone(),
tip_rx,
Arc::new(NoopConsensus::default()),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
NoopEvmConfig::<Evm>::default(),
self.toml_config().stages.clone(),
self.prune_modes(),
None,
))
.build(
factory.clone(),
StaticFileProducer::new(factory.clone(), self.prune_modes()),
);

// Unwinds to block
let (tx, rx) = oneshot::channel();

// Pipeline should be run as blocking and panic if it fails.
self.task_executor().spawn_critical_blocking(
"pipeline task",
Box::pin(async move {
let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
let _ = tx.send(result);
}),
);
rx.await?.inspect_err(|err| {
error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
})?;
}

Ok(factory)
}

/// Creates a new [`ProviderFactory`] and attaches it to the launch context.
Expand Down Expand Up @@ -787,6 +852,21 @@ where
&self.node_adapter().provider
}

/// Returns the initial backfill target to sync to at launch.
///
/// The explicitly configured `debug.tip` takes precedence when present; otherwise this
/// checks whether a previous backfill run was interrupted and, if so, returns the block
/// hash of the last consistent checkpoint — see [`Self::check_pipeline_consistency`].
pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
    // A user-supplied debug tip always wins; only consult the stage checkpoints
    // when no tip was configured.
    match self.node_config().debug.tip {
        Some(tip) => Ok(Some(tip)),
        None => self.check_pipeline_consistency(),
    }
}

/// Returns true if the node should terminate after the initial backfill run.
///
/// This is the case if any of these configs are set:
Expand All @@ -800,7 +880,7 @@ where
///
/// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
/// bedrock height)
pub fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
if self.chain_spec().is_optimism() &&
!self.is_dev() &&
self.chain_id() == Chain::optimism_mainnet()
Expand All @@ -818,6 +898,54 @@ where
Ok(())
}

/// Check if the pipeline is consistent (all stages have checkpoint block numbers no less
/// than the checkpoint of the first stage).
///
/// This will return a pipeline target if:
/// * the pipeline was interrupted during its previous run
/// * a new stage was added
/// * stage data was dropped manually through `reth stage drop ...`
///
/// # Returns
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
    let db = self.blockchain_db();

    // Baseline: the checkpoint of the first stage. A missing checkpoint counts as
    // block 0 via `unwrap_or_default`.
    let first_stage_checkpoint = db
        .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
        .unwrap_or_default()
        .block_number;

    // Every remaining stage must have progressed at least as far as the first one;
    // the first stage was read above, so compare only the rest against it.
    for stage_id in &StageId::ALL[1..] {
        let stage_checkpoint =
            db.get_stage_checkpoint(*stage_id)?.unwrap_or_default().block_number;

        if stage_checkpoint < first_stage_checkpoint {
            // A lagging stage means the previous pipeline run did not complete;
            // resume by targeting the hash of the first stage's checkpoint block.
            debug!(
                target: "consensus::engine",
                first_stage_checkpoint,
                inconsistent_stage_id = %stage_id,
                inconsistent_stage_checkpoint = stage_checkpoint,
                "Pipeline sync progress is inconsistent"
            );
            return db.block_hash(first_stage_checkpoint);
        }
    }

    self.ensure_chain_specific_db_checks()?;

    Ok(None)
}

/// Expire the pre-merge transactions if the node is configured to do so and the chain has a
/// merge block.
///
Expand Down
19 changes: 6 additions & 13 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ impl EngineNodeLauncher {
})?
.with_components(components_builder, on_component_initialized).await?;

// Try to expire pre-merge transaction history if configured
ctx.expire_pre_merge_transactions()?;

// spawn exexs if any
let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;

Expand All @@ -138,7 +141,7 @@ impl EngineNodeLauncher {

let consensus = Arc::new(ctx.components().consensus().clone());

let mut pipeline = build_networked_pipeline(
let pipeline = build_networked_pipeline(
&ctx.toml_config().stages,
network_client.clone(),
consensus.clone(),
Expand All @@ -154,18 +157,7 @@ impl EngineNodeLauncher {
)?;

// The new engine writes directly to static files. This ensures that they're up to the tip.
pipeline.ensure_static_files_consistency().await?;

// Try to expire pre-merge transaction history if configured
ctx.expire_pre_merge_transactions()?;

let initial_target = if let Some(tip) = ctx.node_config().debug.tip {
Some(tip)
} else {
pipeline.initial_backfill_target()?
};

ctx.ensure_chain_specific_db_checks()?;
pipeline.move_to_static_files()?;

let pipeline_events = pipeline.events();

Expand Down Expand Up @@ -258,6 +250,7 @@ impl EngineNodeLauncher {
add_ons.launch_add_ons(add_ons_ctx).await?;

// Run consensus engine to completion
let initial_target = ctx.initial_backfill_target()?;
let mut built_payloads = ctx
.components()
.payload_builder_handle()
Expand Down
Loading
Loading