diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go
index 4da29471..987e471e 100644
--- a/pkg/kgo/metadata.go
+++ b/pkg/kgo/metadata.go
@@ -151,8 +151,8 @@ func (cl *Client) updateMetadataLoop() {
 			}
 		}
 
-		again, err, why := cl.updateMetadata()
-		if again || err != nil {
+		retryWhy, err := cl.updateMetadata()
+		if retryWhy != nil || err != nil {
 			// If err is non-nil, the metadata request failed
 			// itself and already retried 3x; we do not loop more.
 			//
@@ -180,7 +180,7 @@ func (cl *Client) updateMetadataLoop() {
 			if err != nil {
 				cl.triggerUpdateMetadata(true, fmt.Sprintf("re-updating metadata due to err: %s", err))
 			} else {
-				cl.triggerUpdateMetadata(true, why.reason("re-updating due to inner errors"))
+				cl.triggerUpdateMetadata(true, retryWhy.reason("re-updating due to inner errors"))
 			}
 		}
 		if err == nil {
@@ -207,7 +207,7 @@
 // The producer and consumer use different topic maps and underlying
 // topicPartitionsData pointers, but we update those underlying pointers
 // equally.
-func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateWhy) {
+func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
 	defer cl.metawait.signal()
 	defer cl.consumer.doOnMetadataUpdate()
 
@@ -247,7 +247,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateW
 			tpsProducerLoad,
 			err,
 		)
-		return true, err, nil
+		return nil, err
 	}
 
 	// If we are consuming with regex and fetched all topics, the metadata
@@ -305,14 +305,14 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateW
 				}
 				continue
 			}
-			needsRetry = needsRetry || cl.mergeTopicPartitions(
+			cl.mergeTopicPartitions(
 				topic,
 				priorParts,
 				newParts,
 				m.isProduce,
 				&reloadOffsets,
 				stopConsumerSession,
-				&why,
+				&retryWhy,
 			)
 		}
 	}
@@ -324,7 +324,7 @@ func (cl *Client) updateMetadata() (needsRetry bool, err error, why multiUpdateW
 		)
 	}
 
-	return needsRetry, nil, why
+	return retryWhy, nil
 }
 
 // fetchTopicMetadata fetches metadata for all reqTopics and returns new
@@ -481,8 +481,8 @@ func (cl *Client) mergeTopicPartitions(
 	isProduce bool,
 	reloadOffsets *listOrEpochLoads,
 	stopConsumerSession func(),
-	why *multiUpdateWhy,
-) (needsRetry bool) {
+	retryWhy *multiUpdateWhy,
+) {
 	lv := *l.load() // copy so our field writes do not collide with reads
 
 	// Producers must store the update through a special function that
@@ -508,8 +508,8 @@
 				topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
 			}
 		}
-		why.add(topic, -1, r.loadErr)
-		return true
+		retryWhy.add(topic, -1, r.loadErr)
+		return
 	}
 
 	// Before the atomic update, we keep the latest partitions / writable
@@ -572,8 +572,7 @@
 			if isProduce {
 				newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
 			}
-			needsRetry = true
-			why.add(topic, int32(part), newTP.loadErr)
+			retryWhy.add(topic, int32(part), newTP.loadErr)
 			continue
 		}
 
@@ -581,16 +580,39 @@
 		// If the new partition has an older leader epoch, then we
 		// fetched from an out of date broker. We just keep the old
 		// information.
 		if newTP.leaderEpoch < oldTP.leaderEpoch {
-			*newTP = *oldTP
+			// If we repeatedly rewind, then perhaps the cluster
+			// entered some bad state and lost forward progress.
+			// We will log & allow the rewind to allow the client
+			// to continue; other requests may encounter fenced
+			// epoch errors (and respectively recover).
+			//
+			// Five is a pretty low amount of retries, but since
+			// we iterate through known brokers, this basically
+			// means we keep stale metadata if five brokers all
+			// agree things rewound.
+			const maxEpochRewinds = 5
+			if oldTP.epochRewinds < maxEpochRewinds {
+				*newTP = *oldTP
+				newTP.epochRewinds++
+
+				cl.cfg.logger.Log(LogLevelDebug, "metadata leader epoch went backwards, ignoring update",
+					"topic", topic,
+					"partition", part,
+					"old_leader_epoch", oldTP.leaderEpoch,
+					"new_leader_epoch", newTP.leaderEpoch,
+					"current_num_rewinds", newTP.epochRewinds,
+				)
+				retryWhy.add(topic, int32(part), errEpochRewind)
+				continue
+			}
 
-			cl.cfg.logger.Log(LogLevelDebug, "metadata leader epoch went backwards, ignoring update",
+			cl.cfg.logger.Log(LogLevelInfo, "metadata leader epoch went backwards repeatedly, we are now keeping the metadata to allow forward progress",
 				"topic", topic,
 				"partition", part,
-				"old_epoch", oldTP.leaderEpoch,
-				"new_epoch", newTP.leaderEpoch,
+				"old_leader_epoch", oldTP.leaderEpoch,
+				"new_leader_epoch", newTP.leaderEpoch,
 			)
-			continue
 		}
 
 		// If the tp data is the same, we simply copy over the records
@@ -641,9 +663,10 @@
 			newTP.cursor.source.addCursor(newTP.cursor)
 		}
 	}
-	return needsRetry
 }
 
+var errEpochRewind = errors.New("epoch rewind")
+
 type multiUpdateWhy map[kerrOrString]map[string]map[int32]struct{}
 
 type kerrOrString struct {
diff --git a/pkg/kgo/topics_and_partitions.go b/pkg/kgo/topics_and_partitions.go
index 18dd0c92..e6931e4e 100644
--- a/pkg/kgo/topics_and_partitions.go
+++ b/pkg/kgo/topics_and_partitions.go
@@ -354,6 +354,15 @@ type topicPartition struct {
 	// keep the old topicPartition data and the new error.
 	loadErr error
 
+	// If, on metadata refresh, the leader epoch for this partition goes
+	// backwards, we ignore the metadata refresh and signal the metadata
+	// should be reloaded: the broker we requested is stale. However, the
+	// broker could get into a bad state through some weird cluster failure
+	// scenarios. If we see the epoch rewind repeatedly, we eventually keep
+	// the metadata refresh. This is not detrimental and at worst will lead
+	// to the broker telling us to update our metadata.
+	epochRewinds uint8
+
 	// If we do not have a load error, we determine if the new
 	// topicPartition is the same or different from the old based on
 	// whether the data changed (leader or leader epoch, etc.).
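Note: the heart of this change is a bounded tolerance for leader-epoch rewinds. A metadata refresh whose leader epoch goes backwards is ignored and a reload is signaled (via retryWhy) until the rewind has been seen maxEpochRewinds times, after which the rewound metadata is accepted so the client can keep making forward progress. The standalone Go sketch below restates that pattern outside the client; partitionMeta and mergeEpoch are hypothetical names used only for illustration and are not part of the kgo API.

package main

import (
	"errors"
	"fmt"
)

// partitionMeta is a hypothetical stand-in for the bits of topicPartition
// state that the rewind logic cares about.
type partitionMeta struct {
	leaderEpoch  int32
	epochRewinds uint8
}

// errEpochRewind mirrors the sentinel used in the diff to tag why a
// metadata reload is being retried.
var errEpochRewind = errors.New("epoch rewind")

// maxEpochRewinds mirrors the constant in the diff: after this many
// consecutive rewinds, we accept the older epoch to keep moving.
const maxEpochRewinds = 5

// mergeEpoch returns the metadata to keep and, when the update was
// rejected, a non-nil error signaling that metadata should be re-fetched
// (the analogue of retryWhy.add in the diff).
func mergeEpoch(old, updated partitionMeta) (partitionMeta, error) {
	if updated.leaderEpoch >= old.leaderEpoch {
		return updated, nil // normal forward (or equal) progress
	}
	if old.epochRewinds < maxEpochRewinds {
		old.epochRewinds++ // keep the old metadata, remember the rewind
		return old, errEpochRewind
	}
	// Rewound too many times: accept the rewind to allow forward progress.
	return updated, nil
}

func main() {
	current := partitionMeta{leaderEpoch: 10}
	stale := partitionMeta{leaderEpoch: 8}

	// The first five refreshes keep epoch 10 and ask for a retry; the
	// sixth accepts the rewind to epoch 8.
	for i := 1; i <= 6; i++ {
		var err error
		current, err = mergeEpoch(current, stale)
		fmt.Printf("refresh %d: kept epoch %d, err=%v\n", i, current.leaderEpoch, err)
	}
}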