Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions collector/internal/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func NewController(cfg *controller.Config,
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 16384,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 131072,
CGroupCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
FramesCacheElements: 131072,
PIDToContainerIDCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
}, nextConsumer)
if err != nil {
return nil, err
Expand Down
59 changes: 0 additions & 59 deletions libpf/cgroupv2.go

This file was deleted.

6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func mainWithExitCode() exitCode {
ReportInterval: intervals.ReportInterval(),
ExecutablesCacheElements: 16384,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 131072,
CGroupCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
FramesCacheElements: 131072,
PIDToContainerIDCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
})
if err != nil {
log.Error(err)
Expand Down
29 changes: 25 additions & 4 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

lru "github.com/elastic/go-freelru"
Expand Down Expand Up @@ -34,8 +35,8 @@ type baseReporter struct {
// pdata holds the generator for the data being exported.
pdata *pdata.Pdata

// cgroupv2ID caches PID to container ID information for cgroupv2 containers.
cgroupv2ID *lru.SyncedLRU[libpf.PID, string]
// pidToContainerID caches PID to container ID for cgroupv2 containers.
pidToContainerID *lru.SyncedLRU[libpf.PID, string]

// traceEvents stores reported trace events (trace metadata with frames and counts)
traceEvents xsync.RWMutex[samples.TraceEventsTree]
Expand Down Expand Up @@ -96,9 +97,9 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE
extraMeta = b.cfg.ExtraSampleAttrProd.CollectExtraSampleMeta(trace, meta)
}

containerID, err := libpf.LookupCgroupv2(b.cgroupv2ID, meta.PID)
containerID, err := b.lookupContainerID(meta.PID)
if err != nil {
log.Debugf("Failed to get a cgroupv2 ID as container ID for PID %d: %v",
log.Debugf("Failed to get a container ID for PID %d: %v",
meta.PID, err)
}

Expand Down Expand Up @@ -164,3 +165,23 @@ func (b *baseReporter) FrameMetadata(args *FrameMetadataArgs) {
}
b.pdata.Frames.Add(args.FrameID, si)
}

// lookupContainerID returns the container ID for pid, derived from its
// cgroup v2 path, or an empty string when the process is not containerized.
// Results (including empty ones) are cached per PID.
func (b *baseReporter) lookupContainerID(pid libpf.PID) (string, error) {
	// Fast path: serve from the PID cache, refreshing the entry's lifetime.
	if cached, ok := b.pidToContainerID.GetAndRefresh(pid, 90*time.Second); ok {
		return cached, nil
	}

	// Slow path: parse /proc/<pid>/cgroup.
	f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid))
	if err != nil {
		return "", err
	}
	defer f.Close()

	id := extractContainerID(f)

	// Cache empty results as well so repeated lookups for
	// non-containerized processes do not re-read /proc.
	b.pidToContainerID.Add(pid, id)
	return id, nil
}
20 changes: 10 additions & 10 deletions reporter/collector_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ type CollectorReporter struct {

// NewCollector builds a new CollectorReporter
func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorReporter, error) {
cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
pidToContainerID, err := lru.NewSynced[libpf.PID, string](cfg.PIDToContainerIDCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
}
// Set a lifetime to reduce the risk of invalid data in case of PID reuse.
cgroupv2ID.SetLifetime(90 * time.Second)
pidToContainerID.SetLifetime(90 * time.Second)
Copy link
Copy Markdown
Member

@christos68k christos68k Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR, but we can and should do better here (e.g. ProcessManager pub-sub scheme where various subsystems can get notified about a PID exit) since the ProcessManager will receive notifications for every exited PID.


// Next step: Dynamically configure the size of this LRU.
// Currently, we use the length of the JSON array in
Expand All @@ -59,13 +59,13 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor

return &CollectorReporter{
baseReporter: &baseReporter{
cfg: cfg,
name: cfg.Name,
version: cfg.Version,
pdata: data,
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(tree),
hostmetadata: hostmetadata,
cfg: cfg,
name: cfg.Name,
version: cfg.Version,
pdata: data,
pidToContainerID: pidToContainerID,
traceEvents: xsync.NewRWMutex(tree),
hostmetadata: hostmetadata,
runLoop: &runLoop{
stopSignal: make(chan libpf.Void),
},
Expand All @@ -85,7 +85,7 @@ func (r *CollectorReporter) Start(ctx context.Context) error {
}, func() {
// Allow the GC to purge expired entries to avoid memory leaks.
r.pdata.Purge()
r.cgroupv2ID.PurgeExpired()
r.pidToContainerID.PurgeExpired()
})

// When Stop() is called and a signal to 'stop' is received, then:
Expand Down
6 changes: 3 additions & 3 deletions reporter/collector_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func TestCollectorReporterReportTraceEvent(t *testing.T) {
}

r, err := NewCollector(&Config{
ExecutablesCacheElements: 1,
FramesCacheElements: 1,
CGroupCacheElements: 1,
ExecutablesCacheElements: 1,
FramesCacheElements: 1,
PIDToContainerIDCacheElements: 1,
}, next)
require.NoError(t, err)
if err := r.ReportTraceEvent(tt.trace, tt.meta); err != nil &&
Expand Down
4 changes: 2 additions & 2 deletions reporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Config struct {
ExecutablesCacheElements uint32
// FramesCacheElements defines the item capacity of the frames cache.
FramesCacheElements uint32
// CGroupCacheElements defines the item capacity of the cgroup cache.
CGroupCacheElements uint32
// PIDToContainerIDCacheElements defines the item capacity of the pid cache.
PIDToContainerIDCacheElements uint32
// samplesPerSecond defines the number of samples per second.
SamplesPerSecond int

Expand Down
20 changes: 10 additions & 10 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ type OTLPReporter struct {

// NewOTLP returns a new instance of OTLPReporter
func NewOTLP(cfg *Config) (*OTLPReporter, error) {
cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
pidToContainerID, err := lru.NewSynced[libpf.PID, string](cfg.PIDToContainerIDCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
}
// Set a lifetime to reduce risk of invalid data in case of PID reuse.
cgroupv2ID.SetLifetime(90 * time.Second)
pidToContainerID.SetLifetime(90 * time.Second)

// Next step: Dynamically configure the size of this LRU.
// Currently, we use the length of the JSON array in
Expand All @@ -71,13 +71,13 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) {

return &OTLPReporter{
baseReporter: &baseReporter{
cfg: cfg,
name: cfg.Name,
version: cfg.Version,
pdata: data,
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(eventsTree),
hostmetadata: hostmetadata,
cfg: cfg,
name: cfg.Name,
version: cfg.Version,
pdata: data,
pidToContainerID: pidToContainerID,
traceEvents: xsync.NewRWMutex(eventsTree),
hostmetadata: hostmetadata,
runLoop: &runLoop{
stopSignal: make(chan libpf.Void),
},
Expand Down Expand Up @@ -110,7 +110,7 @@ func (r *OTLPReporter) Start(ctx context.Context) error {
}, func() {
// Allow the GC to purge expired entries to avoid memory leaks.
r.pdata.Purge()
r.cgroupv2ID.PurgeExpired()
r.pidToContainerID.PurgeExpired()
})

// When Stop() is called and a signal to 'stop' is received, then:
Expand Down
38 changes: 37 additions & 1 deletion reporter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,47 @@

package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"

import "github.com/zeebo/xxh3"
import (
"bufio"
"io"
"regexp"

log "github.com/sirupsen/logrus"
"github.com/zeebo/xxh3"
)

//nolint:lll
var (
	// cgroupv2ContainerIDPattern matches a cgroup v2 entry ("0::<path>") in
	// /proc/<PID>/cgroup and captures the 64-hex-char container ID, tolerating
	// an optional trailing ".scope" suffix and one trailing path segment
	// (e.g. "/init"). See the cases in TestExtractContainerID for examples.
	cgroupv2ContainerIDPattern = regexp.MustCompile(`0:.*?:.*?([0-9a-fA-F]{64})(?:\.scope)?(?:/[a-z]+)?$`)
)

// hashString is a helper function for LRUs that use string as a key.
// Xxh3 turned out to be the fastest hash function for strings in the FreeLRU
// benchmarks; it was only outperformed by the AES hash function, which is
// implemented in Plan9 assembly.
func hashString(s string) uint32 {
	h := xxh3.HashString(s)
	return uint32(h)
}

// extractContainerID scans the contents of a /proc/<PID>/cgroup file and
// returns the container ID from the cgroup v2 entry, or an empty string if
// no container ID can be extracted.
func extractContainerID(pidCgroupFile io.Reader) string {
	scanner := bufio.NewScanner(pidCgroupFile)
	// Providing a predefined buffer overrides the internal buffer that
	// Scanner uses (4096 bytes). We can do that and also set a maximum
	// allocation size on the following call. With a maximum of 4096
	// characters path in the kernel, 8192 should be fine here. We don't
	// expect lines in /proc/<PID>/cgroup to be longer than that.
	scanner.Buffer(make([]byte, 512), 8192)

	for scanner.Scan() {
		line := scanner.Text()
		pathParts := cgroupv2ContainerIDPattern.FindStringSubmatch(line)
		if pathParts == nil {
			log.Debugf("Could not extract cgroupv2 path from line: %s", line)
			continue
		}
		return pathParts[1]
	}

	// Surface scanner failures (e.g. bufio.ErrTooLong when a line exceeds
	// the 8192-byte limit) instead of silently treating them as
	// "no container ID found".
	if err := scanner.Err(); err != nil {
		log.Debugf("Failed to scan cgroup data: %v", err)
	}

	// No container ID could be extracted
	return ""
}
56 changes: 56 additions & 0 deletions reporter/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
)

//nolint:lll
func TestExtractContainerID(t *testing.T) {
	// Each case is a single line from /proc/<PID>/cgroup; an empty
	// expectedContainerID means no container ID should be extracted.
	type testCase struct {
		line                string
		expectedContainerID string
	}

	testCases := []testCase{
		{
			line:                "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podf6f2d169_f2ae_4afa-95ed_06ff2ed6b288.slice/cri-containerd-b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc.scope",
			expectedContainerID: "b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc",
		},
		{
			line:                "0::/kubepods/besteffort/pod05e102bf-8744-4942-a241-9b6f07983a53/f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8",
			expectedContainerID: "f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8",
		},
		{
			line:                "0::/kubepods/besteffort/pod897277d4-5e6f-4999-a976-b8340e8d075e/crio-a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3",
			expectedContainerID: "a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3",
		},
		{
			line:                "0::/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod4c9f1974_5c46_44c2_b42f_3bbf0e98eef9.slice/cri-containerd-bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba.scope",
			expectedContainerID: "bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba",
		},
		{
			line: "0::/user.slice/user-1000.slice/user@1000.service/app.slice/app-org.gnome.Terminal.slice/vte-spawn-868f9513-eee8-457d-8e36-1b37ae8ae622.scope",
		},
		{
			line: "0::/../../user.slice/user-501.slice/session-3.scope",
		},
		{
			line:                "0::/system.slice/docker-b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df.scope/init",
			expectedContainerID: "b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df",
		},
	}

	for _, tt := range testCases {
		tt := tt
		t.Run(tt.expectedContainerID, func(t *testing.T) {
			got := extractContainerID(bytes.NewReader([]byte(tt.line)))
			assert.Equal(t, tt.expectedContainerID, got)
		})
	}
}
Loading