Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use {
TransactionExecutionResult,
},
bank_utils,
cost_model::CostModel,
cost_model::{CostModel, ExecutionCost},
transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
},
Expand Down Expand Up @@ -973,14 +973,20 @@ impl BankingStage {
let tx_costs = qos_service.compute_transaction_costs(txs.iter());

let transactions_qos_results =
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.into_iter(), bank);

// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let mut lock_time = Measure::start("lock_time");
let batch =
bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.into_iter());
let batch = bank.prepare_sanitized_batch_with_results(
txs,
transactions_qos_results
.into_iter()
.map(|transaction_cost_result| {
transaction_cost_result.map(|transaction_cost| transaction_cost.execution_cost)
}),
);
lock_time.stop();

// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit and
Expand Down Expand Up @@ -1077,9 +1083,9 @@ impl BankingStage {
fn prepare_filter_for_pending_transactions(
transactions_len: usize,
pending_tx_indexes: &[usize],
) -> Vec<transaction::Result<()>> {
) -> Vec<transaction::Result<ExecutionCost>> {
let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions_len];
pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(()));
pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(0));
mask
}

Expand Down Expand Up @@ -1170,7 +1176,7 @@ impl BankingStage {

let results = bank.check_transactions(
transactions,
&filter,
filter.into_iter(),
(MAX_PROCESSING_AGE)
.saturating_sub(max_tx_fwd_delay)
.saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize),
Expand Down Expand Up @@ -2082,20 +2088,20 @@ mod tests {
vec![
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(0),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(())
Ok(0),
Ok(0)
]
);

assert_eq!(
BankingStage::prepare_filter_for_pending_transactions(6, &[0, 2, 3]),
vec![
Ok(()),
Ok(0),
Err(TransactionError::BlockhashNotFound),
Ok(()),
Ok(()),
Ok(0),
Ok(0),
Err(TransactionError::BlockhashNotFound),
Err(TransactionError::BlockhashNotFound),
]
Expand All @@ -2109,10 +2115,10 @@ mod tests {
&[
(Err(TransactionError::BlockhashNotFound), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(0), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(()), None),
(Ok(0), None),
(Ok(0), None),
],
&[2, 4, 5, 9, 11, 13]
),
Expand All @@ -2122,12 +2128,12 @@ mod tests {
assert_eq!(
BankingStage::filter_valid_transaction_indexes(
&[
(Ok(()), None),
(Ok(0), None),
(Err(TransactionError::BlockhashNotFound), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(()), None),
(Ok(()), None),
(Ok(0), None),
(Ok(0), None),
(Ok(0), None),
],
&[1, 6, 7, 9, 31, 43]
),
Expand Down
2 changes: 2 additions & 0 deletions core/src/cost_update_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod tests {
ProgramTiming {
accumulated_us,
accumulated_units,
current_cost_model_estimated_units: 0,
count,
},
);
Expand Down Expand Up @@ -281,6 +282,7 @@ mod tests {
ProgramTiming {
accumulated_us,
accumulated_units,
current_cost_model_estimated_units: 0,
count,
},
);
Expand Down
11 changes: 6 additions & 5 deletions core/src/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,18 @@ impl QosService {
pub fn select_transactions_per_cost<'a>(
&self,
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
transactions_costs: impl Iterator<Item = TransactionCost>,
bank: &Arc<Bank>,
) -> Vec<transaction::Result<()>> {
) -> Vec<transaction::Result<TransactionCost>> {
let mut cost_tracking_time = Measure::start("cost_tracking_time");
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let select_results = transactions
.zip(transactions_costs)
.map(|(tx, cost)| match cost_tracker.try_add(tx, cost) {
.map(|(tx, cost)| match cost_tracker.try_add(tx, &cost) {
Ok(current_block_cost) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
self.metrics.selected_txs_count.fetch_add(1, Ordering::Relaxed);
Ok(())
Ok(cost)
},
Err(e) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e);
Expand Down Expand Up @@ -304,7 +304,8 @@ mod tests {
bank.write_cost_tracker()
.unwrap()
.set_limits(cost_limit, cost_limit);
let results = qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let results =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank);

// verify that first transfer tx and all votes are allowed
assert_eq!(results.len(), txs.len());
Expand Down
6 changes: 3 additions & 3 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
.unwrap())
);

fn first_err(results: &[Result<()>]) -> Result<()> {
fn first_err<T>(results: &[Result<T>]) -> Result<()> {
for r in results {
if r.is_err() {
return r.clone();
if let Err(e) = r {
return Err(e.clone());
}
}
Ok(())
Expand Down
Loading