Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"flag"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/peterbourgon/ff/v3"
Expand Down Expand Up @@ -80,13 +82,35 @@ var (
"Expected format: probe_type:target[:symbol]. probe_type can be kprobe, kretprobe, uprobe, or uretprobe."
loadProbeHelper = "Load generic eBPF program that can be attached externally to " +
"various user or kernel space hooks."
targetPidsHelp = "Comma-separated list of host PIDs to profile. When set, only these PIDs are instrumented. Empty means profile all."
)

// Package-scope variable, so that conditionally compiled other components can refer
// to the same flagset.

func parseTargetPIDs(s string) []int {
if s == "" {
return nil
}
parts := strings.Split(s, ",")
out := make([]int, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
n, err := strconv.Atoi(p)
if err != nil || n <= 0 {
continue
}
out = append(out, n)
}
return out
}

func parseArgs() (*controller.Config, error) {
var args controller.Config
var targetPidsStr string

fs := flag.NewFlagSet("ebpf-profiler", flag.ExitOnError)

Expand Down Expand Up @@ -148,19 +172,27 @@ func parseArgs() (*controller.Config, error) {

fs.BoolVar(&args.LoadProbe, "load-probe", false, loadProbeHelper)

fs.StringVar(&targetPidsStr, "target-pids", "", targetPidsHelp)

fs.Usage = func() {
fs.PrintDefaults()
}

args.Fs = fs

return &args, ff.Parse(fs, os.Args[1:],
if err := ff.Parse(fs, os.Args[1:],
ff.WithEnvVarPrefix("OTEL_PROFILING_AGENT"),
ff.WithConfigFileFlag("config"),
ff.WithConfigFileParser(ff.PlainParser),
// This will ignore configuration file (only) options that the current HA
// does not recognize.
ff.WithIgnoreUndefined(true),
ff.WithAllowMissingConfigFile(true),
)
); err != nil {
return nil, err
}
if targetPidsStr != "" {
args.TargetPIDs = parseTargetPIDs(targetPidsStr)
}
return &args, nil
}
6 changes: 6 additions & 0 deletions collector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ type Config struct {
NoKernelVersionCheck bool `mapstructure:"no_kernel_version_check"`
MaxGRPCRetries uint32 `mapstructure:"max_grpc_retries"`
MaxRPCMsgSize int `mapstructure:"max_rpc_msg_size"`
// TargetPIDs, when non-empty, restricts instrumentation to these host PIDs only. Empty means instrument all.
TargetPIDs []int `mapstructure:"target_pids"`
// TargetPIDsFile, when non-empty, is a path to a file that lists target PIDs (one per line or comma-separated).
// The controller watches this file and calls UpdateTargetPIDs with the parsed list on change. Used when an
// external process (e.g. Odiglet) writes the current set of PIDs to instrument.
TargetPIDsFile string `mapstructure:"target_pids_file"`
}

// Validate validates the config.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
github.com/josharian/native v1.1.0 // indirect
github.com/jsimonetti/rtnetlink/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/knadh/koanf/providers/confmap v1.0.0 // indirect
github.com/knadh/koanf/v2 v2.3.2 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo=
github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/providers/confmap v1.0.0 h1:mHKLJTE7iXEys6deO5p6olAiZdG5zwp8Aebir+/EaRE=
Expand Down
74 changes: 74 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller"

import (
"bufio"
"context"
"fmt"
"math"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -109,6 +112,7 @@ func (c *Controller) Start(ctx context.Context) error {
ProbeLinks: c.config.ProbeLinks,
LoadProbe: c.config.LoadProbe,
ExecutableReporter: c.config.ExecutableReporter,
TargetPIDs: c.config.TargetPIDs,
})
if err != nil {
return fmt.Errorf("failed to load eBPF tracer: %w", err)
Expand Down Expand Up @@ -164,6 +168,10 @@ func (c *Controller) Start(ctx context.Context) error {
return fmt.Errorf("failed to start trace handling: %w", err)
}

if c.config.TargetPIDsFile != "" {
go c.runTargetPIDsFileWatcher(ctx, trc)
}

return nil
}

Expand Down Expand Up @@ -213,3 +221,69 @@ func (c *Controller) startTraceHandling(ctx context.Context, trc *tracer.Tracer)

return nil
}

const targetPIDsFilePollInterval = 10 * time.Second

// runTargetPIDsFileWatcher polls the configured file and updates the tracer's target PIDs on change.
// File format: one PID per line, or comma-separated (or both). Empty/missing file means instrument all.
func (c *Controller) runTargetPIDsFileWatcher(ctx context.Context, trc *tracer.Tracer) {
path := c.config.TargetPIDsFile
var lastMod time.Time
ticker := time.NewTicker(targetPIDsFilePollInterval)
defer ticker.Stop()

apply := func() {
data, err := os.ReadFile(path)
if err != nil {
if !os.IsNotExist(err) {
log.Debugf("target_pids_file read failed: %v", err)
}
trc.UpdateTargetPIDs(nil)
return
}
pids := parsePIDsFromFileContent(data)
trc.UpdateTargetPIDs(pids)
log.Debugf("target_pids_file applied %d PIDs from %s", len(pids), path)
}

// Apply once immediately, then on ticker when file mod time changes
apply()
if info, err := os.Stat(path); err == nil {
lastMod = info.ModTime()
}
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
info, err := os.Stat(path)
if err != nil {
continue
}
if info.ModTime().After(lastMod) {
lastMod = info.ModTime()
apply()
}
}
}
}

// parsePIDsFromFileContent parses PIDs from file content: newline and comma separated, positive integers only.
func parsePIDsFromFileContent(data []byte) []int {
var pids []int
scanner := bufio.NewScanner(strings.NewReader(string(data)))
for scanner.Scan() {
for _, p := range strings.Split(scanner.Text(), ",") {
p = strings.TrimSpace(p)
if p == "" {
continue
}
n, err := strconv.Atoi(p)
if err != nil || n <= 0 {
continue
}
pids = append(pids, n)
}
}
return pids
}
34 changes: 34 additions & 0 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package controller

import (
"testing"
)

func TestParsePIDsFromFileContent(t *testing.T) {
tests := []struct {
name string
data []byte
expected []int
}{
{"empty", []byte(""), nil},
{"newlines", []byte("1234\n5678\n"), []int{1234, 5678}},
{"comma", []byte("1234,5678"), []int{1234, 5678}},
{"mixed", []byte("1234, 5678\n90"), []int{1234, 5678, 90}},
{"skip invalid", []byte("1234, 0, -1, abc, 5678"), []int{1234, 5678}},
{"whitespace", []byte(" 1234 \n 5678 "), []int{1234, 5678}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parsePIDsFromFileContent(tt.data)
if len(got) != len(tt.expected) {
t.Errorf("parsePIDsFromFileContent() len = %v, want %v", got, tt.expected)
return
}
for i := range got {
if got[i] != tt.expected[i] {
t.Errorf("parsePIDsFromFileContent()[%d] = %v, want %v", i, got[i], tt.expected[i])
}
}
})
}
}
21 changes: 21 additions & 0 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,27 @@ func (pm *ProcessManager) CleanupPIDs() {
}
}

// TrackedPIDs returns a copy of the PIDs currently tracked by the ProcessManager
// (present in pidToProcessInfo). Used by the tracer to determine which PIDs to
// revoke when the target PID allowlist is updated.
func (pm *ProcessManager) TrackedPIDs() []libpf.PID {
pm.mu.RLock()
defer pm.mu.RUnlock()
out := make([]libpf.PID, 0, len(pm.pidToProcessInfo))
for pid := range pm.pidToProcessInfo {
out = append(out, pid)
}
return out
}

// RemoveFromInstrumentation tears down instrumentation for the given PID the same
// way as process exit: removes from pid_page_to_mapping_info, records exit, and
// lets ProcessedUntil clean in-memory state. Use this when the PID is no longer
// in the target allowlist (process may still be live).
func (pm *ProcessManager) RemoveFromInstrumentation(pid libpf.PID) {
pm.processPIDExit(pid)
}

// MetaForPID returns the process metadata for given PID.
func (pm *ProcessManager) MetaForPID(pid libpf.PID) process.ProcessMeta {
pm.mu.RLock()
Expand Down
3 changes: 3 additions & 0 deletions tracer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (t *Tracer) processPIDEvents(ctx context.Context) {
for {
select {
case pidTid := <-t.pidEvents:
if !t.isTargetPID(pidTid.PID()) {
continue
}
t.processManager.SynchronizeProcess(process.New(pidTid.PID(), pidTid.TID()))
case <-pidCleanupTicker.C:
t.processManager.CleanupPIDs()
Expand Down
Loading
Loading