From f2e784baba5e4f656a7ff0ac19f6f9772d4db54f Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 07:46:54 +0000 Subject: [PATCH 1/8] implement simple backpressure --- crates/engine/primitives/src/config.rs | 63 +++++++++++++++ crates/engine/tree/src/engine.rs | 1 - crates/engine/tree/src/launch.rs | 3 +- crates/engine/tree/src/tree/metrics.rs | 2 + crates/engine/tree/src/tree/mod.rs | 101 +++++++++++++++++++++++-- crates/engine/tree/src/tree/tests.rs | 68 +++++++++++++++++ 6 files changed, 230 insertions(+), 8 deletions(-) diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index 078b08d9e3f..892372bf2ef 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -6,6 +6,9 @@ use core::time::Duration; /// Triggers persistence when the number of canonical blocks in memory exceeds this threshold. pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2; +/// Maximum canonical-minus-persisted gap before engine API processing is stalled. +pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16; + /// How close to the canonical head we persist blocks. pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0; @@ -44,6 +47,16 @@ const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256; const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4; const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size(); +const fn assert_backpressure_threshold_invariant( + persistence_threshold: u64, + persistence_backpressure_threshold: u64, +) { + debug_assert!( + persistence_backpressure_threshold > persistence_threshold, + "persistence_backpressure_threshold must be greater than persistence_threshold", + ); +} + const fn default_cross_block_cache_size() -> usize { if cfg!(test) { 1024 * 1024 // 1 MB in tests @@ -82,6 +95,8 @@ pub struct TreeConfig { /// /// Note: this should be less than or equal to `persistence_threshold`. 
memory_block_buffer_target: u64, + /// Maximum canonical-minus-persisted gap before engine API processing is stalled. + persistence_backpressure_threshold: u64, /// Number of pending blocks that cannot be executed due to missing parent and /// are kept in cache. block_buffer_limit: u32, @@ -164,9 +179,14 @@ pub struct TreeConfig { impl Default for TreeConfig { fn default() -> Self { + assert_backpressure_threshold_invariant( + DEFAULT_PERSISTENCE_THRESHOLD, + DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, + ); Self { persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, + persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT, max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE, @@ -204,6 +224,7 @@ impl TreeConfig { pub const fn new( persistence_threshold: u64, memory_block_buffer_target: u64, + persistence_backpressure_threshold: u64, block_buffer_limit: u32, max_invalid_header_cache_length: u32, max_execute_block_batch_size: usize, @@ -229,9 +250,14 @@ impl TreeConfig { share_execution_cache_with_payload_builder: bool, share_sparse_trie_with_payload_builder: bool, ) -> Self { + assert_backpressure_threshold_invariant( + persistence_threshold, + persistence_backpressure_threshold, + ); Self { persistence_threshold, memory_block_buffer_target, + persistence_backpressure_threshold, block_buffer_limit, max_invalid_header_cache_length, max_execute_block_batch_size, @@ -272,6 +298,11 @@ impl TreeConfig { self.memory_block_buffer_target } + /// Return the persistence backpressure threshold. + pub const fn persistence_backpressure_threshold(&self) -> u64 { + self.persistence_backpressure_threshold + } + /// Return the block buffer limit. 
pub const fn block_buffer_limit(&self) -> u32 { self.block_buffer_limit @@ -368,6 +399,10 @@ impl TreeConfig { /// Setter for persistence threshold. pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self { self.persistence_threshold = persistence_threshold; + assert_backpressure_threshold_invariant( + self.persistence_threshold, + self.persistence_backpressure_threshold, + ); self } @@ -380,6 +415,19 @@ impl TreeConfig { self } + /// Setter for persistence backpressure threshold. + pub const fn with_persistence_backpressure_threshold( + mut self, + persistence_backpressure_threshold: u64, + ) -> Self { + self.persistence_backpressure_threshold = persistence_backpressure_threshold; + assert_backpressure_threshold_invariant( + self.persistence_threshold, + self.persistence_backpressure_threshold, + ); + self + } + /// Setter for block buffer limit. pub const fn with_block_buffer_limit(mut self, block_buffer_limit: u32) -> Self { self.block_buffer_limit = block_buffer_limit; @@ -611,3 +659,18 @@ impl TreeConfig { self } } + +#[cfg(test)] +mod tests { + use super::TreeConfig; + + #[test] + #[should_panic( + expected = "persistence_backpressure_threshold must be greater than persistence_threshold" + )] + fn rejects_backpressure_threshold_at_or_below_persistence_threshold() { + let _ = TreeConfig::default() + .with_persistence_threshold(4) + .with_persistence_backpressure_threshold(4); + } +} diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 154b3b01e4f..03efa473570 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -195,7 +195,6 @@ where type Block = N::Block; fn on_event(&mut self, event: FromEngine) { - // delegate to the tree let _ = self.to_tree.send(event); } diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index ce7a7e39b2a..530f8e1105e 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -101,7 
+101,8 @@ where runtime, ); - let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); + let engine_handler = + EngineApiRequestHandler::new(to_tree_tx, from_tree); let handler = EngineHandler::new(engine_handler, downloader, incoming_requests); let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner); diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 2b128b7f70a..57b2083aed2 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -171,6 +171,8 @@ pub struct EngineMetrics { pub(crate) executed_new_block_cache_miss: Counter, /// Histogram of persistence operation durations (in seconds) pub(crate) persistence_duration: Histogram, + /// Current number of messages queued in the tree input channel. + pub(crate) backpressure_buffer_len: Gauge, /// Tracks the how often we failed to deliver a newPayload response. /// /// This effectively tracks how often the message sender dropped the channel and indicates a CL diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index e65f0e70c49..a7d19821f4e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -403,7 +403,7 @@ where /// own thread. /// /// Returns the sender through which incoming requests can be sent to the task and the receiver - /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine. + /// end of an [`EngineApiEvent`] unbounded channel to receive events from the engine. #[expect(clippy::complexity)] pub fn spawn_new( provider: P, @@ -472,12 +472,77 @@ where self.incoming_tx.clone() } + fn update_backpressure_buffer_len_metric(&self) { + self.metrics.engine.backpressure_buffer_len.set(self.incoming.len() as f64); + } + + /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means + /// persistence is falling behind execution. 
+ const fn persistence_gap(&self) -> u64 { + self.state + .tree_state + .canonical_block_number() + .saturating_sub(self.persistence_state.last_persisted_block.number) + } + + /// Returns `true` when the main loop should stop draining the tree input channel. + /// + /// This is the case when persistence is already running and the gap between the canonical tip + /// and the last persisted block has reached the configured threshold. + fn should_backpressure(&self) -> bool { + self.persistence_state.in_progress() && + self.persistence_gap() >= self.config.persistence_backpressure_threshold() + } + /// Run the engine API handler. /// /// This will block the current thread and process incoming messages. pub fn run(mut self) { loop { - match self.wait_for_event() { + // Each iteration has three phases: + // + // 1. Non-blocking poll for persistence completion. If the background flush already + // landed, absorb the result now so the gap calculation below is fresh. + // 2. Decide how to wait for the next event. When the canonical-to-persisted gap + // exceeds the backpressure threshold we only block on the persistence receiver, + // leaving new engine requests sitting in the bounded upstream channel. + // 3. Handle the event (engine message or persistence completion) and kick off a new + // persistence cycle if the threshold is met again. + // + // The net effect: when the persistence gap exceeds the threshold, we stop + // processing incoming messages and let them queue in the channel. This delays + // replies and, more importantly, prevents executing further blocks that would + // pile up in the persistence queue - where each block carries heavier state + // (eg. trie updates) than the raw payload sitting in the engine channel. + // + // Standard Ethereum CLs won't truly back off - the engine API has no + // backpressure semantics, and CLs typically timeout after ≈8s and resend - so + // this cannot prevent the incoming channel from growing under sustained load. 
+ // But it shifts the bottleneck to the lighter-weight incoming queue rather than + // the costlier persistence pipeline. Other clients that respect reply latency + // can treat the delayed responses as a signal to chill out. + match self.try_poll_persistence() { + Ok(true) => { + if let Err(err) = self.advance_persistence() { + error!(target: "engine::tree", %err, "Advancing persistence failed"); + return + } + continue; + } + Ok(false) => {} + Err(err) => { + error!(target: "engine::tree", %err, "Polling persistence failed"); + return + } + } + + let event = if self.should_backpressure() { + self.wait_for_persistence_event() + } else { + self.wait_for_event() + }; + + match event { LoopEvent::EngineMessage(msg) => { debug!(target: "engine::tree", %msg, "received new engine message"); match self.on_engine_message(msg) { @@ -512,6 +577,24 @@ where } } + /// Blocks until the in-flight persistence task completes, used when we are under + /// backpressure. + /// + /// Unlike `wait_for_event`, this deliberately does not read from the tree input channel. Any + /// requests sent to the tree remain queued upstream until persistence catches up. + fn wait_for_persistence_event(&mut self) -> LoopEvent { + let maybe_persistence = self.persistence_state.rx.take(); + + if let Some((persistence_rx, start_time, _action)) = maybe_persistence { + match persistence_rx.recv() { + Ok(result) => LoopEvent::PersistenceComplete { result, start_time }, + Err(_) => LoopEvent::Disconnected, + } + } else { + self.wait_for_event() + } + } + /// Blocks until the next event is ready: either an incoming engine message or a persistence /// completion (if one is in progress). 
/// @@ -539,7 +622,10 @@ where // Put the persistence rx back - we didn't consume it self.persistence_state.rx = Some((persistence_rx, start_time, action)); match msg { - Ok(m) => LoopEvent::EngineMessage(m), + Ok(m) => { + self.update_backpressure_buffer_len_metric(); + LoopEvent::EngineMessage(m) + } Err(_) => LoopEvent::Disconnected, } }, @@ -547,7 +633,10 @@ where } else { // No persistence in progress - just wait on incoming match self.incoming.recv() { - Ok(m) => LoopEvent::EngineMessage(m), + Ok(m) => { + self.update_backpressure_buffer_len_metric(); + LoopEvent::EngineMessage(m) + } Err(_) => LoopEvent::Disconnected, } } @@ -583,6 +672,7 @@ where // if we still have blocks to execute, send them as a followup request if !blocks.is_empty() { let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks)); + self.metrics.engine.backpressure_buffer_len.set(self.incoming_tx.len() as f64); } Ok(None) @@ -1341,8 +1431,7 @@ where /// Tries to poll for a completed persistence task (non-blocking). /// /// Returns `true` if a persistence task was completed, `false` otherwise. 
- #[cfg(test)] - pub fn try_poll_persistence(&mut self) -> Result { + fn try_poll_persistence(&mut self) -> Result { let Some((rx, start_time, action)) = self.persistence_state.rx.take() else { return Ok(false); }; diff --git a/crates/engine/tree/src/tree/tests.rs b/crates/engine/tree/src/tree/tests.rs index ce34cc83dec..bbb6c4c59b1 100644 --- a/crates/engine/tree/src/tree/tests.rs +++ b/crates/engine/tree/src/tree/tests.rs @@ -691,6 +691,74 @@ async fn test_holesky_payload() { assert!(resp.is_syncing()); } +#[test] +fn test_backpressure_waits_for_persistence_before_reading_incoming() { + let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect(); + let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone()); + test_harness.tree.config = test_harness + .tree + .config + .with_persistence_threshold(0) + .with_persistence_backpressure_threshold(1); + + let (persist_tx, persist_rx) = crossbeam_channel::bounded(1); + let persisted = blocks.last().unwrap().recovered_block().num_hash(); + test_harness.tree.persistence_state.start_save(persisted, persist_rx); + assert!(test_harness.tree.should_backpressure()); + + let (tx, mut rx) = oneshot::channel(); + test_harness + .to_tree_tx + .send(FromEngine::Request( + BeaconEngineMessage::ForkchoiceUpdated { + state: ForkchoiceState { + head_block_hash: B256::random(), + safe_block_hash: B256::random(), + finalized_block_hash: B256::random(), + }, + payload_attrs: None, + tx, + } + .into(), + )) + .unwrap(); + test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); + assert_eq!(test_harness.tree.incoming.len(), 2); + + std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(10)); + persist_tx + .send(PersistenceResult { + last_block: Some(persisted), + commit_duration: Some(Duration::ZERO), + }) + .unwrap(); + }); + + let event = test_harness.tree.wait_for_persistence_event(); + assert!(matches!(event, super::LoopEvent::PersistenceComplete { .. 
})); + assert_eq!(test_harness.tree.incoming.len(), 2); + + let super::LoopEvent::PersistenceComplete { result, start_time } = event else { + unreachable!() + }; + test_harness.tree.on_persistence_complete(result, start_time).unwrap(); + + let super::LoopEvent::EngineMessage(message) = test_harness.tree.wait_for_event() else { + panic!("expected queued engine message") + }; + let _ = test_harness.tree.on_engine_message(message).unwrap(); + let msg = rx.try_recv(); + assert!(msg.is_ok()); + assert_eq!(test_harness.tree.incoming.len(), 1); + + let super::LoopEvent::EngineMessage(message) = test_harness.tree.wait_for_event() else { + panic!("expected queued engine message") + }; + let _ = test_harness.tree.on_engine_message(message).unwrap(); + assert_eq!(test_harness.tree.incoming.len(), 0); +} + #[tokio::test] async fn test_tree_state_on_new_head_reorg() { reth_tracing::init_test_tracing(); From 08a3d7ea0b626d81879a77ced44f6946bd3d799e Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 11:22:02 +0000 Subject: [PATCH 2/8] metrics --- crates/engine/tree/src/tree/metrics.rs | 6 ++++-- crates/engine/tree/src/tree/mod.rs | 22 ++++++++-------------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 57b2083aed2..f8a96e3c6bb 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -171,8 +171,10 @@ pub struct EngineMetrics { pub(crate) executed_new_block_cache_miss: Counter, /// Histogram of persistence operation durations (in seconds) pub(crate) persistence_duration: Histogram, - /// Current number of messages queued in the tree input channel. - pub(crate) backpressure_buffer_len: Gauge, + /// Whether the engine loop is currently stalled on persistence backpressure. + pub(crate) backpressure_active: Gauge, + /// Time spent blocked waiting on persistence because backpressure was active. 
+ pub(crate) backpressure_stall_duration: Histogram, /// Tracks the how often we failed to deliver a newPayload response. /// /// This effectively tracks how often the message sender dropped the channel and indicates a CL diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index a7d19821f4e..ee83b0dc7c0 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -472,10 +472,6 @@ where self.incoming_tx.clone() } - fn update_backpressure_buffer_len_metric(&self) { - self.metrics.engine.backpressure_buffer_len.set(self.incoming.len() as f64); - } - /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means /// persistence is falling behind execution. const fn persistence_gap(&self) -> u64 { @@ -537,8 +533,13 @@ where } let event = if self.should_backpressure() { - self.wait_for_persistence_event() + self.metrics.engine.backpressure_active.set(1.0); + let stall_start = Instant::now(); + let event = self.wait_for_persistence_event(); + self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed()); + event } else { + self.metrics.engine.backpressure_active.set(0.0); self.wait_for_event() }; @@ -622,10 +623,7 @@ where // Put the persistence rx back - we didn't consume it self.persistence_state.rx = Some((persistence_rx, start_time, action)); match msg { - Ok(m) => { - self.update_backpressure_buffer_len_metric(); - LoopEvent::EngineMessage(m) - } + Ok(m) => LoopEvent::EngineMessage(m), Err(_) => LoopEvent::Disconnected, } }, @@ -633,10 +631,7 @@ where } else { // No persistence in progress - just wait on incoming match self.incoming.recv() { - Ok(m) => { - self.update_backpressure_buffer_len_metric(); - LoopEvent::EngineMessage(m) - } + Ok(m) => LoopEvent::EngineMessage(m), Err(_) => LoopEvent::Disconnected, } } @@ -672,7 +667,6 @@ where // if we still have blocks to execute, send them as a followup request if !blocks.is_empty() { let _ = 
self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks)); - self.metrics.engine.backpressure_buffer_len.set(self.incoming_tx.len() as f64); } Ok(None) From 6cb53477ddfa3c72dec1a231972409b567f549ec Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 11:42:21 +0000 Subject: [PATCH 3/8] tidy up --- crates/engine/tree/src/engine.rs | 1 + crates/engine/tree/src/launch.rs | 3 +-- crates/engine/tree/src/tree/mod.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 03efa473570..154b3b01e4f 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -195,6 +195,7 @@ where type Block = N::Block; fn on_event(&mut self, event: FromEngine) { + // delegate to the tree let _ = self.to_tree.send(event); } diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index 530f8e1105e..ce7a7e39b2a 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -101,8 +101,7 @@ where runtime, ); - let engine_handler = - EngineApiRequestHandler::new(to_tree_tx, from_tree); + let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); let handler = EngineHandler::new(engine_handler, downloader, incoming_requests); let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index ee83b0dc7c0..48834afa33e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -403,7 +403,7 @@ where /// own thread. /// /// Returns the sender through which incoming requests can be sent to the task and the receiver - /// end of an [`EngineApiEvent`] unbounded channel to receive events from the engine. + /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine. 
#[expect(clippy::complexity)] pub fn spawn_new( provider: P, From 20da4f65190e9f929737f22f7a95fedfd86a8c8d Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 11:54:10 +0000 Subject: [PATCH 4/8] fix comment. --- crates/engine/tree/src/tree/mod.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 48834afa33e..b1285149a21 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -501,15 +501,16 @@ where // landed, absorb the result now so the gap calculation below is fresh. // 2. Decide how to wait for the next event. When the canonical-to-persisted gap // exceeds the backpressure threshold we only block on the persistence receiver, - // leaving new engine requests sitting in the bounded upstream channel. + // leaving new engine requests sitting in the unbounded upstream channel. // 3. Handle the event (engine message or persistence completion) and kick off a new // persistence cycle if the threshold is met again. // // The net effect: when the persistence gap exceeds the threshold, we stop - // processing incoming messages and let them queue in the channel. This delays - // replies and, more importantly, prevents executing further blocks that would - // pile up in the persistence queue - where each block carries heavier state - // (eg. trie updates) than the raw payload sitting in the engine channel. + // processing incoming messages and let them queue in the channel. This is only a + // soft form of backpressure: it delays replies and, more importantly, prevents + // executing further blocks that would pile up in the persistence queue - where each + // block carries heavier state (eg. trie updates) than the raw payload sitting in the + // engine channel. 
// // Standard Ethereum CLs won't truly back off - the engine API has no // backpressure semantics, and CLs typically timeout after ≈8s and resend - so From 3bf99c706b301ad756d689ed8138104de41cc11c Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 12:44:33 +0000 Subject: [PATCH 5/8] CLI + validation --- crates/cli/commands/src/node.rs | 2 ++ crates/engine/tree/src/tree/mod.rs | 6 ++-- crates/node/core/src/args/engine.rs | 49 +++++++++++++++++++++++++++-- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/crates/cli/commands/src/node.rs b/crates/cli/commands/src/node.rs index b18498fb358..3731562a257 100644 --- a/crates/cli/commands/src/node.rs +++ b/crates/cli/commands/src/node.rs @@ -178,6 +178,8 @@ where ext, } = self; + engine.validate()?; + // set up node config let mut node_config = NodeConfig { datadir, diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index b1285149a21..828cc0e56a3 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -499,9 +499,9 @@ where // // 1. Non-blocking poll for persistence completion. If the background flush already // landed, absorb the result now so the gap calculation below is fresh. - // 2. Decide how to wait for the next event. When the canonical-to-persisted gap - // exceeds the backpressure threshold we only block on the persistence receiver, - // leaving new engine requests sitting in the unbounded upstream channel. + // 2. Decide how to wait for the next event. When the canonical-to-persisted gap exceeds + // the backpressure threshold we only block on the persistence receiver, leaving new + // engine requests sitting in the unbounded upstream channel. // 3. Handle the event (engine message or persistence completion) and kick off a new // persistence cycle if the threshold is met again. 
// diff --git a/crates/node/core/src/args/engine.rs b/crates/node/core/src/args/engine.rs index c52f342c3e3..f6614f67532 100644 --- a/crates/node/core/src/args/engine.rs +++ b/crates/node/core/src/args/engine.rs @@ -1,10 +1,11 @@ //! clap [Args](clap::Args) for engine purposes use clap::{builder::Resettable, Args}; +use eyre::ensure; use reth_cli_util::{parse_duration_from_secs_or_ms, parsers::format_duration_as_secs_or_ms}; use reth_engine_primitives::{ - TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, - DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS, + TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, + DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS, }; use std::{sync::OnceLock, time::Duration}; @@ -22,6 +23,7 @@ static ENGINE_DEFAULTS: OnceLock = OnceLock::new(); #[derive(Debug, Clone)] pub struct DefaultEngineValues { persistence_threshold: u64, + persistence_backpressure_threshold: u64, memory_block_buffer_target: u64, legacy_state_root_task_enabled: bool, state_cache_disabled: bool, @@ -66,6 +68,12 @@ impl DefaultEngineValues { self } + /// Set the default persistence backpressure threshold + pub const fn with_persistence_backpressure_threshold(mut self, v: u64) -> Self { + self.persistence_backpressure_threshold = v; + self + } + /// Set the default memory block buffer target pub const fn with_memory_block_buffer_target(mut self, v: u64) -> Self { self.memory_block_buffer_target = v; @@ -224,6 +232,7 @@ impl Default for DefaultEngineValues { fn default() -> Self { Self { persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, + persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, legacy_state_root_task_enabled: false, state_cache_disabled: false, @@ -266,6 +275,12 @@ pub struct EngineArgs { #[arg(long = "engine.persistence-threshold", default_value_t = 
DefaultEngineValues::get_global().persistence_threshold)] pub persistence_threshold: u64, + /// Configure the maximum canonical-minus-persisted gap before engine API processing stalls. + /// + /// This value must be greater than `--engine.persistence-threshold`. + #[arg(long = "engine.persistence-backpressure-threshold", default_value_t = DefaultEngineValues::get_global().persistence_backpressure_threshold)] + pub persistence_backpressure_threshold: u64, + /// Configure the target number of blocks to keep in memory. #[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)] pub memory_block_buffer_target: u64, @@ -463,6 +478,7 @@ impl Default for EngineArgs { fn default() -> Self { let DefaultEngineValues { persistence_threshold, + persistence_backpressure_threshold, memory_block_buffer_target, legacy_state_root_task_enabled, state_cache_disabled, @@ -491,6 +507,7 @@ impl Default for EngineArgs { } = DefaultEngineValues::get_global().clone(); Self { persistence_threshold, + persistence_backpressure_threshold, memory_block_buffer_target, legacy_state_root_task_enabled, state_root_task_compare_updates, @@ -529,10 +546,22 @@ impl Default for EngineArgs { } impl EngineArgs { + /// Validates cross-field engine arguments. + pub fn validate(&self) -> eyre::Result<()> { + ensure!( + self.persistence_backpressure_threshold > self.persistence_threshold, + "--engine.persistence-backpressure-threshold ({}) must be greater than --engine.persistence-threshold ({})", + self.persistence_backpressure_threshold, + self.persistence_threshold + ); + Ok(()) + } + /// Creates a [`TreeConfig`] from the engine arguments. 
pub fn tree_config(&self) -> TreeConfig { let config = TreeConfig::default() .with_persistence_threshold(self.persistence_threshold) + .with_persistence_backpressure_threshold(self.persistence_backpressure_threshold) .with_memory_block_buffer_target(self.memory_block_buffer_target) .with_legacy_state_root(self.legacy_state_root_task_enabled) .without_state_cache(self.state_cache_disabled) @@ -590,6 +619,7 @@ mod tests { fn engine_args() { let args = EngineArgs { persistence_threshold: 100, + persistence_backpressure_threshold: 101, memory_block_buffer_target: 50, legacy_state_root_task_enabled: true, caching_and_prewarming_enabled: true, @@ -627,6 +657,8 @@ mod tests { "reth", "--engine.persistence-threshold", "100", + "--engine.persistence-backpressure-threshold", + "101", "--engine.memory-block-buffer-target", "50", "--engine.legacy-state-root", @@ -665,6 +697,19 @@ mod tests { assert_eq!(parsed_args, args); } + #[test] + fn validate_rejects_invalid_backpressure_threshold() { + let args = EngineArgs { + persistence_threshold: 4, + persistence_backpressure_threshold: 4, + ..EngineArgs::default() + }; + + let err = args.validate().unwrap_err().to_string(); + assert!(err.contains("engine.persistence-backpressure-threshold")); + assert!(err.contains("engine.persistence-threshold")); + } + #[test] fn test_parse_slow_block_threshold() { // Test default value (None - disabled) From 7187217f250d385da9353ea146d1af362d5ab699 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 13:20:22 +0000 Subject: [PATCH 6/8] clippy --- crates/engine/tree/src/tree/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 828cc0e56a3..7d08a2929c6 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -485,7 +485,7 @@ where /// /// This is the case when persistence is already running and the gap between the canonical tip /// and the last 
persisted block has reached the configured threshold. - fn should_backpressure(&self) -> bool { + const fn should_backpressure(&self) -> bool { self.persistence_state.in_progress() && self.persistence_gap() >= self.config.persistence_backpressure_threshold() } From 59f4c66c05f457a234bda4c4f363eabc525fb0e0 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 13:20:39 +0000 Subject: [PATCH 7/8] book --- docs/vocs/docs/pages/cli/reth/node.mdx | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index e138938632c..835013fff27 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -925,6 +925,13 @@ Engine: [default: 2] + --engine.persistence-backpressure-threshold + Configure the maximum canonical-minus-persisted gap before engine API processing stalls. + + This value must be greater than `--engine.persistence-threshold`. + + [default: 16] + --engine.memory-block-buffer-target Configure the target number of blocks to keep in memory From bbc8cd03d7ba0130b8e2d6397e3e92d8cc9c8fdc Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 30 Mar 2026 13:29:07 +0000 Subject: [PATCH 8/8] changelog --- .changelog/zesty-bees-drink.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changelog/zesty-bees-drink.md diff --git a/.changelog/zesty-bees-drink.md b/.changelog/zesty-bees-drink.md new file mode 100644 index 00000000000..3b34df85b15 --- /dev/null +++ b/.changelog/zesty-bees-drink.md @@ -0,0 +1,8 @@ +--- +reth-engine-primitives: minor +reth-engine-tree: major +reth-node-core: minor +reth-cli-commands: minor +--- + +Added persistence backpressure to the engine tree: when the canonical-minus-persisted block gap exceeds a configurable threshold (`--engine.persistence-backpressure-threshold`, default 16), the engine loop stalls on the persistence receiver instead of processing new incoming messages. 
Also added the corresponding CLI argument, cross-field validation against `--engine.persistence-threshold`, metrics (`backpressure_active`, `backpressure_stall_duration`), and tests.