Skip to content

Commit fe54930

Browse files
support process tags in apm stats - first pass
1 parent 10b43a6 commit fe54930

File tree

12 files changed

+314
-117
lines changed

12 files changed

+314
-117
lines changed

pkg/proto/datadog/trace/stats.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ message ClientStatsPayload {
4949
string git_commit_sha = 13;
5050
// The image tag is obtained from a container's set of tags.
5151
string image_tag = 14;
52+
// The process tags hash is used as a key for agent stats agregation.
53+
uint64 process_tags_hash = 15;
54+
// The process tags contains a list of tags that are specific to the process.
55+
string process_tags = 16;
5256
}
5357

5458
// ClientStatsBucket is a time bucket containing aggregated stats.

pkg/proto/pbgo/trace/stats.pb.go

Lines changed: 84 additions & 61 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/proto/pbgo/trace/stats_gen.go

Lines changed: 58 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/proto/pbgo/trace/stats_vtproto.pb.go

Lines changed: 72 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/trace/api/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@ const (
440440
// tagContainersTags specifies the name of the tag which holds key/value
441441
// pairs representing information about the container (Docker, EC2, etc).
442442
tagContainersTags = "_dd.tags.container"
443+
processTags = "_dd.tags.process"
443444
)
444445

445446
// TagStats returns the stats and tags coinciding with the information found in header.
@@ -664,6 +665,7 @@ func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http.
664665
ClientComputedTopLevel: isHeaderTrue(header.ComputedTopLevel, req.Header.Get(header.ComputedTopLevel)),
665666
ClientComputedStats: isHeaderTrue(header.ComputedStats, req.Header.Get(header.ComputedStats)),
666667
ClientDroppedP0s: droppedTracesFromHeader(req.Header, ts),
668+
ProcessTags: getProcessTagsFromHeader(req.Header, ts),
667669
}
668670
r.out <- payload
669671
}
@@ -700,6 +702,10 @@ func droppedTracesFromHeader(h http.Header, ts *info.TagStats) int64 {
700702
return dropped
701703
}
702704

705+
func getProcessTagsFromHeader(h http.Header, ts *info.TagStats) string {
706+
return h.Get(header.ProcessTags)
707+
}
708+
703709
// handleServices handle a request with a list of several services
704710
func (r *HTTPReceiver) handleServices(_ Version, w http.ResponseWriter, _ *http.Request) {
705711
httpOK(w)

pkg/trace/api/internal/header/headers.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ const (
1111
// with the number of traces contained in the payload.
1212
TraceCount = "X-Datadog-Trace-Count"
1313

14+
// ProcessTags is a list that contains process tags split by a ','.
15+
ProcessTags = "X-Datadog-Process-Tags"
16+
1417
// ContainerID specifies the name of the header which contains the ID of the
1518
// container where the request originated.
1619
// Deprecated in favor of Datadog-Entity-ID.

pkg/trace/stats/aggregation.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ type BucketsAggregationKey struct {
4646

4747
// PayloadAggregationKey specifies the key by which a payload is aggregated.
4848
type PayloadAggregationKey struct {
49-
Env string
50-
Hostname string
51-
Version string
52-
ContainerID string
53-
GitCommitSha string
54-
ImageTag string
49+
Env string
50+
Hostname string
51+
Version string
52+
ContainerID string
53+
GitCommitSha string
54+
ImageTag string
55+
ProcessTagsHash uint64
5556
}
5657

5758
func getStatusCode(meta map[string]string, metrics map[string]float64) uint32 {
@@ -99,6 +100,10 @@ func NewAggregationFromSpan(s *StatSpan, origin string, aggKey PayloadAggregatio
99100
return agg
100101
}
101102

103+
func processTagsHash(processTags string) uint64 {
104+
return peerTagsHash(strings.Split(processTags, ","))
105+
}
106+
102107
func peerTagsHash(tags []string) uint64 {
103108
if len(tags) == 0 {
104109
return 0

pkg/trace/stats/concentrator.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Concentrator struct {
4141
exit chan struct{}
4242
exitWG sync.WaitGroup
4343
cidStats bool
44+
processStats bool
4445
agentEnv string
4546
agentHostname string
4647
agentVersion string
@@ -56,11 +57,13 @@ func NewConcentrator(conf *config.AgentConfig, writer Writer, now time.Time, sta
5657
BucketInterval: bsize,
5758
}, now)
5859
_, disabledCIDStats := conf.Features["disable_cid_stats"]
60+
_, disabledProcessStats := conf.Features["disable_process_stats"]
5961
c := Concentrator{
6062
spanConcentrator: sc,
6163
Writer: writer,
6264
exit: make(chan struct{}),
6365
cidStats: !disabledCIDStats,
66+
processStats: !disabledProcessStats,
6467
agentEnv: conf.DefaultEnv,
6568
agentHostname: conf.Hostname,
6669
agentVersion: conf.AgentVersion,
@@ -113,6 +116,7 @@ type Input struct {
113116
Traces []traceutil.ProcessedTrace
114117
ContainerID string
115118
ContainerTags []string
119+
ProcessTags string
116120
}
117121

118122
// NewStatsInput allocates a stats input for an incoming trace payload
@@ -125,16 +129,33 @@ func NewStatsInput(numChunks int, containerID string, clientComputedStats bool)
125129

126130
// Add applies the given input to the concentrator.
127131
func (c *Concentrator) Add(t Input) {
132+
tags := infraTags{
133+
containerID: t.ContainerID,
134+
containerTags: t.ContainerTags,
135+
processTagsHash: processTagsHash(t.ProcessTags),
136+
processTags: t.ProcessTags,
137+
}
128138
for _, trace := range t.Traces {
129-
c.addNow(&trace, t.ContainerID, t.ContainerTags)
139+
c.addNow(&trace, tags)
130140
}
131141
}
132142

143+
type infraTags struct {
144+
containerID string
145+
containerTags []string
146+
processTagsHash uint64
147+
processTags string
148+
}
149+
133150
// addNow adds the given input into the concentrator.
134151
// Callers must guard!
135-
func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string, containerTags []string) {
152+
func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, tags infraTags) {
136153
if !c.cidStats {
137-
containerID = ""
154+
tags.containerID = ""
155+
}
156+
if !c.processStats {
157+
tags.processTagsHash = 0
158+
tags.processTags = ""
138159
}
139160
hostname := pt.TracerHostname
140161
if hostname == "" {
@@ -146,17 +167,18 @@ func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string,
146167
}
147168
weight := weight(pt.Root)
148169
aggKey := PayloadAggregationKey{
149-
Env: env,
150-
Hostname: hostname,
151-
Version: pt.AppVersion,
152-
ContainerID: containerID,
153-
GitCommitSha: pt.GitCommitSha,
154-
ImageTag: pt.ImageTag,
170+
Env: env,
171+
Hostname: hostname,
172+
Version: pt.AppVersion,
173+
ContainerID: tags.containerID,
174+
GitCommitSha: pt.GitCommitSha,
175+
ImageTag: pt.ImageTag,
176+
ProcessTagsHash: tags.processTagsHash,
155177
}
156178
for _, s := range pt.TraceChunk.Spans {
157179
statSpan, ok := c.spanConcentrator.NewStatSpanFromPB(s, c.peerTagKeys)
158180
if ok {
159-
c.spanConcentrator.addSpan(statSpan, aggKey, containerID, containerTags, pt.TraceChunk.Origin, weight)
181+
c.spanConcentrator.addSpan(statSpan, aggKey, tags, pt.TraceChunk.Origin, weight)
160182
}
161183
}
162184
}

0 commit comments

Comments
 (0)