Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions core/src/block_creation_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use {
replay_stage::{Finalizer, ReplayStage},
},
solana_clock::Slot,
solana_entry::block_component::{
BlockFooterV1, BlockMarkerV1, VersionedBlockFooter, VersionedBlockMarker,
},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::Blockstore,
Expand All @@ -27,6 +30,7 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::BankForks,
},
solana_version::version,
solana_votor::{common::block_timeout, event::LeaderWindowInfo, votor::LeaderWindowNotifier},
stats::{BlockCreationLoopMetrics, SlotMetrics},
std::{
Expand Down Expand Up @@ -106,6 +110,18 @@ enum StartLeaderError {
),
}

fn produce_block_footer(block_producer_time_nanos: u64) -> VersionedBlockMarker {
let footer = BlockFooterV1 {
block_producer_time_nanos,
block_user_agent: format!("agave/{}", version!()).into_bytes(),
Comment thread
ksn6 marked this conversation as resolved.
};

let footer = VersionedBlockFooter::Current(footer);
let footer = BlockMarkerV1::BlockFooter(footer);

VersionedBlockMarker::Current(footer)
}

/// The block creation loop.
///
/// The `votor::consensus_pool_service` tracks when it is our leader window, and
Expand Down Expand Up @@ -278,7 +294,7 @@ fn produce_window(
/// Afterwards:
/// - Shutdown the record receiver
/// - Clear any inflight records
/// - TODO: insert the block footer
/// - Insert the block footer
/// - Insert the alpentick
/// - Clear the working bank
fn record_and_complete_block(
Expand Down Expand Up @@ -317,10 +333,13 @@ fn record_and_complete_block(
)?;
}

// TODO: insert block footer
// Construct and send the block footer
let mut w_poh_recorder = poh_recorder.write().unwrap();
let block_producer_time_nanos = w_poh_recorder.working_bank_block_producer_time_nanos();
let footer = produce_block_footer(block_producer_time_nanos);
w_poh_recorder.send_marker(footer)?;

// Alpentick and clear bank
let mut w_poh_recorder = poh_recorder.write().unwrap();
let bank = w_poh_recorder
.bank()
.expect("Bank cannot have been cleared as BlockCreationLoop is the only modifier");
Expand Down Expand Up @@ -513,8 +532,6 @@ fn create_and_insert_leader_bank(slot: Slot, parent_bank: Arc<Bank>, ctx: &mut L
ctx.poh_recorder.write().unwrap().set_bank(tpu_bank);
ctx.record_receiver.restart(slot);

// TODO: fill in the block header

info!(
"{}: new fork:{} parent:{} (leader) root:{}",
ctx.my_pubkey, slot, parent_slot, root_slot
Expand Down
49 changes: 45 additions & 4 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use {
log::*,
solana_clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
solana_entry::{
block_component::VersionedBlockMarker,
entry::Entry,
entry_marker::EntryMarker,
poh::{Poh, PohEntry},
Expand All @@ -40,7 +41,7 @@ use {
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
time::Instant,
time::{Instant, SystemTime, UNIX_EPOCH},
},
thiserror::Error,
};
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Record {

pub struct WorkingBank {
pub bank: BankWithScheduler,
pub start: Arc<Instant>,
pub start: Arc<SystemTime>,
pub min_tick_height: u64,
pub max_tick_height: u64,
}
Expand Down Expand Up @@ -306,6 +307,23 @@ impl PohRecorder {
self.leader_last_tick_height = leader_last_tick_height;
}

pub fn send_marker(&mut self, marker: VersionedBlockMarker) -> Result<()> {
let tick_height = self.tick_height();
let working_bank = self
.working_bank
.as_mut()
.ok_or(PohRecorderError::MaxHeightReached)?;

self.working_bank_sender
.send((
working_bank.bank.clone(),
(EntryMarker::Marker(marker), tick_height),
))
.unwrap();

Ok(())
}

// Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub fn record(
Expand Down Expand Up @@ -439,7 +457,7 @@ impl PohRecorder {
min_tick_height: bank.tick_height(),
max_tick_height: bank.max_tick_height(),
bank,
start: Arc::new(Instant::now()),
start: Arc::new(SystemTime::now()),
};
trace!("new working bank");
assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot());
Expand Down Expand Up @@ -486,10 +504,20 @@ impl PohRecorder {
.store(leader_first_tick_height);
self.leader_last_tick_height = leader_last_tick_height;

let elapsed_time = start.elapsed().map(|dur| dur.as_millis());
if let Err(err) = elapsed_time.as_ref() {
error!("Likely misconfigured system clock. Error: {err:?}");
}

datapoint_info!(
"leader-slot-start-to-cleared-elapsed-ms",
("slot", bank.slot(), i64),
("elapsed", start.elapsed().as_millis(), i64),
(
"elapsed",
// This errors if the clock drifts backwards, in which case we just return 0.
elapsed_time.unwrap_or(0),
i64
),
);
}

Expand Down Expand Up @@ -676,6 +704,10 @@ impl PohRecorder {
self.ticks_per_slot
}

pub fn working_bank(&self) -> Option<&WorkingBank> {
self.working_bank.as_ref()
}

/// Returns a shared reference to the working bank, if it exists.
/// This allows for other threads to access the working bank
/// without needing to lock poh recorder.
Expand Down Expand Up @@ -902,6 +934,15 @@ impl PohRecorder {
self.clear_bank();
}

pub fn working_bank_block_producer_time_nanos(&self) -> u64 {
self.working_bank()
.unwrap()
.start
.duration_since(UNIX_EPOCH)
.expect("Misconfigured system clock; couldn't measure block producer time.")
.as_nanos() as u64
}

pub fn tick_alpenglow(&mut self, slot_max_tick_height: u64) {
let (poh_entry, tick_lock_contention_us) = measure_us!({
let mut poh_l = self.poh.lock().unwrap();
Expand Down