diff --git a/go.mod b/go.mod index d94fc2645..145446b0b 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,12 @@ module go.opentelemetry.io/ebpf-profiler go 1.22.2 require ( + github.com/aws/aws-sdk-go-v2 v1.30.5 github.com/aws/aws-sdk-go-v2/config v1.27.35 - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.21 github.com/aws/aws-sdk-go-v2/service/s3 v1.62.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/cilium/ebpf v0.16.0 - github.com/elastic/go-freelru v0.15.0 + github.com/elastic/go-freelru v0.16.0 github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a github.com/google/uuid v1.6.0 github.com/jsimonetti/rtnetlink v1.4.2 @@ -29,7 +29,6 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.33 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect diff --git a/go.sum b/go.sum index 10c61a9e1..7fe300bbf 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.33 h1:lBHAQQznENv0gLHAZ73ONiTSkCt github.com/aws/aws-sdk-go-v2/credentials v1.17.33/go.mod h1:MBuqCUOT3ChfLuxNDGyra67eskx7ge9e3YKYBce7wpI= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 h1:pfQ2sqNpMVK6xz2RbqLEL0GH87JOwSxPV2rzm8Zsb74= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13/go.mod h1:NG7RXPUlqfsCLLFfi0+IpKN4sCB9D9fw/qTaSB+xRoU= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.21 h1:sV0doPPsRT7gMP0BnDPwSsysVTV/nKpB/nFmMnz8goE= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.21/go.mod h1:ictvfJWqE2gkUFDRJVp5VU/TrytuzK88DYcpan7UYuA= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc= @@ -43,10 +41,8 
@@ github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEn github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elastic/go-freelru v0.13.0 h1:TKKY6yCfNNNky7Pj9xZAOEpBcdNgZJfihEftOb55omg= -github.com/elastic/go-freelru v0.13.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= -github.com/elastic/go-freelru v0.15.0 h1:Jo1aY8JAvpyxbTDJEudrsBfjFDaALpfVv8mxuh9sfvI= -github.com/elastic/go-freelru v0.15.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= +github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= +github.com/elastic/go-freelru v0.16.0/go.mod h1:bSdWT4M0lW79K8QbX6XY2heQYSCqD7THoYf82pT/H3I= github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a h1:ymmtaN4bVCmKKeu4XEf6JEWNZKRXPMng1zjpKd+8rCU= github.com/elastic/go-perf v0.0.0-20241016160959-1342461adb4a/go.mod h1:Nt+pnRYvf0POC+7pXsrv8ubsEOSsaipJP0zlz1Ms1RM= github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= diff --git a/host/host.go b/host/host.go index c12a02a37..4b6a9d44a 100644 --- a/host/host.go +++ b/host/host.go @@ -55,4 +55,5 @@ type Trace struct { TID libpf.PID APMTraceID libpf.APMTraceID APMTransactionID libpf.APMTransactionID + CPU int } diff --git a/libpf/cgroupv2.go b/libpf/cgroupv2.go new file mode 100644 index 000000000..aa5d952f4 --- /dev/null +++ b/libpf/cgroupv2.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" + +import ( + "bufio" + "fmt" + "os" + "regexp" + + lru "github.com/elastic/go-freelru" + log "github.com/sirupsen/logrus" +) + +var ( + cgroupv2PathPattern = regexp.MustCompile(`0:.*?:(.*)`) +) + +// LookupCgroupv2 returns the cgroupv2 ID for pid. 
+func LookupCgroupv2(cgrouplru *lru.SyncedLRU[PID, string], pid PID) (string, error) { + id, ok := cgrouplru.Get(pid) + if ok { + return id, nil + } + + // Slow path + f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + defer f.Close() + + var genericCgroupv2 string + scanner := bufio.NewScanner(f) + buf := make([]byte, 512) + // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). + // We can do that and also set a maximum allocation size on the following call. + // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't + // expect lines in /proc/[pid]/cgroup to be longer than that. + scanner.Buffer(buf, 8192) + var pathParts []string + for scanner.Scan() { + line := scanner.Text() + pathParts = cgroupv2PathPattern.FindStringSubmatch(line) + if pathParts == nil { + log.Debugf("Could not extract cgroupv2 path from line: %s", line) + continue + } + genericCgroupv2 = pathParts[1] + break + } + + // Cache the cgroupv2 information. + // To avoid busy lookups, also empty cgroupv2 information is cached. + cgrouplru.Add(pid, genericCgroupv2) + + return genericCgroupv2, nil +} diff --git a/libpf/libpf.go b/libpf/libpf.go index 653d9ef5e..64e51d13d 100644 --- a/libpf/libpf.go +++ b/libpf/libpf.go @@ -5,8 +5,6 @@ package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf" import ( "encoding/json" - "fmt" - "math" "time" ) @@ -32,36 +30,9 @@ func NowAsUInt32() uint32 { return uint32(time.Now().Unix()) } -// UnixTime64 represents nanoseconds or (reduced precision) seconds since epoch. +// UnixTime64 represents nanoseconds since epoch. type UnixTime64 uint64 -func (t UnixTime64) MarshalJSON() ([]byte, error) { - if t > math.MaxUint32 { - // Nanoseconds, ES does not support 'epoch_nanoseconds' so - // we have to pass it a value formatted as 'strict_date_optional_time_nanos'. 
- out := []byte(fmt.Sprintf("%q", - time.Unix(0, int64(t)).UTC().Format(time.RFC3339Nano))) - return out, nil - } - - // Reduced precision seconds-since-the-epoch, ES 'epoch_second' formatter will match these. - out := []byte(fmt.Sprintf("%d", t)) - return out, nil -} - -// Unix returns the value as seconds since epoch. -func (t UnixTime64) Unix() int64 { - if t > math.MaxUint32 { - // Nanoseconds, convert to seconds-since-the-epoch - return time.Unix(0, int64(t)).Unix() - } - - return int64(t) -} - -// Compile-time interface checks -var _ json.Marshaler = (*UnixTime64)(nil) - // AddressOrLineno represents a line number in an interpreted file or an offset into // a native file. type AddressOrLineno uint64 diff --git a/libpf/libpf_test.go b/libpf/libpf_test.go index d2f0e938e..38dfea1e8 100644 --- a/libpf/libpf_test.go +++ b/libpf/libpf_test.go @@ -4,12 +4,9 @@ package libpf import ( - "fmt" - "strconv" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestTraceType(t *testing.T) { @@ -45,35 +42,3 @@ func TestTraceType(t *testing.T) { assert.Equal(t, test.str, test.ty.String()) } } - -func TestUnixTime64_MarshalJSON(t *testing.T) { - tests := []struct { - name string - time UnixTime64 - want []byte - }{ - { - name: "zero", - time: UnixTime64(0), - want: []byte(strconv.Itoa(0)), - }, - { - name: "non-zero, seconds since the epoch", - time: UnixTime64(1710349106), - want: []byte(strconv.Itoa(1710349106)), - }, - { - name: "non-zero, nanoseconds since the epoch", - time: UnixTime64(1710349106864964685), - want: []byte(fmt.Sprintf("%q", "2024-03-13T16:58:26.864964685Z")), - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - b, err := test.time.MarshalJSON() - require.NoError(t, err) - assert.Equal(t, test.want, b) - }) - } -} diff --git a/main.go b/main.go index 7a5e0330f..106413b8a 100644 --- a/main.go +++ b/main.go @@ -124,7 +124,7 @@ func mainWithExitCode() exitCode { GRPCStartupBackoffTime: 
intervals.GRPCStartupBackoffTime(), GRPCConnectionTimeout: intervals.GRPCConnectionTimeout(), ReportInterval: intervals.ReportInterval(), - ExecutablesCacheElements: 4096, + ExecutablesCacheElements: 16384, // Next step: Calculate FramesCacheElements from numCores and samplingRate. FramesCacheElements: 65536, CGroupCacheElements: 1024, diff --git a/proc/proc.go b/proc/proc.go index eeadeca61..8d61b67df 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -121,10 +121,15 @@ func GetKernelModules(modulesPath string, count++ - nFields, _ := fmt.Sscanf(line, "%s %d %d %s %s 0x%x", + nFields, err := fmt.Sscanf(line, "%s %d %d %s %s 0x%x", &name, &size, &refcount, &dependencies, &state, &address) + if err != nil { + log.Warnf("err parsing line in modules: '%s'", err) + continue + } if nFields < 6 { - return nil, fmt.Errorf("unexpected line in modules: '%s'", line) + log.Warnf("unexpected line in modules: '%s'", line) + continue } if address == 0 { continue diff --git a/reporter/iface.go b/reporter/iface.go index 85e4a1b2e..7cf19cf3e 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -36,6 +36,7 @@ type TraceEventMeta struct { Comm string APMServiceName string PID, TID libpf.PID + CPU int } type TraceReporter interface { diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 1dcace293..c2fbdf299 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -4,14 +4,10 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" import ( - "bufio" "context" "crypto/rand" "crypto/tls" - "fmt" "maps" - "os" - "regexp" "slices" "strconv" "time" @@ -33,8 +29,8 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf/xsync" ) -var ( - cgroupv2PathPattern = regexp.MustCompile(`0:.*?:(.*)`) +const ( + executableCacheLifetime = 1 * time.Hour ) // Assert that we implement the full Reporter interface. @@ -104,8 +100,8 @@ type OTLPReporter struct { // client for the connection to the receiver. 
client otlpcollector.ProfilesServiceClient - // stopSignal is the stop signal for shutting down all background tasks. - stopSignal chan libpf.Void + // runLoop handles the run loop + runLoop *runLoop // rpcStats stores gRPC related statistics. rpcStats *StatsHandlerImpl @@ -155,7 +151,7 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { if err != nil { return nil, err } - executables.SetLifetime(1 * time.Hour) // Allow GC to clean stale items. + executables.SetLifetime(executableCacheLifetime) // Allow GC to clean stale items. frames, err := lru.NewSynced[libpf.FileID, *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]]( @@ -182,15 +178,17 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { } return &OTLPReporter{ - config: cfg, - name: cfg.Name, - version: cfg.Version, - kernelVersion: cfg.KernelVersion, - hostName: cfg.HostName, - ipAddress: cfg.IPAddress, - samplesPerSecond: cfg.SamplesPerSecond, - hostID: strconv.FormatUint(cfg.HostID, 10), - stopSignal: make(chan libpf.Void), + config: cfg, + name: cfg.Name, + version: cfg.Version, + kernelVersion: cfg.KernelVersion, + hostName: cfg.HostName, + ipAddress: cfg.IPAddress, + samplesPerSecond: cfg.SamplesPerSecond, + hostID: strconv.FormatUint(cfg.HostID, 10), + runLoop: &runLoop{ + stopSignal: make(chan libpf.Void), + }, pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, client: nil, rpcStats: NewStatsHandler(), @@ -216,7 +214,7 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta traceEventsMap := r.traceEvents.WLock() defer r.traceEvents.WUnlock(&traceEventsMap) - containerID, err := r.lookupCgroupv2(meta.PID) + containerID, err := libpf.LookupCgroupv2(r.cgroupv2ID, meta.PID) if err != nil { log.Debugf("Failed to get a cgroupv2 ID as container ID for PID %d: %v", meta.PID, err) @@ -348,7 +346,7 @@ func (r *OTLPReporter) ReportMetrics(_ uint32, _ []uint32, _ []int64) {} // Stop triggers a graceful shutdown of OTLPReporter. 
func (r *OTLPReporter) Stop() { - close(r.stopSignal) + r.runLoop.Stop() } // GetMetrics returns internal metrics of OTLPReporter. @@ -372,41 +370,27 @@ func (r *OTLPReporter) Start(ctx context.Context) error { otlpGrpcConn, err := waitGrpcEndpoint(ctx, r.config, r.rpcStats) if err != nil { cancelReporting() - close(r.stopSignal) + r.runLoop.Stop() return err } r.client = otlpcollector.NewProfilesServiceClient(otlpGrpcConn) - go func() { - tick := time.NewTicker(r.config.ReportInterval) - defer tick.Stop() - purgeTick := time.NewTicker(5 * time.Minute) - defer purgeTick.Stop() - for { - select { - case <-ctx.Done(): - return - case <-r.stopSignal: - return - case <-tick.C: - if err := r.reportOTLPProfile(ctx); err != nil { - log.Errorf("Request failed: %v", err) - } - tick.Reset(libpf.AddJitter(r.config.ReportInterval, 0.2)) - case <-purgeTick.C: - // Allow the GC to purge expired entries to avoid memory leaks. - r.executables.PurgeExpired() - r.frames.PurgeExpired() - r.cgroupv2ID.PurgeExpired() - } + r.runLoop.Start(ctx, r.config.ReportInterval, func() { + if err := r.reportOTLPProfile(ctx); err != nil { + log.Errorf("Request failed: %v", err) } - }() + }, func() { + // Allow the GC to purge expired entries to avoid memory leaks. 
+ r.executables.PurgeExpired() + r.frames.PurgeExpired() + r.cgroupv2ID.PurgeExpired() + }) // When Stop() is called and a signal to 'stop' is received, then: // - cancel the reporting functions currently running (using context) // - close the gRPC connection with collection-agent go func() { - <-r.stopSignal + <-r.runLoop.stopSignal cancelReporting() if err := otlpGrpcConn.Close(); err != nil { log.Fatalf("Stopping connection of OTLP client client failed: %v", err) @@ -594,7 +578,9 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u fileIDtoMapping[traceInfo.files[i]] = idx locationMappingIndex = idx - execInfo, exists := r.executables.Get(traceInfo.files[i]) + // Ensure that actively used executables do not expire. + execInfo, exists := r.executables.GetAndRefresh(traceInfo.files[i], + executableCacheLifetime) // Next step: Select a proper default value, // if the name of the executable is not known yet. @@ -871,44 +857,3 @@ func setupGrpcConnection(parent context.Context, cfg *Config, //nolint:staticcheck return grpc.DialContext(ctx, cfg.CollAgentAddr, opts...) } - -// lookupCgroupv2 returns the cgroupv2 ID for pid. -func (r *OTLPReporter) lookupCgroupv2(pid libpf.PID) (string, error) { - id, ok := r.cgroupv2ID.Get(pid) - if ok { - return id, nil - } - - // Slow path - f, err := os.Open(fmt.Sprintf("/proc/%d/cgroup", pid)) - if err != nil { - return "", err - } - defer f.Close() - - var genericCgroupv2 string - scanner := bufio.NewScanner(f) - buf := make([]byte, 512) - // Providing a predefined buffer overrides the internal buffer that Scanner uses (4096 bytes). - // We can do that and also set a maximum allocation size on the following call. - // With a maximum of 4096 characters path in the kernel, 8192 should be fine here. We don't - // expect lines in /proc//cgroup to be longer than that. 
- scanner.Buffer(buf, 8192) - var pathParts []string - for scanner.Scan() { - line := scanner.Text() - pathParts = cgroupv2PathPattern.FindStringSubmatch(line) - if pathParts == nil { - log.Debugf("Could not extract cgroupv2 path from line: %s", line) - continue - } - genericCgroupv2 = pathParts[1] - break - } - - // Cache the cgroupv2 information. - // To avoid busy lookups, also empty cgroupv2 information is cached. - r.cgroupv2ID.Add(pid, genericCgroupv2) - - return genericCgroupv2, nil -} diff --git a/reporter/runloop.go b/reporter/runloop.go new file mode 100644 index 000000000..0014d6347 --- /dev/null +++ b/reporter/runloop.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" + +import ( + "context" + "time" + + "go.opentelemetry.io/ebpf-profiler/libpf" +) + +// runLoop implements the run loop for all reporters +type runLoop struct { + // stopSignal is the stop signal for shutting down all background tasks. 
+ stopSignal chan libpf.Void +} + +func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, run, purge func()) { + go func() { + tick := time.NewTicker(reportInterval) + defer tick.Stop() + purgeTick := time.NewTicker(5 * time.Minute) + defer purgeTick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-rl.stopSignal: + return + case <-tick.C: + run() + tick.Reset(libpf.AddJitter(reportInterval, 0.2)) + case <-purgeTick.C: + purge() + } + } + }() +} + +func (rl *runLoop) Stop() { + close(rl.stopSignal) +} diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index c1a6e3914..ebc39ac01 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -126,6 +126,7 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { PID: bpfTrace.PID, TID: bpfTrace.TID, APMServiceName: "", // filled in below + CPU: bpfTrace.CPU, } if !m.reporter.SupportsReportTraceEvent() { diff --git a/tracer/events.go b/tracer/events.go index 0a29a9c49..7e21fe564 100644 --- a/tracer/events.go +++ b/tracer/events.go @@ -134,7 +134,7 @@ func startPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map, // calls. Returns a function that can be called to retrieve perf event array // error counts. 
func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map, - pollFrequency time.Duration, perCPUBufferSize int, triggerFunc func([]byte), + pollFrequency time.Duration, perCPUBufferSize int, triggerFunc func([]byte, int), ) func() (lost, noData, readError uint64) { eventReader, err := perf.NewReader(perfEventMap, perCPUBufferSize) if err != nil { @@ -178,7 +178,7 @@ func startPollingPerfEventMonitor(ctx context.Context, perfEventMap *ebpf.Map, noDataCount.Add(1) continue } - triggerFunc(data.RawSample) + triggerFunc(data.RawSample, data.CPU) } } }() diff --git a/tracer/tracer.go b/tracer/tracer.go index 2da8bff57..780677382 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -841,7 +841,7 @@ func (t *Tracer) eBPFMetricsCollector( // // If the raw trace contains a kernel stack ID, the kernel stack is also // retrieved and inserted at the appropriate position. -func (t *Tracer) loadBpfTrace(raw []byte) *host.Trace { +func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace { frameListOffs := int(unsafe.Offsetof(C.Trace{}.frames)) if len(raw) < frameListOffs { @@ -863,6 +863,7 @@ func (t *Tracer) loadBpfTrace(raw []byte) *host.Trace { PID: libpf.PID(ptr.pid), TID: libpf.PID(ptr.tid), KTime: times.KTime(ptr.ktime), + CPU: cpu, } // Trace fields included in the hash: @@ -912,8 +913,8 @@ func (t *Tracer) StartMapMonitors(ctx context.Context, traceOutChan chan *host.T eventMetricCollector := t.startEventMonitor(ctx) startPollingPerfEventMonitor(ctx, t.ebpfMaps["trace_events"], t.intervals.TracePollInterval(), - t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})), func(rawTrace []byte) { - traceOutChan <- t.loadBpfTrace(rawTrace) + t.samplesPerSecond*int(unsafe.Sizeof(C.Trace{})), func(rawTrace []byte, cpu int) { + traceOutChan <- t.loadBpfTrace(rawTrace, cpu) }) pidEvents := make([]uint32, 0)