Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod tower1_14_11;
mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
mod tpu_entry_notifier;
pub mod tracer_packet_stats;
pub mod tree_diff;
pub mod tvu;
Expand Down
22 changes: 21 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use {
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
staked_nodes_updater_service::StakedNodesUpdaterService,
tpu_entry_notifier::TpuEntryNotifier,
validator::GeneratorConfig,
},
crossbeam_channel::{unbounded, Receiver},
Expand Down Expand Up @@ -70,6 +71,7 @@ pub struct Tpu {
broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
tpu_entry_notifier: Option<TpuEntryNotifier>,
staked_nodes_updater_service: StakedNodesUpdaterService,
tracer_thread_hdl: TracerThread,
}
Expand All @@ -84,7 +86,7 @@ impl Tpu {
sockets: TpuSockets,
subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>,
_entry_notification_sender: Option<EntryNotifierSender>,
entry_notification_sender: Option<EntryNotifierSender>,
blockstore: &Arc<Blockstore>,
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
Expand Down Expand Up @@ -229,6 +231,20 @@ impl Tpu {
prioritization_fee_cache,
);

let (entry_receiver, tpu_entry_notifier) =
if let Some(entry_notification_sender) = entry_notification_sender {
let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
let tpu_entry_notifier = TpuEntryNotifier::new(
entry_receiver,
entry_notification_sender,
broadcast_entry_sender,
exit.clone(),
);
(broadcast_entry_receiver, Some(tpu_entry_notifier))
} else {
(entry_receiver, None)
};

let broadcast_stage = broadcast_type.new_broadcast_stage(
broadcast_sockets,
cluster_info.clone(),
Expand All @@ -249,6 +265,7 @@ impl Tpu {
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
}
Expand All @@ -269,6 +286,9 @@ impl Tpu {
for result in results {
result?;
}
if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
tpu_entry_notifier.join()?;
}
let _ = broadcast_result?;
if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
if let Err(tracer_result) = tracer_thread_hdl.join()? {
Expand Down
101 changes: 101 additions & 0 deletions core/src/tpu_entry_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_entry::entry::EntrySummary,
solana_ledger::entry_notifier_service::{EntryNotification, EntryNotifierSender},
solana_poh::poh_recorder::WorkingBankEntry,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};

pub(crate) struct TpuEntryNotifier {
thread_hdl: JoinHandle<()>,
}

impl TpuEntryNotifier {
pub(crate) fn new(
entry_receiver: Receiver<WorkingBankEntry>,
entry_notification_sender: EntryNotifierSender,
broadcast_entry_sender: Sender<WorkingBankEntry>,
exit: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
.name("solTpuEntry".to_string())
.spawn(move || {
let mut current_slot = 0;
let mut current_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}

if let Err(RecvTimeoutError::Disconnected) = Self::send_entry_notification(
exit.clone(),
&entry_receiver,
&entry_notification_sender,
&broadcast_entry_sender,
&mut current_slot,
&mut current_index,
) {
break;
}
}
})
.unwrap();
Self { thread_hdl }
}

pub(crate) fn send_entry_notification(
exit: Arc<AtomicBool>,
entry_receiver: &Receiver<WorkingBankEntry>,
entry_notification_sender: &EntryNotifierSender,
broadcast_entry_sender: &Sender<WorkingBankEntry>,
current_slot: &mut u64,
current_index: &mut usize,
) -> Result<(), RecvTimeoutError> {
let (bank, (entry, tick_height)) = entry_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot();
let index = if slot != *current_slot {
*current_index = 0;
*current_slot = slot;
0
} else {
*current_index += 1;
*current_index
};

let entry_summary = EntrySummary {
num_hashes: entry.num_hashes,
hash: entry.hash,
num_transactions: entry.transactions.len() as u64,
};
if let Err(err) = entry_notification_sender.send(EntryNotification {
slot,
index,
entry: entry_summary,
}) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}",
);
}

if let Err(err) = broadcast_entry_sender.send((bank, (entry, tick_height))) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to BroadcastStage, error {err:?}",
);
// If the BroadcastStage channel is closed, the validator has halted. Try to exit
// gracefully.
exit.store(true, Ordering::Relaxed);
}
Ok(())
}

pub(crate) fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}