Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 130 additions & 9 deletions beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue";
// Label values naming the individual reprocessing queues. The attestation and light
// client labels are passed to `metrics::set_gauge_vec` together with
// `BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL`; the block labels are presumably used the
// same way elsewhere in this file (TODO confirm).
const GOSSIP_BLOCKS: &str = "gossip_blocks";
const RPC_BLOCKS: &str = "rpc_blocks";
// Reported with the length of `attestations_delay_queue`.
const ATTESTATIONS: &str = "attestations";
// Reported with the number of keys in `awaiting_attestations_per_root`.
const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root";
// Reported with the length of `lc_updates_delay_queue`.
const LIGHT_CLIENT_UPDATES: &str = "lc_updates";
// Reported with the number of keys in `awaiting_lc_updates_per_parent_root`.
const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT: &str = "lc_updates_per_parent_root";

/// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts.
/// This is to account for any slight drift in the system clock.
Expand Down Expand Up @@ -829,10 +831,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}

if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root)
&& let Some(index) = queued_atts.iter().position(|&id| id == queued_id)
if let Entry::Occupied(mut queued_atts) =
self.awaiting_attestations_per_root.entry(root)
&& let Some(index) =
queued_atts.get().iter().position(|&id| id == queued_id)
{
queued_atts.swap_remove(index);
let queued_atts_mut = queued_atts.get_mut();
queued_atts_mut.swap_remove(index);

// If the vec is empty after this attestation's removal, we need to delete
// the entry to prevent bloating the hashmap indefinitely.
if queued_atts_mut.is_empty() {
queued_atts.remove_entry();
}
}
}
}
Expand All @@ -853,13 +864,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
error!("Failed to send scheduled light client optimistic update");
}

if let Some(queued_lc_updates) = self
.awaiting_lc_updates_per_parent_root
.get_mut(&parent_root)
&& let Some(index) =
queued_lc_updates.iter().position(|&id| id == queued_id)
if let Entry::Occupied(mut queued_lc_updates) =
self.awaiting_lc_updates_per_parent_root.entry(parent_root)
&& let Some(index) = queued_lc_updates
.get()
.iter()
.position(|&id| id == queued_id)
{
queued_lc_updates.swap_remove(index);
let queued_lc_updates_mut = queued_lc_updates.get_mut();
queued_lc_updates_mut.swap_remove(index);

if queued_lc_updates_mut.is_empty() {
queued_lc_updates.remove_entry();
}
}
}
}
Expand Down Expand Up @@ -929,11 +946,21 @@ impl<S: SlotClock> ReprocessQueue<S> {
&[ATTESTATIONS],
self.attestations_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[ATTESTATIONS_PER_ROOT],
self.awaiting_attestations_per_root.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES],
self.lc_updates_delay_queue.len() as i64,
);
metrics::set_gauge_vec(
&metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL,
&[LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT],
self.awaiting_lc_updates_per_parent_root.len() as i64,
);
}

fn recompute_next_backfill_batch_event(&mut self) {
Expand Down Expand Up @@ -979,6 +1006,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
#[cfg(test)]
mod tests {
use super::*;
use crate::BeaconProcessorConfig;
use logging::create_test_tracing_subscriber;
use slot_clock::{ManualSlotClock, TestingSlotClock};
use std::ops::Add;
Expand Down Expand Up @@ -1101,4 +1129,97 @@ mod tests {
Duration::from_secs(slot_duration),
)
}

/// Builds a `ReprocessQueue` driven by a manual slot clock with 12-second slots.
///
/// Both channels are sized from the default `BeaconProcessorConfig`. The receiving half of
/// the ready-work channel and the sending half of the reprocess channel are discarded
/// immediately, so tests interact with the queue by calling its methods directly.
fn test_queue() -> ReprocessQueue<ManualSlotClock> {
    create_test_tracing_subscriber();

    // Both channels share the same capacity, taken from the default configuration.
    let capacity = BeaconProcessorConfig::default().max_scheduled_work_queue_len;
    let (ready_work_tx, _) = mpsc::channel::<ReadyWork>(capacity);
    let (_, reprocess_work_rx) = mpsc::channel::<ReprocessQueueMessage>(capacity);

    ReprocessQueue::new(
        ready_work_tx,
        reprocess_work_rx,
        Arc::new(testing_slot_clock(12)),
    )
}

// This is a regression test for a memory leak in `awaiting_attestations_per_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
//
// It queues a single unaggregated attestation for an unknown block root, lets it expire,
// and checks that the map no longer holds an (empty) entry for that root afterwards.
#[tokio::test]
async fn prune_awaiting_attestations_per_root() {
// NOTE(review): `test_queue()` below calls this as well, so this call looks redundant.
create_test_tracing_subscriber();

let mut queue = test_queue();

// Pause time so it only advances manually
tokio::time::pause();

let beacon_block_root = Hash256::repeat_byte(0xaf);

// Insert an attestation. The processing closure is a no-op.
let att = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate {
beacon_block_root,
process_fn: Box::new(|| {}),
});

// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(att));

// Check that it is queued: exactly one entry, keyed by the block root used above.
assert_eq!(queue.awaiting_attestations_per_root.len(), 1);
assert!(
queue
.awaiting_attestations_per_root
.contains_key(&beacon_block_root)
);

// Advance time to expire the attestation.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
// The next event yielded by the queue must be the expired attestation.
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_)));
// Handling the ready event is expected to prune the per-root bookkeeping.
queue.handle_message(ready_msg);

// The entry for the block root should be gone.
assert!(queue.awaiting_attestations_per_root.is_empty());
}

// This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`.
// See: https://github.com/sigp/lighthouse/pull/8065
//
// It queues a single light client optimistic update for an unknown parent root, lets it
// expire, and checks that the map no longer holds an (empty) entry for that root afterwards.
#[tokio::test]
async fn prune_awaiting_lc_updates_per_parent_root() {
// NOTE(review): `test_queue()` below calls this as well, so this call looks redundant.
create_test_tracing_subscriber();

let mut queue = test_queue();

// Pause time so it only advances manually
tokio::time::pause();

let parent_root = Hash256::repeat_byte(0xaf);

// Insert a light client optimistic update. The processing closure is a no-op.
let msg =
ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate {
parent_root,
process_fn: Box::new(|| {}),
});

// Process the event to enter it into the delay queue.
queue.handle_message(InboundEvent::Msg(msg));

// Check that it is queued: exactly one entry, keyed by the parent root used above.
assert_eq!(queue.awaiting_lc_updates_per_parent_root.len(), 1);
assert!(
queue
.awaiting_lc_updates_per_parent_root
.contains_key(&parent_root)
);

// Advance time to expire the update.
advance_time(&queue.slot_clock, 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY).await;
// The next event yielded by the queue must be the expired light client update.
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyLightClientUpdate(_)));
// Handling the ready event is expected to prune the per-parent-root bookkeeping.
queue.handle_message(ready_msg);

// The entry for the parent root should be gone.
assert!(queue.awaiting_lc_updates_per_parent_root.is_empty());
}
}
Loading