Skip to content

Commit

Permalink
[process-agent] Remove check intervals from pkg/process/config
Browse files Browse the repository at this point in the history
- Remove check interval management from pkg/process/config package
- Never store intervals, just use config settings
- Generalize check for process and process RT check intervals
  • Loading branch information
xornivore committed Dec 28, 2022
1 parent ad8393c commit 00f86c4
Show file tree
Hide file tree
Showing 19 changed files with 206 additions and 172 deletions.
37 changes: 28 additions & 9 deletions cmd/process-agent/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (l *Collector) runCheck(c checks.Check, results *api.WeightedQueue) {
checks.StoreCheckOutput(c.Name(), nil)
}

if c.Name() == config.PodCheckName {
if c.Name() == checks.PodCheckName {
handlePodChecks(l, start, messages, results)
} else {
l.messagesToResultsQueue(start, c.Name(), messages, results)
Expand Down Expand Up @@ -592,11 +592,30 @@ func (l *Collector) runnerForCheck(c checks.Check, exit chan struct{}) (func(),

rtResults := l.resultsQueueForCheck(withRealTime.RealTimeName())

interval := checks.GetInterval(withRealTime.Name())
rtInterval := checks.GetInterval(withRealTime.RealTimeName())

if interval < rtInterval || interval%rtInterval != 0 {
// Check interval must be greater or equal to realtime check interval and the intervals must be divisible
// in order to be run on the same goroutine
defaultInterval := checks.GetDefaultInterval(withRealTime.Name())
defaultRTInterval := checks.GetDefaultInterval(withRealTime.RealTimeName())
log.Warnf(
"Invalid %s check interval overrides [%s,%s], resetting to defaults [%s,%s]",
withRealTime.Name(),
interval,
rtInterval,
defaultInterval,
defaultRTInterval,
)
interval = defaultInterval
rtInterval = defaultRTInterval
}

return checks.NewRunnerWithRealTime(
checks.RunnerConfig{
CheckInterval: l.cfg.CheckInterval(withRealTime.Name()),
RtInterval: l.cfg.CheckInterval(withRealTime.RealTimeName()),

CheckInterval: interval,
RtInterval: rtInterval,
ExitChan: exit,
RtIntervalChan: l.rtIntervalCh,
RtEnabled: func() bool {
Expand All @@ -616,7 +635,7 @@ func (l *Collector) basicRunner(c checks.Check, results *api.WeightedQueue, exit
l.runCheck(c, results)
}

ticker := time.NewTicker(l.cfg.CheckInterval(c.Name()))
ticker := time.NewTicker(checks.GetInterval(c.Name()))
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -686,7 +705,7 @@ func (l *Collector) consumePayloads(results *api.WeightedQueue, fwd forwarder.Fo
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorChecks(forwarderPayload, payload.headers, int(orchestrator.K8sPod))
// Pod check manifest data
case config.PodCheckManifestName:
case checks.PodCheckManifestName:
updateRTStatus = false
responses, err = fwd.SubmitOrchestratorManifests(forwarderPayload, payload.headers)
case checks.ProcessDiscovery.Name():
Expand Down Expand Up @@ -844,7 +863,7 @@ func readResponseStatuses(checkName string, responses <-chan forwarder.Response)

func ignoreResponseBody(checkName string) bool {
switch checkName {
case checks.Pod.Name(), config.PodCheckManifestName, checks.ProcessEvents.Name():
case checks.Pod.Name(), checks.PodCheckManifestName, checks.ProcessEvents.Name():
return true
default:
return false
Expand All @@ -855,8 +874,8 @@ func ignoreResponseBody(checkName string) bool {
// By default we only send pod payloads containing pod metadata and pod manifests (yaml).
// Manifest payloads are a copy of the pod manifests; we only send them when the manifest collection feature flag is enabled.
func handlePodChecks(l *Collector, start time.Time, messages []model.MessageBody, results *api.WeightedQueue) {
l.messagesToResultsQueue(start, config.PodCheckName, messages[:len(messages)/2], results)
l.messagesToResultsQueue(start, checks.PodCheckName, messages[:len(messages)/2], results)
if l.orchestrator.IsManifestCollectionEnabled {
l.messagesToResultsQueue(start, config.PodCheckManifestName, messages[len(messages)/2:], results)
l.messagesToResultsQueue(start, checks.PodCheckManifestName, messages[len(messages)/2:], results)
}
}
1 change: 0 additions & 1 deletion cmd/process-agent/collector_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ func runCollectorTestWithAPIKeys(t *testing.T, check checks.Check, cfg *config.A
setProcessEventsEndpointsForTest(mockConfig, eventsEps...)

cfg.HostName = testHostName
cfg.CheckIntervals[check.Name()] = 500 * time.Millisecond

exit := make(chan struct{})

Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestIgnoreResponseBody(t *testing.T) {
{checkName: checks.Container.Name(), ignore: false},
{checkName: checks.RTContainer.Name(), ignore: false},
{checkName: checks.Pod.Name(), ignore: true},
{checkName: config.PodCheckManifestName, ignore: true},
{checkName: checks.PodCheckManifestName, ignore: true},
{checkName: checks.Connections.Name(), ignore: false},
{checkName: checks.ProcessEvents.Name(), ignore: true},
} {
Expand Down
13 changes: 13 additions & 0 deletions pkg/process/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ import (
"github.com/DataDog/datadog-agent/pkg/process/config"
)

// Names of the checks performed by process-agent or system-probe.
// Each check's Name (or RealTimeName) method returns one of these values, and
// the same strings are used to look up check intervals and to select the
// human-readable output format.
const (
ProcessCheckName = "process"
RTProcessCheckName = "rtprocess"
ContainerCheckName = "container"
RTContainerCheckName = "rtcontainer"
ConnectionsCheckName = "connections"
PodCheckName = "pod"
// PodCheckManifestName labels the manifest payloads produced by the pod
// check; no check in this package returns it from Name.
PodCheckManifestName = "pod_manifest"
DiscoveryCheckName = "process_discovery"
ProcessEventsCheckName = "process_events"
)

// Check is an interface for Agent checks that collect data. Each check returns
// a specific MessageBody type that will be published to the intake endpoint or
// processed in another way (e.g. printed for debugging).
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *ContainerCheck) Init(cfg *config.AgentConfig, info *model.SystemInfo) e
}

// Name returns the name of the ProcessCheck.
func (c *ContainerCheck) Name() string { return config.ContainerCheckName }
func (c *ContainerCheck) Name() string { return ContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *ContainerCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/container_rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (r *RTContainerCheck) Init(_ *config.AgentConfig, sysInfo *model.SystemInfo
}

// Name returns the name of the RTContainerCheck.
func (r *RTContainerCheck) Name() string { return config.RTContainerCheckName }
func (r *RTContainerCheck) Name() string { return RTContainerCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (r *RTContainerCheck) RealTime() bool { return true }
Expand Down
14 changes: 6 additions & 8 deletions pkg/process/checks/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

model "github.com/DataDog/agent-payload/v5/process"

"github.com/DataDog/datadog-agent/pkg/process/config"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -70,17 +68,17 @@ var (
// HumanFormat takes the messages produced by a check run and outputs them in a human-readable format
func HumanFormat(check string, msgs []model.MessageBody, w io.Writer) error {
switch check {
case config.ProcessCheckName:
case ProcessCheckName:
return humanFormatProcess(msgs, w)
case config.RTProcessCheckName:
case RTProcessCheckName:
return humanFormatRealTimeProcess(msgs, w)
case config.ContainerCheckName:
case ContainerCheckName:
return humanFormatContainer(msgs, w)
case config.RTContainerCheckName:
case RTContainerCheckName:
return humanFormatRealTimeContainer(msgs, w)
case config.DiscoveryCheckName:
case DiscoveryCheckName:
return humanFormatProcessDiscovery(msgs, w)
case config.ProcessEventsCheckName:
case ProcessEventsCheckName:
return HumanFormatProcessEvents(msgs, w, true)
}
return ErrNoHumanFormat
Expand Down
101 changes: 101 additions & 0 deletions pkg/process/checks/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package checks

import (
"time"

"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
// Default run intervals, one per check. They are the values stored in
// defaultIntervals and returned when no override is configured.
ProcessCheckDefaultInterval = 10 * time.Second
RTProcessCheckDefaultInterval = 2 * time.Second
ContainerCheckDefaultInterval = 10 * time.Second
RTContainerCheckDefaultInterval = 2 * time.Second
ConnectionsCheckDefaultInterval = 30 * time.Second
PodCheckDefaultInterval = 10 * time.Second
ProcessDiscoveryCheckDefaultInterval = 4 * time.Hour

// discoveryMinInterval is the smallest interval accepted for the process
// discovery check; GetInterval raises any smaller configured value to it.
discoveryMinInterval = 10 * time.Minute

// configIntervals is the configuration namespace that holds the per-check
// interval overrides (configPrefix is declared elsewhere in this package).
configIntervals = configPrefix + ".intervals"

// Configuration keys for the interval, in seconds, at which each check runs.
// For consistent behavior between the real-time and regular checks you may
// set the container_realtime/process_realtime intervals to 10.
// Defaults are 10s for the regular process and container checks, 2s for
// their real-time variants and 30s for the connections check.
configProcessInterval = configIntervals + ".process"
configRTProcessInterval = configIntervals + ".process_realtime"
configContainerInterval = configIntervals + ".container"
configRTContainerInterval = configIntervals + ".container_realtime"
configConnectionsInterval = configIntervals + ".connections"
)

var (
// defaultIntervals maps a check name to its built-in run interval.
// PodCheckManifestName has no entry, so looking it up yields the zero
// duration.
defaultIntervals = map[string]time.Duration{
ProcessCheckName: ProcessCheckDefaultInterval,
RTProcessCheckName: RTProcessCheckDefaultInterval,
ContainerCheckName: ContainerCheckDefaultInterval,
RTContainerCheckName: RTContainerCheckDefaultInterval,
ConnectionsCheckName: ConnectionsCheckDefaultInterval,
PodCheckName: PodCheckDefaultInterval,
DiscoveryCheckName: ProcessDiscoveryCheckDefaultInterval,
ProcessEventsCheckName: config.DefaultProcessEventsCheckInterval,
}

// configKeys maps a check name to the configuration key holding its
// interval override in seconds. The pod check has no key here; the
// discovery and process events checks are read from their own duration
// settings in GetInterval instead.
configKeys = map[string]string{
ProcessCheckName: configProcessInterval,
RTProcessCheckName: configRTProcessInterval,
ContainerCheckName: configContainerInterval,
RTContainerCheckName: configRTContainerInterval,
ConnectionsCheckName: configConnectionsInterval,
}
)

// GetDefaultInterval looks up the built-in interval of the named check in
// defaultIntervals. A name without an entry yields the zero duration.
func GetDefaultInterval(checkName string) time.Duration {
	if interval, found := defaultIntervals[checkName]; found {
		return interval
	}
	return 0
}

// GetInterval returns the interval at which the named check should run.
//
// The process discovery and process events checks read a duration setting and
// reject values below their respective minimum. Every other check uses the
// optional override, expressed in whole seconds, stored under its key in
// configKeys, and falls back to its entry in defaultIntervals when there is no
// key, the key is unset, or the override is not a positive number.
//
// A check name that has neither special handling nor a default interval is
// reported as an error and yields the zero duration.
func GetInterval(checkName string) time.Duration {
	switch checkName {
	case DiscoveryCheckName:
		// We don't need to check if the key exists since we already bound it to a default in InitConfig.
		// We use a minimum of 10 minutes for this value.
		discoveryInterval := config.Datadog.GetDuration("process_config.process_discovery.interval")
		if discoveryInterval < discoveryMinInterval {
			discoveryInterval = discoveryMinInterval
			_ = log.Warnf("Invalid interval for process discovery (< %s) using minimum value of %[1]s", discoveryMinInterval.String())
		}
		return discoveryInterval

	case ProcessEventsCheckName:
		// Unlike process discovery, a too-small value is replaced by the
		// default interval rather than clamped to the minimum.
		eventsInterval := config.Datadog.GetDuration("process_config.event_collection.interval")
		if eventsInterval < config.DefaultProcessEventsMinCheckInterval {
			eventsInterval = config.DefaultProcessEventsCheckInterval
			_ = log.Warnf("Invalid interval for process_events check (< %s) using default value of %s",
				config.DefaultProcessEventsMinCheckInterval.String(), config.DefaultProcessEventsCheckInterval.String())
		}
		return eventsInterval

	default:
		defaultInterval, known := defaultIntervals[checkName]
		if !known {
			// Only a check without any built-in default is a programming error.
			_ = log.Errorf("missing check interval for '%s', you must set a default", checkName)
			return defaultInterval
		}

		// Checks without an override key (e.g. the pod check) and checks whose
		// override is simply not configured run at their default interval;
		// that is the normal case and must not be reported as an error.
		configKey, ok := configKeys[checkName]
		if !ok || !config.Datadog.IsSet(configKey) {
			return defaultInterval
		}

		seconds := config.Datadog.GetInt(configKey)
		if seconds < 0 {
			// A negative duration would make time.NewTicker panic in the runners.
			_ = log.Warnf("Invalid %s check interval override (%ds), using default value of %s", configKey, seconds, defaultInterval.String())
			return defaultInterval
		}
		if seconds == 0 {
			return defaultInterval
		}

		log.Infof("Overriding %s check interval to %ds", configKey, seconds)
		return time.Duration(seconds) * time.Second
	}
}
42 changes: 42 additions & 0 deletions pkg/process/checks/interval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package checks

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/config"
)

// TestProcessDiscoveryInterval checks the validation GetInterval applies to
// the process discovery interval: values at or above discoveryMinInterval are
// returned unchanged, anything lower is raised to discoveryMinInterval.
func TestProcessDiscoveryInterval(t *testing.T) {
	for _, tc := range []struct {
		name             string
		interval         time.Duration
		expectedInterval time.Duration
	}{
		{
			name:             "allowed interval",
			interval:         8 * time.Hour,
			expectedInterval: 8 * time.Hour,
		},
		{
			// Boundary: the minimum itself is a valid setting.
			name:             "exactly minimum",
			interval:         discoveryMinInterval,
			expectedInterval: discoveryMinInterval,
		},
		{
			// Boundary: the largest whole-second value that must be rejected.
			name:             "just below minimum",
			interval:         discoveryMinInterval - time.Second,
			expectedInterval: discoveryMinInterval,
		},
		{
			name:             "below minimum",
			interval:         0,
			expectedInterval: discoveryMinInterval,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// A fresh mock per subtest keeps the setting from leaking between cases.
			cfg := config.Mock(t)
			cfg.Set("process_config.process_discovery.interval", tc.interval)

			assert.Equal(t, tc.expectedInterval, GetInterval(DiscoveryCheckName))
		})
	}
}
2 changes: 1 addition & 1 deletion pkg/process/checks/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *ConnectionsCheck) Init(cfg *config.AgentConfig, _ *model.SystemInfo) er
}

// Name returns the name of the ConnectionsCheck.
func (c *ConnectionsCheck) Name() string { return config.ConnectionsCheckName }
func (c *ConnectionsCheck) Name() string { return ConnectionsCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *ConnectionsCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *PodCheck) Init(_ *config.AgentConfig, info *model.SystemInfo) error {
}

// Name returns the name of the ProcessCheck.
func (c *PodCheck) Name() string { return config.PodCheckName }
func (c *PodCheck) Name() string { return PodCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (c *PodCheck) RealTime() bool { return false }
Expand Down
6 changes: 3 additions & 3 deletions pkg/process/checks/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ func (p *ProcessCheck) Init(_ *config.AgentConfig, info *model.SystemInfo) error
}

// Name returns the name of the ProcessCheck.
func (p *ProcessCheck) Name() string { return config.ProcessCheckName }
func (p *ProcessCheck) Name() string { return ProcessCheckName }

// RealTimeName returns the name of the RTProcessCheck
func (p *ProcessCheck) RealTimeName() string { return config.RTProcessCheckName }
func (p *ProcessCheck) RealTimeName() string { return RTProcessCheckName }

// RealTime indicates if this check only runs in real-time mode.
func (p *ProcessCheck) RealTime() bool { return false }
Expand Down Expand Up @@ -329,7 +329,7 @@ func fmtProcesses(
connsByPID map[int32][]*model.Connection,
) map[string][]*model.Process {
procsByCtr := make(map[string][]*model.Process)
connCheckIntervalS := int(cfg.CheckIntervals[config.ConnectionsCheckName] / time.Second)
connCheckIntervalS := int(GetInterval(ConnectionsCheckName) / time.Second)

for _, fp := range procs {
if skipProcess(disallowList, fp, lastProcs) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_discovery_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (d *ProcessDiscoveryCheck) Init(_ *config.AgentConfig, info *model.SystemIn
}

// Name returns the name of the ProcessDiscoveryCheck.
func (d *ProcessDiscoveryCheck) Name() string { return config.DiscoveryCheckName }
func (d *ProcessDiscoveryCheck) Name() string { return DiscoveryCheckName }

// RealTime returns a value that says whether this check should be run in real time.
func (d *ProcessDiscoveryCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_events_fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (e *ProcessEventsCheck) Init(_ *config.AgentConfig, info *model.SystemInfo)
}

// Name returns the name of the ProcessEventsCheck.
func (e *ProcessEventsCheck) Name() string { return config.ProcessEventsCheckName }
func (e *ProcessEventsCheck) Name() string { return ProcessEventsCheckName }

// RealTime returns a value that says whether this check should be run in real time.
func (e *ProcessEventsCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_events_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (e *ProcessEventsCheck) start() {
}

// Name returns the name of the ProcessEventsCheck.
func (e *ProcessEventsCheck) Name() string { return config.ProcessEventsCheckName }
func (e *ProcessEventsCheck) Name() string { return ProcessEventsCheckName }

// RealTime returns a value that says whether this check should be run in real time.
func (e *ProcessEventsCheck) RealTime() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/process_rt.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func fmtProcessStats(
lastRun time.Time,
connsByPID map[int32][]*model.Connection,
) [][]*model.ProcessStat {
connCheckIntervalS := int(cfg.CheckIntervals[config.ConnectionsCheckName] / time.Second)
connCheckIntervalS := int(GetInterval(ConnectionsCheckName) / time.Second)

chunked := make([][]*model.ProcessStat, 0)
chunk := make([]*model.ProcessStat, 0, maxBatchSize)
Expand Down
Loading

0 comments on commit 00f86c4

Please sign in to comment.