Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,43 +61,45 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceE
extraMeta = b.cfg.ExtraSampleAttrProd.CollectExtraSampleMeta(trace, meta)
}

containerID := meta.ContainerID
key := samples.TraceAndMetaKey{
Hash: trace.Hash,
Comm: meta.Comm,
ProcessName: meta.ProcessName,
key := samples.ResourceKey{
APMServiceName: meta.APMServiceName,
ContainerID: meta.ContainerID,
PID: int64(meta.PID),
ExecutablePath: meta.ExecutablePath,
ApmServiceName: meta.APMServiceName,
Pid: int64(meta.PID),
Tid: int64(meta.TID),
CPU: int64(meta.CPU),
ExtraMeta: extraMeta,
}

eventsTree := b.traceEvents.WLock()
defer b.traceEvents.WUnlock(&eventsTree)

if _, exists := (*eventsTree)[samples.ContainerID(containerID)]; !exists {
(*eventsTree)[samples.ContainerID(containerID)] =
make(map[libpf.Origin]samples.KeyToEventMapping)
if _, exists := (*eventsTree)[key]; !exists {
(*eventsTree)[key] = samples.ResourceToProfiles{
EnvVars: meta.EnvVars,
Events: make(map[libpf.Origin]samples.SampleToEvents),
}
}

if _, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin]; !exists {
(*eventsTree)[samples.ContainerID(containerID)][meta.Origin] =
make(samples.KeyToEventMapping)
rtp := (*eventsTree)[key]
if _, exists := rtp.Events[meta.Origin]; !exists {
rtp.Events[meta.Origin] = make(samples.SampleToEvents)
}

if events, exists := (*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key]; exists {
sampleKey := samples.SampleKey{
Hash: trace.Hash,
Comm: meta.Comm,
TID: int64(meta.TID),
CPU: int64(meta.CPU),
ExtraMeta: extraMeta,
}
if events, exists := rtp.Events[meta.Origin][sampleKey]; exists {
events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp))
events.OffTimes = append(events.OffTimes, meta.OffTime)
(*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = events
return nil
}
(*eventsTree)[samples.ContainerID(containerID)][meta.Origin][key] = &samples.TraceEvents{

rtp.Events[meta.Origin][sampleKey] = &samples.TraceEvents{
Frames: trace.Frames,
Timestamps: []uint64{uint64(meta.Timestamp)},
OffTimes: []int64{meta.OffTime},
EnvVars: meta.EnvVars,
Labels: trace.CustomLabels,
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions reporter/collector_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func TestCollectorReporterShutdown(t *testing.T) {

traceEventsPtr := r.traceEvents.WLock()
tree := (*traceEventsPtr)
tree[libpf.NullString] = map[libpf.Origin]samples.KeyToEventMapping{
support.TraceOriginProbe: map[samples.TraceAndMetaKey]*samples.TraceEvents{
{Pid: 1}: {
tree[samples.ResourceKey{PID: 1}] = samples.ResourceToProfiles{Events: map[libpf.Origin]samples.SampleToEvents{
support.TraceOriginProbe: {
{}: {
Frames: func() libpf.Frames {
frames := make(libpf.Frames, 0, 1)
frames.Append(&libpf.Frame{
Expand All @@ -103,7 +103,7 @@ func TestCollectorReporterShutdown(t *testing.T) {
Timestamps: []uint64{1, 2, 3, 4},
},
},
}
}}
r.traceEvents.WUnlock(&traceEventsPtr)

ctx, cancelFn := context.WithCancel(t.Context())
Expand Down
92 changes: 46 additions & 46 deletions reporter/internal/pdata/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree,
profiles := pprofile.NewProfiles()
dic := profiles.Dictionary()

// Find oldest sample timestamp across all containers to handle buffered samples.
// Find oldest sample timestamp across all resources to handle buffered samples.
adjustedStartTime := collectionStartTime
for _, containerEvents := range tree {
for _, originEvents := range containerEvents {
for _, traceEvents := range originEvents {
for _, ts := range traceEvents.Timestamps {
for _, resourceToEvents := range tree {
for _, traceEvents := range resourceToEvents.Events {
for _, traceInfo := range traceEvents {
for _, ts := range traceInfo.Timestamps {
sampleTime := time.Unix(0, int64(ts))
if sampleTime.Before(adjustedStartTime) {
adjustedStartTime = sampleTime
Expand Down Expand Up @@ -77,14 +77,13 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree,

attrMgr := samples.NewAttrTableManager(stringSet, dic.AttributeTable())

for containerID, originToEvents := range tree {
if len(originToEvents) == 0 {
for resource, toEvents := range tree {
if len(toEvents.Events) == 0 {
continue
}

rp := profiles.ResourceProfiles().AppendEmpty()
rp.Resource().Attributes().PutStr(string(semconv.ContainerIDKey),
containerID.String())
setResourceAttributes(rp.Resource().Attributes(), resource, toEvents.EnvVars)
rp.SetSchemaUrl(semconv.SchemaURL)

sp := rp.ScopeProfiles().AppendEmpty()
Expand All @@ -97,19 +96,20 @@ func (p *Pdata) Generate(tree samples.TraceEventsTree,
support.TraceOriginOffCPU,
support.TraceOriginProbe,
} {
if len(originToEvents[origin]) == 0 {
if len(toEvents.Events[origin]) == 0 {
// Do not append empty profiles.
continue
}

prof := sp.Profiles().AppendEmpty()
if err := p.setProfile(dic,
attrMgr, stringSet, funcSet, mappingSet, stackSet, locationSet,
origin, originToEvents[origin], prof,
if err := p.setProfile(dic, attrMgr,
stringSet, funcSet, mappingSet, stackSet, locationSet,
origin, toEvents.Events[origin], prof,
collectionStartTime, collectionEndTime); err != nil {
return profiles, err
}
}

}

// Populate the ProfilesDictionary tables.
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *Pdata) setProfile(
stackSet orderedset.OrderedSet[stackInfo],
locationSet orderedset.OrderedSet[locationInfo],
origin libpf.Origin,
events map[samples.TraceAndMetaKey]*samples.TraceEvents,
events samples.SampleToEvents,
profile pprofile.Profile,
collectionStartTime, collectionEndTime time.Time,
) error {
Expand All @@ -169,7 +169,7 @@ func (p *Pdata) setProfile(
return fmt.Errorf("generating profile for unsupported origin %d", origin)
}

for traceKey, traceInfo := range events {
for sampleKey, traceInfo := range events {
sample := profile.Samples().AppendEmpty()

sample.TimestampsUnixNano().FromRaw(traceInfo.Timestamps)
Expand All @@ -183,7 +183,7 @@ func (p *Pdata) setProfile(
frame := uniqueFrame.Value()
locInfo := locationInfo{
address: uint64(frame.AddressOrLineno),
frameType: frame.Type.String(),
frameType: frame.Type,
}

index, ok := mappingSet.AddWithCheck(frame.Mapping)
Expand Down Expand Up @@ -234,7 +234,7 @@ func (p *Pdata) setProfile(
line.SetFunctionIndex(locInfo.functionIndex)
}
attrMgr.AppendOptionalString(loc.AttributeIndices(),
semconv.ProfileFrameTypeKey, locInfo.frameType)
semconv.ProfileFrameTypeKey, locInfo.frameType.String())
}
locationIndices = append(locationIndices, idx)
} // End per-frame processing
Expand All @@ -251,34 +251,6 @@ func (p *Pdata) setProfile(
}
sample.SetStackIndex(stackIdx)

exeName := ""
if traceKey.ExecutablePath != libpf.NullString {
_, exeName = filepath.Split(traceKey.ExecutablePath.String())
}

attrMgr.AppendOptionalString(sample.AttributeIndices(),
semconv.ThreadNameKey, traceKey.Comm.String())

attrMgr.AppendOptionalString(sample.AttributeIndices(),
semconv.ProcessExecutableNameKey, exeName)
attrMgr.AppendOptionalString(sample.AttributeIndices(),
semconv.ProcessExecutablePathKey, traceKey.ExecutablePath.String())

attrMgr.AppendOptionalString(sample.AttributeIndices(),
semconv.ServiceNameKey, traceKey.ApmServiceName)
attrMgr.AppendInt(sample.AttributeIndices(),
semconv.ProcessPIDKey, traceKey.Pid)
attrMgr.AppendInt(sample.AttributeIndices(),
semconv.ThreadIDKey, traceKey.Tid)
attrMgr.AppendInt(sample.AttributeIndices(),
semconv.CPULogicalNumberKey, int64(traceKey.CPU))

for key, value := range traceInfo.EnvVars {
env := semconv.ProcessEnvironmentVariable(key.String(), value.String())
attrMgr.AppendOptionalString(
sample.AttributeIndices(),
env.Key, env.Value.AsString())
}
for key, value := range traceInfo.Labels {
// Once https://github.com/open-telemetry/semantic-conventions/issues/2561
// reached an agreement, use the actual OTel SemConv attribute.
Expand All @@ -288,8 +260,15 @@ func (p *Pdata) setProfile(
value.String())
}

attrMgr.AppendOptionalString(sample.AttributeIndices(),
semconv.ThreadNameKey, sampleKey.Comm.String())
attrMgr.AppendInt(sample.AttributeIndices(),
semconv.ThreadIDKey, sampleKey.TID)
attrMgr.AppendInt(sample.AttributeIndices(),
semconv.CPULogicalNumberKey, int64(sampleKey.CPU))

if p.ExtraSampleAttrProd != nil {
extra := p.ExtraSampleAttrProd.ExtraSampleAttrs(attrMgr, traceKey.ExtraMeta)
extra := p.ExtraSampleAttrProd.ExtraSampleAttrs(attrMgr, sampleKey.ExtraMeta)
sample.AttributeIndices().Append(extra...)
}
} // End sample processing
Expand All @@ -301,3 +280,24 @@ func (p *Pdata) setProfile(

return nil
}

func setResourceAttributes(attrs pcommon.Map, resource samples.ResourceKey, envVars map[libpf.String]libpf.String) {
if resource.APMServiceName != "" {
attrs.PutStr(string(semconv.ServiceNameKey), resource.APMServiceName)
}
if resource.ContainerID != libpf.NullString {
attrs.PutStr(string(semconv.ContainerIDKey), resource.ContainerID.String())
}

attrs.PutInt(string(semconv.ProcessPIDKey), resource.PID)

if resource.ExecutablePath != libpf.NullString {
attrs.PutStr(string(semconv.ProcessExecutablePathKey), resource.ExecutablePath.String())
_, exeName := filepath.Split(resource.ExecutablePath.String())
attrs.PutStr(string(semconv.ProcessExecutableNameKey), exeName)
}

for key, value := range envVars {
attrs.PutStr("process.environment_variable."+key.String(), value.String())
}
}
Loading
Loading