diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 7b14954c7d81f7..99466b531aa911 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -72,13 +72,13 @@ impl std::fmt::Debug for BankNotification { } } -pub type BankNotificationWithEventSequence = ( +pub type BankNotificationWithDependencyWork = ( BankNotification, - Option, // dependecy work sequence number + Option, // dependecy work id ); -pub type BankNotificationReceiver = Receiver; -pub type BankNotificationSender = Sender; +pub type BankNotificationReceiver = Receiver; +pub type BankNotificationSender = Sender; #[derive(Clone)] pub struct BankNotificationSenderConfig { @@ -138,7 +138,7 @@ impl OptimisticallyConfirmedBankTracker { #[allow(clippy::too_many_arguments)] fn recv_notification( - receiver: &Receiver, + receiver: &Receiver, bank_forks: &RwLock, optimistically_confirmed_bank: &RwLock, subscriptions: &RpcSubscriptions, @@ -270,7 +270,7 @@ impl OptimisticallyConfirmedBankTracker { #[allow(clippy::too_many_arguments)] pub fn process_notification( - (notification, dependency_work): BankNotificationWithEventSequence, + (notification, dependency_work): BankNotificationWithDependencyWork, bank_forks: &RwLock, optimistically_confirmed_bank: &RwLock, subscriptions: &RpcSubscriptions, @@ -469,7 +469,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(2), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -489,7 +489,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(1), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -509,7 +509,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -534,7 +534,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::Frozen(bank3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -558,7 +558,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(4), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -591,7 +591,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::NewRootBank(bank5), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -614,7 +614,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::NewRootedChain(parent_roots), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -645,7 +645,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(6), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -670,7 +670,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::NewRootBank(bank7), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -692,7 +692,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::NewRootedChain(parent_roots), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -718,8 +718,8 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let dependency_tracker: Arc = Arc::new(dependency_tracker::DependencyTracker::default()); - let work_sequence_1 = 345; - let work_sequence_2 = 678; + let work_id_1 = 345; + let work_id_2 = 678; let tracker_clone = dependency_tracker.clone(); let handle = thread::spawn(move || { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); @@ -756,7 +756,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(1), - Some(work_sequence_1), /* dependency work sequence */ + Some(work_id_1), /* dependency work id */ ), &bank_forks, &optimistically_confirmed_bank, @@ -781,7 +781,7 @@ mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::Frozen(bank1), - Some(work_sequence_2), /* dependency work sequence */ + Some(work_id_2), /* dependency work id */ ), &bank_forks, &optimistically_confirmed_bank, @@ -800,8 +800,8 @@ mod tests { assert_eq!(pending_optimistically_confirmed_banks.len(), 0); }); - dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence_1); - dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence_2); + dependency_tracker.mark_this_and_all_previous_work_processed(work_id_1); + dependency_tracker.mark_this_and_all_previous_work_processed(work_id_2); handle.join().unwrap(); } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 30473d4524cd4b..49b33be3a00454 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -8809,7 +8809,7 @@ pub mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(2), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -8833,7 +8833,7 @@ pub mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(1), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -8857,7 +8857,7 @@ pub mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -8882,7 +8882,7 @@ pub mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::Frozen(bank3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 28be1d239d316c..7b6035030468f0 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1992,7 +1992,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -2049,7 +2049,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::Frozen(bank3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -2172,7 +2172,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -2291,7 +2291,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -2350,7 +2350,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::Frozen(bank3), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -2800,7 +2800,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::OptimisticallyConfirmed(1), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, @@ -2859,7 +2859,7 @@ pub(crate) mod tests { OptimisticallyConfirmedBankTracker::process_notification( ( BankNotification::Frozen(bank2), - None, /* no work sequence */ + None, /* no dependency work */ ), &bank_forks, &optimistically_confirmed_bank, diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index a903a2d473aad5..f2fd6cd9cf738e 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -139,7 +139,7 @@ impl TransactionStatusService { costs, transaction_indexes, }, - work_sequence, + work_id, )) => { let mut status_and_memos_batch = blockstore.get_write_batch()?; @@ -256,8 +256,8 @@ impl TransactionStatusService { } if let Some(dependency_tracker) = dependency_tracker.as_ref() { - if let Some(work_sequence) = work_sequence { - dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence); + if let Some(work_id) = work_id { + dependency_tracker.mark_this_and_all_previous_work_processed(work_id); } } } @@ -529,7 +529,7 @@ pub(crate) mod tests { transaction_status_sender .send(TransactionStatusMessage::Batch(( transaction_status_batch, - None, /* No work sequence */ + None, /* No work id */ ))) .unwrap(); @@ -633,11 +633,11 @@ pub(crate) mod tests { Some(dependency_tracker.clone()), exit.clone(), ); - let work_sequence = 345; + let work_id = 345; transaction_status_sender .send(TransactionStatusMessage::Batch(( transaction_status_batch, - Some(work_sequence), + Some(work_id), ))) .unwrap(); transaction_status_service.quiesce_and_join_for_tests(exit); diff --git a/runtime/src/dependency_tracker.rs b/runtime/src/dependency_tracker.rs index f8418952aa3d34..a1bca8e15d9f53 100644 --- a/runtime/src/dependency_tracker.rs +++ b/runtime/src/dependency_tracker.rs @@ -4,10 +4,10 @@ use std::sync::{atomic::AtomicU64, Condvar, Mutex}; #[derive(Debug, Default)] pub struct DependencyTracker { - /// The current work sequence number - work_sequence: AtomicU64, - /// The processed work sequence number, if it is None, no work has been processed - processed_work_sequence: Mutex>, + /// The current work id + work_id: AtomicU64, + /// The processed work id, if it is None, no work has been processed + processed_work_id: Mutex>, condvar: Condvar, } @@ -16,40 +16,40 @@ fn less_than(a: &Option, b: u64) -> bool { } impl DependencyTracker { - /// Acquire the next work sequence number. - /// The sequence starts from 0 and increments by 1 each time it is called. + /// Acquire the next work id number. + /// The work id starts from 0 and increments by 1 each time it is called. pub fn declare_work(&self) -> u64 { - self.work_sequence + self.work_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1 } - /// Notify all waiting threads that a work has occurred with the given sequence number. - /// This function will update the work sequence and notify all waiting threads only if the work - /// sequence is greater than the work sequence. Notify a work of sequence number 's' will - /// implicitly imply that all work with sequence number less than 's' have been processed. - pub fn mark_this_and_all_previous_work_processed(&self, sequence: u64) { - let mut work_sequence = self.processed_work_sequence.lock().unwrap(); - if less_than(&work_sequence, sequence) { - *work_sequence = Some(sequence); + /// Notify all waiting threads that a work has been processed with the given work id. + /// This function will update the processed work id and notify all waiting threads only if the work + /// id is greater than the procsessed work id. Notify a work of id number 's' will + /// implicitly imply that all work with id number less than 's' have been processed. + pub fn mark_this_and_all_previous_work_processed(&self, work_id: u64) { + let mut processed_work_id = self.processed_work_id.lock().unwrap(); + if less_than(&processed_work_id, work_id) { + *processed_work_id = Some(work_id); self.condvar.notify_all(); } } - /// To wait for the dependency work with 'sequence' to be processed. - pub fn wait_for_dependency(&self, sequence: u64) { - if sequence == 0 { - return; // No need to wait for sequence 0 as real work starts from 1. + /// To wait for the dependency work with 'work_id' to be processed. + pub fn wait_for_dependency(&self, work_id: u64) { + if work_id == 0 { + return; // No need to wait for work id 0 as real work starts from 1. } - let mut processed_sequence = self.processed_work_sequence.lock().unwrap(); - while less_than(&processed_sequence, sequence) { - processed_sequence = self.condvar.wait(processed_sequence).unwrap(); + let mut processed_work_id = self.processed_work_id.lock().unwrap(); + while less_than(&processed_work_id, work_id) { + processed_work_id = self.condvar.wait(processed_work_id).unwrap(); } } - /// Get the current work sequence number. + /// Get the current work id number. pub fn get_current_declared_work(&self) -> u64 { - self.work_sequence.load(std::sync::atomic::Ordering::SeqCst) + self.work_id.load(std::sync::atomic::Ordering::SeqCst) } } @@ -69,7 +69,7 @@ mod tests { } #[test] - fn test_get_new_work_sequence() { + fn test_get_new_work_id() { let dependency_tracker = DependencyTracker::default(); assert_eq!(dependency_tracker.declare_work(), 1); assert_eq!(dependency_tracker.declare_work(), 2); @@ -81,21 +81,21 @@ mod tests { let dependency_tracker = DependencyTracker::default(); dependency_tracker.mark_this_and_all_previous_work_processed(1); - let processed_sequence = *dependency_tracker.processed_work_sequence.lock().unwrap(); - assert_eq!(processed_sequence, Some(1)); + let processed_work_id = *dependency_tracker.processed_work_id.lock().unwrap(); + assert_eq!(processed_work_id, Some(1)); - // notify a smaller sequence number, should not change the processed sequence + // notify a smaller work id number, should not change the processed work id dependency_tracker.mark_this_and_all_previous_work_processed(0); - let processed_sequence = *dependency_tracker.processed_work_sequence.lock().unwrap(); - assert_eq!(processed_sequence, Some(1)); - // notify a larger sequence number, should change the processed sequence + let processed_work_id = *dependency_tracker.processed_work_id.lock().unwrap(); + assert_eq!(processed_work_id, Some(1)); + // notify a larger work id number, should change the processed work id dependency_tracker.mark_this_and_all_previous_work_processed(2); - let processed_sequence = *dependency_tracker.processed_work_sequence.lock().unwrap(); - assert_eq!(processed_sequence, Some(2)); - // notify the same sequence number, should not change the processed sequence + let processed_work_id = *dependency_tracker.processed_work_id.lock().unwrap(); + assert_eq!(processed_work_id, Some(2)); + // notify the same work id number, should not change the processed work id dependency_tracker.mark_this_and_all_previous_work_processed(2); - let processed_sequence = *dependency_tracker.processed_work_sequence.lock().unwrap(); - assert_eq!(processed_sequence, Some(2)); + let processed_work_id = *dependency_tracker.processed_work_id.lock().unwrap(); + assert_eq!(processed_work_id, Some(2)); } #[test] @@ -116,7 +116,7 @@ mod tests { dependency_tracker.mark_this_and_all_previous_work_processed(work); handle.join().unwrap(); - let processed_sequence = *dependency_tracker.processed_work_sequence.lock().unwrap(); - assert_eq!(processed_sequence, Some(2)); + let processed_work_id = *dependency_tracker.processed_work_id.lock().unwrap(); + assert_eq!(processed_work_id, Some(2)); } } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 506a2526b9849c..fd508d2fe7209d 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -4578,7 +4578,7 @@ mod tests { receiver.try_recv(), Ok(TransactionStatusMessage::Batch(( TransactionStatusBatch { .. }, - None, // no work sequence + None, // no work id ))) ); assert_matches!(