From 0e90fb137a9f40348b37d376a4f501f4ecd3611d Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 11 Mar 2026 10:21:17 +0000 Subject: [PATCH 1/2] [receiver/kafka] Remove messageHeaders abstraction since Sarama is deprecated Signed-off-by: Paulo Dias --- receiver/kafkareceiver/consumer_franz.go | 8 +- receiver/kafkareceiver/consumer_franz_test.go | 14 +-- receiver/kafkareceiver/consumer_message.go | 93 ------------------- receiver/kafkareceiver/kafka_receiver.go | 62 +++++++------ .../kafka_receiver_bench_test.go | 16 ++-- 5 files changed, 50 insertions(+), 143 deletions(-) delete mode 100644 receiver/kafkareceiver/consumer_message.go diff --git a/receiver/kafkareceiver/consumer_franz.go b/receiver/kafkareceiver/consumer_franz.go index 302ff4b066d55..06f456c3b4581 100644 --- a/receiver/kafkareceiver/consumer_franz.go +++ b/receiver/kafkareceiver/consumer_franz.go @@ -316,7 +316,7 @@ func (c *franzConsumer) consume(ctx context.Context, size int) bool { c.client.MarkCommitRecords(msg) } c.telemetryBuilder.KafkaReceiverCurrentOffset.Record(ctx, msg.Offset, metric.WithAttributeSet(pc.attrs)) - if err := c.handleMessage(pc, wrapFranzMsg(msg)); err != nil { + if err := c.handleMessage(pc, msg); err != nil { pc.logger.Error("unable to process message", zap.Error(err), zap.Int64("offset", msg.Offset), @@ -507,13 +507,13 @@ func (c *franzConsumer) lost(ctx context.Context, _ *kgo.Client, } // handleMessage is called on a per-partition basis. -func (c *franzConsumer) handleMessage(pc *pc, msg kafkaMessage) error { +func (c *franzConsumer) handleMessage(pc *pc, record *kgo.Record) error { if pc.backOff != nil { defer pc.backOff.Reset() } for { - err := c.consumeMessage(pc.ctx, msg, pc.attrs) + err := c.consumeMessage(pc.ctx, record, pc.attrs) if err == nil { return nil // Successfully processed. } @@ -554,7 +554,7 @@ func (c *franzConsumer) handleMessage(pc *pc, msg kafkaMessage) error { } pc.logger.Error("failed to consume message, skipping due to message_marking config", zap.Error(err), - zap.Int64("offset", msg.offset()), + zap.Int64("offset", record.Offset), ) return nil } diff --git a/receiver/kafkareceiver/consumer_franz_test.go b/receiver/kafkareceiver/consumer_franz_test.go index 10e377260aa6e..519ef655b1e48 100644 --- a/receiver/kafkareceiver/consumer_franz_test.go +++ b/receiver/kafkareceiver/consumer_franz_test.go @@ -170,7 +170,7 @@ func TestConsumerShutdownConsuming(t *testing.T) { newConsumeFunc := func() (newConsumeMessageFunc, chan<- struct{}) { consuming := make(chan struct{}) return func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { - return func(ctx context.Context, _ kafkaMessage, _ attribute.Set) error { + return func(ctx context.Context, _ *kgo.Record, _ attribute.Set) error { wg.Add(1) defer wg.Done() @@ -234,7 +234,7 @@ func TestConsumerShutdownNotStarted(t *testing.T) { _, cfg := mustNewFakeCluster(t, kfake.SeedTopics(1, "test")) settings, _, _ := mustNewSettings(t) consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { - return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error { + return func(_ context.Context, _ *kgo.Record, _ attribute.Set) error { return nil }, nil } @@ -276,7 +276,7 @@ func TestRaceLostVsConsume(t *testing.T) { // Noop consume function. consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { - return func(context.Context, kafkaMessage, attribute.Set) error { + return func(context.Context, *kgo.Record, attribute.Set) error { return nil }, nil } @@ -309,7 +309,7 @@ func TestLost(t *testing.T) { settings, _, _ := mustNewSettings(t) consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { - return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error { + return func(_ context.Context, _ *kgo.Record, _ attribute.Set) error { return nil }, nil } @@ -337,7 +337,7 @@ func TestFranzConsumer_UseLeaderEpoch_Smoke(t *testing.T) { var called atomic.Int64 settings, _, _ := mustNewSettings(t) consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { - return func(_ context.Context, _ kafkaMessage, _ attribute.Set) error { + return func(_ context.Context, _ *kgo.Record, _ attribute.Set) error { called.Add(1) return nil }, nil @@ -419,9 +419,9 @@ func TestExcludeTopicWithRegex(t *testing.T) { settings, _, _ := mustNewSettings(t) consumeFn := func(component.Host, *receiverhelper.ObsReport, *metadata.TelemetryBuilder) (consumeMessageFunc, error) { - return func(_ context.Context, msg kafkaMessage, _ attribute.Set) error { + return func(_ context.Context, record *kgo.Record, _ attribute.Set) error { mu.Lock() - consumedTopics[msg.topic()]++ + consumedTopics[record.Topic]++ mu.Unlock() called.Add(1) return nil diff --git a/receiver/kafkareceiver/consumer_message.go b/receiver/kafkareceiver/consumer_message.go deleted file mode 100644 index 496c5422199da..0000000000000 --- a/receiver/kafkareceiver/consumer_message.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" - -import ( - "iter" - "time" - - "github.com/twmb/franz-go/pkg/kgo" -) - -// kafkaMessage provides a generic interface for Kafka messages that abstracts -// over Franz-go record types. -type kafkaMessage interface { - value() []byte - headers() messageHeaders - topic() string - partition() int32 - offset() int64 - timestamp() time.Time -} - -type header struct { - key string - value []byte -} - -// messageHeaders provides a generic interface for accessing Kafka message headers. -type messageHeaders interface { - get(key string) (string, bool) - all() iter.Seq[header] - len() int -} - -type franzMessage struct { - record *kgo.Record -} - -func wrapFranzMsg(record *kgo.Record) franzMessage { - return franzMessage{record: record} -} - -func (w franzMessage) value() []byte { - return w.record.Value -} - -func (w franzMessage) headers() messageHeaders { - return franzHeaders{headers: w.record.Headers} -} - -func (w franzMessage) topic() string { - return w.record.Topic -} - -func (w franzMessage) partition() int32 { - return w.record.Partition -} - -func (w franzMessage) offset() int64 { - return w.record.Offset -} - -func (w franzMessage) timestamp() time.Time { - return w.record.Timestamp -} - -type franzHeaders struct { - headers []kgo.RecordHeader -} - -func (h franzHeaders) get(key string) (string, bool) { - for _, header := range h.headers { - if header.Key == key { - return string(header.Value), true - } - } - return "", false -} - -func (h franzHeaders) all() iter.Seq[header] { - return func(yield func(header) bool) { - for _, hdr := range h.headers { - if !yield(header{key: hdr.Key, value: hdr.Value}) { - return - } - } - } -} - -func (h franzHeaders) len() int { - return len(h.headers) -} diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index ade7e5fc37a53..cb8669bfd53a7 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -8,6 +8,7 @@ import ( "iter" "github.com/cenkalti/backoff/v4" + "github.com/twmb/franz-go/pkg/kgo" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" @@ -31,7 +32,7 @@ import ( const transport = "kafka" -type consumeMessageFunc func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error +type consumeMessageFunc func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error type newConsumeMessageFunc func(host component.Host, obsrecv *receiverhelper.ObsReport, telBldr *metadata.TelemetryBuilder, @@ -77,8 +78,8 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume } headerAttrKeys := buildHeaderAttrKeys(config) - return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { - return processMessage(ctx, message, config, set.Logger, telBldr, + return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error { + return processMessage(ctx, record, config, set.Logger, telBldr, &logsHandler{ unmarshaler: unmarshaler, obsrecv: obsrecv, @@ -104,8 +105,8 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons } headerAttrKeys := buildHeaderAttrKeys(config) - return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { - return processMessage(ctx, message, config, set.Logger, telBldr, + return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error { + return processMessage(ctx, record, config, set.Logger, telBldr, &metricsHandler{ unmarshaler: unmarshaler, obsrecv: obsrecv, @@ -131,8 +132,8 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu } headerAttrKeys := buildHeaderAttrKeys(config) - return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { - return processMessage(ctx, message, config, set.Logger, telBldr, + return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error { + return processMessage(ctx, record, config, set.Logger, telBldr, &tracesHandler{ unmarshaler: unmarshaler, obsrecv: obsrecv, @@ -158,8 +159,8 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco } headerAttrKeys := buildHeaderAttrKeys(config) - return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { - return processMessage(ctx, message, config, set.Logger, telBldr, + return func(ctx context.Context, record *kgo.Record, attrs attribute.Set) error { + return processMessage(ctx, record, config, set.Logger, telBldr, &profilesHandler{ unmarshaler: unmarshaler, obsrecv: obsrecv, @@ -354,7 +355,7 @@ func (*profilesHandler) getUnmarshalFailureCounter(telBldr *metadata.TelemetryBu // processMessage is a generic function that processes any KafkaMessage using a messageHandler func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Profiles]( ctx context.Context, - message kafkaMessage, + record *kgo.Record, config *Config, logger *zap.Logger, telBldr *metadata.TelemetryBuilder, @@ -364,18 +365,18 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro ) error { if logger.Core().Enabled(zap.DebugLevel) { logger.Debug("kafka message received", - zap.String("value", string(message.value())), - zap.Time("timestamp", message.timestamp()), - zap.String("topic", message.topic()), - zap.Int32("partition", message.partition()), - zap.Int64("offset", message.offset()), + zap.String("value", string(record.Value)), + zap.Time("timestamp", record.Timestamp), + zap.String("topic", record.Topic), + zap.Int32("partition", record.Partition), + zap.Int64("offset", record.Offset), ) } - ctx = contextWithHeaders(ctx, message.headers()) + ctx = contextWithHeaders(ctx, record.Headers) obsCtx := handler.startObsReport(ctx) - data, n, err := handler.unmarshalData(message.value()) + data, n, err := handler.unmarshalData(record.Value) if err != nil { handler.getUnmarshalFailureCounter(telBldr).Add(ctx, 1, metric.WithAttributeSet(attrs)) logger.Error("failed to unmarshal message", zap.Error(err)) @@ -387,7 +388,7 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro // Add resource attributes from headers if configured if config.HeaderExtraction.ExtractHeaders { for key, value := range getMessageHeaderResourceAttributes( - message.headers(), headerAttrKeys, + record.Headers, headerAttrKeys, ) { for resource := range handler.getResources(data) { resource.Attributes().PutStr(key, value) @@ -400,15 +401,16 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro return err } -func getMessageHeaderResourceAttributes(h messageHeaders, headerKeys map[string]string) iter.Seq2[string, string] { +func getMessageHeaderResourceAttributes(headers []kgo.RecordHeader, headerKeys map[string]string) iter.Seq2[string, string] { return func(yield func(string, string) bool) { for rawKey, attrKey := range headerKeys { - value, ok := h.get(rawKey) - if !ok { - continue - } - if !yield(attrKey, value) { - return + for _, h := range headers { + if h.Key == rawKey { + if !yield(attrKey, string(h.Value)) { + return + } + break + } } } } @@ -442,13 +444,13 @@ func newExponentialBackOff(config configretry.BackOffConfig) *backoff.Exponentia return backOff } -func contextWithHeaders(ctx context.Context, headers messageHeaders) context.Context { - if headers.len() == 0 { +func contextWithHeaders(ctx context.Context, headers []kgo.RecordHeader) context.Context { + if len(headers) == 0 { return ctx } - m := make(map[string][]string, headers.len()) - for header := range headers.all() { - m[header.key] = append(m[header.key], string(header.value)) + m := make(map[string][]string, len(headers)) + for _, h := range headers { + m[h.Key] = append(m[h.Key], string(h.Value)) } return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(m)}) } diff --git a/receiver/kafkareceiver/kafka_receiver_bench_test.go b/receiver/kafkareceiver/kafka_receiver_bench_test.go index cadc68c393331..796d34e5cc2b9 100644 --- a/receiver/kafkareceiver/kafka_receiver_bench_test.go +++ b/receiver/kafkareceiver/kafka_receiver_bench_test.go @@ -14,27 +14,27 @@ func BenchmarkContextWithHeaders(b *testing.B) { baseCtx := b.Context() tests := []struct { name string - headers messageHeaders + headers []kgo.RecordHeader }{ { name: "no headers", - headers: franzHeaders{headers: nil}, + headers: nil, }, { name: "1 header", - headers: franzHeaders{headers: []kgo.RecordHeader{ + headers: []kgo.RecordHeader{ {Key: "trace-id", Value: []byte("abc123")}, - }}, + }, }, { name: "5 headers", - headers: franzHeaders{headers: []kgo.RecordHeader{ + headers: []kgo.RecordHeader{ {Key: "trace-id", Value: []byte("abc123")}, {Key: "span-id", Value: []byte("def456")}, {Key: "tenant", Value: []byte("acme")}, {Key: "source", Value: []byte("app1")}, {Key: "env", Value: []byte("prod")}, - }}, + }, }, } for _, tt := range tests { @@ -64,12 +64,10 @@ func BenchmarkGetMessageHeaderResourceAttributes(b *testing.B) { Key: "unrelated", Value: []byte("ignored"), }) - headers := franzHeaders{headers: kgoHeaders} - b.ReportAllocs() b.ResetTimer() for b.Loop() { - for k, v := range getMessageHeaderResourceAttributes(headers, headerAttrKeys) { + for k, v := range getMessageHeaderResourceAttributes(kgoHeaders, headerAttrKeys) { _ = k _ = v } From 2f86b731573b7ff79947b9000597962b4a59bc1a Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Wed, 11 Mar 2026 10:42:57 +0000 Subject: [PATCH 2/2] chore: comment Signed-off-by: Paulo Dias --- receiver/kafkareceiver/kafka_receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index cb8669bfd53a7..2539328d91e70 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -352,7 +352,7 @@ func (*profilesHandler) getUnmarshalFailureCounter(telBldr *metadata.TelemetryBu return telBldr.KafkaReceiverUnmarshalFailedProfiles } -// processMessage is a generic function that processes any KafkaMessage using a messageHandler +// processMessage is a generic function that processes a Kafka record (*kgo.Record) using a messageHandler func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Profiles]( ctx context.Context, record *kgo.Record,