diff --git a/.github/scripts/bench-reth-build.sh b/.github/scripts/bench-reth-build.sh index 34ab514c470..000c971bd25 100755 --- a/.github/scripts/bench-reth-build.sh +++ b/.github/scripts/bench-reth-build.sh @@ -11,10 +11,11 @@ # optional branch-sha is the PR head commit for cache key # # Outputs: -# baseline: /target/profiling/reth -# feature: /target/profiling/reth, reth-bench installed to cargo bin +# baseline: /target/profiling/reth (or reth-bb if BENCH_BIG_BLOCKS=true) +# feature: /target/profiling/reth (or reth-bb), reth-bench installed to cargo bin # # Required: mc (MinIO client) with a configured alias +# Optional env: BENCH_BIG_BLOCKS (true/false) — build reth-bb instead of reth set -euo pipefail MC="mc" @@ -22,6 +23,16 @@ MODE="$1" SOURCE_DIR="$2" COMMIT="$3" +BIG_BLOCKS="${BENCH_BIG_BLOCKS:-false}" +# The node binary to build: reth-bb for big blocks, reth otherwise +if [ "$BIG_BLOCKS" = "true" ]; then + NODE_BIN="reth-bb" + NODE_PKG="-p reth-bb" +else + NODE_BIN="reth" + NODE_PKG="--bin reth" +fi + # Tracy support: when BENCH_TRACY is "on" or "full", add Tracy cargo features # and frame pointers for accurate stack traces. EXTRA_FEATURES="" @@ -62,18 +73,18 @@ case "$MODE" in mkdir -p "${SOURCE_DIR}/target/profiling" CACHE_VALID=false - if $MC stat "${BUCKET}/reth" &>/dev/null; then - echo "Cache hit for baseline (${COMMIT}), downloading binary..." - $MC cp "${BUCKET}/reth" "${SOURCE_DIR}/target/profiling/reth" - chmod +x "${SOURCE_DIR}/target/profiling/reth" - if verify_binary "${SOURCE_DIR}/target/profiling/reth" "${COMMIT}"; then + if $MC stat "${BUCKET}/${NODE_BIN}" &>/dev/null; then + echo "Cache hit for baseline (${COMMIT}), downloading ${NODE_BIN}..." + $MC cp "${BUCKET}/${NODE_BIN}" "${SOURCE_DIR}/target/profiling/${NODE_BIN}" + chmod +x "${SOURCE_DIR}/target/profiling/${NODE_BIN}" + if verify_binary "${SOURCE_DIR}/target/profiling/${NODE_BIN}" "${COMMIT}"; then CACHE_VALID=true else echo "Cached baseline binary is stale, rebuilding..." 
fi fi if [ "$CACHE_VALID" = false ]; then - echo "Building baseline (${COMMIT}) from source..." + echo "Building baseline ${NODE_BIN} (${COMMIT}) from source..." cd "${SOURCE_DIR}" FEATURES_ARG="" WORKSPACE_ARG="" @@ -84,8 +95,8 @@ case "$MODE" in fi # shellcheck disable=SC2086 RUSTFLAGS="-C target-cpu=native${EXTRA_RUSTFLAGS}" \ - cargo build --profile profiling --bin reth $WORKSPACE_ARG $FEATURES_ARG - $MC cp target/profiling/reth "${BUCKET}/reth" + cargo build --profile profiling $NODE_PKG $WORKSPACE_ARG $FEATURES_ARG + $MC cp "target/profiling/${NODE_BIN}" "${BUCKET}/${NODE_BIN}" fi ;; @@ -94,32 +105,34 @@ case "$MODE" in BUCKET="minio/reth-binaries/${BRANCH_SHA}${BUILD_SUFFIX}" CACHE_VALID=false - if $MC stat "${BUCKET}/reth" &>/dev/null && $MC stat "${BUCKET}/reth-bench" &>/dev/null; then + if $MC stat "${BUCKET}/${NODE_BIN}" &>/dev/null && $MC stat "${BUCKET}/reth-bench" &>/dev/null; then echo "Cache hit for ${BRANCH_SHA}, downloading binaries..." mkdir -p "${SOURCE_DIR}/target/profiling" - $MC cp "${BUCKET}/reth" "${SOURCE_DIR}/target/profiling/reth" + $MC cp "${BUCKET}/${NODE_BIN}" "${SOURCE_DIR}/target/profiling/${NODE_BIN}" $MC cp "${BUCKET}/reth-bench" /home/ubuntu/.cargo/bin/reth-bench - chmod +x "${SOURCE_DIR}/target/profiling/reth" /home/ubuntu/.cargo/bin/reth-bench - if verify_binary "${SOURCE_DIR}/target/profiling/reth" "${COMMIT}"; then + chmod +x "${SOURCE_DIR}/target/profiling/${NODE_BIN}" /home/ubuntu/.cargo/bin/reth-bench + if verify_binary "${SOURCE_DIR}/target/profiling/${NODE_BIN}" "${COMMIT}"; then CACHE_VALID=true else echo "Cached feature binary is stale, rebuilding..." fi fi if [ "$CACHE_VALID" = false ]; then - echo "Building feature (${COMMIT}) from source..." + echo "Building feature ${NODE_BIN} (${COMMIT}) from source..." 
cd "${SOURCE_DIR}" rustup show active-toolchain || rustup default stable if [ -n "$EXTRA_FEATURES" ]; then # Can't use `make profiling` when adding features; build explicitly # --workspace is needed for cross-package feature syntax (tracy-client/ondemand) RUSTFLAGS="-C target-cpu=native${EXTRA_RUSTFLAGS}" \ - cargo build --profile profiling --workspace --bin reth --features "${EXTRA_FEATURES}" + cargo build --profile profiling --workspace $NODE_PKG --features "${EXTRA_FEATURES}" else - make profiling + # shellcheck disable=SC2086 + RUSTFLAGS="-C target-cpu=native${EXTRA_RUSTFLAGS}" \ + cargo build --profile profiling $NODE_PKG fi make install-reth-bench - $MC cp target/profiling/reth "${BUCKET}/reth" + $MC cp "target/profiling/${NODE_BIN}" "${BUCKET}/${NODE_BIN}" $MC cp "$(which reth-bench)" "${BUCKET}/reth-bench" fi ;; diff --git a/.github/scripts/bench-reth-run.sh b/.github/scripts/bench-reth-run.sh index 482a1be215c..2630ef9c707 100755 --- a/.github/scripts/bench-reth-run.sh +++ b/.github/scripts/bench-reth-run.sh @@ -132,12 +132,6 @@ if "$BINARY" node --help 2>/dev/null | grep -qF -- '--debug.startup-sync-state-i SYNC_STATE_IDLE=true fi -# Big blocks mode requires the testing API, skip-invalid-transactions, and -# skip-gas-limit-ramp-check + gas-limit override to avoid the 6800-block ramp. 
-if [ "$BIG_BLOCKS" = "true" ]; then - RETH_ARGS+=(--http.api eth,net,web3,reth,testing --rpc.max-request-size max --testing.skip-invalid-transactions --testing.skip-gas-limit-ramp-check --testing.gas-limit 1000000000) -fi - # Append per-label extra node args (baseline or feature) EXTRA_NODE_ARGS="" case "$LABEL" in @@ -266,9 +260,18 @@ if [ "$BIG_BLOCKS" = "true" ]; then sleep 0.5 # give tracy-capture time to connect fi + BB_BENCH_ARGS=(--reth-new-payload --wait-for-persistence) + if [ -n "${BENCH_WAIT_TIME:-}" ]; then + BB_BENCH_ARGS+=(--wait-time "$BENCH_WAIT_TIME") + fi + # Limit number of payloads if blocks count is specified + if [ "${BENCH_BLOCKS:-0}" -gt 0 ] 2>/dev/null; then + BB_BENCH_ARGS+=(--count "$BENCH_BLOCKS") + fi + echo "Running big blocks benchmark (replay-payloads)..." $BENCH_NICE "$RETH_BENCH" replay-payloads \ - "${EXTRA_BENCH_ARGS[@]}" \ + "${BB_BENCH_ARGS[@]}" \ --payload-dir "$BIG_BLOCKS_DIR/payloads" \ --engine-rpc-url http://127.0.0.1:8551 \ --jwt-secret "$DATADIR/jwt.hex" \ diff --git a/.github/scripts/check_wasm.sh b/.github/scripts/check_wasm.sh index 0fcbc71fa21..a12300deba5 100755 --- a/.github/scripts/check_wasm.sh +++ b/.github/scripts/check_wasm.sh @@ -56,6 +56,7 @@ exclude_crates=( reth-ress-provider # The following are not supposed to be working reth # all of the crates below + reth-bb # binary-only, uses tokio features unsupported on wasm reth-storage-rpc-provider reth-invalid-block-hooks # reth-provider reth-libmdbx # mdbx diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 464d2c40749..10e1e6d3709 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -12,10 +12,15 @@ on: workflow_dispatch: inputs: blocks: - description: "Number of blocks to benchmark (or 'big' for big blocks mode)" + description: "Number of blocks to benchmark" required: false default: "500" type: string + big_blocks: + description: "Use big blocks mode (pre-generated merged payloads with reth-bb)" + required: 
false + default: "false" + type: boolean warmup: description: "Number of warmup blocks" required: false @@ -152,7 +157,7 @@ jobs: samply = '${{ github.event.inputs.samply }}' === 'true' ? 'true' : 'false'; var noSlack = '${{ github.event.inputs.no_slack }}' !== 'false' ? 'true' : 'false'; cores = '${{ github.event.inputs.cores }}' || '0'; - bigBlocks = blocks === 'big' ? 'true' : 'false'; + bigBlocks = '${{ github.event.inputs.big_blocks }}' === 'true' ? 'true' : 'false'; var rethNewPayload = '${{ github.event.inputs.reth_newPayload }}' !== 'false' ? 'true' : 'false'; var abba = '${{ github.event.inputs.abba }}' !== 'false' ? 'true' : 'false'; var otlp = '${{ github.event.inputs.otlp }}' !== 'false' ? 'true' : 'false'; @@ -178,14 +183,13 @@ jobs: actor = context.payload.comment.user.login; const body = context.payload.comment.body.trim(); - const intArgs = new Set(['warmup', 'cores']); - const intOrKeywordArgs = new Map([['blocks', new Set(['big'])]]); + const intArgs = new Set(['warmup', 'cores', 'blocks']); const refArgs = new Set(['baseline', 'feature']); - const boolArgs = new Set(['samply', 'no-slack']); + const boolArgs = new Set(['samply', 'no-slack', 'big-blocks']); const boolDefaultTrue = new Set(['reth_newPayload', 'abba', 'otlp']); const durationArgs = new Set(['wait-time']); const stringArgs = new Set(['baseline-args', 'feature-args']); - const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', cores: '0', reth_newPayload: 'true', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' }; + const defaults = { blocks: '500', warmup: '100', baseline: '', feature: '', samply: 'false', 'no-slack': 'false', 'big-blocks': 'false', cores: '0', reth_newPayload: 'true', abba: 'true', otlp: 'true', 'wait-time': '', 'baseline-args': '', 'feature-args': '' }; const unknown = []; const invalid = []; const args = body.replace(/^(?:@decofe|derek) bench\s*/, ''); @@ -230,15 +234,6 @@ 
jobs: } else { defaults[key] = value; } - } else if (intOrKeywordArgs.has(key)) { - const keywords = intOrKeywordArgs.get(key); - if (keywords.has(value)) { - defaults[key] = value; - } else if (/^\d+$/.test(value)) { - defaults[key] = value; - } else { - invalid.push(`\`${key}=${value}\` (must be a positive integer or one of: ${[...keywords].join(', ')})`); - } } else if (refArgs.has(key)) { if (!value) { invalid.push(`\`${key}=\` (must be a git ref)`); @@ -255,7 +250,7 @@ jobs: if (unknown.length) errors.push(`Unknown argument(s): \`${unknown.join('`, `')}\``); if (invalid.length) errors.push(`Invalid value(s): ${invalid.join(', ')}`); if (errors.length) { - const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N|big] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [reth_newPayload=true|false] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``; + const msg = `❌ **Invalid bench command**\n\n${errors.join('\n')}\n\n**Usage:** \`@decofe bench [blocks=N] [big-blocks] [warmup=N] [baseline=REF] [feature=REF] [samply] [no-slack] [cores=N] [reth_newPayload=true|false] [abba=true|false] [otlp=true|false] [wait-time=DURATION] [baseline-args="..."] [feature-args="..."]\``; await github.rest.issues.createComment({ owner: context.repo.owner, repo: context.repo.repo, @@ -272,7 +267,7 @@ jobs: samply = defaults.samply; var noSlack = defaults['no-slack']; cores = defaults.cores; - bigBlocks = blocks === 'big' ? 
'true' : 'false'; + bigBlocks = defaults['big-blocks']; var rethNewPayload = defaults.reth_newPayload; var abba = defaults.abba; var otlp = defaults.otlp; @@ -508,6 +503,7 @@ jobs: BENCH_OTLP: ${{ needs.reth-bench-ack.outputs.otlp }} BENCH_COMMENT_ID: ${{ needs.reth-bench-ack.outputs.comment-id }} BENCH_NO_SLACK: ${{ needs.reth-bench-ack.outputs.no-slack }} + BENCH_NODE_BIN: ${{ needs.reth-bench-ack.outputs.big-blocks == 'true' && 'reth-bb' || 'reth' }} BENCH_METRICS_ADDR: "127.0.0.1:9100" BENCH_OTLP_TRACES_ENDPOINT: ${{ needs.reth-bench-ack.outputs.otlp != 'false' && secrets.BENCH_OTLP_TRACES_ENDPOINT || '' }} BENCH_OTLP_LOGS_ENDPOINT: ${{ needs.reth-bench-ack.outputs.otlp != 'false' && secrets.BENCH_OTLP_LOGS_ENDPOINT || '' }} @@ -825,7 +821,7 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} BENCH_REPO: ${{ github.repository }} - BENCH_RETH_BINARY: ${{ github.workspace }}/../reth-feature/target/profiling/reth + BENCH_RETH_BINARY: ${{ github.workspace }}/../reth-feature/target/profiling/${{ needs.reth-bench-ack.outputs.big-blocks == 'true' && 'reth-bb' || 'reth' }} run: .github/scripts/bench-reth-snapshot.sh # System tuning for reproducible benchmarks @@ -891,11 +887,23 @@ jobs: exit 0 fi MC="mc --config-dir /home/ubuntu/.mc" - BUCKET="minio/reth-snapshots/reth-1-minimal-stable-big-blocks.tar.zst" + MANIFEST="minio/reth-snapshots/reth-1-minimal-stable-big-blocks.json" rm -rf "$BIG_BLOCKS_DIR"; mkdir -p "$BIG_BLOCKS_DIR" - echo "Downloading big blocks from $BUCKET..." - $MC cat "$BUCKET" | pzstd -d -p 6 | tar -xf - -C "$BIG_BLOCKS_DIR" + + # Download and parse manifest + echo "Downloading manifest from $MANIFEST..." 
+ $MC cat "$MANIFEST" > "$BIG_BLOCKS_DIR/manifest.json" + UPLOAD_PATH=$(jq -r '.upload_path' "$BIG_BLOCKS_DIR/manifest.json") + COUNT=$(jq -r '.count' "$BIG_BLOCKS_DIR/manifest.json") + TARGET_GAS=$(jq -r '.target_gas' "$BIG_BLOCKS_DIR/manifest.json") + echo "Manifest: count=$COUNT, target_gas=$TARGET_GAS, archive=$UPLOAD_PATH" + + # Download and extract archive + ARCHIVE="minio/$UPLOAD_PATH" + echo "Downloading big blocks from $ARCHIVE..." + $MC cat "$ARCHIVE" | pzstd -d -p 6 | tar -xf - -C "$BIG_BLOCKS_DIR" echo "Big blocks downloaded to $BIG_BLOCKS_DIR" + # Verify expected directory structure if [ ! -d "$BIG_BLOCKS_DIR/payloads" ]; then echo "::error::Big blocks archive missing expected payloads/ directory" @@ -946,7 +954,7 @@ jobs: cat > "$BENCH_LABELS_FILE" < "$BENCH_LABELS_FILE" < "$BENCH_LABELS_FILE" < "$BENCH_LABELS_FILE" < **Not for production use.** reth-bb disables some consensus-related validations to allow artificially large blocks. It is intended solely for performance benchmarking. + +## How it works + +reth-bb extends the standard Ethereum node with: + +1. **Multi-segment execution** — a custom `reth_newPayload` handler that accepts optional `BigBlockData` alongside the payload. When present, the block is executed in multiple segments, each with its own EVM environment (matching the original blocks that were merged). + +2. **Relaxed consensus** — the gas-limit bound-divisor check and blob gas validation are skipped, since merged blocks exceed single-block limits. + +## Quick start + +The full workflow has four steps: **build** binaries, **generate** big blocks, **start** reth-bb, and **replay** the payloads. + +### Prerequisites + +- A synced reth datadir for the target chain (e.g. hoodi) +- Rust toolchain + +### 1. Build + +```bash +cargo build --profile profiling -p reth-bb -p reth-bench +``` + +### 2. Generate big blocks + +Fetch consecutive blocks from an RPC and merge them until a target gas is reached. 
Use `--from-block` set to the block number following the one the node is currently synced to (i.e. the next block the node would process): + +```bash +reth-bench generate-big-block \ + --rpc-url https://rpc.hoodi.ethpandaops.io \ + --chain hoodi \ + --from-block 910020 \ + --target-gas 2G \ + --num-big-blocks 5 \ + --output-dir /tmp/payloads +``` + +This produces one JSON file per big block in the output directory. + +### 3. Start reth-bb + +```bash +reth-bb node \ + --datadir /data/reth/hoodi \ + --chain hoodi \ + --http --http.api debug,eth \ + --authrpc.jwtsecret /tmp/jwt.hex \ + -d +``` + +### 4. Replay payloads + +```bash +reth-bench replay-payloads \ + --engine-rpc-url http://localhost:8551 \ + --jwt-secret /tmp/jwt.hex \ + --payload-dir /tmp/payloads \ + --reth-new-payload +``` + +The `--reth-new-payload` flag is required for big blocks — it uses the `reth_newPayload` endpoint which carries the multi-segment execution metadata. diff --git a/bin/reth-bb/src/evm.rs b/bin/reth-bb/src/evm.rs new file mode 100644 index 00000000000..6d56914ad1d --- /dev/null +++ b/bin/reth-bb/src/evm.rs @@ -0,0 +1,570 @@ +//! Big-block executor. +//! +//! Provides [`BbBlockExecutor`] and [`BbBlockExecutorFactory`] which handle +//! segment boundaries within big-block payloads. +//! +//! [`BbBlockExecutor`] wraps [`EthBlockExecutor`] and intercepts +//! `execute_transaction` to apply segment-boundary changes. 
+ +use crate::evm_config::BigBlockSegment; +use alloy_eips::eip7685::Requests; +use alloy_evm::{ + block::{ + BlockExecutionError, BlockExecutionResult, BlockExecutor, BlockExecutorFactory, + BlockExecutorFor, ExecutableTx, OnStateHook, StateChangeSource, StateDB, + }, + eth::{EthBlockExecutionCtx, EthBlockExecutor, EthEvmContext, EthTxResult}, + precompiles::PrecompilesMap, + Database, EthEvm, EthEvmFactory, Evm, FromRecoveredTx, FromTxWithEncoded, +}; +use alloy_primitives::B256; +use reth_ethereum_primitives::{Receipt, TransactionSigned}; +use reth_evm_ethereum::RethReceiptBuilder; +use revm::{ + context::{BlockEnv, TxEnv}, + context_interface::result::{EVMError, HaltReason}, + handler::PrecompileProvider, + interpreter::InterpreterResult, + primitives::hardfork::SpecId, + Inspector, +}; +use std::sync::{Arc, Mutex}; +use tracing::{debug, trace}; + +// --------------------------------------------------------------------------- +// BbEvmPlan — runtime segment tracking state +// --------------------------------------------------------------------------- + +/// Runtime state for segment boundary tracking. +pub(crate) struct BbEvmPlan { + /// The segment boundaries and environments. + pub(crate) segments: Vec, + /// Index of the next segment to switch to (starts at 1). + pub(crate) next_segment: usize, + /// Number of user transactions executed so far. + pub(crate) tx_counter: usize, + /// Block hashes to seed for inter-segment BLOCKHASH resolution. + /// Includes both prior block hashes and inter-segment hashes. + pub(crate) block_hashes_to_seed: Vec<(u64, B256)>, +} + +impl BbEvmPlan { + /// Creates a new `BbEvmPlan` from segments and hardfork flags. + pub(crate) fn new(segments: Vec) -> Self { + // Pre-compute all inter-segment block hashes. 
+ let mut block_hashes_to_seed = Vec::new(); + for seg in segments.iter().skip(1) { + let finished_block_number = seg.evm_env.block_env.number.saturating_to::() - 1; + let finished_block_hash = seg.ctx.parent_hash; + block_hashes_to_seed.push((finished_block_number, finished_block_hash)); + } + + Self { segments, next_segment: 1, tx_counter: 0, block_hashes_to_seed } + } + + /// Returns the 256 block hashes relevant to a segment with the given block + /// number. BLOCKHASH can look back 256 blocks, so we select entries in + /// `[block_number - 256, block_number)`. + pub(crate) fn hashes_for_block(&self, block_number: u64) -> Vec<(u64, B256)> { + let min = block_number.saturating_sub(256); + self.block_hashes_to_seed + .iter() + .copied() + .filter(|(n, _)| *n >= min && *n < block_number) + .collect() + } +} + +impl std::fmt::Debug for BbEvmPlan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BbEvmPlan") + .field("segments", &self.segments) + .field("next_segment", &self.next_segment) + .field("tx_counter", &self.tx_counter) + .field("block_hashes_to_seed", &self.block_hashes_to_seed) + .finish() + } +} + +// --------------------------------------------------------------------------- +// BbBlockExecutor — handles segment boundaries +// --------------------------------------------------------------------------- + +/// Function pointer that seeds block hashes into the DB's block hash cache. +/// +/// Injected from `ConfigureEvm::create_executor` where the concrete `State` +/// type is known, allowing `BbBlockExecutor` to reseed the ring buffer at +/// segment boundaries without requiring additional trait bounds on `DB`. +pub(crate) type BlockHashSeeder = fn(&mut DB, &[(u64, B256)]); + +/// Block executor that wraps [`EthBlockExecutor`] and handles segment-boundary +/// changes for big-block execution. 
+/// +/// At segment boundaries, the inner executor is finished (applying its +/// end-of-block logic: post-execution system calls, withdrawal balance +/// increments) and a new one is constructed for the next segment (applying +/// its start-of-block logic: EIP-2935/EIP-4788 system calls). +/// +/// Gas counters reset at each boundary so that each segment's real gas limit +/// is used (preserving correct GASLIMIT opcode behavior). Accumulated offsets +/// are applied to receipts in `commit_transaction()` and to the totals in +/// `finish()`. +pub(crate) struct BbBlockExecutor<'a, DB, I, P, Spec> +where + DB: Database, +{ + /// The inner executor. `None` transiently during `apply_segment_boundary`. + inner: Option, Spec, RethReceiptBuilder>>, + /// Segment plan for this block. When `None`, no segment boundaries are + /// applied and calls are passed straight to the inner executor. + plan: Option, + /// Requests accumulated from segments that have been finished at + /// boundaries. Merged into the final result in `finish()`. + accumulated_requests: Requests, + /// Cumulative gas used by all segments that have been finished at + /// boundaries. Added to each receipt in `commit_transaction()` and to the + /// final gas total in `finish()`. + gas_used_offset: u64, + /// Cumulative blob gas used by all segments that have been finished at + /// boundaries. Added to the final blob gas total in `finish()`. + blob_gas_used_offset: u64, + /// Shared state hook that survives inner executor finish/reconstruct + /// cycles at segment boundaries. Each inner executor receives a + /// forwarding hook that delegates to this shared instance. + shared_hook: Arc>>>, + /// Callback to reseed block hashes into the DB's cache at segment + /// boundaries. See [`BlockHashSeeder`].
+ block_hash_seeder: Option>, +} + +impl<'a, DB, I, P, Spec> BbBlockExecutor<'a, DB, I, P, Spec> +where + DB: StateDB, + I: Inspector>, + P: PrecompileProvider, Output = InterpreterResult>, + Spec: alloy_evm::eth::spec::EthExecutorSpec + Clone, + EthEvm: Evm< + DB = DB, + Tx = TxEnv, + HaltReason = HaltReason, + Error = EVMError, + Spec = SpecId, + BlockEnv = BlockEnv, + >, + TxEnv: FromRecoveredTx + FromTxWithEncoded, +{ + pub(crate) fn new( + evm: EthEvm, + ctx: EthBlockExecutionCtx<'a>, + spec: Spec, + receipt_builder: RethReceiptBuilder, + plan: Option, + block_hash_seeder: Option>, + ) -> Self { + let inner = EthBlockExecutor::new(evm, ctx, spec, receipt_builder); + Self { + inner: Some(inner), + plan, + accumulated_requests: Requests::default(), + gas_used_offset: 0, + blob_gas_used_offset: 0, + shared_hook: Arc::new(Mutex::new(None)), + block_hash_seeder, + } + } + + /// Creates a forwarding `OnStateHook` that delegates to the shared hook. + /// + /// Always returns `Some`; the forwarder does nothing while the shared slot + /// is empty. + fn forwarding_hook(&self) -> Option> { + let shared = self.shared_hook.clone(); + Some(Box::new(move |source: StateChangeSource, state: &revm::state::EvmState| { + if let Some(hook) = shared.lock().unwrap().as_mut() { + hook.on_state(source, state); + } + })) + } + + /// Returns the inner executor. Panics if it has been taken and not replaced. + const fn inner(&self) -> &EthBlockExecutor<'a, EthEvm, Spec, RethReceiptBuilder> { + self.inner.as_ref().expect("inner executor must exist") + } + + /// Returns the inner executor mutably. Panics if it has been taken and not + /// replaced. + const fn inner_mut( + &mut self, + ) -> &mut EthBlockExecutor<'a, EthEvm, Spec, RethReceiptBuilder> { + self.inner.as_mut().expect("inner executor must exist") + } + + /// Hands the plan's hashes for the window below `block_number` to the + /// seeder callback. Does nothing when no seeder or no plan is set. + fn reseed_block_hashes_for(&mut self, block_number: u64) { + let Some(seeder) = self.block_hash_seeder else { return }; + let hashes = match &self.plan { + Some(plan) => plan.hashes_for_block(block_number), + None => return, + }; + seeder(self.inner_mut().evm_mut().db_mut(), &hashes); + } + + /// Applies segment boundaries for as long as `tx_counter` equals the next + /// segment's `start_tx`, so consecutive boundaries are applied back to back. + /// Returns immediately when there is no plan or no segment left. + fn maybe_apply_boundary(&mut self) -> Result<(), BlockExecutionError> { + loop { + let plan = match &self.plan { + Some(p) => p, + None => return Ok(()),
+ }; + + if plan.next_segment >= plan.segments.len() || + plan.tx_counter != plan.segments[plan.next_segment].start_tx + { + return Ok(()); + } + + self.apply_segment_boundary()?; + } + } + + fn apply_segment_boundary(&mut self) -> Result<(), BlockExecutionError> { + let plan = self.plan.as_mut().expect("plan must exist"); + let seg_idx = plan.next_segment; + let prev_seg_idx = seg_idx - 1; + + debug!( + target: "engine::bb::evm", + seg_idx, + tx_counter = plan.tx_counter, + "Applying segment boundary" + ); + + // Swap the inner executor's ctx to the finished segment's values so + // that finish() applies the correct withdrawals and post-execution + // system calls for that segment. + let prev_segment = &plan.segments[prev_seg_idx]; + let prev_ctx = EthBlockExecutionCtx { + parent_hash: prev_segment.ctx.parent_hash, + parent_beacon_block_root: prev_segment.ctx.parent_beacon_block_root, + ommers: prev_segment.ctx.ommers, + withdrawals: prev_segment.ctx.withdrawals.clone(), + extra_data: prev_segment.ctx.extra_data.clone(), + tx_count_hint: prev_segment.ctx.tx_count_hint, + }; + + // Clone the next segment's data before we consume inner. + let new_segment = &plan.segments[seg_idx]; + let new_block_env = new_segment.evm_env.block_env.clone(); + let mut new_cfg_env = new_segment.evm_env.cfg_env.clone(); + new_cfg_env.disable_base_fee = true; + let new_ctx = EthBlockExecutionCtx { + parent_hash: new_segment.ctx.parent_hash, + parent_beacon_block_root: new_segment.ctx.parent_beacon_block_root, + ommers: new_segment.ctx.ommers, + withdrawals: new_segment.ctx.withdrawals.clone(), + extra_data: new_segment.ctx.extra_data.clone(), + tx_count_hint: new_segment.ctx.tx_count_hint, + }; + + // Advance the plan so the next boundary check looks at the following + // segment. + plan.next_segment += 1; + + // Finish the inner executor for the completed segment. This applies + // post-execution system calls (EIP-7002/7251) and withdrawal balance + // increments via EthBlockExecutor::finish().
+ let mut inner = self.inner.take().expect("inner executor must exist"); + inner.ctx = prev_ctx; + let spec = inner.spec.clone(); + let receipt_builder = inner.receipt_builder; + let (mut evm, result) = inner.finish()?; + + // Receipts already have globally-correct cumulative_gas_used (fixed + // up in commit_transaction). Update the offset with this segment's + // gas so that subsequent segments' receipts are adjusted correctly. + self.gas_used_offset += result.gas_used; + self.blob_gas_used_offset += result.blob_gas_used; + self.accumulated_requests.extend(result.requests); + + // Diagnostics only: the two values below feed the debug log and do not + // affect execution. + let last_receipt_cumulative = + result.receipts.last().map(|r| r.cumulative_gas_used).unwrap_or(0); + let seg_block_number = prev_segment.evm_env.block_env.number.saturating_to::(); + debug!( + target: "engine::bb::evm", + prev_seg_idx, + seg_block_number, + segment_gas_used = result.gas_used, + gas_used_offset = self.gas_used_offset, + last_receipt_cumulative, + receipt_count = result.receipts.len(), + "Finished segment" + ); + + // Swap EVM env to the next segment's values (using real gas_limit). + let ctx = evm.ctx_mut(); + ctx.block = new_block_env; + ctx.cfg = new_cfg_env; + + // Build a new inner executor for the next segment. gas_used starts + // at 0 so the per-transaction gas check uses this segment's real + // gas_limit correctly. + let mut new_inner = EthBlockExecutor::new(evm, new_ctx, spec, receipt_builder); + + // Carry forward receipts from prior segments. + new_inner.receipts = result.receipts; + + // Re-install the forwarding state hook so the parallel state root + // task continues to receive state changes. + if self.shared_hook.lock().unwrap().is_some() { + new_inner.set_state_hook(self.forwarding_hook()); + } + + self.inner = Some(new_inner); + + // Reseed the block hash cache for the new segment's 256-block window + // before applying pre-execution changes (which may use BLOCKHASH).
+ let new_block_number = self.plan.as_ref().unwrap().segments[seg_idx] + .evm_env + .block_env + .number + .saturating_to::(); + self.reseed_block_hashes_for(new_block_number); + + // Apply pre-execution changes for the new segment (EIP-2935, EIP-4788). + self.inner_mut().apply_pre_execution_changes()?; + + trace!(target: "engine::bb::evm", "Started segment {seg_idx}"); + + Ok(()) + } +} + +impl<'a, DB, I, P, Spec> BlockExecutor for BbBlockExecutor<'a, DB, I, P, Spec> +where + DB: StateDB, + I: Inspector>, + P: PrecompileProvider, Output = InterpreterResult>, + Spec: alloy_evm::eth::spec::EthExecutorSpec + Clone, + EthEvm: Evm< + DB = DB, + Tx = TxEnv, + HaltReason = HaltReason, + Error = EVMError, + Spec = SpecId, + BlockEnv = BlockEnv, + >, + TxEnv: FromRecoveredTx + FromTxWithEncoded, +{ + type Transaction = TransactionSigned; + type Receipt = Receipt; + type Evm = EthEvm; + type Result = EthTxResult; + + fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> { + // Swap the EVM's block_env and executor ctx to the first segment's + // values so that the initial EIP-2935/EIP-4788 system calls use the + // correct block number and parent hash. Without this, the outer big + // block header's block_number (which is synthetic) would be used, + // writing to wrong EIP-2935 slots and corrupting state. + // NOTE(review): `p.segments[0]` below panics if the plan has no + // segments — confirm a staged plan always contains at least one.
+ if let Some(seg0) = self.plan.as_ref().map(|p| &p.segments[0]) { + let block_env = seg0.evm_env.block_env.clone(); + let block_number = block_env.number.saturating_to::(); + let mut cfg_env = seg0.evm_env.cfg_env.clone(); + cfg_env.disable_base_fee = true; + let seg0_ctx = EthBlockExecutionCtx { + parent_hash: seg0.ctx.parent_hash, + parent_beacon_block_root: seg0.ctx.parent_beacon_block_root, + ommers: seg0.ctx.ommers, + withdrawals: seg0.ctx.withdrawals.clone(), + extra_data: seg0.ctx.extra_data.clone(), + tx_count_hint: seg0.ctx.tx_count_hint, + }; + + let inner = self.inner_mut(); + let evm_ctx = inner.evm.ctx_mut(); + evm_ctx.block = block_env; + evm_ctx.cfg = cfg_env; + inner.ctx = seg0_ctx; + + self.reseed_block_hashes_for(block_number); + } + + self.inner_mut().apply_pre_execution_changes() + } + + fn execute_transaction_without_commit( + &mut self, + tx: impl ExecutableTx, + ) -> Result { + self.maybe_apply_boundary()?; + self.inner_mut().execute_transaction_without_commit(tx) + } + + fn commit_transaction(&mut self, output: Self::Result) -> Result { + let gas_used = self.inner_mut().commit_transaction(output)?; + + // Fix up cumulative_gas_used on the just-committed receipt so that + // the receipt root task (which reads receipts incrementally) sees + // globally-correct values across all segments. + let offset = self.gas_used_offset; + if offset > 0 && + let Some(receipt) = self.inner_mut().receipts.last_mut() + { + receipt.cumulative_gas_used += offset; + } + + // Count the committed transaction so `maybe_apply_boundary` can detect + // the start of the next segment. + if let Some(plan) = &mut self.plan { + plan.tx_counter += 1; + } + Ok(gas_used) + } + + fn finish( + mut self, + ) -> Result<(Self::Evm, BlockExecutionResult), BlockExecutionError> { + // Swap the inner executor's ctx to the last segment's ctx so that + // EthBlockExecutor::finish() applies the correct withdrawal balance + // increments and post-execution system calls. + // NOTE(review): `segments.last().unwrap()` below panics on an empty + // segment list — confirm a staged plan always has at least one segment.
+ if let Some(last_seg) = self.plan.as_ref().map(|p| p.segments.last().unwrap()) { + let last_ctx = EthBlockExecutionCtx { + parent_hash: last_seg.ctx.parent_hash, + parent_beacon_block_root: last_seg.ctx.parent_beacon_block_root, + ommers: last_seg.ctx.ommers, + withdrawals: last_seg.ctx.withdrawals.clone(), + extra_data: last_seg.ctx.extra_data.clone(), + tx_count_hint: last_seg.ctx.tx_count_hint, + }; + self.inner_mut().ctx = last_ctx; + } + let inner = self.inner.take().expect("inner executor must exist"); + let (evm, mut result) = inner.finish()?; + + // Receipts already have globally-correct cumulative_gas_used (fixed + // up in commit_transaction). Add the offset to the totals so they + // reflect gas across all segments. + let last_segment_gas = result.gas_used; + result.gas_used += self.gas_used_offset; + result.blob_gas_used += self.blob_gas_used_offset; + + let last_receipt_cumulative = + result.receipts.last().map(|r| r.cumulative_gas_used).unwrap_or(0); + debug!( + target: "engine::bb::evm", + last_segment_gas, + gas_used_offset = self.gas_used_offset, + total_gas_used = result.gas_used, + last_receipt_cumulative, + receipt_count = result.receipts.len(), + "Finished final segment" + ); + + // Merge requests accumulated from earlier segment boundaries into + // the final result, keeping the earlier segments' requests first. + if !self.accumulated_requests.is_empty() { + let mut merged = self.accumulated_requests; + merged.extend(result.requests); + result.requests = merged; + } + + Ok((evm, result)) + } + + fn set_state_hook(&mut self, hook: Option>) { + if self.plan.is_some() { + // Store the real hook in the shared slot and give the inner + // executor a lightweight forwarder. This way the hook survives + // inner executor finish/reconstruct cycles at segment boundaries.
+ *self.shared_hook.lock().unwrap() = hook; + let fwd = self.forwarding_hook(); + self.inner_mut().set_state_hook(fwd); + } else { + self.inner_mut().set_state_hook(hook); + } + } + + fn evm_mut(&mut self) -> &mut Self::Evm { + self.inner_mut().evm_mut() + } + + fn evm(&self) -> &Self::Evm { + self.inner().evm() + } + + fn receipts(&self) -> &[Self::Receipt] { + self.inner().receipts() + } +} + +// --------------------------------------------------------------------------- +// BbBlockExecutorFactory +// --------------------------------------------------------------------------- + +/// Block executor factory that produces [`BbBlockExecutor`] for +/// boundary-aware big-block execution. +#[derive(Debug, Clone)] +pub struct BbBlockExecutorFactory { + receipt_builder: RethReceiptBuilder, + spec: Spec, + evm_factory: EthEvmFactory, + /// Staged plan consumed by the next [`BbBlockExecutor`]. + /// + /// The slot sits behind an `Arc`, so clones of the factory share it. + pub(crate) staged_plan: Arc>>, +} + +impl BbBlockExecutorFactory { + /// Creates a factory with no staged plan. + pub fn new( + receipt_builder: RethReceiptBuilder, + spec: Spec, + evm_factory: EthEvmFactory, + ) -> Self { + Self { receipt_builder, spec, evm_factory, staged_plan: Arc::new(Mutex::new(None)) } + } + + /// Returns the EVM factory. + pub const fn evm_factory(&self) -> &EthEvmFactory { + &self.evm_factory + } + + /// Returns the spec handed to created executors. + pub const fn spec(&self) -> &Spec { + &self.spec + } + + /// Returns the receipt builder. + pub const fn receipt_builder(&self) -> &RethReceiptBuilder { + &self.receipt_builder + } + + /// Stages `plan` for the next executor created by this factory, replacing + /// any plan that was staged before. + pub(crate) fn stage_plan(&self, plan: BbEvmPlan) { + *self.staged_plan.lock().unwrap() = Some(plan); + } + + /// Removes and returns the staged plan, leaving the slot empty. + fn take_plan(&self) -> Option { + self.staged_plan.lock().unwrap().take() + } + + /// Creates a [`BbBlockExecutor`] that consumes the staged plan (if any) and + /// uses `block_hash_seeder` to reseed block hashes. + pub(crate) fn create_executor_with_seeder<'a, DB, I>( + &'a self, + evm: EthEvm, + ctx: EthBlockExecutionCtx<'a>, + block_hash_seeder: Option>, + ) -> BbBlockExecutor<'a, DB, I, PrecompilesMap, &'a Spec> + where + Spec: alloy_evm::eth::spec::EthExecutorSpec, + DB: StateDB + 'a, + I: Inspector> + 'a, + { + let plan = self.take_plan(); + BbBlockExecutor::new(evm, ctx, &self.spec, self.receipt_builder,
plan, block_hash_seeder) + } +} + +impl BlockExecutorFactory for BbBlockExecutorFactory +where + Spec: alloy_evm::eth::spec::EthExecutorSpec + 'static, + TxEnv: FromRecoveredTx + FromTxWithEncoded, +{ + type EvmFactory = EthEvmFactory; + type ExecutionCtx<'a> = EthBlockExecutionCtx<'a>; + type Transaction = TransactionSigned; + type Receipt = Receipt; + + fn evm_factory(&self) -> &Self::EvmFactory { + &self.evm_factory + } + + fn create_executor<'a, DB, I>( + &'a self, + evm: EthEvm, + ctx: EthBlockExecutionCtx<'a>, + ) -> impl BlockExecutorFor<'a, Self, DB, I> + where + DB: StateDB + 'a, + I: Inspector> + 'a, + { + let plan = self.take_plan(); + BbBlockExecutor::new(evm, ctx, &self.spec, self.receipt_builder, plan, None) + } +} diff --git a/bin/reth-bb/src/evm_config.rs b/bin/reth-bb/src/evm_config.rs new file mode 100644 index 00000000000..86bd695726e --- /dev/null +++ b/bin/reth-bb/src/evm_config.rs @@ -0,0 +1,291 @@ +//! Big-block EVM configuration. +//! +//! Wraps [`EthEvmConfig`] to create executors that handle multi-segment +//! big-block execution internally. At transaction boundaries defined by +//! [`BigBlockData`], the executor swaps the EVM environment (block env, +//! cfg env) and applies pre/post execution changes for each segment.
+ +pub(crate) use reth_engine_primitives::BigBlockData; + +use crate::{ + evm::{BbBlockExecutorFactory, BbEvmPlan}, + BigBlockMap, +}; +use alloy_consensus::Header; +use alloy_evm::eth::EthBlockExecutionCtx; +use alloy_primitives::B256; +use alloy_rpc_types::engine::ExecutionData; +use core::convert::Infallible; +use reth_chainspec::{ChainSpec, EthChainSpec}; +use reth_ethereum_forks::Hardforks; +use reth_ethereum_primitives::EthPrimitives; +use reth_evm::{ + ConfigureEngineEvm, ConfigureEvm, Database, EvmEnv, ExecutableTxIterator, + NextBlockEnvAttributes, +}; +use reth_evm_ethereum::{EthBlockAssembler, EthEvmConfig, RethReceiptBuilder}; +use reth_primitives_traits::{SealedBlock, SealedHeader}; +use revm::primitives::hardfork::SpecId; +use std::sync::Arc; +use tracing::debug; + +use alloy_evm::{eth::spec::EthExecutorSpec, EthEvmFactory}; +use reth_evm::{EvmEnvFor, ExecutionCtxFor}; + +// --------------------------------------------------------------------------- +// Execution plan types +// --------------------------------------------------------------------------- + +/// A single execution segment within a big block. +#[derive(Debug, Clone)] +pub(crate) struct BigBlockSegment { + /// Transaction index at which this segment starts. + pub start_tx: usize, + /// The EVM environment for this segment. + pub evm_env: EvmEnv, + /// The execution context for this segment. + pub ctx: EthBlockExecutionCtx<'static>, +} + +// --------------------------------------------------------------------------- +// BbEvmConfig +// --------------------------------------------------------------------------- + +/// EVM configuration for big-block execution. +/// +/// Wraps [`EthEvmConfig`] and a shared [`BigBlockMap`]. When a big-block +/// payload is received, the plan is staged on the [`BbBlockExecutorFactory`] +/// and consumed when the executor is created. 
Block hashes for inter-segment +/// BLOCKHASH resolution are reseeded into `State::block_hashes` at each +/// segment boundary via a [`BlockHashSeeder`](crate::evm::BlockHashSeeder) +/// callback injected in [`ConfigureEvm::create_executor`]. +#[derive(Debug, Clone)] +pub struct BbEvmConfig { + /// The inner Ethereum EVM configuration (used for env computation). + pub inner: EthEvmConfig, + /// Shared map of pending big-block metadata. + pub pending: BigBlockMap, + /// Block executor factory for big-block execution. + executor_factory: BbBlockExecutorFactory>, + /// Block assembler. + block_assembler: EthBlockAssembler, +} + +impl BbEvmConfig { + /// Creates a new big-block EVM configuration. + pub fn new(inner: EthEvmConfig, pending: BigBlockMap) -> Self + where + C: Clone, + { + let chain_spec = inner.chain_spec().clone(); + let executor_factory = BbBlockExecutorFactory::new( + RethReceiptBuilder::default(), + chain_spec, + EthEvmFactory::default(), + ); + let block_assembler = inner.block_assembler.clone(); + + Self { inner, pending, executor_factory, block_assembler } + } +} + +// --------------------------------------------------------------------------- +// Block hash seeder for State +// --------------------------------------------------------------------------- + +/// Reseeds `State::block_hashes` with the given hashes. +/// +/// This is used as a [`BlockHashSeeder`](crate::evm::BlockHashSeeder) callback, +/// injected into [`BbBlockExecutor`](crate::evm::BbBlockExecutor) from +/// `ConfigureEvm::create_executor` where the concrete `State` type is known. +/// At each segment boundary the executor calls this to populate the ring buffer +/// with the 256 block hashes relevant to the new segment's block number window. 
+fn seed_state_block_hashes(state: &mut &mut revm::database::State, hashes: &[(u64, B256)]) { + for &(number, hash) in hashes { + state.block_hashes.insert(number, hash); + } +} + +// --------------------------------------------------------------------------- +// ConfigureEvm +// --------------------------------------------------------------------------- + +impl ConfigureEvm for BbEvmConfig +where + C: EthExecutorSpec + EthChainSpec
+ Hardforks + 'static, +{ + type Primitives = EthPrimitives; + type Error = Infallible; + type NextBlockEnvCtx = NextBlockEnvAttributes; + type BlockExecutorFactory = BbBlockExecutorFactory>; + type BlockAssembler = EthBlockAssembler; + + fn block_executor_factory(&self) -> &Self::BlockExecutorFactory { + &self.executor_factory + } + + fn block_assembler(&self) -> &Self::BlockAssembler { + &self.block_assembler + } + + fn evm_env(&self, header: &Header) -> Result, Self::Error> { + self.inner.evm_env(header) + } + + fn next_evm_env( + &self, + parent: &Header, + attributes: &NextBlockEnvAttributes, + ) -> Result { + self.inner.next_evm_env(parent, attributes) + } + + fn context_for_block<'a>( + &self, + block: &'a SealedBlock, + ) -> Result, Self::Error> { + self.inner.context_for_block(block) + } + + fn context_for_next_block( + &self, + parent: &SealedHeader, + attributes: Self::NextBlockEnvCtx, + ) -> Result, Self::Error> { + self.inner.context_for_next_block(parent, attributes) + } + + fn create_executor<'a, DB, I>( + &'a self, + evm: reth_evm::EvmFor, I>, + ctx: EthBlockExecutionCtx<'a>, + ) -> impl alloy_evm::block::BlockExecutorFor< + 'a, + Self::BlockExecutorFactory, + &'a mut revm::database::State, + I, + > + where + DB: Database, + I: reth_evm::InspectorFor> + 'a, + { + // Use create_executor_with_seeder to inject a concrete seeder that + // can reseed State::block_hashes at segment boundaries. The seeder + // is a function pointer that knows the concrete State type, + // allowing the generic BbBlockExecutor to reseed without additional + // trait bounds on DB. 
+ self.executor_factory.create_executor_with_seeder( + evm, + ctx, + Some(seed_state_block_hashes::), + ) + } +} + +// --------------------------------------------------------------------------- +// ConfigureEngineEvm — intercepts payload methods for big blocks +// --------------------------------------------------------------------------- + +impl ConfigureEngineEvm for BbEvmConfig +where + C: EthExecutorSpec + EthChainSpec
+ Hardforks + 'static, +{ + fn evm_env_for_payload(&self, payload: &ExecutionData) -> Result, Self::Error> { + let payload_hash = payload.block_hash(); + let has_plan = self.pending.lock().unwrap().contains_key(&payload_hash); + + if has_plan { + // Compute the env from the first segment BEFORE removing the + // entry (stage_plan_for_payload removes it). + let first_exec_data = { + let pending = self.pending.lock().unwrap(); + let bb_data = pending.get(&payload_hash).unwrap(); + bb_data.env_switches[0].1.clone() + }; + let mut env = self.inner.evm_env_for_payload(&first_exec_data)?; + + // Disable basefee validation: transactions from different + // original blocks may have gas prices below the big block's + // effective basefee. + env.cfg_env.disable_base_fee = true; + + // Now stage the plan on the factory (removes the entry). + self.stage_plan_for_payload(&payload_hash); + + Ok(env) + } else { + self.inner.evm_env_for_payload(payload) + } + } + + fn context_for_payload<'a>( + &self, + payload: &'a ExecutionData, + ) -> Result, Self::Error> { + self.inner.context_for_payload(payload) + } + + fn tx_iterator_for_payload( + &self, + payload: &ExecutionData, + ) -> Result, Self::Error> { + self.inner.tx_iterator_for_payload(payload) + } +} + +// --------------------------------------------------------------------------- +// Plan construction and staging +// --------------------------------------------------------------------------- + +impl BbEvmConfig +where + C: EthExecutorSpec + EthChainSpec
+ Hardforks + 'static, +{ + /// Takes the big-block plan for a payload hash, builds a [`BbEvmPlan`], + /// and stages it on the factory. + /// + /// Must be called before `evm_with_env` is invoked for this payload. + /// In practice, this is called from `evm_env_for_payload` in the + /// engine pipeline. + pub fn stage_plan_for_payload(&self, payload_hash: &B256) { + let bb = match self.pending.lock().unwrap().remove(payload_hash) { + Some(bb) => bb, + None => return, + }; + + let segments: Vec<_> = bb + .env_switches + .into_iter() + .map(|(start_tx, exec_data)| { + let evm_env = self.inner.evm_env_for_payload(&exec_data).unwrap(); + let ctx = self.inner.context_for_payload(&exec_data).unwrap(); + let ctx = EthBlockExecutionCtx { + tx_count_hint: ctx.tx_count_hint, + parent_hash: ctx.parent_hash, + parent_beacon_block_root: ctx.parent_beacon_block_root, + ommers: &[], + withdrawals: ctx.withdrawals.map(|w| std::borrow::Cow::Owned(w.into_owned())), + extra_data: ctx.extra_data, + }; + BigBlockSegment { start_tx, evm_env, ctx } + }) + .collect(); + + debug!( + target: "engine::bb", + ?payload_hash, + segments = segments.len(), + seed_hashes = bb.prior_block_hashes.len(), + "Staging multi-segment plan" + ); + + let mut plan = BbEvmPlan::new(segments); + + // Add prior block hashes to the seeding list. + plan.block_hashes_to_seed.extend(bb.prior_block_hashes); + + plan.block_hashes_to_seed.sort_unstable_by_key(|(n, _)| *n); + + self.executor_factory.stage_plan(plan); + } +} diff --git a/bin/reth-bb/src/main.rs b/bin/reth-bb/src/main.rs new file mode 100644 index 00000000000..783786f3a77 --- /dev/null +++ b/bin/reth-bb/src/main.rs @@ -0,0 +1,374 @@ +//! reth-bb: a modified reth node for benchmarking big block execution. 
+#![allow(missing_docs)] + +#[global_allocator] +static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); + +mod evm; +mod evm_config; + +use alloy_primitives::B256; + +use alloy_rpc_types::engine::{ExecutionData, ForkchoiceState, ForkchoiceUpdated}; +use async_trait::async_trait; +use clap::Parser; +use evm_config::{BbEvmConfig, BigBlockData}; +use jsonrpsee::core::RpcResult; +use reth_chainspec::{ChainSpec, EthChainSpec, EthereumHardforks, Hardforks}; +use reth_engine_primitives::ConsensusEngineHandle; +use reth_ethereum_cli::{chainspec::EthereumChainSpecParser, interface::Cli}; +use reth_ethereum_consensus::EthBeaconConsensus; +use reth_ethereum_primitives::EthPrimitives; +use reth_evm_ethereum::EthEvmConfig; +use reth_node_api::{AddOnsContext, FullNodeComponents, NodeTypes, PayloadTypes}; +use reth_node_builder::{ + components::{ + BasicPayloadServiceBuilder, ComponentsBuilder, ConsensusBuilder, ExecutorBuilder, + }, + node::FullNodeTypes, + rpc::{ + BasicEngineApiBuilder, BasicEngineValidatorBuilder, EngineApiBuilder, EngineValidatorAddOn, + EngineValidatorBuilder, PayloadValidatorBuilder, RethRpcAddOns, RpcAddOns, RpcHandle, + RpcHooks, + }, + BuilderContext, Node, +}; +use reth_node_ethereum::{ + EthEngineTypes, EthereumEngineValidatorBuilder, EthereumEthApiBuilder, EthereumNetworkBuilder, + EthereumNode, EthereumPayloadBuilder, EthereumPoolBuilder, +}; +use reth_payload_primitives::ExecutionPayload; +use reth_primitives_traits::SealedBlock; +use reth_provider::EthStorage; +use reth_rpc_api::{RethNewPayloadInput, RethPayloadStatus}; +use reth_rpc_engine_api::EngineApiError; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; +use tracing::{info, trace}; + +/// Shared map for big block data, keyed by payload hash. 
+pub type BigBlockMap = Arc>>>; + +// --------------------------------------------------------------------------- +// Custom RPC trait for big-block payloads +// --------------------------------------------------------------------------- + +/// Big-block extension of the `reth_` engine API. +#[jsonrpsee::proc_macros::rpc(server, namespace = "reth")] +pub trait BbRethEngineApi { + /// `reth_newPayload` with optional big-block data. + #[method(name = "newPayload")] + async fn reth_new_payload( + &self, + payload: RethNewPayloadInput, + wait_for_persistence: Option, + wait_for_caches: Option, + big_block_data: Option>, + ) -> RpcResult; + + /// `reth_forkchoiceUpdated` – pass-through. + #[method(name = "forkchoiceUpdated")] + async fn reth_forkchoice_updated( + &self, + forkchoice_state: ForkchoiceState, + ) -> RpcResult; +} + +/// Server-side implementation of `BbRethEngineApi`. +#[derive(Debug)] +struct BbRethEngineApiHandler { + pending: BigBlockMap, + engine: ConsensusEngineHandle, +} + +#[async_trait] +impl BbRethEngineApiServer for BbRethEngineApiHandler { + async fn reth_new_payload( + &self, + input: RethNewPayloadInput, + wait_for_persistence: Option, + wait_for_caches: Option, + big_block_data: Option>, + ) -> RpcResult { + let wait_for_persistence = wait_for_persistence.unwrap_or(true); + let wait_for_caches = wait_for_caches.unwrap_or(true); + trace!( + target: "rpc::engine", + wait_for_persistence, + wait_for_caches, + has_big_block_data = big_block_data.is_some(), + "Serving bb reth_newPayload" + ); + + let payload = match input { + RethNewPayloadInput::ExecutionData(data) => data, + RethNewPayloadInput::BlockRlp(rlp) => { + let block = alloy_rlp::Decodable::decode(&mut rlp.as_ref()) + .map_err(|err| EngineApiError::Internal(Box::new(err)))?; + ::block_to_payload(SealedBlock::new_unhashed(block)) + } + }; + + if let Some(data) = big_block_data { + let hash = ExecutionPayload::block_hash(&payload); + self.pending.lock().unwrap().insert(hash, data); + } + 
+ let (status, timings) = self + .engine + .reth_new_payload(payload, wait_for_persistence, wait_for_caches) + .await + .map_err(EngineApiError::from)?; + + Ok(RethPayloadStatus { + status, + latency_us: timings.latency.as_micros() as u64, + persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64), + execution_cache_wait_us: timings.execution_cache_wait.map(|d| d.as_micros() as u64), + sparse_trie_wait_us: timings.sparse_trie_wait.map(|d| d.as_micros() as u64), + }) + } + + async fn reth_forkchoice_updated( + &self, + forkchoice_state: ForkchoiceState, + ) -> RpcResult { + trace!(target: "rpc::engine", "Serving reth_forkchoiceUpdated"); + self.engine + .fork_choice_updated(forkchoice_state, None) + .await + .map_err(|e| EngineApiError::from(e).into()) + } +} + +// --------------------------------------------------------------------------- +// Node add-ons wrapper +// --------------------------------------------------------------------------- + +/// Add-ons for the big-block node. 
+#[derive(Debug)] +pub struct BbAddOns { + pending: BigBlockMap, +} + +impl BbAddOns { + const fn new(pending: BigBlockMap) -> Self { + Self { pending } + } + + fn make_rpc_add_ons( + &self, + ) -> RpcAddOns< + N, + EthereumEthApiBuilder, + EthereumEngineValidatorBuilder, + BasicEngineApiBuilder, + BasicEngineValidatorBuilder, + > + where + EthereumEthApiBuilder: reth_node_builder::rpc::EthApiBuilder, + { + RpcAddOns::new( + EthereumEthApiBuilder::default(), + EthereumEngineValidatorBuilder::default(), + BasicEngineApiBuilder::default(), + BasicEngineValidatorBuilder::default(), + Default::default(), + ) + } +} + +impl reth_node_api::NodeAddOns for BbAddOns +where + N: FullNodeComponents< + Types: NodeTypes< + ChainSpec: EthereumHardforks + Hardforks + Clone + 'static, + Payload = EthEngineTypes, + Primitives = EthPrimitives, + >, + >, + EthereumEthApiBuilder: reth_node_builder::rpc::EthApiBuilder, + EthereumEngineValidatorBuilder: PayloadValidatorBuilder, + BasicEngineApiBuilder: EngineApiBuilder, + BasicEngineValidatorBuilder: EngineValidatorBuilder, +{ + type Handle = + RpcHandle>::EthApi>; + + async fn launch_add_ons(self, ctx: AddOnsContext<'_, N>) -> eyre::Result { + let engine_handle = ctx.beacon_engine_handle.clone(); + let pending = self.pending.clone(); + let rpc_add_ons = self.make_rpc_add_ons::(); + + rpc_add_ons + .launch_add_ons_with(ctx, move |container| { + let handler = BbRethEngineApiHandler { pending, engine: engine_handle }; + let bb_module = BbRethEngineApiServer::into_rpc(handler); + container.auth_module.replace_auth_methods(bb_module.remove_context())?; + Ok(()) + }) + .await + } +} + +impl RethRpcAddOns for BbAddOns +where + N: FullNodeComponents< + Types: NodeTypes< + ChainSpec: EthereumHardforks + Hardforks + Clone + 'static, + Payload = EthEngineTypes, + Primitives = EthPrimitives, + >, + >, + EthereumEthApiBuilder: reth_node_builder::rpc::EthApiBuilder, + EthereumEngineValidatorBuilder: PayloadValidatorBuilder, + BasicEngineApiBuilder: 
EngineApiBuilder, + BasicEngineValidatorBuilder: EngineValidatorBuilder, +{ + type EthApi = >::EthApi; + + fn hooks_mut(&mut self) -> &mut RpcHooks { + unimplemented!("BbAddOns does not support dynamic hook mutation") + } +} + +impl EngineValidatorAddOn for BbAddOns +where + N: FullNodeComponents, + BasicEngineValidatorBuilder: EngineValidatorBuilder, +{ + type ValidatorBuilder = BasicEngineValidatorBuilder; + + fn engine_validator_builder(&self) -> Self::ValidatorBuilder { + BasicEngineValidatorBuilder::default() + } +} + +// --------------------------------------------------------------------------- +// Custom executor builder +// --------------------------------------------------------------------------- + +/// Executor builder that creates a [`BbEvmConfig`]. +#[derive(Debug)] +pub struct BbExecutorBuilder { + pending: BigBlockMap, +} + +impl ExecutorBuilder for BbExecutorBuilder +where + Node: FullNodeTypes< + Types: NodeTypes< + ChainSpec: reth_ethereum_forks::Hardforks + + alloy_evm::eth::spec::EthExecutorSpec + + EthereumHardforks, + Primitives = EthPrimitives, + >, + >, +{ + type EVM = BbEvmConfig<::ChainSpec>; + + async fn build_evm(self, ctx: &BuilderContext) -> eyre::Result { + Ok(BbEvmConfig::new(EthEvmConfig::new(ctx.chain_spec()), self.pending)) + } +} + +// --------------------------------------------------------------------------- +// Node type +// --------------------------------------------------------------------------- + +/// Node type for big block execution. 
+#[derive(Debug, Clone)] +pub struct BbNode { + pending: BigBlockMap, +} + +impl BbNode { + const fn new(pending: BigBlockMap) -> Self { + Self { pending } + } +} + +impl NodeTypes for BbNode { + type Primitives = EthPrimitives; + type ChainSpec = ChainSpec; + type Storage = EthStorage; + type Payload = EthEngineTypes; +} + +impl Node for BbNode +where + N: FullNodeTypes, +{ + type ComponentsBuilder = ComponentsBuilder< + N, + EthereumPoolBuilder, + BasicPayloadServiceBuilder, + EthereumNetworkBuilder, + BbExecutorBuilder, + BbConsensusBuilder, + >; + + type AddOns = BbAddOns; + + fn components_builder(&self) -> Self::ComponentsBuilder { + EthereumNode::components() + .executor(BbExecutorBuilder { pending: self.pending.clone() }) + .consensus(BbConsensusBuilder) + } + + fn add_ons(&self) -> Self::AddOns { + BbAddOns::new(self.pending.clone()) + } +} + +// --------------------------------------------------------------------------- +// Consensus builder +// --------------------------------------------------------------------------- + +/// Consensus builder for big block execution. 
+#[derive(Debug, Default, Clone, Copy)] +pub struct BbConsensusBuilder; + +impl ConsensusBuilder for BbConsensusBuilder +where + Node: FullNodeTypes< + Types: NodeTypes, + >, +{ + type Consensus = Arc::ChainSpec>>; + + async fn build_consensus(self, ctx: &BuilderContext) -> eyre::Result { + Ok(Arc::new( + EthBeaconConsensus::new(ctx.chain_spec()) + .with_skip_gas_limit_ramp_check(true) + .with_skip_blob_gas_used_check(true) + .with_skip_requests_hash_check(true), + )) + } +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +fn main() { + reth_cli_util::sigsegv_handler::install(); + + if std::env::var_os("RUST_BACKTRACE").is_none() { + unsafe { std::env::set_var("RUST_BACKTRACE", "1") }; + } + + let pending: BigBlockMap = Arc::new(Mutex::new(HashMap::new())); + + if let Err(err) = Cli::::parse().run(async move |builder, _| { + info!(target: "reth::cli", "Launching big block node"); + let handle = builder.launch_node(BbNode::new(pending.clone())).await?; + + handle.wait_for_node_exit().await + }) { + eprintln!("Error: {err:?}"); + std::process::exit(1); + } +} diff --git a/bin/reth-bench/Cargo.toml b/bin/reth-bench/Cargo.toml index abcc47f6259..7d2f76ade03 100644 --- a/bin/reth-bench/Cargo.toml +++ b/bin/reth-bench/Cargo.toml @@ -14,9 +14,13 @@ workspace = true [dependencies] # reth +reth-chainspec.workspace = true +reth-cli.workspace = true reth-cli-runner.workspace = true reth-cli-util.workspace = true reth-engine-primitives.workspace = true +reth-ethereum-cli.workspace = true +reth-ethereum-primitives.workspace = true reth-fs-util.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true @@ -26,6 +30,7 @@ reth-rpc-api.workspace = true reth-tracing.workspace = true # alloy +alloy-consensus.workspace = true alloy-eips.workspace = true alloy-json-rpc.workspace = true @@ -79,39 +84,54 @@ default = ["jemalloc"] asm-keccak = [ 
"reth-node-core/asm-keccak", + "reth-ethereum-cli/asm-keccak", "alloy-primitives/asm-keccak", ] jemalloc = [ "reth-cli-util/jemalloc", "reth-node-core/jemalloc", + "reth-ethereum-cli/jemalloc", +] +jemalloc-prof = [ + "reth-cli-util/jemalloc-prof", + "reth-ethereum-cli/jemalloc-prof", +] +tracy-allocator = [ + "reth-cli-util/tracy-allocator", + "tracy", + "reth-ethereum-cli/tracy-allocator", ] -jemalloc-prof = ["jemalloc", "reth-cli-util/jemalloc-prof"] -tracy-allocator = ["reth-cli-util/tracy-allocator", "tracy"] tracy = [ "reth-node-core/tracy", "reth-tracing/tracy", + "reth-ethereum-cli/tracy", ] min-error-logs = [ "tracing/release_max_level_error", "reth-node-core/min-error-logs", + "reth-ethereum-cli/min-error-logs", ] min-warn-logs = [ "tracing/release_max_level_warn", "reth-node-core/min-warn-logs", + "reth-ethereum-cli/min-warn-logs", ] min-info-logs = [ "tracing/release_max_level_info", "reth-node-core/min-info-logs", + "reth-ethereum-cli/min-info-logs", ] min-debug-logs = [ "tracing/release_max_level_debug", "reth-node-core/min-debug-logs", + "reth-ethereum-cli/min-debug-logs", ] min-trace-logs = [ "tracing/release_max_level_trace", "reth-node-core/min-trace-logs", + "reth-ethereum-cli/min-trace-logs", ] # no-op feature flag for CI matrices diff --git a/bin/reth-bench/src/bench/generate_big_block.rs b/bin/reth-bench/src/bench/generate_big_block.rs index a26116a1410..17f28e86e98 100644 --- a/bin/reth-bench/src/bench/generate_big_block.rs +++ b/bin/reth-bench/src/bench/generate_big_block.rs @@ -1,27 +1,30 @@ -//! Command for generating large blocks by packing transactions from real blocks. +//! Command for generating large blocks by merging transactions from consecutive real blocks. //! -//! This command fetches transactions from existing blocks and packs them into a single -//! large block using the `testing_buildBlockV1` RPC endpoint. 
- -use crate::{ - authenticated_transport::AuthenticatedTransportConnect, bench::helpers::parse_gas_limit, -}; -use alloy_eips::{BlockNumberOrTag, Typed2718}; -use alloy_primitives::{Bytes, B256}; -use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider}; +//! This command fetches consecutive blocks from an RPC until a target gas usage is reached, +//! takes block 0 as the "base" payload, concatenates transactions from subsequent blocks, +//! and saves the result to disk as a [`BigBlockPayload`] JSON file containing the merged +//! [`ExecutionData`] and environment switches at each block boundary. + +use alloy_consensus::TxReceipt; +use alloy_eips::{eip1559::BaseFeeParams, eip7840::BlobParams, Typed2718}; +use alloy_primitives::{Bloom, Bytes, B256}; +use alloy_provider::{network::AnyNetwork, Provider, RootProvider}; use alloy_rpc_client::ClientBuilder; use alloy_rpc_types_engine::{ - ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret, - PayloadAttributes, + CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadSidecar, + PraguePayloadFields, }; -use alloy_transport::layers::RetryBackoffLayer; use clap::Parser; use eyre::Context; -use reqwest::Url; +use reth_chainspec::EthChainSpec; +use reth_cli::chainspec::ChainSpecParser; use reth_cli_runner::CliContext; -use reth_rpc_api::TestingBuildBlockRequestV1; +use reth_engine_primitives::BigBlockData; +use reth_ethereum_cli::chainspec::EthereumChainSpecParser; +use reth_ethereum_primitives::Receipt; +use reth_primitives_traits::proofs; +use serde::{Deserialize, Serialize}; use std::future::Future; -use tokio::sync::mpsc; use tracing::{info, warn}; /// A single transaction with its gas used and raw encoded bytes. @@ -64,7 +67,7 @@ impl RpcTransactionSource { /// Create from an RPC URL with retry backoff. 
pub fn from_url(rpc_url: &str) -> eyre::Result { let client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) + .layer(alloy_transport::layers::RetryBackoffLayer::new(10, 800, u64::MAX)) .http(rpc_url.parse()?); let provider = RootProvider::::new(client); Ok(Self { provider }) @@ -152,7 +155,7 @@ impl TransactionCollector { while total_gas < gas_target { let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await? else { - warn!(target: "reth-bench", block = current_block, "Block not found, stopping"); + tracing::warn!(target: "reth-bench", block = current_block, "Block not found, stopping"); break; }; @@ -193,81 +196,6 @@ impl TransactionCollector { } } -/// `reth bench generate-big-block` command -/// -/// Generates a large block by fetching transactions from existing blocks and packing them -/// into a single block using the `testing_buildBlockV1` RPC endpoint. -#[derive(Debug, Parser)] -pub struct Command { - /// The RPC URL to use for fetching blocks (can be an external archive node). - #[arg(long, value_name = "RPC_URL")] - rpc_url: String, - - /// The engine RPC URL (with JWT authentication). - #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")] - engine_rpc_url: String, - - /// The RPC URL for `testing_buildBlockV1` calls (same node as engine, regular RPC port). - #[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")] - testing_rpc_url: String, - - /// Path to the JWT secret file for engine API authentication. - #[arg(long, value_name = "JWT_SECRET")] - jwt_secret: std::path::PathBuf, - - /// Target gas to pack into the block. - /// Accepts short notation: K for thousand, M for million, G for billion (e.g., 1G = 1 - /// billion). - #[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)] - target_gas: u64, - - /// Block number to start fetching transactions from (required). 
- /// - /// This must be the last canonical block BEFORE any gas limit ramping was performed. - /// The command collects transactions from historical blocks starting at this number - /// to pack into large blocks. - /// - /// How to determine this value: - /// - If starting from a fresh node (no gas limit ramp yet): use the current chain tip - /// - If gas limit ramping has already been performed: use the block number that was the chain - /// tip BEFORE ramping began (you must track this yourself) - /// - /// Using a block after ramping started will cause transaction collection to fail - /// because those blocks contain synthetic transactions that cannot be replayed. - #[arg(long, value_name = "FROM_BLOCK")] - from_block: u64, - - /// Execute the payload (call newPayload + forkchoiceUpdated). - /// If false, only builds the payload and prints it. - #[arg(long, default_value = "false")] - execute: bool, - - /// Number of payloads to generate. Each payload uses the previous as parent. - /// When count == 1, the payload is only generated and saved, not executed. - /// When count > 1, each payload is executed before building the next. - #[arg(long, default_value = "1")] - count: u64, - - /// Number of transaction batches to prefetch in background when count > 1. - /// Higher values reduce latency but use more memory. - #[arg(long, default_value = "4")] - prefetch_buffer: usize, - - /// Output directory for generated payloads. Each payload is saved as `payload_block_N.json`. - #[arg(long, value_name = "OUTPUT_DIR")] - output_dir: std::path::PathBuf, -} - -/// A built payload ready for execution. -struct BuiltPayload { - block_number: u64, - envelope: ExecutionPayloadEnvelopeV4, - block_hash: B256, - timestamp: u64, - /// The actual gas used in the built block. - gas_used: u64, -} - /// Result of collecting transactions from blocks. 
#[derive(Debug)] pub struct CollectionResult { @@ -279,106 +207,75 @@ pub struct CollectionResult { pub next_block: u64, } -/// Constants for retry logic. -const MAX_BUILD_RETRIES: u32 = 5; -/// Maximum retries for fetching a transaction batch. -const MAX_FETCH_RETRIES: u32 = 5; -/// Tolerance: if `gas_used` is within 1M of target, don't retry. -const MIN_TARGET_SLACK: u64 = 1_000_000; -/// Maximum gas to request in retries (10x target as safety cap). -const MAX_ADDITIONAL_GAS_MULTIPLIER: u64 = 10; +/// A merged big block payload with environment switches at block boundaries. +#[derive(Debug, Serialize, Deserialize)] +pub struct BigBlockPayload { + /// The primary execution data with all concatenated transactions. + pub execution_data: ExecutionData, + /// Big block data containing environment switches and prior block hashes. + #[serde(default)] + pub big_block_data: BigBlockData, +} -/// Fetches a batch of transactions with retry logic. +/// `reth bench generate-big-block` command /// -/// Returns `None` if all retries are exhausted. -async fn fetch_batch_with_retry( - collector: &TransactionCollector, - block: u64, -) -> Option { - for attempt in 1..=MAX_FETCH_RETRIES { - match collector.collect(block).await { - Ok(result) => return Some(result), - Err(e) => { - if attempt == MAX_FETCH_RETRIES { - warn!(target: "reth-bench", attempt, error = %e, "Failed to fetch transactions after max retries"); - return None; - } - warn!(target: "reth-bench", attempt, error = %e, "Failed to fetch transactions, retrying..."); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - } - None -} +/// Generates a large block by fetching consecutive blocks from an RPC, merging their +/// transactions into a single payload, and saving the result to disk. +#[derive(Debug, Parser)] +pub struct Command { + /// The RPC URL to use for fetching blocks. + #[arg(long, value_name = "RPC_URL")] + rpc_url: String, -/// Outcome of a build attempt check. 
-enum RetryOutcome { - /// Payload is close enough to target gas. - Success, - /// Max retries reached, accept what we have. - MaxRetries, - /// Need more transactions with the specified gas amount. - NeedMore(u64), -} + /// The chain name or path to a chain spec JSON file. + #[arg(long, value_name = "CHAIN", default_value = "mainnet")] + chain: String, -/// Buffer for receiving transaction batches from the fetcher. -/// -/// This abstracts over the channel to allow the main loop to request -/// batches on demand, including for retries. -struct TxBuffer { - receiver: mpsc::Receiver, -} + /// Block number to start from. + #[arg(long, value_name = "FROM_BLOCK")] + from_block: u64, -impl TxBuffer { - const fn new(receiver: mpsc::Receiver) -> Self { - Self { receiver } - } + /// Target gas usage per big block. Consecutive real blocks are merged until + /// this gas target is reached (or exceeded by the last included block). + /// Accepts optional suffixes: K (thousand), M (million), G (billion). + #[arg(long, value_name = "TARGET_GAS", value_parser = super::helpers::parse_gas_limit)] + target_gas: u64, - /// Take the next available batch from the fetcher. - async fn take_batch(&mut self) -> Option { - self.receiver.recv().await - } + /// Number of sequential big blocks to generate. + /// + /// Each big block merges real blocks until `--target-gas` is reached. + /// Sequential big blocks are chained: block N+1's `parent_hash` is set to + /// block N's computed hash. + #[arg(long, value_name = "NUM_BIG_BLOCKS", default_value = "1")] + num_big_blocks: u64, + + /// Output directory for generated payloads. + #[arg(long, value_name = "OUTPUT_DIR")] + output_dir: std::path::PathBuf, } impl Command { - /// Execute the `generate-big-block` command + /// Execute the `generate-big-block` command. 
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { - info!(target: "reth-bench", target_gas = self.target_gas, count = self.count, "Generating big block(s)"); - - // Set up authenticated engine provider - let jwt = - std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?; - let jwt = JwtSecret::from_hex(jwt.trim())?; - let auth_url = Url::parse(&self.engine_rpc_url)?; - - info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url); - let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt); - let auth_client = ClientBuilder::default().connect_with(auth_transport).await?; - let auth_provider = RootProvider::::new(auth_client); - - // Set up testing RPC provider (for testing_buildBlockV1) - info!(target: "reth-bench", "Connecting to Testing RPC at {}", self.testing_rpc_url); - let testing_client = ClientBuilder::default() - .layer(RetryBackoffLayer::new(10, 800, u64::MAX)) - .http(self.testing_rpc_url.parse()?); - let testing_provider = RootProvider::::new(testing_client); - - // Get the parent block (latest canonical block) - info!(target: "reth-bench", endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call"); - let parent_block = auth_provider - .get_block_by_number(BlockNumberOrTag::Latest) - .await? 
- .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?; - - let parent_hash = parent_block.header.hash; - let parent_number = parent_block.header.number; - let parent_timestamp = parent_block.header.timestamp; + if self.target_gas == 0 { + return Err(eyre::eyre!("--target-gas must be greater than 0")); + } + if self.num_big_blocks == 0 { + return Err(eyre::eyre!("--num-big-blocks must be at least 1")); + } + + // Resolve chain spec for blob params lookup + let chain_spec = EthereumChainSpecParser::parse(&self.chain) + .wrap_err_with(|| format!("Failed to parse chain spec: {}", self.chain))?; info!( target: "reth-bench", - parent_hash = %parent_hash, - parent_number = parent_number, - "Using initial parent block" + from_block = self.from_block, + target_gas = self.target_gas, + num_big_blocks = self.num_big_blocks, + chain = %chain_spec.chain(), + output_dir = %self.output_dir.display(), + "Generating big block payloads" ); // Create output directory @@ -386,466 +283,358 @@ impl Command { format!("Failed to create output directory: {:?}", self.output_dir) })?; - let start_block = self.from_block; - - // Use pipelined execution when generating multiple payloads - if self.count > 1 { - self.execute_pipelined( - &auth_provider, - &testing_provider, - start_block, - parent_hash, - parent_timestamp, - ) - .await?; - } else { - // Single payload - collect transactions and build with retry - let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?; - let collector = TransactionCollector::new(tx_source, self.target_gas); - let result = collector.collect(start_block).await?; - - if result.transactions.is_empty() { - return Err(eyre::eyre!("No transactions collected")); - } - - self.execute_sequential_with_retry( - &auth_provider, - &testing_provider, - &collector, - result, - parent_hash, - parent_timestamp, - ) - .await?; - } - - info!(target: "reth-bench", count = self.count, output_dir = %self.output_dir.display(), "All payloads generated"); - Ok(()) - } - - 
/// Sequential execution path with retry logic for underfilled payloads. - async fn execute_sequential_with_retry( - &self, - auth_provider: &RootProvider, - testing_provider: &RootProvider, - collector: &TransactionCollector, - initial_result: CollectionResult, - mut parent_hash: B256, - mut parent_timestamp: u64, - ) -> eyre::Result<()> { - let mut current_result = initial_result; - - for i in 0..self.count { - let built = self - .build_with_retry( - testing_provider, - collector, - &mut current_result, - i, - parent_hash, - parent_timestamp, - ) - .await?; - - self.save_payload(&built)?; - - if self.execute || self.count > 1 { - info!(target: "reth-bench", payload = i + 1, block_hash = %built.block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)"); - self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?; - info!(target: "reth-bench", payload = i + 1, "Payload executed successfully"); - } + // Set up RPC provider + let client = ClientBuilder::default() + .layer(alloy_transport::layers::RetryBackoffLayer::new(10, 800, u64::MAX)) + .http(self.rpc_url.parse()?); + let provider = RootProvider::::new(client); - parent_hash = built.block_hash; - parent_timestamp = built.timestamp; + let mut prev_big_block_hash: Option = None; + let mut accumulated_block_hashes: Vec<(u64, B256)> = Vec::new(); + + // Track previous big block's merged header fields for deriving basefee and + // excess_blob_gas on subsequent big blocks. + struct PrevBigBlockHeader { + gas_used: u64, + gas_limit: u64, + base_fee_per_gas: u64, + blob_gas_used: u64, + excess_blob_gas: u64, } - Ok(()) - } + let mut prev_big_block_header: Option = None; - /// Build a payload with retry logic when `gas_used` is below target. - /// - /// Uses the ratio of `gas_used/gas_sent` to estimate how many more transactions - /// are needed to hit the target gas. 
- async fn build_with_retry( - &self, - testing_provider: &RootProvider, - collector: &TransactionCollector, - result: &mut CollectionResult, - index: u64, - parent_hash: B256, - parent_timestamp: u64, - ) -> eyre::Result { - for attempt in 1..=MAX_BUILD_RETRIES { - let tx_bytes: Vec = result.transactions.iter().map(|t| t.raw.clone()).collect(); - let gas_sent = result.gas_sent; + // Track the next block to fetch across big blocks so they don't overlap. + let mut next_block = self.from_block; - info!( - target: "reth-bench", - payload = index + 1, - attempt, - tx_count = tx_bytes.len(), - gas_sent, - parent_hash = %parent_hash, - "Building payload via testing_buildBlockV1" - ); + for big_block_idx in 0..self.num_big_blocks { + let range_start = next_block; - let built = Self::build_payload_static( - testing_provider, - &tx_bytes, - index, - parent_hash, - parent_timestamp, - ) - .await?; - - match self.check_retry_outcome(&built, index, attempt, gas_sent) { - RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built), - RetryOutcome::NeedMore(additional_gas) => { - let additional = - collector.collect_gas(result.next_block, additional_gas).await?; - result.transactions.extend(additional.transactions); - result.gas_sent = result.gas_sent.saturating_add(additional.gas_sent); - result.next_block = additional.next_block; - } - } - } - - warn!(target: "reth-bench", payload = index + 1, "Retry loop exited without returning a payload"); - Err(eyre::eyre!("build_with_retry exhausted retries without result")) - } - - /// Pipelined execution - fetches transactions in background, builds with retry. - /// - /// The fetcher continuously produces transaction batches. The main loop consumes them, - /// builds payloads with retry logic (requesting more transactions if underfilled), - /// and executes each payload before moving to the next. 
- async fn execute_pipelined( - &self, - auth_provider: &RootProvider, - testing_provider: &RootProvider, - start_block: u64, - initial_parent_hash: B256, - initial_parent_timestamp: u64, - ) -> eyre::Result<()> { - // Create channel for transaction batches - fetcher sends CollectionResult - let (tx_sender, tx_receiver) = mpsc::channel::(self.prefetch_buffer); - - // Spawn background task to continuously fetch transaction batches - let rpc_url = self.rpc_url.clone(); - let target_gas = self.target_gas; - - let fetcher_handle = tokio::spawn(async move { - let tx_source = match RpcTransactionSource::from_url(&rpc_url) { - Ok(source) => source, - Err(e) => { - warn!(target: "reth-bench", error = %e, "Failed to create transaction source"); - return None; - } - }; + // Fetch consecutive blocks until the gas target is reached. + let mut blocks = Vec::new(); + let mut block_receipts: Vec> = Vec::new(); + let mut accumulated_block_gas: u64 = 0; - let collector = TransactionCollector::new(tx_source, target_gas); - let mut current_block = start_block; + let mut reached_chain_tip = false; + while accumulated_block_gas < self.target_gas { + let block_number = next_block; + info!(target: "reth-bench", block_number, big_block = big_block_idx, "Fetching block"); - while let Some(batch) = fetch_batch_with_retry(&collector, current_block).await { - if batch.transactions.is_empty() { - info!(target: "reth-bench", block = current_block, "Reached chain tip, stopping fetcher"); - break; - } + let fetch_result = tokio::try_join!( + provider.get_block_by_number(block_number.into()).full(), + provider.get_block_receipts(block_number.into()), + ); + let (rpc_block, receipts) = match fetch_result { + Ok((Some(block), Some(receipts))) => (block, receipts), + Ok((None, _) | (_, None)) => { + warn!( + target: "reth-bench", + block_number, + "Block not found — reached chain tip" + ); + reached_chain_tip = true; + break; + } + Err(e) => return Err(e.into()), + }; + + // Convert RPC receipts to 
consensus receipts + let consensus_receipts: Vec = receipts + .iter() + .map(|r| { + let inner = &r.inner.inner.inner; + let tx_type = r.inner.inner.r#type.try_into().unwrap_or_default(); + Receipt { + tx_type, + success: inner.receipt.status.coerce_status(), + cumulative_gas_used: inner.receipt.cumulative_gas_used, + logs: inner + .receipt + .logs + .iter() + .map(|log| alloy_primitives::Log { + address: log.inner.address, + data: log.inner.data.clone(), + }) + .collect(), + } + }) + .collect(); + + // Convert to consensus block + let block = rpc_block + .into_inner() + .map_header(|header| header.map(|h| h.into_header_with_defaults())) + .try_map_transactions(|tx| { + tx.try_into_either::() + })? + .into_consensus(); + + // Convert to ExecutionData + let (payload, sidecar) = ExecutionPayload::from_block_slow(&block); + let execution_data = ExecutionData { payload, sidecar }; + + let block_gas = execution_data.payload.as_v1().gas_used; info!( target: "reth-bench", - tx_count = batch.transactions.len(), - gas_sent = batch.gas_sent, - blocks = format!("{}..{}", current_block, batch.next_block), - "Fetched transaction batch" + block_number, + gas_used = block_gas, + tx_count = execution_data.payload.transactions().len(), + receipts = consensus_receipts.len(), + "Fetched block" ); - current_block = batch.next_block; - if tx_sender.send(batch).await.is_err() { - break; - } + accumulated_block_gas += block_gas; + blocks.push(execution_data); + block_receipts.push(consensus_receipts); + next_block += 1; } - Some(current_block) - }); - - // Transaction buffer: holds transactions from batches + any extras from retries - let mut tx_buffer = TxBuffer::new(tx_receiver); - - let mut parent_hash = initial_parent_hash; - let mut parent_timestamp = initial_parent_timestamp; - - for i in 0..self.count { - // Get initial batch of transactions for this payload - let Some(mut result) = tx_buffer.take_batch().await else { - info!( + // If we hit the chain tip without fetching any 
blocks, stop generating. + if blocks.is_empty() { + warn!( target: "reth-bench", - payloads_built = i, - payloads_requested = self.count, - "Transaction source exhausted, stopping" - ); - break; - }; - - if result.transactions.is_empty() { - info!( - target: "reth-bench", - payloads_built = i, - payloads_requested = self.count, - "No more transactions available, stopping" + big_block = big_block_idx, + requested = self.num_big_blocks, + "No blocks available, stopping generation early" ); break; } - // Build with retry - may need to request more transactions - let built = self - .build_with_retry_buffered( - testing_provider, - &mut tx_buffer, - &mut result, - i, - parent_hash, - parent_timestamp, - ) - .await?; - - self.save_payload(&built)?; - - let current_block_hash = built.block_hash; - let current_timestamp = built.timestamp; - - // Execute payload - info!(target: "reth-bench", payload = i + 1, block_hash = %current_block_hash, gas_used = built.gas_used, "Executing payload (newPayload + FCU)"); - self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?; - info!(target: "reth-bench", payload = i + 1, "Payload executed successfully"); - - parent_hash = current_block_hash; - parent_timestamp = current_timestamp; - } - - // Clean up the fetcher task - drop(tx_buffer); - let _ = fetcher_handle.await; + // Block 0 is the base + let mut base = blocks.remove(0); + let base_receipts = block_receipts.remove(0); + let mut env_switches = Vec::new(); + + // Accumulate all receipts with corrected cumulative_gas_used. + // Each block's receipts have cumulative gas relative to that block; + // we add the prior blocks' total gas to make them globally correct. 
+ let mut all_receipts: Vec = Vec::new(); + let mut cumulative_gas_offset: u64 = 0; + { + // Base block receipts (block 0) — no offset needed + let base_block_gas = base.payload.as_v1().gas_used; + all_receipts.extend(base_receipts.into_iter().map(|mut r| { + r.cumulative_gas_used += cumulative_gas_offset; + r + })); + cumulative_gas_offset += base_block_gas; + } - Ok(()) - } + if !blocks.is_empty() { + // Store the original unmutated base block as env_switch at index 0. + // This preserves the real gas_limit, basefee, etc. for segment 0's + // EVM environment, which would otherwise be lost when we mutate the + // base payload header below. + env_switches.push((0, base.clone())); + + let mut cumulative_tx_count = base.payload.transactions().len(); + + // Collect state from the last block for header fields + let last = blocks.last().unwrap(); + let last_v1 = last.payload.as_v1(); + let final_state_root = last_v1.state_root; + + let mut total_gas_used = base.payload.as_v1().gas_used; + let mut total_gas_limit = base.payload.as_v1().gas_limit; + + // Concatenate transactions from subsequent blocks and build env_switches + for (block_data, receipts) in blocks.into_iter().zip(block_receipts) { + let block_v1 = block_data.payload.as_v1(); + let block_gas = block_v1.gas_used; + total_gas_used += block_gas; + total_gas_limit += block_v1.gas_limit; + + // Accumulate receipts with corrected cumulative_gas_used + all_receipts.extend(receipts.into_iter().map(|mut r| { + r.cumulative_gas_used += cumulative_gas_offset; + r + })); + cumulative_gas_offset += block_gas; + + // Record environment switch at this block boundary + env_switches.push((cumulative_tx_count, block_data.clone())); + + // Append this block's transactions to the base payload + let txs = block_data.payload.transactions().clone(); + cumulative_tx_count += txs.len(); + base.payload.transactions_mut().extend(txs); + } - /// Build a payload with retry logic, using the buffered transaction source. 
- async fn build_with_retry_buffered( - &self, - testing_provider: &RootProvider, - tx_buffer: &mut TxBuffer, - result: &mut CollectionResult, - index: u64, - parent_hash: B256, - parent_timestamp: u64, - ) -> eyre::Result { - for attempt in 1..=MAX_BUILD_RETRIES { - let tx_bytes: Vec = result.transactions.iter().map(|t| t.raw.clone()).collect(); - let gas_sent = result.gas_sent; + // Compute merged receipts_root and logs_bloom from all accumulated + // receipts (with globally-correct cumulative_gas_used). + let receipts_with_bloom: Vec<_> = + all_receipts.iter().map(|r| r.with_bloom_ref()).collect(); + let merged_receipts_root = proofs::calculate_receipt_root(&receipts_with_bloom); + let merged_logs_bloom = + receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | *r.bloom_ref()); + + // Mutate the base payload header + let base_v1 = base.payload.as_v1_mut(); + base_v1.state_root = final_state_root; + base_v1.gas_used = total_gas_used; + base_v1.gas_limit = total_gas_limit; + base_v1.receipts_root = merged_receipts_root; + base_v1.logs_bloom = merged_logs_bloom; + } - info!( - target: "reth-bench", - payload = index + 1, - attempt, - tx_count = tx_bytes.len(), - gas_sent, - parent_hash = %parent_hash, - "Building payload via testing_buildBlockV1" - ); + // Chain sequential big blocks: set parent_hash, block_number, basefee, + // and excess_blob_gas for sequential continuity. The engine validates + // each big block against its parent, so these fields must be + // derivable from the previous big block's merged header. + if let Some(prev_hash) = prev_big_block_hash { + base.payload.as_v1_mut().parent_hash = prev_hash; + // First big block keeps its original block number (from_block). + // Subsequent big blocks increment from there. 
+ base.payload.as_v1_mut().block_number = self.from_block + big_block_idx; + } + if let Some(prev) = &prev_big_block_header { + // Derive basefee from the previous big block's merged header using + // the standard EIP-1559 formula so validate_against_parent_eip1559_base_fee passes. + let next_base_fee = alloy_eips::calc_next_block_base_fee( + prev.gas_used, + prev.gas_limit, + prev.base_fee_per_gas, + BaseFeeParams::ethereum(), + ); + base.payload.as_v1_mut().base_fee_per_gas = + alloy_primitives::U256::from(next_base_fee); + + // Derive excess_blob_gas from the previous big block's merged header + // so validate_against_parent_4844 passes. + let timestamp = base.payload.as_v1().timestamp; + let blob_params = chain_spec + .blob_params_at_timestamp(timestamp) + .unwrap_or_else(BlobParams::cancun); + let next_excess_blob_gas = blob_params.next_block_excess_blob_gas_osaka( + prev.excess_blob_gas, + prev.blob_gas_used, + prev.base_fee_per_gas, + ); + if let Some(v3) = base.payload.as_v3_mut() { + v3.excess_blob_gas = next_excess_blob_gas; + } + } - let built = Self::build_payload_static( - testing_provider, - &tx_bytes, - index, - parent_hash, - parent_timestamp, - ) - .await?; - - match self.check_retry_outcome(&built, index, attempt, gas_sent) { - RetryOutcome::Success | RetryOutcome::MaxRetries => return Ok(built), - RetryOutcome::NeedMore(additional_gas) => { - let mut collected_gas = 0u64; - while collected_gas < additional_gas { - if let Some(batch) = tx_buffer.take_batch().await { - collected_gas += batch.gas_sent; - result.transactions.extend(batch.transactions); - result.gas_sent = result.gas_sent.saturating_add(batch.gas_sent); - result.next_block = batch.next_block; - } else { - warn!(target: "reth-bench", "Transaction fetcher exhausted, proceeding with available transactions"); - break; - } + // Merge blob data from all constituent blocks: sum blob_gas_used + // and concatenate versioned hashes so the sidecar matches the blob + // transactions in the merged 
payload body. + { + let mut all_versioned_hashes: Vec = + base.sidecar.cancun().map(|c| c.versioned_hashes.clone()).unwrap_or_default(); + let mut total_blob_gas = + base.payload.as_v3().map(|v3| v3.blob_gas_used).unwrap_or(0); + // Skip env_switch[0] (base block clone) to avoid double-counting + for (_, switch_data) in env_switches.iter().skip(1) { + if let Some(cancun) = switch_data.sidecar.cancun() { + all_versioned_hashes.extend_from_slice(&cancun.versioned_hashes); + } + if let Some(v3) = switch_data.payload.as_v3() { + total_blob_gas += v3.blob_gas_used; } } + if let Some(v3) = base.payload.as_v3_mut() { + v3.blob_gas_used = total_blob_gas; + } + let cancun = base.sidecar.cancun().map(|c| CancunPayloadFields { + versioned_hashes: all_versioned_hashes, + parent_beacon_block_root: c.parent_beacon_block_root, + }); + // For merged blocks, set an empty requests hash in the Prague sidecar. + // The correct requests_hash cannot be computed from RPC data alone + // (raw execution layer requests are not exposed via eth_getBlockByNumber). + // Use --testing.skip-requests-hash-check when validating big block payloads. + let prague = base + .sidecar + .prague() + .map(|_| PraguePayloadFields::new(alloy_eips::eip7685::Requests::default())); + base.sidecar = match (cancun, prague) { + (Some(c), Some(p)) => ExecutionPayloadSidecar::v4(c, p), + (Some(c), None) => ExecutionPayloadSidecar::v3(c), + _ => ExecutionPayloadSidecar::none(), + }; } - } - - warn!(target: "reth-bench", payload = index + 1, "Retry loop exited without returning a payload"); - Err(eyre::eyre!("build_with_retry_buffered exhausted retries without result")) - } - /// Determines the outcome of a build attempt. 
- fn check_retry_outcome( - &self, - built: &BuiltPayload, - index: u64, - attempt: u32, - gas_sent: u64, - ) -> RetryOutcome { - let gas_used = built.gas_used; - - if gas_used + MIN_TARGET_SLACK >= self.target_gas { - info!( - target: "reth-bench", - payload = index + 1, - gas_used, - target_gas = self.target_gas, - attempts = attempt, - "Payload built successfully" - ); - return RetryOutcome::Success; - } + // Compute the real block hash from the mutated payload + let block_hash = compute_payload_block_hash(&base)?; + base.payload.as_v1_mut().block_hash = block_hash; + prev_big_block_hash = Some(block_hash); + + // Record this big block's merged header fields so the next big block + // can derive its basefee and excess_blob_gas correctly. + { + let v1 = base.payload.as_v1(); + prev_big_block_header = Some(PrevBigBlockHeader { + gas_used: v1.gas_used, + gas_limit: v1.gas_limit, + base_fee_per_gas: v1.base_fee_per_gas.to::(), + blob_gas_used: base.payload.as_v3().map(|v3| v3.blob_gas_used).unwrap_or(0), + excess_blob_gas: base.payload.as_v3().map(|v3| v3.excess_blob_gas).unwrap_or(0), + }); + } - if attempt == MAX_BUILD_RETRIES { - warn!( - target: "reth-bench", - payload = index + 1, - gas_used, - target_gas = self.target_gas, - gas_sent, - "Underfilled after max retries, accepting payload" - ); - return RetryOutcome::MaxRetries; - } + let big_block = BigBlockPayload { + execution_data: base, + big_block_data: BigBlockData { + env_switches, + prior_block_hashes: accumulated_block_hashes.clone(), + }, + }; - if gas_used == 0 { - warn!( - target: "reth-bench", - payload = index + 1, - "Zero gas used in payload, requesting fixed chunk of additional transactions" - ); - return RetryOutcome::NeedMore(self.target_gas); - } + // Accumulate real block hashes from this big block's env_switches for + // subsequent big blocks' BLOCKHASH lookups. Cap at 256 entries since the + // BLOCKHASH opcode only looks back 256 blocks. 
+ for (_, switch_data) in &big_block.big_block_data.env_switches { + let block_number = switch_data.payload.as_v1().block_number; + let block_hash = switch_data.payload.as_v1().block_hash; + accumulated_block_hashes.push((block_number, block_hash)); + } + if accumulated_block_hashes.len() > 256 { + let excess = accumulated_block_hashes.len() - 256; + accumulated_block_hashes.drain(..excess); + } - let gas_sent_needed_total = - (self.target_gas as u128 * gas_sent as u128).div_ceil(gas_used as u128) as u64; - let additional = gas_sent_needed_total.saturating_sub(gas_sent); - let additional = additional.min(self.target_gas * MAX_ADDITIONAL_GAS_MULTIPLIER); + // Save to disk + let range_end = next_block - 1; + let filename = format!("big_block_{range_start}_to_{range_end}.json"); + let filepath = self.output_dir.join(&filename); + let json = serde_json::to_string_pretty(&big_block)?; + std::fs::write(&filepath, &json) + .wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?; - if additional == 0 { info!( target: "reth-bench", - payload = index + 1, - gas_used, - target_gas = self.target_gas, - "No additional transactions needed based on ratio" + path = %filepath.display(), + block_hash = %block_hash, + total_txs = big_block.execution_data.payload.transactions().len(), + total_gas_used = big_block.execution_data.payload.as_v1().gas_used, + env_switches = big_block.big_block_data.env_switches.len(), + prior_block_hashes = big_block.big_block_data.prior_block_hashes.len(), + "Big block payload saved" ); - return RetryOutcome::Success; - } - - let ratio = gas_used as f64 / gas_sent as f64; - info!( - target: "reth-bench", - payload = index + 1, - gas_used, - gas_sent, - ratio = format!("{:.4}", ratio), - additional_gas = additional, - "Underfilled, collecting more transactions for retry" - ); - RetryOutcome::NeedMore(additional) - } - - /// Build a single payload via `testing_buildBlockV1`. 
- async fn build_payload_static( - testing_provider: &RootProvider, - transactions: &[Bytes], - index: u64, - parent_hash: B256, - parent_timestamp: u64, - ) -> eyre::Result { - let request = TestingBuildBlockRequestV1 { - parent_block_hash: parent_hash, - payload_attributes: PayloadAttributes { - timestamp: parent_timestamp + 12, - prev_randao: B256::ZERO, - suggested_fee_recipient: alloy_primitives::Address::ZERO, - withdrawals: Some(vec![]), - parent_beacon_block_root: Some(B256::ZERO), - }, - transactions: transactions.to_vec(), - extra_data: None, - }; - - let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum(); - info!( - target: "reth-bench", - payload = index + 1, - tx_count = transactions.len(), - total_tx_bytes = total_tx_bytes, - parent_hash = %parent_hash, - "Sending to testing_buildBlockV1" - ); - let envelope: ExecutionPayloadEnvelopeV5 = - testing_provider.client().request("testing_buildBlockV1", [request]).await?; - - let v4_envelope = envelope.try_into_v4()?; - - let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner; - let block_hash = inner.block_hash; - let block_number = inner.block_number; - let timestamp = inner.timestamp; - let gas_used = inner.gas_used; - - Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp, gas_used }) - } - /// Save a payload to disk. 
- fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> { - let filename = format!("payload_block_{}.json", payload.block_number); - let filepath = self.output_dir.join(&filename); - let json = serde_json::to_string_pretty(&payload.envelope)?; - std::fs::write(&filepath, &json) - .wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?; - info!(target: "reth-bench", block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved"); - Ok(()) - } - - async fn execute_payload_v4( - &self, - provider: &RootProvider, - envelope: ExecutionPayloadEnvelopeV4, - parent_hash: B256, - ) -> eyre::Result<()> { - let block_hash = - envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash; - - let status = provider - .new_payload_v4( - envelope.envelope_inner.execution_payload, - vec![], - B256::ZERO, - envelope.execution_requests.to_vec(), - ) - .await?; - - if !status.is_valid() { - return Err(eyre::eyre!("Payload rejected: {:?}", status)); - } - - let fcu_state = ForkchoiceState { - head_block_hash: block_hash, - safe_block_hash: parent_hash, - finalized_block_hash: parent_hash, - }; - - let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?; - - if !fcu_result.is_valid() { - return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result)); + if reached_chain_tip { + warn!( + target: "reth-bench", + generated = big_block_idx + 1, + requested = self.num_big_blocks, + "Reached chain tip, stopping generation early" + ); + break; + } } Ok(()) } } + +/// Computes the block hash for an [`ExecutionData`] by converting it to a raw block +/// and hashing the header. 
+pub fn compute_payload_block_hash(data: &ExecutionData) -> eyre::Result { + let block = data + .payload + .clone() + .into_block_with_sidecar_raw(&data.sidecar) + .wrap_err("failed to convert payload to block for hash computation")?; + Ok(block.header.hash_slow()) +} diff --git a/bin/reth-bench/src/bench/mod.rs b/bin/reth-bench/src/bench/mod.rs index 1f171d85a68..79e21d5fca9 100644 --- a/bin/reth-bench/src/bench/mod.rs +++ b/bin/reth-bench/src/bench/mod.rs @@ -9,7 +9,8 @@ mod context; mod generate_big_block; pub(crate) mod helpers; pub use generate_big_block::{ - RawTransaction, RpcTransactionSource, TransactionCollector, TransactionSource, + compute_payload_block_hash, BigBlockPayload, RawTransaction, RpcTransactionSource, + TransactionCollector, TransactionSource, }; pub(crate) mod metrics_scraper; mod new_payload_fcu; @@ -50,16 +51,16 @@ pub enum Subcommands { /// --jwt-secret $(cat ~/.local/share/reth/mainnet/jwt.hex)` SendPayload(send_payload::Command), - /// Generate a large block by packing transactions from existing blocks. + /// Generate a large block by merging consecutive blocks from an RPC. /// - /// This command fetches transactions from real blocks and packs them into a single - /// block using the `testing_buildBlockV1` RPC endpoint. + /// Fetches N consecutive blocks, takes block 0 as the base payload, concatenates + /// transactions from blocks 1..N-1, and saves the result to disk as a JSON file + /// containing the merged execution data and environment switches at block boundaries. /// /// Example: /// - /// `reth-bench generate-big-block --rpc-url http://localhost:8545 --engine-rpc-url - /// http://localhost:8551 --jwt-secret ~/.local/share/reth/mainnet/jwt.hex --target-gas - /// 30000000` + /// `reth-bench generate-big-block --rpc-url http://localhost:8545 --from-block 20000000 + /// --count 10 --output-dir ./payloads` GenerateBigBlock(generate_big_block::Command), /// Replay pre-generated payloads from a directory. 
diff --git a/bin/reth-bench/src/bench/replay_payloads.rs b/bin/reth-bench/src/bench/replay_payloads.rs index 9a7960beb0c..c6905d6ff18 100644 --- a/bin/reth-bench/src/bench/replay_payloads.rs +++ b/bin/reth-bench/src/bench/replay_payloads.rs @@ -3,6 +3,7 @@ use crate::{ authenticated_transport::AuthenticatedTransportConnect, bench::{ + generate_big_block::BigBlockPayload, helpers::parse_duration, metrics_scraper::MetricsScraper, output::{ @@ -21,6 +22,7 @@ use alloy_rpc_types_engine::{ use clap::Parser; use eyre::Context; use reth_cli_runner::CliContext; +use reth_engine_primitives::BigBlockData; use reth_node_api::EngineApiMessageVersion; use reth_node_core::args::WaitForPersistence; use reth_rpc_api::RethNewPayloadInput; @@ -58,8 +60,7 @@ pub struct Command { #[arg(long, value_name = "SKIP", default_value = "0")] skip: usize, - /// Deprecated: gas ramp is no longer needed. Use `--testing.skip-gas-limit-ramp-check` - /// and `--testing.gas-limit` on the reth node instead. This flag is accepted but ignored. + /// Deprecated: gas ramp is no longer needed. This flag is accepted but ignored. #[arg(long, value_name = "GAS_RAMP_DIR", hide = true)] gas_ramp_dir: Option, @@ -119,10 +120,12 @@ pub struct Command { struct LoadedPayload { /// The index (from filename). index: u64, - /// The payload envelope. - envelope: ExecutionPayloadEnvelopeV4, + /// The execution data for the block. + execution_data: ExecutionData, /// The block hash. block_hash: B256, + /// Big block data containing environment switches and prior block hashes. + big_block_data: BigBlockData, } impl Command { @@ -171,8 +174,7 @@ impl Command { if self.gas_ramp_dir.is_some() { warn!( target: "reth-bench", - "--gas-ramp-dir is deprecated and ignored. Use --testing.skip-gas-limit-ramp-check \ - and --testing.gas-limit on the reth node instead." + "--gas-ramp-dir is deprecated and ignored." 
); } @@ -183,22 +185,33 @@ impl Command { } info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk"); + // If any payload has env_switches but we're not using reth_newPayload, warn the user + if !self.reth_new_payload { + let has_env_switches = + payloads.iter().any(|p| !p.big_block_data.env_switches.is_empty()); + if has_env_switches { + warn!( + target: "reth-bench", + "Payloads contain env_switches but --reth-new-payload is not set. \ + env_switches are only supported with reth_newPayload and will be ignored." + ); + } + } + let mut parent_hash = initial_parent_hash; let mut results = Vec::new(); let total_benchmark_duration = Instant::now(); for (i, payload) in payloads.iter().enumerate() { - let envelope = &payload.envelope; + let execution_data = &payload.execution_data; let block_hash = payload.block_hash; - let execution_payload = &envelope.envelope_inner.execution_payload; - let inner_payload = &execution_payload.payload_inner.payload_inner; + let v1 = execution_data.payload.as_v1(); - let gas_used = inner_payload.gas_used; - let gas_limit = inner_payload.gas_limit; - let block_number = inner_payload.block_number; - let transaction_count = - execution_payload.payload_inner.payload_inner.transactions.len() as u64; + let gas_used = v1.gas_used; + let gas_limit = v1.gas_limit; + let block_number = v1.block_number; + let transaction_count = v1.transactions.len() as u64; debug!( target: "reth-bench", @@ -218,40 +231,37 @@ impl Command { "Sending newPayload" ); - let use_reth = self.reth_new_payload; - let (version, params) = if use_reth { + let (version, params) = if self.reth_new_payload { + let big_block_data_param = if payload.big_block_data.env_switches.is_empty() && + payload.big_block_data.prior_block_hashes.is_empty() + { + None + } else { + Some(payload.big_block_data.clone()) + }; let wait_for_persistence = self .wait_for_persistence .unwrap_or(WaitForPersistence::Never) .rpc_value(block_number); - let reth_data = 
ExecutionData { - payload: execution_payload.clone().into(), - sidecar: ExecutionPayloadSidecar::v4( - CancunPayloadFields { - versioned_hashes: Vec::new(), - parent_beacon_block_root: B256::ZERO, - }, - PraguePayloadFields { - requests: envelope.execution_requests.clone().into(), - }, - ), - }; ( None, serde_json::to_value(( - RethNewPayloadInput::ExecutionData(reth_data), + RethNewPayloadInput::ExecutionData(execution_data.clone()), wait_for_persistence, self.no_wait_for_caches.then_some(false), + big_block_data_param, ))?, ) } else { + let requests = + execution_data.sidecar.requests().cloned().unwrap_or_default().to_vec(); ( Some(EngineApiMessageVersion::V4), serde_json::to_value(( - execution_payload.clone(), + execution_data.payload.clone(), Vec::::new(), B256::ZERO, - envelope.execution_requests.to_vec(), + requests, ))?, ) }; @@ -307,6 +317,10 @@ impl Command { tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics"); } + if let Some(wait_time) = self.wait_time { + tokio::time::sleep(wait_time).await; + } + let gas_row = TotalGasRow { block_number, transaction_count, gas_used, time: current_duration }; results.push((gas_row, combined_result)); @@ -342,28 +356,42 @@ impl Command { } /// Load and parse all payload files from the directory. + /// + /// Tries to load each file as a [`BigBlockPayload`] first (which includes `env_switches`), + /// falling back to [`ExecutionPayloadEnvelopeV4`] for backwards compatibility. fn load_payloads(&self) -> eyre::Result> { let mut payloads = Vec::new(); - // Read directory entries + // Read directory entries — match both legacy "payload_block_*.json" and new + // "big_block_*.json" formats let entries: Vec<_> = std::fs::read_dir(&self.payload_dir) .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))? 
.filter_map(|e| e.ok()) .filter(|e| { + let name = e.file_name(); + let name_str = name.to_string_lossy(); e.path().extension().and_then(|s| s.to_str()) == Some("json") && - e.file_name().to_string_lossy().starts_with("payload_block_") + (name_str.starts_with("payload_block_") || + name_str.starts_with("big_block_")) }) .collect(); - // Parse filenames to get indices and sort + // Parse filenames to get indices and sort. + // Supports "payload_block_N.json" and "big_block_FROM_to_TO.json" naming. let mut indexed_paths: Vec<(u64, PathBuf)> = entries .into_iter() .filter_map(|e| { let name = e.file_name(); let name_str = name.to_string_lossy(); - // Extract index from "payload_NNN.json" - let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?; - let index: u64 = index_str.parse().ok()?; + let index = if let Some(rest) = name_str.strip_prefix("payload_block_") { + rest.strip_suffix(".json")?.parse::().ok()? + } else if let Some(rest) = name_str.strip_prefix("big_block_") { + // "big_block_FROM_to_TO.json" — use FROM as the index + let rest = rest.strip_suffix(".json")?; + rest.split("_to_").next()?.parse::().ok()? 
+ } else { + return None; + }; Some((index, e.path())) }) .collect(); @@ -381,21 +409,42 @@ impl Command { for (index, path) in indexed_paths { let content = std::fs::read_to_string(&path) .wrap_err_with(|| format!("Failed to read {:?}", path))?; - let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content) - .wrap_err_with(|| format!("Failed to parse {:?}", path))?; - let block_hash = - envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash; + // Try BigBlockPayload first, then fall back to legacy ExecutionPayloadEnvelopeV4 + let (execution_data, big_block_data) = + if let Ok(big_block) = serde_json::from_str::(&content) { + (big_block.execution_data, big_block.big_block_data) + } else { + let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content) + .wrap_err_with(|| format!("Failed to parse {:?}", path))?; + let execution_data = ExecutionData { + payload: envelope.envelope_inner.execution_payload.clone().into(), + sidecar: ExecutionPayloadSidecar::v4( + CancunPayloadFields { + versioned_hashes: Vec::new(), + parent_beacon_block_root: B256::ZERO, + }, + PraguePayloadFields { + requests: envelope.execution_requests.clone().into(), + }, + ), + }; + (execution_data, BigBlockData::default()) + }; + + let block_hash = execution_data.payload.as_v1().block_hash; debug!( target: "reth-bench", index = index, block_hash = %block_hash, + env_switches = big_block_data.env_switches.len(), + prior_block_hashes = big_block_data.prior_block_hashes.len(), path = %path.display(), "Loaded payload" ); - payloads.push(LoadedPayload { index, envelope, block_hash }); + payloads.push(LoadedPayload { index, execution_data, block_hash, big_block_data }); } Ok(payloads) diff --git a/crates/engine/primitives/src/message.rs b/crates/engine/primitives/src/message.rs index 46d5ba6ad0a..425626a8560 100644 --- a/crates/engine/primitives/src/message.rs +++ b/crates/engine/primitives/src/message.rs @@ -162,6 +162,30 @@ pub struct 
NewPayloadTimings { pub sparse_trie_wait: Option, } +/// Additional data for big block payloads that merge multiple real blocks. +/// +/// This is used by the `reth_newPayload` endpoint to pass environment switches +/// and prior block hashes needed for correct multi-segment execution. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct BigBlockData { + /// Environment switches at block boundaries. + /// Each entry is `(cumulative_tx_count, execution_data_of_next_block)`. + /// + /// The first entry at index 0 represents the **original unmutated** base block's + /// `ExecutionData`, which must be used to derive the initial EVM environment. + pub env_switches: Vec<(usize, ExecutionData)>, + /// Block number → real block hash for blocks covered by previous big blocks in a sequence. + /// When replaying chained big blocks, the BLOCKHASH opcode needs real hashes for blocks + /// that were merged into earlier big blocks (and thus not individually persisted). + pub prior_block_hashes: Vec<(u64, alloy_primitives::B256)>, +} + +impl Default for BigBlockData { + fn default() -> Self { + Self { env_switches: Vec::new(), prior_block_hashes: Vec::new() } + } +} + /// A message for the beacon engine from other components of the node (engine RPC API invoked by the /// consensus layer). #[derive(Debug)] diff --git a/crates/ethereum/consensus/src/lib.rs b/crates/ethereum/consensus/src/lib.rs index a0ab5081c53..d33ee4249b2 100644 --- a/crates/ethereum/consensus/src/lib.rs +++ b/crates/ethereum/consensus/src/lib.rs @@ -45,6 +45,10 @@ pub struct EthBeaconConsensus { max_extra_data_size: usize, /// When true, skips the gas limit change validation between parent and child blocks. skip_gas_limit_ramp_check: bool, + /// When true, skips the blob gas used check in header validation. + skip_blob_gas_used_check: bool, + /// When true, skips the requests hash check in post-execution validation. 
+ skip_requests_hash_check: bool, } impl EthBeaconConsensus { @@ -54,6 +58,8 @@ impl EthBeaconConsensus chain_spec, max_extra_data_size: MAXIMUM_EXTRA_DATA_SIZE, skip_gas_limit_ramp_check: false, + skip_blob_gas_used_check: false, + skip_requests_hash_check: false, } } @@ -74,6 +80,18 @@ impl EthBeaconConsensus self } + /// Disables the blob gas used check in header validation. + pub const fn with_skip_blob_gas_used_check(mut self, skip: bool) -> Self { + self.skip_blob_gas_used_check = skip; + self + } + + /// Disables the requests hash check in post-execution validation. + pub const fn with_skip_requests_hash_check(mut self, skip: bool) -> Self { + self.skip_requests_hash_check = skip; + self + } + /// Returns the chain spec associated with this consensus engine. pub const fn chain_spec(&self) -> &Arc { &self.chain_spec @@ -91,13 +109,21 @@ where result: &BlockExecutionResult, receipt_root_bloom: Option, ) -> Result<(), ConsensusError> { - validate_block_post_execution( + let res = validate_block_post_execution( block, &self.chain_spec, &result.receipts, &result.requests, receipt_root_bloom, - ) + ); + + if self.skip_requests_hash_check && + let Err(ConsensusError::BodyRequestsHashDiff(_)) = &res + { + return Ok(()); + } + + res } } @@ -183,12 +209,14 @@ where // Ensures that EIP-4844 fields are valid once cancun is active. 
if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp()) { - validate_4844_header_standalone( - header, - self.chain_spec - .blob_params_at_timestamp(header.timestamp()) - .unwrap_or_else(BlobParams::cancun), - )?; + if !self.skip_blob_gas_used_check { + validate_4844_header_standalone( + header, + self.chain_spec + .blob_params_at_timestamp(header.timestamp()) + .unwrap_or_else(BlobParams::cancun), + )?; + } } else if header.blob_gas_used().is_some() { return Err(ConsensusError::BlobGasUsedUnexpected) } else if header.excess_blob_gas().is_some() { diff --git a/crates/ethereum/node/src/node.rs b/crates/ethereum/node/src/node.rs index 95c8683d8ad..3ceb320bdb1 100644 --- a/crates/ethereum/node/src/node.rs +++ b/crates/ethereum/node/src/node.rs @@ -562,10 +562,7 @@ where type Consensus = Arc::ChainSpec>>; async fn build_consensus(self, ctx: &BuilderContext) -> eyre::Result { - Ok(Arc::new( - EthBeaconConsensus::new(ctx.chain_spec()) - .with_skip_gas_limit_ramp_check(ctx.config().rpc.testing_skip_gas_limit_ramp_check), - )) + Ok(Arc::new(EthBeaconConsensus::new(ctx.chain_spec()))) } } diff --git a/crates/node/core/src/args/rpc_server.rs b/crates/node/core/src/args/rpc_server.rs index 71d005e3197..cdd4c681129 100644 --- a/crates/node/core/src/args/rpc_server.rs +++ b/crates/node/core/src/args/rpc_server.rs @@ -648,14 +648,6 @@ pub struct RpcServerArgs { #[arg(long = "testing.skip-invalid-transactions", default_value_t = true)] pub testing_skip_invalid_transactions: bool, - /// Skip the 1/1024 gas limit change restriction between parent and child blocks. - /// - /// When enabled, consensus will not enforce the gas limit bound divisor check, - /// allowing blocks to jump to an arbitrary gas limit without ramping up over - /// thousands of empty blocks. - #[arg(long = "testing.skip-gas-limit-ramp-check", default_value_t = false, hide = true)] - pub testing_skip_gas_limit_ramp_check: bool, - /// Override the gas limit used by `testing_buildBlockV1`. 
/// /// When set, `testing_buildBlockV1` will use this value instead of inheriting @@ -902,7 +894,6 @@ impl Default for RpcServerArgs { gas_price_oracle, rpc_send_raw_transaction_sync_timeout, testing_skip_invalid_transactions: true, - testing_skip_gas_limit_ramp_check: false, testing_gas_limit: None, rpc_force_blob_sidecar_upcasting: false, } @@ -1081,7 +1072,6 @@ mod tests { }, rpc_send_raw_transaction_sync_timeout: std::time::Duration::from_secs(30), testing_skip_invalid_transactions: true, - testing_skip_gas_limit_ramp_check: false, testing_gas_limit: None, rpc_force_blob_sidecar_upcasting: false, };