diff --git a/contrib/IBM/sarama/consumer_test.go b/contrib/IBM/sarama/consumer_test.go index 9561956d22..05c8f9c2cc 100644 --- a/contrib/IBM/sarama/consumer_test.go +++ b/contrib/IBM/sarama/consumer_test.go @@ -106,3 +106,79 @@ func TestWrapConsumer(t *testing.T) { assertDSMConsumerPathway(t, topic, "", msg2, false) } } + +func TestWrapConsumerWithCustomConsumerSpanOptions(t *testing.T) { + cfg := newIntegrationTestConfig(t) + cfg.Version = sarama.MinVersion + topic := topicName(t) + + mt := mocktracer.Start() + defer mt.Stop() + + client, err := sarama.NewClient(kafkaBrokers, cfg) + require.NoError(t, err) + defer client.Close() + + consumer, err := sarama.NewConsumerFromClient(client) + require.NoError(t, err) + consumer = WrapConsumer( + consumer, + WithDataStreams(), + WithConsumerCustomTag( + "messaging.kafka.key", + func(msg *sarama.ConsumerMessage) any { + return string(msg.Key) + }, + ), + ) + defer consumer.Close() + + partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) + require.NoError(t, err) + defer partitionConsumer.Close() + + p, err := sarama.NewSyncProducer(kafkaBrokers, cfg) + require.NoError(t, err) + defer func() { + assert.NoError(t, p.Close()) + }() + + produceMsg := &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("test key"), + Value: sarama.StringEncoder("test 1"), + Metadata: "test 1", + } + _, _, err = p.SendMessage(produceMsg) + require.NoError(t, err) + + msg1 := <-partitionConsumer.Messages() + err = partitionConsumer.Close() + require.NoError(t, err) + // wait for the channel to be closed + <-partitionConsumer.Messages() + waitForSpans(mt, 1) + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + + s := spans[0] + spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1)) + assert.NoError(t, err) + assert.Equal(t, spanctx.TraceIDLower(), s.TraceID(), + "span context should be injected into the consumer message headers") + + assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition)) + assert.NotNil(t, s.Tag("offset")) + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "kafka.consume", s.OperationName()) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + assert.Equal(t, topic, s.Tag("messaging.destination.name")) + assert.Equal(t, "test key", s.Tag("messaging.kafka.key")) + + assertDSMConsumerPathway(t, topic, "", msg1, false) +} diff --git a/contrib/IBM/sarama/dispatcher.go b/contrib/IBM/sarama/dispatcher.go index 855a7b8a42..566c189d8c 100644 --- a/contrib/IBM/sarama/dispatcher.go +++ b/contrib/IBM/sarama/dispatcher.go @@ -59,6 +59,11 @@ func (w *wrappedDispatcher) Run() { if !math.IsNaN(w.cfg.analyticsRate) { opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate)) } + if len(w.cfg.consumerCustomTags) > 0 { + for tag, tagValueFn := range w.cfg.consumerCustomTags { + opts = append(opts, tracer.Tag(tag, tagValueFn(msg))) + } + } // kafka supports headers, so try to extract a span context carrier := NewConsumerMessageCarrier(msg) if spanctx, err := tracer.Extract(carrier); err == nil { diff --git a/contrib/IBM/sarama/option.go b/contrib/IBM/sarama/option.go index f17e050f49..3497beed0f 100644 --- a/contrib/IBM/sarama/option.go +++ b/contrib/IBM/sarama/option.go @@ -9,6 +9,7 @@ import ( "math" "github.com/DataDog/dd-trace-go/v2/instrumentation" + "github.com/IBM/sarama" ) type config struct { @@ -19,6 +20,8 @@ type config struct { analyticsRate float64 dataStreamsEnabled bool groupID string + consumerCustomTags map[string]func(msg *sarama.ConsumerMessage) any + producerCustomTags map[string]func(msg *sarama.ProducerMessage) any } func defaults(cfg *config) { @@ -31,6 +34,9 @@ func defaults(cfg *config) { cfg.dataStreamsEnabled = instr.DataStreamsEnabled() cfg.analyticsRate = instr.AnalyticsRate(false) + + cfg.consumerCustomTags = make(map[string]func(msg *sarama.ConsumerMessage) any) + cfg.producerCustomTags = make(map[string]func(msg *sarama.ProducerMessage) any) } // Option describes options for the Sarama integration. @@ -89,3 +95,17 @@ func WithAnalyticsRate(rate float64) OptionFn { } } } + +// WithConsumerCustomTag enables calling a callback func to generate the value for a custom tag on wrapped consumers. +func WithConsumerCustomTag(tag string, tagFn func(msg *sarama.ConsumerMessage) any) OptionFn { + return func(cfg *config) { + cfg.consumerCustomTags[tag] = tagFn + } +} + +// WithCustomProducerSpanOptions enables calling a callback func to generate the value for a custom tag on wrapped producers. +func WithProducerCustomTag(tag string, tagFn func(msg *sarama.ProducerMessage) any) OptionFn { + return func(cfg *config) { + cfg.producerCustomTags[tag] = tagFn + } +} diff --git a/contrib/IBM/sarama/producer.go b/contrib/IBM/sarama/producer.go index db6f237ce5..2b74188aef 100644 --- a/contrib/IBM/sarama/producer.go +++ b/contrib/IBM/sarama/producer.go @@ -194,6 +194,11 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro if !math.IsNaN(cfg.analyticsRate) { opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate)) } + if len(cfg.producerCustomTags) > 0 { + for tag, tagValueFn := range cfg.producerCustomTags { + opts = append(opts, tracer.Tag(tag, tagValueFn(msg))) + } + } // if there's a span context in the headers, use that as the parent if spanctx, err := tracer.Extract(carrier); err == nil { // If there are span links as a result of context extraction, add them as a StartSpanOption diff --git a/contrib/IBM/sarama/producer_test.go b/contrib/IBM/sarama/producer_test.go index 8a9efc1b31..f83021bfa1 100644 --- a/contrib/IBM/sarama/producer_test.go +++ b/contrib/IBM/sarama/producer_test.go @@ -103,6 +103,62 @@ func TestSyncProducerSendMessages(t *testing.T) { } } +func TestSyncProducerWithCustomSpanOptions(t *testing.T) { + cfg := newIntegrationTestConfig(t) + topic := topicName(t) + + mt := mocktracer.Start() + defer mt.Stop() + + producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg) + require.NoError(t, err) + producer = WrapSyncProducer( + cfg, + producer, + WithDataStreams(), + WithProducerCustomTag( + "messaging.kafka.key", + func(msg *sarama.ProducerMessage) any { + key, err := msg.Key.Encode() + assert.NoError(t, err) + + return string(key) + }, + ), + ) + defer func() { + assert.NoError(t, producer.Close()) + }() + + msg1 := &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("test key"), + Value: sarama.StringEncoder("test 1"), + Metadata: "test", + } + _, _, err = producer.SendMessage(msg1) + require.NoError(t, err) + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + { + s := spans[0] + assert.Equal(t, "kafka", s.Tag(ext.ServiceName)) + assert.Equal(t, "queue", s.Tag(ext.SpanType)) + assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName)) + assert.Equal(t, "kafka.produce", s.OperationName()) + assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition)) + assert.NotNil(t, s.Tag("offset")) + assert.Equal(t, "IBM/sarama", s.Tag(ext.Component)) + assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind)) + assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) + assert.Equal(t, topic, s.Tag("messaging.destination.name")) + assert.Equal(t, "test key", s.Tag("messaging.kafka.key")) + + assertDSMProducerPathway(t, topic, msg1) + } +} + func TestWrapAsyncProducer(t *testing.T) { // the default for producers is a fire-and-forget model that doesn't return // successes