From c3fc8e03e6f462f16727da99951b32d847477d21 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 1 Mar 2022 11:53:42 -0700 Subject: [PATCH] goroutine per partition: add two more examples One example uses AutoCommitMarks and BlockPollOnRebalance, showing how much easier these options make things. The last example uses manual committing with BlockPollOnRebalance. --- .../README.md | 57 +++++- .../{ => autocommit_marks}/go.mod | 2 +- .../{ => autocommit_marks}/go.sum | 0 .../autocommit_marks/main.go | 176 ++++++++++++++++++ .../autocommit_normal/go.mod | 14 ++ .../autocommit_normal}/go.sum | 13 +- .../{ => autocommit_normal}/main.go | 3 + .../manual_commit}/go.mod | 5 +- .../manual_commit/go.sum | 47 +++++ .../manual_commit/main.go | 170 +++++++++++++++++ .../README.md | 23 --- .../main.go | 161 ---------------- 12 files changed, 473 insertions(+), 198 deletions(-) rename examples/goroutine_per_partition_consuming/{ => autocommit_marks}/go.mod (87%) rename examples/goroutine_per_partition_consuming/{ => autocommit_marks}/go.sum (100%) create mode 100644 examples/goroutine_per_partition_consuming/autocommit_marks/main.go create mode 100644 examples/goroutine_per_partition_consuming/autocommit_normal/go.mod rename examples/{goroutine_per_partition_consuming_manual_commit => goroutine_per_partition_consuming/autocommit_normal}/go.sum (83%) rename examples/goroutine_per_partition_consuming/{ => autocommit_normal}/main.go (95%) rename examples/{goroutine_per_partition_consuming_manual_commit => goroutine_per_partition_consuming/manual_commit}/go.mod (66%) create mode 100644 examples/goroutine_per_partition_consuming/manual_commit/go.sum create mode 100644 examples/goroutine_per_partition_consuming/manual_commit/main.go delete mode 100644 examples/goroutine_per_partition_consuming_manual_commit/README.md delete mode 100644 examples/goroutine_per_partition_consuming_manual_commit/main.go diff --git a/examples/goroutine_per_partition_consuming/README.md 
b/examples/goroutine_per_partition_consuming/README.md index b1302ed7..3f50e935 100644 --- a/examples/goroutine_per_partition_consuming/README.md +++ b/examples/goroutine_per_partition_consuming/README.md @@ -1,18 +1,65 @@ Group consuming, using a goroutine per partition === -This example consumes from a group and starts a goroutine to process each +This directory contains three examples that demonstrate different ways to +have per-partition processing as a group consumer. Because each file is +invoked the same way, this one readme serves all three examples. + +These examples consume from a group and start a goroutine to process each partition concurrently. This type of code may be useful if processing each record per partition is slow, such that processing records in a single `PollFetches` loop is not as fast as you want it to be. -This is just one example of how to process messages concurrently. A simpler -solution would be just to have a group of record consumers selecting from a -channel, and to send all records down this channel in your `PollFetches` loop. -However, that simple solution does not preserve per-partition ordering. +A simpler solution would be to have a pool of goroutines selecting from a +channel and then sending all records from your `PollFetches` loop down this +channel. However, the simple solution does not preserve per-partition ordering. + +## Auto committing + +The autocommitting example is the simplest, but is the most prone to duplicate +consuming due to rebalances. This solution consumes and processes each +partition individually, but does nothing about a behind-the-scenes rebalance. +If a rebalance happens after records are sent to the partition goroutines, +those partition goroutines will process records for partitions that may have +been lost. + +## Auto committing marks + +This example adds a few things to the simpler auto-committing example. 
First, +we switch to `BlockRebalanceOnPoll` and use some locking to avoid rebalances +while the partition goroutines are processing, and we switch to +`AutoCommitMarks` to have more control over what will actually be committed. +This example uses `CommitUncommittedOffsets` at the end of being revoked to +ensure that marked records are committed before revoking is allowed to +continue. Lastly, we use `EachPartition` rather than `EachTopic` to avoid the +internal allocations that `EachTopic` may do. + +Blocking rebalance while polling allows for a lot of simplifications in +comparison to plain autocommitting. Compare the differences: we worry less +about whether partition consumers have gone away, and we are more sure of what +is actually happening. These simplifications are commented within the file. + +The main downside with `BlockRebalanceOnPoll` is that your application is more +at risk of blocking the rebalance so long that the member is booted from the +group. You must ensure that your goroutine workers are fast enough to not block +rebalancing for all of `RebalanceTimeout`. + +## Manually commit + +This example is a small extension of the autocommit marks example: rather than +marking records for commit and forcing a commit when revoked, we issue a +synchronous commit in each partition consumer whenever a partition batch is +processed. + +This example will have more blocking commits, but has even tighter guarantees +around what is committed when. Because this also uses `BlockRebalanceOnPoll`, +like above, you must ensure that your partition processing is fast enough to +not block a rebalance too long. ## Flags +The flags in each example are the same: + `-b` can be specified to override the default localhost:9092 broker to any comma delimited set of brokers. 
diff --git a/examples/goroutine_per_partition_consuming/go.mod b/examples/goroutine_per_partition_consuming/autocommit_marks/go.mod similarity index 87% rename from examples/goroutine_per_partition_consuming/go.mod rename to examples/goroutine_per_partition_consuming/autocommit_marks/go.mod index 95aebd66..a1d8b2cf 100644 --- a/examples/goroutine_per_partition_consuming/go.mod +++ b/examples/goroutine_per_partition_consuming/autocommit_marks/go.mod @@ -2,7 +2,7 @@ module goroutine_per_partition_consuming go 1.17 -replace github.com/twmb/franz-go => ../../ +replace github.com/twmb/franz-go => ../../../ require github.com/twmb/franz-go v1.0.0 diff --git a/examples/goroutine_per_partition_consuming/go.sum b/examples/goroutine_per_partition_consuming/autocommit_marks/go.sum similarity index 100% rename from examples/goroutine_per_partition_consuming/go.sum rename to examples/goroutine_per_partition_consuming/autocommit_marks/go.sum diff --git a/examples/goroutine_per_partition_consuming/autocommit_marks/main.go b/examples/goroutine_per_partition_consuming/autocommit_marks/main.go new file mode 100644 index 00000000..290fbc78 --- /dev/null +++ b/examples/goroutine_per_partition_consuming/autocommit_marks/main.go @@ -0,0 +1,176 @@ +package main + +import ( + "context" + "flag" + "fmt" + "math/rand" + "strings" + "sync" + "time" + + "github.com/twmb/franz-go/pkg/kgo" +) + +var ( + brokers = flag.String("b", "", "comma delimited brokers to consume from") + topic = flag.String("t", "", "topic to consume") + group = flag.String("g", "", "group to consume in") +) + +type tp struct { + t string + p int32 +} + +type pconsumer struct { + cl *kgo.Client + topic string + partition int32 + + quit chan struct{} + done chan struct{} + recs chan kgo.FetchTopicPartition +} + +type splitConsume struct { + // Using BlockRebalanceOnPoll means we do not need a mu to manage + // consumers, unlike the autocommit normal example. 
+ consumers map[tp]*pconsumer +} + +func (pc *pconsumer) consume() { + defer close(pc.done) + fmt.Printf("starting, t %s p %d\n", pc.topic, pc.partition) + defer fmt.Printf("killing, t %s p %d\n", pc.topic, pc.partition) + for { + select { + case <-pc.quit: + return + case p := <-pc.recs: + time.Sleep(time.Duration(rand.Intn(150)+100) * time.Millisecond) // simulate work + fmt.Printf("Some sort of work done, about to commit t %s p %d\n", pc.topic, pc.partition) + pc.cl.MarkCommitRecords(p.Records...) + } + } +} + +func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) { + for topic, partitions := range assigned { + for _, partition := range partitions { + pc := &pconsumer{ + cl: cl, + topic: topic, + partition: partition, + + quit: make(chan struct{}), + done: make(chan struct{}), + recs: make(chan kgo.FetchTopicPartition, 5), + } + s.consumers[tp{topic, partition}] = pc + go pc.consume() + } + } +} + +func (s *splitConsume) revoked(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) { + s.killConsumers(revoked) + if err := cl.CommitUncommittedOffsets(ctx); err != nil { + fmt.Printf("Revoke commit failed: %v\n", err) + } +} + +func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][]int32) { + s.killConsumers(lost) + // Losing means we cannot commit: an error happened. 
+} + +func (s *splitConsume) killConsumers(lost map[string][]int32) { + var wg sync.WaitGroup + defer wg.Wait() + + for topic, partitions := range lost { + for _, partition := range partitions { + tp := tp{topic, partition} + pc := s.consumers[tp] + delete(s.consumers, tp) + close(pc.quit) + fmt.Printf("waiting for work to finish t %s p %d\n", topic, partition) + wg.Add(1) + go func() { <-pc.done; wg.Done() }() + } + } +} + +func main() { + flag.Parse() + rand.Seed(time.Now().Unix()) + + if len(*group) == 0 { + fmt.Println("missing required group") + return + } + if len(*topic) == 0 { + fmt.Println("missing required topic") + return + } + + s := &splitConsume{ + consumers: make(map[tp]*pconsumer), + } + + opts := []kgo.Opt{ + kgo.SeedBrokers(strings.Split(*brokers, ",")...), + kgo.ConsumerGroup(*group), + kgo.ConsumeTopics(*topic), + kgo.OnPartitionsAssigned(s.assigned), + kgo.OnPartitionsRevoked(s.revoked), + kgo.OnPartitionsLost(s.lost), + kgo.AutoCommitMarks(), + kgo.BlockRebalanceOnPoll(), + } + + cl, err := kgo.NewClient(opts...) + if err != nil { + panic(err) + } + if err = cl.Ping(context.Background()); err != nil { // check connectivity to cluster + panic(err) + } + + s.poll(cl) +} + +func (s *splitConsume) poll(cl *kgo.Client) { + for { + // PollRecords is strongly recommended when using + // BlockRebalanceOnPoll. You can tune how many records to + // process at once (upper bound -- could all be on one + // partition), ensuring that your processor loops complete fast + // enough to not block a rebalance too long. + fetches := cl.PollRecords(context.Background(), 10000) + if fetches.IsClientClosed() { + return + } + fetches.EachError(func(_ string, _ int32, err error) { + // Note: you can delete this block, which will result + // in these errors being sent to the partition + // consumers, and then you can handle the errors there. 
+ panic(err) + }) + fetches.EachPartition(func(p kgo.FetchTopicPartition) { + tp := tp{p.Topic, p.Partition} + + // Since we are using BlockRebalanceOnPoll, we can be + // sure this partition consumer exists: + // + // * onAssigned is guaranteed to be called before we + // fetch offsets for newly added partitions + // + // * onRevoked waits for partition consumers to quit + // and be deleted before re-allowing polling. + s.consumers[tp].recs <- p + }) + cl.AllowRebalance() + } +} diff --git a/examples/goroutine_per_partition_consuming/autocommit_normal/go.mod b/examples/goroutine_per_partition_consuming/autocommit_normal/go.mod new file mode 100644 index 00000000..a1d8b2cf --- /dev/null +++ b/examples/goroutine_per_partition_consuming/autocommit_normal/go.mod @@ -0,0 +1,14 @@ +module goroutine_per_partition_consuming + +go 1.17 + +replace github.com/twmb/franz-go => ../../../ + +require github.com/twmb/franz-go v1.0.0 + +require ( + github.com/klauspost/compress v1.14.4 // indirect + github.com/pierrec/lz4/v4 v4.1.14 // indirect + github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b // indirect + github.com/twmb/go-rbtree v1.0.0 // indirect +) diff --git a/examples/goroutine_per_partition_consuming_manual_commit/go.sum b/examples/goroutine_per_partition_consuming/autocommit_normal/go.sum similarity index 83% rename from examples/goroutine_per_partition_consuming_manual_commit/go.sum rename to examples/goroutine_per_partition_consuming/autocommit_normal/go.sum index 29b47ebf..31f19b62 100644 --- a/examples/goroutine_per_partition_consuming_manual_commit/go.sum +++ b/examples/goroutine_per_partition_consuming/autocommit_normal/go.sum @@ -1,6 +1,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.5.7 
h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -24,21 +24,24 @@ github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/goroutine_per_partition_consuming/main.go b/examples/goroutine_per_partition_consuming/autocommit_normal/main.go similarity index 95% rename from examples/goroutine_per_partition_consuming/main.go rename to examples/goroutine_per_partition_consuming/autocommit_normal/main.go index 1511620a..fbb9dd0a 100644 --- a/examples/goroutine_per_partition_consuming/main.go +++ b/examples/goroutine_per_partition_consuming/autocommit_normal/main.go @@ -151,6 +151,9 @@ func (s *splitConsume) poll(cl *kgo.Client) { return } fetches.EachError(func(_ string, _ int32, err error) { + // Note: you can delete this block, which will result + // in 
these errors being sent to the partition + // consumers, and then you can handle the errors there. panic(err) }) fetches.EachTopic(func(t kgo.FetchTopic) { diff --git a/examples/goroutine_per_partition_consuming_manual_commit/go.mod b/examples/goroutine_per_partition_consuming/manual_commit/go.mod similarity index 66% rename from examples/goroutine_per_partition_consuming_manual_commit/go.mod rename to examples/goroutine_per_partition_consuming/manual_commit/go.mod index 178b7e21..7087c52f 100644 --- a/examples/goroutine_per_partition_consuming_manual_commit/go.mod +++ b/examples/goroutine_per_partition_consuming/manual_commit/go.mod @@ -2,14 +2,13 @@ module goroutine_per_partition_consuming_manual_commit go 1.17 -replace github.com/twmb/franz-go => ../../ +replace github.com/twmb/franz-go => ../../../ -require github.com/twmb/franz-go v1.3.4 +require github.com/twmb/franz-go v1.3.5 require ( github.com/klauspost/compress v1.14.4 // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b // indirect github.com/twmb/go-rbtree v1.0.0 // indirect - golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 // indirect ) diff --git a/examples/goroutine_per_partition_consuming/manual_commit/go.sum b/examples/goroutine_per_partition_consuming/manual_commit/go.sum new file mode 100644 index 00000000..31f19b62 --- /dev/null +++ b/examples/goroutine_per_partition_consuming/manual_commit/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod 
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= +github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= +github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b h1:K7ICUMoexUPk3aFQOgydoeRS2nCZvTcKMm93lkGDGPc= +github.com/twmb/franz-go/pkg/kmsg v0.0.0-20220222044056-99b4da42cf4b/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= +github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= +github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto 
v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/goroutine_per_partition_consuming/manual_commit/main.go b/examples/goroutine_per_partition_consuming/manual_commit/main.go new file mode 100644 index 00000000..90f3cdd2 --- /dev/null +++ b/examples/goroutine_per_partition_consuming/manual_commit/main.go @@ -0,0 +1,170 @@ +package main + +import ( + "context" + "flag" + "fmt" + "math/rand" + "strings" + "sync" + "time" + + "github.com/twmb/franz-go/pkg/kgo" +) + +var ( + brokers = flag.String("b", "", "comma delimited brokers to consume from") + topic = flag.String("t", "", "topic to consume") + group = flag.String("g", "", "group to consume in") +) + +type tp struct { + t string + p int32 +} + +type pconsumer struct { + cl *kgo.Client + topic string + partition int32 + + quit chan struct{} + done chan struct{} + recs chan []*kgo.Record +} + +type splitConsume struct { + // Using BlockRebalanceOnPoll means we do not need a mu to manage + // consumers, unlike the autocommit normal example. + consumers map[tp]*pconsumer +} + +func (pc *pconsumer) consume() { + defer close(pc.done) + fmt.Printf("Starting consume for t %s p %d\n", pc.topic, pc.partition) + defer fmt.Printf("Closing consume for t %s p %d\n", pc.topic, pc.partition) + for { + select { + case <-pc.quit: + return + case recs := <-pc.recs: + time.Sleep(time.Duration(rand.Intn(150)+100) * time.Millisecond) // simulate work + fmt.Printf("Some sort of work done, about to commit t %s p %d\n", pc.topic, pc.partition) + err := pc.cl.CommitRecords(context.Background(), recs...) 
+ if err != nil { + fmt.Printf("Error when committing offsets to kafka err: %v t: %s p: %d offset %d\n", err, pc.topic, pc.partition, recs[len(recs)-1].Offset+1) + } + } + } +} + +func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) { + for topic, partitions := range assigned { + for _, partition := range partitions { + pc := &pconsumer{ + cl: cl, + topic: topic, + partition: partition, + + quit: make(chan struct{}), + done: make(chan struct{}), + recs: make(chan []*kgo.Record, 5), + } + s.consumers[tp{topic, partition}] = pc + go pc.consume() + } + } +} + +// In this example, each partition consumer commits itself. Those commits will +// fail if partitions are lost, but will succeed if partitions are revoked. We +// only need one revoked or lost function (and we name it "lost"). +func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][]int32) { + var wg sync.WaitGroup + defer wg.Wait() + + for topic, partitions := range lost { + for _, partition := range partitions { + tp := tp{topic, partition} + pc := s.consumers[tp] + delete(s.consumers, tp) + close(pc.quit) + fmt.Printf("waiting for work to finish t %s p %d\n", topic, partition) + wg.Add(1) + go func() { <-pc.done; wg.Done() }() + } + } +} + +func main() { + rand.Seed(time.Now().Unix()) + flag.Parse() + + if len(*group) == 0 { + fmt.Println("missing required group") + return + } + if len(*topic) == 0 { + fmt.Println("missing required topic") + return + } + + s := &splitConsume{ + consumers: make(map[tp]*pconsumer), + } + + opts := []kgo.Opt{ + kgo.SeedBrokers(strings.Split(*brokers, ",")...), + kgo.ConsumerGroup(*group), + kgo.ConsumeTopics(*topic), + kgo.OnPartitionsAssigned(s.assigned), + kgo.OnPartitionsRevoked(s.lost), + kgo.OnPartitionsLost(s.lost), + kgo.DisableAutoCommit(), + kgo.BlockRebalanceOnPoll(), + } + + cl, err := kgo.NewClient(opts...) 
+ if err != nil { + panic(err) + } + if err = cl.Ping(context.Background()); err != nil { // check connectivity to cluster + panic(err) + } + + s.poll(cl) +} + +func (s *splitConsume) poll(cl *kgo.Client) { + for { + // PollRecords is strongly recommended when using + // BlockRebalanceOnPoll. You can tune how many records to + // process at once (upper bound -- could all be on one + // partition), ensuring that your processor loops complete fast + // enough to not block a rebalance too long. + fetches := cl.PollRecords(context.Background(), 10000) + if fetches.IsClientClosed() { + return + } + fetches.EachError(func(_ string, _ int32, err error) { + // Note: you can delete this block, which will result + // in these errors being sent to the partition + // consumers, and then you can handle the errors there. + panic(err) + }) + fetches.EachPartition(func(p kgo.FetchTopicPartition) { + tp := tp{p.Topic, p.Partition} + + // Since we are using BlockRebalanceOnPoll, we can be + // sure this partition consumer exists: + // + // * onAssigned is guaranteed to be called before we + // fetch offsets for newly added partitions + // + // * onRevoked waits for partition consumers to quit + // and be deleted before re-allowing polling. + s.consumers[tp].recs <- p.Records + }) + cl.AllowRebalance() + } +} diff --git a/examples/goroutine_per_partition_consuming_manual_commit/README.md b/examples/goroutine_per_partition_consuming_manual_commit/README.md deleted file mode 100644 index d4b9c76b..00000000 --- a/examples/goroutine_per_partition_consuming_manual_commit/README.md +++ /dev/null @@ -1,23 +0,0 @@ -Group consuming, using a goroutine per partition and manual committing -=== - -This example consumes from a group and starts a goroutine to process each -partition concurrently, and committing manually after doing some work, on the slice of records. 
-This type of code may be useful if processing each -record per partition is slow, such that processing records in a single -`PollRecords` loop is not as fast as you want it to be. - -This is just one example of how to process messages concurrently. A simpler -solution would be just to have a group of record consumers selecting from a -channel, and to send all records down this channel in your `PollRecords` loop. -However, that simple solution does not preserve per-partition ordering. - -## Flags - -`-b` can be specified to override the default localhost:9092 broker to any -comma delimited set of brokers. - -`-t` specifies the topic to consume (required) - -`-g` specifies the group to consume in (required) - diff --git a/examples/goroutine_per_partition_consuming_manual_commit/main.go b/examples/goroutine_per_partition_consuming_manual_commit/main.go deleted file mode 100644 index f700069e..00000000 --- a/examples/goroutine_per_partition_consuming_manual_commit/main.go +++ /dev/null @@ -1,161 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "math/rand" - "strings" - "sync" - "time" - - "github.com/twmb/franz-go/pkg/kgo" -) - -type pconsumer struct { - quit chan struct{} - done chan struct{} - recs chan []*kgo.Record -} - -var ( - brokers = flag.String("b", "", "comma delimited brokers to consume from") - topic = flag.String("t", "", "topic to consume") - group = flag.String("g", "", "group to consume in") -) - -func (pc *pconsumer) consume(topic string, partition int32, cl *kgo.Client) { - fmt.Printf("Starting consume for t %s p %d\n", topic, partition) - for { - select { - case <-pc.quit: - pc.done <- struct{}{} - fmt.Printf("Closing consume for t %s p %d\n", topic, partition) - return - case recs := <-pc.recs: - // Mimick work to happen before committing records - time.Sleep(time.Duration(rand.Intn(150)+100) * time.Millisecond) - fmt.Printf("Some sort of work done, about to commit t %s p %d\n", topic, partition) - err := 
cl.CommitRecords(context.Background(), recs...) - if err != nil { - fmt.Printf("Error when committing offsets to kafka err: %v t: %s p: %d offset %d\n", err, topic, partition, recs[len(recs)-1].Offset+1) - } - } - } -} - -type splitConsume struct { - mu sync.Mutex // gaurds assigning / losing vs. polling - consumers map[string]map[int32]pconsumer -} - -func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[string][]int32) { - s.mu.Lock() - defer s.mu.Unlock() - for topic, partitions := range assigned { - if s.consumers[topic] == nil { - s.consumers[topic] = make(map[int32]pconsumer) - } - for _, partition := range partitions { - pc := pconsumer{ - quit: make(chan struct{}), - done: make(chan struct{}), - recs: make(chan []*kgo.Record), - } - s.consumers[topic][partition] = pc - go pc.consume(topic, partition, cl) - } - } -} - -func (s *splitConsume) lost(_ context.Context, cl *kgo.Client, lost map[string][]int32) { - s.mu.Lock() - defer s.mu.Unlock() - for topic, partitions := range lost { - ptopics := s.consumers[topic] - for _, partition := range partitions { - pc := ptopics[partition] - delete(ptopics, partition) - if len(ptopics) == 0 { - delete(s.consumers, topic) - } - close(pc.quit) - fmt.Printf("Waiting for work to finish t %s p %d\n", topic, partition) - <-pc.done - } - } -} - -func main() { - rand.Seed(time.Now().Unix()) - flag.Parse() - - if len(*group) == 0 { - fmt.Println("missing required group") - return - } - if len(*topic) == 0 { - fmt.Println("missing required topic") - return - } - - s := &splitConsume{ - consumers: make(map[string]map[int32]pconsumer), - } - - opts := []kgo.Opt{ - kgo.SeedBrokers(strings.Split(*brokers, ",")...), - kgo.ConsumerGroup(*group), - kgo.ConsumeTopics(*topic), - kgo.OnPartitionsAssigned(s.assigned), - kgo.OnPartitionsRevoked(s.lost), - kgo.OnPartitionsLost(s.lost), - kgo.DisableAutoCommit(), - kgo.BlockRebalanceOnPoll(), - } - - cl, err := kgo.NewClient(opts...) 
- if err != nil { - panic(err) - } - // Check connectivity to cluster - err = cl.Ping(context.Background()) - if err != nil { - panic(err) - } - - s.poll(cl) -} - -func (s *splitConsume) poll(cl *kgo.Client) { - for { - fetches := cl.PollRecords(context.Background(), 10000) - if fetches.IsClientClosed() { - return - } - fetches.EachError(func(_ string, _ int32, err error) { - panic(err) - }) - fetches.EachTopic(func(t kgo.FetchTopic) { - s.mu.Lock() - tconsumers := s.consumers[t.Topic] - s.mu.Unlock() - if tconsumers == nil { - return - } - t.EachPartition(func(p kgo.FetchPartition) { - pc, ok := tconsumers[p.Partition] - if !ok { - return - } - select { - case pc.recs <- p.Records: - case <-pc.quit: - } - }) - }) - s.mu.Lock() - cl.AllowRebalance() - s.mu.Unlock() - } -}