Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions receiver/kafkareceiver/consumer_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (c *franzConsumer) consume(ctx context.Context, size int) bool {
c.client.MarkCommitRecords(msg)
}
c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, msg.Offset, metric.WithAttributeSet(pc.attrs))
if err := c.handleMessage(pc, wrapFranzMsg(msg)); err != nil {
if err := c.handleMessage(pc, msg); err != nil {
pc.logger.Error("unable to process message",
zap.Error(err),
zap.Int64("offset", msg.Offset),
Expand Down Expand Up @@ -507,13 +507,13 @@ func (c *franzConsumer) lost(ctx context.Context, _ *kgo.Client,
}

// handleMessage is called on a per-partition basis.
func (c *franzConsumer) handleMessage(pc *pc, msg kafkaMessage) error {
func (c *franzConsumer) handleMessage(pc *pc, record *kgo.Record) error {
if pc.backOff != nil {
defer pc.backOff.Reset()
}

for {
err := c.consumeMessage(pc.ctx, msg, pc.attrs)
err := c.consumeMessage(pc.ctx, record, pc.attrs)
if err == nil {
return nil // Successfully processed.
}
Expand Down Expand Up @@ -554,7 +554,7 @@ func (c *franzConsumer) handleMessage(pc *pc, msg kafkaMessage) error {
}
pc.logger.Error("failed to consume message, skipping due to message_marking config",
zap.Error(err),
zap.Int64("offset", msg.offset()),
zap.Int64("offset", record.Offset),
)
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions receiver/kafkareceiver/consumer_franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestConsumerShutdownConsuming(t *testing.T) {
newConsumeFunc := func() (newConsumeMessageFunc, chan<- struct{}) {
consuming := make(chan struct{})
return func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(ctx context.Context, _ kafkaMessage, _ attribute.Set) error {
return func(ctx context.Context, _ *kgo.Record, _ attribute.Set) error {
wg.Add(1)
defer wg.Done()

Expand Down Expand Up @@ -234,7 +234,7 @@ func TestConsumerShutdownNotStarted(t *testing.T) {
_, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test"))
settings, _, _ := mustNewSettings(t)
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
return func(_ context.Context, _ *kgo.Record, _ attribute.Set) error {
return nil
}, nil
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestRaceLostVsConsume(t *testing.T) {

// Noop consume function.
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(context.Context, kafkaMessage, attribute.Set) error {
return func(context.Context, *kgo.Record, attribute.Set) error {
return nil
}, nil
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestLost(t *testing.T) {
settings, _, _ := mustNewSettings(t)

consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
return func(_ context.Context, _ *kgo.Record, _ attribute.Set) error {
return nil
}, nil
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestFranzConsumer_UseLeaderEpoch_Smoke(t *testing.T) {
var called atomic.Int64
settings, _, _ := mustNewSettings(t)
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error {
return func(_ context.Context, _ *kgo.Record, _ attribute.Set) error {
called.Add(1)
return nil
}, nil
Expand Down Expand Up @@ -419,9 +419,9 @@ func TestExcludeTopicWithRegex(t *testing.T) {

settings, _, _ := mustNewSettings(t)
consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) {
return func(_ context.Context, msg kafkaMessage, _ attribute.Set) error {
return func(_ context.Context, record *kgo.Record, _ attribute.Set) error {
mu.Lock()
consumedTopics[msg.topic()]++
consumedTopics[record.Topic]++
mu.Unlock()
called.Add(1)
return nil
Expand Down
93 changes: 0 additions & 93 deletions receiver/kafkareceiver/consumer_message.go

This file was deleted.

64 changes: 33 additions & 31 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"iter"

"github.com/cenkalti/backoff/v4"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
Expand All @@ -31,7 +32,7 @@ import (

const transport = "kafka"

type consumeMessageFunc func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error
type consumeMessageFunc func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error

type newConsumeMessageFunc func(host component.Host, obsrecv *receiverhelper.ObsReport,
telBldr *metadata.TelemetryBuilder,
Expand Down Expand Up @@ -77,8 +78,8 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error {
return processMessage(ctx, record, config, set.Logger, telBldr,
&logsHandler{
unmarshaler: unmarshaler,
obsrecv: obsrecv,
Expand All @@ -104,8 +105,8 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error {
return processMessage(ctx, record, config, set.Logger, telBldr,
&metricsHandler{
unmarshaler: unmarshaler,
obsrecv: obsrecv,
Expand All @@ -131,8 +132,8 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error {
return processMessage(ctx, record, config, set.Logger, telBldr,
&tracesHandler{
unmarshaler: unmarshaler,
obsrecv: obsrecv,
Expand All @@ -158,8 +159,8 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error {
return processMessage(ctx, record, config, set.Logger, telBldr,
&profilesHandler{
unmarshaler: unmarshaler,
obsrecv: obsrecv,
Expand Down Expand Up @@ -351,10 +352,10 @@ func (*profilesHandler) getUnmarshalFailureCounter(telBldr *metadata.TelemetryBu
return telBldr.KafkaReceiverUnmarshalFailedProfiles
}

// processMessage is a generic function that processes any KafkaMessage using a messageHandler
// processMessage is a generic function that processes a Kafka record (*kgo.Record) using a messageHandler
func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Profiles](
ctx context.Context,
message kafkaMessage,
record *kgo.Record,
config *Config,
logger *zap.Logger,
telBldr *metadata.TelemetryBuilder,
Expand All @@ -364,18 +365,18 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro
) error {
if logger.Core().Enabled(zap.DebugLevel) {
logger.Debug("kafka message received",
zap.String("value", string(message.value())),
zap.Time("timestamp", message.timestamp()),
zap.String("topic", message.topic()),
zap.Int32("partition", message.partition()),
zap.Int64("offset", message.offset()),
zap.String("value", string(record.Value)),
zap.Time("timestamp", record.Timestamp),
zap.String("topic", record.Topic),
zap.Int32("partition", record.Partition),
zap.Int64("offset", record.Offset),
)
}

ctx = contextWithHeaders(ctx, message.headers())
ctx = contextWithHeaders(ctx, record.Headers)

obsCtx := handler.startObsReport(ctx)
data, n, err := handler.unmarshalData(message.value())
data, n, err := handler.unmarshalData(record.Value)
if err != nil {
handler.getUnmarshalFailureCounter(telBldr).Add(ctx, 1, metric.WithAttributeSet(attrs))
logger.Error("failed to unmarshal message", zap.Error(err))
Expand All @@ -387,7 +388,7 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro
// Add resource attributes from headers if configured
if config.HeaderExtraction.ExtractHeaders {
for key, value := range getMessageHeaderResourceAttributes(
message.headers(), headerAttrKeys,
record.Headers, headerAttrKeys,
) {
for resource := range handler.getResources(data) {
resource.Attributes().PutStr(key, value)
Expand All @@ -400,15 +401,16 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro
return err
}

func getMessageHeaderResourceAttributes(h messageHeaders, headerKeys map[string]string) iter.Seq2[string, string] {
func getMessageHeaderResourceAttributes(headers []kgo.RecordHeader, headerKeys map[string]string) iter.Seq2[string, string] {
return func(yield func(string, string) bool) {
for rawKey, attrKey := range headerKeys {
value, ok := h.get(rawKey)
if !ok {
continue
}
if !yield(attrKey, value) {
return
for _, h := range headers {
if h.Key == rawKey {
if !yield(attrKey, string(h.Value)) {
return
}
break
}
}
}
}
Expand Down Expand Up @@ -442,13 +444,13 @@ func newExponentialBackOff(config configretry.BackOffConfig) *backoff.Exponentia
return backOff
}

func contextWithHeaders(ctx context.Context, headers messageHeaders) context.Context {
if headers.len() == 0 {
func contextWithHeaders(ctx context.Context, headers []kgo.RecordHeader) context.Context {
if len(headers) == 0 {
return ctx
}
m := make(map[string][]string, headers.len())
for header := range headers.all() {
m[header.key] = append(m[header.key], string(header.value))
m := make(map[string][]string, len(headers))
for _, h := range headers {
m[h.Key] = append(m[h.Key], string(h.Value))
}
return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(m)})
}
16 changes: 7 additions & 9 deletions receiver/kafkareceiver/kafka_receiver_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@ func BenchmarkContextWithHeaders(b *testing.B) {
baseCtx := b.Context()
tests := []struct {
name string
headers messageHeaders
headers []kgo.RecordHeader
}{
{
name: "no headers",
headers: franzHeaders{headers: nil},
headers: nil,
},
{
name: "1 header",
headers: franzHeaders{headers: []kgo.RecordHeader{
headers: []kgo.RecordHeader{
{Key: "trace-id", Value: []byte("abc123")},
}},
},
},
{
name: "5 headers",
headers: franzHeaders{headers: []kgo.RecordHeader{
headers: []kgo.RecordHeader{
{Key: "trace-id", Value: []byte("abc123")},
{Key: "span-id", Value: []byte("def456")},
{Key: "tenant", Value: []byte("acme")},
{Key: "source", Value: []byte("app1")},
{Key: "env", Value: []byte("prod")},
}},
},
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -64,12 +64,10 @@ func BenchmarkGetMessageHeaderResourceAttributes(b *testing.B) {
Key: "unrelated",
Value: []byte("ignored"),
})
headers := franzHeaders{headers: kgoHeaders}

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
for k, v := range getMessageHeaderResourceAttributes(headers, headerAttrKeys) {
for k, v := range getMessageHeaderResourceAttributes(kgoHeaders, headerAttrKeys) {
_ = k
_ = v
}
Expand Down
Loading