Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions crates/payload/builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ reth-ethereum-engine-primitives.workspace = true

# alloy
alloy-consensus.workspace = true
alloy-primitives = { workspace = true, optional = true }
alloy-primitives.workspace = true
alloy-rpc-types = { workspace = true, features = ["engine"] }

# async
Expand All @@ -37,13 +37,10 @@ metrics.workspace = true
tracing.workspace = true

[dev-dependencies]
alloy-primitives.workspace = true

tokio = { workspace = true, features = ["sync", "rt"] }

[features]
test-utils = [
"alloy-primitives",
"reth-chain-state/test-utils",
"reth-primitives-traits/test-utils",
"tokio/rt",
Expand Down
27 changes: 27 additions & 0 deletions crates/payload/builder/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
PayloadJob,
};
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockTimestamp;
use alloy_rpc_types::engine::PayloadId;
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
Expand All @@ -24,6 +25,7 @@ use std::{
use tokio::sync::{
broadcast, mpsc,
oneshot::{self, Receiver},
watch,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, info, trace, warn};
Expand Down Expand Up @@ -218,6 +220,11 @@ where
chain_events: St,
/// Payload events handler, used to broadcast and subscribe to payload events.
payload_events: broadcast::Sender<Events<T>>,
/// We retain latest resolved payload just to make sure that we can handle repeating
/// requests for it gracefully.
cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
/// Sender half of the cached payload channel.
cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
}

const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
Expand All @@ -241,6 +248,8 @@ where
let (service_tx, command_rx) = mpsc::unbounded_channel();
let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);

let (cached_payload_tx, cached_payload_rx) = watch::channel(None);

let service = Self {
generator,
payload_jobs: Vec::new(),
Expand All @@ -249,6 +258,8 @@ where
metrics: Default::default(),
chain_events,
payload_events,
cached_payload_rx,
cached_payload_tx,
};

let handle = service.handle();
Expand Down Expand Up @@ -294,8 +305,15 @@ where
) -> Option<PayloadFuture<T::BuiltPayload>> {
debug!(target: "payload_builder", %id, "resolving payload job");

if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() {
if *cached == id {
return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
}
}

let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();

if keep_alive == KeepPayloadJobAlive::No {
let (_, id) = self.payload_jobs.swap_remove(job);
Expand All @@ -306,6 +324,7 @@ where
// the future in a new future that will update the metrics.
let resolved_metrics = self.metrics.clone();
let payload_events = self.payload_events.clone();
let cached_payload_tx = self.cached_payload_tx.clone();

let fut = async move {
let res = fut.await;
Expand All @@ -314,6 +333,10 @@ where
payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
}

if let Ok(timestamp) = payload_timestamp {
let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
}

resolved_metrics
.set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
}
Expand All @@ -333,6 +356,10 @@ where
{
/// Returns the payload timestamp for the given payload.
fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
if let Some((_, timestamp, _)) = *self.cached_payload_rx.borrow() {
return Some(Ok(timestamp));
}

let timestamp = self
.payload_jobs
.iter()
Expand Down
Loading