From 56fcfb45b882802ce103d25b228f4e0282a6e4eb Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 28 Nov 2022 21:29:54 -0700 Subject: [PATCH] sink: log all aspects of wanting to / failing records For #239, this will make debugging easier. --- pkg/kgo/sink.go | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 0f0db0aa..db150e31 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -1255,22 +1255,32 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) { if len(recBuf.batches) == 0 { return } - recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", "broker", logID(recBuf.sink.nodeID), "topic", recBuf.topic, "partition", recBuf.partition, "err", err) batch0 := recBuf.batches[0] batch0.tries++ - canFail := !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs - if !canFail { - return - } - - batch0Fail := batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting + var ( + canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests + batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting + netErr = isRetriableBrokerErr(err) || isDialErr(err) // we can fail if this is *not* a network error + retriableKerr = kerr.IsRetriable(err) // we fail if this is not a retriable kerr, + isUnknownLimit = recBuf.checkUnknownFailLimit(err) // or if it is, but it is UnknownTopicOrPartition and we are at our limit - okNet := !isRetriableBrokerErr(err) && !isDialErr(err) // we can fail if this is *not* a network error - retriableKerr := kerr.IsRetriable(err) // we fail if this is not a retriable kerr, - isUnknownLimit := recBuf.checkUnknownFailLimit(err) // or if it is, but it is UnknownTopicOrPartition and we are at our limit + willFail = canFail && (batch0Fail || !netErr && (!retriableKerr || retriableKerr && isUnknownLimit)) + ) - if batch0Fail || okNet && (!retriableKerr || retriableKerr && isUnknownLimit) { + recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch", + "broker", logID(recBuf.sink.nodeID), + "topic", recBuf.topic, + "partition", recBuf.partition, + "err", err, + "can_fail", canFail, + "batch0_should_fail", batch0Fail, + "is_network_err", netErr, + "is_retriable_kerr", retriableKerr, + "is_unknown_limit", isUnknownLimit, + "will_fail", willFail, + ) + if willFail { recBuf.failAllRecords(err) } }