Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions crates/engine/primitives/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ pub enum BeaconEngineMessage<Payload: PayloadTypes> {
payload: Payload::ExecutionData,
/// The sender for returning payload status result.
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
/// When this message was enqueued, used to measure backpressure wait time.
enqueued_at: Instant,
},
/// Message with new payload used by `reth_newPayload` endpoint.
///
Expand Down Expand Up @@ -290,11 +288,7 @@ where
payload: Payload::ExecutionData,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload {
payload,
tx,
enqueued_at: Instant::now(),
});
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}

Expand Down
10 changes: 5 additions & 5 deletions crates/engine/tree/src/tree/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,9 @@ impl NewPayloadStatusMetrics {
latest_forkchoice_updated_at: &mut Option<Instant>,
result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
gas_used: u64,
latency: Duration,
) {
let finish = Instant::now();
let elapsed = finish - start;

if let Some(prev_finish) = self.latest_finish_at {
self.time_between_new_payloads.record(start - prev_finish);
Expand All @@ -455,13 +455,13 @@ impl NewPayloadStatusMetrics {
if !outcome.already_seen {
self.new_payload_total_gas.record(gas_used as f64);
self.new_payload_total_gas_last.set(gas_used as f64);
let gas_per_second = gas_used as f64 / latency.as_secs_f64();
let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
self.new_payload_gas_per_second.record(gas_per_second);
self.new_payload_gas_per_second_last.set(gas_per_second);

self.new_payload_latency.record(latency);
self.new_payload_last.set(latency);
self.gas_bucket.record(gas_used, latency);
self.new_payload_latency.record(elapsed);
self.new_payload_last.set(elapsed);
self.gas_bucket.record(gas_used, elapsed);
}
}
PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
Expand Down
14 changes: 2 additions & 12 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1608,18 +1608,16 @@ where
warn!(target: "engine::tree", ?state, elapsed=?start.elapsed(), "Failed to deliver forkchoiceUpdated response, receiver dropped (request cancelled): {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, tx, enqueued_at } => {
BeaconEngineMessage::NewPayload { payload, tx } => {
let start = Instant::now();
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);
let latency = enqueued_at.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
latency,
);

let maybe_event =
Expand Down Expand Up @@ -1695,20 +1693,12 @@ where
let gas_used = payload.gas_used();
let num_hash = payload.num_hash();
let mut output = self.on_new_payload(payload);

// Latency measures time from enqueue to completion, excluding
// only the explicit persistence wait. This means backpressure
// (time spent queued due to the engine being busy) is included,
// reflecting real-world engine responsiveness.
let latency =
enqueued_at.elapsed().saturating_sub(explicit_persistence_wait);

let latency = start.elapsed();
self.metrics.engine.new_payload.update_response_metrics(
start,
&mut self.metrics.engine.forkchoice_updated.latest_finish_at,
&output,
gas_used,
latency,
);

let maybe_event =
Expand Down
1 change: 0 additions & 1 deletion crates/engine/tree/src/tree/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,6 @@ async fn test_holesky_payload() {
sidecar: ExecutionPayloadSidecar::none(),
},
tx,
enqueued_at: std::time::Instant::now(),
}
.into(),
))
Expand Down
7 changes: 2 additions & 5 deletions crates/engine/util/src/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
time::Instant,
};
use tokio::sync::oneshot;
use tracing::*;
Expand Down Expand Up @@ -144,7 +143,7 @@ where
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match (next, &this.last_forkchoice_state) {
(
Some(BeaconEngineMessage::NewPayload { payload, tx, enqueued_at }),
Some(BeaconEngineMessage::NewPayload { payload, tx }),
Some(last_forkchoice_state),
) if this.forkchoice_states_forwarded > this.frequency &&
// Only enter reorg state if new payload attaches to current head.
Expand Down Expand Up @@ -174,7 +173,6 @@ where
return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
payload,
tx,
enqueued_at,
}))
}
};
Expand All @@ -193,12 +191,11 @@ where

let queue = VecDeque::from([
// Current payload
BeaconEngineMessage::NewPayload { payload, tx, enqueued_at },
BeaconEngineMessage::NewPayload { payload, tx },
// Reorg payload
BeaconEngineMessage::NewPayload {
payload: T::block_to_payload(reorg_block),
tx: reorg_payload_tx,
enqueued_at: Instant::now(),
},
// Reorg forkchoice state
BeaconEngineMessage::ForkchoiceUpdated {
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/util/src/skip_new_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ where
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::NewPayload { payload, tx, enqueued_at }) => {
Some(BeaconEngineMessage::NewPayload { payload, tx }) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(
Expand All @@ -56,7 +56,7 @@ where
continue
}
*this.skipped = 0;
Some(BeaconEngineMessage::NewPayload { payload, tx, enqueued_at })
Some(BeaconEngineMessage::NewPayload { payload, tx })
}
next => next,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/bsc-p2p/src/block_import/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ mod tests {
tokio::spawn(async move {
while let Some(message) = from_engine.recv().await {
match message {
BeaconEngineMessage::NewPayload { payload: _, tx, .. } => {
BeaconEngineMessage::NewPayload { payload: _, tx } => {
tx.send(Ok(PayloadStatus::new(responses.new_payload.clone(), None)))
.unwrap();
}
Expand Down
Loading