Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) {
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = BankingTracer::channel_for_test();
let verifier = TransactionSigVerifier::new(verified_s);
let stage = SigVerifyStage::new(packet_r, verifier, "bench");
let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerBench", "bench");

bencher.iter(move || {
let now = Instant::now();
Expand Down
5 changes: 4 additions & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl ShredFetchStage {
#[allow(clippy::too_many_arguments)]
fn packet_modifier(
receiver_thread_name: &'static str,
modifier_thread_name: &'static str,
sockets: Vec<Arc<UdpSocket>>,
exit: Arc<AtomicBool>,
sender: Sender<PacketBatch>,
Expand Down Expand Up @@ -178,7 +179,7 @@ impl ShredFetchStage {
})
.collect();
let modifier_hdl = Builder::new()
.name("solTvuFetchPMod".to_string())
.name(modifier_thread_name.to_string())
.spawn(move || {
let repair_context = repair_context
.as_ref()
Expand Down Expand Up @@ -215,6 +216,7 @@ impl ShredFetchStage {

let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
"solRcvrShred",
"solTvuPktMod",
sockets,
exit.clone(),
sender.clone(),
Expand All @@ -229,6 +231,7 @@ impl ShredFetchStage {

let (repair_receiver, repair_handler) = Self::packet_modifier(
"solRcvrShredRep",
"solTvuRepPktMod",
vec![repair_socket.clone()],
exit.clone(),
sender.clone(),
Expand Down
23 changes: 9 additions & 14 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,11 @@ impl SigVerifyStage {
pub fn new<T: SigVerifier + 'static + Send>(
packet_receiver: Receiver<PacketBatch>,
verifier: T,
name: &'static str,
thread_name: &'static str,
metrics_name: &'static str,
) -> Self {
let thread_hdl = Self::verifier_services(packet_receiver, verifier, name);
let thread_hdl =
Self::verifier_service(packet_receiver, verifier, thread_name, metrics_name);
Self { thread_hdl }
}

Expand Down Expand Up @@ -407,15 +409,16 @@ impl SigVerifyStage {
fn verifier_service<T: SigVerifier + 'static + Send>(
packet_receiver: Receiver<PacketBatch>,
mut verifier: T,
name: &'static str,
thread_name: &'static str,
metrics_name: &'static str,
) -> JoinHandle<()> {
let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now();
const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2);
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 63_999_979;
Builder::new()
.name("solSigVerifier".to_string())
.name(thread_name.to_string())
.spawn(move || {
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
Expand All @@ -440,7 +443,7 @@ impl SigVerifyStage {
}
}
if last_print.elapsed().as_secs() > 2 {
stats.report(name);
stats.report(metrics_name);
stats = SigVerifierStats::default();
last_print = Instant::now();
}
Expand All @@ -449,14 +452,6 @@ impl SigVerifyStage {
.unwrap()
}

fn verifier_services<T: SigVerifier + 'static + Send>(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this exist?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - my guess is this used to contain multiple thread handles but got refactored to only include the single thread that it currently does

packet_receiver: Receiver<PacketBatch>,
verifier: T,
name: &'static str,
) -> JoinHandle<()> {
Self::verifier_service(packet_receiver, verifier, name)
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
Expand Down Expand Up @@ -552,7 +547,7 @@ mod tests {
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = BankingTracer::channel_for_test();
let verifier = TransactionSigVerifier::new(verified_s);
let stage = SigVerifyStage::new(packet_r, verifier, "test");
let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerTest", "test");

let now = Instant::now();
let packets_per_batch = 128;
Expand Down
9 changes: 7 additions & 2 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,19 @@ impl Tpu {

let sigverify_stage = {
let verifier = TransactionSigVerifier::new(non_vote_sender);
SigVerifyStage::new(packet_receiver, verifier, "tpu-verifier")
SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
};

let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();

let vote_sigverify_stage = {
let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
SigVerifyStage::new(vote_packet_receiver, verifier, "tpu-vote-verifier")
SigVerifyStage::new(
vote_packet_receiver,
verifier,
"solSigVerTpuVot",
"tpu-vote-verifier",
)
};

let (gossip_vote_sender, gossip_vote_receiver) =
Expand Down