diff --git a/reporter/base_reporter.go b/reporter/base_reporter.go index becccb2c2..ddd21627f 100644 --- a/reporter/base_reporter.go +++ b/reporter/base_reporter.go @@ -6,6 +6,7 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" import ( "errors" "fmt" + "time" "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/libpf/xsync" @@ -32,6 +33,11 @@ type baseReporter struct { // traceEvents stores reported trace events (trace metadata with frames and counts) traceEvents xsync.RWMutex[samples.TraceEventsTree] + + // collectionStartTime tracks when the current collection window started. + // Initialized when Start() is called. The duration of the first profile may be + // slightly overestimated as it includes tracer setup time before samples arrive. + collectionStartTime time.Time } var errUnknownOrigin = errors.New("unknown trace origin") diff --git a/reporter/collector_reporter.go b/reporter/collector_reporter.go index e2a95056e..c43a7d2de 100644 --- a/reporter/collector_reporter.go +++ b/reporter/collector_reporter.go @@ -5,6 +5,7 @@ package reporter // import "go.opentelemetry.io/ebpf-profiler/reporter" import ( "context" + "time" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/ebpf-profiler/internal/log" @@ -53,6 +54,8 @@ func NewCollector(cfg *Config, nextConsumer xconsumer.Profiles) (*CollectorRepor } func (r *CollectorReporter) Start(ctx context.Context) error { + r.collectionStartTime = time.Now() + // Create a child context for reporting features ctx, cancelReporting := context.WithCancel(ctx) @@ -82,9 +85,13 @@ func (r *CollectorReporter) reportProfile(ctx context.Context) error { reportedEvents := (*traceEventsPtr) newEvents := make(samples.TraceEventsTree) *traceEventsPtr = newEvents + collectionEndTime := time.Now() + collectionStartTime := r.collectionStartTime + r.collectionStartTime = collectionEndTime r.traceEvents.WUnlock(&traceEventsPtr) - profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version) + profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version, + collectionStartTime, collectionEndTime) if err != nil { log.Errorf("pdata: %v", err) return nil diff --git a/reporter/internal/pdata/generate.go b/reporter/internal/pdata/generate.go index 13a7c85ce..9dbb9e3e4 100644 --- a/reporter/internal/pdata/generate.go +++ b/reporter/internal/pdata/generate.go @@ -5,7 +5,6 @@ package pdata // import "go.opentelemetry.io/ebpf-profiler/reporter/internal/pda import ( "fmt" - "math" "path/filepath" "time" @@ -27,13 +26,35 @@ const ( ) // Generate generates a pdata request out of internal profiles data, to be -// exported. +// exported. The collectionStartTime and collectionEndTime define the time window +// during which the profiler was actively collecting samples. func (p *Pdata) Generate(tree samples.TraceEventsTree, agentName, agentVersion string, + collectionStartTime, collectionEndTime time.Time, ) (pprofile.Profiles, error) { profiles := pprofile.NewProfiles() dic := profiles.Dictionary() + // Find oldest sample timestamp across all containers to handle buffered samples. + adjustedStartTime := collectionStartTime + for _, containerEvents := range tree { + for _, originEvents := range containerEvents { + for _, traceEvents := range originEvents { + for _, ts := range traceEvents.Timestamps { + sampleTime := time.Unix(0, int64(ts)) + if sampleTime.Before(adjustedStartTime) { + adjustedStartTime = sampleTime + } + } + } + } + } + if adjustedStartTime.Before(collectionStartTime) { + log.Debugf("Adjusted profile start time backward by %v to include oldest sample", + collectionStartTime.Sub(adjustedStartTime)) + } + collectionStartTime = adjustedStartTime + // Temporary helpers that will build the various tables in ProfilesDictionary. stringSet := make(orderedset.OrderedSet[string], 64) funcSet := make(orderedset.OrderedSet[funcInfo], 64) @@ -84,7 +105,8 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree, prof := sp.Profiles().AppendEmpty() if err := p.setProfile(dic, attrMgr, stringSet, funcSet, mappingSet, stackSet, locationSet, - origin, originToEvents[origin], prof); err != nil { + origin, originToEvents[origin], prof, + collectionStartTime, collectionEndTime); err != nil { return profiles, err } } @@ -124,6 +146,7 @@ func (p *Pdata) setProfile( origin libpf.Origin, events map[samples.TraceAndMetaKey]*samples.TraceEvents, profile pprofile.Profile, + collectionStartTime, collectionEndTime time.Time, ) error { st := profile.SampleType() switch origin { @@ -146,15 +169,9 @@ func (p *Pdata) setProfile( return fmt.Errorf("generating profile for unsupported origin %d", origin) } - startTS, endTS := uint64(math.MaxUint64), uint64(0) for traceKey, traceInfo := range events { sample := profile.Samples().AppendEmpty() - for _, ts := range traceInfo.Timestamps { - startTS = min(startTS, ts) - endTS = max(endTS, ts) - } - sample.TimestampsUnixNano().FromRaw(traceInfo.Timestamps) if origin == support.TraceOriginOffCPU { sample.Values().Append(traceInfo.OffTimes...) @@ -282,8 +299,8 @@ func (p *Pdata) setProfile( log.Debugf("Reporting OTLP profile with %d samples", profile.Samples().Len()) - profile.SetDurationNano(endTS - startTS) - profile.SetTime(pcommon.Timestamp(startTS)) + profile.SetDurationNano(uint64(collectionEndTime.Sub(collectionStartTime).Nanoseconds())) + profile.SetTime(pcommon.Timestamp(collectionStartTime.UnixNano())) return nil } diff --git a/reporter/internal/pdata/generate_test.go b/reporter/internal/pdata/generate_test.go index c0269bced..e547657f5 100644 --- a/reporter/internal/pdata/generate_test.go +++ b/reporter/internal/pdata/generate_test.go @@ -2,6 +2,7 @@ package pdata import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,6 +17,20 @@ import ( "go.opentelemetry.io/ebpf-profiler/support" ) +var ( + // Test collection window: 60 second duration + testCollectionStart = time.Unix(1000, 0) + testCollectionEnd = time.Unix(1060, 0) + // Expected profile metadata based on collection window + testProfileTime = pcommon.Timestamp(testCollectionStart.UnixNano()) + testProfileDuration = uint64(testCollectionEnd.Sub(testCollectionStart).Nanoseconds()) +) + +// testGenerate is a helper that calls Generate with the standard test collection window +func testGenerate(p *Pdata, tree samples.TraceEventsTree, name, version string) (pprofile.Profiles, error) { + return p.Generate(tree, name, version, testCollectionStart, testCollectionEnd) +} + func TestGetDummyMappingIndex(t *testing.T) { fileID := libpf.NewFileID(12345678, 12345678) for _, tt := range []struct { @@ -184,7 +199,7 @@ func TestFunctionTableOrder(t *testing.T) { require.NoError(t, err) tree := make(samples.TraceEventsTree) tree[libpf.NullString] = tt.events - res, _ := d.Generate(tree, tt.name, "version") + res, _ := testGenerate(d, tree, tt.name, "version") require.Equal(t, tt.expectedResourceProfiles, res.ResourceProfiles().Len()) if tt.expectedResourceProfiles == 0 { // Do not check elements of ResourceProfile if there is no expected @@ -210,36 +225,100 @@ func TestFunctionTableOrder(t *testing.T) { } func TestProfileDuration(t *testing.T) { + mapping := libpf.NewFrameMapping(libpf.FrameMappingData{ + File: libpf.NewFrameMappingFile(libpf.FrameMappingFileData{ + FileID: libpf.NewFileID(1, 2), + }), + }) + for _, tt := range []struct { - name string - events map[libpf.Origin]samples.KeyToEventMapping + name string + tree samples.TraceEventsTree + expectedTime pcommon.Timestamp + expectedDuration uint64 }{ { - name: "profile duration", - events: map[libpf.Origin]samples.KeyToEventMapping{ - support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{ - {Pid: 1}: { - Timestamps: []uint64{2, 1, 3, 4, 7}, + name: "samples within collection window", + tree: samples.TraceEventsTree{ + libpf.NullString: map[libpf.Origin]samples.KeyToEventMapping{ + support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{ + {Pid: 1}: { + // Timestamps within the collection window (1000-1060) + Timestamps: []uint64{ + uint64(time.Unix(1010, 0).UnixNano()), + uint64(time.Unix(1020, 0).UnixNano()), + uint64(time.Unix(1030, 0).UnixNano()), + }, + }, + {Pid: 2}: { + Timestamps: []uint64{uint64(time.Unix(1040, 0).UnixNano())}, + }, }, - {Pid: 2}: { - Timestamps: []uint64{8}, + }, + }, + expectedTime: testProfileTime, + expectedDuration: testProfileDuration, + }, + { + name: "adjusted start time for buffered samples", + tree: samples.TraceEventsTree{ + libpf.NullString: map[libpf.Origin]samples.KeyToEventMapping{ + support.TraceOriginSampling: { + {Pid: 1}: { + Frames: newTestFrames(false), + // Sample before collection start (990 vs 1000) + Timestamps: []uint64{uint64(time.Unix(990, 0).UnixNano())}, + }, }, }, }, + expectedTime: pcommon.Timestamp(time.Unix(990, 0).UnixNano()), + expectedDuration: uint64(testCollectionEnd.Sub(time.Unix(990, 0)).Nanoseconds()), + }, + { + name: "adjusted across multiple containers", + tree: samples.TraceEventsTree{ + libpf.Intern("container1"): map[libpf.Origin]samples.KeyToEventMapping{ + support.TraceOriginSampling: { + {Pid: 1}: { + Frames: singleFrameTrace(libpf.GoFrame, mapping, 0x10, "func1", libpf.NullString, 1), + // Oldest sample at 985 + Timestamps: []uint64{uint64(time.Unix(985, 0).UnixNano())}, + }, + }, + }, + libpf.Intern("container2"): map[libpf.Origin]samples.KeyToEventMapping{ + support.TraceOriginSampling: { + {Pid: 2}: { + Frames: singleFrameTrace(libpf.GoFrame, mapping, 0x20, "func2", libpf.NullString, 2), + // Newer old sample at 995 + Timestamps: []uint64{uint64(time.Unix(995, 0).UnixNano())}, + }, + }, + }, + }, + expectedTime: pcommon.Timestamp(time.Unix(985, 0).UnixNano()), + expectedDuration: uint64(testCollectionEnd.Sub(time.Unix(985, 0)).Nanoseconds()), }, } { t.Run(tt.name, func(t *testing.T) { d, err := New(100, nil) require.NoError(t, err) - tree := make(samples.TraceEventsTree) - tree[libpf.NullString] = tt.events - res, err := d.Generate(tree, tt.name, "version") + res, err := testGenerate(d, tt.tree, tt.name, "version") require.NoError(t, err) - profile := res.ResourceProfiles().At(0).ScopeProfiles().At(0).Profiles().At(0) - require.Equal(t, uint64(7), profile.DurationNano()) - require.Equal(t, pcommon.Timestamp(1), profile.Time()) + for i := 0; i < res.ResourceProfiles().Len(); i++ { + rp := res.ResourceProfiles().At(i) + for j := 0; j < rp.ScopeProfiles().Len(); j++ { + sp := rp.ScopeProfiles().At(j) + for k := 0; k < sp.Profiles().Len(); k++ { + profile := sp.Profiles().At(k) + assert.Equal(t, tt.expectedTime, profile.Time()) + assert.Equal(t, tt.expectedDuration, profile.DurationNano()) + } + } + } }) } } @@ -249,7 +328,7 @@ func TestGenerate_EmptyTree(t *testing.T) { require.NoError(t, err) tree := make(samples.TraceEventsTree) - profiles, err := d.Generate(tree, "agent", "v1") + profiles, err := testGenerate(d, tree, "agent", "v1") require.NoError(t, err) assert.Equal(t, 0, profiles.ResourceProfiles().Len()) } @@ -295,7 +374,7 @@ func TestGenerate_SingleContainerSingleOrigin(t *testing.T) { traceKey: &samples.TraceEvents{ Frames: singleFrameTrace(libpf.GoFrame, mapping, 0x10, funcName, filePath, 42), - Timestamps: []uint64{100}, + Timestamps: []uint64{uint64(time.Unix(1010, 0).UnixNano())}, EnvVars: map[libpf.String]libpf.String{ libpf.Intern("FOO"): libpf.Intern("BAR"), }, @@ -306,7 +385,7 @@ func TestGenerate_SingleContainerSingleOrigin(t *testing.T) { libpf.Intern("container1"): events, } - profiles, err := d.Generate(tree, "agent", "v1") + profiles, err := testGenerate(d, tree, "agent", "v1") require.NoError(t, err) require.Equal(t, 1, profiles.ResourceProfiles().Len()) rp := profiles.ResourceProfiles().At(0) @@ -320,8 +399,8 @@ func TestGenerate_SingleContainerSingleOrigin(t *testing.T) { assert.Equal(t, semconv.SchemaURL, sp.SchemaUrl()) require.Equal(t, 1, sp.Profiles().Len()) prof := sp.Profiles().At(0) - assert.Equal(t, pcommon.Timestamp(100), prof.Time()) - assert.Equal(t, uint64(0), prof.DurationNano()) + assert.Equal(t, testProfileTime, prof.Time()) + assert.Equal(t, testProfileDuration, prof.DurationNano()) t.Run("Check environment variable attribute", func(t *testing.T) { foundFOOKey := false @@ -363,15 +442,21 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { events1 := map[libpf.Origin]samples.KeyToEventMapping{ support.TraceOriginSampling: { traceKey: &samples.TraceEvents{ - Frames: frames, - Timestamps: []uint64{1, 2}, + Frames: frames, + Timestamps: []uint64{ + uint64(time.Unix(1010, 0).UnixNano()), + uint64(time.Unix(1020, 0).UnixNano()), + }, }, }, support.TraceOriginOffCPU: { traceKey: &samples.TraceEvents{ - Frames: frames, - Timestamps: []uint64{3, 4}, - OffTimes: []int64{10, 20}, + Frames: frames, + Timestamps: []uint64{ + uint64(time.Unix(1030, 0).UnixNano()), + uint64(time.Unix(1040, 0).UnixNano()), + }, + OffTimes: []int64{10, 20}, }, }, } @@ -379,7 +464,7 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { support.TraceOriginSampling: { traceKey: &samples.TraceEvents{ Frames: frames, - Timestamps: []uint64{5}, + Timestamps: []uint64{uint64(time.Unix(1050, 0).UnixNano())}, }, }, } @@ -388,7 +473,7 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { libpf.Intern("c2"): events2, } - profiles, err := d.Generate(tree, "agent", "v2") + profiles, err := testGenerate(d, tree, "agent", "v2") require.NoError(t, err) require.Equal(t, 2, profiles.ResourceProfiles().Len()) @@ -399,8 +484,18 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { val, exists := rp.Resource().Attributes().Get(string(semconv.ContainerIDKey)) require.True(t, exists) containerID := val.Str() - profileCount := rp.ScopeProfiles().At(0).Profiles().Len() + sp := rp.ScopeProfiles().At(0) + profileCount := sp.Profiles().Len() containerProfileCounts[containerID] = profileCount + + // All profiles should have the same duration and start time based on collection window + for j := range profileCount { + prof := sp.Profiles().At(j) + assert.Equal(t, testProfileTime, prof.Time(), + "profile %d in container %s", j, containerID) + assert.Equal(t, testProfileDuration, prof.DurationNano(), + "profile %d in container %s", j, containerID) + } } // c1 has both origins, so 2 profiles @@ -436,7 +531,7 @@ func TestGenerate_StringAndFunctionTablePopulation(t *testing.T) { libpf.Intern("c"): events, } - profiles, err := d.Generate(tree, "agent", "v3") + profiles, err := testGenerate(d, tree, "agent", "v3") require.NoError(t, err) dic := profiles.Dictionary() // The string table should contain "" as first element, then function name and file path @@ -492,8 +587,12 @@ func TestGenerate_NativeFrame(t *testing.T) { events := map[libpf.Origin]samples.KeyToEventMapping{ support.TraceOriginSampling: { traceKey: &samples.TraceEvents{ - Frames: singleFrameNative(mappingFile, 0x1000, 0x1000, 0x2000, 0x100), - Timestamps: []uint64{123, 456, 789}, + Frames: singleFrameNative(mappingFile, 0x1000, 0x1000, 0x2000, 0x100), + Timestamps: []uint64{ + uint64(time.Unix(1010, 0).UnixNano()), + uint64(time.Unix(1020, 0).UnixNano()), + uint64(time.Unix(1030, 0).UnixNano()), + }, }, }, } @@ -501,7 +600,7 @@ func TestGenerate_NativeFrame(t *testing.T) { libpf.Intern("native_container"): events, } - profiles, err := d.Generate(tree, "agent", "v1") + profiles, err := testGenerate(d, tree, "agent", "v1") require.NoError(t, err) require.Equal(t, 1, profiles.ResourceProfiles().Len()) @@ -520,8 +619,8 @@ func TestGenerate_NativeFrame(t *testing.T) { // Check profile require.Equal(t, 1, sp.Profiles().Len()) prof := sp.Profiles().At(0) - assert.Equal(t, pcommon.Timestamp(123), prof.Time()) - assert.Equal(t, uint64(666), prof.DurationNano()) + assert.Equal(t, testProfileTime, prof.Time()) + assert.Equal(t, testProfileDuration, prof.DurationNano()) // Verify profile contains one sample assert.Equal(t, 1, prof.Samples().Len()) @@ -622,7 +721,7 @@ func TestStackTableOrder(t *testing.T) { require.NoError(t, err) tree := make(samples.TraceEventsTree) tree[libpf.NullString] = tt.events - res, _ := d.Generate(tree, tt.name, "version") + res, _ := testGenerate(d, tree, tt.name, "version") dic := res.Dictionary() diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index 91aab3654..9e56bf330 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -71,6 +71,8 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { // Start sets up and manages the reporting connection to a OTLP backend. func (r *OTLPReporter) Start(ctx context.Context) error { + r.collectionStartTime = time.Now() + // Create a child context for reporting features ctx, cancelReporting := context.WithCancel(ctx) @@ -114,9 +116,13 @@ func (r *OTLPReporter) reportOTLPProfile(ctx context.Context) error { reportedEvents := (*traceEventsPtr) newEvents := make(samples.TraceEventsTree) *traceEventsPtr = newEvents + collectionEndTime := time.Now() + collectionStartTime := r.collectionStartTime + r.collectionStartTime = collectionEndTime r.traceEvents.WUnlock(&traceEventsPtr) - profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version) + profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version, + collectionStartTime, collectionEndTime) if err != nil { log.Errorf("pdata: %v", err) return nil