From 5576dceff72fe7a912128b3b26d85f53d0113dc7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 30 May 2021 19:36:11 -0600 Subject: [PATCH] benchmarks: add comparisons to confluent-kafka-go & sarama I was not expecting confluent-kafka-go to be so slow, but I guess the cgo and then channel overhead quickly adds up. For sarama, I was not expecting consuming to be so fast. But, franz-go still beats it by 1.5x. This also drops down the fetch max bytes in the franz-go benchmark, because decompressing inflated responses a lot, which ate up a significant amount of memory. --- README.md | 13 +- examples/bench/README.md | 10 + .../bench/compare/confluent-kafka-go/go.mod | 5 + .../bench/compare/confluent-kafka-go/go.sum | 2 + .../bench/compare/confluent-kafka-go/main.go | 132 +++++++++++++ examples/bench/compare/sarama/go.mod | 10 + examples/bench/compare/sarama/go.sum | 89 +++++++++ examples/bench/compare/sarama/main.go | 181 ++++++++++++++++++ examples/bench/main.go | 9 +- 9 files changed, 447 insertions(+), 4 deletions(-) create mode 100644 examples/bench/compare/confluent-kafka-go/go.mod create mode 100644 examples/bench/compare/confluent-kafka-go/go.sum create mode 100644 examples/bench/compare/confluent-kafka-go/main.go create mode 100644 examples/bench/compare/sarama/go.mod create mode 100644 examples/bench/compare/sarama/go.sum create mode 100644 examples/bench/compare/sarama/main.go diff --git a/README.md b/README.md index 85fd464a..79c904bf 100644 --- a/README.md +++ b/README.md @@ -155,11 +155,20 @@ integrating with prometheus! ## Benchmarks This client is quite fast; it is the fastest and most cpu and memory efficient -client in Go (and may even beat out librdkafka). +client in Go. + +For 100 byte messages, + +- This client is 4x faster at producing than confluent-kafka-go, and up to + 10x-20x faster (at the expense of more memory usage) at consuming. + +- This client is 2.5x faster at producing than sarama, and 1.5x faster at + consuming. To check benchmarks yourself, see the [bench](./examples/bench) example. This example lets you produce or consume to a cluster and see the byte / record -rate. +rate. The [compare](./examples/bench/compare) subdirectory shows comparison +code. ## Supported KIPs diff --git a/examples/bench/README.md b/examples/bench/README.md index 826859a9..05abf894 100644 --- a/examples/bench/README.md +++ b/examples/bench/README.md @@ -17,6 +17,16 @@ go run . -topic foo -no-compression go run . -tls -sasl-method scram-sha-256 -sasl-user user -sasl-pass pass -consume -group group -topic foo ``` +## Comparisons + +The [compare](./compare) directory has comparisons to other clients. For +simplicity, not every aspect that is supported in this benchmark is ported to +comparisons: tls support and sasl support may be missing, and for the +confluent-kafka-go comparison, it does not seem possible to consume outside of +a group. + +All flags that _can_ be easily supported in comparisons are the same. + ## Flags `-brokers` can be specified to override the default localhost:9092 broker to diff --git a/examples/bench/compare/confluent-kafka-go/go.mod b/examples/bench/compare/confluent-kafka-go/go.mod new file mode 100644 index 00000000..f2d27318 --- /dev/null +++ b/examples/bench/compare/confluent-kafka-go/go.mod @@ -0,0 +1,5 @@ +module ckg + +go 1.16 + +require github.com/confluentinc/confluent-kafka-go v1.7.0 diff --git a/examples/bench/compare/confluent-kafka-go/go.sum b/examples/bench/compare/confluent-kafka-go/go.sum new file mode 100644 index 00000000..d1419097 --- /dev/null +++ b/examples/bench/compare/confluent-kafka-go/go.sum @@ -0,0 +1,2 @@ +github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM= +github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= diff --git a/examples/bench/compare/confluent-kafka-go/main.go b/examples/bench/compare/confluent-kafka-go/main.go new file mode 100644 index 00000000..fa4cd9b4 --- /dev/null +++ b/examples/bench/compare/confluent-kafka-go/main.go @@ -0,0 +1,132 @@ +package main + +import ( + "flag" + "fmt" + "os" + "strconv" + "sync/atomic" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +var ( + seedBrokers = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers") + topic = flag.String("topic", "", "topic to produce to or consume from") + + recordBytes = flag.Int("record-bytes", 100, "bytes per record (producing)") + noCompression = flag.Bool("no-compression", false, "set to disable snappy compression (producing)") + + consume = flag.Bool("consume", false, "if true, consume rather than produce") + group = flag.String("group", "", "if non-empty, group to use for consuming rather than direct partition consuming (consuming)") + + rateRecs int64 + rateBytes int64 +) + +func printRate() { + go func() { + for range time.Tick(time.Second) { + recs := atomic.SwapInt64(&rateRecs, 0) + bytes := atomic.SwapInt64(&rateBytes, 0) + fmt.Printf("%0.2f MiB/s; %0.2fk records/s\n", float64(bytes)/(1024*1024), float64(recs)/1000) + } + }() +} + +func die(msg string, args ...interface{}) { + fmt.Fprintf(os.Stderr, msg+"\n", args...) + os.Exit(1) +} + +func chk(err error, msg string, args ...interface{}) { + if err != nil { + die(msg, args...) + } +} + +func main() { + flag.Parse() + + go printRate() + + switch *consume { + case false: + cfg := &kafka.ConfigMap{ + "bootstrap.servers": *seedBrokers, + "enable.idempotence": true, + } + if !*noCompression { + (*cfg)["compression.codec"] = "snappy" + } + p, err := kafka.NewProducer(cfg) + chk(err, "unable to create producer: %v", err) + + go func() { + for { + switch ev := (<-p.Events()).(type) { + case *kafka.Message: + err := ev.TopicPartition.Error + chk(err, "produce error: %v", err) + atomic.AddInt64(&rateRecs, 1) + atomic.AddInt64(&rateBytes, int64(*recordBytes)) + case kafka.Error: + if ev.IsFatal() { + die("fatal err: %v", err) + } + } + } + }() + var num int64 + for { + err = p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: topic, Partition: kafka.PartitionAny}, + Value: newValue(num), + }, nil) + num++ + chk(err, "unable to produce: %v", err) + } + + case true: + cfg := &kafka.ConfigMap{ + "bootstrap.servers": *seedBrokers, + "auto.offset.reset": "earliest", + "group.id": *group, + } + c, err := kafka.NewConsumer(cfg) + chk(err, "unable to create consumer: %v", err) + + if err := c.Subscribe(*topic, nil); err != nil { + die("unable to subscribe to topic: %v", err) + + } + for { + ev := c.Poll(100) + if ev == nil { + continue + } + switch e := ev.(type) { + case *kafka.Message: + atomic.AddInt64(&rateRecs, 1) + atomic.AddInt64(&rateBytes, int64(len(e.Value))) + case kafka.Error: + die("consume error: %v", err) + } + } + } +} + +func newValue(num int64) []byte { + var buf [20]byte // max int64 takes 19 bytes, then we add a space + b := strconv.AppendInt(buf[:0], num, 10) + b = append(b, ' ') + + s := make([]byte, *recordBytes) + + var n int + for n != len(s) { + n += copy(s[n:], b) + } + return s +} diff --git a/examples/bench/compare/sarama/go.mod b/examples/bench/compare/sarama/go.mod new file mode 100644 index 00000000..6d574500 --- /dev/null +++ b/examples/bench/compare/sarama/go.mod @@ -0,0 +1,10 @@ +module saram + +go 1.16 + +require ( + github.com/Shopify/sarama v1.29.0 + github.com/klauspost/compress v1.12.3 // indirect + golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect + golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect +) diff --git a/examples/bench/compare/sarama/go.sum b/examples/bench/compare/sarama/go.sum new file mode 100644 index 00000000..982a33a7 --- /dev/null +++ b/examples/bench/compare/sarama/go.sum @@ -0,0 +1,89 @@ +github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE= +github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= +github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc= +golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/bench/compare/sarama/main.go b/examples/bench/compare/sarama/main.go new file mode 100644 index 00000000..f6b8909f --- /dev/null +++ b/examples/bench/compare/sarama/main.go @@ -0,0 +1,181 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/Shopify/sarama" +) + +var ( + seedBrokers = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers") + topic = flag.String("topic", "", "topic to produce to or consume from") + + recordBytes = flag.Int("record-bytes", 100, "bytes per record (producing)") + noCompression = flag.Bool("no-compression", false, "set to disable snappy compression (producing)") + + consume = flag.Bool("consume", false, "if true, consume rather than produce") + group = flag.String("group", "", "if non-empty, group to use for consuming rather than direct partition consuming (consuming)") + + dialTLS = flag.Bool("tls", false, "if true, use tls for connecting") + + saslMethod = flag.String("sasl-method", "", "if non-empty, sasl method to use (must specify all options; supports plain, scram-sha-256, scram-sha-512)") + saslUser = flag.String("sasl-user", "", "if non-empty, username to use for sasl (must specify all options)") + saslPass = flag.String("sasl-pass", "", "if non-empty, password to use for sasl (must specify all options)") + + rateRecs int64 + rateBytes int64 +) + +func printRate() { + go func() { + for range time.Tick(time.Second) { + recs := atomic.SwapInt64(&rateRecs, 0) + bytes := atomic.SwapInt64(&rateBytes, 0) + fmt.Printf("%0.2f MiB/s; %0.2fk records/s\n", float64(bytes)/(1024*1024), float64(recs)/1000) + } + }() +} + +func die(msg string, args ...interface{}) { + fmt.Fprintf(os.Stderr, msg+"\n", args...) + os.Exit(1) +} + +func chk(err error, msg string, args ...interface{}) { + if err != nil { + die(msg, args...) + } +} + +func main() { + flag.Parse() + + brokers := strings.Split(*seedBrokers, ",") + + go printRate() + + cfg := sarama.NewConfig() + cfg.Version = sarama.V2_8_0_0 + if *dialTLS { + cfg.Net.TLS.Enable = true + } + if *saslMethod != "" || *saslUser != "" || *saslPass != "" { + if *saslMethod == "" || *saslUser == "" || *saslPass == "" { + die("all of -sasl-method, -sasl-user, -sasl-pass must be specified if any are") + } + method := strings.ToLower(*saslMethod) + method = strings.ReplaceAll(method, "-", "") + method = strings.ReplaceAll(method, "_", "") + cfg.Net.SASL.Enable = true + cfg.Net.SASL.Version = 1 + cfg.Net.SASL.User = *saslUser + cfg.Net.SASL.Password = *saslPass + switch method { + case "plain", + "scramsha256", + "scramsha512": + default: + die("unrecognized sasl option %s", *saslMethod) + } + } + + switch *consume { + case false: + cfg.Producer.RequiredAcks = sarama.WaitForAll + cfg.Producer.Compression = sarama.CompressionSnappy + cfg.Producer.Return.Successes = true + + p, err := sarama.NewAsyncProducer(brokers, cfg) + chk(err, "unable to create producer: %v", err) + + go func() { + for err := range p.Errors() { + die("produce error: %v", err) + } + }() + go func() { + for range p.Successes() { + atomic.AddInt64(&rateRecs, 1) + atomic.AddInt64(&rateBytes, int64(*recordBytes)) + } + }() + + var num int64 + for { + p.Input() <- &sarama.ProducerMessage{Topic: *topic, Value: sarama.ByteEncoder(newValue(num))} + num++ + } + + case true: + cfg.Consumer.Return.Errors = true + cfg.Consumer.Offsets.Initial = sarama.OffsetOldest + + if *group == "" { + c, err := sarama.NewConsumer(brokers, cfg) + chk(err, "unable to create consumer: %v", err) + ps, err := c.Partitions(*topic) + chk(err, "unable to get partitions for topic %s: %v", *topic, err) + + for _, p := range ps { + go func(p int32) { + pc, err := c.ConsumePartition(*topic, p, sarama.OffsetOldest) + chk(err, "unable to consume topic %s partition %d: %v", *topic, p, err) + for { + msg := <-pc.Messages() + atomic.AddInt64(&rateRecs, 1) + atomic.AddInt64(&rateBytes, int64(len(msg.Value))) + } + }(p) + } + select {} + } + + g, err := sarama.NewConsumerGroup(brokers, *group, cfg) + chk(err, "unable to create group consumer: %v", err) + + go func() { + for err := range g.Errors() { + chk(err, "consumer group error: %v", err) + } + }() + + for { + err := g.Consume(context.Background(), []string{*topic}, new(consumer)) + chk(err, "consumer group err: %v", err) + } + } +} + +func newValue(num int64) []byte { + var buf [20]byte // max int64 takes 19 bytes, then we add a space + b := strconv.AppendInt(buf[:0], num, 10) + b = append(b, ' ') + + s := make([]byte, *recordBytes) + + var n int + for n != len(s) { + n += copy(s[n:], b) + } + return s +} + +type consumer struct{} + +func (*consumer) Setup(sarama.ConsumerGroupSession) error { return nil } +func (*consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil } +func (*consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + atomic.AddInt64(&rateRecs, 1) + atomic.AddInt64(&rateBytes, int64(len(msg.Value))) + sess.MarkMessage(msg, "") + } + return nil +} diff --git a/examples/bench/main.go b/examples/bench/main.go index 044e11c7..4876138a 100644 --- a/examples/bench/main.go +++ b/examples/bench/main.go @@ -75,6 +75,9 @@ func main() { kgo.ProduceTopic(*topic), kgo.MaxBufferedRecords(250<<20 / *recordBytes + 1), kgo.AllowedConcurrentFetches(3), + // We have good compression, so we want to limit what we read + // back because snappy deflation will balloon our memory usage. + kgo.FetchMaxBytes(5 << 20), } if *noCompression { opts = append(opts, kgo.BatchCompression(kgo.NoCompression())) @@ -110,6 +113,8 @@ func main() { AccessKey: *saslUser, SecretKey: *saslPass, }.AsManagedStreamingIAMMechanism())) + default: + die("unrecognized sasl option %s", *saslMethod) } } @@ -129,13 +134,13 @@ func main() { case false: var num int64 for { - atomic.AddInt64(&rateRecs, 1) - atomic.AddInt64(&rateBytes, int64(*recordBytes)) cl.Produce(context.Background(), newRecord(num), func(r *kgo.Record, err error) { if *poolProduce { p.Put(r) } chk(err, "produce error: %v", err) + atomic.AddInt64(&rateRecs, 1) + atomic.AddInt64(&rateBytes, int64(*recordBytes)) }) num++ }