diff --git a/.changelog/old-lakes-nod.md b/.changelog/old-lakes-nod.md new file mode 100644 index 00000000000..e7d87930fd7 --- /dev/null +++ b/.changelog/old-lakes-nod.md @@ -0,0 +1,10 @@ +--- +reth-engine-primitives: minor +reth-engine-tree: minor +reth-rpc-api: minor +reth-rpc-engine-api: patch +reth-bench: minor +reth-bb: patch +--- + +Added persistence backpressure tracking to `NewPayloadTimings`: `persistence_wait` is now a non-optional `Duration` that includes both time spent queued due to persistence backpressure and the explicit wait for in-flight persistence. Removed the `BENCH_RETH_NEW_PAYLOAD` toggle (always enabled), fixed `wait_time` to act as a minimum interval rather than a fixed sleep, and added warmup/wait-time metadata to benchmark summary output. diff --git a/.github/scripts/bench-reth-run.sh b/.github/scripts/bench-reth-run.sh index ba2054f5a2b..c4b9860a668 100755 --- a/.github/scripts/bench-reth-run.sh +++ b/.github/scripts/bench-reth-run.sh @@ -7,7 +7,6 @@ # # Required env: SCHELK_MOUNT, BENCH_RPC_URL, BENCH_BLOCKS, BENCH_WARMUP_BLOCKS # Optional env: BENCH_BIG_BLOCKS (true/false), BENCH_WORK_DIR (for big blocks path) -# BENCH_RETH_NEW_PAYLOAD (true/false, default true) # BENCH_WAIT_TIME (duration like 500ms, default empty) # BENCH_BASELINE_ARGS (extra reth node args for baseline runs) # BENCH_FEATURE_ARGS (extra reth node args for feature runs) @@ -240,10 +239,7 @@ fi BENCH_NICE="sudo nice -n -20 sudo -u $(id -un)" # Build optional flags -EXTRA_BENCH_ARGS=() -if [ "${BENCH_RETH_NEW_PAYLOAD:-true}" != "false" ]; then - EXTRA_BENCH_ARGS+=(--reth-new-payload --wait-for-persistence) -fi +EXTRA_BENCH_ARGS=(--reth-new-payload) if [ -n "${BENCH_WAIT_TIME:-}" ]; then EXTRA_BENCH_ARGS+=(--wait-time "$BENCH_WAIT_TIME") fi @@ -252,7 +248,7 @@ if [ "$BIG_BLOCKS" = "true" ]; then # Big blocks mode: replay pre-generated payloads BIG_BLOCKS_DIR="${BENCH_BIG_BLOCKS_DIR:-${BENCH_WORK_DIR}/big-blocks}" - BB_BENCH_ARGS=(--reth-new-payload --wait-for-persistence) + BB_BENCH_ARGS=(--reth-new-payload) if [ -n "${BENCH_WAIT_TIME:-}" ]; then BB_BENCH_ARGS+=(--wait-time "$BENCH_WAIT_TIME") fi diff --git a/.github/scripts/bench-reth-summary.py b/.github/scripts/bench-reth-summary.py index 21e9e01913d..3bd82d2c655 100755 --- a/.github/scripts/bench-reth-summary.py +++ b/.github/scripts/bench-reth-summary.py @@ -351,6 +351,8 @@ def generate_comparison_table( feature_name: str, feature_sha: str, big_blocks: bool = False, + warmup_blocks: str | None = None, + wait_time: str | None = None, ) -> str: """Generate a markdown comparison table between baseline and feature.""" n = paired["blocks"] @@ -391,8 +393,13 @@ def pct(base: float, feat: float) -> float: f"| Mgas/s | {fmt_mgas(run1['mean_mgas_s'])} | {fmt_mgas(run2['mean_mgas_s'])} | {change_str(gas_pct, mgas_ci_pct, lower_is_better=False)} |", f"| Wall Clock | {fmt_s(run1['wall_clock_s'])} | {fmt_s(run2['wall_clock_s'])} | {change_str(wall_pct, wall_ci_pct, lower_is_better=True)} |", "", - f"*{n} {'big blocks' if big_blocks else 'blocks'}*", ] + meta_parts = [f"{n} {'big blocks' if big_blocks else 'blocks'}"] + if warmup_blocks: + meta_parts.append(f"{warmup_blocks} warmup") + if wait_time: + meta_parts.append(f"wait time: {wait_time}") + lines.append(f"*{', '.join(meta_parts)}*") return "\n".join(lines) @@ -472,6 +479,8 @@ def main(): parser.add_argument("--feature-ref", "--branch-sha", "--feature-sha", default=None, help="Feature commit SHA") parser.add_argument("--behind-baseline", "--behind-main", type=int, default=0, help="Commits behind baseline") parser.add_argument("--big-blocks", action="store_true", default=False, help="Big blocks mode") + parser.add_argument("--warmup-blocks", default=None, help="Number of warmup blocks") + parser.add_argument("--wait-time", default=None, help="Wait time interval used between blocks") parser.add_argument("--grafana-url", default=None, help="Grafana dashboard URL for this benchmark run") args = parser.parse_args() @@ -522,6 +531,8 @@ def main(): feature_name=feature_name, feature_sha=feature_sha, big_blocks=args.big_blocks, + warmup_blocks=args.warmup_blocks, + wait_time=args.wait_time, ) print(f"Generated comparison ({paired_stats['n']} paired blocks, " f"mean diff {paired_stats['mean_diff_ms']:+.3f}ms ± {paired_stats['ci_ms']:.3f}ms)") @@ -553,6 +564,8 @@ def main(): summary = { "blocks": paired_stats["blocks"], "big_blocks": args.big_blocks, + "warmup_blocks": args.warmup_blocks, + "wait_time": args.wait_time, "baseline": { "name": baseline_name, "ref": baseline_ref, diff --git a/.github/scripts/bench-utils.js b/.github/scripts/bench-utils.js index 11970132884..14ab78fc054 100644 --- a/.github/scripts/bench-utils.js +++ b/.github/scripts/bench-utils.js @@ -50,6 +50,7 @@ function blocksLabel(summary) { } const cores = process.env.BENCH_CORES || '0'; if (cores !== '0') parts.push({ key: 'Cores', value: cores }); + if (summary.wait_time) parts.push({ key: 'Wait time', value: summary.wait_time }); return parts; } diff --git a/.github/workflows/bench-scheduled.yml b/.github/workflows/bench-scheduled.yml index 6889e5dafe4..b3780080fd2 100644 --- a/.github/workflows/bench-scheduled.yml +++ b/.github/workflows/bench-scheduled.yml @@ -249,7 +249,6 @@ jobs: BENCH_SAMPLY: "false" BENCH_CORES: "0" BENCH_BIG_BLOCKS: "false" - BENCH_RETH_NEW_PAYLOAD: "true" BENCH_WAIT_TIME: "" BENCH_BASELINE_ARGS: "" BENCH_FEATURE_ARGS: "" diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 10e1e6d3709..3fc85ffa478 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -37,7 +37,7 @@ on: default: "" type: string wait_time: - description: "Fixed wait time between blocks (e.g. 500ms, 1s)" + description: "Minimum interval between block submissions (e.g. 500ms, 1s)" required: false default: "" type: string @@ -56,11 +56,6 @@ on: required: false default: "false" type: boolean - reth_newPayload: - description: "Use reth_newPayload RPC (server-side timing)" - required: false - default: "true" - type: boolean cores: description: "Limit reth to N CPU cores (0 = all available)" required: false @@ -113,7 +108,6 @@ jobs: no-slack: ${{ steps.args.outputs.no-slack }} cores: ${{ steps.args.outputs.cores }} big-blocks: ${{ steps.args.outputs.big-blocks }} - reth-new-payload: ${{ steps.args.outputs.reth-new-payload }} wait-time: ${{ steps.args.outputs.wait-time }} baseline-args: ${{ steps.args.outputs.baseline-args }} feature-args: ${{ steps.args.outputs.feature-args }} @@ -158,7 +152,6 @@ jobs: var noSlack = '${{ github.event.inputs.no_slack }}' !== 'false' ? 'true' : 'false'; cores = '${{ github.event.inputs.cores }}' || '0'; bigBlocks = '${{ github.event.inputs.big_blocks }}' === 'true' ? 'true' : 'false'; - var rethNewPayload = '${{ github.event.inputs.reth_newPayload }}' !== 'false' ? 'true' : 'false'; var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false'; var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false'; var waitTime = '${{ github.event.inputs.wait_time }}' || ''; @@ -186,10 +179,10 @@ jobs: const intArgs = new Set(['warmup', 'cores', 'blocks']); const refArgs = new Set(['baseline', 'feature']); const boolArgs = new Set(['samply', 'no-slack', 'big-blocks']); - const boolDefaultTrue = new Set(['reth_newPayload', 'abba', 'otlp']); + const boolDefaultTrue = new Set(['abba', 'otlp']); const durationArgs = new Set(['wait-time']); const stringArgs = new Set(['baseline-args', 'feature-args']); - const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', 'big-blocks': 'false', cores: '0', reth_newPayload: 'true', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' }; + const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', 'big-blocks': 'false', cores: '0', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' }; const unknown = []; const invalid = []; const args = body.replace(/^(?:@decofe|derek) bench\s*/, ''); @@ -250,7 +243,7 @@ jobs: if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``); if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`); if (errors.length) { - const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N] [big-blocks] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [reth_newPayload=true|false] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``; + const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N] [big-blocks] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``; await github.rest.issues.createComment({ owner: context.repo.owner, repo: context.repo.repo, @@ -268,7 +261,6 @@ jobs: var noSlack = defaults['no-slack']; cores = defaults.cores; bigBlocks = defaults['big-blocks']; - var rethNewPayload = defaults.reth_newPayload; var abba = defaults.abba; var otlp = defaults.otlp; var waitTime = defaults['wait-time']; @@ -304,7 +296,6 @@ jobs: core.setOutput('no-slack', noSlack); core.setOutput('cores', cores); core.setOutput('big-blocks', bigBlocks); - core.setOutput('reth-new-payload', rethNewPayload); core.setOutput('wait-time', waitTime); core.setOutput('baseline-args', baselineNodeArgs); core.setOutput('feature-args', featureNodeArgs); @@ -373,8 +364,6 @@ jobs: const noSlackNote = noSlack ? ', no-slack' : ''; const cores = '${{ steps.args.outputs.cores }}'; const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : ''; - const rethNP = '${{ steps.args.outputs.reth-new-payload }}' !== 'false'; - const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : ''; const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false'; const abbaNote = !abbaEnabled ? ', abba: `disabled`' : ''; const otlpEnabled = '${{ steps.args.outputs.otlp }}' !== 'false'; @@ -386,7 +375,7 @@ jobs: const featureArgsVal = '${{ steps.args.outputs.feature-args }}'; const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : ''; const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`; - const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`; + const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`; const { data: comment } = await github.rest.issues.createComment({ owner: context.repo.owner, @@ -417,8 +406,6 @@ jobs: const noSlackNote = noSlack ? ', no-slack' : ''; const cores = '${{ steps.args.outputs.cores }}'; const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : ''; - const rethNP = '${{ steps.args.outputs.reth-new-payload }}' !== 'false'; - const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : ''; const abbaEnabled = '${{ steps.args.outputs.abba }}' !== 'false'; const abbaNote = !abbaEnabled ? ', abba: `disabled`' : ''; const otlpEnabled = '${{ steps.args.outputs.otlp }}' !== 'false'; @@ -430,7 +417,7 @@ jobs: const featureArgsVal = '${{ steps.args.outputs.feature-args }}'; const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : ''; const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`; - const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`; + const config = `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`; const runUrl = `${context.serverUrl}/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`; const numRunners = parseInt(process.env.BENCH_RUNNERS) || 1; @@ -495,7 +482,6 @@ jobs: BENCH_SAMPLY: ${{ needs.reth-bench-ack.outputs.samply }} BENCH_CORES: ${{ needs.reth-bench-ack.outputs.cores }} BENCH_BIG_BLOCKS: ${{ needs.reth-bench-ack.outputs.big-blocks }} - BENCH_RETH_NEW_PAYLOAD: ${{ needs.reth-bench-ack.outputs.reth-new-payload }} BENCH_WAIT_TIME: ${{ needs.reth-bench-ack.outputs.wait-time }} BENCH_BASELINE_ARGS: ${{ needs.reth-bench-ack.outputs.baseline-args }} BENCH_FEATURE_ARGS: ${{ needs.reth-bench-ack.outputs.feature-args }} @@ -563,8 +549,6 @@ jobs: const noSlackNote = noSlack ? ', no-slack' : ''; const cores = process.env.BENCH_CORES || '0'; const coresNote = cores && cores !== '0' ? `, cores: \`${cores}\`` : ''; - const rethNP = (process.env.BENCH_RETH_NEW_PAYLOAD || 'true') !== 'false'; - const rethNPNote = !rethNP ? ', reth_newPayload: `disabled`' : ''; const abbaEnabled = (process.env.BENCH_ABBA || 'true') !== 'false'; const abbaNote = !abbaEnabled ? ', abba: `disabled`' : ''; const otlpEnabled = (process.env.BENCH_OTLP || 'true') !== 'false'; @@ -576,7 +560,7 @@ jobs: const featureArgsVal = process.env.BENCH_FEATURE_ARGS || ''; const featureArgsNote = featureArgsVal ? `, feature-args: \`${featureArgsVal}\`` : ''; const blocksDesc = bigBlocks ? 'blocks: `big`' : `${blocks} blocks, ${warmup} warmup blocks`; - core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${rethNPNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`); + core.exportVariable('BENCH_CONFIG', `**Config:** ${blocksDesc}, baseline: \`${baseline}\`, feature: \`${feature}\`${samplyNote}${noSlackNote}${coresNote}${abbaNote}${otlpNote}${waitTimeNote}${baselineArgsNote}${featureArgsNote}`); const { buildBody } = require('./.github/scripts/bench-update-status.js'); await github.rest.issues.updateComment({ @@ -1121,6 +1105,12 @@ jobs: if [ "${BENCH_BIG_BLOCKS:-false}" = "true" ]; then SUMMARY_ARGS="$SUMMARY_ARGS --big-blocks" fi + if [ -n "${BENCH_WARMUP_BLOCKS:-}" ]; then + SUMMARY_ARGS="$SUMMARY_ARGS --warmup-blocks $BENCH_WARMUP_BLOCKS" + fi + if [ -n "${BENCH_WAIT_TIME:-}" ]; then + SUMMARY_ARGS="$SUMMARY_ARGS --wait-time $BENCH_WAIT_TIME" + fi GRAFANA_URL='${{ steps.metrics.outputs.grafana-url }}' if [ -n "$GRAFANA_URL" ]; then SUMMARY_ARGS="$SUMMARY_ARGS --grafana-url $GRAFANA_URL" diff --git a/bin/reth-bb/src/main.rs b/bin/reth-bb/src/main.rs index 783786f3a77..a5f05aef2cf 100644 --- a/bin/reth-bb/src/main.rs +++ b/bin/reth-bb/src/main.rs @@ -125,7 +125,7 @@ impl BbRethEngineApiServer for BbRethEngineApiHandler { Ok(RethPayloadStatus { status, latency_us: timings.latency.as_micros() as u64, - persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64), + persistence_wait_us: timings.persistence_wait.as_micros() as u64, execution_cache_wait_us: timings.execution_cache_wait.map(|d| d.as_micros() as u64), sparse_trie_wait_us: timings.sparse_trie_wait.map(|d| d.as_micros() as u64), }) diff --git a/bin/reth-bench/src/bench/new_payload_fcu.rs b/bin/reth-bench/src/bench/new_payload_fcu.rs index d219a128f81..986eb4057eb 100644 --- a/bin/reth-bench/src/bench/new_payload_fcu.rs +++ b/bin/reth-bench/src/bench/new_payload_fcu.rs @@ -84,7 +84,7 @@ impl Command { pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { // Log mode configuration if let Some(duration) = self.wait_time { - info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis()); + info!(target: "reth-bench", "Using wait-time mode with {}ms minimum interval between blocks", duration.as_millis()); } let BenchContext { @@ -216,7 +216,10 @@ impl Command { let new_payload_result = NewPayloadResult { gas_used, latency: np_latency, - persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait), + persistence_wait: server_timings + .as_ref() + .map(|t| t.persistence_wait) + .unwrap_or_default(), execution_cache_wait: server_timings .as_ref() .map(|t| t.execution_cache_wait) @@ -265,7 +268,10 @@ impl Command { } if let Some(wait_time) = self.wait_time { - tokio::time::sleep(wait_time).await; + let remaining = wait_time.saturating_sub(start.elapsed()); + if !remaining.is_zero() { + tokio::time::sleep(remaining).await; + } } let gas_row = diff --git a/bin/reth-bench/src/bench/new_payload_only.rs b/bin/reth-bench/src/bench/new_payload_only.rs index 3d55a20bb26..e169aa12154 100644 --- a/bin/reth-bench/src/bench/new_payload_only.rs +++ b/bin/reth-bench/src/bench/new_payload_only.rs @@ -142,7 +142,10 @@ impl Command { let new_payload_result = NewPayloadResult { gas_used, latency, - persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait), + persistence_wait: server_timings + .as_ref() + .map(|t| t.persistence_wait) + .unwrap_or_default(), execution_cache_wait: server_timings .as_ref() .map(|t| t.execution_cache_wait) diff --git a/bin/reth-bench/src/bench/output.rs b/bin/reth-bench/src/bench/output.rs index 1a557528215..bb77d34e1f6 100644 --- a/bin/reth-bench/src/bench/output.rs +++ b/bin/reth-bench/src/bench/output.rs @@ -25,8 +25,8 @@ pub(crate) struct NewPayloadResult { pub(crate) gas_used: u64, /// The latency of the `newPayload` call. pub(crate) latency: Duration, - /// Time spent waiting for persistence. `None` when no persistence was in-flight. - pub(crate) persistence_wait: Option, + /// Time spent waiting on persistence (backpressure + explicit wait). + pub(crate) persistence_wait: Duration, /// Time spent waiting for execution cache lock. pub(crate) execution_cache_wait: Duration, /// Time spent waiting for sparse trie lock. @@ -64,7 +64,7 @@ impl Serialize for NewPayloadResult { let mut state = serializer.serialize_struct("NewPayloadResult", 5)?; state.serialize_field("gas_used", &self.gas_used)?; state.serialize_field("latency", &time)?; - state.serialize_field("persistence_wait", &self.persistence_wait.map(|d| d.as_micros()))?; + state.serialize_field("persistence_wait", &self.persistence_wait.as_micros())?; state.serialize_field("execution_cache_wait", &self.execution_cache_wait.as_micros())?; state.serialize_field("sparse_trie_wait", &self.sparse_trie_wait.as_micros())?; state.end() @@ -116,8 +116,8 @@ impl std::fmt::Display for CombinedResult { if !np.sparse_trie_wait.is_zero() { write!(f, ", trie cache wait: {:?}", np.sparse_trie_wait)?; } - if let Some(d) = np.persistence_wait { - write!(f, ", persistence wait: {d:?}")?; + if !np.persistence_wait.is_zero() { + write!(f, ", persistence wait: {:?}", np.persistence_wait)?; } Ok(()) } @@ -146,7 +146,7 @@ impl Serialize for CombinedResult { state.serialize_field("total_latency", &total_latency)?; state.serialize_field( "persistence_wait", - &self.new_payload_result.persistence_wait.map(|d| d.as_micros()), + &self.new_payload_result.persistence_wait.as_micros(), )?; state.serialize_field( "execution_cache_wait", diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index c6905d6ff18..683fdc29bf3 100644 --- a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -135,7 +135,7 @@ impl Command { // Log mode configuration if let Some(duration) = self.wait_time { - info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis()); + info!(target: "reth-bench", "Using wait-time mode with {}ms minimum interval between blocks", duration.as_millis()); } if self.reth_new_payload { info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints"); @@ -274,7 +274,10 @@ impl Command { let new_payload_result = NewPayloadResult { gas_used, latency: np_latency, - persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait), + persistence_wait: server_timings + .as_ref() + .map(|t| t.persistence_wait) + .unwrap_or_default(), execution_cache_wait: server_timings .as_ref() .map(|t| t.execution_cache_wait) @@ -318,7 +321,10 @@ impl Command { } if let Some(wait_time) = self.wait_time { - tokio::time::sleep(wait_time).await; + let remaining = wait_time.saturating_sub(start.elapsed()); + if !remaining.is_zero() { + tokio::time::sleep(remaining).await; + } } let gas_row = diff --git a/bin/reth-bench/src/valid_payload.rs b/bin/reth-bench/src/valid_payload.rs index 038c4e25746..b91ae1c01a8 100644 --- a/bin/reth-bench/src/valid_payload.rs +++ b/bin/reth-bench/src/valid_payload.rs @@ -313,7 +313,7 @@ pub(crate) async fn call_new_payload>( struct RethPayloadStatus { latency_us: u64, #[serde(default)] - persistence_wait_us: Option, + persistence_wait_us: u64, #[serde(default)] execution_cache_wait_us: u64, #[serde(default)] @@ -325,8 +325,8 @@ struct RethPayloadStatus { pub(crate) struct NewPayloadTimingBreakdown { /// Server-side execution latency. pub(crate) latency: Duration, - /// Time spent waiting for persistence. `None` when no persistence was in-flight. - pub(crate) persistence_wait: Option, + /// Time spent waiting on persistence (backpressure + explicit wait). + pub(crate) persistence_wait: Duration, /// Time spent waiting for execution cache lock. pub(crate) execution_cache_wait: Duration, /// Time spent waiting for sparse trie lock. @@ -374,7 +374,7 @@ pub(crate) async fn call_new_payload_with_reth>( Ok(Some(NewPayloadTimingBreakdown { latency: Duration::from_micros(resp.latency_us), - persistence_wait: resp.persistence_wait_us.map(Duration::from_micros), + persistence_wait: Duration::from_micros(resp.persistence_wait_us), execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us), sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us), })) diff --git a/crates/engine/primitives/src/message.rs b/crates/engine/primitives/src/message.rs index 425626a8560..430beee3f1b 100644 --- a/crates/engine/primitives/src/message.rs +++ b/crates/engine/primitives/src/message.rs @@ -15,7 +15,7 @@ use futures::{future::Either, FutureExt, TryFutureExt}; use reth_errors::RethResult; use reth_payload_builder_primitives::PayloadBuilderError; use reth_payload_primitives::PayloadTypes; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Type alias for backwards compat @@ -148,10 +148,10 @@ impl Future for PendingPayloadId { pub struct NewPayloadTimings { /// Server-side execution latency. pub latency: Duration, - /// Time spent waiting for persistence to complete. - /// - /// `None` when wasn't asked to wait for persistence. - pub persistence_wait: Option, + /// Time spent waiting on persistence, including both time this message spent queued + /// due to persistence backpressure and, when `wait_for_persistence` was requested, + /// the explicit wait for in-flight persistence to complete. + pub persistence_wait: Duration, /// Time spent waiting for the execution cache lock. /// /// `None` when wasn't asked to wait for execution cache. @@ -212,6 +212,8 @@ pub enum BeaconEngineMessage { wait_for_caches: bool, /// The sender for returning payload status result and timing breakdown. tx: oneshot::Sender>, + /// When this message was enqueued, used to measure backpressure wait time. + enqueued_at: Instant, }, /// Message with updated forkchoice state. ForkchoiceUpdated { @@ -308,6 +310,7 @@ where wait_for_persistence, wait_for_caches, tx, + enqueued_at: Instant::now(), }); rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 7d08a2929c6..6314c844ab1 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1643,6 +1643,7 @@ where wait_for_persistence, wait_for_caches, tx, + enqueued_at, } => { debug!( target: "engine::tree", @@ -1651,7 +1652,9 @@ where "Processing reth_newPayload" ); - let persistence_wait = if wait_for_persistence { + let backpressure_wait = enqueued_at.elapsed(); + + let explicit_persistence_wait = if wait_for_persistence { let pending_persistence = self.persistence_state.rx.take(); if let Some((rx, start_time, _action)) = pending_persistence { let (persistence_tx, persistence_rx) = @@ -1674,12 +1677,12 @@ where .recv() .expect("persistence result channel closed"); let _ = self.on_persistence_complete(result, start_time); - Some(wait_duration) + wait_duration } else { - Some(Duration::ZERO) + Duration::ZERO } } else { - None + Duration::ZERO }; let cache_wait = wait_for_caches @@ -1702,7 +1705,7 @@ where let timings = NewPayloadTimings { latency, - persistence_wait, + persistence_wait: backpressure_wait + explicit_persistence_wait, execution_cache_wait: cache_wait .map(|wait| wait.execution_cache), sparse_trie_wait: cache_wait.map(|wait| wait.sparse_trie), diff --git a/crates/rpc/rpc-api/src/reth_engine.rs b/crates/rpc/rpc-api/src/reth_engine.rs index 19b98807963..5161650631f 100644 --- a/crates/rpc/rpc-api/src/reth_engine.rs +++ b/crates/rpc/rpc-api/src/reth_engine.rs @@ -13,11 +13,10 @@ pub struct RethPayloadStatus { pub status: PayloadStatus, /// Server-side execution latency in microseconds. pub latency_us: u64, - /// Time spent waiting for persistence to complete, in microseconds. - /// - /// `None` when wasn't asked to wait. - #[serde(skip_serializing_if = "Option::is_none")] - pub persistence_wait_us: Option, + /// Time spent waiting on persistence in microseconds, including both time spent + /// queued due to persistence backpressure and, when requested, the explicit wait + /// for in-flight persistence to complete. + pub persistence_wait_us: u64, /// Time spent waiting for the execution cache lock, in microseconds. /// /// `None` when wasn't asked to wait. diff --git a/crates/rpc/rpc-engine-api/src/reth_engine_api.rs b/crates/rpc/rpc-engine-api/src/reth_engine_api.rs index c91550fa370..7230552a399 100644 --- a/crates/rpc/rpc-engine-api/src/reth_engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/reth_engine_api.rs @@ -55,7 +55,7 @@ impl RethEngineApiServer for Reth Ok(RethPayloadStatus { status, latency_us: timings.latency.as_micros() as u64, - persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64), + persistence_wait_us: timings.persistence_wait.as_micros() as u64, execution_cache_wait_us: timings.execution_cache_wait.map(|d| d.as_micros() as u64), sparse_trie_wait_us: timings.sparse_trie_wait.map(|d| d.as_micros() as u64), })