Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn bench_packet_discard(bencher: &mut Bencher) {
.map(|_| {
let mut addr = [0u16; 8];
thread_rng().fill(&mut addr);
addr
std::net::IpAddr::from(addr)
})
.collect();

Expand All @@ -54,7 +54,7 @@ fn bench_packet_discard(bencher: &mut Bencher) {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
for batch in batches.iter_mut() {
for p in batch.packets.iter_mut() {
p.meta.discard = false;
p.meta.set_discard(false);
}
}
});
Expand Down
25 changes: 9 additions & 16 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl BankingStage {
let packet_vec: Vec<_> = packets
.iter()
.filter_map(|p| {
if !p.meta.forwarded && data_budget.take(p.meta.size) {
if !p.meta.forwarded() && data_budget.take(p.meta.size) {
Some((&p.data[..p.meta.size], tpu_forwards))
} else {
None
Expand Down Expand Up @@ -1125,7 +1125,7 @@ impl BankingStage {
.iter()
.filter_map(|tx_index| {
let p = &packet_batch.packets[*tx_index];
if votes_only && !p.meta.is_simple_vote_tx {
if votes_only && !p.meta.is_simple_vote_tx() {
return None;
}

Expand All @@ -1135,7 +1135,7 @@ impl BankingStage {
let tx = SanitizedTransaction::try_create(
tx,
message_hash,
Some(p.meta.is_simple_vote_tx),
Some(p.meta.is_simple_vote_tx()),
|_| Err(TransactionError::UnsupportedVersion),
)
.ok()?;
Expand Down Expand Up @@ -1306,15 +1306,8 @@ impl BankingStage {
fn generate_packet_indexes(vers: &PinnedVec<Packet>) -> Vec<usize> {
vers.iter()
.enumerate()
.filter_map(
|(index, ver)| {
if !ver.meta.discard {
Some(index)
} else {
None
}
},
)
.filter(|(_, pkt)| !pkt.meta.discard())
.map(|(index, _)| index)
.collect()
}

Expand Down Expand Up @@ -1492,7 +1485,7 @@ mod tests {
get_tmp_ledger_path,
leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::to_packet_batches,
solana_perf::packet::{to_packet_batches, PacketFlags},
solana_poh::{
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
poh_service::PohService,
Expand Down Expand Up @@ -1638,7 +1631,7 @@ mod tests {
b.packets
.iter_mut()
.zip(v)
.for_each(|(p, f)| p.meta.discard = *f == 0)
.for_each(|(p, f)| p.meta.set_discard(*f == 0))
});
with_vers.into_iter().map(|(b, _)| b).collect()
}
Expand Down Expand Up @@ -2843,7 +2836,7 @@ mod tests {
const FWD_PACKET: u8 = 1;
let forwarded_packet = {
let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap();
packet.meta.forwarded = true;
packet.meta.flags |= PacketFlags::FORWARDED;
packet
};

Expand Down Expand Up @@ -3084,7 +3077,7 @@ mod tests {
packet_indexes.push(index);
}
for index in vote_indexes.iter() {
packet_batch.packets[*index].meta.is_simple_vote_tx = true;
packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
}
(packet_batch, packet_indexes)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl ClusterInfoVoteListener {
.filter(|(_, packet_batch)| {
// to_packet_batches() above splits into 1 packet long batches
assert_eq!(packet_batch.packets.len(), 1);
!packet_batch.packets[0].meta.discard
!packet_batch.packets[0].meta.discard()
})
.filter_map(|(tx, packet_batch)| {
let (vote_account_key, vote, _) = vote_transaction::parse_vote_transaction(&tx)?;
Expand Down
7 changes: 5 additions & 2 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use {
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet},
solana_sdk::{
clock::DEFAULT_TICKS_PER_SLOT,
packet::{Packet, PacketFlags},
},
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
std::{
net::UdpSocket,
Expand Down Expand Up @@ -84,7 +87,7 @@ impl FetchStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Result<()> {
let mark_forwarded = |packet: &mut Packet| {
packet.meta.forwarded = true;
packet.meta.flags |= PacketFlags::FORWARDED;
};

let mut packet_batch = recvr.recv()?;
Expand Down
7 changes: 5 additions & 2 deletions core/src/repair_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ mod test {
shred::{Shred, Shredder},
sigverify_shreds::verify_shred_cpu,
},
solana_sdk::signature::{Keypair, Signer},
solana_sdk::{
packet::PacketFlags,
signature::{Keypair, Signer},
},
std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
Expand Down Expand Up @@ -87,7 +90,7 @@ mod test {
nonce,
)
.unwrap();
packet.meta.repair = true;
packet.meta.flags |= PacketFlags::REPAIR;

let leader_slots = [(slot, keypair.pubkey().to_bytes())]
.iter()
Expand Down
2 changes: 1 addition & 1 deletion core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ mod tests {
let mut packet_batch = PacketBatch::new(vec![]);
solana_streamer::packet::recv_from(&mut packet_batch, &me_retransmit, 1).unwrap();
assert_eq!(packet_batch.packets.len(), 1);
assert!(!packet_batch.packets[0].meta.repair);
assert!(!packet_batch.packets[0].meta.repair());
}

#[test]
Expand Down
26 changes: 13 additions & 13 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats},
solana_perf::{
cuda_runtime::PinnedVec,
packet::{Packet, PacketBatchRecycler},
packet::{Packet, PacketBatchRecycler, PacketFlags},
recycler::Recycler,
},
solana_runtime::bank_forks::BankForks,
Expand Down Expand Up @@ -40,7 +40,7 @@ impl ShredFetchStage {
) where
F: Fn(&mut Packet),
{
p.meta.discard = true;
p.meta.set_discard(true);
if let Some((slot, _index, _shred_type)) = get_shred_slot_index_type(p, stats) {
// Seems reasonable to limit shreds to 2 epochs away
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
Expand All @@ -50,7 +50,7 @@ impl ShredFetchStage {

if shreds_received.get(&hash).is_none() {
shreds_received.put(hash, ());
p.meta.discard = false;
p.meta.set_discard(false);
modify(p);
} else {
stats.duplicate_shred += 1;
Expand Down Expand Up @@ -192,7 +192,7 @@ impl ShredFetchStage {
recycler.clone(),
bank_forks.clone(),
"shred_fetch_tvu_forwards",
|p| p.meta.forwarded = true,
|p| p.meta.flags.insert(PacketFlags::FORWARDED),
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
Expand All @@ -202,7 +202,7 @@ impl ShredFetchStage {
recycler,
bank_forks,
"shred_fetch_repair",
|p| p.meta.repair = true,
|p| p.meta.flags.insert(PacketFlags::REPAIR),
);

tvu_threads.extend(tvu_forwards_threads.into_iter());
Expand Down Expand Up @@ -266,7 +266,7 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(!packet.meta.discard);
assert!(!packet.meta.discard());
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
false, // is_last_in_slot
Expand All @@ -283,7 +283,7 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(!packet.meta.discard);
assert!(!packet.meta.discard());
}

#[test]
Expand All @@ -310,7 +310,7 @@ mod tests {
&hasher,
);
assert_eq!(stats.index_overrun, 1);
assert!(packet.meta.discard);
assert!(packet.meta.discard());
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
shred.copy_to_packet(&mut packet);

Expand All @@ -325,7 +325,7 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(packet.meta.discard);
assert!(packet.meta.discard());

// Accepted for 1,3
ShredFetchStage::process_packet(
Expand All @@ -338,7 +338,7 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(!packet.meta.discard);
assert!(!packet.meta.discard());

// shreds_received should filter duplicate
ShredFetchStage::process_packet(
Expand All @@ -351,7 +351,7 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(packet.meta.discard);
assert!(packet.meta.discard());

let shred = Shred::new_from_data(1_000_000, 3, 0, None, true, true, 0, 0, 0);
shred.copy_to_packet(&mut packet);
Expand All @@ -367,7 +367,7 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(packet.meta.discard);
assert!(packet.meta.discard());

let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, None, true, true, 0, 0, 0);
Expand All @@ -382,6 +382,6 @@ mod tests {
&|_p| {},
&hasher,
);
assert!(packet.meta.discard);
assert!(packet.meta.discard());
}
}
4 changes: 2 additions & 2 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub mod tests {
batches[0].packets[1].meta.size = shred.payload.len();

let rv = verifier.verify_batches(batches);
assert!(!rv[0].packets[0].meta.discard);
assert!(rv[0].packets[1].meta.discard);
assert!(!rv[0].packets[0].meta.discard());
assert!(rv[0].packets[1].meta.discard());
}
}
12 changes: 7 additions & 5 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ impl SigVerifyStage {
}
for (_addr, indexes) in received_ips {
for (batch_index, packet_index) in indexes {
batches[batch_index].packets[packet_index].meta.discard = true;
batches[batch_index].packets[packet_index]
.meta
.set_discard(true);
}
}
}
Expand Down Expand Up @@ -275,7 +277,7 @@ mod tests {
batch
.packets
.iter()
.map(|p| if p.meta.discard { 0 } else { 1 })
.map(|p| if p.meta.discard() { 0 } else { 1 })
.sum::<usize>()
})
.sum::<usize>()
Expand All @@ -286,12 +288,12 @@ mod tests {
solana_logger::setup();
let mut batch = PacketBatch::default();
batch.packets.resize(10, Packet::default());
batch.packets[3].meta.addr = [1u16; 8];
batch.packets[3].meta.addr = std::net::IpAddr::from([1u16; 8]);
let mut batches = vec![batch];
let max = 3;
SigVerifyStage::discard_excess_packets(&mut batches, max);
assert_eq!(count_non_discard(&batches), max);
assert!(!batches[0].packets[0].meta.discard);
assert!(!batches[0].packets[3].meta.discard);
assert!(!batches[0].packets[0].meta.discard());
assert!(!batches[0].packets[3].meta.discard());
}
}
4 changes: 2 additions & 2 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ where
let last_root = blockstore.last_root();
let working_bank = bank_forks.read().unwrap().working_bank();
let handle_packet = |packet: &Packet| {
if packet.meta.discard {
if packet.meta.discard() {
inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
return None;
}
Expand All @@ -375,7 +375,7 @@ where
if !shred_filter(&shred, working_bank.clone(), last_root) {
return None;
}
if packet.meta.repair {
if packet.meta.repair() {
let repair_info = RepairMeta {
_from_addr: packet.meta.addr(),
// If can't parse the nonce, dump the packet.
Expand Down
2 changes: 1 addition & 1 deletion entry/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ pub fn start_verify_transactions(
);
let verified = packet_batches
.iter()
.all(|batch| batch.packets.iter().all(|p| !p.meta.discard));
.all(|batch| batch.packets.iter().all(|p| !p.meta.discard()));
verify_time.stop();
(verified, verify_time.as_us())
});
Expand Down
8 changes: 4 additions & 4 deletions ledger/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>)
let slot_start = sig_end + size_of::<ShredType>();
let slot_end = slot_start + size_of::<u64>();
let msg_start = sig_end;
if packet.meta.discard {
if packet.meta.discard() {
return Some(0);
}
trace!("slot start and end {} {}", slot_start, slot_end);
if packet.meta.size < slot_end {
return Some(0);
}
let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?;
let msg_end = if packet.meta.repair {
let msg_end = if packet.meta.repair() {
packet.meta.size.saturating_sub(SIZE_OF_NONCE)
} else {
packet.meta.size
Expand Down Expand Up @@ -119,7 +119,7 @@ fn slot_key_data_for_gpu<
.map(|packet| {
let slot_start = size_of::<Signature>() + size_of::<ShredType>();
let slot_end = slot_start + size_of::<u64>();
if packet.meta.size < slot_end || packet.meta.discard {
if packet.meta.size < slot_end || packet.meta.discard() {
return std::u64::MAX;
}
let slot: Option<u64> =
Expand Down Expand Up @@ -204,7 +204,7 @@ fn shred_gpu_offsets(
let sig_start = pubkeys_end;
let sig_end = sig_start + size_of::<Signature>();
let msg_start = sig_end;
let msg_end = if packet.meta.repair {
let msg_end = if packet.meta.repair() {
sig_start + packet.meta.size.saturating_sub(SIZE_OF_NONCE)
} else {
sig_start + packet.meta.size
Expand Down
2 changes: 1 addition & 1 deletion perf/src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! The `packet` module defines data structures and methods to pull data from the network.
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
pub use solana_sdk::packet::{Meta, Packet, PacketFlags, PACKET_DATA_SIZE};
use {
crate::{cuda_runtime::PinnedVec, recycler::Recycler},
bincode::config::Options,
Expand Down
Loading