diff --git a/Cargo.lock b/Cargo.lock index d5bdaafe1d5..7d13e89ccdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10565,6 +10565,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "crossbeam-channel", + "crossbeam-utils", "derive_more", "itertools 0.14.0", "metrics", diff --git a/crates/engine/tree/src/tree/payload_processor/multiproof.rs b/crates/engine/tree/src/tree/payload_processor/multiproof.rs index 0f4046d1d7a..dbe8f524c65 100644 --- a/crates/engine/tree/src/tree/payload_processor/multiproof.rs +++ b/crates/engine/tree/src/tree/payload_processor/multiproof.rs @@ -61,8 +61,8 @@ pub(crate) fn dispatch_with_chunking( chunking_len: usize, chunk_size: usize, max_targets_for_chunking: usize, - available_account_workers: usize, - available_storage_workers: usize, + has_multiple_idle_account_workers: bool, + has_multiple_idle_storage_workers: bool, chunker: impl FnOnce(T, usize) -> I, mut dispatch: impl FnMut(T), ) -> usize @@ -70,8 +70,8 @@ where I: IntoIterator, { let should_chunk = chunking_len > max_targets_for_chunking || - available_account_workers > 1 || - available_storage_workers > 1; + has_multiple_idle_account_workers || + has_multiple_idle_storage_workers; if should_chunk && chunking_len > chunk_size { let mut num_chunks = 0usize; diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index 27f38f53f72..3ab85584a00 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -777,8 +777,8 @@ where chunking_length, self.chunk_size, self.max_targets_for_chunking, - self.proof_worker_handle.available_account_workers(), - self.proof_worker_handle.available_storage_workers(), + self.proof_worker_handle.has_multiple_idle_account_workers(), + self.proof_worker_handle.has_multiple_idle_storage_workers(), MultiProofTargetsV2::chunks, |proof_targets| { if let Err(e) = diff --git a/crates/trie/parallel/Cargo.toml b/crates/trie/parallel/Cargo.toml index ac80c5ba41c..637e98d3c09 100644 --- a/crates/trie/parallel/Cargo.toml +++ b/crates/trie/parallel/Cargo.toml @@ -39,6 +39,7 @@ derive_more.workspace = true rayon.workspace = true itertools.workspace = true crossbeam-channel.workspace = true +crossbeam-utils.workspace = true # `metrics` feature reth-metrics = { workspace = true, optional = true } diff --git a/crates/trie/parallel/src/proof_task.rs b/crates/trie/parallel/src/proof_task.rs index 7cc66caad9d..7eec4591140 100644 --- a/crates/trie/parallel/src/proof_task.rs +++ b/crates/trie/parallel/src/proof_task.rs @@ -55,7 +55,7 @@ use std::{ cell::RefCell, rc::Rc, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, mpsc::{channel, Receiver, Sender}, Arc, }, @@ -86,6 +86,52 @@ type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator< InstrumentedHashedCursor<'a, ::StorageCursor<'a>>, >; +/// Tracks worker availability counts. +/// +/// It uses cacheline-aligned flags to avoid core-to-core chatter. +#[derive(Debug)] +struct AvailabilitySheet { + /// One flag per worker, each on its own cacheline. Workers store `true` when idle, + /// `false` when busy. Only the owning worker writes; the dispatcher only reads. + flags: Vec>, +} + +impl AvailabilitySheet { + /// Creates a new sheet with `count` workers, all initially marked as busy. + fn new(count: usize) -> Self { + let flags = + (0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect(); + Self { flags } + } + + /// Returns `true` if more than one worker is currently idle. + /// + /// Note, that this is somewhat racy since a flag that was just saying `idle` and we counted it + /// as such might turn into `busy` right away. + fn has_multiple_idle(&self) -> bool { + let mut idle = 0u32; + for flag in &self.flags { + if flag.load(Ordering::Relaxed) { + idle += 1; + if idle > 1 { + return true; + } + } + } + false + } + + /// Marks the given worker as idle. + fn mark_idle(&self, worker_id: usize) { + self.flags[worker_id].store(true, Ordering::Relaxed); + } + + /// Marks the given worker as busy. + fn mark_busy(&self, worker_id: usize) { + self.flags[worker_id].store(false, Ordering::Relaxed); + } +} + /// A handle that provides type-safe access to proof worker pools. /// /// The handle stores direct senders to both storage and account worker pools, @@ -97,12 +143,12 @@ pub struct ProofWorkerHandle { storage_work_tx: CrossbeamSender, /// Direct sender to account worker pool account_work_tx: CrossbeamSender, - /// Counter tracking available storage workers. Workers decrement when starting work, - /// increment when finishing. Used to determine whether to chunk multiproofs. - storage_available_workers: Arc, - /// Counter tracking available account workers. Workers decrement when starting work, - /// increment when finishing. Used to determine whether to chunk multiproofs. - account_available_workers: Arc, + /// Per-worker availability flags for storage workers. Used to determine whether to chunk + /// multiproofs. + storage_availability: Arc, + /// Per-worker availability flags for account workers. Used to determine whether to chunk + /// multiproofs. + account_availability: Arc, /// Total number of storage workers spawned storage_worker_count: usize, /// Total number of account workers spawned @@ -140,9 +186,6 @@ impl ProofWorkerHandle { let (storage_work_tx, storage_work_rx) = unbounded::(); let (account_work_tx, account_work_rx) = unbounded::(); - let storage_available_workers = Arc::::default(); - let account_available_workers = Arc::::default(); - let cached_storage_roots = Arc::>::default(); let divisor = if halve_workers { 2 } else { 1 }; @@ -151,6 +194,9 @@ impl ProofWorkerHandle { let account_worker_count = runtime.proof_account_worker_pool().current_num_threads() / divisor; + let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count)); + let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count)); + debug!( target: "trie::proof_task", storage_worker_count, @@ -163,7 +209,7 @@ impl ProofWorkerHandle { // tokio's blocking pool. let storage_rt = runtime.clone(); let storage_task_ctx = task_ctx.clone(); - let storage_avail = storage_available_workers.clone(); + let storage_avail = storage_availability.clone(); let storage_roots = cached_storage_roots.clone(); let storage_parent_span = tracing::Span::current(); runtime.spawn_blocking_named("storage-workers", move || { @@ -202,7 +248,7 @@ impl ProofWorkerHandle { let account_rt = runtime.clone(); let account_tx = storage_work_tx.clone(); - let account_avail = account_available_workers.clone(); + let account_avail = account_availability.clone(); let account_parent_span = tracing::Span::current(); runtime.spawn_blocking_named("account-workers", move || { let worker_id = AtomicUsize::new(0); @@ -242,21 +288,21 @@ impl ProofWorkerHandle { Self { storage_work_tx, account_work_tx, - storage_available_workers, - account_available_workers, + storage_availability, + account_availability, storage_worker_count, account_worker_count, } } - /// Returns how many storage workers are currently available/idle. - pub fn available_storage_workers(&self) -> usize { - self.storage_available_workers.load(Ordering::Relaxed) + /// Returns `true` if more than one storage worker is currently idle. + pub fn has_multiple_idle_storage_workers(&self) -> bool { + self.storage_availability.has_multiple_idle() } - /// Returns how many account workers are currently available/idle. - pub fn available_account_workers(&self) -> usize { - self.account_available_workers.load(Ordering::Relaxed) + /// Returns `true` if more than one account worker is currently idle. + pub fn has_multiple_idle_account_workers(&self) -> bool { + self.account_availability.has_multiple_idle() } /// Returns the number of pending storage tasks in the queue. @@ -279,20 +325,6 @@ impl ProofWorkerHandle { self.account_worker_count } - /// Returns the number of storage workers currently processing tasks. - /// - /// This is calculated as total workers minus available workers. - pub fn active_storage_workers(&self) -> usize { - self.storage_worker_count.saturating_sub(self.available_storage_workers()) - } - - /// Returns the number of account workers currently processing tasks. - /// - /// This is calculated as total workers minus available workers. - pub fn active_account_workers(&self) -> usize { - self.account_worker_count.saturating_sub(self.available_account_workers()) - } - /// Dispatch a storage proof computation to storage worker pool /// /// The result will be sent via the `proof_result_sender` channel. @@ -635,8 +667,8 @@ struct StorageProofWorker { work_rx: CrossbeamReceiver, /// Unique identifier for this worker (used for tracing) worker_id: usize, - /// Counter tracking worker availability - available_workers: Arc, + /// Per-worker availability flags + availability: Arc, /// Cached storage roots cached_storage_roots: Arc>, /// Metrics collector for this worker @@ -656,7 +688,7 @@ where task_ctx: ProofTaskCtx, work_rx: CrossbeamReceiver, worker_id: usize, - available_workers: Arc, + availability: Arc, cached_storage_roots: Arc>, #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics, @@ -665,7 +697,7 @@ where task_ctx, work_rx, worker_id, - available_workers, + availability, cached_storage_roots, #[cfg(feature = "metrics")] metrics, @@ -719,7 +751,7 @@ where ); // Initially mark this worker as available. - self.available_workers.fetch_add(1, Ordering::Relaxed); + self.availability.mark_idle(self.worker_id); let mut total_idle_time = Duration::ZERO; let mut idle_start = Instant::now(); @@ -728,7 +760,7 @@ where total_idle_time += idle_start.elapsed(); // Mark worker as busy. - self.available_workers.fetch_sub(1, Ordering::Relaxed); + self.availability.mark_busy(self.worker_id); #[cfg(feature = "trie-debug")] if let Some(max_jitter) = self.task_ctx.proof_jitter { @@ -767,7 +799,7 @@ where } // Mark worker as available again. - self.available_workers.fetch_add(1, Ordering::Relaxed); + self.availability.mark_idle(self.worker_id); idle_start = Instant::now(); } @@ -911,8 +943,8 @@ struct AccountProofWorker { worker_id: usize, /// Channel for dispatching storage proof work (for pre-dispatched target proofs) storage_work_tx: CrossbeamSender, - /// Counter tracking worker availability - available_workers: Arc, + /// Per-worker availability flags + availability: Arc, /// Cached storage roots cached_storage_roots: Arc>, /// Metrics collector for this worker @@ -934,7 +966,7 @@ where work_rx: CrossbeamReceiver, worker_id: usize, storage_work_tx: CrossbeamSender, - available_workers: Arc, + availability: Arc, cached_storage_roots: Arc>, #[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics, #[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics, @@ -944,7 +976,7 @@ where work_rx, worker_id, storage_work_tx, - available_workers, + availability, cached_storage_roots, #[cfg(feature = "metrics")] metrics, @@ -1030,7 +1062,7 @@ where ))); // Count this worker as available only after successful initialization. - self.available_workers.fetch_add(1, Ordering::Relaxed); + self.availability.mark_idle(self.worker_id); let mut total_idle_time = Duration::ZERO; let mut idle_start = Instant::now(); @@ -1040,7 +1072,7 @@ where total_idle_time += idle_start.elapsed(); // Mark worker as busy. - self.available_workers.fetch_sub(1, Ordering::Relaxed); + self.availability.mark_busy(self.worker_id); #[cfg(feature = "trie-debug")] if let Some(max_jitter) = self.task_ctx.proof_jitter { @@ -1079,7 +1111,7 @@ where } // Mark worker as available again. - self.available_workers.fetch_add(1, Ordering::Relaxed); + self.availability.mark_idle(self.worker_id); idle_start = Instant::now(); }