Commit c7c08fb

txnal producer: work around KAFKA-12671, take 2
Rather than ever aborting anything when doing so could be unsafe, we now rely on the logic we previously added to ensure we never break sequence numbers: we abort only when it is safe to do so. Second, we now wait for everything to be "flushed" (with the caveat that records can now abort while flushing). These two changes in combination should avoid KAFKA-12671.
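To illustrate the first half of that ("we abort only when it is safe"), here is a hedged, standalone sketch rather than kgo's implementation: the names (producerSketch, batchSketch, markAborting, maybeFail) are made up, and the written-vs-unwritten gating is an assumption about when such a check is consulted, but the shape mirrors the new isAborting branch added to maybeFailErr in pkg/kgo/sink.go below.

// Hypothetical sketch, not kgo's API: an atomic "aborting" flag gates
// whether a still-unwritten batch may fail itself.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errAborting = errors.New("records were aborted before being produced")

type producerSketch struct {
	aborting uint32 // 1 means yes, mirroring producer.aborting in the diff below
}

func (p *producerSketch) markAborting(on bool) {
	var v uint32
	if on {
		v = 1
	}
	atomic.StoreUint32(&p.aborting, v)
}

func (p *producerSketch) isAborting() bool { return atomic.LoadUint32(&p.aborting) != 0 }

type batchSketch struct {
	owner   *producerSketch
	written bool // true once the batch is on the wire with a sequence number
}

// maybeFail reports whether the batch can be failed right now. A batch that
// has already been written must not be failed, otherwise the sequence numbers
// the broker has seen would no longer line up with what we re-send.
func (b *batchSketch) maybeFail() error {
	if b.written {
		return nil // too late to abort safely; wait for it instead
	}
	if b.owner.isAborting() {
		return errAborting
	}
	return nil
}

func main() {
	p := &producerSketch{}
	unwritten := &batchSketch{owner: p}
	inflight := &batchSketch{owner: p, written: true}

	p.markAborting(true)
	fmt.Println(unwritten.maybeFail()) // records were aborted before being produced
	fmt.Println(inflight.maybeFail())  // <nil>: already written, so we wait for it
}

The point is that an abort never yanks a batch whose sequence numbers the broker may already have seen; those are simply waited on.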
1 parent c4481a5 commit c7c08fb

3 files changed: +29 -83 lines changed
Diff for: pkg/kgo/producer.go (-4)

@@ -34,7 +34,6 @@ type producer struct {
 	flushing int32 // >0 if flushing, can Flush many times concurrently
 
 	aborting uint32 // 1 means yes
-	workers  int32  // number of sinks draining / number of in flight produce requests
 
 	idMu      sync.Mutex
 	idVersion int16
@@ -66,9 +65,6 @@ func (p *producer) init() {
 	p.notifyCond = sync.NewCond(&p.notifyMu)
 }
 
-func (p *producer) incWorkers() { atomic.AddInt32(&p.workers, 1) }
-func (p *producer) decWorkers() { p.decAbortNotify(&p.workers) }
-
 func (p *producer) decAbortNotify(v *int32) {
 	if atomic.AddInt32(v, -1) != 0 || atomic.LoadUint32(&p.aborting) == 0 {
 		return

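The hunk above ends before decAbortNotify's body, so the following is only an assumption about its shape: decrement an atomic count and, if it just hit zero while an abort is in progress, wake whoever is waiting on the producer's condition variable. All names below (tracker, dec) are illustrative, not kgo's.

// Hypothetical decrement-and-notify sketch; treat it as a guess at the
// shape of decAbortNotify, whose full body is not shown in the hunk above.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type tracker struct {
	count    int32
	aborting uint32
	mu       sync.Mutex
	cond     *sync.Cond
}

// dec decrements the count and, only if it hit zero while aborting, wakes
// whoever is waiting for the drain to finish.
func (t *tracker) dec() {
	if atomic.AddInt32(&t.count, -1) != 0 || atomic.LoadUint32(&t.aborting) == 0 {
		return
	}
	// Lock and unlock around the broadcast so a waiter that has already
	// checked the count but not yet parked on the cond cannot miss the wakeup.
	t.mu.Lock()
	t.mu.Unlock()
	t.cond.Broadcast()
}

func main() {
	t := &tracker{count: 2, aborting: 1}
	t.cond = sync.NewCond(&t.mu)
	t.dec() // count is now 1: no broadcast
	t.dec() // count hit 0 while aborting: broadcast fires
	fmt.Println(atomic.LoadInt32(&t.count))
}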
Diff for: pkg/kgo/sink.go (+5 -30)

@@ -38,13 +38,6 @@ type sink struct {
 	needBackoff bool
 	backoffSeq  uint32 // prevents pile on failures
 
-	// To work around KAFKA-12671, before we issue EndTxn, we check to see
-	// that all sinks had a final successful response. If not, then we risk
-	// running into KAFKA-12671 (out of order processing leading to
-	// orphaned begun "transaction" in ProducerStateManager), so rather
-	// than issuing EndTxn immediately, we wait a little bit.
-	lastRespSuccessful bool
-
 	// consecutiveFailures is incremented every backoff and cleared every
 	// successful response. For simplicity, if we have a good response
 	// following an error response before the error response's backoff
@@ -65,10 +58,9 @@ type seqResp struct {
 
 func (cl *Client) newSink(nodeID int32) *sink {
 	s := &sink{
-		cl:                 cl,
-		nodeID:             nodeID,
-		produceVersion:     -1,
-		lastRespSuccessful: true,
+		cl:             cl,
+		nodeID:         nodeID,
+		produceVersion: -1,
 	}
 	s.inflightSem.Store(make(chan struct{}, 1))
 	return s
@@ -231,16 +223,8 @@ func (s *sink) drain() {
 		time.Sleep(5 * time.Millisecond)
 	}
 
-	s.cl.producer.incWorkers()
-	defer s.cl.producer.decWorkers()
-
 	again := true
 	for again {
-		if s.cl.producer.isAborting() {
-			s.drainState.hardFinish()
-			return
-		}
-
 		s.maybeBackoff()
 
 		sem := s.inflightSem.Load().(chan struct{})
@@ -343,20 +327,9 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 
 	req.backoffSeq = s.backoffSeq // safe to read outside mu since we are in drain loop
 
-	// Add that we are working and then check if we are aborting: this
-	// order ensures that we will do not produce after aborting is set.
-	p := &s.cl.producer
-	p.incWorkers()
-	if p.isAborting() {
-		p.decWorkers()
-		return false
-	}
-
	produced = true
 	s.doSequenced(req, func(resp kmsg.Response, err error) {
-		s.lastRespSuccessful = err == nil
 		s.handleReqResp(req, resp, err)
-		p.decWorkers()
 		<-sem
 	})
 	return moreToDrain
@@ -1236,6 +1209,8 @@ func (b *recBatch) maybeFailErr(cfg *cfg) error {
 		return errRecordTimeout
 	} else if b.tries >= cfg.produceRetries {
 		return errRecordRetries
+	} else if b.owner.cl.producer.isAborting() {
+		return ErrAborting
 	}
 	return nil
 }

Diff for: pkg/kgo/txn.go (+24 -49)

@@ -164,37 +164,9 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 			return false, err // we do not abort below, because an error here is ctx closing
 		}
 	case TryAbort:
-		// If we have no buffered records, there is no need to abort
-		// buffered records and we can avoid resetting our producer ID.
-		if atomic.LoadInt64(&s.cl.producer.bufferedRecords) == 0 {
-			break
-		}
-
 		if err := s.cl.AbortBufferedRecords(ctx); err != nil {
 			return false, err // same
 		}
-		defer s.cl.ResetProducerID()
-
-		allOk := true
-		s.cl.sinksAndSourcesMu.Lock()
-		for _, sns := range s.cl.sinksAndSources {
-			allOk = allOk && sns.sink.lastRespSuccessful
-		}
-		s.cl.sinksAndSourcesMu.Unlock()
-
-		if !allOk {
-			s.cl.cfg.logger.Log(LogLevelWarn, "Buffered records were aborted, but some sink(s) did not have a final handled produce response. Kafka could still be handling these produce requests or have yet to handle them. We do not want to issue EndTxn before these produce requests are handled, because that would risk beginning a new transaction that we may not finish. Waiting 1s to give Kafka some time... (See KAFKA-12671)")
-			timer := time.NewTimer(time.Second)
-			select {
-			case <-timer.C:
-			case <-s.cl.ctx.Done():
-				timer.Stop()
-				return false, s.cl.ctx.Err()
-			case <-ctx.Done():
-				timer.Stop()
-				return false, ctx.Err()
-			}
-		}
 	}
 
 	wantCommit := bool(commit)
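Aside, before the next hunk: this is the abort path a caller reaches through End with TryAbort. The wrapper below compiles against franz-go's public kgo package using only names visible in this diff (GroupTransactSession, End, TryAbort); the wrapper itself and its error handling are illustrative, not part of the library. The next hunk then reworks AbortBufferedRecords itself.

package txnexample

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// endWithAbort is an illustrative wrapper, not part of kgo: it ends the
// session's current transaction without committing, which drives the
// TryAbort branch shown above.
func endWithAbort(ctx context.Context, sess *kgo.GroupTransactSession) error {
	committed, err := sess.End(ctx, kgo.TryAbort)
	if err != nil {
		// Per the comment in the hunk above, an error here means the
		// context closed, not that the abort itself failed.
		return err
	}
	_ = committed // expected to be false when requesting an abort
	return nil
}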
@@ -326,48 +298,51 @@ func (cl *Client) BeginTransaction() error {
 // Records produced during or after a call to this function may not be failed,
 // thus it is incorrect to concurrently produce with this function.
 func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
-	atomic.StoreUint32(&cl.producer.aborting, 1)
-	defer atomic.StoreUint32(&cl.producer.aborting, 0)
-	atomic.AddInt32(&cl.producer.flushing, 1) // disallow lingering to start
-	defer atomic.AddInt32(&cl.producer.flushing, -1)
+	p := &cl.producer
+
+	atomic.StoreUint32(&p.aborting, 1)
+	defer atomic.StoreUint32(&p.aborting, 0)
+	atomic.AddInt32(&p.flushing, 1) // disallow lingering to start
+	defer atomic.AddInt32(&p.flushing, -1)
 	// At this point, all drain loops that start will immediately stop,
 	// thus they will not begin any AddPartitionsToTxn request. We must
 	// now wait for any req currently built to be done being issued.
 
 	cl.cfg.logger.Log(LogLevelInfo, "aborting buffered records")
 	defer cl.cfg.logger.Log(LogLevelDebug, "aborted buffered records")
 
-	cl.failBufferedRecords(ErrAborting)
+	// Similar to flushing, we unlinger; nothing will start a linger because
+	// the flushing atomic is non-zero.
+	if cl.cfg.linger > 0 || cl.cfg.manualFlushing {
+		for _, parts := range p.topics.load() {
+			for _, part := range parts.load().partitions {
+				part.records.unlingerAndManuallyDrain()
+			}
+		}
+	}
 
-	// Now, we wait for any active drain to stop. We must wait for all
-	// workers to stop otherwise we could end up with some exceptionally
-	// weird scenario where we end a txn and begin a new one before a
-	// prior AddPartitionsToTxn request that was built is issued.
-	//
-	// By waiting for our workers count to hit 0, we know that at that
-	// point, no new AddPartitionsToTxn request will be sent.
+	// We have to wait for all buffered records to either be flushed
+	// or to safely abort themselves.
 	quit := false
 	done := make(chan struct{})
 	go func() {
-		cl.producer.notifyMu.Lock()
-		defer cl.producer.notifyMu.Unlock()
+		p.notifyMu.Lock()
+		defer p.notifyMu.Unlock()
 		defer close(done)
 
-		for !quit && atomic.LoadInt32(&cl.producer.workers) > 0 {
-			cl.producer.notifyCond.Wait()
+		for !quit && atomic.LoadInt64(&p.bufferedRecords) > 0 {
+			p.notifyCond.Wait()
 		}
 	}()
 
 	select {
 	case <-done:
-		// All records were failed above, and all workers are stopped.
-		// We are safe to return.
 		return nil
 	case <-ctx.Done():
-		cl.producer.notifyMu.Lock()
+		p.notifyMu.Lock()
 		quit = true
-		cl.producer.notifyMu.Unlock()
-		cl.producer.notifyCond.Broadcast()
+		p.notifyMu.Unlock()
+		p.notifyCond.Broadcast()
 		return ctx.Err()
 	}
 }

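The waiting scheme at the core of the new AbortBufferedRecords is a reusable Go pattern: park on a sync.Cond until an atomic counter drains to zero, but let a context cancel the wait. Below is a standalone, hedged re-creation of that shape; the waiter type and its method names are made up for the example and are not kgo's API.

// Standalone sketch of the wait-for-drain-or-ctx pattern used in
// AbortBufferedRecords above; all names here are illustrative.
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type waiter struct {
	mu       sync.Mutex
	cond     *sync.Cond
	buffered int64
}

func newWaiter(n int64) *waiter {
	w := &waiter{buffered: n}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// done marks one buffered item finished and wakes the drainer if the count
// just hit zero. The lock/unlock handshake prevents a lost wakeup.
func (w *waiter) done() {
	if atomic.AddInt64(&w.buffered, -1) != 0 {
		return
	}
	w.mu.Lock()
	w.mu.Unlock()
	w.cond.Broadcast()
}

// waitDrained blocks until the count reaches zero or ctx is canceled,
// mirroring the goroutine-plus-select shape in the hunk above.
func (w *waiter) waitDrained(ctx context.Context) error {
	quit := false
	done := make(chan struct{})
	go func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		defer close(done)
		for !quit && atomic.LoadInt64(&w.buffered) > 0 {
			w.cond.Wait()
		}
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		w.mu.Lock()
		quit = true
		w.mu.Unlock()
		w.cond.Broadcast()
		return ctx.Err()
	}
}

func main() {
	w := newWaiter(3)
	for i := 0; i < 3; i++ {
		go func() {
			time.Sleep(10 * time.Millisecond)
			w.done()
		}()
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(w.waitDrained(ctx)) // <nil> once everything has drained
}

Running it prints <nil> once the three simulated records drain; canceling the context instead returns ctx.Err(), exactly as AbortBufferedRecords does.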