Skip to content

Commit

Permalink
datastreams: Re-use main tracer for data streams
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Aug 3, 2023
1 parent 031616b commit 386f073
Show file tree
Hide file tree
Showing 35 changed files with 470 additions and 979 deletions.
14 changes: 7 additions & 7 deletions contrib/Shopify/sarama/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ import (
"github.com/Shopify/sarama"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TraceKafkaProduce sets a data-streams-monitoring "produce" checkpoint on ctx
// for an outbound Kafka message and, when a pathway is returned, injects its
// encoded form into the message headers under datastreams.PropagationKey so
// the consumer side can continue the pathway.
// NOTE(review): reconstructed from a diff view that interleaved the old
// datastreams.SetCheckpoint call with the new tracer-based call; this is the
// post-commit version.
func TraceKafkaProduce(ctx context.Context, msg *sarama.ProducerMessage) context.Context {
	edges := []string{"direction:out", "topic:" + msg.Topic, "type:kafka"}
	p, ctx := tracer.SetDataStreamsCheckpoint(ctx, edges...)
	// A nil pathway means data streams monitoring is not active; in that case
	// no header is injected.
	if p != nil {
		msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(datastreams.PropagationKey), Value: p.Encode()})
	}
	return ctx
}

// TraceKafkaConsume extracts an upstream data-streams pathway from the
// message headers (if one was propagated under datastreams.PropagationKey)
// into ctx, then sets an inbound "consume" checkpoint for the given consumer
// group and the message's topic.
// NOTE(review): reconstructed from a diff view that interleaved the old
// Decode/ContextWithPathway logic with the new single-call Decode; this is
// the post-commit version.
func TraceKafkaConsume(ctx context.Context, msg *sarama.ConsumerMessage, group string) context.Context {
	for _, header := range msg.Headers {
		if header != nil && string(header.Key) == datastreams.PropagationKey {
			// Decode returns (pathway, ctx, err); only the context carrying
			// the decoded pathway is needed here. Errors are deliberately
			// ignored: a malformed header simply starts a fresh pathway.
			_, ctx, _ = datastreams.Decode(ctx, header.Value)
			break
		}
	}
	edges := []string{"direction:in", "group:" + group, "topic:" + msg.Topic, "type:kafka"}
	_, ctx = tracer.SetDataStreamsCheckpoint(ctx, edges...)
	return ctx
}
40 changes: 22 additions & 18 deletions contrib/Shopify/sarama/data_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ package sarama

import (
"context"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/dsminterface"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"testing"

"github.com/Shopify/sarama"
Expand All @@ -17,9 +20,10 @@ import (

func TestTraceKafkaProduce(t *testing.T) {
t.Run("Checkpoint should be created and pathway should be propagated to kafka headers", func(t *testing.T) {
ctx := context.Background()
initialPathway := datastreams.NewPathway()
ctx = datastreams.ContextWithPathway(ctx, initialPathway)
mt := mocktracer.Start()
defer mt.Stop()

initialPathway, ctx := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "type:rabbitmq")

msg := sarama.ProducerMessage{
Topic: "test",
Expand All @@ -28,7 +32,7 @@ func TestTraceKafkaProduce(t *testing.T) {
ctx = TraceKafkaProduce(ctx, &msg)

// The old pathway shouldn't be equal to the new pathway found in the ctx because we created a new checkpoint.
ctxPathway, _ := datastreams.PathwayFromContext(ctx)
ctxPathway := datastreams.PathwayFromContext(ctx)
assertPathwayNotEqual(t, initialPathway, ctxPathway)

// The decoded pathway found in the kafka headers should be the same as the pathway found in the ctx.
Expand All @@ -38,17 +42,17 @@ func TestTraceKafkaProduce(t *testing.T) {
encodedPathway = header.Value
}
}
headersPathway, _ := datastreams.Decode(encodedPathway)
headersPathway, _, _ := datastreams.Decode(context.Background(), encodedPathway)
assertPathwayEqual(t, ctxPathway, headersPathway)
})
}

func TestTraceKafkaConsume(t *testing.T) {
t.Run("Checkpoint should be created and pathway should be extracted from kafka headers into context", func(t *testing.T) {
// First, set up pathway and context as it would have been from the producer view.
producerCtx := context.Background()
initialPathway := datastreams.NewPathway()
producerCtx = datastreams.ContextWithPathway(producerCtx, initialPathway)
mt := mocktracer.Start()
defer mt.Stop()

_, producerCtx := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:topic1")

topic := "my-topic"
produceMessage := sarama.ProducerMessage{
Expand All @@ -67,25 +71,25 @@ func TestTraceKafkaConsume(t *testing.T) {
consumerCtx = TraceKafkaConsume(consumerCtx, &consumeMessage, group)

// Check that the resulting consumerCtx contains an expected pathway.
consumerCtxPathway, _ := datastreams.PathwayFromContext(consumerCtx)
_, expectedCtx := datastreams.SetCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
expectedCtxPathway, _ := datastreams.PathwayFromContext(expectedCtx)
consumerCtxPathway := datastreams.PathwayFromContext(consumerCtx)
_, expectedCtx := tracer.SetDataStreamsCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
expectedCtxPathway := datastreams.PathwayFromContext(expectedCtx)
assertPathwayEqual(t, expectedCtxPathway, consumerCtxPathway)
})
}

func assertPathwayNotEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
decodedP1, err1 := datastreams.Decode(p1.Encode())
decodedP2, err2 := datastreams.Decode(p2.Encode())
func assertPathwayNotEqual(t *testing.T, p1 dsminterface.Pathway, p2 dsminterface.Pathway) {
decodedP1, _, err1 := datastreams.Decode(context.Background(), p1.Encode())
decodedP2, _, err2 := datastreams.Decode(context.Background(), p2.Encode())

assert.Nil(t, err1)
assert.Nil(t, err2)
assert.NotEqual(t, decodedP1, decodedP2)
}

func assertPathwayEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
decodedP1, err1 := datastreams.Decode(p1.Encode())
decodedP2, err2 := datastreams.Decode(p2.Encode())
func assertPathwayEqual(t *testing.T, p1 dsminterface.Pathway, p2 dsminterface.Pathway) {
decodedP1, _, err1 := datastreams.Decode(context.Background(), p1.Encode())
decodedP2, _, err2 := datastreams.Decode(context.Background(), p2.Encode())

assert.Nil(t, err1)
assert.Nil(t, err2)
Expand Down
15 changes: 8 additions & 7 deletions contrib/confluentinc/confluent-kafka-go/kafka/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/confluentinc/confluent-kafka-go/kafka"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func TraceKafkaProduce(ctx context.Context, msg *kafka.Message) context.Context {
Expand All @@ -19,18 +20,18 @@ func TraceKafkaProduce(ctx context.Context, msg *kafka.Message) context.Context
edges = append(edges, "topic:"+*msg.TopicPartition.Topic)
}
edges = append(edges, "type:kafka")
p, ctx := datastreams.SetCheckpoint(ctx, edges...)
msg.Headers = append(msg.Headers, kafka.Header{Key: datastreams.PropagationKey, Value: p.Encode()})
p, ctx := tracer.SetDataStreamsCheckpoint(ctx, edges...)
if p != nil {
msg.Headers = append(msg.Headers, kafka.Header{Key: datastreams.PropagationKey, Value: p.Encode()})
}
return ctx
}

func TraceKafkaConsume(ctx context.Context, msg *kafka.Message, group string) context.Context {
for _, header := range msg.Headers {
if header.Key == datastreams.PropagationKey {
p, err := datastreams.Decode(header.Value)
if err == nil {
ctx = datastreams.ContextWithPathway(ctx, p)
}
_, ctx, _ = datastreams.Decode(ctx, header.Value)
break
}
}
edges := []string{"direction:in", "group:" + group}
Expand All @@ -39,6 +40,6 @@ func TraceKafkaConsume(ctx context.Context, msg *kafka.Message, group string) co
}
edges = append(edges, "type:kafka")
edges = append(edges)
_, ctx = datastreams.SetCheckpoint(ctx, edges...)
_, ctx = tracer.SetDataStreamsCheckpoint(ctx, edges...)
return ctx
}
39 changes: 21 additions & 18 deletions contrib/confluentinc/confluent-kafka-go/kafka/data_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package kafka
import (
"context"
"fmt"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/dsminterface"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"testing"

"github.com/confluentinc/confluent-kafka-go/kafka"
Expand All @@ -18,10 +21,10 @@ import (

func TestTraceKafkaConsume(t *testing.T) {
t.Run("Checkpoint should be created and pathway should be extracted from kafka headers into context", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
// First, set up pathway and context as it would have been from the producer view.
producerCtx := context.Background()
initialPathway := datastreams.NewPathway()
producerCtx = datastreams.ContextWithPathway(producerCtx, initialPathway)
_, producerCtx := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "type:kafka")

topic := "my-topic"
msg := kafka.Message{
Expand All @@ -38,29 +41,29 @@ func TestTraceKafkaConsume(t *testing.T) {
consumerCtx = TraceKafkaConsume(consumerCtx, &msg, group)

// Check that the resulting consumerCtx contains an expected pathway.
consumerCtxPathway, _ := datastreams.PathwayFromContext(consumerCtx)
consumerCtxPathway := datastreams.PathwayFromContext(consumerCtx)
fmt.Println("setting")
_, expectedCtx := datastreams.SetCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
expectedCtxPathway, _ := datastreams.PathwayFromContext(expectedCtx)
_, expectedCtx := tracer.SetDataStreamsCheckpoint(producerCtx, "direction:in", "group:my-consumer-group", "topic:my-topic", "type:kafka")
expectedCtxPathway := datastreams.PathwayFromContext(expectedCtx)
assertPathwayEqual(t, expectedCtxPathway, consumerCtxPathway)
})
}

func TestTraceKafkaProduce(t *testing.T) {
t.Run("Checkpoint should be created and pathway should be propagated to kafka headers", func(t *testing.T) {
ctx := context.Background()
initialPathway := datastreams.NewPathway()
ctx = datastreams.ContextWithPathway(ctx, initialPathway)
mt := mocktracer.Start()
defer mt.Stop()
initialPathway, producerCtx := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:topic1")

msg := kafka.Message{
TopicPartition: kafka.TopicPartition{},
Value: []byte{},
}

ctx = TraceKafkaProduce(ctx, &msg)
ctx := TraceKafkaProduce(producerCtx, &msg)

// The old pathway shouldn't be equal to the new pathway found in the ctx because we created a new checkpoint.
ctxPathway, _ := datastreams.PathwayFromContext(ctx)
ctxPathway := datastreams.PathwayFromContext(ctx)
assertPathwayNotEqual(t, initialPathway, ctxPathway)

// The decoded pathway found in the kafka headers should be the same as the pathway found in the ctx.
Expand All @@ -70,23 +73,23 @@ func TestTraceKafkaProduce(t *testing.T) {
encodedPathway = header.Value
}
}
headersPathway, _ := datastreams.Decode(encodedPathway)
headersPathway, _, _ := datastreams.Decode(context.Background(), encodedPathway)
assertPathwayEqual(t, ctxPathway, headersPathway)
})
}

func assertPathwayNotEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
decodedP1, err1 := datastreams.Decode(p1.Encode())
decodedP2, err2 := datastreams.Decode(p2.Encode())
func assertPathwayNotEqual(t *testing.T, p1 dsminterface.Pathway, p2 dsminterface.Pathway) {
decodedP1, _, err1 := datastreams.Decode(context.Background(), p1.Encode())
decodedP2, _, err2 := datastreams.Decode(context.Background(), p2.Encode())

assert.Nil(t, err1)
assert.Nil(t, err2)
assert.NotEqual(t, decodedP1, decodedP2)
}

func assertPathwayEqual(t *testing.T, p1 datastreams.Pathway, p2 datastreams.Pathway) {
decodedP1, err1 := datastreams.Decode(p1.Encode())
decodedP2, err2 := datastreams.Decode(p2.Encode())
func assertPathwayEqual(t *testing.T, p1 dsminterface.Pathway, p2 dsminterface.Pathway) {
decodedP1, _, err1 := datastreams.Decode(context.Background(), p1.Encode())
decodedP2, _, err2 := datastreams.Decode(context.Background(), p2.Encode())

assert.Nil(t, err1)
assert.Nil(t, err2)
Expand Down
71 changes: 0 additions & 71 deletions datastreams/container.go

This file was deleted.

Loading

0 comments on commit 386f073

Please sign in to comment.