diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 97952df4..0f0db0aa 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -329,6 +329,7 @@ func (s *sink) produce(sem <-chan struct{}) bool { case isRetriableBrokerErr(err) || isDialErr(err): s.cl.bumpRepeatedLoadErr(err) s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retriable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err) + s.cl.triggerUpdateMetadata(false, "attempting to refresh transaction coordinator due to failed AddPartitionsToTxn requests") return moreToDrain || len(req.batches) > 0 // nothing stripped if request-issuing error default: // Note that err can be InvalidProducerEpoch, which is