Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose Prom Metrics also when getting ScaledObject State #4115

Merged
merged 1 commit into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

Here is an overview of all **stable** additions:

- **General**: Introduce admission webhooks to automatically validate resource changes to prevent misconfiguration and enforce best practices. ([#3755](https://github.com/kedacore/keda/issues/3755))
- **General**: Introduce admission webhooks to automatically validate resource changes to prevent misconfiguration and enforce best practices ([#3755](https://github.com/kedacore/keda/issues/3755))
- **General**: Introduce new ArangoDB Scaler ([#4000](https://github.com/kedacore/keda/issues/4000))
- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics. ([#4037](https://github.com/kedacore/keda/issues/4037))
- **Prometheus Metrics**: Introduce scaler latency in Prometheus metrics ([#4037](https://github.com/kedacore/keda/issues/4037))

Here is an overview of all new **experimental** features:

Expand All @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Prevent a panic that might occur while refreshing a scaler cache ([#4092](https://github.com/kedacore/keda/issues/4092))
- **Prometheus Metrics**: Expose Prometheus Metrics also when getting ScaledObject state ([#4075](https://github.com/kedacore/keda/issues/4075))
- **Azure Service Bus Scaler:** Use correct auth flows with pod identity ([#4026](https://github.com/kedacore/keda/issues/4026))
- **CPU Memory Scaler** Store forgotten logger ([#4022](https://github.com/kedacore/keda/issues/4022))

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
}
// Filter only the desired metric
if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
metrics, _, err := cache.GetMetricsForScaler(ctx, scalerIndex, info.Metric)
metrics, _, _, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, info.Metric)
metrics, err = fallback.GetMetricsWithFallback(ctx, p.client, logger, metrics, err, info.Metric, scaledObject, metricSpec)
if err != nil {
scalerError = true
Expand Down
129 changes: 38 additions & 91 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scalers"
"github.com/kedacore/keda/v2/pkg/scaling/cache/metricscache"
)

var log = logf.Log.WithName("scalers_cache")
Expand Down Expand Up @@ -70,105 +69,53 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
return result
}

// GetMetricsForScaler returns metric value and latency for a scaler identified by the metric name
// GetMetricSpecForScalingForScaler returns metrics spec for a scaler identified by the metric name
func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, index int) ([]v2.MetricSpec, error) {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
var err error

scalersList, _ := c.GetScalers()
if index < 0 || index >= len(scalersList) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}

metricSpecs := scalersList[index].GetMetricSpecForScaling(ctx)

// no metric spec returned for a scaler -> this could signal error during connection to the scaler
// usually in case this is an external scaler
// let's try to refresh the scaler and query metrics spec again
if len(metricSpecs) < 1 {
var ns scalers.Scaler
ns, err = c.refreshScaler(ctx, index)
if err == nil {
metricSpecs = ns.GetMetricSpecForScaling(ctx)
if len(metricSpecs) < 1 {
err = fmt.Errorf("got empty metric spec")
}
}
}

return metricSpecs, err
}

// GetMetricsAndActivityForScaler returns metric value, activity and latency for a scaler identified by the metric name
// and by the input index (from the list of scalers in this ScaledObject)
func (c *ScalersCache) GetMetricsForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, int64, error) {
func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, int64, error) {
if index < 0 || index >= len(c.Scalers) {
return nil, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
return nil, false, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}
startTime := time.Now()
m, _, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
if err == nil {
return m, time.Since(startTime).Milliseconds(), nil
return metric, activity, time.Since(startTime).Milliseconds(), nil
}

ns, err := c.refreshScaler(ctx, index)
if err != nil {
return nil, -1, err
return nil, false, -1, err
}
startTime = time.Now()
m, _, err = ns.GetMetricsAndActivity(ctx, metricName)
return m, time.Since(startTime).Milliseconds(), err
}

// GetScaledObjectState returns whether the input ScaledObject is active as a first parameters,
// the second parameter indicates whether there was any error during quering scalers
// the third parameter returns map of metrics record - a metric value for each scaler and it's metric
func (c *ScalersCache) GetScaledObjectState(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (bool, bool, map[string]metricscache.MetricsRecord) {
logger := log.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)

isScaledObjectActive := false
isError := false
metricsRecord := map[string]metricscache.MetricsRecord{}

// Let's collect status of all scalers, no matter if any scaler raises error or is active
for i, s := range c.Scalers {
metricSpec := s.Scaler.GetMetricSpecForScaling(ctx)

// no metric spec returned for a scaler -> this could signal error during connection to the scaler
// usually in case this is an external scaler
// let's try to refresh the scaler and query metrics spec again
if len(metricSpec) < 1 {
var err error
var ns scalers.Scaler

ns, err = c.refreshScaler(ctx, i)
if err == nil {
metricSpec = ns.GetMetricSpecForScaling(ctx)
if len(metricSpec) < 1 {
isError = true
err = fmt.Errorf("error getting metrics spec")
logger.Error(err, "error getting metric spec for the scaler", "scaler", s.ScalerConfig.TriggerName)
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
} else {
isError = true
logger.Error(err, "error getting metric spec for the scaler", "scaler", s.ScalerConfig.TriggerName)
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
}

for _, spec := range metricSpec {
// skip cpu/memory resource scaler, these scalers are also always Active
if spec.External == nil {
isScaledObjectActive = true
continue
}

metric, isMetricActive, err := s.Scaler.GetMetricsAndActivity(ctx, spec.External.Metric.Name)
if err != nil {
var ns scalers.Scaler
ns, err = c.refreshScaler(ctx, i)
if err == nil {
metric, isMetricActive, err = ns.GetMetricsAndActivity(ctx, spec.External.Metric.Name)
}
}

if s.ScalerConfig.TriggerUseCachedMetrics {
metricsRecord[spec.External.Metric.Name] = metricscache.MetricsRecord{
IsActive: isMetricActive,
Metric: metric,
ScalerError: err,
}
}

if err != nil {
isError = true
logger.Error(err, "error getting scale decision")
c.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
} else if isMetricActive {
isScaledObjectActive = true
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", spec.External.Metric.Name)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", spec.Resource.Name)
}
}
}
}

return isScaledObjectActive, isError, metricsRecord
metric, activity, err = ns.GetMetricsAndActivity(ctx, metricName)
return metric, activity, time.Since(startTime).Milliseconds(), err
}

func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
Expand Down Expand Up @@ -233,7 +180,7 @@ func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1a

func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {
if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers))
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
}

sb := c.Scalers[id]
Expand All @@ -244,7 +191,7 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale
}

if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", id, len(c.Scalers))
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
}
c.Scalers[id] = ScalerBuilder{
Scaler: ns,
Expand Down
Loading