From 980c741576235f8eb04fcecd48ef1de3c59e69d4 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 25 May 2021 13:24:30 -0700 Subject: [PATCH] fix: try and avoid partition changes --- runner/sidecar/sidecar.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/runner/sidecar/sidecar.go b/runner/sidecar/sidecar.go index 95f440e7..2609b0e4 100644 --- a/runner/sidecar/sidecar.go +++ b/runner/sidecar/sidecar.go @@ -364,12 +364,13 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error { }) if i == 0 && replica == 0 { go wait.JitterUntil(func() { - if handler.offset > 0 { - nextOffset, err := client.GetOffset(x.Topic, handler.partition, sarama.OffsetNewest) + offset, partition := handler.offset, handler.partition // copy these variables, so changes are not a problem + if offset > 0 { + nextOffset, err := client.GetOffset(x.Topic, partition, sarama.OffsetNewest) if err != nil { logger.Error(err, "failed to get offset", "source", sourceName) - } else if pending := nextOffset - 1 - handler.offset; pending >= 0 { - logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", handler.offset) + } else if pending := nextOffset - 1 - offset; pending >= 0 { + logger.Info("setting pending", "source", sourceName, "pending", pending, "nextOffset", nextOffset, "handlerOffset", offset) withLock(func() { status.SourceStatuses.SetPending(sourceName, uint64(pending)) }) } }