Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Tracer packet as a concept #4043

Merged
merged 9 commits into from
Dec 12, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Release channels have their own copy of this changelog:
* CLI:
* Add global `--skip-preflight` option for skipping preflight checks on all transactions sent through RPC. This flag, along with `--use-rpc`, can improve success rate with program deployments using the public RPC nodes.
* Unhide `--accounts-db-access-storages-method` for agave-validator and agave-ledger-tool
* Remove tracer stats from banking-trace. `banking-trace` directory should be cleared when restarting on v2.2 for first time. It will not break if not cleared, but the file will be a mix of new/old format. (#4043)

## [2.1.0]
* Breaking:
Expand Down
2 changes: 1 addition & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ fn main() {
timestamp(),
);
non_vote_sender
.send(BankingPacketBatch::new((vec![packet_batch.clone()], None)))
.send(BankingPacketBatch::new(vec![packet_batch.clone()]))
.unwrap();
}

Expand Down
12 changes: 5 additions & 7 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
let mut sent = 0;
if let Some(vote_packets) = &vote_packets {
tpu_vote_sender
.send(BankingPacketBatch::new((
.send(BankingPacketBatch::new(
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
))
.unwrap();
gossip_vote_sender
.send(BankingPacketBatch::new((
.send(BankingPacketBatch::new(
vote_packets[start..start + chunk_len].to_vec(),
None,
)))
))
.unwrap();
}
for v in verified[start..start + chunk_len].chunks(chunk_len / num_threads) {
Expand All @@ -343,7 +341,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
sent += xv.len();
}
non_vote_sender
.send(BankingPacketBatch::new((v.to_vec(), None)))
.send(BankingPacketBatch::new(v.to_vec()))
.unwrap();
}

Expand Down
36 changes: 8 additions & 28 deletions core/benches/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ extern crate test;
use {
itertools::Itertools,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::{
forwarder::Forwarder,
leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStageStats,
},
tracer_packet_stats::TracerPacketStats,
solana_core::banking_stage::{
forwarder::Forwarder,
leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStageStats,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand All @@ -38,7 +35,6 @@ struct BenchSetup {
unprocessed_packet_batches: UnprocessedTransactionStorage,
tracker: LeaderSlotMetricsTracker,
stats: BankingStageStats,
tracer_stats: TracerPacketStats,
}

fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
Expand Down Expand Up @@ -88,7 +84,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
transaction.message.account_keys[0] = solana_sdk::pubkey::Pubkey::new_unique();
}
let mut packet = Packet::from_data(None, transaction).unwrap();
packet.meta_mut().set_tracer(true);
packet.meta_mut().set_from_staked_node(true);
DeserializedPacket::new(packet).unwrap()
})
Expand Down Expand Up @@ -118,7 +113,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
unprocessed_packet_batches,
tracker: LeaderSlotMetricsTracker::new(0),
stats: BankingStageStats::default(),
tracer_stats: TracerPacketStats::new(0),
}
}

Expand All @@ -132,19 +126,12 @@ fn bench_forwarder_handle_forwading_contentious_transaction(bencher: &mut Benche
mut unprocessed_packet_batches,
mut tracker,
stats,
mut tracer_stats,
} = setup(num_packets, true);

// hold packets so they can be reused for benching
let hold = true;
bencher.iter(|| {
forwarder.handle_forwarding(
&mut unprocessed_packet_batches,
hold,
&mut tracker,
&stats,
&mut tracer_stats,
);
forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats);
// reset packet.forwarded flag to reuse `unprocessed_packet_batches`
if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) =
&mut unprocessed_packet_batches
Expand All @@ -169,19 +156,12 @@ fn bench_forwarder_handle_forwading_parallel_transactions(bencher: &mut Bencher)
mut unprocessed_packet_batches,
mut tracker,
stats,
mut tracer_stats,
} = setup(num_packets, false);

// hold packets so they can be reused for benching
let hold = true;
bencher.iter(|| {
forwarder.handle_forwarding(
&mut unprocessed_packet_batches,
hold,
&mut tracker,
&stats,
&mut tracer_stats,
);
forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats);
// reset packet.forwarded flag to reuse `unprocessed_packet_batches`
if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) =
&mut unprocessed_packet_batches
Expand Down
20 changes: 6 additions & 14 deletions core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use {
},
solana_sdk::{
hash::Hash,
packet::PacketFlags,
signature::{Keypair, Signer},
system_transaction,
},
Expand Down Expand Up @@ -58,7 +57,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
info!("total packets: {}", total);

bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ());
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for p in batch.iter_mut() {
Expand Down Expand Up @@ -105,7 +104,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
}
}
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ());
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for packet in batch.iter_mut() {
Expand Down Expand Up @@ -171,24 +170,17 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) {
);

let mut sent_len = 0;
for mut batch in batches.into_iter() {
for batch in batches.into_iter() {
sent_len += batch.len();
batch
.iter_mut()
.for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET);
packet_s.send(batch).unwrap();
}
let mut received = 0;
let mut total_tracer_packets_received_in_sigverify_stage = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(message) = verified_r.recv_timeout(Duration::from_millis(10)) {
let (verifieds, tracer_packet_stats) = (&message.0, message.1.as_ref().unwrap());
if let Ok(verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
received += verifieds.iter().map(|batch| batch.len()).sum::<usize>();
total_tracer_packets_received_in_sigverify_stage +=
tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage;
test::black_box(message);
if total_tracer_packets_received_in_sigverify_stage >= sent_len {
test::black_box(verifieds);
if received >= sent_len {
break;
}
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,8 +822,7 @@ impl BankingSimulator {
let timed_batches_to_send = packet_batches_by_time.split_off(&base_event_time);
let batch_and_tx_counts = timed_batches_to_send
.values()
.map(|(_label, batches_with_stats)| {
let batches = &batches_with_stats.0;
.map(|(_label, batches)| {
(
batches.len(),
batches.iter().map(|batch| batch.len()).sum::<usize>(),
Expand Down
19 changes: 4 additions & 15 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use {
},
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
validator::BlockProductionMethod,
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
Expand Down Expand Up @@ -319,8 +318,6 @@ pub enum ForwardOption {
#[derive(Debug, Default)]
pub struct FilterForwardingResults {
pub(crate) total_forwardable_packets: usize,
pub(crate) total_tracer_packets_in_buffer: usize,
pub(crate) total_forwardable_tracer_packets: usize,
pub(crate) total_dropped_packets: usize,
pub(crate) total_packet_conversion_us: u64,
pub(crate) total_filter_packets_us: u64,
Expand Down Expand Up @@ -686,7 +683,6 @@ impl BankingStage {
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand Down Expand Up @@ -722,7 +718,6 @@ impl BankingStage {
false,
slot_metrics_tracker,
banking_stage_stats,
tracer_packet_stats,
));
slot_metrics_tracker.increment_forward_us(forward_us);
// Take metrics action after forwarding packets to include forwarded
Expand All @@ -735,7 +730,6 @@ impl BankingStage {
true,
slot_metrics_tracker,
banking_stage_stats,
tracer_packet_stats,
));
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us);
// Take metrics action after forwarding packets
Expand All @@ -754,7 +748,6 @@ impl BankingStage {
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
) {
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();
Expand All @@ -770,19 +763,15 @@ impl BankingStage {
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
last_metrics_update = Instant::now();
}

tracer_packet_stats.report(1000);

match packet_receiver.receive_and_buffer_packets(
&mut unprocessed_transaction_storage,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Expand Down Expand Up @@ -1073,7 +1062,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
non_vote_sender // no_ver, anf, tx
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

drop(non_vote_sender);
Expand Down Expand Up @@ -1152,7 +1141,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
non_vote_sender
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

// Process a second batch that uses the same from account, so conflicts with above TX
Expand All @@ -1165,7 +1154,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
non_vote_sender
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap();

let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();
Expand Down Expand Up @@ -1472,7 +1461,7 @@ mod tests {
Builder::new()
.spawn(move || {
sender
.send(BankingPacketBatch::new((packet_batches, None)))
.send(BankingPacketBatch::new(packet_batches))
.unwrap()
})
.unwrap()
Expand Down
15 changes: 1 addition & 14 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use {
immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo,
},
next_leader::{next_leader, next_leader_tpu_vote},
tracer_packet_stats::TracerPacketStats,
},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
Expand Down Expand Up @@ -96,7 +95,6 @@ impl<T: LikeClusterInfo> Forwarder<T> {
hold: bool,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
banking_stage_stats: &BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
) {
let forward_option = unprocessed_transaction_storage.forward_option();

Expand Down Expand Up @@ -139,19 +137,13 @@ impl<T: LikeClusterInfo> Forwarder<T> {
slot_metrics_tracker.increment_forwardable_batches_count(1);

let batched_forwardable_packets_count = forward_batch.len();
let (_forward_result, successful_forwarded_packets_count, leader_pubkey) = self
let (_forward_result, successful_forwarded_packets_count, _leader_pubkey) = self
.forward_buffered_packets(
&forward_option,
forward_batch.get_forwardable_packets(),
banking_stage_stats,
);

if let Some(leader_pubkey) = leader_pubkey {
tracer_packet_stats.increment_total_forwardable_tracer_packets(
filter_forwarding_result.total_forwardable_tracer_packets,
leader_pubkey,
);
}
let failed_forwarded_packets_count = batched_forwardable_packets_count
.saturating_sub(successful_forwarded_packets_count);

Expand All @@ -174,9 +166,6 @@ impl<T: LikeClusterInfo> Forwarder<T> {
slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count(
filter_forwarding_result.total_forwardable_packets as u64,
);
tracer_packet_stats.increment_total_cleared_from_buffer_after_forward(
filter_forwarding_result.total_tracer_packets_in_buffer,
);
unprocessed_transaction_storage.clear_forwarded_packets();
}
}
Expand Down Expand Up @@ -485,7 +474,6 @@ mod tests {
true,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&mut TracerPacketStats::new(0),
);

let recv_socket = &local_node.sockets.tpu_forwards_quic[0];
Expand Down Expand Up @@ -584,7 +572,6 @@ mod tests {
hold,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&mut TracerPacketStats::new(0),
);

let recv_socket = &local_node.sockets.tpu_forwards_quic[0];
Expand Down
Loading
Loading