From d437f73017c2b2035c3e4d3ff81e61943f6fecce Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 8 Sep 2025 15:54:58 +0300 Subject: [PATCH 1/2] feat: cache latest built payload --- crates/payload/builder/src/service.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 27f6bb61284..3f05772c32b 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -8,6 +8,7 @@ use crate::{ PayloadJob, }; use alloy_consensus::BlockHeader; +use alloy_primitives::BlockTimestamp; use alloy_rpc_types::engine::PayloadId; use futures_util::{future::FutureExt, Stream, StreamExt}; use reth_chain_state::CanonStateNotification; @@ -24,6 +25,7 @@ use std::{ use tokio::sync::{ broadcast, mpsc, oneshot::{self, Receiver}, + watch, }; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, info, trace, warn}; @@ -218,6 +220,11 @@ where chain_events: St, /// Payload events handler, used to broadcast and subscribe to payload events. payload_events: broadcast::Sender>, + /// We retain latest resolved payload just to make sure that we can handle repeating + /// requests for it gracefully. + cached_payload_rx: watch::Receiver>, + /// Sender half of the cached payload channel. + cached_payload_tx: watch::Sender>, } const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20; @@ -241,6 +248,8 @@ where let (service_tx, command_rx) = mpsc::unbounded_channel(); let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE); + let (cached_payload_tx, cached_payload_rx) = watch::channel(None); + let service = Self { generator, payload_jobs: Vec::new(), @@ -249,6 +258,8 @@ where metrics: Default::default(), chain_events, payload_events, + cached_payload_rx, + cached_payload_tx, }; let handle = service.handle(); @@ -294,8 +305,15 @@ where ) -> Option> { debug!(target: "payload_builder", %id, "resolving payload job"); + if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() { + if *cached == id { + return Some(Box::pin(core::future::ready(Ok(payload.clone())))); + } + } + let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?; let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind); + let payload_timestamp = self.payload_jobs[job].0.payload_timestamp(); if keep_alive == KeepPayloadJobAlive::No { let (_, id) = self.payload_jobs.swap_remove(job); @@ -306,6 +324,7 @@ where // the future in a new future that will update the metrics. let resolved_metrics = self.metrics.clone(); let payload_events = self.payload_events.clone(); + let cached_payload_tx = self.cached_payload_tx.clone(); let fut = async move { let res = fut.await; @@ -314,6 +333,10 @@ where payload_events.send(Events::BuiltPayload(payload.clone().into())).ok(); } + if let Ok(timestamp) = payload_timestamp { + let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into()))); + } + resolved_metrics .set_resolved_revenue(payload.block().number(), f64::from(payload.fees())); } @@ -333,6 +356,10 @@ where { /// Returns the payload timestamp for the given payload. fn payload_timestamp(&self, id: PayloadId) -> Option> { + if let Some((_, timestamp, _)) = *self.cached_payload_rx.borrow() { + return Some(Ok(timestamp)); + } + let timestamp = self .payload_jobs .iter() From f330a03a513bfa436fc9182fbcf5bc961457804e Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Mon, 8 Sep 2025 16:02:21 +0300 Subject: [PATCH 2/2] fix --- crates/payload/builder/Cargo.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index 222af0a664d..166c538f7a1 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -21,7 +21,7 @@ reth-ethereum-engine-primitives.workspace = true # alloy alloy-consensus.workspace = true -alloy-primitives = { workspace = true, optional = true } +alloy-primitives.workspace = true alloy-rpc-types = { workspace = true, features = ["engine"] } # async @@ -37,13 +37,10 @@ metrics.workspace = true tracing.workspace = true [dev-dependencies] -alloy-primitives.workspace = true - tokio = { workspace = true, features = ["sync", "rt"] } [features] test-utils = [ - "alloy-primitives", "reth-chain-state/test-utils", "reth-primitives-traits/test-utils", "tokio/rt",