From 5f24faef09e40ce653a182cbe0c381737a6d2bbf Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 30 Aug 2022 12:16:18 -0600 Subject: [PATCH] kgo: fix and add log line * Fix metadata log line: we needed to print the epoch before we updated it * Add log line to sink to indicate if we are going to update metadata on batch failures --- pkg/kgo/metadata.go | 7 +++---- pkg/kgo/sink.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index d9dabf54..ea0825dc 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -674,16 +674,15 @@ func (cl *Client) mergeTopicPartitions( // agree things rewound. const maxEpochRewinds = 5 if oldTP.epochRewinds < maxEpochRewinds { - *newTP = *oldTP - newTP.epochRewinds++ - cl.cfg.logger.Log(LogLevelDebug, "metadata leader epoch went backwards, ignoring update", "topic", topic, "partition", part, "old_leader_epoch", oldTP.leaderEpoch, "new_leader_epoch", newTP.leaderEpoch, - "current_num_rewinds", newTP.epochRewinds, + "current_num_rewinds", oldTP.epochRewinds+1, ) + *newTP = *oldTP + newTP.epochRewinds++ retryWhy.add(topic, int32(part), errEpochRewind) continue } diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 391667ce..c8c42cf8 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -900,10 +900,18 @@ func (s *sink) handleRetryBatches( canFail bool, // if records can fail if they are at limits why string, ) { + logger := s.cl.cfg.logger + debug := logger.Level() >= LogLevelDebug var needsMetaUpdate bool var shouldBackoff bool retry.eachOwnerLocked(func(batch seqRecBatch) { if !batch.isOwnersFirstBatch() { + if debug { + logger.Log(LogLevelDebug, "retry batch is not the first batch in the owner, skipping result", + "topic", batch.owner.topic, + "partition", batch.owner.partition, + ) + } return } @@ -932,6 +940,14 @@ func (s *sink) handleRetryBatches( } }) + if debug { + logger.Log(LogLevelDebug, "retry batches processed", + "wanted_metadata_update", updateMeta, + "triggering_metadata_update", needsMetaUpdate, + "should_backoff", shouldBackoff, + ) + } + // If we do want to metadata update, we only do so if any batch was the // first batch in its buf / not concurrently failed. if needsMetaUpdate {