Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[process-agent] Remove check intervals from pkg/process/config #14878

Merged
merged 3 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions cmd/process-agent/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (l *Collector) runCheck(c checks.Check, results *api.WeightedQueue) {
checks.StoreCheckOutput(c.Name(), nil)
}

if c.Name() == config.PodCheckName {
if c.Name() == checks.PodCheckName {
handlePodChecks(l, start, messages, results)
} else {
l.messagesToResultsQueue(start, c.Name(), messages, results)
Expand Down Expand Up @@ -592,11 +592,30 @@ func (l *Collector) runnerForCheck(c checks.Check, exit chan struct{}) (func(),

rtResults := l.resultsQueueForCheck(withRealTime.RealTimeName())

interval := checks.GetInterval(withRealTime.Name())
rtInterval := checks.GetInterval(withRealTime.RealTimeName())

if interval < rtInterval || interval%rtInterval != 0 {
// Check interval must be greater or equal to realtime check interval and the intervals must be divisible
// in order to be run on the same goroutine
defaultInterval := checks.GetDefaultInterval(withRealTime.Name())
defaultRTInterval := checks.GetDefaultInterval(withRealTime.RealTimeName())
log.Warnf(
"Invalid %s check interval overrides [%s,%s], resetting to defaults [%s,%s]",
withRealTime.Name(),
interval,
rtInterval,
defaultInterval,
defaultRTInterval,
)
interval = defaultInterval
rtInterval = defaultRTInterval
}

return checks.NewRunnerWithRealTime(
checks.RunnerConfig{
CheckInterval: l.cfg.CheckInterval(withRealTime.Name()),
RtInterval: l.cfg.CheckInterval(withRealTime.RealTimeName()),

CheckInterval: interval,
RtInterval: rtInterval,
ExitChan: exit,
RtIntervalChan: l.rtIntervalCh,
RtEnabled: func() bool {
Expand All @@ -616,7 +635,7 @@ func (l *Collector) basicRunner(c checks.Check, results *api.WeightedQueue, exit
l.runCheck(c, results)
}

ticker := time.NewTicker(l.cfg.CheckInterval(c.Name()))
ticker := time.NewTicker(checks.GetInterval(c.Name()))
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -686,7 +705,7 @@ func (l *Collector) consumePayloads(results *api.WeightedQueue, fwd forwarder.Fo
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorChecks(forwarderPayload, payload.headers, int(orchestrator.K8sPod))
// Pod check manifest data
case config.PodCheckManifestName:
case checks.PodCheckManifestName:
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorManifests(forwarderPayload, payload.headers)
case checks.ProcessDiscovery.Name():
Expand Down Expand Up @@ -844,7 +863,7 @@ func readResponseStatuses(checkName string, responses <-chan forwarder.Response)

func ignoreResponseBody(checkName string) bool {
switch checkName {
case checks.Pod.Name(), config.PodCheckManifestName, checks.ProcessEvents.Name():
case checks.Pod.Name(), checks.PodCheckManifestName, checks.ProcessEvents.Name():
return true
default:
return false
Expand All @@ -855,8 +874,8 @@ func ignoreResponseBody(checkName string) bool {
// By default we only send pod payloads containing pod metadata and pod manifests (yaml)
// Manifest payloads are a copy of the pod manifests; we only send manifest payloads when the feature flag is enabled
func handlePodChecks(l *Collector, start time.Time, messages []model.MessageBody, results *api.WeightedQueue) {
l.messagesToResultsQueue(start, config.PodCheckName, messages[:len(messages)/2], results)
l.messagesToResultsQueue(start, checks.PodCheckName, messages[:len(messages)/2], results)
if l.orchestrator.IsManifestCollectionEnabled {
l.messagesToResultsQueue(start, config.PodCheckManifestName, messages[len(messages)/2:], results)
l.messagesToResultsQueue(start, checks.PodCheckManifestName, messages[len(messages)/2:], results)
}
}
1 change: 0 additions & 1 deletion cmd/process-agent/collector_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ func runCollectorTestWithAPIKeys(t *testing.T, check checks.Check, cfg *config.A
setProcessEventsEndpointsForTest(mockConfig, eventsEps...)

cfg.HostName = testHostName
cfg.CheckIntervals[check.Name()] = 500 * time.Millisecond

exit := make(chan struct{})

Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestIgnoreResponseBody(t *testing.T) {
{checkName: checks.Container.Name(), ignore: false},
{checkName: checks.RTContainer.Name(), ignore: false},
{checkName: checks.Pod.Name(), ignore: true},
{checkName: config.PodCheckManifestName, ignore: true},
{checkName: checks.PodCheckManifestName, ignore: true},
{checkName: checks.Connections.Name(), ignore: false},
{checkName: checks.ProcessEvents.Name(), ignore: true},
} {
Expand Down
13 changes: 13 additions & 0 deletions pkg/process/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"github.com/DataDog/datadog-agent/pkg/process/config"
)

// Names of the checks performed by process-agent or system-probe. Each value
// is the string a check reports as its name.
const (
	// Process check and its real-time variant.
	ProcessCheckName   = "process"
	RTProcessCheckName = "rtprocess"

	// Container check and its real-time variant.
	ContainerCheckName   = "container"
	RTContainerCheckName = "rtcontainer"

	// Network connections check.
	ConnectionsCheckName = "connections"

	// Pod check and the name used for its manifest payloads.
	PodCheckName         = "pod"
	PodCheckManifestName = "pod_manifest"

	// Process discovery and process lifecycle events checks.
	DiscoveryCheckName     = "process_discovery"
	ProcessEventsCheckName = "process_events"
)

// Check is an interface for Agent checks that collect data. Each check returns
// a specific MessageBody type that will be published to the intake endpoint or
// processed in another way (e.g. printed for debugging).
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *ContainerCheck) Init(cfg *config.AgentConfig, info *model.SystemInfo) e
}

// Name returns the name of the ProcessCheck.
func (c *ContainerCheck) Name() string { return config.ContainerCheckName }
func (c *ContainerCheck) Name() string { return ContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *ContainerCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container_rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *RTContainerCheck) Init(_ *config.AgentConfig, sysInfo *model.SystemInfo
}

// Name returns the name of the RTContainerCheck.
func (r *RTContainerCheck) Name() string { return config.RTContainerCheckName }
func (r *RTContainerCheck) Name() string { return RTContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (r *RTContainerCheck) RealTime() bool { return true }
Expand Down
14 changes: 6 additions & 8 deletions pkg/process/checks/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

model "github.com/DataDog/agent-payload/v5/process"

"github.com/DataDog/datadog-agent/pkg/process/config"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -70,17 +68,17 @@ var (
// HumanFormat takes the messages produced by a check run and outputs them in a human-readable format
func HumanFormat(check string, msgs []model.MessageBody, w io.Writer) error {
switch check {
case config.ProcessCheckName:
case ProcessCheckName:
return humanFormatProcess(msgs, w)
case config.RTProcessCheckName:
case RTProcessCheckName:
return humanFormatRealTimeProcess(msgs, w)
case config.ContainerCheckName:
case ContainerCheckName:
return humanFormatContainer(msgs, w)
case config.RTContainerCheckName:
case RTContainerCheckName:
return humanFormatRealTimeContainer(msgs, w)
case config.DiscoveryCheckName:
case DiscoveryCheckName:
return humanFormatProcessDiscovery(msgs, w)
case config.ProcessEventsCheckName:
case ProcessEventsCheckName:
return HumanFormatProcessEvents(msgs, w, true)
}
return ErrNoHumanFormat
Expand Down
99 changes: 99 additions & 0 deletions pkg/process/checks/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package checks

import (
"time"

"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Default run intervals, used for a check when no override is configured.
const (
	// ProcessCheckDefaultInterval is the default interval of the process check.
	ProcessCheckDefaultInterval = 10 * time.Second
	// RTProcessCheckDefaultInterval is the default interval of the real-time process check.
	RTProcessCheckDefaultInterval = 2 * time.Second
	// ContainerCheckDefaultInterval is the default interval of the container check.
	ContainerCheckDefaultInterval = 10 * time.Second
	// RTContainerCheckDefaultInterval is the default interval of the real-time container check.
	RTContainerCheckDefaultInterval = 2 * time.Second
	// ConnectionsCheckDefaultInterval is the default interval of the connections check.
	ConnectionsCheckDefaultInterval = 30 * time.Second
	// PodCheckDefaultInterval is the default interval of the pod check.
	PodCheckDefaultInterval = 10 * time.Second
	// ProcessDiscoveryCheckDefaultInterval is the default interval of the process discovery check.
	ProcessDiscoveryCheckDefaultInterval = 4 * time.Hour
)

// discoveryMinInterval is the smallest interval accepted for the process
// discovery check.
const discoveryMinInterval = 10 * time.Minute

// Configuration keys holding per-check interval overrides, expressed in
// seconds. To get consistent behavior between the regular and real-time
// checks, the real-time container/process intervals may be set to 10.
// When a key is unset, the matching *DefaultInterval constant applies.
const (
	configIntervals = configPrefix + "intervals."

	configProcessInterval     = configIntervals + "process"
	configRTProcessInterval   = configIntervals + "process_realtime"
	configContainerInterval   = configIntervals + "container"
	configRTContainerInterval = configIntervals + "container_realtime"
	configConnectionsInterval = configIntervals + "connections"
)

// defaultIntervals maps a check name to the interval used when no override
// is configured for it.
var defaultIntervals = map[string]time.Duration{
	ProcessCheckName:       ProcessCheckDefaultInterval,
	RTProcessCheckName:     RTProcessCheckDefaultInterval,
	ContainerCheckName:     ContainerCheckDefaultInterval,
	RTContainerCheckName:   RTContainerCheckDefaultInterval,
	ConnectionsCheckName:   ConnectionsCheckDefaultInterval,
	PodCheckName:           PodCheckDefaultInterval,
	DiscoveryCheckName:     ProcessDiscoveryCheckDefaultInterval,
	ProcessEventsCheckName: config.DefaultProcessEventsCheckInterval,
}

// configKeys maps a check name to the configuration key that can override
// its interval. Checks without an entry here have no such override key.
var configKeys = map[string]string{
	ProcessCheckName:     configProcessInterval,
	RTProcessCheckName:   configRTProcessInterval,
	ContainerCheckName:   configContainerInterval,
	RTContainerCheckName: configRTContainerInterval,
	ConnectionsCheckName: configConnectionsInterval,
}

// GetDefaultInterval looks up the built-in interval for checkName in
// defaultIntervals. A name with no entry there yields the zero duration.
func GetDefaultInterval(checkName string) time.Duration {
	if interval, known := defaultIntervals[checkName]; known {
		return interval
	}
	return 0
}

// GetInterval returns the interval at which the named check should run,
// taking configuration overrides into account.
//
//   - Process discovery reads "process_config.process_discovery.interval" and
//     raises it to discoveryMinInterval when the configured value is smaller.
//   - Process events reads "process_config.event_collection.interval" and
//     falls back to config.DefaultProcessEventsCheckInterval when the value is
//     below config.DefaultProcessEventsMinCheckInterval.
//   - Any other check uses its entry in defaultIntervals unless a positive
//     number of seconds is set under the matching key in configKeys.
//
// Zero or negative overrides are ignored in favor of the default: a
// non-positive duration is never a valid ticker period (time.NewTicker
// panics on it).
func GetInterval(checkName string) time.Duration {
	switch checkName {
	case DiscoveryCheckName:
		// We don't need to check if the key exists since we already bound it to a default in InitConfig.
		// We use a minimum of 10 minutes for this value.
		discoveryInterval := config.Datadog.GetDuration("process_config.process_discovery.interval")
		if discoveryInterval < discoveryMinInterval {
			discoveryInterval = discoveryMinInterval
			_ = log.Warnf("Invalid interval for process discovery (< %s) using minimum value of %[1]s", discoveryMinInterval.String())
		}
		return discoveryInterval

	case ProcessEventsCheckName:
		eventsInterval := config.Datadog.GetDuration("process_config.event_collection.interval")
		if eventsInterval < config.DefaultProcessEventsMinCheckInterval {
			// Too-small values are replaced by the default, not clamped to the minimum.
			eventsInterval = config.DefaultProcessEventsCheckInterval
			_ = log.Warnf("Invalid interval for process_events check (< %s) using default value of %s",
				config.DefaultProcessEventsMinCheckInterval.String(), config.DefaultProcessEventsCheckInterval.String())
		}
		return eventsInterval

	default:
		defaultInterval := defaultIntervals[checkName]
		configKey, ok := configKeys[checkName]
		if !ok || !config.Datadog.IsSet(configKey) {
			return defaultInterval
		}

		seconds := config.Datadog.GetInt(configKey)
		if seconds <= 0 {
			// Zero keeps its historical meaning of "use the default"; a negative
			// value is a configuration error and must not become a negative duration.
			if seconds < 0 {
				_ = log.Warnf("Invalid %s check interval override (%ds), using default value of %s",
					configKey, seconds, defaultInterval.String())
			}
			return defaultInterval
		}

		log.Infof("Overriding %s check interval to %ds", configKey, seconds)
		return time.Duration(seconds) * time.Second
	}
}
Loading