diff --git a/internal/component/otelcol/receiver/kafka/kafka.go b/internal/component/otelcol/receiver/kafka/kafka.go index dccfefff294..f1d77e8cec1 100644 --- a/internal/component/otelcol/receiver/kafka/kafka.go +++ b/internal/component/otelcol/receiver/kafka/kafka.go @@ -5,11 +5,13 @@ import ( "fmt" "time" + "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/otelcol" otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" "github.com/grafana/alloy/internal/component/otelcol/receiver" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/mitchellh/mapstructure" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" @@ -25,8 +27,10 @@ func init() { Args: Arguments{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + a := args.(Arguments) + a.logDeprecations(opts.Logger) fact := kafkareceiver.NewFactory() - return receiver.New(opts, fact, args.(Arguments)) + return receiver.New(opts, fact, a) }, }) } @@ -145,6 +149,34 @@ type KafkaReceiverTopicEncodingConfig struct { ExcludeTopics []string `alloy:"exclude_topics,attr,optional"` } +func (args Arguments) logDeprecations(logger log.Logger) { + for _, signal := range []struct { + name string + cfg KafkaReceiverTopicEncodingConfig + }{ + {"logs", args.Logs}, + {"metrics", args.Metrics}, + {"traces", args.Traces}, + } { + if signal.cfg.Topic != "" { + level.Warn(logger).Log("msg", "the topic attribute is deprecated and will be removed in a future release, use topics instead", + "signal", signal.name, "topic", signal.cfg.Topic) + } + } +} + +func (c KafkaReceiverTopicEncodingConfig) convert() kafkareceiver.TopicEncodingConfig { + topics := c.Topics + if c.Topic != "" { + topics = []string{c.Topic} + } + return kafkareceiver.TopicEncodingConfig{ + Topics: topics, + Encoding: c.Encoding, + ExcludeTopics: c.ExcludeTopics, + } +} + type ErrorBackOffArguments struct { Enabled bool `alloy:"enabled,attr,optional"` InitialInterval time.Duration `alloy:"initial_interval,attr,optional"` @@ -206,32 +238,19 @@ func (args Arguments) Convert() (otelcomponent.Config, error) { result.UseLeaderEpoch = args.UseLeaderEpoch result.ErrorBackOff = *args.ErrorBackOff.Convert() - result.Logs = kafkareceiver.TopicEncodingConfig{ - Topic: args.Logs.Topic, - Topics: args.Logs.Topics, - Encoding: args.Logs.Encoding, - ExcludeTopics: args.Logs.ExcludeTopics, - } - - result.Metrics = kafkareceiver.TopicEncodingConfig{ - Topic: args.Metrics.Topic, - Topics: args.Metrics.Topics, - Encoding: args.Metrics.Encoding, - ExcludeTopics: args.Metrics.ExcludeTopics, - } - - result.Traces = kafkareceiver.TopicEncodingConfig{ - Topic: args.Traces.Topic, - Topics: args.Traces.Topics, - Encoding: args.Traces.Encoding, - ExcludeTopics: args.Traces.ExcludeTopics, - } + result.Logs = args.Logs.convert() + result.Metrics = args.Metrics.convert() + result.Traces = args.Traces.convert() if args.TLS != nil { tlsCfg := args.TLS.Convert() result.TLS = tlsCfg } + if err := result.Validate(); err != nil { + return nil, err + } + return &result, nil } diff --git a/internal/component/otelcol/receiver/kafka/kafka_test.go b/internal/component/otelcol/receiver/kafka/kafka_test.go index 4effdc2f4d7..8125ce4bcd4 100644 --- a/internal/component/otelcol/receiver/kafka/kafka_test.go +++ b/internal/component/otelcol/receiver/kafka/kafka_test.go @@ -598,6 +598,55 @@ func TestArguments_Auth(t *testing.T) { } } +func TestDeprecatedTopicShouldBeMigratedToNewTopics(t *testing.T) { + // When using the deprecated per-signal `topic` + // (singular) field, the converted config should have + // the deprecated `topic` unset and the new `topics` set. + cfg := ` + brokers = ["10.10.10.10:9092"] + protocol_version = "2.0.0" + logs { + topic = "my_custom_logs_topic" + encoding = "otlp_json" + } + metrics { + topic = "my_custom_metrics_topic" + encoding = "otlp_json" + } + traces { + topic = "my_custom_traces_topic" + encoding = "otlp_json" + } + output {} + ` + + var args kafka.Arguments + err := syntax.Unmarshal([]byte(cfg), &args) + require.NoError(t, err) + + otelCfg, err := args.Convert() + require.NoError(t, err) + + converted := otelCfg.(*kafkareceiver.Config) + require.NoError(t, converted.Validate(), "converted config should be valid") + + // The deprecated `topic` should be migrated into `topics`, matching upstream behavior. + require.Equal(t, []string{"my_custom_logs_topic"}, converted.Logs.Topics, + "deprecated logs.topic should be migrated to logs.topics") + require.Empty(t, converted.Logs.Topic, + "deprecated logs.topic should be cleared after migration") + + require.Equal(t, []string{"my_custom_metrics_topic"}, converted.Metrics.Topics, + "deprecated metrics.topic should be migrated to metrics.topics") + require.Empty(t, converted.Metrics.Topic, + "deprecated metrics.topic should be cleared after migration") + + require.Equal(t, []string{"my_custom_traces_topic"}, converted.Traces.Topics, + "deprecated traces.topic should be migrated to traces.topics") + require.Empty(t, converted.Traces.Topic, + "deprecated traces.topic should be cleared after migration") +} + func TestDebugMetricsConfig(t *testing.T) { tests := []struct { testName string