Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions processmanager/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@ func (pm *ProcessManager) findMappingForTrace(pid libpf.PID, fid host.FileID,
}

func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) {
// Forward to reporter first, outside of pm.mu
if pur, ok := pm.traceReporter.(reporter.ProcessedUntilReporter); ok {
pur.ProcessedUntil(traceCaptureKTime)
}

var err error
defer func() {
if err != nil {
Expand Down
166 changes: 143 additions & 23 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,25 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter"
import (
"errors"
"fmt"
"sync"
"time"

"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/pdata"
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
"go.opentelemetry.io/ebpf-profiler/support"
"go.opentelemetry.io/ebpf-profiler/times"
)

// pendingReport holds a report waiting for ProcessedUntil confirmation.
// swapPendingReport builds it from the trace event buffer it swaps out, and it
// is handed out for sending either once the ProcessedUntil watermark reaches
// endKTime or, failing that, on the next call to swapPendingReport.
type pendingReport struct {
	// samples is the trace events tree taken out of the current buffer at swap
	// time. ReportTraceEvent may still add samples to it while the report is
	// pending.
	samples   samples.TraceEventsTree
	startTime time.Time   // wall-clock collection start
	endTime   time.Time   // wall-clock collection end
	endKTime  times.KTime // kernel time at collection end
}

// baseReporter encapsulates shared behavior between all the available reporters.
type baseReporter struct {
cfg *Config
Expand All @@ -38,6 +48,20 @@ type baseReporter struct {
// Initialized when Start() is called. The duration of the first profile may be
// slightly overestimated as it includes tracer setup time before samples arrive.
collectionStartTime time.Time

// pending holds a report waiting for ProcessedUntil confirmation before sending
pending *pendingReport

// pendingMu protects pending and maxProcessedUntilKTime
pendingMu sync.Mutex

// maxProcessedUntilKTime is the monotonically increasing watermark for ProcessedUntil.
// ProcessedUntil can go backward due to per-CPU reordering, so we track the max.
maxProcessedUntilKTime times.KTime

// readyCh signals the runLoop that ProcessedUntil has passed the pending threshold.
// Capacity 1 allows non-blocking signal; runLoop drains it.
readyCh chan struct{}
}

var errUnknownOrigin = errors.New("unknown trace origin")
Expand All @@ -46,6 +70,112 @@ func (b *baseReporter) Stop() {
b.runLoop.Stop()
}

// ProcessedUntil implements ProcessedUntilReporter.
//
// The stored watermark is raised to ktime when ktime is larger; smaller values
// leave it untouched. If a pending report exists and its endKTime is at or
// below the watermark, a wake-up is posted on readyCh without blocking.
func (b *baseReporter) ProcessedUntil(ktime times.KTime) {
	// Evaluate everything that touches shared state inside one critical
	// section, and do the channel send only after the mutex is released.
	ready := func() bool {
		b.pendingMu.Lock()
		defer b.pendingMu.Unlock()

		// The watermark only ever moves forward.
		if b.maxProcessedUntilKTime < ktime {
			b.maxProcessedUntilKTime = ktime
		}

		if b.pending == nil {
			return false
		}
		return b.pending.endKTime <= b.maxProcessedUntilKTime
	}()

	if !ready {
		return
	}

	select {
	case b.readyCh <- struct{}{}:
	default:
		// A signal is already queued; nothing more to do.
	}
}

// popPendingIfReady detaches and returns the pending report when the
// ProcessedUntil watermark has reached the report's endKTime. It returns nil,
// leaving any pending report in place, when there is no pending report or the
// watermark is still below that threshold.
func (b *baseReporter) popPendingIfReady() *pendingReport {
	b.pendingMu.Lock()
	defer b.pendingMu.Unlock()

	candidate := b.pending
	if candidate == nil {
		return nil
	}
	if candidate.endKTime > b.maxProcessedUntilKTime {
		// Watermark has not caught up yet.
		return nil
	}

	b.pending = nil
	return candidate
}

// swapPendingReport replaces the current trace event buffer with an empty one
// and wraps the old buffer in a new pendingReport.
//
// Results:
//   - toSend is the previously pending report, if one was still waiting when
//     this call happened (timeout fallback); nil otherwise.
//   - sendImmediately is the newly created report when the ProcessedUntil
//     watermark is already at or past its endKTime; otherwise it is nil and the
//     new report is stored as the pending one.
//
// pendingMu is held for the whole call; the traceEvents write lock is taken
// inside it only for the duration of the buffer swap.
func (b *baseReporter) swapPendingReport() (toSend, sendImmediately *pendingReport) {
	b.pendingMu.Lock()
	defer b.pendingMu.Unlock()

	// Whatever is still pending (possibly nothing) is handed back to the caller.
	toSend, b.pending = b.pending, nil

	// Take over the filled buffer and leave a fresh one in its place.
	bufPtr := b.traceEvents.WLock()
	fresh := &pendingReport{
		samples:   *bufPtr,
		startTime: b.collectionStartTime,
	}
	*bufPtr = make(samples.TraceEventsTree)
	fresh.endTime = time.Now()
	fresh.endKTime = times.GetKTime()
	// The next collection window starts where this one ended.
	b.collectionStartTime = fresh.endTime
	b.traceEvents.WUnlock(&bufPtr)

	// If the watermark already covers the new report, it does not need to wait.
	if b.maxProcessedUntilKTime >= fresh.endKTime {
		return toSend, fresh
	}

	b.pending = fresh
	return toSend, nil
}

// addSampleToTree records one sample in tree under containerID, origin and key.
// Missing intermediate maps are created on the way down. If an entry for key
// already exists, the sample's timestamp and off-time are appended to it;
// otherwise a new entry is created from the trace and meta.
// The tree must be locked by the caller.
func addSampleToTree(tree *samples.TraceEventsTree, containerID libpf.String,
	origin libpf.Origin, key samples.TraceAndMetaKey, trace *libpf.Trace,
	meta *samples.TraceEventMeta) {
	cid := samples.ContainerID(containerID)

	// Level 1: container ID -> per-origin map.
	byOrigin, ok := (*tree)[cid]
	if !ok {
		byOrigin = make(map[libpf.Origin]samples.KeyToEventMapping)
		(*tree)[cid] = byOrigin
	}

	// Level 2: origin -> per-key map.
	byKey, ok := byOrigin[origin]
	if !ok {
		byKey = make(samples.KeyToEventMapping)
		byOrigin[origin] = byKey
	}

	// Level 3: key -> accumulated events.
	events, ok := byKey[key]
	if !ok {
		byKey[key] = &samples.TraceEvents{
			Frames:     trace.Frames,
			Timestamps: []uint64{uint64(meta.Timestamp)},
			OffTimes:   []int64{meta.OffTime},
			EnvVars:    meta.EnvVars,
			Labels:     trace.CustomLabels,
		}
		return
	}

	events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp))
	events.OffTimes = append(events.OffTimes, meta.OffTime)
	byKey[key] = events
}

func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error {
switch meta.Origin {
case support.TraceOriginSampling:
Expand Down Expand Up @@ -74,31 +204,21 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE
ExtraMeta: extraMeta,
}

eventsTree := b.traceEvents.WLock()
defer b.traceEvents.WUnlock(&eventsTree)

if _, exists := (*eventsTree)[samples.ContainerID(containerID)]; !exists {
(*eventsTree)[samples.ContainerID(containerID)] =
make(map[libpf.Origin]samples.KeyToEventMapping)
// Check if sample should go to pending report.
// Use <= to include samples exactly at the boundary in the pending window.
b.pendingMu.Lock()
if b.pending != nil && meta.Timestamp <= libpf.UnixTime64(b.pending.endKTime.UnixNano()) {
// Sample belongs to pending report window
addSampleToTree(&b.pending.samples, containerID, meta.Origin, key, trace, meta)
b.pendingMu.Unlock()
return nil
}
b.pendingMu.Unlock()

if _, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin]; !exists {
(*eventsTree)[samples.ContainerID(containerID)][meta.Origin] =
make(samples.KeyToEventMapping)
}
// Sample goes to current buffer
eventsTree := b.traceEvents.WLock()
defer b.traceEvents.WUnlock(&eventsTree)

if events, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key]; exists {
events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp))
events.OffTimes = append(events.OffTimes, meta.OffTime)
(*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = events
return nil
}
(*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = &samples.TraceEvents{
Frames: trace.Frames,
Timestamps: []uint64{uint64(meta.Timestamp)},
OffTimes: []int64{meta.OffTime},
EnvVars: meta.EnvVars,
Labels: trace.CustomLabels,
}
addSampleToTree(eventsTree, containerID, meta.Origin, key, trace, meta)
return nil
}
48 changes: 33 additions & 15 deletions reporter/collector_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
// Assert that we implement the full Reporter interface.
var _ Reporter = (*CollectorReporter)(nil)

// Assert that we implement the ProcessedUntilReporter interface.
var _ ProcessedUntilReporter = (*CollectorReporter)(nil)

// CollectorReporter receives and transforms information to be Collector compliant.
type CollectorReporter struct {
*baseReporter
Expand Down Expand Up @@ -48,6 +51,7 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor
runLoop: &runLoop{
stopSignal: make(chan libpf.Void),
},
readyCh: make(chan struct{}, 1),
},
nextConsumer: nextConsumer,
}, nil
Expand All @@ -66,11 +70,12 @@ func (r *CollectorReporter) Start(ctx context.Context) error {
}, func() {
// Allow the GC to purge expired entries to avoid memory leaks.
r.pdata.Purge()
}, r.readyCh, func() {
r.trySendPending(ctx)
})

// When Stop() is called and a signal to 'stop' is received, then:
// - cancel the reporting functions currently running (using context)
// - close the gRPC connection with collection-agent
go func() {
<-r.runLoop.stopSignal
cancelReporting()
Expand All @@ -81,26 +86,39 @@ func (r *CollectorReporter) Start(ctx context.Context) error {

// reportProfile creates and sends out a profile.
func (r *CollectorReporter) reportProfile(ctx context.Context) error {
traceEventsPtr := r.traceEvents.WLock()
reportedEvents := (*traceEventsPtr)
newEvents := make(samples.TraceEventsTree)
*traceEventsPtr = newEvents
collectionEndTime := time.Now()
collectionStartTime := r.collectionStartTime
r.collectionStartTime = collectionEndTime
r.traceEvents.WUnlock(&traceEventsPtr)

profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version,
collectionStartTime, collectionEndTime)
toSend, sendImmediately := r.swapPendingReport()

if toSend != nil {
r.sendPending(ctx, toSend)
}
if sendImmediately != nil {
r.sendPending(ctx, sendImmediately)
}
return nil
}

// trySendPending sends the pending report if popPendingIfReady hands one out,
// i.e. if ProcessedUntil has passed the report's threshold. Otherwise it does
// nothing.
func (r *CollectorReporter) trySendPending(ctx context.Context) {
	ready := r.popPendingIfReady()
	if ready == nil {
		return
	}
	r.sendPending(ctx, ready)
}

// sendPending sends a pending report (must be called WITHOUT holding pendingMu).
func (r *CollectorReporter) sendPending(ctx context.Context, pending *pendingReport) {
profiles, err := r.pdata.Generate(pending.samples, r.name, r.version,
pending.startTime, pending.endTime)
if err != nil {
log.Errorf("pdata: %v", err)
return nil
return
}

if profiles.SampleCount() == 0 {
log.Debugf("Skip sending profile with no samples")
return nil
return
}

return r.nextConsumer.ConsumeProfiles(ctx, profiles)
if err := r.nextConsumer.ConsumeProfiles(ctx, profiles); err != nil {
log.Errorf("Failed to send profile: %v", err)
}
}
9 changes: 9 additions & 0 deletions reporter/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/process"
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
"go.opentelemetry.io/ebpf-profiler/times"
)

// Reporter is the top-level interface implemented by a full reporter.
Expand All @@ -33,6 +34,14 @@ type TraceReporter interface {
ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error
}

// ProcessedUntilReporter is an optional interface that reporters can implement
// to receive notifications about sample processing progress.
//
// Because it is optional, callers detect support with a type assertion on the
// reporter value before forwarding the notification.
type ProcessedUntilReporter interface {
	// ProcessedUntil indicates all trace events with kernel timestamps <= ktime
	// have been delivered to the reporter.
	ProcessedUntil(ktime times.KTime)
}

type ExecutableMetadata struct {
// MappingFile is the reference to mapping file data.
MappingFile libpf.FrameMappingFile
Expand Down
Loading