diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 9565e57589d..3e755f08302 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -37,7 +37,9 @@ const TASK_NAME: &str = "beacon_processor_reprocess_queue"; const GOSSIP_BLOCKS: &str = "gossip_blocks"; const RPC_BLOCKS: &str = "rpc_blocks"; const ATTESTATIONS: &str = "attestations"; +const ATTESTATIONS_PER_ROOT: &str = "attestations_per_root"; const LIGHT_CLIENT_UPDATES: &str = "lc_updates"; +const LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT: &str = "lc_updates_per_parent_root"; /// Queue blocks for re-processing with an `ADDITIONAL_QUEUED_BLOCK_DELAY` after the slot starts. /// This is to account for any slight drift in the system clock. @@ -829,10 +831,19 @@ impl ReprocessQueue { ); } - if let Some(queued_atts) = self.awaiting_attestations_per_root.get_mut(&root) - && let Some(index) = queued_atts.iter().position(|&id| id == queued_id) + if let Entry::Occupied(mut queued_atts) = + self.awaiting_attestations_per_root.entry(root) + && let Some(index) = + queued_atts.get().iter().position(|&id| id == queued_id) { - queued_atts.swap_remove(index); + let queued_atts_mut = queued_atts.get_mut(); + queued_atts_mut.swap_remove(index); + + // If the vec is empty after this attestation's removal, we need to delete + // the entry to prevent bloating the hashmap indefinitely. + if queued_atts_mut.is_empty() { + queued_atts.remove_entry(); + } } } } @@ -853,13 +864,19 @@ impl ReprocessQueue { error!("Failed to send scheduled light client optimistic update"); } - if let Some(queued_lc_updates) = self - .awaiting_lc_updates_per_parent_root - .get_mut(&parent_root) - && let Some(index) = - queued_lc_updates.iter().position(|&id| id == queued_id) + if let Entry::Occupied(mut queued_lc_updates) = + self.awaiting_lc_updates_per_parent_root.entry(parent_root) + && let Some(index) = queued_lc_updates + .get() + .iter() + .position(|&id| id == queued_id) { - queued_lc_updates.swap_remove(index); + let queued_lc_updates_mut = queued_lc_updates.get_mut(); + queued_lc_updates_mut.swap_remove(index); + + if queued_lc_updates_mut.is_empty() { + queued_lc_updates.remove_entry(); + } } } } @@ -929,11 +946,21 @@ impl ReprocessQueue { &[ATTESTATIONS], self.attestations_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[ATTESTATIONS_PER_ROOT], + self.awaiting_attestations_per_root.len() as i64, + ); metrics::set_gauge_vec( &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, &[LIGHT_CLIENT_UPDATES], self.lc_updates_delay_queue.len() as i64, ); + metrics::set_gauge_vec( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_TOTAL, + &[LIGHT_CLIENT_UPDATES_PER_PARENT_ROOT], + self.awaiting_lc_updates_per_parent_root.len() as i64, + ); } fn recompute_next_backfill_batch_event(&mut self) { @@ -979,6 +1006,7 @@ impl ReprocessQueue { #[cfg(test)] mod tests { use super::*; + use crate::BeaconProcessorConfig; use logging::create_test_tracing_subscriber; use slot_clock::{ManualSlotClock, TestingSlotClock}; use std::ops::Add; @@ -1101,4 +1129,97 @@ mod tests { Duration::from_secs(slot_duration), ) } + + fn test_queue() -> ReprocessQueue { + create_test_tracing_subscriber(); + + let config = BeaconProcessorConfig::default(); + let (ready_work_tx, _) = mpsc::channel::(config.max_scheduled_work_queue_len); + let (_, reprocess_work_rx) = + mpsc::channel::(config.max_scheduled_work_queue_len); + let slot_clock = Arc::new(testing_slot_clock(12)); + + ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock) + } + + // This is a regression test for a memory leak in `awaiting_attestations_per_root`. + // See: https://github.com/sigp/lighthouse/pull/8065 + #[tokio::test] + async fn prune_awaiting_attestations_per_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let beacon_block_root = Hash256::repeat_byte(0xaf); + + // Insert an attestation. + let att = ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { + beacon_block_root, + process_fn: Box::new(|| {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(att)); + + // Check that it is queued. + assert_eq!(queue.awaiting_attestations_per_root.len(), 1); + assert!( + queue + .awaiting_attestations_per_root + .contains_key(&beacon_block_root) + ); + + // Advance time to expire the attestation. + advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyAttestation(_))); + queue.handle_message(ready_msg); + + // The entry for the block root should be gone. + assert!(queue.awaiting_attestations_per_root.is_empty()); + } + + // This is a regression test for a memory leak in `awaiting_lc_updates_per_parent_root`. + // See: https://github.com/sigp/lighthouse/pull/8065 + #[tokio::test] + async fn prune_awaiting_lc_updates_per_parent_root() { + create_test_tracing_subscriber(); + + let mut queue = test_queue(); + + // Pause time so it only advances manually + tokio::time::pause(); + + let parent_root = Hash256::repeat_byte(0xaf); + + // Insert an attestation. + let msg = + ReprocessQueueMessage::UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate { + parent_root, + process_fn: Box::new(|| {}), + }); + + // Process the event to enter it into the delay queue. + queue.handle_message(InboundEvent::Msg(msg)); + + // Check that it is queued. + assert_eq!(queue.awaiting_lc_updates_per_parent_root.len(), 1); + assert!( + queue + .awaiting_lc_updates_per_parent_root + .contains_key(&parent_root) + ); + + // Advance time to expire the update. + advance_time(&queue.slot_clock, 2 * QUEUED_LIGHT_CLIENT_UPDATE_DELAY).await; + let ready_msg = queue.next().await.unwrap(); + assert!(matches!(ready_msg, InboundEvent::ReadyLightClientUpdate(_))); + queue.handle_message(ready_msg); + + // The entry for the block root should be gone. + assert!(queue.awaiting_lc_updates_per_parent_root.is_empty()); + } }