Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exclude matching metrics #1914

Merged
merged 9 commits into from
Aug 29, 2024
Merged
13 changes: 10 additions & 3 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type context struct {
EntityMap entity.KnownIDs
idLookup host.IDLookup
shouldIncludeEvent sampler.IncludeSampleMatchFn
shouldExcludeEvent sampler.ExcludeSampleMatchFn
}

func (c *context) Context() context2.Context {
Expand Down Expand Up @@ -206,6 +207,7 @@ func NewContext(
resolver hostname.ResolverChangeNotifier,
lookup host.IDLookup,
sampleMatchFn sampler.IncludeSampleMatchFn,
sampleExcludeFn sampler.ExcludeSampleMatchFn,
) *context {
ctx, cancel := context2.WithCancel(context2.Background())

Expand All @@ -223,6 +225,7 @@ func NewContext(
resolver: resolver,
idLookup: lookup,
shouldIncludeEvent: sampleMatchFn,
shouldExcludeEvent: sampleExcludeFn,
agentKey: agentKey,
}
}
Expand Down Expand Up @@ -280,8 +283,9 @@ func NewAgent(
cloudHarvester.Initialize(cloud.WithProvider(cloud.Type(cfg.CloudProvider)))

idLookupTable := NewIdLookup(hostnameResolver, cloudHarvester, cfg.DisplayName)
sampleMatchFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, cfg.IncludeMetricsMatchers, ffRetriever)
ctx := NewContext(cfg, buildVersion, hostnameResolver, idLookupTable, sampleMatchFn)
sampleMatchFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, config.MetricsMap(cfg.IncludeMetricsMatchers), ffRetriever)
sampleExcludeFn := sampler.NewSampleMatchFn(cfg.EnableProcessMetrics, config.MetricsMap(cfg.ExcludeMetricsMatchers), ffRetriever)
ctx := NewContext(cfg, buildVersion, hostnameResolver, idLookupTable, sampler.IncludeSampleMatchFn(sampleMatchFn), sampler.ExcludeSampleMatchFn(sampleExcludeFn))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this casting needed? I see it wasn't before, did you add it for verbosity?

Copy link
Contributor Author

@DavSanchez DavSanchez Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified NewSampleMatchFn to accept a generic MetricsMap, so for both include and exclude I do the explicit conversion. Otherwise I had to convert only ExcludeMetricsMap into an IncludeMetricsMap to be able to pass it to NewSampleMatchFn, which didn't feel right and could be confusing for the future us.

agentKey, err := idLookupTable.AgentKey()
if err != nil {
Expand Down Expand Up @@ -1158,7 +1162,10 @@ func (c *context) SendEvent(event sample.Event, entityKey entity.Key) {
}
}

includeSample := c.shouldIncludeEvent(event)
// check if event should be included
// include takes precedence, so the event will be included if
// it IS NOT EXCLUDED or if it IS INCLUDED
includeSample := !c.shouldExcludeEvent(event) || c.shouldIncludeEvent(event)
if !includeSample {
aclog.
WithField("entity_key", entityKey.String()).
Expand Down
66 changes: 63 additions & 3 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newTesting(cfg *config.Config) *Agent {
cloudDetector := cloud.NewDetector(true, 0, 0, 0, false)
lookups := NewIdLookup(hostname.CreateResolver("", "", true), cloudDetector, cfg.DisplayName)

ctx := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, lookups, matcher)
ctx := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, lookups, matcher, matcher)

st := delta.NewStore(dataDir, "default", cfg.MaxInventorySize, true)

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestIgnoreInventory(t *testing.T) {
}

func TestServicePidMap(t *testing.T) {
ctx := NewContext(&config.Config{}, "", testhelpers.NullHostnameResolver, NilIDLookup, matcher)
ctx := NewContext(&config.Config{}, "", testhelpers.NullHostnameResolver, NilIDLookup, matcher, matcher)
svc, ok := ctx.GetServiceForPid(1)
assert.False(t, ok)
assert.Len(t, svc, 0)
Expand Down Expand Up @@ -915,6 +915,65 @@ func Test_ProcessSampling(t *testing.T) {
}
}

// Test_ProcessSamplingExcludes checks the exclude matcher that NewAgent wires
// into the agent context: for each configuration it builds an agent and asserts
// what shouldExcludeEvent reports for a process sample named "some-process".
func Test_ProcessSamplingExcludes(t *testing.T) {
	t.Parallel()

	someSample := &types.ProcessSample{
		ProcessDisplayName: "some-process",
	}

	type testCase struct {
		name string
		c    *config.Config
		ff   feature_flags.Retriever
		want bool
	}
	testCases := []testCase{
		{
			name: "Include not matching must not exclude",
			c:    &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"does-not-match"}}, DisableCloudMetadata: true},
			ff:   test.NewFFRetrieverReturning(false, false),
			want: false,
		},
		{
			name: "Include matching should not exclude",
			c:    &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true},
			ff:   test.NewFFRetrieverReturning(false, false),
			want: false,
		},
		{
			name: "Exclude matching should exclude",
			c:    &config.Config{ExcludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true},
			ff:   test.NewFFRetrieverReturning(false, false),
			want: true,
		},
		{
			name: "Exclude not matching should not exclude",
			c:    &config.Config{ExcludeMetricsMatchers: map[string][]string{"process.name": {"does-not-match"}}, DisableCloudMetadata: true},
			ff:   test.NewFFRetrieverReturning(false, false),
			want: false,
		},
		{
			name: "Exclude matching should exclude even if include is configured",
			c:    &config.Config{IncludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, ExcludeMetricsMatchers: map[string][]string{"process.name": {"some-process"}}, DisableCloudMetadata: true},
			ff:   test.NewFFRetrieverReturning(false, false),
			want: true,
		},
	}

	for _, tc := range testCases {
		testCase := tc

		// Surface a construction failure as a test failure; otherwise the
		// subtest would dereference a nil agent and panic.
		a, err := NewAgent(testCase.c, "test", "userAgent", testCase.ff)
		if err != nil {
			t.Fatalf("creating agent for test case %q: %v", testCase.name, err)
		}

		t.Run(testCase.name, func(t *testing.T) {
			t.Parallel()

			actual := a.Context.shouldExcludeEvent(someSample)
			assert.Equal(t, testCase.want, actual)
		})
	}
}

type fakeEventSender struct{}

func (f fakeEventSender) QueueEvent(_ sample.Event, _ entity.Key) error {
Expand All @@ -941,7 +1000,8 @@ func TestContext_SendEvent_LogTruncatedEvent(t *testing.T) {
"0.0.0",
testhelpers.NewFakeHostnameResolver("foobar", "foo", nil),
NilIDLookup,
func(sample interface{}) bool { return true },
matcher,
matcher,
)
c.eventSender = fakeEventSender{}

Expand Down
2 changes: 1 addition & 1 deletion internal/agent/event_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func TestEventSender_ResponseError(t *testing.T) {
ConnectEnabled: true,
PayloadCompressionLevel: gzip.NoCompression,
}
c := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, host.IDLookup{}, nil)
c := NewContext(cfg, "1.2.3", testhelpers.NullHostnameResolver, host.IDLookup{}, nil, nil)
c.setAgentKey(agentKey)
c.SetAgentIdentity(agentIdn)

Expand Down
20 changes: 18 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ var (

type CustomAttributeMap map[string]interface{}

type MetricsMap map[string][]string

// IncludeMetricsMap configuration type to Map include_matching_metrics setting env var
type IncludeMetricsMap map[string][]string
type IncludeMetricsMap MetricsMap

// ExcludeMetricsMap configuration type to Map exclude_matching_metrics setting env var.
type ExcludeMetricsMap MetricsMap

// LogFilters configuration specifies which log entries should be included/excluded.
type LogFilters map[string][]interface{}
Expand Down Expand Up @@ -1229,6 +1234,16 @@ type Config struct {
// Public: Yes
IncludeMetricsMatchers IncludeMetricsMap `yaml:"include_matching_metrics" envconfig:"include_matching_metrics"`

// ExcludeMetricsMatchers Configuration of the metrics matchers that determine which metric data the agent should
// filter out and not send to the New Relic backend.
// If no configuration is defined, the previous behaviour is maintained, i.e., every metric data captured is sent.
// If a configuration is defined, then only metric data not matching the configuration is sent.
// Note that ALL DATA MATCHED WILL BE DROPPED.
// Also note that at present it ONLY APPLIES to metric data related to processes. All other metric data is still being sent as usual.
// Default: none
// Public: Yes
ExcludeMetricsMatchers ExcludeMetricsMap `envconfig:"exclude_matching_metrics" yaml:"exclude_matching_metrics"`

// AgentMetricsEndpoint Set the endpoint (host:port) for the HTTP server the agent will use to serve OpenMetrics
// if empty the server will not be spawned
// Default: empty
Expand Down Expand Up @@ -1901,7 +1916,8 @@ func NewConfig() *Config {
MetricsNFSSampleRate: DefaultMetricsNFSSampleRate,
SmartVerboseModeEntryLimit: DefaultSmartVerboseModeEntryLimit,
DefaultIntegrationsTempDir: defaultIntegrationsTempDir,
IncludeMetricsMatchers: defaultMetricsMatcherConfig,
IncludeMetricsMatchers: defaultIncludeMetricsMatcherConfig,
ExcludeMetricsMatchers: defaultExcludeMetricsMatcherConfig,
InventoryQueueLen: DefaultInventoryQueue,
NtpMetrics: NewNtpConfig(),
Http: NewHttpConfig(),
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ var (
defaultProxyValidateCerts = false
defaultProxyConfigPlugin = true
defaultWinRemovableDrives = true
defaultMetricsMatcherConfig = IncludeMetricsMap{}
defaultIncludeMetricsMatcherConfig = IncludeMetricsMap{}
defaultExcludeMetricsMatcherConfig = ExcludeMetricsMap{}
defaultRegisterMaxRetryBoSecs = 60
defaultNtpPool = []string{} // i.e: []string{"time.cloudflare.com"}
defaultNtpEnabled = false
Expand Down
126 changes: 76 additions & 50 deletions pkg/metrics/sampler/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ var (
}
)

// MatcherFn func that returns whether an event/sample is matched. It satisfies
// the metrics matcher (processor.MatcherChain) interface.
type MatcherFn func(event any) bool

// IncludeSampleMatchFn func that returns whether an event/sample should be included, it satisfies
// the metrics matcher (processor.MatcherChain) interface.
type IncludeSampleMatchFn func(sample interface{}) bool
type IncludeSampleMatchFn MatcherFn

// ExcludeSampleMatchFn func that returns whether an event/sample should be excluded, it satisfies
// the metrics matcher (processor.MatcherChain) interface.
type ExcludeSampleMatchFn MatcherFn

// ExpressionMatcher is an interface every evaluator must implement
type ExpressionMatcher interface {
Expand Down Expand Up @@ -214,7 +222,7 @@ type MatcherChain struct {
// NewMatcherChain creates a new chain of matchers.
// Each expression will generate a matcher that gets added to the chain.
// While the chain will be evaluated for each "sample", it terminates as soon as 1 matcher matches (result = true)
func NewMatcherChain(expressions config.IncludeMetricsMap) MatcherChain {
func NewMatcherChain(expressions config.MetricsMap) MatcherChain {
chain := MatcherChain{Matchers: map[string][]ExpressionMatcher{}, Enabled: false}

// no matchers means the chain will be disabled
Expand Down Expand Up @@ -271,82 +279,100 @@ func (ne constantMatcher) String() string {

// NewSampleMatchFn creates a new MatcherFn func, enableProcessMetrics might be nil when
// the value was not set.
func NewSampleMatchFn(enableProcessMetrics *bool, includeMetricsMatchers config.IncludeMetricsMap, ffRetriever feature_flags.Retriever) IncludeSampleMatchFn {
func NewSampleMatchFn(enableProcessMetrics *bool, metricsMatchers config.MetricsMap, ffRetriever feature_flags.Retriever) MatcherFn {
// configuration option always takes precedence over FF and matchers configuration
if enableProcessMetrics == nil {
// if config option is not set, check if we have rules defined. those take precedence over the FF
ec := NewMatcherChain(includeMetricsMatchers)
if ec.Enabled {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Tracef("EnableProcessMetrics is EMPTY and rules ARE defined, process metrics will be ENABLED for matching processes")
return func(sample interface{}) bool {
return ec.Evaluate(sample)
}
matcher := matcherFromMetricsMatchers(metricsMatchers)
if matcher != nil {
return matcher
}

// configuration option is not defined and feature flag is present, FF determines, otherwise
// all process samples will be excluded
return func(sample interface{}) bool {
_, isProcessSample := sample.(*types.ProcessSample)
_, isFlatProcessSample := sample.(*types.FlatProcessSample)

if !isProcessSample && !isFlatProcessSample {
return true
}

enabled, exists := ffRetriever.GetFeatureFlag(fflag.FlagFullProcess)
return exists && enabled
}
return matcherFromFeatureFlag(ffRetriever)
}

if excludeProcessMetrics(enableProcessMetrics) {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("EnableProcessMetrics is FALSE, process metrics will be DISABLED")
return func(sample interface{}) bool {
switch sample.(type) {
case *types.ProcessSample:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("Got a sample of type '*types.ProcessSample' so excluding sample.")
// no process samples are included
return false
case *types.FlatProcessSample:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("Got a sample of type '*types.FlatProcessSample' so excluding sample.")
// no flat process samples are included
return false
default:
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Tracef("Got a sample of type '%s' that should not be excluded.", reflect.TypeOf(sample).String())
// other samples are included
return true
}
}

return matcherForDisabledMetrics()
}

ec := NewMatcherChain(includeMetricsMatchers)
if ec.Enabled {
matcherChain := NewMatcherChain(metricsMatchers)
if matcherChain.Enabled {
mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("EnableProcessMetrics is TRUE and rules ARE defined, process metrics will be ENABLED for matching processes")
return func(sample interface{}) bool {
return ec.Evaluate(sample)
}

return matcherChain.Evaluate
}

mlog.
WithField(config.TracesFieldName, config.FeatureTrace).
Trace("EnableProcessMetrics is TRUE and rules are NOT defined, ALL process metrics will be ENABLED")

return func(sample interface{}) bool {
// all process samples are included
return true
}
}

// matcherForDisabledMetrics returns a MatcherFn that reports false for process
// samples (*types.ProcessSample and *types.FlatProcessSample) and true for any
// other sample, tracing each decision.
func matcherForDisabledMetrics() MatcherFn {
	return func(sample interface{}) bool {
		switch sample.(type) {
		case *types.ProcessSample:
			mlog.
				WithField(config.TracesFieldName, config.FeatureTrace).
				Trace("Got a sample of type '*types.ProcessSample' so excluding sample.")
			// no process samples are included
			return false
		case *types.FlatProcessSample:
			mlog.
				WithField(config.TracesFieldName, config.FeatureTrace).
				Trace("Got a sample of type '*types.FlatProcessSample' so excluding sample.")
			// no flat process samples are included
			return false
		default:
			// reflect.TypeOf returns a nil Type for a nil interface value and
			// calling String on it would panic, so resolve the name defensively.
			typeName := "<nil>"
			if sampleType := reflect.TypeOf(sample); sampleType != nil {
				typeName = sampleType.String()
			}

			mlog.
				WithField(config.TracesFieldName, config.FeatureTrace).
				Tracef("Got a sample of type '%s' that should not be excluded.", typeName)
			// other samples are included
			return true
		}
	}
}

// matcherFromMetricsMatchers builds a matcher chain from the given rules and
// returns the chain's Evaluate method as a MatcherFn. It returns nil when the
// resulting chain is not enabled.
func matcherFromMetricsMatchers(metricsMatchers config.MetricsMap) MatcherFn {
	chain := NewMatcherChain(metricsMatchers)
	if !chain.Enabled {
		// nothing to evaluate against; the caller decides what to use instead
		return nil
	}

	// rules are defined, so they are the ones deciding on each sample
	mlog.
		WithField(config.TracesFieldName, config.FeatureTrace).
		Tracef("EnableProcessMetrics is EMPTY and rules ARE defined, process metrics will be ENABLED for matching processes")

	return chain.Evaluate
}

// matcherFromFeatureFlag returns a MatcherFn that always reports true for
// samples that are neither *types.ProcessSample nor *types.FlatProcessSample.
// For those two process sample types it reports true only when the
// fflag.FlagFullProcess feature flag exists and is enabled.
func matcherFromFeatureFlag(ffRetriever feature_flags.Retriever) MatcherFn {
	return func(sample any) bool {
		switch sample.(type) {
		case *types.ProcessSample, *types.FlatProcessSample:
			// process samples are decided by the feature flag
			enabled, exists := ffRetriever.GetFeatureFlag(fflag.FlagFullProcess)
			if !exists {
				return false
			}

			return enabled
		default:
			// every other kind of sample passes through
			return true
		}
	}
}

func excludeProcessMetrics(enableProcessMetrics *bool) bool {
if enableProcessMetrics == nil || *enableProcessMetrics {
return false
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/sampler/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func Test_EvaluatorChain_LogTraceMatcher(t *testing.T) {
javaProcessSample := types.ProcessSample{ProcessDisplayName: "java", CmdLine: "/bin/java"}

rule := config.IncludeMetricsMap{"process.name": {"java"}}
ec := sampler.NewMatcherChain(rule)
ec := sampler.NewMatcherChain(config.MetricsMap(rule))

assert.Len(t, ec.Matchers, len(rule))
assert.EqualValues(t, true, ec.Evaluate(javaProcessSample))
Expand Down Expand Up @@ -741,7 +741,7 @@ func TestNewSampleMatchFn(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
matchFn := sampler.NewSampleMatchFn(tt.args.enableProcessMetrics, tt.args.includeMetricsMatchers, tt.args.ffRetriever)
matchFn := sampler.NewSampleMatchFn(tt.args.enableProcessMetrics, config.MetricsMap(tt.args.includeMetricsMatchers), tt.args.ffRetriever)
assert.Equal(t, tt.include, matchFn(tt.args.sample))
})
}
Expand Down
Loading
Loading