Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions contrib/IBM/sarama/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,80 @@ func TestWrapConsumer(t *testing.T) {
assertDSMConsumerPathway(t, topic, "", msg2, false)
}
}

func TestWrapConsumerWithCustomConsumerSpanOptions(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Version = sarama.MinVersion
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

client, err := sarama.NewClient(kafkaBrokers, cfg)
require.NoError(t, err)
defer client.Close()

consumer, err := sarama.NewConsumerFromClient(client)
require.NoError(t, err)
consumer = WrapConsumer(
consumer,
WithDataStreams(),
WithCustomConsumerSpanOptions(
func(msg *sarama.ConsumerMessage) []tracer.StartSpanOption {
return []tracer.StartSpanOption{
tracer.Tag("messaging.kafka.key", string(msg.Key)),
}
},
),
)
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)
defer partitionConsumer.Close()

p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, p.Close())
}()

produceMsg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("test key"),
Value: sarama.StringEncoder("test 1"),
Metadata: "test 1",
}
_, _, err = p.SendMessage(produceMsg)
require.NoError(t, err)

msg1 := <-partitionConsumer.Messages()
err = partitionConsumer.Close()
require.NoError(t, err)
// wait for the channel to be closed
<-partitionConsumer.Messages()
waitForSpans(mt, 1)

spans := mt.FinishedSpans()
require.Len(t, spans, 1)

s := spans[0]
spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1))
assert.NoError(t, err)
assert.Equal(t, spanctx.TraceIDLower(), s.TraceID(),
"span context should be injected into the consumer message headers")

assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
assert.Equal(t, topic, s.Tag("messaging.destination.name"))
assert.Equal(t, "test key", s.Tag("messaging.kafka.key"))

assertDSMConsumerPathway(t, topic, "", msg1, false)
}
3 changes: 3 additions & 0 deletions contrib/IBM/sarama/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (w *wrappedDispatcher) Run() {
if !math.IsNaN(w.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate))
}
if w.cfg.customConsumerSpanOptionsFunc != nil {
opts = append(opts, w.cfg.customConsumerSpanOptionsFunc(msg)...)
}
// kafka supports headers, so try to extract a span context
carrier := NewConsumerMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
Expand Down
35 changes: 28 additions & 7 deletions contrib/IBM/sarama/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ package sarama
import (
"math"

"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/IBM/sarama"
)

type CustomConsumerSpanOptsFunc func(msg *sarama.ConsumerMessage) []tracer.StartSpanOption
type CustomProducerSpanOptsFunc func(msg *sarama.ProducerMessage) []tracer.StartSpanOption

type config struct {
consumerServiceName string
producerServiceName string
consumerSpanName string
producerSpanName string
analyticsRate float64
dataStreamsEnabled bool
groupID string
consumerServiceName string
producerServiceName string
consumerSpanName string
producerSpanName string
analyticsRate float64
dataStreamsEnabled bool
groupID string
customConsumerSpanOptionsFunc CustomConsumerSpanOptsFunc
customProducerSpanOptionsFunc CustomProducerSpanOptsFunc
}

func defaults(cfg *config) {
Expand Down Expand Up @@ -89,3 +96,17 @@ func WithAnalyticsRate(rate float64) OptionFn {
}
}
}

// WithCustomConsumerSpanOptions enables calling a callback func to add custom span options on wrapped consumers.
func WithCustomConsumerSpanOptions(f CustomConsumerSpanOptsFunc) OptionFn {
return func(cfg *config) {
cfg.customConsumerSpanOptionsFunc = f
}
}

// WithCustomProducerSpanOptions enables calling a callback func to add custom span options on wrapped producers.
func WithCustomProducerSpanOptions(f CustomProducerSpanOptsFunc) OptionFn {
return func(cfg *config) {
cfg.customProducerSpanOptionsFunc = f
}
}
3 changes: 3 additions & 0 deletions contrib/IBM/sarama/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
if !math.IsNaN(cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
}
if cfg.customProducerSpanOptionsFunc != nil {
opts = append(opts, cfg.customProducerSpanOptionsFunc(msg)...)
}
// if there's a span context in the headers, use that as the parent
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
Expand Down
58 changes: 58 additions & 0 deletions contrib/IBM/sarama/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/mocktracer"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
)

func TestSyncProducer(t *testing.T) {
Expand Down Expand Up @@ -103,6 +104,63 @@ func TestSyncProducerSendMessages(t *testing.T) {
}
}

func TestSyncProducerWithCustomSpanOptions(t *testing.T) {
cfg := newIntegrationTestConfig(t)
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(
cfg,
producer,
WithDataStreams(),
WithCustomProducerSpanOptions(
func(msg *sarama.ProducerMessage) []tracer.StartSpanOption {
key, err := msg.Key.Encode()
assert.NoError(t, err)

return []tracer.StartSpanOption{
tracer.Tag("kafka.messaging.key", key),
}
},
),
)
defer func() {
assert.NoError(t, producer.Close())
}()

msg1 := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("test key"),
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
_, _, err = producer.SendMessage(msg1)
require.NoError(t, err)

spans := mt.FinishedSpans()
require.Len(t, spans, 1)
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
assert.Equal(t, topic, s.Tag("messaging.destination.name"))
assert.Equal(t, "test key", s.Tag("messaging.kafka.key"))

assertDSMProducerPathway(t, topic, msg1)
}
}

func TestWrapAsyncProducer(t *testing.T) {
// the default for producers is a fire-and-forget model that doesn't return
// successes
Expand Down