diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 25f32f63a2b..1ceba8f57da 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -24,7 +24,7 @@ use reth_provider::{ providers::{BlockchainProvider, NodeTypesForProvider, StaticFileProvider}, ProviderFactory, StaticFileProviderFactory, }; -use reth_stages::{sets::DefaultStages, Pipeline}; +use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget}; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; use tokio::sync::watch; @@ -126,6 +126,7 @@ impl EnvironmentArgs { where C: ChainSpecParser, { + let has_receipt_pruning = config.prune.as_ref().is_some_and(|a| a.has_receipts_pruning()); let prune_modes = config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default(); let factory = ProviderFactory::>>::new( @@ -136,8 +137,9 @@ impl EnvironmentArgs { .with_prune_modes(prune_modes.clone()); // Check for consistency between database and static files. - if let Some(unwind_target) = - factory.static_file_provider().check_consistency(&factory.provider()?)? + if let Some(unwind_target) = factory + .static_file_provider() + .check_consistency(&factory.provider()?, has_receipt_pruning)? { if factory.db_ref().is_read_only()? { warn!(target: "reth::cli", ?unwind_target, "Inconsistent storage. Restart node to heal."); @@ -148,7 +150,7 @@ impl EnvironmentArgs { // instead. assert_ne!( unwind_target, - 0, + PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0" ); @@ -173,7 +175,7 @@ impl EnvironmentArgs { // Move all applicable data from database to static files. pipeline.move_to_static_files()?; - pipeline.unwind(unwind_target, None)?; + pipeline.unwind(unwind_target.unwind_target().expect("should exist"), None)?; } Ok(factory) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 2d1fb6924d8..b43dc2a2a6a 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -41,10 +41,12 @@ use eyre::Context; use rayon::ThreadPoolBuilder; use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks}; use reth_config::{config::EtlConfig, PruneConfig}; +use reth_consensus::noop::NoopConsensus; use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; use reth_db_common::init::{init_genesis, InitStorageError}; +use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; use reth_engine_local::MiningMode; -use reth_evm::ConfigureEvm; +use reth_evm::{noop::NoopEvmConfig, ConfigureEvm}; use reth_exex::ExExManagerHandle; use reth_fs_util as fs; use reth_network_p2p::headers::client::HeadersClient; @@ -65,19 +67,25 @@ use reth_node_metrics::{ }; use reth_provider::{ providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider}, - BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory, ProviderResult, - StaticFileProviderFactory, + BlockHashReader, BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory, + ProviderResult, StageCheckpointReader, StaticFileProviderFactory, }; use reth_prune::{PruneModes, PrunerBuilder}; use reth_rpc_builder::config::RethRpcServerConfig; use reth_rpc_layer::JwtSecret; -use reth_stages::{stages::EraImportSource, MetricEvent}; +use reth_stages::{ + sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget, + StageId, +}; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_tracing::tracing::{debug, error, info, warn}; use reth_transaction_pool::TransactionPool; use std::{sync::Arc, thread::available_parallelism}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + oneshot, watch, +}; use futures::{future::Either, stream, Stream, StreamExt}; use reth_node_ethstats::EthStatsService; @@ -458,13 +466,70 @@ where N: ProviderNodeTypes, Evm: ConfigureEvm + 'static, { - Ok(ProviderFactory::new( + let factory = ProviderFactory::new( self.right().clone(), self.chain_spec(), StaticFileProvider::read_write(self.data_dir().static_files())?, ) .with_prune_modes(self.prune_modes()) - .with_static_files_metrics()) + .with_static_files_metrics(); + + let has_receipt_pruning = + self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning()); + + // Check for consistency between database and static files. If it fails, it unwinds to + // the first block that's consistent between database and static files. + if let Some(unwind_target) = factory + .static_file_provider() + .check_consistency(&factory.provider()?, has_receipt_pruning)? + { + // Highly unlikely to happen, and given its destructive nature, it's better to panic + // instead. + assert_ne!( + unwind_target, + PipelineTarget::Unwind(0), + "A static file <> database inconsistency was found that would trigger an unwind to block 0" + ); + + info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); + + let (_tip_tx, tip_rx) = watch::channel(B256::ZERO); + + // Builds an unwind-only pipeline + let pipeline = PipelineBuilder::default() + .add_stages(DefaultStages::new( + factory.clone(), + tip_rx, + Arc::new(NoopConsensus::default()), + NoopHeaderDownloader::default(), + NoopBodiesDownloader::default(), + NoopEvmConfig::::default(), + self.toml_config().stages.clone(), + self.prune_modes(), + None, + )) + .build( + factory.clone(), + StaticFileProducer::new(factory.clone(), self.prune_modes()), + ); + + // Unwinds to block + let (tx, rx) = oneshot::channel(); + + // Pipeline should be run as blocking and panic if it fails. + self.task_executor().spawn_critical_blocking( + "pipeline task", + Box::pin(async move { + let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await; + let _ = tx.send(result); + }), + ); + rx.await?.inspect_err(|err| { + error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind") + })?; + } + + Ok(factory) } /// Creates a new [`ProviderFactory`] and attaches it to the launch context. @@ -787,6 +852,21 @@ where &self.node_adapter().provider } + /// Returns the initial backfill to sync to at launch. + /// + /// This returns the configured `debug.tip` if set, otherwise it will check if backfill was + /// previously interrupted and returns the block hash of the last checkpoint, see also + /// [`Self::check_pipeline_consistency`] + pub fn initial_backfill_target(&self) -> ProviderResult> { + let mut initial_target = self.node_config().debug.tip; + + if initial_target.is_none() { + initial_target = self.check_pipeline_consistency()?; + } + + Ok(initial_target) + } + /// Returns true if the node should terminate after the initial backfill run. /// /// This is the case if any of these configs are set: @@ -800,7 +880,7 @@ where /// /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past /// bedrock height) - pub fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> { + fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> { if self.chain_spec().is_optimism() && !self.is_dev() && self.chain_id() == Chain::optimism_mainnet() @@ -818,6 +898,54 @@ where Ok(()) } + /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less + /// than the checkpoint of the first stage). + /// + /// This will return the pipeline target if: + /// * the pipeline was interrupted during its previous run + /// * a new stage was added + /// * stage data was dropped manually through `reth stage drop ...` + /// + /// # Returns + /// + /// A target block hash if the pipeline is inconsistent, otherwise `None`. + pub fn check_pipeline_consistency(&self) -> ProviderResult> { + // If no target was provided, check if the stages are congruent - check if the + // checkpoint of the last stage matches the checkpoint of the first. + let first_stage_checkpoint = self + .blockchain_db() + .get_stage_checkpoint(*StageId::ALL.first().unwrap())? + .unwrap_or_default() + .block_number; + + // Skip the first stage as we've already retrieved it and comparing all other checkpoints + // against it. + for stage_id in StageId::ALL.iter().skip(1) { + let stage_checkpoint = self + .blockchain_db() + .get_stage_checkpoint(*stage_id)? + .unwrap_or_default() + .block_number; + + // If the checkpoint of any stage is less than the checkpoint of the first stage, + // retrieve and return the block hash of the latest header and use it as the target. + if stage_checkpoint < first_stage_checkpoint { + debug!( + target: "consensus::engine", + first_stage_checkpoint, + inconsistent_stage_id = %stage_id, + inconsistent_stage_checkpoint = stage_checkpoint, + "Pipeline sync progress is inconsistent" + ); + return self.blockchain_db().block_hash(first_stage_checkpoint); + } + } + + self.ensure_chain_specific_db_checks()?; + + Ok(None) + } + /// Expire the pre-merge transactions if the node is configured to do so and the chain has a /// merge block. /// diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 3b43f5f3299..556eb5670d1 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -117,6 +117,9 @@ impl EngineNodeLauncher { })? .with_components(components_builder, on_component_initialized).await?; + // Try to expire pre-merge transaction history if configured + ctx.expire_pre_merge_transactions()?; + // spawn exexs if any let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?; @@ -138,7 +141,7 @@ impl EngineNodeLauncher { let consensus = Arc::new(ctx.components().consensus().clone()); - let mut pipeline = build_networked_pipeline( + let pipeline = build_networked_pipeline( &ctx.toml_config().stages, network_client.clone(), consensus.clone(), @@ -154,18 +157,7 @@ impl EngineNodeLauncher { )?; // The new engine writes directly to static files. This ensures that they're up to the tip. - pipeline.ensure_static_files_consistency().await?; - - // Try to expire pre-merge transaction history if configured - ctx.expire_pre_merge_transactions()?; - - let initial_target = if let Some(tip) = ctx.node_config().debug.tip { - Some(tip) - } else { - pipeline.initial_backfill_target()? - }; - - ctx.ensure_chain_specific_db_checks()?; + pipeline.move_to_static_files()?; let pipeline_events = pipeline.events(); @@ -258,6 +250,7 @@ impl EngineNodeLauncher { add_ons.launch_add_ons(add_ons_ctx).await?; // Run consensus engine to completion + let initial_target = ctx.initial_backfill_target()?; let mut built_payloads = ctx .components() .payload_builder_handle() diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index ac35a489031..e8542c36da6 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -9,7 +9,7 @@ use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH; use reth_provider::{ providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader, ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory, - PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, + PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, }; use reth_prune::PrunerBuilder; use reth_static_file::StaticFileProducer; @@ -31,7 +31,7 @@ use crate::{ }; pub use builder::*; use progress::*; -use reth_errors::{ProviderResult, RethResult}; +use reth_errors::RethResult; pub use set::*; /// A container for a queued stage. @@ -101,6 +101,12 @@ impl Pipeline { PipelineBuilder::default() } + /// Return the minimum block number achieved by + /// any stage during the execution of the pipeline. + pub const fn minimum_block_number(&self) -> Option { + self.progress.minimum_block_number + } + /// Set tip for reverse sync. #[track_caller] pub fn set_tip(&self, tip: B256) { @@ -121,7 +127,9 @@ impl Pipeline { ) -> &mut dyn Stage< as DatabaseProviderFactory>::ProviderRW> { &mut self.stages[idx] } +} +impl Pipeline { /// Registers progress metrics for each registered stage pub fn register_metrics(&mut self) -> Result<(), PipelineError> { let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) }; @@ -282,81 +290,6 @@ impl Pipeline { Ok(()) } - /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less - /// than the checkpoint of the first stage). - /// - /// This will return the pipeline target if: - /// * the pipeline was interrupted during its previous run - /// * a new stage was added - /// * stage data was dropped manually through `reth stage drop ...` - /// - /// # Returns - /// - /// A target block hash if the pipeline is inconsistent, otherwise `None`. - pub fn initial_backfill_target(&self) -> ProviderResult> { - let provider = self.provider_factory.provider()?; - - // If no target was provided, check if the stages are congruent - check if the - // checkpoint of the last stage matches the checkpoint of the first. - let first_stage_checkpoint = provider - .get_stage_checkpoint(self.stages.first().unwrap().id())? - .unwrap_or_default() - .block_number; - - // Skip the first stage as we've already retrieved it and comparing all other checkpoints - // against it. - for stage in self.stages.iter().skip(1) { - let stage_id = stage.id(); - - let stage_checkpoint = - provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number; - - // If the checkpoint of any stage is less than the checkpoint of the first stage, - // retrieve and return the block hash of the latest header and use it as the target. - if stage_checkpoint < first_stage_checkpoint { - debug!( - target: "consensus::engine", - first_stage_checkpoint, - inconsistent_stage_id = %stage_id, - inconsistent_stage_checkpoint = stage_checkpoint, - "Pipeline sync progress is inconsistent" - ); - return provider.block_hash(first_stage_checkpoint); - } - } - - Ok(None) - } - - /// Checks for consistency between database and static files. If it fails, it unwinds to - /// the first block that's consistent between database and static files. - pub async fn ensure_static_files_consistency(&mut self) -> Result<(), PipelineError> { - let maybe_unwind_target = self - .provider_factory - .static_file_provider() - .check_consistency(&self.provider_factory.provider()?)?; - - self.move_to_static_files()?; - - if let Some(unwind_target) = maybe_unwind_target { - // Highly unlikely to happen, and given its destructive nature, it's better to panic - // instead. - assert_ne!( - unwind_target, - 0, - "A static file <> database inconsistency was found that would trigger an unwind to block 0" - ); - - info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); - - self.unwind(unwind_target, None).inspect_err(|err| { - error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind") - })?; - } - - Ok(()) - } - /// Unwind the stages to the target block (exclusive). /// /// If the unwind is due to a bad block the number of that block should be specified. diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 40c4cb91368..58fa7cfb324 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -75,7 +75,9 @@ mod tests { StaticFileProviderFactory, StorageReader, }; use reth_prune_types::{PruneMode, PruneModes}; - use reth_stages_api::{ExecInput, ExecutionStageThresholds, Stage, StageCheckpoint, StageId}; + use reth_stages_api::{ + ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId, + }; use reth_static_file_types::StaticFileSegment; use reth_testing_utils::generators::{ self, random_block, random_block_range, random_receipt, BlockRangeParams, @@ -302,7 +304,7 @@ mod tests { prune_count: usize, segment: StaticFileSegment, is_full_node: bool, - expected: Option, + expected: Option, ) { // We recreate the static file provider, since consistency heals are done on fetching the // writer for the first time. @@ -324,18 +326,11 @@ mod tests { // We recreate the static file provider, since consistency heals are done on fetching the // writer for the first time. - let mut provider = db.factory.database_provider_ro().unwrap(); - if is_full_node { - provider.set_prune_modes(PruneModes { - receipts: Some(PruneMode::Full), - ..Default::default() - }); - } let mut static_file_provider = db.factory.static_file_provider(); static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap(); assert!(matches!( static_file_provider - .check_consistency(&provider), + .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,), Ok(e) if e == expected )); } @@ -346,7 +341,7 @@ mod tests { db: &TestStageDB, stage_id: StageId, checkpoint_block_number: BlockNumber, - expected: Option, + expected: Option, ) { let provider_rw = db.factory.provider_rw().unwrap(); provider_rw @@ -357,15 +352,18 @@ mod tests { assert!(matches!( db.factory .static_file_provider() - .check_consistency(&db.factory.database_provider_ro().unwrap()), + .check_consistency(&db.factory.database_provider_ro().unwrap(), false,), Ok(e) if e == expected )); } /// Inserts a dummy value at key and compare the check consistency result against the expected /// one. - fn update_db_and_check>(db: &TestStageDB, key: u64, expected: Option) - where + fn update_db_and_check>( + db: &TestStageDB, + key: u64, + expected: Option, + ) where ::Value: Default, { update_db_with_and_check::(db, key, expected, &Default::default()); @@ -376,7 +374,7 @@ mod tests { fn update_db_with_and_check>( db: &TestStageDB, key: u64, - expected: Option, + expected: Option, value: &T::Value, ) { let provider_rw = db.factory.provider_rw().unwrap(); @@ -387,7 +385,7 @@ mod tests { assert!(matches!( db.factory .static_file_provider() - .check_consistency(&db.factory.database_provider_ro().unwrap()), + .check_consistency(&db.factory.database_provider_ro().unwrap(), false), Ok(e) if e == expected )); } @@ -398,7 +396,7 @@ mod tests { let db_provider = db.factory.database_provider_ro().unwrap(); assert!(matches!( - db.factory.static_file_provider().check_consistency(&db_provider), + db.factory.static_file_provider().check_consistency(&db_provider, false), Ok(None) )); } @@ -420,7 +418,7 @@ mod tests { 1, StaticFileSegment::Receipts, archive_node, - Some(88), + Some(PipelineTarget::Unwind(88)), ); simulate_behind_checkpoint_corruption( @@ -428,7 +426,7 @@ mod tests { 3, StaticFileSegment::Headers, archive_node, - Some(86), + Some(PipelineTarget::Unwind(86)), ); } @@ -477,7 +475,7 @@ mod tests { ); // When a checkpoint is ahead, we request a pipeline unwind. - save_checkpoint_and_check(&db, StageId::Headers, 91, Some(block)); + save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block))); } #[test] @@ -490,7 +488,7 @@ mod tests { .unwrap(); // Creates a gap of one header: static_file db - update_db_and_check::(&db, current + 2, Some(89)); + update_db_and_check::(&db, current + 2, Some(PipelineTarget::Unwind(89))); // Fill the gap, and ensure no unwind is necessary. update_db_and_check::(&db, current + 1, None); @@ -509,7 +507,7 @@ mod tests { update_db_with_and_check::( &db, current + 2, - Some(89), + Some(PipelineTarget::Unwind(89)), &TxLegacy::default().into_signed(Signature::test_signature()).into(), ); @@ -532,7 +530,7 @@ mod tests { .unwrap(); // Creates a gap of one receipt: static_file db - update_db_and_check::(&db, current + 2, Some(89)); + update_db_and_check::(&db, current + 2, Some(PipelineTarget::Unwind(89))); // Fill the gap, and ensure no unwind is necessary. update_db_and_check::(&db, current + 1, None); diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index bd6b1e0f472..df0bc33c461 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -111,11 +111,6 @@ impl ProviderFactory { pub fn into_db(self) -> N::DB { self.db } - - /// Returns reference to the prune modes. - pub const fn prune_modes_ref(&self) -> &PruneModes { - &self.prune_modes - } } impl>> ProviderFactory { diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 800c761718a..434d3836fb2 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -37,7 +37,7 @@ use reth_ethereum_primitives::{Receipt, TransactionSigned}; use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION}; use reth_node_types::{FullNodePrimitives, NodePrimitives}; use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction}; -use reth_stages_types::StageId; +use reth_stages_types::{PipelineTarget, StageId}; use reth_static_file_types::{ find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE, @@ -731,14 +731,15 @@ impl StaticFileProvider { /// * its highest block should match the stage checkpoint block number if it's equal or higher /// than the corresponding database table last entry. /// - /// Returns a [`Option`] with block number to unwind to if any healing is further required. + /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required. /// /// WARNING: No static file writer should be held before calling this function, otherwise it /// will deadlock. pub fn check_consistency( &self, provider: &Provider, - ) -> ProviderResult> + has_receipt_pruning: bool, + ) -> ProviderResult> where Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider, N: NodePrimitives, @@ -775,7 +776,7 @@ impl StaticFileProvider { }; for segment in StaticFileSegment::iter() { - if provider.prune_modes_ref().has_receipts_pruning() && segment.is_receipts() { + if has_receipt_pruning && segment.is_receipts() { // Pruned nodes (including full node) do not store receipts as static files. continue } @@ -886,7 +887,7 @@ impl StaticFileProvider { } } - Ok(unwind_target) + Ok(unwind_target.map(PipelineTarget::Unwind)) } /// Checks consistency of the latest static file segment and throws an error if at fault.