diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 2d460011fe4ab..ade7e5fc37a53 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -76,6 +76,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume return nil, err } + headerAttrKeys := buildHeaderAttrKeys(config) return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &logsHandler{ @@ -85,6 +86,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume encoding: config.Logs.Encoding, }, attrs, + headerAttrKeys, ) }, nil } @@ -101,6 +103,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons return nil, err } + headerAttrKeys := buildHeaderAttrKeys(config) return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &metricsHandler{ @@ -110,6 +113,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons encoding: config.Metrics.Encoding, }, attrs, + headerAttrKeys, ) }, nil } @@ -126,6 +130,7 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu return nil, err } + headerAttrKeys := buildHeaderAttrKeys(config) return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &tracesHandler{ @@ -135,6 +140,7 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu encoding: config.Traces.Encoding, }, attrs, + headerAttrKeys, ) }, nil } @@ -151,6 +157,7 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco return nil, err } + headerAttrKeys := buildHeaderAttrKeys(config) return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &profilesHandler{ @@ -160,6 +167,7 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco encoding: config.Profiles.Encoding, }, attrs, + headerAttrKeys, ) }, nil } @@ -352,6 +360,7 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro telBldr *metadata.TelemetryBuilder, handler messageHandler[T], attrs attribute.Set, + headerAttrKeys map[string]string, ) error { if logger.Core().Enabled(zap.DebugLevel) { logger.Debug("kafka message received", @@ -378,7 +387,7 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro // Add resource attributes from headers if configured if config.HeaderExtraction.ExtractHeaders { for key, value := range getMessageHeaderResourceAttributes( - message.headers(), config.HeaderExtraction.Headers, + message.headers(), headerAttrKeys, ) { for resource := range handler.getResources(data) { resource.Attributes().PutStr(key, value) @@ -391,20 +400,34 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro return err } -func getMessageHeaderResourceAttributes(h messageHeaders, resHeaders []string) iter.Seq2[string, string] { +func getMessageHeaderResourceAttributes(h messageHeaders, headerKeys map[string]string) iter.Seq2[string, string] { return func(yield func(string, string) bool) { - for _, resHeader := range resHeaders { - value, ok := h.get(resHeader) + for rawKey, attrKey := range headerKeys { + value, ok := h.get(rawKey) if !ok { continue } - if !yield("kafka.header."+resHeader, value) { + if !yield(attrKey, value) { return } } } } +// buildHeaderAttrKeys pre-computes the mapping from raw header names to their +// "kafka.header." prefixed attribute keys. Returns nil when header extraction +// is disabled. +func buildHeaderAttrKeys(config *Config) map[string]string { + if !config.HeaderExtraction.ExtractHeaders { + return nil + } + m := make(map[string]string, len(config.HeaderExtraction.Headers)) + for _, h := range config.HeaderExtraction.Headers { + m[h] = "kafka.header." + h + } + return m +} + func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff { if !config.Enabled { return nil diff --git a/receiver/kafkareceiver/kafka_receiver_bench_test.go b/receiver/kafkareceiver/kafka_receiver_bench_test.go index 03b53d5701d87..cadc68c393331 100644 --- a/receiver/kafkareceiver/kafka_receiver_bench_test.go +++ b/receiver/kafkareceiver/kafka_receiver_bench_test.go @@ -4,6 +4,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" import ( + "fmt" "testing" "github.com/twmb/franz-go/pkg/kgo" @@ -44,3 +45,35 @@ func BenchmarkContextWithHeaders(b *testing.B) { }) } } + +func BenchmarkGetMessageHeaderResourceAttributes(b *testing.B) { + for _, numHeaders := range []int{1, 4, 8, 16, 32, 64, 128} { + b.Run(fmt.Sprintf("headers=%d", numHeaders), func(b *testing.B) { + // Build message headers: numHeaders matching + 1 unrelated. + kgoHeaders := make([]kgo.RecordHeader, 0, numHeaders+1) + headerAttrKeys := make(map[string]string, numHeaders) + for i := range numHeaders { + key := fmt.Sprintf("header-%d", i) + kgoHeaders = append(kgoHeaders, kgo.RecordHeader{ + Key: key, + Value: fmt.Appendf(nil, "value-%d", i), + }) + headerAttrKeys[key] = "kafka.header." + key + } + kgoHeaders = append(kgoHeaders, kgo.RecordHeader{ + Key: "unrelated", + Value: []byte("ignored"), + }) + headers := franzHeaders{headers: kgoHeaders} + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + for k, v := range getMessageHeaderResourceAttributes(headers, headerAttrKeys) { + _ = k + _ = v + } + } + }) + } +}