Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ traces user-land will simply read and then clear this map on a timer.
The BPF components are responsible for notifying user-land about new and exiting
processes. An event about a new process is produced when we first interrupt it
with the unwinders. Events about exiting processes are created with a
`sched_process_exit` probe. In both cases the BPF code sends a perf event to
`sched_process_free` tracepoint. In both cases the BPF code sends a perf event to
notify user-land. We also re-report a PID if we detect execution in previously
unknown memory region to prompt re-scan of the mappings.
unknown memory region to prompt re-scan of the mappings. Finally, the profiler
can also profile processes whose main thread exits, leaving other threads running.

### Network protocol

Expand Down
4 changes: 2 additions & 2 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to attach scheduler monitor: %w", err)
}

// This log line is used in our system tests to verify if that the agent has started. So if you
// change this log line update also the system test.
// This log line is used in our system tests to verify if that the agent has started.
// So if you change this log line update also the system test.
log.Printf("Attached sched monitor")

if err := startTraceHandling(ctx, c.reporter, intervals, trc,
Expand Down
19 changes: 19 additions & 0 deletions libpf/pid.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf"

import (
"fmt"
)

// PID represent Unix Process ID (pid_t)
type PID uint32

func (p PID) Hash32() uint32 {
return uint32(p)
}

// PIDTID encodes a process id and a thread id
type PIDTID uint64

func (pt PIDTID) PID() PID {
return PID(pt >> 32)
}

func (pt PIDTID) TID() PID {
return PID(pt & 0xFFFFFFFF)
}

func (pt PIDTID) String() string {
return fmt.Sprintf("PID: %v TID: %v", pt.PID(), pt.TID())
}
74 changes: 57 additions & 17 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ import (
"go.opentelemetry.io/ebpf-profiler/stringutil"
)

// GetMappings returns this error when no mappings can be extracted.
var ErrNoMappings = errors.New("no mappings")

// systemProcess provides an implementation of the Process interface for a
// process that is currently running on this machine.
type systemProcess struct {
pid libpf.PID
tid libpf.PID

remoteMemory remotememory.RemoteMemory
mainThreadExit bool
remoteMemory remotememory.RemoteMemory

fileToMapping map[string]*Mapping
}
Expand All @@ -54,9 +59,10 @@ func init() {
}

// New returns an object with Process interface accessing it
func New(pid libpf.PID) Process {
func New(pid, tid libpf.PID) Process {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't switch Process to accept libpf.PIDTID as the latter is only used with PID events, and I'd rather not couple it here too.

return &systemProcess{
pid: pid,
tid: tid,
remoteMemory: remotememory.NewProcessVirtualMemory(pid),
}
}
Expand Down Expand Up @@ -166,9 +172,8 @@ func parseMappings(mapsFile io.Reader) ([]Mapping, uint32, error) {
path = VdsoPathName
device = 0
inode = vdsoInode
} else if path != "" {
// Ignore [vsyscall] and similar executable kernel
// pages we don't care about
} else {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No semantic change, I just inlined the logic from GetMappings here as this is the more appropriate place.

// Ignore mappings that are invalid, non-existent or are special pseudo-files
continue
}
} else {
Expand Down Expand Up @@ -230,20 +235,48 @@ func (sp *systemProcess) GetMappings() ([]Mapping, uint32, error) {
defer mapsFile.Close()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this original path of checking the primary maps file be made conditional on if sp.mainThreadExit == false? (Or if this object is not kept between calls, perhaps this could be an argument?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yes, I can add the check/skip-pid-maps logic but I'll also need to add the zombie check to ensure that we're only going to be reading tid-specific maps if it's absolutely the case that main thread has exited.

Copy link
Copy Markdown
Member Author

@christos68k christos68k Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the zombie check for process exit but adding a conditional check for sp.mainThreadExit == false in GetMappings doesn't currently serve any purpose: GetMappings is only called once for any given systemProcess instance, so the check will never fail (caching mainThreadExit on the instance does serve a purpose however, as there are subsequent method calls after GetMappings that will leverage it).

So we can either have processmanager cache the mainThreadExit status for a particular PID and add this check (and then we'd need extra logic inside GetMappings to determine when the process has finally exited, currently the primary maps file path serves this purpose) or keep the logic as is now.


mappings, numParseErrors, err := parseMappings(mapsFile)
if err == nil {
fileToMapping := make(map[string]*Mapping, len(mappings))
for idx := range mappings {
m := &mappings[idx]
if m.Inode == 0 {
// Ignore mappings that are invalid,
// non-existent or are special pseudo-files.
continue
}
fileToMapping[m.Path] = m
if err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we should also return early if err is nil and numParseErrors is non-zero. Or perhaps even better, parseMappings could return an err if it failed to find usable mappings (but it managed to read data). The idea is basically to distinguish here if mappings is empty or all lines were non-parseable.

Copy link
Copy Markdown
Member Author

@christos68k christos68k Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the len(mappings) == 0 check that follows cover this case?

If err == nil and len(mappings) != 0 then we simply continue and process the mappings. If err == nil and len(mappings) == 0 then we continue and try mappings from another thread. Essentially all branching logic depends on err and len(mappings), not numParseErrors which is purely advisory.

return mappings, numParseErrors, err
}

if len(mappings) == 0 {
// We could test for main thread exit here by checking for zombie state
// in /proc/sp.pid/stat but it's simpler to assume that this is the case
// and try extracting mappings for a different thread. Since we stopped
// processing /proc at agent startup, it's not possible that the agent
// will sample a process without mappings
log.Debugf("PID: %v main thread exit", sp.pid)
sp.mainThreadExit = true

if sp.pid == sp.tid {
return mappings, numParseErrors, ErrNoMappings
}

log.Debugf("TID: %v extracting mappings", sp.tid)
mapsFileAlt, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/maps", sp.pid, sp.tid))
// On all errors resulting from trying to get mappings from a different thread,
// return ErrNoMappings which will keep the PID tracked in processmanager and
// allow for a future iteration to try extracting mappings from a different thread.
// This is done to deal with race conditions triggered by thread exits (we do not want
// the agent to unload process metadata when a thread exits but the process is still
// alive).
if err != nil {
return mappings, numParseErrors, ErrNoMappings
}
defer mapsFileAlt.Close()
mappings, numParseErrors, err = parseMappings(mapsFileAlt)
if err != nil || len(mappings) == 0 {
return mappings, numParseErrors, ErrNoMappings
}
sp.fileToMapping = fileToMapping
}
return mappings, numParseErrors, err

fileToMapping := make(map[string]*Mapping, len(mappings))
for idx := range mappings {
m := &mappings[idx]
fileToMapping[m.Path] = m
}
sp.fileToMapping = fileToMapping
return mappings, numParseErrors, nil
}

func (sp *systemProcess) GetThreads() ([]ThreadInfo, error) {
Expand Down Expand Up @@ -272,6 +305,13 @@ func (sp *systemProcess) getMappingFile(m *Mapping) string {
if m.IsAnonymous() || m.IsVDSO() {
return ""
}
if sp.mainThreadExit {
// Neither /proc/sp.pid/map_files nor /proc/sp.pid/task/sp.tid/map_files
// nor /proc/sp.pid/root exist if main thread has exited, so we use the
// mapping path directly under the sp.tid root.
rootPath := fmt.Sprintf("/proc/%v/task/%v/root", sp.pid, sp.tid)
return path.Join(rootPath, m.Path)
}
return fmt.Sprintf("/proc/%v/map_files/%x-%x", sp.pid, m.Vaddr, m.Vaddr+m.Length)
}

Expand Down
3 changes: 2 additions & 1 deletion process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func TestParseMappings(t *testing.T) {
}

func TestNewPIDOfSelf(t *testing.T) {
pr := New(libpf.PID(os.Getpid()))
pid := libpf.PID(os.Getpid())
pr := New(pid, pid)
assert.NotNil(t, pr)

mappings, numParseErrors, err := pr.GetMappings()
Expand Down
29 changes: 11 additions & 18 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,16 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
return
}

if errors.Is(err, process.ErrNoMappings) {
// When no mappings can be extracted but the process is still alive,
// do not trigger a process exit to avoid unloading process metadata.
// As it's likely that a future iteration can extract mappings from a
// different thread in the process, notify eBPF to enable further notifications.
pm.ebpf.RemoveReportedPID(pid)
return
}

// All other errors imply that the process has exited.
// Clean up, and notify eBPF.
pm.processPIDExit(pid)
if os.IsNotExist(err) {
// Since listing /proc and opening files in there later is inherently racy,
// we expect to lose the race sometimes and thus expect to hit os.IsNotExist.
Expand All @@ -626,22 +633,7 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) {
// return ESRCH. Handle it as if the process did not exist.
pm.mappingStats.errProcESRCH.Add(1)
}
return
}
if len(mappings) == 0 {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments are no longer relevant.

// Valid process without any (executable) mappings. All cases are
// handled as process exit. Possible causes and reasoning:
// 1. It is a kernel worker process. The eBPF does not send events from these,
// but we can see kernel threads here during startup when tracer walks
// /proc and tries to synchronize all PIDs it sees.
// The PID should not exist anywhere, but we can still double check and
// make sure the PID is not tracked.
// 2. It is a normal process executing, but we just sampled it when the kernel
// execve() is rebuilding the mappings and nothing is currently mapped.
// In this case we can handle it as process exit because everything about
// the process is changing: all mappings, comm, etc. If execve fails, we
// reaped it early. If execve succeeds, we will get new synchronization
// request soon, and handle it as a new process event.
Comment thread
christos68k marked this conversation as resolved.
// Clean up, and notify eBPF.
pm.processPIDExit(pid)
return
}
Expand Down Expand Up @@ -744,6 +736,7 @@ func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) {
continue
}

log.Debugf("PID %v deleted", pid)
delete(pm.pidToProcessInfo, pid)

for _, instance := range pm.interpreters[pid] {
Expand Down
13 changes: 7 additions & 6 deletions support/ebpf/interpreter_dispatcher.ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ bpf_map_def SEC("maps") reported_pids = {
//
// User space code will periodically iterate through the map and process each entry.
// Additionally, each time eBPF code writes a value into the map, user space is notified
// through event_send_trigger (which uses maps/report_events). As key we use the PID of
// the process and as value always true. When sizing this map, we are thinking about
// the maximum number of unique PIDs that could generate events we're interested in
// (process new, process exit, unknown PC) within a map monitor/processing interval,
// through event_send_trigger (which uses maps/report_events). As key we use the PID/TID
// of the process/thread and as value always true. When sizing this map, we are thinking
// about the maximum number of unique PIDs that could generate events we're interested in
// (process new, thread group exit, unknown PC) within a map monitor/processing interval,
// that we would like to support.
bpf_map_def SEC("maps") pid_events = {
.type = BPF_MAP_TYPE_HASH,
.key_size = sizeof(u32),
.key_size = sizeof(u64),
.value_size = sizeof(bool),
.max_entries = 65536,
};
Expand Down Expand Up @@ -205,7 +205,8 @@ static inline __attribute__((__always_inline__)) int unwind_stop(struct pt_regs
// No Error
break;
case metricID_UnwindNativeErrWrongTextSection:;
if (report_pid(ctx, trace->pid, record->ratelimitAction)) {
u64 pid_tgid = (u64)trace->pid << 32 | trace->tid;
if (report_pid(ctx, pid_tgid, record->ratelimitAction)) {
increment_metric(metricID_NumUnknownPC);
}
// Fallthrough to report the error
Expand Down
29 changes: 14 additions & 15 deletions support/ebpf/sched_monitor.ebpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,28 @@

#include "types.h"

// tracepoint__sched_process_exit is a tracepoint attached to the scheduler that stops processes.
// Every time a processes stops this hook is triggered.
SEC("tracepoint/sched/sched_process_exit")
int tracepoint__sched_process_exit(void *ctx)
{
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = (u32)(pid_tgid >> 32);
u32 tid = (u32)(pid_tgid & 0xFFFFFFFF);
// See /sys/kernel/debug/tracing/events/sched/sched_process_free/format
// for struct layout.
struct sched_process_free_ctx {
Comment thread
christos68k marked this conversation as resolved.
unsigned char skip[24];
pid_t pid;
int prio;
};

if (pid != tid) {
// Only if the thread group ID matched with the PID the process itself exits. If they don't
// match only a thread of the process stopped and we do not need to report this PID to
// userspace for further processing.
goto exit;
}
// tracepoint__sched_process_free is a tracepoint attached to the scheduler that frees processes.
// Every time a processes exits this hook is triggered.
SEC("tracepoint/sched/sched_process_free")
int tracepoint__sched_process_free(struct sched_process_free_ctx *ctx)
{
u32 pid = ctx->pid;

if (!bpf_map_lookup_elem(&reported_pids, &pid) && !pid_information_exists(ctx, pid)) {
// Only report PIDs that we explicitly track. This avoids sending kernel worker PIDs
// to userspace.
goto exit;
}

if (report_pid(ctx, pid, RATELIMIT_ACTION_RESET)) {
if (report_pid(ctx, (u64)pid << 32 | pid, RATELIMIT_ACTION_RESET)) {
increment_metric(metricID_NumProcExit);
}
exit:
Expand Down
14 changes: 8 additions & 6 deletions support/ebpf/tracemgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,24 @@ pid_event_ratelimit(u32 pid, int ratelimit_action)
// and reporting aborted if PID has been recently reported.
// Returns true if the PID was successfully reported to user space.
static inline __attribute__((__always_inline__)) bool
report_pid(void *ctx, int pid, int ratelimit_action)
report_pid(void *ctx, u64 pid_tgid, int ratelimit_action)
{
u32 key = (u32)pid;
u32 pid = pid_tgid >> 32;

if (pid_event_ratelimit(pid, ratelimit_action)) {
return false;
}

bool value = true;
int errNo = bpf_map_update_elem(&pid_events, &key, &value, BPF_ANY);
int errNo = bpf_map_update_elem(&pid_events, &pid_tgid, &value, BPF_ANY);
if (errNo != 0) {
DEBUG_PRINT("Failed to update pid_events with PID %d: %d", pid, errNo);
__attribute__((unused)) u32 tid = pid_tgid & 0xFFFFFFFF;
DEBUG_PRINT("Failed to update pid_events with PID %d TID: %d: %d", pid, tid, errNo);
increment_metric(metricID_PIDEventsErr);
return false;
}
if (ratelimit_action == RATELIMIT_ACTION_RESET || errNo != 0) {
bpf_map_delete_elem(&reported_pids, &key);
bpf_map_delete_elem(&reported_pids, &pid);
}

// Notify userspace that there is a PID waiting to be processed.
Expand Down Expand Up @@ -730,7 +731,8 @@ static inline int collect_trace(
}

if (!pid_information_exists(ctx, pid)) {
if (report_pid(ctx, pid, RATELIMIT_ACTION_DEFAULT)) {
u64 pid_tgid = (u64)pid << 32 | tid;
if (report_pid(ctx, pid_tgid, RATELIMIT_ACTION_DEFAULT)) {
increment_metric(metricID_NumProcNew);
}
return 0;
Expand Down
Binary file modified support/ebpf/tracer.ebpf.release.amd64
Binary file not shown.
Binary file modified support/ebpf/tracer.ebpf.release.arm64
Binary file not shown.
Loading
Loading