Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@ pub(crate) fn dispatch_with_chunking<T, I>(
chunking_len: usize,
chunk_size: usize,
max_targets_for_chunking: usize,
available_account_workers: usize,
available_storage_workers: usize,
has_multiple_idle_account_workers: bool,
has_multiple_idle_storage_workers: bool,
chunker: impl FnOnce(T, usize) -> I,
mut dispatch: impl FnMut(T),
) -> usize
where
I: IntoIterator<Item = T>,
{
let should_chunk = chunking_len > max_targets_for_chunking ||
available_account_workers > 1 ||
available_storage_workers > 1;
has_multiple_idle_account_workers ||
has_multiple_idle_storage_workers;

if should_chunk && chunking_len > chunk_size {
let mut num_chunks = 0usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,8 @@ where
chunking_length,
self.chunk_size,
self.max_targets_for_chunking,
self.proof_worker_handle.available_account_workers(),
self.proof_worker_handle.available_storage_workers(),
self.proof_worker_handle.has_multiple_idle_account_workers(),
self.proof_worker_handle.has_multiple_idle_storage_workers(),
MultiProofTargetsV2::chunks,
|proof_targets| {
if let Err(e) =
Expand Down
1 change: 1 addition & 0 deletions crates/trie/parallel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ derive_more.workspace = true
rayon.workspace = true
itertools.workspace = true
crossbeam-channel.workspace = true
crossbeam-utils.workspace = true

# `metrics` feature
reth-metrics = { workspace = true, optional = true }
Expand Down
128 changes: 80 additions & 48 deletions crates/trie/parallel/src/proof_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use std::{
cell::RefCell,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{channel, Receiver, Sender},
Arc,
},
Expand Down Expand Up @@ -86,6 +86,52 @@ type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>;

/// Tracks worker availability counts.
///
/// It uses cacheline-aligned flags to avoid core-to-core chatter.
#[derive(Debug)]
struct AvailabilitySheet {
/// One flag per worker, each on its own cacheline. Workers store `true` when idle,
/// `false` when busy. Only the owning worker writes; the dispatcher only reads.
flags: Vec<crossbeam_utils::CachePadded<AtomicBool>>,
}

impl AvailabilitySheet {
/// Creates a new sheet with `count` workers, all initially marked as busy.
fn new(count: usize) -> Self {
let flags =
(0..count).map(|_| crossbeam_utils::CachePadded::new(AtomicBool::new(false))).collect();
Self { flags }
}

/// Returns `true` if more than one worker is currently idle.
///
/// Note, that this is somewhat racy since a flag that was just saying `idle` and we counted it
/// as such might turn into `busy` right away.
fn has_multiple_idle(&self) -> bool {
let mut idle = 0u32;
for flag in &self.flags {
if flag.load(Ordering::Relaxed) {
idle += 1;
if idle > 1 {
return true;
}
}
}
false
}

/// Marks the given worker as idle.
fn mark_idle(&self, worker_id: usize) {
self.flags[worker_id].store(true, Ordering::Relaxed);
}

/// Marks the given worker as busy.
fn mark_busy(&self, worker_id: usize) {
self.flags[worker_id].store(false, Ordering::Relaxed);
}
}

/// A handle that provides type-safe access to proof worker pools.
///
/// The handle stores direct senders to both storage and account worker pools,
Expand All @@ -97,12 +143,12 @@ pub struct ProofWorkerHandle {
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Direct sender to account worker pool
account_work_tx: CrossbeamSender<AccountWorkerJob>,
/// Counter tracking available storage workers. Workers decrement when starting work,
/// increment when finishing. Used to determine whether to chunk multiproofs.
storage_available_workers: Arc<AtomicUsize>,
/// Counter tracking available account workers. Workers decrement when starting work,
/// increment when finishing. Used to determine whether to chunk multiproofs.
account_available_workers: Arc<AtomicUsize>,
/// Per-worker availability flags for storage workers. Used to determine whether to chunk
/// multiproofs.
storage_availability: Arc<AvailabilitySheet>,
/// Per-worker availability flags for account workers. Used to determine whether to chunk
/// multiproofs.
account_availability: Arc<AvailabilitySheet>,
/// Total number of storage workers spawned
storage_worker_count: usize,
/// Total number of account workers spawned
Expand Down Expand Up @@ -140,9 +186,6 @@ impl ProofWorkerHandle {
let (storage_work_tx, storage_work_rx) = unbounded::<StorageWorkerJob>();
let (account_work_tx, account_work_rx) = unbounded::<AccountWorkerJob>();

let storage_available_workers = Arc::<AtomicUsize>::default();
let account_available_workers = Arc::<AtomicUsize>::default();

let cached_storage_roots = Arc::<DashMap<_, _>>::default();

let divisor = if halve_workers { 2 } else { 1 };
Expand All @@ -151,6 +194,9 @@ impl ProofWorkerHandle {
let account_worker_count =
runtime.proof_account_worker_pool().current_num_threads() / divisor;

let storage_availability = Arc::new(AvailabilitySheet::new(storage_worker_count));
let account_availability = Arc::new(AvailabilitySheet::new(account_worker_count));

debug!(
target: "trie::proof_task",
storage_worker_count,
Expand All @@ -163,7 +209,7 @@ impl ProofWorkerHandle {
// tokio's blocking pool.
let storage_rt = runtime.clone();
let storage_task_ctx = task_ctx.clone();
let storage_avail = storage_available_workers.clone();
let storage_avail = storage_availability.clone();
let storage_roots = cached_storage_roots.clone();
let storage_parent_span = tracing::Span::current();
runtime.spawn_blocking_named("storage-workers", move || {
Expand Down Expand Up @@ -202,7 +248,7 @@ impl ProofWorkerHandle {

let account_rt = runtime.clone();
let account_tx = storage_work_tx.clone();
let account_avail = account_available_workers.clone();
let account_avail = account_availability.clone();
let account_parent_span = tracing::Span::current();
runtime.spawn_blocking_named("account-workers", move || {
let worker_id = AtomicUsize::new(0);
Expand Down Expand Up @@ -242,21 +288,21 @@ impl ProofWorkerHandle {
Self {
storage_work_tx,
account_work_tx,
storage_available_workers,
account_available_workers,
storage_availability,
account_availability,
storage_worker_count,
account_worker_count,
}
}

/// Returns how many storage workers are currently available/idle.
pub fn available_storage_workers(&self) -> usize {
self.storage_available_workers.load(Ordering::Relaxed)
/// Returns `true` if more than one storage worker is currently idle.
pub fn has_multiple_idle_storage_workers(&self) -> bool {
self.storage_availability.has_multiple_idle()
}

/// Returns how many account workers are currently available/idle.
pub fn available_account_workers(&self) -> usize {
self.account_available_workers.load(Ordering::Relaxed)
/// Returns `true` if more than one account worker is currently idle.
pub fn has_multiple_idle_account_workers(&self) -> bool {
self.account_availability.has_multiple_idle()
}

/// Returns the number of pending storage tasks in the queue.
Expand All @@ -279,20 +325,6 @@ impl ProofWorkerHandle {
self.account_worker_count
}

/// Returns the number of storage workers currently processing tasks.
///
/// This is calculated as total workers minus available workers.
pub fn active_storage_workers(&self) -> usize {
self.storage_worker_count.saturating_sub(self.available_storage_workers())
}

/// Returns the number of account workers currently processing tasks.
///
/// This is calculated as total workers minus available workers.
pub fn active_account_workers(&self) -> usize {
self.account_worker_count.saturating_sub(self.available_account_workers())
}

/// Dispatch a storage proof computation to storage worker pool
///
/// The result will be sent via the `proof_result_sender` channel.
Expand Down Expand Up @@ -635,8 +667,8 @@ struct StorageProofWorker<Factory> {
work_rx: CrossbeamReceiver<StorageWorkerJob>,
/// Unique identifier for this worker (used for tracing)
worker_id: usize,
/// Counter tracking worker availability
available_workers: Arc<AtomicUsize>,
/// Per-worker availability flags
availability: Arc<AvailabilitySheet>,
/// Cached storage roots
cached_storage_roots: Arc<DashMap<B256, B256>>,
/// Metrics collector for this worker
Expand All @@ -656,7 +688,7 @@ where
task_ctx: ProofTaskCtx<Factory>,
work_rx: CrossbeamReceiver<StorageWorkerJob>,
worker_id: usize,
available_workers: Arc<AtomicUsize>,
availability: Arc<AvailabilitySheet>,
cached_storage_roots: Arc<DashMap<B256, B256>>,
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
#[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
Expand All @@ -665,7 +697,7 @@ where
task_ctx,
work_rx,
worker_id,
available_workers,
availability,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
Expand Down Expand Up @@ -719,7 +751,7 @@ where
);

// Initially mark this worker as available.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);

let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
Expand All @@ -728,7 +760,7 @@ where
total_idle_time += idle_start.elapsed();

// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
self.availability.mark_busy(self.worker_id);

#[cfg(feature = "trie-debug")]
if let Some(max_jitter) = self.task_ctx.proof_jitter {
Expand Down Expand Up @@ -767,7 +799,7 @@ where
}

// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);

idle_start = Instant::now();
}
Expand Down Expand Up @@ -911,8 +943,8 @@ struct AccountProofWorker<Factory> {
worker_id: usize,
/// Channel for dispatching storage proof work (for pre-dispatched target proofs)
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
/// Counter tracking worker availability
available_workers: Arc<AtomicUsize>,
/// Per-worker availability flags
availability: Arc<AvailabilitySheet>,
/// Cached storage roots
cached_storage_roots: Arc<DashMap<B256, B256>>,
/// Metrics collector for this worker
Expand All @@ -934,7 +966,7 @@ where
work_rx: CrossbeamReceiver<AccountWorkerJob>,
worker_id: usize,
storage_work_tx: CrossbeamSender<StorageWorkerJob>,
available_workers: Arc<AtomicUsize>,
availability: Arc<AvailabilitySheet>,
cached_storage_roots: Arc<DashMap<B256, B256>>,
#[cfg(feature = "metrics")] metrics: ProofTaskTrieMetrics,
#[cfg(feature = "metrics")] cursor_metrics: ProofTaskCursorMetrics,
Expand All @@ -944,7 +976,7 @@ where
work_rx,
worker_id,
storage_work_tx,
available_workers,
availability,
cached_storage_roots,
#[cfg(feature = "metrics")]
metrics,
Expand Down Expand Up @@ -1030,7 +1062,7 @@ where
)));

// Count this worker as available only after successful initialization.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);

let mut total_idle_time = Duration::ZERO;
let mut idle_start = Instant::now();
Expand All @@ -1040,7 +1072,7 @@ where
total_idle_time += idle_start.elapsed();

// Mark worker as busy.
self.available_workers.fetch_sub(1, Ordering::Relaxed);
self.availability.mark_busy(self.worker_id);

#[cfg(feature = "trie-debug")]
if let Some(max_jitter) = self.task_ctx.proof_jitter {
Expand Down Expand Up @@ -1079,7 +1111,7 @@ where
}

// Mark worker as available again.
self.available_workers.fetch_add(1, Ordering::Relaxed);
self.availability.mark_idle(self.worker_id);

idle_start = Instant::now();
}
Expand Down
Loading