diff --git a/processmanager/processinfo.go b/processmanager/processinfo.go index e1a764aa2..783831753 100644 --- a/processmanager/processinfo.go +++ b/processmanager/processinfo.go @@ -736,6 +736,11 @@ func (pm *ProcessManager) findMappingForTrace(pid libpf.PID, fid host.FileID, } func (pm *ProcessManager) ProcessedUntil(traceCaptureKTime times.KTime) { + // Forward to reporter first, outside of pm.mu + if pur, ok := pm.traceReporter.(reporter.ProcessedUntilReporter); ok { + pur.ProcessedUntil(traceCaptureKTime) + } + var err error defer func() { if err != nil { diff --git a/reporter/base_reporter.go b/reporter/base_reporter.go index ddd21627f..0a55f7df8 100644 --- a/reporter/base_reporter.go +++ b/reporter/base_reporter.go @@ -6,6 +6,7 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" import ( "errors" "fmt" + "sync" "time" "go.opentelemetry.io/ebpf-profiler/libpf" @@ -13,8 +14,17 @@ import ( "go.opentelemetry.io/ebpf-profiler/reporter/internal/pdata" "go.opentelemetry.io/ebpf-profiler/reporter/samples" "go.opentelemetry.io/ebpf-profiler/support" + "go.opentelemetry.io/ebpf-profiler/times" ) +// pendingReport holds a report waiting for ProcessedUntil confirmation +type pendingReport struct { + samples samples.TraceEventsTree + startTime time.Time // wall-clock collection start + endTime time.Time // wall-clock collection end + endKTime times.KTime // kernel time at collection end +} + // baseReporter encapsulates shared behavior between all the available reporters. type baseReporter struct { cfg *Config @@ -38,6 +48,20 @@ type baseReporter struct { // Initialized when Start() is called. The duration of the first profile may be // slightly overestimated as it includes tracer setup time before samples arrive. collectionStartTime time.Time + + // pending holds a report waiting for ProcessedUntil confirmation before sending + pending *pendingReport + + // pendingMu protects pending and maxProcessedUntilKTime + pendingMu sync.Mutex + + // maxProcessedUntilKTime is the monotonically increasing watermark for ProcessedUntil. + // ProcessedUntil can go backward due to per-CPU reordering, so we track the max. + maxProcessedUntilKTime times.KTime + + // readyCh signals the runLoop that ProcessedUntil has passed the pending threshold. + // Capacity 1 allows non-blocking signal; runLoop drains it. + readyCh chan struct{} } var errUnknownOrigin = errors.New("unknown trace origin") @@ -46,6 +70,112 @@ func (b *baseReporter) Stop() { b.runLoop.Stop() } +// ProcessedUntil implements ProcessedUntilReporter. +func (b *baseReporter) ProcessedUntil(ktime times.KTime) { + b.pendingMu.Lock() + + // Track monotonically increasing watermark (ProcessedUntil can go backward) + if ktime > b.maxProcessedUntilKTime { + b.maxProcessedUntilKTime = ktime + } + + // Check if we should signal runLoop + shouldSignal := b.pending != nil && b.maxProcessedUntilKTime >= b.pending.endKTime + b.pendingMu.Unlock() + + if shouldSignal { + select { + case b.readyCh <- struct{}{}: + default: + // Already signaled, runLoop will handle it + } + } +} + +// popPendingIfReady returns the pending report if ProcessedUntil has passed its threshold, +// and clears it from the baseReporter. Returns nil if not ready. +func (b *baseReporter) popPendingIfReady() *pendingReport { + b.pendingMu.Lock() + defer b.pendingMu.Unlock() + + if b.pending != nil && b.maxProcessedUntilKTime >= b.pending.endKTime { + p := b.pending + b.pending = nil + return p + } + return nil +} + +// swapPendingReport swaps the current buffer and returns any reports that should be sent. +// Returns (timeoutFallback, newReport) where timeoutFallback is a previous pending that +// wasn't sent in time, and newReport is set if ProcessedUntil has already passed. +func (b *baseReporter) swapPendingReport() (toSend, sendImmediately *pendingReport) { + b.pendingMu.Lock() + defer b.pendingMu.Unlock() + + // Timeout fallback: if pending exists, swap it out for sending + if b.pending != nil { + toSend = b.pending + b.pending = nil + } + + // Swap buffer and create new pending + traceEventsPtr := b.traceEvents.WLock() + reportedEvents := (*traceEventsPtr) + *traceEventsPtr = make(samples.TraceEventsTree) + collectionEndTime := time.Now() + collectionEndKTime := times.GetKTime() + collectionStartTime := b.collectionStartTime + b.collectionStartTime = collectionEndTime + b.traceEvents.WUnlock(&traceEventsPtr) + + newPending := &pendingReport{ + samples: reportedEvents, + startTime: collectionStartTime, + endTime: collectionEndTime, + endKTime: collectionEndKTime, + } + + // Check if ProcessedUntil already passed - if so, send immediately + if b.maxProcessedUntilKTime >= collectionEndKTime { + sendImmediately = newPending + } else { + b.pending = newPending + } + + return toSend, sendImmediately +} + +// addSampleToTree adds a sample to the given trace events tree. +// The tree must be locked by the caller. +func addSampleToTree(tree *samples.TraceEventsTree, containerID libpf.String, + origin libpf.Origin, key samples.TraceAndMetaKey, trace *libpf.Trace, + meta *samples.TraceEventMeta) { + if _, exists := (*tree)[samples.ContainerID(containerID)]; !exists { + (*tree)[samples.ContainerID(containerID)] = + make(map[libpf.Origin]samples.KeyToEventMapping) + } + + if _, exists := (*tree)[samples.ContainerID(containerID)][origin]; !exists { + (*tree)[samples.ContainerID(containerID)][origin] = + make(samples.KeyToEventMapping) + } + + if events, exists := (*tree)[samples.ContainerID(containerID)][origin][key]; exists { + events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp)) + events.OffTimes = append(events.OffTimes, meta.OffTime) + (*tree)[samples.ContainerID(containerID)][origin][key] = events + return + } + (*tree)[samples.ContainerID(containerID)][origin][key] = &samples.TraceEvents{ + Frames: trace.Frames, + Timestamps: []uint64{uint64(meta.Timestamp)}, + OffTimes: []int64{meta.OffTime}, + EnvVars: meta.EnvVars, + Labels: trace.CustomLabels, + } +} + func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error { switch meta.Origin { case support.TraceOriginSampling: @@ -74,31 +204,21 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE ExtraMeta: extraMeta, } - eventsTree := b.traceEvents.WLock() - defer b.traceEvents.WUnlock(&eventsTree) - - if _, exists := (*eventsTree)[samples.ContainerID(containerID)]; !exists { - (*eventsTree)[samples.ContainerID(containerID)] = - make(map[libpf.Origin]samples.KeyToEventMapping) + // Check if sample should go to pending report. + // Use <= to include samples exactly at the boundary in the pending window. + b.pendingMu.Lock() + if b.pending != nil && meta.Timestamp <= libpf.UnixTime64(b.pending.endKTime.UnixNano()) { + // Sample belongs to pending report window + addSampleToTree(&b.pending.samples, containerID, meta.Origin, key, trace, meta) + b.pendingMu.Unlock() + return nil } + b.pendingMu.Unlock() - if _, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin]; !exists { - (*eventsTree)[samples.ContainerID(containerID)][meta.Origin] = - make(samples.KeyToEventMapping) - } + // Sample goes to current buffer + eventsTree := b.traceEvents.WLock() + defer b.traceEvents.WUnlock(&eventsTree) - if events, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key]; exists { - events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp)) - events.OffTimes = append(events.OffTimes, meta.OffTime) - (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = events - return nil - } - (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = &samples.TraceEvents{ - Frames: trace.Frames, - Timestamps: []uint64{uint64(meta.Timestamp)}, - OffTimes: []int64{meta.OffTime}, - EnvVars: meta.EnvVars, - Labels: trace.CustomLabels, - } + addSampleToTree(eventsTree, containerID, meta.Origin, key, trace, meta) return nil } diff --git a/reporter/collector_reporter.go b/reporter/collector_reporter.go index c43a7d2de..a771be930 100644 --- a/reporter/collector_reporter.go +++ b/reporter/collector_reporter.go @@ -19,6 +19,9 @@ import ( // Assert that we implement the full Reporter interface. var _ Reporter = (*CollectorReporter)(nil) +// Assert that we implement the ProcessedUntilReporter interface. +var _ ProcessedUntilReporter = (*CollectorReporter)(nil) + // CollectorReporter receives and transforms information to be Collector Collector compliant. type CollectorReporter struct { *baseReporter @@ -48,6 +51,7 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor runLoop: &runLoop{ stopSignal: make(chan libpf.Void), }, + readyCh: make(chan struct{}, 1), }, nextConsumer: nextConsumer, }, nil @@ -66,11 +70,12 @@ func (r *CollectorReporter) Start(ctx context.Context) error { }, func() { // Allow the GC to purge expired entries to avoid memory leaks. r.pdata.Purge() + }, r.readyCh, func() { + r.trySendPending(ctx) }) // When Stop() is called and a signal to 'stop' is received, then: // - cancel the reporting functions currently running (using context) - // - close the gRPC connection with collection-agent go func() { <-r.runLoop.stopSignal cancelReporting() @@ -81,26 +86,39 @@ func (r *CollectorReporter) Start(ctx context.Context) error { // reportProfile creates and sends out a profile. func (r *CollectorReporter) reportProfile(ctx context.Context) error { - traceEventsPtr := r.traceEvents.WLock() - reportedEvents := (*traceEventsPtr) - newEvents := make(samples.TraceEventsTree) - *traceEventsPtr = newEvents - collectionEndTime := time.Now() - collectionStartTime := r.collectionStartTime - r.collectionStartTime = collectionEndTime - r.traceEvents.WUnlock(&traceEventsPtr) - - profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version, - collectionStartTime, collectionEndTime) + toSend, sendImmediately := r.swapPendingReport() + + if toSend != nil { + r.sendPending(ctx, toSend) + } + if sendImmediately != nil { + r.sendPending(ctx, sendImmediately) + } + return nil +} + +// trySendPending attempts to send the pending report if ProcessedUntil has passed its threshold. +func (r *CollectorReporter) trySendPending(ctx context.Context) { + if pending := r.popPendingIfReady(); pending != nil { + r.sendPending(ctx, pending) + } +} + +// sendPending sends a pending report (must be called WITHOUT holding pendingMu). +func (r *CollectorReporter) sendPending(ctx context.Context, pending *pendingReport) { + profiles, err := r.pdata.Generate(pending.samples, r.name, r.version, + pending.startTime, pending.endTime) if err != nil { log.Errorf("pdata: %v", err) - return nil + return } if profiles.SampleCount() == 0 { log.Debugf("Skip sending profile with no samples") - return nil + return } - return r.nextConsumer.ConsumeProfiles(ctx, profiles) + if err := r.nextConsumer.ConsumeProfiles(ctx, profiles); err != nil { + log.Errorf("Failed to send profile: %v", err) + } } diff --git a/reporter/iface.go b/reporter/iface.go index d26b3f11e..f88e59a7c 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/process" "go.opentelemetry.io/ebpf-profiler/reporter/samples" + "go.opentelemetry.io/ebpf-profiler/times" ) // Reporter is the top-level interface implemented by a full reporter. @@ -33,6 +34,14 @@ type TraceReporter interface { ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error } +// ProcessedUntilReporter is an optional interface that reporters can implement +// to receive notifications about sample processing progress. +type ProcessedUntilReporter interface { + // ProcessedUntil indicates all trace events with kernel timestamps <= ktime + // have been delivered to the reporter. + ProcessedUntil(ktime times.KTime) +} + type ExecutableMetadata struct { // MappingFile is the reference to mapping file data. MappingFile libpf.FrameMappingFile diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 9e56bf330..6cf65e7ec 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -24,6 +24,9 @@ import ( // Assert that we implement the full Reporter interface. var _ Reporter = (*OTLPReporter)(nil) +// Assert that we implement the ProcessedUntilReporter interface. +var _ ProcessedUntilReporter = (*OTLPReporter)(nil) + var gzipOption = grpc.UseCompressor(gzip.Name) // OTLPReporter receives and transforms information to be OTLP/profiles compliant. @@ -63,6 +66,7 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { runLoop: &runLoop{ stopSignal: make(chan libpf.Void), }, + readyCh: make(chan struct{}, 1), }, pkgGRPCOperationTimeout: cfg.GRPCOperationTimeout, client: nil, @@ -94,6 +98,8 @@ func (r *OTLPReporter) Start(ctx context.Context) error { }, func() { // Allow the GC to purge expired entries to avoid memory leaks. r.pdata.Purge() + }, r.readyCh, func() { + r.trySendPending(ctx) }) // When Stop() is called and a signal to 'stop' is received, then: @@ -112,32 +118,47 @@ func (r *OTLPReporter) Start(ctx context.Context) error { // reportOTLPProfile creates and sends out an OTLP profile. func (r *OTLPReporter) reportOTLPProfile(ctx context.Context) error { - traceEventsPtr := r.traceEvents.WLock() - reportedEvents := (*traceEventsPtr) - newEvents := make(samples.TraceEventsTree) - *traceEventsPtr = newEvents - collectionEndTime := time.Now() - collectionStartTime := r.collectionStartTime - r.collectionStartTime = collectionEndTime - r.traceEvents.WUnlock(&traceEventsPtr) - - profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version, - collectionStartTime, collectionEndTime) + toSend, sendImmediately := r.swapPendingReport() + + if toSend != nil { + r.sendPending(ctx, toSend) + } + if sendImmediately != nil { + r.sendPending(ctx, sendImmediately) + } + return nil +} + +// trySendPending attempts to send the pending report if ProcessedUntil has passed its threshold. +func (r *OTLPReporter) trySendPending(ctx context.Context) { + if pending := r.popPendingIfReady(); pending != nil { + r.sendPending(ctx, pending) + } +} + +// sendPending sends a pending report (must be called WITHOUT holding pendingMu). +func (r *OTLPReporter) sendPending(ctx context.Context, pending *pendingReport) { + profiles, err := r.pdata.Generate(pending.samples, r.name, r.version, + pending.startTime, pending.endTime) if err != nil { log.Errorf("pdata: %v", err) - return nil + return } + if profiles.SampleCount() == 0 { log.Debugf("Skip sending of OTLP profile with no samples") - return nil + return } req := pprofileotlp.NewExportRequestFromProfiles(profiles) reqCtx, ctxCancel := context.WithTimeout(ctx, r.pkgGRPCOperationTimeout) defer ctxCancel() + _, err = r.client.Export(reqCtx, req, gzipOption) - return err + if err != nil { + log.Errorf("Failed to send profile: %v", err) + } } // waitGrpcEndpoint waits until the gRPC connection is established. diff --git a/reporter/runloop.go b/reporter/runloop.go index 0f2345285..6b40f2bd4 100644 --- a/reporter/runloop.go +++ b/reporter/runloop.go @@ -17,7 +17,7 @@ type runLoop struct { } func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, jitter float64, - run, purge func()) { + run, purge func(), readyCh <-chan struct{}, onReady func()) { go func() { tick := time.NewTicker(reportInterval) defer tick.Stop() @@ -35,6 +35,8 @@ func (rl *runLoop) Start(ctx context.Context, reportInterval time.Duration, jitt tick.Reset(libpf.AddJitter(reportInterval, jitter)) case <-purgeTick.C: purge() + case <-readyCh: + onReady() } } }()