Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
scheduler_messages::{ConsumeWork, FinishedConsumeWork},
},
crate::banking_stage::consumer::RetryableIndex,
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
solana_measure::measure_us,
solana_poh::poh_recorder::SharedWorkingBank,
Expand Down Expand Up @@ -179,7 +180,12 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {

/// Send transactions back to scheduler as retryable.
fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect();
let retryable_indexes: Vec<_> = (0..work.transactions.len())
.map(|index| RetryableIndex {
index,
immediately_retryable: true,
})
.collect();
let num_retryable = retryable_indexes.len();
self.metrics
.count_metrics
Expand Down Expand Up @@ -942,7 +948,10 @@ mod tests {
assert_eq!(consumed.work.batch_id, bid);
assert_eq!(consumed.work.ids, vec![id]);
assert_eq!(consumed.work.max_ages, vec![max_age]);
assert_eq!(consumed.retryable_indexes, vec![0]);
assert_eq!(
consumed.retryable_indexes,
vec![RetryableIndex::new(0, true)]
);

drop(test_frame);
let _ = worker_thread.join().unwrap();
Expand Down Expand Up @@ -991,7 +1000,7 @@ mod tests {
assert_eq!(consumed.work.batch_id, bid);
assert_eq!(consumed.work.ids, vec![id]);
assert_eq!(consumed.work.max_ages, vec![max_age]);
assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
assert_eq!(consumed.retryable_indexes, Vec::new());

drop(test_frame);
let _ = worker_thread.join().unwrap();
Expand Down Expand Up @@ -1051,7 +1060,7 @@ mod tests {
if relax_intrabatch_account_locks {
vec![]
} else {
vec![1]
vec![RetryableIndex::new(1, true)]
}
);

Expand Down Expand Up @@ -1122,13 +1131,13 @@ mod tests {
assert_eq!(consumed.work.batch_id, bid1);
assert_eq!(consumed.work.ids, vec![id1]);
assert_eq!(consumed.work.max_ages, vec![max_age]);
assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
assert_eq!(consumed.retryable_indexes, Vec::new());

let consumed = consumed_receiver.recv().unwrap();
assert_eq!(consumed.work.batch_id, bid2);
assert_eq!(consumed.work.ids, vec![id2]);
assert_eq!(consumed.work.max_ages, vec![max_age]);
assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
assert_eq!(consumed.retryable_indexes, Vec::new());

drop(test_frame);
let _ = worker_thread.join().unwrap();
Expand Down Expand Up @@ -1258,7 +1267,7 @@ mod tests {
.unwrap();

let consumed = consumed_receiver.recv().unwrap();
assert_eq!(consumed.retryable_indexes, Vec::<usize>::new());
assert_eq!(consumed.retryable_indexes, Vec::new());
// all but one succeed. 6 for initial funding
assert_eq!(bank.transaction_count(), 6 + 5);

Expand Down
84 changes: 69 additions & 15 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ use {
/// Consumer will create chunks of transactions from buffer with up to this size.
pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct RetryableIndex {
pub index: usize,
pub immediately_retryable: bool,
Comment thread
t-nelson marked this conversation as resolved.
}

impl RetryableIndex {
pub fn new(index: usize, immediately_retryable: bool) -> Self {
Self {
index,
immediately_retryable,
}
}
}

pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model
pub(crate) cost_model_throttled_transactions_count: u64,
Expand All @@ -48,7 +63,7 @@ pub struct ExecuteAndCommitTransactionsOutput {
pub(crate) transaction_counts: LeaderProcessedTransactionCounts,
// Transactions that either were not executed, or were executed and failed to be committed due
// to the block ending.
pub(crate) retryable_transaction_indexes: Vec<usize>,
pub(crate) retryable_transaction_indexes: Vec<RetryableIndex>,
// A result that indicates whether transactions were successfully
// committed into the Poh stream.
pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
Expand Down Expand Up @@ -256,23 +271,39 @@ impl Consumer {
// following are retryable errors
Err(TransactionError::AccountInUse) => {
error_counters.account_in_use += 1;
Some(index)
// locking failure due to vote conflict or jito - immediately retry.
Some(RetryableIndex {
index,
immediately_retryable: true,
})
}
Err(TransactionError::WouldExceedMaxBlockCostLimit) => {
error_counters.would_exceed_max_block_cost_limit += 1;
Some(index)
Some(RetryableIndex {
index,
immediately_retryable: false,
Comment thread
tao-stones marked this conversation as resolved.
})
}
Err(TransactionError::WouldExceedMaxVoteCostLimit) => {
error_counters.would_exceed_max_vote_cost_limit += 1;
Some(index)
Some(RetryableIndex {
index,
immediately_retryable: false,
})
}
Err(TransactionError::WouldExceedMaxAccountCostLimit) => {
error_counters.would_exceed_max_account_cost_limit += 1;
Some(index)
Some(RetryableIndex {
index,
immediately_retryable: false,
})
}
Err(TransactionError::WouldExceedAccountDataBlockLimit) => {
error_counters.would_exceed_account_data_block_limit += 1;
Some(index)
Some(RetryableIndex {
index,
immediately_retryable: false,
})
}
// following are non-retryable errors
Err(TransactionError::TooManyAccountLocks) => {
Expand Down Expand Up @@ -369,7 +400,12 @@ impl Consumer {

if let Err(recorder_err) = record_transactions_result {
retryable_transaction_indexes.extend(processing_results.iter().enumerate().filter_map(
|(index, processing_result)| processing_result.was_processed().then_some(index),
|(index, processing_result)| {
processing_result.was_processed().then_some(RetryableIndex {
index,
immediately_retryable: true, // recording errors are always immediately retryable
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit picking: instead of mapping different retryable scenarios to immediately_retryable: bool in various parts of the code, maybe consider constructing RetryableIndex with the specific reason (say enum { NotScheduled, WouldExceedCuLimit, RecordErrror, ..}) and centralizing the immediate retry decision in one place (perhaps as a member function of RetryableIndex). This could help improve clarity and maintainability.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your suggestion is the more correct one, however I'd extend it. retryable_indexes should not be a thing at all...we should simply return the transaction results, and scheduler should make a decision from there.

That seems like a larger change, but I'll give it a shot.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking the change is pretty involved, because retryable_indexes.len() is used in a ton of places in metrics accumulation. I think if we remove retryable indexes it should be done separately, wdyt?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that returning transaction results to scheduler is a better solution, and it should be done in separately.

})
},
));

// retryable indexes are expected to be sorted - in this case the
Expand Down Expand Up @@ -754,7 +790,13 @@ mod tests {
processed_with_successful_result_count: 1,
}
);
assert_eq!(retryable_transaction_indexes, vec![0]);
assert_eq!(
retryable_transaction_indexes,
vec![RetryableIndex {
index: 0,
immediately_retryable: true
}]
);
assert_matches!(
commit_transactions_result,
Err(PohRecorderError::MaxHeightReached)
Expand Down Expand Up @@ -1152,7 +1194,10 @@ mod tests {
commit_transactions_result.get(1),
Some(CommitTransactionDetails::NotCommitted(_))
);
assert_eq!(retryable_transaction_indexes, vec![1]);
assert_eq!(
retryable_transaction_indexes,
vec![RetryableIndex::new(1, true)]
);

let expected_block_cost = {
let (actual_programs_execution_cost, actual_loaded_accounts_data_size_cost) =
Expand Down Expand Up @@ -1311,9 +1356,12 @@ mod tests {

// with simd3, duplicate transactions are not retryable
if relax_intrabatch_account_locks && use_duplicate_transaction {
assert_eq!(retryable_transaction_indexes, Vec::<usize>::new());
assert_eq!(retryable_transaction_indexes, Vec::<_>::new());
} else {
assert_eq!(retryable_transaction_indexes, vec![1]);
assert_eq!(
retryable_transaction_indexes,
vec![RetryableIndex::new(1, true)]
);
}
}

Expand Down Expand Up @@ -1369,7 +1417,9 @@ mod tests {

assert_eq!(
execute_and_commit_transactions_output.retryable_transaction_indexes,
(1..transactions_len - 1).collect::<Vec<usize>>()
(1..transactions_len - 1)
.map(|index| RetryableIndex::new(index, true))
.collect::<Vec<_>>()
);
}

Expand Down Expand Up @@ -1455,12 +1505,14 @@ mod tests {
if relax_intrabatch_account_locks {
assert_eq!(
execute_and_commit_transactions_output.retryable_transaction_indexes,
Vec::<usize>::new()
Vec::<_>::new()
);
} else {
assert_eq!(
execute_and_commit_transactions_output.retryable_transaction_indexes,
(1..transactions_len).collect::<Vec<usize>>()
(1..transactions_len)
.map(|index| RetryableIndex::new(index, true))
.collect::<Vec<_>>()
);
}
}
Expand Down Expand Up @@ -1548,7 +1600,9 @@ mod tests {
execute_and_commit_transactions_output
.retryable_transaction_indexes
.sort_unstable();
let expected: Vec<usize> = (0..transactions.len()).collect();
let expected: Vec<_> = (0..transactions.len())
.map(|index| RetryableIndex::new(index, true))
.collect();
assert_eq!(
execute_and_commit_transactions_output.retryable_transaction_indexes,
expected
Expand Down
3 changes: 2 additions & 1 deletion core/src/banking_stage/scheduler_messages.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
crate::banking_stage::consumer::RetryableIndex,
solana_clock::{Epoch, Slot},
std::fmt::Display,
};
Expand Down Expand Up @@ -47,5 +48,5 @@ pub struct ConsumeWork<Tx> {
/// Processed transactions.
pub struct FinishedConsumeWork<Tx> {
pub work: ConsumeWork<Tx>,
pub retryable_indexes: Vec<usize>,
pub retryable_indexes: Vec<RetryableIndex>,
}
48 changes: 34 additions & 14 deletions core/src/banking_stage/transaction_scheduler/scheduler_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,13 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
// Assumption - retryable indexes are in order (sorted by workers).
let mut retryable_iter = retryable_indexes.iter().peekable();
for (index, (id, transaction)) in izip!(ids, transactions).enumerate() {
if let Some(&&retryable_index) = retryable_iter.peek() {
if retryable_index == index {
container.retry_transaction(id, transaction);
if let Some(&retryable_index) = retryable_iter.peek() {
if retryable_index.index == index {
container.retry_transaction(
id,
transaction,
retryable_index.immediately_retryable,
);
retryable_iter.next();
continue;
}
Expand All @@ -254,7 +258,11 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {

debug_assert!(
retryable_iter.peek().is_none(),
"retryable indexes were not in order: {retryable_indexes:?}"
"retryable indexes were not in order: {:?}",
retryable_indexes
Comment thread
tao-stones marked this conversation as resolved.
.iter()
.map(|index| index.index)
.collect::<Vec<_>>(),
);

Ok((num_transactions, num_retryable))
Expand Down Expand Up @@ -290,11 +298,18 @@ impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
mod tests {
use {
super::*,
crate::banking_stage::transaction_scheduler::transaction_state_container::TransactionStateContainer,
crossbeam_channel::unbounded, solana_hash::Hash, solana_keypair::Keypair,
solana_pubkey::Pubkey, solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
crate::banking_stage::{
consumer::RetryableIndex,
transaction_scheduler::transaction_state_container::TransactionStateContainer,
},
crossbeam_channel::unbounded,
solana_hash::Hash,
solana_keypair::Keypair,
solana_pubkey::Pubkey,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_system_transaction as system_transaction,
solana_transaction::sanitized::SanitizedTransaction, test_case::test_case,
solana_transaction::sanitized::SanitizedTransaction,
test_case::test_case,
};

const NUM_WORKERS: usize = 4;
Expand Down Expand Up @@ -521,17 +536,22 @@ mod tests {
let num_scheduled = common.send_batch(0).unwrap();
let work = work_receivers[0].try_recv().unwrap();
assert_eq!(work.ids.len(), num_scheduled);
let retryable_indexes = vec![0, 1];
let retryable_indexes = vec![
RetryableIndex::new(0, true),
RetryableIndex::new(1, false), // should be held by container.
];
let expected_num_retryable = retryable_indexes.len();
let finished_work = FinishedConsumeWork {
work,
retryable_indexes: retryable_indexes.clone(),
retryable_indexes,
};
finished_work_sender.send(finished_work).unwrap();
let (num_transactions, num_retryable) =
common.try_receive_completed(&mut container).unwrap();
assert_eq!(num_transactions, num_scheduled);
assert_eq!(num_retryable, retryable_indexes.len());
assert_eq!(container.buffer_size(), retryable_indexes.len());
assert_eq!(num_retryable, expected_num_retryable);
assert_eq!(container.buffer_size(), expected_num_retryable);
assert_eq!(container.queue_size(), expected_num_retryable - 1); // held transaction not in queue.
}

#[test]
Expand All @@ -550,10 +570,10 @@ mod tests {
let num_scheduled = common.send_batch(0).unwrap();
let work = work_receivers[0].try_recv().unwrap();
assert_eq!(work.ids.len(), num_scheduled);
let retryable_indexes = vec![1, 0];
let retryable_indexes = vec![RetryableIndex::new(1, true), RetryableIndex::new(0, true)];
let finished_work = FinishedConsumeWork {
work,
retryable_indexes: retryable_indexes.clone(),
retryable_indexes,
};
finished_work_sender.send(finished_work).unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ where
}

pub fn run(mut self) -> Result<(), SchedulerError> {
let mut last_slot = None;
while !self.exit.load(Ordering::Relaxed) {
// BufferedPacketsDecision is shared with legacy BankingStage, which will forward
// packets. Initially, not renaming these decision variants but the actions taken
Expand All @@ -110,6 +111,10 @@ where
.maybe_report_and_reset_slot(new_leader_slot);

self.receive_completed()?;
if last_slot != new_leader_slot {
self.container.flush_held_transactions();
last_slot = new_leader_slot;
}
self.process_transactions(&decision)?;
if self.receive_and_buffer_packets(&decision).is_err() {
break;
Expand Down Expand Up @@ -353,7 +358,7 @@ mod tests {
use {
super::*,
crate::banking_stage::{
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
consumer::{RetryableIndex, TARGET_NUM_TRANSACTIONS_PER_BATCH},
packet_deserializer::PacketDeserializer,
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
tests::create_slow_genesis_config,
Expand Down Expand Up @@ -874,7 +879,7 @@ mod tests {
finished_consume_work_sender
.send(FinishedConsumeWork {
work: consume_work,
retryable_indexes: vec![1],
retryable_indexes: vec![RetryableIndex::new(1, true)],
})
.unwrap();

Expand Down
Loading
Loading