From 15420081600f280647b2f8c608bfbd5ce272859c Mon Sep 17 00:00:00 2001 From: Chodor Marek Date: Thu, 27 Jun 2019 15:42:44 +0200 Subject: [PATCH 1/4] Configurable kafka protocol version for msg consuming by jaeger ingester Signed-off-by: Chodor Marek --- cmd/ingester/app/builder/builder.go | 1 + cmd/ingester/app/flags.go | 7 +++++++ cmd/ingester/app/flags_test.go | 2 ++ pkg/kafka/consumer/config.go | 9 +++++++++ 4 files changed, 19 insertions(+) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index 279823fe943..d382baac683 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -55,6 +55,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit Topic: options.Topic, GroupID: options.GroupID, ClientID: options.ClientID, + Version: options.Version, AuthenticationConfig: options.AuthenticationConfig, } saramaConsumer, err := consumerConfig.NewConsumer() diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 9eba6832865..9467eb81a83 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -41,6 +41,8 @@ const ( SuffixGroupID = ".group-id" // SuffixClientID is a suffix for the client-id flag SuffixClientID = ".client-id" + // SuffixVersion Kafka protocol version - must be supported by kafka server + SuffixVersion = ".version" // SuffixEncoding is a suffix for the encoding flag SuffixEncoding = ".encoding" // SuffixDeadlockInterval is a suffix for deadlock detecor flag @@ -91,6 +93,10 @@ func AddFlags(flagSet *flag.FlagSet) { KafkaConsumerConfigPrefix+SuffixClientID, DefaultClientID, "The Consumer Client ID that ingester will use") + flagSet.String( + KafkaConsumerConfigPrefix+SuffixVersion, + "", + "Kafka version - must be supported by kafka server") flagSet.String( KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, @@ -113,6 +119,7 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic) o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID) o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID) + o.Version = v.GetString(KafkaConsumerConfigPrefix + SuffixVersion) o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 00c178668e3..3186280a673 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -33,6 +33,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.consumer.group-id=group1", "--kafka.consumer.client-id=client-id1", "--kafka.consumer.encoding=json", + "--kafka.consumer.version=1.0.0", "--ingester.parallelism=5", "--ingester.deadlockInterval=2m", }) @@ -42,6 +43,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, "client-id1", o.ClientID) + assert.Equal(t, "1.0.0", o.Version) assert.Equal(t, 5, o.Parallelism) assert.Equal(t, 2*time.Minute, o.DeadlockInterval) assert.Equal(t, kafka.EncodingJSON, o.Encoding) diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 509800b0bdb..57603188072 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -17,6 +17,7 @@ package consumer import ( "io" + "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/jaegertracing/jaeger/pkg/kafka/auth" @@ -40,6 +41,7 @@ type Configuration struct { Topic string GroupID string ClientID string + Version string Consumer auth.AuthenticationConfig } @@ -49,6 +51,13 @@ func (c *Configuration) NewConsumer() (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID + if len(c.Version) > 0 { + ver, err := sarama.ParseKafkaVersion(c.Version) + if err != nil { + return nil, err + } + saramaConfig.Config.Version = ver + } c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config) return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) } From aebc038f35362ca6992bdcb724e8a34d9bb3bc7d Mon Sep 17 00:00:00 2001 From: Chodor Marek Date: Mon, 8 Jul 2019 11:32:42 +0200 Subject: [PATCH 2/4] rename kafka.consumer.version to kafka.consumer.protcol-version Signed-off-by: Chodor Marek --- cmd/ingester/app/builder/builder.go | 2 +- cmd/ingester/app/flags.go | 10 +++++----- cmd/ingester/app/flags_test.go | 4 ++-- pkg/kafka/consumer/config.go | 14 +++++++------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index d382baac683..4d2e86d100c 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -55,7 +55,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit Topic: options.Topic, GroupID: options.GroupID, ClientID: options.ClientID, - Version: options.Version, + ProtocolVersion: options.ProtocolVersion, AuthenticationConfig: options.AuthenticationConfig, } saramaConsumer, err := consumerConfig.NewConsumer() diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 9467eb81a83..3d45337f84f 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -41,8 +41,8 @@ const ( SuffixGroupID = ".group-id" // SuffixClientID is a suffix for the client-id flag SuffixClientID = ".client-id" - // SuffixVersion Kafka protocol version - must be supported by kafka server - SuffixVersion = ".version" + // SuffixProtocolVersion Kafka protocol version - must be supported by kafka server + SuffixProtocolVersion = ".protocol-version" // SuffixEncoding is a suffix for the encoding flag SuffixEncoding = ".encoding" // SuffixDeadlockInterval is a suffix for deadlock detecor flag @@ -94,9 +94,9 @@ func AddFlags(flagSet *flag.FlagSet) { DefaultClientID, "The Consumer Client ID that ingester will use") flagSet.String( - KafkaConsumerConfigPrefix+SuffixVersion, + KafkaConsumerConfigPrefix+SuffixProtocolVersion, "", - "Kafka version - must be supported by kafka server") + "Kafka protocol version - must be supported by kafka server") flagSet.String( KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, @@ -119,7 +119,7 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic) o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID) o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID) - o.Version = v.GetString(KafkaConsumerConfigPrefix + SuffixVersion) + o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion) o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 3186280a673..fab2d5ed2f1 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -33,7 +33,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.consumer.group-id=group1", "--kafka.consumer.client-id=client-id1", "--kafka.consumer.encoding=json", - "--kafka.consumer.version=1.0.0", + "--kafka.consumer.protocol-version=1.0.0", "--ingester.parallelism=5", "--ingester.deadlockInterval=2m", }) @@ -43,7 +43,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, "client-id1", o.ClientID) - assert.Equal(t, "1.0.0", o.Version) + assert.Equal(t, "1.0.0", o.ProtocolVersion) assert.Equal(t, 5, o.Parallelism) assert.Equal(t, 2*time.Minute, o.DeadlockInterval) assert.Equal(t, kafka.EncodingJSON, o.Encoding) diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 57603188072..caa21818de0 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -37,11 +37,11 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka consumer type Configuration struct { - Brokers []string - Topic string - GroupID string - ClientID string - Version string + Brokers []string + Topic string + GroupID string + ClientID string + ProtocolVersion string Consumer auth.AuthenticationConfig } @@ -51,8 +51,8 @@ func (c *Configuration) NewConsumer() (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID - if len(c.Version) > 0 { - ver, err := sarama.ParseKafkaVersion(c.Version) + if len(c.ProtocolVersion) > 0 { + ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { return nil, err } From caf0760eb6a08cd33d5c61d5673c4c87436d8f83 Mon Sep 17 00:00:00 2001 From: Chodor Marek Date: Tue, 9 Jul 2019 09:48:10 +0200 Subject: [PATCH 3/4] Configurable kafka protocol/server version in producer Signed-off-by: Chodor Marek --- pkg/kafka/producer/config.go | 10 +++++++++- plugin/storage/kafka/options.go | 20 +++++++++++++------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index c866c4d957f..4e254e12b1a 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -27,7 +27,8 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string + Brokers []string + ProtocolVersion string auth.AuthenticationConfig } @@ -36,5 +37,12 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true c.AuthenticationConfig.SetConfiguration(saramaConfig) + if len(c.ProtocolVersion) > 0 { + ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) + if err != nil { + return nil, err + } + saramaConfig.Version = ver + } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 136449d6e14..1a4646da52f 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -33,13 +33,14 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka.producer" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" - defaultBroker = "127.0.0.1:9092" - defaultTopic = "jaeger-spans" - defaultEncoding = EncodingProto + configPrefix = "kafka.producer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixProtocolVersion = ".protocol-version" + suffixEncoding = ".encoding" + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-spans" + defaultEncoding = EncodingProto ) var ( @@ -64,6 +65,10 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { configPrefix+suffixTopic, defaultTopic, "(experimental) The name of the kafka topic") + flagSet.String( + configPrefix+suffixProtocolVersion, + "", + "(experimental) Kafka protocol version - must be supported by kafka server") flagSet.String( configPrefix+suffixEncoding, defaultEncoding, @@ -78,6 +83,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { authenticationOptions.InitFromViper(configPrefix, v) opt.config = producer.Configuration{ Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, } opt.topic = v.GetString(configPrefix + suffixTopic) From f97c7bd14a625c94e798132032739e1eebdf58df Mon Sep 17 00:00:00 2001 From: Chodor Marek Date: Thu, 11 Jul 2019 14:12:24 +0200 Subject: [PATCH 4/4] remove "(experimental)" annotation from kafka.producer settings Signed-off-by: Chodor Marek --- plugin/storage/kafka/options.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 1a4646da52f..8b8ff0760fa 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -60,19 +60,19 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.String( configPrefix+suffixBrokers, defaultBroker, - "(experimental) The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") + "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") flagSet.String( configPrefix+suffixTopic, defaultTopic, - "(experimental) The name of the kafka topic") + "The name of the kafka topic") flagSet.String( configPrefix+suffixProtocolVersion, "", - "(experimental) Kafka protocol version - must be supported by kafka server") + "Kafka protocol version - must be supported by kafka server") flagSet.String( configPrefix+suffixEncoding, defaultEncoding, - fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), + fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) auth.AddFlags(configPrefix, flagSet) }