Skip to content

Commit

Permalink
examples: add benchmarking example
Browse files Browse the repository at this point in the history
This is useful to demonstrate a bunch of aspects of producing &
consuming, while also demonstrating throughput.
  • Loading branch information
twmb committed May 30, 2021
1 parent fec2a18 commit d848174
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 0 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ supplementary information. It is recommended to always use a logger and to use
See [this example](./examples/hooks_and_logging/prometheus) for an example of
integrating with prometheus!

## Benchmarks

This client is quite fast; it is the fastest and most cpu and memory efficient
client in Go (and may even beat out librdkafka).

To check benchmarks yourself, see the [bench](./examples/bench) example. This
example lets you produce or consume to a cluster and see the byte / record
rate.

## Supported KIPs

Theoretically, this library supports every (non-Java-specific) client facing
Expand Down
58 changes: 58 additions & 0 deletions examples/bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Bench

This example allows for benchmarking producing or consuming against a local
cluster, and prints the producing and consuming byte and record rates.

This example is also a good general example if you want to look at plugging
in TLS or SASL.

## Examples

```
go run . -topic foo
go run . -topic foo -consume
go run . -topic foo -no-compression
go run . -tls -sasl-method scram-sha-256 -sasl-user user -sasl-pass pass -consume -group group -topic foo
```

## Flags

`-brokers` can be specified to override the default localhost:9092 broker to
any comma delimited set of brokers.

`-topic` specifies the topic to produce to or consume from.

`-pprof` sets a port to bind to to enable the default pprof handlers.

### Producing (only relevant if producing)

`-record-bytes` specifies the size of records to produce.

`-no-compression` disables snappy compression.

`-pool` enables using a `sync.Pool` to reuse records and value slices, reducing
garbage as a factor of the benchmark.

### Consuming (only relevant if consuming)

`-consume` opts in to consuming.

`-group` switches from consuming partitions directly to consuming as a part of
a consumer group.


### Connecting (optional tls, sasl)

`-tls` if true, sets the benchmark to dial over tls

`-sasl-method` specifies a SASL method to use when connecting. This supports
`PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, or `AWS_MSK_IAM` (any casing, with
or without dashes or underscores).

`-sasl-user` specifies the username to use for SASL. If using `AWS_MSK_IAM`,
this is the access key.

`-sasl-pass` specifies the password to use for SASL. If using `AWS_MSK_IAM`,
this is the secret key.
7 changes: 7 additions & 0 deletions examples/bench/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module bench

go 1.16

require github.com/twmb/franz-go v0.7.6

replace github.com/twmb/franz-go => ../..
44 changes: 44 additions & 0 deletions examples/bench/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/pierrec/lz4/v4 v4.1.6 h1:ueMTcBBFrbT8K4uGDNNZPa8Z7LtPV7Cl0TDjaeHxP44=
github.com/pierrec/lz4/v4 v4.1.6/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b h1:7mWr3k41Qtv8XlltBkDkl8LoP3mpSgBW8BUoxtEdbXg=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
187 changes: 187 additions & 0 deletions examples/bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
)

var (
seedBrokers = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers")
topic = flag.String("topic", "", "topic to produce to or consume from")
pprofPort = flag.String("pprof", ":9876", "port to bind to for pprof, if non-empty")

recordBytes = flag.Int("record-bytes", 100, "bytes per record (producing)")
noCompression = flag.Bool("no-compression", false, "set to disable snappy compression (producing)")
poolProduce = flag.Bool("pool", false, "if true, use a sync.Pool to reuse record structs/slices (producing)")

consume = flag.Bool("consume", false, "if true, consume rather than produce")
group = flag.String("group", "", "if non-empty, group to use for consuming rather than direct partition consuming (consuming)")

dialTLS = flag.Bool("tls", false, "if true, use tls for connecting")

saslMethod = flag.String("sasl-method", "", "if non-empty, sasl method to use (must specify all options; supports plain, scram-sha-256, scram-sha-512, aws_msk_iam)")
saslUser = flag.String("sasl-user", "", "if non-empty, username to use for sasl (must specify all options)")
saslPass = flag.String("sasl-pass", "", "if non-empty, password to use for sasl (must specify all options)")

rateRecs int64
rateBytes int64
)

func printRate() {
go func() {
for range time.Tick(time.Second) {
recs := atomic.SwapInt64(&rateRecs, 0)
bytes := atomic.SwapInt64(&rateBytes, 0)
fmt.Printf("%0.2f MiB/s; %0.2fk records/s\n", float64(bytes)/(1024*1024), float64(recs)/1000)
}
}()
}

func die(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", args...)
os.Exit(1)
}

func chk(err error, msg string, args ...interface{}) {
if err != nil {
die(msg, args...)
}
}

func main() {
flag.Parse()

if *recordBytes <= 0 {
die("record bytes must be larger than zero")
}

opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
kgo.ProduceTopic(*topic),
kgo.MaxBufferedRecords(250<<20 / *recordBytes + 1),
kgo.AllowedConcurrentFetches(3),
}
if *noCompression {
opts = append(opts, kgo.BatchCompression(kgo.NoCompression()))
}
if *dialTLS {
opts = append(opts, kgo.Dialer((new(tls.Dialer)).DialContext))
}
if *saslMethod != "" || *saslUser != "" || *saslPass != "" {
if *saslMethod == "" || *saslUser == "" || *saslPass == "" {
die("all of -sasl-method, -sasl-user, -sasl-pass must be specified if any are")
}
method := strings.ToLower(*saslMethod)
method = strings.ReplaceAll(method, "-", "")
method = strings.ReplaceAll(method, "_", "")
switch method {
case "plain":
opts = append(opts, kgo.SASL(plain.Auth{
User: *saslUser,
Pass: *saslPass,
}.AsMechanism()))
case "scramsha256":
opts = append(opts, kgo.SASL(scram.Auth{
User: *saslUser,
Pass: *saslPass,
}.AsSha256Mechanism()))
case "scramsha512":
opts = append(opts, kgo.SASL(scram.Auth{
User: *saslUser,
Pass: *saslPass,
}.AsSha512Mechanism()))
case "awsmskiam":
opts = append(opts, kgo.SASL(aws.Auth{
AccessKey: *saslUser,
SecretKey: *saslPass,
}.AsManagedStreamingIAMMechanism()))
}
}

cl, err := kgo.NewClient(opts...)
chk(err, "unable to initialize client: %v", err)

if *pprofPort != "" {
go func() {
err := http.ListenAndServe(*pprofPort, nil)
chk(err, "unable to run pprof listener: %v", err)
}()
}

go printRate()

switch *consume {
case false:
var num int64
for {
atomic.AddInt64(&rateRecs, 1)
atomic.AddInt64(&rateBytes, int64(*recordBytes))
cl.Produce(context.Background(), newRecord(num), func(r *kgo.Record, err error) {
if *poolProduce {
p.Put(r)
}
chk(err, "produce error: %v", err)
})
num++
}
case true:
if *group != "" {
cl.AssignGroup(*group, kgo.GroupTopics(*topic))
} else {
cl.AssignPartitions(kgo.ConsumeTopics(kgo.NewOffset().AtStart(), *topic))
}

for {
fetches := cl.PollFetches(context.Background())
var recs int64
var bytes int64
fetches.EachRecord(func(r *kgo.Record) {
recs++
bytes += int64(len(r.Value))
})
atomic.AddInt64(&rateRecs, recs)
atomic.AddInt64(&rateBytes, bytes)
}
}
}

var p = sync.Pool{
New: func() interface{} {
s := make([]byte, *recordBytes)
return &kgo.Record{Value: s}
},
}

func newRecord(num int64) *kgo.Record {
var buf [20]byte // max int64 takes 19 bytes, then we add a space
b := strconv.AppendInt(buf[:0], num, 10)
b = append(b, ' ')

var r *kgo.Record
if *poolProduce {
r = p.Get().(*kgo.Record)
} else {
r = kgo.SliceRecord(make([]byte, *recordBytes))
}

var n int
for n != len(r.Value) {
n += copy(r.Value[n:], b)
}
return r
}

0 comments on commit d848174

Please sign in to comment.