Skip to content

Commit

Permalink
Fix handling of invalid base offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
rodaine committed Oct 22, 2024
1 parent cea7aa5 commit 2eed36e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
8 changes: 7 additions & 1 deletion pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,13 @@ start:
p.promisesMu.Lock()
for i, pr := range b.recs {
pr.LeaderEpoch = 0
pr.Offset = b.baseOffset + int64(i)
if b.baseOffset == -1 {
// if the base offset is invalid/unknown (-1), all record offsets should
// be treated as unknown
pr.Offset = -1
} else {
pr.Offset = b.baseOffset + int64(i)
}
pr.Partition = b.partition
pr.ProducerID = b.pid
pr.ProducerEpoch = b.epoch
Expand Down
6 changes: 5 additions & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,11 @@ func recordToRecord(
ProducerID: batch.ProducerID,
ProducerEpoch: batch.ProducerEpoch,
LeaderEpoch: batch.PartitionLeaderEpoch,
Offset: batch.FirstOffset + int64(record.OffsetDelta),
}
if batch.FirstOffset == -1 {
r.Offset = -1
} else {
r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
}
if r.Attrs.TimestampType() == 0 {
r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
Expand Down

0 comments on commit 2eed36e

Please sign in to comment.