Skip to content

Commit

Permalink
Add --kafka.producer.batch-min-messages collector flag (#2371)
Browse files Browse the repository at this point in the history
Currently, to control the batch size when the collector writes to Kafka, a user can set the --kafka.producer.batch-max-messages flag. This flag is used to configure the Kafka client library and sets an upper limit on the number of messages in a single batch, so the meaning of this flag is 'write up to N messages in a single batch'.
The current wording of the --kafka.producer.batch-max-messages help text is 'Number of message to batch before sending records to Kafka.', which suggests that it is used to set the minimal number of messages needed before a batch is sent to Kafka.
For a user who wants to enforce a minimal batch size, this means that there is currently no way of doing so — yet at the same time there is a flag described as a way to do it, while it actually does something else.

This patch adds a --kafka.producer.batch-min-messages flag that allows configuring a minimal batch size in the Kafka client library, and updates the --kafka.producer.batch-max-messages flag description to be clearer about the effect it has.

Signed-off-by: Łukasz Mierzwa <[email protected]>
  • Loading branch information
prymitive authored Aug 4, 2020
1 parent 9529bee commit 3cff335
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Configuration struct {
ProtocolVersion string `mapstructure:"protocol_version"`
BatchLinger time.Duration `mapstructure:"batch_linger"`
BatchSize int `mapstructure:"batch_size"`
BatchMinMessages int `mapstructure:"batch_min_messages"`
BatchMaxMessages int `mapstructure:"batch_max_messages"`
auth.AuthenticationConfig `mapstructure:"authentication"`
}
Expand All @@ -49,6 +50,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Flush.Bytes = c.BatchSize
saramaConfig.Producer.Flush.Frequency = c.BatchLinger
saramaConfig.Producer.Flush.Messages = c.BatchMinMessages
saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
Expand Down
10 changes: 9 additions & 1 deletion plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
suffixProtocolVersion = ".protocol-version"
suffixBatchLinger = ".batch-linger"
suffixBatchSize = ".batch-size"
suffixBatchMinMessages = ".batch-min-messages"
suffixBatchMaxMessages = ".batch-max-messages"

defaultBroker = "127.0.0.1:9092"
Expand All @@ -55,6 +56,7 @@ const (
defaultCompressionLevel = 0
defaultBatchLinger = 0
defaultBatchSize = 0
defaultBatchMinMessages = 0
defaultBatchMaxMessages = 0
)

Expand Down Expand Up @@ -157,10 +159,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultBatchSize,
"(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
flagSet.Int(
configPrefix+suffixBatchMinMessages,
defaultBatchMinMessages,
"(experimental) The best-effort minimum number of messages needed to send a batch of records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
flagSet.Int(
configPrefix+suffixBatchMaxMessages,
defaultBatchMaxMessages,
"(experimental) Number of message to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
"(experimental) Maximum number of message to batch before sending records to Kafka",
)
auth.AddFlags(configPrefix, flagSet)
}
Expand Down Expand Up @@ -195,6 +202,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
AuthenticationConfig: authenticationOptions,
BatchLinger: v.GetDuration(configPrefix + suffixBatchLinger),
BatchSize: v.GetInt(configPrefix + suffixBatchSize),
BatchMinMessages: v.GetInt(configPrefix + suffixBatchMinMessages),
BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages),
}
opt.Topic = v.GetString(configPrefix + suffixTopic)
Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.producer.compression-level=7",
"--kafka.producer.batch-linger=1s",
"--kafka.producer.batch-size=128000",
"--kafka.producer.batch-min-messages=50",
"--kafka.producer.batch-max-messages=100",
})
opts.InitFromViper(v)
Expand All @@ -52,6 +53,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, 7, opts.Config.CompressionLevel)
assert.Equal(t, 128000, opts.Config.BatchSize)
assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger)
assert.Equal(t, 50, opts.Config.BatchMinMessages)
assert.Equal(t, 100, opts.Config.BatchMaxMessages)
}

Expand All @@ -69,6 +71,7 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, 0, opts.Config.CompressionLevel)
assert.Equal(t, 0, opts.Config.BatchSize)
assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger)
assert.Equal(t, 0, opts.Config.BatchMinMessages)
assert.Equal(t, 0, opts.Config.BatchMaxMessages)
}

Expand Down

0 comments on commit 3cff335

Please sign in to comment.