Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[process-agent] Remove check intervals from pkg/process/config #14878

Merged
merged 3 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions cmd/process-agent/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (l *Collector) runCheck(c checks.Check, results *api.WeightedQueue) {
checks.StoreCheckOutput(c.Name(), nil)
}

if c.Name() == config.PodCheckName {
if c.Name() == checks.PodCheckName {
handlePodChecks(l, start, messages, results)
} else {
l.messagesToResultsQueue(start, c.Name(), messages, results)
Expand Down Expand Up @@ -592,11 +592,30 @@ func (l *Collector) runnerForCheck(c checks.Check, exit chan struct{}) (func(),

rtResults := l.resultsQueueForCheck(withRealTime.RealTimeName())

interval := checks.GetInterval(withRealTime.Name())
rtInterval := checks.GetInterval(withRealTime.RealTimeName())

if interval < rtInterval || interval%rtInterval != 0 {
// Check interval must be greater or equal to realtime check interval and the intervals must be divisible
// in order to be run on the same goroutine
defaultInterval := checks.GetDefaultInterval(withRealTime.Name())
defaultRTInterval := checks.GetDefaultInterval(withRealTime.RealTimeName())
log.Warnf(
"Invalid %s check interval overrides [%s,%s], resetting to defaults [%s,%s]",
withRealTime.Name(),
interval,
rtInterval,
defaultInterval,
defaultRTInterval,
)
interval = defaultInterval
rtInterval = defaultRTInterval
}

return checks.NewRunnerWithRealTime(
checks.RunnerConfig{
CheckInterval: l.cfg.CheckInterval(withRealTime.Name()),
RtInterval: l.cfg.CheckInterval(withRealTime.RealTimeName()),

CheckInterval: interval,
RtInterval: rtInterval,
ExitChan: exit,
RtIntervalChan: l.rtIntervalCh,
RtEnabled: func() bool {
Expand All @@ -616,7 +635,7 @@ func (l *Collector) basicRunner(c checks.Check, results *api.WeightedQueue, exit
l.runCheck(c, results)
}

ticker := time.NewTicker(l.cfg.CheckInterval(c.Name()))
ticker := time.NewTicker(checks.GetInterval(c.Name()))
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -686,7 +705,7 @@ func (l *Collector) consumePayloads(results *api.WeightedQueue, fwd forwarder.Fo
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorChecks(forwarderPayload, payload.headers, int(orchestrator.K8sPod))
// Pod check manifest data
case config.PodCheckManifestName:
case checks.PodCheckManifestName:
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorManifests(forwarderPayload, payload.headers)
case checks.ProcessDiscovery.Name():
Expand Down Expand Up @@ -844,7 +863,7 @@ func readResponseStatuses(checkName string, responses <-chan forwarder.Response)

func ignoreResponseBody(checkName string) bool {
switch checkName {
case checks.Pod.Name(), config.PodCheckManifestName, checks.ProcessEvents.Name():
case checks.Pod.Name(), checks.PodCheckManifestName, checks.ProcessEvents.Name():
return true
default:
return false
Expand All @@ -855,8 +874,8 @@ func ignoreResponseBody(checkName string) bool {
// By default we only send pod payloads containing pod metadata and pod manifests (yaml)
// Manifest payloads is a copy of pod manifests, we only send manifest payloads when feature flag is true
func handlePodChecks(l *Collector, start time.Time, messages []model.MessageBody, results *api.WeightedQueue) {
l.messagesToResultsQueue(start, config.PodCheckName, messages[:len(messages)/2], results)
l.messagesToResultsQueue(start, checks.PodCheckName, messages[:len(messages)/2], results)
if l.orchestrator.IsManifestCollectionEnabled {
l.messagesToResultsQueue(start, config.PodCheckManifestName, messages[len(messages)/2:], results)
l.messagesToResultsQueue(start, checks.PodCheckManifestName, messages[len(messages)/2:], results)
}
}
1 change: 0 additions & 1 deletion cmd/process-agent/collector_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ func runCollectorTestWithAPIKeys(t *testing.T, check checks.Check, cfg *config.A
setProcessEventsEndpointsForTest(mockConfig, eventsEps...)

cfg.HostName = testHostName
cfg.CheckIntervals[check.Name()] = 500 * time.Millisecond

exit := make(chan struct{})

Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestIgnoreResponseBody(t *testing.T) {
{checkName: checks.Container.Name(), ignore: false},
{checkName: checks.RTContainer.Name(), ignore: false},
{checkName: checks.Pod.Name(), ignore: true},
{checkName: config.PodCheckManifestName, ignore: true},
{checkName: checks.PodCheckManifestName, ignore: true},
{checkName: checks.Connections.Name(), ignore: false},
{checkName: checks.ProcessEvents.Name(), ignore: true},
} {
Expand Down
13 changes: 13 additions & 0 deletions pkg/process/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"github.com/DataDog/datadog-agent/pkg/process/config"
)

// Name for check performed by process-agent or system-probe
const (
ProcessCheckName = "process"
RTProcessCheckName = "rtprocess"
ContainerCheckName = "container"
RTContainerCheckName = "rtcontainer"
ConnectionsCheckName = "connections"
PodCheckName = "pod"
PodCheckManifestName = "pod_manifest"
DiscoveryCheckName = "process_discovery"
ProcessEventsCheckName = "process_events"
)

// Check is an interface for Agent checks that collect data. Each check returns
// a specific MessageBody type that will be published to the intake endpoint or
// processed in another way (e.g. printed for debugging).
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *ContainerCheck) Init(cfg *config.AgentConfig, info *model.SystemInfo) e
}

// Name returns the name of the ProcessCheck.
func (c *ContainerCheck) Name() string { return config.ContainerCheckName }
func (c *ContainerCheck) Name() string { return ContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *ContainerCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container_rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *RTContainerCheck) Init(_ *config.AgentConfig, sysInfo *model.SystemInfo
}

// Name returns the name of the RTContainerCheck.
func (r *RTContainerCheck) Name() string { return config.RTContainerCheckName }
func (r *RTContainerCheck) Name() string { return RTContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (r *RTContainerCheck) RealTime() bool { return true }
Expand Down
14 changes: 6 additions & 8 deletions pkg/process/checks/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

model "github.com/DataDog/agent-payload/v5/process"

"github.com/DataDog/datadog-agent/pkg/process/config"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -70,17 +68,17 @@ var (
// HumanFormat takes the messages produced by a check run and outputs them in a human-readable format
func HumanFormat(check string, msgs []model.MessageBody, w io.Writer) error {
switch check {
case config.ProcessCheckName:
case ProcessCheckName:
return humanFormatProcess(msgs, w)
case config.RTProcessCheckName:
case RTProcessCheckName:
return humanFormatRealTimeProcess(msgs, w)
case config.ContainerCheckName:
case ContainerCheckName:
return humanFormatContainer(msgs, w)
case config.RTContainerCheckName:
case RTContainerCheckName:
return humanFormatRealTimeContainer(msgs, w)
case config.DiscoveryCheckName:
case DiscoveryCheckName:
return humanFormatProcessDiscovery(msgs, w)
case config.ProcessEventsCheckName:
case ProcessEventsCheckName:
return HumanFormatProcessEvents(msgs, w, true)
}
return ErrNoHumanFormat
Expand Down
101 changes: 101 additions & 0 deletions pkg/process/checks/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package checks

import (
"time"

"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
ProcessCheckDefaultInterval = 10 * time.Second
RTProcessCheckDefaultInterval = 2 * time.Second
ContainerCheckDefaultInterval = 10 * time.Second
RTContainerCheckDefaultInterval = 2 * time.Second
ConnectionsCheckDefaultInterval = 30 * time.Second
PodCheckDefaultInterval = 10 * time.Second
ProcessDiscoveryCheckDefaultInterval = 4 * time.Hour

discoveryMinInterval = 10 * time.Minute

configIntervals = configPrefix + ".intervals"

// The interval, in seconds, at which we will run each check. If you want consistent
// behavior between real-time you may set the Container/ProcessRT intervals to 10.
// Defaults to 10s for normal checks and 2s for others.
configProcessInterval = configIntervals + ".process"
configRTProcessInterval = configIntervals + ".process_realtime"
configContainerInterval = configIntervals + ".container"
configRTContainerInterval = configIntervals + ".container_realtime"
configConnectionsInterval = configIntervals + ".connections"
)

var (
defaultIntervals = map[string]time.Duration{
ProcessCheckName: ProcessCheckDefaultInterval,
RTProcessCheckName: RTProcessCheckDefaultInterval,
ContainerCheckName: ContainerCheckDefaultInterval,
RTContainerCheckName: RTContainerCheckDefaultInterval,
ConnectionsCheckName: ConnectionsCheckDefaultInterval,
PodCheckName: PodCheckDefaultInterval,
DiscoveryCheckName: ProcessDiscoveryCheckDefaultInterval,
ProcessEventsCheckName: config.DefaultProcessEventsCheckInterval,
}

configKeys = map[string]string{
ProcessCheckName: configProcessInterval,
RTProcessCheckName: configRTProcessInterval,
ContainerCheckName: configContainerInterval,
RTContainerCheckName: configRTContainerInterval,
ConnectionsCheckName: configConnectionsInterval,
}
)

// GetDefaultInterval returns the default check interval value
func GetDefaultInterval(checkName string) time.Duration {
return defaultIntervals[checkName]
}

// GetInterval returns the configured check interval value
func GetInterval(checkName string) time.Duration {

switch checkName {
case DiscoveryCheckName:
// We don't need to check if the key exists since we already bound it to a default in InitConfig.
// We use a minimum of 10 minutes for this value.
discoveryInterval := config.Datadog.GetDuration("process_config.process_discovery.interval")
if discoveryInterval < discoveryMinInterval {
discoveryInterval = discoveryMinInterval
_ = log.Warnf("Invalid interval for process discovery (<= %s) using default value of %[1]s", discoveryMinInterval.String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_ = log.Warnf("Invalid interval for process discovery (<= %s) using default value of %[1]s", discoveryMinInterval.String())
_ = log.Warnf("Invalid interval for process discovery (< %s) using minimum value of %[1]s", discoveryMinInterval.String())

The actual comparison is < discoveryMinInterval, not <= discoveryMinInterval, so this log line should reflect that.

Also, we're using the minimum, not the default, so I fixed that in the suggestion as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nice catch, let's fix this.

}
return discoveryInterval

case ProcessEventsCheckName:
eventsInterval := config.Datadog.GetDuration("process_config.event_collection.interval")
if eventsInterval < config.DefaultProcessEventsMinCheckInterval {
eventsInterval = config.DefaultProcessEventsCheckInterval
_ = log.Warnf("Invalid interval for process_events check (< %s) using default value of %s",
config.DefaultProcessEventsMinCheckInterval.String(), config.DefaultProcessEventsCheckInterval.String())
}
return eventsInterval

default:
defaultInterval := defaultIntervals[checkName]
configKey, ok := configKeys[checkName]
if !ok || !config.Datadog.IsSet(configKey) {
_ = log.Errorf("missing check interval for '%s', you must set a default", checkName)
Copy link
Contributor

@just-chillin just-chillin Dec 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might not be a good idea to log an error here. AFAIK most customers don't override the default check intervals, and I don't necessarily think we should be telling them that they need to do so.

Nit: Also, I think the convention is to not explicitly ignore the error for logs, do you mind removing the _ =. In the past, people have been confused when I added this and it would be nice to keep a consistent style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point taken, this was moved rather mechanically though, so the logging part exists today.

Copy link
Contributor

@just-chillin just-chillin Dec 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh interesting. I didn't notice that. I think it might be worthwhile removing while we're here, but I'll give you the final decision in this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you, it was probably done this way when we weren't binding defaults to these in pkg/config so removing the log makes sense!

return defaultInterval
}

if seconds := config.Datadog.GetInt(configKey); seconds != 0 {
log.Infof("Overriding %s check interval to %ds", configKey, seconds)
return time.Duration(seconds) * time.Second
}
return defaultInterval
}
}
42 changes: 42 additions & 0 deletions pkg/process/checks/interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package checks

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/config"
)

// TestProcessDiscoveryInterval tests to make sure that the process discovery interval validation works properly
func TestProcessDiscoveryInterval(t *testing.T) {
for _, tc := range []struct {
name string
interval time.Duration
expectedInterval time.Duration
}{
{
name: "allowed interval",
interval: 8 * time.Hour,
expectedInterval: 8 * time.Hour,
},
{
name: "below minimum",
interval: 0,
expectedInterval: discoveryMinInterval,
},
} {
t.Run(tc.name, func(t *testing.T) {
cfg := config.Mock(t)
cfg.Set("process_config.process_discovery.interval", tc.interval)

assert.Equal(t, tc.expectedInterval, GetInterval(DiscoveryCheckName))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind adding test cases for the ProcessEventsCheck, and the default case? Should be an easy way to boost our code coverage a bit 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might fit better in a separate function rather than a separate test case, but I'll leave that decision up to you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add tests for these two other branches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests and found bugs, best suggestion ever 😄

        discoveryMinInterval = 10 * time.Minute

-       configIntervals = configPrefix + ".intervals"
+       configIntervals = configPrefix + "intervals."

        // The interval, in seconds, at which we will run each check. If you want consistent
        // behavior between real-time you may set the Container/ProcessRT intervals to 10.
        // Defaults to 10s for normal checks and 2s for others.
-       configProcessInterval     = configIntervals + ".process"
-       configRTProcessInterval   = configIntervals + ".process_realtime"
-       configContainerInterval   = configIntervals + ".container"
-       configRTContainerInterval = configIntervals + ".container_realtime"
-       configConnectionsInterval = configIntervals + ".connections"
+       configProcessInterval     = configIntervals + "process"
+       configRTProcessInterval   = configIntervals + "process_realtime"
+       configContainerInterval   = configIntervals + "container"
+       configRTContainerInterval = configIntervals + "container_realtime"
+       configConnectionsInterval = configIntervals + "connections"

})
}
}
2 changes: 1 addition & 1 deletion pkg/process/checks/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *ConnectionsCheck) Init(cfg *config.AgentConfig, _ *model.SystemInfo) er
}

// Name returns the name of the ConnectionsCheck.
func (c *ConnectionsCheck) Name() string { return config.ConnectionsCheckName }
func (c *ConnectionsCheck) Name() string { return ConnectionsCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *ConnectionsCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *PodCheck) Init(_ *config.AgentConfig, info *model.SystemInfo) error {
}

// Name returns the name of the ProcessCheck.
func (c *PodCheck) Name() string { return config.PodCheckName }
func (c *PodCheck) Name() string { return PodCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *PodCheck) RealTime() bool { return false }
Expand Down
6 changes: 3 additions & 3 deletions pkg/process/checks/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ func (p *ProcessCheck) Init(_ *config.AgentConfig, info *model.SystemInfo) error
}

// Name returns the name of the ProcessCheck.
func (p *ProcessCheck) Name() string { return config.ProcessCheckName }
func (p *ProcessCheck) Name() string { return ProcessCheckName }

// RealTimeName returns the name of the RTProcessCheck
func (p *ProcessCheck) RealTimeName() string { return config.RTProcessCheckName }
func (p *ProcessCheck) RealTimeName() string { return RTProcessCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (p *ProcessCheck) RealTime() bool { return false }
Expand Down Expand Up @@ -329,7 +329,7 @@ func fmtProcesses(
connsByPID map[int32][]*model.Connection,
) map[string][]*model.Process {
procsByCtr := make(map[string][]*model.Process)
connCheckIntervalS := int(cfg.CheckIntervals[config.ConnectionsCheckName] / time.Second)
connCheckIntervalS := int(GetInterval(ConnectionsCheckName) / time.Second)

for _, fp := range procs {
if skipProcess(disallowList, fp, lastProcs) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_discovery_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (d *ProcessDiscoveryCheck) Init(_ *config.AgentConfig, info *model.SystemIn
}

// Name returns the name of the ProcessDiscoveryCheck.
func (d *ProcessDiscoveryCheck) Name() string { return config.DiscoveryCheckName }
func (d *ProcessDiscoveryCheck) Name() string { return DiscoveryCheckName }

// RealTime returns a value that says whether this check should be run in real time.
func (d *ProcessDiscoveryCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_events_fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (e *ProcessEventsCheck) Init(_ *config.AgentConfig, info *model.SystemInfo)
}

// Name returns the name of the ProcessEventsCheck.
func (e *ProcessEventsCheck) Name() string { return config.ProcessEventsCheckName }
func (e *ProcessEventsCheck) Name() string { return ProcessEventsCheckName }

// RealTime returns a value that says whether this check should be run in real time.
func (e *ProcessEventsCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_events_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (e *ProcessEventsCheck) start() {
}

// Name returns the name of the ProcessEventsCheck.
func (e *ProcessEventsCheck) Name() string { return config.ProcessEventsCheckName }
func (e *ProcessEventsCheck) Name() string { return ProcessEventsCheckName }

// RealTime returns a value that says whether this check should be run in real time.
func (e *ProcessEventsCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func fmtProcessStats(
lastRun time.Time,
connsByPID map[int32][]*model.Connection,
) [][]*model.ProcessStat {
connCheckIntervalS := int(cfg.CheckIntervals[config.ConnectionsCheckName] / time.Second)
connCheckIntervalS := int(GetInterval(ConnectionsCheckName) / time.Second)

chunked := make([][]*model.ProcessStat, 0)
chunk := make([]*model.ProcessStat, 0, maxBatchSize)
Expand Down
Loading