Skip to content

Commit

Permalink
fix: try and avoid partition changes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed May 25, 2021
1 parent 2031cd1 commit 980c741
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,13 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
})
if i == 0 && replica == 0 {
go wait.JitterUntil(func() {
if handler.offset > 0 {
nextOffset, err := client.GetOffset(x.Topic, handler.partition, sarama.OffsetNewest)
offset, partition := handler.offset, handler.partition // copy these variables, so changes are not a problem
if offset > 0 {
nextOffset, err := client.GetOffset(x.Topic, partition, sarama.OffsetNewest)
if err != nil {
logger.Error(err, "failed to get offset", "source", sourceName)
} else if pending := nextOffset - 1 - handler.offset; pending >= 0 {
logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", handler.offset)
} else if pending := nextOffset - 1 - offset; pending >= 0 {
logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", offset)
withLock(func() { status.SourceStatuses.SetPending(sourceName, uint64(pending)) })
}
}
Expand Down

0 comments on commit 980c741

Please sign in to comment.