From 69851938e956e4b12e23eea41a2364a3a68a7f67 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Wed, 18 Mar 2026 15:32:12 -0400 Subject: [PATCH 1/4] DynamicPIDSelector uniqueness and exclusivity improvements --- pkg/appolly/discover/finder.go | 8 ++++++++ pkg/appolly/discover/matcher.go | 17 +++++++++++++---- pkg/instrumenter/instrumenter.go | 4 +++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/appolly/discover/finder.go b/pkg/appolly/discover/finder.go index b61f3e3e88..c7eeede429 100644 --- a/pkg/appolly/discover/finder.go +++ b/pkg/appolly/discover/finder.go @@ -79,6 +79,14 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp tracerEvents := msgh.QueueFromConfig[Event[*ebpf.Instrumentable]](pf.cfg, "tracerEvents") configCriteria := FindingCriteria(pf.cfg, startConfig.dynamicPIDSelector != nil) + // When using the dynamic PID selector, use only it: pass nil so the matcher gets + // just [dynamicSelector.AsSelector()]. Otherwise the matcher would also have config + // criteria (e.g. the default RegexSelector with empty path/port), and a process + // matches if it matches any criterion—so every process with pod metadata would + // match the empty criterion and get instrumented alongside the dynamic PIDs. + if startConfig.dynamicPIDSelector != nil { + configCriteria = nil + } swi := swarm.Instancer{} processEvents := msgh.QueueFromConfig[[]Event[ProcessAttrs]](pf.cfg, "processEvents") diff --git a/pkg/appolly/discover/matcher.go b/pkg/appolly/discover/matcher.go index 39dd78ef37..639a6488c6 100644 --- a/pkg/appolly/discover/matcher.go +++ b/pkg/appolly/discover/matcher.go @@ -29,8 +29,8 @@ var ( ) // criteriaMatcherProvider filters the processes that match the discovery criteria. -// When dynamicSelector is non-nil, runtime PIDs supplement config criteria and the matcher -// listens to the selector's RemovedNotify() channel for synthetic deletes. +// When dynamicSelector is non-nil, only the dynamic selector is used (no config criteria), +// and the matcher listens to the selector's RemovedNotify() channel for synthetic deletes. func criteriaMatcherProvider( cfg *obi.Config, input *msg.Queue[[]Event[ProcessAttrs]], @@ -40,10 +40,12 @@ func criteriaMatcherProvider( ) swarm.InstanceFunc { instrumenterNamespace, _ := namespaceFetcherFunc(app.PID(osPidFunc())) var removedNotify <-chan []app.PID - criteria := configCriteria + var criteria []services.Selector if dynamicSelector != nil { removedNotify = dynamicSelector.RemovedNotify() - criteria = append([]services.Selector{dynamicSelector.AsSelector()}, configCriteria...) + criteria = []services.Selector{dynamicSelector.AsSelector()} + } else { + criteria = configCriteria } m := &Matcher{ Log: slog.With("component", "discover.CriteriaMatcher"), @@ -268,6 +270,13 @@ func (m *Matcher) matchProcess(obj *ProcessAttrs, p *services.ProcessInfo, a ser if pids, ok := a.GetPIDs(); ok && len(pids) > 0 { return pidInList(p.Pid, pids) } + // When DynamicPIDs is set, Criteria is only the dynamic selector (see criteriaMatcherProvider). + // We already handled non-empty GetPIDs() above; here (empty set or PID not in list) must be no match, + // or we would fall through and matchByAttributes would match every process with pod metadata. + if m.DynamicPIDs != nil { + m.Log.Debug("dynamic PID selector: process not in set (or set empty), not matching", "pid", p.Pid, "exe", p.ExePath) + return false + } if !a.GetPath().IsSet() && !a.GetLanguages().IsSet() && !a.GetCmdArgs().IsSet() && a.GetOpenPorts().Len() == 0 && len(obj.metadata) == 0 { pids, hasPIDs := a.GetPIDs() if !hasPIDs || len(pids) == 0 { diff --git a/pkg/instrumenter/instrumenter.go b/pkg/instrumenter/instrumenter.go index df59e2adaa..049ed35317 100644 --- a/pkg/instrumenter/instrumenter.go +++ b/pkg/instrumenter/instrumenter.go @@ -49,7 +49,9 @@ func RunWithContextInfo( opt(ctxInfo) } - app := cfg.Enabled(obi.FeatureAppO11y) + // Enable App O11y when config enables it or when the caller passed a dynamic PID selector + // (allows an "empty" instrumenter that only instruments PIDs added via the selector). + app := cfg.Enabled(obi.FeatureAppO11y) || ctxInfo.AppO11y.DynamicPIDSelector != nil net := cfg.Enabled(obi.FeatureNetO11y) stats := cfg.Enabled(obi.FeatureStatsO11y) From 924dabfefa6113c88fc0e756e2555a1a8f3fbf62 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Wed, 25 Mar 2026 17:07:57 -0400 Subject: [PATCH 2/4] Move DynamicSelector to its own swarm node --- pkg/appolly/discover/finder.go | 12 +- pkg/appolly/discover/matcher.go | 87 ++-------- pkg/appolly/discover/matcher_dynamic.go | 184 ++++++++++++++++++++++ pkg/appolly/discover/matcher_test.go | 100 ++++++++++-- pkg/appolly/discover/watcher_kube_test.go | 2 +- pkg/appolly/discover/watcher_proc_test.go | 4 +- 6 files changed, 293 insertions(+), 96 deletions(-) create mode 100644 pkg/appolly/discover/matcher_dynamic.go diff --git a/pkg/appolly/discover/finder.go b/pkg/appolly/discover/finder.go index c7eeede429..31fd8f6c6a 100644 --- a/pkg/appolly/discover/finder.go +++ b/pkg/appolly/discover/finder.go @@ -78,15 +78,7 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp tracerEvents := msgh.QueueFromConfig[Event[*ebpf.Instrumentable]](pf.cfg, "tracerEvents") - configCriteria := FindingCriteria(pf.cfg, startConfig.dynamicPIDSelector != nil) - // When using the dynamic PID selector, use only it: pass nil so the matcher gets - // just [dynamicSelector.AsSelector()]. Otherwise the matcher would also have config - // criteria (e.g. the default RegexSelector with empty path/port), and a process - // matches if it matches any criterion—so every process with pod metadata would - // match the empty criterion and get instrumented alongside the dynamic PIDs. - if startConfig.dynamicPIDSelector != nil { - configCriteria = nil - } + configCriteria := FindingCriteria(pf.cfg) swi := swarm.Instancer{} processEvents := msgh.QueueFromConfig[[]Event[ProcessAttrs]](pf.cfg, "processEvents") @@ -123,6 +115,8 @@ func (pf *ProcessFinder) Start(ctx context.Context, opts ...ProcessFinderStartOp criteriaFilteredEvents := msgh.QueueFromConfig[[]Event[ProcessMatch]](pf.cfg, "criteriaFilteredEvents") swi.Add(criteriaMatcherProvider(pf.cfg, langEnrichedEvents, criteriaFilteredEvents, configCriteria, startConfig.dynamicPIDSelector), swarm.WithID("CriteriaMatcher")) + swi.Add(dynamicMatcherProvider(langEnrichedEvents, criteriaFilteredEvents, startConfig.dynamicPIDSelector), + swarm.WithID("DynamicMatcher")) executableTypes := msgh.QueueFromConfig[[]Event[ebpf.Instrumentable]](pf.cfg, "executableTypes") swi.Add(ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics, pf.ctxInfo.K8sInformer, criteriaFilteredEvents, executableTypes), diff --git a/pkg/appolly/discover/matcher.go b/pkg/appolly/discover/matcher.go index 639a6488c6..700c207e71 100644 --- a/pkg/appolly/discover/matcher.go +++ b/pkg/appolly/discover/matcher.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/obi/pkg/obi" "go.opentelemetry.io/obi/pkg/pipe/msg" "go.opentelemetry.io/obi/pkg/pipe/swarm" + "go.opentelemetry.io/obi/pkg/pipe/swarm/swarms" ) var ( @@ -39,17 +40,14 @@ func criteriaMatcherProvider( dynamicSelector *DynamicPIDSelector, ) swarm.InstanceFunc { instrumenterNamespace, _ := namespaceFetcherFunc(app.PID(osPidFunc())) - var removedNotify <-chan []app.PID - var criteria []services.Selector if dynamicSelector != nil { - removedNotify = dynamicSelector.RemovedNotify() - criteria = []services.Selector{dynamicSelector.AsSelector()} - } else { - criteria = configCriteria + emptyFunc, _ := swarm.EmptyRunFunc() + return swarm.DirectInstance(emptyFunc) } + m := &Matcher{ Log: slog.With("component", "discover.CriteriaMatcher"), - Criteria: criteria, + Criteria: configCriteria, ExcludeCriteria: ExcludingCriteria(cfg), LogEnricherCriteria: LogEnricherFindingCriteria(cfg), ProcessHistory: map[app.PID]ProcessMatch{}, @@ -57,8 +55,6 @@ func criteriaMatcherProvider( Output: output, Namespace: instrumenterNamespace, HasHostPidAccess: hasHostPidAccess(), - DynamicPIDs: dynamicSelector, - RemovedPIDsNotify: removedNotify, } return swarm.DirectInstance(m.Run) } @@ -78,11 +74,6 @@ type Matcher struct { Output *msg.Queue[[]Event[ProcessMatch]] Namespace string HasHostPidAccess bool - // DynamicPIDs, when set, supplements config criteria: a process also matches if its PID is in this set. - DynamicPIDs *DynamicPIDSelector - // RemovedPIDsNotify, when set, carries the PIDs removed from the dynamic selector so the - // matcher can emit targeted synthetic deletes without rescanning ProcessHistory. - RemovedPIDsNotify <-chan []app.PID } // ProcessMatch matches a found process with the first selection criteria it fulfilled. @@ -99,30 +90,14 @@ func (pm ProcessMatch) LogEnricherEnabled() bool { func (m *Matcher) Run(ctx context.Context) { defer m.Output.Close() m.Log.Debug("starting criteria matcher node") - for { - select { - case <-ctx.Done(): - m.Log.Debug("context done, stopping node") - return - case i, ok := <-m.Input: - if !ok { - m.Log.Debug("input channel closed, stopping node") - return - } - m.Log.Debug("filtering processes", "len", len(i)) - o := m.filter(i) - m.Log.Debug("processes matching selection criteria", "len", len(o)) - if len(o) > 0 { - m.Output.Send(o) - } - case removedPIDs := <-m.RemovedPIDsNotify: - o := m.syntheticDeletesForRemovedPIDs(removedPIDs) - if len(o) > 0 { - m.Log.Debug("synthetic deletes for removed PIDs", "len", len(o)) - m.Output.Send(o) - } + swarms.ForEachInput(ctx, m.Input, m.Log.Debug, func(i []Event[ProcessAttrs]) { + m.Log.Debug("filtering processes", "len", len(i)) + o := m.filter(i) + m.Log.Debug("processes matching selection criteria", "len", len(o)) + if len(o) > 0 { + m.Output.Send(o) } - } + }) } func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] { @@ -141,27 +116,6 @@ func (m *Matcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] { return matches } -// syntheticDeletesForRemovedPIDs returns EventDeleted for the specific PIDs removed from the -// dynamic selector. This is the matcher side of the edge-based removal path: the selector sends -// the exact removed PIDs, so we can look them up directly instead of doing a level-based scan of -// ProcessHistory against the selector's current PID set. -func (m *Matcher) syntheticDeletesForRemovedPIDs(removedPIDs []app.PID) []Event[ProcessMatch] { - if len(removedPIDs) == 0 { - return nil - } - var out []Event[ProcessMatch] - for _, pid := range removedPIDs { - procMatch, instrumented := m.ProcessHistory[pid] - if !instrumented { - continue - } - delete(m.ProcessHistory, pid) - m.Log.Debug("pid removed from dynamic selector, uninstrumenting", "pid", pid, "comm", procMatch.Process.ExePath) - out = append(out, Event[ProcessMatch]{Type: EventDeleted, Obj: procMatch}) - } - return out -} - func (m *Matcher) alreadyMatched(pid app.PID) bool { _, ok := m.ProcessHistory[pid] return ok @@ -270,13 +224,7 @@ func (m *Matcher) matchProcess(obj *ProcessAttrs, p *services.ProcessInfo, a ser if pids, ok := a.GetPIDs(); ok && len(pids) > 0 { return pidInList(p.Pid, pids) } - // When DynamicPIDs is set, Criteria is only the dynamic selector (see criteriaMatcherProvider). - // We already handled non-empty GetPIDs() above; here (empty set or PID not in list) must be no match, - // or we would fall through and matchByAttributes would match every process with pod metadata. - if m.DynamicPIDs != nil { - m.Log.Debug("dynamic PID selector: process not in set (or set empty), not matching", "pid", p.Pid, "exe", p.ExePath) - return false - } + if !a.GetPath().IsSet() && !a.GetLanguages().IsSet() && !a.GetCmdArgs().IsSet() && a.GetOpenPorts().Len() == 0 && len(obj.metadata) == 0 { pids, hasPIDs := a.GetPIDs() if !hasPIDs || len(pids) == 0 { @@ -434,14 +382,11 @@ func LogEnricherFindingCriteria(cfg *obi.Config) []services.Selector { return selectors } -// FindingCriteria returns discovery criteria from config. When skipTargetPIDs is true -// (e.g. Instrumenter with DynamicPIDSelector), target_pids are not included here; the matcher -// uses the dynamic selector as a supplement. When skipTargetPIDs is false, target_pids from -// config are included as static criteria when set. -func FindingCriteria(cfg *obi.Config, skipTargetPIDs bool) []services.Selector { +// FindingCriteria returns discovery criteria from config. +func FindingCriteria(cfg *obi.Config) []services.Selector { logDeprecationAndConflicts(cfg) - if !skipTargetPIDs && cfg.TargetPIDs.Len() > 0 { + if cfg.TargetPIDs.Len() > 0 { vals := cfg.TargetPIDs.AllValues() pids := make([]uint32, 0, len(vals)) for _, v := range vals { diff --git a/pkg/appolly/discover/matcher_dynamic.go b/pkg/appolly/discover/matcher_dynamic.go new file mode 100644 index 0000000000..a4098e86d8 --- /dev/null +++ b/pkg/appolly/discover/matcher_dynamic.go @@ -0,0 +1,184 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package discover // import "go.opentelemetry.io/obi/pkg/appolly/discover" + +import ( + "context" + "log/slog" + + "go.opentelemetry.io/obi/pkg/appolly/app" + "go.opentelemetry.io/obi/pkg/appolly/services" + "go.opentelemetry.io/obi/pkg/pipe/msg" + "go.opentelemetry.io/obi/pkg/pipe/swarm" +) + +type DynamicMatcher struct { + Log *slog.Logger + DynamicPIDSelector services.Selector + Input <-chan []Event[ProcessAttrs] + Output *msg.Queue[[]Event[ProcessMatch]] + ProcessHistory map[app.PID]ProcessMatch + // RemovedPIDsNotify, when set, carries the PIDs removed from the dynamic selector so the + // matcher can emit targeted synthetic deletes without rescanning ProcessHistory. + RemovedPIDsNotify <-chan []app.PID +} + +func dynamicMatcherProvider( + input *msg.Queue[[]Event[ProcessAttrs]], + output *msg.Queue[[]Event[ProcessMatch]], + dynamicPIDs *DynamicPIDSelector, +) swarm.InstanceFunc { + if dynamicPIDs == nil { + emptyFunc, _ := swarm.EmptyRunFunc() + return swarm.DirectInstance(emptyFunc) + } + + return swarm.DirectInstance(func(ctx context.Context) { + dynamicMatcher := &DynamicMatcher{ + Log: slog.With("component", "discover.DynamicMatcher"), + DynamicPIDSelector: dynamicPIDs.AsSelector(), + Input: input.Subscribe(msg.SubscriberName("discover.DynamicMatcher")), + Output: output, + ProcessHistory: map[app.PID]ProcessMatch{}, + RemovedPIDsNotify: dynamicPIDs.RemovedNotify(), + } + dynamicMatcher.Run(ctx) + }) +} + +func (m *DynamicMatcher) Run(ctx context.Context) { + defer m.Output.Close() + m.Log.Debug("starting dynamic matcher node") + + for { + select { + case <-ctx.Done(): + m.Log.Debug("context done, stopping node") + return + case i, ok := <-m.Input: + if !ok { + m.Log.Debug("input channel closed, stopping node") + return + } + m.Log.Debug("filtering processes", "len", len(i)) + o := m.filter(i) + m.Log.Debug("processes matching selection criteria", "len", len(o)) + if len(o) > 0 { + m.Output.Send(o) + } + case removedPIDs := <-m.RemovedPIDsNotify: + o := m.syntheticDeletesForRemovedPIDs(removedPIDs) + if len(o) > 0 { + m.Log.Debug("synthetic deletes for removed PIDs", "len", len(o)) + m.Output.Send(o) + } + } + } +} + +func (m *DynamicMatcher) filter(events []Event[ProcessAttrs]) []Event[ProcessMatch] { + var matches []Event[ProcessMatch] + for _, ev := range events { + if ev.Type == EventDeleted { + if ev, ok := m.filterDeleted(ev.Obj); ok { + matches = append(matches, ev) + } + } else { + if ev, ok := m.filterCreated(ev.Obj); ok { + matches = append(matches, ev) + } + } + } + return matches +} + +func (m *DynamicMatcher) filterCreated(obj ProcessAttrs) (Event[ProcessMatch], bool) { + if _, ok := m.ProcessHistory[obj.pid]; ok { + return Event[ProcessMatch]{}, false + } + + proc, err := processInfo(obj) + if err != nil { + m.Log.Debug("can't get information for process", "pid", obj.pid, "error", err) + return Event[ProcessMatch]{}, false + } + + if processMatch := m.matchDynamicCriteria(obj, proc); processMatch != nil { + m.ProcessHistory[obj.pid] = *processMatch + + return Event[ProcessMatch]{ + Type: EventCreated, + Obj: *processMatch, + }, true + } + + // We didn't match the process, but let's see if the parent PID is tracked, it might be the child hasn't opened the port yet + if procMatch, ok := m.ProcessHistory[proc.PPid]; ok { + m.Log.Debug("found process by matching the process parent id", "pid", proc.Pid, "ppid", proc.PPid, "comm", proc.ExePath, "metadata", obj.metadata) + + procMatch.Process = proc + + m.ProcessHistory[obj.pid] = procMatch + + return Event[ProcessMatch]{ + Type: EventCreated, + Obj: procMatch, + }, true + } + + return Event[ProcessMatch]{}, false +} + +func (m *DynamicMatcher) matchDynamicCriteria(obj ProcessAttrs, proc *services.ProcessInfo) *ProcessMatch { + criteria := make([]services.Selector, 0, 1) + if pids, ok := m.DynamicPIDSelector.GetPIDs(); ok && len(pids) > 0 { + for _, p := range pids { + if p == proc.Pid { + criteria = append(criteria, m.DynamicPIDSelector) + break + } + } + } + + if len(criteria) > 0 { + m.Log.Debug("found process", "pid", proc.Pid, "comm", proc.ExePath, "metadata", + obj.metadata, "podLabels", obj.podLabels, "criteria", criteria) + + return &ProcessMatch{Criteria: criteria, Process: proc} + } + + return nil +} + +func (m *DynamicMatcher) filterDeleted(obj ProcessAttrs) (Event[ProcessMatch], bool) { + procMatch, ok := m.ProcessHistory[obj.pid] + if !ok { + m.Log.Debug("deleted untracked process. Ignoring", "pid", obj.pid) + return Event[ProcessMatch]{}, false + } + delete(m.ProcessHistory, obj.pid) + m.Log.Debug("stopped process", "pid", procMatch.Process.Pid, "comm", procMatch.Process.ExePath) + return Event[ProcessMatch]{Type: EventDeleted, Obj: procMatch}, true +} + +// syntheticDeletesForRemovedPIDs returns EventDeleted for the specific PIDs removed from the +// dynamic selector. This is the matcher side of the edge-based removal path: the selector sends +// the exact removed PIDs, so we can look them up directly instead of doing a level-based scan of +// ProcessHistory against the selector's current PID set. +func (m *DynamicMatcher) syntheticDeletesForRemovedPIDs(removedPIDs []app.PID) []Event[ProcessMatch] { + if len(removedPIDs) == 0 { + return nil + } + var out []Event[ProcessMatch] + for _, pid := range removedPIDs { + procMatch, instrumented := m.ProcessHistory[pid] + if !instrumented { + continue + } + delete(m.ProcessHistory, pid) + m.Log.Debug("pid removed from dynamic selector, uninstrumenting", "pid", pid, "comm", procMatch.Process.ExePath) + out = append(out, Event[ProcessMatch]{Type: EventDeleted, Obj: procMatch}) + } + return out +} diff --git a/pkg/appolly/discover/matcher_test.go b/pkg/appolly/discover/matcher_test.go index f3345a7ad4..2bbf282d35 100644 --- a/pkg/appolly/discover/matcher_test.go +++ b/pkg/appolly/discover/matcher_test.go @@ -17,9 +17,83 @@ import ( "go.opentelemetry.io/obi/pkg/internal/testutil" "go.opentelemetry.io/obi/pkg/obi" "go.opentelemetry.io/obi/pkg/pipe/msg" + "go.opentelemetry.io/obi/pkg/pipe/swarm" "go.opentelemetry.io/obi/pkg/transform" ) +// TestMatchersMutuallyExclusive wires CriteriaMatcher + DynamicMatcher like ProcessFinder.Start. +// Exactly one path subscribes to the input: dynamic selector set → only DynamicMatcher runs; +// no selector → only CriteriaMatcher runs. Prevents duplicate output and wrong criteria source. +func TestMatchersMutuallyExclusive(t *testing.T) { + pipeConfig := obi.Config{} + require.NoError(t, yaml.Unmarshal([]byte(`discovery: + services: + - name: port-only + namespace: foo + open_ports: 80 +`), &pipeConfig)) + cfgCriteria := FindingCriteria(&pipeConfig) + + t.Run("dynamic_mode_static_is_noop_config_match_ignored", func(t *testing.T) { + sel := NewDynamicPIDSelector() + sel.AddPIDs(7) + + inQ := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) + outQ := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) + outCh := outQ.Subscribe() + + processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) { + return &services.ProcessInfo{Pid: pp.pid, ExePath: "/any/bin", OpenPorts: pp.openPorts}, nil + } + + swi := swarm.Instancer{} + swi.Add(criteriaMatcherProvider(&pipeConfig, inQ, outQ, cfgCriteria, sel), swarm.WithID("CriteriaMatcher")) + swi.Add(dynamicMatcherProvider(inQ, outQ, sel), swarm.WithID("DynamicMatcher")) + runner, err := swi.Instance(t.Context()) + require.NoError(t, err) + runner.Start(t.Context()) + defer outQ.Close() + + // Matches static port-only (80) but not dynamic PID set — would appear if CriteriaMatcher ran. + inQ.Send([]Event[ProcessAttrs]{{Type: EventCreated, Obj: ProcessAttrs{pid: 99, openPorts: []uint32{80}}}}) + testutil.ChannelEmpty(t, outCh, 300*time.Millisecond) + + inQ.Send([]Event[ProcessAttrs]{{Type: EventCreated, Obj: ProcessAttrs{pid: 7, openPorts: []uint32{}}}}) + ms := testutil.ReadChannel(t, outCh, testTimeout) + require.Len(t, ms, 1) + assert.Equal(t, app.PID(7), ms[0].Obj.Process.Pid) + + inQ.Close() + testutil.DrainUntilClosed(outCh) + }) + + t.Run("static_mode_dynamic_is_noop_single_output", func(t *testing.T) { + inQ := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) + outQ := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) + outCh := outQ.Subscribe() + + processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) { + return &services.ProcessInfo{Pid: pp.pid, ExePath: "/bin/app", OpenPorts: pp.openPorts}, nil + } + + swi := swarm.Instancer{} + swi.Add(criteriaMatcherProvider(&pipeConfig, inQ, outQ, cfgCriteria, nil), swarm.WithID("CriteriaMatcher")) + swi.Add(dynamicMatcherProvider(inQ, outQ, nil), swarm.WithID("DynamicMatcher")) + runner, err := swi.Instance(t.Context()) + require.NoError(t, err) + runner.Start(t.Context()) + defer outQ.Close() + + inQ.Send([]Event[ProcessAttrs]{{Type: EventCreated, Obj: ProcessAttrs{pid: 12, openPorts: []uint32{80}}}}) + ms := testutil.ReadChannel(t, outCh, testTimeout) + require.Len(t, ms, 1) + assert.Equal(t, app.PID(12), ms[0].Obj.Process.Pid) + + inQ.Close() + testutil.DrainUntilClosed(outCh) + }) +} + func testMatch(t *testing.T, m Event[ProcessMatch], name string, namespace string, proc services.ProcessInfo, ) { @@ -52,7 +126,7 @@ func TestCriteriaMatcher(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -103,7 +177,7 @@ func TestCriteriaMatcherLanguage(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -153,7 +227,7 @@ func TestCriteriaMatcher_Exclude(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -194,7 +268,7 @@ func TestCriteriaMatcher_Exclude_Metadata(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -242,7 +316,7 @@ func TestCriteriaMatcher_MustMatchAllAttributes(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -297,7 +371,7 @@ func TestCriteriaMatcherMissingPort(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -356,7 +430,7 @@ func TestCriteriaMatcherContainersOnly(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -495,7 +569,7 @@ func TestInstrumentation_CoexistingWithDeprecatedServices(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&tc.cfg, discoveredProcesses, filteredProcessesQu, FindingCriteria(&tc.cfg, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&tc.cfg, discoveredProcesses, filteredProcessesQu, FindingCriteria(&tc.cfg), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -530,7 +604,7 @@ func TestCriteriaMatcher_TargetPIDs(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -561,7 +635,7 @@ func TestCriteriaMatcher_TargetPIDs(t *testing.T) { discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) go matcherFunc(t.Context()) defer filteredProcessesQu.Close() @@ -591,7 +665,7 @@ func TestCriteriaMatcher_DynamicTargetPIDs(t *testing.T) { pipeConfig := obi.Config{ServiceName: "dyn-svc", ServiceNamespace: "ns"} dynamicSelector := NewDynamicPIDSelector() dynamicSelector.AddPIDs(42) - configCriteria := FindingCriteria(&pipeConfig, true) + configCriteria := FindingCriteria(&pipeConfig) discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) @@ -641,7 +715,7 @@ func TestCriteriaMatcher_DynamicTargetPIDs_RemoveNotification(t *testing.T) { pipeConfig := obi.Config{ServiceName: "dyn-svc", ServiceNamespace: "ns"} dynamicSelector := NewDynamicPIDSelector() dynamicSelector.AddPIDs(42, 100) - configCriteria := FindingCriteria(&pipeConfig, true) + configCriteria := FindingCriteria(&pipeConfig) discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) @@ -702,7 +776,7 @@ func TestCriteriaMatcher_Granular(t *testing.T) { filteredProcesses := filteredProcessesQu.Subscribe() - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig, false), nil)(t.Context()) + matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, FindingCriteria(&pipeConfig), nil)(t.Context()) require.NoError(t, err) diff --git a/pkg/appolly/discover/watcher_kube_test.go b/pkg/appolly/discover/watcher_kube_test.go index 800f8ed8d0..02183dd6d2 100644 --- a/pkg/appolly/discover/watcher_kube_test.go +++ b/pkg/appolly/discover/watcher_kube_test.go @@ -164,7 +164,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { version: "v[0-9]*" `), &pipeConfig)) - swi.Add(criteriaMatcherProvider(&pipeConfig, connectQueue, outputQueue, FindingCriteria(&pipeConfig, false), nil)) + swi.Add(criteriaMatcherProvider(&pipeConfig, connectQueue, outputQueue, FindingCriteria(&pipeConfig), nil)) nodesRunner, err := swi.Instance(t.Context()) require.NoError(t, err) diff --git a/pkg/appolly/discover/watcher_proc_test.go b/pkg/appolly/discover/watcher_proc_test.go index d8448bf766..b170041df1 100644 --- a/pkg/appolly/discover/watcher_proc_test.go +++ b/pkg/appolly/discover/watcher_proc_test.go @@ -202,7 +202,7 @@ func TestPortsFetchRequired(t *testing.T) { stateMux: sync.Mutex{}, bpfWatcherEnabled: false, fetchPorts: true, - findingCriteria: FindingCriteria(cfg, false), + findingCriteria: FindingCriteria(cfg), output: msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(1)), } @@ -300,7 +300,7 @@ func TestMinProcessAge(t *testing.T) { stateMux: sync.Mutex{}, bpfWatcherEnabled: false, fetchPorts: true, - findingCriteria: FindingCriteria(cfg, false), + findingCriteria: FindingCriteria(cfg), output: msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(1)), } From f2183a7f45f145fcfd075e47208ee7b6284c1220 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Wed, 25 Mar 2026 21:28:07 -0400 Subject: [PATCH 3/4] pipe queue thread safe + test fixes --- .../discover/dynamic_pid_selector_test.go | 47 ++++++++++++++++--- pkg/appolly/discover/matcher_dynamic.go | 4 +- pkg/appolly/discover/matcher_test.go | 16 +++---- pkg/pipe/msg/queue.go | 21 ++++++--- 4 files changed, 65 insertions(+), 23 deletions(-) diff --git a/pkg/appolly/discover/dynamic_pid_selector_test.go b/pkg/appolly/discover/dynamic_pid_selector_test.go index becb3f0aa0..db7b237bc7 100644 --- a/pkg/appolly/discover/dynamic_pid_selector_test.go +++ b/pkg/appolly/discover/dynamic_pid_selector_test.go @@ -4,7 +4,10 @@ package discover import ( + "context" + "slices" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12,6 +15,38 @@ import ( "go.opentelemetry.io/obi/pkg/appolly/app" ) +// pidMultisetEqual reports whether a and b contain the same PIDs with the same multiplicity. +func pidMultisetEqual(a, b []app.PID) bool { + if len(a) != len(b) { + return false + } + sa := slices.Clone(a) + sb := slices.Clone(b) + slices.Sort(sa) + slices.Sort(sb) + return slices.Equal(sa, sb) +} + +// readPIDNotifyBatchesUntil reads from ch until the concatenation of batches matches want +// as a multiset (order of batches and within batches does not matter). +func readPIDNotifyBatchesUntil(t *testing.T, ch <-chan []app.PID, want []app.PID) []app.PID { + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + var got []app.PID + for !pidMultisetEqual(got, want) { + if len(got) > len(want) { + t.Fatalf("unexpected extra PID notify batches: got %v want %v", got, want) + } + select { + case b := <-ch: + got = append(got, b...) + case <-ctx.Done(): + t.Fatalf("timeout reading notify batches: got %v want %v", got, want) + } + } + return got +} + func TestDynamicPIDSelector_AddPIDs_RemovePIDs_GetPIDs(t *testing.T) { d := NewDynamicPIDSelector() pids, ok := d.GetPIDs() @@ -74,24 +109,22 @@ func TestDynamicPIDSelector_AddPIDs_Notify(t *testing.T) { assert.Equal(t, []app.PID{99}, got) } -// TestDynamicPIDSelector_QueueNoDrop verifies that rapid AddPIDs/RemovePIDs accumulate -// in a single pending slice and are sent together when the consumer drains (no drops). +// TestDynamicPIDSelector_QueueNoDrop verifies that rapid AddPIDs/RemovePIDs are all delivered +// on the notify channels (nothing dropped). With a buffered notify channel, one logical burst can +// span multiple receives; the consumer must drain until the expected multiset is complete. func TestDynamicPIDSelector_QueueNoDrop(t *testing.T) { d := NewDynamicPIDSelector() d.AddPIDs(1, 2, 3, 4) removedCh := d.RemovedNotify() addedCh := d.AddedPIDsNotify() - // Drain the initial AddPIDs(1,2,3,4) <-addedCh d.RemovePIDs(1) d.RemovePIDs(2, 3) - gotRemoved := <-removedCh - assert.ElementsMatch(t, []app.PID{1, 2, 3}, gotRemoved) + readPIDNotifyBatchesUntil(t, removedCh, []app.PID{1, 2, 3}) d.AddPIDs(10, 20) d.AddPIDs(30) - gotAdded := <-addedCh - assert.ElementsMatch(t, []app.PID{10, 20, 30}, gotAdded) + readPIDNotifyBatchesUntil(t, addedCh, []app.PID{10, 20, 30}) } diff --git a/pkg/appolly/discover/matcher_dynamic.go b/pkg/appolly/discover/matcher_dynamic.go index a4098e86d8..43a854651a 100644 --- a/pkg/appolly/discover/matcher_dynamic.go +++ b/pkg/appolly/discover/matcher_dynamic.go @@ -65,13 +65,13 @@ func (m *DynamicMatcher) Run(ctx context.Context) { o := m.filter(i) m.Log.Debug("processes matching selection criteria", "len", len(o)) if len(o) > 0 { - m.Output.Send(o) + m.Output.SendCtx(ctx, o) } case removedPIDs := <-m.RemovedPIDsNotify: o := m.syntheticDeletesForRemovedPIDs(removedPIDs) if len(o) > 0 { m.Log.Debug("synthetic deletes for removed PIDs", "len", len(o)) - m.Output.Send(o) + m.Output.SendCtx(ctx, o) } } } diff --git a/pkg/appolly/discover/matcher_test.go b/pkg/appolly/discover/matcher_test.go index 2bbf282d35..417d7213fe 100644 --- a/pkg/appolly/discover/matcher_test.go +++ b/pkg/appolly/discover/matcher_test.go @@ -52,6 +52,7 @@ func TestMatchersMutuallyExclusive(t *testing.T) { runner, err := swi.Instance(t.Context()) require.NoError(t, err) runner.Start(t.Context()) + time.Sleep(50 * time.Millisecond) defer outQ.Close() // Matches static port-only (80) but not dynamic PID set — would appear if CriteriaMatcher ran. @@ -82,6 +83,7 @@ func TestMatchersMutuallyExclusive(t *testing.T) { runner, err := swi.Instance(t.Context()) require.NoError(t, err) runner.Start(t.Context()) + time.Sleep(50 * time.Millisecond) defer outQ.Close() inQ.Send([]Event[ProcessAttrs]{{Type: EventCreated, Obj: ProcessAttrs{pid: 12, openPorts: []uint32{80}}}}) @@ -662,10 +664,8 @@ func TestCriteriaMatcher_TargetPIDs(t *testing.T) { } func TestCriteriaMatcher_DynamicTargetPIDs(t *testing.T) { - pipeConfig := obi.Config{ServiceName: "dyn-svc", ServiceNamespace: "ns"} dynamicSelector := NewDynamicPIDSelector() dynamicSelector.AddPIDs(42) - configCriteria := FindingCriteria(&pipeConfig) discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) @@ -673,9 +673,10 @@ func TestCriteriaMatcher_DynamicTargetPIDs(t *testing.T) { processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) { return &services.ProcessInfo{Pid: pp.pid, ExePath: "/any/exe", OpenPorts: pp.openPorts}, nil } - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, configCriteria, dynamicSelector)(t.Context()) + runFn, err := dynamicMatcherProvider(discoveredProcesses, filteredProcessesQu, dynamicSelector)(t.Context()) require.NoError(t, err) - go matcherFunc(t.Context()) + go runFn(t.Context()) + time.Sleep(50 * time.Millisecond) defer filteredProcessesQu.Close() discoveredProcesses.Send([]Event[ProcessAttrs]{ @@ -712,10 +713,8 @@ func TestCriteriaMatcher_DynamicTargetPIDs(t *testing.T) { } func TestCriteriaMatcher_DynamicTargetPIDs_RemoveNotification(t *testing.T) { - pipeConfig := obi.Config{ServiceName: "dyn-svc", ServiceNamespace: "ns"} dynamicSelector := NewDynamicPIDSelector() dynamicSelector.AddPIDs(42, 100) - configCriteria := FindingCriteria(&pipeConfig) discoveredProcesses := msg.NewQueue[[]Event[ProcessAttrs]](msg.ChannelBufferLen(10)) filteredProcessesQu := msg.NewQueue[[]Event[ProcessMatch]](msg.ChannelBufferLen(10)) @@ -723,9 +722,10 @@ func TestCriteriaMatcher_DynamicTargetPIDs_RemoveNotification(t *testing.T) { processInfo = func(pp ProcessAttrs) (*services.ProcessInfo, error) { return &services.ProcessInfo{Pid: pp.pid, ExePath: "/any/exe", OpenPorts: pp.openPorts}, nil } - matcherFunc, err := criteriaMatcherProvider(&pipeConfig, discoveredProcesses, filteredProcessesQu, configCriteria, dynamicSelector)(t.Context()) + runFn, err := dynamicMatcherProvider(discoveredProcesses, filteredProcessesQu, dynamicSelector)(t.Context()) require.NoError(t, err) - go matcherFunc(t.Context()) + go runFn(t.Context()) + time.Sleep(50 * time.Millisecond) defer filteredProcessesQu.Close() discoveredProcesses.Send([]Event[ProcessAttrs]{ diff --git a/pkg/pipe/msg/queue.go b/pkg/pipe/msg/queue.go index a602b70a28..9174afc4a7 100644 --- a/pkg/pipe/msg/queue.go +++ b/pkg/pipe/msg/queue.go @@ -149,11 +149,19 @@ func (q *Queue[T]) chainedSend(ctx context.Context, o T, bypassPath []string) { return } - // this can happen in dead paths (which are valid for disabled pipeline branches), - // exiting early to save timeout management + // Subscribe appends to q.dsts under q.mt. Snapshot the subscriber list here so this Send + // iterates a stable copy: concurrent Subscribe is safe and we avoid racing on the slice + // header. Do not hold the mutex while writing to subscriber channels (Send can block). + q.mt.Lock() if len(q.dsts) == 0 { + q.mt.Unlock() + // this can happen in dead paths (which are valid for disabled pipeline branches), + // exiting early to save timeout management return } + dsts := make([]dst[T], len(q.dsts)) + copy(dsts, q.dsts) + q.mt.Unlock() if q.cfg.panicOnTimeout { // instead of directly panicking in sendTimeout, we first warn at 90% sendTimeout, @@ -163,7 +171,7 @@ func (q *Queue[T]) chainedSend(ctx context.Context, o T, bypassPath []string) { q.sendTimeout.Reset(q.cfg.sendTimeout) } var blocked []dst[T] - for _, d := range q.dsts { + for _, d := range dsts { select { case <-ctx.Done(): return @@ -235,9 +243,10 @@ func withRawOpts(opts subscribeOpts) SubscribeOpt { // It's important to notice that, if Subscribe is invoked after Send, the sent message // will be lost. // Concurrent invocations to Subscribe and Bypass are thread-safe between them, so you can be -// sure that any subscriber will get its own effective channel. But invocations to Subscribe are not -// thread-safe with the Send method. This means that concurrent invocations to Subscribe and Send might -// result in few initial lost messages. +// sure that any subscriber will get its own effective channel. Send snapshots the subscriber list +// under the same mutex as Subscribe, so concurrent Subscribe and Send do not race; a Send may still +// not deliver to a subscriber that Subscribe'd immediately after the snapshot (that subscriber +// misses that message). func (q *Queue[T]) Subscribe(options ...SubscribeOpt) <-chan T { q.assertNotClosed() q.mt.Lock() From 24eed0b6bbbaf72b6bf39d3eb6c219e433841fd2 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Thu, 26 Mar 2026 08:51:32 -0400 Subject: [PATCH 4/4] lint --- pkg/appolly/discover/dynamic_pid_selector_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/appolly/discover/dynamic_pid_selector_test.go b/pkg/appolly/discover/dynamic_pid_selector_test.go index db7b237bc7..51dca58fab 100644 --- a/pkg/appolly/discover/dynamic_pid_selector_test.go +++ b/pkg/appolly/discover/dynamic_pid_selector_test.go @@ -29,7 +29,7 @@ func pidMultisetEqual(a, b []app.PID) bool { // readPIDNotifyBatchesUntil reads from ch until the concatenation of batches matches want // as a multiset (order of batches and within batches does not matter). -func readPIDNotifyBatchesUntil(t *testing.T, ch <-chan []app.PID, want []app.PID) []app.PID { +func readPIDNotifyBatchesUntil(t *testing.T, ch <-chan []app.PID, want []app.PID) { ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) defer cancel() var got []app.PID @@ -44,7 +44,6 @@ func readPIDNotifyBatchesUntil(t *testing.T, ch <-chan []app.PID, want []app.PID t.Fatalf("timeout reading notify batches: got %v want %v", got, want) } } - return got } func TestDynamicPIDSelector_AddPIDs_RemovePIDs_GetPIDs(t *testing.T) {