diff --git a/.changelog/zesty-bees-drink.md b/.changelog/zesty-bees-drink.md new file mode 100644 index 00000000000..3b34df85b15 --- /dev/null +++ b/.changelog/zesty-bees-drink.md @@ -0,0 +1,8 @@ +--- +reth-engine-primitives: minor +reth-engine-tree: major +reth-node-core: minor +reth-cli-commands: minor +--- + +Added persistence backpressure to the engine tree: when the canonical-minus-persisted block gap exceeds a configurable threshold (`--engine.persistence-backpressure-threshold`, default 16), the engine loop stalls on the persistence receiver instead of processing new incoming messages. Added CLI argument, cross-field validation, metrics (`backpressure_active`, `backpressure_stall_duration`), and tests. diff --git a/crates/cli/commands/src/node.rs b/crates/cli/commands/src/node.rs index b18498fb358..3731562a257 100644 --- a/crates/cli/commands/src/node.rs +++ b/crates/cli/commands/src/node.rs @@ -178,6 +178,8 @@ where ext, } = self; + engine.validate()?; + // set up node config let mut node_config = NodeConfig { datadir, diff --git a/crates/engine/primitives/src/config.rs b/crates/engine/primitives/src/config.rs index 078b08d9e3f..892372bf2ef 100644 --- a/crates/engine/primitives/src/config.rs +++ b/crates/engine/primitives/src/config.rs @@ -6,6 +6,9 @@ use core::time::Duration; /// Triggers persistence when the number of canonical blocks in memory exceeds this threshold. pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2; +/// Maximum canonical-minus-persisted gap before engine API processing is stalled. +pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16; + /// How close to the canonical head we persist blocks. pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0; @@ -44,6 +47,16 @@ const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256; const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4; const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size(); +const fn assert_backpressure_threshold_invariant( + persistence_threshold: u64, + persistence_backpressure_threshold: u64, +) { + debug_assert!( + persistence_backpressure_threshold > persistence_threshold, + "persistence_backpressure_threshold must be greater than persistence_threshold", + ); +} + const fn default_cross_block_cache_size() -> usize { if cfg!(test) { 1024 * 1024 // 1 MB in tests @@ -82,6 +95,8 @@ pub struct TreeConfig { /// /// Note: this should be less than or equal to `persistence_threshold`. memory_block_buffer_target: u64, + /// Maximum canonical-minus-persisted gap before engine API processing is stalled. + persistence_backpressure_threshold: u64, /// Number of pending blocks that cannot be executed due to missing parent and /// are kept in cache. block_buffer_limit: u32, @@ -164,9 +179,14 @@ pub struct TreeConfig { impl Default for TreeConfig { fn default() -> Self { + assert_backpressure_threshold_invariant( + DEFAULT_PERSISTENCE_THRESHOLD, + DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, + ); Self { persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, + persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT, max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH, max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE, @@ -204,6 +224,7 @@ impl TreeConfig { pub const fn new( persistence_threshold: u64, memory_block_buffer_target: u64, + persistence_backpressure_threshold: u64, block_buffer_limit: u32, max_invalid_header_cache_length: u32, max_execute_block_batch_size: usize, @@ -229,9 +250,14 @@ impl TreeConfig { share_execution_cache_with_payload_builder: bool, share_sparse_trie_with_payload_builder: bool, ) -> Self { + assert_backpressure_threshold_invariant( + persistence_threshold, + persistence_backpressure_threshold, + ); Self { persistence_threshold, memory_block_buffer_target, + persistence_backpressure_threshold, block_buffer_limit, max_invalid_header_cache_length, max_execute_block_batch_size, @@ -272,6 +298,11 @@ impl TreeConfig { self.memory_block_buffer_target } + /// Return the persistence backpressure threshold. + pub const fn persistence_backpressure_threshold(&self) -> u64 { + self.persistence_backpressure_threshold + } + /// Return the block buffer limit. pub const fn block_buffer_limit(&self) -> u32 { self.block_buffer_limit @@ -368,6 +399,10 @@ impl TreeConfig { /// Setter for persistence threshold. pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self { self.persistence_threshold = persistence_threshold; + assert_backpressure_threshold_invariant( + self.persistence_threshold, + self.persistence_backpressure_threshold, + ); self } @@ -380,6 +415,19 @@ impl TreeConfig { self } + /// Setter for persistence backpressure threshold. + pub const fn with_persistence_backpressure_threshold( + mut self, + persistence_backpressure_threshold: u64, + ) -> Self { + self.persistence_backpressure_threshold = persistence_backpressure_threshold; + assert_backpressure_threshold_invariant( + self.persistence_threshold, + self.persistence_backpressure_threshold, + ); + self + } + /// Setter for block buffer limit. pub const fn with_block_buffer_limit(mut self, block_buffer_limit: u32) -> Self { self.block_buffer_limit = block_buffer_limit; @@ -611,3 +659,18 @@ impl TreeConfig { self } } + +#[cfg(test)] +mod tests { + use super::TreeConfig; + + #[test] + #[should_panic( + expected = "persistence_backpressure_threshold must be greater than persistence_threshold" + )] + fn rejects_backpressure_threshold_at_or_below_persistence_threshold() { + let _ = TreeConfig::default() + .with_persistence_threshold(4) + .with_persistence_backpressure_threshold(4); + } +} diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 2b128b7f70a..f8a96e3c6bb 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -171,6 +171,10 @@ pub struct EngineMetrics { pub(crate) executed_new_block_cache_miss: Counter, /// Histogram of persistence operation durations (in seconds) pub(crate) persistence_duration: Histogram, + /// Whether the engine loop is currently stalled on persistence backpressure. + pub(crate) backpressure_active: Gauge, + /// Time spent blocked waiting on persistence because backpressure was active. + pub(crate) backpressure_stall_duration: Histogram, /// Tracks the how often we failed to deliver a newPayload response. /// /// This effectively tracks how often the message sender dropped the channel and indicates a CL diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index e65f0e70c49..7d08a2929c6 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -472,12 +472,79 @@ where self.incoming_tx.clone() } + /// How many blocks the canonical tip is ahead of the last persisted block. A large gap means + /// persistence is falling behind execution. + const fn persistence_gap(&self) -> u64 { + self.state + .tree_state + .canonical_block_number() + .saturating_sub(self.persistence_state.last_persisted_block.number) + } + + /// Returns `true` when the main loop should stop draining the tree input channel. + /// + /// This is the case when persistence is already running and the gap between the canonical tip + /// and the last persisted block has reached the configured threshold. + const fn should_backpressure(&self) -> bool { + self.persistence_state.in_progress() && + self.persistence_gap() >= self.config.persistence_backpressure_threshold() + } + /// Run the engine API handler. /// /// This will block the current thread and process incoming messages. pub fn run(mut self) { loop { - match self.wait_for_event() { + // Each iteration has three phases: + // + // 1. Non-blocking poll for persistence completion. If the background flush already + // landed, absorb the result now so the gap calculation below is fresh. + // 2. Decide how to wait for the next event. When the canonical-to-persisted gap exceeds + // the backpressure threshold we only block on the persistence receiver, leaving new + // engine requests sitting in the unbounded upstream channel. + // 3. Handle the event (engine message or persistence completion) and kick off a new + // persistence cycle if the threshold is met again. + // + // The net effect: when the persistence gap exceeds the threshold, we stop + // processing incoming messages and let them queue in the channel. This is only a + // soft form of backpressure: it delays replies and, more importantly, prevents + // executing further blocks that would pile up in the persistence queue - where each + // block carries heavier state (eg. trie updates) than the raw payload sitting in the + // engine channel. + // + // Standard Ethereum CLs won't truly back off - the engine API has no + // backpressure semantics, and CLs typically timeout after ≈8s and resend - so + // this cannot prevent the incoming channel from growing under sustained load. + // But it shifts the bottleneck to the lighter-weight incoming queue rather than + // the costlier persistence pipeline. Other clients that respect reply latency + // can treat the delayed responses as a signal to chill out. + match self.try_poll_persistence() { + Ok(true) => { + if let Err(err) = self.advance_persistence() { + error!(target: "engine::tree", %err, "Advancing persistence failed"); + return + } + continue; + } + Ok(false) => {} + Err(err) => { + error!(target: "engine::tree", %err, "Polling persistence failed"); + return + } + } + + let event = if self.should_backpressure() { + self.metrics.engine.backpressure_active.set(1.0); + let stall_start = Instant::now(); + let event = self.wait_for_persistence_event(); + self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed()); + event + } else { + self.metrics.engine.backpressure_active.set(0.0); + self.wait_for_event() + }; + + match event { LoopEvent::EngineMessage(msg) => { debug!(target: "engine::tree", %msg, "received new engine message"); match self.on_engine_message(msg) { @@ -512,6 +579,24 @@ where } } + /// Blocks until the in-flight persistence task completes, used when we are under + /// backpressure. + /// + /// Unlike `wait_for_event`, this deliberately does not read from the tree input channel. Any + /// requests sent to the tree remain queued upstream until persistence catches up. + fn wait_for_persistence_event(&mut self) -> LoopEvent { + let maybe_persistence = self.persistence_state.rx.take(); + + if let Some((persistence_rx, start_time, _action)) = maybe_persistence { + match persistence_rx.recv() { + Ok(result) => LoopEvent::PersistenceComplete { result, start_time }, + Err(_) => LoopEvent::Disconnected, + } + } else { + self.wait_for_event() + } + } + /// Blocks until the next event is ready: either an incoming engine message or a persistence /// completion (if one is in progress). /// @@ -1341,8 +1426,7 @@ where /// Tries to poll for a completed persistence task (non-blocking). /// /// Returns `true` if a persistence task was completed, `false` otherwise. - #[cfg(test)] - pub fn try_poll_persistence(&mut self) -> Result { + fn try_poll_persistence(&mut self) -> Result { let Some((rx, start_time, action)) = self.persistence_state.rx.take() else { return Ok(false); }; diff --git a/crates/engine/tree/src/tree/tests.rs b/crates/engine/tree/src/tree/tests.rs index ce34cc83dec..bbb6c4c59b1 100644 --- a/crates/engine/tree/src/tree/tests.rs +++ b/crates/engine/tree/src/tree/tests.rs @@ -691,6 +691,74 @@ async fn test_holesky_payload() { assert!(resp.is_syncing()); } +#[test] +fn test_backpressure_waits_for_persistence_before_reading_incoming() { + let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect(); + let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone()); + test_harness.tree.config = test_harness + .tree + .config + .with_persistence_threshold(0) + .with_persistence_backpressure_threshold(1); + + let (persist_tx, persist_rx) = crossbeam_channel::bounded(1); + let persisted = blocks.last().unwrap().recovered_block().num_hash(); + test_harness.tree.persistence_state.start_save(persisted, persist_rx); + assert!(test_harness.tree.should_backpressure()); + + let (tx, mut rx) = oneshot::channel(); + test_harness + .to_tree_tx + .send(FromEngine::Request( + BeaconEngineMessage::ForkchoiceUpdated { + state: ForkchoiceState { + head_block_hash: B256::random(), + safe_block_hash: B256::random(), + finalized_block_hash: B256::random(), + }, + payload_attrs: None, + tx, + } + .into(), + )) + .unwrap(); + test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); + assert_eq!(test_harness.tree.incoming.len(), 2); + + std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(10)); + persist_tx + .send(PersistenceResult { + last_block: Some(persisted), + commit_duration: Some(Duration::ZERO), + }) + .unwrap(); + }); + + let event = test_harness.tree.wait_for_persistence_event(); + assert!(matches!(event, super::LoopEvent::PersistenceComplete { .. })); + assert_eq!(test_harness.tree.incoming.len(), 2); + + let super::LoopEvent::PersistenceComplete { result, start_time } = event else { + unreachable!() + }; + test_harness.tree.on_persistence_complete(result, start_time).unwrap(); + + let super::LoopEvent::EngineMessage(message) = test_harness.tree.wait_for_event() else { + panic!("expected queued engine message") + }; + let _ = test_harness.tree.on_engine_message(message).unwrap(); + let msg = rx.try_recv(); + assert!(msg.is_ok()); + assert_eq!(test_harness.tree.incoming.len(), 1); + + let super::LoopEvent::EngineMessage(message) = test_harness.tree.wait_for_event() else { + panic!("expected queued engine message") + }; + let _ = test_harness.tree.on_engine_message(message).unwrap(); + assert_eq!(test_harness.tree.incoming.len(), 0); +} + #[tokio::test] async fn test_tree_state_on_new_head_reorg() { reth_tracing::init_test_tracing(); diff --git a/crates/node/core/src/args/engine.rs b/crates/node/core/src/args/engine.rs index c52f342c3e3..f6614f67532 100644 --- a/crates/node/core/src/args/engine.rs +++ b/crates/node/core/src/args/engine.rs @@ -1,10 +1,11 @@ //! clap [Args](clap::Args) for engine purposes use clap::{builder::Resettable, Args}; +use eyre::ensure; use reth_cli_util::{parse_duration_from_secs_or_ms, parsers::format_duration_as_secs_or_ms}; use reth_engine_primitives::{ - TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, - DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS, + TreeConfig, DEFAULT_MULTIPROOF_TASK_CHUNK_SIZE, DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, + DEFAULT_SPARSE_TRIE_MAX_HOT_ACCOUNTS, DEFAULT_SPARSE_TRIE_MAX_HOT_SLOTS, }; use std::{sync::OnceLock, time::Duration}; @@ -22,6 +23,7 @@ static ENGINE_DEFAULTS: OnceLock = OnceLock::new(); #[derive(Debug, Clone)] pub struct DefaultEngineValues { persistence_threshold: u64, + persistence_backpressure_threshold: u64, memory_block_buffer_target: u64, legacy_state_root_task_enabled: bool, state_cache_disabled: bool, @@ -66,6 +68,12 @@ impl DefaultEngineValues { self } + /// Set the default persistence backpressure threshold + pub const fn with_persistence_backpressure_threshold(mut self, v: u64) -> Self { + self.persistence_backpressure_threshold = v; + self + } + /// Set the default memory block buffer target pub const fn with_memory_block_buffer_target(mut self, v: u64) -> Self { self.memory_block_buffer_target = v; @@ -224,6 +232,7 @@ impl Default for DefaultEngineValues { fn default() -> Self { Self { persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD, + persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD, memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET, legacy_state_root_task_enabled: false, state_cache_disabled: false, @@ -266,6 +275,12 @@ pub struct EngineArgs { #[arg(long = "engine.persistence-threshold", default_value_t = DefaultEngineValues::get_global().persistence_threshold)] pub persistence_threshold: u64, + /// Configure the maximum canonical-minus-persisted gap before engine API processing stalls. + /// + /// This value must be greater than `--engine.persistence-threshold`. + #[arg(long = "engine.persistence-backpressure-threshold", default_value_t = DefaultEngineValues::get_global().persistence_backpressure_threshold)] + pub persistence_backpressure_threshold: u64, + /// Configure the target number of blocks to keep in memory. #[arg(long = "engine.memory-block-buffer-target", default_value_t = DefaultEngineValues::get_global().memory_block_buffer_target)] pub memory_block_buffer_target: u64, @@ -463,6 +478,7 @@ impl Default for EngineArgs { fn default() -> Self { let DefaultEngineValues { persistence_threshold, + persistence_backpressure_threshold, memory_block_buffer_target, legacy_state_root_task_enabled, state_cache_disabled, @@ -491,6 +507,7 @@ impl Default for EngineArgs { } = DefaultEngineValues::get_global().clone(); Self { persistence_threshold, + persistence_backpressure_threshold, memory_block_buffer_target, legacy_state_root_task_enabled, state_root_task_compare_updates, @@ -529,10 +546,22 @@ impl Default for EngineArgs { } impl EngineArgs { + /// Validates cross-field engine arguments. + pub fn validate(&self) -> eyre::Result<()> { + ensure!( + self.persistence_backpressure_threshold > self.persistence_threshold, + "--engine.persistence-backpressure-threshold ({}) must be greater than --engine.persistence-threshold ({})", + self.persistence_backpressure_threshold, + self.persistence_threshold + ); + Ok(()) + } + /// Creates a [`TreeConfig`] from the engine arguments. pub fn tree_config(&self) -> TreeConfig { let config = TreeConfig::default() .with_persistence_threshold(self.persistence_threshold) + .with_persistence_backpressure_threshold(self.persistence_backpressure_threshold) .with_memory_block_buffer_target(self.memory_block_buffer_target) .with_legacy_state_root(self.legacy_state_root_task_enabled) .without_state_cache(self.state_cache_disabled) @@ -590,6 +619,7 @@ mod tests { fn engine_args() { let args = EngineArgs { persistence_threshold: 100, + persistence_backpressure_threshold: 101, memory_block_buffer_target: 50, legacy_state_root_task_enabled: true, caching_and_prewarming_enabled: true, @@ -627,6 +657,8 @@ mod tests { "reth", "--engine.persistence-threshold", "100", + "--engine.persistence-backpressure-threshold", + "101", "--engine.memory-block-buffer-target", "50", "--engine.legacy-state-root", @@ -665,6 +697,19 @@ mod tests { assert_eq!(parsed_args, args); } + #[test] + fn validate_rejects_invalid_backpressure_threshold() { + let args = EngineArgs { + persistence_threshold: 4, + persistence_backpressure_threshold: 4, + ..EngineArgs::default() + }; + + let err = args.validate().unwrap_err().to_string(); + assert!(err.contains("engine.persistence-backpressure-threshold")); + assert!(err.contains("engine.persistence-threshold")); + } + #[test] fn test_parse_slow_block_threshold() { // Test default value (None - disabled) diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index e138938632c..835013fff27 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -925,6 +925,13 @@ Engine: [default: 2] + --engine.persistence-backpressure-threshold + Configure the maximum canonical-minus-persisted gap before engine API processing stalls. + + This value must be greater than `--engine.persistence-threshold`. + + [default: 16] + --engine.memory-block-buffer-target Configure the target number of blocks to keep in memory