Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {

let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions_to_deserialized_packets(&transactions).unwrap();
let batches = transactions_to_deserialized_packets(&transactions, false).unwrap();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len),
Expand Down
4 changes: 2 additions & 2 deletions core/benches/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn insert_packet_batches(

(0..batch_count).for_each(|_| {
let (packet_batch, packet_indexes) = build_packet_batch(packet_per_batch_count, None);
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes);
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes, false);
unprocessed_packet_batches.insert_batch(deserialized_packets);
});
}
Expand Down Expand Up @@ -129,7 +129,7 @@ fn buffer_iter_desc_and_forward(
(0..batch_count).for_each(|_| {
let (packet_batch, packet_indexes) =
build_packet_batch(packet_per_batch_count, Some(genesis_config.hash()));
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes);
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes, false);
unprocessed_packet_batches.insert_batch(deserialized_packets);
});
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ impl BankingStage {
),
};

let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let mut packet_receiver =
PacketReceiver::new(id, packet_receiver, bank_forks.clone());
let poh_recorder = poh_recorder.clone();

let committer = Committer::new(
Expand Down
21 changes: 15 additions & 6 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1713,8 +1713,11 @@ mod tests {
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions)
.unwrap();
unprocessed_packet_batches::transactions_to_deserialized_packets(
&transactions,
false,
)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
Expand Down Expand Up @@ -1793,8 +1796,11 @@ mod tests {
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions)
.unwrap();
unprocessed_packet_batches::transactions_to_deserialized_packets(
&transactions,
false,
)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
Expand Down Expand Up @@ -1846,8 +1852,11 @@ mod tests {
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions)
.unwrap();
unprocessed_packet_batches::transactions_to_deserialized_packets(
&transactions,
false,
)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let retryable_packet = deserialized_packets[0].clone();
let mut buffered_packet_batches =
Expand Down
6 changes: 3 additions & 3 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ mod tests {
Hash::new_unique(),
);
let packet = Packet::from_data(None, tx).unwrap();
let deserialized_packet = DeserializedPacket::new(packet).unwrap();
let deserialized_packet = DeserializedPacket::new(packet, false).unwrap();

let test_cases = vec![
("budget-restricted", DataBudget::restricted(), 0),
Expand Down Expand Up @@ -425,14 +425,14 @@ mod tests {
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash);
let mut packet = Packet::from_data(None, transaction).unwrap();
packet.meta_mut().flags |= PacketFlags::FORWARDED;
DeserializedPacket::new(packet).unwrap()
DeserializedPacket::new(packet, false).unwrap()
};

let normal_block_hash = Hash::new_unique();
let normal_packet = {
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash);
let packet = Packet::from_data(None, transaction).unwrap();
DeserializedPacket::new(packet).unwrap()
DeserializedPacket::new(packet, false).unwrap()
};

let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_transaction_storage(
Expand Down
20 changes: 18 additions & 2 deletions core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,30 @@ use {
},
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_runtime::bank_forks::BankForks,
solana_sdk::{saturating_add_assign, timing::timestamp},
std::{sync::atomic::Ordering, time::Duration},
std::{
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
},
};

pub struct PacketReceiver {
id: u32,
packet_deserializer: PacketDeserializer,
bank_forks: Arc<RwLock<BankForks>>,
}

impl PacketReceiver {
pub fn new(id: u32, banking_packet_receiver: BankingPacketReceiver) -> Self {
pub fn new(
id: u32,
banking_packet_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
Self {
id,
packet_deserializer: PacketDeserializer::new(banking_packet_receiver),
bank_forks,
}
}

Expand All @@ -37,11 +47,17 @@ impl PacketReceiver {
) -> Result<(), RecvTimeoutError> {
let (result, recv_time_us) = measure_us!({
let recv_timeout = Self::get_receive_timeout(unprocessed_transaction_storage);

// get root bank from bank_forks, use it to get feature_set status
let _root_bank = self.bank_forks.read().unwrap().root_bank();
let support_round_compute_unit_price = false; // TODO get feature from root_bank

let mut recv_and_buffer_measure = Measure::start("recv_and_buffer");
self.packet_deserializer
.receive_packets(
recv_timeout,
unprocessed_transaction_storage.max_receive_size(),
support_round_compute_unit_price,
)
// Consumes results if Ok, otherwise we keep the Err
.map(|receive_packet_results| {
Expand Down
2 changes: 1 addition & 1 deletion core/src/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ mod tests {
let tx_cost = CostModel::calculate_cost(&sanitized_transaction, &FeatureSet::all_enabled());
let cost = tx_cost.sum();
let deserialized_packet =
DeserializedPacket::new(Packet::from_data(None, transaction).unwrap()).unwrap();
DeserializedPacket::new(Packet::from_data(None, transaction).unwrap(), false).unwrap();

// set limit ratio so each batch can only have one test transaction
let limit_ratio: u32 =
Expand Down
12 changes: 9 additions & 3 deletions core/src/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ pub struct ImmutableDeserializedPacket {
}

impl ImmutableDeserializedPacket {
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
pub fn new(
packet: Packet,
// current working or root bank's feature gate status for supporting rounding
// compute-unit-price. This parameter can be removed once the feature is fully adapted in
// mainnet-beta.
support_round_compute_unit_price: bool,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
Expand All @@ -55,7 +61,7 @@ impl ImmutableDeserializedPacket {

// drop transaction if prioritization fails.
let mut priority_details = sanitized_transaction
.get_transaction_priority_details()
.get_transaction_priority_details(support_round_compute_unit_price)
.ok_or(DeserializedPacketError::PrioritizationFailure)?;

// set priority to zero for vote transactions
Expand Down Expand Up @@ -160,7 +166,7 @@ mod tests {
Hash::new_unique(),
);
let packet = Packet::from_data(None, tx).unwrap();
let deserialized_packet = ImmutableDeserializedPacket::new(packet);
let deserialized_packet = ImmutableDeserializedPacket::new(packet, false);

assert!(matches!(deserialized_packet, Ok(_)));
}
Expand Down
16 changes: 12 additions & 4 deletions core/src/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,19 @@ pub struct LatestValidatorVotePacket {
}

impl LatestValidatorVotePacket {
pub fn new(packet: Packet, vote_source: VoteSource) -> Result<Self, DeserializedPacketError> {
pub fn new(
packet: Packet,
vote_source: VoteSource,
support_round_compute_unit_price: bool,
) -> Result<Self, DeserializedPacketError> {
if !packet.meta().is_simple_vote_tx() {
return Err(DeserializedPacketError::VoteTransactionError);
}

let vote = Arc::new(ImmutableDeserializedPacket::new(packet)?);
let vote = Arc::new(ImmutableDeserializedPacket::new(
packet,
support_round_compute_unit_price,
)?);
Self::new_from_immutable(vote, vote_source)
}

Expand Down Expand Up @@ -351,7 +358,7 @@ mod tests {
.meta_mut()
.flags
.set(PacketFlags::SIMPLE_VOTE_TX, true);
LatestValidatorVotePacket::new(packet, vote_source).unwrap()
LatestValidatorVotePacket::new(packet, vote_source, false).unwrap()
}

fn deserialize_packets<'a>(
Expand All @@ -360,7 +367,8 @@ mod tests {
vote_source: VoteSource,
) -> impl Iterator<Item = LatestValidatorVotePacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok()
LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source, false)
.ok()
})
}

Expand Down
21 changes: 17 additions & 4 deletions core/src/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ impl PacketDeserializer {
&self,
recv_timeout: Duration,
capacity: usize,
support_round_compute_unit_price: bool,
) -> Result<ReceivePacketResults, RecvTimeoutError> {
let (packet_count, packet_batches) = self.receive_until(recv_timeout, capacity)?;
Ok(Self::deserialize_and_collect_packets(
packet_count,
&packet_batches,
support_round_compute_unit_price,
))
}

Expand All @@ -53,6 +55,7 @@ impl PacketDeserializer {
fn deserialize_and_collect_packets(
packet_count: usize,
banking_batches: &[BankingPacketBatch],
support_round_compute_unit_price: bool,
) -> ReceivePacketResults {
let mut passed_sigverify_count: usize = 0;
let mut failed_sigverify_count: usize = 0;
Expand All @@ -66,8 +69,11 @@ impl PacketDeserializer {
passed_sigverify_count += packet_indexes.len();
failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len());

deserialized_packets
.extend(Self::deserialize_packets(packet_batch, &packet_indexes));
deserialized_packets.extend(Self::deserialize_packets(
packet_batch,
&packet_indexes,
support_round_compute_unit_price,
));
}

if let Some(tracer_packet_stats) = &banking_batch.1 {
Expand Down Expand Up @@ -136,9 +142,14 @@ impl PacketDeserializer {
fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
support_round_compute_unit_price: bool,
) -> impl Iterator<Item = ImmutableDeserializedPacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
ImmutableDeserializedPacket::new(packet_batch[*packet_index].clone()).ok()
ImmutableDeserializedPacket::new(
packet_batch[*packet_index].clone(),
support_round_compute_unit_price,
)
.ok()
})
}
}
Expand All @@ -160,7 +171,7 @@ mod tests {

#[test]
fn test_deserialize_and_collect_packets_empty() {
let results = PacketDeserializer::deserialize_and_collect_packets(0, &[]);
let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], false);
assert_eq!(results.deserialized_packets.len(), 0);
assert!(results.new_tracer_stats_option.is_none());
assert_eq!(results.passed_sigverify_count, 0);
Expand All @@ -177,6 +188,7 @@ mod tests {
let results = PacketDeserializer::deserialize_and_collect_packets(
packet_count,
&[BankingPacketBatch::new((packet_batches, None))],
false,
);
assert_eq!(results.deserialized_packets.len(), 2);
assert!(results.new_tracer_stats_option.is_none());
Expand All @@ -195,6 +207,7 @@ mod tests {
let results = PacketDeserializer::deserialize_and_collect_packets(
packet_count,
&[BankingPacketBatch::new((packet_batches, None))],
false,
);
assert_eq!(results.deserialized_packets.len(), 1);
assert!(results.new_tracer_stats_option.is_none());
Expand Down
24 changes: 17 additions & 7 deletions core/src/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ impl DeserializedPacket {
}
}

pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
let immutable_section = ImmutableDeserializedPacket::new(packet)?;
pub fn new(
packet: Packet,
support_round_compute_unit_price: bool,
) -> Result<Self, DeserializedPacketError> {
let immutable_section =
ImmutableDeserializedPacket::new(packet, support_round_compute_unit_price)?;

Ok(Self {
immutable_section: Arc::new(immutable_section),
Expand Down Expand Up @@ -293,20 +297,26 @@ impl UnprocessedPacketBatches {
pub fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
support_round_compute_unit_price: bool,
) -> impl Iterator<Item = DeserializedPacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
DeserializedPacket::new(packet_batch[*packet_index].clone()).ok()
DeserializedPacket::new(
packet_batch[*packet_index].clone(),
support_round_compute_unit_price,
)
.ok()
})
}

pub fn transactions_to_deserialized_packets(
transactions: &[Transaction],
support_round_compute_unit_price: bool,
) -> Result<Vec<DeserializedPacket>, DeserializedPacketError> {
transactions
.iter()
.map(|transaction| {
let packet = Packet::from_data(None, transaction)?;
DeserializedPacket::new(packet)
DeserializedPacket::new(packet, support_round_compute_unit_price)
})
.collect()
}
Expand Down Expand Up @@ -335,7 +345,7 @@ mod tests {
Hash::new_unique(),
);
let packet = Packet::from_data(None, tx).unwrap();
DeserializedPacket::new(packet).unwrap()
DeserializedPacket::new(packet, false).unwrap()
}

fn packet_with_priority_details(priority: u64, compute_unit_limit: u64) -> DeserializedPacket {
Expand All @@ -348,7 +358,7 @@ mod tests {
],
Some(&from_account),
));
DeserializedPacket::new(Packet::from_data(None, tx).unwrap()).unwrap()
DeserializedPacket::new(Packet::from_data(None, tx).unwrap(), false).unwrap()
}

#[test]
Expand Down Expand Up @@ -464,7 +474,7 @@ mod tests {

packet_vector
.into_iter()
.map(|p| DeserializedPacket::new(p).unwrap())
.map(|p| DeserializedPacket::new(p, false).unwrap())
.collect()
}

Expand Down
Loading