From 3391ef36072e3ffeb41e62b551ecb2d46c543d15 Mon Sep 17 00:00:00 2001 From: Kevin Sheldrake Date: Mon, 18 Nov 2024 12:57:25 +0000 Subject: [PATCH] ProcessCache: Make GC interval configurable The process cache delays removal of exited processes. The first time the garbage collector sees a process (marked deletePending), it changes it to deleteReady. The second time it sees the process, it deletes it from the cache. The garbage collector runs every 30s. This commit makes the GC interval configurable with the --process-cache-gc-interval switch. Signed-off-by: Kevin Sheldrake --- cmd/tetragon/main.go | 7 ++++- docs/data/tetragon_flags.yaml | 3 ++ pkg/bench/bench.go | 2 +- pkg/defaults/defaults.go | 5 +++ pkg/grpc/exec/exec_test_helper.go | 3 +- pkg/grpc/process_manager_test.go | 9 +++--- .../observer_test_helper.go | 21 ++++++++++--- pkg/option/config.go | 5 +-- pkg/option/flags.go | 31 ++++++++++++------- pkg/process/cache.go | 10 ++---- pkg/process/cache_test.go | 3 +- pkg/process/process.go | 5 +-- 12 files changed, 69 insertions(+), 35 deletions(-) diff --git a/cmd/tetragon/main.go b/cmd/tetragon/main.go index 787310f4b09..a88be21ea63 100644 --- a/cmd/tetragon/main.go +++ b/cmd/tetragon/main.go @@ -424,7 +424,12 @@ func tetragonExecuteCtx(ctx context.Context, cancel context.CancelFunc, ready fu } k8sWatcher.Start() - if err := process.InitCache(k8sWatcher, option.Config.ProcessCacheSize); err != nil { + pcGcInterval := option.Config.ProcessCacheGcInterval + if pcGcInterval <= 0 { + pcGcInterval = defaults.DefaultProcessCacheGcInterval + } + + if err := process.InitCache(k8sWatcher, option.Config.ProcessCacheSize, pcGcInterval); err != nil { return err } diff --git a/docs/data/tetragon_flags.yaml b/docs/data/tetragon_flags.yaml index ba3f347b556..e73ab0d735e 100644 --- a/docs/data/tetragon_flags.yaml +++ b/docs/data/tetragon_flags.yaml @@ -176,6 +176,9 @@ options: - name: pprof-address usage: | Serves runtime profile data via HTTP (e.g. 'localhost:6060'). 
Disabled by default + - name: process-cache-gc-interval + default_value: 30s + usage: Time between checking the process cache for old entries - name: process-cache-size default_value: "65536" usage: Size of the process cache diff --git a/pkg/bench/bench.go b/pkg/bench/bench.go index 0423622dbca..f4eee2b6dd0 100644 --- a/pkg/bench/bench.go +++ b/pkg/bench/bench.go @@ -208,7 +208,7 @@ func startBenchmarkExporter(ctx context.Context, obs *observer.Observer, summary dataCacheSize := 1024 watcher := watcher.NewFakeK8sWatcher(nil) - if err := process.InitCache(watcher, processCacheSize); err != nil { + if err := process.InitCache(watcher, processCacheSize, defaults.DefaultProcessCacheGcInterval); err != nil { return err } diff --git a/pkg/defaults/defaults.go b/pkg/defaults/defaults.go index b9920c42d53..6acacc243dd 100644 --- a/pkg/defaults/defaults.go +++ b/pkg/defaults/defaults.go @@ -3,6 +3,8 @@ package defaults +import "time" + const ( // DefaultMapRoot is the default path where BPFFS should be mounted DefaultMapRoot = "/sys/fs/bpf" @@ -49,6 +51,9 @@ const ( // defaults for the event cache DefaultEventCacheNumRetries = 15 DefaultEventCacheRetryDelay = 2 + + // defaults for the process cache + DefaultProcessCacheGcInterval = 30 * time.Second ) var ( diff --git a/pkg/grpc/exec/exec_test_helper.go b/pkg/grpc/exec/exec_test_helper.go index d2f5d8e16b7..b14a3b29ce0 100644 --- a/pkg/grpc/exec/exec_test_helper.go +++ b/pkg/grpc/exec/exec_test_helper.go @@ -10,6 +10,7 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" tetragonAPI "github.com/cilium/tetragon/pkg/api/processapi" + "github.com/cilium/tetragon/pkg/defaults" "github.com/cilium/tetragon/pkg/eventcache" "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" @@ -311,7 +312,7 @@ func CreateCloneEvents[CLONE notify.Message, EXIT notify.Message](Pid uint32, Kt } func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, watcher watcher.K8sResourceWatcher) DummyNotifier[EXEC, 
EXIT] { - if err := process.InitCache(watcher, 65536); err != nil { + if err := process.InitCache(watcher, 65536, defaults.DefaultProcessCacheGcInterval); err != nil { t.Fatalf("failed to call process.InitCache %s", err) } diff --git a/pkg/grpc/process_manager_test.go b/pkg/grpc/process_manager_test.go index a7af237fb49..d81f6b5836c 100644 --- a/pkg/grpc/process_manager_test.go +++ b/pkg/grpc/process_manager_test.go @@ -13,6 +13,7 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/api/processapi" + "github.com/cilium/tetragon/pkg/defaults" "github.com/cilium/tetragon/pkg/grpc/exec" "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" @@ -60,7 +61,7 @@ func TestProcessManager_getPodInfo(t *testing.T) { } pods := []interface{}{&podA} - err := process.InitCache(watcher.NewFakeK8sWatcher(pods), 10) + err := process.InitCache(watcher.NewFakeK8sWatcher(pods), 10, defaults.DefaultProcessCacheGcInterval) assert.NoError(t, err) defer process.FreeCache() pod := process.GetPodInfo(0, "container-id-not-found", "", "", 0) @@ -125,7 +126,7 @@ func TestProcessManager_getPodInfoMaybeExecProbe(t *testing.T) { }, } pods := []interface{}{&podA} - err := process.InitCache(watcher.NewFakeK8sWatcher(pods), 10) + err := process.InitCache(watcher.NewFakeK8sWatcher(pods), 10, defaults.DefaultProcessCacheGcInterval) assert.NoError(t, err) defer process.FreeCache() pod := process.GetPodInfo(0, "aaaaaaa", "/bin/command", "arg-a arg-b", 1234) @@ -145,7 +146,7 @@ func TestProcessManager_getPodInfoMaybeExecProbe(t *testing.T) { } func TestProcessManager_GetProcessExec(t *testing.T) { - err := process.InitCache(watcher.NewFakeK8sWatcher(nil), 10) + err := process.InitCache(watcher.NewFakeK8sWatcher(nil), 10, defaults.DefaultProcessCacheGcInterval) assert.NoError(t, err) defer process.FreeCache() var wg sync.WaitGroup @@ -215,7 +216,7 @@ func TestProcessManager_GetProcessID(t *testing.T) { assert.NoError(t, os.Setenv("NODE_NAME", 
"my-node")) node.SetNodeName() - err := process.InitCache(watcher.NewFakeK8sWatcher([]interface{}{}), 10) + err := process.InitCache(watcher.NewFakeK8sWatcher([]interface{}{}), 10, defaults.DefaultProcessCacheGcInterval) assert.NoError(t, err) defer process.FreeCache() id := process.GetProcessID(1, 2) diff --git a/pkg/observer/observertesthelper/observer_test_helper.go b/pkg/observer/observertesthelper/observer_test_helper.go index d02f3c9be64..21169e6839c 100644 --- a/pkg/observer/observertesthelper/observer_test_helper.go +++ b/pkg/observer/observertesthelper/observer_test_helper.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cilium/tetragon/pkg/cgrouprate" + "github.com/cilium/tetragon/pkg/defaults" "github.com/cilium/tetragon/pkg/encoder" "github.com/cilium/tetragon/pkg/metricsconfig" "github.com/cilium/tetragon/pkg/observer" @@ -57,9 +58,10 @@ var ( ) type testObserverOptions struct { - crd bool - config string - lib string + crd bool + config string + lib string + procCacheGcInterval time.Duration } type testExporterOptions struct { @@ -102,6 +104,12 @@ func WithConfig(config string) TestOption { } } +func WithProcCacheGcInterval(gcInterval time.Duration) TestOption { + return func(o *TestOptions) { + o.observer.procCacheGcInterval = gcInterval + } +} + func withK8sWatcher(w watcher.K8sResourceWatcher) TestOption { return func(o *TestOptions) { o.exporter.watcher = w @@ -358,6 +366,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op watcher := opts.watcher processCacheSize := 32768 dataCacheSize := 1024 + procCacheGcInterval := defaults.DefaultProcessCacheGcInterval if err := obs.InitSensorManager(); err != nil { return err @@ -378,7 +387,11 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op return err } - if err := process.InitCache(watcher, processCacheSize); err != nil { + if oo.procCacheGcInterval > 0 { + procCacheGcInterval = oo.procCacheGcInterval + } + + if err := 
process.InitCache(watcher, processCacheSize, procCacheGcInterval); err != nil { return err } diff --git a/pkg/option/config.go b/pkg/option/config.go index 97241f312be..c96ba974505 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -46,8 +46,9 @@ type config struct { RBSizeTotal int RBQueueSize int - ProcessCacheSize int - DataCacheSize int + ProcessCacheSize int + DataCacheSize int + ProcessCacheGcInterval time.Duration MetricsServer string MetricsLabelFilter metrics.LabelFilter diff --git a/pkg/option/flags.go b/pkg/option/flags.go index 94ef9ad1f53..d29f9d894b8 100644 --- a/pkg/option/flags.go +++ b/pkg/option/flags.go @@ -17,18 +17,19 @@ import ( ) const ( - KeyConfigDir = "config-dir" - KeyDebug = "debug" - KeyHubbleLib = "bpf-lib" - KeyBTF = "btf" - KeyProcFS = "procfs" - KeyKernelVersion = "kernel" - KeyVerbosity = "verbose" - KeyProcessCacheSize = "process-cache-size" - KeyDataCacheSize = "data-cache-size" - KeyForceSmallProgs = "force-small-progs" - KeyForceLargeProgs = "force-large-progs" - KeyClusterName = "cluster-name" + KeyConfigDir = "config-dir" + KeyDebug = "debug" + KeyHubbleLib = "bpf-lib" + KeyBTF = "btf" + KeyProcFS = "procfs" + KeyKernelVersion = "kernel" + KeyVerbosity = "verbose" + KeyProcessCacheSize = "process-cache-size" + KeyDataCacheSize = "data-cache-size" + KeyProcessCacheGcInterval = "process-cache-gc-interval" + KeyForceSmallProgs = "force-small-progs" + KeyForceLargeProgs = "force-large-progs" + KeyClusterName = "cluster-name" KeyLogLevel = "log-level" KeyLogFormat = "log-format" @@ -170,6 +171,11 @@ func ReadAndSetFlags() error { Config.ProcessCacheSize = viper.GetInt(KeyProcessCacheSize) Config.DataCacheSize = viper.GetInt(KeyDataCacheSize) + Config.ProcessCacheGcInterval = viper.GetDuration(KeyProcessCacheGcInterval) + + if Config.ProcessCacheGcInterval <= 0 { + return fmt.Errorf("failed to parse process-cache-gc-interval value. 
Must be > 0") + } Config.MetricsServer = viper.GetString(KeyMetricsServer) Config.MetricsLabelFilter = DefaultLabelFilter().WithEnabledLabels(ParseMetricsLabelFilter(viper.GetString(KeyMetricsLabelFilter))) @@ -296,6 +302,7 @@ func AddFlags(flags *pflag.FlagSet) { flags.Int(KeyVerbosity, 0, "set verbosity level for eBPF verifier dumps. Pass 0 for silent, 1 for truncated logs, 2 for a full dump") flags.Int(KeyProcessCacheSize, 65536, "Size of the process cache") flags.Int(KeyDataCacheSize, 1024, "Size of the data events cache") + flags.Duration(KeyProcessCacheGcInterval, defaults.DefaultProcessCacheGcInterval, "Time between checking the process cache for old entries") flags.Bool(KeyForceSmallProgs, false, "Force loading small programs, even in kernels with >= 5.3 versions") flags.Bool(KeyForceLargeProgs, false, "Force loading large programs, even in kernels with < 5.3 versions") flags.String(KeyExportFilename, "", "Filename for JSON export. Disabled by default") diff --git a/pkg/process/cache.go b/pkg/process/cache.go index ecb503a443c..82a3c13b8c9 100644 --- a/pkg/process/cache.go +++ b/pkg/process/cache.go @@ -40,12 +40,7 @@ var colorStr = map[int]string{ deleted: "deleted", } -// garbage collection run interval -const ( - intervalGC = time.Second * 30 -) - -func (pc *Cache) cacheGarbageCollector() { +func (pc *Cache) cacheGarbageCollector(intervalGC time.Duration) { ticker := time.NewTicker(intervalGC) pc.deleteChan = make(chan *ProcessInternal) pc.stopChan = make(chan bool) @@ -147,6 +142,7 @@ func (pc *Cache) purge() { func NewCache( processCacheSize int, + gcInterval time.Duration, ) (*Cache, error) { lruCache, err := lru.NewWithEvict( processCacheSize, @@ -161,7 +157,7 @@ func NewCache( cache: lruCache, size: processCacheSize, } - pm.cacheGarbageCollector() + pm.cacheGarbageCollector(gcInterval) return pm, nil } diff --git a/pkg/process/cache_test.go b/pkg/process/cache_test.go index c4630daaad2..5308c99ccf2 100644 --- a/pkg/process/cache_test.go +++ 
b/pkg/process/cache_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/cilium/tetragon/api/v1/tetragon" + "github.com/cilium/tetragon/pkg/defaults" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/wrapperspb" @@ -14,7 +15,7 @@ import ( func TestProcessCache(t *testing.T) { // add a process to the cache. - cache, err := NewCache(10) + cache, err := NewCache(10, defaults.DefaultProcessCacheGcInterval) require.NoError(t, err) pid := wrapperspb.UInt32Value{Value: 1234} execID := "process1" diff --git a/pkg/process/process.go b/pkg/process/process.go index 03321669846..61845d0a959 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -10,6 +10,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/cilium/tetragon/pkg/cgidmap" "github.com/cilium/tetragon/pkg/fieldfilters" @@ -73,7 +74,7 @@ var ( ErrProcessInfoMissing = errors.New("failed process info missing") ) -func InitCache(w watcher.K8sResourceWatcher, size int) error { +func InitCache(w watcher.K8sResourceWatcher, size int, gcInterval time.Duration) error { var err error if procCache != nil { @@ -81,7 +82,7 @@ func InitCache(w watcher.K8sResourceWatcher, size int) error { } k8s = w - procCache, err = NewCache(size) + procCache, err = NewCache(size, gcInterval) if err != nil { k8s = nil }