Skip to content

Commit

Permalink
kgo: fix and add log line
Browse files Browse the repository at this point in the history
* Fix metadata log line: we needed to print the epoch before we updated
  it
* Add log line to sink to indicate if we are going to update metadata on
  batch failures
  • Loading branch information
twmb committed Aug 30, 2022
1 parent eb2e62d commit 5f24fae
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
7 changes: 3 additions & 4 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,16 +674,15 @@ func (cl *Client) mergeTopicPartitions(
// agree things rewound.
const maxEpochRewinds = 5
if oldTP.epochRewinds < maxEpochRewinds {
*newTP = *oldTP
newTP.epochRewinds++

cl.cfg.logger.Log(LogLevelDebug, "metadata leader epoch went backwards, ignoring update",
"topic", topic,
"partition", part,
"old_leader_epoch", oldTP.leaderEpoch,
"new_leader_epoch", newTP.leaderEpoch,
"current_num_rewinds", newTP.epochRewinds,
"current_num_rewinds", oldTP.epochRewinds+1,
)
*newTP = *oldTP
newTP.epochRewinds++
retryWhy.add(topic, int32(part), errEpochRewind)
continue
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,10 +900,18 @@ func (s *sink) handleRetryBatches(
canFail bool, // if records can fail if they are at limits
why string,
) {
logger := s.cl.cfg.logger
debug := logger.Level() >= LogLevelDebug
var needsMetaUpdate bool
var shouldBackoff bool
retry.eachOwnerLocked(func(batch seqRecBatch) {
if !batch.isOwnersFirstBatch() {
if debug {
logger.Log(LogLevelDebug, "retry batch is not the first batch in the owner, skipping result",
"topic", batch.owner.topic,
"partition", batch.owner.partition,
)
}
return
}

Expand Down Expand Up @@ -932,6 +940,14 @@ func (s *sink) handleRetryBatches(
}
})

if debug {
logger.Log(LogLevelDebug, "retry batches processed",
"wanted_metadata_update", updateMeta,
"triggering_metadata_update", needsMetaUpdate,
"should_backoff", shouldBackoff,
)
}

// If we do want to metadata update, we only do so if any batch was the
// first batch in its buf / not concurrently failed.
if needsMetaUpdate {
Expand Down

0 comments on commit 5f24fae

Please sign in to comment.