From 6970788da51d5206bdca142a123299c9831f9e77 Mon Sep 17 00:00:00 2001 From: Florian Lehner Date: Tue, 24 Mar 2026 12:57:12 +0100 Subject: [PATCH 1/2] reporter: refactor for reference based attributes (#1234) Signed-off-by: Florian Lehner --- reporter/base_reporter.go | 42 +++--- reporter/collector_reporter_test.go | 8 +- reporter/internal/pdata/generate.go | 92 ++++++------ reporter/internal/pdata/generate_test.go | 175 ++++++++++++----------- reporter/internal/pdata/helper.go | 3 +- reporter/internal/pdata/pdata.go | 6 +- reporter/samples/attrmgr_test.go | 58 +++----- reporter/samples/samples.go | 75 ++++++---- 8 files changed, 234 insertions(+), 225 deletions(-) diff --git a/reporter/base_reporter.go b/reporter/base_reporter.go index ddd21627f..84266918c 100644 --- a/reporter/base_reporter.go +++ b/reporter/base_reporter.go @@ -61,43 +61,45 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE extraMeta = b.cfg.ExtraSampleAttrProd.CollectExtraSampleMeta(trace, meta) } - containerID := meta.ContainerID - key := samples.TraceAndMetaKey{ - Hash: trace.Hash, - Comm: meta.Comm, - ProcessName: meta.ProcessName, + key := samples.ResourceKey{ + APMServiceName: meta.APMServiceName, + ContainerID: meta.ContainerID, + PID: int64(meta.PID), ExecutablePath: meta.ExecutablePath, - ApmServiceName: meta.APMServiceName, - Pid: int64(meta.PID), - Tid: int64(meta.TID), - CPU: int64(meta.CPU), - ExtraMeta: extraMeta, } eventsTree := b.traceEvents.WLock() defer b.traceEvents.WUnlock(&eventsTree) - if _, exists := (*eventsTree)[samples.ContainerID(containerID)]; !exists { - (*eventsTree)[samples.ContainerID(containerID)] = - make(map[libpf.Origin]samples.KeyToEventMapping) + if _, exists := (*eventsTree)[key]; !exists { + (*eventsTree)[key] = samples.ResourceToProfiles{ + EnvVars: meta.EnvVars, + Events: make(map[libpf.Origin]samples.SampleToEvents), + } } - if _, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin]; !exists { - (*eventsTree)[samples.ContainerID(containerID)][meta.Origin] = - make(samples.KeyToEventMapping) + rtp := (*eventsTree)[key] + if _, exists := rtp.Events[meta.Origin]; !exists { + rtp.Events[meta.Origin] = make(samples.SampleToEvents) } - if events, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key]; exists { + sampleKey := samples.SampleKey{ + Hash: trace.Hash, + Comm: meta.Comm, + TID: int64(meta.TID), + CPU: int64(meta.CPU), + ExtraMeta: extraMeta, + } + if events, exists := rtp.Events[meta.Origin][sampleKey]; exists { events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp)) events.OffTimes = append(events.OffTimes, meta.OffTime) - (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = events return nil } - (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = &samples.TraceEvents{ + + rtp.Events[meta.Origin][sampleKey] = &samples.TraceEvents{ Frames: trace.Frames, Timestamps: []uint64{uint64(meta.Timestamp)}, OffTimes: []int64{meta.OffTime}, - EnvVars: meta.EnvVars, Labels: trace.CustomLabels, } return nil diff --git a/reporter/collector_reporter_test.go b/reporter/collector_reporter_test.go index 70cd23a13..223d362ab 100644 --- a/reporter/collector_reporter_test.go +++ b/reporter/collector_reporter_test.go @@ -88,9 +88,9 @@ func TestCollectorReporterShutdown(t *testing.T) { traceEventsPtr := r.traceEvents.WLock() tree := (*traceEventsPtr) - tree[libpf.NullString] = map[libpf.Origin]samples.KeyToEventMapping{ - support.TraceOriginProbe: map[samples.TraceAndMetaKey]*samples.TraceEvents{ - {Pid: 1}: { + tree[samples.ResourceKey{PID: 1}] = samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{ + support.TraceOriginProbe: { + {}: { Frames: func() libpf.Frames { frames := make(libpf.Frames, 0, 1) frames.Append(&libpf.Frame{ @@ -103,7 +103,7 @@ func TestCollectorReporterShutdown(t *testing.T) { Timestamps: []uint64{1, 2, 3, 4}, }, }, - } + }} r.traceEvents.WUnlock(&traceEventsPtr) ctx, cancelFn := context.WithCancel(t.Context()) diff --git a/reporter/internal/pdata/generate.go b/reporter/internal/pdata/generate.go index af7bf04ed..830c77ca8 100644 --- a/reporter/internal/pdata/generate.go +++ b/reporter/internal/pdata/generate.go @@ -35,12 +35,12 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree, profiles := pprofile.NewProfiles() dic := profiles.Dictionary() - // Find oldest sample timestamp across all containers to handle buffered samples. + // Find oldest sample timestamp across all resources to handle buffered samples. adjustedStartTime := collectionStartTime - for _, containerEvents := range tree { - for _, originEvents := range containerEvents { - for _, traceEvents := range originEvents { - for _, ts := range traceEvents.Timestamps { + for _, resourceToEvents := range tree { + for _, traceEvents := range resourceToEvents.Events { + for _, traceInfo := range traceEvents { + for _, ts := range traceInfo.Timestamps { sampleTime := time.Unix(0, int64(ts)) if sampleTime.Before(adjustedStartTime) { adjustedStartTime = sampleTime @@ -77,14 +77,13 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree, attrMgr := samples.NewAttrTableManager(stringSet, dic.AttributeTable()) - for containerID, originToEvents := range tree { - if len(originToEvents) == 0 { + for resource, toEvents := range tree { + if len(toEvents.Events) == 0 { continue } rp := profiles.ResourceProfiles().AppendEmpty() - rp.Resource().Attributes().PutStr(string(semconv.ContainerIDKey), - containerID.String()) + setResourceAttributes(rp.Resource().Attributes(), resource, toEvents.EnvVars) rp.SetSchemaUrl(semconv.SchemaURL) sp := rp.ScopeProfiles().AppendEmpty() @@ -97,19 +96,20 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree, support.TraceOriginOffCPU, support.TraceOriginProbe, } { - if len(originToEvents[origin]) == 0 { + if len(toEvents.Events[origin]) == 0 { // Do not append empty profiles. continue } prof := sp.Profiles().AppendEmpty() - if err := p.setProfile(dic, - attrMgr, stringSet, funcSet, mappingSet, stackSet, locationSet, - origin, originToEvents[origin], prof, + if err := p.setProfile(dic, attrMgr, + stringSet, funcSet, mappingSet, stackSet, locationSet, + origin, toEvents.Events[origin], prof, collectionStartTime, collectionEndTime); err != nil { return profiles, err } } + } // Populate the ProfilesDictionary tables. @@ -144,7 +144,7 @@ func (p *Pdata) setProfile( stackSet orderedset.OrderedSet[stackInfo], locationSet orderedset.OrderedSet[locationInfo], origin libpf.Origin, - events map[samples.TraceAndMetaKey]*samples.TraceEvents, + events samples.SampleToEvents, profile pprofile.Profile, collectionStartTime, collectionEndTime time.Time, ) error { @@ -169,7 +169,7 @@ func (p *Pdata) setProfile( return fmt.Errorf("generating profile for unsupported origin %d", origin) } - for traceKey, traceInfo := range events { + for sampleKey, traceInfo := range events { sample := profile.Samples().AppendEmpty() sample.TimestampsUnixNano().FromRaw(traceInfo.Timestamps) @@ -183,7 +183,7 @@ func (p *Pdata) setProfile( frame := uniqueFrame.Value() locInfo := locationInfo{ address: uint64(frame.AddressOrLineno), - frameType: frame.Type.String(), + frameType: frame.Type, } index, ok := mappingSet.AddWithCheck(frame.Mapping) @@ -234,7 +234,7 @@ func (p *Pdata) setProfile( line.SetFunctionIndex(locInfo.functionIndex) } attrMgr.AppendOptionalString(loc.AttributeIndices(), - semconv.ProfileFrameTypeKey, locInfo.frameType) + semconv.ProfileFrameTypeKey, locInfo.frameType.String()) } locationIndices = append(locationIndices, idx) } // End per-frame processing @@ -251,34 +251,6 @@ func (p *Pdata) setProfile( } sample.SetStackIndex(stackIdx) - exeName := "" - if traceKey.ExecutablePath != libpf.NullString { - _, exeName = filepath.Split(traceKey.ExecutablePath.String()) - } - - attrMgr.AppendOptionalString(sample.AttributeIndices(), - semconv.ThreadNameKey, traceKey.Comm.String()) - - attrMgr.AppendOptionalString(sample.AttributeIndices(), - semconv.ProcessExecutableNameKey, exeName) - attrMgr.AppendOptionalString(sample.AttributeIndices(), - semconv.ProcessExecutablePathKey, traceKey.ExecutablePath.String()) - - attrMgr.AppendOptionalString(sample.AttributeIndices(), - semconv.ServiceNameKey, traceKey.ApmServiceName) - attrMgr.AppendInt(sample.AttributeIndices(), - semconv.ProcessPIDKey, traceKey.Pid) - attrMgr.AppendInt(sample.AttributeIndices(), - semconv.ThreadIDKey, traceKey.Tid) - attrMgr.AppendInt(sample.AttributeIndices(), - semconv.CPULogicalNumberKey, int64(traceKey.CPU)) - - for key, value := range traceInfo.EnvVars { - env := semconv.ProcessEnvironmentVariable(key.String(), value.String()) - attrMgr.AppendOptionalString( - sample.AttributeIndices(), - env.Key, env.Value.AsString()) - } for key, value := range traceInfo.Labels { // Once https://github.com/open-telemetry/semantic-conventions/issues/2561 // reached an agreement, use the actual OTel SemConv attribute. @@ -288,8 +260,15 @@ func (p *Pdata) setProfile( value.String()) } + attrMgr.AppendOptionalString(sample.AttributeIndices(), + semconv.ThreadNameKey, sampleKey.Comm.String()) + attrMgr.AppendInt(sample.AttributeIndices(), + semconv.ThreadIDKey, sampleKey.TID) + attrMgr.AppendInt(sample.AttributeIndices(), + semconv.CPULogicalNumberKey, int64(sampleKey.CPU)) + if p.ExtraSampleAttrProd != nil { - extra := p.ExtraSampleAttrProd.ExtraSampleAttrs(attrMgr, traceKey.ExtraMeta) + extra := p.ExtraSampleAttrProd.ExtraSampleAttrs(attrMgr, sampleKey.ExtraMeta) sample.AttributeIndices().Append(extra...) } } // End sample processing @@ -301,3 +280,24 @@ func (p *Pdata) setProfile( return nil } + +func setResourceAttributes(attrs pcommon.Map, resource samples.ResourceKey, envVars map[libpf.String]libpf.String) { + if resource.APMServiceName != "" { + attrs.PutStr(string(semconv.ServiceNameKey), resource.APMServiceName) + } + if resource.ContainerID != libpf.NullString { + attrs.PutStr(string(semconv.ContainerIDKey), resource.ContainerID.String()) + } + + attrs.PutInt(string(semconv.ProcessPIDKey), resource.PID) + + if resource.ExecutablePath != libpf.NullString { + attrs.PutStr(string(semconv.ProcessExecutablePathKey), resource.ExecutablePath.String()) + _, exeName := filepath.Split(resource.ExecutablePath.String()) + attrs.PutStr(string(semconv.ProcessExecutableNameKey), exeName) + } + + for key, value := range envVars { + attrs.PutStr("process.environment_variable."+key.String(), value.String()) + } +} diff --git a/reporter/internal/pdata/generate_test.go b/reporter/internal/pdata/generate_test.go index e579f493f..f9fc35293 100644 --- a/reporter/internal/pdata/generate_test.go +++ b/reporter/internal/pdata/generate_test.go @@ -168,27 +168,26 @@ func newTestFrames(extraFrame bool) libpf.Frames { func TestFunctionTableOrder(t *testing.T) { for _, tt := range []struct { name string - events map[libpf.Origin]samples.KeyToEventMapping + events map[libpf.Origin]samples.SampleToEvents wantFunctionTable []string expectedResourceProfiles int }{ { name: "no events", - events: map[libpf.Origin]samples.KeyToEventMapping{}, + events: map[libpf.Origin]samples.SampleToEvents{}, wantFunctionTable: []string{""}, expectedResourceProfiles: 0, }, { name: "single executable", expectedResourceProfiles: 1, - events: map[libpf.Origin]samples.KeyToEventMapping{ - support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{ - {Pid: 1}: { + events: map[libpf.Origin]samples.SampleToEvents{ + support.TraceOriginSampling: { + {}: { Frames: newTestFrames(false), Timestamps: []uint64{1, 2, 3, 4, 5}, }, - // Test Function deduplication - {Pid: 2}: { + samples.SampleKey{Hash: libpf.NewTraceHash(0, 1)}: { Frames: newTestFrames(true), Timestamps: []uint64{6, 7, 8, 9, 10, 11}, }, @@ -203,7 +202,9 @@ func TestFunctionTableOrder(t *testing.T) { d, err := New(100, nil) require.NoError(t, err) tree := make(samples.TraceEventsTree) - tree[libpf.NullString] = tt.events + if len(tt.events) > 0 { + tree[samples.ResourceKey{PID: 1}] = samples.ResourceToProfiles{Events: tt.events} + } res, _ := testGenerate(d, tree, tt.name, "version") require.Equal(t, tt.expectedResourceProfiles, res.ResourceProfiles().Len()) if tt.expectedResourceProfiles == 0 { @@ -245,9 +246,9 @@ func TestProfileDuration(t *testing.T) { { name: "samples within collection window", tree: samples.TraceEventsTree{ - libpf.NullString: map[libpf.Origin]samples.KeyToEventMapping{ - support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{ - {Pid: 1}: { + samples.ResourceKey{PID: 1}: samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{ + support.TraceOriginSampling: { + {}: { // Timestamps within the collection window (1000-1060) Timestamps: []uint64{ uint64(time.Unix(1010, 0).UnixNano()), @@ -255,11 +256,15 @@ func TestProfileDuration(t *testing.T) { uint64(time.Unix(1030, 0).UnixNano()), }, }, - {Pid: 2}: { + }, + }}, + samples.ResourceKey{PID: 2}: samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{ + support.TraceOriginSampling: { + {}: { Timestamps: []uint64{uint64(time.Unix(1040, 0).UnixNano())}, }, }, - }, + }}, }, expectedTime: testProfileTime, expectedDuration: testProfileDuration, @@ -267,15 +272,15 @@ func TestProfileDuration(t *testing.T) { { name: "adjusted start time for buffered samples", tree: samples.TraceEventsTree{ - libpf.NullString: map[libpf.Origin]samples.KeyToEventMapping{ + samples.ResourceKey{PID: 1}: samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - {Pid: 1}: { + {}: { Frames: newTestFrames(false), // Sample before collection start (990 vs 1000) Timestamps: []uint64{uint64(time.Unix(990, 0).UnixNano())}, }, }, - }, + }}, }, expectedTime: pcommon.Timestamp(time.Unix(990, 0).UnixNano()), expectedDuration: uint64(testCollectionEnd.Sub(time.Unix(990, 0)).Nanoseconds()), @@ -283,24 +288,24 @@ func TestProfileDuration(t *testing.T) { { name: "adjusted across multiple containers", tree: samples.TraceEventsTree{ - libpf.Intern("container1"): map[libpf.Origin]samples.KeyToEventMapping{ + samples.ResourceKey{PID: 1, ContainerID: libpf.Intern("container1")}: samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - {Pid: 1}: { + {}: { Frames: singleFrameTrace(libpf.GoFrame, mapping, 0x10, "func1", libpf.NullString, 1), // Oldest sample at 985 Timestamps: []uint64{uint64(time.Unix(985, 0).UnixNano())}, }, }, - }, - libpf.Intern("container2"): map[libpf.Origin]samples.KeyToEventMapping{ + }}, + samples.ResourceKey{PID: 2, ContainerID: libpf.Intern("container2")}: samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - {Pid: 2}: { + {}: { Frames: singleFrameTrace(libpf.GoFrame, mapping, 0x20, "func2", libpf.NullString, 2), // Newer old sample at 995 Timestamps: []uint64{uint64(time.Unix(995, 0).UnixNano())}, }, }, - }, + }}, }, expectedTime: pcommon.Timestamp(time.Unix(985, 0).UnixNano()), expectedDuration: uint64(testCollectionEnd.Sub(time.Unix(985, 0)).Nanoseconds()), @@ -367,27 +372,28 @@ func TestGenerate_SingleContainerSingleOrigin(t *testing.T) { }), }) - traceKey := samples.TraceAndMetaKey{ + resourceKey := samples.ResourceKey{ ExecutablePath: filePath, - Comm: libpf.Intern("testproc"), - Pid: 123, - Tid: 456, - ApmServiceName: "svc", + PID: 123, + APMServiceName: "svc", + ContainerID: libpf.Intern("container1"), } - events := map[libpf.Origin]samples.KeyToEventMapping{ + events := map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: singleFrameTrace(libpf.GoFrame, mapping, 0x10, funcName, filePath, 42), Timestamps: []uint64{uint64(time.Unix(1010, 0).UnixNano())}, - EnvVars: map[libpf.String]libpf.String{ - libpf.Intern("FOO"): libpf.Intern("BAR"), - }, }, }, } tree := samples.TraceEventsTree{ - libpf.Intern("container1"): events, + resourceKey: samples.ResourceToProfiles{ + EnvVars: map[libpf.String]libpf.String{ + libpf.Intern("FOO"): libpf.Intern("BAR"), + }, + Events: events, + }, } profiles, err := testGenerate(d, tree, "agent", "v1") @@ -408,25 +414,12 @@ func TestGenerate_SingleContainerSingleOrigin(t *testing.T) { assert.Equal(t, testProfileDuration, prof.DurationNano()) t.Run("Check environment variable attribute", func(t *testing.T) { - foundFOOKey := false - foundBarValue := false - - dic := profiles.Dictionary() - for _, attr := range dic.AttributeTable().All() { - key := dic.StringTable().At(int(attr.KeyStrindex())) - value := attr.Value() - // Check if this is an environment variable attribute - if key == "process.environment_variable.FOO" { - foundFOOKey = true - if value.Type() == pcommon.ValueTypeStr && value.Str() == "BAR" { - foundBarValue = true - } - } - } - assert.True(t, foundFOOKey, - "Attribute 'process.environment_variable.FOO' should be in the attribute table") - assert.True(t, foundBarValue, - "Environment variable value 'bar' should be in the attribute table") + rp := profiles.ResourceProfiles().At(0) + val, exists := rp.Resource().Attributes().Get("process.environment_variable.FOO") + assert.True(t, exists, + "Attribute 'process.environment_variable.FOO' should be in the resource attributes") + assert.Equal(t, "BAR", val.Str(), + "Environment variable value 'BAR' should be in the resource attributes") }) } @@ -441,12 +434,15 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { }), }) exec := libpf.Intern("/bin/foo") - traceKey := samples.TraceAndMetaKey{ExecutablePath: exec} frames := singleFrameTrace(libpf.PythonFrame, mapping, 0x20, "f", exec, 1) - events1 := map[libpf.Origin]samples.KeyToEventMapping{ + resourceKey1 := samples.ResourceKey{ + ExecutablePath: exec, + ContainerID: libpf.Intern("c1"), + } + events1 := map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: frames, Timestamps: []uint64{ uint64(time.Unix(1010, 0).UnixNano()), @@ -455,7 +451,7 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { }, }, support.TraceOriginOffCPU: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: frames, Timestamps: []uint64{ uint64(time.Unix(1030, 0).UnixNano()), @@ -465,17 +461,21 @@ func TestGenerate_MultipleOriginsAndContainers(t *testing.T) { }, }, } - events2 := map[libpf.Origin]samples.KeyToEventMapping{ + resourceKey2 := samples.ResourceKey{ + ExecutablePath: exec, + ContainerID: libpf.Intern("c2"), + } + events2 := map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: frames, Timestamps: []uint64{uint64(time.Unix(1050, 0).UnixNano())}, }, }, } tree := samples.TraceEventsTree{ - libpf.Intern("c1"): events1, - libpf.Intern("c2"): events2, + resourceKey1: samples.ResourceToProfiles{Events: events1}, + resourceKey2: samples.ResourceToProfiles{Events: events2}, } profiles, err := testGenerate(d, tree, "agent", "v2") @@ -522,10 +522,13 @@ func TestGenerate_StringAndFunctionTablePopulation(t *testing.T) { }), }) - traceKey := samples.TraceAndMetaKey{ExecutablePath: filePath} - events := map[libpf.Origin]samples.KeyToEventMapping{ + resourceKey := samples.ResourceKey{ + ExecutablePath: filePath, + ContainerID: libpf.Intern("c"), + } + events := map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: singleFrameTrace(libpf.PythonFrame, mapping, 0x30, funcName, filePath, 123), Timestamps: []uint64{42}, @@ -533,7 +536,7 @@ func TestGenerate_StringAndFunctionTablePopulation(t *testing.T) { }, } tree := samples.TraceEventsTree{ - libpf.Intern("c"): events, + resourceKey: samples.ResourceToProfiles{Events: events}, } profiles, err := testGenerate(d, tree, "agent", "v3") @@ -583,15 +586,14 @@ func TestGenerate_NativeFrame(t *testing.T) { FileName: filePath, }) - traceKey := samples.TraceAndMetaKey{ + resourceKey := samples.ResourceKey{ ExecutablePath: filePath, - Comm: libpf.Intern("native_app"), - Pid: 789, - Tid: 1011, + PID: 789, + ContainerID: libpf.Intern("native_container"), } - events := map[libpf.Origin]samples.KeyToEventMapping{ + events := map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: singleFrameNative(mappingFile, 0x1000, 0x1000, 0x2000, 0x100), Timestamps: []uint64{ uint64(time.Unix(1010, 0).UnixNano()), @@ -602,7 +604,7 @@ func TestGenerate_NativeFrame(t *testing.T) { }, } tree := samples.TraceEventsTree{ - libpf.Intern("native_container"): events, + resourceKey: samples.ResourceToProfiles{Events: events}, } profiles, err := testGenerate(d, tree, "agent", "v1") @@ -671,15 +673,15 @@ func TestGenerate_NativeFrame(t *testing.T) { func TestStackTableOrder(t *testing.T) { for _, tt := range []struct { name string - events map[libpf.Origin]samples.KeyToEventMapping + events map[libpf.Origin]samples.SampleToEvents wantStackTable [][]int32 expectedLocationTableLen int }{ { name: "single stack", - events: map[libpf.Origin]samples.KeyToEventMapping{ - support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{ + events: map[libpf.Origin]samples.SampleToEvents{ + support.TraceOriginSampling: { {}: { Frames: newTestFrames(false), Timestamps: []uint64{1, 2, 3, 4, 5}, @@ -693,21 +695,21 @@ func TestStackTableOrder(t *testing.T) { }, { name: "multiple stacks", - events: map[libpf.Origin]samples.KeyToEventMapping{ - support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{ - {Pid: 1}: { + events: map[libpf.Origin]samples.SampleToEvents{ + support.TraceOriginSampling: { + {}: { Frames: newTestFrames(false), Timestamps: []uint64{1, 2, 3, 4, 5}, }, }, // This test relies on an implementation detail for ordering of results: // it assumes that support.TraceOriginSampling events are processed first - support.TraceOriginOffCPU: map[samples.TraceAndMetaKey]*samples.TraceEvents{ - {Pid: 2}: { + support.TraceOriginOffCPU: { + samples.SampleKey{Hash: libpf.NewTraceHash(0, 1)}: { Frames: newTestFrames(true), Timestamps: []uint64{7, 8, 9, 10, 11, 12}, }, - {Pid: 3}: { + samples.SampleKey{Hash: libpf.NewTraceHash(0, 2)}: { Frames: newTestFrames(false), Timestamps: []uint64{13, 14, 15, 16, 17}, }, @@ -725,7 +727,7 @@ func TestStackTableOrder(t *testing.T) { d, err := New(100, nil) require.NoError(t, err) tree := make(samples.TraceEventsTree) - tree[libpf.NullString] = tt.events + tree[samples.ResourceKey{}] = samples.ResourceToProfiles{Events: tt.events} res, _ := testGenerate(d, tree, tt.name, "version") dic := res.Dictionary() @@ -753,10 +755,13 @@ func TestGenerate_Validate(t *testing.T) { }), }) - traceKey := samples.TraceAndMetaKey{ExecutablePath: filePath} - events := map[libpf.Origin]samples.KeyToEventMapping{ + resourceKey := samples.ResourceKey{ + ExecutablePath: filePath, + ContainerID: libpf.Intern("native_container"), + } + events := map[libpf.Origin]samples.SampleToEvents{ support.TraceOriginSampling: { - traceKey: &samples.TraceEvents{ + {}: &samples.TraceEvents{ Frames: singleFrameTrace(libpf.PythonFrame, mapping, 0x30, funcName, filePath, 123), Timestamps: []uint64{42}, @@ -764,7 +769,7 @@ func TestGenerate_Validate(t *testing.T) { }, } tree := samples.TraceEventsTree{ - libpf.Intern("native_container"): events, + resourceKey: samples.ResourceToProfiles{Events: events}, } profiles, err := testGenerate(d, tree, "agent", "v1") diff --git a/reporter/internal/pdata/helper.go b/reporter/internal/pdata/helper.go index 39a0e8c06..0a86418aa 100644 --- a/reporter/internal/pdata/helper.go +++ b/reporter/internal/pdata/helper.go @@ -3,6 +3,7 @@ package pdata // import "go.opentelemetry.io/ebpf-profiler/reporter/internal/pda import ( "hash/fnv" + "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/libpf/pfunsafe" ) @@ -10,7 +11,7 @@ import ( type locationInfo struct { address uint64 mappingIndex int32 - frameType string + frameType libpf.FrameType hasLine bool lineNumber int64 columnNumber int64 diff --git a/reporter/internal/pdata/pdata.go b/reporter/internal/pdata/pdata.go index d6b8ace0b..2e1eaae1c 100644 --- a/reporter/internal/pdata/pdata.go +++ b/reporter/internal/pdata/pdata.go @@ -10,12 +10,12 @@ import ( // Pdata holds the cache for the data used to generate the events reporters // will export when handling OTLP data. type Pdata struct { - // samplesPerSecond is the number of samples per second. - samplesPerSecond int - // ExtraSampleAttrProd is an optional hook point for adding custom // attributes to samples. ExtraSampleAttrProd samples.SampleAttrProducer + + // samplesPerSecond is the number of samples per second. + samplesPerSecond int } func New(samplesPerSecond int, extra samples.SampleAttrProducer) (*Pdata, error) { diff --git a/reporter/samples/attrmgr_test.go b/reporter/samples/attrmgr_test.go index 6917eeaab..23a72e242 100644 --- a/reporter/samples/attrmgr_test.go +++ b/reporter/samples/attrmgr_test.go @@ -11,7 +11,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pprofile" - "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/reporter/internal/orderedset" semconv "go.opentelemetry.io/otel/semconv/v1.34.0" ) @@ -22,19 +21,16 @@ type attributeStruct struct { } func TestAttrTableManager(t *testing.T) { - comm1 := libpf.Intern("comm1") - comm2 := libpf.Intern("comm2") tests := map[string]struct { - k []TraceAndMetaKey + k []ResourceKey expectedIndices [][]int32 expectedAttributeTable []attributeStruct }{ "empty": { - k: []TraceAndMetaKey{ + k: []ResourceKey{ { - Hash: libpf.TraceHash{}, - ApmServiceName: "", - Pid: 0, + APMServiceName: "", + PID: 0, }, }, expectedIndices: [][]int32{{0}}, @@ -43,49 +39,38 @@ func TestAttrTableManager(t *testing.T) { }, }, "duplicate": { - k: []TraceAndMetaKey{ + k: []ResourceKey{ { - Hash: libpf.TraceHash{}, - Comm: comm1, - ApmServiceName: "apmServiceName1", - Pid: 1234, + APMServiceName: "APMServiceName1", + PID: 1234, }, { - Hash: libpf.TraceHash{}, - Comm: comm1, - ApmServiceName: "apmServiceName1", - Pid: 1234, + APMServiceName: "APMServiceName1", + PID: 1234, }, }, - expectedIndices: [][]int32{{0, 1, 2}, {0, 1, 2}}, + expectedIndices: [][]int32{{0, 1}, {0, 1}}, expectedAttributeTable: []attributeStruct{ - {Key: "thread.name", Value: "comm1"}, - {Key: "service.name", Value: "apmServiceName1"}, + {Key: "service.name", Value: "APMServiceName1"}, {Key: "process.pid", Value: int64(1234)}, }, }, "different": { - k: []TraceAndMetaKey{ + k: []ResourceKey{ { - Hash: libpf.TraceHash{}, - Comm: comm1, - ApmServiceName: "apmServiceName1", - Pid: 1234, + APMServiceName: "APMServiceName1", + PID: 1234, }, { - Hash: libpf.TraceHash{}, - Comm: comm2, - ApmServiceName: "apmServiceName2", - Pid: 6789, + APMServiceName: "APMServiceName2", + PID: 6789, }, }, - expectedIndices: [][]int32{{0, 1, 2}, {3, 4, 5}}, + expectedIndices: [][]int32{{0, 1}, {2, 3}}, expectedAttributeTable: []attributeStruct{ - {Key: "thread.name", Value: "comm1"}, - {Key: "service.name", Value: "apmServiceName1"}, + {Key: "service.name", Value: "APMServiceName1"}, {Key: "process.pid", Value: int64(1234)}, - {Key: "thread.name", Value: "comm2"}, - {Key: "service.name", Value: "apmServiceName2"}, + {Key: "service.name", Value: "APMServiceName2"}, {Key: "process.pid", Value: int64(6789)}, }, }, @@ -99,9 +84,8 @@ func TestAttrTableManager(t *testing.T) { indices := make([][]int32, 0) for _, k := range tc.k { inner := pcommon.NewInt32Slice() - mgr.AppendOptionalString(inner, semconv.ThreadNameKey, k.Comm.String()) - mgr.AppendOptionalString(inner, semconv.ServiceNameKey, k.ApmServiceName) - mgr.AppendInt(inner, semconv.ProcessPIDKey, k.Pid) + mgr.AppendOptionalString(inner, semconv.ServiceNameKey, k.APMServiceName) + mgr.AppendInt(inner, semconv.ProcessPIDKey, k.PID) indices = append(indices, inner.AsRaw()) } diff --git a/reporter/samples/samples.go b/reporter/samples/samples.go index 4e1559a87..a0f08b2c8 100644 --- a/reporter/samples/samples.go +++ b/reporter/samples/samples.go @@ -8,57 +8,74 @@ import ( ) type TraceEventMeta struct { - Timestamp libpf.UnixTime64 Comm libpf.String ProcessName libpf.String ExecutablePath libpf.String - APMServiceName string ContainerID libpf.String - PID, TID libpf.PID + EnvVars map[libpf.String]libpf.String + APMServiceName string + Timestamp libpf.UnixTime64 CPU int Origin libpf.Origin OffTime int64 - EnvVars map[libpf.String]libpf.String + PID, TID libpf.PID } // TraceEvents holds known information about a trace. type TraceEvents struct { + Labels map[libpf.String]libpf.String Frames libpf.Frames Timestamps []uint64 // in nanoseconds OffTimes []int64 // in nanoseconds - EnvVars map[libpf.String]libpf.String - Labels map[libpf.String]libpf.String } -// TraceAndMetaKey is the deduplication key for samples. This **must always** -// contain all trace fields that aren't already part of the trace hash to ensure -// that we don't accidentally merge traces with different fields. -type TraceAndMetaKey struct { - // Hash is not sent forward, but it is used as the primary key - // to not aggregate difference traces. - Hash libpf.TraceHash - // comm and apmServiceName are provided by the eBPF programs - Comm libpf.String - ApmServiceName string - Pid int64 - Tid int64 - CPU int64 - // Process name is retrieved from /proc/PID/comm - ProcessName libpf.String +// TraceEventsTree stores samples and their related metadata in a tree-like +// structure optimized for the OTel Profiling protocol representation. +type TraceEventsTree map[ResourceKey]ResourceToProfiles + +// ResourceToProfiles holds non-comparable information that belong to +// a resource as well as profiling event data of this resource. +type ResourceToProfiles struct { + // EnvVars can not be part of ResourceKey as maps are not + // comparable. + EnvVars map[libpf.String]libpf.String + + // Events holds the actual profiling information. + Events map[libpf.Origin]SampleToEvents +} + +// SampleToEvents maps a unique trace hash with its meta data to +// trace events. +type SampleToEvents map[SampleKey]*TraceEvents + +// ResourceKey is the deduplication key for samples that describes a unique +// resource. This **must always** contain all trace fields that aren't +// already part of the trace hash to ensure that we don't accidentally merge +// traces with different fields. +type ResourceKey struct { + // ContainerID represents an extracted key from /proc//cgroup. + ContainerID libpf.String + // Executable path is retrieved from /proc/PID/exe ExecutablePath libpf.String + // APMServiceName is provided by the eBPF programs + APMServiceName string + + PID int64 +} + +// SampleKey holds a unique trace hash and its dedicated meta data. +type SampleKey struct { // ExtraMeta stores extra meta info that may have been produced by a // `SampleAttrProducer` instance. May be nil. ExtraMeta any -} -// TraceEventsTree stores samples and their related metadata in a tree-like -// structure optimized for the OTel Profiling protocol representation. -type TraceEventsTree map[ContainerID]map[libpf.Origin]KeyToEventMapping + // Comm is provided by the eBPF programs + Comm libpf.String -// ContainerID represents an extracted key from /proc//cgroup. -type ContainerID = libpf.String + Hash libpf.TraceHash -// KeyToEventMapping supports temporary mapping traces to additional information. -type KeyToEventMapping map[TraceAndMetaKey]*TraceEvents + TID int64 + CPU int64 +} From 11047f0839a92025fc6342ca27202d792ad75065 Mon Sep 17 00:00:00 2001 From: Tommy Reilly Date: Wed, 29 Apr 2026 07:50:37 -0400 Subject: [PATCH 2/2] reporter/samples: replace fork-only heap fields with generic OriginData The parca-agent oomprof path was carrying Allocs/Frees/AllocBytes/FreeBytes on TraceEventMeta as fork-only fields. Replace with a single OriginData any field so origin-specific payload can be carried through the reporter interface without requiring the shared struct to grow per-origin fields. Future upstream changes to TraceEventMeta no longer textually conflict on fork-only field placement; samples.go remains byte-identical to upstream modulo the single OriginData line. --- reporter/samples/samples.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/reporter/samples/samples.go b/reporter/samples/samples.go index f9dd4ef13..37362b676 100644 --- a/reporter/samples/samples.go +++ b/reporter/samples/samples.go @@ -20,9 +20,8 @@ type TraceEventMeta struct { OffTime int64 PID, TID libpf.PID - // Populated by parca-agent's oomprof path for TraceOriginMemory samples. - Allocs, Frees uint64 - AllocBytes, FreeBytes uint64 + // OriginData carries optional Origin-specific payload. + OriginData any } // TraceEvents holds known information about a trace.