Skip to content

Commit

Permalink
fix EOS example
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed May 11, 2021
1 parent 87178ce commit 20837ec
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 16 deletions.
32 changes: 32 additions & 0 deletions examples/transactions/eos/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Transactional producing, EOS consuming/producing
===

This contains an example that will produce 10 records in succession as a part
of a single transaction. The producer will commit the batch, and the next batch
it will abort, and this will flip indefinitely.

The EOS consumer/producer will consume only the committed produced records,
prefix them with "eos ", and then produce them to another topic.

Two flags are required: `-produce-to` and `-eos-to`.

This program outputs a bunch of debug output, but does not output anything
produced or consumed. Use `kcl` to see the final created committed offsets.

## Flags

`-brokers` can be specified to override the default localhost:9092 broker to
any comma delimited set of brokers.

`-produce-to` specifies which topic for the input transactional producer to
produce to.

`-eos-to` specifies which topic the EOS consumer/producer will produce to.

`-group` specifies which group will be used for the EOS consumer.

`-produce-txn-id` specifies which transactional ID should be used for the
transactional producer.

`-consume-txn-id` specifies which transactional ID should be used for the EOS
consumer/producer.
33 changes: 17 additions & 16 deletions examples/transactions/eos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -38,6 +38,8 @@ func main() {

go inputProducer()
go eosConsumer()

select {}
}

func inputProducer() {
Expand All @@ -55,7 +57,7 @@ func inputProducer() {

ctx := context.Background()

for flip := true; ; flip = !flip {
for doCommit := true; ; doCommit = !doCommit {
if err := cl.BeginTransaction(); err != nil {
// We are unable to start a transaction if the client
// is not transactional or if we are already in a
Expand All @@ -64,22 +66,22 @@ func inputProducer() {
die("unable to start transaction: %v", err)
}

var haveFirstErr int64
var firstErr error
msg := "commit "
if !doCommit {
msg = "abort "
}

var e kgo.FirstErrPromise
for i := 0; i < 10; i++ {
cl.Produce(ctx, kgo.StringRecord(strconv.Itoa(i)), func(_ *kgo.Record, err error) {
if err != nil && atomic.SwapInt64(&haveFirstErr, 1) == 0 {
firstErr = err
}
})
cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise)
}

if err := cl.Flush(ctx); err != nil {
die("Flush only returns error if the context is canceled: %v", err)
}

switch err := cl.EndTransaction(ctx, firstErr == nil); err {
commit := kgo.TransactionEndTry(doCommit && e.Err() == nil)

switch err := cl.EndTransaction(ctx, commit); err {
case nil:
case kerr.OperationNotAttempted:
if err := cl.EndTransaction(ctx, kgo.TryAbort); err != nil {
Expand All @@ -88,14 +90,17 @@ func inputProducer() {
default:
die("commit failed: %v", err)
}

time.Sleep(10 * time.Second)
}
}

func eosConsumer() {
cl, err := kgo.NewClient(
kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
kgo.ProduceTopic(*produceTo),
kgo.ProduceTopic(*eosTo),
kgo.TransactionalID(*consumeTxnID),
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
return "[eos consumer] "
})),
Expand Down Expand Up @@ -138,10 +143,6 @@ func eosConsumer() {
fetches.EachRecord(func(r *kgo.Record) {
sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise)
})
if err := cl.Flush(ctx); err != nil {
die("Flush only returns error if the context is canceled: %v", err)
}

committed, err := sess.End(ctx, e.Err() == nil)

if committed {
Expand Down

0 comments on commit 20837ec

Please sign in to comment.