datastreams: Port data-streams-go to dd-trace-go #2006

Merged · 18 commits · Sep 6, 2023
Changes from 9 commits
3 changes: 3 additions & 0 deletions CODEOWNERS
@@ -17,6 +17,9 @@
/contrib/**/*appsec*.go @DataDog/asm-go
/.github/workflows/appsec.yml @DataDog/asm-go

# datastreams
/datastreams @Datadog/data-streams-monitoring

# telemetry
/internal/telemetry @DataDog/apm-go

…
16 changes: 16 additions & 0 deletions contrib/Shopify/sarama/option.go
@@ -20,6 +20,8 @@ type config struct {
consumerSpanName string
producerSpanName string
analyticsRate float64
dataStreamsEnabled bool
groupID string
}

func defaults(cfg *config) {
@@ -51,6 +53,20 @@ func WithServiceName(name string) Option {
}
}

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(cfg *config) {
cfg.dataStreamsEnabled = true
}
}

// WithGroupID tags the produced data streams metrics with the given groupID (aka consumer group)
func WithGroupID(groupID string) Option {
return func(cfg *config) {
cfg.groupID = groupID
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(cfg *config) {
…
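For reference, a minimal consumer-side sketch (not part of this diff) of how the two new options are meant to be combined; the broker address, topic, and group ID below are placeholder assumptions:

// Hypothetical usage sketch; broker, topic, and group ID are placeholders.
package main

import (
	"github.com/Shopify/sarama"

	saramatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // pathway propagation uses message headers, added in Kafka 0.11

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// WithDataStreams enables checkpointing; WithGroupID tags the metrics
	// with the consumer group so commit offsets can be tracked.
	consumer = saramatrace.WrapConsumer(consumer,
		saramatrace.WithDataStreams(),
		saramatrace.WithGroupID("my-group"),
	)

	pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	defer pc.Close()

	msg := <-pc.Messages() // receiving a message sets a consume checkpoint
	_ = msg
}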
74 changes: 74 additions & 0 deletions contrib/Shopify/sarama/sarama.go
@@ -7,8 +7,11 @@
package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"

import (
"context"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"math"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -76,6 +79,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
tracer.Inject(next.Context(), carrier)
setConsumeCheckpoint(cfg.dataStreamsEnabled, cfg.groupID, msg)

wrapped.messages <- msg

@@ -127,8 +131,12 @@ type syncProducer struct {
// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
span := startProducerSpan(p.cfg, p.version, msg)
setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
partition, offset, err = p.SyncProducer.SendMessage(msg)
finishProducerSpan(span, partition, offset, err)
if err == nil && p.cfg.dataStreamsEnabled {
tracer.TrackKafkaProduceOffset(msg.Topic, partition, offset)
}
return partition, offset, err
}

@@ -138,12 +146,19 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
// treated individually, so we create a span for each one
spans := make([]ddtrace.Span, len(msgs))
for i, msg := range msgs {
setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg, p.version)
spans[i] = startProducerSpan(p.cfg, p.version, msg)
}
err := p.SyncProducer.SendMessages(msgs)
for i, span := range spans {
finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
if err == nil && p.cfg.dataStreamsEnabled {
// we only track Kafka lag if messages have been sent successfully. Otherwise, we have no way of knowing to which partition the data was sent.
for _, msg := range msgs {
tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
}
}
return err
}

@@ -221,6 +236,7 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
select {
case msg := <-wrapped.input:
span := startProducerSpan(cfg, saramaConfig.Version, msg)
setProduceCheckpoint(cfg.dataStreamsEnabled, msg, saramaConfig.Version)
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spanID := span.Context().SpanID()
Expand All @@ -236,6 +252,10 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
// producer was closed, so exit
return
}
if cfg.dataStreamsEnabled {
// we only track Kafka lag if returning successes is enabled. Otherwise, we have no way of knowing to which partition the data was sent.
tracer.TrackKafkaProduceOffset(msg.Topic, msg.Partition, msg.Offset)
}
if spanctx, spanFound := getSpanContext(msg); spanFound {
spanID := spanctx.SpanID()
if span, ok := spans[spanID]; ok {
@@ -303,3 +323,57 @@ func getSpanContext(msg *sarama.ProducerMessage) (ddtrace.SpanContext, bool) {

return spanctx, true
}

func setProduceCheckpoint(enabled bool, msg *sarama.ProducerMessage, version sarama.KafkaVersion) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"}
carrier := NewProducerMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBytesCarrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)}, edges...)
if !ok || !version.IsAtLeast(sarama.V0_11_0_0) {
return
}
datastreams.InjectToBytesCarrier(ctx, carrier)
}

func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := NewConsumerMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBytesCarrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...)
if !ok {
return
}
datastreams.InjectToBytesCarrier(ctx, carrier)
if groupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset)
}
}

func getProducerMsgSize(msg *sarama.ProducerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
if msg.Value != nil {
size += int64(msg.Value.Length())
}
if msg.Key != nil {
size += int64(msg.Key.Length())
}
return size
}

func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
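On the produce side, a matching sketch (same imports as the sketch above; again not part of the diff): with WithDataStreams enabled, SendMessage sets a produce checkpoint, injects the pathway into the message headers, and, on success, reports the offset through tracer.TrackKafkaProduceOffset. The broker address and topic are placeholders.

// Hypothetical producer-side sketch; broker and topic are placeholders.
cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0       // first version that supports headers
cfg.Producer.Return.Successes = true // required by sarama.SyncProducer

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
if err != nil {
	panic(err)
}
producer = saramatrace.WrapSyncProducer(cfg, producer, saramatrace.WithDataStreams())
defer producer.Close()

// The produce checkpoint and offset tracking happen inside the wrapped SendMessage.
_, _, err = producer.SendMessage(&sarama.ProducerMessage{
	Topic: "my-topic",
	Value: sarama.StringEncoder("hello"),
})
if err != nil {
	panic(err)
}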
27 changes: 24 additions & 3 deletions contrib/Shopify/sarama/sarama_test.go
@@ -11,6 +11,7 @@ import (
"time"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -115,7 +116,7 @@ func TestConsumer(t *testing.T) {
}
defer consumer.Close()

consumer = WrapConsumer(consumer)
consumer = WrapConsumer(consumer, WithDataStreams())

partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0)
if err != nil {
@@ -145,6 +146,12 @@
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBytesCarrier(context.Background(), NewConsumerMessageCarrier(msg1)))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
{
s := spans[1]
@@ -162,6 +169,12 @@
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBytesCarrier(context.Background(), NewConsumerMessageCarrier(msg1)))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
}

@@ -176,23 +189,25 @@ func TestSyncProducer(t *testing.T) {
defer leader.Close()

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
producer = WrapSyncProducer(cfg, producer)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
@@ -214,6 +229,12 @@
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBytesCarrier(context.Background(), NewProducerMessageCarrier(msg1)))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
}

…
@@ -21,6 +21,7 @@ var (

// This example shows how a span context can be passed from a producer to a consumer.
func Example() {

tracer.Start()
defer tracer.Stop()

@@ -31,6 +32,7 @@
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
})

err = c.Subscribe(testTopic, nil)
if err != nil {
panic(err)
@@ -56,6 +58,7 @@
tracer.Inject(parentSpan.Context(), carrier)

c.Consumer.Events() <- msg

}()

msg := (<-c.Events()).(*kafka.Message)
@@ -66,6 +69,7 @@
if err != nil {
panic(err)
}

parentContext := parentSpan.Context()

// Validate that the context passed is the context sent via the message
…