Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 115 additions & 3 deletions beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
let reconstruction_deadline_millis =
(slot_duration * RECONSTRUCTION_DEADLINE.0) / RECONSTRUCTION_DEADLINE.1;
let reconstruction_deadline = Duration::from_millis(reconstruction_deadline_millis);
if let Some(seconds_from_current_slot) =
self.slot_clock.seconds_from_current_slot_start()
if let Some(duration_from_current_slot) =
self.slot_clock.millis_from_current_slot_start()
&& let Some(current_slot) = self.slot_clock.now()
&& seconds_from_current_slot >= reconstruction_deadline
&& duration_from_current_slot >= reconstruction_deadline
&& current_slot == request.slot
{
// If we are at least `reconstruction_deadline` seconds into the current slot,
Expand Down Expand Up @@ -1227,4 +1227,116 @@ mod tests {
// The entry for the block root should be gone.
assert!(queue.awaiting_lc_updates_per_parent_root.is_empty());
}

async fn test_reconstruction_immediate_at_deadline(slot_duration_secs: u64) {
let config = BeaconProcessorConfig::default();
let (ready_work_tx, _) = mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
let (_, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
let slot_clock = Arc::new(testing_slot_clock(slot_duration_secs));
let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock);

let slot_duration = queue.slot_clock.slot_duration();
let reconstruction_deadline_millis = (slot_duration.as_millis() as u64
* RECONSTRUCTION_DEADLINE.0)
/ RECONSTRUCTION_DEADLINE.1;
let reconstruction_deadline = Duration::from_millis(reconstruction_deadline_millis);

// Advance time to just after the deadline
advance_time(
&queue.slot_clock,
reconstruction_deadline + Duration::from_millis(10),
)
.await;

let current_slot = queue.slot_clock.now().unwrap();
let block_root = Hash256::repeat_byte(0xaa);

// Queue a reconstruction for the current slot after the deadline
let reconstruction_request = QueuedColumnReconstruction {
block_root,
slot: current_slot,
process_fn: Box::pin(async {}),
};
queue.handle_message(InboundEvent::Msg(
ReprocessQueueMessage::DelayColumnReconstruction(reconstruction_request),
));

assert_eq!(queue.queued_column_reconstructions.len(), 1);

// Should be immediately ready (0 delay since we're past deadline)
let ready_msg = queue.next().await.unwrap();
assert!(matches!(
ready_msg,
InboundEvent::ReadyColumnReconstruction(_)
));

if let InboundEvent::ReadyColumnReconstruction(reconstruction) = ready_msg {
assert_eq!(reconstruction.block_root, block_root);
queue.handle_message(InboundEvent::ReadyColumnReconstruction(reconstruction));
}

assert!(queue.queued_column_reconstructions.is_empty());
}

/// Tests that column reconstruction queued after the deadline is triggered immediately
/// on mainnet (12s slots).
///
/// When a reconstruction for the current slot is queued after the reconstruction deadline
/// (1/4 of slot duration = 3s for mainnet), it should be processed immediately with 0 delay.
#[tokio::test]
async fn column_reconstruction_immediate_processing_at_deadline_mainnet() {
tokio::time::pause();
test_reconstruction_immediate_at_deadline(12).await;
}

/// Tests that column reconstruction queued after the deadline is triggered immediately
/// on Gnosis (5s slots).
///
/// When a reconstruction for the current slot is queued after the reconstruction deadline
/// (1/4 of slot duration = 1.25s for Gnosis), it should be processed immediately with 0 delay.
#[tokio::test]
async fn column_reconstruction_immediate_processing_at_deadline_gnosis() {
tokio::time::pause();
test_reconstruction_immediate_at_deadline(5).await;
}

/// Tests that column reconstruction uses the standard delay when queued before the deadline.
///
/// When a reconstruction for the current slot is queued before the deadline, it should wait
/// for the standard QUEUED_RECONSTRUCTION_DELAY (150ms) before being triggered.
#[tokio::test]
async fn column_reconstruction_uses_standard_delay() {
tokio::time::pause();

let mut queue = test_queue();
let current_slot = queue.slot_clock.now().unwrap();
let block_root = Hash256::repeat_byte(0xcc);

// Queue a reconstruction at the start of the slot (before deadline)
let reconstruction_request = QueuedColumnReconstruction {
block_root,
slot: current_slot,
process_fn: Box::pin(async {}),
};
queue.handle_message(InboundEvent::Msg(
ReprocessQueueMessage::DelayColumnReconstruction(reconstruction_request),
));

assert_eq!(queue.queued_column_reconstructions.len(), 1);

// Advance time by QUEUED_RECONSTRUCTION_DELAY
advance_time(&queue.slot_clock, QUEUED_RECONSTRUCTION_DELAY).await;

// Should be ready after the standard delay
let ready_msg = queue.next().await.unwrap();
assert!(matches!(
ready_msg,
InboundEvent::ReadyColumnReconstruction(_)
));

if let InboundEvent::ReadyColumnReconstruction(reconstruction) = ready_msg {
assert_eq!(reconstruction.block_root, block_root);
}
}
}
Loading