diff --git a/docs/config-schema.json b/docs/config-schema.json index ee47e3d76e..caa3f86a6d 100644 --- a/docs/config-schema.json +++ b/docs/config-schema.json @@ -671,7 +671,7 @@ "type": "integer" }, "type": "array", - "description": "PIDs allows selecting processes by PID. When non-empty, the process PID must be in this list (in addition to any path/port criteria).", + "description": "PIDs allows selecting processes by PID (static from config). When non-empty, the process PID must be in this list.", "x-env-var": "OTEL_EBPF_TARGET_PID" } }, diff --git a/pkg/appolly/discover/attacher.go b/pkg/appolly/discover/attacher.go index 07a394db54..0a8e6df663 100644 --- a/pkg/appolly/discover/attacher.go +++ b/pkg/appolly/discover/attacher.go @@ -28,6 +28,9 @@ import ( "go.opentelemetry.io/obi/pkg/pipe/swarm/swarms" ) +// Swappable in tests so attacher tests don't depend on memlock permissions. +var removeMemlock = rlimit.RemoveMemlock + // traceAttacher creates the available trace.Tracer implementations (Go HTTP tracer, GRPC tracer, Generic tracer...) // for each received Instrumentable process and forwards an ebpf.ProcessTracer instance ready to run and start // instrumenting the executable @@ -411,7 +414,7 @@ func (ta *traceAttacher) notifyProcessDeletion(ie *ebpf.Instrumentable) { } func (ta *traceAttacher) init() error { - if err := rlimit.RemoveMemlock(); err != nil { + if err := removeMemlock(); err != nil { return fmt.Errorf("removing memory lock: %w", err) } return nil diff --git a/pkg/appolly/discover/attacher_test.go b/pkg/appolly/discover/attacher_test.go new file mode 100644 index 0000000000..0b38c31e84 --- /dev/null +++ b/pkg/appolly/discover/attacher_test.go @@ -0,0 +1,201 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package discover + +import ( + "context" + "io" + "testing" + + cebpf "github.com/cilium/ebpf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/appolly/app" + "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/appolly/app/svc" + execpkg "go.opentelemetry.io/obi/pkg/appolly/discover/exec" + "go.opentelemetry.io/obi/pkg/appolly/services" + "go.opentelemetry.io/obi/pkg/ebpf" + ebpfcommon "go.opentelemetry.io/obi/pkg/ebpf/common" + "go.opentelemetry.io/obi/pkg/export/imetrics" + "go.opentelemetry.io/obi/pkg/internal/goexec" + "go.opentelemetry.io/obi/pkg/internal/testutil" + "go.opentelemetry.io/obi/pkg/obi" + "go.opentelemetry.io/obi/pkg/pipe/msg" +) + +type blockedPID struct { + pid app.PID + ns uint32 +} + +type recordingTracer struct { + blocked []blockedPID +} + +func (r *recordingTracer) AllowPID(app.PID, uint32, *svc.Attrs) {} +func (r *recordingTracer) BlockPID(pid app.PID, ns uint32) { + r.blocked = append(r.blocked, blockedPID{pid: pid, ns: ns}) +} +func (r *recordingTracer) LoadSpecs() ([]*ebpfcommon.SpecBundle, error) { return nil, nil } +func (r *recordingTracer) AddCloser(...io.Closer) {} +func (r *recordingTracer) SetupTailCalls() {} +func (r *recordingTracer) KProbes() map[string]ebpfcommon.ProbeDesc { return nil } +func (r *recordingTracer) Tracepoints() map[string]ebpfcommon.ProbeDesc { return nil } +func (r *recordingTracer) GoProbes() map[string][]*ebpfcommon.ProbeDesc { return nil } +func (r *recordingTracer) UProbes() map[string]map[string][]*ebpfcommon.ProbeDesc { return nil } +func (r *recordingTracer) SocketFilters() []*cebpf.Program { return nil } +func (r *recordingTracer) SockMsgs() []ebpfcommon.SockMsg { return nil } +func (r *recordingTracer) SockOps() []ebpfcommon.SockOps { return nil } +func (r *recordingTracer) Iters() []*ebpfcommon.Iter { return nil } +func (r *recordingTracer) Tracing() []*ebpfcommon.Tracing { return nil } +func (r *recordingTracer) RecordInstrumentedLib(uint64, []io.Closer) {} +func (r *recordingTracer) AddInstrumentedLibRef(uint64) {} +func (r *recordingTracer) AlreadyInstrumentedLib(uint64) bool { return false } +func (r *recordingTracer) UnlinkInstrumentedLib(uint64) {} +func (r *recordingTracer) RegisterOffsets(*execpkg.FileInfo, *goexec.Offsets) {} +func (r *recordingTracer) ProcessBinary(*execpkg.FileInfo) {} +func (r *recordingTracer) Required() bool { return false } +func (r *recordingTracer) Run(context.Context, *ebpfcommon.EBPFEventContext, *msg.Queue[[]request.Span]) { +} + +func TestSyntheticDeletePath_TraceAttacherDeletesTracer(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + origRemoveMemlock := removeMemlock + removeMemlock = func() error { return nil } + defer func() { removeMemlock = origRemoveMemlock }() + + processMatches := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) + instrumentables := msg.NewQueue[[]Event[ebpf.Instrumentable]](msg.ChannelBufferLen(10)) + tracerEventsQu := msg.NewQueue[Event[*ebpf.Instrumentable]](msg.ChannelBufferLen(10)) + tracerEvents := tracerEventsQu.Subscribe() + + fileInfo := &execpkg.FileInfo{ + Service: svc.Attrs{UID: svc.UID{Name: "dyn-svc", Namespace: "ns"}}, + CmdExePath: "/bin/test", + Pid: 42, + Ino: 1234, + Ns: 17, + } + startDeletedTyperPipeline(ctx, &typer{ + currentPids: map[app.PID]*execpkg.FileInfo{42: fileInfo}, + }, processMatches, instrumentables) + + ta := &traceAttacher{ + Cfg: &obi.Config{}, + Metrics: imetrics.NoopReporter{}, + InputInstrumentables: instrumentables, + OutputTracerEvents: tracerEventsQu, + EbpfEventContext: &ebpfcommon.EBPFEventContext{}, + } + run, err := ta.attacherLoop(ctx) + require.NoError(t, err) + + prog := &recordingTracer{} + tracer := &ebpf.ProcessTracer{Type: ebpf.Generic, Programs: []ebpf.Tracer{prog}} + ta.existingTracers[fileInfo.Ino] = tracer + ta.processInstances.Inc(fileInfo.Ino) + + go run(ctx) + + processMatches.Send([]Event[ProcessMatch]{{ + Type: EventDeleted, + Obj: ProcessMatch{ + Process: &services.ProcessInfo{Pid: 42}, + }, + }}) + + ev := testutil.ReadChannel(t, tracerEvents, testTimeout) + require.Equal(t, EventDeleted, ev.Type) + require.NotNil(t, ev.Obj) + assert.Equal(t, app.PID(42), ev.Obj.FileInfo.Pid) + assert.Same(t, tracer, ev.Obj.Tracer) + assert.Equal(t, []blockedPID{{pid: 42, ns: 17}}, prog.blocked) + _, exists := ta.existingTracers[fileInfo.Ino] + assert.False(t, exists) +} + +func TestSyntheticDeletePath_TraceAttacherDeletesInstance(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + origRemoveMemlock := removeMemlock + removeMemlock = func() error { return nil } + defer func() { removeMemlock = origRemoveMemlock }() + + processMatches := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) + instrumentables := msg.NewQueue[[]Event[ebpf.Instrumentable]](msg.ChannelBufferLen(10)) + tracerEventsQu := msg.NewQueue[Event[*ebpf.Instrumentable]](msg.ChannelBufferLen(10)) + tracerEvents := tracerEventsQu.Subscribe() + + fileInfo := &execpkg.FileInfo{ + Service: svc.Attrs{UID: svc.UID{Name: "dyn-svc", Namespace: "ns"}}, + CmdExePath: "/bin/test", + Pid: 42, + Ino: 1234, + Ns: 17, + } + startDeletedTyperPipeline(ctx, &typer{ + currentPids: map[app.PID]*execpkg.FileInfo{42: fileInfo}, + }, processMatches, instrumentables) + + ta := &traceAttacher{ + Cfg: &obi.Config{}, + Metrics: imetrics.NoopReporter{}, + InputInstrumentables: instrumentables, + OutputTracerEvents: tracerEventsQu, + EbpfEventContext: &ebpfcommon.EBPFEventContext{}, + } + run, err := ta.attacherLoop(ctx) + require.NoError(t, err) + + prog := &recordingTracer{} + tracer := &ebpf.ProcessTracer{Type: ebpf.Generic, Programs: []ebpf.Tracer{prog}} + ta.existingTracers[fileInfo.Ino] = tracer + ta.processInstances.Inc(fileInfo.Ino) + ta.processInstances.Inc(fileInfo.Ino) + + go run(ctx) + + processMatches.Send([]Event[ProcessMatch]{{ + Type: EventDeleted, + Obj: ProcessMatch{ + Process: &services.ProcessInfo{Pid: 42}, + }, + }}) + + ev := testutil.ReadChannel(t, tracerEvents, testTimeout) + require.Equal(t, EventInstanceDeleted, ev.Type) + require.NotNil(t, ev.Obj) + assert.Equal(t, app.PID(42), ev.Obj.FileInfo.Pid) + assert.Nil(t, ev.Obj.Tracer) + assert.Equal(t, []blockedPID{{pid: 42, ns: 17}}, prog.blocked) + assert.Same(t, tracer, ta.existingTracers[fileInfo.Ino]) +} + +func startDeletedTyperPipeline( + ctx context.Context, + tp *typer, + input *msg.Queue[[]Event[ProcessMatch]], + output *msg.Queue[[]Event[ebpf.Instrumentable]], +) { + in := input.Subscribe(msg.SubscriberName("testExecTyper")) + go func() { + defer output.Close() + for { + select { + case <-ctx.Done(): + return + case evs, ok := <-in: + if !ok { + return + } + if out := tp.FilterClassify(evs); len(out) > 0 { + output.Send(out) + } + } + } + }() +} diff --git a/pkg/appolly/discover/dynamic_pid_selector.go b/pkg/appolly/discover/dynamic_pid_selector.go new file mode 100644 index 0000000000..7863d1c089 --- /dev/null +++ b/pkg/appolly/discover/dynamic_pid_selector.go @@ -0,0 +1,222 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package discover // import "go.opentelemetry.io/obi/pkg/appolly/discover" + +import ( + "iter" + "sync" + + "go.opentelemetry.io/obi/pkg/appolly/app" + "go.opentelemetry.io/obi/pkg/appolly/services" + "go.opentelemetry.io/obi/pkg/export/otel/perapp" +) + +// DynamicPIDSelector holds the runtime set of target PIDs for OBI. It is preloaded from +// config target_pids and updated at runtime via AddPIDs/RemovePIDs. Only the discover +// matcher uses it for matching; the instrumenter (or appolly) holds a reference and +// calls AddPIDs/RemovePIDs directly. +// +// Pending add/remove PIDs are accumulated in slices and drained by goroutines into +// RemovedNotify() and AddedPIDsNotify(), so callers never block and nothing is dropped. +type DynamicPIDSelector struct { + mu sync.RWMutex + pids []uint32 + + removedCh chan []app.PID // consumer receives from this + removedPending []app.PID // PIDs to send on next drain + removedMu sync.Mutex + removedCond *sync.Cond + + addedCh chan []app.PID // consumer receives from this + addedPending []app.PID // PIDs to send on next drain + addedMu sync.Mutex + addedCond *sync.Cond +} + +// NewDynamicPIDSelector creates a new dynamic PID selector (initially empty). +// It starts goroutines that drain pending add/remove PIDs to the notify channels. +func NewDynamicPIDSelector() *DynamicPIDSelector { + d := &DynamicPIDSelector{ + removedCh: make(chan []app.PID, 1), + addedCh: make(chan []app.PID, 1), + } + d.removedCond = sync.NewCond(&d.removedMu) + d.addedCond = sync.NewCond(&d.addedMu) + go d.drainRemoved() + go d.drainAdded() + return d +} + +// RemovedNotify returns the channel on which removed PIDs are sent when RemovePIDs is called. +// The matcher uses this to emit synthetic deletes. Safe to call from multiple goroutines. +func (d *DynamicPIDSelector) RemovedNotify() <-chan []app.PID { + return d.removedCh +} + +// AddedPIDsNotify returns the channel on which newly added PIDs are sent when AddPIDs is called. +// The process watcher uses this to forget those PIDs from its tracked state so they are re-emitted +// as new on the next poll (supporting adding an already-seen process to the dynamic set). +func (d *DynamicPIDSelector) AddedPIDsNotify() <-chan []app.PID { + return d.addedCh +} + +// GetPIDs returns a copy of the current PID list and true when non-empty. +func (d *DynamicPIDSelector) GetPIDs() ([]app.PID, bool) { + d.mu.RLock() + defer d.mu.RUnlock() + if len(d.pids) == 0 { + return nil, false + } + out := make([]app.PID, len(d.pids)) + for i, p := range d.pids { + out[i] = app.PID(p) + } + return out, true +} + +// AddPIDs adds PIDs to the set (deduplicated). Newly added PIDs are sent on AddedPIDsNotify() +// so the process watcher can forget them and re-emit them as new on the next poll. +func (d *DynamicPIDSelector) AddPIDs(pids ...uint32) { + if len(pids) == 0 { + return + } + d.mu.Lock() + existing := make(map[uint32]struct{}, len(d.pids)) + for _, p := range d.pids { + existing[p] = struct{}{} + } + var added []app.PID + for _, u := range pids { + if _, ok := existing[u]; !ok { + existing[u] = struct{}{} + d.pids = append(d.pids, u) + added = append(added, app.PID(u)) + } + } + d.mu.Unlock() + d.notifyAdded(added) +} + +// RemovePIDs removes PIDs from the set and sends them on RemovedNotify() for the matcher. +func (d *DynamicPIDSelector) RemovePIDs(pids ...uint32) { + if len(pids) == 0 { + return + } + toRemove := make(map[uint32]struct{}) + for _, u := range pids { + toRemove[u] = struct{}{} + } + d.mu.Lock() + newPids := d.pids[:0] + removedPIDs := make([]app.PID, 0, len(pids)) + for _, p := range d.pids { + if _, remove := toRemove[p]; !remove { + newPids = append(newPids, p) + continue + } + removedPIDs = append(removedPIDs, app.PID(p)) + } + d.pids = newPids + d.mu.Unlock() + d.notifyRemoved(removedPIDs) +} + +func (d *DynamicPIDSelector) notifyRemoved(removedPIDs []app.PID) { + if len(removedPIDs) == 0 { + return + } + d.removedMu.Lock() + d.removedPending = append(d.removedPending, removedPIDs...) + d.removedCond.Signal() + d.removedMu.Unlock() +} + +func (d *DynamicPIDSelector) notifyAdded(addedPIDs []app.PID) { + if len(addedPIDs) == 0 { + return + } + d.addedMu.Lock() + d.addedPending = append(d.addedPending, addedPIDs...) + d.addedCond.Signal() + d.addedMu.Unlock() +} + +// drainRemoved runs in a goroutine; it sends the current pending removed PIDs and clears the slice. +func (d *DynamicPIDSelector) drainRemoved() { + for { + d.removedMu.Lock() + for len(d.removedPending) == 0 { + d.removedCond.Wait() + } + batch := append([]app.PID(nil), d.removedPending...) + d.removedPending = d.removedPending[:0] + d.removedMu.Unlock() + d.removedCh <- batch + } +} + +// drainAdded runs in a goroutine; it sends the current pending added PIDs and clears the slice. +func (d *DynamicPIDSelector) drainAdded() { + for { + d.addedMu.Lock() + for len(d.addedPending) == 0 { + d.addedCond.Wait() + } + batch := append([]app.PID(nil), d.addedPending...) + d.addedPending = d.addedPending[:0] + d.addedMu.Unlock() + d.addedCh <- batch + } +} + +// AsSelector returns a services.Selector that matches when the process PID is in this dynamic set. +// The matcher uses it to treat runtime PIDs as a supplement to config criteria. +func (d *DynamicPIDSelector) AsSelector() services.Selector { + return &dynamicPIDCriteriaAdapter{d: d} +} + +// dynamicPIDCriteriaAdapter implements services.Selector by delegating only GetPIDs to the +// DynamicPIDSelector; all other methods return empty/zero so the matcher treats "PID in dynamic set" +// as a match. +type dynamicPIDCriteriaAdapter struct { + d *DynamicPIDSelector +} + +func (a *dynamicPIDCriteriaAdapter) GetName() string { return "" } +func (a *dynamicPIDCriteriaAdapter) GetNamespace() string { return "" } +func (a *dynamicPIDCriteriaAdapter) GetPath() services.StringMatcher { return &emptyMatcher{} } +func (a *dynamicPIDCriteriaAdapter) GetPathRegexp() services.StringMatcher { return &emptyMatcher{} } +func (a *dynamicPIDCriteriaAdapter) GetOpenPorts() *services.IntEnum { return &services.IntEnum{} } +func (a *dynamicPIDCriteriaAdapter) GetLanguages() services.StringMatcher { return &emptyMatcher{} } +func (a *dynamicPIDCriteriaAdapter) GetPIDs() ([]app.PID, bool) { return a.d.GetPIDs() } +func (a *dynamicPIDCriteriaAdapter) GetCmdArgs() services.StringMatcher { return &emptyMatcher{} } +func (a *dynamicPIDCriteriaAdapter) IsContainersOnly() bool { return false } +func (a *dynamicPIDCriteriaAdapter) RangeMetadata() iter.Seq2[string, services.StringMatcher] { + return emptyMetadataSeq2 +} + +func (a *dynamicPIDCriteriaAdapter) RangePodLabels() iter.Seq2[string, services.StringMatcher] { + return emptyMetadataSeq2 +} + +func (a *dynamicPIDCriteriaAdapter) RangePodAnnotations() iter.Seq2[string, services.StringMatcher] { + return emptyMetadataSeq2 +} + +func (a *dynamicPIDCriteriaAdapter) GetExportModes() services.ExportModes { + return services.ExportModeUnset +} + +func (a *dynamicPIDCriteriaAdapter) GetSamplerConfig() *services.SamplerConfig { return nil } +func (a *dynamicPIDCriteriaAdapter) GetRoutesConfig() *services.CustomRoutesConfig { return nil } +func (a *dynamicPIDCriteriaAdapter) MetricsConfig() perapp.SvcMetricsConfig { + return perapp.SvcMetricsConfig{} +} + +type emptyMatcher struct{} + +func (emptyMatcher) IsSet() bool { return false } +func (emptyMatcher) MatchString(_ string) bool { return false } + +func emptyMetadataSeq2(_ func(string, services.StringMatcher) bool) {} diff --git a/pkg/appolly/discover/dynamic_pid_selector_test.go b/pkg/appolly/discover/dynamic_pid_selector_test.go new file mode 100644 index 0000000000..becb3f0aa0 --- /dev/null +++ b/pkg/appolly/discover/dynamic_pid_selector_test.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package discover + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/appolly/app" +) + +func TestDynamicPIDSelector_AddPIDs_RemovePIDs_GetPIDs(t *testing.T) { + d := NewDynamicPIDSelector() + pids, ok := d.GetPIDs() + assert.False(t, ok) + assert.Nil(t, pids) + + d.AddPIDs(1, 2, 3) + pids, ok = d.GetPIDs() + require.True(t, ok) + assert.Equal(t, []app.PID{1, 2, 3}, pids) + + d.AddPIDs(2, 3, 4) + pids, ok = d.GetPIDs() + require.True(t, ok) + assert.Equal(t, []app.PID{1, 2, 3, 4}, pids) + + d.RemovePIDs(2, 4) + pids, ok = d.GetPIDs() + require.True(t, ok) + assert.Equal(t, []app.PID{1, 3}, pids) + + d.RemovePIDs(1, 3) + pids, ok = d.GetPIDs() + assert.False(t, ok) + assert.Nil(t, pids) +} + +func TestDynamicPIDSelector_RemovePIDs_Notify(t *testing.T) { + d := NewDynamicPIDSelector() + d.AddPIDs(42, 100) + ch := d.RemovedNotify() + + d.RemovePIDs(100) + got := <-ch + assert.Equal(t, []app.PID{100}, got) + + d.RemovePIDs(42) + got = <-ch + assert.Equal(t, []app.PID{42}, got) +} + +func TestDynamicPIDSelector_AddPIDs_Notify(t *testing.T) { + d := NewDynamicPIDSelector() + ch := d.AddedPIDsNotify() + + d.AddPIDs(42, 100) + got := <-ch + assert.Equal(t, []app.PID{42, 100}, got) + + // Adding already-present PIDs does not notify + d.AddPIDs(42) + select { + case <-ch: + t.Fatal("expected no send when adding existing PID") + default: + } + // New PIDs only + d.AddPIDs(42, 99) + got = <-ch + assert.Equal(t, []app.PID{99}, got) +} + +// TestDynamicPIDSelector_QueueNoDrop verifies that rapid AddPIDs/RemovePIDs accumulate +// in a single pending slice and are sent together when the consumer drains (no drops). +func TestDynamicPIDSelector_QueueNoDrop(t *testing.T) { + d := NewDynamicPIDSelector() + d.AddPIDs(1, 2, 3, 4) + removedCh := d.RemovedNotify() + addedCh := d.AddedPIDsNotify() + + // Drain the initial AddPIDs(1,2,3,4) + <-addedCh + + d.RemovePIDs(1) + d.RemovePIDs(2, 3) + gotRemoved := <-removedCh + assert.ElementsMatch(t, []app.PID{1, 2, 3}, gotRemoved) + + d.AddPIDs(10, 20) + d.AddPIDs(30) + gotAdded := <-addedCh + assert.ElementsMatch(t, []app.PID{10, 20, 30}, gotAdded) +} diff --git a/pkg/appolly/discover/finder.go b/pkg/appolly/discover/finder.go index 9d37deac2c..b61f3e3e88 100644 --- a/pkg/appolly/discover/finder.go +++ b/pkg/appolly/discover/finder.go @@ -7,6 +7,7 @@ import ( "context" "fmt" + "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/app/request" "go.opentelemetry.io/obi/pkg/ebpf" ebpfcommon "go.opentelemetry.io/obi/pkg/ebpf/common" @@ -42,6 +43,7 @@ func NewProcessFinder( type processFinderStartConfig struct { enrichedProcessEvents *msg.Queue[[]Event[ProcessAttrs]] + dynamicPIDSelector *DynamicPIDSelector } // ProcessFinderStartOpt allows overriding some internal behavior of ProcessFinder.Start method. @@ -57,6 +59,15 @@ func WithEnrichedProcessEvents(enrichedProcessEvents *msg.Queue[[]Event[ProcessA } } +// WithDynamicPIDSelector supplies the OBI dynamic PID set. Caller can pass discover.NewDynamicPIDSelector() +// and add PIDs via the selector. When non-nil, the finder wires it to the matcher and removed-PID +// notifications are used for synthetic deletes. +func WithDynamicPIDSelector(selector *DynamicPIDSelector) ProcessFinderStartOpt { + return func(cfg *processFinderStartConfig) { + cfg.dynamicPIDSelector = selector + } +} + // Start the ProcessFinder pipeline in background. It returns a channel where each new discovered // ebpf.ProcessTracer will be notified. func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOpt) (<-chan Event[*ebpf.Instrumentable], error) { @@ -67,10 +78,16 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp tracerEvents := msgh.QueueFromConfig[Event[*ebpf.Instrumentable]](pf.cfg, "tracerEvents") + configCriteria := FindingCriteria(pf.cfg, startConfig.dynamicPIDSelector != nil) + swi := swarm.Instancer{} processEvents := msgh.QueueFromConfig[[]Event[ProcessAttrs]](pf.cfg, "processEvents") - swi.Add(swarm.DirectInstance(ProcessWatcherFunc(pf.cfg, pf.ebpfEventContext, processEvents)), + var addedPIDsCh <-chan []app.PID + if startConfig.dynamicPIDSelector != nil { + addedPIDsCh = startConfig.dynamicPIDSelector.AddedPIDsNotify() + } + swi.Add(swarm.DirectInstance(ProcessWatcherFunc(pf.cfg, pf.ebpfEventContext, processEvents, configCriteria, addedPIDsCh)), swarm.WithID("ProcessWatcher")) kubeEnrichedEvents := msgh.QueueFromConfig[[]Event[ProcessAttrs]](pf.cfg, "kubeEnrichedEvents") @@ -96,7 +113,7 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp ), swarm.WithID("LanguageDecoratorProvider")) criteriaFilteredEvents := msgh.QueueFromConfig[[]Event[ProcessMatch]](pf.cfg, "criteriaFilteredEvents") - swi.Add(criteriaMatcherProvider(pf.cfg, langEnrichedEvents, criteriaFilteredEvents), + swi.Add(criteriaMatcherProvider(pf.cfg, langEnrichedEvents, criteriaFilteredEvents, configCriteria, startConfig.dynamicPIDSelector), swarm.WithID("CriteriaMatcher")) executableTypes := msgh.QueueFromConfig[[]Event[ebpf.Instrumentable]](pf.cfg, "executableTypes") diff --git a/pkg/appolly/discover/matcher.go b/pkg/appolly/discover/matcher.go index fc44318427..39dd78ef37 100644 --- a/pkg/appolly/discover/matcher.go +++ b/pkg/appolly/discover/matcher.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/obi/pkg/obi" "go.opentelemetry.io/obi/pkg/pipe/msg" "go.opentelemetry.io/obi/pkg/pipe/swarm" - "go.opentelemetry.io/obi/pkg/pipe/swarm/swarms" ) var ( @@ -30,15 +29,25 @@ var ( ) // criteriaMatcherProvider filters the processes that match the discovery criteria. +// When dynamicSelector is non-nil, runtime PIDs supplement config criteria and the matcher +// listens to the selector's RemovedNotify() channel for synthetic deletes. func criteriaMatcherProvider( cfg *obi.Config, input *msg.Queue[[]Event[ProcessAttrs]], output *msg.Queue[[]Event[ProcessMatch]], + configCriteria []services.Selector, + dynamicSelector *DynamicPIDSelector, ) swarm.InstanceFunc { instrumenterNamespace, _ := namespaceFetcherFunc(app.PID(osPidFunc())) + var removedNotify <-chan []app.PID + criteria := configCriteria + if dynamicSelector != nil { + removedNotify = dynamicSelector.RemovedNotify() + criteria = append([]services.Selector{dynamicSelector.AsSelector()}, configCriteria...) + } m := &Matcher{ Log: slog.With("component", "discover.CriteriaMatcher"), - Criteria: FindingCriteria(cfg), + Criteria: criteria, ExcludeCriteria: ExcludingCriteria(cfg), LogEnricherCriteria: LogEnricherFindingCriteria(cfg), ProcessHistory: map[app.PID]ProcessMatch{}, @@ -46,6 +55,8 @@ func criteriaMatcherProvider( Output: output, Namespace: instrumenterNamespace, HasHostPidAccess: hasHostPidAccess(), + DynamicPIDs: dynamicSelector, + RemovedPIDsNotify: removedNotify, } return swarm.DirectInstance(m.Run) } @@ -65,6 +76,11 @@ type Matcher struct { Output *msg.Queue[[]Event[ProcessMatch]] Namespace string HasHostPidAccess bool + // DynamicPIDs, when set, supplements config criteria: a process also matches if its PID is in this set. + DynamicPIDs *DynamicPIDSelector + // RemovedPIDsNotify, when set, carries the PIDs removed from the dynamic selector so the + // matcher can emit targeted synthetic deletes without rescanning ProcessHistory. + RemovedPIDsNotify <-chan []app.PID } // ProcessMatch matches a found process with the first selection criteria it fulfilled. @@ -81,14 +97,30 @@ func (pm ProcessMatch) LogEnricherEnabled() bool { func (m *Matcher) Run(ctx context.Context) { defer m.Output.Close() m.Log.Debug("starting criteria matcher node") - swarms.ForEachInput(ctx, m.Input, m.Log.Debug, func(i []Event[ProcessAttrs]) { - m.Log.Debug("filtering processes", "len", len(i)) - o := m.filter(i) - m.Log.Debug("processes matching selection criteria", "len", len(o)) - if len(o) > 0 { - m.Output.Send(o) + for { + select { + case <-ctx.Done(): + m.Log.Debug("context done, stopping node") + return + case i, ok := <-m.Input: + if !ok { + m.Log.Debug("input channel closed, stopping node") + return + } + m.Log.Debug("filtering processes", "len", len(i)) + o := m.filter(i) + m.Log.Debug("processes matching selection criteria", "len", len(o)) + if len(o) > 0 { + m.Output.Send(o) + } + case removedPIDs := <-m.RemovedPIDsNotify: + o := m.syntheticDeletesForRemovedPIDs(removedPIDs) + if len(o) > 0 { + m.Log.Debug("synthetic deletes for removed PIDs", "len", len(o)) + m.Output.Send(o) + } } - }) + } } func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] { @@ -107,6 +139,27 @@ func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] { return matches } +// syntheticDeletesForRemovedPIDs returns EventDeleted for the specific PIDs removed from the +// dynamic selector. This is the matcher side of the edge-based removal path: the selector sends +// the exact removed PIDs, so we can look them up directly instead of doing a level-based scan of +// ProcessHistory against the selector's current PID set. +func (m *Matcher) syntheticDeletesForRemovedPIDs(removedPIDs []app.PID) []Event[ProcessMatch] { + if len(removedPIDs) == 0 { + return nil + } + var out []Event[ProcessMatch] + for _, pid := range removedPIDs { + procMatch, instrumented := m.ProcessHistory[pid] + if !instrumented { + continue + } + delete(m.ProcessHistory, pid) + m.Log.Debug("pid removed from dynamic selector, uninstrumenting", "pid", pid, "comm", procMatch.Process.ExePath) + out = append(out, Event[ProcessMatch]{Type: EventDeleted, Obj: procMatch}) + } + return out +} + func (m *Matcher) alreadyMatched(pid app.PID) bool { _, ok := m.ProcessHistory[pid] return ok @@ -372,10 +425,14 @@ func LogEnricherFindingCriteria(cfg *obi.Config) []services.Selector { return selectors } -func FindingCriteria(cfg *obi.Config) []services.Selector { +// FindingCriteria returns discovery criteria from config. When skipTargetPIDs is true +// (e.g. Instrumenter with DynamicPIDSelector), target_pids are not included here; the matcher +// uses the dynamic selector as a supplement. When skipTargetPIDs is false, target_pids from +// config are included as static criteria when set. +func FindingCriteria(cfg *obi.Config, skipTargetPIDs bool) []services.Selector { logDeprecationAndConflicts(cfg) - if cfg.TargetPIDs.Len() > 0 { + if !skipTargetPIDs && cfg.TargetPIDs.Len() > 0 { vals := cfg.TargetPIDs.AllValues() pids := make([]uint32, 0, len(vals)) for _, v := range vals { diff --git a/pkg/appolly/discover/matcher_test.go b/pkg/appolly/discover/matcher_test.go index 9027166080..f3345a7ad4 100644 --- a/pkg/appolly/discover/matcher_test.go +++ b/pkg/appolly/discover/matcher_test.go @@ -5,6 +5,7 @@ package discover import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -51,7 +52,7 @@ func TestCriteriaMatcher(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -102,7 +103,7 @@ func TestCriteriaMatcherLanguage(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -152,7 +153,7 @@ func TestCriteriaMatcher_Exclude(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -193,7 +194,7 @@ func TestCriteriaMatcher_Exclude_Metadata(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -241,7 +242,7 @@ func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -296,7 +297,7 @@ func TestCriteriaMatcherMissingPort(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -355,7 +356,7 @@ func TestCriteriaMatcherContainersOnly(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -494,7 +495,7 @@ func TestInstrumentation_CoexistingWithDeprecatedServices(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&tc.cfg, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&tc.cfg, discoveredProcesses, filteredProcessesQu, FindingCriteria(&tc.cfg, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -529,7 +530,7 @@ func TestCriteriaMatcher_TargetPIDs(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -560,7 +561,7 @@ func TestCriteriaMatcher_TargetPIDs(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -586,6 +587,97 @@ func TestCriteriaMatcher_TargetPIDs(t *testing.T) { }) } +func TestCriteriaMatcher_DynamicTargetPIDs(t *testing.T) { + pipeConfig := obi.Config{ServiceName: "dyn-svc", ServiceNamespace: "ns"} + dynamicSelector := NewDynamicPIDSelector() + dynamicSelector.AddPIDs(42) + configCriteria := FindingCriteria(&pipeConfig, true) + + discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) + filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) + filteredProcesses := filteredProcessesQu.Subscribe() + processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) { + return &services.ProcessInfo{Pid: pp.pid, ExePath: "/any/exe", OpenPorts: pp.openPorts}, nil + } + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, configCriteria, dynamicSelector)(t.Context()) + require.NoError(t, err) + go matcherFunc(t.Context()) + defer filteredProcessesQu.Close() + + discoveredProcesses.Send([]Event[ProcessAttrs]{ + {Type: EventCreated, Obj: ProcessAttrs{pid: 42, openPorts: []uint32{}}}, + {Type: EventCreated, Obj: ProcessAttrs{pid: 100, openPorts: []uint32{}}}, + }) + matches := testutil.ReadChannel(t, filteredProcesses, testTimeout) + require.Len(t, matches, 1) + assert.Equal(t, app.PID(42), matches[0].Obj.Process.Pid) + + dynamicSelector.AddPIDs(100) + discoveredProcesses.Send([]Event[ProcessAttrs]{ + {Type: EventCreated, Obj: ProcessAttrs{pid: 100, openPorts: []uint32{}}}, + }) + matches = testutil.ReadChannel(t, filteredProcesses, testTimeout) + require.Len(t, matches, 1) + assert.Equal(t, app.PID(100), matches[0].Obj.Process.Pid) + + dynamicSelector.RemovePIDs(100) + // Matcher sends synthetic EventDeleted for removed PIDs; drain it before asserting no re-match. + deletes := testutil.ReadChannel(t, filteredProcesses, testTimeout) + require.Len(t, deletes, 1) + assert.Equal(t, EventDeleted, deletes[0].Type) + assert.Equal(t, app.PID(100), deletes[0].Obj.Process.Pid) + + discoveredProcesses.Send([]Event[ProcessAttrs]{ + {Type: EventCreated, Obj: ProcessAttrs{pid: 100, openPorts: []uint32{}}}, + }) + testutil.ChannelEmpty(t, filteredProcesses, 100*time.Millisecond) + + // Stop matcher so next test does not race on global processInfo (close input, drain output). + discoveredProcesses.Close() + testutil.DrainUntilClosed(filteredProcesses) +} + +func TestCriteriaMatcher_DynamicTargetPIDs_RemoveNotification(t *testing.T) { + pipeConfig := obi.Config{ServiceName: "dyn-svc", ServiceNamespace: "ns"} + dynamicSelector := NewDynamicPIDSelector() + dynamicSelector.AddPIDs(42, 100) + configCriteria := FindingCriteria(&pipeConfig, true) + + discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) + filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) + filteredProcesses := filteredProcessesQu.Subscribe() + processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) { + return &services.ProcessInfo{Pid: pp.pid, ExePath: "/any/exe", OpenPorts: pp.openPorts}, nil + } + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, configCriteria, dynamicSelector)(t.Context()) + require.NoError(t, err) + go matcherFunc(t.Context()) + defer filteredProcessesQu.Close() + + discoveredProcesses.Send([]Event[ProcessAttrs]{ + {Type: EventCreated, Obj: ProcessAttrs{pid: 42}}, + {Type: EventCreated, Obj: ProcessAttrs{pid: 100}}, + }) + matches := testutil.ReadChannel(t, filteredProcesses, testTimeout) + require.Len(t, matches, 2) + + dynamicSelector.RemovePIDs(100) + matches = testutil.ReadChannel(t, filteredProcesses, testTimeout) + require.Len(t, matches, 1) + assert.Equal(t, EventDeleted, matches[0].Type) + assert.Equal(t, app.PID(100), matches[0].Obj.Process.Pid) + + dynamicSelector.RemovePIDs(42) + matches = testutil.ReadChannel(t, filteredProcesses, testTimeout) + require.Len(t, matches, 1) + assert.Equal(t, EventDeleted, matches[0].Type) + assert.Equal(t, app.PID(42), matches[0].Obj.Process.Pid) + + // Stop matcher so next test does not race on global processInfo (close input, drain output). + discoveredProcesses.Close() + testutil.DrainUntilClosed(filteredProcesses) +} + func TestCriteriaMatcher_Granular(t *testing.T) { pipeConfig := obi.Config{} @@ -610,7 +702,7 @@ func TestCriteriaMatcher_Granular(t *testing.T) { filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) require.NoError(t, err) diff --git a/pkg/appolly/discover/watcher_kube_test.go b/pkg/appolly/discover/watcher_kube_test.go index ac1752d72a..800f8ed8d0 100644 --- a/pkg/appolly/discover/watcher_kube_test.go +++ b/pkg/appolly/discover/watcher_kube_test.go @@ -164,7 +164,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { version: "v[0-9]*" `), &pipeConfig)) - swi.Add(criteriaMatcherProvider(&pipeConfig, connectQueue, outputQueue)) + swi.Add(criteriaMatcherProvider(&pipeConfig, connectQueue, outputQueue, FindingCriteria(&pipeConfig, false), nil)) nodesRunner, err := swi.Instance(t.Context()) require.NoError(t, err) diff --git a/pkg/appolly/discover/watcher_proc.go b/pkg/appolly/discover/watcher_proc.go index 34d2ee869e..291b8ecca8 100644 --- a/pkg/appolly/discover/watcher_proc.go +++ b/pkg/appolly/discover/watcher_proc.go @@ -67,8 +67,11 @@ func wplog() *slog.Logger { } // ProcessWatcherFunc polls every PollInterval for new processes and forwards either new or deleted process PIDs -// as well as PIDs from processes that setup a new connection -func ProcessWatcherFunc(cfg *obi.Config, ebpfContext *ebpfcommon.EBPFEventContext, output *msg.Queue[[]Event[ProcessAttrs]]) swarm.RunFunc { +// as well as PIDs from processes that setup a new connection. +// When addedPIDsNotify is non-nil, the watcher receives PIDs that were added to the dynamic selector and +// forgets them from its tracked state so they are re-emitted as new on the next poll (supporting adding +// an already-seen process). +func ProcessWatcherFunc(cfg *obi.Config, ebpfContext *ebpfcommon.EBPFEventContext, output *msg.Queue[[]Event[ProcessAttrs]], findingCriteria []services.Selector, addedPIDsNotify <-chan []app.PID) swarm.RunFunc { acc := pollAccounter{ cfg: cfg, output: output, @@ -82,8 +85,9 @@ func ProcessWatcherFunc(cfg *obi.Config, ebpfContext *ebpfcommon.EBPFEventContex fetchPorts: true, // must be true until we've activated the bpf watcher component bpfWatcherEnabled: false, // async set by listening on the bpfWatchEvents channel stateMux: sync.Mutex{}, - findingCriteria: FindingCriteria(cfg), + findingCriteria: findingCriteria, ebpfContext: ebpfContext, + addedPIDsNotify: addedPIDsNotify, } if acc.interval == 0 { acc.interval = defaultPollInterval @@ -121,6 +125,10 @@ type pollAccounter struct { findingCriteria []services.Selector output *msg.Queue[[]Event[ProcessAttrs]] ebpfContext *ebpfcommon.EBPFEventContext + // when non-nil, PIDs received here are removed from pids/pidPorts so they are re-emitted as new on next poll + addedPIDsNotify <-chan []app.PID + // pidsMu protects pids and pidPorts so the addedPIDsNotify goroutine can call forgetPIDs while snapshot runs + pidsMu sync.Mutex } func (pa *pollAccounter) run(ctx context.Context) { @@ -144,6 +152,10 @@ func (pa *pollAccounter) run(ctx context.Context) { go pa.watchForProcessEvents(ctx, log, bpfWatchEvents) + if pa.addedPIDsNotify != nil { + go pa.runAddedPIDsNotify(ctx, log) + } + for { procs, err := pa.listProcesses(pa.portFetchRequired()) if err != nil { @@ -159,7 +171,42 @@ func (pa *pollAccounter) run(ctx context.Context) { log.Debug("context canceled. Exiting") return case <-time.After(pa.interval): - // poll event starting again + // poll again + } + } +} + +// runAddedPIDsNotify runs in a goroutine; it receives PIDs added to the dynamic selector +// and calls forgetPIDs so they are re-emitted as new on the next poll. +func (pa *pollAccounter) runAddedPIDsNotify(ctx context.Context, log *slog.Logger) { + for { + select { + case <-ctx.Done(): + return + case pids, ok := <-pa.addedPIDsNotify: + if !ok { + return + } + pa.forgetPIDs(pids) + log.Debug("forgot PIDs so they can be re-emitted as new", "pids", pids) + } + } +} + +// forgetPIDs removes the given PIDs from the watcher's tracked state so they will be +// reported as new on the next poll (e.g. when added to the dynamic PID selector). +func (pa *pollAccounter) forgetPIDs(pids []app.PID) { + pa.pidsMu.Lock() + defer pa.pidsMu.Unlock() + for _, pid := range pids { + delete(pa.pids, pid) + } + for pp := range pa.pidPorts { + for _, pid := range pids { + if pp.Pid == pid { + delete(pa.pidPorts, pp) + break + } } } } @@ -228,6 +275,8 @@ func (pa *pollAccounter) processTooNew(proc ProcessAttrs) bool { // snapshot compares the current processes with the status of the previous poll // and forwards a list of process creation/deletion events func (pa *pollAccounter) snapshot(fetchedProcs map[app.PID]ProcessAttrs) []Event[ProcessAttrs] { + pa.pidsMu.Lock() + defer pa.pidsMu.Unlock() log := wplog() var events []Event[ProcessAttrs] currentPidPorts := make(map[pidPort]ProcessAttrs, len(fetchedProcs)) diff --git a/pkg/appolly/discover/watcher_proc_test.go b/pkg/appolly/discover/watcher_proc_test.go index 9577e10cbc..d8448bf766 100644 --- a/pkg/appolly/discover/watcher_proc_test.go +++ b/pkg/appolly/discover/watcher_proc_test.go @@ -202,7 +202,7 @@ func TestPortsFetchRequired(t *testing.T) { stateMux: sync.Mutex{}, bpfWatcherEnabled: false, fetchPorts: true, - findingCriteria: FindingCriteria(cfg), + findingCriteria: FindingCriteria(cfg, false), output: msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(1)), } @@ -300,7 +300,7 @@ func TestMinProcessAge(t *testing.T) { stateMux: sync.Mutex{}, bpfWatcherEnabled: false, fetchPorts: true, - findingCriteria: FindingCriteria(cfg), + findingCriteria: FindingCriteria(cfg, false), output: msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(1)), } @@ -328,3 +328,50 @@ func TestMinProcessAge(t *testing.T) { assert.True(t, ok) assert.False(t, acc.processTooNew(process)) } + +// TestForgetPIDs_ReemitsExistingProcess verifies that when a PID was already seen by the watcher, +// sending it on addedPIDsNotify (forget) causes the next poll to emit EventCreated again. +// This supports the use case of adding an existing process to the dynamic PID selector. +func TestForgetPIDs_ReemitsExistingProcess(t *testing.T) { + p1 := ProcessAttrs{pid: 1, openPorts: []uint32{3030}} + p2 := ProcessAttrs{pid: 2, openPorts: []uint32{123}} + addedCh := make(chan []app.PID, 1) + acc := pollAccounter{ + interval: time.Hour, + cfg: &obi.Config{}, + pids: map[app.PID]ProcessAttrs{}, + pidPorts: map[pidPort]ProcessAttrs{}, + listProcesses: func(bool) (map[app.PID]ProcessAttrs, error) { + return map[app.PID]ProcessAttrs{p1.pid: p1, p2.pid: p2}, nil + }, + executableReady: func(app.PID) (string, bool) { + return "", true + }, + loadBPFWatcher: func(context.Context, *ebpfcommon.EBPFEventContext, *obi.Config, chan<- watcher.Event) error { + return nil + }, + loadBPFLogger: func(context.Context, *ebpfcommon.EBPFEventContext, *obi.Config) error { + return nil + }, + addedPIDsNotify: addedCh, + } + procs, err := acc.listProcesses(false) + require.NoError(t, err) + events := acc.snapshot(procs) + require.Len(t, events, 2) + assert.Equal(t, EventCreated, sort(events)[0].Type) + assert.Equal(t, EventCreated, sort(events)[1].Type) + // Second snapshot: no new events (already seen) + events2 := acc.snapshot(procs) + assert.Empty(t, events2) + // Forget p1 only + acc.forgetPIDs([]app.PID{1}) + // Next snapshot: p1 should appear as created again + events3 := acc.snapshot(procs) + require.Len(t, events3, 1) + assert.Equal(t, EventCreated, events3[0].Type) + assert.Equal(t, app.PID(1), events3[0].Obj.pid) + // p2 still not re-emitted + events4 := acc.snapshot(procs) + assert.Empty(t, events4) +} diff --git a/pkg/appolly/services/attr_glob.go b/pkg/appolly/services/attr_glob.go index 5f2242a88c..0ac9ded1ed 100644 --- a/pkg/appolly/services/attr_glob.go +++ b/pkg/appolly/services/attr_glob.go @@ -91,7 +91,7 @@ type GlobAttributes struct { // programming language they are written in. Use lowercase names, e.g. java,go Languages GlobAttr `yaml:"languages"` - // PIDs allows selecting processes by PID. When non-empty, the process PID must be in this list (in addition to any path/port criteria). + // PIDs allows selecting processes by PID (static from config). When non-empty, the process PID must be in this list. PIDs []uint32 `yaml:"target_pids"` // Path allows defining the regular expression matching the full executable path. diff --git a/pkg/instrumenter/instrumenter_test.go b/pkg/instrumenter/instrumenter_test.go index 5ee45820ef..4b7f3bd9ab 100644 --- a/pkg/instrumenter/instrumenter_test.go +++ b/pkg/instrumenter/instrumenter_test.go @@ -4,11 +4,17 @@ package instrumenter import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/obi/pkg/appolly/app" + "go.opentelemetry.io/obi/pkg/appolly/discover" + "go.opentelemetry.io/obi/pkg/appolly/services" + "go.opentelemetry.io/obi/pkg/export/otel/otelcfg" "go.opentelemetry.io/obi/pkg/obi" "go.opentelemetry.io/obi/pkg/transform" ) @@ -39,3 +45,38 @@ func TestServiceNameTemplate(t *testing.T) { require.NoError(t, err) assert.Nil(t, temp) } + +// TestRun_WithDynamicPIDSelector verifies that when the caller passes a selector via +// WithDynamicPIDSelector, Run uses it and the caller can add/remove PIDs on it directly— +// no callback or reference to the instrumenter is needed. +func TestRun_WithDynamicPIDSelector(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sel := discover.NewDynamicPIDSelector() + cfg := &obi.Config{ + ChannelBufferLen: 1, + Traces: otelcfg.TracesConfig{TracesEndpoint: "http://localhost:0"}, + Discovery: services.DiscoveryConfig{ + Instrument: services.GlobDefinitionCriteria{ + services.GlobAttributes{Name: "test-svc", OpenPorts: services.IntEnum{Ranges: []services.IntRange{{Start: 8080}}}}, + }, + }, + } + require.True(t, cfg.Enabled(obi.FeatureAppO11y), "test config must enable App O11y") + + opts := []Option{WithDynamicPIDSelector(sel)} + done := make(chan error, 1) + go func() { done <- Run(ctx, cfg, opts...) }() + + time.Sleep(2 * time.Second) + sel.AddPIDs(42, 100) + sel.AddPIDs(42, 200) + sel.RemovePIDs(42) + sel.RemovePIDs(999) + pids, ok := sel.GetPIDs() + require.True(t, ok) + assert.Equal(t, []app.PID{100, 200}, pids) + cancel() + <-done +} diff --git a/pkg/instrumenter/opts.go b/pkg/instrumenter/opts.go index c3ff2b8876..f0f5f0e5a4 100644 --- a/pkg/instrumenter/opts.go +++ b/pkg/instrumenter/opts.go @@ -5,6 +5,7 @@ package instrumenter // import "go.opentelemetry.io/obi/pkg/instrumenter" import ( "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/appolly/discover" "go.opentelemetry.io/obi/pkg/pipe/global" "go.opentelemetry.io/obi/pkg/pipe/msg" ) @@ -12,6 +13,15 @@ import ( // Option that override the instantiation of the instrumenter type Option func(info *global.ContextInfo) +// WithDynamicPIDSelector passes the given dynamic PID selector into the App O11y pipeline. The caller +// creates it with discover.NewDynamicPIDSelector(), passes it here, and then calls AddPIDs/RemovePIDs/GetPIDs +// on it directly—no callback or reference to the instrumenter is needed. +func WithDynamicPIDSelector(sel *discover.DynamicPIDSelector) Option { + return func(info *global.ContextInfo) { + info.AppO11y.DynamicPIDSelector = sel + } +} + // OverrideAppExportQueue allows to override the queue used to export the spans. // This is useful to run the instrumenter in vendored mode, and you want to provide your // own spans exporter. diff --git a/pkg/internal/appolly/appolly.go b/pkg/internal/appolly/appolly.go index be34a06f06..511bd8b0fa 100644 --- a/pkg/internal/appolly/appolly.go +++ b/pkg/internal/appolly/appolly.go @@ -51,7 +51,9 @@ type Instrumenter struct { // global data structures for all eBPF tracers ebpfEventContext *ebpfcommon.EBPFEventContext - finishers []finisher + // dynamicPIDSelector is the runtime PID set; from WithDynamicPIDSelector or created in New. Finder preloads from config. + dynamicPIDSelector *discover.DynamicPIDSelector + finishers []finisher } type finisher struct { @@ -96,16 +98,26 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *obi.Config) ( return nil, fmt.Errorf("can't instantiate instrumentation pipeline: %w", err) } - return &Instrumenter{ - config: config, - ctxInfo: ctxInfo, - tracersWg: &sync.WaitGroup{}, - tracesInput: tracesInput, - processEventInput: processEventsInput, - bp: bp, - peGraphBuilder: swi, - ebpfEventContext: ebpfcommon.NewEBPFEventContext(), - }, nil + var sel *discover.DynamicPIDSelector + if v := ctxInfo.AppO11y.DynamicPIDSelector; v != nil { + if s, ok := v.(*discover.DynamicPIDSelector); ok { + sel = s + } + // If v is not a *DynamicPIDSelector, sel stays nil and we use static config target_pids. + } + // When sel is nil, finder gets nil: config target_pids are used as static criteria (FindingCriteria(cfg, false)). + instr := &Instrumenter{ + config: config, + ctxInfo: ctxInfo, + tracersWg: &sync.WaitGroup{}, + tracesInput: tracesInput, + processEventInput: processEventsInput, + bp: bp, + peGraphBuilder: swi, + ebpfEventContext: ebpfcommon.NewEBPFEventContext(), + dynamicPIDSelector: sel, + } + return instr, nil } // FindAndInstrument searches in background for any new executable matching the @@ -114,7 +126,10 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *obi.Config) ( // This is: when the context is cancelled, it has unloaded all the eBPF probes. func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { finder := discover.NewProcessFinder(i.config, i.ctxInfo, i.tracesInput, i.ebpfEventContext) - processEvents, err := finder.Start(ctx) + opts := []discover.ProcessFinderStartOpt{ + discover.WithDynamicPIDSelector(i.dynamicPIDSelector), + } + processEvents, err := finder.Start(ctx, opts...) if err != nil { return fmt.Errorf("couldn't start Process Finder: %w", err) } @@ -251,5 +266,5 @@ func refreshK8sInformerCache(ctx context.Context, ctxInfo *global.ContextInfo) e } func (i *Instrumenter) processEventsPipeline(ctx context.Context, graph *swarm.Runner) { - graph.Start(ctx, swarm.WithCancelTimeout(i.config.ShutdownTimeout)) // zurulao + graph.Start(ctx, swarm.WithCancelTimeout(i.config.ShutdownTimeout)) } diff --git a/pkg/internal/appolly/appolly_test.go b/pkg/internal/appolly/appolly_test.go index 406b2053d9..5beac431c3 100644 --- a/pkg/internal/appolly/appolly_test.go +++ b/pkg/internal/appolly/appolly_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/obi/pkg/appolly/app" "go.opentelemetry.io/obi/pkg/appolly/discover" @@ -45,3 +46,27 @@ func TestProcessEventsLoopDoesntBlock(t *testing.T) { assert.NoError(t, err) } + +// TestInstrumenter_WithDynamicPIDSelector verifies that when the caller passes a selector via +// ContextInfo.AppO11y.DynamicPIDSelector, New uses it and the caller can add/remove PIDs on it directly. +func TestInstrumenter_WithDynamicPIDSelector(t *testing.T) { + sel := discover.NewDynamicPIDSelector() + ctxInfo := &global.ContextInfo{ + Prometheus: &connector.PrometheusManager{}, + AppO11y: global.AppO11y{DynamicPIDSelector: sel}, + } + _, err := New( + t.Context(), + ctxInfo, + &obi.Config{ChannelBufferLen: 1, Traces: otelcfg.TracesConfig{TracesEndpoint: "http://localhost"}}, + ) + require.NoError(t, err) + + sel.AddPIDs(1, 2, 3) + sel.AddPIDs(2, 4) + sel.RemovePIDs(2) + sel.RemovePIDs(99) + pids, ok := sel.GetPIDs() + require.True(t, ok) + assert.Equal(t, []app.PID{1, 3, 4}, pids) +} diff --git a/pkg/internal/testutil/channels.go b/pkg/internal/testutil/channels.go index d6b9196d52..719fdee19d 100644 --- a/pkg/internal/testutil/channels.go +++ b/pkg/internal/testutil/channels.go @@ -35,3 +35,13 @@ func ChannelEmpty[T any](t *testing.T, inCh <-chan T, timeout time.Duration) { // ok, channel is empty! } } + +// DrainUntilClosed reads from ch until it is closed. Use when a producer (e.g. a goroutine) closes +// its output on exit and you need to wait for it to finish before leaving the test. +func DrainUntilClosed[T any](ch <-chan T) { + for { + if _, ok := <-ch; !ok { + return + } + } +} diff --git a/pkg/pipe/global/context.go b/pkg/pipe/global/context.go index 3e07eaa4ee..11d1223d0e 100644 --- a/pkg/pipe/global/context.go +++ b/pkg/pipe/global/context.go @@ -64,4 +64,8 @@ type ContextInfo struct { type AppO11y struct { // ReportRoutes sets whether the metrics should set the http.route attribute ReportRoutes bool + // DynamicPIDSelector, when set, is the runtime PID set used for discovery. The caller creates it + // (e.g. discover.NewDynamicPIDSelector()), passes it via instrumenter.WithDynamicPIDSelector, and + // calls AddPIDs/RemovePIDs/GetPIDs on it directly. The instrumenter does not implement an updater interface. + DynamicPIDSelector any }