perf_event: Remove the size limit from EventIter. #30

Merged · 1 commit · Feb 27, 2023
Changes from all commits
samply/src/linux/perf_event.rs: 126 changes (58 additions, 68 deletions)
@@ -1,5 +1,5 @@
use std::cmp::max;
use std::fmt;
use std::collections::BinaryHeap;
use std::io;
use std::mem;
use std::ops::Range;
@@ -9,6 +9,7 @@ use std::slice;
use std::sync::atomic::fence;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{cmp, fmt};

use libc::{self, c_void, pid_t};
use linux_perf_data::linux_perf_event_reader;
@@ -69,6 +70,13 @@ unsafe fn read_head(pointer: *const u8) -> u64 {
head
}

unsafe fn read_tail(pointer: *const u8) -> u64 {
let page = &*(pointer as *const PerfEventMmapPage);
// No memory fence required because we're just reading a value previously
// written by us.
ptr::read_volatile(&page.data_tail)
}

unsafe fn write_tail(pointer: *mut u8, value: u64) {
let page = &mut *(pointer as *mut PerfEventMmapPage);
fence(Ordering::AcqRel);
@@ -462,17 +470,36 @@ impl Perf {
struct EventRefState {
buffer: *mut u8,
size: u64,
done: u32,
positions: [u64; 32],
pending_commits: BinaryHeap<cmp::Reverse<(u64, u64)>>,
}

impl EventRefState {
fn new(buffer: *mut u8, size: u64) -> Self {
EventRefState {
buffer,
size,
done: !0,
positions: [0; 32],
pending_commits: BinaryHeap::new(),
}
}

/// Mark the read of [from, to) as complete.
/// If reads are completed in-order, then this will advance the tail pointer to `to` immediately.
/// Otherwise, it will remain in the "pending commit" queue and will be committed once all previous
/// reads are also committed.
fn try_commit(&mut self, from: u64, to: u64) {
self.pending_commits.push(cmp::Reverse((from, to)));

let mut position = unsafe { read_tail(self.buffer) };
while let Some(&cmp::Reverse((from, to))) = self.pending_commits.peek() {
if from == position {
unsafe {
write_tail(self.buffer, to);
}
position = to;
self.pending_commits.pop();
} else {
break;
}
}
}
}
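
The coalescing behavior described in the doc comment above can be seen in isolation in the following sketch, where a plain `u64` field stands in for the mmap'd `data_tail` and the `TailState` type and `main` driver are purely illustrative:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Toy stand-in for EventRefState: `tail` models the ring buffer's data_tail.
struct TailState {
    tail: u64,
    pending: BinaryHeap<Reverse<(u64, u64)>>,
}

impl TailState {
    // Same shape as EventRefState::try_commit: queue the range, then advance
    // the tail over every contiguous range starting at the current tail.
    fn try_commit(&mut self, from: u64, to: u64) {
        self.pending.push(Reverse((from, to)));
        while let Some(&Reverse((from, to))) = self.pending.peek() {
            if from != self.tail {
                break; // a gap: an earlier read hasn't been committed yet
            }
            self.tail = to;
            self.pending.pop();
        }
    }
}

fn main() {
    let mut state = TailState { tail: 0, pending: BinaryHeap::new() };
    state.try_commit(16, 32); // out of order: stays pending
    assert_eq!(state.tail, 0);
    state.try_commit(0, 16); // fills the gap: both ranges commit
    assert_eq!(state.tail, 32);
}
```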
@@ -485,40 +512,40 @@ impl Drop for EventRefState {
}
}

/// Handle to a single event in the perf ring buffer.
///
/// On Drop, the event will be "consumed" and the read pointer will be advanced.
///
/// If an event is dropped out of order, it will be added to a list of pending commits and
/// committed when all prior events are also dropped. For this reason, events should be dropped
/// in-order to achieve the lowest overhead.
#[derive(Clone)]
pub struct EventRef {
buffer: *mut u8,
buffer_size: usize,
event_location: RawRecordLocation,
mask: u32,
state: Arc<Mutex<EventRefState>>,
event_location: RawRecordLocation,
prev_position: u64,
position: u64,
parse_info: RecordParseInfo,
}

impl fmt::Debug for EventRef {
fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
fmt.debug_map()
.entry(&"location", &self.event_location)
.entry(&"mask", &format!("{:032b}", self.mask))
.entry(&"prev_position", &self.prev_position)
.entry(&"position", &self.position)
.finish()
}
}

impl Drop for EventRef {
#[inline]
fn drop(&mut self) {
let mut state = self.state.lock();
let last_empty_spaces = state.done.leading_zeros();
state.done &= self.mask;
let empty_spaces = state.done.leading_zeros();

debug_assert!(empty_spaces >= last_empty_spaces);
if empty_spaces != last_empty_spaces {
let position = state.positions[empty_spaces as usize];
unsafe {
write_tail(self.buffer, position);
}
}
self.state
.lock()
.try_commit(self.prev_position, self.position);
}
}
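
To see the Drop contract in action, here is a hedged sketch of a handle that commits its byte range when it goes out of scope, built on the same toy `TailState` as the previous example. The `Handle` type is illustrative only; the real `EventRef` also carries the buffer pointer, record location, and parse info:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::{Arc, Mutex};

// Same toy commit state as in the previous sketch.
struct TailState {
    tail: u64,
    pending: BinaryHeap<Reverse<(u64, u64)>>,
}

impl TailState {
    fn try_commit(&mut self, from: u64, to: u64) {
        self.pending.push(Reverse((from, to)));
        while let Some(&Reverse((from, to))) = self.pending.peek() {
            if from != self.tail {
                break;
            }
            self.tail = to;
            self.pending.pop();
        }
    }
}

// Stand-in for EventRef: commits [from, to) when dropped.
struct Handle {
    from: u64,
    to: u64,
    state: Arc<Mutex<TailState>>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        self.state.lock().unwrap().try_commit(self.from, self.to);
    }
}

fn main() {
    let state = Arc::new(Mutex::new(TailState { tail: 0, pending: BinaryHeap::new() }));
    let first = Handle { from: 0, to: 8, state: state.clone() };
    let second = Handle { from: 8, to: 24, state: state.clone() };

    drop(second); // dropped out of order: the tail cannot move yet
    assert_eq!(state.lock().unwrap().tail, 0);
    drop(first); // now both ranges are committed at once
    assert_eq!(state.lock().unwrap().tail, 24);
}
```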

@@ -532,45 +559,12 @@ impl EventRef {

pub struct EventIter<'a> {
perf: &'a mut Perf,
index: usize,
locations: Vec<RawRecordLocation>,
state: Arc<Mutex<EventRefState>>,
}

impl<'a> EventIter<'a> {
#[inline]
fn new(perf: &'a mut Perf) -> Self {
let mut locations = Vec::with_capacity(32);

{
let state = Arc::get_mut(&mut perf.event_ref_state)
.expect("Perf::iter called while the previous iterator hasn't finished processing");
let state = state.get_mut();

for _ in 0..31 {
state.positions[locations.len()] = perf.position;
let raw_event_location =
match next_raw_event(perf.buffer, perf.size, &mut perf.position) {
Some(location) => location,
None => break,
};

locations.push(raw_event_location);
}

state.positions[locations.len()] = perf.position;
state.done = !0;
}

// trace!("Batched {} events for PID {}", count, perf.pid);

let state = perf.event_ref_state.clone();
EventIter {
perf,
index: 0,
locations,
state,
}
EventIter { perf }
}
}

@@ -579,21 +573,17 @@ impl<'a> Iterator for EventIter<'a> {

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.index == self.locations.len() {
return None;
}

let event_location = self.locations[self.index].clone();
let event = EventRef {
buffer: self.perf.buffer,
buffer_size: self.perf.size as usize,
let perf = &mut self.perf;
let prev_position = perf.position;
let event_location = next_raw_event(perf.buffer, perf.size, &mut perf.position)?;
Some(EventRef {
buffer: perf.buffer,
buffer_size: perf.size as usize,
state: perf.event_ref_state.clone(),
event_location,
mask: !(1 << (31 - self.index)),
state: self.state.clone(),
prev_position,
position: perf.position,
parse_info: self.perf.parse_info,
};

self.index += 1;
Some(event)
})
}
}
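
The net effect of the new `next` is that the iterator streams records lazily instead of pre-batching at most 32 of them. Below is a rough sketch of that structural change, with toy types standing in for samply's `Perf` and raw-record plumbing (only the iteration shape mirrors the patch):

```rust
// Toy types standing in for Perf / next_raw_event; only the iteration
// structure mirrors the new EventIter::next.
struct RingReader {
    records: Vec<(u64, u64)>, // (start, end) byte ranges of fake records
    position: usize,
}

impl RingReader {
    // Stand-in for next_raw_event(): yields the next record, or None when drained.
    fn next_raw(&mut self) -> Option<(u64, u64)> {
        let record = self.records.get(self.position).copied()?;
        self.position += 1;
        Some(record)
    }
}

struct Events<'a> {
    reader: &'a mut RingReader,
}

impl<'a> Iterator for Events<'a> {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        // No fixed batch size: keep pulling until the reader runs dry.
        self.reader.next_raw()
    }
}

fn main() {
    let mut reader = RingReader {
        records: (0..100u64).map(|i| (i * 8, (i + 1) * 8)).collect(),
        position: 0,
    };
    // All 100 records come out in a single pass; the old design capped
    // each EventIter at 32 events per call to Perf::iter.
    assert_eq!(Events { reader: &mut reader }.count(), 100);
}
```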