Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion collector/internal/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func NewController(cfg *controller.Config,
ExecutablesCacheElements: 16384,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 131072,
CGroupCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
}, nextConsumer)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Trace struct {
Comm string
ProcessName string
ExecutablePath string
ContainerID string
Frames []Frame
Hash TraceHash
KTime times.KTime
Expand Down
59 changes: 0 additions & 59 deletions libpf/cgroupv2.go

This file was deleted.

1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func mainWithExitCode() exitCode {
ExecutablesCacheElements: 16384,
// Next step: Calculate FramesCacheElements from numCores and samplingRate.
FramesCacheElements: 131072,
CGroupCacheElements: 1024,
SamplesPerSecond: cfg.SamplesPerSecond,
})
if err != nil {
Expand Down
44 changes: 44 additions & 0 deletions processmanager/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@
package processmanager // import "go.opentelemetry.io/ebpf-profiler/processmanager"

import (
"bufio"
"fmt"
"io"
"os"
"regexp"

lru "github.com/elastic/go-freelru"
log "github.com/sirupsen/logrus"

"go.opentelemetry.io/ebpf-profiler/host"
"go.opentelemetry.io/ebpf-profiler/libpf"
)

//nolint:lll
var (
cgroupv2ContainerIDPattern = regexp.MustCompile(`0:.*?:.*?([0-9a-fA-F]{64})(?:\.scope)?(?:/[a-z]+)?$`)
)

type lruFileIDMapper struct {
cache *lru.SyncedLRU[host.FileID, libpf.FileID]
}
Expand Down Expand Up @@ -79,3 +90,36 @@ type FileIDMapper interface {
// Set adds a mapping from the 64-bit file ID to the 128-bit file ID.
Set(pre host.FileID, post libpf.FileID)
}

func parseContainerID(cgroupFile io.Reader) string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the tests only contain cgroupV2 strings, should we document this function to support cgroupV2 only? Otherwise, we should add cgroupV1 test cases.
IMO, we don't need to support for v1.

scanner := bufio.NewScanner(cgroupFile)
buf := make([]byte, 512)
// Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes).
// We can do that and also set a maximum allocation size on the following call.
// With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't
// expect lines in /proc/<PID>/cgroup to be longer than that.
scanner.Buffer(buf, 8192)
var pathParts []string
for scanner.Scan() {
line := scanner.Text()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip regex processing and heap allocations for the very common case of "0::/".

Suggested change
line := scanner.Text()
b := scanner.Bytes()
if bytes.Equal(b, []byte("0::/")) {
continue // Skip a common case
}
line := string(b)

pathParts = cgroupv2ContainerIDPattern.FindStringSubmatch(line)
if pathParts == nil {
log.Debugf("Could not extract cgroupv2 path from line: %s", line)
continue
}
return pathParts[1]
}

// No containerID could be extracted
return ""
}

// extractContainerID returns the containerID for pid if cgroup v2 is used.
func extractContainerID(pid libpf.PID) (string, error) {
cgroupFile, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid))
if err != nil {
return "", err
}

return parseContainerID(cgroupFile), nil
}
56 changes: 56 additions & 0 deletions processmanager/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package processmanager // import "go.opentelemetry.io/ebpf-profiler/processmanager"

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
)

//nolint:lll
func TestExtractContainerID(t *testing.T) {
tests := []struct {
line string
expectedContainerID string
}{
{
line: "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podf6f2d169_f2ae_4afa-95ed_06ff2ed6b288.slice/cri-containerd-b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc.scope",
expectedContainerID: "b4d6d161c62525d726fa394b27df30e14f8ea5646313ada576b390de70cfc8cc",
},
{
line: "0::/kubepods/besteffort/pod05e102bf-8744-4942-a241-9b6f07983a53/f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8",
expectedContainerID: "f52a212505a606972cf8614c3cb856539e71b77ecae33436c5ac442232fbacf8",
},
{
line: "0::/kubepods/besteffort/pod897277d4-5e6f-4999-a976-b8340e8d075e/crio-a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3",
expectedContainerID: "a4d6b686848a610472a2eed3ae20d4d64b6b4819feb9fdfc7fd7854deaf59ef3",
},
{
line: "0::/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod4c9f1974_5c46_44c2_b42f_3bbf0e98eef9.slice/cri-containerd-bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba.scope",
expectedContainerID: "bacb920470900725e0aa7d914fee5eb0854315448b024b6b8420ad8429c607ba",
},
{
line: "0::/user.slice/user-1000.slice/user@1000.service/app.slice/app-org.gnome.Terminal.slice/vte-spawn-868f9513-eee8-457d-8e36-1b37ae8ae622.scope",
},
{
line: "0::/../../user.slice/user-501.slice/session-3.scope",
},
{
line: "0::/system.slice/docker-b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df.scope/init",
expectedContainerID: "b1eba9dfaeba29d8b80532a574a03ea3cac29384327f339c26da13649e2120df",
},
}

for _, tc := range tests {
tc := tc
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tc := tc

t.Run(tc.expectedContainerID, func(t *testing.T) {
reader := bytes.NewReader([]byte(tc.line))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
reader := bytes.NewReader([]byte(tc.line))
reader := strings.NewReader(tc.line)


gotContainerID := parseContainerID(reader)
assert.Equal(t, tc.expectedContainerID, gotContainerID)
})
}
}
6 changes: 6 additions & 0 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,16 @@ func (pm *ProcessManager) updatePidInformation(pid libpf.PID, m *Mapping) (bool,
}
}

containerID, err := extractContainerID(pid)
if err != nil {
log.Debugf("Failed extracting containerID for %d: %v", pid, err)
}

info = &processInfo{
meta: ProcessMeta{
Name: processName,
Executable: exePath,
ContainerID: containerID,
EnvVariables: envVarMap},
mappings: make(map[libpf.Address]*Mapping),
mappingsByFileID: make(map[host.FileID]map[libpf.Address]*Mapping),
Expand Down
2 changes: 2 additions & 0 deletions processmanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type ProcessMeta struct {
Executable string
// process env vars from /proc/PID/environ
EnvVariables map[string]string
// container ID retrieved from /proc/PID/cgroup
ContainerID string
}

// processInfo contains information about the executable mappings
Expand Down
10 changes: 1 addition & 9 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ type baseReporter struct {
// pdata holds the generator for the data being exported.
pdata *pdata.Pdata

// cgroupv2ID caches PID to container ID information for cgroupv2 containers.
cgroupv2ID *lru.SyncedLRU[libpf.PID, string]

// traceEvents stores reported trace events (trace metadata with frames and counts)
traceEvents xsync.RWMutex[samples.TraceEventsTree]

Expand Down Expand Up @@ -96,12 +93,7 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE
extraMeta = b.cfg.ExtraSampleAttrProd.CollectExtraSampleMeta(trace, meta)
}

containerID, err := libpf.LookupCgroupv2(b.cgroupv2ID, meta.PID)
if err != nil {
log.Debugf("Failed to get a cgroupv2 ID as container ID for PID %d: %v",
meta.PID, err)
}

containerID := meta.ContainerID
key := samples.TraceAndMetaKey{
Hash: trace.Hash,
Comm: meta.Comm,
Expand Down
11 changes: 0 additions & 11 deletions reporter/collector_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"

import (
"context"
"time"

lru "github.com/elastic/go-freelru"
log "github.com/sirupsen/logrus"
Expand All @@ -29,14 +28,6 @@ type CollectorReporter struct {

// NewCollector builds a new CollectorReporter
func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorReporter, error) {
cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
}
// Set a lifetime to reduce the risk of invalid data in case of PID reuse.
cgroupv2ID.SetLifetime(90 * time.Second)

// Next step: Dynamically configure the size of this LRU.
// Currently, we use the length of the JSON array in
// hostmetadata/hostmetadata.json.
Expand All @@ -63,7 +54,6 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor
name: cfg.Name,
version: cfg.Version,
pdata: data,
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(tree),
hostmetadata: hostmetadata,
runLoop: &runLoop{
Expand All @@ -85,7 +75,6 @@ func (r *CollectorReporter) Start(ctx context.Context) error {
}, func() {
// Allow the GC to purge expired entries to avoid memory leaks.
r.pdata.Purge()
r.cgroupv2ID.PurgeExpired()
})

// When Stop() is called and a signal to 'stop' is received, then:
Expand Down
1 change: 0 additions & 1 deletion reporter/collector_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestCollectorReporterReportTraceEvent(t *testing.T) {
r, err := NewCollector(&Config{
ExecutablesCacheElements: 1,
FramesCacheElements: 1,
CGroupCacheElements: 1,
}, next)
require.NoError(t, err)
if err := r.ReportTraceEvent(tt.trace, tt.meta); err != nil &&
Expand Down
2 changes: 0 additions & 2 deletions reporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ type Config struct {
ExecutablesCacheElements uint32
// FramesCacheElements defines the item capacity of the frames cache.
FramesCacheElements uint32
// CGroupCacheElements defines the item capacity of the cgroup cache.
CGroupCacheElements uint32
// samplesPerSecond defines the number of samples per second.
SamplesPerSecond int

Expand Down
10 changes: 0 additions & 10 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ type OTLPReporter struct {

// NewOTLP returns a new instance of OTLPReporter
func NewOTLP(cfg *Config) (*OTLPReporter, error) {
cgroupv2ID, err := lru.NewSynced[libpf.PID, string](cfg.CGroupCacheElements,
func(pid libpf.PID) uint32 { return uint32(pid) })
if err != nil {
return nil, err
}
// Set a lifetime to reduce risk of invalid data in case of PID reuse.
cgroupv2ID.SetLifetime(90 * time.Second)

// Next step: Dynamically configure the size of this LRU.
// Currently, we use the length of the JSON array in
// hostmetadata/hostmetadata.json.
Expand All @@ -75,7 +67,6 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) {
name: cfg.Name,
version: cfg.Version,
pdata: data,
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(eventsTree),
hostmetadata: hostmetadata,
runLoop: &runLoop{
Expand Down Expand Up @@ -110,7 +101,6 @@ func (r *OTLPReporter) Start(ctx context.Context) error {
}, func() {
// Allow the GC to purge expired entries to avoid memory leaks.
r.pdata.Purge()
r.cgroupv2ID.PurgeExpired()
})

// When Stop() is called and a signal to 'stop' is received, then:
Expand Down
3 changes: 2 additions & 1 deletion reporter/samples/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type TraceEventMeta struct {
ProcessName string
ExecutablePath string
APMServiceName string
ContainerID string
PID, TID libpf.PID
CPU int
Origin libpf.Origin
Expand Down Expand Up @@ -39,7 +40,7 @@ type TraceAndMetaKey struct {
// comm and apmServiceName are provided by the eBPF programs
Comm string
ApmServiceName string
// containerID is annotated based on PID information
// ContainerID is annotated based on PID information
ContainerID string
Pid int64
Tid int64
Expand Down
1 change: 1 addition & 0 deletions tracehandler/tracehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
CPU: bpfTrace.CPU,
ProcessName: bpfTrace.ProcessName,
ExecutablePath: bpfTrace.ExecutablePath,
ContainerID: bpfTrace.ContainerID,
Origin: bpfTrace.Origin,
OffTime: bpfTrace.OffTime,
EnvVars: bpfTrace.EnvVars,
Expand Down
1 change: 1 addition & 0 deletions tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace {
trace := &host.Trace{
Comm: C.GoString((*C.char)(unsafe.Pointer(&ptr.comm))),
ExecutablePath: procMeta.Executable,
ContainerID: procMeta.ContainerID,
ProcessName: procMeta.Name,
APMTraceID: *(*libpf.APMTraceID)(unsafe.Pointer(&ptr.apm_trace_id)),
APMTransactionID: *(*libpf.APMTransactionID)(unsafe.Pointer(&ptr.apm_transaction_id)),
Expand Down