diff --git a/CHANGELOG.md b/CHANGELOG.md index 22e806e20e0..115bd7ea607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381)) - Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323)) - Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413)) +- Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464)) - Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453)) ### Breaking Changes diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 972d4f90885..a1227b27015 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -298,20 +298,19 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition) } consumerOffset := block.Offset + if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { + kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)) + return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition) + } latestOffset, err := s.client.GetOffset(s.metadata.topic, partition, sarama.OffsetNewest) if err != nil { kafkaLog.Error(err, fmt.Sprintf("error finding latest offset for topic %s and partition %d\n", s.metadata.topic, partition)) return 0, fmt.Errorf("error finding latest offset for topic %s and partition %d", s.metadata.topic, partition) } - - if consumerOffset == invalidOffset { - if s.metadata.offsetResetPolicy == latest { - kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)) - return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition) - } + if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest { return latestOffset, nil } - return (latestOffset - consumerOffset), nil + return latestOffset - consumerOffset, nil } // Close closes the kafka admin and client