diff --git a/contrib/Shopify/sarama/data_streams.go b/contrib/Shopify/sarama/data_streams.go new file mode 100644 index 0000000000..3f4e3e828f --- /dev/null +++ b/contrib/Shopify/sarama/data_streams.go @@ -0,0 +1,36 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package sarama + +import ( + "context" + + "github.com/Shopify/sarama" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" +) + +func TraceKafkaProduce(ctx context.Context, msg *sarama.ProducerMessage) context.Context { + edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"} + p, ctx := datastreams.SetCheckpoint(ctx, edges...) + msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(datastreams.PropagationKey), Value: p.Encode()}) + return ctx +} + +func TraceKafkaConsume(ctx context.Context, msg *sarama.ConsumerMessage, group string) context.Context { + for _, header := range msg.Headers { + if header != nil && string(header.Key) == datastreams.PropagationKey { + p, err := datastreams.Decode(header.Value) + if err == nil { + ctx = datastreams.ContextWithPathway(ctx, p) + } + break + } + } + edges := []string{"direction:in", "group:" + group, "topic:" + msg.Topic, "type:kafka"} + _, ctx = datastreams.SetCheckpoint(ctx, edges...) + return ctx +} diff --git a/contrib/Shopify/sarama/data_streams_test.go b/contrib/Shopify/sarama/data_streams_test.go new file mode 100644 index 0000000000..a8a224acc0 --- /dev/null +++ b/contrib/Shopify/sarama/data_streams_test.go @@ -0,0 +1,93 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package sarama + +import ( + "context" + "testing" + + "github.com/Shopify/sarama" + "github.com/stretchr/testify/assert" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" +) + +func TestTraceKafkaProduce(t *testing.T) { + t.Run("Checkpoint should be created and pathway should be propagated to kafka headers", func(t *testing.T) { + ctx := context.Background() + initialPathway := datastreams.NewPathway() + ctx = datastreams.ContextWithPathway(ctx, initialPathway) + + msg := sarama.ProducerMessage{ + Topic: "test", + } + + ctx = TraceKafkaProduce(ctx, &msg) + + // The old pathway shouldn't be equal to the new pathway found in the ctx because we created a new checkpoint. + ctxPathway, _ := datastreams.PathwayFromContext(ctx) + assertPathwayNotEqual(t, initialPathway, ctxPathway) + + // The decoded pathway found in the kafka headers should be the same as the pathway found in the ctx. + var encodedPathway []byte + for _, header := range msg.Headers { + if string(header.Key) == datastreams.PropagationKey { + encodedPathway = header.Value + } + } + headersPathway, _ := datastreams.Decode(encodedPathway) + assertPathwayEqual(t, ctxPathway, headersPathway) + }) +} + +func TestTraceKafkaConsume(t *testing.T) { + t.Run("Checkpoint should be created and pathway should be extracted from kafka headers into context", func(t *testing.T) { + // First, set up pathway and context as it would have been from the producer view. 
+		producerCtx := context.Background()
+		initialPathway := datastreams.NewPathway()
+		producerCtx = datastreams.ContextWithPathway(producerCtx, initialPathway)
+
+		topic := "my-topic"
+		produceMessage := sarama.ProducerMessage{
+			Topic: topic,
+		}
+		producerCtx = TraceKafkaProduce(producerCtx, &produceMessage)
+		consumeMessage := sarama.ConsumerMessage{
+			Topic: topic,
+		}
+		for _, header := range produceMessage.Headers {
+			consumeMessage.Headers = append(consumeMessage.Headers, &sarama.RecordHeader{Key: header.Key, Value: header.Value})
+		}
+		// Call TraceKafkaConsume as the consumer side would.
+		group := "my-consumer-group"
+		consumerCtx := context.Background()
+		consumerCtx = TraceKafkaConsume(consumerCtx, &consumeMessage, group)
+
+		// Check that the resulting consumerCtx contains an expected pathway.
+		consumerCtxPathway, _ := datastreams.PathwayFromContext(consumerCtx)
+		_, expectedCtx := datastreams.SetCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
+		expectedCtxPathway, _ := datastreams.PathwayFromContext(expectedCtx)
+		assertPathwayEqual(t, expectedCtxPathway, consumerCtxPathway)
+	})
+}
+
+func assertPathwayNotEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
+	decodedP1, err1 := datastreams.Decode(p1.Encode())
+	decodedP2, err2 := datastreams.Decode(p2.Encode())
+
+	assert.Nil(t, err1)
+	assert.Nil(t, err2)
+	assert.NotEqual(t, decodedP1, decodedP2)
+}
+
+func assertPathwayEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
+	decodedP1, err1 := datastreams.Decode(p1.Encode())
+	decodedP2, err2 := datastreams.Decode(p2.Encode())
+
+	assert.Nil(t, err1)
+	assert.Nil(t, err2)
+	assert.Equal(t, decodedP1, decodedP2)
+}
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/data_streams.go b/contrib/confluentinc/confluent-kafka-go/kafka/data_streams.go
new file mode 100644
index 0000000000..0ca9560e9b
--- /dev/null
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/data_streams.go
@@ -0,0 +1,44 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package kafka
+
+import (
+	"context"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
+)
+
+// TraceKafkaProduce sets a produce checkpoint on the pathway found in ctx (or starts a new pathway),
+// propagates it through the message headers, and returns the updated context.
+func TraceKafkaProduce(ctx context.Context, msg *kafka.Message) context.Context {
+	edges := []string{"direction:out"}
+	if msg.TopicPartition.Topic != nil {
+		edges = append(edges, "topic:"+*msg.TopicPartition.Topic)
+	}
+	edges = append(edges, "type:kafka")
+	p, ctx := datastreams.SetCheckpoint(ctx, edges...)
+	msg.Headers = append(msg.Headers, kafka.Header{Key: datastreams.PropagationKey, Value: p.Encode()})
+	return ctx
+}
+
+// TraceKafkaConsume extracts the pathway propagated in the message headers, if any, sets a consume
+// checkpoint on it, and returns a context carrying the updated pathway.
+func TraceKafkaConsume(ctx context.Context, msg *kafka.Message, group string) context.Context {
+	for _, header := range msg.Headers {
+		if header.Key == datastreams.PropagationKey {
+			p, err := datastreams.Decode(header.Value)
+			if err == nil {
+				ctx = datastreams.ContextWithPathway(ctx, p)
+			}
+		}
+	}
+	edges := []string{"direction:in", "group:" + group}
+	if msg.TopicPartition.Topic != nil {
+		edges = append(edges, "topic:"+*msg.TopicPartition.Topic)
+	}
+	edges = append(edges, "type:kafka")
+	_, ctx = datastreams.SetCheckpoint(ctx, edges...)
+	return ctx
+}
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/data_streams_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/data_streams_test.go
new file mode 100644
index 0000000000..90159abb07
--- /dev/null
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/data_streams_test.go
@@ -0,0 +1,94 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package kafka
+
+import (
+	"context"
+	"testing"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/stretchr/testify/assert"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
+)
+
+func TestTraceKafkaConsume(t *testing.T) {
+	t.Run("Checkpoint should be created and pathway should be extracted from kafka headers into context", func(t *testing.T) {
+		// First, set up pathway and context as it would have been from the producer view.
+		producerCtx := context.Background()
+		initialPathway := datastreams.NewPathway()
+		producerCtx = datastreams.ContextWithPathway(producerCtx, initialPathway)
+
+		topic := "my-topic"
+		msg := kafka.Message{
+			TopicPartition: kafka.TopicPartition{
+				Topic: &topic,
+			},
+		}
+		producerCtx = TraceKafkaProduce(producerCtx, &msg)
+
+		// Call TraceKafkaConsume as the consumer side would.
+		group := "my-consumer-group"
+		consumerCtx := context.Background()
+		consumerCtx = TraceKafkaConsume(consumerCtx, &msg, group)
+
+		// Check that the resulting consumerCtx contains an expected pathway.
+		consumerCtxPathway, _ := datastreams.PathwayFromContext(consumerCtx)
+		_, expectedCtx := datastreams.SetCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
+		expectedCtxPathway, _ := datastreams.PathwayFromContext(expectedCtx)
+		assertPathwayEqual(t, expectedCtxPathway, consumerCtxPathway)
+	})
+}
+
+func TestTraceKafkaProduce(t *testing.T) {
+	t.Run("Checkpoint should be created and pathway should be propagated to kafka headers", func(t *testing.T) {
+		ctx := context.Background()
+		initialPathway := datastreams.NewPathway()
+		ctx = datastreams.ContextWithPathway(ctx, initialPathway)
+
+		msg := kafka.Message{
+			TopicPartition: kafka.TopicPartition{},
+			Value:          []byte{},
+		}
+
+		ctx = TraceKafkaProduce(ctx, &msg)
+
+		// The old pathway shouldn't be equal to the new pathway found in the ctx because we created a new checkpoint.
+		ctxPathway, _ := datastreams.PathwayFromContext(ctx)
+		assertPathwayNotEqual(t, initialPathway, ctxPathway)
+
+		// The decoded pathway found in the kafka headers should be the same as the pathway found in the ctx.
+ var encodedPathway []byte + for _, header := range msg.Headers { + if header.Key == datastreams.PropagationKey { + encodedPathway = header.Value + } + } + headersPathway, _ := datastreams.Decode(encodedPathway) + assertPathwayEqual(t, ctxPathway, headersPathway) + }) +} + +func assertPathwayNotEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) { + decodedP1, err1 := datastreams.Decode(p1.Encode()) + decodedP2, err2 := datastreams.Decode(p2.Encode()) + + assert.Nil(t, err1) + assert.Nil(t, err2) + assert.NotEqual(t, decodedP1, decodedP2) +} + +func assertPathwayEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) { + decodedP1, err1 := datastreams.Decode(p1.Encode()) + decodedP2, err2 := datastreams.Decode(p2.Encode()) + + assert.Nil(t, err1) + assert.Nil(t, err2) + assert.Equal(t, decodedP1, decodedP2) +} diff --git a/datastreams/aggregator.go b/datastreams/aggregator.go new file mode 100644 index 0000000000..77adef68ea --- /dev/null +++ b/datastreams/aggregator.go @@ -0,0 +1,333 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "fmt" + "log" + "math" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/DataDog/datadog-go/v5/statsd" + "github.com/DataDog/sketches-go/ddsketch" + "github.com/DataDog/sketches-go/ddsketch/mapping" + "github.com/DataDog/sketches-go/ddsketch/store" + "github.com/golang/protobuf/proto" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" +) + +const ( + bucketDuration = time.Second * 10 + defaultServiceName = "unnamed-go-service" +) + +var sketchMapping, _ = mapping.NewLogarithmicMapping(0.01) + +type statsPoint struct { + edgeTags []string + hash uint64 + parentHash uint64 + timestamp int64 + pathwayLatency int64 + edgeLatency int64 +} + +type statsGroup struct { + service string + edgeTags []string + hash uint64 + parentHash uint64 + pathwayLatency *ddsketch.DDSketch + edgeLatency *ddsketch.DDSketch +} + +type bucket struct { + points map[uint64]statsGroup + latestCommitOffsets map[partitionConsumerKey]int64 + latestProduceOffsets map[partitionKey]int64 + start uint64 + duration uint64 +} + +func newBucket(start, duration uint64) bucket { + return bucket{ + points: make(map[uint64]statsGroup), + latestCommitOffsets: make(map[partitionConsumerKey]int64), + latestProduceOffsets: make(map[partitionKey]int64), + start: start, + duration: duration, + } +} + +func (b bucket) export(timestampType TimestampType) StatsBucket { + stats := make([]StatsPoint, 0, len(b.points)) + for _, s := range b.points { + pathwayLatency, err := proto.Marshal(s.pathwayLatency.ToProto()) + if err != nil { + log.Printf("ERROR: can't serialize pathway latency. Ignoring: %v", err) + continue + } + edgeLatency, err := proto.Marshal(s.edgeLatency.ToProto()) + if err != nil { + log.Printf("ERROR: can't serialize edge latency. 
Ignoring: %v", err) + continue + } + stats = append(stats, StatsPoint{ + PathwayLatency: pathwayLatency, + EdgeLatency: edgeLatency, + Service: s.service, + EdgeTags: s.edgeTags, + Hash: s.hash, + ParentHash: s.parentHash, + TimestampType: timestampType, + }) + } + exported := StatsBucket{ + Start: b.start, + Duration: b.duration, + Stats: stats, + Backlogs: make([]Backlog, 0, len(b.latestCommitOffsets)+len(b.latestProduceOffsets)), + } + for key, offset := range b.latestProduceOffsets { + exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_produce"}, Value: offset}) + } + for key, offset := range b.latestCommitOffsets { + exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("consumer_group:%s", key.group), fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_commit"}, Value: offset}) + } + return exported +} + +type aggregatorStats struct { + payloadsIn int64 + flushedPayloads int64 + flushedBuckets int64 + flushErrors int64 + dropped int64 +} + +type partitionKey struct { + partition int32 + topic string +} + +type partitionConsumerKey struct { + partition int32 + topic string + group string +} + +type offsetType int + +const ( + produceOffset offsetType = iota + commitOffset +) + +type kafkaOffset struct { + offset int64 + topic string + group string + partition int32 + offsetType offsetType + timestamp int64 +} + +type aggregator struct { + in chan statsPoint + inKafka chan kafkaOffset + tsTypeCurrentBuckets map[int64]bucket + tsTypeOriginBuckets map[int64]bucket + wg sync.WaitGroup + stopped uint64 + stop chan struct{} // closing this channel triggers shutdown + stats aggregatorStats + transport *httpTransport + statsd statsd.ClientInterface + env string + primaryTag string + service string +} + +func newAggregator(statsd statsd.ClientInterface, env, primaryTag, service, agentAddr string, httpClient *http.Client, site, apiKey string, agentLess bool) *aggregator { + return &aggregator{ + tsTypeCurrentBuckets: make(map[int64]bucket), + tsTypeOriginBuckets: make(map[int64]bucket), + in: make(chan statsPoint, 10000), + inKafka: make(chan kafkaOffset, 10000), + stopped: 1, + statsd: statsd, + env: env, + primaryTag: primaryTag, + service: service, + transport: newHTTPTransport(agentAddr, site, apiKey, httpClient, agentLess), + } +} + +// alignTs returns the provided timestamp truncated to the bucket size. +// It gives us the start time of the time bucket in which such timestamp falls. 
+func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize } + +func (a *aggregator) getBucket(btime int64, buckets map[int64]bucket) bucket { + b, ok := buckets[btime] + if !ok { + b = newBucket(uint64(btime), uint64(bucketDuration.Nanoseconds())) + buckets[btime] = b + } + return b +} +func (a *aggregator) addToBuckets(point statsPoint, btime int64, buckets map[int64]bucket) { + b := a.getBucket(btime, buckets) + group, ok := b.points[point.hash] + if !ok { + group = statsGroup{ + edgeTags: point.edgeTags, + parentHash: point.parentHash, + hash: point.hash, + pathwayLatency: ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()), + edgeLatency: ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()), + } + b.points[point.hash] = group + } + if err := group.pathwayLatency.Add(math.Max(float64(point.pathwayLatency)/float64(time.Second), 0)); err != nil { + log.Printf("ERROR: failed to add pathway latency. Ignoring %v.", err) + } + if err := group.edgeLatency.Add(math.Max(float64(point.edgeLatency)/float64(time.Second), 0)); err != nil { + log.Printf("ERROR: failed to add edge latency. Ignoring %v.", err) + } +} + +func (a *aggregator) add(point statsPoint) { + currentBucketTime := alignTs(point.timestamp, bucketDuration.Nanoseconds()) + a.addToBuckets(point, currentBucketTime, a.tsTypeCurrentBuckets) + originTimestamp := point.timestamp - point.pathwayLatency + originBucketTime := alignTs(originTimestamp, bucketDuration.Nanoseconds()) + a.addToBuckets(point, originBucketTime, a.tsTypeOriginBuckets) +} + +func (a *aggregator) addKafkaOffset(o kafkaOffset) { + btime := alignTs(o.timestamp, bucketDuration.Nanoseconds()) + b := a.getBucket(btime, a.tsTypeCurrentBuckets) + if o.offsetType == produceOffset { + b.latestProduceOffsets[partitionKey{ + partition: o.partition, + topic: o.topic, + }] = o.offset + return + } + b.latestCommitOffsets[partitionConsumerKey{ + partition: o.partition, + group: o.group, + topic: o.topic, + }] = o.offset +} + +func (a *aggregator) run(tick <-chan time.Time) { + for { + select { + case s := <-a.in: + atomic.AddInt64(&a.stats.payloadsIn, 1) + a.add(s) + case o := <-a.inKafka: + a.addKafkaOffset(o) + case now := <-tick: + a.sendToAgent(a.flush(now)) + case <-a.stop: + // drop in flight payloads on the input channel + a.sendToAgent(a.flush(time.Now().Add(bucketDuration * 10))) + return + } + } +} + +func (a *aggregator) Start() { + if atomic.SwapUint64(&a.stopped, 0) == 0 { + // already running + log.Print("WARN: (*aggregator).Start called more than once. 
This is likely a programming error.") + return + } + a.stop = make(chan struct{}) + a.wg.Add(1) + go a.reportStats() + go func() { + defer a.wg.Done() + tick := time.NewTicker(bucketDuration) + defer tick.Stop() + a.run(tick.C) + }() +} + +func (a *aggregator) Stop() { + if atomic.SwapUint64(&a.stopped, 1) > 0 { + return + } + close(a.stop) + a.wg.Wait() +} + +func (a *aggregator) reportStats() { + for range time.NewTicker(time.Second * 10).C { + a.statsd.Count("datadog.datastreams.aggregator.payloads_in", atomic.SwapInt64(&a.stats.payloadsIn, 0), nil, 1) + a.statsd.Count("datadog.datastreams.aggregator.flushed_payloads", atomic.SwapInt64(&a.stats.flushedPayloads, 0), nil, 1) + a.statsd.Count("datadog.datastreams.aggregator.flushed_buckets", atomic.SwapInt64(&a.stats.flushedBuckets, 0), nil, 1) + a.statsd.Count("datadog.datastreams.aggregator.flush_errors", atomic.SwapInt64(&a.stats.flushErrors, 0), nil, 1) + a.statsd.Count("datadog.datastreams.dropped_payloads", atomic.SwapInt64(&a.stats.dropped, 0), nil, 1) + } +} + +func (a *aggregator) runFlusher() { + for { + select { + case <-a.stop: + // flush everything, so add a few bucketDurations to the current time in order to get a good margin. + return + } + } +} + +func (a *aggregator) flushBucket(buckets map[int64]bucket, bucketStart int64, timestampType TimestampType) StatsBucket { + bucket := buckets[bucketStart] + delete(buckets, bucketStart) + return bucket.export(timestampType) +} + +func (a *aggregator) flush(now time.Time) StatsPayload { + nowNano := now.UnixNano() + sp := StatsPayload{ + Service: a.service, + Env: a.env, + PrimaryTag: a.primaryTag, + Lang: "go", + TracerVersion: version.Tag, + Stats: make([]StatsBucket, 0, len(a.tsTypeCurrentBuckets)+len(a.tsTypeOriginBuckets)), + } + for ts := range a.tsTypeCurrentBuckets { + if ts > nowNano-bucketDuration.Nanoseconds() { + // do not flush the bucket at the current time + continue + } + sp.Stats = append(sp.Stats, a.flushBucket(a.tsTypeCurrentBuckets, ts, TimestampTypeCurrent)) + } + for ts := range a.tsTypeOriginBuckets { + if ts > nowNano-bucketDuration.Nanoseconds() { + // do not flush the bucket at the current time + continue + } + sp.Stats = append(sp.Stats, a.flushBucket(a.tsTypeOriginBuckets, ts, TimestampTypeOrigin)) + } + return sp +} + +func (a *aggregator) sendToAgent(payload StatsPayload) { + atomic.AddInt64(&a.stats.flushedPayloads, 1) + atomic.AddInt64(&a.stats.flushedBuckets, int64(len(payload.Stats))) + if err := a.transport.sendPipelineStats(&payload); err != nil { + atomic.AddInt64(&a.stats.flushErrors, 1) + } +} diff --git a/datastreams/aggregator_test.go b/datastreams/aggregator_test.go new file mode 100644 index 0000000000..ee590c337c --- /dev/null +++ b/datastreams/aggregator_test.go @@ -0,0 +1,194 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package datastreams + +import ( + "sort" + "strings" + "testing" + "time" + + "github.com/DataDog/sketches-go/ddsketch" + "github.com/DataDog/sketches-go/ddsketch/store" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" +) + +func buildSketch(values ...float64) []byte { + sketch := ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()) + for _, v := range values { + sketch.Add(v) + } + bytes, _ := proto.Marshal(sketch.ToProto()) + return bytes +} + +func TestAggregator(t *testing.T) { + p := newAggregator(nil, "env", "datacenter:us1.prod.dog", "service", "agent-addr", nil, "datadoghq.com", "key", true) + tp1 := time.Now() + // Set tp2 to be some 40 seconds after the tp1, but also account for bucket alignments, + // otherwise the possible StatsPayload would change depending on when the test is run. + tp2 := time.Unix(0, alignTs(tp1.Add(time.Second*40).UnixNano(), bucketDuration.Nanoseconds())).Add(6 * time.Second) + + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 2, + parentHash: 1, + timestamp: tp2.UnixNano(), + pathwayLatency: time.Second.Nanoseconds(), + edgeLatency: time.Second.Nanoseconds(), + }) + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 2, + parentHash: 1, + timestamp: tp2.UnixNano(), + pathwayLatency: (5 * time.Second).Nanoseconds(), + edgeLatency: (2 * time.Second).Nanoseconds(), + }) + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 3, + parentHash: 1, + timestamp: tp2.UnixNano(), + pathwayLatency: (5 * time.Second).Nanoseconds(), + edgeLatency: (2 * time.Second).Nanoseconds(), + }) + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 2, + parentHash: 1, + timestamp: tp1.UnixNano(), + pathwayLatency: (5 * time.Second).Nanoseconds(), + edgeLatency: (2 * time.Second).Nanoseconds(), + }) + // flush at tp2 doesn't flush points at tp2 (current bucket) + assert.Equal(t, StatsPayload{ + Env: "env", + Service: "service", + PrimaryTag: "datacenter:us1.prod.dog", + Stats: []StatsBucket{ + { + Start: uint64(alignTs(tp1.UnixNano(), bucketDuration.Nanoseconds())), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{{ + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + TimestampType: "current", + }}, + Backlogs: []Backlog{}, + }, + { + Start: uint64(alignTs(tp1.UnixNano()-(5*time.Second).Nanoseconds(), bucketDuration.Nanoseconds())), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{{ + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + TimestampType: "origin", + }}, + Backlogs: []Backlog{}, + }, + }, + TracerVersion: version.Tag, + Lang: "go", + }, p.flush(tp2)) + + sp := p.flush(tp2.Add(bucketDuration).Add(time.Second)) + sort.Slice(sp.Stats[0].Stats, func(i, j int) bool { + return sp.Stats[0].Stats[i].Hash < sp.Stats[0].Stats[j].Hash + }) + assert.Equal(t, StatsPayload{ + Env: "env", + Service: "service", + PrimaryTag: "datacenter:us1.prod.dog", + Stats: []StatsBucket{ + { + Start: uint64(alignTs(tp2.UnixNano(), bucketDuration.Nanoseconds())), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{ + { + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(1, 5), + EdgeLatency: buildSketch(1, 2), + TimestampType: "current", + }, + { + EdgeTags: 
[]string{"type:edge-1"}, + Hash: 3, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + TimestampType: "current", + }, + }, + Backlogs: []Backlog{}, + }, + { + Start: uint64(alignTs(tp2.UnixNano()-(5*time.Second).Nanoseconds(), bucketDuration.Nanoseconds())), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{ + { + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(1, 5), + EdgeLatency: buildSketch(1, 2), + TimestampType: "origin", + }, + { + EdgeTags: []string{"type:edge-1"}, + Hash: 3, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + TimestampType: "origin", + }, + }, + Backlogs: []Backlog{}, + }, + }, + TracerVersion: version.Tag, + Lang: "go", + }, sp) +} + +func TestKafkaLag(t *testing.T) { + a := newAggregator(nil, "env", "datacenter:us1.prod.dog", "service", "agent-addr", nil, "datadoghq.com", "key", true) + tp1 := time.Now() + a.addKafkaOffset(kafkaOffset{offset: 1, topic: "topic1", partition: 1, group: "group1", offsetType: commitOffset}) + a.addKafkaOffset(kafkaOffset{offset: 10, topic: "topic2", partition: 1, group: "group1", offsetType: commitOffset}) + a.addKafkaOffset(kafkaOffset{offset: 5, topic: "topic1", partition: 1, offsetType: produceOffset}) + a.addKafkaOffset(kafkaOffset{offset: 15, topic: "topic1", partition: 1, offsetType: produceOffset}) + p := a.flush(tp1.Add(bucketDuration * 2)) + sort.Slice(p.Stats[0].Backlogs, func(i, j int) bool { + return strings.Join(p.Stats[0].Backlogs[i].Tags, "") < strings.Join(p.Stats[0].Backlogs[j].Tags, "") + }) + expectedBacklogs := []Backlog{ + { + Tags: []string{"consumer_group:group1", "partition:1", "topic:topic1", "type:kafka_commit"}, + Value: 1, + }, + { + Tags: []string{"consumer_group:group1", "partition:1", "topic:topic2", "type:kafka_commit"}, + Value: 10, + }, + { + Tags: []string{"partition:1", "topic:topic1", "type:kafka_produce"}, + Value: 15, + }, + } + assert.Equal(t, expectedBacklogs, p.Stats[0].Backlogs) +} diff --git a/datastreams/container.go b/datastreams/container.go new file mode 100644 index 0000000000..32e38a6fbb --- /dev/null +++ b/datastreams/container.go @@ -0,0 +1,71 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package datastreams + +import ( + "bufio" + "fmt" + "io" + "os" + "regexp" +) + +const ( + // cgroupPath is the path to the cgroup file where we can find the container id if one exists. + cgroupPath = "/proc/self/cgroup" +) + +const ( + uuidSource = "[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}|[0-9a-f]{8}(?:-[0-9a-f]{4}){4}$" + containerSource = "[0-9a-f]{64}" + taskSource = "[0-9a-f]{32}-\\d+" +) + +var ( + // expLine matches a line in the /proc/self/cgroup file. It has a submatch for the last element (path), which contains the container ID. + expLine = regexp.MustCompile(`^\d+:[^:]*:(.+)$`) + + // expContainerID matches contained IDs and sources. 
Source: https://github.com/Qard/container-info/blob/master/index.js + expContainerID = regexp.MustCompile(fmt.Sprintf(`(%s|%s|%s)(?:.scope)?$`, uuidSource, containerSource, taskSource)) + + // containerID is the containerID read at init from /proc/self/cgroup + containerID string +) + +func init() { + containerID = readContainerID(cgroupPath) +} + +// parseContainerID finds the first container ID reading from r and returns it. +func parseContainerID(r io.Reader) string { + scn := bufio.NewScanner(r) + for scn.Scan() { + path := expLine.FindStringSubmatch(scn.Text()) + if len(path) != 2 { + // invalid entry, continue + continue + } + if parts := expContainerID.FindStringSubmatch(path[1]); len(parts) == 2 { + return parts[1] + } + } + return "" +} + +// readContainerID attempts to return the container ID from the provided file path or empty on failure. +func readContainerID(fpath string) string { + f, err := os.Open(fpath) + if err != nil { + return "" + } + defer f.Close() + return parseContainerID(f) +} + +// ContainerID attempts to return the container ID from /proc/self/cgroup or empty on failure. +func ContainerID() string { + return containerID +} diff --git a/datastreams/container_test.go b/datastreams/container_test.go new file mode 100644 index 0000000000..3a6ecf6bf6 --- /dev/null +++ b/datastreams/container_test.go @@ -0,0 +1,81 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package datastreams + +import ( + "io" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadContainerID(t *testing.T) { + for in, out := range map[string]string{ + `other_line +10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +9:cpuset:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +8:pids:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +7:freezer:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +6:cpu,cpuacct:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +5:perf_event:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +4:blkio:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +3:devices:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa +2:net_cls,net_prio:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa`: "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa", + "10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa": "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa", + "10:hugetlb:/kubepods": "", + "11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da": 
"432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da", + "1:name=systemd:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376": "34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376", + "1:name=systemd:/uuid/34dc0b5e-626f-2c5c-4c51-70e34b10e765": "34dc0b5e-626f-2c5c-4c51-70e34b10e765", + "1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890": "34dc0b5e626f2c5c4c5170e34b10e765-1234567890", + "1:name=systemd:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376.scope": "34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376", + `1:name=systemd:/nope +2:pids:/docker/34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376 +3:cpu:/invalid`: "34dc0b5e626f2c5c4c5170e34b10e7654ce36f0fcd532739f4445baabea03376", + `other_line +12:memory:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1 +11:rdma:/ +10:freezer:/garden/6f265890-5165-7fab-6b52-18d1 +9:hugetlb:/garden/6f265890-5165-7fab-6b52-18d1 +8:pids:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1 +7:perf_event:/garden/6f265890-5165-7fab-6b52-18d1 +6:cpu,cpuacct:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1 +5:net_cls,net_prio:/garden/6f265890-5165-7fab-6b52-18d1 +4:cpuset:/garden/6f265890-5165-7fab-6b52-18d1 +3:blkio:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1 +2:devices:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1 +1:name=systemd:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1`: "6f265890-5165-7fab-6b52-18d1", + "1:name=systemd:/system.slice/garden.service/garden/6f265890-5165-7fab-6b52-18d1": "6f265890-5165-7fab-6b52-18d1", + } { + id := parseContainerID(strings.NewReader(in)) + if id != out { + t.Fatalf("%q -> %q: %q", in, out, id) + } + } +} + +func TestReadContainerIDFromCgroup(t *testing.T) { + cid := "8c046cb0b72cd4c99f51b5591cd5b095967f58ee003710a45280c28ee1a9c7fa" + cgroupContents := "10:hugetlb:/kubepods/burstable/podfd52ef25-a87d-11e9-9423-0800271a638e/" + cid + + tmpFile, err := os.CreateTemp(os.TempDir(), "fake-cgroup-") + if err != nil { + t.Fatalf("failed to create fake cgroup file: %v", err) + } + defer os.Remove(tmpFile.Name()) + _, err = io.WriteString(tmpFile, cgroupContents) + if err != nil { + t.Fatalf("failed writing to fake cgroup file: %v", err) + } + err = tmpFile.Close() + if err != nil { + t.Fatalf("failed closing fake cgroup file: %v", err) + } + + actualCID := readContainerID(tmpFile.Name()) + assert.Equal(t, cid, actualCID) +} diff --git a/datastreams/context.go b/datastreams/context.go new file mode 100644 index 0000000000..70c89586ce --- /dev/null +++ b/datastreams/context.go @@ -0,0 +1,70 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + "fmt" +) + +type contextKey struct{} + +var activePathwayKey = contextKey{} + +// ContextWithPathway returns a copy of the given context which includes the pathway p. +func ContextWithPathway(ctx context.Context, p Pathway) context.Context { + return context.WithValue(ctx, activePathwayKey, p) +} + +// PathwayFromContext returns the pathway contained in the given context, and whether a +// pathway is found in ctx. 
+func PathwayFromContext(ctx context.Context) (p Pathway, ok bool) { + if ctx == nil { + return Pathway{}, false + } + v := ctx.Value(activePathwayKey) + if s, ok := v.(Pathway); ok { + return s, true + } + return Pathway{}, false +} + +// SetCheckpoint sets a checkpoint on the pathway found in ctx. +// If there is no pathway in ctx, a new Pathway is returned. +func SetCheckpoint(ctx context.Context, edgeTags ...string) (Pathway, context.Context) { + if ctx == nil { + ctx = context.Background() + } + p, ok := PathwayFromContext(ctx) + if ok { + fmt.Println("SetCheckpoint", edgeTags, p.hash) + p = p.SetCheckpoint(edgeTags...) + } else { + p = NewPathway(edgeTags...) + } + ctx = ContextWithPathway(ctx, p) + return p, ctx +} + +// MergeContexts returns the first context which includes the pathway resulting from merging the pathways +// contained in all contexts. +// This function should be used in fan-in situations. The current implementation keeps only 1 Pathway. +// A future implementation could merge multiple Pathways together and put the resulting Pathway in the context. +func MergeContexts(ctxs ...context.Context) context.Context { + if len(ctxs) == 0 { + return context.Background() + } + pathways := make([]Pathway, 0, len(ctxs)) + for _, ctx := range ctxs { + if p, ok := PathwayFromContext(ctx); ok { + pathways = append(pathways, p) + } + } + if len(pathways) == 0 { + return ctxs[0] + } + return ContextWithPathway(ctxs[0], Merge(pathways)) +} diff --git a/datastreams/context_test.go b/datastreams/context_test.go new file mode 100644 index 0000000000..5299ea228c --- /dev/null +++ b/datastreams/context_test.go @@ -0,0 +1,46 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestContext(t *testing.T) { + t.Run("SetCheckpoint", func(t *testing.T) { + aggregator := aggregator{ + stopped: 1, + in: make(chan statsPoint, 10), + service: "service-1", + env: "env", + primaryTag: "d:1", + } + setGlobalAggregator(&aggregator) + defer setGlobalAggregator(nil) + hash1 := pathwayHash(nodeHash("service-1", "env", "d:1", []string{"type:internal"}), 0) + hash2 := pathwayHash(nodeHash("service-1", "env", "d:1", []string{"type:kafka"}), hash1) + + ctx := context.Background() + pathway, ctx := SetCheckpoint(ctx, "type:internal") + pathway, _ = SetCheckpoint(ctx, "type:kafka") + + statsPt1 := <-aggregator.in + statsPt2 := <-aggregator.in + + assert.Equal(t, []string{"type:internal"}, statsPt1.edgeTags) + assert.Equal(t, hash1, statsPt1.hash) + assert.Equal(t, uint64(0), statsPt1.parentHash) + + assert.Equal(t, []string{"type:kafka"}, statsPt2.edgeTags) + assert.Equal(t, hash2, statsPt2.hash) + assert.Equal(t, hash1, statsPt2.parentHash) + + assert.Equal(t, statsPt2.hash, pathway.hash) + }) +} diff --git a/datastreams/init.go b/datastreams/init.go new file mode 100644 index 0000000000..9ca1ae7b46 --- /dev/null +++ b/datastreams/init.go @@ -0,0 +1,58 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// this package will later be copied to dd-trace-go once we end the experimentation on it. 
+// So it can't use any dd-go dependencies. + +package datastreams + +import ( + "log" + "sync" +) + +var ( + mu sync.RWMutex + activeAggregator *aggregator +) + +func setGlobalAggregator(a *aggregator) { + mu.Lock() + defer mu.Unlock() + old := activeAggregator + activeAggregator = a + if old != nil { + old.Stop() + } +} + +func getGlobalAggregator() *aggregator { + mu.RLock() + defer mu.RUnlock() + return activeAggregator +} + +// Start starts the data streams stats aggregator that will record pipeline stats and send them to the agent. +func Start(opts ...StartOption) { + cfg := newConfig(opts...) + if !cfg.agentLess && !cfg.features.PipelineStats { + log.Print("ERROR: Agent does not support pipeline stats and pipeline stats aggregator launched in agent mode.") + return + } + p := newAggregator(cfg.statsd, cfg.env, cfg.primaryTag, cfg.service, cfg.agentAddr, cfg.httpClient, cfg.site, cfg.apiKey, cfg.agentLess) + p.Start() + setGlobalAggregator(p) +} + +// Stop stops the data streams stats aggregator. +func Stop() { + p := getGlobalAggregator() + if p == nil { + log.Print("ERROR: Stopped aggregator more than once.") + return + } + p.Stop() + setGlobalAggregator(nil) +} diff --git a/datastreams/kafka.go b/datastreams/kafka.go new file mode 100644 index 0000000000..ed95883083 --- /dev/null +++ b/datastreams/kafka.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package datastreams + +import ( + "sync/atomic" + "time" +) + +func TrackKafkaCommitOffset(group string, topic string, partition int32, offset int64) { + if aggregator := getGlobalAggregator(); aggregator != nil { + select { + case aggregator.inKafka <- kafkaOffset{ + offset: offset, + group: group, + topic: topic, + partition: partition, + offsetType: commitOffset, + timestamp: time.Now().UnixNano(), + }: + default: + atomic.AddInt64(&aggregator.stats.dropped, 1) + } + } +} + +func TrackKafkaProduce(topic string, partition int32, offset int64) { + if aggregator := getGlobalAggregator(); aggregator != nil { + select { + case aggregator.inKafka <- kafkaOffset{ + offset: offset, + topic: topic, + partition: partition, + offsetType: produceOffset, + timestamp: time.Now().UnixNano(), + }: + default: + atomic.AddInt64(&aggregator.stats.dropped, 1) + } + } +} diff --git a/datastreams/options.go b/datastreams/options.go new file mode 100644 index 0000000000..9f7c610620 --- /dev/null +++ b/datastreams/options.go @@ -0,0 +1,330 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net" + "net/http" + "os" + "path/filepath" + "runtime" + "strconv" + "time" + + "github.com/DataDog/datadog-go/v5/statsd" +) + +var ( + // defaultSocketAPM specifies the socket path to use for connecting to the trace-agent. + // Replaced in tests + defaultSocketAPM = "/var/run/datadog/apm.socket" + + // defaultSocketDSD specifies the socket path to use for connecting to the statsd server. + // Replaced in tests + defaultSocketDSD = "/var/run/datadog/dsd.socket" +) + +// config holds the tracer configuration. 
+type config struct { + // features holds the capabilities of the agent and determines some + // of the behaviour of the tracer. + features agentFeatures + // logToStdout reports whether we should log all traces to the standard + // output instead of using the agent. This is used in Lambda environments. + logToStdout bool + // logStartup, when true, causes various startup info to be written + // when the tracer starts. + logStartup bool + // service specifies the name of this application. + service string + // env contains the environment that this application will run under. + env string + // primaryTag contains the primary tag that this application will run under + primaryTag string + // agentAddr specifies the hostname and port of the agent where the traces + // are sent to. + agentAddr string + // globalTags holds a set of tags that will be automatically applied to + // all spans. + globalTags map[string]interface{} + // httpClient specifies the HTTP client to be used by the agent's transport. + httpClient *http.Client + // hostname is automatically assigned when the DD_TRACE_REPORT_HOSTNAME is set to true, + // and is added as a special tag to the root span of traces. + hostname string + // dogstatsdAddr specifies the address to connect for sending metrics to the + // Datadog Agent. If not set, it defaults to "localhost:8125" or to the + // combination of the environment variables DD_AGENT_HOST and DD_DOGSTATSD_PORT. + dogstatsdAddr string + // statsd is used for tracking metrics associated with the runtime and the tracer. + statsd statsd.ClientInterface + site string + apiKey string + agentLess bool +} + +// StartOption represents a function that can be provided as a parameter to Start. +type StartOption func(*config) + +// newConfig renders the tracer configuration based on defaults, environment variables +// and passed user opts. 
+func newConfig(opts ...StartOption) *config { + c := new(config) + c.agentAddr = getAgentAddr() + c.httpClient = defaultHTTPClient() + if v := os.Getenv("DD_ENV"); v != "" { + c.env = v + } + if v := os.Getenv("DD_SERVICE"); v != "" { + c.service = v + } + if v := os.Getenv("DD_PRIMARY_TAG"); v != "" { + c.primaryTag = v + } + for _, fn := range opts { + fn(c) + } + if c.env == "" { + if v, ok := c.globalTags["env"]; ok { + if e, ok := v.(string); ok { + c.env = e + } + } + } + if c.service == "" { + if v, ok := c.globalTags["service"]; ok { + if s, ok := v.(string); ok { + c.service = s + } + } else { + c.service = filepath.Base(os.Args[0]) + } + } + fmt.Println("INFO: agent addr is", c.agentAddr) + c.loadAgentFeatures() + if c.statsd == nil { + // configure statsd client + addr := c.dogstatsdAddr + if addr == "" { + // no config defined address; use defaults + addr = defaultDogstatsdAddr() + } + if agentport := c.features.StatsdPort; agentport > 0 { + // the agent reported a non-standard port + host, _, err := net.SplitHostPort(addr) + if err == nil { + // we have a valid host:port address; replace the port because + // the agent knows better + if host == "" { + host = defaultHostname + } + addr = net.JoinHostPort(host, strconv.Itoa(agentport)) + } + // not a valid TCP address, leave it as it is (could be a socket connection) + } + c.dogstatsdAddr = addr + client, err := statsd.New(addr, statsd.WithMaxMessagesPerPayload(40), statsd.WithTags(statsTags(c))) + if err != nil { + log.Printf("INFO: Runtime and health metrics disabled: %v", err) + c.statsd = &statsd.NoOpClient{} + } else { + c.statsd = client + } + } + return c +} + +// getAgentAddr returns the agent address. +func getAgentAddr() string { + host := defaultHostname + port := defaultPort + if v := os.Getenv("DD_AGENT_HOST"); v != "" { + host = v + } + if v := os.Getenv("DD_TRACE_AGENT_PORT"); v != "" { + port = v + } + return fmt.Sprintf("%s:%s", host, port) +} + +// defaultHTTPClient returns the default http.Client to start the tracer with. +func defaultHTTPClient() *http.Client { + if _, err := os.Stat(defaultSocketAPM); err == nil { + // we have the UDS socket file, use it + return udsClient(defaultSocketAPM) + } + return defaultClient +} + +// udsClient returns a new http.Client which connects using the given UDS socket path. +func udsClient(socketPath string) *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + return defaultDialer.DialContext(ctx, "unix", (&net.UnixAddr{ + Name: socketPath, + Net: "unix", + }).String()) + }, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + Timeout: defaultHTTPTimeout, + } +} + +// defaultDogstatsdAddr returns the default connection address for Dogstatsd. +func defaultDogstatsdAddr() string { + envHost, envPort := os.Getenv("DD_AGENT_HOST"), os.Getenv("DD_DOGSTATSD_PORT") + if _, err := os.Stat(defaultSocketDSD); err == nil && envHost == "" && envPort == "" { + // socket exists and user didn't specify otherwise via env vars + return "unix://" + defaultSocketDSD + } + host, port := defaultHostname, "8125" + if envHost != "" { + host = envHost + } + if envPort != "" { + port = envPort + } + return net.JoinHostPort(host, port) +} + +// agentFeatures holds information about the trace-agent's capabilities. 
+// When running WithLambdaMode, a zero-value of this struct will be used +// as features. +type agentFeatures struct { + // PipelineStats reports whether the agent can receive pipeline stats on + // the /v0.1/pipeline_stats endpoint. + PipelineStats bool + // StatsdPort specifies the Dogstatsd port as provided by the agent. + // If it's the default, it will be 0, which means 8125. + StatsdPort int +} + +// loadAgentFeatures queries the trace-agent for its capabilities and updates +// the tracer's behaviour. +func (c *config) loadAgentFeatures() { + c.features = agentFeatures{} + if c.logToStdout { + // there is no agent; all features off + return + } + fmt.Println("INFO: agent addr is", c.agentAddr) + resp, err := c.httpClient.Get(fmt.Sprintf("http://%s/info", c.agentAddr)) + if err != nil { + log.Printf("ERROR: Loading features: %v", err) + return + } + if resp.StatusCode == http.StatusNotFound { + // agent is older than 7.28.0, features not discoverable + return + } + defer resp.Body.Close() + type infoResponse struct { + Endpoints []string `json:"endpoints"` + StatsdPort int `json:"statsd_port"` + } + var info infoResponse + if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { + log.Printf("ERROR: Decoding features: %v", err) + return + } + c.features.StatsdPort = info.StatsdPort + for _, endpoint := range info.Endpoints { + switch endpoint { + case "/v0.1/pipeline_stats": + c.features.PipelineStats = true + log.Printf("INFO: Enable pipeline stats.") + } + } +} + +func statsTags(c *config) []string { + tags := []string{ + "lang:go", + "lang_version:" + runtime.Version(), + } + if c.service != "" { + tags = append(tags, "service:"+c.service) + } + if c.env != "" { + tags = append(tags, "env:"+c.env) + } + if c.hostname != "" { + tags = append(tags, "host:"+c.hostname) + } + for k, v := range c.globalTags { + if vstr, ok := v.(string); ok { + tags = append(tags, k+":"+vstr) + } + } + return tags +} + +// withNoopStats is used for testing to disable statsd client +func withNoopStats() StartOption { + return func(c *config) { + c.statsd = &statsd.NoOpClient{} + } +} + +// WithService sets the default service name for the program. +func WithService(name string) StartOption { + return func(c *config) { + c.service = name + } +} + +// WithAgentAddr sets the address where the agent is located. The default is +// localhost:8126. It should contain both host and port. +func WithAgentAddr(addr string) StartOption { + return func(c *config) { + fmt.Println("INFO: setting agent addr to", addr) + c.agentAddr = addr + } +} + +// WithEnv sets the environment to which all traces started by the tracer will be submitted. +// The default value is the environment variable DD_ENV, if it is set. +func WithEnv(env string) StartOption { + return func(c *config) { + c.env = env + } +} + +// WithDogstatsdAddress specifies the address to connect to for sending metrics +// to the Datadog Agent. If not set, it defaults to "localhost:8125" or to the +// combination of the environment variables DD_AGENT_HOST and DD_DOGSTATSD_PORT. +// This option is in effect when WithRuntimeMetrics is enabled. +func WithDogstatsdAddress(addr string) StartOption { + return func(cfg *config) { + cfg.dogstatsdAddr = addr + } +} + +// WithSite starts the data streams stats aggregator with a given site to send data to. 
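+// For example, "datadoghq.com" or "datadoghq.eu".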
+func WithSite(site string) StartOption {
+	return func(c *config) {
+		c.site = site
+	}
+}
+
+// WithAgentless starts the data streams stats aggregator in a mode where stats are sent directly to the datadog backend
+// instead of going through the agent.
+func WithAgentless(apiKey string) StartOption {
+	return func(c *config) {
+		c.apiKey = apiKey
+		c.agentLess = true
+	}
+}
diff --git a/datastreams/pathway.go b/datastreams/pathway.go
new file mode 100644
index 0000000000..b0fe83c65a
--- /dev/null
+++ b/datastreams/pathway.go
@@ -0,0 +1,136 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"encoding/binary"
+	"hash/fnv"
+	"math/rand"
+	"sort"
+	"strings"
+	"sync/atomic"
+	"time"
+)
+
+var hashableEdgeTags = map[string]struct{}{"event_type": {}, "exchange": {}, "group": {}, "topic": {}, "type": {}, "direction": {}}
+
+// Pathway is used to monitor how payloads are sent across different services.
+// An example Pathway would be:
+// service A -- edge 1 --> service B -- edge 2 --> service C
+// So it's a chain of services (we also call them "nodes") connected via edges.
+// As the payload is sent around, we save the start time (start of service A),
+// and the start time of the previous service.
+// This allows us to measure the latency of each edge, as well as the latency from origin of any service.
+type Pathway struct {
+	// hash is the hash of the current node, of the parent node, and of the edge that connects the parent node
+	// to this node.
+	hash uint64
+	// pathwayStart is the start of the first node in the Pathway
+	pathwayStart time.Time
+	// edgeStart is the start of the previous node.
+	edgeStart time.Time
+}
+
+// Merge merges multiple pathways into one.
+// The current implementation samples one resulting Pathway. A future implementation could be more clever
+// and actually merge the Pathways.
+func Merge(pathways []Pathway) Pathway {
+	if len(pathways) == 0 {
+		return Pathway{}
+	}
+	// Randomly select a pathway to propagate downstream.
+	n := rand.Intn(len(pathways))
+	return pathways[n]
+}
+
+func isWellFormedEdgeTag(t string) bool {
+	if i := strings.IndexByte(t, ':'); i != -1 {
+		if j := strings.LastIndexByte(t, ':'); j == i {
+			if _, exists := hashableEdgeTags[t[:i]]; exists {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// nodeHash hashes the service, env, primary tag and the well-formed edge tags of a node.
+func nodeHash(service, env, primaryTag string, edgeTags []string) uint64 {
+	h := fnv.New64()
+	sort.Strings(edgeTags)
+	h.Write([]byte(service))
+	h.Write([]byte(env))
+	h.Write([]byte(primaryTag))
+	for _, t := range edgeTags {
+		if isWellFormedEdgeTag(t) {
+			h.Write([]byte(t))
+		}
+	}
+	return h.Sum64()
+}
+
+// pathwayHash combines the current node hash with the parent pathway hash.
+func pathwayHash(nodeHash, parentHash uint64) uint64 {
+	b := make([]byte, 16)
+	binary.LittleEndian.PutUint64(b, nodeHash)
+	binary.LittleEndian.PutUint64(b[8:], parentHash)
+	h := fnv.New64()
+	h.Write(b)
+	return h.Sum64()
+}
+
+// NewPathway creates a new pathway.
+func NewPathway(edgeTags ...string) Pathway {
+	return newPathway(time.Now(), edgeTags...)
+} + +func newPathway(now time.Time, edgeTags ...string) Pathway { + p := Pathway{ + hash: 0, + pathwayStart: now, + edgeStart: now, + } + return p.setCheckpoint(now, edgeTags) +} + +// SetCheckpoint sets a checkpoint on a pathway. +func (p Pathway) SetCheckpoint(edgeTags ...string) Pathway { + return p.setCheckpoint(time.Now(), edgeTags) +} + +func (p Pathway) setCheckpoint(now time.Time, edgeTags []string) Pathway { + aggr := getGlobalAggregator() + service := defaultServiceName + primaryTag := "" + env := "" + if aggr != nil { + service = aggr.service + primaryTag = aggr.primaryTag + env = aggr.env + } + child := Pathway{ + hash: pathwayHash(nodeHash(service, env, primaryTag, edgeTags), p.hash), + pathwayStart: p.pathwayStart, + edgeStart: now, + } + if aggregator := getGlobalAggregator(); aggregator != nil { + select { + case aggregator.in <- statsPoint{ + edgeTags: edgeTags, + parentHash: p.hash, + hash: child.hash, + timestamp: now.UnixNano(), + pathwayLatency: now.Sub(p.pathwayStart).Nanoseconds(), + edgeLatency: now.Sub(p.edgeStart).Nanoseconds(), + }: + default: + atomic.AddInt64(&aggregator.stats.dropped, 1) + } + } + return child +} diff --git a/datastreams/pathway_test.go b/datastreams/pathway_test.go new file mode 100644 index 0000000000..cdaac22321 --- /dev/null +++ b/datastreams/pathway_test.go @@ -0,0 +1,190 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "fmt" + "hash/fnv" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPathway(t *testing.T) { + t.Run("test SetCheckPoint", func(t *testing.T) { + aggregator := aggregator{ + stopped: 1, + in: make(chan statsPoint, 10), + service: "service-1", + env: "env", + primaryTag: "d:1", + } + setGlobalAggregator(&aggregator) + defer setGlobalAggregator(nil) + start := time.Now() + middle := start.Add(time.Hour) + end := middle.Add(time.Hour) + p := newPathway(start) + p = p.setCheckpoint(middle, []string{"edge-1"}) + p = p.setCheckpoint(end, []string{"edge-2"}) + hash1 := pathwayHash(nodeHash("service-1", "env", "d:1", nil), 0) + hash2 := pathwayHash(nodeHash("service-1", "env", "d:1", []string{"edge-1"}), hash1) + hash3 := pathwayHash(nodeHash("service-1", "env", "d:1", []string{"edge-2"}), hash2) + assert.Equal(t, Pathway{ + hash: hash3, + pathwayStart: start, + edgeStart: end, + }, p) + assert.Equal(t, statsPoint{ + edgeTags: nil, + hash: hash1, + parentHash: 0, + timestamp: start.UnixNano(), + pathwayLatency: 0, + edgeLatency: 0, + }, <-aggregator.in) + assert.Equal(t, statsPoint{ + edgeTags: []string{"edge-1"}, + hash: hash2, + parentHash: hash1, + timestamp: middle.UnixNano(), + pathwayLatency: middle.Sub(start).Nanoseconds(), + edgeLatency: middle.Sub(start).Nanoseconds(), + }, <-aggregator.in) + assert.Equal(t, statsPoint{ + edgeTags: []string{"edge-2"}, + hash: hash3, + parentHash: hash2, + timestamp: end.UnixNano(), + pathwayLatency: end.Sub(start).Nanoseconds(), + edgeLatency: end.Sub(middle).Nanoseconds(), + }, <-aggregator.in) + }) + + t.Run("test NewPathway", func(t *testing.T) { + aggregator := aggregator{ + stopped: 1, + in: make(chan statsPoint, 10), + service: "service-1", + env: "env", + primaryTag: "d:1", + } + setGlobalAggregator(&aggregator) + defer setGlobalAggregator(nil) + + pathwayWithNoEdgeTags := NewPathway() + pathwayWith1EdgeTag := 
NewPathway("type:internal") + pathwayWith2EdgeTags := NewPathway("type:internal", "some_other_key:some_other_val") + + hash1 := pathwayHash(nodeHash("service-1", "env", "d:1", nil), 0) + hash2 := pathwayHash(nodeHash("service-1", "env", "d:1", []string{"type:internal"}), 0) + hash3 := pathwayHash(nodeHash("service-1", "env", "d:1", []string{"type:internal", "some_other_key:some_other_val"}), 0) + assert.Equal(t, hash1, pathwayWithNoEdgeTags.hash) + assert.Equal(t, hash2, pathwayWith1EdgeTag.hash) + assert.Equal(t, hash3, pathwayWith2EdgeTags.hash) + + var statsPointWithNoEdgeTags = <-aggregator.in + var statsPointWith1EdgeTag = <-aggregator.in + var statsPointWith2EdgeTags = <-aggregator.in + assert.Equal(t, hash1, statsPointWithNoEdgeTags.hash) + assert.Equal(t, []string(nil), statsPointWithNoEdgeTags.edgeTags) + assert.Equal(t, hash2, statsPointWith1EdgeTag.hash) + assert.Equal(t, []string{"type:internal"}, statsPointWith1EdgeTag.edgeTags) + assert.Equal(t, hash3, statsPointWith2EdgeTags.hash) + assert.Equal(t, []string{"some_other_key:some_other_val", "type:internal"}, statsPointWith2EdgeTags.edgeTags) + }) + + t.Run("test nodeHash", func(t *testing.T) { + assert.NotEqual(t, + nodeHash("service-1", "env", "d:1", []string{"type:internal"}), + nodeHash("service-1", "env", "d:1", []string{"type:kafka"}), + ) + assert.NotEqual(t, + nodeHash("service-1", "env", "d:1", []string{"exchange:1"}), + nodeHash("service-1", "env", "d:1", []string{"exchange:2"}), + ) + assert.NotEqual(t, + nodeHash("service-1", "env", "d:1", []string{"topic:1"}), + nodeHash("service-1", "env", "d:1", []string{"topic:2"}), + ) + assert.NotEqual(t, + nodeHash("service-1", "env", "d:1", []string{"group:1"}), + nodeHash("service-1", "env", "d:1", []string{"group:2"}), + ) + assert.NotEqual(t, + nodeHash("service-1", "env", "d:1", []string{"event_type:1"}), + nodeHash("service-1", "env", "d:1", []string{"event_type:2"}), + ) + assert.Equal(t, + nodeHash("service-1", "env", "d:1", []string{"partition:0"}), + nodeHash("service-1", "env", "d:1", []string{"partition:1"}), + ) + }) + + t.Run("test isWellFormedEdgeTag", func(t *testing.T) { + for _, tc := range []struct { + s string + b bool + }{ + {"", false}, + {"dog", false}, + {"dog:", false}, + {"dog:bark", false}, + {"type:", true}, + {"type:dog", true}, + {"type::dog", false}, + {"type:d:o:g", false}, + {"type::", false}, + {":", false}, + } { + assert.Equal(t, isWellFormedEdgeTag(tc.s), tc.b) + } + }) + + // nodeHash assumes that the go Hash interface produces the same result + // for a given series of Write calls as for a single Write of the same + // byte sequence. This unit test asserts that assumption. + t.Run("test hashWriterIsomorphism", func(t *testing.T) { + h := fnv.New64() + var b []byte + b = append(b, "dog"...) + b = append(b, "cat"...) + b = append(b, "pig"...) 
+		h.Write(b)
+		s1 := h.Sum64()
+		h.Reset()
+		h.Write([]byte("dog"))
+		h.Write([]byte("cat"))
+		h.Write([]byte("pig"))
+		assert.Equal(t, s1, h.Sum64())
+	})
+}
+
+// Sample results at time of writing this benchmark:
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/DataDog/data-streams-go/datastreams
+// cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
+// BenchmarkNodeHash-8   5167707   232.5 ns/op   24 B/op   1 allocs/op
+func BenchmarkNodeHash(b *testing.B) {
+	service := "benchmark-runner"
+	env := "test"
+	primaryTag := "foo:bar"
+	edgeTags := []string{"event_type:dog", "exchange:local", "group:all", "topic:off", "type:writer"}
+	for i := 0; i < b.N; i++ {
+		nodeHash(service, env, primaryTag, edgeTags)
+	}
+}
diff --git a/datastreams/payload.go b/datastreams/payload.go
new file mode 100644
index 0000000000..a337e313fc
--- /dev/null
+++ b/datastreams/payload.go
@@ -0,0 +1,83 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+//go:generate msgp -unexported -marshal=false -o=payload_msgp.go -tests=false
+
+package datastreams
+
+// StatsPayload stores client computed stats.
+type StatsPayload struct {
+	// Env specifies the environment of the application, as defined by the user.
+	Env string
+	// Service is the service of the application
+	Service string
+	// PrimaryTag is the primary tag of the application.
+	PrimaryTag string
+	// Stats holds all stats buckets computed within this payload.
+	Stats []StatsBucket
+	// TracerVersion is the version of the tracer
+	TracerVersion string
+	// Lang is the language of the tracer
+	Lang string
+}
+
+// ProduceOffset holds a produced offset for a given topic and partition.
+type ProduceOffset struct {
+	Topic     string
+	Partition int32
+	Offset    int64
+}
+
+// CommitOffset holds a consumer group's committed offset for a given topic and partition.
+type CommitOffset struct {
+	ConsumerGroup string
+	Topic         string
+	Partition     int32
+	Offset        int64
+}
+
+// Backlog represents the size of a queue that hasn't yet been read by the consumer.
+type Backlog struct {
+	// Tags that identify the backlog
+	Tags []string
+	// Value of the backlog
+	Value int64
+}
+
+// StatsBucket specifies a set of stats computed over a duration.
+type StatsBucket struct {
+	// Start specifies the beginning of this bucket in unix nanoseconds.
+	Start uint64
+	// Duration specifies the duration of this bucket in nanoseconds.
+	Duration uint64
+	// Stats contains a set of statistics computed for the duration of this bucket.
+	Stats []StatsPoint
+	// Backlogs store information used to compute queue backlog
+	Backlogs []Backlog
+}
+
+// TimestampType can be either current or origin.
+type TimestampType string
+
+const (
+	// TimestampTypeCurrent is for when the recorded timestamp is based on the
+	// timestamp of the current StatsPoint.
+	TimestampTypeCurrent TimestampType = "current"
+	// TimestampTypeOrigin is for when the recorded timestamp is based on the
+	// time that the first StatsPoint in the pathway is sent out.
+	TimestampTypeOrigin TimestampType = "origin"
+)
+
+// StatsPoint contains a set of statistics grouped under various aggregation keys.
+type StatsPoint struct { + // These fields indicate the properties under which the stats were aggregated. + Service string // deprecated + EdgeTags []string + Hash uint64 + ParentHash uint64 + // These fields specify the stats for the above aggregation. + // those are distributions of latency in seconds. + PathwayLatency []byte + EdgeLatency []byte + TimestampType TimestampType +} diff --git a/datastreams/payload_msgp.go b/datastreams/payload_msgp.go new file mode 100644 index 0000000000..f0b7570fa4 --- /dev/null +++ b/datastreams/payload_msgp.go @@ -0,0 +1,891 @@ +package datastreams + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *Backlog) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Tags": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if cap(z.Tags) >= int(zb0002) { + z.Tags = (z.Tags)[:zb0002] + } else { + z.Tags = make([]string, zb0002) + } + for za0001 := range z.Tags { + z.Tags[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + case "Value": + z.Value, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Backlog) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Tags" + err = en.Append(0x82, 0xa4, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Tags))) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + for za0001 := range z.Tags { + err = en.WriteString(z.Tags[za0001]) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + // write "Value" + err = en.Append(0xa5, 0x56, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.Value) + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Backlog) Msgsize() (s int) { + s = 1 + 5 + msgp.ArrayHeaderSize + for za0001 := range z.Tags { + s += msgp.StringPrefixSize + len(z.Tags[za0001]) + } + s += 6 + msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *CommitOffset) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ConsumerGroup": + z.ConsumerGroup, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ConsumerGroup") + return + } + case "Topic": + z.Topic, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + case "Partition": + z.Partition, err = dc.ReadInt32() + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + case 
"Offset": + z.Offset, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *CommitOffset) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "ConsumerGroup" + err = en.Append(0x84, 0xad, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70) + if err != nil { + return + } + err = en.WriteString(z.ConsumerGroup) + if err != nil { + err = msgp.WrapError(err, "ConsumerGroup") + return + } + // write "Topic" + err = en.Append(0xa5, 0x54, 0x6f, 0x70, 0x69, 0x63) + if err != nil { + return + } + err = en.WriteString(z.Topic) + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + // write "Partition" + err = en.Append(0xa9, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteInt32(z.Partition) + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + // write "Offset" + err = en.Append(0xa6, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteInt64(z.Offset) + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *CommitOffset) Msgsize() (s int) { + s = 1 + 14 + msgp.StringPrefixSize + len(z.ConsumerGroup) + 6 + msgp.StringPrefixSize + len(z.Topic) + 10 + msgp.Int32Size + 7 + msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *ProduceOffset) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Topic": + z.Topic, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + case "Partition": + z.Partition, err = dc.ReadInt32() + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + case "Offset": + z.Offset, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z ProduceOffset) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "Topic" + err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x70, 0x69, 0x63) + if err != nil { + return + } + err = en.WriteString(z.Topic) + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + // write "Partition" + err = en.Append(0xa9, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteInt32(z.Partition) + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + // write "Offset" + err = en.Append(0xa6, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteInt64(z.Offset) + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z ProduceOffset) Msgsize() (s int) { + s = 1 + 6 + msgp.StringPrefixSize + len(z.Topic) + 10 + msgp.Int32Size + 7 + msgp.Int64Size + return +} + +// 
DecodeMsg implements msgp.Decodable +func (z *StatsBucket) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Start": + z.Start, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Start") + return + } + case "Duration": + z.Duration, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Duration") + return + } + case "Stats": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + if cap(z.Stats) >= int(zb0002) { + z.Stats = (z.Stats)[:zb0002] + } else { + z.Stats = make([]StatsPoint, zb0002) + } + for za0001 := range z.Stats { + err = z.Stats[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + case "Backlogs": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Backlogs") + return + } + if cap(z.Backlogs) >= int(zb0003) { + z.Backlogs = (z.Backlogs)[:zb0003] + } else { + z.Backlogs = make([]Backlog, zb0003) + } + for za0002 := range z.Backlogs { + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002) + return + } + for zb0004 > 0 { + zb0004-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002) + return + } + switch msgp.UnsafeString(field) { + case "Tags": + var zb0005 uint32 + zb0005, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags") + return + } + if cap(z.Backlogs[za0002].Tags) >= int(zb0005) { + z.Backlogs[za0002].Tags = (z.Backlogs[za0002].Tags)[:zb0005] + } else { + z.Backlogs[za0002].Tags = make([]string, zb0005) + } + for za0003 := range z.Backlogs[za0002].Tags { + z.Backlogs[za0002].Tags[za0003], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags", za0003) + return + } + } + case "Value": + z.Backlogs[za0002].Value, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Value") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002) + return + } + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *StatsBucket) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "Start" + err = en.Append(0x84, 0xa5, 0x53, 0x74, 0x61, 0x72, 0x74) + if err != nil { + return + } + err = en.WriteUint64(z.Start) + if err != nil { + err = msgp.WrapError(err, "Start") + return + } + // write "Duration" + err = en.Append(0xa8, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteUint64(z.Duration) + if err != nil { + err = msgp.WrapError(err, "Duration") + return + } + // write "Stats" + err = en.Append(0xa5, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Stats))) + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + for za0001 := range z.Stats { + err = z.Stats[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + // 
write "Backlogs" + err = en.Append(0xa8, 0x42, 0x61, 0x63, 0x6b, 0x6c, 0x6f, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Backlogs))) + if err != nil { + err = msgp.WrapError(err, "Backlogs") + return + } + for za0002 := range z.Backlogs { + // map header, size 2 + // write "Tags" + err = en.Append(0x82, 0xa4, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Backlogs[za0002].Tags))) + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags") + return + } + for za0003 := range z.Backlogs[za0002].Tags { + err = en.WriteString(z.Backlogs[za0002].Tags[za0003]) + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags", za0003) + return + } + } + // write "Value" + err = en.Append(0xa5, 0x56, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.Backlogs[za0002].Value) + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Value") + return + } + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *StatsBucket) Msgsize() (s int) { + s = 1 + 6 + msgp.Uint64Size + 9 + msgp.Uint64Size + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Stats { + s += z.Stats[za0001].Msgsize() + } + s += 9 + msgp.ArrayHeaderSize + for za0002 := range z.Backlogs { + s += 1 + 5 + msgp.ArrayHeaderSize + for za0003 := range z.Backlogs[za0002].Tags { + s += msgp.StringPrefixSize + len(z.Backlogs[za0002].Tags[za0003]) + } + s += 6 + msgp.Int64Size + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Env": + z.Env, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Env") + return + } + case "Service": + z.Service, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + case "PrimaryTag": + z.PrimaryTag, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "PrimaryTag") + return + } + case "Stats": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + if cap(z.Stats) >= int(zb0002) { + z.Stats = (z.Stats)[:zb0002] + } else { + z.Stats = make([]StatsBucket, zb0002) + } + for za0001 := range z.Stats { + err = z.Stats[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + case "TracerVersion": + z.TracerVersion, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TracerVersion") + return + } + case "Lang": + z.Lang, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Lang") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 + // write "Env" + err = en.Append(0x86, 0xa3, 0x45, 0x6e, 0x76) + if err != nil { + return + } + err = en.WriteString(z.Env) + if err != nil { + err = msgp.WrapError(err, "Env") + return + } + // write "Service" + err = en.Append(0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + if err != 
nil { + return + } + err = en.WriteString(z.Service) + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + // write "PrimaryTag" + err = en.Append(0xaa, 0x50, 0x72, 0x69, 0x6d, 0x61, 0x72, 0x79, 0x54, 0x61, 0x67) + if err != nil { + return + } + err = en.WriteString(z.PrimaryTag) + if err != nil { + err = msgp.WrapError(err, "PrimaryTag") + return + } + // write "Stats" + err = en.Append(0xa5, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Stats))) + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + for za0001 := range z.Stats { + err = z.Stats[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + // write "TracerVersion" + err = en.Append(0xad, 0x54, 0x72, 0x61, 0x63, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteString(z.TracerVersion) + if err != nil { + err = msgp.WrapError(err, "TracerVersion") + return + } + // write "Lang" + err = en.Append(0xa4, 0x4c, 0x61, 0x6e, 0x67) + if err != nil { + return + } + err = en.WriteString(z.Lang) + if err != nil { + err = msgp.WrapError(err, "Lang") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *StatsPayload) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.Env) + 8 + msgp.StringPrefixSize + len(z.Service) + 11 + msgp.StringPrefixSize + len(z.PrimaryTag) + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Stats { + s += z.Stats[za0001].Msgsize() + } + s += 14 + msgp.StringPrefixSize + len(z.TracerVersion) + 5 + msgp.StringPrefixSize + len(z.Lang) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *StatsPoint) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Service": + z.Service, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + case "EdgeTags": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "EdgeTags") + return + } + if cap(z.EdgeTags) >= int(zb0002) { + z.EdgeTags = (z.EdgeTags)[:zb0002] + } else { + z.EdgeTags = make([]string, zb0002) + } + for za0001 := range z.EdgeTags { + z.EdgeTags[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "EdgeTags", za0001) + return + } + } + case "Hash": + z.Hash, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Hash") + return + } + case "ParentHash": + z.ParentHash, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ParentHash") + return + } + case "PathwayLatency": + z.PathwayLatency, err = dc.ReadBytes(z.PathwayLatency) + if err != nil { + err = msgp.WrapError(err, "PathwayLatency") + return + } + case "EdgeLatency": + z.EdgeLatency, err = dc.ReadBytes(z.EdgeLatency) + if err != nil { + err = msgp.WrapError(err, "EdgeLatency") + return + } + case "TimestampType": + { + var zb0003 string + zb0003, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TimestampType") + return + } + z.TimestampType = TimestampType(zb0003) + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return 
+} + +// EncodeMsg implements msgp.Encodable +func (z *StatsPoint) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 7 + // write "Service" + err = en.Append(0x87, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Service) + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + // write "EdgeTags" + err = en.Append(0xa8, 0x45, 0x64, 0x67, 0x65, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.EdgeTags))) + if err != nil { + err = msgp.WrapError(err, "EdgeTags") + return + } + for za0001 := range z.EdgeTags { + err = en.WriteString(z.EdgeTags[za0001]) + if err != nil { + err = msgp.WrapError(err, "EdgeTags", za0001) + return + } + } + // write "Hash" + err = en.Append(0xa4, 0x48, 0x61, 0x73, 0x68) + if err != nil { + return + } + err = en.WriteUint64(z.Hash) + if err != nil { + err = msgp.WrapError(err, "Hash") + return + } + // write "ParentHash" + err = en.Append(0xaa, 0x50, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x48, 0x61, 0x73, 0x68) + if err != nil { + return + } + err = en.WriteUint64(z.ParentHash) + if err != nil { + err = msgp.WrapError(err, "ParentHash") + return + } + // write "PathwayLatency" + err = en.Append(0xae, 0x50, 0x61, 0x74, 0x68, 0x77, 0x61, 0x79, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + if err != nil { + return + } + err = en.WriteBytes(z.PathwayLatency) + if err != nil { + err = msgp.WrapError(err, "PathwayLatency") + return + } + // write "EdgeLatency" + err = en.Append(0xab, 0x45, 0x64, 0x67, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + if err != nil { + return + } + err = en.WriteBytes(z.EdgeLatency) + if err != nil { + err = msgp.WrapError(err, "EdgeLatency") + return + } + // write "TimestampType" + err = en.Append(0xad, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x54, 0x79, 0x70, 0x65) + if err != nil { + return + } + err = en.WriteString(string(z.TimestampType)) + if err != nil { + err = msgp.WrapError(err, "TimestampType") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *StatsPoint) Msgsize() (s int) { + s = 1 + 8 + msgp.StringPrefixSize + len(z.Service) + 9 + msgp.ArrayHeaderSize + for za0001 := range z.EdgeTags { + s += msgp.StringPrefixSize + len(z.EdgeTags[za0001]) + } + s += 5 + msgp.Uint64Size + 11 + msgp.Uint64Size + 15 + msgp.BytesPrefixSize + len(z.PathwayLatency) + 12 + msgp.BytesPrefixSize + len(z.EdgeLatency) + 14 + msgp.StringPrefixSize + len(string(z.TimestampType)) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *TimestampType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 string + zb0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = TimestampType(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z TimestampType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteString(string(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z TimestampType) Msgsize() (s int) { + s = msgp.StringPrefixSize + len(string(z)) + return +} diff --git a/datastreams/propagator.go b/datastreams/propagator.go new file mode 100644 index 0000000000..3f1b75e6c1 --- /dev/null +++ b/datastreams/propagator.go @@ -0,0 +1,65 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under 
the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"encoding/base64"
+	"encoding/binary"
+	"errors"
+	"time"
+
+	"github.com/DataDog/sketches-go/ddsketch/encoding"
+)
+
+const (
+	// PropagationKey is the key to use to propagate the pathway between services.
+	PropagationKey = "dd-pathway-ctx"
+	// PropagationKeyBase64 is the key to use to propagate the pathway between services as a base64-encoded string.
+	PropagationKeyBase64 = "dd-pathway-ctx-base64"
+)
+
+// Encode encodes the pathway into a binary format.
+func (p Pathway) Encode() []byte {
+	data := make([]byte, 8, 20)
+	binary.LittleEndian.PutUint64(data, p.hash)
+	encoding.EncodeVarint64(&data, p.pathwayStart.UnixNano()/int64(time.Millisecond))
+	encoding.EncodeVarint64(&data, p.edgeStart.UnixNano()/int64(time.Millisecond))
+	return data
+}
+
+// Decode decodes a pathway from its binary format.
+func Decode(data []byte) (p Pathway, err error) {
+	if len(data) < 8 {
+		return p, errors.New("pathway data smaller than 8 bytes")
+	}
+	p.hash = binary.LittleEndian.Uint64(data)
+	data = data[8:]
+	pathwayStart, err := encoding.DecodeVarint64(&data)
+	if err != nil {
+		return p, err
+	}
+	edgeStart, err := encoding.DecodeVarint64(&data)
+	if err != nil {
+		return p, err
+	}
+	p.pathwayStart = time.Unix(0, pathwayStart*int64(time.Millisecond))
+	p.edgeStart = time.Unix(0, edgeStart*int64(time.Millisecond))
+	return p, nil
+}
+
+// EncodeStr encodes a pathway context into a string using base64 encoding.
+func (p Pathway) EncodeStr() string {
+	b := p.Encode()
+	return base64.StdEncoding.EncodeToString(b)
+}
+
+// DecodeStr decodes a pathway context from a string using base64 encoding.
+func DecodeStr(str string) (p Pathway, err error) {
+	data, err := base64.StdEncoding.DecodeString(str)
+	if err != nil {
+		return p, err
+	}
+	return Decode(data)
+}
diff --git a/datastreams/propagator_test.go b/datastreams/propagator_test.go
new file mode 100644
index 0000000000..45efa56d97
--- /dev/null
+++ b/datastreams/propagator_test.go
@@ -0,0 +1,38 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func testPathway() Pathway {
+	now := time.Now().Local().Truncate(time.Millisecond)
+	return Pathway{
+		hash:         234,
+		pathwayStart: now.Add(-time.Hour),
+		edgeStart:    now,
+	}
+}
+
+func TestEncode(t *testing.T) {
+	p := testPathway()
+	encoded := p.Encode()
+	decoded, err := Decode(encoded)
+	assert.Nil(t, err)
+	assert.Equal(t, p, decoded)
+}
+
+func TestEncodeStr(t *testing.T) {
+	p := testPathway()
+	encoded := p.EncodeStr()
+	decoded, err := DecodeStr(encoded)
+	assert.Nil(t, err)
+	assert.Equal(t, p, decoded)
+}
diff --git a/datastreams/transport.go b/datastreams/transport.go
new file mode 100644
index 0000000000..8b87cf8590
--- /dev/null
+++ b/datastreams/transport.go
@@ -0,0 +1,118 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"net"
+	"net/http"
+	"runtime"
+	"strings"
+	"time"
+
+	"github.com/tinylib/msgp/msgp"
+)
+
+const (
+	defaultHostname    = "localhost"
+	defaultPort        = "8126"
+	defaultAddress     = defaultHostname + ":" + defaultPort
+	defaultHTTPTimeout = 2 * time.Second // timeout for sending a stats payload before giving up
+)
+
+var defaultDialer = &net.Dialer{
+	Timeout:   30 * time.Second,
+	KeepAlive: 30 * time.Second,
+	DualStack: true,
+}
+
+var defaultClient = &http.Client{
+	// We copy the transport to avoid using the default one, as it might be
+	// augmented with tracing and we don't want these calls to be recorded.
+	// See https://golang.org/pkg/net/http/#DefaultTransport .
+	Transport: &http.Transport{
+		Proxy:                 http.ProxyFromEnvironment,
+		DialContext:           defaultDialer.DialContext,
+		MaxIdleConns:          100,
+		IdleConnTimeout:       90 * time.Second,
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
+	},
+	Timeout: defaultHTTPTimeout,
+}
+
+type httpTransport struct {
+	url     string            // the delivery URL for stats
+	client  *http.Client      // the HTTP client used in the POST
+	headers map[string]string // the Transport headers
+}
+
+func newHTTPTransport(addr string, site string, apiKey string, client *http.Client, agentLess bool) *httpTransport {
+	// initialize the default headers sent with every stats payload
+	defaultHeaders := map[string]string{
+		"Datadog-Meta-Lang":             "go",
+		"Datadog-Meta-Lang-Version":     strings.TrimPrefix(runtime.Version(), "go"),
+		"Datadog-Meta-Lang-Interpreter": runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS,
+		"Content-Type":                  "application/msgpack",
+		"Content-Encoding":              "gzip",
+	}
+	if cid := ContainerID(); cid != "" {
+		defaultHeaders["Datadog-Container-ID"] = cid
+	}
+	var url string
+	if agentLess {
+		defaultHeaders["DD-API-KEY"] = apiKey
+		url = fmt.Sprintf("https://trace.agent.%s/api/v0.1/pipeline_stats", site)
+	} else {
+		url = fmt.Sprintf("http://%s/v0.1/pipeline_stats", addr)
+	}
+	return &httpTransport{
+		url:     url,
+		client:  client,
+		headers: defaultHeaders,
+	}
+}
+
+func (t *httpTransport) sendPipelineStats(p *StatsPayload) error {
+	var buf bytes.Buffer
+	gzipWriter, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
+	if err != nil {
+		return err
+	}
+	if err := msgp.Encode(gzipWriter, p); err != nil {
+		return err
+	}
+	err = gzipWriter.Close()
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest("POST", t.url, &buf)
+	if err != nil {
+		return err
+	}
+	for header, value := range t.headers {
+		req.Header.Set(header, value)
+	}
+	resp, err := t.client.Do(req)
+	if err != nil {
+		return err
+	}
+	// always close the response body so the underlying connection can be reused
+	defer resp.Body.Close()
+	if code := resp.StatusCode; code >= 400 {
+		// error, check the body for context information and
+		// return a nice error.
+		msg := make([]byte, 1000)
+		n, _ := resp.Body.Read(msg)
+		txt := http.StatusText(code)
+		if n > 0 {
+			return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
+		}
+		return fmt.Errorf("%s", txt)
+	}
+	return nil
+}
diff --git a/datastreams/transport_test.go b/datastreams/transport_test.go
new file mode 100644
index 0000000000..7773a7b3c5
--- /dev/null
+++ b/datastreams/transport_test.go
@@ -0,0 +1,82 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"compress/gzip"
+	"net/http"
+	"runtime"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/tinylib/msgp/msgp"
+)
+
+type fakeTransport struct {
+	requests []*http.Request
+}
+
+func (t *fakeTransport) RoundTrip(r *http.Request) (*http.Response, error) {
+	t.requests = append(t.requests, r)
+	// return a non-nil body, as a real http.Client would, so callers can close it
+	return &http.Response{StatusCode: 200, Body: http.NoBody}, nil
+}
+
+func toHeaders(headers map[string]string) http.Header {
+	h := make(http.Header)
+	for k, v := range headers {
+		h[k] = []string{v}
+	}
+	return h
+}
+
+func TestHTTPTransport(t *testing.T) {
+	p := StatsPayload{Env: "env-1", Stats: []StatsBucket{{
+		Start:    2,
+		Duration: 10,
+		Stats: []StatsPoint{{
+			Service:        "service-1",
+			EdgeTags:       []string{"edge-1"},
+			Hash:           1,
+			ParentHash:     2,
+			PathwayLatency: []byte{1, 2, 3},
+			EdgeLatency:    []byte{4, 5, 6},
+		}},
+	}}}
+
+	t.Run("agentless", func(t *testing.T) {
+		fakeTransport := fakeTransport{}
+		transport := newHTTPTransport("agent-address", "datadoghq.com", "key", &http.Client{Transport: &fakeTransport}, true)
+		assert.Nil(t, transport.sendPipelineStats(&p))
+		assert.Len(t, fakeTransport.requests, 1)
+		r := fakeTransport.requests[0]
+		assert.Equal(t, "https://trace.agent.datadoghq.com/api/v0.1/pipeline_stats", r.URL.String())
+		goVersion := strings.TrimPrefix(runtime.Version(), "go")
+		headers := toHeaders(map[string]string{
+			"Content-Encoding":              "gzip",
+			"Content-Type":                  "application/msgpack",
+			"Datadog-Meta-Lang-Version":     goVersion,
+			"Datadog-Meta-Lang":             "go",
+			"Datadog-Meta-Lang-Interpreter": runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS,
+			"Dd-Api-Key":                    "key",
+		})
+		assert.Equal(t, headers, r.Header)
+		gzReader, err := gzip.NewReader(r.Body)
+		assert.Nil(t, err)
+		var sentPayload StatsPayload
+		assert.Nil(t, msgp.Decode(gzReader, &sentPayload))
+		assert.Equal(t, p, sentPayload)
+	})
+
+	t.Run("with_agent", func(t *testing.T) {
+		fakeTransport := fakeTransport{}
+		transport := newHTTPTransport("agent-address:8126", "datadoghq.com", "key", &http.Client{Transport: &fakeTransport}, false)
+		assert.Nil(t, transport.sendPipelineStats(&p))
+		assert.Len(t, fakeTransport.requests, 1)
+		r := fakeTransport.requests[0]
+		assert.Equal(t, "http://agent-address:8126/v0.1/pipeline_stats", r.URL.String())
+	})
+}
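
Reviewer note: below is a minimal, illustrative sketch of how the Pathway propagation API introduced in this change can be used outside of the Kafka integrations, for example over a carrier that only supports string headers. The header map, topic, and consumer group names are made up for the example; only NewPathway, SetCheckpoint, EncodeStr, DecodeStr, and PropagationKeyBase64 come from this diff, and the import path assumes the package is published under dd-trace-go.v1 as elsewhere in this PR.

package main

import (
	"fmt"
	"log"

	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
)

func main() {
	// Producer side: start a pathway, record the outbound edge, and put the
	// base64-encoded context into a string-typed header on the outgoing message.
	out := datastreams.NewPathway("direction:out", "topic:orders", "type:kafka")
	headers := map[string]string{datastreams.PropagationKeyBase64: out.EncodeStr()}

	// Consumer side: restore the upstream pathway from the header and record the
	// inbound edge. If the header is missing or malformed, start a fresh pathway.
	parent, err := datastreams.DecodeStr(headers[datastreams.PropagationKeyBase64])
	if err != nil {
		log.Printf("no valid pathway context, starting a new one: %v", err)
		parent = datastreams.NewPathway()
	}
	in := parent.SetCheckpoint("direction:in", "group:order-processors", "topic:orders", "type:kafka")
	fmt.Printf("propagated pathway across one edge: %+v\n", in)
}

Each call to NewPathway or SetCheckpoint also emits a statsPoint to the global aggregator (when one is running), which is how the edge and pathway latencies ultimately reach the stats payloads defined in payload.go.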