diff --git a/CHANGELOG.md b/CHANGELOG.md
index c28beacedeb..8c42cbd2dee 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -60,7 +60,8 @@ Here is an overview of all new **experimental** features:
 - **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440))
 
 ### Improvements
 
+- **General**: Add apiserver Prometheus metrics to KEDA Metric Server ([#4460](https://github.com/kedacore/keda/issues/4460))
 - **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
 - **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
 - **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go
index 590c97da7c9..7f17bc105a1 100644
--- a/cmd/adapter/main.go
+++ b/cmd/adapter/main.go
@@ -20,14 +20,22 @@ import (
 	"context"
+	"errors"
 	"flag"
 	"fmt"
+	"net/http"
 	"os"
+	"time"
 
+	"github.com/prometheus/client_golang/prometheus/collectors"
 	appsv1 "k8s.io/api/apps/v1"
+	apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
 	"k8s.io/client-go/kubernetes/scheme"
+	kubemetrics "k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
 	"k8s.io/klog/v2"
 	"k8s.io/klog/v2/klogr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
+	ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
 	"sigs.k8s.io/controller-runtime/pkg/metrics/server"
 	basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
 	"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
@@ -96,10 +104,9 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
 	cfg.Burst = adapterClientRequestBurst
 	cfg.DisableCompression = disableCompression
 
-	metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)
 	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
 		Metrics: server.Options{
-			BindAddress: metricsBindAddress,
+			BindAddress: "0", // disabled since we use our own server to serve metrics
 		},
 		Scheme: scheme,
 		Cache: ctrlcache.Options{
@@ -131,6 +138,66 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
 	return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient), stopCh, nil
 }
 
+// getMetricHandler returns a http handler that exposes metrics from controller-runtime and apiserver
+func getMetricHandler() http.HandlerFunc {
+	// Register apiserver metrics in legacy registry
+	// this contains the apiserver_* metrics
+	apimetrics.Register()
+
+	// unregister duplicate collectors that are already handled by controller-runtime's registry
+	legacyregistry.Registerer().Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
+	legacyregistry.Registerer().Unregister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)))
+
+	// build both handlers once instead of on every scrape request
+	legacyHandler := legacyregistry.Handler()
+	ctrlHandler := kubemetrics.HandlerFor(ctrlmetrics.Registry, kubemetrics.HandlerOpts{})
+
+	// Return handler that serves metrics from both legacy and controller-runtime registry
+	return func(w http.ResponseWriter, req *http.Request) {
+		legacyHandler.ServeHTTP(w, req)
+		ctrlHandler.ServeHTTP(w, req)
+	}
+}
+
+// RunMetricsServer runs a http listener and handles the /metrics endpoint
+// this is needed to consolidate apiserver and controller-runtime metrics
+// we have to use a separate http server & can't rely on the controller-runtime implementation
+// because apiserver doesn't provide a way to register metrics to other prometheus registries
+func RunMetricsServer(ctx context.Context, stopCh <-chan struct{}) {
+	h := getMetricHandler()
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", h)
+	metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)
+
+	// named srv so the imported controller-runtime "server" package is not shadowed;
+	// ReadHeaderTimeout bounds how long a client may take to send its request headers
+	srv := &http.Server{
+		Addr:              metricsBindAddress,
+		Handler:           mux,
+		ReadHeaderTimeout: 3 * time.Second,
+	}
+
+	go func() {
+		logger.Info("starting /metrics server endpoint")
+		// nosemgrep: use-tls
+		err := srv.ListenAndServe()
+		if !errors.Is(err, http.ErrServerClosed) {
+			panic(err)
+		}
+	}()
+
+	go func() {
+		<-stopCh
+		logger.Info("Shutting down the /metrics server gracefully...")
+
+		// NOTE(review): Shutdown returns early once ctx is done; confirm ctx is still live
+		// when stopCh closes, otherwise in-flight scrapes are not drained
+		if err := srv.Shutdown(ctx); err != nil {
+			logger.Error(err, "http server shutdown error")
+		}
+	}()
+}
+
 // generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
 // By default the Metrics Service gRPC Server runs in the same namespace on the keda-operator pod.
 func generateDefaultMetricsServiceAddr() string {
@@ -196,6 +263,9 @@ func main() {
 	cmd.WithExternalMetrics(kedaProvider)
 
 	logger.Info(cmd.Message)
+
+	RunMetricsServer(ctx, stopCh)
+
 	if err = cmd.Run(stopCh); err != nil {
 		return
 	}
diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go
index 4bd2bb367af..f86cb1e3e1a 100644
--- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go
+++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go
@@ -620,7 +620,9 @@ func testWebhookMetricValues(t *testing.T) {
 }
 
 func testMetricServerMetrics(t *testing.T) {
-	_ = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaMetricsServerPrometheusURL))
+	families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaMetricsServerPrometheusURL))
+	checkMetricServerValues(t, families)
+	checkBuildInfo(t, families)
 }
 
 func testOperatorMetricValues(t *testing.T, kc *kubernetes.Clientset) {
@@ -768,3 +770,41 @@ func checkWebhookValues(t *testing.T, families map[string]*prommodel.MetricFamil
 	}
 	assert.GreaterOrEqual(t, metricValue, 1.0, "keda_webhook_scaled_object_validation_total has to be greater than 0")
 }
+
+// checkMetricServerValues asserts that the scraped families contain workqueue_adds_total and
+// apiserver_request_total (group external.metrics.k8s.io), each with a summed value of at least 1.
+func checkMetricServerValues(t *testing.T, families map[string]*prommodel.MetricFamily) {
+	t.Log("--- testing metric server metrics ---")
+
+	family, ok := families["workqueue_adds_total"]
+	if !ok {
+		t.Errorf("metric workqueue_adds_total not available")
+		return
+	}
+
+	metricValue := 0.0
+	metrics := family.GetMetric()
+	for _, metric := range metrics {
+		metricValue += metric.GetCounter().GetValue()
+	}
+	assert.GreaterOrEqual(t, metricValue, 1.0, "workqueue_adds_total has to be greater than 0")
+
+	family, ok = families["apiserver_request_total"]
+	if !ok {
+		t.Errorf("metric apiserver_request_total not available")
+		return
+	}
+
+	metricValue = 0.0
+	metrics = family.GetMetric()
+	for _, metric := range metrics {
+		labels := metric.GetLabel()
+		for _, label := range labels {
+			if label.GetName() == "group" && label.GetValue() == "external.metrics.k8s.io" {
+				// sum every matching series instead of keeping only the last one seen
+				metricValue += metric.GetCounter().GetValue()
+			}
+		}
+	}
+	assert.GreaterOrEqual(t, metricValue, 1.0, "apiserver_request_total has to be greater than 0")
+}