From 667e41e131cbca089387c12d85a2dacadd385178 Mon Sep 17 00:00:00 2001 From: Raphael Gavache Date: Wed, 2 Apr 2025 13:51:03 +0200 Subject: [PATCH 01/11] support process tags in apm stats - first pass --- pkg/proto/datadog/trace/stats.proto | 4 + pkg/proto/pbgo/trace/stats.pb.go | 145 +++++++++++++---------- pkg/proto/pbgo/trace/stats_gen.go | 66 +++++++++-- pkg/proto/pbgo/trace/stats_vtproto.pb.go | 72 +++++++++++ pkg/trace/agent/agent.go | 2 +- pkg/trace/api/api.go | 13 ++ pkg/trace/api/internal/header/headers.go | 3 + pkg/trace/api/payload.go | 3 + pkg/trace/stats/aggregation.go | 17 ++- pkg/trace/stats/concentrator.go | 46 +++++-- pkg/trace/stats/concentrator_test.go | 34 +++--- pkg/trace/stats/span_concentrator.go | 20 +++- pkg/trace/stats/statsraw.go | 15 ++- pkg/trace/stats/statsraw_test.go | 7 +- 14 files changed, 327 insertions(+), 120 deletions(-) diff --git a/pkg/proto/datadog/trace/stats.proto b/pkg/proto/datadog/trace/stats.proto index 37424d189adafd..cf94bd718d9dd5 100644 --- a/pkg/proto/datadog/trace/stats.proto +++ b/pkg/proto/datadog/trace/stats.proto @@ -49,6 +49,10 @@ message ClientStatsPayload { string git_commit_sha = 13; // The image tag is obtained from a container's set of tags. string image_tag = 14; + // The process tags hash is used as a key for agent stats agregation. + uint64 process_tags_hash = 15; + // The process tags contains a list of tags that are specific to the process. + string process_tags = 16; } // ClientStatsBucket is a time bucket containing aggregated stats. diff --git a/pkg/proto/pbgo/trace/stats.pb.go b/pkg/proto/pbgo/trace/stats.pb.go index ab9463be4d62ad..7e4ad366cd582c 100644 --- a/pkg/proto/pbgo/trace/stats.pb.go +++ b/pkg/proto/pbgo/trace/stats.pb.go @@ -126,7 +126,7 @@ type StatsPayload struct { AgentHostname string `protobuf:"bytes,1,opt,name=agentHostname,proto3" json:"agentHostname,omitempty"` AgentEnv string `protobuf:"bytes,2,opt,name=agentEnv,proto3" json:"agentEnv,omitempty"` // @gotags: json:"stats,omitempty" msg:"Stats,omitempty" - Stats []*ClientStatsPayload `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats,omitempty" msg:"Stats,omitempty"` + Stats []*ClientStatsPayload `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats,omitempty"` AgentVersion string `protobuf:"bytes,4,opt,name=agentVersion,proto3" json:"agentVersion,omitempty"` ClientComputed bool `protobuf:"varint,5,opt,name=clientComputed,proto3" json:"clientComputed,omitempty"` // splitPayload indicates if the payload is actually one of several payloads split out from a larger payload. 
@@ -218,7 +218,7 @@ type ClientStatsPayload struct { Env string `protobuf:"bytes,2,opt,name=env,proto3" json:"env,omitempty"` // env tag set on spans or in the tracers, used for aggregation Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` // version tag set on spans or in the tracers, used for aggregation // @gotags: json:"stats,omitempty" msg:"Stats,omitempty" - Stats []*ClientStatsBucket `protobuf:"bytes,4,rep,name=stats,proto3" json:"stats,omitempty" msg:"Stats,omitempty"` + Stats []*ClientStatsBucket `protobuf:"bytes,4,rep,name=stats,proto3" json:"stats,omitempty"` Lang string `protobuf:"bytes,5,opt,name=lang,proto3" json:"lang,omitempty"` // informative field not used for aggregation TracerVersion string `protobuf:"bytes,6,opt,name=tracerVersion,proto3" json:"tracerVersion,omitempty"` // informative field not used for aggregation RuntimeID string `protobuf:"bytes,7,opt,name=runtimeID,proto3" json:"runtimeID,omitempty"` // used on stats payloads sent by the tracer to identify uniquely a message @@ -238,7 +238,11 @@ type ClientStatsPayload struct { // The git commit SHA is obtained from a trace, where it may be set through a tracer <-> source code integration. GitCommitSha string `protobuf:"bytes,13,opt,name=git_commit_sha,json=gitCommitSha,proto3" json:"git_commit_sha,omitempty"` // The image tag is obtained from a container's set of tags. - ImageTag string `protobuf:"bytes,14,opt,name=image_tag,json=imageTag,proto3" json:"image_tag,omitempty"` + ImageTag string `protobuf:"bytes,14,opt,name=image_tag,json=imageTag,proto3" json:"image_tag,omitempty"` + // The process tags hash is used as a key for agent stats agregation. + ProcessTagsHash uint64 `protobuf:"varint,15,opt,name=process_tags_hash,json=processTagsHash,proto3" json:"process_tags_hash,omitempty"` + // The process tags contains a list of tags that are specific to the process. + ProcessTags string `protobuf:"bytes,16,opt,name=process_tags,json=processTags,proto3" json:"process_tags,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -371,13 +375,27 @@ func (x *ClientStatsPayload) GetImageTag() string { return "" } +func (x *ClientStatsPayload) GetProcessTagsHash() uint64 { + if x != nil { + return x.ProcessTagsHash + } + return 0 +} + +func (x *ClientStatsPayload) GetProcessTags() string { + if x != nil { + return x.ProcessTags + } + return "" +} + // ClientStatsBucket is a time bucket containing aggregated stats. 
type ClientStatsBucket struct { state protoimpl.MessageState `protogen:"open.v1"` Start uint64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"` // bucket start in nanoseconds Duration uint64 `protobuf:"varint,2,opt,name=duration,proto3" json:"duration,omitempty"` // bucket duration in nanoseconds // @gotags: json:"stats,omitempty" msg:"Stats,omitempty" - Stats []*ClientGroupedStats `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats,omitempty" msg:"Stats,omitempty"` + Stats []*ClientGroupedStats `protobuf:"bytes,3,rep,name=stats,proto3" json:"stats,omitempty"` // AgentTimeShift is the shift applied by the agent stats aggregator on bucket start // when the received bucket start is outside of the agent aggregation window AgentTimeShift int64 `protobuf:"varint,4,opt,name=agentTimeShift,proto3" json:"agentTimeShift,omitempty"` @@ -639,7 +657,7 @@ var file_datadog_trace_stats_proto_rawDesc = string([]byte{ 0x28, 0x08, 0x52, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x65, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x50, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0xc7, 0x03, 0x0a, 0x12, 0x43, 0x6c, 0x69, 0x65, 0x6e, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x96, 0x04, 0x0a, 0x12, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x76, @@ -668,62 +686,67 @@ var file_datadog_trace_stats_proto_rawDesc = string([]byte{ 0x0d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x53, 0x68, 0x61, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x61, 0x67, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x54, 0x61, 0x67, - 0x22, 0xa6, 0x01, 0x0a, 0x11, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, - 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, - 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, - 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x37, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, - 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, - 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x47, 0x72, - 0x6f, 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, - 0x73, 0x12, 0x26, 0x0a, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x68, - 0x69, 0x66, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x54, 0x69, 0x6d, 0x65, 0x53, 0x68, 0x69, 0x66, 0x74, 0x22, 0xa9, 0x04, 0x0a, 0x12, 0x43, 0x6c, - 0x69, 0x65, 0x6e, 0x74, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, - 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, - 0x0a, 
0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x48, 0x54, - 0x54, 0x50, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x48, 0x54, 0x54, 0x50, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x43, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x44, 0x42, 0x5f, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x69, 0x74, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x04, 0x68, 0x69, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x18, - 0x08, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x12, 0x1a, 0x0a, - 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x6f, 0x6b, 0x53, - 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6f, 0x6b, - 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1e, 0x0a, 0x0a, 0x73, - 0x79, 0x6e, 0x74, 0x68, 0x65, 0x74, 0x69, 0x63, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x0a, 0x73, 0x79, 0x6e, 0x74, 0x68, 0x65, 0x74, 0x69, 0x63, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x74, - 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x12, - 0x1b, 0x0a, 0x09, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x0f, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x73, 0x70, 0x61, 0x6e, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x1b, 0x0a, 0x09, - 0x70, 0x65, 0x65, 0x72, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x10, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x08, 0x70, 0x65, 0x65, 0x72, 0x54, 0x61, 0x67, 0x73, 0x12, 0x3a, 0x0a, 0x0d, 0x69, 0x73, 0x5f, - 0x74, 0x72, 0x61, 0x63, 0x65, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x11, 0x20, 0x01, 0x28, 0x0e, - 0x32, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, - 0x2e, 0x54, 0x72, 0x69, 0x6c, 0x65, 0x61, 0x6e, 0x52, 0x0b, 0x69, 0x73, 0x54, 0x72, 0x61, 0x63, - 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x47, 0x52, 0x50, 0x43, 0x5f, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x12, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0e, 0x47, 0x52, 0x50, 0x43, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x4a, - 0x04, 0x08, 0x0e, 0x10, 0x0f, 0x2a, 0x2b, 0x0a, 0x07, 0x54, 0x72, 0x69, 0x6c, 0x65, 0x61, 0x6e, - 0x12, 0x0b, 0x0a, 0x07, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x54, 0x10, 0x00, 0x12, 0x08, 0x0a, - 0x04, 0x54, 0x52, 0x55, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x46, 0x41, 0x4c, 0x53, 0x45, - 0x10, 0x02, 0x2a, 0x52, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x46, - 0x6c, 0x61, 0x67, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x45, 0x50, 0x52, 0x45, 0x43, 0x41, 0x54, 0x45, - 0x44, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x54, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x44, - 0x45, 0x50, 0x52, 0x45, 0x43, 
0x41, 0x54, 0x45, 0x44, 0x5f, 0x54, 0x52, 0x55, 0x45, 0x10, 0x01, - 0x12, 0x14, 0x0a, 0x10, 0x44, 0x45, 0x50, 0x52, 0x45, 0x43, 0x41, 0x54, 0x45, 0x44, 0x5f, 0x46, - 0x41, 0x4c, 0x53, 0x45, 0x10, 0x02, 0x42, 0x16, 0x5a, 0x14, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x67, 0x6f, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x2a, 0x0a, 0x11, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x61, 0x67, 0x73, + 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0f, 0x70, 0x72, 0x6f, + 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x48, 0x61, 0x73, 0x68, 0x12, 0x21, 0x0a, 0x0c, + 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x10, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0b, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x22, + 0xa6, 0x01, 0x0a, 0x11, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x42, + 0x75, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x64, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x37, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, + 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, + 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x47, 0x72, 0x6f, + 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, + 0x12, 0x26, 0x0a, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x68, 0x69, + 0x66, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x54, + 0x69, 0x6d, 0x65, 0x53, 0x68, 0x69, 0x66, 0x74, 0x22, 0xa9, 0x04, 0x0a, 0x12, 0x43, 0x6c, 0x69, + 0x65, 0x6e, 0x74, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x65, 0x64, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, + 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, + 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x48, 0x54, 0x54, + 0x50, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x0e, 0x48, 0x54, 0x54, 0x50, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, + 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x44, 0x42, 0x5f, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x44, 0x42, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x68, 0x69, 0x74, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, + 0x68, 0x69, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x18, 0x08, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x12, 0x1a, 0x0a, 0x08, + 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, + 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09, 0x6f, 0x6b, 0x53, 0x75, + 0x6d, 0x6d, 0x61, 
0x72, 0x79, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6f, 0x6b, 0x53, + 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x22, 0x0a, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x53, + 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x79, + 0x6e, 0x74, 0x68, 0x65, 0x74, 0x69, 0x63, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, + 0x73, 0x79, 0x6e, 0x74, 0x68, 0x65, 0x74, 0x69, 0x63, 0x73, 0x12, 0x22, 0x0a, 0x0c, 0x74, 0x6f, + 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x0c, 0x74, 0x6f, 0x70, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x48, 0x69, 0x74, 0x73, 0x12, 0x1b, + 0x0a, 0x09, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x73, 0x70, 0x61, 0x6e, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x70, + 0x65, 0x65, 0x72, 0x5f, 0x74, 0x61, 0x67, 0x73, 0x18, 0x10, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, + 0x70, 0x65, 0x65, 0x72, 0x54, 0x61, 0x67, 0x73, 0x12, 0x3a, 0x0a, 0x0d, 0x69, 0x73, 0x5f, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x11, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2e, + 0x54, 0x72, 0x69, 0x6c, 0x65, 0x61, 0x6e, 0x52, 0x0b, 0x69, 0x73, 0x54, 0x72, 0x61, 0x63, 0x65, + 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x47, 0x52, 0x50, 0x43, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x12, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, + 0x47, 0x52, 0x50, 0x43, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x4a, 0x04, + 0x08, 0x0e, 0x10, 0x0f, 0x2a, 0x2b, 0x0a, 0x07, 0x54, 0x72, 0x69, 0x6c, 0x65, 0x61, 0x6e, 0x12, + 0x0b, 0x0a, 0x07, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x54, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, + 0x54, 0x52, 0x55, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x46, 0x41, 0x4c, 0x53, 0x45, 0x10, + 0x02, 0x2a, 0x52, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x46, 0x6c, + 0x61, 0x67, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x45, 0x50, 0x52, 0x45, 0x43, 0x41, 0x54, 0x45, 0x44, + 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x54, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x44, 0x45, + 0x50, 0x52, 0x45, 0x43, 0x41, 0x54, 0x45, 0x44, 0x5f, 0x54, 0x52, 0x55, 0x45, 0x10, 0x01, 0x12, + 0x14, 0x0a, 0x10, 0x44, 0x45, 0x50, 0x52, 0x45, 0x43, 0x41, 0x54, 0x45, 0x44, 0x5f, 0x46, 0x41, + 0x4c, 0x53, 0x45, 0x10, 0x02, 0x42, 0x16, 0x5a, 0x14, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x67, 0x6f, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, }) var ( diff --git a/pkg/proto/pbgo/trace/stats_gen.go b/pkg/proto/pbgo/trace/stats_gen.go index 32f27bd5d5ccfd..1102c84ce02c4d 100644 --- a/pkg/proto/pbgo/trace/stats_gen.go +++ b/pkg/proto/pbgo/trace/stats_gen.go @@ -990,6 +990,18 @@ func (z *ClientStatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ImageTag") return } + case "ProcessTagsHash": + z.ProcessTagsHash, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ProcessTagsHash") + return + } + case "ProcessTags": + z.ProcessTags, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ProcessTags") + return + } default: err = dc.Skip() if err != nil { @@ -1004,15 +1016,15 @@ func (z *ClientStatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements 
msgp.Encodable func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { // check for omitted fields - zb0001Len := uint32(14) - var zb0001Mask uint16 /* 14 bits */ + zb0001Len := uint32(16) + var zb0001Mask uint16 /* 16 bits */ _ = zb0001Mask if z.Stats == nil { zb0001Len-- zb0001Mask |= 0x8 } // variable map header, size zb0001Len - err = en.Append(0x80 | uint8(zb0001Len)) + err = en.WriteMapHeader(zb0001Len) if err != nil { return } @@ -1182,6 +1194,26 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ImageTag") return } + // write "ProcessTagsHash" + err = en.Append(0xaf, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x48, 0x61, 0x73, 0x68) + if err != nil { + return + } + err = en.WriteUint64(z.ProcessTagsHash) + if err != nil { + err = msgp.WrapError(err, "ProcessTagsHash") + return + } + // write "ProcessTags" + err = en.Append(0xab, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteString(z.ProcessTags) + if err != nil { + err = msgp.WrapError(err, "ProcessTags") + return + } } return } @@ -1190,15 +1222,15 @@ func (z *ClientStatsPayload) EncodeMsg(en *msgp.Writer) (err error) { func (z *ClientStatsPayload) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) // check for omitted fields - zb0001Len := uint32(14) - var zb0001Mask uint16 /* 14 bits */ + zb0001Len := uint32(16) + var zb0001Mask uint16 /* 16 bits */ _ = zb0001Mask if z.Stats == nil { zb0001Len-- zb0001Mask |= 0x8 } // variable map header, size zb0001Len - o = append(o, 0x80|uint8(zb0001Len)) + o = msgp.AppendMapHeader(o, zb0001Len) // skip if no fields are to be emitted if zb0001Len != 0 { @@ -1260,6 +1292,12 @@ func (z *ClientStatsPayload) MarshalMsg(b []byte) (o []byte, err error) { // string "ImageTag" o = append(o, 0xa8, 0x49, 0x6d, 0x61, 0x67, 0x65, 0x54, 0x61, 0x67) o = msgp.AppendString(o, z.ImageTag) + // string "ProcessTagsHash" + o = append(o, 0xaf, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x48, 0x61, 0x73, 0x68) + o = msgp.AppendUint64(o, z.ProcessTagsHash) + // string "ProcessTags" + o = append(o, 0xab, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73) + o = msgp.AppendString(o, z.ProcessTags) } return } @@ -1403,6 +1441,18 @@ func (z *ClientStatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ImageTag") return } + case "ProcessTagsHash": + z.ProcessTagsHash, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ProcessTagsHash") + return + } + case "ProcessTags": + z.ProcessTags, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ProcessTags") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1417,7 +1467,7 @@ func (z *ClientStatsPayload) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *ClientStatsPayload) Msgsize() (s int) { - s = 1 + 9 + msgp.StringPrefixSize + len(z.Hostname) + 4 + msgp.StringPrefixSize + len(z.Env) + 8 + msgp.StringPrefixSize + len(z.Version) + 6 + msgp.ArrayHeaderSize + s = 3 + 9 + msgp.StringPrefixSize + len(z.Hostname) + 4 + msgp.StringPrefixSize + len(z.Env) + 8 + msgp.StringPrefixSize + len(z.Version) + 6 + msgp.ArrayHeaderSize for za0001 := range z.Stats { if z.Stats[za0001] == nil { s += msgp.NilSize @@ -1429,7 +1479,7 @@ func (z 
*ClientStatsPayload) Msgsize() (s int) { for za0002 := range z.Tags { s += msgp.StringPrefixSize + len(z.Tags[za0002]) } - s += 13 + msgp.StringPrefixSize + len(z.GitCommitSha) + 9 + msgp.StringPrefixSize + len(z.ImageTag) + s += 13 + msgp.StringPrefixSize + len(z.GitCommitSha) + 9 + msgp.StringPrefixSize + len(z.ImageTag) + 16 + msgp.Uint64Size + 12 + msgp.StringPrefixSize + len(z.ProcessTags) return } diff --git a/pkg/proto/pbgo/trace/stats_vtproto.pb.go b/pkg/proto/pbgo/trace/stats_vtproto.pb.go index f860121cd2ae2f..0e6fa0881cf2eb 100644 --- a/pkg/proto/pbgo/trace/stats_vtproto.pb.go +++ b/pkg/proto/pbgo/trace/stats_vtproto.pb.go @@ -134,6 +134,20 @@ func (m *ClientStatsPayload) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.ProcessTags) > 0 { + i -= len(m.ProcessTags) + copy(dAtA[i:], m.ProcessTags) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.ProcessTags))) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x82 + } + if m.ProcessTagsHash != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.ProcessTagsHash)) + i-- + dAtA[i] = 0x78 + } if len(m.ImageTag) > 0 { i -= len(m.ImageTag) copy(dAtA[i:], m.ImageTag) @@ -550,6 +564,13 @@ func (m *ClientStatsPayload) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.ProcessTagsHash != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.ProcessTagsHash)) + } + l = len(m.ProcessTags) + if l > 0 { + n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1339,6 +1360,57 @@ func (m *ClientStatsPayload) UnmarshalVT(dAtA []byte) error { } m.ImageTag = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ProcessTagsHash", wireType) + } + m.ProcessTagsHash = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ProcessTagsHash |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 16: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ProcessTags", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ProcessTags = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/trace/agent/agent.go b/pkg/trace/agent/agent.go index 8e3e64fbe7d10f..55b11187ff27bf 100644 --- a/pkg/trace/agent/agent.go +++ b/pkg/trace/agent/agent.go @@ -313,7 +313,7 @@ func (a *Agent) Process(p *api.Payload) { defer a.Timing.Since("datadog.trace_agent.internal.process_payload_ms", now) ts := p.Source sampledChunks := new(writer.SampledChunks) - statsInput := stats.NewStatsInput(len(p.TracerPayload.Chunks), p.TracerPayload.ContainerID, p.ClientComputedStats) + statsInput := stats.NewStatsInput(len(p.TracerPayload.Chunks), p.TracerPayload.ContainerID, p.ClientComputedStats, p.ProcessTags) p.TracerPayload.Env = 
traceutil.NormalizeTagValue(p.TracerPayload.Env) diff --git a/pkg/trace/api/api.go b/pkg/trace/api/api.go index bcfad017cb9bd6..5193f21361b593 100644 --- a/pkg/trace/api/api.go +++ b/pkg/trace/api/api.go @@ -440,6 +440,7 @@ const ( // tagContainersTags specifies the name of the tag which holds key/value // pairs representing information about the container (Docker, EC2, etc). tagContainersTags = "_dd.tags.container" + tagProcessTags = "_dd.tags.process" ) // TagStats returns the stats and tags coinciding with the information found in header. @@ -657,6 +658,13 @@ func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http. } tp.Tags[tagContainersTags] = ctags } + ptags := getProcessTagsFromHeader(req.Header, ts) + if ptags != "" { + if tp.Tags == nil { + tp.Tags = make(map[string]string) + } + tp.Tags[tagProcessTags] = ptags + } payload := &Payload{ Source: ts, @@ -664,6 +672,7 @@ func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http. ClientComputedTopLevel: isHeaderTrue(header.ComputedTopLevel, req.Header.Get(header.ComputedTopLevel)), ClientComputedStats: isHeaderTrue(header.ComputedStats, req.Header.Get(header.ComputedStats)), ClientDroppedP0s: droppedTracesFromHeader(req.Header, ts), + ProcessTags: ptags, } r.out <- payload } @@ -700,6 +709,10 @@ func droppedTracesFromHeader(h http.Header, ts *info.TagStats) int64 { return dropped } +func getProcessTagsFromHeader(h http.Header, ts *info.TagStats) string { + return h.Get(header.ProcessTags) +} + // handleServices handle a request with a list of several services func (r *HTTPReceiver) handleServices(_ Version, w http.ResponseWriter, _ *http.Request) { httpOK(w) diff --git a/pkg/trace/api/internal/header/headers.go b/pkg/trace/api/internal/header/headers.go index 7f2fbc48b27ab7..1a2a6a5fd303a1 100644 --- a/pkg/trace/api/internal/header/headers.go +++ b/pkg/trace/api/internal/header/headers.go @@ -11,6 +11,9 @@ const ( // with the number of traces contained in the payload. TraceCount = "X-Datadog-Trace-Count" + // ProcessTags is a list that contains process tags split by a ','. + ProcessTags = "X-Datadog-Process-Tags" + // ContainerID specifies the name of the header which contains the ID of the // container where the request originated. // Deprecated in favor of Datadog-Entity-ID. diff --git a/pkg/trace/api/payload.go b/pkg/trace/api/payload.go index 1efc41aad09ca4..53073ea86b84ed 100644 --- a/pkg/trace/api/payload.go +++ b/pkg/trace/api/payload.go @@ -29,6 +29,9 @@ type Payload struct { // ClientDroppedP0s specifies the number of P0 traces chunks dropped by the client. ClientDroppedP0s int64 + + // ProcessTags is a list of tags describing an instrumented process. + ProcessTags string } // Chunks returns chunks in TracerPayload diff --git a/pkg/trace/stats/aggregation.go b/pkg/trace/stats/aggregation.go index 6cd51beea5a48a..5b0657ff6e0fe6 100644 --- a/pkg/trace/stats/aggregation.go +++ b/pkg/trace/stats/aggregation.go @@ -46,12 +46,13 @@ type BucketsAggregationKey struct { // PayloadAggregationKey specifies the key by which a payload is aggregated. 
type PayloadAggregationKey struct { - Env string - Hostname string - Version string - ContainerID string - GitCommitSha string - ImageTag string + Env string + Hostname string + Version string + ContainerID string + GitCommitSha string + ImageTag string + ProcessTagsHash uint64 } func getStatusCode(meta map[string]string, metrics map[string]float64) uint32 { @@ -99,6 +100,10 @@ func NewAggregationFromSpan(s *StatSpan, origin string, aggKey PayloadAggregatio return agg } +func processTagsHash(processTags string) uint64 { + return peerTagsHash(strings.Split(processTags, ",")) +} + func peerTagsHash(tags []string) uint64 { if len(tags) == 0 { return 0 diff --git a/pkg/trace/stats/concentrator.go b/pkg/trace/stats/concentrator.go index 06643e675596fd..a9e11e2d97b201 100644 --- a/pkg/trace/stats/concentrator.go +++ b/pkg/trace/stats/concentrator.go @@ -41,6 +41,7 @@ type Concentrator struct { exit chan struct{} exitWG sync.WaitGroup cidStats bool + processStats bool agentEnv string agentHostname string agentVersion string @@ -56,11 +57,13 @@ func NewConcentrator(conf *config.AgentConfig, writer Writer, now time.Time, sta BucketInterval: bsize, }, now) _, disabledCIDStats := conf.Features["disable_cid_stats"] + _, disabledProcessStats := conf.Features["disable_process_stats"] c := Concentrator{ spanConcentrator: sc, Writer: writer, exit: make(chan struct{}), cidStats: !disabledCIDStats, + processStats: !disabledProcessStats, agentEnv: conf.DefaultEnv, agentHostname: conf.Hostname, agentVersion: conf.AgentVersion, @@ -113,28 +116,46 @@ type Input struct { Traces []traceutil.ProcessedTrace ContainerID string ContainerTags []string + ProcessTags string } // NewStatsInput allocates a stats input for an incoming trace payload -func NewStatsInput(numChunks int, containerID string, clientComputedStats bool) Input { +func NewStatsInput(numChunks int, containerID string, clientComputedStats bool, processTags string) Input { if clientComputedStats { return Input{} } - return Input{Traces: make([]traceutil.ProcessedTrace, 0, numChunks), ContainerID: containerID} + return Input{Traces: make([]traceutil.ProcessedTrace, 0, numChunks), ContainerID: containerID, ProcessTags: processTags} } // Add applies the given input to the concentrator. func (c *Concentrator) Add(t Input) { + tags := infraTags{ + containerID: t.ContainerID, + containerTags: t.ContainerTags, + processTagsHash: processTagsHash(t.ProcessTags), + processTags: t.ProcessTags, + } for _, trace := range t.Traces { - c.addNow(&trace, t.ContainerID, t.ContainerTags) + c.addNow(&trace, tags) } } +type infraTags struct { + containerID string + containerTags []string + processTagsHash uint64 + processTags string +} + // addNow adds the given input into the concentrator. // Callers must guard! 
-func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string, containerTags []string) { +func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, tags infraTags) { if !c.cidStats { - containerID = "" + tags.containerID = "" + } + if !c.processStats { + tags.processTagsHash = 0 + tags.processTags = "" } hostname := pt.TracerHostname if hostname == "" { @@ -146,17 +167,18 @@ func (c *Concentrator) addNow(pt *traceutil.ProcessedTrace, containerID string, } weight := weight(pt.Root) aggKey := PayloadAggregationKey{ - Env: env, - Hostname: hostname, - Version: pt.AppVersion, - ContainerID: containerID, - GitCommitSha: pt.GitCommitSha, - ImageTag: pt.ImageTag, + Env: env, + Hostname: hostname, + Version: pt.AppVersion, + ContainerID: tags.containerID, + GitCommitSha: pt.GitCommitSha, + ImageTag: pt.ImageTag, + ProcessTagsHash: tags.processTagsHash, } for _, s := range pt.TraceChunk.Spans { statSpan, ok := c.spanConcentrator.NewStatSpanFromPB(s, c.peerTagKeys) if ok { - c.spanConcentrator.addSpan(statSpan, aggKey, containerID, containerTags, pt.TraceChunk.Origin, weight) + c.spanConcentrator.addSpan(statSpan, aggKey, tags, pt.TraceChunk.Origin, weight) } } } diff --git a/pkg/trace/stats/concentrator_test.go b/pkg/trace/stats/concentrator_test.go index bb23afb56f6a35..722f1e083a1f64 100644 --- a/pkg/trace/stats/concentrator_test.go +++ b/pkg/trace/stats/concentrator_test.go @@ -142,7 +142,7 @@ func TestTracerHostname(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "tracer-hostname", "", "", "") c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Equal("tracer-hostname", stats.Stats[0].Hostname) @@ -171,7 +171,7 @@ func TestConcentratorOldestTs(t *testing.T) { // Running cold, all spans in the past should end up in the current time bucket. 
flushTime := now.UnixNano() c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) for i := 0; i < c.spanConcentrator.bufferLen; i++ { stats := c.flushNow(flushTime, false) @@ -209,7 +209,7 @@ func TestConcentratorOldestTs(t *testing.T) { flushTime := now.UnixNano() c := NewTestConcentrator(now) c.spanConcentrator.oldestTs = alignTs(flushTime, c.bsize) - int64(c.spanConcentrator.bufferLen-1)*c.bsize - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) for i := 0; i < c.spanConcentrator.bufferLen-1; i++ { stats := c.flushNow(flushTime, false) @@ -291,7 +291,7 @@ func TestConcentratorStatsTotals(t *testing.T) { testTrace := toProcessedTrace(spans, "none", "", "", "", "") t.Run("ok", func(_ *testing.T) { - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) var duration uint64 var hits uint64 @@ -494,7 +494,7 @@ func TestConcentratorStatsCounts(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "", "", "", "") - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) // flush every testBucketInterval flushTime := now.UnixNano() @@ -543,7 +543,7 @@ func TestRootTag(t *testing.T) { testTrace := toProcessedTrace(spans, "none", "", "", "", "") c := NewTestConcentrator(now) c.spanConcentrator.computeStatsBySpanKind = true - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) expected := []*pb.ClientGroupedStats{ { @@ -599,7 +599,7 @@ func generateDistribution(t *testing.T, now time.Time, generator func(i int) int spans = append(spans, testSpan(now, uint64(i)+1, 0, generator(i), 0, "A1", "resource1", 0, nil)) } traceutil.ComputeTopLevel(spans) - c.addNow(toProcessedTrace(spans, "none", "", "", "", ""), "", nil) + c.addNow(toProcessedTrace(spans, "none", "", "", "", ""), infraTags{}) stats := c.flushNow(now.UnixNano()+c.bsize*int64(c.spanConcentrator.bufferLen), false) expectedFlushedTs := alignedNow assert.Len(stats.Stats, 1) @@ -651,7 +651,7 @@ func TestIgnoresPartialSpans(t *testing.T) { testTrace := toProcessedTrace(spans, "none", "tracer-hostname", "", "", "") c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Empty(stats.GetStats()) @@ -665,7 +665,7 @@ func TestForceFlush(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "", "", "", "") c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) assert.Len(c.spanConcentrator.buckets, 1) @@ -696,7 +696,7 @@ func TestWithContainerTags(t *testing.T) { conf.DefaultEnv = "env" conf.BucketInterval = time.Duration(testBucketInterval) c := NewTestConcentratorWithCfg(now, conf) - c.addNow(testTrace, "cid", ctags) + c.addNow(testTrace, infraTags{containerID: "cid", containerTags: ctags}) stats := c.flushNow(time.Now().Unix(), true) assert.Len(stats.GetStats(), 1) @@ -717,7 +717,7 @@ func TestDisabledContainerTags(t *testing.T) { conf.Features["disable_cid_stats"] = struct{}{} conf.BucketInterval = time.Duration(testBucketInterval) c := NewTestConcentratorWithCfg(now, conf) - c.addNow(testTrace, "cid", ctags) + c.addNow(testTrace, infraTags{containerID: "cid", containerTags: ctags}) stats := c.flushNow(time.Now().Unix(), true) assert.Len(stats.GetStats(), 1) @@ -751,7 +751,7 @@ func TestPeerTags(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "", "", "", 
"") c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Len(stats.Stats[0].Stats[0].Stats, 2) for _, st := range stats.Stats[0].Stats[0].Stats { @@ -764,7 +764,7 @@ func TestPeerTags(t *testing.T) { testTrace := toProcessedTrace(spans, "none", "", "", "", "") c := NewTestConcentrator(now) c.peerTagKeys = []string{"db.instance", "db.system", "peer.service"} - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Len(stats.Stats[0].Stats[0].Stats, 2) for _, st := range stats.Stats[0].Stats[0].Stats { @@ -827,7 +827,7 @@ func TestComputeStatsThroughSpanKindCheck(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "", "", "", "") c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Len(stats.Stats[0].Stats[0].Stats, 3) opNames := make(map[string]struct{}, 3) @@ -846,7 +846,7 @@ func TestComputeStatsThroughSpanKindCheck(t *testing.T) { testTrace := toProcessedTrace(spans, "none", "", "", "", "") c := NewTestConcentrator(now) c.spanConcentrator.computeStatsBySpanKind = true - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Len(stats.Stats[0].Stats[0].Stats, 4) opNames := make(map[string]struct{}, 4) @@ -888,7 +888,7 @@ func TestVersionData(t *testing.T) { traceutil.ComputeTopLevel(spans) testTrace := toProcessedTrace(spans, "none", "", "v1.0.1", "abc", "abc123") c := NewTestConcentrator(now) - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) stats := c.flushNow(now.UnixNano()+int64(c.spanConcentrator.bufferLen)*testBucketInterval, false) assert.Len(stats.Stats[0].Stats[0].Stats, 2) for _, st := range stats.Stats { @@ -1047,6 +1047,6 @@ func BenchmarkConcentrator(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - c.addNow(testTrace, "", nil) + c.addNow(testTrace, infraTags{}) } } diff --git a/pkg/trace/stats/span_concentrator.go b/pkg/trace/stats/span_concentrator.go index 20fe409621e5a5..4246d6d537a72f 100644 --- a/pkg/trace/stats/span_concentrator.go +++ b/pkg/trace/stats/span_concentrator.go @@ -175,7 +175,7 @@ var KindsComputed = map[string]struct{}{ "producer": {}, } -func (sc *SpanConcentrator) addSpan(s *StatSpan, aggKey PayloadAggregationKey, containerID string, containerTags []string, origin string, weight float64) { +func (sc *SpanConcentrator) addSpan(s *StatSpan, aggKey PayloadAggregationKey, tags infraTags, origin string, weight float64) { sc.mu.Lock() defer sc.mu.Unlock() end := s.start + s.duration @@ -184,17 +184,20 @@ func (sc *SpanConcentrator) addSpan(s *StatSpan, aggKey PayloadAggregationKey, c b, ok := sc.buckets[btime] if !ok { b = NewRawBucket(uint64(btime), uint64(sc.bsize)) - if containerID != "" && len(containerTags) > 0 { - b.containerTagsByID[containerID] = containerTags - } sc.buckets[btime] = b } + if tags.processTagsHash != 0 && len(tags.processTags) > 0 { + b.processTagsByHash[tags.processTagsHash] = tags.processTags + } + if tags.containerID != "" && len(tags.containerTags) > 0 { + b.containerTagsByID[tags.containerID] = tags.containerTags + } b.HandleSpan(s, 
weight, origin, aggKey) } // AddSpan to the SpanConcentrator, appending the new data to the appropriate internal bucket. -func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, containerID string, containerTags []string, origin string) { - sc.addSpan(s, aggKey, containerID, containerTags, origin, 1) +func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, tags infraTags, origin string) { + sc.addSpan(s, aggKey, tags, origin, 1) } // Flush deletes and returns complete ClientStatsPayloads. @@ -202,6 +205,7 @@ func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, c func (sc *SpanConcentrator) Flush(now int64, force bool) []*pb.ClientStatsPayload { m := make(map[PayloadAggregationKey][]*pb.ClientStatsBucket) containerTagsByID := make(map[string][]string) + processTagsByHash := make(map[uint64]string) sc.mu.Lock() for ts, srb := range sc.buckets { @@ -222,6 +226,9 @@ func (sc *SpanConcentrator) Flush(now int64, force bool) []*pb.ClientStatsPayloa if ctags, ok := srb.containerTagsByID[k.ContainerID]; ok { containerTagsByID[k.ContainerID] = ctags } + if ptags, ok := srb.processTagsByHash[k.ProcessTagsHash]; ok { + processTagsByHash[k.ProcessTagsHash] = ptags + } } delete(sc.buckets, ts) } @@ -244,6 +251,7 @@ func (sc *SpanConcentrator) Flush(now int64, force bool) []*pb.ClientStatsPayloa ImageTag: k.ImageTag, Stats: s, Tags: containerTagsByID[k.ContainerID], + ProcessTags: processTagsByHash[k.ProcessTagsHash], } sb = append(sb, p) } diff --git a/pkg/trace/stats/statsraw.go b/pkg/trace/stats/statsraw.go index 3e21b044ba84fd..85401860de71e8 100644 --- a/pkg/trace/stats/statsraw.go +++ b/pkg/trace/stats/statsraw.go @@ -110,6 +110,7 @@ type RawBucket struct { data map[Aggregation]*groupedStats containerTagsByID map[string][]string // a map from container ID to container tags + processTagsByHash map[uint64]string // a map from process hash to process tags } // NewRawBucket opens a new calculation bucket for time ts and initializes it properly @@ -120,6 +121,7 @@ func NewRawBucket(ts, d uint64) *RawBucket { duration: d, data: make(map[Aggregation]*groupedStats), containerTagsByID: make(map[string][]string), + processTagsByHash: make(map[uint64]string), } } @@ -135,12 +137,13 @@ func (sb *RawBucket) Export() map[PayloadAggregationKey]*pb.ClientStatsBucket { continue } key := PayloadAggregationKey{ - Hostname: k.Hostname, - Version: k.Version, - Env: k.Env, - ContainerID: k.ContainerID, - GitCommitSha: k.GitCommitSha, - ImageTag: k.ImageTag, + Hostname: k.Hostname, + Version: k.Version, + Env: k.Env, + ContainerID: k.ContainerID, + GitCommitSha: k.GitCommitSha, + ImageTag: k.ImageTag, + ProcessTagsHash: k.ProcessTagsHash, } s, ok := m[key] if !ok { diff --git a/pkg/trace/stats/statsraw_test.go b/pkg/trace/stats/statsraw_test.go index 69f4d31c0ba8d9..468ecda2f5ca85 100644 --- a/pkg/trace/stats/statsraw_test.go +++ b/pkg/trace/stats/statsraw_test.go @@ -7,10 +7,11 @@ package stats import ( "fmt" - "github.com/DataDog/datadog-agent/pkg/trace/traceutil" "testing" "time" + "github.com/DataDog/datadog-agent/pkg/trace/traceutil" + pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" "github.com/stretchr/testify/assert" @@ -225,7 +226,7 @@ func BenchmarkHandleSpanRandom(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { for _, span := range benchStatSpans { - sb.HandleSpan(span, 1, "", PayloadAggregationKey{"a", "b", "c", "d", "", ""}) + sb.HandleSpan(span, 1, "", PayloadAggregationKey{Env: "a", Hostname: "b", Version: "c", 
ContainerID: "d"}) } } }) @@ -283,7 +284,7 @@ func BenchmarkHandleSpanRandom(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { for _, span := range benchStatSpans { - sb.HandleSpan(span, 1, "", PayloadAggregationKey{"a", "b", "c", "d", "", ""}) + sb.HandleSpan(span, 1, "", PayloadAggregationKey{Env: "a", Hostname: "b", Version: "c", ContainerID: "d"}) } } }) From 5f045d3fb8b48b99395202766a789f3eea7f3ad2 Mon Sep 17 00:00:00 2001 From: Raphael Gavache Date: Wed, 2 Apr 2025 16:08:51 +0200 Subject: [PATCH 02/11] nits --- pkg/trace/api/api.go | 4 ++-- pkg/trace/stats/span_concentrator.go | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/trace/api/api.go b/pkg/trace/api/api.go index 5193f21361b593..cc878bc585cdfe 100644 --- a/pkg/trace/api/api.go +++ b/pkg/trace/api/api.go @@ -658,7 +658,7 @@ func (r *HTTPReceiver) handleTraces(v Version, w http.ResponseWriter, req *http. } tp.Tags[tagContainersTags] = ctags } - ptags := getProcessTagsFromHeader(req.Header, ts) + ptags := getProcessTagsFromHeader(req.Header) if ptags != "" { if tp.Tags == nil { tp.Tags = make(map[string]string) @@ -709,7 +709,7 @@ func droppedTracesFromHeader(h http.Header, ts *info.TagStats) int64 { return dropped } -func getProcessTagsFromHeader(h http.Header, ts *info.TagStats) string { +func getProcessTagsFromHeader(h http.Header) string { return h.Get(header.ProcessTags) } diff --git a/pkg/trace/stats/span_concentrator.go b/pkg/trace/stats/span_concentrator.go index 4246d6d537a72f..4c933e8c103c4d 100644 --- a/pkg/trace/stats/span_concentrator.go +++ b/pkg/trace/stats/span_concentrator.go @@ -243,15 +243,16 @@ func (sc *SpanConcentrator) Flush(now int64, force bool) []*pb.ClientStatsPayloa sb := make([]*pb.ClientStatsPayload, 0, len(m)) for k, s := range m { p := &pb.ClientStatsPayload{ - Env: k.Env, - Hostname: k.Hostname, - ContainerID: k.ContainerID, - Version: k.Version, - GitCommitSha: k.GitCommitSha, - ImageTag: k.ImageTag, - Stats: s, - Tags: containerTagsByID[k.ContainerID], - ProcessTags: processTagsByHash[k.ProcessTagsHash], + Env: k.Env, + Hostname: k.Hostname, + ContainerID: k.ContainerID, + Version: k.Version, + GitCommitSha: k.GitCommitSha, + ImageTag: k.ImageTag, + Stats: s, + Tags: containerTagsByID[k.ContainerID], + ProcessTags: processTagsByHash[k.ProcessTagsHash], + ProcessTagsHash: k.ProcessTagsHash, } sb = append(sb, p) } From b41db28cd69abc4905917608bee67351ffcbcea1 Mon Sep 17 00:00:00 2001 From: Raphael Gavache Date: Wed, 2 Apr 2025 16:15:08 +0200 Subject: [PATCH 03/11] fix --- pkg/trace/stats/aggregation.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/trace/stats/aggregation.go b/pkg/trace/stats/aggregation.go index 5b0657ff6e0fe6..60a6b13d11decf 100644 --- a/pkg/trace/stats/aggregation.go +++ b/pkg/trace/stats/aggregation.go @@ -101,6 +101,9 @@ func NewAggregationFromSpan(s *StatSpan, origin string, aggKey PayloadAggregatio } func processTagsHash(processTags string) uint64 { + if processTags == "" { + return 0 + } return peerTagsHash(strings.Split(processTags, ",")) } From 9af131a2825f6eeb8f48b2808901c77bd5a9f1f3 Mon Sep 17 00:00:00 2001 From: Raphael Gavache Date: Mon, 7 Apr 2025 22:46:52 +0200 Subject: [PATCH 04/11] first trace payload e2e --- pkg/trace/stats/client_stats_aggregator.go | 42 ++++++++++++---------- pkg/trace/stats/span_concentrator.go | 4 +-- test/new-e2e/tests/apm/tests.go | 14 ++++++++ test/new-e2e/tests/apm/tracegen.go | 14 ++++++++ test/new-e2e/tests/apm/vm_test.go | 19 ++++++++++ 5 files changed, 73 insertions(+), 
20 deletions(-) diff --git a/pkg/trace/stats/client_stats_aggregator.go b/pkg/trace/stats/client_stats_aggregator.go index b087598d15e8d5..ad973c02d4dc31 100644 --- a/pkg/trace/stats/client_stats_aggregator.go +++ b/pkg/trace/stats/client_stats_aggregator.go @@ -141,7 +141,7 @@ func (a *ClientStatsAggregator) add(now time.Time, p *pb.ClientStatsPayload) { // populate container tags data on the payload a.setVersionDataFromContainerTags(p) // compute the PayloadAggregationKey, common for all buckets within the payload - payloadAggKey := newPayloadAggregationKey(p.Env, p.Hostname, p.Version, p.ContainerID, p.GitCommitSha, p.ImageTag) + payloadAggKey := newPayloadAggregationKey(p.Env, p.Hostname, p.Version, p.ContainerID, p.GitCommitSha, p.ImageTag, p.ProcessTagsHash) for _, clientBucket := range p.Stats { clientBucketStart := time.Unix(0, int64(clientBucket.Start)) @@ -149,11 +149,13 @@ func (a *ClientStatsAggregator) add(now time.Time, p *pb.ClientStatsPayload) { b, ok := a.buckets[ts.Unix()] if !ok { b = &bucket{ - ts: ts, - agg: make(map[PayloadAggregationKey]map[BucketsAggregationKey]*aggregatedStats), + ts: ts, + agg: make(map[PayloadAggregationKey]map[BucketsAggregationKey]*aggregatedStats), + processTags: make(map[uint64]string), } a.buckets[ts.Unix()] = b } + b.processTags[p.ProcessTagsHash] = p.ProcessTags b.aggregateStatsBucket(clientBucket, payloadAggKey) } } @@ -206,7 +208,8 @@ type bucket struct { // ts is the timestamp attached to the payload ts time.Time // agg contains the aggregated Hits/Errors/Duration counts - agg map[PayloadAggregationKey]map[BucketsAggregationKey]*aggregatedStats + agg map[PayloadAggregationKey]map[BucketsAggregationKey]*aggregatedStats + processTags map[uint64]string } // aggregateStatsBucket takes a ClientStatsBucket and a PayloadAggregationKey, and aggregates all counts @@ -298,13 +301,15 @@ func (b *bucket) aggregationToPayloads() []*pb.ClientStatsPayload { Stats: groupedStats, }} res = append(res, &pb.ClientStatsPayload{ - Hostname: payloadKey.Hostname, - Env: payloadKey.Env, - Version: payloadKey.Version, - ImageTag: payloadKey.ImageTag, - GitCommitSha: payloadKey.GitCommitSha, - ContainerID: payloadKey.ContainerID, - Stats: clientBuckets, + Hostname: payloadKey.Hostname, + Env: payloadKey.Env, + Version: payloadKey.Version, + ImageTag: payloadKey.ImageTag, + GitCommitSha: payloadKey.GitCommitSha, + ContainerID: payloadKey.ContainerID, + Stats: clientBuckets, + ProcessTagsHash: payloadKey.ProcessTagsHash, + ProcessTags: b.processTags[payloadKey.ProcessTagsHash], }) } return res @@ -351,14 +356,15 @@ func exporGroupedStats(aggrKey BucketsAggregationKey, stats *aggregatedStats) (* }, nil } -func newPayloadAggregationKey(env, hostname, version, cid string, gitCommitSha string, imageTag string) PayloadAggregationKey { +func newPayloadAggregationKey(env, hostname, version, cid, gitCommitSha, imageTag string, processTagsHash uint64) PayloadAggregationKey { return PayloadAggregationKey{ - Env: env, - Hostname: hostname, - Version: version, - ContainerID: cid, - GitCommitSha: gitCommitSha, - ImageTag: imageTag, + Env: env, + Hostname: hostname, + Version: version, + ContainerID: cid, + GitCommitSha: gitCommitSha, + ImageTag: imageTag, + ProcessTagsHash: processTagsHash, } } diff --git a/pkg/trace/stats/span_concentrator.go b/pkg/trace/stats/span_concentrator.go index 4c933e8c103c4d..ad4c4005ec2c33 100644 --- a/pkg/trace/stats/span_concentrator.go +++ b/pkg/trace/stats/span_concentrator.go @@ -196,8 +196,8 @@ func (sc *SpanConcentrator) addSpan(s *StatSpan, 
aggKey PayloadAggregationKey, t } // AddSpan to the SpanConcentrator, appending the new data to the appropriate internal bucket. -func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, tags infraTags, origin string) { - sc.addSpan(s, aggKey, tags, origin, 1) +func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, processTagsHash uint64, processTags string, origin string) { + sc.addSpan(s, aggKey, infraTags{processTagsHash: processTagsHash, processTags: processTags}, origin, 1) } // Flush deletes and returns complete ClientStatsPayloads. diff --git a/test/new-e2e/tests/apm/tests.go b/test/new-e2e/tests/apm/tests.go index b91580fc34947a..8fe817389faaa4 100644 --- a/test/new-e2e/tests/apm/tests.go +++ b/test/new-e2e/tests/apm/tests.go @@ -90,6 +90,20 @@ func testTracesHaveContainerTag(t *testing.T, c *assert.CollectT, service string assert.True(c, hasContainerTag(traces, fmt.Sprintf("container_name:%s", service)), "got traces: %v", traces) } +func testProcessTraces(c *assert.CollectT, intake *components.FakeIntake, processTags string) { + traces, err := intake.Client().GetTraces() + assert.NoError(c, err) + assert.NotEmpty(c, traces) + for _, p := range traces { + assert.NotEmpty(c, p.TracerPayloads) + for _, tp := range p.TracerPayloads { + tags, ok := tp.Tags["_dd.tags.process"] + assert.True(c, ok) + assert.Equal(c, processTags, tags) + } + } +} + func testStatsHaveContainerTags(t *testing.T, c *assert.CollectT, service string, intake *components.FakeIntake) { t.Helper() stats, err := intake.Client().GetAPMStats() diff --git a/test/new-e2e/tests/apm/tracegen.go b/test/new-e2e/tests/apm/tracegen.go index 151335ea372623..aa712407bc8e02 100644 --- a/test/new-e2e/tests/apm/tracegen.go +++ b/test/new-e2e/tests/apm/tracegen.go @@ -6,6 +6,7 @@ package apm import ( + "fmt" "strconv" "github.com/DataDog/datadog-agent/test/new-e2e/pkg/components" @@ -75,3 +76,16 @@ func tracegenTCPCommands(service string, peerTags string, enableClientSideStats rm := "docker rm -f " + service return run, rm } + +func traceWithProcessTags(h *components.RemoteHost, processTags, service string) { + // TODO: once go tracer support process tags, use tracegen instead! 
+ h.MustExecute(fmt.Sprintf(`curl -X POST http://localhost:8126/v0.4/traces \ +-H 'X-Datadog-Process-Tags: %s' \ +-H 'X-Datadog-Trace-Count: 1' \ +-H 'Content-Type: application/json' \ +-H 'User-Agent: Go-http-client/1.1' \ +-H 'Datadog-Meta-Lang: go' \ +--data-binary @- < Date: Mon, 7 Apr 2025 23:04:22 +0200 Subject: [PATCH 05/11] add couple tests --- pkg/trace/agent/agent_test.go | 22 +++++++++------ pkg/trace/stats/concentrator_test.go | 41 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/pkg/trace/agent/agent_test.go b/pkg/trace/agent/agent_test.go index 99a7b5921998a7..3b610e56b11044 100644 --- a/pkg/trace/agent/agent_test.go +++ b/pkg/trace/agent/agent_test.go @@ -2450,9 +2450,11 @@ func TestConvertStats(t *testing.T) { name: "containerID feature enabled, no fargate", features: "enable_cid_stats", in: &pb.ClientStatsPayload{ - Hostname: "tracer_hots", - Env: "tracer_env", - Version: "code_version", + Hostname: "tracer_hots", + Env: "tracer_env", + Version: "code_version", + ProcessTags: "binary_name:bin", + ProcessTagsHash: 123456789, Stats: []*pb.ClientStatsBucket{ { Start: 1, @@ -2487,12 +2489,14 @@ func TestConvertStats(t *testing.T) { tracerVersion: "v1", containerID: "abc123", out: &pb.ClientStatsPayload{ - Hostname: "tracer_hots", - Env: "tracer_env", - Version: "code_version", - Lang: "java", - TracerVersion: "v1", - ContainerID: "abc123", + Hostname: "tracer_hots", + Env: "tracer_env", + Version: "code_version", + Lang: "java", + TracerVersion: "v1", + ContainerID: "abc123", + ProcessTags: "binary_name:bin", + ProcessTagsHash: 123456789, Stats: []*pb.ClientStatsBucket{ { Start: 1, diff --git a/pkg/trace/stats/concentrator_test.go b/pkg/trace/stats/concentrator_test.go index 722f1e083a1f64..48fbb05b903d48 100644 --- a/pkg/trace/stats/concentrator_test.go +++ b/pkg/trace/stats/concentrator_test.go @@ -724,6 +724,47 @@ func TestDisabledContainerTags(t *testing.T) { assert.Nil(stats.Stats[0].Tags) } +func TestWithProcessTags(t *testing.T) { + assert := assert.New(t) + now := time.Now() + + ptags := "binary_name:bin33,grpc_server:my_server" + spans := []*pb.Span{testSpan(now, 1, 0, 50, 5, "A1", "resource1", 0, map[string]string{"container_id": "cid", "kube_container_name": "k8s_container"})} + traceutil.ComputeTopLevel(spans) + testTrace := toProcessedTrace(spans, "none", "", "", "", "") + conf := config.New() + conf.Hostname = "host" + conf.DefaultEnv = "env" + conf.BucketInterval = time.Duration(testBucketInterval) + c := NewTestConcentratorWithCfg(now, conf) + c.addNow(testTrace, infraTags{processTagsHash: 27, processTags: ptags}) + + stats := c.flushNow(time.Now().Unix(), true) + assert.Len(stats.GetStats(), 1) + assert.Equal(stats.Stats[0].ProcessTags, ptags) +} + +func TestDisabledProcessTags(t *testing.T) { + assert := assert.New(t) + now := time.Now() + + ptags := "binary_name:bin33,grpc_server:my_server" + spans := []*pb.Span{testSpan(now, 1, 0, 50, 5, "A1", "resource1", 0, map[string]string{"container_id": "cid", "kube_container_name": "k8s_container"})} + traceutil.ComputeTopLevel(spans) + testTrace := toProcessedTrace(spans, "none", "", "", "", "") + conf := config.New() + conf.Hostname = "host" + conf.DefaultEnv = "env" + conf.Features["disable_process_stats"] = struct{}{} + conf.BucketInterval = time.Duration(testBucketInterval) + c := NewTestConcentratorWithCfg(now, conf) + c.addNow(testTrace, infraTags{processTagsHash: 27, processTags: ptags}) + + stats := c.flushNow(time.Now().Unix(), true) + assert.Len(stats.GetStats(), 
1)
+	assert.Equal("", stats.Stats[0].ProcessTags)
+}
+
 func TestPeerTags(t *testing.T) {
 	assert := assert.New(t)
 	now := time.Now()

From bb7fef005399c030554fc5bd681d6f3e1baf33a1 Mon Sep 17 00:00:00 2001
From: Raphael Gavache
Date: Mon, 7 Apr 2025 23:10:22 +0200
Subject: [PATCH 06/11] fix

---
 pkg/trace/stats/span_concentrator.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/trace/stats/span_concentrator.go b/pkg/trace/stats/span_concentrator.go
index ad4c4005ec2c33..35dae4286e5619 100644
--- a/pkg/trace/stats/span_concentrator.go
+++ b/pkg/trace/stats/span_concentrator.go
@@ -196,8 +196,8 @@ func (sc *SpanConcentrator) addSpan(s *StatSpan, aggKey PayloadAggregationKey, t
 }
 
 // AddSpan to the SpanConcentrator, appending the new data to the appropriate internal bucket.
-func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, processTagsHash uint64, processTags string, origin string) {
-	sc.addSpan(s, aggKey, infraTags{processTagsHash: processTagsHash, processTags: processTags}, origin, 1)
+func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, containerID string, containerTags []string, origin string) {
+	sc.addSpan(s, aggKey, infraTags{containerID: containerID, containerTags: containerTags}, origin, 1)
 }
 
 // Flush deletes and returns complete ClientStatsPayloads.

From 0185eb9345a6055e7efde3420d50410d10f4c360 Mon Sep 17 00:00:00 2001
From: Raphael Gavache
Date: Tue, 8 Apr 2025 01:50:31 +0200
Subject: [PATCH 07/11] add stats e2e

---
 test/new-e2e/tests/apm/tests.go   | 12 ++++++++++++
 test/new-e2e/tests/apm/vm_test.go |  6 ++++++
 2 files changed, 18 insertions(+)

diff --git a/test/new-e2e/tests/apm/tests.go b/test/new-e2e/tests/apm/tests.go
index 8fe817389faaa4..a7353e2a675ae8 100644
--- a/test/new-e2e/tests/apm/tests.go
+++ b/test/new-e2e/tests/apm/tests.go
@@ -104,6 +104,18 @@ func testProcessTraces(c *assert.CollectT, intake *components.FakeIntake, proces
 	}
 }
 
+func testStatsHaveProcessTags(c *assert.CollectT, intake *components.FakeIntake, processTags string) {
+	stats, err := intake.Client().GetAPMStats()
+	assert.NoError(c, err)
+	assert.NotEmpty(c, stats)
+	for _, p := range stats {
+		assert.NotEmpty(c, p.StatsPayload.Stats)
+		for _, s := range p.StatsPayload.Stats {
+			assert.Equal(c, processTags, s.ProcessTags)
+		}
+	}
+}
+
 func testStatsHaveContainerTags(t *testing.T, c *assert.CollectT, service string, intake *components.FakeIntake) {
 	t.Helper()
 	stats, err := intake.Client().GetAPMStats()
diff --git a/test/new-e2e/tests/apm/vm_test.go b/test/new-e2e/tests/apm/vm_test.go
index 281af8996292e7..64b6624557156d 100644
--- a/test/new-e2e/tests/apm/vm_test.go
+++ b/test/new-e2e/tests/apm/vm_test.go
@@ -322,6 +322,12 @@ func (s *VMFakeintakeSuite) TestProcessTagsTrace() {
 		testProcessTraces(c, s.Env().FakeIntake, "binary:generator")
 		s.logJournal(false)
 	}, 3*time.Minute, 10*time.Second, "Failed to find traces with process tags")
+
+	s.EventuallyWithTf(func(c *assert.CollectT) {
+		s.logStatus()
+		testStatsHaveProcessTags(c, s.Env().FakeIntake, "binary:generator")
+		s.logJournal(false)
+	}, 3*time.Minute, 10*time.Second, "Failed to find stats with process tags")
 }
 
 func (s *VMFakeintakeSuite) TestProbabilitySampler() {

From 9241958d4564d22f7d77c72181d0f3ec18f1aa7a Mon Sep 17 00:00:00 2001
From: Raphael Gavache
Date: Tue, 8 Apr 2025 17:04:51 +0200
Subject: [PATCH 08/11] Update test/new-e2e/tests/apm/vm_test.go
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Iñigo López de
Heredia --- test/new-e2e/tests/apm/vm_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/new-e2e/tests/apm/vm_test.go b/test/new-e2e/tests/apm/vm_test.go index 64b6624557156d..2e3e3c3d758623 100644 --- a/test/new-e2e/tests/apm/vm_test.go +++ b/test/new-e2e/tests/apm/vm_test.go @@ -306,7 +306,6 @@ func (s *VMFakeintakeSuite) TestBasicTrace() { } func (s *VMFakeintakeSuite) TestProcessTagsTrace() { - err := s.Env().FakeIntake.Client().FlushServerAndResetAggregators() s.Require().NoError(err) From 78b9321cf79d45b47e4f2cce6f4cf20f9faf6006 Mon Sep 17 00:00:00 2001 From: Raphael Gavache Date: Wed, 9 Apr 2025 11:02:10 +0200 Subject: [PATCH 09/11] nit --- pkg/trace/stats/aggregation.go | 8 ++++---- pkg/trace/stats/client_stats_aggregator.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/trace/stats/aggregation.go b/pkg/trace/stats/aggregation.go index 60a6b13d11decf..ba67bacd1fcdc8 100644 --- a/pkg/trace/stats/aggregation.go +++ b/pkg/trace/stats/aggregation.go @@ -94,7 +94,7 @@ func NewAggregationFromSpan(s *StatSpan, origin string, aggKey PayloadAggregatio Synthetics: synthetics, IsTraceRoot: isTraceRoot, GRPCStatusCode: s.grpcStatusCode, - PeerTagsHash: peerTagsHash(s.matchingPeerTags), + PeerTagsHash: tagsFnvHash(s.matchingPeerTags), }, } return agg @@ -104,10 +104,10 @@ func processTagsHash(processTags string) uint64 { if processTags == "" { return 0 } - return peerTagsHash(strings.Split(processTags, ",")) + return tagsFnvHash(strings.Split(processTags, ",")) } -func peerTagsHash(tags []string) uint64 { +func tagsFnvHash(tags []string) uint64 { if len(tags) == 0 { return 0 } @@ -134,7 +134,7 @@ func NewAggregationFromGroup(g *pb.ClientGroupedStats) Aggregation { SpanKind: g.SpanKind, StatusCode: g.HTTPStatusCode, Synthetics: g.Synthetics, - PeerTagsHash: peerTagsHash(g.PeerTags), + PeerTagsHash: tagsFnvHash(g.PeerTags), IsTraceRoot: g.IsTraceRoot, GRPCStatusCode: g.GRPCStatusCode, }, diff --git a/pkg/trace/stats/client_stats_aggregator.go b/pkg/trace/stats/client_stats_aggregator.go index ad973c02d4dc31..0fb79ac23259c9 100644 --- a/pkg/trace/stats/client_stats_aggregator.go +++ b/pkg/trace/stats/client_stats_aggregator.go @@ -381,7 +381,7 @@ func newBucketAggregationKey(b *pb.ClientGroupedStats) BucketsAggregationKey { IsTraceRoot: b.IsTraceRoot, } if tags := b.GetPeerTags(); len(tags) > 0 { - k.PeerTagsHash = peerTagsHash(tags) + k.PeerTagsHash = tagsFnvHash(tags) } return k } From 437110377231c2d082be64de8f21106999f86490 Mon Sep 17 00:00:00 2001 From: Raphael Gavache Date: Wed, 9 Apr 2025 16:44:34 +0200 Subject: [PATCH 10/11] Add TODO comment for API migration --- pkg/trace/stats/span_concentrator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/trace/stats/span_concentrator.go b/pkg/trace/stats/span_concentrator.go index 35dae4286e5619..dec861023facc0 100644 --- a/pkg/trace/stats/span_concentrator.go +++ b/pkg/trace/stats/span_concentrator.go @@ -196,6 +196,7 @@ func (sc *SpanConcentrator) addSpan(s *StatSpan, aggKey PayloadAggregationKey, t } // AddSpan to the SpanConcentrator, appending the new data to the appropriate internal bucket. 
+// TODO(raphael): migrate the dd-trace-go API to not depend on containerID/containerTags and add processTags at the encoding layer
 func (sc *SpanConcentrator) AddSpan(s *StatSpan, aggKey PayloadAggregationKey, containerID string, containerTags []string, origin string) {
 	sc.addSpan(s, aggKey, infraTags{containerID: containerID, containerTags: containerTags}, origin, 1)
 }

From 7e553bb60520f63b238cdcee011d60eff2bbd81b Mon Sep 17 00:00:00 2001
From: Raphael Gavache
Date: Thu, 10 Apr 2025 13:18:12 +0200
Subject: [PATCH 11/11] add test after review

---
 .../stats/client_stats_aggregator_test.go     | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/pkg/trace/stats/client_stats_aggregator_test.go b/pkg/trace/stats/client_stats_aggregator_test.go
index 29e7f742c9f7bd..9e1bb4d91eaadc 100644
--- a/pkg/trace/stats/client_stats_aggregator_test.go
+++ b/pkg/trace/stats/client_stats_aggregator_test.go
@@ -683,6 +683,37 @@ func TestAggregationVersionData(t *testing.T) {
 	})
 }
 
+func TestAggregationProcessTags(t *testing.T) {
+	assert := assert.New(t)
+	a := newTestAggregator()
+	msw := &mockStatsWriter{}
+	a.writer = msw
+	testTime := time.Unix(time.Now().Unix(), 0)
+
+	bak := BucketsAggregationKey{Service: "s", Name: "test.op"}
+	c1 := payloadWithCounts(testTime, bak, "", "test-version", "abc", "abc123", 11, 7, 100)
+	c1.ProcessTags = "a:1,b:2,c:3"
+	c1.ProcessTagsHash = 33
+	c2 := payloadWithCounts(testTime, bak, "", "test-version", "abc", "abc123", 11, 7, 100)
+	c2.ProcessTags = "b:33"
+	c2.ProcessTagsHash = 59
+
+	assert.Len(msw.payloads, 0)
+	a.add(testTime, deepCopy(c1))
+	a.add(testTime, deepCopy(c2))
+	assert.Len(msw.payloads, 0)
+	a.flushOnTime(testTime.Add(oldestBucketStart + time.Nanosecond))
+	require.Len(t, msw.payloads, 1)
+
+	aggCounts := msw.payloads[0]
+	assertAggCountsPayload(t, aggCounts)
+
+	assert.Len(aggCounts.Stats, 2)
+	res := []string{aggCounts.Stats[0].ProcessTags, aggCounts.Stats[1].ProcessTags}
+	assert.ElementsMatch([]string{"a:1,b:2,c:3", "b:33"}, res)
+	assert.Len(a.buckets, 0)
+}
+
 func TestAggregationContainerID(t *testing.T) {
 	t.Run("ContainerID empty", func(t *testing.T) {
 		assert := assert.New(t)
@@ -776,6 +807,8 @@ func deepCopy(p *pb.ClientStatsPayload) *pb.ClientStatsPayload {
 		Tags:            p.GetTags(),
 		GitCommitSha:    p.GetGitCommitSha(),
 		ImageTag:        p.GetImageTag(),
+		ProcessTags:     p.GetProcessTags(),
+		ProcessTagsHash: p.GetProcessTagsHash(),
 	}
 	payload.Stats = deepCopyStatsBucket(p.Stats)
 	return payload
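
The tests above key aggregation on ProcessTagsHash, which is derived from the comma-separated process tags string (see the processTagsHash/tagsFnvHash helpers touched in PATCH 09/11). As a rough standalone sketch of that idea, not the agent's actual implementation, the snippet below hashes a process-tags string with FNV-1a; the hashTags/hashProcessTags names and the pre-sort for order independence are assumptions made here for illustration only.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

// hashTags folds a set of "key:value" tags into a single uint64 key.
func hashTags(tags []string) uint64 {
	if len(tags) == 0 {
		return 0
	}
	sorted := make([]string, len(tags))
	copy(sorted, tags)
	sort.Strings(sorted) // assumption: sort so tag order does not change the key
	h := fnv.New64a()
	for _, t := range sorted {
		h.Write([]byte(t))
	}
	return h.Sum64()
}

// hashProcessTags splits the comma-separated process tags string and hashes it.
func hashProcessTags(processTags string) uint64 {
	if processTags == "" {
		return 0
	}
	return hashTags(strings.Split(processTags, ","))
}

func main() {
	// Same input shape as the tests, e.g. "binary_name:bin33,grpc_server:my_server".
	fmt.Println(hashProcessTags("binary_name:bin33,grpc_server:my_server"))
}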