diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 82acf209f1731c..7a25e87f77c2a0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -584,6 +584,14 @@ impl BankingStage { ) } + let forwarder = Forwarder::new( + poh_recorder.clone(), + bank_forks.clone(), + cluster_info.clone(), + connection_cache.clone(), + data_budget.clone(), + ); + // Spawn the central scheduler thread bank_thread_hdls.push({ let packet_deserializer = @@ -595,6 +603,7 @@ impl BankingStage { bank_forks, scheduler, worker_metrics, + forwarder, ); Builder::new() .name("solBnkTxSched".to_string()) diff --git a/core/src/banking_stage/forward_worker.rs b/core/src/banking_stage/forward_worker.rs index a0be381086878b..6c9fd45e029c2f 100644 --- a/core/src/banking_stage/forward_worker.rs +++ b/core/src/banking_stage/forward_worker.rs @@ -86,7 +86,6 @@ mod tests { super::*, crate::banking_stage::{ immutable_deserialized_packet::ImmutableDeserializedPacket, - scheduler_messages::TransactionId, tests::{create_slow_genesis_config, new_test_cluster_info, simulate_poh}, }, crossbeam_channel::unbounded, @@ -200,9 +199,6 @@ mod tests { system_transaction::transfer(mint_keypair, &pubkey2, 2, genesis_config.hash()), ]; - let id1 = TransactionId::new(1); - let id2 = TransactionId::new(0); - let packets = to_packet_batches(&txs, 2); assert_eq!(packets.len(), 1); let packets = packets[0] @@ -211,14 +207,8 @@ mod tests { .map(|p| ImmutableDeserializedPacket::new(p).unwrap()) .map(Arc::new) .collect(); - forward_sender - .send(ForwardWork { - packets, - ids: vec![id1, id2], - }) - .unwrap(); + forward_sender.send(ForwardWork { packets }).unwrap(); let forwarded = forwarded_receiver.recv().unwrap(); - assert_eq!(forwarded.work.ids, vec![id1, id2]); assert!(forwarded.successful); drop(test_frame); diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index 172087e2cf8e82..92181e2abf9655 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -48,7 +48,6 @@ pub struct ConsumeWork { /// Message: [Scheduler -> Worker] /// Transactions to be forwarded to the next leader(s) pub struct ForwardWork { - pub ids: Vec, pub packets: Vec>, } diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index 16fa19cca44895..71ff3939203c2d 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -515,14 +515,18 @@ impl Batches { mod tests { use { super::*, - crate::banking_stage::consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + crate::banking_stage::{ + consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + immutable_deserialized_packet::ImmutableDeserializedPacket, + }, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, pubkey::Pubkey, - signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, + compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, + pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, + transaction::Transaction, }, - std::borrow::Borrow, + std::{borrow::Borrow, sync::Arc}, }; macro_rules! txid { @@ -593,12 +597,24 @@ mod tests { let id = TransactionId::new(index as u64); let transaction = prioritized_tranfers(from_keypair.borrow(), to_pubkeys, lamports, priority); + let packet = Arc::new( + ImmutableDeserializedPacket::new( + Packet::from_data(None, transaction.to_versioned_transaction()).unwrap(), + ) + .unwrap(), + ); let transaction_ttl = SanitizedTransactionTTL { transaction, max_age_slot: Slot::MAX, }; const TEST_TRANSACTION_COST: u64 = 5000; - container.insert_new_transaction(id, transaction_ttl, priority, TEST_TRANSACTION_COST); + container.insert_new_transaction( + id, + transaction_ttl, + packet, + priority, + TEST_TRANSACTION_COST, + ); } container diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index bf66a5d536d55f..d4b3ff0694f2b2 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -12,9 +12,11 @@ use { consume_worker::ConsumeWorkerMetrics, consumer::Consumer, decision_maker::{BufferedPacketsDecision, DecisionMaker}, + forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + forwarder::Forwarder, immutable_deserialized_packet::ImmutableDeserializedPacket, packet_deserializer::PacketDeserializer, - TOTAL_BUFFERED_PACKETS, + ForwardOption, TOTAL_BUFFERED_PACKETS, }, crossbeam_channel::RecvTimeoutError, solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics, @@ -23,13 +25,16 @@ use { solana_program_runtime::compute_budget_processor::process_compute_budget_instructions, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ - clock::MAX_PROCESSING_AGE, - feature_set::include_loaded_accounts_data_size_in_fee_calculation, fee::FeeBudgetLimits, - saturating_add_assign, timing::AtomicInterval, transaction::SanitizedTransaction, + clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, + feature_set::include_loaded_accounts_data_size_in_fee_calculation, + fee::FeeBudgetLimits, + saturating_add_assign, + timing::AtomicInterval, + transaction::SanitizedTransaction, }, std::{ sync::{Arc, RwLock}, - time::Duration, + time::{Duration, Instant}, }, }; @@ -53,6 +58,8 @@ pub(crate) struct SchedulerController { timing_metrics: SchedulerTimingMetrics, /// Metric report handles for the worker threads. worker_metrics: Vec>, + /// State for forwarding packets to the leader. + forwarder: Forwarder, } impl SchedulerController { @@ -62,6 +69,7 @@ impl SchedulerController { bank_forks: Arc>, scheduler: PrioGraphScheduler, worker_metrics: Vec>, + forwarder: Forwarder, ) -> Self { Self { decision_maker, @@ -73,6 +81,7 @@ impl SchedulerController { count_metrics: SchedulerCountMetrics::default(), timing_metrics: SchedulerTimingMetrics::default(), worker_metrics, + forwarder, } } @@ -121,7 +130,12 @@ impl SchedulerController { let (scheduling_summary, schedule_time_us) = measure_us!(self.scheduler.schedule( &mut self.container, |txs, results| { - Self::pre_graph_filter(txs, results, &bank_start.working_bank) + Self::pre_graph_filter( + txs, + results, + &bank_start.working_bank, + MAX_PROCESSING_AGE, + ) }, |_| true // no pre-lock filter for now )?); @@ -144,12 +158,12 @@ impl SchedulerController { saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us); } BufferedPacketsDecision::Forward => { - let (_, clear_time_us) = measure_us!(self.clear_container()); - saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us); + let (_, forward_time_us) = measure_us!(self.forward_packets(false)); + saturating_add_assign!(self.timing_metrics.forward_time_us, forward_time_us); } BufferedPacketsDecision::ForwardAndHold => { - let (_, clean_time_us) = measure_us!(self.clean_queue()); - saturating_add_assign!(self.timing_metrics.clean_time_us, clean_time_us); + let (_, forward_time_us) = measure_us!(self.forward_packets(true)); + saturating_add_assign!(self.timing_metrics.forward_time_us, forward_time_us); } BufferedPacketsDecision::Hold => {} } @@ -157,15 +171,16 @@ impl SchedulerController { Ok(()) } - fn pre_graph_filter(transactions: &[&SanitizedTransaction], results: &mut [bool], bank: &Bank) { + fn pre_graph_filter( + transactions: &[&SanitizedTransaction], + results: &mut [bool], + bank: &Bank, + max_age: usize, + ) { let lock_results = vec![Ok(()); transactions.len()]; let mut error_counters = TransactionErrorMetrics::default(); - let check_results = bank.check_transactions( - transactions, - &lock_results, - MAX_PROCESSING_AGE, - &mut error_counters, - ); + let check_results = + bank.check_transactions(transactions, &lock_results, max_age, &mut error_counters); let fee_check_results: Vec<_> = check_results .into_iter() @@ -181,8 +196,112 @@ impl SchedulerController { } } + /// Forward packets to the next leader. + fn forward_packets(&mut self, hold: bool) { + const MAX_FORWARDING_DURATION: Duration = Duration::from_millis(100); + let start = Instant::now(); + let bank = self.bank_forks.read().unwrap().working_bank(); + let feature_set = &bank.feature_set; + let mut forwardable_packets = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + + // Pop from the container in chunks, filter using bank checks, then attempt to forward. + // This doubles as a way to clean the queue as well as forwarding transactions. + const CHUNK_SIZE: usize = 64; + let mut num_forwarded: usize = 0; + let mut ids_to_add_back = Vec::new(); + let mut max_time_reached = false; + while !self.container.is_empty() { + let mut filter_array = [true; CHUNK_SIZE]; + let mut ids = Vec::with_capacity(CHUNK_SIZE); + let mut txs = Vec::with_capacity(CHUNK_SIZE); + + for _ in 0..CHUNK_SIZE { + if let Some(id) = self.container.pop() { + ids.push(id); + } else { + break; + } + } + let chunk_size = ids.len(); + ids.iter().for_each(|id| { + let transaction = self.container.get_transaction_ttl(&id.id).unwrap(); + txs.push(&transaction.transaction); + }); + + // use same filter we use for processing transactions: + // age, already processed, fee-check. + Self::pre_graph_filter( + &txs, + &mut filter_array, + &bank, + MAX_PROCESSING_AGE + .saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize), + ); + + for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) { + if !*filter_result { + self.container.remove_by_id(&id.id); + continue; + } + + ids_to_add_back.push(*id); // add back to the queue at end + let state = self.container.get_mut_transaction_state(&id.id).unwrap(); + let sanitized_transaction = &state.transaction_ttl().transaction; + let immutable_packet = state.packet().clone(); + + // If not already forwarded and can be forwarded, add to forwardable packets. + if state.should_forward() + && forwardable_packets.try_add_packet( + sanitized_transaction, + immutable_packet, + feature_set, + ) + { + saturating_add_assign!(num_forwarded, 1); + state.mark_forwarded(); + } + } + + if start.elapsed() >= MAX_FORWARDING_DURATION { + max_time_reached = true; + break; + } + } + + // Forward each batch of transactions + for batch in forwardable_packets.iter_batches() { + let _ = self.forwarder.forward_packets( + &ForwardOption::ForwardTransaction, + batch.get_forwardable_packets(), + ); + } + + // If we hit the time limit. Drop everything that was not checked/processed. + // If we cannot run these simple checks in time, then we cannot run them during + // leader slot. + if max_time_reached { + while let Some(id) = self.container.pop() { + self.container.remove_by_id(&id.id); + } + } + + if hold { + for priority_id in ids_to_add_back { + self.container.push_id_into_queue(priority_id); + } + } else { + for priority_id in ids_to_add_back { + self.container.remove_by_id(&priority_id.id); + } + } + + saturating_add_assign!(self.count_metrics.num_forwarded, num_forwarded); + } + /// Clears the transaction state container. /// This only clears pending transactions, and does **not** clear in-flight transactions. + #[allow(dead_code)] fn clear_container(&mut self) { while let Some(id) = self.container.pop() { self.container.remove_by_id(&id.id); @@ -193,6 +312,7 @@ impl SchedulerController { /// Clean unprocessable transactions from the queue. These will be transactions that are /// expired, already processed, or are no longer sanitizable. /// This only clears pending transactions, and does **not** clear in-flight transactions. + #[allow(dead_code)] fn clean_queue(&mut self) { // Clean up any transactions that have already been processed, are too old, or do not have // valid nonce accounts. @@ -257,19 +377,17 @@ impl SchedulerController { let remaining_queue_capacity = self.container.remaining_queue_capacity(); const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100); - let (recv_timeout, should_buffer) = match decision { - BufferedPacketsDecision::Consume(_) => ( + let recv_timeout = match decision { + BufferedPacketsDecision::Consume(_) => { if self.container.is_empty() { MAX_PACKET_RECEIVE_TIME } else { Duration::ZERO - }, - true, - ), - BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, false), - BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => { - (MAX_PACKET_RECEIVE_TIME, true) + } } + BufferedPacketsDecision::Forward + | BufferedPacketsDecision::ForwardAndHold + | BufferedPacketsDecision::Hold => MAX_PACKET_RECEIVE_TIME, }; let (received_packet_results, receive_time_us) = measure_us!(self @@ -284,17 +402,10 @@ impl SchedulerController { Ok(receive_packet_results) => { let num_received_packets = receive_packet_results.deserialized_packets.len(); saturating_add_assign!(self.count_metrics.num_received, num_received_packets); - if should_buffer { - let (_, buffer_time_us) = measure_us!( - self.buffer_packets(receive_packet_results.deserialized_packets) - ); - saturating_add_assign!(self.timing_metrics.buffer_time_us, buffer_time_us); - } else { - saturating_add_assign!( - self.count_metrics.num_dropped_on_receive, - num_received_packets - ); - } + + let (_, buffer_time_us) = + measure_us!(self.buffer_packets(receive_packet_results.deserialized_packets)); + saturating_add_assign!(self.timing_metrics.buffer_time_us, buffer_time_us); } Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return false, @@ -304,6 +415,8 @@ impl SchedulerController { } fn buffer_packets(&mut self, packets: Vec) { + // Convert to Arcs + let packets: Vec<_> = packets.into_iter().map(Arc::new).collect(); // Sanitize packets, generate IDs, and insert into the container. let bank = self.bank_forks.read().unwrap().working_bank(); let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch()); @@ -313,28 +426,39 @@ impl SchedulerController { const CHUNK_SIZE: usize = 128; let lock_results: [_; CHUNK_SIZE] = core::array::from_fn(|_| Ok(())); + + let mut arc_packets = Vec::with_capacity(CHUNK_SIZE); + let mut transactions = Vec::with_capacity(CHUNK_SIZE); + let mut fee_budget_limits_vec = Vec::with_capacity(CHUNK_SIZE); + let mut error_counts = TransactionErrorMetrics::default(); for chunk in packets.chunks(CHUNK_SIZE) { let mut post_sanitization_count: usize = 0; - let (transactions, fee_budget_limits_vec): (Vec<_>, Vec<_>) = chunk + chunk .iter() .filter_map(|packet| { - packet.build_sanitized_transaction(feature_set, vote_only, bank.as_ref()) + packet + .build_sanitized_transaction(feature_set, vote_only, bank.as_ref()) + .map(|tx| (packet.clone(), tx)) }) .inspect(|_| saturating_add_assign!(post_sanitization_count, 1)) - .filter(|tx| { + .filter(|(_packet, tx)| { SanitizedTransaction::validate_account_locks( tx.message(), transaction_account_lock_limit, ) .is_ok() }) - .filter_map(|tx| { + .filter_map(|(packet, tx)| { process_compute_budget_instructions(tx.message().program_instructions_iter()) - .map(|compute_budget| (tx, compute_budget.into())) + .map(|compute_budget| (packet, tx, compute_budget.into())) .ok() }) - .unzip(); + .for_each(|(packet, tx, fee_budget_limits)| { + arc_packets.push(packet); + transactions.push(tx); + fee_budget_limits_vec.push(fee_budget_limits); + }); let check_results = bank.check_transactions( &transactions, @@ -345,9 +469,10 @@ impl SchedulerController { let post_lock_validation_count = transactions.len(); let mut post_transaction_check_count: usize = 0; - for ((transaction, fee_budget_limits), _) in transactions - .into_iter() - .zip(fee_budget_limits_vec) + for (((packet, transaction), fee_budget_limits), _) in arc_packets + .drain(..) + .zip(transactions.drain(..)) + .zip(fee_budget_limits_vec.drain(..)) .zip(check_results) .filter(|(_, check_result)| check_result.0.is_ok()) { @@ -364,6 +489,7 @@ impl SchedulerController { if self.container.insert_new_transaction( transaction_id, transaction_ttl, + packet, priority, cost, ) { @@ -459,6 +585,8 @@ struct SchedulerCountMetrics { num_finished: usize, /// Number of transactions that were retryable. num_retryable: usize, + /// Number of transactions that were scheduled to be forwarded. + pub num_forwarded: usize, /// Number of transactions that were immediately dropped on receive. num_dropped_on_receive: usize, @@ -502,6 +630,7 @@ impl SchedulerCountMetrics { ), ("num_finished", self.num_finished, i64), ("num_retryable", self.num_retryable, i64), + ("num_forwarded", self.num_forwarded, i64), ("num_dropped_on_receive", self.num_dropped_on_receive, i64), ( "num_dropped_on_sanitization", @@ -536,6 +665,7 @@ impl SchedulerCountMetrics { || self.num_schedule_filtered_out != 0 || self.num_finished != 0 || self.num_retryable != 0 + || self.num_forwarded != 0 || self.num_dropped_on_receive != 0 || self.num_dropped_on_sanitization != 0 || self.num_dropped_on_validate_locks != 0 @@ -553,6 +683,7 @@ impl SchedulerCountMetrics { self.num_schedule_filtered_out = 0; self.num_finished = 0; self.num_retryable = 0; + self.num_forwarded = 0; self.num_dropped_on_receive = 0; self.num_dropped_on_sanitization = 0; self.num_dropped_on_validate_locks = 0; @@ -580,6 +711,8 @@ struct SchedulerTimingMetrics { clear_time_us: u64, /// Time spent cleaning expired or processed transactions from the container. clean_time_us: u64, + /// Time spent forwarding transactions. + forward_time_us: u64, /// Time spent receiving completed transactions. receive_completed_time_us: u64, } @@ -605,6 +738,7 @@ impl SchedulerTimingMetrics { ("schedule_time_us", self.schedule_time_us, i64), ("clear_time_us", self.clear_time_us, i64), ("clean_time_us", self.clean_time_us, i64), + ("forward_time_us", self.forward_time_us, i64), ( "receive_completed_time_us", self.receive_completed_time_us, @@ -621,6 +755,7 @@ impl SchedulerTimingMetrics { self.schedule_time_us = 0; self.clear_time_us = 0; self.clean_time_us = 0; + self.forward_time_us = 0; self.receive_completed_time_us = 0; } } @@ -633,13 +768,14 @@ mod tests { banking_stage::{ consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, - tests::create_slow_genesis_config, + tests::{create_slow_genesis_config, new_test_cluster_info}, }, banking_trace::BankingPacketBatch, sigverify::SigverifyTracerPacketStats, }, crossbeam_channel::{unbounded, Receiver, Sender}, itertools::Itertools, + solana_client::connection_cache::ConnectionCache, solana_ledger::{ blockstore::Blockstore, genesis_utils::GenesisConfigInfo, get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, @@ -707,6 +843,17 @@ mod tests { let (consume_work_senders, consume_work_receivers) = create_channels(num_threads); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); + let validator_keypair = Arc::new(Keypair::new()); + let (_local_node, cluster_info) = new_test_cluster_info(Some(validator_keypair)); + let cluster_info = Arc::new(cluster_info); + let forwarder = Forwarder::new( + poh_recorder.clone(), + bank_forks.clone(), + cluster_info, + Arc::new(ConnectionCache::new("connection_cache_test")), + Arc::default(), + ); + let test_frame = TestFrame { bank, mint_keypair, @@ -718,12 +865,14 @@ mod tests { consume_work_receivers, finished_consume_work_sender, }; + let scheduler_controller = SchedulerController::new( decision_maker, packet_deserializer, bank_forks, PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver), vec![], // no actual workers with metrics to report, this can be empty + forwarder, ); (test_frame, scheduler_controller) diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index 6cfd483db2b897..eac61fd4e83828 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -1,4 +1,8 @@ -use solana_sdk::{clock::Slot, transaction::SanitizedTransaction}; +use { + crate::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket, + solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, + std::sync::Arc, +}; /// Simple wrapper type to tie a sanitized transaction to max age slot. pub(crate) struct SanitizedTransactionTTL { @@ -30,20 +34,38 @@ pub(crate) enum TransactionState { /// The transaction is available for scheduling. Unprocessed { transaction_ttl: SanitizedTransactionTTL, + packet: Arc, priority: u64, cost: u64, + should_forward: bool, }, /// The transaction is currently scheduled or being processed. - Pending { priority: u64, cost: u64 }, + Pending { + packet: Arc, + priority: u64, + cost: u64, + should_forward: bool, + }, + /// Only used during transition. + Transitioning, } impl TransactionState { /// Creates a new `TransactionState` in the `Unprocessed` state. - pub(crate) fn new(transaction_ttl: SanitizedTransactionTTL, priority: u64, cost: u64) -> Self { + pub(crate) fn new( + transaction_ttl: SanitizedTransactionTTL, + packet: Arc, + priority: u64, + cost: u64, + ) -> Self { + let packet_meta = packet.original_packet().meta(); + let should_forward = !packet_meta.forwarded() && packet_meta.is_from_staked_node(); Self::Unprocessed { transaction_ttl, + packet, priority, cost, + should_forward, } } @@ -103,6 +125,7 @@ impl TransactionState { match self { Self::Unprocessed { priority, .. } => *priority, Self::Pending { priority, .. } => *priority, + Self::Transitioning => unreachable!(), } } @@ -111,6 +134,41 @@ impl TransactionState { match self { Self::Unprocessed { cost, .. } => *cost, Self::Pending { cost, .. } => *cost, + Self::Transitioning => unreachable!(), + } + } + + /// Return whether packet should be attempted to be forwarded. + pub(crate) fn should_forward(&self) -> bool { + match self { + Self::Unprocessed { + should_forward: forwarded, + .. + } => *forwarded, + Self::Pending { + should_forward: forwarded, + .. + } => *forwarded, + Self::Transitioning => unreachable!(), + } + } + + /// Mark the packet as forwarded. + /// This is used to prevent the packet from being forwarded multiple times. + pub(crate) fn mark_forwarded(&mut self) { + match self { + Self::Unprocessed { should_forward, .. } => *should_forward = false, + Self::Pending { should_forward, .. } => *should_forward = false, + Self::Transitioning => unreachable!(), + } + } + + /// Return the packet of the transaction. + pub(crate) fn packet(&self) -> &Arc { + match self { + Self::Unprocessed { packet, .. } => packet, + Self::Pending { packet, .. } => packet, + Self::Transitioning => unreachable!(), } } @@ -125,15 +183,23 @@ impl TransactionState { match self.take() { TransactionState::Unprocessed { transaction_ttl, + packet, priority, cost, + should_forward: forwarded, } => { - *self = TransactionState::Pending { priority, cost }; + *self = TransactionState::Pending { + packet, + priority, + cost, + should_forward: forwarded, + }; transaction_ttl } TransactionState::Pending { .. } => { panic!("transaction already pending"); } + Self::Transitioning => unreachable!(), } } @@ -146,13 +212,21 @@ impl TransactionState { pub(crate) fn transition_to_unprocessed(&mut self, transaction_ttl: SanitizedTransactionTTL) { match self.take() { TransactionState::Unprocessed { .. } => panic!("already unprocessed"), - TransactionState::Pending { priority, cost } => { + TransactionState::Pending { + packet, + priority, + cost, + should_forward: forwarded, + } => { *self = Self::Unprocessed { transaction_ttl, + packet, priority, cost, + should_forward: forwarded, } } + Self::Transitioning => unreachable!(), } } @@ -166,19 +240,14 @@ impl TransactionState { transaction_ttl, .. } => transaction_ttl, Self::Pending { .. } => panic!("transaction is pending"), + Self::Transitioning => unreachable!(), } } /// Internal helper to transitioning between states. /// Replaces `self` with a dummy state that will immediately be overwritten in transition. fn take(&mut self) -> Self { - core::mem::replace( - self, - Self::Pending { - priority: 0, - cost: 0, - }, - ) + core::mem::replace(self, Self::Transitioning) } } @@ -187,7 +256,7 @@ mod tests { use { super::*, solana_sdk::{ - compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, + compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, }, }; @@ -205,12 +274,15 @@ mod tests { let message = Message::new(&ixs, Some(&from_keypair.pubkey())); let tx = Transaction::new(&[&from_keypair], message, Hash::default()); + let packet = Arc::new( + ImmutableDeserializedPacket::new(Packet::from_data(None, tx.clone()).unwrap()).unwrap(), + ); let transaction_ttl = SanitizedTransactionTTL { transaction: SanitizedTransaction::from_transaction_for_tests(tx), max_age_slot: Slot::MAX, }; const TEST_TRANSACTION_COST: u64 = 5000; - TransactionState::new(transaction_ttl, priority, TEST_TRANSACTION_COST) + TransactionState::new(transaction_ttl, packet, priority, TEST_TRANSACTION_COST) } #[test] diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index caca701455beba..ac5b3e987467e5 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -3,9 +3,12 @@ use { transaction_priority_id::TransactionPriorityId, transaction_state::{SanitizedTransactionTTL, TransactionState}, }, - crate::banking_stage::scheduler_messages::TransactionId, + crate::banking_stage::{ + immutable_deserialized_packet::ImmutableDeserializedPacket, + scheduler_messages::TransactionId, + }, min_max_heap::MinMaxHeap, - std::collections::HashMap, + std::{collections::HashMap, sync::Arc}, }; /// This structure will hold `TransactionState` for the entirety of a @@ -96,13 +99,14 @@ impl TransactionStateContainer { &mut self, transaction_id: TransactionId, transaction_ttl: SanitizedTransactionTTL, + packet: Arc, priority: u64, cost: u64, ) -> bool { let priority_id = TransactionPriorityId::new(priority, transaction_id); self.id_to_transaction_state.insert( transaction_id, - TransactionState::new(transaction_ttl, priority, cost), + TransactionState::new(transaction_ttl, packet, priority, cost), ); self.push_id_into_queue(priority_id) } @@ -152,6 +156,7 @@ mod tests { compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, + packet::Packet, signature::Keypair, signer::Signer, slot_history::Slot, @@ -161,7 +166,14 @@ mod tests { }; /// Returns (transaction_ttl, priority, cost) - fn test_transaction(priority: u64) -> (SanitizedTransactionTTL, u64, u64) { + fn test_transaction( + priority: u64, + ) -> ( + SanitizedTransactionTTL, + Arc, + u64, + u64, + ) { let from_keypair = Keypair::new(); let ixs = vec![ system_instruction::transfer( @@ -177,21 +189,28 @@ mod tests { message, Hash::default(), )); + let packet = Arc::new( + ImmutableDeserializedPacket::new( + Packet::from_data(None, tx.to_versioned_transaction()).unwrap(), + ) + .unwrap(), + ); let transaction_ttl = SanitizedTransactionTTL { transaction: tx, max_age_slot: Slot::MAX, }; const TEST_TRANSACTION_COST: u64 = 5000; - (transaction_ttl, priority, TEST_TRANSACTION_COST) + (transaction_ttl, packet, priority, TEST_TRANSACTION_COST) } fn push_to_container(container: &mut TransactionStateContainer, num: usize) { for id in 0..num as u64 { let priority = id; - let (transaction_ttl, priority, cost) = test_transaction(priority); + let (transaction_ttl, packet, priority, cost) = test_transaction(priority); container.insert_new_transaction( TransactionId::new(id), transaction_ttl, + packet, priority, cost, );