Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.has_data.store(true, Ordering::Relaxed);

self.consumed_sender.send(FinishedConsumeWork {
slot: Some(bank.slot()),
work,
retryable_indexes: output
.execute_and_commit_transactions_output
Expand Down Expand Up @@ -164,6 +165,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.fetch_add(num_retryable, Ordering::Relaxed);
self.metrics.has_data.store(true, Ordering::Relaxed);
self.consumed_sender.send(FinishedConsumeWork {
slot: None,
work,
retryable_indexes,
})?;
Expand Down
2 changes: 2 additions & 0 deletions core/src/banking_stage/scheduler_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct ConsumeWork<Tx> {
/// Message: [Worker -> Scheduler]
/// Processed transactions.
pub struct FinishedConsumeWork<Tx> {
/// Slot that work was attempted on. `None` if sent back without attempt.
pub slot: Option<Slot>,
pub work: ConsumeWork<Tx>,
pub retryable_indexes: Vec<usize>,
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
let mut num_sent: usize = 0;
let mut num_unschedulable_conflicts: usize = 0;
let mut num_unschedulable_threads: usize = 0;
let mut num_skipped_retry: usize = 0;

let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass
Expand Down Expand Up @@ -152,6 +153,10 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
num_unschedulable_threads += 1;
self.unschedulables.push(id);
}
Err(TransactionSchedulingError::Skipped) => {
num_skipped_retry += 1;
self.unschedulables.push(id);
}
Ok(TransactionSchedulingInfo {
thread_id,
transaction,
Expand Down Expand Up @@ -209,6 +214,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
num_scheduled,
num_unschedulable_conflicts,
num_unschedulable_threads,
num_skipped_retry,
num_filtered_out: 0,
filter_time_us: 0,
})
Expand All @@ -228,6 +234,7 @@ fn try_schedule_transaction<Tx: TransactionWithMeta>(
) -> Result<TransactionSchedulingInfo<Tx>, TransactionSchedulingError> {
match pre_lock_filter(transaction_state) {
PreLockFilterAction::AttemptToSchedule => {}
PreLockFilterAction::SkipAndRetain => return Err(TransactionSchedulingError::Skipped),
}

// Schedule the transaction if it can be.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
let mut num_sent: usize = 0;
let mut num_unschedulable_conflicts: usize = 0;
let mut num_unschedulable_threads: usize = 0;
let mut num_skipped_retry: usize = 0;
while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass {
// If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
if self.prio_graph.is_empty() {
Expand Down Expand Up @@ -239,6 +240,10 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
num_unschedulable_threads += 1;
unschedulable_ids.push(id);
}
Err(TransactionSchedulingError::Skipped) => {
num_skipped_retry += 1;
unschedulable_ids.push(id);
}
Ok(TransactionSchedulingInfo {
thread_id,
transaction,
Expand Down Expand Up @@ -331,6 +336,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
num_scheduled,
num_unschedulable_conflicts,
num_unschedulable_threads,
num_skipped_retry,
num_filtered_out,
filter_time_us: total_filter_time_us,
})
Expand Down Expand Up @@ -370,6 +376,7 @@ fn try_schedule_transaction<Tx: TransactionWithMeta>(
) -> Result<TransactionSchedulingInfo<Tx>, TransactionSchedulingError> {
match pre_lock_filter(transaction_state) {
PreLockFilterAction::AttemptToSchedule => {}
PreLockFilterAction::SkipAndRetain => return Err(TransactionSchedulingError::Skipped),
}

// Check if this transaction conflicts with any blocked transactions
Expand Down Expand Up @@ -694,6 +701,7 @@ mod tests {
// Complete batch on thread 0. Remaining txs can be scheduled onto thread 1
finished_work_sender
.send(FinishedConsumeWork {
slot: None,
work: thread_0_work.into_iter().next().unwrap(),
retryable_indexes: vec![],
})
Expand Down
4 changes: 4 additions & 0 deletions core/src/banking_stage/transaction_scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub(crate) trait Scheduler<Tx: TransactionWithMeta> {
pub(crate) enum PreLockFilterAction {
/// Attempt to schedule the transaction.
AttemptToSchedule,
/// Skip the transaction but do not drop it.
SkipAndRetain,
}

/// Metrics from scheduling transactions.
Expand All @@ -64,6 +66,8 @@ pub(crate) struct SchedulingSummary {
pub num_unschedulable_conflicts: usize,
/// Number of transactions that were skipped due to thread capacity.
pub num_unschedulable_threads: usize,
/// Number of transactions that were skipped due too recently attempted.
pub num_skipped_retry: usize,
/// Number of transactions that were dropped due to filter.
pub num_filtered_out: usize,
/// Time spent filtering transactions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub enum TransactionSchedulingError {
UnschedulableConflicts,
/// Thread is not allowed to be scheduled on at this time.
UnschedulableThread,
/// Transaction was skipped by pre-filter logic.
Skipped,
}

/// Given the schedulable `thread_set`, select the thread with the least amount
Expand Down Expand Up @@ -198,6 +200,7 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
max_ages: _,
},
retryable_indexes,
slot,
}) => {
let num_transactions = ids.len();
let num_retryable = retryable_indexes.len();
Expand All @@ -210,7 +213,7 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
for (index, (id, transaction)) in izip!(ids, transactions).enumerate() {
if let Some(retryable_index) = retryable_iter.peek() {
if *retryable_index == index {
container.retry_transaction(id, transaction);
container.retry_transaction(slot, id, transaction);
retryable_iter.next();
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,13 @@ where
MAX_PROCESSING_AGE,
)
},
|_| PreLockFilterAction::AttemptToSchedule // no pre-lock filter for now
|state| {
if state.last_tried_slot() == Some(bank_start.working_bank.slot()) {
PreLockFilterAction::SkipAndRetain
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to clarify, what are the cases worker would send back tx for retry? Wondering if all retry cases are not suitable for current slot

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. block limits exceeded
  2. poh recording failed (slot ended)
  3. account locks taken (shouldn't happen due to scheduling logic)
    • ugh this does happen because of jito though...
  4. bank failed to become available in 50ms (in this case None is returned for the slot)

} else {
PreLockFilterAction::AttemptToSchedule
}
}
)?);

self.count_metrics.update(|count_metrics| {
Expand Down Expand Up @@ -543,6 +549,7 @@ mod tests {

finished_consume_work_sender
.send(FinishedConsumeWork {
slot: None,
work: ConsumeWork {
batch_id: TransactionBatchId::new(0),
ids: vec![],
Expand Down Expand Up @@ -881,6 +888,7 @@ mod tests {
// Complete the batch - marking the second transaction as retryable
finished_consume_work_sender
.send(FinishedConsumeWork {
slot: None,
work: consume_work,
retryable_indexes: vec![1],
})
Expand All @@ -898,4 +906,78 @@ mod tests {
.collect_vec();
assert_eq!(message_hashes, vec![&tx1_hash]);
}

#[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")]
#[test_case(test_create_transaction_view_receive_and_buffer; "View")]
fn test_schedule_consume_slot_gated_retry<R: ReceiveAndBuffer>(
create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc<RwLock<BankForks>>) -> R,
) {
let (test_frame, mut scheduler_controller) =
create_test_frame(1, create_receive_and_buffer);
let TestFrame {
bank,
mint_keypair,
poh_recorder,
banking_packet_sender,
consume_work_receivers,
finished_consume_work_sender,
..
} = &test_frame;

poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());

// Send packet batch to the scheduler - should do nothing until we become the leader.
let tx1 = create_and_fund_prioritized_transfer(
bank,
mint_keypair,
&Keypair::new(),
&Pubkey::new_unique(),
1,
1000,
bank.last_blockhash(),
);
let tx2 = create_and_fund_prioritized_transfer(
bank,
mint_keypair,
&Keypair::new(),
&Pubkey::new_unique(),
1,
2000,
bank.last_blockhash(),
);
let tx1_hash = tx1.message().hash();
let tx2_hash = tx2.message().hash();

let txs = vec![tx1, tx2];
banking_packet_sender
.send(to_banking_packet_batch(&txs))
.unwrap();

test_receive_then_schedule(&mut scheduler_controller);
let consume_work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(consume_work.ids.len(), 2);
assert_eq!(consume_work.transactions.len(), 2);
let message_hashes = consume_work
.transactions
.iter()
.map(|tx| tx.message_hash())
.collect_vec();
assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);

// Complete the batch - marking the second transaction as retryable
finished_consume_work_sender
.send(FinishedConsumeWork {
slot: Some(bank.slot()),
work: consume_work,
retryable_indexes: vec![1],
})
.unwrap();

// Transaction should NOT be rescheduled
test_receive_then_schedule(&mut scheduler_controller);
assert!(consume_work_receivers[0].is_empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ pub struct SchedulingDetails {
pub sum_num_scheduled: usize,
pub sum_unschedulable_conflicts: usize,
pub sum_unschedulable_threads: usize,
pub sum_skipped_retry: usize,
}

impl Default for SchedulingDetails {
Expand All @@ -435,6 +436,7 @@ impl Default for SchedulingDetails {
sum_num_scheduled: 0,
sum_unschedulable_conflicts: 0,
sum_unschedulable_threads: 0,
sum_skipped_retry: 0,
}
}
}
Expand All @@ -458,6 +460,7 @@ impl SchedulingDetails {
.max_starting_buffer_size
.max(scheduling_summary.starting_buffer_size);
self.sum_starting_buffer_size += scheduling_summary.starting_buffer_size;
self.sum_skipped_retry += scheduling_summary.num_skipped_retry;

self.sum_num_scheduled += scheduling_summary.num_scheduled;
self.sum_unschedulable_conflicts += scheduling_summary.num_unschedulable_conflicts;
Expand Down Expand Up @@ -502,6 +505,7 @@ impl SchedulingDetails {
self.sum_unschedulable_threads,
i64
),
("num_skipped_retry", self.sum_skipped_retry, i64),
);
*self = Self {
last_report: now,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::banking_stage::scheduler_messages::MaxAge;
use {crate::banking_stage::scheduler_messages::MaxAge, solana_sdk::clock::Slot};

/// TransactionState is used to track the state of a transaction in the transaction scheduler
/// and banking stage as a whole.
Expand All @@ -20,6 +20,8 @@ pub(crate) struct TransactionState<Tx> {
priority: u64,
/// Estimated cost of the transaction.
cost: u64,
/// Last slot transaction attempted execution on.
last_tried_slot: Option<Slot>,
}

impl<Tx> TransactionState<Tx> {
Expand All @@ -30,6 +32,7 @@ impl<Tx> TransactionState<Tx> {
max_age,
priority,
cost,
last_tried_slot: None,
}
}

Expand Down Expand Up @@ -58,6 +61,16 @@ impl<Tx> TransactionState<Tx> {
(tx, self.max_age)
}

/// Return the last tried slot.
pub(crate) fn last_tried_slot(&self) -> Option<Slot> {
self.last_tried_slot
}

/// Set a new last tried slot.
pub(crate) fn set_last_tried_slot(&mut self, slot: Option<Slot>) {
self.last_tried_slot = slot;
}

/// Intended to be called when a transaction is retried. This method will
/// return ownership of the transaction to the state.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
solana_runtime_transaction::{
runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta,
},
solana_sdk::packet::PACKET_DATA_SIZE,
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE},
std::sync::Arc,
};

Expand Down Expand Up @@ -67,10 +67,16 @@ pub(crate) trait StateContainer<Tx: TransactionWithMeta> {

/// Retries a transaction - inserts transaction back into map.
/// This transitions the transaction to `Unprocessed` state.
fn retry_transaction(&mut self, transaction_id: TransactionId, transaction: Tx) {
fn retry_transaction(
&mut self,
tried_slot: Option<Slot>,
transaction_id: TransactionId,
transaction: Tx,
) {
let transaction_state = self
.get_mut_transaction_state(transaction_id)
.expect("transaction must exist");
transaction_state.set_last_tried_slot(tried_slot);
let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id);
transaction_state.retry_transaction(transaction);
self.push_ids_into_queue(std::iter::once(priority_id));
Expand Down