diff --git a/runner/sidecar/sidecar.go b/runner/sidecar/sidecar.go index 95f440e7..2609b0e4 100644 --- a/runner/sidecar/sidecar.go +++ b/runner/sidecar/sidecar.go @@ -364,12 +364,13 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error { }) if i == 0 && replica == 0 { go wait.JitterUntil(func() { - if handler.offset > 0 { - nextOffset, err := client.GetOffset(x.Topic, handler.partition, sarama.OffsetNewest) + offset, partition := handler.offset, handler.partition // copy these variables, so changes are not a problem + if offset > 0 { + nextOffset, err := client.GetOffset(x.Topic, partition, sarama.OffsetNewest) if err != nil { logger.Error(err, "failed to get offset", "source", sourceName) - } else if pending := nextOffset - 1 - handler.offset; pending >= 0 { - logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", handler.offset) + } else if pending := nextOffset - 1 - offset; pending >= 0 { + logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", offset) withLock(func() { status.SourceStatuses.SetPending(sourceName, uint64(pending)) }) } }