Skip to content

Commit

Permalink
[process-agent] Remove check intervals from pkg/process/config (#14878)
Browse files Browse the repository at this point in the history
* [process-agent] Remove check intervals from pkg/process/config

- Remove check interval management from pkg/process/config package
- Never store intervals, just use config settings
- Generalize check for process and process RT check intervals

* Fix MacOS tests

* Address review feedback from @just-chillin
  • Loading branch information
xornivore authored Dec 29, 2022
1 parent 9ea5c01 commit c4c6a3b
Show file tree
Hide file tree
Showing 19 changed files with 325 additions and 180 deletions.
37 changes: 28 additions & 9 deletions cmd/process-agent/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (l *Collector) runCheck(c checks.Check, results *api.WeightedQueue) {
checks.StoreCheckOutput(c.Name(), nil)
}

if c.Name() == config.PodCheckName {
if c.Name() == checks.PodCheckName {
handlePodChecks(l, start, messages, results)
} else {
l.messagesToResultsQueue(start, c.Name(), messages, results)
Expand Down Expand Up @@ -592,11 +592,30 @@ func (l *Collector) runnerForCheck(c checks.Check, exit chan struct{}) (func(),

rtResults := l.resultsQueueForCheck(withRealTime.RealTimeName())

interval := checks.GetInterval(withRealTime.Name())
rtInterval := checks.GetInterval(withRealTime.RealTimeName())

if interval < rtInterval || interval%rtInterval != 0 {
// Check interval must be greater or equal to realtime check interval and the intervals must be divisible
// in order to be run on the same goroutine
defaultInterval := checks.GetDefaultInterval(withRealTime.Name())
defaultRTInterval := checks.GetDefaultInterval(withRealTime.RealTimeName())
log.Warnf(
"Invalid %s check interval overrides [%s,%s], resetting to defaults [%s,%s]",
withRealTime.Name(),
interval,
rtInterval,
defaultInterval,
defaultRTInterval,
)
interval = defaultInterval
rtInterval = defaultRTInterval
}

return checks.NewRunnerWithRealTime(
checks.RunnerConfig{
CheckInterval: l.cfg.CheckInterval(withRealTime.Name()),
RtInterval: l.cfg.CheckInterval(withRealTime.RealTimeName()),

CheckInterval: interval,
RtInterval: rtInterval,
ExitChan: exit,
RtIntervalChan: l.rtIntervalCh,
RtEnabled: func() bool {
Expand All @@ -616,7 +635,7 @@ func (l *Collector) basicRunner(c checks.Check, results *api.WeightedQueue, exit
l.runCheck(c, results)
}

ticker := time.NewTicker(l.cfg.CheckInterval(c.Name()))
ticker := time.NewTicker(checks.GetInterval(c.Name()))
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -686,7 +705,7 @@ func (l *Collector) consumePayloads(results *api.WeightedQueue, fwd forwarder.Fo
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorChecks(forwarderPayload, payload.headers, int(orchestrator.K8sPod))
// Pod check manifest data
case config.PodCheckManifestName:
case checks.PodCheckManifestName:
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorManifests(forwarderPayload, payload.headers)
case checks.ProcessDiscovery.Name():
Expand Down Expand Up @@ -844,7 +863,7 @@ func readResponseStatuses(checkName string, responses <-chan forwarder.Response)

func ignoreResponseBody(checkName string) bool {
switch checkName {
case checks.Pod.Name(), config.PodCheckManifestName, checks.ProcessEvents.Name():
case checks.Pod.Name(), checks.PodCheckManifestName, checks.ProcessEvents.Name():
return true
default:
return false
Expand All @@ -855,8 +874,8 @@ func ignoreResponseBody(checkName string) bool {
// By default we only send pod payloads, which contain pod metadata and pod manifests (YAML).
// Manifest payloads are a copy of the pod manifests; we only send them when the feature flag is enabled.
func handlePodChecks(l *Collector, start time.Time, messages []model.MessageBody, results *api.WeightedQueue) {
l.messagesToResultsQueue(start, config.PodCheckName, messages[:len(messages)/2], results)
l.messagesToResultsQueue(start, checks.PodCheckName, messages[:len(messages)/2], results)
if l.orchestrator.IsManifestCollectionEnabled {
l.messagesToResultsQueue(start, config.PodCheckManifestName, messages[len(messages)/2:], results)
l.messagesToResultsQueue(start, checks.PodCheckManifestName, messages[len(messages)/2:], results)
}
}
1 change: 0 additions & 1 deletion cmd/process-agent/collector_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ func runCollectorTestWithAPIKeys(t *testing.T, check checks.Check, cfg *config.A
setProcessEventsEndpointsForTest(mockConfig, eventsEps...)

cfg.HostName = testHostName
cfg.CheckIntervals[check.Name()] = 500 * time.Millisecond

exit := make(chan struct{})

Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestIgnoreResponseBody(t *testing.T) {
{checkName: checks.Container.Name(), ignore: false},
{checkName: checks.RTContainer.Name(), ignore: false},
{checkName: checks.Pod.Name(), ignore: true},
{checkName: config.PodCheckManifestName, ignore: true},
{checkName: checks.PodCheckManifestName, ignore: true},
{checkName: checks.Connections.Name(), ignore: false},
{checkName: checks.ProcessEvents.Name(), ignore: true},
} {
Expand Down
13 changes: 13 additions & 0 deletions pkg/process/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"github.com/DataDog/datadog-agent/pkg/process/config"
)

// Names of the checks performed by the process-agent or system-probe.
const (
	// ProcessCheckName is the name of the process check.
	ProcessCheckName = "process"
	// RTProcessCheckName is the name of the real-time process check.
	RTProcessCheckName = "rtprocess"
	// ContainerCheckName is the name of the container check.
	ContainerCheckName = "container"
	// RTContainerCheckName is the name of the real-time container check.
	RTContainerCheckName = "rtcontainer"
	// ConnectionsCheckName is the name of the connections check.
	ConnectionsCheckName = "connections"
	// PodCheckName is the name of the pod check.
	PodCheckName = "pod"
	// PodCheckManifestName is the name under which pod manifest payloads are submitted.
	PodCheckManifestName = "pod_manifest"
	// DiscoveryCheckName is the name of the process discovery check.
	DiscoveryCheckName = "process_discovery"
	// ProcessEventsCheckName is the name of the process events check.
	ProcessEventsCheckName = "process_events"
)

// Check is an interface for Agent checks that collect data. Each check returns
// a specific MessageBody type that will be published to the intake endpoint or
// processed in another way (e.g. printed for debugging).
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *ContainerCheck) Init(cfg *config.AgentConfig, info *model.SystemInfo) e
}

// Name returns the name of the ContainerCheck.
func (c *ContainerCheck) Name() string { return config.ContainerCheckName }
func (c *ContainerCheck) Name() string { return ContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *ContainerCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container_rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *RTContainerCheck) Init(_ *config.AgentConfig, sysInfo *model.SystemInfo
}

// Name returns the name of the RTContainerCheck.
func (r *RTContainerCheck) Name() string { return config.RTContainerCheckName }
func (r *RTContainerCheck) Name() string { return RTContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (r *RTContainerCheck) RealTime() bool { return true }
Expand Down
14 changes: 6 additions & 8 deletions pkg/process/checks/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

model "github.com/DataDog/agent-payload/v5/process"

"github.com/DataDog/datadog-agent/pkg/process/config"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -70,17 +68,17 @@ var (
// HumanFormat takes the messages produced by a check run and outputs them in a human-readable format
func HumanFormat(check string, msgs []model.MessageBody, w io.Writer) error {
switch check {
case config.ProcessCheckName:
case ProcessCheckName:
return humanFormatProcess(msgs, w)
case config.RTProcessCheckName:
case RTProcessCheckName:
return humanFormatRealTimeProcess(msgs, w)
case config.ContainerCheckName:
case ContainerCheckName:
return humanFormatContainer(msgs, w)
case config.RTContainerCheckName:
case RTContainerCheckName:
return humanFormatRealTimeContainer(msgs, w)
case config.DiscoveryCheckName:
case DiscoveryCheckName:
return humanFormatProcessDiscovery(msgs, w)
case config.ProcessEventsCheckName:
case ProcessEventsCheckName:
return HumanFormatProcessEvents(msgs, w, true)
}
return ErrNoHumanFormat
Expand Down
99 changes: 99 additions & 0 deletions pkg/process/checks/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package checks

import (
"time"

"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Default intervals used for each check when no override is configured.
const (
	ProcessCheckDefaultInterval          = 10 * time.Second
	RTProcessCheckDefaultInterval        = 2 * time.Second
	ContainerCheckDefaultInterval        = 10 * time.Second
	RTContainerCheckDefaultInterval      = 2 * time.Second
	ConnectionsCheckDefaultInterval      = 30 * time.Second
	PodCheckDefaultInterval              = 10 * time.Second
	ProcessDiscoveryCheckDefaultInterval = 4 * time.Hour
)

// discoveryMinInterval is the lower bound applied to the configured process
// discovery interval.
const discoveryMinInterval = 10 * time.Minute

// Configuration keys for per-check interval overrides. Values are expressed
// in seconds. Regular checks default to 10s and their real-time counterparts
// to 2s; to get the same cadence in both modes, set the container/process
// real-time intervals to 10.
const (
	configIntervals = configPrefix + "intervals."

	configProcessInterval     = configIntervals + "process"
	configRTProcessInterval   = configIntervals + "process_realtime"
	configContainerInterval   = configIntervals + "container"
	configRTContainerInterval = configIntervals + "container_realtime"
	configConnectionsInterval = configIntervals + "connections"
)

// defaultIntervals associates each check name with the interval used when
// no override applies.
var defaultIntervals = map[string]time.Duration{
	ProcessCheckName:       ProcessCheckDefaultInterval,
	RTProcessCheckName:     RTProcessCheckDefaultInterval,
	ContainerCheckName:     ContainerCheckDefaultInterval,
	RTContainerCheckName:   RTContainerCheckDefaultInterval,
	ConnectionsCheckName:   ConnectionsCheckDefaultInterval,
	PodCheckName:           PodCheckDefaultInterval,
	DiscoveryCheckName:     ProcessDiscoveryCheckDefaultInterval,
	ProcessEventsCheckName: config.DefaultProcessEventsCheckInterval,
}

// configKeys associates a check name with the configuration key that holds
// its interval override in seconds. Only checks listed here can be overridden
// through the generic "intervals" settings.
var configKeys = map[string]string{
	ProcessCheckName:     configProcessInterval,
	RTProcessCheckName:   configRTProcessInterval,
	ContainerCheckName:   configContainerInterval,
	RTContainerCheckName: configRTContainerInterval,
	ConnectionsCheckName: configConnectionsInterval,
}

// GetDefaultInterval looks up the built-in interval for the given check name.
// A name with no registered default yields the zero duration.
func GetDefaultInterval(checkName string) time.Duration {
	if interval, found := defaultIntervals[checkName]; found {
		return interval
	}
	return 0
}

// GetInterval returns the interval at which the named check should run,
// taking configuration overrides into account.
//
//   - Process discovery: reads "process_config.process_discovery.interval" and
//     raises values below discoveryMinInterval to that minimum.
//   - Process events: reads "process_config.event_collection.interval" and
//     falls back to the default when the value is below the allowed minimum.
//   - Any other check: uses the override (in seconds) stored under the key
//     registered in configKeys when it is set and strictly positive;
//     otherwise the entry from defaultIntervals is returned.
//
// Zero or negative overrides are never returned for the generic case: callers
// feed the result to time.NewTicker, which panics on non-positive durations.
func GetInterval(checkName string) time.Duration {
	switch checkName {
	case DiscoveryCheckName:
		// We don't need to check if the key exists since we already bound it to a default in InitConfig.
		// We use a minimum of 10 minutes for this value.
		discoveryInterval := config.Datadog.GetDuration("process_config.process_discovery.interval")
		if discoveryInterval < discoveryMinInterval {
			discoveryInterval = discoveryMinInterval
			_ = log.Warnf("Invalid interval for process discovery (< %s) using minimum value of %[1]s", discoveryMinInterval.String())
		}
		return discoveryInterval

	case ProcessEventsCheckName:
		eventsInterval := config.Datadog.GetDuration("process_config.event_collection.interval")
		if eventsInterval < config.DefaultProcessEventsMinCheckInterval {
			// Too-small values are replaced by the default, not by the minimum.
			eventsInterval = config.DefaultProcessEventsCheckInterval
			_ = log.Warnf("Invalid interval for process_events check (< %s) using default value of %s",
				config.DefaultProcessEventsMinCheckInterval.String(), config.DefaultProcessEventsCheckInterval.String())
		}
		return eventsInterval

	default:
		defaultInterval := defaultIntervals[checkName]
		configKey, ok := configKeys[checkName]
		if !ok || !config.Datadog.IsSet(configKey) {
			return defaultInterval
		}

		seconds := config.Datadog.GetInt(configKey)
		if seconds > 0 {
			log.Infof("Overriding %s check interval to %ds", configKey, seconds)
			return time.Duration(seconds) * time.Second
		}
		if seconds < 0 {
			// A negative override would produce a negative duration and crash
			// the ticker-based runners, so reject it explicitly.
			_ = log.Warnf("Invalid %s check interval override (%ds), using default value of %s",
				configKey, seconds, defaultInterval.String())
		}
		return defaultInterval
	}
}
Loading

0 comments on commit c4c6a3b

Please sign in to comment.