Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changelog/zesty-bees-drink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
reth-engine-primitives: minor
reth-engine-tree: major
reth-node-core: minor
reth-cli-commands: minor
---

Added persistence backpressure to the engine tree: while a persistence task is in flight and the canonical-minus-persisted block gap has reached a configurable threshold (`--engine.persistence-backpressure-threshold`, default 16), the engine loop blocks on the persistence receiver instead of processing new incoming messages. Also added the CLI argument, cross-field validation against the persistence threshold, metrics (`backpressure_active`, `backpressure_stall_duration`), and tests.
2 changes: 2 additions & 0 deletions crates/cli/commands/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ where
ext,
} = self;

engine.validate()?;

// set up node config
let mut node_config = NodeConfig {
datadir,
Expand Down
63 changes: 63 additions & 0 deletions crates/engine/primitives/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use core::time::Duration;
/// Triggers persistence when the number of canonical blocks in memory exceeds this threshold.
pub const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 2;

/// Default canonical-minus-persisted block gap at which engine API processing is stalled.
///
/// The engine loop stops reading new engine messages once the gap has reached this value while
/// a persistence task is in progress. The value must stay strictly greater than
/// [`DEFAULT_PERSISTENCE_THRESHOLD`]; `TreeConfig::default` checks this with a debug assertion.
pub const DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD: u64 = 16;

/// How close to the canonical head we persist blocks.
pub const DEFAULT_MEMORY_BLOCK_BUFFER_TARGET: u64 = 0;

Expand Down Expand Up @@ -44,6 +47,16 @@ const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;
const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;
const DEFAULT_CROSS_BLOCK_CACHE_SIZE: usize = default_cross_block_cache_size();

/// Debug-only sanity check for the relationship between the two persistence thresholds.
///
/// The backpressure threshold has to be strictly larger than the persistence threshold.
///
/// # Panics
///
/// Panics when `persistence_backpressure_threshold <= persistence_threshold`, but only in builds
/// compiled with debug assertions. Without them the check is compiled out and this is a no-op.
const fn assert_backpressure_threshold_invariant(
    persistence_threshold: u64,
    persistence_backpressure_threshold: u64,
) {
    debug_assert!(
        persistence_threshold < persistence_backpressure_threshold,
        "persistence_backpressure_threshold must be greater than persistence_threshold",
    );
}

const fn default_cross_block_cache_size() -> usize {
if cfg!(test) {
1024 * 1024 // 1 MB in tests
Expand Down Expand Up @@ -82,6 +95,8 @@ pub struct TreeConfig {
///
/// Note: this should be less than or equal to `persistence_threshold`.
memory_block_buffer_target: u64,
/// Maximum canonical-minus-persisted gap before engine API processing is stalled.
persistence_backpressure_threshold: u64,
/// Number of pending blocks that cannot be executed due to missing parent and
/// are kept in cache.
block_buffer_limit: u32,
Expand Down Expand Up @@ -164,9 +179,14 @@ pub struct TreeConfig {

impl Default for TreeConfig {
fn default() -> Self {
assert_backpressure_threshold_invariant(
DEFAULT_PERSISTENCE_THRESHOLD,
DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
);
Self {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
memory_block_buffer_target: DEFAULT_MEMORY_BLOCK_BUFFER_TARGET,
persistence_backpressure_threshold: DEFAULT_PERSISTENCE_BACKPRESSURE_THRESHOLD,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
Expand Down Expand Up @@ -204,6 +224,7 @@ impl TreeConfig {
pub const fn new(
persistence_threshold: u64,
memory_block_buffer_target: u64,
persistence_backpressure_threshold: u64,
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
max_execute_block_batch_size: usize,
Expand All @@ -229,9 +250,14 @@ impl TreeConfig {
share_execution_cache_with_payload_builder: bool,
share_sparse_trie_with_payload_builder: bool,
) -> Self {
assert_backpressure_threshold_invariant(
persistence_threshold,
persistence_backpressure_threshold,
);
Self {
persistence_threshold,
memory_block_buffer_target,
persistence_backpressure_threshold,
block_buffer_limit,
max_invalid_header_cache_length,
max_execute_block_batch_size,
Expand Down Expand Up @@ -272,6 +298,11 @@ impl TreeConfig {
self.memory_block_buffer_target
}

/// Return the persistence backpressure threshold.
///
/// This is the canonical-minus-persisted block gap at which engine API processing is stalled.
pub const fn persistence_backpressure_threshold(&self) -> u64 {
self.persistence_backpressure_threshold
}

/// Return the block buffer limit.
pub const fn block_buffer_limit(&self) -> u32 {
self.block_buffer_limit
Expand Down Expand Up @@ -368,6 +399,10 @@ impl TreeConfig {
/// Builder-style setter for the persistence threshold.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics unless the new value is strictly smaller
/// than the persistence backpressure threshold currently stored in `self`. The check runs
/// against the stored value, so when raising both thresholds set the backpressure threshold
/// first.
pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self {
    // Validate the candidate against the stored backpressure threshold, then commit it.
    assert_backpressure_threshold_invariant(
        persistence_threshold,
        self.persistence_backpressure_threshold,
    );
    self.persistence_threshold = persistence_threshold;
    self
}

Expand All @@ -380,6 +415,19 @@ impl TreeConfig {
self
}

/// Builder-style setter for the persistence backpressure threshold.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics unless the new value is strictly greater
/// than the persistence threshold currently stored in `self`. The check runs against the
/// stored value, so when lowering both thresholds set the persistence threshold first.
pub const fn with_persistence_backpressure_threshold(
    mut self,
    persistence_backpressure_threshold: u64,
) -> Self {
    // Validate the candidate against the stored persistence threshold, then commit it.
    assert_backpressure_threshold_invariant(
        self.persistence_threshold,
        persistence_backpressure_threshold,
    );
    self.persistence_backpressure_threshold = persistence_backpressure_threshold;
    self
}

/// Setter for block buffer limit.
pub const fn with_block_buffer_limit(mut self, block_buffer_limit: u32) -> Self {
self.block_buffer_limit = block_buffer_limit;
Expand Down Expand Up @@ -611,3 +659,18 @@ impl TreeConfig {
self
}
}

#[cfg(test)]
mod tests {
    use super::TreeConfig;

    /// Setting the backpressure threshold equal to the persistence threshold must trip the
    /// threshold invariant.
    ///
    /// The invariant is enforced with `debug_assert!`, which is compiled out when debug
    /// assertions are disabled (e.g. `cargo test --release`). In such builds nothing panics and
    /// `should_panic` would report a failure, so the test is marked ignored there instead.
    #[test]
    #[cfg_attr(
        not(debug_assertions),
        ignore = "the threshold invariant is only enforced via `debug_assert!`"
    )]
    #[should_panic(
        expected = "persistence_backpressure_threshold must be greater than persistence_threshold"
    )]
    fn rejects_backpressure_threshold_at_or_below_persistence_threshold() {
        let _ = TreeConfig::default()
            .with_persistence_threshold(4)
            .with_persistence_backpressure_threshold(4);
    }
}
4 changes: 4 additions & 0 deletions crates/engine/tree/src/tree/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ pub struct EngineMetrics {
pub(crate) executed_new_block_cache_miss: Counter,
/// Histogram of persistence operation durations (in seconds)
pub(crate) persistence_duration: Histogram,
/// Whether the engine loop is currently stalled on persistence backpressure.
pub(crate) backpressure_active: Gauge,
/// Time spent blocked waiting on persistence because backpressure was active.
pub(crate) backpressure_stall_duration: Histogram,
/// Tracks the how often we failed to deliver a newPayload response.
///
/// This effectively tracks how often the message sender dropped the channel and indicates a CL
Expand Down
90 changes: 87 additions & 3 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,79 @@ where
self.incoming_tx.clone()
}

/// Number of blocks by which the canonical tip leads the last persisted block.
///
/// The subtraction saturates, so the result is `0` whenever the last persisted block number
/// is at or above the canonical block number. A growing value means persistence is lagging
/// behind execution.
const fn persistence_gap(&self) -> u64 {
    let canonical_tip = self.state.tree_state.canonical_block_number();
    let last_persisted = self.persistence_state.last_persisted_block.number;
    canonical_tip.saturating_sub(last_persisted)
}

/// Whether the main loop should stop draining the tree input channel.
///
/// Returns `true` only if both conditions hold:
/// - a persistence task is currently in progress, and
/// - the persistence gap is greater than or equal to the configured persistence backpressure
///   threshold.
const fn should_backpressure(&self) -> bool {
    // Without an in-flight persistence task there is nothing to wait on.
    if !self.persistence_state.in_progress() {
        return false;
    }

    let threshold = self.config.persistence_backpressure_threshold();
    self.persistence_gap() >= threshold
}

/// Run the engine API handler.
///
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
loop {
match self.wait_for_event() {
// Each iteration has three phases:
//
// 1. Non-blocking poll for persistence completion. If the background flush already
// landed, absorb the result now so the gap calculation below is fresh.
// 2. Decide how to wait for the next event. When persistence is in flight and the
// canonical-to-persisted gap has reached the backpressure threshold, we block only on
// the persistence receiver, leaving new engine requests sitting in the unbounded
// upstream channel.
// 3. Handle the event (engine message or persistence completion) and kick off a new
// persistence cycle if the threshold is met again.
//
// The net effect: when the persistence gap reaches the threshold, we stop
// processing incoming messages and let them queue in the channel. This is only a
// soft form of backpressure: it delays replies and, more importantly, prevents
// executing further blocks that would pile up in the persistence queue - where each
// block carries heavier state (eg. trie updates) than the raw payload sitting in the
// engine channel.
//
// Standard Ethereum CLs won't truly back off - the engine API has no
// backpressure semantics, and CLs typically timeout after ≈8s and resend - so
// this cannot prevent the incoming channel from growing under sustained load.
// But it shifts the bottleneck to the lighter-weight incoming queue rather than
// the costlier persistence pipeline. Other clients that respect reply latency
// can treat the delayed responses as a signal to chill out.
match self.try_poll_persistence() {
Ok(true) => {
if let Err(err) = self.advance_persistence() {
error!(target: "engine::tree", %err, "Advancing persistence failed");
return
}
continue;
}
Ok(false) => {}
Err(err) => {
error!(target: "engine::tree", %err, "Polling persistence failed");
return
}
}

let event = if self.should_backpressure() {
self.metrics.engine.backpressure_active.set(1.0);
let stall_start = Instant::now();
let event = self.wait_for_persistence_event();
self.metrics.engine.backpressure_stall_duration.record(stall_start.elapsed());
event
} else {
self.metrics.engine.backpressure_active.set(0.0);
self.wait_for_event()
};

match event {
LoopEvent::EngineMessage(msg) => {
debug!(target: "engine::tree", %msg, "received new engine message");
match self.on_engine_message(msg) {
Expand Down Expand Up @@ -512,6 +579,24 @@ where
}
}

/// Waits exclusively on the in-flight persistence task; used while backpressure is active.
///
/// In contrast to `wait_for_event`, the tree input channel is intentionally left untouched
/// here, so requests sent to the tree stay queued upstream until persistence reports back.
///
/// The pending receiver is taken out of `persistence_state`. If there is none, this falls back
/// to `wait_for_event`. A closed persistence channel is reported as
/// `LoopEvent::Disconnected`.
fn wait_for_persistence_event(&mut self) -> LoopEvent<T, N> {
    let Some((persistence_rx, start_time, _action)) = self.persistence_state.rx.take() else {
        // Nothing in flight to wait on: behave like the regular event wait.
        return self.wait_for_event();
    };

    // Blocking receive: returns once the persistence task sends its result or hangs up.
    persistence_rx
        .recv()
        .map_or(LoopEvent::Disconnected, |result| LoopEvent::PersistenceComplete {
            result,
            start_time,
        })
}

/// Blocks until the next event is ready: either an incoming engine message or a persistence
/// completion (if one is in progress).
///
Expand Down Expand Up @@ -1341,8 +1426,7 @@ where
/// Tries to poll for a completed persistence task (non-blocking).
///
/// Returns `true` if a persistence task was completed, `false` otherwise.
#[cfg(test)]
pub fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
fn try_poll_persistence(&mut self) -> Result<bool, AdvancePersistenceError> {
let Some((rx, start_time, action)) = self.persistence_state.rx.take() else {
return Ok(false);
};
Expand Down
68 changes: 68 additions & 0 deletions crates/engine/tree/src/tree/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,74 @@ async fn test_holesky_payload() {
assert!(resp.is_syncing());
}

/// Verifies that `wait_for_persistence_event` blocks on the persistence receiver and leaves
/// already-queued engine messages untouched until the persistence result has been handled.
#[test]
fn test_backpressure_waits_for_persistence_before_reading_incoming() {
// Seed the harness with executed blocks built for the range `1..4`.
let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(1..4).collect();
let mut test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
// Lowest valid pairing of thresholds: persistence threshold 0, backpressure threshold 1.
test_harness.tree.config = test_harness
.tree
.config
.with_persistence_threshold(0)
.with_persistence_backpressure_threshold(1);

// Register a save whose result channel this test controls; the assertion below expects
// that to be enough for `should_backpressure` to report `true`.
let (persist_tx, persist_rx) = crossbeam_channel::bounded(1);
let persisted = blocks.last().unwrap().recovered_block().num_hash();
test_harness.tree.persistence_state.start_save(persisted, persist_rx);
assert!(test_harness.tree.should_backpressure());

// Queue two messages for the tree before the persistence result is delivered.
let (tx, mut rx) = oneshot::channel();
test_harness
.to_tree_tx
.send(FromEngine::Request(
BeaconEngineMessage::ForkchoiceUpdated {
state: ForkchoiceState {
head_block_hash: B256::random(),
safe_block_hash: B256::random(),
finalized_block_hash: B256::random(),
},
payload_attrs: None,
tx,
}
.into(),
))
.unwrap();
test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
assert_eq!(test_harness.tree.incoming.len(), 2);

// Deliver the persistence result from another thread after a short delay, so the wait
// below has to block on the persistence receiver.
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(10));
persist_tx
.send(PersistenceResult {
last_block: Some(persisted),
commit_duration: Some(Duration::ZERO),
})
.unwrap();
});

// The wait must yield the persistence completion and must not consume either queued message.
let event = test_harness.tree.wait_for_persistence_event();
assert!(matches!(event, super::LoopEvent::PersistenceComplete { .. }));
assert_eq!(test_harness.tree.incoming.len(), 2);

let super::LoopEvent::PersistenceComplete { result, start_time } = event else {
unreachable!()
};
test_harness.tree.on_persistence_complete(result, start_time).unwrap();

// With persistence handled, the regular wait hands out the queued messages one at a time.
let super::LoopEvent::EngineMessage(message) = test_harness.tree.wait_for_event() else {
panic!("expected queued engine message")
};
let _ = test_harness.tree.on_engine_message(message).unwrap();
// The forkchoice request was sent first, so its oneshot should have a reply by now.
let msg = rx.try_recv();
assert!(msg.is_ok());
assert_eq!(test_harness.tree.incoming.len(), 1);

let super::LoopEvent::EngineMessage(message) = test_harness.tree.wait_for_event() else {
panic!("expected queued engine message")
};
let _ = test_harness.tree.on_engine_message(message).unwrap();
assert_eq!(test_harness.tree.incoming.len(), 0);
}

#[tokio::test]
async fn test_tree_state_on_new_head_reorg() {
reth_tracing::init_test_tracing();
Expand Down
Loading
Loading