Add a cache for what resource types are scalable so not every reconcile has to do a GET to the /scale endpoint #1453

Merged
merged 2 commits on Jan 4, 2021
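The gist of the change, as a minimal sketch (names such as `getScaleIfNeeded` are illustrative, not from the PR): remember per "resource.group" whether the target type exposes the /scale subresource, and only call the scale client when the answer is unknown or the reconcile also needs the live Scale object to update the ScaledObject status. Note that only positive results are cached; failures still fall through to the detailed error handling shown in the diff below.

```go
package scalecache

import (
	"context"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/scale"
)

// isScalableCache remembers which "resource.group" types are known to expose /scale.
// A plain map is fine as long as reconciles run serially.
var isScalableCache = map[string]bool{}

// getScaleIfNeeded is an illustrative helper: it skips the GET to /scale when the
// resource type is already known to be scalable and the caller does not need the
// Scale object itself (e.g. for a status update).
func getScaleIfNeeded(sc scale.ScalesGetter, namespace, name string, gr schema.GroupResource, needScale bool) (*autoscalingv1.Scale, error) {
	if isScalableCache[gr.String()] && !needScale {
		return nil, nil // cached: no API call on this reconcile
	}
	s, err := sc.Scales(namespace).Get(context.TODO(), gr, name, metav1.GetOptions{})
	if err != nil {
		// Failures are not cached; the controller distinguishes "resource missing"
		// from "resource has no /scale subresource" (see the diff below).
		return nil, err
	}
	isScalableCache[gr.String()] = true
	return s, nil
}
```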
CHANGELOG.md (1 change: 1 addition, 0 deletions)
@@ -27,6 +27,7 @@
- Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381))
- Bug fix for pending jobs in ScaledJob's accurateScalingStrategy. ([#1323](https://github.com/kedacore/keda/issues/1323))
- Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413))
- Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453))

### Breaking Changes

controllers/scaledobject_controller.go (52 changes: 38 additions, 14 deletions)
@@ -7,6 +7,7 @@ import (
"time"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -53,6 +54,17 @@ type ScaledObjectReconciler struct {
globalHTTPTimeout time.Duration
}

// A cache mapping "resource.group" to whether we already know this resource is scalable.
var isScalableCache map[string]bool

func init() {
// Prefill the cache with known values for core resources, in case of future parallelism, to avoid a stampeding herd on startup.
isScalableCache = map[string]bool{
"deployments.apps": true,
"statefusets.apps": true,
}
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// create Discovery clientset
@@ -231,26 +243,38 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge
gvkString := gvkr.GVKString()
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// let's try to detect /scale subresource
scale, errScale := (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exists in the cluster
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
// do we need the scale to update the status later?
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil

// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
isScalable := isScalableCache[gr.String()]
if !isScalable || wantStatusUpdate {
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
var errScale error
scale, errScale = (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exists in the cluster
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
}
// resource exists but doesn't expose /scale subresource
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
}
// resource exists but doesn't expose /scale subresource
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
isScalableCache[gr.String()] = true
}

// if it is not already present in ScaledObject Status:
// - store discovered GVK and GVKR
// - store original scaleTarget's replica count (before scaling with KEDA)
if scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil {
if wantStatusUpdate {
status := scaledObject.Status.DeepCopy()
if scaledObject.Status.ScaleTargetKind != gvkString {
status.ScaleTargetKind = gvkString
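One caveat worth noting: the prefill in init() exists, per its comment, "in case of future parallelism", but a plain map is not safe for concurrent writes. If reconciles were ever run with multiple workers, the cache would need synchronization, for example along these lines (a hypothetical variant, not part of this PR; markScalable and knownScalable are made-up names):

```go
package scalecache

import "sync"

// isScalable maps "resource.group" -> true once /scale has been confirmed.
// sync.Map is safe for concurrent reconciles, unlike the plain map used in the PR.
var isScalable sync.Map

// markScalable records that a resource type exposes the /scale subresource.
func markScalable(resourceGroup string) {
	isScalable.Store(resourceGroup, true)
}

// knownScalable reports whether /scale has already been confirmed for this type.
func knownScalable(resourceGroup string) bool {
	v, ok := isScalable.Load(resourceGroup)
	return ok && v.(bool)
}
```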