From 9acfd120b45b4f25983cf6bc085a9a2a2630070a Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 23 Dec 2025 09:43:23 +0000 Subject: [PATCH] avoid using recv in sigverify --- core/src/sigverify_stage.rs | 6 +++--- streamer/src/streamer.rs | 40 +++++++++++++++++++++++++++---------- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 18fcd9329118a2..c5c7dceb68c0cf 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -293,6 +293,9 @@ impl SigVerifyStage { stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + if num_packets == 0 { + return Ok(()); + } let batches_len = batches.len(); debug!( @@ -408,9 +411,6 @@ impl SigVerifyStage { SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( RecvTimeoutError::Disconnected, )) => break, - SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( - RecvTimeoutError::Timeout, - )) => (), SigVerifyServiceError::Send(_) => { break; } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 0fb370db480245..c0459885a31d64 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -9,7 +9,9 @@ use { }, sendmmsg::{batch_send, SendPktsError}, }, - crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, + crossbeam_channel::{ + Receiver, RecvTimeoutError, SendError, Sender, TryRecvError, TrySendError, + }, histogram::Histogram, solana_net_utils::{ multihomed_sockets::{ @@ -494,16 +496,34 @@ fn recv_send( pub fn recv_packet_batches( recvr: &PacketBatchReceiver, ) -> Result<(Vec, usize, Duration)> { + const MAX_RECV_ATTEMPTS: usize = 1_000; let recv_start = Instant::now(); - let timer = Duration::new(1, 0); - let packet_batch = recvr.recv_timeout(timer)?; - trace!("got packets"); - let mut num_packets = packet_batch.len(); - let mut packet_batches = vec![packet_batch]; - while let Ok(packet_batch) = recvr.try_recv() { - trace!("got more packets"); - num_packets += packet_batch.len(); - packet_batches.push(packet_batch); + + let mut num_packets = 0; + let mut packet_batches = Vec::new(); + let mut num_attempts = 0; + + while num_attempts < MAX_RECV_ATTEMPTS { + loop { + match recvr.try_recv() { + Ok(packet_batch) => { + trace!("got more packets"); + num_packets += packet_batch.len(); + packet_batches.push(packet_batch); + } + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Disconnected) => { + return Err(StreamerError::RecvTimeout(RecvTimeoutError::Disconnected)); + } + } + } + if num_packets > 0 { + break; + } + sleep(Duration::from_millis(1)); + num_attempts += 1; } let recv_duration = recv_start.elapsed(); trace!(