Skip to content

Commit bb99686

Browse files
fix: Use separate variable to track the consume offset
This commit updates reader_service.go to use a separate variable to track the consume offset instead of mutating the last committed offset. I found this makes the code easier to follow as the last committed offset does not change.
1 parent 5d5affa commit bb99686

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

pkg/kafka/partition/reader_service.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,16 @@ func (s *ReaderService) starting(ctx context.Context) error {
143143

144144
if lastCommittedOffset == int64(KafkaEndOffset) {
145145
level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
146-
lastCommittedOffset = int64(KafkaStartOffset)
146+
} else {
147+
level.Debug(r.logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
147148
}
148149

150+
consumeOffset := kafkaStartOffset
149151
if lastCommittedOffset >= 0 {
150-
lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
152+
// Read from the next offset.
153+
consumeOffset = lastCommittedOffset + 1
151154
}
152-
153-
s.reader.SetOffsetForConsumption(lastCommittedOffset)
155+
s.reader.SetOffsetForConsumption(consumeOffset)
154156

155157
if targetLag, maxLag := s.cfg.TargetConsumerLagAtStartup, s.cfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 {
156158
consumer, err := s.consumerFactory(s.committer)

0 commit comments

Comments
 (0)