Skip to content

Commit

Permalink
producer: ensure that the management message (fin) is never "leaked" (#2182)
Browse files Browse the repository at this point in the history

* producer: ensure that the management message (fin) is never "leaked"

Since async producer now support multiple inflight messages
thanks to #1686 and
#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.

* test:async-producer: test broker restart (this fixes #2150)

* tests:async-producer: disable logger in TestAsyncProducerBrokerRestart

* tests:async-producer: protect leader with a mutex to make race detector happy

* test:async-producer: set 5min default finish timeout

* async-producer: do not clear bp.currentRetries when fin message is received just after syn

* async-producer: use debug logger when fin message is handled for a healthy brokerProducer

* test:async-producer:restart: make emptyValues atomic to avoid races

* test:async-producer:restart: rename produceRequestTest to countRecordsWithEmptyValue

* test:async-producer:restart: reduce retry backoff timeout to speed up the test

* test:async-producer:restart: remove bogus 0
  • Loading branch information
niamster authored Mar 30, 2022
1 parent f07b7b8 commit d5f076b
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 11 deletions.
8 changes: 8 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,14 @@ func (bp *brokerProducer) run() {
continue
}

if msg.flags&fin == fin {
// New broker producer that was caught up by the retry loop
bp.parent.retryMessage(msg, ErrShuttingDown)
DebugLogger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
bp.broker.ID(), msg.retries, msg.Topic, msg.Partition)
continue
}

if bp.buffer.wouldOverflow(msg) {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
Expand Down
165 changes: 154 additions & 11 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,67 @@ import (

const TestMessage = "ABC THE MESSAGE"

func closeProducer(t *testing.T, p AsyncProducer) {
func closeProducerWithTimeout(t *testing.T, p AsyncProducer, timeout time.Duration) {
var wg sync.WaitGroup
p.AsyncClose()

closer := make(chan struct{})
timer := time.AfterFunc(timeout, func() {
t.Error("timeout")
close(closer)
})
defer timer.Stop()

wg.Add(2)
go func() {
for range p.Successes() {
t.Error("Unexpected message on Successes()")
defer wg.Done()
for {
select {
case <-closer:
return
case _, ok := <-p.Successes():
if !ok {
return
}
t.Error("Unexpected message on Successes()")
}
}
wg.Done()
}()
go func() {
for msg := range p.Errors() {
t.Error(msg.Err)
defer wg.Done()
for {
select {
case <-closer:
return
case msg, ok := <-p.Errors():
if !ok {
return
}
t.Error(msg.Err)
}
}
wg.Done()
}()
wg.Wait()
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
func closeProducer(t *testing.T, p AsyncProducer) {
closeProducerWithTimeout(t, p, 5*time.Minute)
}

func expectResultsWithTimeout(t *testing.T, p AsyncProducer, successes, errors int, timeout time.Duration) {
t.Helper()
expect := successes + errors
defer func() {
if successes != 0 || errors != 0 {
t.Error("Unexpected successes", successes, "or errors", errors)
}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
for expect > 0 {
select {
case <-timer.C:
return
case msg := <-p.Errors():
if msg.Msg.flags != 0 {
t.Error("Message had flags set")
Expand All @@ -62,9 +98,10 @@ func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
}
}
}
if successes != 0 || errors != 0 {
t.Error("Unexpected successes", successes, "or errors", errors)
}
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
expectResultsWithTimeout(t, p, successes, errors, 5*time.Minute)
}

type testPartitioner chan *int32
Expand Down Expand Up @@ -693,6 +730,112 @@ func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerBrokerRestart(t *testing.T) {
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

var leaderLock sync.Mutex

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
leaderLock.Lock()
defer leaderLock.Unlock()
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})

var emptyValues int32 = 0

countRecordsWithEmptyValue := func(req *request) {
preq := req.body.(*ProduceRequest)
if batch := preq.records["my_topic"][0].RecordBatch; batch != nil {
for _, record := range batch.Records {
if len(record.Value) == 0 {
atomic.AddInt32(&emptyValues, 1)
}
}
}
if batch := preq.records["my_topic"][0].MsgSet; batch != nil {
for _, record := range batch.Messages {
if len(record.Msg.Value) == 0 {
atomic.AddInt32(&emptyValues, 1)
}
}
}
}

leader.setHandler(func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

time.Sleep(50 * time.Millisecond)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
return prodSuccess
})

config := NewTestConfig()
config.Producer.Retry.Backoff = 250 * time.Millisecond
config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 10

producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup

pushMsg := func() {
defer wg.Done()
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
time.Sleep(50 * time.Millisecond)
}
}

wg.Add(1)
go pushMsg()

for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)

wg.Add(1)
go pushMsg()
}

leader.Close()
leaderLock.Lock()
leader = NewMockBroker(t, 2)
leaderLock.Unlock()
leader.setHandler(func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
})

wg.Wait()

expectResultsWithTimeout(t, producer, 40, 0, 10*time.Second)

seedBroker.Close()
leader.Close()

closeProducerWithTimeout(t, producer, 5*time.Second)

if emptyValues := atomic.LoadInt32(&emptyValues); emptyValues > 0 {
t.Fatalf("%d empty values", emptyValues)
}
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

Expand Down

0 comments on commit d5f076b

Please sign in to comment.