diff --git a/README.md b/README.md index e4337bc3b..3c3e689fd 100644 --- a/README.md +++ b/README.md @@ -356,9 +356,10 @@ traces user-land will simply read and then clear this map on a timer. The BPF components are responsible for notifying user-land about new and exiting processes. An event about a new process is produced when we first interrupt it with the unwinders. Events about exiting processes are created with a -`sched_process_exit` probe. In both cases the BPF code sends a perf event to +`sched_process_free` tracepoint. In both cases the BPF code sends a perf event to notify user-land. We also re-report a PID if we detect execution in previously -unknown memory region to prompt re-scan of the mappings. +unknown memory region to prompt re-scan of the mappings. Finally, the profiler +can also profile processes whose main thread exits, leaving other threads running. ### Network protocol diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 7d823352e..bb3b0e8ae 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -137,8 +137,8 @@ func (c *Controller) Start(ctx context.Context) error { return fmt.Errorf("failed to attach scheduler monitor: %w", err) } - // This log line is used in our system tests to verify if that the agent has started. So if you - // change this log line update also the system test. + // This log line is used in our system tests to verify if that the agent has started. + // So if you change this log line update also the system test. log.Printf("Attached sched monitor") if err := startTraceHandling(ctx, c.reporter, intervals, trc, diff --git a/libpf/pid.go b/libpf/pid.go index 168a2d161..4752e6aba 100644 --- a/libpf/pid.go +++ b/libpf/pid.go @@ -1,8 +1,27 @@ package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" +import ( + "fmt" +) + // PID represent Unix Process ID (pid_t) type PID uint32 func (p PID) Hash32() uint32 { return uint32(p) } + +// PIDTID encodes a process id and a thread id +type PIDTID uint64 + +func (pt PIDTID) PID() PID { + return PID(pt >> 32) +} + +func (pt PIDTID) TID() PID { + return PID(pt & 0xFFFFFFFF) +} + +func (pt PIDTID) String() string { + return fmt.Sprintf("PID: %v TID: %v", pt.PID(), pt.TID()) +} diff --git a/process/process.go b/process/process.go index 2215bf8ee..1102cffc7 100644 --- a/process/process.go +++ b/process/process.go @@ -25,12 +25,17 @@ import ( "go.opentelemetry.io/ebpf-profiler/stringutil" ) +// GetMappings returns this error when no mappings can be extracted. +var ErrNoMappings = errors.New("no mappings") + // systemProcess provides an implementation of the Process interface for a // process that is currently running on this machine. type systemProcess struct { pid libpf.PID + tid libpf.PID - remoteMemory remotememory.RemoteMemory + mainThreadExit bool + remoteMemory remotememory.RemoteMemory fileToMapping map[string]*Mapping } @@ -54,9 +59,10 @@ func init() { } // New returns an object with Process interface accessing it -func New(pid libpf.PID) Process { +func New(pid, tid libpf.PID) Process { return &systemProcess{ pid: pid, + tid: tid, remoteMemory: remotememory.NewProcessVirtualMemory(pid), } } @@ -166,9 +172,8 @@ func parseMappings(mapsFile io.Reader) ([]Mapping, uint32, error) { path = VdsoPathName device = 0 inode = vdsoInode - } else if path != "" { - // Ignore [vsyscall] and similar executable kernel - // pages we don't care about + } else { + // Ignore mappings that are invalid, non-existent or are special pseudo-files continue } } else { @@ -230,20 +235,48 @@ func (sp *systemProcess) GetMappings() ([]Mapping, uint32, error) { defer mapsFile.Close() mappings, numParseErrors, err := parseMappings(mapsFile) - if err == nil { - fileToMapping := make(map[string]*Mapping, len(mappings)) - for idx := range mappings { - m := &mappings[idx] - if m.Inode == 0 { - // Ignore mappings that are invalid, - // non-existent or are special pseudo-files. - continue - } - fileToMapping[m.Path] = m + if err != nil { + return mappings, numParseErrors, err + } + + if len(mappings) == 0 { + // We could test for main thread exit here by checking for zombie state + // in /proc/sp.pid/stat but it's simpler to assume that this is the case + // and try extracting mappings for a different thread. Since we stopped + // processing /proc at agent startup, it's not possible that the agent + // will sample a process without mappings + log.Debugf("PID: %v main thread exit", sp.pid) + sp.mainThreadExit = true + + if sp.pid == sp.tid { + return mappings, numParseErrors, ErrNoMappings + } + + log.Debugf("TID: %v extracting mappings", sp.tid) + mapsFileAlt, err := os.Open(fmt.Sprintf("/proc/%d/task/%d/maps", sp.pid, sp.tid)) + // On all errors resulting from trying to get mappings from a different thread, + // return ErrNoMappings which will keep the PID tracked in processmanager and + // allow for a future iteration to try extracting mappings from a different thread. + // This is done to deal with race conditions triggered by thread exits (we do not want + // the agent to unload process metadata when a thread exits but the process is still + // alive). + if err != nil { + return mappings, numParseErrors, ErrNoMappings + } + defer mapsFileAlt.Close() + mappings, numParseErrors, err = parseMappings(mapsFileAlt) + if err != nil || len(mappings) == 0 { + return mappings, numParseErrors, ErrNoMappings } - sp.fileToMapping = fileToMapping } - return mappings, numParseErrors, err + + fileToMapping := make(map[string]*Mapping, len(mappings)) + for idx := range mappings { + m := &mappings[idx] + fileToMapping[m.Path] = m + } + sp.fileToMapping = fileToMapping + return mappings, numParseErrors, nil } func (sp *systemProcess) GetThreads() ([]ThreadInfo, error) { @@ -272,6 +305,13 @@ func (sp *systemProcess) getMappingFile(m *Mapping) string { if m.IsAnonymous() || m.IsVDSO() { return "" } + if sp.mainThreadExit { + // Neither /proc/sp.pid/map_files nor /proc/sp.pid/task/sp.tid/map_files + // nor /proc/sp.pid/root exist if main thread has exited, so we use the + // mapping path directly under the sp.tid root. + rootPath := fmt.Sprintf("/proc/%v/task/%v/root", sp.pid, sp.tid) + return path.Join(rootPath, m.Path) + } return fmt.Sprintf("/proc/%v/map_files/%x-%x", sp.pid, m.Vaddr, m.Vaddr+m.Length) } diff --git a/process/process_test.go b/process/process_test.go index 9e5689db7..cf2ab035c 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -103,7 +103,8 @@ func TestParseMappings(t *testing.T) { } func TestNewPIDOfSelf(t *testing.T) { - pr := New(libpf.PID(os.Getpid())) + pid := libpf.PID(os.Getpid()) + pr := New(pid, pid) assert.NotNil(t, pr) mappings, numParseErrors, err := pr.GetMappings() diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index 6ccf86f80..2872f04dc 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -614,9 +614,16 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) { return } + if errors.Is(err, process.ErrNoMappings) { + // When no mappings can be extracted but the process is still alive, + // do not trigger a process exit to avoid unloading process metadata. + // As it's likely that a future iteration can extract mappings from a + // different thread in the process, notify eBPF to enable further notifications. + pm.ebpf.RemoveReportedPID(pid) + return + } + // All other errors imply that the process has exited. - // Clean up, and notify eBPF. - pm.processPIDExit(pid) if os.IsNotExist(err) { // Since listing /proc and opening files in there later is inherently racy, // we expect to lose the race sometimes and thus expect to hit os.IsNotExist. @@ -626,22 +633,7 @@ func (pm *ProcessManager) SynchronizeProcess(pr process.Process) { // return ESRCH. Handle it as if the process did not exist. pm.mappingStats.errProcESRCH.Add(1) } - return - } - if len(mappings) == 0 { - // Valid process without any (executable) mappings. All cases are - // handled as process exit. Possible causes and reasoning: - // 1. It is a kernel worker process. The eBPF does not send events from these, - // but we can see kernel threads here during startup when tracer walks - // /proc and tries to synchronize all PIDs it sees. - // The PID should not exist anywhere, but we can still double check and - // make sure the PID is not tracked. - // 2. It is a normal process executing, but we just sampled it when the kernel - // execve() is rebuilding the mappings and nothing is currently mapped. - // In this case we can handle it as process exit because everything about - // the process is changing: all mappings, comm, etc. If execve fails, we - // reaped it early. If execve succeeds, we will get new synchronization - // request soon, and handle it as a new process event. + // Clean up, and notify eBPF. pm.processPIDExit(pid) return } @@ -744,6 +736,7 @@ func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) { continue } + log.Debugf("PID %v deleted", pid) delete(pm.pidToProcessInfo, pid) for _, instance := range pm.interpreters[pid] { diff --git a/support/ebpf/interpreter_dispatcher.ebpf.c b/support/ebpf/interpreter_dispatcher.ebpf.c index 934855681..d83673257 100644 --- a/support/ebpf/interpreter_dispatcher.ebpf.c +++ b/support/ebpf/interpreter_dispatcher.ebpf.c @@ -66,14 +66,14 @@ bpf_map_def SEC("maps") reported_pids = { // // User space code will periodically iterate through the map and process each entry. // Additionally, each time eBPF code writes a value into the map, user space is notified -// through event_send_trigger (which uses maps/report_events). As key we use the PID of -// the process and as value always true. When sizing this map, we are thinking about -// the maximum number of unique PIDs that could generate events we're interested in -// (process new, process exit, unknown PC) within a map monitor/processing interval, +// through event_send_trigger (which uses maps/report_events). As key we use the PID/TID +// of the process/thread and as value always true. When sizing this map, we are thinking +// about the maximum number of unique PIDs that could generate events we're interested in +// (process new, thread group exit, unknown PC) within a map monitor/processing interval, // that we would like to support. bpf_map_def SEC("maps") pid_events = { .type = BPF_MAP_TYPE_HASH, - .key_size = sizeof(u32), + .key_size = sizeof(u64), .value_size = sizeof(bool), .max_entries = 65536, }; @@ -205,7 +205,8 @@ static inline __attribute__((__always_inline__)) int unwind_stop(struct pt_regs // No Error break; case metricID_UnwindNativeErrWrongTextSection:; - if (report_pid(ctx, trace->pid, record->ratelimitAction)) { + u64 pid_tgid = (u64)trace->pid << 32 | trace->tid; + if (report_pid(ctx, pid_tgid, record->ratelimitAction)) { increment_metric(metricID_NumUnknownPC); } // Fallthrough to report the error diff --git a/support/ebpf/sched_monitor.ebpf.c b/support/ebpf/sched_monitor.ebpf.c index a8d6b98c1..335daf657 100644 --- a/support/ebpf/sched_monitor.ebpf.c +++ b/support/ebpf/sched_monitor.ebpf.c @@ -6,21 +6,20 @@ #include "types.h" -// tracepoint__sched_process_exit is a tracepoint attached to the scheduler that stops processes. -// Every time a processes stops this hook is triggered. -SEC("tracepoint/sched/sched_process_exit") -int tracepoint__sched_process_exit(void *ctx) -{ - u64 pid_tgid = bpf_get_current_pid_tgid(); - u32 pid = (u32)(pid_tgid >> 32); - u32 tid = (u32)(pid_tgid & 0xFFFFFFFF); +// See /sys/kernel/debug/tracing/events/sched/sched_process_free/format +// for struct layout. +struct sched_process_free_ctx { + unsigned char skip[24]; + pid_t pid; + int prio; +}; - if (pid != tid) { - // Only if the thread group ID matched with the PID the process itself exits. If they don't - // match only a thread of the process stopped and we do not need to report this PID to - // userspace for further processing. - goto exit; - } +// tracepoint__sched_process_free is a tracepoint attached to the scheduler that frees processes. +// Every time a processes exits this hook is triggered. +SEC("tracepoint/sched/sched_process_free") +int tracepoint__sched_process_free(struct sched_process_free_ctx *ctx) +{ + u32 pid = ctx->pid; if (!bpf_map_lookup_elem(&reported_pids, &pid) && !pid_information_exists(ctx, pid)) { // Only report PIDs that we explicitly track. This avoids sending kernel worker PIDs @@ -28,7 +27,7 @@ int tracepoint__sched_process_exit(void *ctx) goto exit; } - if (report_pid(ctx, pid, RATELIMIT_ACTION_RESET)) { + if (report_pid(ctx, (u64)pid << 32 | pid, RATELIMIT_ACTION_RESET)) { increment_metric(metricID_NumProcExit); } exit: diff --git a/support/ebpf/tracemgmt.h b/support/ebpf/tracemgmt.h index 60258f908..dd22f6bbb 100644 --- a/support/ebpf/tracemgmt.h +++ b/support/ebpf/tracemgmt.h @@ -159,23 +159,24 @@ pid_event_ratelimit(u32 pid, int ratelimit_action) // and reporting aborted if PID has been recently reported. // Returns true if the PID was successfully reported to user space. static inline __attribute__((__always_inline__)) bool -report_pid(void *ctx, int pid, int ratelimit_action) +report_pid(void *ctx, u64 pid_tgid, int ratelimit_action) { - u32 key = (u32)pid; + u32 pid = pid_tgid >> 32; if (pid_event_ratelimit(pid, ratelimit_action)) { return false; } bool value = true; - int errNo = bpf_map_update_elem(&pid_events, &key, &value, BPF_ANY); + int errNo = bpf_map_update_elem(&pid_events, &pid_tgid, &value, BPF_ANY); if (errNo != 0) { - DEBUG_PRINT("Failed to update pid_events with PID %d: %d", pid, errNo); + __attribute__((unused)) u32 tid = pid_tgid & 0xFFFFFFFF; + DEBUG_PRINT("Failed to update pid_events with PID %d TID: %d: %d", pid, tid, errNo); increment_metric(metricID_PIDEventsErr); return false; } if (ratelimit_action == RATELIMIT_ACTION_RESET || errNo != 0) { - bpf_map_delete_elem(&reported_pids, &key); + bpf_map_delete_elem(&reported_pids, &pid); } // Notify userspace that there is a PID waiting to be processed. @@ -730,7 +731,8 @@ static inline int collect_trace( } if (!pid_information_exists(ctx, pid)) { - if (report_pid(ctx, pid, RATELIMIT_ACTION_DEFAULT)) { + u64 pid_tgid = (u64)pid << 32 | tid; + if (report_pid(ctx, pid_tgid, RATELIMIT_ACTION_DEFAULT)) { increment_metric(metricID_NumProcNew); } return 0; diff --git a/support/ebpf/tracer.ebpf.release.amd64 b/support/ebpf/tracer.ebpf.release.amd64 index fc76f8977..1fa9f8e90 100644 Binary files a/support/ebpf/tracer.ebpf.release.amd64 and b/support/ebpf/tracer.ebpf.release.amd64 differ diff --git a/support/ebpf/tracer.ebpf.release.arm64 b/support/ebpf/tracer.ebpf.release.arm64 index 365871f08..b284e12ab 100644 Binary files a/support/ebpf/tracer.ebpf.release.arm64 and b/support/ebpf/tracer.ebpf.release.arm64 differ diff --git a/support/tests/main_thread_exit.c b/support/tests/main_thread_exit.c new file mode 100644 index 000000000..e9188cdeb --- /dev/null +++ b/support/tests/main_thread_exit.c @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +/* + * Implements a profiling test based on a multi-threaded process with + * a main thread that exits early. + * + * Two additional threads are created: + * 1. Burns CPU, ensures process is sampled by the profiler + * 2. Burns CPU in newly mapped pages + * + * After main thread exits, /proc/PID/maps is empty and the expected + * behavior is for the profiler to not cleanup the process, but instead + * keep profiling the remaining thread and use /proc/PID/task/TID/maps + * (TID corresponding to thread 2) to synchronize mappings. + * + * Needs OpenSSL (libssl) installed as it dynamically loads libcrypto.so. + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +static void *burn(void *arg) +{ + int old_type; + + // We're just burning CPU, so asynchronous cancellation is safe + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old_type); + + for (;;) { + } + + // Never reached + return NULL; +} + +static void *hash(void *arg) +{ + unsigned char buf[1024]; + + printf("Thread TID: %d, sleeping for 5s\n", gettid()); + sleep(5); + + unsigned char *(*MD5)(const unsigned char *d, unsigned long n, unsigned char *md); + void *handle = dlopen("libcrypto.so", RTLD_LAZY); + if (!handle) { + fprintf(stderr, "%s\n", dlerror()); + return NULL; + } + + MD5 = dlsym(handle, "MD5"); + if (!MD5) { + fprintf(stderr, "dlsym: Could not resolve MD5\n"); + return NULL; + } + + printf("Thread TID: %d, hashing..\n", gettid()); + for (;;) { + MD5(buf, sizeof(buf), NULL); + } + + // Never reached + return NULL; +} + +int main() +{ + int ret; + pthread_t tid; + printf("Main thread is starting, PID: %d\n", getpid()); + + // Create a new thread to burn CPU / ensure process gets profiled + if ((ret = pthread_create(&tid, NULL, burn, NULL)) != 0) { + fprintf(stderr, "pthread_create: burn %d\n", ret); + exit(EXIT_FAILURE); + } + + sleep(2); + + printf("Press ENTER to exit main thread: "); + getchar(); + + // Stop CPU burn thread to reduce noise while hash thread is running + void *tret; + pthread_cancel(tid); + pthread_join(tid, &tret); + + if (tret != PTHREAD_CANCELED) { + fprintf(stderr, "pthread_join: %p\n", tret); + exit(EXIT_FAILURE); + } + + printf("Main thread is exiting\n"); + + // Create a new thread to burn CPU in newly mapped pages + if ((ret = pthread_create(&tid, NULL, hash, NULL) != 0)) { + fprintf(stderr, "pthread_create: hash %d\n", ret); + exit(EXIT_FAILURE); + } + + pthread_detach(pthread_self()); + pthread_exit(NULL); + return 0; +} diff --git a/tracer/events.go b/tracer/events.go index 36238bbe9..a574d5e8d 100644 --- a/tracer/events.go +++ b/tracer/events.go @@ -47,8 +47,8 @@ func (t *Tracer) processPIDEvents(ctx context.Context) { defer pidCleanupTicker.Stop() for { select { - case pid := <-t.pidEvents: - t.processManager.SynchronizeProcess(process.New(pid)) + case pidTid := <-t.pidEvents: + t.processManager.SynchronizeProcess(process.New(pidTid.PID(), pidTid.TID())) case <-pidCleanupTicker.C: t.processManager.CleanupPIDs() case <-ctx.Done(): diff --git a/tracer/tracepoints.go b/tracer/tracepoints.go index dc812c932..7fbe1d4c8 100644 --- a/tracer/tracepoints.go +++ b/tracer/tracepoints.go @@ -36,6 +36,6 @@ func (t *Tracer) AttachSchedMonitor() error { } defer restoreRlimit() - prog := t.ebpfProgs["tracepoint__sched_process_exit"] - return t.attachToTracepoint("sched", "sched_process_exit", prog) + prog := t.ebpfProgs["tracepoint__sched_process_free"] + return t.attachToTracepoint("sched", "sched_process_free", prog) } diff --git a/tracer/tracer.go b/tracer/tracer.go index e780bd98e..c827dfb96 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -105,10 +105,11 @@ type Tracer struct { // when process events take place (new, exit, unknown PC). triggerPIDProcessing chan bool - // pidEvents notifies the tracer of new PID events. + // pidEvents notifies the tracer of new PID events. Each PID event is a 64bit integer + // value, see bpf_get_current_pid_tgid for information on how the value is encoded. // It needs to be buffered to avoid locking the writers and stacking up resources when we // read new PIDs at startup or notified via eBPF. - pidEvents chan libpf.PID + pidEvents chan libpf.PIDTID // intervals provides access to globally configured timers and counters. intervals Intervals @@ -323,7 +324,7 @@ func NewTracer(ctx context.Context, cfg *Config) (*Tracer, error) { kernelSymbols: kernelSymbols, kernelModules: kernelModules, triggerPIDProcessing: make(chan bool, 1), - pidEvents: make(chan libpf.PID, pidEventBufferSize), + pidEvents: make(chan libpf.PIDTID, pidEventBufferSize), ebpfMaps: ebpfMaps, ebpfProgs: ebpfProgs, hooks: make(map[hookPoint]link.Link), @@ -593,7 +594,7 @@ func loadPerfUnwinders(coll *cebpf.CollectionSpec, ebpfProgs map[string]*cebpf.P copy(progs, tailCallProgs) progs = append(progs, progLoaderHelper{ - name: "tracepoint__sched_process_exit", + name: "tracepoint__sched_process_free", noTailCallTarget: true, enable: true, }, @@ -843,12 +844,12 @@ func (t *Tracer) enableEvent(eventType int) { // monitorPIDEventsMap periodically iterates over the eBPF map pid_events, // collects PIDs and writes them to the keys slice. -func (t *Tracer) monitorPIDEventsMap(keys *[]uint32) { +func (t *Tracer) monitorPIDEventsMap(keys *[]libpf.PIDTID) { eventsMap := t.ebpfMaps["pid_events"] - var key, nextKey uint32 + var key, nextKey uint64 var value bool keyFound := true - deleteBatch := make(libpf.Set[uint32]) + deleteBatch := make(libpf.Set[uint64]) // Key 0 retrieves the very first element in the hash map as // it is guaranteed not to exist in pid_events. @@ -891,7 +892,7 @@ func (t *Tracer) monitorPIDEventsMap(keys *[]uint32) { // exact point), we may block sending to the channel, delay the iteration and may introduce // race conditions (related to deletion). For that reason, keys are first collected and, // after the iteration has finished, sent to the channel. - *keys = append(*keys, key) + *keys = append(*keys, libpf.PIDTID(key)) } keysToDelete := len(deleteBatch) @@ -1048,15 +1049,15 @@ func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan<- *host eventMetricCollector := t.startEventMonitor(ctx) traceEventMetricCollector := t.startTraceEventMonitor(ctx, traceOutChan) - pidEvents := make([]uint32, 0) + pidEvents := make([]libpf.PIDTID, 0) periodiccaller.StartWithManualTrigger(ctx, t.intervals.MonitorInterval(), t.triggerPIDProcessing, func(_ bool) { t.enableEvent(support.EventTypeGenericPID) t.monitorPIDEventsMap(&pidEvents) - for _, ev := range pidEvents { - log.Debugf("=> PID: %v", ev) - t.pidEvents <- libpf.PID(ev) + for _, pidTid := range pidEvents { + log.Debugf("=> %v", pidTid) + t.pidEvents <- pidTid } // Keep the underlying array alive to avoid GC pressure