diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs index bf97397f9e1d5..c54f0ba46196e 100644 --- a/mempool/src/shared_mempool/tasks.rs +++ b/mempool/src/shared_mempool/tasks.rs @@ -16,7 +16,7 @@ use crate::{ }, use_case_history::UseCaseHistory, }, - thread_pool::IO_POOL, + thread_pool::{IO_POOL, VALIDATION_POOL}, QuorumStoreRequest, QuorumStoreResponse, SubmissionStatus, }; use anyhow::Result; @@ -45,7 +45,6 @@ use std::{ time::{Duration, Instant}, }; use tokio::runtime::Handle; - // ============================== // // broadcast_coordinator tasks // // ============================== // @@ -393,18 +392,20 @@ fn validate_and_add_transactions( let vm_validation_timer = counters::PROCESS_TXN_BREAKDOWN_LATENCY .with_label_values(&[counters::VM_VALIDATION_LABEL]) .start_timer(); - let validation_results = transactions - .par_iter() - .map(|t| { - let result = smp.validator.read().validate_transaction(t.0.clone()); - // Pre-compute the hash and length if the transaction is valid, before locking mempool - if result.is_ok() { - t.0.committed_hash(); - t.0.txn_bytes_len(); - } - result - }) - .collect::>(); + let validation_results = VALIDATION_POOL.install(|| { + transactions + .par_iter() + .map(|t| { + let result = smp.validator.read().validate_transaction(t.0.clone()); + // Pre-compute the hash and length if the transaction is valid, before locking mempool + if result.is_ok() { + t.0.committed_hash(); + t.0.txn_bytes_len(); + } + result + }) + .collect::>() + }); vm_validation_timer.stop_and_record(); { let mut mempool = smp.mempool.lock(); diff --git a/mempool/src/thread_pool.rs b/mempool/src/thread_pool.rs index 43c5dc2dc7f2e..7ad58b1c2114a 100644 --- a/mempool/src/thread_pool.rs +++ b/mempool/src/thread_pool.rs @@ -11,3 +11,10 @@ pub(crate) static IO_POOL: Lazy = Lazy::new(|| { .build() .unwrap() }); + +pub(crate) static VALIDATION_POOL: Lazy = Lazy::new(|| { + rayon::ThreadPoolBuilder::new() + .thread_name(|index| format!("mempool_vali_{}", index)) + .build() + .unwrap() +}); diff --git a/storage/storage-interface/src/state_store/sharded_state_updates.rs b/storage/storage-interface/src/state_store/sharded_state_updates.rs index e7459db931d84..4abf2c5e9be9a 100644 --- a/storage/storage-interface/src/state_store/sharded_state_updates.rs +++ b/storage/storage-interface/src/state_store/sharded_state_updates.rs @@ -31,12 +31,14 @@ impl ShardedStateUpdates { } pub fn merge(&mut self, other: Self) { - self.shards - .par_iter_mut() - .zip_eq(other.shards.into_par_iter()) - .for_each(|(l, r)| { - l.extend(r); - }) + THREAD_MANAGER.get_exe_cpu_pool().install(|| { + self.shards + .par_iter_mut() + .zip_eq(other.shards.into_par_iter()) + .for_each(|(l, r)| { + l.extend(r); + }) + }) } pub fn clone_merge(&mut self, other: &Self) {