Skip to content

Commit

Permalink
Remove concept of Tracer packet
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Dec 10, 2024
1 parent aba0e08 commit b8cffd2
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 124 deletions.
1 change: 0 additions & 1 deletion core/benches/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
transaction.message.account_keys[0] = solana_sdk::pubkey::Pubkey::new_unique();
}
let mut packet = Packet::from_data(None, transaction).unwrap();
packet.meta_mut().set_tracer(true);
packet.meta_mut().set_from_staked_node(true);
DeserializedPacket::new(packet).unwrap()
})
Expand Down
87 changes: 20 additions & 67 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,6 @@ impl ThreadLocalUnprocessedPackets {
bank: Arc<Bank>,
forward_buffer: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
let mut total_forwardable_tracer_packets: usize = 0;
let mut total_tracer_packets_in_buffer: usize = 0;
let mut total_forwardable_packets: usize = 0;
let mut total_packet_conversion_us: u64 = 0;
let mut total_filter_packets_us: u64 = 0;
Expand All @@ -653,11 +651,8 @@ impl ThreadLocalUnprocessedPackets {
.into_iter()
.flat_map(|packets_to_process| {
// Only process packets not yet forwarded
let (forwarded_packets, packets_to_forward, is_tracer_packet) = self
.prepare_packets_to_forward(
packets_to_process,
&mut total_tracer_packets_in_buffer,
);
let (forwarded_packets, packets_to_forward) =
self.prepare_packets_to_forward(packets_to_process);

[
forwarded_packets,
Expand All @@ -682,15 +677,10 @@ impl ThreadLocalUnprocessedPackets {
&mut total_dropped_packets
));
saturating_add_assign!(total_filter_packets_us, filter_packets_us);

for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
let forwardable_packet_index =
transaction_to_packet_indexes[*forwardable_transaction_index];
if is_tracer_packet[forwardable_packet_index] {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
}
saturating_add_assign!(
total_forwardable_packets,
forwardable_transaction_indexes.len()
);

let accepted_packet_indexes =
Self::add_filtered_packets_to_forward_buffer(
Expand Down Expand Up @@ -973,36 +963,27 @@ impl ThreadLocalUnprocessedPackets {
fn prepare_packets_to_forward(
&self,
packets_to_forward: impl Iterator<Item = Arc<ImmutableDeserializedPacket>>,
total_tracer_packets_in_buffer: &mut usize,
) -> (
Vec<Arc<ImmutableDeserializedPacket>>,
Vec<Arc<ImmutableDeserializedPacket>>,
Vec<bool>,
) {
let mut forwarded_packets: Vec<Arc<ImmutableDeserializedPacket>> = vec![];
let (forwardable_packets, is_tracer_packet) = packets_to_forward
let forwardable_packets = packets_to_forward
.into_iter()
.filter_map(|immutable_deserialized_packet| {
let is_tracer_packet = immutable_deserialized_packet
.original_packet()
.meta()
.is_tracer_packet();
if is_tracer_packet {
saturating_add_assign!(*total_tracer_packets_in_buffer, 1);
}
if !self
.unprocessed_packet_batches
.is_forwarded(&immutable_deserialized_packet)
{
Some((immutable_deserialized_packet, is_tracer_packet))
Some(immutable_deserialized_packet)
} else {
forwarded_packets.push(immutable_deserialized_packet);
None
}
})
.unzip();
.collect();

(forwarded_packets, forwardable_packets, is_tracer_packet)
(forwarded_packets, forwardable_packets)
}
}

Expand Down Expand Up @@ -1107,7 +1088,6 @@ mod tests {
.map(|(packets_id, transaction)| {
let mut p = Packet::from_data(None, transaction).unwrap();
p.meta_mut().port = packets_id as u16;
p.meta_mut().set_tracer(true);
DeserializedPacket::new(p).unwrap()
})
.collect_vec();
Expand Down Expand Up @@ -1356,17 +1336,14 @@ mod tests {
.map(|(packets_id, transaction)| {
let mut p = Packet::from_data(None, transaction).unwrap();
p.meta_mut().port = packets_id as u16;
p.meta_mut().set_tracer(true);
DeserializedPacket::new(p).unwrap()
})
.collect_vec();

// test preparing buffered packets for forwarding
let test_prepareing_buffered_packets_for_forwarding =
|buffered_packet_batches: UnprocessedPacketBatches| -> (usize, usize, usize) {
let mut total_tracer_packets_in_buffer: usize = 0;
|buffered_packet_batches: UnprocessedPacketBatches| -> usize {
let mut total_packets_to_forward: usize = 0;
let mut total_tracer_packets_to_forward: usize = 0;

let mut unprocessed_transactions = ThreadLocalUnprocessedPackets {
unprocessed_packet_batches: buffered_packet_batches,
Expand All @@ -1379,35 +1356,21 @@ mod tests {
.chunks(128usize)
.into_iter()
.flat_map(|packets_to_process| {
let (_, packets_to_forward, is_tracer_packet) = unprocessed_transactions
.prepare_packets_to_forward(
packets_to_process,
&mut total_tracer_packets_in_buffer,
);
let (_, packets_to_forward) =
unprocessed_transactions.prepare_packets_to_forward(packets_to_process);
total_packets_to_forward += packets_to_forward.len();
total_tracer_packets_to_forward += is_tracer_packet.len();
packets_to_forward
})
.collect::<MinMaxHeap<Arc<ImmutableDeserializedPacket>>>();
(
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
)
total_packets_to_forward
};

// all tracer packets are forwardable
{
let buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
let (
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_tracer_packets_in_buffer, 256);
let total_packets_to_forward =
test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_packets_to_forward, 256);
assert_eq!(total_tracer_packets_to_forward, 256);
}

// some packets are forwarded
Expand All @@ -1418,14 +1381,9 @@ mod tests {
}
let buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
let (
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_tracer_packets_in_buffer, 256);
let total_packets_to_forward =
test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_packets_to_forward, 256 - num_already_forwarded);
assert_eq!(total_tracer_packets_to_forward, 256 - num_already_forwarded);
}

// all packets are forwarded
Expand All @@ -1435,14 +1393,9 @@ mod tests {
}
let buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone(), packets.len());
let (
total_tracer_packets_in_buffer,
total_packets_to_forward,
total_tracer_packets_to_forward,
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_tracer_packets_in_buffer, 256);
let total_packets_to_forward =
test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
assert_eq!(total_packets_to_forward, 0);
assert_eq!(total_tracer_packets_to_forward, 0);
}
}
}
3 changes: 1 addition & 2 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ impl SigVerifier for TransactionSigVerifier {
#[inline(always)]
fn process_received_packet(
&mut self,
packet: &mut Packet,
_packet: &mut Packet,
_removed_before_sigverify_stage: bool,
_is_dup: bool,
) {
sigverify::check_for_tracer_packet(packet);
}

#[inline(always)]
Expand Down
23 changes: 3 additions & 20 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ mod tests {
packet::{to_packet_batches, Packet},
test_tx::test_tx,
},
solana_sdk::packet::PacketFlags,
};

fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
Expand All @@ -488,31 +487,15 @@ mod tests {
solana_logger::setup();
let batch_size = 10;
let mut batch = PacketBatch::with_capacity(batch_size);
let mut tracer_packet = Packet::default();
tracer_packet.meta_mut().flags |= PacketFlags::TRACER_PACKET;
batch.resize(batch_size, tracer_packet);
let packet = Packet::default();
batch.resize(batch_size, packet);
batch[3].meta_mut().addr = std::net::IpAddr::from([1u16; 8]);
batch[3].meta_mut().set_discard(true);
let num_discarded_before_filter = 1;
batch[4].meta_mut().addr = std::net::IpAddr::from([2u16; 8]);
let total_num_packets = batch.len();
let mut batches = vec![batch];
let max = 3;
let mut total_tracer_packets_discarded = 0;
SigVerifyStage::discard_excess_packets(&mut batches, max, |packet| {
if packet.meta().is_tracer_packet() {
total_tracer_packets_discarded += 1;
}
});
SigVerifyStage::discard_excess_packets(&mut batches, max, |_| {});
let total_non_discard = count_non_discard(&batches);
let total_discarded = total_num_packets - total_non_discard;
// Every packet except the packets already marked `discard` before the call
// to `discard_excess_packets()` should count towards the
// `total_tracer_packets_discarded`
assert_eq!(
total_tracer_packets_discarded,
total_discarded - num_discarded_before_filter
);
assert_eq!(total_non_discard, max);
assert!(!batches[0][0].meta().discard());
assert!(batches[0][3].meta().discard());
Expand Down
22 changes: 0 additions & 22 deletions perf/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@ use {
std::{convert::TryFrom, mem::size_of},
};

// Representing key tKeYE4wtowRb8yRroZShTipE18YVnqwXjsSAoNsFU6g
const TRACER_KEY_BYTES: [u8; 32] = [
13, 37, 180, 170, 252, 137, 36, 194, 183, 143, 161, 193, 201, 207, 211, 23, 189, 93, 33, 110,
155, 90, 30, 39, 116, 115, 238, 38, 126, 21, 232, 133,
];
const TRACER_KEY: Pubkey = Pubkey::new_from_array(TRACER_KEY_BYTES);
const TRACER_KEY_OFFSET_IN_TRANSACTION: usize = 69;
// Empirically derived to constrain max verify latency to ~8ms at lower packet counts
pub const VERIFY_PACKET_CHUNK_SIZE: usize = 128;

Expand Down Expand Up @@ -308,21 +301,6 @@ fn do_get_packet_offsets(
))
}

pub fn check_for_tracer_packet(packet: &mut Packet) -> bool {
let first_pubkey_start: usize = TRACER_KEY_OFFSET_IN_TRANSACTION;
let Some(first_pubkey_end) = first_pubkey_start.checked_add(size_of::<Pubkey>()) else {
return false;
};
// Check for tracer pubkey
match packet.data(first_pubkey_start..first_pubkey_end) {
Some(pubkey) if pubkey == TRACER_KEY.as_ref() => {
packet.meta_mut().set_tracer(true);
true
}
_ => false,
}
}

fn get_packet_offsets(
packet: &mut Packet,
current_offset: usize,
Expand Down
15 changes: 3 additions & 12 deletions sdk/packet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ bitflags! {
const FORWARDED = 0b0000_0010;
const REPAIR = 0b0000_0100;
const SIMPLE_VOTE_TX = 0b0000_1000;
const TRACER_PACKET = 0b0001_0000;
// Previously used - this can now be re-used for something else.
const UNUSED = 0b0010_0000;
const UNUSED_0 = 0b0001_0000;
// Previously used - this can now be re-used for something else.
const UNUSED_1 = 0b0010_0000;
/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;
/// For marking packets from staked nodes
Expand Down Expand Up @@ -264,11 +265,6 @@ impl Meta {
self.flags.set(PacketFlags::DISCARD, discard);
}

#[inline]
pub fn set_tracer(&mut self, is_tracer: bool) {
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
}

#[inline]
pub fn set_track_performance(&mut self, is_performance_track: bool) {
self.flags
Expand All @@ -295,11 +291,6 @@ impl Meta {
self.flags.contains(PacketFlags::SIMPLE_VOTE_TX)
}

#[inline]
pub fn is_tracer_packet(&self) -> bool {
self.flags.contains(PacketFlags::TRACER_PACKET)
}

#[inline]
pub fn is_perf_track_packet(&self) -> bool {
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
Expand Down

0 comments on commit b8cffd2

Please sign in to comment.