Skip to content

Commit

Permalink
bench: add -linger, -disable-idempotency, -log-level options
Browse files Browse the repository at this point in the history
-linger can be particularly useful to see if adding some lingering will
help your workload.
  • Loading branch information
twmb committed Jun 16, 2021
1 parent b10b2d5 commit 5047b31
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions examples/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ var (
noCompression = flag.Bool("no-compression", false, "set to disable snappy compression (producing)")
poolProduce = flag.Bool("pool", false, "if true, use a sync.Pool to reuse record structs/slices (producing)")

noIdempotency = flag.Bool("disable-idempotency", false, "if true, disable idempotency (force 1 produce rps")
logLevel = flag.String("log-level", "", "if non-empty, use a basic logger with this log level (debug, info, warn, error)")
linger = flag.Duration("linger", 0, "if non-zero, linger to use when producing")

consume = flag.Bool("consume", false, "if true, consume rather than produce")
group = flag.String("group", "", "if non-empty, group to use for consuming rather than direct partition consuming (consuming)")

Expand Down Expand Up @@ -79,19 +83,38 @@ func main() {
// back because snappy deflation will balloon our memory usage.
kgo.FetchMaxBytes(5 << 20),
}
if *noIdempotency {
opts = append(opts, kgo.DisableIdempotentWrite())
}
if *consume {
opts = append(opts, kgo.ConsumeTopics(*topic))
if *group != "" {
opts = append(opts, kgo.ConsumerGroup(*group))
}
}

switch strings.ToLower(*logLevel) {
case "":
case "debug":
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)))
case "info":
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)))
case "warn":
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelWarn, nil)))
case "error":
opts = append(opts, kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelError, nil)))
}

if *linger != 0 {
opts = append(opts, kgo.Linger(*linger))
}
if *noCompression {
opts = append(opts, kgo.BatchCompression(kgo.NoCompression()))
}
if *dialTLS {
opts = append(opts, kgo.Dialer((new(tls.Dialer)).DialContext))
}

if *saslMethod != "" || *saslUser != "" || *saslPass != "" {
if *saslMethod == "" || *saslUser == "" || *saslPass == "" {
die("all of -sasl-method, -sasl-user, -sasl-pass must be specified if any are")
Expand Down

0 comments on commit 5047b31

Please sign in to comment.