diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 6a2f1faf3161c0..a12f4e16100f6e 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -2,7 +2,7 @@ use { crate::result::{Error, Result}, - crossbeam_channel::{unbounded, RecvTimeoutError}, + crossbeam_channel::{unbounded, RecvTimeoutError, TrySendError}, solana_clock::{DEFAULT_TICKS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET}, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_packet::PacketFlags, @@ -124,12 +124,23 @@ impl FetchStage { .unwrap() .would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT)) { - inc_new_counter_debug!("fetch_stage-honor_forwards", num_packets); + let mut packets_sent = 0usize; + let mut packets_dropped = 0usize; for packet_batch in packet_batches { - #[allow(clippy::question_mark)] - if sendr.send(packet_batch).is_err() { - return Err(Error::Send); - } + let packets_in_batch = packet_batch.len(); + match sendr.try_send(packet_batch) { + Ok(()) => { + packets_sent += packets_in_batch; + } + Err(TrySendError::Full(_)) => { + packets_dropped += packets_in_batch; + } + Err(TrySendError::Disconnected(_)) => return Err(Error::Send), + }; + } + inc_new_counter_debug!("fetch_stage-honor_forwards", packets_sent); + if packets_dropped > 0 { + inc_new_counter_error!("fetch_stage-dropped_forwards", packets_dropped); } } else { inc_new_counter_info!("fetch_stage-discard_forwards", num_packets);