diff --git a/Cargo.lock b/Cargo.lock index 4d2a055680f..26bbf6ea720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6919,6 +6919,7 @@ dependencies = [ "reth-chainspec", "reth-consensus", "reth-db", + "reth-db-api", "reth-engine-primitives", "reth-errors", "reth-ethereum-consensus", @@ -6931,12 +6932,15 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-prune", + "reth-prune-types", "reth-revm", "reth-rpc-types", + "reth-stages", "reth-stages-api", "reth-static-file", "reth-tasks", "reth-tokio-util", + "reth-tracing", "reth-trie", "revm", "tokio", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index ab1d0cd1f3e..bcc8ae34bdd 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -12,48 +12,53 @@ workspace = true [dependencies] # reth -reth-primitives.workspace = true +reth-beacon-consensus.workspace = true reth-blockchain-tree.workspace = true reth-blockchain-tree-api.workspace = true -reth-ethereum-consensus.workspace = true -reth-payload-primitives.workspace = true -reth-stages-api.workspace = true -reth-errors.workspace = true +reth-chainspec.workspace = true +reth-consensus.workspace = true reth-db.workspace = true -reth-provider.workspace = true -reth-rpc-types.workspace = true -reth-tasks.workspace = true -reth-trie.workspace = true +reth-db-api.workspace = true +reth-engine-primitives.workspace = true +reth-errors.workspace = true +reth-ethereum-consensus.workspace = true +reth-evm.workspace = true +reth-network-p2p.workspace = true reth-payload-builder.workspace = true +reth-payload-primitives.workspace = true reth-payload-validator.workspace = true +reth-primitives.workspace = true +reth-provider.workspace = true reth-prune.workspace = true +reth-revm.workspace = true +reth-rpc-types.workspace = true +reth-stages-api.workspace = true reth-static-file.workspace = true +reth-tasks.workspace = true reth-tokio-util.workspace = true -reth-engine-primitives.workspace = true -reth-network-p2p.workspace 
= true -reth-beacon-consensus.workspace = true -reth-consensus.workspace = true -reth-evm.workspace = true -reth-revm.workspace = true -reth-chainspec.workspace = true +reth-trie.workspace = true revm.workspace = true # common +futures.workspace = true tokio = { workspace = true, features = ["macros", "sync"] } tokio-stream = { workspace = true, features = ["sync"] } -futures.workspace = true + # metrics -reth-metrics = { workspace = true, features = ["common"] } metrics.workspace = true +reth-metrics = { workspace = true, features = ["common"] } # misc aquamarine.workspace = true -tracing.workspace = true parking_lot.workspace = true +tracing.workspace = true [dev-dependencies] # reth reth-network-p2p = { workspace = true, features = ["test-utils"] } +reth-prune-types.workspace = true +reth-stages = { workspace = true, features = ["test-utils"] } +reth-tracing.workspace = true assert_matches.workspace = true \ No newline at end of file diff --git a/crates/engine/tree/src/backfill.rs b/crates/engine/tree/src/backfill.rs index 593f6ae469d..6fce16c3490 100644 --- a/crates/engine/tree/src/backfill.rs +++ b/crates/engine/tree/src/backfill.rs @@ -7,13 +7,18 @@ //! //! These modes are mutually exclusive and the node can only be in one mode at a time. -use reth_stages_api::{ControlFlow, PipelineError, PipelineTarget}; -use std::task::{Context, Poll}; +use futures::FutureExt; +use reth_db_api::database::Database; +use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult}; +use reth_tasks::TaskSpawner; +use std::task::{ready, Context, Poll}; +use tokio::sync::oneshot; +use tracing::trace; /// Backfill sync mode functionality. pub trait BackfillSync: Send + Sync { /// Performs a backfill action. - fn on_action(&mut self, event: BackfillAction); + fn on_action(&mut self, action: BackfillAction); /// Polls the pipeline for completion. 
fn poll(&mut self, cx: &mut Context<'_>) -> Poll; @@ -29,11 +34,309 @@ pub enum BackfillAction { /// The events that can be emitted on backfill sync. #[derive(Debug)] pub enum BackfillEvent { + /// Backfill sync idle. Idle, /// Backfill sync started. Started(PipelineTarget), - /// Pipeline finished + /// Backfill sync finished. /// - /// If this is returned, the pipeline is idle. + /// If this is returned, backfill sync is idle. Finished(Result), + /// Sync task was dropped after it was started, unable to receive it because + /// channel closed. This would indicate a panicked task. + TaskDropped(String), +} + +/// Pipeline sync. +#[derive(Debug)] +pub struct PipelineSync +where + DB: Database, +{ + /// The type that can spawn the pipeline task. + pipeline_task_spawner: Box, + /// The current state of the pipeline. + /// The pipeline is used for large ranges. + pipeline_state: PipelineState, + /// Pending target block for the pipeline to sync + pending_pipeline_target: Option, +} + +impl PipelineSync +where + DB: Database + 'static, +{ + /// Create a new instance. + pub fn new(pipeline: Pipeline, pipeline_task_spawner: Box) -> Self { + Self { + pipeline_task_spawner, + pipeline_state: PipelineState::Idle(Some(pipeline)), + pending_pipeline_target: None, + } + } + + /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`. + #[allow(dead_code)] + const fn is_pipeline_sync_pending(&self) -> bool { + self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle() + } + + /// Returns `true` if the pipeline is idle. + const fn is_pipeline_idle(&self) -> bool { + self.pipeline_state.is_idle() + } + + /// Returns `true` if the pipeline is active. + const fn is_pipeline_active(&self) -> bool { + !self.is_pipeline_idle() + } + + /// Sets a new target to sync the pipeline to. + /// + /// But ensures the target is not the zero hash. 
+ fn set_pipeline_sync_target(&mut self, target: PipelineTarget) { + if target.sync_target().is_some_and(|target| target.is_zero()) { + trace!( + target: "consensus::engine::sync", + "Pipeline target cannot be zero hash." + ); + // precaution to never sync to the zero hash + return + } + self.pending_pipeline_target = Some(target); + } + + /// Spawns the pipeline task if the pipeline is idle and a target is pending; returns `None` + /// otherwise (no target queued, or the pipeline is already running). + fn try_spawn_pipeline(&mut self) -> Option { + match &mut self.pipeline_state { + PipelineState::Idle(pipeline) => { + let target = self.pending_pipeline_target.take()?; + let (tx, rx) = oneshot::channel(); + + let pipeline = pipeline.take().expect("exists"); + self.pipeline_task_spawner.spawn_critical_blocking( + "pipeline task", + Box::pin(async move { + let result = pipeline.run_as_fut(Some(target)).await; + let _ = tx.send(result); + }), + ); + self.pipeline_state = PipelineState::Running(rx); + + Some(BackfillEvent::Started(target)) + } + PipelineState::Running(_) => None, + } + } + + /// Advances the pipeline state. + /// + /// This checks for the result in the channel, or returns pending if the pipeline is idle. 
+ fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll { + let res = match self.pipeline_state { + PipelineState::Idle(_) => return Poll::Pending, + PipelineState::Running(ref mut fut) => { + ready!(fut.poll_unpin(cx)) + } + }; + let ev = match res { + Ok((_, result)) => BackfillEvent::Finished(result), + Err(why) => { + // failed to receive the pipeline + BackfillEvent::TaskDropped(why.to_string()) + } + }; + Poll::Ready(ev) + } +} + +impl BackfillSync for PipelineSync +where + DB: Database + 'static, +{ + fn on_action(&mut self, event: BackfillAction) { + match event { + BackfillAction::Start(target) => self.set_pipeline_sync_target(target), + } + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + // try to spawn a pipeline if a target is set + if let Some(event) = self.try_spawn_pipeline() { + return Poll::Ready(event) + } + + // make sure we poll the pipeline if it's active, and return any ready pipeline events + if !self.is_pipeline_idle() { + // advance the pipeline + if let Poll::Ready(event) = self.poll_pipeline(cx) { + return Poll::Ready(event) + } + } + + Poll::Pending + } +} + +/// The possible pipeline states within the sync controller. +/// +/// [`PipelineState::Idle`] means that the pipeline is currently idle. +/// [`PipelineState::Running`] means that the pipeline is currently running. +/// +/// NOTE: The differentiation between these two states is important, because when the pipeline is +/// running, it acquires the write lock over the database. This means that we cannot forward to the +/// blockchain tree any messages that would result in database writes, since it would result in a +/// deadlock. +#[derive(Debug)] +enum PipelineState { + /// Pipeline is idle. + Idle(Option>), + /// Pipeline is running and waiting for a response + Running(oneshot::Receiver>), +} + +impl PipelineState { + /// Returns `true` if the state matches idle. 
+ const fn is_idle(&self) -> bool { + matches!(self, Self::Idle(_)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::insert_headers_into_client; + use assert_matches::assert_matches; + use futures::poll; + use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET}; + use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; + use reth_network_p2p::test_utils::TestFullBlockClient; + use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, BlockNumber, Header, B256}; + use reth_provider::{ + test_utils::create_test_provider_factory_with_chain_spec, ExecutionOutcome, + }; + use reth_prune_types::PruneModes; + use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; + use reth_stages_api::StageCheckpoint; + use reth_static_file::StaticFileProducer; + use reth_tasks::TokioTaskExecutor; + use std::{collections::VecDeque, future::poll_fn, sync::Arc}; + use tokio::sync::watch; + + struct TestHarness { + pipeline_sync: PipelineSync>>, + tip: B256, + } + + impl TestHarness { + fn new(total_blocks: usize, pipeline_done_after: u64) -> Self { + let chain_spec = Arc::new( + ChainSpecBuilder::default() + .chain(MAINNET.chain) + .genesis(MAINNET.genesis.clone()) + .paris_activated() + .build(), + ); + + // force the pipeline to be "done" after `pipeline_done_after` blocks + let pipeline = TestPipelineBuilder::new() + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)), + done: true, + })])) + .build(chain_spec.clone()); + + let pipeline_sync = PipelineSync::new(pipeline, Box::::default()); + let client = TestFullBlockClient::default(); + let header = Header { + base_fee_per_gas: Some(7), + gas_limit: ETHEREUM_BLOCK_GAS_LIMIT, + ..Default::default() + } + .seal_slow(); + insert_headers_into_client(&client, header, 0..total_blocks); + + let tip = client.highest_block().expect("there should be blocks here").hash(); + + Self { pipeline_sync, tip } + } + } + + struct 
TestPipelineBuilder { + pipeline_exec_outputs: VecDeque>, + executor_results: Vec, + } + + impl TestPipelineBuilder { + /// Create a new [`TestPipelineBuilder`]. + const fn new() -> Self { + Self { pipeline_exec_outputs: VecDeque::new(), executor_results: Vec::new() } + } + + /// Set the pipeline execution outputs to use for the test pipeline. + fn with_pipeline_exec_outputs( + mut self, + pipeline_exec_outputs: VecDeque>, + ) -> Self { + self.pipeline_exec_outputs = pipeline_exec_outputs; + self + } + + /// Set the executor results to use for the test pipeline. + #[allow(dead_code)] + fn with_executor_results(mut self, executor_results: Vec) -> Self { + self.executor_results = executor_results; + self + } + + /// Builds the pipeline. + fn build(self, chain_spec: Arc) -> Pipeline>> { + reth_tracing::init_test_tracing(); + + // Setup pipeline + let (tip_tx, _tip_rx) = watch::channel(B256::default()); + let pipeline = Pipeline::builder() + .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) + .with_tip_sender(tip_tx); + + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec); + + let static_file_producer = + StaticFileProducer::new(provider_factory.clone(), PruneModes::default()); + + pipeline.build(provider_factory, static_file_producer) + } + } + + #[tokio::test] + async fn pipeline_started_and_finished() { + const TOTAL_BLOCKS: usize = 10; + const PIPELINE_DONE_AFTER: u64 = 5; + let TestHarness { mut pipeline_sync, tip } = + TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER); + + let sync_future = poll_fn(|cx| pipeline_sync.poll(cx)); + let next_event = poll!(sync_future); + + // sync target not set, pipeline not started + assert_matches!(next_event, Poll::Pending); + + pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip))); + + let sync_future = poll_fn(|cx| pipeline_sync.poll(cx)); + let next_event = poll!(sync_future); + + // sync target set, pipeline started + 
assert_matches!(next_event, Poll::Ready(BackfillEvent::Started(target)) => { + assert_eq!(target.sync_target().unwrap(), tip); + }); + + // the next event should be the pipeline finishing in a good state + let sync_future = poll_fn(|cx| pipeline_sync.poll(cx)); + let next_ready = sync_future.await; + assert_matches!(next_ready, BackfillEvent::Finished(result) => { + assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER })); + }); + } } diff --git a/crates/engine/tree/src/chain.rs b/crates/engine/tree/src/chain.rs index 140786d3bb3..badff5a0925 100644 --- a/crates/engine/tree/src/chain.rs +++ b/crates/engine/tree/src/chain.rs @@ -94,6 +94,10 @@ where } } } + BackfillEvent::TaskDropped(err) => { + tracing::error!( %err, "pipeline task dropped"); + return Poll::Ready(ChainEvent::FatalError); + } }, Poll::Pending => {} } diff --git a/crates/engine/tree/src/download.rs b/crates/engine/tree/src/download.rs index cbd9b46633c..f264b54535d 100644 --- a/crates/engine/tree/src/download.rs +++ b/crates/engine/tree/src/download.rs @@ -2,8 +2,6 @@ use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics}; use futures::FutureExt; -use reth_beacon_consensus::EthBeaconConsensus; -use reth_chainspec::ChainSpec; use reth_consensus::Consensus; use reth_network_p2p::{ bodies::client::BodiesClient, @@ -271,11 +269,13 @@ impl BlockDownloader for NoopBlockDownloader { #[cfg(test)] mod tests { use super::*; + use crate::test_utils::insert_headers_into_client; use assert_matches::assert_matches; + use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_network_p2p::test_utils::TestFullBlockClient; - use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, BlockBody, Header, SealedHeader}; - use std::{future::poll_fn, ops::Range, sync::Arc}; + use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, Header}; + use std::{future::poll_fn, sync::Arc}; struct 
TestHarness { block_downloader: BasicBlockDownloader, @@ -308,24 +308,6 @@ mod tests { } } - fn insert_headers_into_client( - client: &TestFullBlockClient, - genesis_header: SealedHeader, - range: Range, - ) { - let mut sealed_header = genesis_header; - let body = BlockBody::default(); - for _ in range { - let (mut header, hash) = sealed_header.split(); - // update to the next header - header.parent_hash = hash; - header.number += 1; - header.timestamp += 1; - sealed_header = header.seal_slow(); - client.insert(sealed_header.clone(), body.clone()); - } - } - #[tokio::test] async fn block_downloader_range_request() { const TOTAL_BLOCKS: usize = 10; diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index 02590c4632d..7aa7fcf75b9 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -26,3 +26,6 @@ pub mod metrics; pub mod persistence; /// Support for interacting with the blockchain tree. pub mod tree; + +#[cfg(test)] +mod test_utils; diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs new file mode 100644 index 00000000000..eed483e2993 --- /dev/null +++ b/crates/engine/tree/src/test_utils.rs @@ -0,0 +1,21 @@ +use reth_network_p2p::test_utils::TestFullBlockClient; +use reth_primitives::{BlockBody, SealedHeader}; +use std::ops::Range; + +pub(crate) fn insert_headers_into_client( + client: &TestFullBlockClient, + genesis_header: SealedHeader, + range: Range, +) { + let mut sealed_header = genesis_header; + let body = BlockBody::default(); + for _ in range { + let (mut header, hash) = sealed_header.split(); + // update to the next header + header.parent_hash = hash; + header.number += 1; + header.timestamp += 1; + sealed_header = header.seal_slow(); + client.insert(sealed_header.clone(), body.clone()); + } +}