Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion reporter/collector_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ func (r *CollectorReporter) reportProfile(ctx context.Context) error {
*traceEventsPtr = newEvents
r.traceEvents.WUnlock(&traceEventsPtr)

profiles := r.pdata.Generate(reportedEvents, r.name, r.version)
profiles, err := r.pdata.Generate(reportedEvents, r.name, r.version)
if err != nil {
log.Errorf("pdata: %v", err)
return nil
}

if profiles.SampleCount() == 0 {
log.Debugf("Skip sending profile with no samples")
return nil
Expand Down
252 changes: 108 additions & 144 deletions reporter/internal/pdata/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package pdata // import "go.opentelemetry.io/ebpf-profiler/reporter/internal/pdata"

import (
"fmt"
"path/filepath"
"slices"
"time"
Expand All @@ -29,16 +30,28 @@ const (
// Generate generates a pdata request out of internal profiles data, to be
// exported.
func (p *Pdata) Generate(tree samples.TraceEventsTree,
agentName, agentVersion string) pprofile.Profiles {
agentName, agentVersion string) (pprofile.Profiles, error) {
profiles := pprofile.NewProfiles()
dic := profiles.ProfilesDictionary()

// Temporary helpers that will build the various tables in ProfilesDictionary.
stringSet := make(OrderedSet[string], 64)
funcSet := make(OrderedSet[funcInfo], 64)
mappingSet := make(OrderedSet[libpf.FileID], 64)
locationSet := make(OrderedSet[locationInfo], 64)

// By specification, the first element should be empty.
stringSet.Add("")
funcSet.Add(funcInfo{nameIdx: stringSet.Add(""), fileNameIdx: stringSet.Add("")})

for containerID, originToEvents := range tree {
if len(originToEvents) == 0 {
continue
}

rp := profiles.ResourceProfiles().AppendEmpty()
rp.Resource().Attributes().PutStr(string(semconv.ContainerIDKey), string(containerID))
rp.Resource().Attributes().PutStr(string(semconv.ContainerIDKey),
Comment thread
christos68k marked this conversation as resolved.
string(containerID))

sp := rp.ScopeProfiles().AppendEmpty()
sp.Scope().SetName(agentName)
Expand All @@ -49,61 +62,73 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree,
support.TraceOriginOffCPU,
} {
if len(originToEvents[origin]) == 0 {
// Do not append empty profiles, if there
// is not profiling data for this origin.
// Do not append empty profiles.
continue
}

prof := sp.Profiles().AppendEmpty()

p.setProfile(profiles.ProfilesDictionary(), origin, originToEvents[origin], prof)
if err := p.setProfile(dic,
stringSet, funcSet, mappingSet, locationSet,
origin, originToEvents[origin], prof); err != nil {
return profiles, err
}
}
}
return profiles

// Populate the ProfilesDictionary tables.
funcTable := dic.FunctionTable()
funcTable.EnsureCapacity(len(funcSet))
for range funcSet {
funcTable.AppendEmpty()
}
for v, idx := range funcSet {
f := funcTable.At(int(idx))
f.SetNameStrindex(v.nameIdx)
f.SetFilenameStrindex(v.fileNameIdx)
}

stringTable := dic.StringTable()
stringTable.EnsureCapacity(len(stringSet))
for _, val := range stringSet.ToSlice() {
Comment thread
christos68k marked this conversation as resolved.
stringTable.Append(val)
}

return profiles, nil
}

// setProfile sets the data an OTLP profile with all collected samples up to
// this moment.
func (p *Pdata) setProfile(
dic pprofile.ProfilesDictionary,
stringSet OrderedSet[string],
funcSet OrderedSet[funcInfo],
mappingSet OrderedSet[libpf.FileID],
locationSet OrderedSet[locationInfo],
origin libpf.Origin,
events map[samples.TraceAndMetaKey]*samples.TraceEvents,
profile pprofile.Profile,
) {
// stringMap is a temporary helper that will build the StringTable.
// By specification, the first element should be empty.
stringMap := make(map[string]int32)
stringMap[""] = 0

// funcMap is a temporary helper that will build the Function array
// in profile and make sure information is deduplicated.
funcMap := make(map[samples.FuncInfo]int32)
funcMap[samples.FuncInfo{Name: "", FileName: ""}] = 0

) error {
st := profile.SampleType().AppendEmpty()
switch origin {
case support.TraceOriginSampling:
st.SetTypeStrindex(getStringMapIndex(stringMap, "samples"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "count"))

profile.SetPeriod(1e9 / int64(p.samplesPerSecond))
pt := profile.PeriodType()
pt.SetTypeStrindex(getStringMapIndex(stringMap, "cpu"))
pt.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))
pt.SetTypeStrindex(stringSet.Add("cpu"))
pt.SetUnitStrindex(stringSet.Add("nanoseconds"))

profile.SetPeriod(1e9 / int64(p.samplesPerSecond))
st.SetTypeStrindex(stringSet.Add("samples"))
st.SetUnitStrindex(stringSet.Add("count"))
case support.TraceOriginOffCPU:
st.SetTypeStrindex(getStringMapIndex(stringMap, "events"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))
st.SetTypeStrindex(stringSet.Add("events"))
st.SetUnitStrindex(stringSet.Add("nanoseconds"))
default:
log.Errorf("Generating profile for unsupported origin %d", origin)
return
// Should never happen
return fmt.Errorf("generating profile for unsupported origin %d", origin)
}

// Temporary lookup to reference existing Mappings.
fileIDtoMapping := make(map[libpf.FileID]int32)

attrMgr := samples.NewAttrTableManager(dic.AttributeTable())
var locationIndex int32

locationIndex := int32(profile.LocationIndices().Len())
var startTS, endTS pcommon.Timestamp
for traceKey, traceInfo := range events {
sample := profile.Sample().AppendEmpty()
Expand All @@ -112,7 +137,6 @@ func (p *Pdata) setProfile(
slices.Sort(traceInfo.Timestamps)
startTS = pcommon.Timestamp(traceInfo.Timestamps[0])
endTS = pcommon.Timestamp(traceInfo.Timestamps[len(traceInfo.Timestamps)-1])

sample.TimestampsUnixNano().FromRaw(traceInfo.Timestamps)

switch origin {
Expand All @@ -124,27 +148,18 @@ func (p *Pdata) setProfile(

// Walk every frame of the trace.
for i := range traceInfo.FrameTypes {
loc := dic.LocationTable().AppendEmpty()
loc.SetAddress(uint64(traceInfo.Linenos[i]))
attrMgr.AppendOptionalString(loc.AttributeIndices(),
semconv.ProfileFrameTypeKey, traceInfo.FrameTypes[i].String())

locInfo := locationInfo{
address: uint64(traceInfo.Linenos[i]),
frameType: traceInfo.FrameTypes[i].String(),
}
switch frameKind := traceInfo.FrameTypes[i]; frameKind {
case libpf.NativeFrame:
// As native frames are resolved in the backend, we use Mapping to
// report these frames.

var locationMappingIndex int32
if tmpMappingIndex, exists := fileIDtoMapping[traceInfo.Files[i]]; exists {
locationMappingIndex = tmpMappingIndex
} else {
idx := int32(len(fileIDtoMapping))
fileIDtoMapping[traceInfo.Files[i]] = idx
locationMappingIndex = idx

locationMappingIndex, exists := mappingSet.AddWithCheck(traceInfo.Files[i])
if !exists {
ei, exists := p.Executables.GetAndRefresh(traceInfo.Files[i],
ExecutableCacheLifetime)

// Next step: Select a proper default value,
// if the name of the executable is not known yet.
fileName := "UNKNOWN"
Expand All @@ -156,7 +171,7 @@ func (p *Pdata) setProfile(
mapping.SetMemoryStart(uint64(traceInfo.MappingStarts[i]))
mapping.SetMemoryLimit(uint64(traceInfo.MappingEnds[i]))
mapping.SetFileOffset(traceInfo.MappingFileOffsets[i])
mapping.SetFilenameStrindex(getStringMapIndex(stringMap, fileName))
mapping.SetFilenameStrindex(stringSet.Add(fileName))

// Once SemConv and its Go package is released with the new
// semantic convention for build_id, replace these hard coded
Expand All @@ -168,38 +183,65 @@ func (p *Pdata) setProfile(
semconv.ProcessExecutableBuildIDHtlhashKey,
traceInfo.Files[i].StringNoQuotes())
}
loc.SetMappingIndex(locationMappingIndex)
locInfo.mappingIndex = locationMappingIndex
case libpf.AbortFrame:
// Next step: Figure out how the OTLP protocol
// could handle artificial frames, like AbortFrame,
// that are not originated from a native or interpreted
// program.
default:
// Store interpreted frame information as a Line message:
line := loc.Line().AppendEmpty()

// Store interpreted frame information as a Line message
locInfo.hasLine = true
if si, exists := p.Frames.GetAndRefresh(
libpf.NewFrameID(traceInfo.Files[i], traceInfo.Linenos[i]),
FramesCacheLifetime); exists {
line.SetLine(int64(si.LineNumber))

line.SetFunctionIndex(createFunctionEntry(funcMap,
si.FunctionName, si.FilePath))
locInfo.lineNumber = int64(si.LineNumber)
fi := funcInfo{
nameIdx: stringSet.Add(si.FunctionName),
fileNameIdx: stringSet.Add(si.FilePath),
}
locInfo.functionIndex = funcSet.Add(fi)
} else {
// At this point, we do not have enough information for the frame.
// Therefore, we report a dummy entry and use the interpreter as filename.
// To differentiate this case from the case where no information about
// the file ID is available at all, we use a different name for reported
// function.
line.SetFunctionIndex(createFunctionEntry(funcMap,
"UNRESOLVED", frameKind.String()))
fi := funcInfo{
nameIdx: stringSet.Add("UNRESOLVED"),
fileNameIdx: stringSet.Add(frameKind.String()),
}
locInfo.functionIndex = funcSet.Add(fi)
}

// To be compliant with the protocol, generate a dummy mapping entry.
loc.SetMappingIndex(getDummyMappingIndex(fileIDtoMapping, stringMap,
attrMgr, dic, traceInfo.Files[i]))
idx, exists := mappingSet.AddWithCheck(traceInfo.Files[i])
locInfo.mappingIndex = idx
if !exists {
// To be compliant with the protocol, generate a dummy mapping entry.
mapping := dic.MappingTable().AppendEmpty()
mapping.SetFilenameStrindex(stringSet.Add(""))
attrMgr.AppendOptionalString(mapping.AttributeIndices(),
semconv.ProcessExecutableBuildIDHtlhashKey,
traceInfo.Files[i].StringNoQuotes())
}
} // End frame type switch

idx, exists := locationSet.AddWithCheck(locInfo)
if !exists {
// Add a new Location to the dictionary
loc := dic.LocationTable().AppendEmpty()
loc.SetAddress(locInfo.address)
loc.SetMappingIndex(locInfo.mappingIndex)
if locInfo.hasLine {
line := loc.Line().AppendEmpty()
line.SetLine(locInfo.lineNumber)
line.SetFunctionIndex(locInfo.functionIndex)
}
attrMgr.AppendOptionalString(loc.AttributeIndices(),
semconv.ProfileFrameTypeKey, locInfo.frameType)
}
}
profile.LocationIndices().Append(idx)
} // End per-frame processing

exeName := traceKey.ExecutablePath
if exeName != "" {
Expand Down Expand Up @@ -237,90 +279,12 @@ func (p *Pdata) setProfile(

sample.SetLocationsLength(int32(len(traceInfo.FrameTypes)))
locationIndex += sample.LocationsLength()
}
log.Debugf("Reporting OTLP profile with %d samples", profile.Sample().Len())

// Populate the deduplicated functions into profile.
funcTable := dic.FunctionTable()
funcTable.EnsureCapacity(len(funcMap))
for range funcMap {
funcTable.AppendEmpty()
}
for v, idx := range funcMap {
f := funcTable.At(int(idx))
f.SetNameStrindex(getStringMapIndex(stringMap, v.Name))
f.SetFilenameStrindex(getStringMapIndex(stringMap, v.FileName))
}
} // End sample processing

// When ranging over stringMap, the order will be according to the
// hash value of the key. To get the correct order for profile.StringTable,
// put the values in stringMap, in the correct array order.
stringTable := make([]string, len(stringMap))
for v, idx := range stringMap {
stringTable[idx] = v
}

for _, v := range stringTable {
dic.StringTable().Append(v)
}

// profile.LocationIndices is not optional, and we only write elements into
// profile.Location that at least one sample references.
for i := int32(0); i < int32(dic.LocationTable().Len()); i++ {
profile.LocationIndices().Append(i)
}
log.Debugf("Reporting OTLP profile with %d samples", profile.Sample().Len())

profile.SetDuration(endTS - startTS)
profile.SetStartTime(startTS)
}

// getStringMapIndex inserts or looks up the index for value in stringMap.
func getStringMapIndex(stringMap map[string]int32, value string) int32 {
if idx, exists := stringMap[value]; exists {
return idx
}

idx := int32(len(stringMap))
stringMap[value] = idx

return idx
}

// createFunctionEntry adds a new function and returns its reference index.
func createFunctionEntry(funcMap map[samples.FuncInfo]int32,
name string, fileName string,
) int32 {
key := samples.FuncInfo{
Name: name,
FileName: fileName,
}
if idx, exists := funcMap[key]; exists {
return idx
}

idx := int32(len(funcMap))
funcMap[key] = idx

return idx
}

// getDummyMappingIndex inserts or looks up an entry for interpreted FileIDs.
func getDummyMappingIndex(fileIDtoMapping map[libpf.FileID]int32,
stringMap map[string]int32, attrMgr *samples.AttrTableManager,
dic pprofile.ProfilesDictionary,
fileID libpf.FileID,
) int32 {
if mappingIndex, exists := fileIDtoMapping[fileID]; exists {
return mappingIndex
}

locationMappingIndex := int32(len(fileIDtoMapping))
fileIDtoMapping[fileID] = locationMappingIndex

mapping := dic.MappingTable().AppendEmpty()
mapping.SetFilenameStrindex(getStringMapIndex(stringMap, ""))
attrMgr.AppendOptionalString(mapping.AttributeIndices(),
semconv.ProcessExecutableBuildIDHtlhashKey,
fileID.StringNoQuotes())
return locationMappingIndex
return nil
}
Loading