Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn check_txs(
let now = Instant::now();
let mut no_bank = false;
loop {
if let Ok((_bank, (entry, _tick_height))) = receiver.recv_timeout(Duration::from_millis(10))
if let Ok(WorkingBankEntry { entry, .. }) = receiver.recv_timeout(Duration::from_millis(10))
{
total += entry.transactions.len();
}
Expand Down
2 changes: 1 addition & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn check_txs(receiver: &Arc<Receiver<WorkingBankEntry>>, ref_tx_count: usize) {
let mut total = 0;
let now = Instant::now();
loop {
if let Ok((_bank, (entry, _tick_height))) = receiver.recv_timeout(Duration::new(1, 0)) {
if let Ok(WorkingBankEntry { entry, .. }) = receiver.recv_timeout(Duration::new(1, 0)) {
total += entry.transactions.len();
}
if total >= ref_tx_count {
Expand Down
5 changes: 2 additions & 3 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ use {
banking_stage::{committer::Committer, consumer::Consumer},
qos_service::QosService,
},
solana_entry::entry::Entry,
solana_ledger::{
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_poh::{
poh_recorder::{create_test_recorder, PohRecorder},
poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
poh_service::PohService,
},
solana_runtime::bank::Bank,
Expand Down Expand Up @@ -88,7 +87,7 @@ struct BenchFrame {
exit: Arc<AtomicBool>,
poh_recorder: Arc<RwLock<PohRecorder>>,
poh_service: PohService,
signal_receiver: Receiver<(Arc<Bank>, (Entry, u64))>,
signal_receiver: Receiver<WorkingBankEntry>,
}

fn setup() -> BenchFrame {
Expand Down
9 changes: 5 additions & 4 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ mod tests {
solana_poh::{
poh_recorder::{
create_test_recorder, PohRecorderError, Record, RecordTransactionsSummary,
WorkingBankEntry,
},
poh_service::PohService,
},
Expand Down Expand Up @@ -741,7 +742,7 @@ mod tests {
trace!("getting entries");
let entries: Vec<_> = entry_receiver
.iter()
.map(|(_bank, (entry, _tick_height))| entry)
.map(|WorkingBankEntry { entry, .. }| entry)
.collect();
trace!("done");
assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize);
Expand Down Expand Up @@ -863,7 +864,7 @@ mod tests {
loop {
let entries: Vec<Entry> = entry_receiver
.iter()
.map(|(_bank, (entry, _tick_height))| entry)
.map(|WorkingBankEntry { entry, .. }| entry)
.collect();

assert!(entries.verify(&blockhash));
Expand Down Expand Up @@ -990,7 +991,7 @@ mod tests {
// check that the balance is what we expect.
let entries: Vec<_> = entry_receiver
.iter()
.map(|(_bank, (entry, _tick_height))| entry)
.map(|WorkingBankEntry { entry, .. }| entry)
.collect();

let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
Expand Down Expand Up @@ -1052,7 +1053,7 @@ mod tests {
];

let _ = recorder.record_transactions(bank.slot(), txs.clone());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
let WorkingBankEntry { entry, .. } = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions, txs);

// Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions),
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ mod tests {

let mut done = false;
// read entries until I find mine, might be ticks...
while let Ok((_bank, (entry, _tick_height))) = entry_receiver.recv() {
while let Ok(WorkingBankEntry { entry, .. }) = entry_receiver.recv() {
if !entry.is_tick() {
trace!("got entry");
assert_eq!(entry.transactions.len(), transactions.len());
Expand Down
7 changes: 6 additions & 1 deletion core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,12 @@ pub mod test {
let ticks = create_ticks(max_tick_height - start_tick_height, 0, Hash::default());
for (i, tick) in ticks.into_iter().enumerate() {
entry_sender
.send((bank.clone(), (tick, i as u64 + 1)))
.send(WorkingBankEntry {
bank: bank.clone(),
entry: tick,
tick_height: i as u64 + 1,
entry_index: 0,
})
.expect("Expect successful send to broadcast service");
}
}
Expand Down
53 changes: 42 additions & 11 deletions core/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,23 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;
let WorkingBankEntry {
mut bank,
entry,
tick_height: mut last_tick_height,
..
} = receiver.recv_timeout(timer)?;
let mut entries = vec![entry];
assert!(last_tick_height <= bank.max_tick_height());

// Drain channel
while last_tick_height != bank.max_tick_height() {
let (try_bank, (entry, tick_height)) = match receiver.try_recv() {
let WorkingBankEntry {
bank: try_bank,
entry,
tick_height,
..
} = match receiver.try_recv() {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
Expand All @@ -65,11 +75,15 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
while last_tick_height != bank.max_tick_height()
&& serialized_batch_byte_count < target_serialized_batch_byte_count
{
let (try_bank, (entry, tick_height)) =
match receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
let WorkingBankEntry {
bank: try_bank,
entry,
tick_height,
..
} = match receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
Expand Down Expand Up @@ -139,7 +153,13 @@ mod tests {
.map(|i| {
let entry = Entry::new(&last_hash, 1, vec![tx.clone()]);
last_hash = entry.hash;
s.send((bank1.clone(), (entry.clone(), i))).unwrap();
s.send(WorkingBankEntry {
bank: bank1.clone(),
entry: entry.clone(),
tick_height: i,
entry_index: 0,
})
.unwrap();
entry
})
.collect();
Expand Down Expand Up @@ -173,11 +193,22 @@ mod tests {
last_hash = entry.hash;
// Interrupt slot 1 right before the last tick
if tick_height == expected_last_height {
s.send((bank2.clone(), (entry.clone(), tick_height)))
.unwrap();
s.send(WorkingBankEntry {
bank: bank2.clone(),
entry: entry.clone(),
tick_height,
entry_index: 0,
})
.unwrap();
Some(entry)
} else {
s.send((bank1.clone(), (entry, tick_height))).unwrap();
s.send(WorkingBankEntry {
bank: bank1.clone(),
entry,
tick_height,
entry_index: 0,
})
.unwrap();
None
}
})
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ mod tower1_14_11;
mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
mod tpu_entry_notifier;
pub mod tracer_packet_stats;
pub mod tree_diff;
pub mod tvu;
Expand Down
22 changes: 21 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use {
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
staked_nodes_updater_service::StakedNodesUpdaterService,
tpu_entry_notifier::TpuEntryNotifier,
validator::GeneratorConfig,
},
crossbeam_channel::{unbounded, Receiver},
Expand Down Expand Up @@ -70,6 +71,7 @@ pub struct Tpu {
broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
tpu_entry_notifier: Option<TpuEntryNotifier>,
staked_nodes_updater_service: StakedNodesUpdaterService,
tracer_thread_hdl: TracerThread,
}
Expand All @@ -84,7 +86,7 @@ impl Tpu {
sockets: TpuSockets,
subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>,
_entry_notification_sender: Option<EntryNotifierSender>,
entry_notification_sender: Option<EntryNotifierSender>,
blockstore: &Arc<Blockstore>,
broadcast_type: &BroadcastStageType,
exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -229,6 +231,20 @@ impl Tpu {
prioritization_fee_cache,
);

let (entry_receiver, tpu_entry_notifier) =
if let Some(entry_notification_sender) = entry_notification_sender {
let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
let tpu_entry_notifier = TpuEntryNotifier::new(
entry_receiver,
entry_notification_sender,
broadcast_entry_sender,
exit.clone(),
);
(broadcast_entry_receiver, Some(tpu_entry_notifier))
} else {
(entry_receiver, None)
};

let broadcast_stage = broadcast_type.new_broadcast_stage(
broadcast_sockets,
cluster_info.clone(),
Expand All @@ -249,6 +265,7 @@ impl Tpu {
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
}
Expand All @@ -269,6 +286,9 @@ impl Tpu {
for result in results {
result?;
}
if let Some(tpu_entry_notifier) = self.tpu_entry_notifier {
tpu_entry_notifier.join()?;
}
let _ = broadcast_result?;
if let Some(tracer_thread_hdl) = self.tracer_thread_hdl {
if let Err(tracer_result) = tracer_thread_hdl.join()? {
Expand Down
82 changes: 82 additions & 0 deletions core/src/tpu_entry_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_entry::entry::EntrySummary,
solana_ledger::entry_notifier_service::{EntryNotification, EntryNotifierSender},
solana_poh::poh_recorder::WorkingBankEntry,
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};

/// Owns the background thread spawned by [`TpuEntryNotifier::new`].
///
/// The only state kept is the thread's join handle, so the owner can wait for
/// the thread to finish via [`TpuEntryNotifier::join`].
pub(crate) struct TpuEntryNotifier {
// Handle of the "solTpuEntry" thread.
thread_hdl: JoinHandle<()>,
}

impl TpuEntryNotifier {
/// Spawns the `solTpuEntry` thread and returns a handle wrapper for it.
///
/// The thread repeatedly calls [`Self::send_entry_notification`] with the
/// given channels. It stops when `exit` is observed as set at the top of an
/// iteration, or when that call reports `RecvTimeoutError::Disconnected`.
/// Any other outcome (success or a plain timeout) just starts the next
/// iteration.
///
/// # Panics
///
/// Panics if the OS thread cannot be spawned.
pub(crate) fn new(
    receiver: Receiver<WorkingBankEntry>,
    entry_notification_sender: EntryNotifierSender,
    broadcast_entry_sender: Sender<WorkingBankEntry>,
    exit: Arc<AtomicBool>,
) -> Self {
    let run = move || {
        // Re-check the exit flag before every receive attempt.
        while !exit.load(Ordering::Relaxed) {
            let outcome = Self::send_entry_notification(
                &receiver,
                &entry_notification_sender,
                &broadcast_entry_sender,
            );
            // A timeout is not fatal; only a disconnected input ends the thread.
            if matches!(outcome, Err(RecvTimeoutError::Disconnected)) {
                break;
            }
        }
    };

    let thread_hdl = Builder::new()
        .name(String::from("solTpuEntry"))
        .spawn(run)
        .unwrap();
    Self { thread_hdl }
}

/// Receives one `WorkingBankEntry`, publishes a summary of it, and forwards
/// the entry itself to the broadcast channel.
///
/// Waits up to one second on `receiver`. If nothing arrives, or the channel is
/// disconnected, the `RecvTimeoutError` is returned to the caller unchanged.
///
/// Once an entry is received, a failure to send on either outgoing channel is
/// only logged with `warn!`; the function still returns `Ok(())`.
pub(crate) fn send_entry_notification(
receiver: &Receiver<WorkingBankEntry>,
entry_notification_sender: &EntryNotifierSender,
broadcast_entry_sender: &Sender<WorkingBankEntry>,
) -> Result<(), RecvTimeoutError> {
// `?` propagates both the timeout and the disconnected case.
let working_bank_entry = receiver.recv_timeout(Duration::from_secs(1))?;
// Captured up front: `working_bank_entry` is moved into the broadcast send
// below, but the log messages still need these values.
let slot = working_bank_entry.bank.slot();
let index = working_bank_entry.entry_index;

// Summary carries the hash count, the hash, and the number of transactions,
// not the transactions themselves.
let entry_summary = EntrySummary {
num_hashes: working_bank_entry.entry.num_hashes,
hash: working_bank_entry.entry.hash,
num_transactions: working_bank_entry.entry.transactions.len() as u64,
};
// Notification failure is logged and otherwise ignored, so the entry is
// still forwarded below.
if let Err(err) = entry_notification_sender.send(EntryNotification {
slot,
index,
entry: entry_summary,
}) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}",
);
}

// TODO: in PohRecorder, we panic if the send to BroadcastStage fails. Should we do the same here?
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on this question?
Also, would you want to see this send in a separate thread?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we panic in pohrecorder? PohService waits for an exit signal https://github.com/apfitzge/solana/blob/abbf96f17aca9647766559db5b5de714726da033/poh/src/poh_service.rs#L371, and poh_recorder doesn't panic on record either

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you want to see this send in a separate thread?

This is already running in a separate TpuEntryNotifier thread right? Seems like this send is perfectly fine here since it's nonblocking

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we panic in pohrecorder? PohService waits for an exit signal https://github.com/apfitzge/solana/blob/abbf96f17aca9647766559db5b5de714726da033/poh/src/poh_service.rs#L371, and poh_recorder doesn't panic on record either

This is the panic I was referring to:

Err(e) => panic!("Poh recorder returned unexpected error: {e:?}"),

It's a bit of a chore to track all the results around, so I might be misreading. But I think that if the BroadcastStage send fails with an Err, this returns an Err, which gets returned to that method with the panic I linked above here. Is that right?

// Hand the received entry, unmodified, to the broadcast channel. A failed
// send is logged only; the caller still sees `Ok(())`.
if let Err(err) = broadcast_entry_sender.send(working_bank_entry) {
warn!(
"Failed to send slot {slot:?} entry {index:?} from Tpu to BroadcastStage, error {err:?}",
);
}
Ok(())
}

pub(crate) fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
Loading