Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions metrics/ids.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions metrics/metrics.json
Original file line number Diff line number Diff line change
Expand Up @@ -952,13 +952,15 @@
"id": 130
},
{
"obsolete": true,
"description": "Number of cache hits in tracehandler trace cache by BPF hash",
"type": "counter",
"name": "KnownTracesHit",
"field": "bpf.known_traces.hits",
"id": 131
},
{
"obsolete": true,
"description": "Number of cache misses in tracehandler trace cache by BPF hash",
"type": "counter",
"name": "KnownTracesMiss",
Expand Down
22 changes: 9 additions & 13 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"

import (
"context"
"errors"
"fmt"
"time"

lru "github.com/elastic/go-freelru"
Expand Down Expand Up @@ -42,6 +44,8 @@ type baseReporter struct {
hostmetadata *lru.SyncedLRU[string, string]
}

var errUnknownOrigin = errors.New("unknown trace origin")

func (b *baseReporter) Stop() {
b.runLoop.Stop()
}
Expand All @@ -63,13 +67,6 @@ func (b *baseReporter) addHostmetadata(metadataMap map[string]string) {
}
}

// ReportFramesForTrace is a NOP
func (*baseReporter) ReportFramesForTrace(_ *libpf.Trace) {}

// ReportCountForTrace is a NOP
func (b *baseReporter) ReportCountForTrace(_ libpf.TraceHash, _ uint16, _ *samples.TraceEventMeta) {
}

func (b *baseReporter) ExecutableKnown(fileID libpf.FileID) bool {
_, known := b.pdata.Executables.GetAndRefresh(fileID, pdata.ExecutableCacheLifetime)
return known
Expand All @@ -93,13 +90,11 @@ func (b *baseReporter) ExecutableMetadata(args *ExecutableMetadataArgs) {
})
}

func (*baseReporter) SupportsReportTraceEvent() bool { return true }

func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) {
func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error {
if meta.Origin != support.TraceOriginSampling && meta.Origin != support.TraceOriginOffCPU {
// At the moment only on-CPU and off-CPU traces are reported.
log.Errorf("Skip reporting trace for unexpected %d origin", meta.Origin)
return
return fmt.Errorf("skip reporting trace for %d origin: %w", meta.Origin,
errUnknownOrigin)
}

var extraMeta any
Expand Down Expand Up @@ -131,7 +126,7 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE
events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp))
events.OffTimes = append(events.OffTimes, meta.OffTime)
(*traceEventsMap)[meta.Origin][key] = events
return
return nil
}

(*traceEventsMap)[meta.Origin][key] = &samples.TraceEvents{
Expand All @@ -145,6 +140,7 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE
OffTimes: []int64{meta.OffTime},
EnvVars: meta.EnvVars,
}
return nil
}

func (b *baseReporter) FrameMetadata(args *FrameMetadataArgs) {
Expand Down
5 changes: 4 additions & 1 deletion reporter/collector_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func TestCollectorReporterReportTraceEvent(t *testing.T) {
CGroupCacheElements: 1,
}, next)
require.NoError(t, err)
r.ReportTraceEvent(tt.trace, tt.meta)
if err := r.ReportTraceEvent(tt.trace, tt.meta); err != nil &&
!errors.Is(err, errUnknownOrigin) {
t.Fatal(err)
}
})
}
}
20 changes: 4 additions & 16 deletions reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,10 @@ type Reporter interface {
}

type TraceReporter interface {
// ReportFramesForTrace accepts a trace with the corresponding frames
// and caches this information before a periodic reporting to the backend.
ReportFramesForTrace(trace *libpf.Trace)

// ReportCountForTrace accepts a hash of a trace with a corresponding count and
// caches this information before a periodic reporting to the backend.
ReportCountForTrace(traceHash libpf.TraceHash, count uint16, meta *samples.TraceEventMeta)

// ReportTraceEvent accepts a trace event (trace metadata with frames and counts)
// and caches it for reporting to the backend. It returns true if the event was
// enqueued for reporting, and false if the event was ignored.
ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta)

// SupportsReportTraceEvent returns true if the reporter supports reporting trace events
// via ReportTraceEvent().
SupportsReportTraceEvent() bool
// ReportTraceEvent accepts a trace event (trace metadata with frames)
// and enqueues it for reporting to the backend.
// If handling the trace event fails it returns an error.
ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error
}

// ExecutableOpener is a function that attempts to open an executable.
Expand Down
18 changes: 4 additions & 14 deletions tracehandler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@ func (m *traceHandler) collectMetrics() {
metrics.AddSlice([]metrics.Metric{
{
ID: metrics.IDTraceCacheHit,
Value: metrics.MetricValue(m.umTraceCacheHit),
Value: metrics.MetricValue(m.traceCacheHit),
},
{
ID: metrics.IDTraceCacheMiss,
Value: metrics.MetricValue(m.umTraceCacheMiss),
},
{
ID: metrics.IDKnownTracesHit,
Value: metrics.MetricValue(m.bpfTraceCacheHit),
},
{
ID: metrics.IDKnownTracesMiss,
Value: metrics.MetricValue(m.bpfTraceCacheMiss),
Value: metrics.MetricValue(m.traceCacheMiss),
},
})

m.umTraceCacheHit = 0
m.umTraceCacheMiss = 0
m.bpfTraceCacheHit = 0
m.bpfTraceCacheMiss = 0
m.traceCacheHit = 0
m.traceCacheMiss = 0
}
101 changes: 50 additions & 51 deletions tracehandler/tracehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tracehandler // import "go.opentelemetry.io/ebpf-profiler/tracehandler"
import (
"context"
"fmt"
"sync"
"time"

lru "github.com/elastic/go-freelru"
Expand All @@ -29,6 +30,10 @@ type Times interface {
MonitorInterval() time.Duration
}

// Default lifetime of elements in the cache to reduce recurring
// symbolization efforts.
var traceCacheLifetime = 5 * time.Minute

// TraceProcessor is an interface used by traceHandler to convert traces
// from a form received from eBPF to the form we wish to dispatch to the
// collection agent.
Expand All @@ -54,21 +59,15 @@ type TraceProcessor interface {
// from the eBPF components.
type traceHandler struct {
// Metrics
umTraceCacheHit uint64
umTraceCacheMiss uint64
bpfTraceCacheHit uint64
bpfTraceCacheMiss uint64
traceCacheHit uint64
traceCacheMiss uint64

traceProcessor TraceProcessor

// bpfTraceCache stores mappings from BPF to user-mode hashes. This allows
// traceCache stores mappings from BPF hashes to symbolized traces. This allows
// avoiding the overhead of re-doing user-mode symbolization of traces that
// we have recently seen already.
bpfTraceCache *lru.LRU[host.TraceHash, libpf.TraceHash]

// umTraceCache is a LRU set that suppresses unnecessary resends of traces
// that we have recently reported to the collector already.
umTraceCache *lru.LRU[libpf.TraceHash, libpf.Void]
traceCache *lru.SyncedLRU[host.TraceHash, libpf.Trace]

// reporter instance to use to send out traces.
reporter reporter.TraceReporter
Expand All @@ -77,29 +76,43 @@ type traceHandler struct {
}

// newTraceHandler creates a new traceHandler
func newTraceHandler(rep reporter.TraceReporter, traceProcessor TraceProcessor,
intervals Times, cacheSize uint32) (*traceHandler, error) {
bpfTraceCache, err := lru.New[host.TraceHash, libpf.TraceHash](
func newTraceHandler(ctx context.Context, rep reporter.TraceReporter,
traceProcessor TraceProcessor, intervals Times, cacheSize uint32) (*traceHandler, error) {
traceCache, err := lru.NewSynced[host.TraceHash, libpf.Trace](
cacheSize, func(k host.TraceHash) uint32 { return uint32(k) })
if err != nil {
return nil, err
}
// Do not hold elements indefinitely in the cache.
traceCache.SetLifetime(traceCacheLifetime)
Comment thread
florianl marked this conversation as resolved.

umTraceCache, err := lru.New[libpf.TraceHash, libpf.Void](
cacheSize, libpf.TraceHash.Hash32)
if err != nil {
return nil, err
}
var wg sync.WaitGroup
wg.Add(1)

go func() {
wg.Done()
ticker := time.NewTicker(traceCacheLifetime)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
traceCache.PurgeExpired()
}
}
}()

t := &traceHandler{
// Wait to make sure the purge routine did start.
wg.Wait()
Comment on lines +107 to +108
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to wait here? It doesn't harm if the go routine starts after return.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The waitgroup pattern is just used to make sure the Go routine starts. This pattern is used also in other places in the code base. Waiting here should not increase the start up time in a noticeable way. But as we can not control the Go scheduler, we make sure this way, the clean up/purge routine is working properly and scheduled.


return &traceHandler{
traceProcessor: traceProcessor,
bpfTraceCache: bpfTraceCache,
umTraceCache: umTraceCache,
traceCache: traceCache,
reporter: rep,
times: intervals,
}

return t, nil
}, nil
}

func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
Expand All @@ -117,40 +130,26 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
EnvVars: bpfTrace.EnvVars,
}

if !m.reporter.SupportsReportTraceEvent() {
// Fast path: if the trace is already known remotely, we just send a counter update.
postConvHash, traceKnown := m.bpfTraceCache.Get(bpfTrace.Hash)
if traceKnown {
m.bpfTraceCacheHit++
meta.APMServiceName = m.traceProcessor.MaybeNotifyAPMAgent(bpfTrace, postConvHash, 1)
m.reporter.ReportCountForTrace(postConvHash, 1, meta)
return
if trace, exists := m.traceCache.GetAndRefresh(bpfTrace.Hash,
traceCacheLifetime); exists {
m.traceCacheHit++
// Fast path
meta.APMServiceName = m.traceProcessor.MaybeNotifyAPMAgent(bpfTrace, trace.Hash, 1)
if err := m.reporter.ReportTraceEvent(&trace, meta); err != nil {
log.Errorf("Failed to report trace event: %v", err)
}
m.bpfTraceCacheMiss++
return
}
m.traceCacheMiss++

// Slow path: convert trace.
umTrace := m.traceProcessor.ConvertTrace(bpfTrace)
log.Debugf("Trace hash remap 0x%x -> 0x%x", bpfTrace.Hash, umTrace.Hash)
m.bpfTraceCache.Add(bpfTrace.Hash, umTrace.Hash)
m.traceCache.Add(bpfTrace.Hash, *umTrace)

meta.APMServiceName = m.traceProcessor.MaybeNotifyAPMAgent(bpfTrace, umTrace.Hash, 1)
if m.reporter.SupportsReportTraceEvent() {
m.reporter.ReportTraceEvent(umTrace, meta)
return
}
m.reporter.ReportCountForTrace(umTrace.Hash, 1, meta)

// Trace already known to collector by UM hash?
if _, known := m.umTraceCache.Get(umTrace.Hash); known {
m.umTraceCacheHit++
return
if err := m.reporter.ReportTraceEvent(umTrace, meta); err != nil {
log.Errorf("Failed to report trace event: %v", err)
}
m.umTraceCacheMiss++

// Nope. Send it now.
m.reporter.ReportFramesForTrace(umTrace)
m.umTraceCache.Add(umTrace.Hash, libpf.Void{})
}

// Start starts a goroutine that receives and processes trace updates over
Expand All @@ -161,7 +160,7 @@ func Start(ctx context.Context, rep reporter.TraceReporter, traceProcessor Trace
traceInChan <-chan *host.Trace, intervals Times, cacheSize uint32,
) (workerExited <-chan libpf.Void, err error) {
handler, err :=
newTraceHandler(rep, traceProcessor, intervals, cacheSize)
newTraceHandler(ctx, rep, traceProcessor, intervals, cacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create traceHandler: %v", err)
}
Expand Down
Loading