From d496dbea9992db60dbd98bbf86e57f9faf627249 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Mon, 16 Jan 2023 17:02:16 +0100 Subject: [PATCH] expose Prom Metrics also when getting ScaledObject State Signed-off-by: Zbynek Roubalik --- CHANGELOG.md | 5 +- pkg/provider/provider.go | 2 +- pkg/scaling/cache/scalers_cache.go | 129 ++++++++------------------ pkg/scaling/scale_handler.go | 141 ++++++++++++++++++++++++----- pkg/scaling/scale_handler_test.go | 48 ++++++++-- 5 files changed, 201 insertions(+), 124 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb5f8286e3..f4a62ce8551 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,9 +48,9 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio Here is an overview of all **stable** additions: -- **General**: Introduce admission webhooks to automatically validate resource changes to prevent misconfiguration and enforce best practices. ([#3755](https://github.com/kedacore/keda/issues/3755)) +- **General**: Introduce admission webhooks to automatically validate resource changes to prevent misconfiguration and enforce best practices ([#3755](https://github.com/kedacore/keda/issues/3755)) - **General**: Introduce new ArangoDB Scaler ([#4000](https://github.com/kedacore/keda/issues/4000)) -- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics. 
([#4037](https://github.com/kedacore/keda/issues/4037)) +- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics ([#4037](https://github.com/kedacore/keda/issues/4037)) Here is an overview of all new **experimental** features: @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features: ### Fixes - **General**: Prevent a panic that might occur while refreshing a scaler cache ([#4092](https://github.com/kedacore/keda/issues/4092)) +- **Prometheus Metrics**: Expose Prometheus Metrics also when getting ScaledObject state ([#4075](https://github.com/kedacore/keda/issues/4075)) - **Azure Service Bus Scaler:** Use correct auth flows with pod identity ([#4026](https://github.com/kedacore/keda/issues/4026)) - **CPU Memory Scaler** Store forgotten logger ([#4022](https://github.com/kedacore/keda/issues/4022)) diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 8561ed9956e..1fe4aac51eb 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -172,7 +172,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, _, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric) + metrics, _, _, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, info.Metric) metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec) if err != nil { scalerError = true diff --git a/pkg/scaling/cache/scalers_cache.go b/pkg/scaling/cache/scalers_cache.go index de7c2a30f94..d06f9bc2be8 100644 --- a/pkg/scaling/cache/scalers_cache.go +++ b/pkg/scaling/cache/scalers_cache.go @@ -31,7 +31,6 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/scalers" - "github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache" ) var log = 
logf.Log.WithName("scalers_cache") @@ -70,105 +69,53 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler { return result } -// GetMetricsForScaler returns metric value and latency for a scaler identified by the metric name +// GetMetricSpecForScalingForScaler returns metrics spec for a scaler identified by the metric name +func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, index int) ([]v2.MetricSpec, error) { + var err error + + scalersList, _ := c.GetScalers() + if index < 0 || index >= len(scalersList) { + return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers)) + } + + metricSpecs := scalersList[index].GetMetricSpecForScaling(ctx) + + // no metric spec returned for a scaler -> this could signal error during connection to the scaler + // usually in case this is an external scaler + // let's try to refresh the scaler and query metrics spec again + if len(metricSpecs) < 1 { + var ns scalers.Scaler + ns, err = c.refreshScaler(ctx, index) + if err == nil { + metricSpecs = ns.GetMetricSpecForScaling(ctx) + if len(metricSpecs) < 1 { + err = fmt.Errorf("got empty metric spec") + } + } + } + + return metricSpecs, err +} + +// GetMetricsAndActivityForScaler returns metric value, activity and latency for a scaler identified by the metric name // and by the input index (from the list of scalers in this ScaledObject) -func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, int64, error) { +func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, int64, error) { if index < 0 || index >= len(c.Scalers) { - return nil, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers)) + return nil, false, -1, fmt.Errorf("scaler with id %d not found. 
Len = %d", index, len(c.Scalers)) } startTime := time.Now() - m, _, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName) + metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName) if err == nil { - return m, time.Since(startTime).Milliseconds(), nil + return metric, activity, time.Since(startTime).Milliseconds(), nil } ns, err := c.refreshScaler(ctx, index) if err != nil { - return nil, -1, err + return nil, false, -1, err } startTime = time.Now() - m, _, err = ns.GetMetricsAndActivity(ctx, metricName) - return m, time.Since(startTime).Milliseconds(), err -} - -// GetScaledObjectState returns whether the input ScaledObject is active as a first parameters, -// the second parameter indicates whether there was any error during quering scalers -// the third parameter returns map of metrics record - a metric value for each scaler and it's metric -func (c *ScalersCache) GetScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord) { - logger := log.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name) - - isScaledObjectActive := false - isError := false - metricsRecord := map[string]metricscache.MetricsRecord{} - - // Let's collect status of all scalers, no matter if any scaler raises error or is active - for i, s := range c.Scalers { - metricSpec := s.Scaler.GetMetricSpecForScaling(ctx) - - // no metric spec returned for a scaler -> this could signal error during connection to the scaler - // usually in case this is an external scaler - // let's try to refresh the scaler and query metrics spec again - if len(metricSpec) < 1 { - var err error - var ns scalers.Scaler - - ns, err = c.refreshScaler(ctx, i) - if err == nil { - metricSpec = ns.GetMetricSpecForScaling(ctx) - if len(metricSpec) < 1 { - isError = true - err = fmt.Errorf("error getting 
metrics spec") - logger.Error(err, "error getting metric spec for the scaler", "scaler", s.ScalerConfig.TriggerName) - c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - } - } else { - isError = true - logger.Error(err, "error getting metric spec for the scaler", "scaler", s.ScalerConfig.TriggerName) - c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - } - } - - for _, spec := range metricSpec { - // skip cpu/memory resource scaler, these scalers are also always Active - if spec.External == nil { - isScaledObjectActive = true - continue - } - - metric, isMetricActive, err := s.Scaler.GetMetricsAndActivity(ctx, spec.External.Metric.Name) - if err != nil { - var ns scalers.Scaler - ns, err = c.refreshScaler(ctx, i) - if err == nil { - metric, isMetricActive, err = ns.GetMetricsAndActivity(ctx, spec.External.Metric.Name) - } - } - - if s.ScalerConfig.TriggerUseCachedMetrics { - metricsRecord[spec.External.Metric.Name] = metricscache.MetricsRecord{ - IsActive: isMetricActive, - Metric: metric, - ScalerError: err, - } - } - - if err != nil { - isError = true - logger.Error(err, "error getting scale decision") - c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - } else if isMetricActive { - isScaledObjectActive = true - if spec.External != nil { - logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", spec.External.Metric.Name) - } - if spec.Resource != nil { - logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", spec.Resource.Name) - } - } - } - } - - return isScaledObjectActive, isError, metricsRecord + metric, activity, err = ns.GetMetricsAndActivity(ctx, metricName) + return metric, activity, time.Since(startTime).Milliseconds(), err } func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { @@ -233,7 +180,7 @@ func (c 
*ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1a func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) { if id < 0 || id >= len(c.Scalers) { - return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers)) + return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers)) } sb := c.Scalers[id] @@ -244,7 +191,7 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale } if id < 0 || id >= len(c.Scalers) { - return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers)) + return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers)) } c.Scalers[id] = ScalerBuilder{ Scaler: ns, diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 67141aea3e0..2902db2bdc5 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -342,28 +342,28 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav // checkScalers contains the main logic for the ScaleHandler scaling logic. 
// It'll check each trigger active status then call RequestScale func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interface{}, scalingMutex sync.Locker) { - cache, err := h.GetScalersCache(ctx, scalableObject) - if err != nil { - h.logger.Error(err, "Error getting scalers", "object", scalableObject) - return - } - scalingMutex.Lock() defer scalingMutex.Unlock() switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: - err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) + err := h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) if err != nil { h.logger.Error(err, "Error getting scaledObject", "object", scalableObject) return } - isActive, isError, metricsRecords := cache.GetScaledObjectState(ctx, obj) + isActive, isError, metricsRecords := h.getScaledObjectState(ctx, obj) h.scaleExecutor.RequestScale(ctx, obj, isActive, isError) if len(metricsRecords) > 0 { h.logger.V(1).Info("Storing metrics to cache", "scaledObject.Namespace", obj.Namespace, "scaledObject.Name", obj.Name, "metricsRecords", metricsRecords) h.scaledObjectsMetricCache.StoreRecords(obj.GenerateIdentifier(), metricsRecords) } case *kedav1alpha1.ScaledJob: + cache, err := h.GetScalersCache(ctx, scalableObject) + if err != nil { + h.logger.Error(err, "Error getting scalers", "object", scalableObject) + return + } + err = h.client.Get(ctx, types.NamespacedName{Name: obj.Name, Namespace: obj.Namespace}, obj) if err != nil { h.logger.Error(err, "Error getting scaledJob", "object", scalableObject) @@ -405,35 +405,40 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN return nil, &exportedPromMetrics, err } - scalerError := false + isScalerError := false scaledObjectIdentifier := scaledObject.GenerateIdentifier() // let's check metrics for all scalers in a ScaledObject scalers, scalerConfigs := cache.GetScalers() for scalerIndex := 0; scalerIndex < len(scalers); 
scalerIndex++ { - metricSpecs := scalers[scalerIndex].GetMetricSpecForScaling(ctx) - scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) if scalerConfigs[scalerIndex].TriggerName != "" { scalerName = scalerConfigs[scalerIndex].TriggerName } - for _, metricSpec := range metricSpecs { + metricSpecs, err := cache.GetMetricSpecForScalingForScaler(ctx, scalerIndex) + if err != nil { + isScalerError = true + h.logger.Error(err, "error getting metric spec for the scaler", "name", scaledObjectName, "namespace", scaledObjectNamespace, "scaler", scalerName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } + + for _, spec := range metricSpecs { // skip cpu/memory resource scaler - if metricSpec.External == nil { + if spec.External == nil { continue } // Filter only the desired metric - if strings.EqualFold(metricSpec.External.Metric.Name, metricName) { + if strings.EqualFold(spec.External.Metric.Name, metricName) { var metrics []external_metrics.ExternalMetricValue // if cache is defined for this scaler/metric, let's try to hit it first metricsFoundInCache := false if scalerConfigs[scalerIndex].TriggerUseCachedMetrics { var metricsRecord metricscache.MetricsRecord - if metricsRecord, metricsFoundInCache = h.scaledObjectsMetricCache.ReadRecord(scaledObjectIdentifier, metricSpec.External.Metric.Name); metricsFoundInCache { - h.logger.V(1).Info("Reading metrics from cache", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", metricSpec.External.Metric.Name, "metricsRecord", metricsRecord) + if metricsRecord, metricsFoundInCache = h.scaledObjectsMetricCache.ReadRecord(scaledObjectIdentifier, spec.External.Metric.Name); metricsFoundInCache { + h.logger.V(1).Info("Reading metrics from cache", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", 
spec.External.Metric.Name, "metricsRecord", metricsRecord) metrics = metricsRecord.Metric err = metricsRecord.ScalerError } @@ -441,15 +446,18 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN if !metricsFoundInCache { var latency int64 - metrics, latency, err = cache.GetMetricsForScaler(ctx, scalerIndex, metricName) + metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) if latency != -1 { prommetrics.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) } - h.logger.V(1).Info("Getting metrics from scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", metricSpec.External.Metric.Name, "metrics", metrics, "scalerError", err) + h.logger.V(1).Info("Getting metrics from scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName, "metricName", spec.External.Metric.Name, "metrics", metrics, "scalerError", err) } - metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, metricSpec) + + // check if we need to set a fallback + metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, spec) + if err != nil { - scalerError = true + isScalerError = true h.logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName) } else { for _, metric := range metrics { @@ -483,12 +491,12 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN // invalidate the cache for the ScaledObject, if we hit an error in any scaler // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call - if scalerError { + if isScalerError { err := 
h.ClearScalersCache(ctx, scaledObject) if err != nil { - h.logger.Error(err, "error clearing scalers cache") + h.logger.Error(err, "error clearing scalers cache", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) } - h.logger.V(1).Info("scaler error encountered, clearing scaler cache") + h.logger.V(1).Info("scaler error encountered, clearing scaler cache", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) } if len(matchingMetrics) == 0 { @@ -500,6 +508,93 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN }, &exportedPromMetrics, nil } +// getScaledObjectState returns whether the input ScaledObject is active as the first parameter, +// the second parameter indicates whether there was any error during querying scalers +// the third parameter returns map of metrics record - a metric value for each scaler and its metric +func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord) { + isScaledObjectActive := false + isScalerError := false + metricsRecord := map[string]metricscache.MetricsRecord{} + + cache, err := h.GetScalersCache(ctx, scaledObject) + prommetrics.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err) + + // Let's collect status of all scalers, no matter if any scaler raises error or is active + scalers, scalerConfigs := cache.GetScalers() + for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ { + scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) + if scalerConfigs[scalerIndex].TriggerName != "" { + scalerName = scalerConfigs[scalerIndex].TriggerName + } + + metricSpecs, err := cache.GetMetricSpecForScalingForScaler(ctx, scalerIndex) + if err != nil { + isScalerError = true + h.logger.Error(err, "error getting metric spec for the scaler", "name", scaledObject.Name, "namespace", scaledObject.Namespace, "scaler", scalerName) + 
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } + + for _, spec := range metricSpecs { + // skip cpu/memory resource scaler, these scalers are also always Active + if spec.External == nil { + isScaledObjectActive = true + continue + } + + metricName := spec.External.Metric.Name + + var latency int64 + metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) + if latency != -1 { + prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + } + h.logger.V(1).Info("Getting metrics and activity from scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) + + if scalerConfigs[scalerIndex].TriggerUseCachedMetrics { + metricsRecord[metricName] = metricscache.MetricsRecord{ + IsActive: isMetricActive, + Metric: metrics, + ScalerError: err, + } + } + + if err != nil { + isScalerError = true + h.logger.Error(err, "error getting scale decision", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } else { + for _, metric := range metrics { + metricValue := metric.Value.AsApproximateFloat64() + prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) + } + + if isMetricActive { + isScaledObjectActive = true + if spec.External != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName, "metricName", metricName) + } + if spec.Resource != nil { + h.logger.V(1).Info("Scaler for scaledObject is 
active", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName, "metricName", spec.Resource.Name) + } + } + } + prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err) + } + } + + // invalidate the cache for the ScaledObject, if we hit an error in any scaler + // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call + if isScalerError { + err := h.ClearScalersCache(ctx, scaledObject) + if err != nil { + h.logger.Error(err, "error clearing scalers cache", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) + } + h.logger.V(1).Info("scaler error encountered, clearing scaler cache", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) + } + + return isScaledObjectActive, isScalerError, metricsRecord +} + // buildScalers returns list of Scalers for the specified triggers func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]cache.ScalerBuilder, error) { logger := h.logger.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name) diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go index 44d6275aaa2..339d08c4dde 100644 --- a/pkg/scaling/scale_handler_test.go +++ b/pkg/scaling/scale_handler_test.go @@ -228,6 +228,8 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) { func TestCheckScaledObjectScalersWithError(t *testing.T) { ctrl := gomock.NewController(t) + mockClient := mock_client.NewMockClient(ctrl) + mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) metricsSpecs := []v2.MetricSpec{createMetricSpec(1, "metric-name")} @@ -256,7 +258,7 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { }, } - cache := cache.ScalersCache{ + 
scalerCache := cache.ScalersCache{ Scalers: []cache.ScalerBuilder{{ Scaler: scaler, Factory: factory, @@ -264,8 +266,23 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { Recorder: recorder, } - isActive, isError, _ := cache.GetScaledObjectState(context.TODO(), &scaledObject) - cache.Close(context.Background()) + caches := map[string]*cache.ScalersCache{} + caches[scaledObject.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + client: mockClient, + logger: logr.Discard(), + scaleLoopContexts: &sync.Map{}, + scaleExecutor: mockExecutor, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + + isActive, isError, _ := sh.getScaledObjectState(context.TODO(), &scaledObject) + scalerCache.Close(context.Background()) assert.Equal(t, false, isActive) assert.Equal(t, true, isError) @@ -273,6 +290,8 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) { func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { ctrl := gomock.NewController(t) + mockClient := mock_client.NewMockClient(ctrl) + mockExecutor := mock_executor.NewMockScaleExecutor(ctrl) recorder := record.NewFakeRecorder(1) metricsSpecs := []v2.MetricSpec{createMetricSpec(1, "metric-name")} @@ -298,7 +317,7 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { failingScaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{}, false, errors.New("some error")) failingScaler.EXPECT().Close(gomock.Any()) - scaledObject := &kedav1alpha1.ScaledObject{ + scaledObject := kedav1alpha1.ScaledObject{ ObjectMeta: metav1.ObjectMeta{ Name: "test", Namespace: "test", @@ -318,13 +337,28 @@ func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) { Factory: failingFactory, }} - scalersCache := cache.ScalersCache{ + scalerCache := cache.ScalersCache{ Scalers: 
scalers, Recorder: recorder, } - isActive, isError, _ := scalersCache.GetScaledObjectState(context.TODO(), scaledObject) - scalersCache.Close(context.Background()) + caches := map[string]*cache.ScalersCache{} + caches[scaledObject.GenerateIdentifier()] = &scalerCache + + sh := scaleHandler{ + client: mockClient, + logger: logr.Discard(), + scaleLoopContexts: &sync.Map{}, + scaleExecutor: mockExecutor, + globalHTTPTimeout: time.Duration(1000), + recorder: recorder, + scalerCaches: caches, + scalerCachesLock: &sync.RWMutex{}, + scaledObjectsMetricCache: metricscache.NewMetricsCache(), + } + + isActive, isError, _ := sh.getScaledObjectState(context.TODO(), &scaledObject) + scalerCache.Close(context.Background()) assert.Equal(t, true, isActive) assert.Equal(t, true, isError)