Skip to content

Commit

Permalink
feat: Introduce operational metrics in OpenTelemetry Collector (#4860)
Browse files Browse the repository at this point in the history
Co-authored-by: Tom Kerkhove <[email protected]>
  • Loading branch information
SpiritZhou and tomkerkhove authored Sep 28, 2023
1 parent 36a7ba9 commit 95a235b
Show file tree
Hide file tree
Showing 154 changed files with 14,466 additions and 1,673 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
Here is an overview of all new **experimental** features:

- **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440)|[#4998](https://github.com/kedacore/keda/pull/4998))
- **General**: Introduce operational metrics in OpenTelemetry Collector ([#3078](https://github.com/kedacore/keda/issues/3078))

### Improvements
- **General**: Add apiserver Prometheus metrics to KEDA Metric Server ([#4460](https://github.com/kedacore/keda/issues/4460))
Expand Down
18 changes: 9 additions & 9 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/webhook"
metricscollector "github.com/kedacore/keda/v2/pkg/metricscollector/webhook"
)

var scaledobjectlog = logf.Log.WithName("scaledobject-validation-webhook")
Expand Down Expand Up @@ -95,7 +95,7 @@ func isRemovingFinalizer(so *ScaledObject, old runtime.Object) bool {
}

func validateWorkload(so *ScaledObject, action string) (admission.Warnings, error) {
prommetrics.RecordScaledObjectValidatingTotal(so.Namespace, action)
metricscollector.RecordScaledObjectValidatingTotal(so.Namespace, action)

verifyFunctions := []func(*ScaledObject, string) error{
verifyCPUMemoryScalers,
Expand All @@ -119,7 +119,7 @@ func verifyTriggers(incomingSo *ScaledObject, action string) error {
err := ValidateTriggers(incomingSo.Spec.Triggers)
if err != nil {
scaledobjectlog.WithValues("name", incomingSo.Name).Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "incorrect-triggers")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "incorrect-triggers")
}
return err
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func verifyHpas(incomingSo *ScaledObject, action string) error {
} else {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the hpa '%s'", incomingSo.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), hpa.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-hpa")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-hpa")
return err
}
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
so.Spec.ScaleTargetRef.Name == incomingSo.Spec.ScaleTargetRef.Name {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the ScaledObject '%s'", so.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), so.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object")
return err
}
}
Expand All @@ -222,7 +222,7 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
_, err = ValidateAndCompileScalingModifiers(incomingSo)
if err != nil {
scaledobjectlog.Error(err, "error validating ScalingModifiers")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scaling-modifiers")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scaling-modifiers")

return err
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
container.Resources.Requests.Cpu().AsApproximateFloat64() == 0 {
err := fmt.Errorf("the scaledobject has a cpu trigger but the container %s doesn't have the cpu request defined", container.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
return err
}
} else if trigger.Type == memoryString {
Expand All @@ -284,7 +284,7 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
container.Resources.Requests.Memory().AsApproximateFloat64() == 0 {
err := fmt.Errorf("the scaledobject has a memory trigger but the container %s doesn't have the memory request defined", container.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
return err
}
}
Expand All @@ -304,7 +304,7 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
if (scaleToZeroErr && incomingSo.Spec.MinReplicaCount == nil) || (scaleToZeroErr && *incomingSo.Spec.MinReplicaCount == 0) {
err := fmt.Errorf("scaledobject has only cpu/memory triggers AND minReplica is 0 (scale to zero doesn't work in this case)")
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scale-to-zero-requirements-not-met")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scale-to-zero-requirements-not-met")
return err
}
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/certificates"
"github.com/kedacore/keda/v2/pkg/k8s"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
Expand All @@ -61,6 +62,8 @@ func init() {
}

func main() {
var enablePrometheusMetrics bool
var enableOpenTelemetryMetrics bool
var metricsAddr string
var probeAddr string
var metricsServiceAddr string
Expand All @@ -75,7 +78,9 @@ func main() {
var webhooksServiceName string
var enableCertRotation bool
var validatingWebhookName string
pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.BoolVar(&enablePrometheusMetrics, "enable-prometheus-metrics", true, "Enable the prometheus metric of keda-operator.")
pflag.BoolVar(&enableOpenTelemetryMetrics, "enable-opentelemetry-metrics", false, "Enable the opentelemetry metric of keda-operator.")
pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the prometheus metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.StringVar(&metricsServiceAddr, "metrics-service-bind-address", ":9666", "The address the gRPRC Metrics Service endpoint binds to.")
pflag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand Down Expand Up @@ -127,6 +132,11 @@ func main() {
cfg.Burst = adapterClientRequestBurst
cfg.DisableCompression = disableCompression

if !enablePrometheusMetrics {
metricsAddr = "0"
}
metricscollector.NewMetricsCollectors(enablePrometheusMetrics, enableOpenTelemetryMetrics)

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
Metrics: server.Options{
Expand Down
16 changes: 16 additions & 0 deletions config/e2e/patch_operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,19 @@
- op: replace
path: /spec/template/spec/containers/0/resources/requests/cpu
value: 500m

- op: add
path: /spec/template/spec/containers/0/args/-
value: --enable-opentelemetry-metrics=true

- op: add
path: /spec/template/spec/containers/0/env/-
value:
name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://opentelemetry-collector.default.svc.cluster.local:4318"

- op: add
path: /spec/template/spec/containers/0/env/-
value:
name: OTEL_METRIC_EXPORT_INTERVAL
value: "3000"
8 changes: 4 additions & 4 deletions controllers/keda/clustertriggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
)

// ClusterTriggerAuthenticationReconciler reconciles a ClusterTriggerAuthentication object
Expand Down Expand Up @@ -97,10 +97,10 @@ func (r *ClusterTriggerAuthenticationReconciler) updatePromMetrics(clusterTrigge
defer clusterTriggerAuthPromMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

prommetrics.IncrementCRDTotal(prommetrics.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace)
clusterTriggerAuthPromMetricsMap[namespacedName] = clusterTriggerAuthMetricsData{namespace: clusterTriggerAuth.Namespace}
}

Expand All @@ -110,7 +110,7 @@ func (r *ClusterTriggerAuthenticationReconciler) UpdatePromMetricsOnDelete(names
defer clusterTriggerAuthPromMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

delete(clusterTriggerAuthPromMetricsMap, namespacedName)
Expand Down
14 changes: 7 additions & 7 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -327,18 +327,18 @@ func (r *ScaledJobReconciler) updatePromMetrics(scaledJob *kedav1alpha1.ScaledJo
metricsData, ok := scaledJobPromMetricsMap[namespacedName]

if ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledJobResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

prommetrics.IncrementCRDTotal(prommetrics.ScaledJobResource, scaledJob.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.ScaledJobResource, scaledJob.Namespace)
metricsData.namespace = scaledJob.Namespace

triggerTypes := make([]string, len(scaledJob.Spec.Triggers))
for _, trigger := range scaledJob.Spec.Triggers {
prommetrics.IncrementTriggerTotal(trigger.Type)
metricscollector.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes
Expand All @@ -351,9 +351,9 @@ func (r *ScaledJobReconciler) updatePromMetricsOnDelete(namespacedName string) {
defer scaledJobPromMetricsLock.Unlock()

if metricsData, ok := scaledJobPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledJobResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

Expand Down
14 changes: 7 additions & 7 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -564,18 +564,18 @@ func (r *ScaledObjectReconciler) updatePromMetrics(scaledObject *kedav1alpha1.Sc
metricsData, ok := scaledObjectPromMetricsMap[namespacedName]

if ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledObjectResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

prommetrics.IncrementCRDTotal(prommetrics.ScaledObjectResource, scaledObject.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.ScaledObjectResource, scaledObject.Namespace)
metricsData.namespace = scaledObject.Namespace

triggerTypes := make([]string, len(scaledObject.Spec.Triggers))
for _, trigger := range scaledObject.Spec.Triggers {
prommetrics.IncrementTriggerTotal(trigger.Type)
metricscollector.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes
Expand All @@ -588,9 +588,9 @@ func (r *ScaledObjectReconciler) updatePromMetricsOnDelete(namespacedName string
defer scaledObjectPromMetricsLock.Unlock()

if metricsData, ok := scaledObjectPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledObjectResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
)

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
Expand Down Expand Up @@ -98,10 +98,10 @@ func (r *TriggerAuthenticationReconciler) updatePromMetrics(triggerAuth *kedav1a
defer triggerAuthPromMetricsLock.Unlock()

if metricsData, ok := triggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.TriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.TriggerAuthenticationResource, metricsData.namespace)
}

prommetrics.IncrementCRDTotal(prommetrics.TriggerAuthenticationResource, triggerAuth.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.TriggerAuthenticationResource, triggerAuth.Namespace)
triggerAuthPromMetricsMap[namespacedName] = triggerAuthMetricsData{namespace: triggerAuth.Namespace}
}

Expand All @@ -111,7 +111,7 @@ func (r *TriggerAuthenticationReconciler) UpdatePromMetricsOnDelete(namespacedNa
defer triggerAuthPromMetricsLock.Unlock()

if metricsData, ok := triggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.TriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.TriggerAuthenticationResource, metricsData.namespace)
}

delete(triggerAuthPromMetricsMap, namespacedName)
Expand Down
20 changes: 12 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ require (
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
go.etcd.io/etcd/client/v3 v3.5.9
go.mongodb.org/mongo-driver v1.12.1
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0
go.opentelemetry.io/otel/metric v1.16.0
golang.org/x/oauth2 v0.12.0
golang.org/x/sync v0.3.0
google.golang.org/api v0.142.0
Expand All @@ -104,6 +107,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.1.1
)

require go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 // indirect

replace (
// we need a version with license
github.com/chzyer/logex => github.com/chzyer/logex v1.2.1
Expand Down Expand Up @@ -287,15 +292,14 @@ require (
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.40.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
Loading

0 comments on commit 95a235b

Please sign in to comment.