diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 1ceba8f57da..25f32f63a2b 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -24,7 +24,7 @@ use reth_provider::{ providers::{BlockchainProvider, NodeTypesForProvider, StaticFileProvider}, ProviderFactory, StaticFileProviderFactory, }; -use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget}; +use reth_stages::{sets::DefaultStages, Pipeline}; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; use tokio::sync::watch; @@ -126,7 +126,6 @@ impl EnvironmentArgs { where C: ChainSpecParser, { - let has_receipt_pruning = config.prune.as_ref().is_some_and(|a| a.has_receipts_pruning()); let prune_modes = config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default(); let factory = ProviderFactory::>>::new( @@ -137,9 +136,8 @@ impl EnvironmentArgs { .with_prune_modes(prune_modes.clone()); // Check for consistency between database and static files. - if let Some(unwind_target) = factory - .static_file_provider() - .check_consistency(&factory.provider()?, has_receipt_pruning)? + if let Some(unwind_target) = + factory.static_file_provider().check_consistency(&factory.provider()?)? { if factory.db_ref().is_read_only()? { warn!(target: "reth::cli", ?unwind_target, "Inconsistent storage. Restart node to heal."); @@ -150,7 +148,7 @@ impl EnvironmentArgs { // instead. assert_ne!( unwind_target, - PipelineTarget::Unwind(0), + 0, "A static file <> database inconsistency was found that would trigger an unwind to block 0" ); @@ -175,7 +173,7 @@ impl EnvironmentArgs { // Move all applicable data from database to static files. 
pipeline.move_to_static_files()?; - pipeline.unwind(unwind_target.unwind_target().expect("should exist"), None)?; + pipeline.unwind(unwind_target, None)?; } Ok(factory) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 3a35c4183f1..0e39ef64f39 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -41,12 +41,10 @@ use eyre::Context; use rayon::ThreadPoolBuilder; use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks}; use reth_config::{config::EtlConfig, PruneConfig}; -use reth_consensus::noop::NoopConsensus; use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; use reth_db_common::init::{init_genesis, InitStorageError}; -use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; use reth_engine_local::MiningMode; -use reth_evm::{noop::NoopEvmConfig, ConfigureEvm}; +use reth_evm::ConfigureEvm; use reth_exex::ExExManagerHandle; use reth_fs_util as fs; use reth_network_p2p::headers::client::HeadersClient; @@ -67,25 +65,19 @@ use reth_node_metrics::{ }; use reth_provider::{ providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider}, - BlockHashReader, BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory, - ProviderResult, StageCheckpointReader, StaticFileProviderFactory, + BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory, ProviderResult, + StaticFileProviderFactory, }; use reth_prune::{PruneModes, PrunerBuilder}; use reth_rpc_builder::config::RethRpcServerConfig; use reth_rpc_layer::JwtSecret; -use reth_stages::{ - sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget, - StageId, -}; +use reth_stages::{stages::EraImportSource, MetricEvent}; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_tracing::tracing::{debug, error, info, warn}; use reth_transaction_pool::TransactionPool; 
use std::{sync::Arc, thread::available_parallelism}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedSender}, - oneshot, watch, -}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use futures::{future::Either, stream, Stream, StreamExt}; use reth_node_ethstats::EthStatsService; @@ -466,70 +458,13 @@ where N: ProviderNodeTypes, Evm: ConfigureEvm + 'static, { - let factory = ProviderFactory::new( + Ok(ProviderFactory::new( self.right().clone(), self.chain_spec(), StaticFileProvider::read_write(self.data_dir().static_files())?, ) .with_prune_modes(self.prune_modes()) - .with_static_files_metrics(); - - let has_receipt_pruning = - self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning()); - - // Check for consistency between database and static files. If it fails, it unwinds to - // the first block that's consistent between database and static files. - if let Some(unwind_target) = factory - .static_file_provider() - .check_consistency(&factory.provider()?, has_receipt_pruning)? - { - // Highly unlikely to happen, and given its destructive nature, it's better to panic - // instead. 
- assert_ne!( - unwind_target, - PipelineTarget::Unwind(0), - "A static file <> database inconsistency was found that would trigger an unwind to block 0" - ); - - info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); - - let (_tip_tx, tip_rx) = watch::channel(B256::ZERO); - - // Builds an unwind-only pipeline - let pipeline = PipelineBuilder::default() - .add_stages(DefaultStages::new( - factory.clone(), - tip_rx, - Arc::new(NoopConsensus::default()), - NoopHeaderDownloader::default(), - NoopBodiesDownloader::default(), - NoopEvmConfig::::default(), - self.toml_config().stages.clone(), - self.prune_modes(), - None, - )) - .build( - factory.clone(), - StaticFileProducer::new(factory.clone(), self.prune_modes()), - ); - - // Unwinds to block - let (tx, rx) = oneshot::channel(); - - // Pipeline should be run as blocking and panic if it fails. - self.task_executor().spawn_critical_blocking( - "pipeline task", - Box::pin(async move { - let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await; - let _ = tx.send(result); - }), - ); - rx.await?.inspect_err(|err| { - error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind") - })?; - } - - Ok(factory) + .with_static_files_metrics()) } /// Creates a new [`ProviderFactory`] and attaches it to the launch context. @@ -852,21 +787,6 @@ where &self.node_adapter().provider } - /// Returns the initial backfill to sync to at launch. 
- /// - /// This returns the configured `debug.tip` if set, otherwise it will check if backfill was - /// previously interrupted and returns the block hash of the last checkpoint, see also - /// [`Self::check_pipeline_consistency`] - pub fn initial_backfill_target(&self) -> ProviderResult> { - let mut initial_target = self.node_config().debug.tip; - - if initial_target.is_none() { - initial_target = self.check_pipeline_consistency()?; - } - - Ok(initial_target) - } - /// Returns true if the node should terminate after the initial backfill run. /// /// This is the case if any of these configs are set: @@ -880,7 +800,7 @@ where /// /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past /// bedrock height) - fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> { + pub fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> { if self.chain_spec().is_optimism() && !self.is_dev() && self.chain_id() == Chain::optimism_mainnet() @@ -898,54 +818,6 @@ where Ok(()) } - /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less - /// than the checkpoint of the first stage). - /// - /// This will return the pipeline target if: - /// * the pipeline was interrupted during its previous run - /// * a new stage was added - /// * stage data was dropped manually through `reth stage drop ...` - /// - /// # Returns - /// - /// A target block hash if the pipeline is inconsistent, otherwise `None`. - pub fn check_pipeline_consistency(&self) -> ProviderResult> { - // If no target was provided, check if the stages are congruent - check if the - // checkpoint of the last stage matches the checkpoint of the first. - let first_stage_checkpoint = self - .blockchain_db() - .get_stage_checkpoint(*StageId::ALL.first().unwrap())? - .unwrap_or_default() - .block_number; - - // Skip the first stage as we've already retrieved it and comparing all other checkpoints - // against it. 
- for stage_id in StageId::ALL.iter().skip(1) { - let stage_checkpoint = self - .blockchain_db() - .get_stage_checkpoint(*stage_id)? - .unwrap_or_default() - .block_number; - - // If the checkpoint of any stage is less than the checkpoint of the first stage, - // retrieve and return the block hash of the latest header and use it as the target. - if stage_checkpoint < first_stage_checkpoint { - debug!( - target: "consensus::engine", - first_stage_checkpoint, - inconsistent_stage_id = %stage_id, - inconsistent_stage_checkpoint = stage_checkpoint, - "Pipeline sync progress is inconsistent" - ); - return self.blockchain_db().block_hash(first_stage_checkpoint); - } - } - - self.ensure_chain_specific_db_checks()?; - - Ok(None) - } - /// Expire the pre-merge transactions if the node is configured to do so and the chain has a /// merge block. /// diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 5f6c54afc96..02fb505b077 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -117,9 +117,6 @@ impl EngineNodeLauncher { })? .with_components(components_builder, on_component_initialized).await?; - // Try to expire pre-merge transaction history if configured - ctx.expire_pre_merge_transactions()?; - // spawn exexs if any let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?; @@ -141,7 +138,7 @@ impl EngineNodeLauncher { let consensus = Arc::new(ctx.components().consensus().clone()); - let pipeline = build_networked_pipeline( + let mut pipeline = build_networked_pipeline( &ctx.toml_config().stages, network_client.clone(), consensus.clone(), @@ -157,7 +154,18 @@ impl EngineNodeLauncher { )?; // The new engine writes directly to static files. This ensures that they're up to the tip. 
- pipeline.move_to_static_files()?; + pipeline.ensure_static_files_consistency().await?; + + // Try to expire pre-merge transaction history if configured + ctx.expire_pre_merge_transactions()?; + + let initial_target = if let Some(tip) = ctx.node_config().debug.tip { + Some(tip) + } else { + pipeline.initial_backfill_target()? + }; + + ctx.ensure_chain_specific_db_checks()?; let pipeline_events = pipeline.events(); @@ -249,7 +257,6 @@ impl EngineNodeLauncher { add_ons.launch_add_ons(add_ons_ctx).await?; // Run consensus engine to completion - let initial_target = ctx.initial_backfill_target()?; let mut built_payloads = ctx .components() .payload_builder_handle() diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 0a9aaef73de..2446219ea3d 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -9,7 +9,7 @@ use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH; use reth_provider::{ providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader, ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory, - PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, + PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, }; use reth_prune::PrunerBuilder; use reth_static_file::StaticFileProducer; @@ -31,7 +31,7 @@ use crate::{ }; pub use builder::*; use progress::*; -use reth_errors::RethResult; +use reth_errors::{ProviderResult, RethResult}; pub use set::*; /// A container for a queued stage. @@ -101,12 +101,6 @@ impl Pipeline { PipelineBuilder::default() } - /// Return the minimum block number achieved by - /// any stage during the execution of the pipeline. - pub const fn minimum_block_number(&self) -> Option { - self.progress.minimum_block_number - } - /// Set tip for reverse sync. 
#[track_caller] pub fn set_tip(&self, tip: B256) { @@ -127,9 +121,7 @@ impl Pipeline { ) -> &mut dyn Stage< as DatabaseProviderFactory>::ProviderRW> { &mut self.stages[idx] } -} -impl Pipeline { /// Registers progress metrics for each registered stage pub fn register_metrics(&mut self) -> Result<(), PipelineError> { let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) }; @@ -290,6 +282,81 @@ impl Pipeline { Ok(()) } + /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less + /// than the checkpoint of the first stage). + /// + /// This will return the pipeline target if: + /// * the pipeline was interrupted during its previous run + /// * a new stage was added + /// * stage data was dropped manually through `reth stage drop ...` + /// + /// # Returns + /// + /// A target block hash if the pipeline is inconsistent, otherwise `None`. + pub fn initial_backfill_target(&self) -> ProviderResult> { + let provider = self.provider_factory.provider()?; + + // If no target was provided, check if the stages are congruent - check that no later + // stage's checkpoint is behind the checkpoint of the first. + let first_stage_checkpoint = provider + .get_stage_checkpoint(self.stages.first().unwrap().id())? + .unwrap_or_default() + .block_number; + + // Skip the first stage as we've already retrieved it and are comparing all other checkpoints + // against it. + for stage in self.stages.iter().skip(1) { + let stage_id = stage.id(); + + let stage_checkpoint = + provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number; + + // If the checkpoint of any stage is less than the checkpoint of the first stage, + // retrieve and return the block hash of the latest header and use it as the target.
+ if stage_checkpoint < first_stage_checkpoint { + debug!( + target: "consensus::engine", + first_stage_checkpoint, + inconsistent_stage_id = %stage_id, + inconsistent_stage_checkpoint = stage_checkpoint, + "Pipeline sync progress is inconsistent" + ); + return provider.block_hash(first_stage_checkpoint); + } + } + + Ok(None) + } + + /// Checks for consistency between database and static files. If it fails, it unwinds to + /// the first block that's consistent between database and static files. + pub async fn ensure_static_files_consistency(&mut self) -> Result<(), PipelineError> { + let maybe_unwind_target = self + .provider_factory + .static_file_provider() + .check_consistency(&self.provider_factory.provider()?)?; + + self.move_to_static_files()?; + + if let Some(unwind_target) = maybe_unwind_target { + // Highly unlikely to happen, and given its destructive nature, it's better to panic + // instead. + assert_ne!( + unwind_target, + 0, + "A static file <> database inconsistency was found that would trigger an unwind to block 0" + ); + + info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); + + self.unwind(unwind_target, None).inspect_err(|err| { + error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind") + })?; + } + + Ok(()) + } + /// Unwind the stages to the target block (exclusive). /// /// If the unwind is due to a bad block the number of that block should be specified. 
diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index f9b2312f5ab..7e57009e808 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -72,9 +72,7 @@ mod tests { StaticFileProviderFactory, StorageReader, }; use reth_prune_types::{PruneMode, PruneModes}; - use reth_stages_api::{ - ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId, - }; + use reth_stages_api::{ExecInput, ExecutionStageThresholds, Stage, StageCheckpoint, StageId}; use reth_static_file_types::StaticFileSegment; use reth_testing_utils::generators::{ self, random_block, random_block_range, random_receipt, BlockRangeParams, @@ -301,7 +299,7 @@ mod tests { prune_count: usize, segment: StaticFileSegment, is_full_node: bool, - expected: Option, + expected: Option, ) { // We recreate the static file provider, since consistency heals are done on fetching the // writer for the first time. @@ -323,11 +321,18 @@ mod tests { // We recreate the static file provider, since consistency heals are done on fetching the // writer for the first time. 
+ let mut provider = db.factory.database_provider_ro().unwrap(); + if is_full_node { + provider.set_prune_modes(PruneModes { + receipts: Some(PruneMode::Full), + ..Default::default() + }); + } let mut static_file_provider = db.factory.static_file_provider(); static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap(); assert!(matches!( static_file_provider - .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,), + .check_consistency(&provider), Ok(e) if e == expected )); } @@ -338,7 +343,7 @@ mod tests { db: &TestStageDB, stage_id: StageId, checkpoint_block_number: BlockNumber, - expected: Option, + expected: Option, ) { let provider_rw = db.factory.provider_rw().unwrap(); provider_rw @@ -349,18 +354,15 @@ mod tests { assert!(matches!( db.factory .static_file_provider() - .check_consistency(&db.factory.database_provider_ro().unwrap(), false,), + .check_consistency(&db.factory.database_provider_ro().unwrap()), Ok(e) if e == expected )); } /// Inserts a dummy value at key and compare the check consistency result against the expected /// one. 
- fn update_db_and_check>( - db: &TestStageDB, - key: u64, - expected: Option, - ) where + fn update_db_and_check>(db: &TestStageDB, key: u64, expected: Option) + where ::Value: Default, { update_db_with_and_check::(db, key, expected, &Default::default()); @@ -371,7 +373,7 @@ mod tests { fn update_db_with_and_check>( db: &TestStageDB, key: u64, - expected: Option, + expected: Option, value: &T::Value, ) { let provider_rw = db.factory.provider_rw().unwrap(); @@ -382,7 +384,7 @@ mod tests { assert!(matches!( db.factory .static_file_provider() - .check_consistency(&db.factory.database_provider_ro().unwrap(), false), + .check_consistency(&db.factory.database_provider_ro().unwrap()), Ok(e) if e == expected )); } @@ -393,7 +395,7 @@ mod tests { let db_provider = db.factory.database_provider_ro().unwrap(); assert!(matches!( - db.factory.static_file_provider().check_consistency(&db_provider, false), + db.factory.static_file_provider().check_consistency(&db_provider), Ok(None) )); } @@ -415,7 +417,7 @@ mod tests { 1, StaticFileSegment::Receipts, archive_node, - Some(PipelineTarget::Unwind(88)), + Some(88), ); simulate_behind_checkpoint_corruption( @@ -423,7 +425,7 @@ mod tests { 3, StaticFileSegment::Headers, archive_node, - Some(PipelineTarget::Unwind(86)), + Some(86), ); } @@ -472,7 +474,7 @@ mod tests { ); // When a checkpoint is ahead, we request a pipeline unwind. - save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block))); + save_checkpoint_and_check(&db, StageId::Headers, 91, Some(block)); } #[test] @@ -485,7 +487,7 @@ mod tests { .unwrap(); // Creates a gap of one header: static_file db - update_db_and_check::(&db, current + 2, Some(PipelineTarget::Unwind(89))); + update_db_and_check::(&db, current + 2, Some(89)); // Fill the gap, and ensure no unwind is necessary. 
update_db_and_check::(&db, current + 1, None); @@ -504,7 +506,7 @@ mod tests { update_db_with_and_check::( &db, current + 2, - Some(PipelineTarget::Unwind(89)), + Some(89), &TxLegacy::default().into_signed(Signature::test_signature()).into(), ); @@ -527,7 +529,7 @@ mod tests { .unwrap(); // Creates a gap of one receipt: static_file db - update_db_and_check::(&db, current + 2, Some(PipelineTarget::Unwind(89))); + update_db_and_check::(&db, current + 2, Some(89)); // Fill the gap, and ensure no unwind is necessary. update_db_and_check::(&db, current + 1, None); diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 54642a94757..f7b3c4ba603 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -111,6 +111,11 @@ impl ProviderFactory { pub fn into_db(self) -> N::DB { self.db } + + /// Returns reference to the prune modes. + pub const fn prune_modes_ref(&self) -> &PruneModes { + &self.prune_modes + } } impl>> ProviderFactory { diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 434d3836fb2..800c761718a 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -37,7 +37,7 @@ use reth_ethereum_primitives::{Receipt, TransactionSigned}; use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION}; use reth_node_types::{FullNodePrimitives, NodePrimitives}; use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction}; -use reth_stages_types::{PipelineTarget, StageId}; +use reth_stages_types::StageId; use reth_static_file_types::{ find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE, @@ -731,15 +731,14 @@ impl StaticFileProvider { /// * its highest block 
should match the stage checkpoint block number if it's equal or higher /// than the corresponding database table last entry. /// - /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required. + /// Returns an [`Option`] with the block number to unwind to if any further healing is required. /// /// WARNING: No static file writer should be held before calling this function, otherwise it /// will deadlock. pub fn check_consistency( &self, provider: &Provider, - has_receipt_pruning: bool, - ) -> ProviderResult> + ) -> ProviderResult> where Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider, N: NodePrimitives, @@ -776,7 +775,7 @@ impl StaticFileProvider { }; for segment in StaticFileSegment::iter() { - if has_receipt_pruning && segment.is_receipts() { + if provider.prune_modes_ref().has_receipts_pruning() && segment.is_receipts() { // Pruned nodes (including full node) do not store receipts as static files. continue } @@ -887,7 +886,7 @@ impl StaticFileProvider { } } - Ok(unwind_target.map(PipelineTarget::Unwind)) + Ok(unwind_target) } /// Checks consistency of the latest static file segment and throws an error if at fault.