diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index c6173c68098502..e8868543553049 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -57,7 +57,7 @@ fn check_txs( let now = Instant::now(); let mut no_bank = false; loop { - if let Ok((_bank, (entry, _tick_height))) = receiver.recv_timeout(Duration::from_millis(10)) + if let Ok(WorkingBankEntry { entry, .. }) = receiver.recv_timeout(Duration::from_millis(10)) { total += entry.transactions.len(); } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 87735334f460d0..47b98694eeddaa 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -61,7 +61,7 @@ fn check_txs(receiver: &Arc>, ref_tx_count: usize) { let mut total = 0; let now = Instant::now(); loop { - if let Ok((_bank, (entry, _tick_height))) = receiver.recv_timeout(Duration::new(1, 0)) { + if let Ok(WorkingBankEntry { entry, .. }) = receiver.recv_timeout(Duration::new(1, 0)) { total += entry.transactions.len(); } if total >= ref_tx_count { diff --git a/core/benches/consumer.rs b/core/benches/consumer.rs index 9fbbe0d960091e..11bcf94e8e3963 100644 --- a/core/benches/consumer.rs +++ b/core/benches/consumer.rs @@ -11,13 +11,12 @@ use { banking_stage::{committer::Committer, consumer::Consumer}, qos_service::QosService, }, - solana_entry::entry::Entry, solana_ledger::{ blockstore::Blockstore, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }, solana_poh::{ - poh_recorder::{create_test_recorder, PohRecorder}, + poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, poh_service::PohService, }, solana_runtime::bank::Bank, @@ -88,7 +87,7 @@ struct BenchFrame { exit: Arc, poh_recorder: Arc>, poh_service: PohService, - signal_receiver: Receiver<(Arc, (Entry, u64))>, + signal_receiver: Receiver, } fn setup() -> BenchFrame { diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index dc75d24a537c7a..03ee473fb4cae5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -598,6 +598,7 @@ mod tests { solana_poh::{ poh_recorder::{ create_test_recorder, PohRecorderError, Record, RecordTransactionsSummary, + WorkingBankEntry, }, poh_service::PohService, }, @@ -741,7 +742,7 @@ mod tests { trace!("getting entries"); let entries: Vec<_> = entry_receiver .iter() - .map(|(_bank, (entry, _tick_height))| entry) + .map(|WorkingBankEntry { entry, .. }| entry) .collect(); trace!("done"); assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize); @@ -863,7 +864,7 @@ mod tests { loop { let entries: Vec = entry_receiver .iter() - .map(|(_bank, (entry, _tick_height))| entry) + .map(|WorkingBankEntry { entry, .. }| entry) .collect(); assert!(entries.verify(&blockhash)); @@ -990,7 +991,7 @@ mod tests { // check that the balance is what we expect. let entries: Vec<_> = entry_receiver .iter() - .map(|(_bank, (entry, _tick_height))| entry) + .map(|WorkingBankEntry { entry, .. }| entry) .collect(); let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); @@ -1052,7 +1053,7 @@ mod tests { ]; let _ = recorder.record_transactions(bank.slot(), txs.clone()); - let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + let WorkingBankEntry { entry, .. } = entry_receiver.recv().unwrap(); assert_eq!(entry.transactions, txs); // Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions), diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index ee073fc7710af4..e09c47b4e27133 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -980,7 +980,7 @@ mod tests { let mut done = false; // read entries until I find mine, might be ticks... - while let Ok((_bank, (entry, _tick_height))) = entry_receiver.recv() { + while let Ok(WorkingBankEntry { entry, .. }) = entry_receiver.recv() { if !entry.is_tick() { trace!("got entry"); assert_eq!(entry.transactions.len(), transactions.len()); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 3b6e4967c3bb04..affd146096b2b3 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -659,7 +659,12 @@ pub mod test { let ticks = create_ticks(max_tick_height - start_tick_height, 0, Hash::default()); for (i, tick) in ticks.into_iter().enumerate() { entry_sender - .send((bank.clone(), (tick, i as u64 + 1))) + .send(WorkingBankEntry { + bank: bank.clone(), + entry: tick, + tick_height: i as u64 + 1, + entry_index: 0, + }) .expect("Expect successful send to broadcast service"); } } diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index f9485d59a9ebd8..a2d2ebdd0db2d8 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -36,13 +36,23 @@ pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result 32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64; let timer = Duration::new(1, 0); let recv_start = Instant::now(); - let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?; + let WorkingBankEntry { + mut bank, + entry, + tick_height: mut last_tick_height, + .. + } = receiver.recv_timeout(timer)?; let mut entries = vec![entry]; assert!(last_tick_height <= bank.max_tick_height()); // Drain channel while last_tick_height != bank.max_tick_height() { - let (try_bank, (entry, tick_height)) = match receiver.try_recv() { + let WorkingBankEntry { + bank: try_bank, + entry, + tick_height, + .. + } = match receiver.try_recv() { Ok(working_bank_entry) => working_bank_entry, Err(_) => break, }; @@ -65,11 +75,15 @@ pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result while last_tick_height != bank.max_tick_height() && serialized_batch_byte_count < target_serialized_batch_byte_count { - let (try_bank, (entry, tick_height)) = - match receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) { - Ok(working_bank_entry) => working_bank_entry, - Err(_) => break, - }; + let WorkingBankEntry { + bank: try_bank, + entry, + tick_height, + .. + } = match receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) { + Ok(working_bank_entry) => working_bank_entry, + Err(_) => break, + }; // If the bank changed, that implies the previous slot was interrupted and we do not have to // broadcast its entries. if try_bank.slot() != bank.slot() { @@ -139,7 +153,13 @@ mod tests { .map(|i| { let entry = Entry::new(&last_hash, 1, vec![tx.clone()]); last_hash = entry.hash; - s.send((bank1.clone(), (entry.clone(), i))).unwrap(); + s.send(WorkingBankEntry { + bank: bank1.clone(), + entry: entry.clone(), + tick_height: i, + entry_index: 0, + }) + .unwrap(); entry }) .collect(); @@ -173,11 +193,22 @@ mod tests { last_hash = entry.hash; // Interrupt slot 1 right before the last tick if tick_height == expected_last_height { - s.send((bank2.clone(), (entry.clone(), tick_height))) - .unwrap(); + s.send(WorkingBankEntry { + bank: bank2.clone(), + entry: entry.clone(), + tick_height, + entry_index: 0, + }) + .unwrap(); Some(entry) } else { - s.send((bank1.clone(), (entry, tick_height))).unwrap(); + s.send(WorkingBankEntry { + bank: bank1.clone(), + entry, + tick_height, + entry_index: 0, + }) + .unwrap(); None } }) diff --git a/core/src/lib.rs b/core/src/lib.rs index 7f48a9cbeb6c54..6747732231878e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -74,6 +74,7 @@ mod tower1_14_11; mod tower1_7_14; pub mod tower_storage; pub mod tpu; +mod tpu_entry_notifier; pub mod tracer_packet_stats; pub mod tree_diff; pub mod tvu; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 75bd7832dbbc22..515c2c5e7ee671 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -15,6 +15,7 @@ use { sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, + tpu_entry_notifier::TpuEntryNotifier, validator::GeneratorConfig, }, crossbeam_channel::{unbounded, Receiver}, @@ -70,6 +71,7 @@ pub struct Tpu { broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, tpu_forwards_quic_t: thread::JoinHandle<()>, + tpu_entry_notifier: Option, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, } @@ -84,7 +86,7 @@ impl Tpu { sockets: TpuSockets, subscriptions: &Arc, transaction_status_sender: Option, - _entry_notification_sender: Option, + entry_notification_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, exit: Arc, @@ -229,6 +231,20 @@ impl Tpu { prioritization_fee_cache, ); + let (entry_receiver, tpu_entry_notifier) = + if let Some(entry_notification_sender) = entry_notification_sender { + let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded(); + let tpu_entry_notifier = TpuEntryNotifier::new( + entry_receiver, + entry_notification_sender, + broadcast_entry_sender, + exit.clone(), + ); + (broadcast_entry_receiver, Some(tpu_entry_notifier)) + } else { + (entry_receiver, None) + }; + let broadcast_stage = broadcast_type.new_broadcast_stage( broadcast_sockets, cluster_info.clone(), @@ -249,6 +265,7 @@ impl Tpu { broadcast_stage, tpu_quic_t, tpu_forwards_quic_t, + tpu_entry_notifier, staked_nodes_updater_service, tracer_thread_hdl, } @@ -269,6 +286,9 @@ impl Tpu { for result in results { result?; } + if let Some(tpu_entry_notifier) = self.tpu_entry_notifier { + tpu_entry_notifier.join()?; + } let _ = broadcast_result?; if let Some(tracer_thread_hdl) = self.tracer_thread_hdl { if let Err(tracer_result) = tracer_thread_hdl.join()? { diff --git a/core/src/tpu_entry_notifier.rs b/core/src/tpu_entry_notifier.rs new file mode 100644 index 00000000000000..1c1b4f466dcb24 --- /dev/null +++ b/core/src/tpu_entry_notifier.rs @@ -0,0 +1,82 @@ +use { + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + solana_entry::entry::EntrySummary, + solana_ledger::entry_notifier_service::{EntryNotification, EntryNotifierSender}, + solana_poh::poh_recorder::WorkingBankEntry, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +pub(crate) struct TpuEntryNotifier { + thread_hdl: JoinHandle<()>, +} + +impl TpuEntryNotifier { + pub(crate) fn new( + receiver: Receiver, + entry_notification_sender: EntryNotifierSender, + broadcast_entry_sender: Sender, + exit: Arc, + ) -> Self { + let thread_hdl = Builder::new() + .name("solTpuEntry".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Err(RecvTimeoutError::Disconnected) = Self::send_entry_notification( + &receiver, + &entry_notification_sender, + &broadcast_entry_sender, + ) { + break; + } + }) + .unwrap(); + Self { thread_hdl } + } + + pub(crate) fn send_entry_notification( + receiver: &Receiver, + entry_notification_sender: &EntryNotifierSender, + broadcast_entry_sender: &Sender, + ) -> Result<(), RecvTimeoutError> { + let working_bank_entry = receiver.recv_timeout(Duration::from_secs(1))?; + let slot = working_bank_entry.bank.slot(); + let index = working_bank_entry.entry_index; + + let entry_summary = EntrySummary { + num_hashes: working_bank_entry.entry.num_hashes, + hash: working_bank_entry.entry.hash, + num_transactions: working_bank_entry.entry.transactions.len() as u64, + }; + if let Err(err) = entry_notification_sender.send(EntryNotification { + slot, + index, + entry: entry_summary, + }) { + warn!( + "Failed to send slot {slot:?} entry {index:?} from Tpu to EntryNotifierService, error {err:?}", + ); + } + + // TODO: in PohRecorder, we panic if the send to BroadcastStage fails. Should we do the same here? + if let Err(err) = broadcast_entry_sender.send(working_bank_entry) { + warn!( + "Failed to send slot {slot:?} entry {index:?} from Tpu to BroadcastStage, error {err:?}", + ); + } + Ok(()) + } + + pub(crate) fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index a78d45fd5b90a3..11cb841a0b0961 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -62,7 +62,13 @@ pub enum PohRecorderError { type Result = std::result::Result; -pub type WorkingBankEntry = (Arc, (Entry, u64)); +#[derive(Clone)] +pub struct WorkingBankEntry { + pub bank: Arc, + pub entry: Entry, + pub tick_height: u64, + pub entry_index: usize, +} #[derive(Debug, Clone)] pub struct BankStart { @@ -264,6 +270,13 @@ pub struct WorkingBank { pub min_tick_height: u64, pub max_tick_height: u64, pub transaction_index: Option, + pub entry_index: usize, +} + +impl WorkingBank { + fn increment_entry_index(&mut self) { + self.entry_index += 1 + } } #[derive(Debug, PartialEq, Eq)] @@ -585,6 +598,7 @@ impl PohRecorder { bank, start: Arc::new(Instant::now()), transaction_index: track_transaction_indexes.then_some(0), + entry_index: 0, }; trace!("new working bank"); assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot()); @@ -631,7 +645,7 @@ impl PohRecorder { // will fail instead of broadcasting any ticks let working_bank = self .working_bank - .as_ref() + .as_mut() .ok_or(PohRecorderError::MaxHeightReached)?; if self.tick_height < working_bank.min_tick_height { return Err(PohRecorderError::MinHeightNotReached); @@ -658,7 +672,14 @@ impl PohRecorder { for tick in &self.tick_cache[..entry_count] { working_bank.bank.register_tick(&tick.0.hash); - send_result = self.sender.send((working_bank.bank.clone(), tick.clone())); + let tick_clone = tick.clone(); + send_result = self.sender.send(WorkingBankEntry { + bank: working_bank.bank.clone(), + entry: tick_clone.0, + tick_height: tick_clone.1, + entry_index: working_bank.entry_index, + }); + working_bank.increment_entry_index(); if send_result.is_err() { break; } @@ -881,10 +902,16 @@ impl PohRecorder { transactions, }; let bank_clone = working_bank.bank.clone(); - self.sender.send((bank_clone, (entry, self.tick_height))) + self.sender.send(WorkingBankEntry { + bank: bank_clone, + entry, + tick_height: self.tick_height, + entry_index: working_bank.entry_index, + }) }, "send_poh_entry", ); + working_bank.increment_entry_index(); self.send_entry_us += send_entry_time.as_us(); send_entry_res?; let starting_transaction_index = @@ -1258,7 +1285,7 @@ mod tests { assert_eq!(poh_recorder.tick_height, tick_height_before + 1); assert_eq!(poh_recorder.tick_cache.len(), 0); let mut num_entries = 0; - while let Ok((wbank, (_entry, _tick_height))) = entry_receiver.try_recv() { + while let Ok(WorkingBankEntry { bank: wbank, .. }) = entry_receiver.try_recv() { assert_eq!(wbank.slot(), bank1.slot()); num_entries += 1; } @@ -1449,11 +1476,11 @@ mod tests { //tick in the cache + entry for _ in 0..min_tick_height { - let (_bank, (e, _tick_height)) = entry_receiver.recv().unwrap(); + let WorkingBankEntry { entry: e, .. } = entry_receiver.recv().unwrap(); assert!(e.is_tick()); } - let (_bank, (e, _tick_height)) = entry_receiver.recv().unwrap(); + let WorkingBankEntry { entry: e, .. } = entry_receiver.recv().unwrap(); assert!(!e.is_tick()); } Blockstore::destroy(&ledger_path).unwrap(); @@ -1492,7 +1519,7 @@ mod tests { .record(bank.slot(), h1, vec![tx.into()]) .is_err()); for _ in 0..num_ticks_to_max { - let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + let WorkingBankEntry { entry, .. } = entry_receiver.recv().unwrap(); assert!(entry.is_tick()); } } @@ -1571,6 +1598,63 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } + #[test] + fn test_poh_recorder_increment_entry_index() { + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( + 0, + prev_hash, + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + &Pubkey::default(), + Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &PohConfig::default(), + Arc::new(AtomicBool::default()), + ); + + poh_recorder.set_bank(bank.clone(), false); + assert_eq!(poh_recorder.working_bank.as_ref().unwrap().entry_index, 0); + + poh_recorder.tick(); + let WorkingBankEntry { entry_index, .. } = entry_receiver.recv().unwrap(); + assert_eq!(entry_index, 0); + assert_eq!(poh_recorder.working_bank.as_ref().unwrap().entry_index, 1); + + let tx0 = test_tx(); + let tx1 = test_tx(); + let h1 = hash(b"hello world!"); + let _record_result = poh_recorder + .record(bank.slot(), h1, vec![tx0.into(), tx1.into()]) + .unwrap(); + let WorkingBankEntry { entry_index, .. } = entry_receiver.recv().unwrap(); + assert_eq!(entry_index, 1); + assert_eq!(poh_recorder.working_bank.as_ref().unwrap().entry_index, 2); + + poh_recorder.tick(); + let WorkingBankEntry { entry_index, .. } = entry_receiver.recv().unwrap(); + assert_eq!(entry_index, 2); + assert_eq!(poh_recorder.working_bank.as_ref().unwrap().entry_index, 3); + + let tx = test_tx(); + let h2 = hash(b"foobar"); + let _record_result = poh_recorder + .record(bank.slot(), h2, vec![tx.into()]) + .unwrap(); + let WorkingBankEntry { entry_index, .. } = entry_receiver.recv().unwrap(); + assert_eq!(entry_index, 3); + assert_eq!(poh_recorder.working_bank.as_ref().unwrap().entry_index, 4); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + #[test] fn test_poh_cache_on_disconnect() { let ledger_path = get_tmp_ledger_path!(); diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index caa2c2a7c8770a..436bf110406aff 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -381,6 +381,7 @@ impl PohService { mod tests { use { super::*, + crate::poh_recorder::WorkingBankEntry, rand::{thread_rng, Rng}, solana_ledger::{ blockstore::Blockstore, @@ -509,7 +510,7 @@ mod tests { let time = Instant::now(); while run_time != 0 || need_tick || need_entry || need_partial { - let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + let WorkingBankEntry { entry, .. } = entry_receiver.recv().unwrap(); if entry.is_tick() { num_ticks += 1;