Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions contrib/IBM/sarama/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,79 @@ func TestWrapConsumer(t *testing.T) {
assertDSMConsumerPathway(t, topic, "", msg2, false)
}
}

func TestWrapConsumerWithCustomConsumerSpanOptions(t *testing.T) {
cfg := newIntegrationTestConfig(t)
cfg.Version = sarama.MinVersion
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

client, err := sarama.NewClient(kafkaBrokers, cfg)
require.NoError(t, err)
defer client.Close()

consumer, err := sarama.NewConsumerFromClient(client)
require.NoError(t, err)
consumer = WrapConsumer(
consumer,
WithDataStreams(),
WithConsumerCustomTag(
"messaging.kafka.key",
func(msg *sarama.ConsumerMessage) any {
return string(msg.Key)
},
),
)
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)
defer partitionConsumer.Close()

p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, p.Close())
}()

produceMsg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("test key"),
Value: sarama.StringEncoder("test 1"),
Metadata: "test 1",
}
_, _, err = p.SendMessage(produceMsg)
require.NoError(t, err)

msg1 := <-partitionConsumer.Messages()
err = partitionConsumer.Close()
require.NoError(t, err)
// wait for the channel to be closed
<-partitionConsumer.Messages()
waitForSpans(mt, 1)

spans := mt.FinishedSpans()
require.Len(t, spans, 1)

s := spans[0]
spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1))
assert.NoError(t, err)
assert.Equal(t, spanctx.TraceIDLower(), s.TraceID(),
"span context should be injected into the consumer message headers")

assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "kafka.consume", s.OperationName())
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
assert.Equal(t, topic, s.Tag("messaging.destination.name"))
assert.Equal(t, "test key", s.Tag("messaging.kafka.key"))

assertDSMConsumerPathway(t, topic, "", msg1, false)
}
5 changes: 5 additions & 0 deletions contrib/IBM/sarama/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (w *wrappedDispatcher) Run() {
if !math.IsNaN(w.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate))
}
if len(w.cfg.consumerCustomTags) > 0 {
for tag, tagValueFn := range w.cfg.consumerCustomTags {
opts = append(opts, tracer.Tag(tag, tagValueFn(msg)))
}
}
// kafka supports headers, so try to extract a span context
carrier := NewConsumerMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
Expand Down
17 changes: 17 additions & 0 deletions contrib/IBM/sarama/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math"

"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/IBM/sarama"
)

type config struct {
Expand All @@ -19,6 +20,8 @@ type config struct {
analyticsRate float64
dataStreamsEnabled bool
groupID string
consumerCustomTags map[string]func(msg *sarama.ConsumerMessage) any
producerCustomTags map[string]func(msg *sarama.ProducerMessage) any
}

func defaults(cfg *config) {
Expand Down Expand Up @@ -89,3 +92,17 @@ func WithAnalyticsRate(rate float64) OptionFn {
}
}
}

// WithConsumerCustomTag enables calling a callback func to generate the value for a custom tag on wrapped consumers.
func WithConsumerCustomTag(tag string, tagFn func(msg *sarama.ConsumerMessage) any) OptionFn {
return func(cfg *config) {
cfg.consumerCustomTags[tag] = tagFn
}
}

// WithCustomProducerSpanOptions enables calling a callback func to generate the value for a custom tag on wrapped producers.
func WithProducerCustomTag(tag string, tagFn func(msg *sarama.ProducerMessage) any) OptionFn {
return func(cfg *config) {
cfg.producerCustomTags[tag] = tagFn
}
}
5 changes: 5 additions & 0 deletions contrib/IBM/sarama/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
if !math.IsNaN(cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
}
if len(cfg.producerCustomTags) > 0 {
for tag, tagValueFn := range cfg.producerCustomTags {
opts = append(opts, tracer.Tag(tag, tagValueFn(msg)))
}
}
// if there's a span context in the headers, use that as the parent
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
Expand Down
56 changes: 56 additions & 0 deletions contrib/IBM/sarama/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,62 @@ func TestSyncProducerSendMessages(t *testing.T) {
}
}

func TestSyncProducerWithCustomSpanOptions(t *testing.T) {
cfg := newIntegrationTestConfig(t)
topic := topicName(t)

mt := mocktracer.Start()
defer mt.Stop()

producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(
cfg,
producer,
WithDataStreams(),
WithProducerCustomTag(
"kafka.messaging.key",
func(msg *sarama.ProducerMessage) any {
key, err := msg.Key.Encode()
assert.NoError(t, err)

return key
},
),
)
defer func() {
assert.NoError(t, producer.Close())
}()

msg1 := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("test key"),
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
_, _, err = producer.SendMessage(msg1)
require.NoError(t, err)

spans := mt.FinishedSpans()
require.Len(t, spans, 1)
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s.Tag("offset"))
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
assert.Equal(t, topic, s.Tag("messaging.destination.name"))
assert.Equal(t, "test key", s.Tag("messaging.kafka.key"))

assertDSMProducerPathway(t, topic, msg1)
}
}

func TestWrapAsyncProducer(t *testing.T) {
// the default for producers is a fire-and-forget model that doesn't return
// successes
Expand Down
Loading