From a2e04a7845dc3e77c572263d7dc0d02b030eb92c Mon Sep 17 00:00:00 2001 From: messense Date: Wed, 30 Dec 2020 14:14:31 +0800 Subject: [PATCH] kafka: optimize getLagForPartition Signed-off-by: messense --- CHANGELOG.md | 1 + pkg/scalers/kafka_scaler.go | 13 ++++++------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a3d1d1b8a..8f84958a2a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381)) - Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323)) - Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413)) +- Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464)) ### Breaking Changes diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 972d4f90885..a1227b27015 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -298,20 +298,19 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition) } consumerOffset := block.Offset + if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { + kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)) + return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition) + } latestOffset, err := s.client.GetOffset(s.metadata.topic, partition, sarama.OffsetNewest) if err != nil { kafkaLog.Error(err, fmt.Sprintf("error finding latest offset for topic %s and partition %d\n", s.metadata.topic, partition)) return 0, fmt.Errorf("error finding latest offset for topic %s and partition %d", s.metadata.topic, partition) } - - if consumerOffset == invalidOffset { - if s.metadata.offsetResetPolicy == latest { - kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)) - return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition) - } + if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { return latestOffset, nil } - return (latestOffset - consumerOffset), nil + return latestOffset - consumerOffset, nil } // Close closes the kafka admin and client