Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions pkg/appolly/discover/dynamic_pid_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,48 @@
package discover

import (
"context"
"slices"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/obi/pkg/appolly/app"
)

// pidMultisetEqual reports whether a and b contain the same PIDs with the same multiplicity.
func pidMultisetEqual(a, b []app.PID) bool {
if len(a) != len(b) {
return false
}
sa := slices.Clone(a)
sb := slices.Clone(b)
slices.Sort(sa)
slices.Sort(sb)
return slices.Equal(sa, sb)
}

// readPIDNotifyBatchesUntil reads from ch until the concatenation of batches matches want
// as a multiset (order of batches and within batches does not matter).
func readPIDNotifyBatchesUntil(t *testing.T, ch <-chan []app.PID, want []app.PID) {
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second)
defer cancel()
var got []app.PID
for !pidMultisetEqual(got, want) {
if len(got) > len(want) {
t.Fatalf("unexpected extra PID notify batches: got %v want %v", got, want)
}
select {
case b := <-ch:
got = append(got, b...)
case <-ctx.Done():
t.Fatalf("timeout reading notify batches: got %v want %v", got, want)
}
}
}

func TestDynamicPIDSelector_AddPIDs_RemovePIDs_GetPIDs(t *testing.T) {
d := NewDynamicPIDSelector()
pids, ok := d.GetPIDs()
Expand Down Expand Up @@ -74,24 +108,22 @@ func TestDynamicPIDSelector_AddPIDs_Notify(t *testing.T) {
assert.Equal(t, []app.PID{99}, got)
}

// TestDynamicPIDSelector_QueueNoDrop verifies that rapid AddPIDs/RemovePIDs accumulate
// in a single pending slice and are sent together when the consumer drains (no drops).
// TestDynamicPIDSelector_QueueNoDrop verifies that rapid AddPIDs/RemovePIDs are all delivered
// on the notify channels (nothing dropped). With a buffered notify channel, one logical burst can
// span multiple receives; the consumer must drain until the expected multiset is complete.
func TestDynamicPIDSelector_QueueNoDrop(t *testing.T) {
d := NewDynamicPIDSelector()
d.AddPIDs(1, 2, 3, 4)
removedCh := d.RemovedNotify()
addedCh := d.AddedPIDsNotify()

// Drain the initial AddPIDs(1,2,3,4)
<-addedCh

d.RemovePIDs(1)
d.RemovePIDs(2, 3)
gotRemoved := <-removedCh
assert.ElementsMatch(t, []app.PID{1, 2, 3}, gotRemoved)
readPIDNotifyBatchesUntil(t, removedCh, []app.PID{1, 2, 3})

d.AddPIDs(10, 20)
d.AddPIDs(30)
gotAdded := <-addedCh
assert.ElementsMatch(t, []app.PID{10, 20, 30}, gotAdded)
readPIDNotifyBatchesUntil(t, addedCh, []app.PID{10, 20, 30})
}
4 changes: 3 additions & 1 deletion pkg/appolly/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp

tracerEvents := msgh.QueueFromConfig[Event[*ebpf.Instrumentable]](pf.cfg, "tracerEvents")

configCriteria := FindingCriteria(pf.cfg, startConfig.dynamicPIDSelector != nil)
configCriteria := FindingCriteria(pf.cfg)

swi := swarm.Instancer{}
processEvents := msgh.QueueFromConfig[[]Event[ProcessAttrs]](pf.cfg, "processEvents")
Expand Down Expand Up @@ -115,6 +115,8 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp
criteriaFilteredEvents := msgh.QueueFromConfig[[]Event[ProcessMatch]](pf.cfg, "criteriaFilteredEvents")
swi.Add(criteriaMatcherProvider(pf.cfg, langEnrichedEvents, criteriaFilteredEvents, configCriteria, startConfig.dynamicPIDSelector),
swarm.WithID("CriteriaMatcher"))
swi.Add(dynamicMatcherProvider(langEnrichedEvents, criteriaFilteredEvents, startConfig.dynamicPIDSelector),
swarm.WithID("DynamicMatcher"))

executableTypes := msgh.QueueFromConfig[[]Event[ebpf.Instrumentable]](pf.cfg, "executableTypes")
swi.Add(ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics, pf.ctxInfo.K8sInformer, criteriaFilteredEvents, executableTypes),
Expand Down
82 changes: 18 additions & 64 deletions pkg/appolly/discover/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/obi/pkg/obi"
"go.opentelemetry.io/obi/pkg/pipe/msg"
"go.opentelemetry.io/obi/pkg/pipe/swarm"
"go.opentelemetry.io/obi/pkg/pipe/swarm/swarms"
)

var (
Expand All @@ -29,8 +30,8 @@ var (
)

// criteriaMatcherProvider filters the processes that match the discovery criteria.
// When dynamicSelector is non-nil, runtime PIDs supplement config criteria and the matcher
// listens to the selector's RemovedNotify() channel for synthetic deletes.
// When dynamicSelector is non-nil, only the dynamic selector is used (no config criteria),
// and the matcher listens to the selector's RemovedNotify() channel for synthetic deletes.
func criteriaMatcherProvider(
cfg *obi.Config,
input *msg.Queue[[]Event[ProcessAttrs]],
Expand All @@ -39,24 +40,21 @@ func criteriaMatcherProvider(
dynamicSelector *DynamicPIDSelector,
) swarm.InstanceFunc {
instrumenterNamespace, _ := namespaceFetcherFunc(app.PID(osPidFunc()))
var removedNotify <-chan []app.PID
criteria := configCriteria
if dynamicSelector != nil {
removedNotify = dynamicSelector.RemovedNotify()
criteria = append([]services.Selector{dynamicSelector.AsSelector()}, configCriteria...)
emptyFunc, _ := swarm.EmptyRunFunc()
return swarm.DirectInstance(emptyFunc)
}

m := &Matcher{
Log: slog.With("component", "discover.CriteriaMatcher"),
Criteria: criteria,
Criteria: configCriteria,
ExcludeCriteria: ExcludingCriteria(cfg),
LogEnricherCriteria: LogEnricherFindingCriteria(cfg),
ProcessHistory: map[app.PID]ProcessMatch{},
Input: input.Subscribe(msg.SubscriberName("discover.CriteriaMatcher")),
Output: output,
Namespace: instrumenterNamespace,
HasHostPidAccess: hasHostPidAccess(),
DynamicPIDs: dynamicSelector,
RemovedPIDsNotify: removedNotify,
}
return swarm.DirectInstance(m.Run)
}
Expand All @@ -76,11 +74,6 @@ type Matcher struct {
Output *msg.Queue[[]Event[ProcessMatch]]
Namespace string
HasHostPidAccess bool
// DynamicPIDs, when set, supplements config criteria: a process also matches if its PID is in this set.
DynamicPIDs *DynamicPIDSelector
// RemovedPIDsNotify, when set, carries the PIDs removed from the dynamic selector so the
// matcher can emit targeted synthetic deletes without rescanning ProcessHistory.
RemovedPIDsNotify <-chan []app.PID
}

// ProcessMatch matches a found process with the first selection criteria it fulfilled.
Expand All @@ -97,30 +90,14 @@ func (pm ProcessMatch) LogEnricherEnabled() bool {
func (m *Matcher) Run(ctx context.Context) {
defer m.Output.Close()
m.Log.Debug("starting criteria matcher node")
for {
select {
case <-ctx.Done():
m.Log.Debug("context done, stopping node")
return
case i, ok := <-m.Input:
if !ok {
m.Log.Debug("input channel closed, stopping node")
return
}
m.Log.Debug("filtering processes", "len", len(i))
o := m.filter(i)
m.Log.Debug("processes matching selection criteria", "len", len(o))
if len(o) > 0 {
m.Output.Send(o)
}
case removedPIDs := <-m.RemovedPIDsNotify:
o := m.syntheticDeletesForRemovedPIDs(removedPIDs)
if len(o) > 0 {
m.Log.Debug("synthetic deletes for removed PIDs", "len", len(o))
m.Output.Send(o)
}
swarms.ForEachInput(ctx, m.Input, m.Log.Debug, func(i []Event[ProcessAttrs]) {
m.Log.Debug("filtering processes", "len", len(i))
o := m.filter(i)
m.Log.Debug("processes matching selection criteria", "len", len(o))
if len(o) > 0 {
m.Output.Send(o)
}
}
})
}

func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] {
Expand All @@ -139,27 +116,6 @@ func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] {
return matches
}

// syntheticDeletesForRemovedPIDs returns EventDeleted for the specific PIDs removed from the
// dynamic selector. This is the matcher side of the edge-based removal path: the selector sends
// the exact removed PIDs, so we can look them up directly instead of doing a level-based scan of
// ProcessHistory against the selector's current PID set.
func (m *Matcher) syntheticDeletesForRemovedPIDs(removedPIDs []app.PID) []Event[ProcessMatch] {
if len(removedPIDs) == 0 {
return nil
}
var out []Event[ProcessMatch]
for _, pid := range removedPIDs {
procMatch, instrumented := m.ProcessHistory[pid]
if !instrumented {
continue
}
delete(m.ProcessHistory, pid)
m.Log.Debug("pid removed from dynamic selector, uninstrumenting", "pid", pid, "comm", procMatch.Process.ExePath)
out = append(out, Event[ProcessMatch]{Type: EventDeleted, Obj: procMatch})
}
return out
}

func (m *Matcher) alreadyMatched(pid app.PID) bool {
_, ok := m.ProcessHistory[pid]
return ok
Expand Down Expand Up @@ -268,6 +224,7 @@ func (m *Matcher) matchProcess(obj *ProcessAttrs, p *services.ProcessInfo, a ser
if pids, ok := a.GetPIDs(); ok && len(pids) > 0 {
return pidInList(p.Pid, pids)
}

if !a.GetPath().IsSet() && !a.GetLanguages().IsSet() && !a.GetCmdArgs().IsSet() && a.GetOpenPorts().Len() == 0 && len(obj.metadata) == 0 {
pids, hasPIDs := a.GetPIDs()
if !hasPIDs || len(pids) == 0 {
Expand Down Expand Up @@ -425,14 +382,11 @@ func LogEnricherFindingCriteria(cfg *obi.Config) []services.Selector {
return selectors
}

// FindingCriteria returns discovery criteria from config. When skipTargetPIDs is true
// (e.g. Instrumenter with DynamicPIDSelector), target_pids are not included here; the matcher
// uses the dynamic selector as a supplement. When skipTargetPIDs is false, target_pids from
// config are included as static criteria when set.
func FindingCriteria(cfg *obi.Config, skipTargetPIDs bool) []services.Selector {
// FindingCriteria returns discovery criteria from config.
func FindingCriteria(cfg *obi.Config) []services.Selector {
logDeprecationAndConflicts(cfg)

if !skipTargetPIDs && cfg.TargetPIDs.Len() > 0 {
if cfg.TargetPIDs.Len() > 0 {
vals := cfg.TargetPIDs.AllValues()
pids := make([]uint32, 0, len(vals))
for _, v := range vals {
Expand Down
Loading
Loading