Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[check command] Add --instance-filter option #15034

Merged
merged 14 commits into from
Jan 13, 2023
42 changes: 37 additions & 5 deletions cmd/agent/common/autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package common

import (
"context"
"fmt"
"time"

"go.uber.org/atomic"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/autodiscovery/scheduler"
"github.com/DataDog/datadog-agent/pkg/config"
confad "github.com/DataDog/datadog-agent/pkg/config/autodiscovery"
"github.com/DataDog/datadog-agent/pkg/util/jsonquery"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand Down Expand Up @@ -211,14 +213,14 @@ func (sf schedulerFunc) Stop() {
//
// If the context is cancelled, then any accumulated, matching changes are
// returned, even if that is fewer than discoveryMinInstances.
func WaitForConfigsFromAD(ctx context.Context, checkNames []string, discoveryMinInstances int) (configs []integration.Config) {
return waitForConfigsFromAD(ctx, false, checkNames, discoveryMinInstances)
func WaitForConfigsFromAD(ctx context.Context, checkNames []string, discoveryMinInstances int, instanceFilter string) (configs []integration.Config, lastError error) {
return waitForConfigsFromAD(ctx, false, checkNames, discoveryMinInstances, instanceFilter)
}

// WaitForAllConfigsFromAD waits until its context expires, and then returns
// the full set of checks scheduled by AD.
func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config) {
return waitForConfigsFromAD(ctx, true, []string{}, 0)
func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config, lastError error) {
return waitForConfigsFromAD(ctx, true, []string{}, 0, "")
}

// waitForConfigsFromAD waits for configs from the AD scheduler and returns them.
Expand All @@ -234,7 +236,7 @@ func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config)
// If wildcard is true, this gathers all configs scheduled before the context
// is cancelled, and then returns. It will not return before the context is
// cancelled.
func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []string, discoveryMinInstances int) (configs []integration.Config) {
func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []string, discoveryMinInstances int, instanceFilter string) (configs []integration.Config, returnErr error) {
configChan := make(chan integration.Config)

// signal to the scheduler when we are no longer waiting, so we do not continue
Expand Down Expand Up @@ -265,10 +267,24 @@ func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []strin
}
}

stopChan := make(chan struct{})
// add the scheduler in a goroutine, since it will schedule any "catch-up" immediately,
// placing items in configChan
go AC.AddScheduler("check-cmd", schedulerFunc(func(configs []integration.Config) {
for _, cfg := range configs {
if instanceFilter != "" {
instances, err := filterInstances(cfg.Instances, instanceFilter)
if err != nil {
returnErr = err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to get a list of error, and return an aggregated view of the errors.

we are using in several place https://pkg.go.dev/k8s.io/apimachinery/pkg/util/errors

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clamoriniere Thanks for the suggestion. I modified the code to return an aggregated view of the errors here: 89534fd

Let me know if there is anything else to improve.

stopChan <- struct{}{}
break
}
if len(instances) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we want to exclude config with no instances here.
I'm not familiar enough with autodiscovery but is it possible that we get some config without any instance (such as logs config) on this part of the code ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kaderinho thanks for the review.

In this case, note that we are here inside the instanceFilter != "" condition.

If there is no instances left after the filter being applied I think there is no need to return the config with no instances since the goal for waitForConfigsFromAD is to wait configs that match some conditions (checkNames, discoveryMinInstances, instanceFilter).

Maybe I'm missing something, please let me know :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that we get some config without any instance (such as logs config) on this part of the code ?

AFAIK, waitForConfigsFromAD is only used for agent check <INTEGRATION> command to wait and retrieve integrations configs/instances that should be run.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I get it sorry, I missed the fact that you use instanceFilter = "" in WaitForAllConfigsFromAD mb

continue
}
cfg.Instances = instances
}

if match(cfg) && waiting.Load() {
configChan <- cfg
}
Expand All @@ -279,9 +295,25 @@ func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []strin
select {
case cfg := <-configChan:
configs = append(configs, cfg)
case <-stopChan:
return
case <-ctx.Done():
return
}
}
return
}

func filterInstances(instances []integration.Data, instanceFilter string) ([]integration.Data, error) {
var newInstances []integration.Data
for _, instance := range instances {
exist, err := jsonquery.YAMLCheckExist(instance, instanceFilter)
if err != nil {
return nil, fmt.Errorf("instance filter error: %v", err)
}
if exist {
newInstances = append(newInstances, instance)
}
}
return newInstances, nil
}
9 changes: 7 additions & 2 deletions cmd/agent/subcommands/jmx/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type cliParams struct {
saveFlare bool
discoveryTimeout uint
discoveryMinInstances uint
instanceFilter string
}

// Commands returns a slice of subcommands for the 'agent' command.
Expand All @@ -63,6 +64,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
jmxCmd.PersistentFlags().UintVarP(&cliParams.discoveryTimeout, "discovery-timeout", "", 5, "max retry duration until Autodiscovery resolves the check template (in seconds)")
jmxCmd.PersistentFlags().UintVarP(&discoveryRetryInterval, "discovery-retry-interval", "", 1, "(unused)")
jmxCmd.PersistentFlags().UintVarP(&cliParams.discoveryMinInstances, "discovery-min-instances", "", 1, "minimum number of config instances to be discovered before running the check(s)")
jmxCmd.PersistentFlags().StringVarP(&cliParams.instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'")

// All subcommands use the same provided components, with a different
// oneShot callback, and with some complex derivation of the
Expand Down Expand Up @@ -241,11 +243,14 @@ func runJmxCommandConsole(log log.Component, config config.Component, cliParams
context.Background(), time.Duration(cliParams.discoveryTimeout)*time.Second)
var allConfigs []integration.Config
if len(cliParams.cliSelectedChecks) == 0 {
allConfigs = common.WaitForAllConfigsFromAD(waitCtx)
allConfigs, err = common.WaitForAllConfigsFromAD(waitCtx)
} else {
allConfigs = common.WaitForConfigsFromAD(waitCtx, cliParams.cliSelectedChecks, int(cliParams.discoveryMinInstances))
allConfigs, err = common.WaitForConfigsFromAD(waitCtx, cliParams.cliSelectedChecks, int(cliParams.discoveryMinInstances), cliParams.instanceFilter)
}
cancelTimeout()
if err != nil {
return err
}

err = standalone.ExecJMXCommandConsole(cliParams.command, cliParams.cliSelectedChecks, cliParams.jmxLogLevel, allConfigs)

Expand Down
7 changes: 6 additions & 1 deletion cmd/cluster-agent/commands/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
checkPause int
checkName string
checkDelay int
instanceFilter string
logLevel string
formatJSON bool
formatTable bool
Expand Down Expand Up @@ -72,6 +73,7 @@ func setupCmd(cmd *cobra.Command) {
cmd.Flags().IntVar(&checkPause, "pause", 0, "pause between multiple runs of the check, in milliseconds")
cmd.Flags().StringVarP(&logLevel, "log-level", "l", "", "set the log level (default 'off') (deprecated, use the env var DD_LOG_LEVEL instead)")
cmd.Flags().IntVarP(&checkDelay, "delay", "d", 100, "delay between running the check and grabbing the metrics in milliseconds")
cmd.Flags().StringVarP(&instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'")
cmd.Flags().BoolVarP(&formatJSON, "json", "", false, "format aggregator and check runner output as json")
cmd.Flags().BoolVarP(&formatTable, "table", "", false, "format aggregator and check runner output as an ascii table")
cmd.Flags().StringVarP(&breakPoint, "breakpoint", "b", "", "set a breakpoint at a particular line number (Python checks only)")
Expand Down Expand Up @@ -164,8 +166,11 @@ func Check(loggerName config.LoggerName, confFilePath *string, flagNoColor *bool

waitCtx, cancelTimeout := context.WithTimeout(
context.Background(), time.Duration(discoveryTimeout)*time.Second)
allConfigs := common.WaitForConfigsFromAD(waitCtx, []string{checkName}, int(discoveryMinInstances))
allConfigs, err := common.WaitForConfigsFromAD(waitCtx, []string{checkName}, int(discoveryMinInstances), instanceFilter)
cancelTimeout()
if err != nil {
return err
}

// make sure the checks in cs are not JMX checks
for idx := range allConfigs {
Expand Down
8 changes: 7 additions & 1 deletion pkg/cli/subcommands/check/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type cliParams struct {
checkPause int
checkName string
checkDelay int
instanceFilter string
logLevel string
formatJSON bool
formatTable bool
Expand Down Expand Up @@ -118,6 +119,7 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command {
cmd.Flags().IntVar(&cliParams.checkPause, "pause", 0, "pause between multiple runs of the check, in milliseconds")
cmd.Flags().StringVarP(&cliParams.logLevel, "log-level", "l", "", "set the log level (default 'off') (deprecated, use the env var DD_LOG_LEVEL instead)")
cmd.Flags().IntVarP(&cliParams.checkDelay, "delay", "d", 100, "delay between running the check and grabbing the metrics in milliseconds")
cmd.Flags().StringVarP(&cliParams.instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'")
cmd.Flags().BoolVarP(&cliParams.formatJSON, "json", "", false, "format aggregator and check runner output as json")
cmd.Flags().BoolVarP(&cliParams.formatTable, "table", "", false, "format aggregator and check runner output as an ascii table")
cmd.Flags().StringVarP(&cliParams.breakPoint, "breakpoint", "b", "", "set a breakpoint at a particular line number (Python checks only)")
Expand Down Expand Up @@ -190,8 +192,12 @@ func run(log log.Component, config config.Component, cliParams *cliParams) error

waitCtx, cancelTimeout := context.WithTimeout(
context.Background(), time.Duration(cliParams.discoveryTimeout)*time.Second)
allConfigs := common.WaitForConfigsFromAD(waitCtx, []string{cliParams.checkName}, int(cliParams.discoveryMinInstances))

allConfigs, err := common.WaitForConfigsFromAD(waitCtx, []string{cliParams.checkName}, int(cliParams.discoveryMinInstances), cliParams.instanceFilter)
cancelTimeout()
if err != nil {
return err
}

// make sure the checks in cs are not JMX checks
for idx := range allConfigs {
Expand Down
17 changes: 17 additions & 0 deletions pkg/util/jsonquery/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package jsonquery
import (
"fmt"
"time"

"gopkg.in/yaml.v3"
)

// Copy from https://github.com/itchyny/gojq/blob/main/cli/yaml.go
Expand Down Expand Up @@ -46,3 +48,18 @@ func NormalizeYAMLForGoJQ(v interface{}) interface{} {
return v
}
}

// YAMLCheckExist check a property/value from a YAML exist (jq style syntax)
func YAMLCheckExist(yamlData []byte, query string) (bool, error) {
var yamlContent interface{}
if err := yaml.Unmarshal(yamlData, &yamlContent); err != nil {
return false, err
}
yamlContent = NormalizeYAMLForGoJQ(yamlContent)
output, _, err := RunSingleOutput(query, yamlContent)
var exist bool
if err := yaml.Unmarshal([]byte(output), &exist); err != nil {
return false, fmt.Errorf("filter query must return a boolean: %s", err)
}
return exist, err
}
32 changes: 32 additions & 0 deletions pkg/util/jsonquery/yaml_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

package jsonquery

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
)

func TestYAMLExistQuery(t *testing.T) {
exist, err := YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address == \"127.0.0.50\"")
assert.NoError(t, err)
assert.True(t, exist)

exist, err = YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address == \"127.0.0.99\"")
assert.NoError(t, err)
assert.False(t, exist)

exist, err = YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address")
assert.EqualError(t, err, "filter query must return a boolean: yaml: unmarshal errors:\n line 1: cannot unmarshal !!str `127.0.0.50` into bool")
assert.False(t, exist)

exist, err = YAMLCheckExist(integration.Data("{}"), ".ip_address == \"127.0.0.99\"")
assert.NoError(t, err)
assert.False(t, exist)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- |
Add an ``--instance-filter`` option to the Agent check command.