diff --git a/CODEOWNERS b/CODEOWNERS index c18afd3fea..e9e723542f 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -17,6 +17,10 @@ /contrib/**/*appsec*.go @DataDog/asm-go /.github/workflows/appsec.yml @DataDog/asm-go +# datastreams +/datastreams @DataDog/data-streams-monitoring +/internal/datastreams @DataDog/data-streams-monitoring + # telemetry /internal/telemetry @DataDog/apm-go diff --git a/contrib/Shopify/sarama/option.go b/contrib/Shopify/sarama/option.go index 23461fbad8..f2443b6a82 100644 --- a/contrib/Shopify/sarama/option.go +++ b/contrib/Shopify/sarama/option.go @@ -20,6 +20,8 @@ type config struct { consumerSpanName string producerSpanName string analyticsRate float64 + dataStreamsEnabled bool + groupID string } func defaults(cfg *config) { @@ -51,6 +53,20 @@ func WithServiceName(name string) Option { } } +// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ +func WithDataStreams() Option { + return func(cfg *config) { + cfg.dataStreamsEnabled = true + } +} + +// WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group) +func WithGroupID(groupID string) Option { + return func(cfg *config) { + cfg.groupID = groupID + } +} + // WithAnalytics enables Trace Analytics for all started spans. 
func WithAnalytics(on bool) Option { return func(cfg *config) { diff --git a/contrib/Shopify/sarama/sarama.go b/contrib/Shopify/sarama/sarama.go index 3bfbc56bb1..f1e5b9fee1 100644 --- a/contrib/Shopify/sarama/sarama.go +++ b/contrib/Shopify/sarama/sarama.go @@ -7,8 +7,11 @@ package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama" import ( + "context" "math" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -76,6 +79,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P next := tracer.StartSpan(cfg.consumerSpanName, opts...) // reinject the span context so consumers can pick it up tracer.Inject(next.Context(), carrier) + setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg) wrapped.messages <- msg @@ -127,8 +131,12 @@ type syncProducer struct { // SendMessage calls sarama.SyncProducer.SendMessage and traces the request. 
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { span := startProducerSpan(p.cfg, p.version, msg) + setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) partition, offset, err = p.SyncProducer.SendMessage(msg) finishProducerSpan(span, partition, offset, err) + if err == nil && p.cfg.dataStreamsEnabled { + tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset) + } return partition, offset, err } @@ -138,12 +146,19 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { // treated individually, so we create a span for each one spans := make([]ddtrace.Span, len(msgs)) for i, msg := range msgs { + setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version) spans[i] = startProducerSpan(p.cfg, p.version, msg) } err := p.SyncProducer.SendMessages(msgs) for i, span := range spans { finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err) } + if err == nil && p.cfg.dataStreamsEnabled { + // we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way to know to which partition data was sent to. + for _, msg := range msgs { + tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) + } + } return err } @@ -221,6 +236,7 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts select { case msg := <-wrapped.input: span := startProducerSpan(cfg, saramaConfig.Version, msg) + setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version) p.Input() <- msg if saramaConfig.Producer.Return.Successes { spanID := span.Context().SpanID() @@ -236,6 +252,10 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts // producer was closed, so exit return } + if cfg.dataStreamsEnabled { + // we only track Kafka lag if returning successes is enabled. Otherwise, we have no way to know to which partition data was sent to. 
+ tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset) + } if spanctx, spanFound := getSpanContext(msg); spanFound { spanID := spanctx.SpanID() if span, ok := spans[spanID]; ok { @@ -303,3 +323,57 @@ func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) { return spanctx, true } + +func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) { + if !enabled || msg == nil { + return + } + edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"} + carrier := NewProducerMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...) + if !ok || !version.IsAtLeast(sarama.V0_11_0_0) { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) { + if !enabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"} + if groupID != "" { + edges = append(edges, "group:"+groupID) + } + carrier := NewConsumerMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) + if groupID != "" { + // only track Kafka lag if a consumer group is set. + // since there is no ack mechanism, we consider that messages read are committed right away. 
+ tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset) + } +} + +func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + if msg.Value != nil { + size += int64(msg.Value.Length()) + } + if msg.Key != nil { + size += int64(msg.Key.Length()) + } + return size +} + +func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + return size + int64(len(msg.Value)+len(msg.Key)) +} diff --git a/contrib/Shopify/sarama/sarama_test.go b/contrib/Shopify/sarama/sarama_test.go index 6254496f14..4237f6e764 100644 --- a/contrib/Shopify/sarama/sarama_test.go +++ b/contrib/Shopify/sarama/sarama_test.go @@ -11,6 +11,7 @@ import ( "time" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -115,7 +116,7 @@ func TestConsumer(t *testing.T) { } defer consumer.Close() - consumer = WrapConsumer(consumer) + consumer = WrapConsumer(consumer, WithDataStreams()) partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0) if err != nil { @@ -145,6 +146,12 @@ func TestConsumer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1))) + assert.True(t, ok) + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + 
assert.Equal(t, expected.GetHash(), p.GetHash()) } { s := spans[1] @@ -162,6 +169,12 @@ func TestConsumer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1))) + assert.True(t, ok) + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } } @@ -176,23 +189,25 @@ func TestSyncProducer(t *testing.T) { defer leader.Close() metadataResponse := new(sarama.MetadataResponse) + metadataResponse.Version = 1 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) seedBroker.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) + prodSuccess.Version = 2 prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError) leader.Returns(prodSuccess) cfg := sarama.NewConfig() - cfg.Version = sarama.MinVersion + cfg.Version = sarama.V0_11_0_0 // first version that supports headers cfg.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg) if err != nil { t.Fatal(err) } - producer = WrapSyncProducer(cfg, producer) + producer = WrapSyncProducer(cfg, producer, WithDataStreams()) msg1 := &sarama.ProducerMessage{ Topic: "my_topic", @@ -214,6 +229,12 @@ func TestSyncProducer(t *testing.T) { assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + p, ok := 
datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1))) + assert.True(t, ok) + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) } } diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/example_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/example_test.go index 6270860f33..9798b7158a 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/example_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/example_test.go @@ -21,6 +21,7 @@ var ( // This example shows how a span context can be passed from a producer to a consumer. func Example() { + tracer.Start() defer tracer.Stop() @@ -31,6 +32,7 @@ func Example() { "session.timeout.ms": 10, "enable.auto.offset.store": false, }) + err = c.Subscribe(testTopic, nil) if err != nil { panic(err) @@ -56,6 +58,7 @@ func Example() { tracer.Inject(parentSpan.Context(), carrier) c.Consumer.Events() <- msg + }() msg := (<-c.Events()).(*kafka.Message) @@ -66,6 +69,7 @@ func Example() { if err != nil { panic(err) } + parentContext := parentSpan.Context() // Validate that the context passed is the context sent via the message diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go index 0133fee151..8c7ae41542 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go @@ -7,9 +7,12 @@ package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" import ( + "context" "math" "time" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" 
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -19,11 +22,16 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) -const componentName = "confluentinc/confluent-kafka-go/kafka.v2" +const ( + // make sure these 3 are updated to V2 for the V2 version. + componentName = "confluentinc/confluent-kafka-go/kafka.v2" + packageName = "contrib/confluentinc/confluent-kafka-go/kafka.v2" + integrationName = "github.com/confluentinc/confluent-kafka-go/v2" +) func init() { telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported("github.com/confluentinc/confluent-kafka-go/v2") + tracer.MarkIntegrationImported(integrationName) } // NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer. @@ -60,7 +68,7 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer { Consumer: c, cfg: newConfig(opts...), } - log.Debug("contrib/confluentinc/confluent-kafka-go/kafka.v2: Wrapping Consumer: %#v", wrapped.cfg) + log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg) wrapped.events = wrapped.traceEventsChannel(c.Events()) return wrapped } @@ -80,6 +88,9 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { // only trace messages if msg, ok := evt.(*kafka.Message); ok { next = c.startSpan(msg) + setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) + } else if offset, ok := evt.(kafka.OffsetsCommitted); ok { + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error) } out <- evt @@ -107,7 +118,7 @@ func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span { tracer.Tag("offset", msg.TopicPartition.Offset), tracer.Tag(ext.Component, componentName), tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), - tracer.Tag(ext.MessagingSystem, "kafka"), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), tracer.Measured(), } if c.cfg.bootstrapServers != "" { @@ -161,7 +172,10 
@@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) { } evt := c.Consumer.Poll(timeoutMS) if msg, ok := evt.(*kafka.Message); ok { + setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) c.prev = c.startSpan(msg) + } else if offset, ok := evt.(kafka.OffsetsCommitted); ok { + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error) } return evt } @@ -176,28 +190,80 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) { if err != nil { return nil, err } + setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) c.prev = c.startSpan(msg) return msg, nil } +// Commit commits current offsets and tracks the commit offsets if data streams is enabled. +func (c *Consumer) Commit() ([]kafka.TopicPartition, error) { + tps, err := c.Consumer.Commit() + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) + return tps, err +} + +// CommitMessage commits a message and tracks the commit offsets if data streams is enabled. +func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) { + tps, err := c.Consumer.CommitMessage(msg) + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) + return tps, err +} + +// CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled. 
+func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) { + tps, err := c.Consumer.CommitOffsets(offsets) + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) + return tps, err +} + +func commitOffsets(dataStreamsEnabled bool, groupID string, tps []kafka.TopicPartition, err error) { + if err != nil || groupID == "" || !dataStreamsEnabled { + return + } + for _, tp := range tps { + tracer.TrackKafkaCommitOffset(groupID, *tp.Topic, tp.Partition, int64(tp.Offset)) + } +} + +func trackProduceOffsets(dataStreamsEnabled bool, msg *kafka.Message, err error) { + if err != nil || !dataStreamsEnabled || msg.TopicPartition.Topic == nil { + return + } + tracer.TrackKafkaProduceOffset(*msg.TopicPartition.Topic, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset)) +} + // A Producer wraps a kafka.Producer. type Producer struct { *kafka.Producer cfg *config produceChannel chan *kafka.Message + events chan kafka.Event + libraryVersion int } // WrapProducer wraps a kafka.Producer so requests are traced. func WrapProducer(p *kafka.Producer, opts ...Option) *Producer { + version, _ := kafka.LibraryVersion() wrapped := &Producer{ - Producer: p, - cfg: newConfig(opts...), + Producer: p, + cfg: newConfig(opts...), + events: p.Events(), + libraryVersion: version, } - log.Debug("contrib/confluentinc/confluent-kafka-go/kafka.v2: Wrapping Producer: %#v", wrapped.cfg) + log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg) wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel()) + if wrapped.cfg.dataStreamsEnabled { + wrapped.events = wrapped.traceEventsChannel(p.Events()) + } return wrapped } +// Events returns the kafka Events channel (if enabled). 
Message events will be monitored +// with data streams monitoring (if enabled) +func (p *Producer) Events() chan kafka.Event { + return p.events +} + func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message { if out == nil { return out @@ -206,6 +272,7 @@ func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Mess go func() { for msg := range in { span := p.startSpan(msg) + setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg) out <- msg span.Finish() } @@ -220,7 +287,7 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span { tracer.SpanType(ext.SpanTypeMessageProducer), tracer.Tag(ext.Component, componentName), tracer.Tag(ext.SpanKind, ext.SpanKindProducer), - tracer.Tag(ext.MessagingSystem, "kafka"), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition), } if p.cfg.bootstrapServers != "" { @@ -262,11 +329,14 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er if msg, ok := evt.(*kafka.Message); ok { // delivery errors are returned via TopicPartition.Error err = msg.TopicPartition.Error + trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err) } span.Finish(tracer.WithError(err)) oldDeliveryChan <- evt }() } + + setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg) err := p.Producer.Produce(msg, deliveryChan) // with no delivery channel, finish immediately if deliveryChan == nil { @@ -281,3 +351,57 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er func (p *Producer) ProduceChannel() chan *kafka.Message { return p.produceChannel } + +func (p *Producer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { + if in == nil { + return nil + } + out := make(chan kafka.Event, 1) + go func() { + defer close(out) + for evt := range in { + if msg, ok := evt.(*kafka.Message); ok { + trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, 
msg.TopicPartition.Error) + } + out <- evt + } + }() + return out +} + +func setConsumeCheckpoint(dataStreamsEnabled bool, groupID string, msg *kafka.Message) { + if !dataStreamsEnabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + *msg.TopicPartition.Topic, "type:kafka"} + if groupID != "" { + edges = append(edges, "group:"+groupID) + } + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func setProduceCheckpoint(dataStreamsEnabled bool, version int, msg *kafka.Message) { + if !dataStreamsEnabled || msg == nil { + return + } + edges := []string{"direction:out", "topic:" + *msg.TopicPartition.Topic, "type:kafka"} + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...) 
+ if !ok || version < 0x000b0400 { + // headers not supported before librdkafka >=0.11.4 + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func getMsgSize(msg *kafka.Message) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + return size + int64(len(msg.Value)+len(msg.Key)) +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go index aca1722374..32dfa1e9eb 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go @@ -13,6 +13,7 @@ import ( "time" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -29,7 +30,7 @@ var ( type consumerActionFn func(c *Consumer) (*kafka.Message, error) -func genIntegrationTestSpans(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) []mocktracer.Span { +func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) { if _, ok := os.LookupEnv("INTEGRATION"); !ok { t.Skip("to enable integration test, set the INTEGRATION environment variable") } @@ -83,7 +84,7 @@ func genIntegrationTestSpans(t *testing.T, consumerAction consumerActionFn, prod require.Len(t, spans, 2) // they should be linked via headers assert.Equal(t, spans[0].TraceID(), spans[1].TraceID()) - return spans + return spans, msg2 } func TestConsumerChannel(t *testing.T) { @@ -99,7 +100,7 @@ func TestConsumerChannel(t *testing.T) { "socket.timeout.ms": 10, "session.timeout.ms": 10, "enable.auto.offset.store": false, - }, WithAnalyticsRate(0.3)) + }, WithAnalyticsRate(0.3), 
WithDataStreams()) assert.NoError(t, err) err = c.Subscribe(testTopic, nil) @@ -145,10 +146,18 @@ func TestConsumerChannel(t *testing.T) { assert.Equal(t, int32(1), s.Tag(ext.MessagingKafkaPartition)) assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate)) assert.Equal(t, kafka.Offset(i+1), s.Tag("offset")) - assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s.Tag(ext.Component)) + assert.Equal(t, componentName, s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) } + for _, msg := range []*kafka.Message{msg1, msg2} { + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(msg))) + assert.True(t, ok) + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "group:"+testGroupID, "direction:in", "topic:"+testTopic, "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) + } } /* @@ -199,7 +208,7 @@ func TestConsumerFunctional(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spans := genIntegrationTestSpans(t, tt.action, []Option{WithAnalyticsRate(0.1)}, nil) + spans, msg := produceThenConsume(t, tt.action, []Option{WithAnalyticsRate(0.1), WithDataStreams()}, []Option{WithDataStreams()}) s0 := spans[0] // produce assert.Equal(t, "kafka.produce", s0.OperationName()) @@ -208,10 +217,10 @@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s0.Tag(ext.SpanType)) assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s0.Tag(ext.Component)) + assert.Equal(t, componentName, s0.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) - assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers)) assert.Equal(t, "kafka", 
s0.Tag(ext.MessagingSystem)) + assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers)) s1 := spans[1] // consume assert.Equal(t, "kafka.consume", s1.OperationName()) @@ -220,10 +229,20 @@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, nil, s1.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s1.Tag(ext.SpanType)) assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s1.Tag(ext.Component)) + assert.Equal(t, componentName, s1.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) - assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers)) assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) + assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(msg))) + assert.True(t, ok) + mt := mocktracer.Start() + ctx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(ctx, "group:"+testGroupID, "direction:in", "topic:"+testTopic, "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + mt.Stop() + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) }) } } @@ -351,7 +370,8 @@ func TestNamingSchema(t *testing.T) { consumerAction := consumerActionFn(func(c *Consumer) (*kafka.Message, error) { return c.ReadMessage(3000 * time.Millisecond) }) - return genIntegrationTestSpans(t, consumerAction, opts, opts) + spans, _ := produceThenConsume(t, consumerAction, opts, opts) + return spans } namingschematest.NewKafkaTest(genSpans)(t) } diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go index c2b9e9d04f..d946a426ed 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go +++ 
b/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go @@ -27,7 +27,9 @@ type config struct { producerSpanName string analyticsRate float64 bootstrapServers string + groupID string tagFns map[string]func(msg *kafka.Message) interface{} + dataStreamsEnabled bool } // An Option customizes the config. @@ -111,6 +113,9 @@ func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Optio // WithConfig extracts the config information for the client to be tagged func WithConfig(cg *kafka.ConfigMap) Option { return func(cfg *config) { + if groupID, err := cg.Get("group.id", ""); err == nil { + cfg.groupID = groupID.(string) + } if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" { for _, addr := range strings.Split(bs.(string), ",") { host, _, err := net.SplitHostPort(addr) @@ -122,3 +127,10 @@ func WithConfig(cg *kafka.ConfigMap) Option { } } } + +// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ +func WithDataStreams() Option { + return func(cfg *config) { + cfg.dataStreamsEnabled = true + } +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go index 28c8283dae..c0c9a91c29 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go @@ -7,9 +7,12 @@ package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" import ( + "context" "math" "time" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -19,11 +22,16 @@ import ( "github.com/confluentinc/confluent-kafka-go/kafka" ) -const componentName = "confluentinc/confluent-kafka-go/kafka" +const ( + // make sure these 3 are updated to V2 
for the V2 version. + componentName = "confluentinc/confluent-kafka-go/kafka" + packageName = "contrib/confluentinc/confluent-kafka-go/kafka" + integrationName = "github.com/confluentinc/confluent-kafka-go" +) func init() { telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported("github.com/confluentinc/confluent-kafka-go") + tracer.MarkIntegrationImported(integrationName) } // NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer. @@ -32,7 +40,6 @@ func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) { if err != nil { return nil, err } - opts = append(opts, WithConfig(conf)) return WrapConsumer(c, opts...), nil } @@ -61,7 +68,7 @@ func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer { Consumer: c, cfg: newConfig(opts...), } - log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Consumer: %#v", wrapped.cfg) + log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg) wrapped.events = wrapped.traceEventsChannel(c.Events()) return wrapped } @@ -81,6 +88,9 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { // only trace messages if msg, ok := evt.(*kafka.Message); ok { next = c.startSpan(msg) + setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) + } else if offset, ok := evt.(kafka.OffsetsCommitted); ok { + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error) } out <- evt @@ -96,7 +106,6 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { c.prev = nil } }() - return out } @@ -112,7 +121,6 @@ func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span { tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), tracer.Measured(), } - if c.cfg.bootstrapServers != "" { opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, c.cfg.bootstrapServers)) } @@ -164,7 +172,10 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) { } evt := c.Consumer.Poll(timeoutMS) if msg, ok 
:= evt.(*kafka.Message); ok { + setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) c.prev = c.startSpan(msg) + } else if offset, ok := evt.(kafka.OffsetsCommitted); ok { + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error) } return evt } @@ -179,42 +190,93 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) { if err != nil { return nil, err } + setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) c.prev = c.startSpan(msg) return msg, nil } +// Commit commits current offsets and tracks the commit offsets if data streams is enabled. +func (c *Consumer) Commit() ([]kafka.TopicPartition, error) { + tps, err := c.Consumer.Commit() + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) + return tps, err +} + +// CommitMessage commits a message and tracks the commit offsets if data streams is enabled. +func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) { + tps, err := c.Consumer.CommitMessage(msg) + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) + return tps, err +} + +// CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled. 
+func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) { + tps, err := c.Consumer.CommitOffsets(offsets) + commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) + return tps, err +} + +func commitOffsets(dataStreamsEnabled bool, groupID string, tps []kafka.TopicPartition, err error) { + if err != nil || groupID == "" || !dataStreamsEnabled { + return + } + for _, tp := range tps { + tracer.TrackKafkaCommitOffset(groupID, *tp.Topic, tp.Partition, int64(tp.Offset)) + } +} + +func trackProduceOffsets(dataStreamsEnabled bool, msg *kafka.Message, err error) { + if err != nil || !dataStreamsEnabled || msg.TopicPartition.Topic == nil { + return + } + tracer.TrackKafkaProduceOffset(*msg.TopicPartition.Topic, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset)) +} + // A Producer wraps a kafka.Producer. type Producer struct { *kafka.Producer cfg *config produceChannel chan *kafka.Message + events chan kafka.Event + libraryVersion int } // WrapProducer wraps a kafka.Producer so requests are traced. func WrapProducer(p *kafka.Producer, opts ...Option) *Producer { + version, _ := kafka.LibraryVersion() wrapped := &Producer{ - Producer: p, - cfg: newConfig(opts...), + Producer: p, + cfg: newConfig(opts...), + events: p.Events(), + libraryVersion: version, } - log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Producer: %#v", wrapped.cfg) + log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg) wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel()) + if wrapped.cfg.dataStreamsEnabled { + wrapped.events = wrapped.traceEventsChannel(p.Events()) + } return wrapped } +// Events returns the kafka Events channel (if enabled). 
Message events will be monitored +// with data streams monitoring (if enabled) +func (p *Producer) Events() chan kafka.Event { + return p.events +} + func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message { if out == nil { return out } - in := make(chan *kafka.Message, 1) go func() { for msg := range in { span := p.startSpan(msg) + setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg) out <- msg span.Finish() } }() - return in } @@ -228,7 +290,6 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span { tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition), } - if p.cfg.bootstrapServers != "" { opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, p.cfg.bootstrapServers)) } @@ -240,7 +301,6 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span { if spanctx, err := tracer.Extract(carrier); err == nil { opts = append(opts, tracer.ChildOf(spanctx)) } - span, _ := tracer.StartSpanFromContext(p.cfg.ctx, p.cfg.producerSpanName, opts...) 
// inject the span context so consumers can pick it up tracer.Inject(span.Context(), carrier) @@ -269,12 +329,14 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er if msg, ok := evt.(*kafka.Message); ok { // delivery errors are returned via TopicPartition.Error err = msg.TopicPartition.Error + trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err) } span.Finish(tracer.WithError(err)) oldDeliveryChan <- evt }() } + setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg) err := p.Producer.Produce(msg, deliveryChan) // with no delivery channel, finish immediately if deliveryChan == nil { @@ -289,3 +351,57 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er func (p *Producer) ProduceChannel() chan *kafka.Message { return p.produceChannel } + +func (p *Producer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { + if in == nil { + return nil + } + out := make(chan kafka.Event, 1) + go func() { + defer close(out) + for evt := range in { + if msg, ok := evt.(*kafka.Message); ok { + trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, msg.TopicPartition.Error) + } + out <- evt + } + }() + return out +} + +func setConsumeCheckpoint(dataStreamsEnabled bool, groupID string, msg *kafka.Message) { + if !dataStreamsEnabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + *msg.TopicPartition.Topic, "type:kafka"} + if groupID != "" { + edges = append(edges, "group:"+groupID) + } + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...) 
+ if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func setProduceCheckpoint(dataStreamsEnabled bool, version int, msg *kafka.Message) { + if !dataStreamsEnabled || msg == nil { + return + } + edges := []string{"direction:out", "topic:" + *msg.TopicPartition.Topic, "type:kafka"} + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...) + if !ok || version < 0x000b0400 { + // headers not supported before librdkafka >=0.11.4 + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func getMsgSize(msg *kafka.Message) (size int64) { + for _, header := range msg.Headers { + size += int64(len(header.Key) + len(header.Value)) + } + return size + int64(len(msg.Value)+len(msg.Key)) +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go index 2fa44322eb..2196beda41 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go @@ -13,6 +13,7 @@ import ( "time" "gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -29,7 +30,7 @@ var ( type consumerActionFn func(c *Consumer) (*kafka.Message, error) -func genIntegrationTestSpans(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) []mocktracer.Span { +func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) { if _, ok := os.LookupEnv("INTEGRATION"); !ok { t.Skip("to enable integration test, set the INTEGRATION environment 
variable") } @@ -83,7 +84,7 @@ func genIntegrationTestSpans(t *testing.T, consumerAction consumerActionFn, prod require.Len(t, spans, 2) // they should be linked via headers assert.Equal(t, spans[0].TraceID(), spans[1].TraceID()) - return spans + return spans, msg2 } func TestConsumerChannel(t *testing.T) { @@ -99,7 +100,7 @@ func TestConsumerChannel(t *testing.T) { "socket.timeout.ms": 10, "session.timeout.ms": 10, "enable.auto.offset.store": false, - }, WithAnalyticsRate(0.3)) + }, WithAnalyticsRate(0.3), WithDataStreams()) assert.NoError(t, err) err = c.Subscribe(testTopic, nil) @@ -145,10 +146,18 @@ func TestConsumerChannel(t *testing.T) { assert.Equal(t, int32(1), s.Tag(ext.MessagingKafkaPartition)) assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate)) assert.Equal(t, kafka.Offset(i+1), s.Tag("offset")) - assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s.Tag(ext.Component)) + assert.Equal(t, componentName, s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) } + for _, msg := range []*kafka.Message{msg1, msg2} { + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(msg))) + assert.True(t, ok) + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "group:"+testGroupID, "direction:in", "topic:"+testTopic, "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) + } } /* @@ -199,7 +208,7 @@ func TestConsumerFunctional(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - spans := genIntegrationTestSpans(t, tt.action, []Option{WithAnalyticsRate(0.1)}, nil) + spans, msg := produceThenConsume(t, tt.action, []Option{WithAnalyticsRate(0.1), WithDataStreams()}, []Option{WithDataStreams()}) s0 := spans[0] // produce assert.Equal(t, "kafka.produce", s0.OperationName()) @@ -208,7 +217,7 
@@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s0.Tag(ext.SpanType)) assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s0.Tag(ext.Component)) + assert.Equal(t, componentName, s0.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers)) @@ -220,10 +229,20 @@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, nil, s1.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s1.Tag(ext.SpanType)) assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s1.Tag(ext.Component)) + assert.Equal(t, componentName, s1.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers)) + + p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewMessageCarrier(msg))) + assert.True(t, ok) + mt := mocktracer.Start() + ctx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka") + expectedCtx, _ := tracer.SetDataStreamsCheckpoint(ctx, "group:"+testGroupID, "direction:in", "topic:"+testTopic, "type:kafka") + expected, _ := datastreams.PathwayFromContext(expectedCtx) + mt.Stop() + assert.NotEqual(t, expected.GetHash(), 0) + assert.Equal(t, expected.GetHash(), p.GetHash()) }) } } @@ -351,7 +370,8 @@ func TestNamingSchema(t *testing.T) { consumerAction := consumerActionFn(func(c *Consumer) (*kafka.Message, error) { return c.ReadMessage(3000 * time.Millisecond) }) - return genIntegrationTestSpans(t, consumerAction, opts, opts) + spans, _ := produceThenConsume(t, consumerAction, opts, opts) + return spans } 
namingschematest.NewKafkaTest(genSpans)(t) } diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/option.go b/contrib/confluentinc/confluent-kafka-go/kafka/option.go index d0f44f3b85..514f54fff0 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/option.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/option.go @@ -27,7 +27,9 @@ type config struct { producerSpanName string analyticsRate float64 bootstrapServers string + groupID string tagFns map[string]func(msg *kafka.Message) interface{} + dataStreamsEnabled bool } // An Option customizes the config. @@ -111,6 +113,9 @@ func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Optio // WithConfig extracts the config information for the client to be tagged func WithConfig(cg *kafka.ConfigMap) Option { return func(cfg *config) { + if groupID, err := cg.Get("group.id", ""); err == nil { + cfg.groupID = groupID.(string) + } if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" { for _, addr := range strings.Split(bs.(string), ",") { host, _, err := net.SplitHostPort(addr) @@ -122,3 +127,10 @@ func WithConfig(cg *kafka.ConfigMap) Option { } } } + +// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ +func WithDataStreams() Option { + return func(cfg *config) { + cfg.dataStreamsEnabled = true + } +} diff --git a/datastreams/options/options.go b/datastreams/options/options.go new file mode 100644 index 0000000000..7b2d626ce1 --- /dev/null +++ b/datastreams/options/options.go @@ -0,0 +1,10 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package options + +type CheckpointParams struct { + PayloadSize int64 +} diff --git a/datastreams/pathway.go b/datastreams/pathway.go new file mode 100644 index 0000000000..233fd6cad4 --- /dev/null +++ b/datastreams/pathway.go @@ -0,0 +1,22 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams" +) + +type Pathway interface { + // GetHash returns the hash of the pathway, representing the upstream path of the data. + GetHash() uint64 +} + +// PathwayFromContext returns the pathway contained in a Go context if present +func PathwayFromContext(ctx context.Context) (Pathway, bool) { + return datastreams.PathwayFromContext(ctx) +} diff --git a/datastreams/propagation.go b/datastreams/propagation.go new file mode 100644 index 0000000000..ca657b9d80 --- /dev/null +++ b/datastreams/propagation.go @@ -0,0 +1,71 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams" +) + +// MergeContexts returns the first context which includes the pathway resulting from merging the pathways +// contained in all contexts. +// This function should be used in fan-in situations. The current implementation keeps only 1 Pathway. +// A future implementation could merge multiple Pathways together and put the resulting Pathway in the context. 
+func MergeContexts(ctxs ...context.Context) context.Context { + if len(ctxs) == 0 { + return context.Background() + } + pathways := make([]datastreams.Pathway, 0, len(ctxs)) + for _, ctx := range ctxs { + if p, ok := datastreams.PathwayFromContext(ctx); ok { + pathways = append(pathways, p) + } + } + if len(pathways) == 0 { + return ctxs[0] + } + return datastreams.ContextWithPathway(ctxs[0], datastreams.Merge(pathways)) +} + +// TextMapWriter allows setting key/value pairs of strings on the underlying +// data structure. Carriers implementing TextMapWriter are compatible to be +// used with Datadog's TextMapPropagator. +type TextMapWriter interface { + // Set sets the given key/value pair. + Set(key, val string) +} + +// TextMapReader allows iterating over sets of key/value pairs. Carriers implementing +// TextMapReader are compatible to be used with Datadog's TextMapPropagator. +type TextMapReader interface { + // ForeachKey iterates over all keys that exist in the underlying + // carrier. It takes a callback function which will be called + // using all key/value pairs as arguments. ForeachKey will return + // the first error returned by the handler. 
+ ForeachKey(handler func(key, val string) error) error +} + +// ExtractFromBase64Carrier extracts the pathway context from a carrier to a context object +func ExtractFromBase64Carrier(ctx context.Context, carrier TextMapReader) (outCtx context.Context) { + outCtx = ctx + carrier.ForeachKey(func(key, val string) error { + if key == datastreams.PropagationKeyBase64 { + _, outCtx, _ = datastreams.DecodeBase64(ctx, val) + } + return nil + }) + return outCtx +} + +// InjectToBase64Carrier injects a pathway context from a context object into a carrier +func InjectToBase64Carrier(ctx context.Context, carrier TextMapWriter) { + p, ok := datastreams.PathwayFromContext(ctx) + if !ok { + return + } + carrier.Set(datastreams.PropagationKeyBase64, p.EncodeBase64()) +} diff --git a/datastreams/propagation_test.go b/datastreams/propagation_test.go new file mode 100644 index 0000000000..f29960cdeb --- /dev/null +++ b/datastreams/propagation_test.go @@ -0,0 +1,45 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc.
+ +package datastreams + +import ( + "context" + "testing" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams" + + "github.com/stretchr/testify/assert" +) + +type carrier map[string]string + +func (c carrier) Set(key, val string) { + c[key] = val +} + +func (c carrier) ForeachKey(handler func(key, val string) error) error { + for k, v := range c { + if err := handler(k, v); err != nil { + return err + } + } + return nil +} + +func TestBase64Propagation(t *testing.T) { + c := make(carrier) + mt := mocktracer.Start() + defer mt.Stop() + ctx := context.Background() + ctx, _ = tracer.SetDataStreamsCheckpoint(ctx, "direction:out", "type:kafka", "topic:topic1") + InjectToBase64Carrier(ctx, c) + got, _ := datastreams.PathwayFromContext(ExtractFromBase64Carrier(context.Background(), c)) + expected, _ := datastreams.PathwayFromContext(ctx) + assert.Equal(t, expected.GetHash(), got.GetHash()) + assert.NotEqual(t, 0, expected.GetHash()) +} diff --git a/ddtrace/mocktracer/mocktracer.go b/ddtrace/mocktracer/mocktracer.go index bcdb6953e9..e397088af7 100644 --- a/ddtrace/mocktracer/mocktracer.go +++ b/ddtrace/mocktracer/mocktracer.go @@ -20,6 +20,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams" ) var _ ddtrace.Tracer = (*mocktracer)(nil) @@ -86,6 +87,10 @@ func (t *mocktracer) StartSpan(operationName string, opts ...ddtrace.StartSpanOp return span } +func (t *mocktracer) GetDataStreamsProcessor() *datastreams.Processor { + return &datastreams.Processor{} +} + func (t *mocktracer) OpenSpans() []Span { t.RLock() defer t.RUnlock() diff --git a/ddtrace/tracer/data_streams.go b/ddtrace/tracer/data_streams.go new file mode 100644 index 0000000000..32585cd41a --- /dev/null +++ b/ddtrace/tracer/data_streams.go @@ 
-0,0 +1,64 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package tracer + +import ( + "context" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" + idatastreams "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams" +) + +// dataStreamsContainer is an object that contains a data streams processor. +type dataStreamsContainer interface { + GetDataStreamsProcessor() *idatastreams.Processor +} + +// GetDataStreamsProcessor returns the processor tracking data streams stats +func (t *tracer) GetDataStreamsProcessor() *idatastreams.Processor { + return t.dataStreams +} + +// SetDataStreamsCheckpoint sets a consume or produce checkpoint in a Data Streams pathway. +// This enables tracking data flow & end to end latency. +// To learn more about the data streams product, see: https://docs.datadoghq.com/data_streams/go/ +func SetDataStreamsCheckpoint(ctx context.Context, edgeTags ...string) (outCtx context.Context, ok bool) { + return SetDataStreamsCheckpointWithParams(ctx, options.CheckpointParams{}, edgeTags...) +} + +// SetDataStreamsCheckpointWithParams sets a consume or produce checkpoint in a Data Streams pathway. +// This enables tracking data flow & end to end latency. +// To learn more about the data streams product, see: https://docs.datadoghq.com/data_streams/go/ +func SetDataStreamsCheckpointWithParams(ctx context.Context, params options.CheckpointParams, edgeTags ...string) (outCtx context.Context, ok bool) { + if t, ok := internal.GetGlobalTracer().(dataStreamsContainer); ok { + if processor := t.GetDataStreamsProcessor(); processor != nil { + outCtx = processor.SetCheckpointWithParams(ctx, params, edgeTags...) 
+ return outCtx, true + } + } + return ctx, false +} + +// TrackKafkaCommitOffset should be used in the consumer, to track when it acks offset. +// if used together with TrackKafkaProduceOffset it can generate a Kafka lag in seconds metric. +func TrackKafkaCommitOffset(group, topic string, partition int32, offset int64) { + if t, ok := internal.GetGlobalTracer().(dataStreamsContainer); ok { + if p := t.GetDataStreamsProcessor(); p != nil { + p.TrackKafkaCommitOffset(group, topic, partition, offset) + } + } +} + +// TrackKafkaProduceOffset should be used in the producer, to track when it produces a message. +// if used together with TrackKafkaCommitOffset it can generate a Kafka lag in seconds metric. +func TrackKafkaProduceOffset(topic string, partition int32, offset int64) { + if t, ok := internal.GetGlobalTracer().(dataStreamsContainer); ok { + if p := t.GetDataStreamsProcessor(); p != nil { + p.TrackKafkaProduceOffset(topic, partition, offset) + } + } +} diff --git a/ddtrace/tracer/log_test.go b/ddtrace/tracer/log_test.go index 9c7b6220f4..61eea2c475 100644 --- a/ddtrace/tracer/log_test.go +++ b/ddtrace/tracer/log_test.go @@ -32,7 +32,7 @@ func TestStartupLog(t *testing.T) { tp.Ignore("appsec: ", telemetry.LogPrefix) logStartup(tracer) require.Len(t, tp.Logs(), 2) - assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post 
.*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"disabled","sampling_rules":null,"sampling_rules_error":"","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[1]) + assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"disabled","sampling_rules":null,"sampling_rules_error":"","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"DataStreams":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[1]) }) t.Run("configured", func(t *testing.T) { @@ -64,7 +64,7 @@ func TestStartupLog(t *testing.T) { tp.Ignore("appsec: ", telemetry.LogPrefix) logStartup(tracer) require.Len(t, tp.Logs(), 2) - assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION 
{"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"100","sampling_rules":\[{"service":"mysql","name":"","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":true,"metadata":{"version":"v1"}}}`, tp.Logs()[1]) + assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post 
.*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"100","sampling_rules":\[{"service":"mysql","name":"","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":true,"metadata":{"version":"v1"}}}`, tp.Logs()[1]) }) t.Run("limit", func(t *testing.T) { @@ -96,7 +96,7 @@ func TestStartupLog(t *testing.T) { tp.Ignore("appsec: ", telemetry.LogPrefix) logStartup(tracer) require.Len(t, tp.Logs(), 2) - assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post 
.*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"1000.001","sampling_rules":\[{"service":"mysql","name":"","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[1]) + assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"1000.001","sampling_rules":\[{"service":"mysql","name":"","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[1]) }) t.Run("errors", func(t *testing.T) { @@ -111,7 +111,7 @@ func 
TestStartupLog(t *testing.T) { tp.Ignore("appsec: ", telemetry.LogPrefix) logStartup(tracer) require.Len(t, tp.Logs(), 2) - assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"100","sampling_rules":\[{"service":"some.service","name":"","sample_rate":0\.234,"type":"trace\(0\)"}\],"sampling_rules_error":"\\n\\tat index 1: rate not provided","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[1]) + assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"100","sampling_rules":\[{"service":"some.service","name":"","sample_rate":0\.234,"type":"trace\(0\)"}\],"sampling_rules_error":"\\n\\tat index 1: rate not 
provided","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"DataStreams":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[1]) }) t.Run("lambda", func(t *testing.T) { @@ -124,7 +124,7 @@ func TestStartupLog(t *testing.T) { tp.Ignore("appsec: ", telemetry.LogPrefix) logStartup(tracer) assert.Len(tp.Logs(), 1) - assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"disabled","sampling_rules":null,"sampling_rules_error":"","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"true","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[0]) + assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION 
{"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"disabled","sampling_rules":null,"sampling_rules_error":"","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"true","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false}}`, tp.Logs()[0]) }) t.Run("integrations", func(t *testing.T) { diff --git a/ddtrace/tracer/metrics.go b/ddtrace/tracer/metrics.go index a454ca1673..409d8a439a 100644 --- a/ddtrace/tracer/metrics.go +++ b/ddtrace/tracer/metrics.go @@ -18,15 +18,6 @@ import ( // be reported. const defaultMetricsReportInterval = 10 * time.Second -type statsdClient interface { - Incr(name string, tags []string, rate float64) error - Count(name string, value int64, tags []string, rate float64) error - Gauge(name string, value float64, tags []string, rate float64) error - Timing(name string, value time.Duration, tags []string, rate float64) error - Flush() error - Close() error -} - // reportRuntimeMetrics periodically reports go runtime metrics at // the given interval. 
func (t *tracer) reportRuntimeMetrics(interval time.Duration) { diff --git a/ddtrace/tracer/metrics_test.go b/ddtrace/tracer/metrics_test.go index 07b7260dda..12c58c2239 100644 --- a/ddtrace/tracer/metrics_test.go +++ b/ddtrace/tracer/metrics_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal" + "github.com/stretchr/testify/assert" ) @@ -47,7 +49,7 @@ type testStatsdCall struct { rate float64 } -func withStatsdClient(s statsdClient) StartOption { +func withStatsdClient(s globalinternal.StatsdClient) StartOption { return func(c *config) { c.statsdClient = s } diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index b343b29c0a..00fa76dd5f 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -189,7 +189,7 @@ type config struct { // statsdClient is set when a user provides a custom statsd client for tracking metrics // associated with the runtime and the tracer. - statsdClient statsdClient + statsdClient internal.StatsdClient // spanRules contains user-defined rules to determine the sampling rate to apply // to a single span without affecting the entire trace @@ -247,6 +247,9 @@ type config struct { // statsComputationEnabled enables client-side stats computation (aka trace metrics). statsComputationEnabled bool + // dataStreamsMonitoringEnabled specifies whether the tracer should enable monitoring of data streams + dataStreamsMonitoringEnabled bool + // orchestrionCfg holds Orchestrion (aka auto-instrumentation) configuration. // Only used for telemetry currently. 
orchestrionCfg orchestrionConfig @@ -340,6 +343,7 @@ func newConfig(opts ...StartOption) *config { c.spanTimeout = internal.DurationEnv("DD_TRACE_ABANDONED_SPAN_TIMEOUT", 10*time.Minute) } c.statsComputationEnabled = internal.BoolEnv("DD_TRACE_STATS_COMPUTATION_ENABLED", false) + c.dataStreamsMonitoringEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) c.partialFlushEnabled = internal.BoolEnv("DD_TRACE_PARTIAL_FLUSH_ENABLED", false) c.partialFlushMinSpans = internal.IntEnv("DD_TRACE_PARTIAL_FLUSH_MIN_SPANS", partialFlushMinSpansDefault) if c.partialFlushMinSpans <= 0 { @@ -446,7 +450,7 @@ func newConfig(opts ...StartOption) *config { if c.debug { log.SetLevel(log.LevelDebug) } - c.loadAgentFeatures() + c.agent = loadAgentFeatures(c.logToStdout, c.agentURL, c.httpClient) info, ok := debug.ReadBuildInfo() if !ok { c.loadContribIntegrations([]*debug.Module{}) @@ -479,7 +483,7 @@ func newConfig(opts ...StartOption) *config { return c } -func newStatsdClient(c *config) (statsdClient, error) { +func newStatsdClient(c *config) (internal.StatsdClient, error) { if c.statsdClient != nil { return c.statsdClient, nil } @@ -555,6 +559,10 @@ type agentFeatures struct { // the /v0.6/stats endpoint. Stats bool + // DataStreams reports whether the agent can receive data streams stats on + // the /v0.1/pipeline_stats endpoint. + DataStreams bool + // StatsdPort specifies the Dogstatsd port as provided by the agent. // If it's the default, it will be 0, which means 8125. StatsdPort int @@ -571,13 +579,12 @@ func (a *agentFeatures) HasFlag(feat string) bool { // loadAgentFeatures queries the trace-agent for its capabilities and updates // the tracer's behaviour. 
-func (c *config) loadAgentFeatures() { - c.agent = agentFeatures{} - if c.logToStdout { +func loadAgentFeatures(logToStdout bool, agentURL *url.URL, httpClient *http.Client) (features agentFeatures) { + if logToStdout { // there is no agent; all features off return } - resp, err := c.httpClient.Get(fmt.Sprintf("%s/info", c.agentURL)) + resp, err := httpClient.Get(fmt.Sprintf("%s/info", agentURL)) if err != nil { log.Error("Loading features: %v", err) return @@ -598,18 +605,21 @@ func (c *config) loadAgentFeatures() { log.Error("Decoding features: %v", err) return } - c.agent.DropP0s = info.ClientDropP0s - c.agent.StatsdPort = info.StatsdPort + features.DropP0s = info.ClientDropP0s + features.StatsdPort = info.StatsdPort for _, endpoint := range info.Endpoints { switch endpoint { case "/v0.6/stats": - c.agent.Stats = true + features.Stats = true + case "/v0.1/pipeline_stats": + features.DataStreams = true } } - c.agent.featureFlags = make(map[string]struct{}, len(info.FeatureFlags)) + features.featureFlags = make(map[string]struct{}, len(info.FeatureFlags)) for _, flag := range info.FeatureFlags { - c.agent.featureFlags[flag] = struct{}{} + features.featureFlags[flag] = struct{}{} } + return features } // MarkIntegrationImported labels the given integration as imported diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index 95c5caee11..720a2a0230 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "github.com/DataDog/datadog-go/v5/statsd" @@ -54,11 +55,11 @@ type concentrator struct { // stopped reports whether the concentrator is stopped (when non-zero) stopped uint32 - wg sync.WaitGroup // waits for any active goroutines - bucketSize int64 // the size of a bucket in nanoseconds - stop chan struct{} // closing this channel triggers shutdown - cfg *config // tracer startup configuration - 
statsdClient statsdClient // statsd client for sending metrics. + wg sync.WaitGroup // waits for any active goroutines + bucketSize int64 // the size of a bucket in nanoseconds + stop chan struct{} // closing this channel triggers shutdown + cfg *config // tracer startup configuration + statsdClient internal.StatsdClient // statsd client for sending metrics. } // newConcentrator creates a new concentrator using the given tracer @@ -113,7 +114,7 @@ func (c *concentrator) runFlusher(tick <-chan time.Time) { } // statsd returns any tracer configured statsd client, or a no-op. -func (c *concentrator) statsd() statsdClient { +func (c *concentrator) statsd() internal.StatsdClient { if c.statsdClient == nil { return &statsd.NoOpClient{} } diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index 0f5ed7f746..416f3c721c 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -20,6 +20,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec" + "gopkg.in/DataDog/dd-trace-go.v1/internal/datastreams" "gopkg.in/DataDog/dd-trace-go.v1/internal/hostname" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig" @@ -94,7 +95,10 @@ type tracer struct { obfuscator *obfuscate.Obfuscator // statsd is used for tracking metrics associated with the runtime and the tracer. - statsd statsdClient + statsd globalinternal.StatsdClient + + // dataStreams processes data streams monitoring information + dataStreams *datastreams.Processor // abandonedSpansDebugger specifies where and how potentially abandoned spans are stored // when abandoned spans debugging is enabled. 
@@ -143,6 +147,9 @@ func Start(opts ...StartOption) { if t.config.logStartup { logStartup(t) } + if t.dataStreams != nil { + t.dataStreams.Start() + } // Start AppSec with remote configuration cfg := remoteconfig.DefaultClientConfig() cfg.AgentURL = t.config.agentURL.String() @@ -232,6 +239,13 @@ func newUnstartedTracer(opts ...StartOption) *tracer { if spans != nil { c.spanRules = spans } + var dataStreamsProcessor *datastreams.Processor + if c.dataStreamsMonitoringEnabled { + dataStreamsProcessor = datastreams.NewProcessor(statsd, c.env, c.serviceName, c.agentURL, c.httpClient, func() bool { + f := loadAgentFeatures(c.logToStdout, c.agentURL, c.httpClient) + return f.DataStreams + }) + } t := &tracer{ config: c, traceWriter: writer, @@ -251,7 +265,8 @@ func newUnstartedTracer(opts ...StartOption) *tracer { Cache: c.agent.HasFlag("sql_cache"), }, }), - statsd: statsd, + statsd: statsd, + dataStreams: dataStreamsProcessor, } return t } @@ -311,6 +326,9 @@ func newTracer(opts ...StartOption) *tracer { func Flush() { if t, ok := internal.GetGlobalTracer().(*tracer); ok { t.flushSync() + if t.dataStreams != nil { + t.dataStreams.Flush() + } } } @@ -617,6 +635,9 @@ func (t *tracer) Stop() { t.wg.Wait() t.traceWriter.stop() t.statsd.Close() + if t.dataStreams != nil { + t.dataStreams.Stop() + } appsec.Stop() } diff --git a/ddtrace/tracer/writer.go b/ddtrace/tracer/writer.go index 8027defa79..a4665bddb5 100644 --- a/ddtrace/tracer/writer.go +++ b/ddtrace/tracer/writer.go @@ -16,6 +16,7 @@ import ( "sync" "time" + globalinternal "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) @@ -48,10 +49,10 @@ type agentTraceWriter struct { prioritySampling *prioritySampler // statsd is used to send metrics - statsd statsdClient + statsd globalinternal.StatsdClient } -func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient statsdClient) *agentTraceWriter { +func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient 
// hashableEdgeTags is the set of edge tag keys that participate in a node's
// hash. Tags with any other key are ignored by nodeHash so that unstable
// dimensions (e.g. partition numbers) do not fragment pathway aggregation.
var hashableEdgeTags = map[string]struct{}{"event_type": {}, "exchange": {}, "group": {}, "topic": {}, "type": {}, "direction": {}}

// isWellFormedEdgeTag reports whether t is a "key:value" edge tag containing
// exactly one colon whose key belongs to hashableEdgeTags.
func isWellFormedEdgeTag(t string) bool {
	if i := strings.IndexByte(t, ':'); i != -1 {
		if j := strings.LastIndexByte(t, ':'); j == i {
			if _, exists := hashableEdgeTags[t[:i]]; exists {
				return true
			}
		}
	}
	return false
}

// nodeHash computes the FNV-1 64-bit hash of a pathway node, identified by its
// service, env and the edge tags leading into it.
// Note: edgeTags is sorted in place so the hash is independent of tag order.
func nodeHash(service, env string, edgeTags []string) uint64 {
	h := fnv.New64()
	sort.Strings(edgeTags)
	h.Write([]byte(service))
	h.Write([]byte(env))
	for _, t := range edgeTags {
		// Malformed or non-hashable tags are silently skipped; they must not
		// contribute to the hash. (Removed a leftover debug fmt.Println here:
		// library code must never print to stdout.)
		if isWellFormedEdgeTag(t) {
			h.Write([]byte(t))
		}
	}
	return h.Sum64()
}

// pathwayHash computes the hash of a pathway as the combination of the current
// node's hash with the hash of the pathway up to the parent node.
func pathwayHash(nodeHash, parentHash uint64) uint64 {
	b := make([]byte, 16)
	binary.LittleEndian.PutUint64(b, nodeHash)
	binary.LittleEndian.PutUint64(b[8:], parentHash)
	h := fnv.New64()
	h.Write(b)
	return h.Sum64()
}
Pathway is used to monitor how payloads are sent across different services. +// An example Pathway would be: +// service A -- edge 1 --> service B -- edge 2 --> service C +// So it's a branch of services (we also call them "nodes") connected via edges. +// As the payload is sent around, we save the start time (start of service A), +// and the start time of the previous service. +// This allows us to measure the latency of each edge, as well as the latency from origin of any service. +type Pathway struct { + // hash is the hash of the current node, of the parent node, and of the edge that connects the parent node + // to this node. + hash uint64 + // pathwayStart is the start of the first node in the Pathway + pathwayStart time.Time + // edgeStart is the start of the previous node. + edgeStart time.Time +} + +// Merge merges multiple pathways into one. +// The current implementation samples one resulting Pathway. A future implementation could be more clever +// and actually merge the Pathways. +func Merge(pathways []Pathway) Pathway { + if len(pathways) == 0 { + return Pathway{} + } + // Randomly select a pathway to propagate downstream. + n := rand.Intn(len(pathways)) + return pathways[n] +} + +// GetHash gets the hash of a pathway. +func (p Pathway) GetHash() uint64 { + return p.hash +} + +// PathwayStart returns the start timestamp of the pathway +func (p Pathway) PathwayStart() time.Time { + return p.pathwayStart +} + +func (p Pathway) EdgeStart() time.Time { + return p.edgeStart +} diff --git a/internal/datastreams/pathway_test.go b/internal/datastreams/pathway_test.go new file mode 100644 index 0000000000..eefaf6f58d --- /dev/null +++ b/internal/datastreams/pathway_test.go @@ -0,0 +1,182 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
package datastreams

import (
	"context"
	"hash/fnv"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// TestPathway exercises checkpoint creation and hashing end to end through a
// stopped Processor (stopped: 1 keeps the background flusher from draining
// the in channel, so the test can read the emitted statsPoints directly).
func TestPathway(t *testing.T) {
	t.Run("test SetCheckpoint", func(t *testing.T) {
		// Freeze time via the injectable timeSource so edge/pathway
		// latencies are exact one-hour multiples.
		start := time.Now()
		processor := Processor{
			stopped:    1,
			in:         make(chan statsPoint, 10),
			service:    "service-1",
			env:        "env",
			timeSource: func() time.Time { return start },
		}
		ctx := processor.SetCheckpoint(context.Background())
		middle := start.Add(time.Hour)
		processor.timeSource = func() time.Time { return middle }
		ctx = processor.SetCheckpoint(ctx, "edge-1")
		end := middle.Add(time.Hour)
		processor.timeSource = func() time.Time { return end }
		ctx = processor.SetCheckpoint(ctx, "edge-2")
		// Each checkpoint hash chains off the previous one (parentHash).
		hash1 := pathwayHash(nodeHash("service-1", "env", nil), 0)
		hash2 := pathwayHash(nodeHash("service-1", "env", []string{"edge-1"}), hash1)
		hash3 := pathwayHash(nodeHash("service-1", "env", []string{"edge-2"}), hash2)
		p, _ := PathwayFromContext(ctx)
		assert.Equal(t, hash3, p.GetHash())
		assert.Equal(t, start, p.PathwayStart())
		assert.Equal(t, end, p.EdgeStart())
		// One statsPoint is emitted per checkpoint, in order.
		assert.Equal(t, statsPoint{
			edgeTags:       nil,
			hash:           hash1,
			parentHash:     0,
			timestamp:      start.UnixNano(),
			pathwayLatency: 0,
			edgeLatency:    0,
		}, <-processor.in)
		assert.Equal(t, statsPoint{
			edgeTags:       []string{"edge-1"},
			hash:           hash2,
			parentHash:     hash1,
			timestamp:      middle.UnixNano(),
			pathwayLatency: middle.Sub(start).Nanoseconds(),
			edgeLatency:    middle.Sub(start).Nanoseconds(),
		}, <-processor.in)
		assert.Equal(t, statsPoint{
			edgeTags:       []string{"edge-2"},
			hash:           hash3,
			parentHash:     hash2,
			timestamp:      end.UnixNano(),
			pathwayLatency: end.Sub(start).Nanoseconds(),
			edgeLatency:    end.Sub(middle).Nanoseconds(),
		}, <-processor.in)
	})

	t.Run("test new pathway creation", func(t *testing.T) {
		processor := Processor{
			stopped:    1,
			in:         make(chan statsPoint, 10),
			service:    "service-1",
			env:        "env",
			timeSource: time.Now,
		}

		// Each SetCheckpoint on a fresh context starts a new pathway
		// (parentHash 0) whose hash depends only on the edge tags given.
		pathwayWithNoEdgeTags, _ := PathwayFromContext(processor.SetCheckpoint(context.Background()))
		pathwayWith1EdgeTag, _ := PathwayFromContext(processor.SetCheckpoint(context.Background(), "type:internal"))
		pathwayWith2EdgeTags, _ := PathwayFromContext(processor.SetCheckpoint(context.Background(), "type:internal", "some_other_key:some_other_val"))

		hash1 := pathwayHash(nodeHash("service-1", "env", nil), 0)
		hash2 := pathwayHash(nodeHash("service-1", "env", []string{"type:internal"}), 0)
		hash3 := pathwayHash(nodeHash("service-1", "env", []string{"type:internal", "some_other_key:some_other_val"}), 0)
		assert.Equal(t, hash1, pathwayWithNoEdgeTags.GetHash())
		assert.Equal(t, hash2, pathwayWith1EdgeTag.GetHash())
		assert.Equal(t, hash3, pathwayWith2EdgeTags.GetHash())

		var statsPointWithNoEdgeTags = <-processor.in
		var statsPointWith1EdgeTag = <-processor.in
		var statsPointWith2EdgeTags = <-processor.in
		assert.Equal(t, hash1, statsPointWithNoEdgeTags.hash)
		assert.Equal(t, []string(nil), statsPointWithNoEdgeTags.edgeTags)
		assert.Equal(t, hash2, statsPointWith1EdgeTag.hash)
		assert.Equal(t, []string{"type:internal"}, statsPointWith1EdgeTag.edgeTags)
		assert.Equal(t, hash3, statsPointWith2EdgeTags.hash)
		// Note: edge tags are reported sorted, not in the order given.
		assert.Equal(t, []string{"some_other_key:some_other_val", "type:internal"}, statsPointWith2EdgeTags.edgeTags)
	})

	t.Run("test nodeHash", func(t *testing.T) {
		// Hashable tag keys must influence the hash...
		assert.NotEqual(t,
			nodeHash("service-1", "env", []string{"type:internal"}),
			nodeHash("service-1", "env", []string{"type:kafka"}),
		)
		assert.NotEqual(t,
			nodeHash("service-1", "env", []string{"exchange:1"}),
			nodeHash("service-1", "env", []string{"exchange:2"}),
		)
		assert.NotEqual(t,
			nodeHash("service-1", "env", []string{"topic:1"}),
			nodeHash("service-1", "env", []string{"topic:2"}),
		)
		assert.NotEqual(t,
			nodeHash("service-1", "env", []string{"group:1"}),
			nodeHash("service-1", "env", []string{"group:2"}),
		)
		assert.NotEqual(t,
			nodeHash("service-1", "env", []string{"event_type:1"}),
			nodeHash("service-1", "env", []string{"event_type:2"}),
		)
		// ...while non-hashable keys (e.g. partition) must be ignored.
		assert.Equal(t,
			nodeHash("service-1", "env", []string{"partition:0"}),
			nodeHash("service-1", "env", []string{"partition:1"}),
		)
	})

	t.Run("test isWellFormedEdgeTag", func(t *testing.T) {
		// Table-driven: exactly one colon, key in hashableEdgeTags.
		for _, tc := range []struct {
			s string
			b bool
		}{
			{"", false},
			{"dog", false},
			{"dog:", false},
			{"dog:bark", false},
			{"type:", true},
			{"type:dog", true},
			{"type::dog", false},
			{"type:d:o:g", false},
			{"type::", false},
			{":", false},
		} {
			assert.Equal(t, isWellFormedEdgeTag(tc.s), tc.b)
		}
	})

	// nodeHash assumes that the go Hash interface produces the same result
	// for a given series of Write calls as for a single Write of the same
	// byte sequence. This unit test asserts that assumption.
	t.Run("test hashWriterIsomorphism", func(t *testing.T) {
		h := fnv.New64()
		var b []byte
		b = append(b, "dog"...)
		b = append(b, "cat"...)
		b = append(b, "pig"...)
		h.Write(b)
		s1 := h.Sum64()
		h.Reset()
		h.Write([]byte("dog"))
		h.Write([]byte("cat"))
		h.Write([]byte("pig"))
		assert.Equal(t, s1, h.Sum64())
	})

	t.Run("test GetHash", func(t *testing.T) {
		pathway := Pathway{hash: nodeHash("service", "env", []string{"direction:in"})}
		assert.Equal(t, pathway.hash, pathway.GetHash())
	})
}

// Sample results at time of writing this benchmark:
// goos: darwin
// goarch: amd64
// pkg: github.com/DataDog/data-streams-go/datastreams
// cpu: Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
// BenchmarkNodeHash-8 5167707 232.5 ns/op 24 B/op 1 allocs/op
func BenchmarkNodeHash(b *testing.B) {
	service := "benchmark-runner"
	env := "test"
	edgeTags := []string{"event_type:dog", "exchange:local", "group:all", "topic:off", "type:writer"}
	for i := 0; i < b.N; i++ {
		nodeHash(service, env, edgeTags)
	}
}
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:generate msgp -unexported -marshal=false -o=payload_msgp.go -tests=false

package datastreams

// StatsPayload stores client computed stats.
type StatsPayload struct {
	// Env specifies the env. of the application, as defined by the user.
	Env string
	// Service is the service of the application
	Service string
	// Stats holds all stats buckets computed within this payload.
	Stats []StatsBucket
	// TracerVersion is the version of the tracer
	TracerVersion string
	// Lang is the language of the tracer
	Lang string
}

// ProduceOffset identifies the latest offset produced to a Kafka
// topic/partition.
type ProduceOffset struct {
	Topic     string
	Partition int32
	Offset    int64
}

// CommitOffset identifies the latest offset committed by a consumer group for
// a Kafka topic/partition.
type CommitOffset struct {
	ConsumerGroup string
	Topic         string
	Partition     int32
	Offset        int64
}

// Backlog represents the size of a queue that hasn't been yet read by the consumer.
type Backlog struct {
	// Tags that identify the backlog
	Tags []string
	// Value of the backlog
	Value int64
}

// StatsBucket specifies a set of stats computed over a duration.
type StatsBucket struct {
	// Start specifies the beginning of this bucket in unix nanoseconds.
	Start uint64
	// Duration specifies the duration of this bucket in nanoseconds.
	Duration uint64
	// Stats contains a set of statistics computed for the duration of this bucket.
	Stats []StatsPoint
	// Backlogs store information used to compute queue backlog
	Backlogs []Backlog
}

// TimestampType can be either current or origin.
type TimestampType string

const (
	// TimestampTypeCurrent is for when the recorded timestamp is based on the
	// timestamp of the current StatsPoint.
	TimestampTypeCurrent TimestampType = "current"
	// TimestampTypeOrigin is for when the recorded timestamp is based on the
	// time that the first StatsPoint in the pathway is sent out.
	TimestampTypeOrigin TimestampType = "origin"
)

// StatsPoint contains a set of statistics grouped under various aggregation keys.
type StatsPoint struct {
	// These fields indicate the properties under which the stats were aggregated.
	// Deprecated: Service is kept only for wire compatibility of the encoded
	// payload — TODO(review): confirm it is no longer read by the agent.
	Service  string
	EdgeTags []string
	Hash     uint64
	// ParentHash is the pathway hash of the upstream node this point chains from.
	ParentHash uint64
	// These fields specify the stats for the above aggregation.
	// those are distributions of latency in seconds.
	// NOTE(review): the []byte fields appear to carry serialized latency/size
	// sketches — confirm the encoding against the consumer in the agent.
	PathwayLatency []byte
	EdgeLatency    []byte
	PayloadSize    []byte
	TimestampType  TimestampType
}
+ +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *Backlog) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Tags": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + if cap(z.Tags) >= int(zb0002) { + z.Tags = (z.Tags)[:zb0002] + } else { + z.Tags = make([]string, zb0002) + } + for za0001 := range z.Tags { + z.Tags[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + case "Value": + z.Value, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Backlog) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "Tags" + err = en.Append(0x82, 0xa4, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Tags))) + if err != nil { + err = msgp.WrapError(err, "Tags") + return + } + for za0001 := range z.Tags { + err = en.WriteString(z.Tags[za0001]) + if err != nil { + err = msgp.WrapError(err, "Tags", za0001) + return + } + } + // write "Value" + err = en.Append(0xa5, 0x56, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.Value) + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Backlog) Msgsize() (s int) { + s = 1 + 5 + msgp.ArrayHeaderSize + for za0001 := range z.Tags { + 
s += msgp.StringPrefixSize + len(z.Tags[za0001]) + } + s += 6 + msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *CommitOffset) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ConsumerGroup": + z.ConsumerGroup, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ConsumerGroup") + return + } + case "Topic": + z.Topic, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + case "Partition": + z.Partition, err = dc.ReadInt32() + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + case "Offset": + z.Offset, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *CommitOffset) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "ConsumerGroup" + err = en.Append(0x84, 0xad, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70) + if err != nil { + return + } + err = en.WriteString(z.ConsumerGroup) + if err != nil { + err = msgp.WrapError(err, "ConsumerGroup") + return + } + // write "Topic" + err = en.Append(0xa5, 0x54, 0x6f, 0x70, 0x69, 0x63) + if err != nil { + return + } + err = en.WriteString(z.Topic) + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + // write "Partition" + err = en.Append(0xa9, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteInt32(z.Partition) + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + // write "Offset" + err = 
en.Append(0xa6, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteInt64(z.Offset) + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *CommitOffset) Msgsize() (s int) { + s = 1 + 14 + msgp.StringPrefixSize + len(z.ConsumerGroup) + 6 + msgp.StringPrefixSize + len(z.Topic) + 10 + msgp.Int32Size + 7 + msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *ProduceOffset) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Topic": + z.Topic, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + case "Partition": + z.Partition, err = dc.ReadInt32() + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + case "Offset": + z.Offset, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z ProduceOffset) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "Topic" + err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x70, 0x69, 0x63) + if err != nil { + return + } + err = en.WriteString(z.Topic) + if err != nil { + err = msgp.WrapError(err, "Topic") + return + } + // write "Partition" + err = en.Append(0xa9, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteInt32(z.Partition) + if err != nil { + err = msgp.WrapError(err, "Partition") + return + } + // write "Offset" + err = 
en.Append(0xa6, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteInt64(z.Offset) + if err != nil { + err = msgp.WrapError(err, "Offset") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z ProduceOffset) Msgsize() (s int) { + s = 1 + 6 + msgp.StringPrefixSize + len(z.Topic) + 10 + msgp.Int32Size + 7 + msgp.Int64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *StatsBucket) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Start": + z.Start, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Start") + return + } + case "Duration": + z.Duration, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Duration") + return + } + case "Stats": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + if cap(z.Stats) >= int(zb0002) { + z.Stats = (z.Stats)[:zb0002] + } else { + z.Stats = make([]StatsPoint, zb0002) + } + for za0001 := range z.Stats { + err = z.Stats[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + case "Backlogs": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Backlogs") + return + } + if cap(z.Backlogs) >= int(zb0003) { + z.Backlogs = (z.Backlogs)[:zb0003] + } else { + z.Backlogs = make([]Backlog, zb0003) + } + for za0002 := range z.Backlogs { + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002) + return + } + for zb0004 > 0 { + zb0004-- + field, 
err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002) + return + } + switch msgp.UnsafeString(field) { + case "Tags": + var zb0005 uint32 + zb0005, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags") + return + } + if cap(z.Backlogs[za0002].Tags) >= int(zb0005) { + z.Backlogs[za0002].Tags = (z.Backlogs[za0002].Tags)[:zb0005] + } else { + z.Backlogs[za0002].Tags = make([]string, zb0005) + } + for za0003 := range z.Backlogs[za0002].Tags { + z.Backlogs[za0002].Tags[za0003], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags", za0003) + return + } + } + case "Value": + z.Backlogs[za0002].Value, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Value") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002) + return + } + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *StatsBucket) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "Start" + err = en.Append(0x84, 0xa5, 0x53, 0x74, 0x61, 0x72, 0x74) + if err != nil { + return + } + err = en.WriteUint64(z.Start) + if err != nil { + err = msgp.WrapError(err, "Start") + return + } + // write "Duration" + err = en.Append(0xa8, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteUint64(z.Duration) + if err != nil { + err = msgp.WrapError(err, "Duration") + return + } + // write "Stats" + err = en.Append(0xa5, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Stats))) + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + for za0001 := range z.Stats { + err = z.Stats[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return 
+ } + } + // write "Backlogs" + err = en.Append(0xa8, 0x42, 0x61, 0x63, 0x6b, 0x6c, 0x6f, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Backlogs))) + if err != nil { + err = msgp.WrapError(err, "Backlogs") + return + } + for za0002 := range z.Backlogs { + // map header, size 2 + // write "Tags" + err = en.Append(0x82, 0xa4, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Backlogs[za0002].Tags))) + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags") + return + } + for za0003 := range z.Backlogs[za0002].Tags { + err = en.WriteString(z.Backlogs[za0002].Tags[za0003]) + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Tags", za0003) + return + } + } + // write "Value" + err = en.Append(0xa5, 0x56, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.Backlogs[za0002].Value) + if err != nil { + err = msgp.WrapError(err, "Backlogs", za0002, "Value") + return + } + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *StatsBucket) Msgsize() (s int) { + s = 1 + 6 + msgp.Uint64Size + 9 + msgp.Uint64Size + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Stats { + s += z.Stats[za0001].Msgsize() + } + s += 9 + msgp.ArrayHeaderSize + for za0002 := range z.Backlogs { + s += 1 + 5 + msgp.ArrayHeaderSize + for za0003 := range z.Backlogs[za0002].Tags { + s += msgp.StringPrefixSize + len(z.Backlogs[za0002].Tags[za0003]) + } + s += 6 + msgp.Int64Size + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) 
{ + case "Env": + z.Env, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Env") + return + } + case "Service": + z.Service, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + case "Stats": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + if cap(z.Stats) >= int(zb0002) { + z.Stats = (z.Stats)[:zb0002] + } else { + z.Stats = make([]StatsBucket, zb0002) + } + for za0001 := range z.Stats { + err = z.Stats[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + case "TracerVersion": + z.TracerVersion, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TracerVersion") + return + } + case "Lang": + z.Lang, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Lang") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 5 + // write "Env" + err = en.Append(0x85, 0xa3, 0x45, 0x6e, 0x76) + if err != nil { + return + } + err = en.WriteString(z.Env) + if err != nil { + err = msgp.WrapError(err, "Env") + return + } + // write "Service" + err = en.Append(0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Service) + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + // write "Stats" + err = en.Append(0xa5, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Stats))) + if err != nil { + err = msgp.WrapError(err, "Stats") + return + } + for za0001 := range z.Stats { + err = z.Stats[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Stats", za0001) + return + } + } + // write "TracerVersion" + err = en.Append(0xad, 0x54, 0x72, 0x61, 
0x63, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) + if err != nil { + return + } + err = en.WriteString(z.TracerVersion) + if err != nil { + err = msgp.WrapError(err, "TracerVersion") + return + } + // write "Lang" + err = en.Append(0xa4, 0x4c, 0x61, 0x6e, 0x67) + if err != nil { + return + } + err = en.WriteString(z.Lang) + if err != nil { + err = msgp.WrapError(err, "Lang") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *StatsPayload) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.Env) + 8 + msgp.StringPrefixSize + len(z.Service) + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Stats { + s += z.Stats[za0001].Msgsize() + } + s += 14 + msgp.StringPrefixSize + len(z.TracerVersion) + 5 + msgp.StringPrefixSize + len(z.Lang) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *StatsPoint) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "Service": + z.Service, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + case "EdgeTags": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "EdgeTags") + return + } + if cap(z.EdgeTags) >= int(zb0002) { + z.EdgeTags = (z.EdgeTags)[:zb0002] + } else { + z.EdgeTags = make([]string, zb0002) + } + for za0001 := range z.EdgeTags { + z.EdgeTags[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "EdgeTags", za0001) + return + } + } + case "Hash": + z.Hash, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Hash") + return + } + case "ParentHash": + z.ParentHash, err = 
dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ParentHash") + return + } + case "PathwayLatency": + z.PathwayLatency, err = dc.ReadBytes(z.PathwayLatency) + if err != nil { + err = msgp.WrapError(err, "PathwayLatency") + return + } + case "EdgeLatency": + z.EdgeLatency, err = dc.ReadBytes(z.EdgeLatency) + if err != nil { + err = msgp.WrapError(err, "EdgeLatency") + return + } + case "PayloadSize": + z.PayloadSize, err = dc.ReadBytes(z.PayloadSize) + if err != nil { + err = msgp.WrapError(err, "PayloadSize") + return + } + case "TimestampType": + { + var zb0003 string + zb0003, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TimestampType") + return + } + z.TimestampType = TimestampType(zb0003) + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *StatsPoint) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 8 + // write "Service" + err = en.Append(0x88, 0xa7, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65) + if err != nil { + return + } + err = en.WriteString(z.Service) + if err != nil { + err = msgp.WrapError(err, "Service") + return + } + // write "EdgeTags" + err = en.Append(0xa8, 0x45, 0x64, 0x67, 0x65, 0x54, 0x61, 0x67, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.EdgeTags))) + if err != nil { + err = msgp.WrapError(err, "EdgeTags") + return + } + for za0001 := range z.EdgeTags { + err = en.WriteString(z.EdgeTags[za0001]) + if err != nil { + err = msgp.WrapError(err, "EdgeTags", za0001) + return + } + } + // write "Hash" + err = en.Append(0xa4, 0x48, 0x61, 0x73, 0x68) + if err != nil { + return + } + err = en.WriteUint64(z.Hash) + if err != nil { + err = msgp.WrapError(err, "Hash") + return + } + // write "ParentHash" + err = en.Append(0xaa, 0x50, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x48, 0x61, 0x73, 0x68) + if err != nil { + return + } + err = en.WriteUint64(z.ParentHash) + if err 
!= nil { + err = msgp.WrapError(err, "ParentHash") + return + } + // write "PathwayLatency" + err = en.Append(0xae, 0x50, 0x61, 0x74, 0x68, 0x77, 0x61, 0x79, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + if err != nil { + return + } + err = en.WriteBytes(z.PathwayLatency) + if err != nil { + err = msgp.WrapError(err, "PathwayLatency") + return + } + // write "EdgeLatency" + err = en.Append(0xab, 0x45, 0x64, 0x67, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79) + if err != nil { + return + } + err = en.WriteBytes(z.EdgeLatency) + if err != nil { + err = msgp.WrapError(err, "EdgeLatency") + return + } + // write "PayloadSize" + err = en.Append(0xab, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteBytes(z.PayloadSize) + if err != nil { + err = msgp.WrapError(err, "PayloadSize") + return + } + // write "TimestampType" + err = en.Append(0xad, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x54, 0x79, 0x70, 0x65) + if err != nil { + return + } + err = en.WriteString(string(z.TimestampType)) + if err != nil { + err = msgp.WrapError(err, "TimestampType") + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *StatsPoint) Msgsize() (s int) { + s = 1 + 8 + msgp.StringPrefixSize + len(z.Service) + 9 + msgp.ArrayHeaderSize + for za0001 := range z.EdgeTags { + s += msgp.StringPrefixSize + len(z.EdgeTags[za0001]) + } + s += 5 + msgp.Uint64Size + 11 + msgp.Uint64Size + 15 + msgp.BytesPrefixSize + len(z.PathwayLatency) + 12 + msgp.BytesPrefixSize + len(z.EdgeLatency) + 12 + msgp.BytesPrefixSize + len(z.PayloadSize) + 14 + msgp.StringPrefixSize + len(string(z.TimestampType)) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *TimestampType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 string + zb0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = 
TimestampType(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z TimestampType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteString(string(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z TimestampType) Msgsize() (s int) { + s = msgp.StringPrefixSize + len(string(z)) + return +} diff --git a/internal/datastreams/processor.go b/internal/datastreams/processor.go new file mode 100644 index 0000000000..fd29280481 --- /dev/null +++ b/internal/datastreams/processor.go @@ -0,0 +1,469 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + "fmt" + "math" + "net/http" + "net/url" + "sync" + "sync/atomic" + "time" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "gopkg.in/DataDog/dd-trace-go.v1/internal" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" + + "github.com/DataDog/sketches-go/ddsketch" + "github.com/DataDog/sketches-go/ddsketch/mapping" + "github.com/DataDog/sketches-go/ddsketch/store" + "github.com/golang/protobuf/proto" +) + +const ( + bucketDuration = time.Second * 10 + loadAgentFeaturesInterval = time.Second * 30 + defaultServiceName = "unnamed-go-service" +) + +var sketchMapping, _ = mapping.NewLogarithmicMapping(0.01) + +type statsPoint struct { + edgeTags []string + hash uint64 + parentHash uint64 + timestamp int64 + pathwayLatency int64 + edgeLatency int64 + payloadSize int64 +} + +type statsGroup struct { + service string + edgeTags []string + hash uint64 + parentHash uint64 + pathwayLatency *ddsketch.DDSketch + edgeLatency *ddsketch.DDSketch + payloadSize *ddsketch.DDSketch +} 
+ +type bucket struct { + points map[uint64]statsGroup + latestCommitOffsets map[partitionConsumerKey]int64 + latestProduceOffsets map[partitionKey]int64 + start uint64 + duration uint64 +} + +func newBucket(start, duration uint64) bucket { + return bucket{ + points: make(map[uint64]statsGroup), + latestCommitOffsets: make(map[partitionConsumerKey]int64), + latestProduceOffsets: make(map[partitionKey]int64), + start: start, + duration: duration, + } +} + +func (b bucket) export(timestampType TimestampType) StatsBucket { + stats := make([]StatsPoint, 0, len(b.points)) + for _, s := range b.points { + pathwayLatency, err := proto.Marshal(s.pathwayLatency.ToProto()) + if err != nil { + log.Error("can't serialize pathway latency. Ignoring: %v", err) + continue + } + edgeLatency, err := proto.Marshal(s.edgeLatency.ToProto()) + if err != nil { + log.Error("can't serialize edge latency. Ignoring: %v", err) + continue + } + payloadSize, err := proto.Marshal(s.payloadSize.ToProto()) + if err != nil { + log.Error("can't serialize payload size. 
Ignoring: %v", err) + continue + } + stats = append(stats, StatsPoint{ + PathwayLatency: pathwayLatency, + EdgeLatency: edgeLatency, + Service: s.service, + EdgeTags: s.edgeTags, + Hash: s.hash, + ParentHash: s.parentHash, + TimestampType: timestampType, + PayloadSize: payloadSize, + }) + } + exported := StatsBucket{ + Start: b.start, + Duration: b.duration, + Stats: stats, + Backlogs: make([]Backlog, 0, len(b.latestCommitOffsets)+len(b.latestProduceOffsets)), + } + for key, offset := range b.latestProduceOffsets { + exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_produce"}, Value: offset}) + } + for key, offset := range b.latestCommitOffsets { + exported.Backlogs = append(exported.Backlogs, Backlog{Tags: []string{fmt.Sprintf("consumer_group:%s", key.group), fmt.Sprintf("partition:%d", key.partition), fmt.Sprintf("topic:%s", key.topic), "type:kafka_commit"}, Value: offset}) + } + return exported +} + +type processorStats struct { + payloadsIn int64 + flushedPayloads int64 + flushedBuckets int64 + flushErrors int64 + dropped int64 +} + +type partitionKey struct { + partition int32 + topic string +} + +type partitionConsumerKey struct { + partition int32 + topic string + group string +} + +type offsetType int + +const ( + produceOffset offsetType = iota + commitOffset +) + +type kafkaOffset struct { + offset int64 + topic string + group string + partition int32 + offsetType offsetType + timestamp int64 +} + +type Processor struct { + in chan statsPoint + inKafka chan kafkaOffset + tsTypeCurrentBuckets map[int64]bucket + tsTypeOriginBuckets map[int64]bucket + wg sync.WaitGroup + stopped uint64 + stop chan struct{} // closing this channel triggers shutdown + flushRequest chan chan<- struct{} + stats processorStats + transport *httpTransport + statsd internal.StatsdClient + env string + primaryTag string + service string + // used for tests + timeSource 
func() time.Time + disableStatsFlushing uint32 + getAgentSupportsDataStreams func() bool +} + +func (p *Processor) time() time.Time { + if p.timeSource != nil { + return p.timeSource() + } + return time.Now() +} + +func NewProcessor(statsd internal.StatsdClient, env, service string, agentURL *url.URL, httpClient *http.Client, getAgentSupportsDataStreams func() bool) *Processor { + if service == "" { + service = defaultServiceName + } + p := &Processor{ + tsTypeCurrentBuckets: make(map[int64]bucket), + tsTypeOriginBuckets: make(map[int64]bucket), + in: make(chan statsPoint, 10000), + inKafka: make(chan kafkaOffset, 10000), + stopped: 1, + statsd: statsd, + env: env, + service: service, + transport: newHTTPTransport(agentURL, httpClient), + timeSource: time.Now, + getAgentSupportsDataStreams: getAgentSupportsDataStreams, + } + p.updateAgentSupportsDataStreams(getAgentSupportsDataStreams()) + return p +} + +// alignTs returns the provided timestamp truncated to the bucket size. +// It gives us the start time of the time bucket in which such timestamp falls. 
+func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize } + +func (p *Processor) getBucket(btime int64, buckets map[int64]bucket) bucket { + b, ok := buckets[btime] + if !ok { + b = newBucket(uint64(btime), uint64(bucketDuration.Nanoseconds())) + buckets[btime] = b + } + return b +} +func (p *Processor) addToBuckets(point statsPoint, btime int64, buckets map[int64]bucket) { + b := p.getBucket(btime, buckets) + group, ok := b.points[point.hash] + if !ok { + group = statsGroup{ + edgeTags: point.edgeTags, + parentHash: point.parentHash, + hash: point.hash, + pathwayLatency: ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()), + edgeLatency: ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()), + payloadSize: ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()), + } + b.points[point.hash] = group + } + if err := group.pathwayLatency.Add(math.Max(float64(point.pathwayLatency)/float64(time.Second), 0)); err != nil { + log.Error("failed to add pathway latency. Ignoring %v.", err) + } + if err := group.edgeLatency.Add(math.Max(float64(point.edgeLatency)/float64(time.Second), 0)); err != nil { + log.Error("failed to add edge latency. Ignoring %v.", err) + } + if err := group.payloadSize.Add(float64(point.payloadSize)); err != nil { + log.Error("failed to add payload size. 
Ignoring %v.", err) + } +} + +func (p *Processor) add(point statsPoint) { + currentBucketTime := alignTs(point.timestamp, bucketDuration.Nanoseconds()) + p.addToBuckets(point, currentBucketTime, p.tsTypeCurrentBuckets) + originTimestamp := point.timestamp - point.pathwayLatency + originBucketTime := alignTs(originTimestamp, bucketDuration.Nanoseconds()) + p.addToBuckets(point, originBucketTime, p.tsTypeOriginBuckets) +} + +func (p *Processor) addKafkaOffset(o kafkaOffset) { + btime := alignTs(o.timestamp, bucketDuration.Nanoseconds()) + b := p.getBucket(btime, p.tsTypeCurrentBuckets) + if o.offsetType == produceOffset { + b.latestProduceOffsets[partitionKey{ + partition: o.partition, + topic: o.topic, + }] = o.offset + return + } + b.latestCommitOffsets[partitionConsumerKey{ + partition: o.partition, + group: o.group, + topic: o.topic, + }] = o.offset +} + +func (p *Processor) run(tick <-chan time.Time) { + for { + select { + case s := <-p.in: + atomic.AddInt64(&p.stats.payloadsIn, 1) + p.add(s) + case o := <-p.inKafka: + p.addKafkaOffset(o) + case now := <-tick: + p.sendToAgent(p.flush(now)) + case done := <-p.flushRequest: + p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) + close(done) + case <-p.stop: + // drop in flight payloads on the input channel + p.sendToAgent(p.flush(time.Now().Add(bucketDuration * 10))) + return + } + } +} + +func (p *Processor) Start() { + if atomic.SwapUint64(&p.stopped, 0) == 0 { + // already running + log.Warn("(*Processor).Start called more than once. 
This is likely a programming error.") + return + } + p.stop = make(chan struct{}) + p.flushRequest = make(chan chan<- struct{}) + p.wg.Add(2) + go p.reportStats() + go func() { + defer p.wg.Done() + tick := time.NewTicker(bucketDuration) + defer tick.Stop() + p.run(tick.C) + }() + go func() { + defer p.wg.Done() + tick := time.NewTicker(loadAgentFeaturesInterval) + defer tick.Stop() + p.runLoadAgentFeatures(tick.C) + }() +} + +// Flush triggers a flush and waits for it to complete. +func (p *Processor) Flush() { + if atomic.LoadUint64(&p.stopped) > 0 { + return + } + done := make(chan struct{}) + select { + case p.flushRequest <- done: + <-done + case <-p.stop: + } +} + +func (p *Processor) Stop() { + if atomic.SwapUint64(&p.stopped, 1) > 0 { + return + } + close(p.stop) + p.wg.Wait() +} + +func (p *Processor) reportStats() { + for range time.NewTicker(time.Second * 10).C { + p.statsd.Count("datadog.datastreams.processor.payloads_in", atomic.SwapInt64(&p.stats.payloadsIn, 0), nil, 1) + p.statsd.Count("datadog.datastreams.processor.flushed_payloads", atomic.SwapInt64(&p.stats.flushedPayloads, 0), nil, 1) + p.statsd.Count("datadog.datastreams.processor.flushed_buckets", atomic.SwapInt64(&p.stats.flushedBuckets, 0), nil, 1) + p.statsd.Count("datadog.datastreams.processor.flush_errors", atomic.SwapInt64(&p.stats.flushErrors, 0), nil, 1) + p.statsd.Count("datadog.datastreams.processor.dropped_payloads", atomic.SwapInt64(&p.stats.dropped, 0), nil, 1) + } +} + +func (p *Processor) flushBucket(buckets map[int64]bucket, bucketStart int64, timestampType TimestampType) StatsBucket { + bucket := buckets[bucketStart] + delete(buckets, bucketStart) + return bucket.export(timestampType) +} + +func (p *Processor) flush(now time.Time) StatsPayload { + nowNano := now.UnixNano() + sp := StatsPayload{ + Service: p.service, + Env: p.env, + Lang: "go", + TracerVersion: version.Tag, + Stats: make([]StatsBucket, 0, len(p.tsTypeCurrentBuckets)+len(p.tsTypeOriginBuckets)), + } + for ts := 
range p.tsTypeCurrentBuckets { + if ts > nowNano-bucketDuration.Nanoseconds() { + // do not flush the bucket at the current time + continue + } + sp.Stats = append(sp.Stats, p.flushBucket(p.tsTypeCurrentBuckets, ts, TimestampTypeCurrent)) + } + for ts := range p.tsTypeOriginBuckets { + if ts > nowNano-bucketDuration.Nanoseconds() { + // do not flush the bucket at the current time + continue + } + sp.Stats = append(sp.Stats, p.flushBucket(p.tsTypeOriginBuckets, ts, TimestampTypeOrigin)) + } + return sp +} + +func (p *Processor) sendToAgent(payload StatsPayload) { + atomic.AddInt64(&p.stats.flushedPayloads, 1) + atomic.AddInt64(&p.stats.flushedBuckets, int64(len(payload.Stats))) + if err := p.transport.sendPipelineStats(&payload); err != nil { + atomic.AddInt64(&p.stats.flushErrors, 1) + } +} + +func (p *Processor) SetCheckpoint(ctx context.Context, edgeTags ...string) context.Context { + return p.SetCheckpointWithParams(ctx, options.CheckpointParams{}, edgeTags...) +} + +func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.CheckpointParams, edgeTags ...string) context.Context { + parent, hasParent := PathwayFromContext(ctx) + parentHash := uint64(0) + now := p.time() + pathwayStart := now + edgeStart := now + if hasParent { + pathwayStart = parent.PathwayStart() + edgeStart = parent.EdgeStart() + parentHash = parent.GetHash() + } + child := Pathway{ + hash: pathwayHash(nodeHash(p.service, p.env, edgeTags), parentHash), + pathwayStart: pathwayStart, + edgeStart: now, + } + select { + case p.in <- statsPoint{ + edgeTags: edgeTags, + parentHash: parentHash, + hash: child.hash, + timestamp: now.UnixNano(), + pathwayLatency: now.Sub(pathwayStart).Nanoseconds(), + edgeLatency: now.Sub(edgeStart).Nanoseconds(), + payloadSize: params.PayloadSize, + }: + default: + atomic.AddInt64(&p.stats.dropped, 1) + } + return ContextWithPathway(ctx, child) +} + +func (p *Processor) TrackKafkaCommitOffset(group string, topic string, partition int32, offset 
int64) { + select { + case p.inKafka <- kafkaOffset{ + offset: offset, + group: group, + topic: topic, + partition: partition, + offsetType: commitOffset, + timestamp: p.time().UnixNano(), + }: + default: + atomic.AddInt64(&p.stats.dropped, 1) + } +} + +func (p *Processor) TrackKafkaProduceOffset(topic string, partition int32, offset int64) { + select { + case p.inKafka <- kafkaOffset{ + offset: offset, + topic: topic, + partition: partition, + offsetType: produceOffset, + timestamp: p.time().UnixNano(), + }: + default: + atomic.AddInt64(&p.stats.dropped, 1) + } +} + +func (p *Processor) runLoadAgentFeatures(tick <-chan time.Time) { + for { + select { + case <-tick: + p.updateAgentSupportsDataStreams(p.getAgentSupportsDataStreams()) + case <-p.stop: + return + } + } +} + +func (p *Processor) updateAgentSupportsDataStreams(agentSupportsDataStreams bool) { + var disableStatsFlushing uint32 + if !agentSupportsDataStreams { + disableStatsFlushing = 1 + } + if atomic.SwapUint32(&p.disableStatsFlushing, disableStatsFlushing) != disableStatsFlushing { + if agentSupportsDataStreams { + log.Info("Detected agent upgrade. Turning on Data Streams Monitoring.") + } else { + log.Warn("Turning off Data Streams Monitoring. Upgrade your agent to 7.34+") + } + } +} diff --git a/internal/datastreams/processor_test.go b/internal/datastreams/processor_test.go new file mode 100644 index 0000000000..cf8ed9c3f5 --- /dev/null +++ b/internal/datastreams/processor_test.go @@ -0,0 +1,239 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package datastreams + +import ( + "context" + "net/url" + "sort" + "strings" + "testing" + "time" + + "gopkg.in/DataDog/dd-trace-go.v1/internal/version" + + "github.com/DataDog/sketches-go/ddsketch" + "github.com/DataDog/sketches-go/ddsketch/store" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +func buildSketch(values ...float64) []byte { + sketch := ddsketch.NewDDSketch(sketchMapping, store.DenseStoreConstructor(), store.DenseStoreConstructor()) + for _, v := range values { + sketch.Add(v) + } + bytes, _ := proto.Marshal(sketch.ToProto()) + return bytes +} + +func TestProcessor(t *testing.T) { + p := NewProcessor(nil, "env", "service", &url.URL{Scheme: "http", Host: "agent-address"}, nil, func() bool { return true }) + tp1 := time.Now().Truncate(bucketDuration) + tp2 := tp1.Add(time.Minute) + + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 2, + parentHash: 1, + timestamp: tp2.UnixNano(), + pathwayLatency: time.Second.Nanoseconds(), + edgeLatency: time.Second.Nanoseconds(), + payloadSize: 1, + }) + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 2, + parentHash: 1, + timestamp: tp2.UnixNano(), + pathwayLatency: (5 * time.Second).Nanoseconds(), + edgeLatency: (2 * time.Second).Nanoseconds(), + payloadSize: 2, + }) + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 3, + parentHash: 1, + timestamp: tp2.UnixNano(), + pathwayLatency: (5 * time.Second).Nanoseconds(), + edgeLatency: (2 * time.Second).Nanoseconds(), + payloadSize: 2, + }) + p.add(statsPoint{ + edgeTags: []string{"type:edge-1"}, + hash: 2, + parentHash: 1, + timestamp: tp1.UnixNano(), + pathwayLatency: (5 * time.Second).Nanoseconds(), + edgeLatency: (2 * time.Second).Nanoseconds(), + payloadSize: 2, + }) + got := p.flush(tp1.Add(bucketDuration)) + sort.Slice(got.Stats, func(i, j int) bool { + return got.Stats[i].Start < got.Stats[j].Start + }) + assert.Len(t, got.Stats, 2) + assert.Equal(t, StatsPayload{ + Env: "env", + 
Service: "service", + Stats: []StatsBucket{ + { + Start: uint64(tp1.Add(-10 * time.Second).UnixNano()), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{{ + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + PayloadSize: buildSketch(2), + TimestampType: "origin", + }}, + Backlogs: []Backlog{}, + }, + { + Start: uint64(tp1.UnixNano()), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{{ + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + PayloadSize: buildSketch(2), + TimestampType: "current", + }}, + Backlogs: []Backlog{}, + }, + }, + TracerVersion: version.Tag, + Lang: "go", + }, got) + + sp := p.flush(tp2.Add(bucketDuration)) + sort.Slice(sp.Stats, func(i, j int) bool { + return sp.Stats[i].Start < sp.Stats[j].Start + }) + for k := range sp.Stats { + sort.Slice(sp.Stats[k].Stats, func(i, j int) bool { + return sp.Stats[k].Stats[i].Hash < sp.Stats[k].Stats[j].Hash + }) + } + assert.Equal(t, StatsPayload{ + Env: "env", + Service: "service", + Stats: []StatsBucket{ + { + Start: uint64(tp2.Add(-time.Second * 10).UnixNano()), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{ + { + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(1, 5), + EdgeLatency: buildSketch(1, 2), + PayloadSize: buildSketch(1, 2), + TimestampType: "origin", + }, + { + EdgeTags: []string{"type:edge-1"}, + Hash: 3, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + PayloadSize: buildSketch(2), + TimestampType: "origin", + }, + }, + Backlogs: []Backlog{}, + }, + { + Start: uint64(tp2.UnixNano()), + Duration: uint64(bucketDuration.Nanoseconds()), + Stats: []StatsPoint{ + { + EdgeTags: []string{"type:edge-1"}, + Hash: 2, + ParentHash: 1, + PathwayLatency: buildSketch(1, 5), + EdgeLatency: buildSketch(1, 
2), + PayloadSize: buildSketch(1, 2), + TimestampType: "current", + }, + { + EdgeTags: []string{"type:edge-1"}, + Hash: 3, + ParentHash: 1, + PathwayLatency: buildSketch(5), + EdgeLatency: buildSketch(2), + PayloadSize: buildSketch(2), + TimestampType: "current", + }, + }, + Backlogs: []Backlog{}, + }, + }, + TracerVersion: version.Tag, + Lang: "go", + }, sp) +} + +func TestSetCheckpoint(t *testing.T) { + processor := Processor{ + stopped: 1, + in: make(chan statsPoint, 10), + service: "service-1", + env: "env", + timeSource: time.Now, + } + hash1 := pathwayHash(nodeHash("service-1", "env", []string{"direction:in", "type:kafka"}), 0) + hash2 := pathwayHash(nodeHash("service-1", "env", []string{"direction:out", "type:kafka"}), hash1) + + ctx := processor.SetCheckpoint(context.Background(), "direction:in", "type:kafka") + pathway, _ := PathwayFromContext(processor.SetCheckpoint(ctx, "direction:out", "type:kafka")) + + statsPt1 := <-processor.in + statsPt2 := <-processor.in + + assert.Equal(t, []string{"direction:in", "type:kafka"}, statsPt1.edgeTags) + assert.Equal(t, hash1, statsPt1.hash) + assert.Equal(t, uint64(0), statsPt1.parentHash) + + assert.Equal(t, []string{"direction:out", "type:kafka"}, statsPt2.edgeTags) + assert.Equal(t, hash2, statsPt2.hash) + assert.Equal(t, hash1, statsPt2.parentHash) + + assert.Equal(t, statsPt2.hash, pathway.GetHash()) +} + +func TestKafkaLag(t *testing.T) { + p := NewProcessor(nil, "env", "service", &url.URL{Scheme: "http", Host: "agent-address"}, nil, func() bool { return true }) + tp1 := time.Now() + p.addKafkaOffset(kafkaOffset{offset: 1, topic: "topic1", partition: 1, group: "group1", offsetType: commitOffset}) + p.addKafkaOffset(kafkaOffset{offset: 10, topic: "topic2", partition: 1, group: "group1", offsetType: commitOffset}) + p.addKafkaOffset(kafkaOffset{offset: 5, topic: "topic1", partition: 1, offsetType: produceOffset}) + p.addKafkaOffset(kafkaOffset{offset: 15, topic: "topic1", partition: 1, offsetType: produceOffset}) 
+ point := p.flush(tp1.Add(bucketDuration * 2)) + sort.Slice(point.Stats[0].Backlogs, func(i, j int) bool { + return strings.Join(point.Stats[0].Backlogs[i].Tags, "") < strings.Join(point.Stats[0].Backlogs[j].Tags, "") + }) + expectedBacklogs := []Backlog{ + { + Tags: []string{"consumer_group:group1", "partition:1", "topic:topic1", "type:kafka_commit"}, + Value: 1, + }, + { + Tags: []string{"consumer_group:group1", "partition:1", "topic:topic2", "type:kafka_commit"}, + Value: 10, + }, + { + Tags: []string{"partition:1", "topic:topic1", "type:kafka_produce"}, + Value: 15, + }, + } + assert.Equal(t, expectedBacklogs, point.Stats[0].Backlogs) +} diff --git a/internal/datastreams/propagator.go b/internal/datastreams/propagator.go new file mode 100644 index 0000000000..bd8168a08d --- /dev/null +++ b/internal/datastreams/propagator.go @@ -0,0 +1,87 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package datastreams + +import ( + "context" + "encoding/base64" + "encoding/binary" + "errors" + "time" + + "github.com/DataDog/sketches-go/ddsketch/encoding" +) + +type contextKey struct{} + +var activePathwayKey = contextKey{} + +const ( + // PropagationKeyBase64 is the key to use to propagate the pathway between services. 
+ PropagationKeyBase64 = "dd-pathway-ctx-base64" +) + +// Encode encodes the pathway +func (p Pathway) Encode() []byte { + data := make([]byte, 8, 20) + binary.LittleEndian.PutUint64(data, p.hash) + encoding.EncodeVarint64(&data, p.pathwayStart.UnixNano()/int64(time.Millisecond)) + encoding.EncodeVarint64(&data, p.edgeStart.UnixNano()/int64(time.Millisecond)) + return data +} + +// Decode decodes a pathway +func Decode(ctx context.Context, data []byte) (p Pathway, outCtx context.Context, err error) { + if len(data) < 8 { + return p, ctx, errors.New("hash smaller than 8 bytes") + } + p.hash = binary.LittleEndian.Uint64(data) + data = data[8:] + pathwayStart, err := encoding.DecodeVarint64(&data) + if err != nil { + return p, ctx, err + } + edgeStart, err := encoding.DecodeVarint64(&data) + if err != nil { + return p, ctx, err + } + p.pathwayStart = time.Unix(0, pathwayStart*int64(time.Millisecond)) + p.edgeStart = time.Unix(0, edgeStart*int64(time.Millisecond)) + return p, ContextWithPathway(ctx, p), nil +} + +// EncodeBase64 encodes a pathway context into a string using base64 encoding. +func (p Pathway) EncodeBase64() string { + b := p.Encode() + return base64.StdEncoding.EncodeToString(b) +} + +// DecodeBase64 decodes a pathway context from a string using base64 encoding. +func DecodeBase64(ctx context.Context, str string) (p Pathway, outCtx context.Context, err error) { + data, err := base64.StdEncoding.DecodeString(str) + if err != nil { + return p, ctx, err + } + return Decode(ctx, data) +} + +// ContextWithPathway returns a copy of the given context which includes the pathway p. +func ContextWithPathway(ctx context.Context, p Pathway) context.Context { + return context.WithValue(ctx, activePathwayKey, p) +} + +// PathwayFromContext returns the pathway contained in the given context, and whether a +// pathway is found in ctx. 
+func PathwayFromContext(ctx context.Context) (p Pathway, ok bool) {
+	if ctx == nil {
+		// A nil context cannot carry a pathway.
+		return p, false
+	}
+	v := ctx.Value(activePathwayKey)
+	if p, ok := v.(Pathway); ok {
+		return p, true
+	}
+	return p, false
+}
diff --git a/internal/datastreams/propagator_test.go b/internal/datastreams/propagator_test.go
new file mode 100644
index 0000000000..0334b97138
--- /dev/null
+++ b/internal/datastreams/propagator_test.go
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// testPathway returns a fixed pathway whose timestamps are truncated to
+// millisecond precision — the resolution used by Encode — so that an
+// encode/decode round trip compares equal with assert.Equal.
+func testPathway() Pathway {
+	now := time.Now().Local().Truncate(time.Millisecond)
+	return Pathway{
+		hash:         234,
+		pathwayStart: now.Add(-time.Hour),
+		edgeStart:    now,
+	}
+}
+
+// TestEncode checks that Decode(Encode(p)) round-trips a pathway losslessly.
+func TestEncode(t *testing.T) {
+	p := testPathway()
+	encoded := p.Encode()
+	decoded, _, err := Decode(context.Background(), encoded)
+	assert.Nil(t, err)
+	assert.Equal(t, p, decoded)
+}
+
+// TestEncodeBase64 checks the base64 variant of the round trip.
+func TestEncodeBase64(t *testing.T) {
+	p := testPathway()
+	encoded := p.EncodeBase64()
+	decoded, _, err := DecodeBase64(context.Background(), encoded)
+	assert.Nil(t, err)
+	assert.Equal(t, p, decoded)
+}
diff --git a/internal/datastreams/transport.go b/internal/datastreams/transport.go
new file mode 100644
index 0000000000..62433c7557
--- /dev/null
+++ b/internal/datastreams/transport.go
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"net"
+	"net/http"
+	"net/url"
+	"runtime"
+	"strings"
+	"time"
+
+	"gopkg.in/DataDog/dd-trace-go.v1/internal"
+
+	"github.com/tinylib/msgp/msgp"
+)
+
+const (
+	defaultHostname    = "localhost"
+	defaultPort        = "8126"
+	defaultAddress     = defaultHostname + ":" + defaultPort
+	defaultHTTPTimeout = 2 * time.Second // defines the current timeout before giving up with the send process
+)
+
+var defaultDialer = &net.Dialer{
+	Timeout:   30 * time.Second,
+	KeepAlive: 30 * time.Second,
+	// NOTE(review): DualStack is deprecated since Go 1.12 (dual-stack is the
+	// default); harmless to keep, but it can be dropped.
+	DualStack: true,
+}
+
+var defaultClient = &http.Client{
+	// We copy the transport to avoid using the default one, as it might be
+	// augmented with tracing and we don't want these calls to be recorded.
+	// See https://golang.org/pkg/net/http/#DefaultTransport .
+	Transport: &http.Transport{
+		Proxy:                 http.ProxyFromEnvironment,
+		DialContext:           defaultDialer.DialContext,
+		MaxIdleConns:          100,
+		IdleConnTimeout:       90 * time.Second,
+		TLSHandshakeTimeout:   10 * time.Second,
+		ExpectContinueTimeout: 1 * time.Second,
+	},
+	Timeout: defaultHTTPTimeout,
+}
+
+// httpTransport delivers pipeline stats payloads to the Datadog agent over HTTP.
+type httpTransport struct {
+	url     string            // the delivery URL for stats
+	client  *http.Client      // the HTTP client used in the POST
+	headers map[string]string // the Transport headers
+}
+
+// newHTTPTransport returns an httpTransport that posts pipeline stats to the
+// /v0.1/pipeline_stats endpoint of the agent at agentURL, using client.
+func newHTTPTransport(agentURL *url.URL, client *http.Client) *httpTransport {
+	// initialize the default EncoderPool with Encoder headers
+	defaultHeaders := map[string]string{
+		"Datadog-Meta-Lang":             "go",
+		"Datadog-Meta-Lang-Version":     strings.TrimPrefix(runtime.Version(), "go"),
+		"Datadog-Meta-Lang-Interpreter": runtime.Compiler + "-" + runtime.GOARCH + "-" + runtime.GOOS,
+		"Content-Type":                  "application/msgpack",
+		"Content-Encoding":              "gzip",
+	}
+	if cid := internal.ContainerID(); cid != "" {
+		defaultHeaders["Datadog-Container-ID"] = cid
+	}
+	// Renamed from `url` to avoid shadowing the imported net/url package.
+	statsURL := fmt.Sprintf("%s/v0.1/pipeline_stats", agentURL.String())
+	return &httpTransport{
+		url:     statsURL,
+		client:  client,
+		headers: defaultHeaders,
+	}
+}
+
+// sendPipelineStats gzip-compresses the msgpack encoding of p and POSTs it to
+// the agent. A response with status >= 400 is converted into an error carrying
+// up to 1000 bytes of the response body and the HTTP status text.
+func (t *httpTransport) sendPipelineStats(p *StatsPayload) error {
+	var buf bytes.Buffer
+	gzipWriter, err := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
+	if err != nil {
+		return err
+	}
+	if err := msgp.Encode(gzipWriter, p); err != nil {
+		return err
+	}
+	// Close flushes the remaining compressed data; skipping it would truncate
+	// the payload.
+	if err := gzipWriter.Close(); err != nil {
+		return err
+	}
+	req, err := http.NewRequest("POST", t.url, &buf)
+	if err != nil {
+		return err
+	}
+	for header, value := range t.headers {
+		req.Header.Set(header, value)
+	}
+	resp, err := t.client.Do(req)
+	if err != nil {
+		return err
+	}
+	// Fix: always close the response body. The previous version closed it only
+	// on the >= 400 path, leaking the body (and preventing keep-alive
+	// connection reuse) on every successful send.
+	defer resp.Body.Close()
+	if code := resp.StatusCode; code >= 400 {
+		// error, check the body for context information and
+		// return a nice error.
+		msg := make([]byte, 1000)
+		n, _ := resp.Body.Read(msg)
+		txt := http.StatusText(code)
+		if n > 0 {
+			return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
+		}
+		return fmt.Errorf("%s", txt)
+	}
+	return nil
+}
diff --git a/internal/datastreams/transport_test.go b/internal/datastreams/transport_test.go
new file mode 100644
index 0000000000..630a288f35
--- /dev/null
+++ b/internal/datastreams/transport_test.go
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package datastreams
+
+import (
+	"net/http"
+	"net/url"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// fakeTransport records every request it round-trips and always answers with
+// a bare 200 response, letting the test inspect exactly what the transport
+// would have sent to the agent.
+type fakeTransport struct {
+	requests []*http.Request
+}
+
+func (t *fakeTransport) RoundTrip(r *http.Request) (*http.Response, error) {
+	t.requests = append(t.requests, r)
+	return &http.Response{StatusCode: 200}, nil
+}
+
+// TestHTTPTransport verifies that sendPipelineStats issues exactly one request
+// to the expected /v0.1/pipeline_stats URL of the configured agent.
+func TestHTTPTransport(t *testing.T) {
+	p := StatsPayload{Env: "env-1", Stats: []StatsBucket{{
+		Start:    2,
+		Duration: 10,
+		Stats: []StatsPoint{{
+			Service:        "service-1",
+			EdgeTags:       []string{"edge-1"},
+			Hash:           1,
+			ParentHash:     2,
+			PathwayLatency: []byte{1, 2, 3},
+			EdgeLatency:    []byte{4, 5, 6},
+		}},
+	}}}
+	fakeTransport := fakeTransport{}
+	transport := newHTTPTransport(&url.URL{Scheme: "http", Host: "agent-address:8126"}, &http.Client{Transport: &fakeTransport})
+	assert.Nil(t, transport.sendPipelineStats(&p))
+	assert.Len(t, fakeTransport.requests, 1)
+	r := fakeTransport.requests[0]
+	assert.Equal(t, "http://agent-address:8126/v0.1/pipeline_stats", r.URL.String())
+}
diff --git a/internal/statsd.go b/internal/statsd.go
new file mode 100644
index 0000000000..cccf35f8f1
--- /dev/null
+++ b/internal/statsd.go
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package internal
+
+import "time"
+
+// StatsdClient describes the statsd-style metrics operations (counters,
+// gauges, timings, flush/close) used by this library.
+// NOTE(review): presumably introduced so the concrete client can be swapped
+// or mocked in tests — confirm against callers.
+type StatsdClient interface {
+	Incr(name string, tags []string, rate float64) error
+	Count(name string, value int64, tags []string, rate float64) error
+	Gauge(name string, value float64, tags []string, rate float64) error
+	Timing(name string, value time.Duration, tags []string, rate float64) error
+	Flush() error
+	Close() error
+}