Skip to content

Commit

Permalink
serve all Prometheus metrics from Operator
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Nov 28, 2022
1 parent cf91e83 commit 810d9af
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 98 deletions.
2 changes: 1 addition & 1 deletion adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/prommetrics"
prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
Expand Down
3 changes: 3 additions & 0 deletions config/manager/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,8 @@ spec:
- name: metricsservice
port: 9666
targetPort: 9666
- name: metrics
port: 8080
targetPort: 8080
selector:
app: keda-operator
11 changes: 1 addition & 10 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,8 @@ type GrpcServer struct {

// GetMetrics returns metric values in the form of an ExternalMetricValueList for the specified ScaledObject reference
func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) {
// TODO hit the metrics cache here first

cache, err := (*s.scalerHandler).GetScalersCacheForScaledObject(ctx, in.Name, in.Namespace)
// TODO fix Prom metrics recorder
// metricsServer.RecordScalerObjectError(scaledObject.Namespace, scaledObject.Name, err)
if err != nil {
return nil, fmt.Errorf("error when getting scalers %s", err)
}

v1beta1ExtMetrics := &v1beta1.ExternalMetricValueList{}
extMetrics, err := (*s.scalerHandler).GetExternalMetricsValuesList(ctx, cache, &cache.ScaledObject, in.MetricName)
extMetrics, err := (*s.scalerHandler).GetExternalMetricsValuesList(ctx, in.Name, in.Namespace, in.MetricName)
if err != nil {
return nil, fmt.Errorf("error when getting metric values %s", err)
}
Expand Down
24 changes: 4 additions & 20 deletions pkg/mock/mock_scaling/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package prommetrics
package adapter

import (
"log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package prommetrics
package adapter

// Server is an HTTP serving instance to track metrics
type Server interface {
Expand Down
88 changes: 88 additions & 0 deletions pkg/prommetrics/operator_prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ limitations under the License.
package prommetrics

import (
"strconv"

"github.com/prometheus/client_golang/prometheus"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var log = ctrl.Log.WithName("prometheus_server")

const (
ClusterTriggerAuthenticationResource = "cluster_trigger_authentication"
TriggerAuthenticationResource = "trigger_authentication"
Expand All @@ -29,6 +34,44 @@ const (
)

var (
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"}
scalerErrorsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_operator",
Subsystem: "scaler",
Name: "errors_total",
Help: "Total number of errors for all scalers",
},
[]string{},
)
scalerMetricsValue = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "keda_operator",
Subsystem: "scaler",
Name: "metrics_value",
Help: "Metric Value used for HPA",
},
metricLabels,
)
scalerErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_operator",
Subsystem: "scaler",
Name: "errors",
Help: "Number of scaler errors",
},
metricLabels,
)
scaledObjectErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_operator",
Subsystem: "scaled_object",
Name: "errors",
Help: "Number of scaled object errors",
},
[]string{"namespace", "scaledObject"},
)

triggerTotalsGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "keda_operator",
Expand All @@ -49,10 +92,55 @@ var (
)

// init registers all operator metric collectors with the controller-runtime
// metrics registry. MustRegister panics if any collector cannot be registered.
func init() {
	metrics.Registry.MustRegister(
		// scaler and ScaledObject error counters and metric values
		scalerErrorsTotal,
		scalerMetricsValue,
		scalerErrors,
		scaledObjectErrors,

		// totals collectors
		triggerTotalsGaugeVec,
		crdTotalsGaugeVec,
	)
}

// RecordHPAScalerMetric create a measurement of the external metric used by the HPA
func RecordHPAScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
}

// RecordHPAScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
if err != nil {
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc()
// scaledObjectErrors.With(prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}).Inc()
RecordScalerObjectError(namespace, scaledObject, err)
scalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric))
if errscaler != nil {
log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
}
}

// RecordScalerObjectError counts the number of errors with the scaled object.
//
// When err is non-nil, the error counter for the given namespace/scaledObject
// pair is incremented by one. When err is nil, the counter is only looked up so
// that it is initialized with 0 if it has not been set yet.
func RecordScalerObjectError(namespace string, scaledObject string, err error) {
	labels := prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}

	if err != nil {
		scaledObjectErrors.With(labels).Inc()
		return
	}

	// initialize metric with 0 if not already set
	if _, errScaledObject := scaledObjectErrors.GetMetricWith(labels); errScaledObject != nil {
		// The logger takes a plain message plus key/value pairs and does not
		// interpret printf verbs, so the message carries no format placeholder.
		log.Error(errScaledObject, "Unable to write to metrics to Prometheus Server")
	}
}

// getLabels builds the Prometheus label set shared by the per-scaler metrics.
// scalerIndex is rendered in base 10; all other values are used as given.
func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels {
	labels := make(prometheus.Labels, 5)
	labels["namespace"] = namespace
	labels["scaledObject"] = scaledObject
	labels["scaler"] = scaler
	labels["scalerIndex"] = strconv.Itoa(scalerIndex)
	labels["metric"] = metric
	return labels
}

func IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
triggerTotalsGaugeVec.WithLabelValues(triggerType).Inc()
Expand Down
24 changes: 14 additions & 10 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
)

// prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter"

// KedaProvider implements External Metrics Provider
type KedaProvider struct {
client client.Client
Expand All @@ -50,8 +51,9 @@ type KedaProvider struct {
}

var (
logger logr.Logger
metricsServer prommetrics.PrometheusMetricServer
logger logr.Logger

// metricsServer prommetrics.PrometheusMetricServer
)

// NewProvider returns an instance of KedaProvider
Expand Down Expand Up @@ -109,14 +111,16 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
return nil, fmt.Errorf("exactly one ScaledObject should match label %s", metricSelector.String())
}

scaledObject := &scaledObjects.Items[0]
cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject)
metricsServer.RecordScalerObjectError(scaledObject.Namespace, scaledObject.Name, err)
if err != nil {
return nil, fmt.Errorf("error when getting scalers %s", err)
}
// scaledObject := &scaledObjects.Items[0]
// cache, err := p.scaleHandler.GetScalersCache(ctx, scaledObject)
// metricsServer.RecordScalerObjectError(scaledObject.Namespace, scaledObject.Name, err)
// if err != nil {
// return nil, fmt.Errorf("error when getting scalers %s", err)
// }

return p.scaleHandler.GetExternalMetricsValuesList(ctx, cache, scaledObject, info.Metric)
// TODO Fixme
return nil, nil
// return p.scaleHandler.GetExternalMetricsValuesList(ctx, cache, scaledObject, info.Metric)
}

// ListAllExternalMetrics returns the supported external metrics for this provider
Expand Down
2 changes: 1 addition & 1 deletion pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

type ScalersCache struct {
ScaledObject kedav1alpha1.ScaledObject
ScaledObject *kedav1alpha1.ScaledObject
Generation int64
Scalers []ScalerBuilder
Logger logr.Logger
Expand Down
45 changes: 29 additions & 16 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/fallback"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scalers"
"github.com/kedacore/keda/v2/pkg/scaling/cache"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
Expand All @@ -50,8 +51,8 @@ type ScaleHandler interface {
GetScalersCache(ctx context.Context, scalableObject interface{}) (*cache.ScalersCache, error)
ClearScalersCache(ctx context.Context, scalableObject interface{}) error

GetScalersCacheForScaledObject(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error)
GetExternalMetricsValuesList(ctx context.Context, cache *cache.ScalersCache, scaledObject *kedav1alpha1.ScaledObject, metricName string) (*external_metrics.ExternalMetricValueList, error)
// GetScalersCacheForScaledObject(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error)
GetExternalMetricsValuesList(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error)
}

type scaleHandler struct {
Expand Down Expand Up @@ -169,7 +170,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
}
}

func (h *scaleHandler) GetScalersCacheForScaledObject(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error) {
func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error) {
key := kedav1alpha1.GenerateIdentifier("ScaledObject", scaledObjectNamespace, scaledObjectName)

h.lock.RLock()
Expand Down Expand Up @@ -208,8 +209,7 @@ func (h *scaleHandler) GetScalersCacheForScaledObject(ctx context.Context, scale
}

h.scalerCaches[key] = &cache.ScalersCache{
ScaledObject: *scaledObject,
Generation: withTriggers.Generation,
ScaledObject: scaledObject,
Scalers: scalers,
Logger: h.logger,
Recorder: h.recorder,
Expand Down Expand Up @@ -260,7 +260,7 @@ func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject inter
}
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
newCache.ScaledObject = *obj
newCache.ScaledObject = obj
default:
}

Expand Down Expand Up @@ -351,14 +351,29 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
}
}

func (h *scaleHandler) GetExternalMetricsValuesList(ctx context.Context, cache *cache.ScalersCache, scaledObject *kedav1alpha1.ScaledObject, metricName string) (*external_metrics.ExternalMetricValueList, error) {
func (h *scaleHandler) GetExternalMetricsValuesList(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, error) {
var matchingMetrics []external_metrics.ExternalMetricValue

cache, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace)
prommetrics.RecordScalerObjectError(scaledObjectNamespace, scaledObjectName, err)
if err != nil {
return nil, fmt.Errorf("error when getting scalers %s", err)
}

var scaledObject *kedav1alpha1.ScaledObject
if cache.ScaledObject != nil {
scaledObject = cache.ScaledObject
} else {
err := fmt.Errorf("scaledObject not found in the cache")
h.logger.Error(err, "scaledObject not found in the cache", "name", scaledObjectName, "namespace", scaledObjectNamespace)
return nil, err
}

// let's check metrics for all scalers in a ScaledObject
scalerError := false
scalers, scalerConfigs := cache.GetScalers()

h.logger.V(1).WithValues("name", scaledObject.Name, "namespace", scaledObject.Namespace, "metricName", metricName, "scalers", scalers).Info("Getting metric value")
h.logger.V(1).WithValues("name", scaledObjectName, "namespace", scaledObjectNamespace, "metricName", metricName, "scalers", scalers).Info("Getting metric value")

for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ {
metricSpecs := scalers[scalerIndex].GetMetricSpecForScaling(ctx)
Expand All @@ -379,17 +394,15 @@ func (h *scaleHandler) GetExternalMetricsValuesList(ctx context.Context, cache *
metrics, err = fallback.GetMetricsWithFallback(ctx, h.client, h.logger, metrics, err, metricName, scaledObject, metricSpec)
if err != nil {
scalerError = true
h.logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName)
h.logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName, "scaler", scalerName)
} else {
// TODO fix Prom metrics recorder
// for _, metric := range metrics {
// metricValue := metric.Value.AsApproximateFloat64()
// metricsServer.RecordHPAScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
// }
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordHPAScalerMetric(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metric.MetricName, metricValue)
}
matchingMetrics = append(matchingMetrics, metrics...)
}
// TODO fix Prom metrics recorder
// metricsServer.RecordHPAScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err)
prommetrics.RecordHPAScalerError(scaledObjectNamespace, scaledObjectName, scalerName, scalerIndex, metricName, err)
}
}
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/util/env_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,3 @@ func ResolveOsEnvDuration(envName string) (*time.Duration, error) {

return nil, nil
}

// ResolveOsEnvBool reads the environment variable envName and parses it as a
// boolean. If the variable is unset or empty, defaultValue is returned with a
// nil error. Otherwise the result of strconv.ParseBool on the value is
// returned, including its error for values that are not valid booleans.
func ResolveOsEnvBool(envName string, defaultValue bool) (bool, error) {
	raw, isSet := os.LookupEnv(envName)
	if !isSet || raw == "" {
		return defaultValue, nil
	}

	return strconv.ParseBool(raw)
}
Loading

0 comments on commit 810d9af

Please sign in to comment.