Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume
return nil, err
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
&logsHandler{
Expand All @@ -85,6 +86,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume
encoding: config.Logs.Encoding,
},
attrs,
headerAttrKeys,
)
}, nil
}
Expand All @@ -101,6 +103,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons
return nil, err
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
&metricsHandler{
Expand All @@ -110,6 +113,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons
encoding: config.Metrics.Encoding,
},
attrs,
headerAttrKeys,
)
}, nil
}
Expand All @@ -126,6 +130,7 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu
return nil, err
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
&tracesHandler{
Expand All @@ -135,6 +140,7 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu
encoding: config.Traces.Encoding,
},
attrs,
headerAttrKeys,
)
}, nil
}
Expand All @@ -151,6 +157,7 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco
return nil, err
}

headerAttrKeys := buildHeaderAttrKeys(config)
return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error {
return processMessage(ctx, message, config, set.Logger, telBldr,
&profilesHandler{
Expand All @@ -160,6 +167,7 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco
encoding: config.Profiles.Encoding,
},
attrs,
headerAttrKeys,
)
}, nil
}
Expand Down Expand Up @@ -352,6 +360,7 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro
telBldr *metadata.TelemetryBuilder,
handler messageHandler[T],
attrs attribute.Set,
headerAttrKeys map[string]string,
) error {
if logger.Core().Enabled(zap.DebugLevel) {
logger.Debug("kafka message received",
Expand All @@ -378,7 +387,7 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro
// Add resource attributes from headers if configured
if config.HeaderExtraction.ExtractHeaders {
for key, value := range getMessageHeaderResourceAttributes(
message.headers(), config.HeaderExtraction.Headers,
message.headers(), headerAttrKeys,
) {
for resource := range handler.getResources(data) {
resource.Attributes().PutStr(key, value)
Expand All @@ -391,20 +400,34 @@ func processMessage[T plog.Logs | pmetric.Metrics | ptrace.Traces | pprofile.Pro
return err
}

func getMessageHeaderResourceAttributes(h messageHeaders, resHeaders []string) iter.Seq2[string, string] {
func getMessageHeaderResourceAttributes(h messageHeaders, headerKeys map[string]string) iter.Seq2[string, string] {
return func(yield func(string, string) bool) {
for _, resHeader := range resHeaders {
value, ok := h.get(resHeader)
for rawKey, attrKey := range headerKeys {
value, ok := h.get(rawKey)
if !ok {
continue
}
if !yield("kafka.header."+resHeader, value) {
if !yield(attrKey, value) {
return
}
}
}
}

// buildHeaderAttrKeys pre-computes the mapping from raw header names to their
// "kafka.header." prefixed attribute keys. Returns nil when header extraction
// is disabled.
func buildHeaderAttrKeys(config *Config) map[string]string {
if !config.HeaderExtraction.ExtractHeaders {
return nil
}
m := make(map[string]string, len(config.HeaderExtraction.Headers))
for _, h := range config.HeaderExtraction.Headers {
m[h] = "kafka.header." + h
}
return m
}

func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
if !config.Enabled {
return nil
Expand Down
33 changes: 33 additions & 0 deletions receiver/kafkareceiver/kafka_receiver_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"

import (
"fmt"
"testing"

"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -44,3 +45,35 @@ func BenchmarkContextWithHeaders(b *testing.B) {
})
}
}

func BenchmarkGetMessageHeaderResourceAttributes(b *testing.B) {
for _, numHeaders := range []int{1, 4, 8, 16, 32, 64, 128} {
b.Run(fmt.Sprintf("headers=%d", numHeaders), func(b *testing.B) {
// Build message headers: numHeaders matching + 1 unrelated.
kgoHeaders := make([]kgo.RecordHeader, 0, numHeaders+1)
headerAttrKeys := make(map[string]string, numHeaders)
for i := range numHeaders {
key := fmt.Sprintf("header-%d", i)
kgoHeaders = append(kgoHeaders, kgo.RecordHeader{
Key: key,
Value: fmt.Appendf(nil, "value-%d", i),
})
headerAttrKeys[key] = "kafka.header." + key
}
kgoHeaders = append(kgoHeaders, kgo.RecordHeader{
Key: "unrelated",
Value: []byte("ignored"),
})
headers := franzHeaders{headers: kgoHeaders}

b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
for k, v := range getMessageHeaderResourceAttributes(headers, headerAttrKeys) {
_ = k
_ = v
}
}
})
}
}
Loading