From de6d2cac9bdc63262248c30be65b7eeed4804ee6 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Fri, 16 Jan 2026 23:23:22 +0530 Subject: [PATCH 1/3] fix(exex): prevent ExExManager deadlock when buffer clears after being full --- crates/exex/exex/src/manager.rs | 86 ++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index b57beb8d5cf..a3eecc034dd 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -36,7 +36,7 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; /// /// 1024 notifications in the buffer is 3.5 hours of mainnet blocks, /// or 17 minutes of 1-second blocks. -pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024; +pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 128; /// Default maximum number of blocks allowed in the WAL before emitting a warning. /// @@ -45,7 +45,7 @@ pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024; /// /// This value is appropriate for Ethereum mainnet with ~12 second block times. For L2 chains with /// faster block times, this value should be increased proportionally to avoid excessive warnings. -pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128; +pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 64; /// The source of the notification. 
/// @@ -503,6 +503,7 @@ where } break } + let buffer_full = this.buffer.len() >= this.max_capacity; // Update capacity this.update_capacity(); @@ -536,6 +537,12 @@ where // Update capacity this.update_capacity(); + // If the buffer was full and we made space, we need to wake up to accept new notifications + if buffer_full && this.buffer.len() < this.max_capacity { + debug!(target: "exex::manager", "Buffer has space again, waking up senders"); + cx.waker().wake_by_ref(); + } + // Update watch channel block number let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| { exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr))) @@ -1443,4 +1450,79 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_deadlock_manager_wakes_after_buffer_clears() { + // This test simulates the scenario where the buffer fills up, ingestion pauses, + // and then space clears. We verify the manager wakes up to process pending items. + + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider::new(provider_factory.clone()).unwrap(); + + // 1. 
Setup Manager with Capacity = 1 + let (exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Default::default(), + provider, + EthEvmConfig::mainnet(), + wal.handle(), + ); + + let max_capacity = 2; + let exex_manager = ExExManager::new( + provider_factory, + vec![exex_handle], + max_capacity, + wal, + empty_finalized_header_stream(), + ); + + let manager_handle = exex_manager.handle(); + + // Spawn manager in background so it runs continuously + tokio::spawn(async move { + exex_manager.await.ok(); + }); + + // Helper to create notifications + let mut rng = generators::rng(); + let mut make_notif = |id: u64| { + let block = random_block(&mut rng, id, BlockParams::default()) + .try_recover() + .unwrap(); + ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![block], + Default::default(), + Default::default(), + Default::default(), + )), + } + }; + + manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap(); + + // Send the "Stuck" Item (Notification #100). + // At this point, the Manager loop has skipped the ingestion logic because buffer is full (buffer_full=true). + // This item sits in the unbounded 'handle_rx' channel waiting. + manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap(); + + // 3. Relieve Pressure + // We consume items from the ExEx. + // As we pull items out, the ExEx frees space -> Manager sends buffered item -> Manager frees space. + // Once Manager frees space, the FIX (wake_by_ref) should trigger, causing it to read Notif #100. + + // Consume the jam + let _ = notifications.next().await.unwrap(); + + // 4. Assert No Deadlock + // We expect Notification #100 next. + // If the wake_by_ref fix is missing, this will Time Out because the manager is sleeping despite having empty buffer. + let result = tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await; + + assert!(result.is_ok(), "Deadlock detected! 
Manager failed to wake up and process Pending Item #100."); + } } From 22eb58f2d2bdc8bf2a54642c9f5ab5b2faa391fe Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Fri, 16 Jan 2026 23:26:25 +0530 Subject: [PATCH 2/3] revert constants back to their original values --- crates/exex/exex/src/manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index a3eecc034dd..49e6e741faa 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -36,7 +36,7 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; /// /// 1024 notifications in the buffer is 3.5 hours of mainnet blocks, /// or 17 minutes of 1-second blocks. -pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 128; +pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024; /// Default maximum number of blocks allowed in the WAL before emitting a warning. /// @@ -45,7 +45,7 @@ pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 128; /// /// This value is appropriate for Ethereum mainnet with ~12 second block times. For L2 chains with /// faster block times, this value should be increased proportionally to avoid excessive warnings. -pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 64; +pub const DEFAULT_WAL_BLOCKS_WARNING: usize = 128; /// The source of the notification. 
/// From 14e80ff0cb876420fc46f2a9d701899ee984ecb8 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Fri, 16 Jan 2026 23:32:42 +0530 Subject: [PATCH 3/3] lintfix --- crates/exex/exex/src/manager.rs | 34 ++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 49e6e741faa..b28aef51246 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1455,7 +1455,7 @@ mod tests { async fn test_deadlock_manager_wakes_after_buffer_clears() { // This test simulates the scenario where the buffer fills up, ingestion pauses, // and then space clears. We verify the manager wakes up to process pending items. - + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); let provider_factory = create_test_provider_factory(); @@ -1490,9 +1490,7 @@ mod tests { // Helper to create notifications let mut rng = generators::rng(); let mut make_notif = |id: u64| { - let block = random_block(&mut rng, id, BlockParams::default()) - .try_recover() - .unwrap(); + let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap(); ExExNotification::ChainCommitted { new: Arc::new(Chain::new( vec![block], @@ -1506,23 +1504,29 @@ mod tests { manager_handle.send(ExExNotificationSource::Pipeline, make_notif(1)).unwrap(); // Send the "Stuck" Item (Notification #100). - // At this point, the Manager loop has skipped the ingestion logic because buffer is full (buffer_full=true). - // This item sits in the unbounded 'handle_rx' channel waiting. + // At this point, the Manager loop has skipped the ingestion logic because buffer is full + // (buffer_full=true). This item sits in the unbounded 'handle_rx' channel waiting. manager_handle.send(ExExNotificationSource::Pipeline, make_notif(100)).unwrap(); - // 3. Relieve Pressure - // We consume items from the ExEx. 
- // As we pull items out, the ExEx frees space -> Manager sends buffered item -> Manager frees space. - // Once Manager frees space, the FIX (wake_by_ref) should trigger, causing it to read Notif #100. - + // 3. Relieve Pressure + // We consume items from the ExEx. + // As we pull items out, the ExEx frees space -> Manager sends buffered item -> Manager + // frees space. Once Manager frees space, the FIX (wake_by_ref) should trigger, + // causing it to read Notif #100. + // Consume the jam let _ = notifications.next().await.unwrap(); // 4. Assert No Deadlock // We expect Notification #100 next. - // If the wake_by_ref fix is missing, this will Time Out because the manager is sleeping despite having empty buffer. - let result = tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await; - - assert!(result.is_ok(), "Deadlock detected! Manager failed to wake up and process Pending Item #100."); + // If the wake_by_ref fix is missing, this will Time Out because the manager is sleeping + // despite having empty buffer. + let result = + tokio::time::timeout(std::time::Duration::from_secs(1), notifications.next()).await; + + assert!( + result.is_ok(), + "Deadlock detected! Manager failed to wake up and process Pending Item #100." + ); } }