Skip to content

Commit

Permalink
Merge branch 'main' into less-scale-get
Browse files Browse the repository at this point in the history
  • Loading branch information
coderanger committed Jan 4, 2021
2 parents 2288c7e + 8ab9008 commit 84d3698
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 37 deletions.
6 changes: 3 additions & 3 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ updates:
- package-ecosystem: github-actions
directory: "/"
schedule:
interval: weekly
interval: monthly
open-pull-requests-limit: 10
labels:
- enhancement
- dependency-management
- package-ecosystem: gomod
directory: "/"
schedule:
interval: weekly
interval: monthly
open-pull-requests-limit: 10
labels:
- enhancement
- dependency-management
- package-ecosystem: docker
directory: "/"
schedule:
interval: weekly
interval: monthly
open-pull-requests-limit: 10
labels:
- enhancement
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
- Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381))
- Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323))
- Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413))
- Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464))
- Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453))
- Improve performance when fetching current scaling information on Deployments ([#1458](https://github.com/kedacore/keda/pull/1458))

### Breaking Changes
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<p align="center"><img src="images/logos/keda-word-colour.png" width="300"/></p>
<p style="font-size: 25px" align="center"><b>Kubernetes-based Event Driven Autoscaling</b></p>
<p style="font-size: 25px" align="center">
<a href="https://github.com/kedacore/keda/actions"><img src="https://github.com/kedacore/keda/workflows/main-build/badge.svg" alt="main build"></a>
<a href="https://github.com/kedacore/keda/actions"><img src="https://github.com/kedacore/keda/workflows/nightly-e2e-test/badge.svg" alt="nightly e2e"></a>
<a href="https://github.com/kedacore/keda/actions?query=workflow%3Amain-build"><img src="https://github.com/kedacore/keda/workflows/main-build/badge.svg" alt="main build"></a>
<a href="https://github.com/kedacore/keda/actions?query=workflow%3Anightly-e2e-test"><img src="https://github.com/kedacore/keda/workflows/nightly-e2e-test/badge.svg" alt="nightly e2e"></a>
<a href="https://bestpractices.coreinfrastructure.org/projects/3791"><img src="https://bestpractices.coreinfrastructure.org/projects/3791/badge"></a>
<a href="https://twitter.com/kedaorg"><img src="https://img.shields.io/twitter/follow/kedaorg?style=social" alt="Twitter"></a></p>

Expand Down
52 changes: 38 additions & 14 deletions controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -53,6 +54,17 @@ type ScaledObjectReconciler struct {
globalHTTPTimeout time.Duration
}

// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
var isScalableCache map[string]bool

func init() {
// Prefill the cache with some known values for core resources in case of future parallelism to avoid stampeding herd on startup.
isScalableCache = map[string]bool{
"deployments.apps": true,
"statefusets.apps": true,
}
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// create Discovery clientset
Expand Down Expand Up @@ -232,26 +244,38 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge
gvkString := gvkr.GVKString()
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// let's try to detect /scale subresource
scale, errScale := (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
// do we need the scale to update the status later?
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil

// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
isScalable := isScalableCache[gr.String()]
if !isScalable || wantStatusUpdate {
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
var errScale error
scale, errScale = (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
isScalableCache[gr.String()] = true
}

// if it is not already present in ScaledObject Status:
// - store discovered GVK and GVKR
// - store original scaleTarget's replica count (before scaling with KEDA)
if scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil {
if wantStatusUpdate {
status := scaledObject.Status.DeepCopy()
if scaledObject.Status.ScaleTargetKind != gvkString {
status.ScaleTargetKind = gvkString
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/Azure/go-autorest/autorest/azure/auth v0.5.5
github.com/Huawei/gophercloud v1.0.21
github.com/Shopify/sarama v1.27.2
github.com/aws/aws-sdk-go v1.36.12
github.com/aws/aws-sdk-go v1.36.19
github.com/go-logr/logr v0.1.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.5.0
Expand All @@ -23,7 +23,7 @@ require (
github.com/google/go-cmp v0.5.4
github.com/hashicorp/vault/api v1.0.4
github.com/imdario/mergo v0.3.11
github.com/influxdata/influxdb-client-go/v2 v2.2.0
github.com/influxdata/influxdb-client-go/v2 v2.2.1
github.com/kubernetes-incubator/custom-metrics-apiserver v0.0.0-20200618121405-54026617ec44
github.com/lib/pq v1.9.0
github.com/mitchellh/hashstructure v1.1.0
Expand All @@ -34,7 +34,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.4
github.com/tidwall/gjson v1.6.7
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
google.golang.org/api v0.36.0
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ github.com/aws/aws-sdk-go v1.30.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU
github.com/aws/aws-sdk-go v1.30.16/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.31.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.31.12/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.36.12 h1:YJpKFEMbqEoo+incs5qMe61n1JH3o4O1IMkMexLzJG8=
github.com/aws/aws-sdk-go v1.36.12/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.36.19 h1:zbJZKkxeDiYxUYFjymjWxPye+qa1G2gRVyhIzZrB9zA=
github.com/aws/aws-sdk-go v1.36.19/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I=
github.com/bazelbuild/buildtools v0.0.0-20190917191645-69366ca98f89/go.mod h1:5JP0TXzWDHXv8qvxRC4InIazwdyDseBDbzESUMKk1yU=
Expand Down Expand Up @@ -835,8 +835,8 @@ github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb v0.0.0-20161215172503-049f9b42e9a5/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/influxdata/influxdb-client-go/v2 v2.2.0 h1:2R/le0s/MZpHtc+ijuXKe2c4KGN14M85mWtGlmg6vec=
github.com/influxdata/influxdb-client-go/v2 v2.2.0/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0=
github.com/influxdata/influxdb-client-go/v2 v2.2.1 h1:VSSQG8jGj05fz0HoNBKeCGdo2dtK7ShdmgGR+DQp4/8=
github.com/influxdata/influxdb-client-go/v2 v2.2.1/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
Expand Down Expand Up @@ -1314,10 +1314,10 @@ github.com/tektoncd/plumbing v0.0.0-20200430135134-e53521e1d887/go.mod h1:cZPJIe
github.com/tektoncd/plumbing/pipelinerun-logs v0.0.0-20191206114338-712d544c2c21/go.mod h1:S62EUWtqmejjJgUMOGB1CCCHRp6C706laH06BoALkzU=
github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tidwall/gjson v1.6.4 h1:JKsCsJqRVFz8eYCsQ5E/ANRbK6CanAtA9IUvGsXklyo=
github.com/tidwall/gjson v1.6.4/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/gjson v1.6.7 h1:Mb1M9HZCRWEcXQ8ieJo7auYyyiSux6w9XN3AdTpxJrE=
github.com/tidwall/gjson v1.6.7/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down
13 changes: 6 additions & 7 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,20 +298,19 @@ func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.Offset
return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition)
}
consumerOffset := block.Offset
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
}
latestOffset, err := s.client.GetOffset(s.metadata.topic, partition, sarama.OffsetNewest)
if err != nil {
kafkaLog.Error(err, fmt.Sprintf("error finding latest offset for topic %s and partition %d\n", s.metadata.topic, partition))
return 0, fmt.Errorf("error finding latest offset for topic %s and partition %d", s.metadata.topic, partition)
}

if consumerOffset == invalidOffset {
if s.metadata.offsetResetPolicy == latest {
kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition))
return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)
}
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
return latestOffset, nil
}
return (latestOffset - consumerOffset), nil
return latestOffset - consumerOffset, nil
}

// Close closes the kafka admin and client
Expand Down

0 comments on commit 84d3698

Please sign in to comment.