perf_event: Remove the size limit from EventIter.
To sort perf events on the fly, all available events from one stream must be
examined at once, which is not possible with the current EventIter and its
limit of 31 events at a time.

Rewrite the out-of-order commit logic so that pending commits are tracked in
a binary heap, which remains small as long as events are dropped in order but
can also comfortably accommodate an unbounded number of out-of-order drops.

This changes the event iteration order. Previously, we iterated over up to 31
events from each buffer before moving on to the next. Now, all available
events in each buffer are consumed before moving on to the next. This incurs
more buffering than before, but live event sorting will require even more
buffering, so as a transitional step this should be acceptable.
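
A minimal standalone sketch of the pending-commit mechanism described above,
assuming a simplified illustrative type (CommitTracker, its tail field, and
this try_commit are hypothetical names, not the exact code from the diff
below): reads of [from, to) ranges are pushed onto a min-heap, and the tail
only advances once every earlier range has also been committed.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Illustrative tracker: records completed reads of [from, to) ranges and
/// only advances `tail` once all earlier ranges have been committed too.
struct CommitTracker {
    tail: u64,
    pending: BinaryHeap<Reverse<(u64, u64)>>, // min-heap keyed on `from`
}

impl CommitTracker {
    fn try_commit(&mut self, from: u64, to: u64) {
        self.pending.push(Reverse((from, to)));
        // Pop ranges that are contiguous with the current tail; anything
        // committed out of order simply waits in the heap until the gap
        // before it is filled.
        while let Some(&Reverse((from, to))) = self.pending.peek() {
            if from != self.tail {
                break;
            }
            self.tail = to;
            self.pending.pop();
        }
    }
}

fn main() {
    let mut t = CommitTracker { tail: 0, pending: BinaryHeap::new() };
    t.try_commit(10, 20); // committed out of order: stays pending, tail == 0
    assert_eq!(t.tail, 0);
    t.try_commit(0, 10); // gap filled: tail advances through both ranges
    assert_eq!(t.tail, 20);
}

As long as ranges are committed in order, the heap never holds more than one
entry, which is why the in-order case stays cheap.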
ishitatsuyuki committed Feb 27, 2023
1 parent c06dffb commit cd846d4
Showing 1 changed file with 54 additions and 68 deletions.
122 changes: 54 additions & 68 deletions samply/src/linux/perf_event.rs
@@ -1,5 +1,6 @@
use std::{cmp, fmt};
use std::cmp::max;
use std::fmt;
use std::collections::BinaryHeap;
use std::io;
use std::mem;
use std::ops::Range;
@@ -69,6 +70,13 @@ unsafe fn read_head(pointer: *const u8) -> u64 {
head
}

unsafe fn read_tail(pointer: *const u8) -> u64 {
let page = &*(pointer as *const PerfEventMmapPage);
// No memory fence required because we're just reading a value previously
// written by us.
ptr::read_volatile(&page.data_tail)
}

unsafe fn write_tail(pointer: *mut u8, value: u64) {
let page = &mut *(pointer as *mut PerfEventMmapPage);
fence(Ordering::AcqRel);
@@ -462,17 +470,34 @@ impl Perf {
struct EventRefState {
buffer: *mut u8,
size: u64,
done: u32,
positions: [u64; 32],
pending_commits: BinaryHeap<cmp::Reverse<(u64, u64)>>,
}

impl EventRefState {
fn new(buffer: *mut u8, size: u64) -> Self {
EventRefState {
buffer,
size,
done: !0,
positions: [0; 32],
pending_commits: BinaryHeap::new(),
}
}

/// Mark the read of [from, to) as complete.
/// If reads are completed in-order, then this will advance the tail pointer to `to` immediately.
/// Otherwise, it will remain in the "pending commit" queue, and committed once all previous
/// reads are also committed.
fn try_commit(&mut self, from: u64, to: u64) {
self.pending_commits.push(cmp::Reverse((from, to)));

let mut position = unsafe { read_tail(self.buffer) };
while let Some(&cmp::Reverse((from, to))) = self.pending_commits.peek() {
if from == position {
unsafe { write_tail(self.buffer, to); }
position = to;
self.pending_commits.pop();
} else {
break;
}
}
}
}
@@ -485,40 +510,38 @@ impl Drop for EventRefState {
}
}

/// Handle to a single event in the perf ring buffer.
///
/// On Drop, the event will be "consumed" and the read pointer will be advanced.
///
/// If events are dropped out of order, then it will be added to a list of pending commits and
/// committed when all prior events are also dropped. For this reason, events should be dropped
/// in-order to achieve the lowest overhead.
#[derive(Clone)]
pub struct EventRef {
buffer: *mut u8,
buffer_size: usize,
event_location: RawRecordLocation,
mask: u32,
state: Arc<Mutex<EventRefState>>,
event_location: RawRecordLocation,
prev_position: u64,
position: u64,
parse_info: RecordParseInfo,
}

impl fmt::Debug for EventRef {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.debug_map()
.entry(&"location", &self.event_location)
.entry(&"mask", &format!("{:032b}", self.mask))
.entry(&"prev_position", &self.prev_position)
.entry(&"position", &self.position)
.finish()
}
}

impl Drop for EventRef {
#[inline]
fn drop(&mut self) {
let mut state = self.state.lock();
let last_empty_spaces = state.done.leading_zeros();
state.done &= self.mask;
let empty_spaces = state.done.leading_zeros();

debug_assert!(empty_spaces >= last_empty_spaces);
if empty_spaces != last_empty_spaces {
let position = state.positions[empty_spaces as usize];
unsafe {
write_tail(self.buffer, position);
}
}
self.state.lock().try_commit(self.prev_position, self.position);
}
}

@@ -532,45 +555,12 @@ impl EventRef {

pub struct EventIter<'a> {
perf: &'a mut Perf,
index: usize,
locations: Vec<RawRecordLocation>,
state: Arc<Mutex<EventRefState>>,
}

impl<'a> EventIter<'a> {
#[inline]
fn new(perf: &'a mut Perf) -> Self {
let mut locations = Vec::with_capacity(32);

{
let state = Arc::get_mut(&mut perf.event_ref_state)
.expect("Perf::iter called while the previous iterator hasn't finished processing");
let state = state.get_mut();

for _ in 0..31 {
state.positions[locations.len()] = perf.position;
let raw_event_location =
match next_raw_event(perf.buffer, perf.size, &mut perf.position) {
Some(location) => location,
None => break,
};

locations.push(raw_event_location);
}

state.positions[locations.len()] = perf.position;
state.done = !0;
}

// trace!("Batched {} events for PID {}", count, perf.pid);

let state = perf.event_ref_state.clone();
EventIter {
perf,
index: 0,
locations,
state,
}
EventIter { perf }
}
}

@@ -579,21 +569,17 @@ impl<'a> Iterator for EventIter<'a> {

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.index == self.locations.len() {
return None;
}

let event_location = self.locations[self.index].clone();
let event = EventRef {
buffer: self.perf.buffer,
buffer_size: self.perf.size as usize,
let perf = &mut self.perf;
let prev_position = perf.position;
let event_location = next_raw_event(perf.buffer, perf.size, &mut perf.position)?;
Some(EventRef {
buffer: perf.buffer,
buffer_size: perf.size as usize,
state: perf.event_ref_state.clone(),
event_location,
mask: !(1 << (31 - self.index)),
state: self.state.clone(),
prev_position,
position: perf.position,
parse_info: self.perf.parse_info,
};

self.index += 1;
Some(event)
})
}
}
