Skip to content

Commit

Permalink
expose Prom Metrics also when getting ScaledObject State
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Jan 16, 2023
1 parent 73000c0 commit d496dbe
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 124 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

Here is an overview of all **stable** additions:

- **General**: Introduce admission webhooks to automatically validate resource changes to prevent misconfiguration and enforce best practices. ([#3755](https://github.com/kedacore/keda/issues/3755))
- **General**: Introduce admission webhooks to automatically validate resource changes to prevent misconfiguration and enforce best practices ([#3755](https://github.com/kedacore/keda/issues/3755))
- **General**: Introduce new ArangoDB Scaler ([#4000](https://github.com/kedacore/keda/issues/4000))
- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics. ([#4037](https://github.com/kedacore/keda/issues/4037))
- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics ([#4037](https://github.com/kedacore/keda/issues/4037))

Here is an overview of all new **experimental** features:

Expand All @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Prevent a panic that might occur while refreshing a scaler cache ([#4092](https://github.com/kedacore/keda/issues/4092))
- **Prometheus Metrics**: Expose Prometheus Metrics also when getting ScaledObject state ([#4075](https://github.com/kedacore/keda/issues/4075))
- **Azure Service Bus Scaler:** Use correct auth flows with pod identity ([#4026](https://github.com/kedacore/keda/issues/4026))
- **CPU Memory Scaler**: Store forgotten logger ([#4022](https://github.com/kedacore/keda/issues/4022))

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
}
// Filter only the desired metric
if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
metrics, _, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric)
metrics, _, _, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, info.Metric)
metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec)
if err != nil {
scalerError = true
Expand Down
129 changes: 38 additions & 91 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scalers"
"github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache"
)

var log = logf.Log.WithName("scalers_cache")
Expand Down Expand Up @@ -70,105 +69,53 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
return result
}

// GetMetricsForScaler returns metric value and latency for a scaler identified by the metric name
// GetMetricSpecForScalingForScaler returns the metric specs of the scaler found at
// the given index in the list returned by GetScalers.
//
// An out-of-range index yields an error. When the scaler reports no metric spec,
// the scaler is refreshed once and queried again; an error is returned if the
// refresh fails or if the refreshed scaler still reports no metric spec.
func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, index int) ([]v2.MetricSpec, error) {
	scalersList, _ := c.GetScalers()
	if index < 0 || index >= len(scalersList) {
		return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
	}

	specs := scalersList[index].GetMetricSpecForScaling(ctx)
	if len(specs) > 0 {
		return specs, nil
	}

	// An empty metric spec could signal an error during connection to the scaler
	// (usually for an external scaler), so refresh the scaler and ask again.
	refreshed, err := c.refreshScaler(ctx, index)
	if err != nil {
		return specs, err
	}

	specs = refreshed.GetMetricSpecForScaling(ctx)
	if len(specs) < 1 {
		return specs, fmt.Errorf("got empty metric spec")
	}
	return specs, nil
}

// GetMetricsAndActivityForScaler returns metric value, activity and latency for a scaler identified by the metric name
// and by the input index (from the list of scalers in this ScaledObject)
func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, int64, error) {
func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, int64, error) {
if index < 0 || index >= len(c.Scalers) {
return nil, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
return nil, false, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}
startTime := time.Now()
m, _, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
if err == nil {
return m, time.Since(startTime).Milliseconds(), nil
return metric, activity, time.Since(startTime).Milliseconds(), nil
}

ns, err := c.refreshScaler(ctx, index)
if err != nil {
return nil, -1, err
return nil, false, -1, err
}
startTime = time.Now()
m, _, err = ns.GetMetricsAndActivity(ctx, metricName)
return m, time.Since(startTime).Milliseconds(), err
}

// GetScaledObjectState returns whether the input ScaledObject is active as the first parameter,
// the second parameter indicates whether there was any error while querying the scalers,
// the third parameter returns a map of metrics records - a metric value for each scaler and its metric
func (c *ScalersCache) GetScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord) {
logger := log.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)

isScaledObjectActive := false
isError := false
metricsRecord := map[string]metricscache.MetricsRecord{}

// Let's collect status of all scalers, no matter if any scaler raises error or is active
for i, s := range c.Scalers {
metricSpec := s.Scaler.GetMetricSpecForScaling(ctx)

// no metric spec returned for a scaler -> this could signal error during connection to the scaler
// usually in case this is an external scaler
// let's try to refresh the scaler and query metrics spec again
if len(metricSpec) < 1 {
var err error
var ns scalers.Scaler

ns, err = c.refreshScaler(ctx, i)
if err == nil {
metricSpec = ns.GetMetricSpecForScaling(ctx)
if len(metricSpec) < 1 {
isError = true
err = fmt.Errorf("error getting metrics spec")
logger.Error(err, "error getting metric spec for the scaler", "scaler", s.ScalerConfig.TriggerName)
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
} else {
isError = true
logger.Error(err, "error getting metric spec for the scaler", "scaler", s.ScalerConfig.TriggerName)
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
}

for _, spec := range metricSpec {
// skip cpu/memory resource scaler, these scalers are also always Active
if spec.External == nil {
isScaledObjectActive = true
continue
}

metric, isMetricActive, err := s.Scaler.GetMetricsAndActivity(ctx, spec.External.Metric.Name)
if err != nil {
var ns scalers.Scaler
ns, err = c.refreshScaler(ctx, i)
if err == nil {
metric, isMetricActive, err = ns.GetMetricsAndActivity(ctx, spec.External.Metric.Name)
}
}

if s.ScalerConfig.TriggerUseCachedMetrics {
metricsRecord[spec.External.Metric.Name] = metricscache.MetricsRecord{
IsActive: isMetricActive,
Metric: metric,
ScalerError: err,
}
}

if err != nil {
isError = true
logger.Error(err, "error getting scale decision")
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
} else if isMetricActive {
isScaledObjectActive = true
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", spec.External.Metric.Name)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", spec.Resource.Name)
}
}
}
}

return isScaledObjectActive, isError, metricsRecord
metric, activity, err = ns.GetMetricsAndActivity(ctx, metricName)
return metric, activity, time.Since(startTime).Milliseconds(), err
}

func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
Expand Down Expand Up @@ -233,7 +180,7 @@ func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1a

func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {
if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers))
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
}

sb := c.Scalers[id]
Expand All @@ -244,7 +191,7 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale
}

if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers))
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
}
c.Scalers[id] = ScalerBuilder{
Scaler: ns,
Expand Down
Loading

0 comments on commit d496dbe

Please sign in to comment.