
Commit

add confluent kafka support
piochelepiotr committed Aug 25, 2023
1 parent be27ec4 commit 3e2fccc
Showing 5 changed files with 144 additions and 162 deletions.
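In practice, this commit folds data streams monitoring into the traced Consumer and Producer wrappers, so instrumented applications no longer call the deleted data_streams.go helpers directly. A minimal usage sketch follows; WrapConsumer and the WithDataStreams() option do not appear in this excerpt and are assumptions based on the package's existing API (only the cfg.dataStreamsEnabled flag and WrapProducer are visible in the diff below):

package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
	})
	if err != nil {
		panic(err)
	}
	// Wrapping the consumer enables span creation and, with data streams
	// turned on, consume checkpoints in Poll/ReadMessage and offset
	// tracking in the Commit* methods.
	consumer := kafkatrace.WrapConsumer(c, kafkatrace.WithDataStreams()) // option name assumed
	defer consumer.Close()

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	// Wrapping the producer adds produce checkpoints in Produce and tracks
	// produced offsets from delivery events.
	producer := kafkatrace.WrapProducer(p, kafkatrace.WithDataStreams())
	defer producer.Close()
}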
61 changes: 0 additions & 61 deletions contrib/confluentinc/confluent-kafka-go/kafka/data_streams.go

This file was deleted.

95 changes: 0 additions & 95 deletions contrib/confluentinc/confluent-kafka-go/kafka/data_streams_test.go

This file was deleted.

113 changes: 112 additions & 1 deletion contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -7,6 +7,8 @@
package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"

import (
	"context"
	"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
	"math"
	"time"

@@ -81,6 +83,7 @@ func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
			// only trace messages
			if msg, ok := evt.(*kafka.Message); ok {
				next = c.startSpan(msg)
				setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
			}

			out <- evt
@@ -164,6 +167,7 @@ func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
	}
	evt := c.Consumer.Poll(timeoutMS)
	if msg, ok := evt.(*kafka.Message); ok {
		setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
		c.prev = c.startSpan(msg)
	}
	return evt
@@ -179,37 +183,84 @@ func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
	if err != nil {
		return nil, err
	}
	setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
	c.prev = c.startSpan(msg)
	return msg, nil
}

// Commit commits current offsets and tracks the commit offsets if data streams is enabled.
func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
	tps, err := c.Consumer.Commit()
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
	return tps, err
}

// CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
	tps, err := c.Consumer.CommitMessage(msg)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
	return tps, err
}

// CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
	tps, err := c.Consumer.CommitOffsets(offsets)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
	return tps, err
}

func commitOffsets(dataStreamsEnabled bool, groupID string, tps []kafka.TopicPartition, err error) {
	if err != nil || groupID == "" || !dataStreamsEnabled {
		return
	}
	for _, tp := range tps {
		tracer.TrackKafkaCommitOffset(groupID, *tp.Topic, tp.Partition, int64(tp.Offset))
	}
}

func trackProduceOffsets(dataStreamsEnabled bool, msg *kafka.Message, err error) {
	if err != nil || !dataStreamsEnabled || msg.TopicPartition.Topic == nil {
		return
	}
	tracer.TrackKafkaProduceOffset(*msg.TopicPartition.Topic, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset))
}

// A Producer wraps a kafka.Producer.
type Producer struct {
	*kafka.Producer
	cfg            *config
	produceChannel chan *kafka.Message
	events         chan kafka.Event
}

// WrapProducer wraps a kafka.Producer so requests are traced.
func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
	wrapped := &Producer{
		Producer: p,
		cfg:      newConfig(opts...),
		events:   p.Events(),
	}
	log.Debug("contrib/confluentinc/confluent-kafka-go/kafka: Wrapping Producer: %#v", wrapped.cfg)
	wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
	if wrapped.cfg.dataStreamsEnabled {
		wrapped.events = wrapped.traceEventsChannel(p.Events())
	}
	return wrapped
}

// Events returns the producer events channel. When data streams monitoring is
// enabled, delivery events on this channel are used to track produce offsets.
func (p *Producer) Events() chan kafka.Event {
	return p.events
}

func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message {
	if out == nil {
		return out
	}

	in := make(chan *kafka.Message, 1)
	go func() {
		for msg := range in {
			span := p.startSpan(msg)
			setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg)
			out <- msg
			span.Finish()
		}
@@ -269,12 +320,14 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
			if msg, ok := evt.(*kafka.Message); ok {
				// delivery errors are returned via TopicPartition.Error
				err = msg.TopicPartition.Error
				trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
			}
			span.Finish(tracer.WithError(err))
			oldDeliveryChan <- evt
		}()
	}

	setProduceCheckpoint(p.cfg.dataStreamsEnabled, msg)
	err := p.Producer.Produce(msg, deliveryChan)
	// with no delivery channel, finish immediately
	if deliveryChan == nil {
@@ -289,3 +342,61 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
func (p *Producer) ProduceChannel() chan *kafka.Message {
	return p.produceChannel
}

func (p *Producer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
	if in == nil {
		return nil
	}
	out := make(chan kafka.Event, 1)
	go func() {
		defer close(out)
		for evt := range in {
			if msg, ok := evt.(*kafka.Message); ok {
				trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, msg.TopicPartition.Error)
			}
			out <- evt
		}
	}()
	return out
}

func setConsumeCheckpoint(dataStreamsEnabled bool, groupID string, msg *kafka.Message) {
	if !dataStreamsEnabled || msg == nil {
		return
	}
	edges := []string{"direction:in", "topic:" + *msg.TopicPartition.Topic, "type:kafka"}
	if groupID != "" {
		edges = append(edges, "group:"+groupID)
	}
	carrier := NewMessageCarrier(msg)
	_, ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromCarrier(context.Background(), carrier), datastreams.NewCheckpointParams().WithPayloadSize(getMsgSize(msg)), edges...)
	if !ok {
		return
	}
	datastreams.InjectToCarrier(ctx, carrier)
	if groupID != "" {
		// only track Kafka lag if a consumer group is set.
		// since there is no ack mechanism, we consider that messages read are committed right away.
		tracer.TrackKafkaCommitOffset(groupID, *msg.TopicPartition.Topic, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset))
	}
}

func setProduceCheckpoint(dataStreamsEnabled bool, msg *kafka.Message) {
	if !dataStreamsEnabled || msg == nil {
		return
	}
	edges := []string{"direction:out", "topic:" + *msg.TopicPartition.Topic, "type:kafka"}
	carrier := NewMessageCarrier(msg)
	_, ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromCarrier(context.Background(), carrier), datastreams.NewCheckpointParams().WithPayloadSize(getMsgSize(msg)), edges...)
	if !ok {
		return
	}
	datastreams.InjectToCarrier(ctx, carrier)
}

func getMsgSize(msg *kafka.Message) (size int64) {
	for _, header := range msg.Headers {
		size += int64(len(header.Key) + len(header.Value))
	}
	return size + int64(len(msg.Value)+len(msg.Key))
}
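
To tie the consume path together: ReadMessage above stamps each incoming message with a consume checkpoint via setConsumeCheckpoint, and CommitMessage reports committed offsets through commitOffsets. A hedged sketch of a poll loop using the wrapped consumer; the consumeLoop helper, the handle callback, the package name, and the 10-second timeout are illustrative and not part of this commit:

package kafkaexample

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)

// consumeLoop reads, processes, and commits messages on a wrapped consumer.
func consumeLoop(consumer *kafkatrace.Consumer, handle func(*kafka.Message)) {
	for {
		// ReadMessage sets a consume checkpoint on the message before returning it.
		msg, err := consumer.ReadMessage(10 * time.Second)
		if err != nil {
			// Timeouts and transient errors: poll again.
			continue
		}
		handle(msg)
		// CommitMessage commits the offset and, when data streams is enabled
		// and a consumer group is set, tracks the committed offset for each
		// returned partition.
		if _, err := consumer.CommitMessage(msg); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}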
