Skip to content

Commit

Permalink
fix: negative Kafka pending
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 21, 2021
1 parent 32a73a7 commit 24d91a1
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ func startKafkaSetPendingLoop(ctx context.Context, x *dfv1.Kafka, sourceName str
return
}
block := rep.GetBlock(x.Topic, partition)
totalLags += partitionOffset - block.Offset - 1
x := partitionOffset - block.Offset - 1
if x > 0 {
totalLags += x
}
}
logger.Info("setting pending", "source", sourceName, "pending", totalLags)
withLock(func() { step.Status.SourceStatuses.SetPending(sourceName, uint64(totalLags)) })
Expand Down

0 comments on commit 24d91a1

Please sign in to comment.