Skip to content
Merged
67 changes: 47 additions & 20 deletions pkg/components/discover/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func CriteriaMatcherProvider(
Log: slog.With("component", "discover.CriteriaMatcher"),
Criteria: FindingCriteria(cfg),
ExcludeCriteria: ExcludingCriteria(cfg),
ProcessHistory: map[PID]*services.ProcessInfo{},
ProcessHistory: map[PID]ProcessMatch{},
Input: input.Subscribe(),
Output: output,
Namespace: beylaNamespace,
Expand All @@ -52,7 +52,7 @@ type Matcher struct {
// ProcessHistory keeps track of the processes that have been already matched and submitted for
// instrumentation.
// This avoids inspecting client processes again and again each time they open a new connection port.
ProcessHistory map[PID]*services.ProcessInfo
ProcessHistory map[PID]ProcessMatch
Input <-chan []Event[ProcessAttrs]
Output *msg.Queue[[]Event[ProcessMatch]]
Namespace string
Expand All @@ -61,7 +61,7 @@ type Matcher struct {

// ProcessMatch pairs a found process with the selection criteria that selected it; a process can fulfill several criteria at once.
type ProcessMatch struct {
Criteria services.Selector
Criteria []services.Selector
Process *services.ProcessInfo
}

Expand Down Expand Up @@ -104,51 +104,78 @@ func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] {
return matches
}

// alreadyMatched reports whether pid currently has an entry in ProcessHistory,
// meaning the process was matched earlier and submitted for instrumentation.
func (m *Matcher) alreadyMatched(pid PID) bool {
	if _, tracked := m.ProcessHistory[pid]; tracked {
		return true
	}

	return false
}

// matchCriteria evaluates the process described by obj and proc against every
// configured selection criterion and gathers all the ones that apply.
//
// A criterion is kept when matchProcess accepts it and isExcluded does not
// reject the process. The result is a ProcessMatch holding the kept criteria
// in configuration order, or nil when no criterion was kept.
func (m *Matcher) matchCriteria(obj ProcessAttrs, proc *services.ProcessInfo) *ProcessMatch {
	matched := make([]services.Selector, 0, len(m.Criteria))

	for _, selector := range m.Criteria {
		if !m.matchProcess(&obj, proc, selector) {
			continue
		}

		// The exclusion check is only consulted for criteria that matched.
		if m.isExcluded(&obj, proc) {
			continue
		}

		matched = append(matched, selector)
	}

	if len(matched) == 0 {
		return nil
	}

	m.Log.Debug("found process", "pid", proc.Pid, "comm", proc.ExePath, "metadata",
		obj.metadata, "podLabels", obj.podLabels, "criteria", matched)

	return &ProcessMatch{Criteria: matched, Process: proc}
}

func (m *Matcher) filterCreated(obj ProcessAttrs) (Event[ProcessMatch], bool) {
if _, ok := m.ProcessHistory[obj.pid]; ok {
// this was already matched and submitted for inspection. Ignoring!
if m.alreadyMatched(obj.pid) {
return Event[ProcessMatch]{}, false
}

proc, err := processInfo(obj)
if err != nil {
m.Log.Debug("can't get information for process", "pid", obj.pid, "error", err)
return Event[ProcessMatch]{}, false
}
for i := range m.Criteria {
if m.matchProcess(&obj, proc, m.Criteria[i]) && !m.isExcluded(&obj, proc) {
m.Log.Debug("found process", "pid", proc.Pid, "comm", proc.ExePath, "metadata", obj.metadata, "podLabels", obj.podLabels, "criteria", m.Criteria[i])
m.ProcessHistory[obj.pid] = proc
return Event[ProcessMatch]{
Type: EventCreated,
Obj: ProcessMatch{Criteria: m.Criteria[i], Process: proc},
}, true
}

if processMatch := m.matchCriteria(obj, proc); processMatch != nil {
m.ProcessHistory[obj.pid] = *processMatch

return Event[ProcessMatch]{
Type: EventCreated,
Obj: *processMatch,
}, true
}

// We didn't match the process, but its parent PID may already be tracked: the child might not have opened the port yet.
if _, ok := m.ProcessHistory[PID(proc.PPid)]; ok {
if procMatch, ok := m.ProcessHistory[PID(proc.PPid)]; ok {
m.Log.Debug("found process by matching the process parent id", "pid", proc.Pid, "ppid", proc.PPid, "comm", proc.ExePath, "metadata", obj.metadata)
m.ProcessHistory[obj.pid] = proc

procMatch.Process = proc

m.ProcessHistory[obj.pid] = procMatch

return Event[ProcessMatch]{
Type: EventCreated,
Obj: ProcessMatch{Criteria: m.Criteria[0], Process: proc},
Obj: procMatch,
}, true
}

return Event[ProcessMatch]{}, false
}

func (m *Matcher) filterDeleted(obj ProcessAttrs) (Event[ProcessMatch], bool) {
proc, ok := m.ProcessHistory[obj.pid]
procMatch, ok := m.ProcessHistory[obj.pid]
if !ok {
m.Log.Debug("deleted untracked process. Ignoring", "pid", obj.pid)
return Event[ProcessMatch]{}, false
}
delete(m.ProcessHistory, obj.pid)
m.Log.Debug("stopped process", "pid", proc.Pid, "comm", proc.ExePath)
m.Log.Debug("stopped process", "pid", procMatch.Process.Pid, "comm", procMatch.Process.ExePath)
return Event[ProcessMatch]{
Type: EventDeleted,
Obj: ProcessMatch{Process: proc},
Obj: procMatch,
}, true
}

Expand Down
208 changes: 147 additions & 61 deletions pkg/components/discover/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import (
"github.com/open-telemetry/opentelemetry-ebpf-instrumentation/pkg/services"
)

// testMatch asserts that m is an EventCreated event whose match carries exactly
// one selection criterion with the given name and namespace, and whose process
// information equals proc.
//
// It is marked as a test helper so that a failing assertion is reported at the
// calling test's line rather than inside this function.
func testMatch(t *testing.T, m Event[ProcessMatch], name string,
	namespace string, proc services.ProcessInfo,
) {
	t.Helper()

	assert.Equal(t, EventCreated, m.Type)

	// require, not assert: indexing Criteria[0] below would panic on an empty slice.
	require.Len(t, m.Obj.Criteria, 1)
	assert.Equal(t, name, m.Obj.Criteria[0].GetName())
	assert.Equal(t, namespace, m.Obj.Criteria[0].GetNamespace())

	// Stop with a clean failure instead of a nil-pointer panic on the dereference below.
	require.NotNil(t, m.Obj.Process)
	assert.Equal(t, proc, *m.Obj.Process)
}

func TestCriteriaMatcher(t *testing.T) {
pipeConfig := obi.Config{}
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
Expand Down Expand Up @@ -54,26 +64,11 @@ func TestCriteriaMatcher(t *testing.T) {

matches := testutil.ReadChannel(t, filteredProcesses, testTimeout)
require.Len(t, matches, 4)
m := matches[0]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "exec-only", m.Obj.Criteria.GetName())
assert.Empty(t, m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33", OpenPorts: []uint32{1, 2, 3}}, *m.Obj.Process)
m = matches[1]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "port-only", m.Obj.Criteria.GetName())
assert.Equal(t, "foo", m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 4, ExePath: "/bin/something", OpenPorts: []uint32{8083}}, *m.Obj.Process)
m = matches[2]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "both", m.Obj.Criteria.GetName())
assert.Empty(t, m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 5, ExePath: "server", OpenPorts: []uint32{443}}, *m.Obj.Process)
m = matches[3]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "exec-only", m.Obj.Criteria.GetName())
assert.Empty(t, m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 6, ExePath: "/bin/clientweird99"}, *m.Obj.Process)

testMatch(t, matches[0], "exec-only", "", services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33", OpenPorts: []uint32{1, 2, 3}})
testMatch(t, matches[1], "port-only", "foo", services.ProcessInfo{Pid: 4, ExePath: "/bin/something", OpenPorts: []uint32{8083}})
testMatch(t, matches[2], "both", "", services.ProcessInfo{Pid: 5, ExePath: "server", OpenPorts: []uint32{443}})
testMatch(t, matches[3], "exec-only", "", services.ProcessInfo{Pid: 6, ExePath: "/bin/clientweird99"})
}

func TestCriteriaMatcher_Exclude(t *testing.T) {
Expand Down Expand Up @@ -119,16 +114,9 @@ func TestCriteriaMatcher_Exclude(t *testing.T) {

matches := testutil.ReadChannel(t, filteredProcesses, testTimeout)
require.Len(t, matches, 2)
m := matches[0]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "exec-only", m.Obj.Criteria.GetName())
assert.Empty(t, m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33", OpenPorts: []uint32{1, 2, 3}}, *m.Obj.Process)
m = matches[1]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "exec-only", m.Obj.Criteria.GetName())
assert.Empty(t, m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 6, ExePath: "/bin/clientweird99"}, *m.Obj.Process)

testMatch(t, matches[0], "exec-only", "", services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33", OpenPorts: []uint32{1, 2, 3}})
testMatch(t, matches[1], "exec-only", "", services.ProcessInfo{Pid: 6, ExePath: "/bin/clientweird99"})
}

func TestCriteriaMatcher_Exclude_Metadata(t *testing.T) {
Expand Down Expand Up @@ -169,12 +157,8 @@ func TestCriteriaMatcher_Exclude_Metadata(t *testing.T) {

matches := testutil.ReadChannel(t, filteredProcesses, 1000*testTimeout)
require.Len(t, matches, 2)
m := matches[0]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33"}, *m.Obj.Process)
m = matches[1]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, services.ProcessInfo{Pid: 3, ExePath: "server"}, *m.Obj.Process)
testMatch(t, matches[0], "", "", services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33"})
testMatch(t, matches[1], "", "", services.ProcessInfo{Pid: 3, ExePath: "server"})
}

func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) {
Expand Down Expand Up @@ -233,11 +217,7 @@ func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) {
})
matches := testutil.ReadChannel(t, filteredProcesses, testTimeout)
require.Len(t, matches, 1)
m := matches[0]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "all-attributes-must-match", m.Obj.Criteria.GetName())
assert.Equal(t, "foons", m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 1, ExePath: "/bin/foo", OpenPorts: []uint32{8081}}, *m.Obj.Process)
testMatch(t, matches[0], "all-attributes-must-match", "foons", services.ProcessInfo{Pid: 1, ExePath: "/bin/foo", OpenPorts: []uint32{8081}})
}

func TestCriteriaMatcherMissingPort(t *testing.T) {
Expand Down Expand Up @@ -275,16 +255,8 @@ func TestCriteriaMatcherMissingPort(t *testing.T) {

matches := testutil.ReadChannel(t, filteredProcesses, testTimeout)
require.Len(t, matches, 2)
m := matches[0]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "port-only", m.Obj.Criteria.GetName())
assert.Equal(t, "foo", m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33", OpenPorts: []uint32{80}, PPid: 0}, *m.Obj.Process)
m = matches[1]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "port-only", m.Obj.Criteria.GetName())
assert.Equal(t, "foo", m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 3, ExePath: "/bin/weird33", OpenPorts: []uint32{}, PPid: 1}, *m.Obj.Process)
testMatch(t, matches[0], "port-only", "foo", services.ProcessInfo{Pid: 1, ExePath: "/bin/weird33", OpenPorts: []uint32{80}, PPid: 0})
testMatch(t, matches[1], "port-only", "foo", services.ProcessInfo{Pid: 3, ExePath: "/bin/weird33", OpenPorts: []uint32{}, PPid: 1})
}

func TestCriteriaMatcherContainersOnly(t *testing.T) {
Expand Down Expand Up @@ -342,16 +314,8 @@ func TestCriteriaMatcherContainersOnly(t *testing.T) {

matches := testutil.ReadChannel(t, filteredProcesses, 5000*testTimeout)
require.Len(t, matches, 2)
m := matches[0]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "port-only-containers", m.Obj.Criteria.GetName())
assert.Equal(t, "foo", m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 2, ExePath: "/bin/weird33", OpenPorts: []uint32{80}, PPid: 0}, *m.Obj.Process)
m = matches[1]
assert.Equal(t, EventCreated, m.Type)
assert.Equal(t, "port-only-containers", m.Obj.Criteria.GetName())
assert.Equal(t, "foo", m.Obj.Criteria.GetNamespace())
assert.Equal(t, services.ProcessInfo{Pid: 3, ExePath: "/bin/weird33", OpenPorts: []uint32{80}, PPid: 1}, *m.Obj.Process)
testMatch(t, matches[0], "port-only-containers", "foo", services.ProcessInfo{Pid: 2, ExePath: "/bin/weird33", OpenPorts: []uint32{80}, PPid: 0})
testMatch(t, matches[1], "port-only-containers", "foo", services.ProcessInfo{Pid: 3, ExePath: "/bin/weird33", OpenPorts: []uint32{80}, PPid: 1})
}

func TestInstrumentation_CoexistingWithDeprecatedServices(t *testing.T) {
Expand Down Expand Up @@ -489,3 +453,125 @@ func TestInstrumentation_CoexistingWithDeprecatedServices(t *testing.T) {
})
}
}

// TestCriteriaMatcher_Granular feeds four created processes, one per
// deployment, through the criteria matcher and checks that each one is
// reported with two matched criteria. It then checks the export modes that
// makeServiceAttrs derives from each match.
func TestCriteriaMatcher_Granular(t *testing.T) {
pipeConfig := obi.Config{}

// The configuration declares one namespace-wide criterion plus one criterion
// per deployment, each with a different "exports" setting (star-service
// declares none).
require.NoError(t, yaml.Unmarshal([]byte(`discovery:
instrument:
- k8s_namespace: default
exports: [metrics]
- k8s_deployment_name: planet-service
exports: [traces]
- k8s_deployment_name: satellite-service
exports: []
- k8s_deployment_name: star-service
- k8s_deployment_name: asteroid-service
exports: [metrics, traces]
`), &pipeConfig))

// Input queue of discovered processes and output queue of matched processes.
discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10))
filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10))

filteredProcesses := filteredProcessesQu.Subscribe()

matcherFunc, err := CriteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu)(t.Context())

require.NoError(t, err)

// Run the matcher in the background for the duration of the test context.
go matcherFunc(t.Context())
defer filteredProcessesQu.Close()

// Replace the package-level processInfo function with a stub that builds the
// ProcessInfo from the event attributes, picking the executable path by PID.
// NOTE(review): the previous value of processInfo is not restored here.
processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) {
exePath := map[PID]string{
1: "/bin/planet-service",
2: "/bin/satellite-service",
3: "/bin/star-service",
4: "/bin/asteroid-service",
}[pp.pid]

return &services.ProcessInfo{Pid: int32(pp.pid), ExePath: exePath, OpenPorts: pp.openPorts}, nil
}

// Four creation events, all in the "default" namespace, one per deployment.
discoveredProcesses.Send([]Event[ProcessAttrs]{
{
Type: EventCreated,
Obj: ProcessAttrs{
pid: 1,
metadata: map[string]string{
"k8s_namespace": "default",
"k8s_deployment_name": "planet-service",
},
},
},
{
Type: EventCreated,
Obj: ProcessAttrs{
pid: 2,
metadata: map[string]string{
"k8s_namespace": "default",
"k8s_deployment_name": "satellite-service",
},
},
},
{
Type: EventCreated,
Obj: ProcessAttrs{
pid: 3,
metadata: map[string]string{
"k8s_namespace": "default",
"k8s_deployment_name": "star-service",
},
},
},
{
Type: EventCreated,
Obj: ProcessAttrs{
pid: 4,
metadata: map[string]string{
"k8s_namespace": "default",
"k8s_deployment_name": "asteroid-service",
},
},
},
})

// All four processes are expected in a single batch, in the order they were sent.
matches := testutil.ReadChannel(t, filteredProcesses, testTimeout)
require.Len(t, matches, 4)

// Each process is expected to carry two criteria, presumably the namespace
// criterion and its own deployment criterion (verify against matchProcess).

// planet-service (exports: [traces]): traces only.
planetMatch := matches[0].Obj

require.Len(t, planetMatch.Criteria, 2)

planetAttrs := makeServiceAttrs(&planetMatch)

assert.True(t, planetAttrs.ExportModes.CanExportTraces())
assert.False(t, planetAttrs.ExportModes.CanExportMetrics())

// satellite-service (exports: []): neither traces nor metrics.
satelliteMatch := matches[1].Obj

require.Len(t, satelliteMatch.Criteria, 2)

satelliteAttrs := makeServiceAttrs(&satelliteMatch)

assert.False(t, satelliteAttrs.ExportModes.CanExportTraces())
assert.False(t, satelliteAttrs.ExportModes.CanExportMetrics())

// star-service (no exports key): metrics only, which is the namespace-wide setting.
starMatch := matches[2].Obj

require.Len(t, starMatch.Criteria, 2)

starAttrs := makeServiceAttrs(&starMatch)

assert.False(t, starAttrs.ExportModes.CanExportTraces())
assert.True(t, starAttrs.ExportModes.CanExportMetrics())

// asteroid-service (exports: [metrics, traces]): both.
asteroidMatch := matches[3].Obj

require.Len(t, asteroidMatch.Criteria, 2)

asteroidAttrs := makeServiceAttrs(&asteroidMatch)

assert.True(t, asteroidAttrs.ExportModes.CanExportTraces())
assert.True(t, asteroidAttrs.ExportModes.CanExportMetrics())
}
Loading