diff --git a/collector/internal/controller.go b/collector/internal/controller.go index d414ee5e0..8380f484f 100644 --- a/collector/internal/controller.go +++ b/collector/internal/controller.go @@ -37,9 +37,9 @@ func NewController(cfg *controller.Config, ReportInterval: intervals.ReportInterval(), ExecutablesCacheElements: 16384, // Next step: Calculate FramesCacheElements from numCores and samplingRate. - FramesCacheElements: 131072, - CGroupCacheElements: 1024, - SamplesPerSecond: cfg.SamplesPerSecond, + FramesCacheElements: 131072, + PIDToContainerIDCacheElements: 1024, + SamplesPerSecond: cfg.SamplesPerSecond, }, nextConsumer) if err != nil { return nil, err diff --git a/libpf/cgroupv2.go b/libpf/cgroupv2.go deleted file mode 100644 index aa5d952f4..000000000 --- a/libpf/cgroupv2.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" - -import ( - "bufio" - "fmt" - "os" - "regexp" - - lru "github.com/elastic/go-freelru" - log "github.com/sirupsen/logrus" -) - -var ( - cgroupv2PathPattern = regexp.MustCompile(`0:.*?:(.*)`) -) - -// LookupCgroupv2 returns the cgroupv2 ID for pid. -func LookupCgroupv2(cgrouplru *lru.SyncedLRU[PID, string], pid PID) (string, error) { - id, ok := cgrouplru.Get(pid) - if ok { - return id, nil - } - - // Slow path - f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) - if err != nil { - return "", err - } - defer f.Close() - - var genericCgroupv2 string - scanner := bufio.NewScanner(f) - buf := make([]byte, 512) - // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). - // We can do that and also set a maximum allocation size on the following call. - // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't - // expect lines in /proc//cgroup to be longer than that. - scanner.Buffer(buf, 8192) - var pathParts []string - for scanner.Scan() { - line := scanner.Text() - pathParts = cgroupv2PathPattern.FindStringSubmatch(line) - if pathParts == nil { - log.Debugf("Could not extract cgroupv2 path from line: %s", line) - continue - } - genericCgroupv2 = pathParts[1] - break - } - - // Cache the cgroupv2 information. - // To avoid busy lookups, also empty cgroupv2 information is cached. - cgrouplru.Add(pid, genericCgroupv2) - - return genericCgroupv2, nil -} diff --git a/main.go b/main.go index 607096084..6925f071c 100644 --- a/main.go +++ b/main.go @@ -114,9 +114,9 @@ func mainWithExitCode() exitCode { ReportInterval: intervals.ReportInterval(), ExecutablesCacheElements: 16384, // Next step: Calculate FramesCacheElements from numCores and samplingRate. - FramesCacheElements: 131072, - CGroupCacheElements: 1024, - SamplesPerSecond: cfg.SamplesPerSecond, + FramesCacheElements: 131072, + PIDToContainerIDCacheElements: 1024, + SamplesPerSecond: cfg.SamplesPerSecond, }) if err != nil { log.Error(err) diff --git a/reporter/base_reporter.go b/reporter/base_reporter.go index 364133ed1..f78c93846 100644 --- a/reporter/base_reporter.go +++ b/reporter/base_reporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "os" "time" lru "github.com/elastic/go-freelru" @@ -34,8 +35,8 @@ type baseReporter struct { // pdata holds the generator for the data being exported. pdata *pdata.Pdata - // cgroupv2ID caches PID to container ID information for cgroupv2 containers. - cgroupv2ID *lru.SyncedLRU[libpf.PID, string] + // pidToContainerID caches PID to container ID for cgroupv2 containers. + pidToContainerID *lru.SyncedLRU[libpf.PID, string] // traceEvents stores reported trace events (trace metadata with frames and counts) traceEvents xsync.RWMutex[samples.TraceEventsTree] @@ -96,9 +97,9 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE extraMeta = b.cfg.ExtraSampleAttrProd.CollectExtraSampleMeta(trace, meta) } - containerID, err := libpf.LookupCgroupv2(b.cgroupv2ID, meta.PID) + containerID, err := b.lookupContainerID(meta.PID) if err != nil { - log.Debugf("Failed to get a cgroupv2 ID as container ID for PID %d: %v", + log.Debugf("Failed to get a container ID for PID %d: %v", meta.PID, err) } @@ -164,3 +165,23 @@ func (b *baseReporter) FrameMetadata(args *FrameMetadataArgs) { } b.pdata.Frames.Add(args.FrameID, si) } + +// lookupContainerID extracts the container ID from a cgroup v2 path or +// returns an empty string otherwise. +func (b *baseReporter) lookupContainerID(pid libpf.PID) (string, error) { + if v, ok := b.pidToContainerID.GetAndRefresh(pid, 90*time.Second); ok { + return v, nil + } + + // Slow path + pidCgroupFile, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + defer pidCgroupFile.Close() + + containerID := extractContainerID(pidCgroupFile) + + b.pidToContainerID.Add(pid, containerID) + return containerID, nil +} diff --git a/reporter/collector_reporter.go b/reporter/collector_reporter.go index 4ae5bc349..673d777e6 100644 --- a/reporter/collector_reporter.go +++ b/reporter/collector_reporter.go @@ -29,13 +29,13 @@ type CollectorReporter struct { // NewCollector builds a new CollectorReporter func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorReporter, error) { - cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, + pidToContainerID, err := lru.NewSynced[libpf.PID, string](cfg.PIDToContainerIDCacheElements, func(pid libpf.PID) uint32 { return uint32(pid) }) if err != nil { return nil, err } // Set a lifetime to reduce the risk of invalid data in case of PID reuse. - cgroupv2ID.SetLifetime(90 * time.Second) + pidToContainerID.SetLifetime(90 * time.Second) // Next step: Dynamically configure the size of this LRU. // Currently, we use the length of the JSON array in @@ -59,13 +59,13 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor return &CollectorReporter{ baseReporter: &baseReporter{ - cfg: cfg, - name: cfg.Name, - version: cfg.Version, - pdata: data, - cgroupv2ID: cgroupv2ID, - traceEvents: xsync.NewRWMutex(tree), - hostmetadata: hostmetadata, + cfg: cfg, + name: cfg.Name, + version: cfg.Version, + pdata: data, + pidToContainerID: pidToContainerID, + traceEvents: xsync.NewRWMutex(tree), + hostmetadata: hostmetadata, runLoop: &runLoop{ stopSignal: make(chan libpf.Void), }, @@ -85,7 +85,7 @@ func (r *CollectorReporter) Start(ctx context.Context) error { }, func() { // Allow the GC to purge expired entries to avoid memory leaks. r.pdata.Purge() - r.cgroupv2ID.PurgeExpired() + r.pidToContainerID.PurgeExpired() }) // When Stop() is called and a signal to 'stop' is received, then: diff --git a/reporter/collector_reporter_test.go b/reporter/collector_reporter_test.go index d5807d465..7a552f157 100644 --- a/reporter/collector_reporter_test.go +++ b/reporter/collector_reporter_test.go @@ -52,9 +52,9 @@ func TestCollectorReporterReportTraceEvent(t *testing.T) { } r, err := NewCollector(&Config{ - ExecutablesCacheElements: 1, - FramesCacheElements: 1, - CGroupCacheElements: 1, + ExecutablesCacheElements: 1, + FramesCacheElements: 1, + PIDToContainerIDCacheElements: 1, }, next) require.NoError(t, err) if err := r.ReportTraceEvent(tt.trace, tt.meta); err != nil && diff --git a/reporter/config.go b/reporter/config.go index 0c13f47cf..9fe9191c9 100644 --- a/reporter/config.go +++ b/reporter/config.go @@ -30,8 +30,8 @@ type Config struct { ExecutablesCacheElements uint32 // FramesCacheElements defines the item capacity of the frames cache. FramesCacheElements uint32 - // CGroupCacheElements defines the item capacity of the cgroup cache. - CGroupCacheElements uint32 + // PIDToContainerIDCacheElements defines the item capacity of the pid cache. + PIDToContainerIDCacheElements uint32 // samplesPerSecond defines the number of samples per second. SamplesPerSecond int diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 2191c6583..c9065d3a9 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -41,13 +41,13 @@ type OTLPReporter struct { // NewOTLP returns a new instance of OTLPReporter func NewOTLP(cfg *Config) (*OTLPReporter, error) { - cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements, + pidToContainerID, err := lru.NewSynced[libpf.PID, string](cfg.PIDToContainerIDCacheElements, func(pid libpf.PID) uint32 { return uint32(pid) }) if err != nil { return nil, err } // Set a lifetime to reduce risk of invalid data in case of PID reuse. - cgroupv2ID.SetLifetime(90 * time.Second) + pidToContainerID.SetLifetime(90 * time.Second) // Next step: Dynamically configure the size of this LRU. // Currently, we use the length of the JSON array in @@ -71,13 +71,13 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { return &OTLPReporter{ baseReporter: &baseReporter{ - cfg: cfg, - name: cfg.Name, - version: cfg.Version, - pdata: data, - cgroupv2ID: cgroupv2ID, - traceEvents: xsync.NewRWMutex(eventsTree), - hostmetadata: hostmetadata, + cfg: cfg, + name: cfg.Name, + version: cfg.Version, + pdata: data, + pidToContainerID: pidToContainerID, + traceEvents: xsync.NewRWMutex(eventsTree), + hostmetadata: hostmetadata, runLoop: &runLoop{ stopSignal: make(chan libpf.Void), }, @@ -110,7 +110,7 @@ func (r *OTLPReporter) Start(ctx context.Context) error { }, func() { // Allow the GC to purge expired entries to avoid memory leaks. r.pdata.Purge() - r.cgroupv2ID.PurgeExpired() + r.pidToContainerID.PurgeExpired() }) // When Stop() is called and a signal to 'stop' is received, then: diff --git a/reporter/util.go b/reporter/util.go index 6eba1fa6e..45d749e32 100644 --- a/reporter/util.go +++ b/reporter/util.go @@ -3,7 +3,19 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" -import "github.com/zeebo/xxh3" +import ( + "bufio" + "io" + "regexp" + + log "github.com/sirupsen/logrus" + "github.com/zeebo/xxh3" +) + +//nolint:lll +var ( + cgroupv2ContainerIDPattern = regexp.MustCompile(`0:.*?:.*?([0-9a-fA-F]{64})(?:\.scope)?(?:/[a-z]+)?$`) +) // hashString is a helper function for LRUs that use string as a key. // Xxh3 turned out to be the fastest hash function for strings in the FreeLRU benchmarks. @@ -11,3 +23,27 @@ import "github.com/zeebo/xxh3" func hashString(s string) uint32 { return uint32(xxh3.HashString(s)) } + +// extractContainerID returns the containerID for pid if cgroup v2 is used. +func extractContainerID(pidCgroupFile io.Reader) string { + scanner := bufio.NewScanner(pidCgroupFile) + buf := make([]byte, 512) + // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). + // We can do that and also set a maximum allocation size on the following call. + // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't + // expect lines in /proc//cgroup to be longer than that. + scanner.Buffer(buf, 8192) + var pathParts []string + for scanner.Scan() { + line := scanner.Text() + pathParts = cgroupv2ContainerIDPattern.FindStringSubmatch(line) + if pathParts == nil { + log.Debugf("Could not extract cgroupv2 path from line: %s", line) + continue + } + return pathParts[1] + } + + // No container ID could be extracted + return "" +} diff --git a/reporter/util_test.go b/reporter/util_test.go new file mode 100644 index 000000000..465e3e6e1 --- /dev/null +++ b/reporter/util_test.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +//nolint:lll +func TestExtractContainerID(t *testing.T) { + tests := []struct { + line string + expectedContainerID string + }{ + { + line: "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podf6f2d169_f2ae_4afa-95ed_06ff2ed6b288.slice/cri-containerd-b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc.scope", + expectedContainerID: "b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc", + }, + { + line: "0::/kubepods/besteffort/pod05e102bf-8744-4942-a241-9b6f07983a53/f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8", + expectedContainerID: "f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8", + }, + { + line: "0::/kubepods/besteffort/pod897277d4-5e6f-4999-a976-b8340e8d075e/crio-a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3", + expectedContainerID: "a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3", + }, + { + line: "0::/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod4c9f1974_5c46_44c2_b42f_3bbf0e98eef9.slice/cri-containerd-bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba.scope", + expectedContainerID: "bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba", + }, + { + line: "0::/user.slice/user-1000.slice/user@1000.service/app.slice/app-org.gnome.Terminal.slice/vte-spawn-868f9513-eee8-457d-8e36-1b37ae8ae622.scope", + }, + { + line: "0::/../../user.slice/user-501.slice/session-3.scope", + }, + { + line: "0::/system.slice/docker-b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df.scope/init", + expectedContainerID: "b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.expectedContainerID, func(t *testing.T) { + reader := bytes.NewReader([]byte(tc.line)) + + gotContainerID := extractContainerID(reader) + assert.Equal(t, tc.expectedContainerID, gotContainerID) + }) + } +}