diff --git a/reporter/datadog_reporter.go b/reporter/datadog_reporter.go index a85acfd6..9b6607de 100644 --- a/reporter/datadog_reporter.go +++ b/reporter/datadog_reporter.go @@ -8,8 +8,8 @@ package reporter import ( "bytes" "context" + "errors" "fmt" - "maps" "os" "path" "runtime" @@ -24,12 +24,16 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf/xsync" "go.opentelemetry.io/ebpf-profiler/reporter" "go.opentelemetry.io/ebpf-profiler/reporter/samples" + "go.opentelemetry.io/ebpf-profiler/support" "github.com/DataDog/dd-otel-host-profiler/containermetadata" + "github.com/DataDog/dd-otel-host-profiler/reporter/pprof" + rsamples "github.com/DataDog/dd-otel-host-profiler/reporter/samples" ) // Assert that we implement the full Reporter interface. var _ reporter.Reporter = (*DatadogReporter)(nil) +var errUnknownOrigin = errors.New("unknown trace origin") const ( unknownServiceStr = "unknown-service" @@ -37,7 +41,6 @@ const ( profilerName = "dd-otel-host-profiler" pidCacheUpdateInterval = 1 * time.Minute // pid cache items will be updated at most once per this interval pidCacheCleanupInterval = 5 * time.Minute // pid cache items for which metadata hasn't been updated in this interval will be removed - executableCacheLifetime = 1 * time.Hour // executable cache items will be removed if unused after this interval framesCacheLifetime = 1 * time.Hour // frames cache items will be removed if unused after this interval profileUploadWorkerCount = 5 @@ -46,39 +49,6 @@ const ( var ServiceNameEnvVars = []string{"DD_SERVICE", "OTEL_SERVICE_NAME"} -// execInfo enriches an executable with additional metadata. -type execInfo struct { - fileName string - gnuBuildID string - goBuildID string -} - -// funcInfo is a helper to construct profile.Function messages. -type funcInfo struct { - name string - fileName string -} - -// traceAndMetaKey is the deduplication key for samples. 
This **must always** -// contain all trace fields that aren't already part of the trace hash to ensure -// that we don't accidentally merge traces with different fields. -type traceAndMetaKey struct { - hash libpf.TraceHash - // comm and apmServiceName are provided by the eBPF programs - comm string - executablePath string - apmServiceName string - pid libpf.PID - tid libpf.PID -} - -type processMetadata struct { - updatedAt time.Time - executablePath string - containerMetadata containermetadata.ContainerMetadata - ddService string -} - type uploadProfileData struct { start time.Time end time.Time @@ -87,17 +57,6 @@ type uploadProfileData struct { tags Tags } -type serviceEntity struct { - service string - entityID string - inferredService bool -} - -type profileStats struct { - totalSampleCount int - pidWithNoMetadata int -} - // DatadogReporter receives and transforms information to be OTLP/profiles compliant. type DatadogReporter struct { config *Config @@ -110,13 +69,13 @@ type DatadogReporter struct { // be duplicated in other places but not accessible for DatadogReporter. // executables stores metadata for executables. - executables *lru.SyncedLRU[libpf.FileID, execInfo] + executables *lru.SyncedLRU[libpf.FileID, rsamples.ExecInfo] // traceEvents stores reported trace events (trace metadata with frames and counts) - traceEvents xsync.RWMutex[map[traceAndMetaKey]*samples.TraceEvents] + traceEvents xsync.RWMutex[rsamples.TraceEventsTree] // processes stores the metadata associated to a PID. - processes *lru.SyncedLRU[libpf.PID, processMetadata] + processes *lru.SyncedLRU[libpf.PID, rsamples.ProcessMetadata] symbolUploader *DatadogSymbolUploader @@ -131,6 +90,9 @@ type DatadogReporter struct { // profileSeq is the sequence number of the profile (ie. number of profiles uploaded until now). profileSeq uint64 + // processAlreadyExitedCount is the number of processes that have already exited when attempting to collect their metadata. 
+ processAlreadyExitedCount int + // intervalStart is the timestamp of the start of the current interval. intervalStart time.Time @@ -138,13 +100,13 @@ type DatadogReporter struct { } func NewDatadog(cfg *Config, p containermetadata.Provider) (*DatadogReporter, error) { - executables, err := lru.NewSynced[libpf.FileID, execInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32) + executables, err := lru.NewSynced[libpf.FileID, rsamples.ExecInfo](cfg.ExecutablesCacheElements, libpf.FileID.Hash32) if err != nil { return nil, err } - executables.SetLifetime(executableCacheLifetime) + executables.SetLifetime(rsamples.ExecutableCacheLifetime) - processes, err := lru.NewSynced[libpf.PID, processMetadata](cfg.ProcessesCacheElements, libpf.PID.Hash32) + processes, err := lru.NewSynced[libpf.PID, rsamples.ProcessMetadata](cfg.ProcessesCacheElements, libpf.PID.Hash32) if err != nil { return nil, err } @@ -170,7 +132,7 @@ func NewDatadog(cfg *Config, p containermetadata.Provider) (*DatadogReporter, er }, executables: executables, containerMetadataProvider: p, - traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*samples.TraceEvents{}), + traceEvents: xsync.NewRWMutex(make(rsamples.TraceEventsTree)), processes: processes, symbolUploader: symbolUploader, tags: createTags(cfg.Tags, runtimeTag, cfg.Version, cfg.EnableSplitByService), @@ -182,31 +144,57 @@ func NewDatadog(cfg *Config, p containermetadata.Provider) (*DatadogReporter, er // ReportTraceEvent enqueues reported trace events for the Datadog reporter. func (r *DatadogReporter) ReportTraceEvent(trace *libpf.Trace, meta *samples.TraceEventMeta) error { - traceEventsMap := r.traceEvents.WLock() - defer r.traceEvents.WUnlock(&traceEventsMap) + if meta.Origin != support.TraceOriginSampling && meta.Origin != support.TraceOriginOffCPU { + // At the moment only on-CPU and off-CPU traces are reported. 
+ return fmt.Errorf("skip reporting trace for %d origin: %w", meta.Origin, + errUnknownOrigin) + } + + pMeta, ok := r.processes.Get(meta.PID) + if !ok || time.Since(pMeta.UpdatedAt) > pidCacheUpdateInterval { + pMeta = r.addProcessMetadata(trace, meta) + } - if pMeta, ok := r.processes.Get(meta.PID); !ok || time.Since(pMeta.updatedAt) > pidCacheUpdateInterval { - r.addProcessMetadata(meta) + key := rsamples.TraceAndMetaKey{ + Hash: trace.Hash, + Comm: meta.Comm, + Pid: meta.PID, + Tid: meta.TID, } - key := traceAndMetaKey{ - hash: trace.Hash, - comm: meta.Comm, - executablePath: meta.ExecutablePath, - apmServiceName: meta.APMServiceName, - pid: meta.PID, - tid: meta.TID, + eventsTree := r.traceEvents.WLock() + defer r.traceEvents.WUnlock(&eventsTree) + + serviceEntityKey := rsamples.ServiceEntity{ + Service: pMeta.Service, + EntityID: pMeta.ContainerMetadata.EntityID, + InferredService: pMeta.InferredService, } - if tr, exists := (*traceEventsMap)[key]; exists { - tr.Timestamps = append(tr.Timestamps, uint64(meta.Timestamp)) - (*traceEventsMap)[key] = tr + perServiceEvents, exists := (*eventsTree)[serviceEntityKey] + if !exists { + perServiceEvents = make(map[libpf.Origin]rsamples.KeyToEventMapping) + (*eventsTree)[serviceEntityKey] = perServiceEvents + } + + perOriginEvents, exists := perServiceEvents[meta.Origin] + if !exists { + perOriginEvents = make(rsamples.KeyToEventMapping) + perServiceEvents[meta.Origin] = perOriginEvents + } + + if events, exists := perOriginEvents[key]; exists { + events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp)) + events.OffTimes = append(events.OffTimes, meta.OffTime) + perOriginEvents[key] = events return nil } - (*traceEventsMap)[key] = &samples.TraceEvents{ + perOriginEvents[key] = &samples.TraceEvents{ Frames: trace.Frames, Timestamps: []uint64{uint64(meta.Timestamp)}, + OffTimes: []int64{meta.OffTime}, + EnvVars: meta.EnvVars, } return nil @@ -215,17 +203,17 @@ func (r *DatadogReporter) ReportTraceEvent(trace 
*libpf.Trace, meta *samples.Tra // ExecutableKnown returns true if the metadata of the Executable specified by fileID is // cached in the reporter. func (r *DatadogReporter) ExecutableKnown(fileID libpf.FileID) bool { - _, known := r.executables.GetAndRefresh(fileID, executableCacheLifetime) + _, known := r.executables.GetAndRefresh(fileID, rsamples.ExecutableCacheLifetime) return known } // ExecutableMetadata accepts a fileID with the corresponding filename // and caches this information. func (r *DatadogReporter) ExecutableMetadata(args *reporter.ExecutableMetadataArgs) { - r.executables.Add(args.FileID, execInfo{ - fileName: path.Base(args.FileName), - gnuBuildID: args.GnuBuildID, - goBuildID: args.GoBuildID, + r.executables.Add(args.FileID, rsamples.ExecInfo{ + FileName: path.Base(args.FileName), + GnuBuildID: args.GnuBuildID, + GoBuildID: args.GoBuildID, }) if r.symbolUploader != nil && args.Interp == libpf.Native { @@ -331,136 +319,6 @@ func (r *DatadogReporter) reportProfile(ctx context.Context, data *uploadProfile data.entityID, r.family) } -func (r *DatadogReporter) createProfile(hostSamples map[traceAndMetaKey]*samples.TraceEvents, start, end time.Time) (*pprofile.Profile, profileStats) { - numSamples := len(hostSamples) - - const unknownStr = "UNKNOWN" - - // funcMap is a temporary helper that will build the Function array - // in profile and make sure information is deduplicated. 
- funcMap := make(map[funcInfo]*pprofile.Function) - - samplingPeriod := 1000000000 / int64(r.config.SamplesPerSecond) - profile := &pprofile.Profile{ - SampleType: []*pprofile.ValueType{{Type: "cpu-samples", Unit: "count"}, - {Type: "cpu-time", Unit: "nanoseconds"}}, - Sample: make([]*pprofile.Sample, 0, numSamples), - PeriodType: &pprofile.ValueType{Type: "cpu-time", Unit: "nanoseconds"}, - Period: samplingPeriod, - DefaultSampleType: "cpu-time", - } - - fileIDtoMapping := make(map[libpf.FileID]*pprofile.Mapping) - totalSampleCount := 0 - pidsWithNoProcessMetadata := libpf.Set[libpf.PID]{} - for traceKey, traceInfo := range hostSamples { - sample := &pprofile.Sample{} - - // Walk every frame of the trace. - for _, uniqueFrame := range traceInfo.Frames { - frame := uniqueFrame.Value() - loc := createPProfLocation(profile, uint64(frame.AddressOrLineno)) - - switch frameKind := frame.Type; frameKind { - case libpf.NativeFrame: - // As native frames are resolved in the backend, we use Mapping to - // report these frames. - - if tmpMapping, exists := fileIDtoMapping[frame.FileID]; exists { - loc.Mapping = tmpMapping - } else { - executionInfo, exists := r.executables.GetAndRefresh(frame.FileID, executableCacheLifetime) - - // Next step: Select a proper default value, - // if the name of the executable is not known yet. 
- var fileName = unknownStr - var buildID = frame.FileID.StringNoQuotes() - if exists { - fileName = executionInfo.fileName - buildID = getBuildID(executionInfo.gnuBuildID, executionInfo.goBuildID, buildID) - } - - tmpMapping := createPprofMapping(profile, uint64(frame.AddressOrLineno), - fileName, buildID) - fileIDtoMapping[frame.FileID] = tmpMapping - loc.Mapping = tmpMapping - } - line := pprofile.Line{Function: createPprofFunctionEntry(funcMap, profile, "", - loc.Mapping.File)} - loc.Line = append(loc.Line, line) - case libpf.AbortFrame: - // Next step: Figure out how the OTLP protocol - // could handle artificial frames, like AbortFrame, - // that are not originate from a native or interpreted - // program. - default: - // Store interpreted frame information as Line message: - line := pprofile.Line{ - Line: int64(frame.SourceLine), - Function: createPprofFunctionEntry(funcMap, profile, - frame.FunctionName.String(), frame.SourceFile.String()), - } - - loc.Line = append(loc.Line, line) - // To be compliant with the protocol generate a dummy mapping entry. - loc.Mapping = getDummyMapping(fileIDtoMapping, profile, frame.FileID) - } - sample.Location = append(sample.Location, loc) - } - - processMeta, ok := r.processes.Get(traceKey.pid) - if !ok { - pidsWithNoProcessMetadata[traceKey.pid] = libpf.Void{} - } - execPath := getExecutablePath(&processMeta, &traceKey) - - // Check if the last frame is a kernel frame. - if isKernel(traceInfo) { - // If the last frame is a kernel frame, we need to add a dummy - // location with the kernel as the function name. 
- execPath = "kernel" - } - baseExec := path.Base(execPath) - - if execPath != "" { - loc := createPProfLocation(profile, 0) - m := createPprofFunctionEntry(funcMap, profile, baseExec, execPath) - loc.Line = append(loc.Line, pprofile.Line{Function: m}) - sample.Location = append(sample.Location, loc) - } - - if !r.config.Timeline { - count := int64(len(traceInfo.Timestamps)) - labels := make(map[string][]string) - addTraceLabels(labels, traceKey, processMeta.containerMetadata, baseExec, 0) - sample.Value = append(sample.Value, count, count*samplingPeriod) - sample.Label = labels - profile.Sample = append(profile.Sample, sample) - } else { - sample.Value = append(sample.Value, 1, samplingPeriod) - for _, ts := range traceInfo.Timestamps { - sampleWithTimestamp := &pprofile.Sample{} - *sampleWithTimestamp = *sample - labels := make(map[string][]string) - addTraceLabels(labels, traceKey, processMeta.containerMetadata, baseExec, ts) - sampleWithTimestamp.Label = labels - profile.Sample = append(profile.Sample, sampleWithTimestamp) - } - } - totalSampleCount += len(traceInfo.Timestamps) - } - - profile.DurationNanos = end.Sub(start).Nanoseconds() - profile.TimeNanos = start.UnixNano() - - profile = profile.Compact() - - return profile, profileStats{ - totalSampleCount: totalSampleCount, - pidWithNoMetadata: len(pidsWithNoProcessMetadata), - } -} - // getPprofProfile returns a pprof profile containing all collected samples up to this moment. 
func (r *DatadogReporter) getPprofProfile() { intervalEnd := time.Now() @@ -469,17 +327,22 @@ func (r *DatadogReporter) getPprofProfile() { profileSeq := r.profileSeq r.profileSeq++ + processAlreadyExitedCount := r.processAlreadyExitedCount + r.processAlreadyExitedCount = 0 + events := r.traceEvents.WLock() - hostSamples := maps.Clone(*events) - for key := range *events { - delete(*events, key) - } + reportedEvents := *events + newEvents := make(rsamples.TraceEventsTree) + *events = newEvents r.traceEvents.WUnlock(&events) - entityToSample := make(map[serviceEntity]map[traceAndMetaKey]*samples.TraceEvents) - if !r.config.EnableSplitByService { - profile, stats := r.createProfile(hostSamples, intervalStart, intervalEnd) + profileBuilder := pprof.NewProfileBuilder(intervalStart, intervalEnd, r.config.SamplesPerSecond, len(reportedEvents), r.config.Timeline, r.executables, r.processes) + + for _, events := range reportedEvents { + profileBuilder.AddEvents(events[support.TraceOriginSampling]) + } + profile, stats := profileBuilder.Build() tags := createTagsForProfile(r.tags, profileSeq, r.config.HostServiceName, false) r.profiles <- &uploadProfileData{ @@ -490,63 +353,29 @@ func (r *DatadogReporter) getPprofProfile() { } log.Infof("Tags: %v", tags) log.Infof("Reporting single profile #%d from %v to %v: %d samples, %d PIDs with no process metadata", - profileSeq, intervalStart.Format(time.RFC3339), intervalEnd.Format(time.RFC3339), stats.totalSampleCount, stats.pidWithNoMetadata) + profileSeq, intervalStart.Format(time.RFC3339), intervalEnd.Format(time.RFC3339), stats.TotalSampleCount, processAlreadyExitedCount) return } - for traceKey, traceInfo := range hostSamples { - processMeta, _ := r.processes.Get(traceKey.pid) - - service := processMeta.ddService - execPath := getExecutablePath(&processMeta, &traceKey) - inferredService := false - - if service == "" && execPath != "" && execPath != "/" { - service = path.Base(execPath) - inferredService = true - } - - if service 
== "" && isKernel(traceInfo) { - service = "system" - } - - if service == "" { - service = unknownServiceStr - inferredService = true - } - - entity := serviceEntity{ - service: service + r.config.SplitServiceSuffix, - entityID: processMeta.containerMetadata.EntityID, - inferredService: inferredService, - } - serviceSamples, exists := entityToSample[entity] - if !exists { - serviceSamples = make(map[traceAndMetaKey]*samples.TraceEvents) - entityToSample[entity] = serviceSamples - } - - serviceSamples[traceKey] = traceInfo - } - totalSampleCount := 0 - totalPIDsWithNoProcessMetadata := 0 - for e, s := range entityToSample { - profile, stats := r.createProfile(s, intervalStart, intervalEnd) - totalSampleCount += stats.totalSampleCount - totalPIDsWithNoProcessMetadata += stats.pidWithNoMetadata - tags := createTagsForProfile(r.tags, profileSeq, e.service, e.inferredService) + for s, perServiceEvents := range reportedEvents { + profileBuilder := pprof.NewProfileBuilder(intervalStart, intervalEnd, r.config.SamplesPerSecond, len(reportedEvents), r.config.Timeline, r.executables, r.processes) + + profileBuilder.AddEvents(perServiceEvents[support.TraceOriginSampling]) + profile, stats := profileBuilder.Build() + totalSampleCount += stats.TotalSampleCount + tags := createTagsForProfile(r.tags, profileSeq, s.Service, s.InferredService) r.profiles <- &uploadProfileData{ profile: profile, start: intervalStart, end: intervalEnd, - entityID: e.entityID, + entityID: s.EntityID, tags: tags, } - log.Debugf("Reporting profile for service %s: %d samples, %d PIDs with no process metadata, tags: %v", e.service, stats.totalSampleCount, stats.pidWithNoMetadata, tags) + log.Debugf("Reporting profile for service %s: %d samples, tags: %v", s.Service, stats.TotalSampleCount, tags) } log.Infof("Reporting %d profiles #%d from %v to %v: %d samples, %d PIDs with no process metadata", - len(entityToSample), profileSeq, intervalStart.Format(time.RFC3339), intervalEnd.Format(time.RFC3339), 
totalSampleCount, totalPIDsWithNoProcessMetadata) + len(reportedEvents), profileSeq, intervalStart.Format(time.RFC3339), intervalEnd.Format(time.RFC3339), totalSampleCount, processAlreadyExitedCount) } func createTags(userTags Tags, runtimeTag, version string, splitByServiceEnabled bool) Tags { @@ -585,142 +414,32 @@ func createTagsForProfile(tags Tags, profileSeq uint64, service string, inferred return newTags } -// createFunctionEntry adds a new function and returns its reference index. -func createPprofFunctionEntry(funcMap map[funcInfo]*pprofile.Function, - profile *pprofile.Profile, - name string, fileName string) *pprofile.Function { - key := funcInfo{ - name: name, - fileName: fileName, - } - if function, exists := funcMap[key]; exists { - return function - } - - idx := uint64(len(profile.Function)) + 1 - function := &pprofile.Function{ - ID: idx, - Name: name, - Filename: fileName, - } - profile.Function = append(profile.Function, function) - funcMap[key] = function - - return function -} - -func addTraceLabels(labels map[string][]string, i traceAndMetaKey, containerMetadata containermetadata.ContainerMetadata, - baseExec string, timestamp uint64) { - // The naming has an impact on the backend side, - // this is why we use "thread id", "thread name" and "process name" - if i.tid != 0 { - labels["thread id"] = append(labels["thread id"], fmt.Sprintf("%d", i.tid)) - } - - if i.comm != "" { - labels["thread name"] = append(labels["thread name"], i.comm) - } - - if baseExec != "" { - labels["process name"] = append(labels["process name"], baseExec) - } - - if containerMetadata.PodName != "" { - labels["pod_name"] = append(labels["pod_name"], containerMetadata.PodName) - } - - // In split by service, ContainerID always empty. 
- if containerMetadata.ContainerID != "" { - labels["container_id"] = append(labels["container_id"], containerMetadata.ContainerID) - } - - if containerMetadata.ContainerName != "" { - labels["container_name"] = append(labels["container_name"], containerMetadata.ContainerName) - } - - if i.apmServiceName != "" { - labels["apmServiceName"] = append(labels["apmServiceName"], i.apmServiceName) - } - - if i.pid != 0 { - labels["process_id"] = append(labels["process_id"], fmt.Sprintf("%d", i.pid)) - } - - if timestamp != 0 { - labels["end_timestamp_ns"] = append(labels["end_timestamp_ns"], strconv.FormatUint(timestamp, 10)) - } -} - -// getDummyMappingIndex inserts or looks up a dummy entry for interpreted FileIDs. -func getDummyMapping(fileIDtoMapping map[libpf.FileID]*pprofile.Mapping, - profile *pprofile.Profile, fileID libpf.FileID) *pprofile.Mapping { - if tmpMapping, exists := fileIDtoMapping[fileID]; exists { - return tmpMapping - } - - mapping := createPprofMapping(profile, 0, "DUMMY", fileID.StringNoQuotes()) - fileIDtoMapping[fileID] = mapping - - return mapping -} - -func createPProfLocation(profile *pprofile.Profile, - address uint64) *pprofile.Location { - idx := uint64(len(profile.Location)) + 1 - location := &pprofile.Location{ - ID: idx, - Address: address, - } - profile.Location = append(profile.Location, location) - return location -} - -func createPprofMapping(profile *pprofile.Profile, offset uint64, - fileName string, buildID string) *pprofile.Mapping { - idx := uint64(len(profile.Mapping)) + 1 - mapping := &pprofile.Mapping{ - ID: idx, - File: fileName, - Offset: offset, - BuildID: buildID, - } - profile.Mapping = append(profile.Mapping, mapping) - return mapping -} - -func getBuildID(gnuBuildID, goBuildID, fileHash string) string { - // When building Go binaries, Bazel will set the Go build ID to "redacted" to - // achieve deterministic builds. 
Since Go 1.24, the Gnu Build ID is inherited - // from the Go build ID - if the Go build ID is "redacted", the Gnu Build ID will - // be a hash of "redacted". In this case, we should use the file hash instead of build IDs. - if goBuildID == "redacted" { - return fileHash - } - if gnuBuildID != "" { - return gnuBuildID - } - if goBuildID != "" { - return goBuildID - } - return fileHash -} - -func (r *DatadogReporter) addProcessMetadata(meta *samples.TraceEventMeta) { +func (r *DatadogReporter) addProcessMetadata(trace *libpf.Trace, meta *samples.TraceEventMeta) rsamples.ProcessMetadata { pid := meta.PID execPath, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid)) if err != nil { + // Process might have exited since the trace was collected or process is a kernel thread. log.Debugf("Failed to get process metadata for PID %d: %v", pid, err) - return + execPath = meta.ExecutablePath + } + + var processName string + if name, err2 := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid)); err2 == nil { + processName = string(name) + } else { + r.processAlreadyExitedCount++ + processName = meta.ProcessName } - var ddService string + var service string var ok bool for _, envVarName := range ServiceNameEnvVars { - ddService, ok = meta.EnvVars[envVarName] + service, ok = meta.EnvVars[envVarName] if ok { break } } + // If DD_SERVICE is not set and the executable path is different from the one in the trace // (meaning the process has probably exec'd into another binary) // then attempt to retrieve again DD_SERVICE. @@ -728,7 +447,22 @@ func (r *DatadogReporter) addProcessMetadata(meta *samples.TraceEventMeta) { // (without the final container environment) that then execs into the final binary // with the container environment. 
if !ok && meta.ExecutablePath != execPath { - ddService = getServiceName(pid) + service = getServiceName(pid) + } + + inferredService := false + if service == "" && execPath != "" { + service = path.Base(execPath) + inferredService = true + } + + if service == "" && rsamples.IsKernel(trace.Frames) { + service = "system" + } + + if service == "" { + service = unknownServiceStr + inferredService = true } var containerMetadata containermetadata.ContainerMetadata @@ -748,26 +482,16 @@ func (r *DatadogReporter) addProcessMetadata(meta *samples.TraceEventMeta) { } } - r.processes.Add(pid, processMetadata{ - updatedAt: time.Now(), - executablePath: execPath, - containerMetadata: containerMetadata, - ddService: ddService, - }) -} - -func getExecutablePath(processMeta *processMetadata, traceKey *traceAndMetaKey) string { - if processMeta.executablePath != "" { - // If we were unable to get the process metadata, we use the executable path - // from the trace key. - // This can happen if the process has already exited when process metadata - // was collected. - // We prioritize the executable path from process metadata over the trace key - // because in some cases executable path from trace key is taken too early in - // the process lifetime, eg. before the process execs into another binary. 
- return processMeta.executablePath - } - return traceKey.executablePath + pMeta := rsamples.ProcessMetadata{ + UpdatedAt: time.Now(), + ExecutablePath: execPath, + ProcessName: processName, + ContainerMetadata: containerMetadata, + Service: service, + InferredService: inferredService, + } + r.processes.Add(pid, pMeta) + return pMeta } func getServiceNameFromProcPath(pid libpf.PID, procRoot string) string { @@ -806,11 +530,3 @@ func parseServiceNameFromEnvironData(envData []byte) string { func getServiceName(pid libpf.PID) string { return getServiceNameFromProcPath(pid, "") } - -func isKernel(traceInfo *samples.TraceEvents) bool { - if len(traceInfo.Frames) == 0 { - return false - } - - return traceInfo.Frames[len(traceInfo.Frames)-1].Value().Type == libpf.KernelFrame -} diff --git a/reporter/pprof/profile_builder.go b/reporter/pprof/profile_builder.go new file mode 100644 index 00000000..7ec59f1d --- /dev/null +++ b/reporter/pprof/profile_builder.go @@ -0,0 +1,285 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025 Datadog, Inc. 
+ +package pprof + +import ( + "fmt" + "path" + "time" + + lru "github.com/elastic/go-freelru" + pprofile "github.com/google/pprof/profile" + "go.opentelemetry.io/ebpf-profiler/libpf" + + "github.com/DataDog/dd-otel-host-profiler/containermetadata" + samples "github.com/DataDog/dd-otel-host-profiler/reporter/samples" +) + +const unknownStr = "UNKNOWN" + +var dummyFileID = libpf.NewFileID(0, 0) + +type ProfileBuilder struct { + profile *pprofile.Profile + funcMap map[funcInfo]*pprofile.Function + fileIDtoMapping map[libpf.FileID]*pprofile.Mapping + executables *lru.SyncedLRU[libpf.FileID, samples.ExecInfo] + processes *lru.SyncedLRU[libpf.PID, samples.ProcessMetadata] + timeline bool + totalSampleCount int + pidsWithNoMetadata libpf.Set[libpf.PID] + samplingPeriod int64 +} + +type ProfileStats struct { + TotalSampleCount int +} + +func NewProfileBuilder(start, end time.Time, samplesPerSecond int, numSamples int, timeline bool, + executables *lru.SyncedLRU[libpf.FileID, samples.ExecInfo], processes *lru.SyncedLRU[libpf.PID, samples.ProcessMetadata]) *ProfileBuilder { + // funcMap is a temporary helper that will build the Function array + // in profile and make sure information is deduplicated. 
+ funcMap := make(map[funcInfo]*pprofile.Function) + fileIDtoMapping := make(map[libpf.FileID]*pprofile.Mapping) + + samplingPeriod := 1000000000 / int64(samplesPerSecond) + profile := &pprofile.Profile{ + SampleType: []*pprofile.ValueType{{Type: "cpu-samples", Unit: "count"}, + {Type: "cpu-time", Unit: "nanoseconds"}}, + Sample: make([]*pprofile.Sample, 0, numSamples), + PeriodType: &pprofile.ValueType{Type: "cpu-time", Unit: "nanoseconds"}, + Period: samplingPeriod, + DefaultSampleType: "cpu-time", + } + profile.DurationNanos = end.Sub(start).Nanoseconds() + profile.TimeNanos = start.UnixNano() + + return &ProfileBuilder{ + profile: profile, + funcMap: funcMap, + fileIDtoMapping: fileIDtoMapping, + executables: executables, + processes: processes, + pidsWithNoMetadata: libpf.Set[libpf.PID]{}, + timeline: timeline, + samplingPeriod: samplingPeriod, + } +} + +func (b *ProfileBuilder) AddEvents(events samples.KeyToEventMapping) { + for traceKey, traceInfo := range events { + sample := &pprofile.Sample{} + + // Walk every frame of the trace. + for _, uniqueFrame := range traceInfo.Frames { + frame := uniqueFrame.Value() + loc := b.createPProfLocation(uint64(frame.AddressOrLineno)) + + switch frameKind := frame.Type; frameKind { + case libpf.NativeFrame: + // As native frames are resolved in the backend, we use Mapping to + // report these frames. + loc.Mapping = b.createPprofMappingForFileID(frame.FileID) + loc.Line = append(loc.Line, pprofile.Line{Function: b.createPprofFunctionEntry("", loc.Mapping.File)}) + case libpf.AbortFrame: + // Next step: Figure out how the OTLP protocol + // could handle artificial frames, like AbortFrame, + // that do not originate from a native or interpreted + // program. 
+ default: + sourceFile := frame.SourceFile.String() + if frameKind == libpf.KernelFrame { + sourceFile = "kernel" + } + + // Store interpreted frame information as Line message: + line := pprofile.Line{ + Line: int64(frame.SourceLine), + Function: b.createPprofFunctionEntry(frame.FunctionName.String(), sourceFile), + } + + loc.Line = append(loc.Line, line) + loc.Mapping = b.getDummyMapping() + } + sample.Location = append(sample.Location, loc) + } + + processMeta, _ := b.processes.Get(traceKey.Pid) + execPath := processMeta.ExecutablePath + + var baseExec string + + switch { + case execPath != "": + baseExec = path.Base(execPath) + + case samples.IsKernel(traceInfo.Frames): + execPath = "kernel" + if processMeta.ProcessName != "" { + baseExec = processMeta.ProcessName + } else { + baseExec = execPath + } + + default: + execPath = traceKey.Comm + baseExec = execPath + } + + if execPath != "" { + loc := b.createPProfLocation(0) + m := b.createPprofFunctionEntry(baseExec, execPath) + loc.Line = append(loc.Line, pprofile.Line{Function: m}) + sample.Location = append(sample.Location, loc) + } + + var count int64 = 1 + if b.timeline { + count = int64(len(traceInfo.Timestamps)) + } + + labels := make(map[string][]string) + addTraceLabels(labels, traceKey, processMeta.ContainerMetadata, baseExec) + sample.Label = labels + sample.Value = append(sample.Value, count, count*b.samplingPeriod) + + if !b.timeline { + b.profile.Sample = append(b.profile.Sample, sample) + } else { + for _, ts := range traceInfo.Timestamps { + sampleWithTimestamp := &pprofile.Sample{} + *sampleWithTimestamp = *sample + sampleWithTimestamp.NumLabel = make(map[string][]int64) + sampleWithTimestamp.NumLabel["timestamp_ns"] = append(sampleWithTimestamp.NumLabel["timestamp_ns"], int64(ts)) + b.profile.Sample = append(b.profile.Sample, sampleWithTimestamp) + } + } + b.totalSampleCount += len(traceInfo.Timestamps) + } +} + +func (b *ProfileBuilder) Build() (*pprofile.Profile, ProfileStats) { + profile := 
b.profile.Compact() + stats := ProfileStats{ + TotalSampleCount: b.totalSampleCount, + } + return profile, stats +} + +// funcInfo is a helper to construct profile.Function messages. +type funcInfo struct { + name string + fileName string +} + +// createFunctionEntry adds a new function and returns its reference index. +func (b *ProfileBuilder) createPprofFunctionEntry(name, fileName string) *pprofile.Function { + key := funcInfo{ + name: name, + fileName: fileName, + } + if function, exists := b.funcMap[key]; exists { + return function + } + + idx := uint64(len(b.profile.Function)) + 1 + function := &pprofile.Function{ + ID: idx, + Name: name, + Filename: fileName, + } + b.profile.Function = append(b.profile.Function, function) + b.funcMap[key] = function + + return function +} + +// getDummyMappingIndex inserts or looks up a dummy entry for interpreted FileIDs. +func (b *ProfileBuilder) getDummyMapping() *pprofile.Mapping { + if tmpMapping, exists := b.fileIDtoMapping[dummyFileID]; exists { + return tmpMapping + } + + mapping := b.createPprofMapping("DUMMY", dummyFileID.StringNoQuotes()) + b.fileIDtoMapping[dummyFileID] = mapping + + return mapping +} + +func (b *ProfileBuilder) createPProfLocation(address uint64) *pprofile.Location { + idx := uint64(len(b.profile.Location)) + 1 + location := &pprofile.Location{ + ID: idx, + Address: address, + } + b.profile.Location = append(b.profile.Location, location) + return location +} + +func (b *ProfileBuilder) createPprofMappingForFileID(fileID libpf.FileID) *pprofile.Mapping { + if mapping, exists := b.fileIDtoMapping[fileID]; exists { + return mapping + } + + executionInfo, exists := b.executables.GetAndRefresh(fileID, samples.ExecutableCacheLifetime) + + fileName := unknownStr + buildID := fileID.StringNoQuotes() + if exists { + fileName = executionInfo.FileName + buildID = samples.GetBuildID(executionInfo.GnuBuildID, executionInfo.GoBuildID, buildID) + } + + mapping := b.createPprofMapping(fileName, buildID) + 
b.fileIDtoMapping[fileID] = mapping + return mapping +} + +func (b *ProfileBuilder) createPprofMapping(fileName, buildID string) *pprofile.Mapping { + idx := uint64(len(b.profile.Mapping)) + 1 + mapping := &pprofile.Mapping{ + ID: idx, + File: fileName, + Offset: 0, + BuildID: buildID, + } + b.profile.Mapping = append(b.profile.Mapping, mapping) + return mapping +} + +func addTraceLabels(labels map[string][]string, i samples.TraceAndMetaKey, containerMetadata containermetadata.ContainerMetadata, + processName string) { + // The naming has an impact on the backend side, + // this is why we use "thread id", "thread name" and "process name" + if i.Tid != 0 { + labels["thread id"] = append(labels["thread id"], fmt.Sprintf("%d", i.Tid)) + } + + if i.Pid != 0 { + labels["process_id"] = append(labels["process_id"], fmt.Sprintf("%d", i.Pid)) + } + + if i.Comm != "" { + labels["thread name"] = append(labels["thread name"], i.Comm) + } + + if processName != "" { + labels["process name"] = append(labels["process name"], processName) + } + + if containerMetadata.PodName != "" { + labels["pod_name"] = append(labels["pod_name"], containerMetadata.PodName) + } + + // In split-by-service mode, ContainerID is always empty. + if containerMetadata.ContainerID != "" { + labels["container_id"] = append(labels["container_id"], containerMetadata.ContainerID) + } + + if containerMetadata.ContainerName != "" { + labels["container_name"] = append(labels["container_name"], containerMetadata.ContainerName) + } +} diff --git a/reporter/samples/samples.go b/reporter/samples/samples.go new file mode 100644 index 00000000..c6d3e6a4 --- /dev/null +++ b/reporter/samples/samples.go @@ -0,0 +1,79 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025 Datadog, Inc. 
+ +package samples + +import ( + "time" + + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/ebpf-profiler/reporter/samples" + + "github.com/DataDog/dd-otel-host-profiler/containermetadata" +) + +const ExecutableCacheLifetime = 1 * time.Hour // executable cache items will be removed if unused after this interval + +// ExecInfo enriches an executable with additional metadata. +type ExecInfo struct { + FileName string + GnuBuildID string + GoBuildID string +} + +// TraceAndMetaKey is the deduplication key for samples. This **must always** +// contain all trace fields that aren't already part of the trace hash to ensure +// that we don't accidentally merge traces with different fields. +type TraceAndMetaKey struct { + Hash libpf.TraceHash + // Comm is provided by the eBPF programs + Comm string + Pid libpf.PID + Tid libpf.PID +} + +type ProcessMetadata struct { + UpdatedAt time.Time + ExecutablePath string + ProcessName string + ContainerMetadata containermetadata.ContainerMetadata + Service string + InferredService bool +} + +type ServiceEntity struct { + Service string + EntityID string + InferredService bool +} + +type TraceEventsTree map[ServiceEntity]map[libpf.Origin]KeyToEventMapping + +type KeyToEventMapping map[TraceAndMetaKey]*samples.TraceEvents + +func GetBuildID(gnuBuildID, goBuildID, fileHash string) string { + // When building Go binaries, Bazel will set the Go build ID to "redacted" to + // achieve deterministic builds. Since Go 1.24, the Gnu Build ID is inherited + // from the Go build ID - if the Go build ID is "redacted", the Gnu Build ID will + // be a hash of "redacted". In this case, we should use the file hash instead of build IDs. 
+ if goBuildID == "redacted" { + return fileHash + } + if gnuBuildID != "" { + return gnuBuildID + } + if goBuildID != "" { + return goBuildID + } + return fileHash +} + +func IsKernel(frames libpf.Frames) bool { + if len(frames) == 0 { + return false + } + + return frames[len(frames)-1].Value().Type == libpf.KernelFrame +} diff --git a/reporter/symbol_query_batching.go b/reporter/symbol_query_batching.go index 20f926bf..16e8e0de 100644 --- a/reporter/symbol_query_batching.go +++ b/reporter/symbol_query_batching.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" + samples "github.com/DataDog/dd-otel-host-profiler/reporter/samples" "github.com/DataDog/dd-otel-host-profiler/reporter/symbol" ) @@ -92,7 +93,7 @@ func ExecuteSymbolQueryBatch(ctx context.Context, batch SymbolQueryBatch, querie continue } - buildID := getBuildID(e.GnuBuildID(), e.GoBuildID(), e.FileHash()) + buildID := samples.GetBuildID(e.GnuBuildID(), e.GoBuildID(), e.FileHash()) if buildID == "" { result.fillWithError(errors.New("empty buildID")) continue