Skip to content

Commit

Permalink
Txn integration tests: rely on KIP-447
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Feb 26, 2021
1 parent 00156ec commit a6e35f9
Showing 1 changed file with 14 additions and 21 deletions.
35 changes: 14 additions & 21 deletions kafka/txn_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,34 +297,27 @@ func TestTransactionalSendOffsets(t *testing.T) {
}
}

// Create consumer (to read committed offsets) prior to closing the
// consumer to trigger the race condition where the transaction is
// not fully committed by the time consumer.Committed() is called.
// Prior to KIP-447 this would result in the committed offsets not
// showing up, but with KIP-447 the consumer automatically retries
// the offset retrieval.
t.Logf("Creating consumer for (later) offset retrieval\n")
consumer, err := NewConsumer(consumerConfig)
if err != nil {
t.Fatalf("Failed to create Consumer client: %s\n", err)
}

// Close producer
// signal go-routine to finish
close(termChan)
// wait for go-routine to finish
_ = <-doneChan

producer.Close()

// Until KIP-447 is implemented we need to call
// InitTransactions() after transactions are committed to make
// sure the committed offsets are made available to consumers.
producer2, err := NewProducer(config)
if err != nil {
t.Fatalf("Failed to create Producer client: %s\n", err)
}

err = producer2.InitTransactions(nil)
if err != nil {
t.Fatalf("InitTransactions() failed: %v\n", err)
}

producer2.Close()

// Read committed offsets.
consumer, err := NewConsumer(consumerConfig)
if err != nil {
t.Fatalf("Failed to create Consumer client: %s\n", err)
}

t.Logf("Retrieving committed offsets\n")
committed, err := consumer.Committed(partitions, -1)
if err != nil {
t.Fatalf("Failed to get committed offsets: %s\n", err)
Expand Down

0 comments on commit a6e35f9

Please sign in to comment.