Skip to content

Commit b3ef6f2

Browse files
committed
Add shutoff
1 parent d1df465 commit b3ef6f2

File tree

3 files changed

+29
-3
lines changed

3 files changed

+29
-3
lines changed

poh/src/poh_recorder.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ impl TransactionRecorder {
201201
mixin: Hash,
202202
transactions: Vec<VersionedTransaction>,
203203
) -> Result<Option<usize>> {
204+
// Shut off producer if certain height is reached.
205+
204206
let rec_send_res = self
205207
.record_sender
206208
.try_push(Record::new(mixin, transactions, bank_slot));
@@ -459,7 +461,7 @@ impl PohRecorder {
459461
}
460462

461463
self.report_pending_fork_was_detected(next_slot);
462-
if !self.delay_leader_block_for_pending_fork {
464+
if !self.delay_leader_block_for_pending_fork {
463465
// Not configured to wait for pending blocks from previous leader.
464466
return true;
465467
}
@@ -935,6 +937,7 @@ impl PohRecorder {
935937
let ((), report_metrics_us) = measure_us!(self.report_metrics(bank_slot));
936938
self.report_metrics_us += report_metrics_us;
937939

940+
// adityak: can check if rb is full
938941
loop {
939942
let (flush_cache_res, flush_cache_us) = measure_us!(self.flush_cache(false));
940943
self.flush_cache_no_tick_us += flush_cache_us;

poh/src/poh_service.rs

+24-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! The `poh_service` module implements a service that records the passing of
22
//! "ticks", a measure of time in the PoH stream
3+
4+
use std::time::Duration;
35
use {
46
crate::{
57
mpsc::{self, Consumer},
@@ -236,6 +238,19 @@ impl PohService {
236238
poh: &Arc<Mutex<Poh>>,
237239
target_ns_per_tick: u64,
238240
) -> bool {
241+
// If the difference between min tick height and capacity is small then shutoff.
242+
// remaining_tick_time, ticks_per_slot, compute_leader_slot_tick_heights
243+
// tick_height, Bank::max_tick_height
244+
let tick_height = poh_recorder.read().unwrap().tick_height();
245+
let max_tick_height = poh_recorder.read().unwrap().bank().unwrap().max_tick_height();
246+
let remaining_slots = max_tick_height - tick_height;
247+
let min_gap = std::cmp::max(max_tick_height - record_receiver.ring_buffer.capacity(), 64);
248+
if remaining_slots < min_gap {
249+
record_receiver.shut_off_producers();
250+
if remaining_slots == 0 {
251+
record_receiver.enable_producers(); // TODO: Do we need to reset the queue?
252+
}
253+
}
239254
match next_record.take() {
240255
Some(mut record) => {
241256
// received message to record
@@ -281,28 +296,35 @@ impl PohService {
281296
// nothing else can be done. tick required.
282297
return true;
283298
}
299+
284300
// check to see if a record request has been sent
285301
if let Some(record) = record_receiver.pop() {
286302
// remember the record we just received as the next record to occur
287303
*next_record = Some(record);
288304
break;
289305
}
290306
// check to see if we need to wait to catch up to ideal
291-
let wait_start = Instant::now();
307+
let mut wait_start = Instant::now();
308+
292309
if ideal_time <= wait_start {
293310
// no, keep hashing. We still hold the lock.
294311
continue;
295312
}
296313

297314
// busy wait, polling for new records and after dropping poh lock (reset can occur, for example)
298315
drop(poh_l);
299-
while ideal_time > Instant::now() {
316+
let d = Duration::from_nanos(target_ns_per_tick*64);
317+
while ideal_time > wait_start {
318+
if ideal_time.checked_duration_since(wait_start).unwrap().ge(&d) {
319+
record_receiver.shut_off_producers()
320+
}
300321
// check to see if a record request has been sent
301322
if let Some(record) = record_receiver.pop() {
302323
// remember the record we just received as the next record to occur
303324
*next_record = Some(record);
304325
break;
305326
}
327+
wait_start = Instant::now();
306328
}
307329
timing.total_sleep_us += wait_start.elapsed().as_micros() as u64;
308330
break;

poh/src/ring_buffer.rs

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ impl<T> RingBuffer<T> {
113113
let ProducerCheckResult::CanWrite(cell) = self.producer_check() else {
114114
return Err(item);
115115
};
116+
// If the buffer is at capacity then wait for a bit.
116117

117118
cell.set(item);
118119
Ok(())

0 commit comments

Comments
 (0)