diff --git a/lib/metrics/gatherers.go b/lib/metrics/gatherers.go new file mode 100644 index 0000000000000..e1cb59e260d6d --- /dev/null +++ b/lib/metrics/gatherers.go @@ -0,0 +1,58 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// SyncGatherers wraps a [prometheus.Gatherers] so it can be accessed +// in a thread-safe fashion. Gatherer can be added with AddGatherer. +type SyncGatherers struct { + mutex sync.Mutex + gatherers prometheus.Gatherers +} + +// NewSyncGatherers returns a thread-safe [prometheus.Gatherers]. +func NewSyncGatherers(gatherers ...prometheus.Gatherer) *SyncGatherers { + return &SyncGatherers{ + gatherers: gatherers, + } +} + +// AddGatherer adds a gatherer to the SyncGatherers slice. +func (s *SyncGatherers) AddGatherer(g prometheus.Gatherer) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.gatherers = append(s.gatherers, g) +} + +// Gather implements [prometheus.Gatherer]. +func (s *SyncGatherers) Gather() ([]*dto.MetricFamily, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.gatherers == nil { + return nil, nil + } + return s.gatherers.Gather() +} diff --git a/lib/service/diagnostic.go b/lib/service/diagnostic.go index 3d912d9915e29..2dc844dd14005 100644 --- a/lib/service/diagnostic.go +++ b/lib/service/diagnostic.go @@ -22,7 +22,6 @@ import ( "github.com/gravitational/roundtrip" "github.com/gravitational/trace" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/gravitational/teleport" @@ -68,15 +67,8 @@ func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerCon // newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the // in-process one. func (process *TeleportProcess) newMetricsHandler() http.Handler { - // We gather metrics both from the in-process registry (preferred metrics registration method) - // and the global registry (used by some Teleport services and many dependencies). - gatherers := prometheus.Gatherers{ - process.metricsRegistry, - prometheus.DefaultGatherer, - } - metricsHandler := promhttp.InstrumentMetricHandler( - process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{ + process.metricsRegistry, promhttp.HandlerFor(process, promhttp.HandlerOpts{ // Errors can happen if metrics are registered with identical names in both the local and the global registry. // In this case, we log the error but continue collecting metrics. The first collected metric will win // (the one from the local metrics registry takes precedence). diff --git a/lib/service/service.go b/lib/service/service.go index 7d3d643a0e313..8f29dc5b6f29e 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -143,6 +143,7 @@ import ( kubeproxy "github.com/gravitational/teleport/lib/kube/proxy" "github.com/gravitational/teleport/lib/labels" "github.com/gravitational/teleport/lib/limiter" + "github.com/gravitational/teleport/lib/metrics" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/multiplexer" "github.com/gravitational/teleport/lib/observability/tracing" @@ -718,6 +719,12 @@ type TeleportProcess struct { // Teleport's metric service. metricsRegistry *prometheus.Registry + // We gather metrics both from the in-process registry (preferred metrics registration method) + // and the global registry (used by some Teleport services and many dependencies). + // optionally other systems can add their gatherers, this can be used if they + // need to add and remove metrics seevral times (e.g. hosted plugin metrics). + *metrics.SyncGatherers + // state is the process state machine tracking if the process is healthy or not. state *processState @@ -1291,6 +1298,10 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) { cloudLabels: cloudLabels, TracingProvider: tracing.NoopProvider(), metricsRegistry: metricsRegistry, + SyncGatherers: metrics.NewSyncGatherers( + metricsRegistry, + prometheus.DefaultGatherer, + ), } process.registerExpectedServices(cfg) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 156935364826d..bc71c1d103dac 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -70,6 +70,7 @@ import ( "github.com/gravitational/teleport/lib/events/athena" "github.com/gravitational/teleport/lib/integrations/externalauditstorage" "github.com/gravitational/teleport/lib/limiter" + "github.com/gravitational/teleport/lib/metrics" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/modules/modulestest" "github.com/gravitational/teleport/lib/multiplexer" @@ -1624,6 +1625,9 @@ func TestDebugService(t *testing.T) { log := logtest.NewLogger() + localRegistry := prometheus.NewRegistry() + additionalRegistry := prometheus.NewRegistry() + // In this test we don't want to spin a whole process and have to wait for // every service to report ready (there's an integration test for this). // So we craft a minimal process with only the debug service in it. @@ -1631,7 +1635,8 @@ func TestDebugService(t *testing.T) { Config: cfg, Clock: fakeClock, logger: log, - metricsRegistry: prometheus.NewRegistry(), + metricsRegistry: localRegistry, + SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer), Supervisor: NewSupervisor("supervisor-test", log), } @@ -1683,8 +1688,13 @@ func TestDebugService(t *testing.T) { Namespace: "test", Name: "global_metric_" + nonce, }) + additionalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "additional_metric_" + nonce, + }) require.NoError(t, process.metricsRegistry.Register(localMetric)) require.NoError(t, prometheus.Register(globalMetric)) + require.NoError(t, additionalRegistry.Register(additionalMetric)) // Test execution: hit the metrics endpoint. resp, err := httpClient.Get("http://debug/metrics") @@ -1698,6 +1708,26 @@ func TestDebugService(t *testing.T) { // Test validation: check that the metrics server served both the local and global registry. require.Contains(t, string(body), "local_metric_"+nonce) require.Contains(t, string(body), "global_metric_"+nonce) + // the additional registry is not yet added + require.NotContains(t, string(body), "additional_metric_"+nonce) + + // Test execution: add the additional registry and lookup again + process.AddGatherer(additionalRegistry) + + // Test execution: hit the metrics endpoint. + resp, err = httpClient.Get("http://debug/metrics") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + // Test validation: check that the metrics server served both the local and global registry. + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) + // Metric has been added + require.Contains(t, string(body), "additional_metric_"+nonce) } type mockInstanceMetadata struct { @@ -2091,6 +2121,8 @@ func TestDiagnosticsService(t *testing.T) { } log := logtest.NewLogger() + localRegistry := prometheus.NewRegistry() + additionalRegistry := prometheus.NewRegistry() // In this test we don't want to spin a whole process and have to wait for // every service to report ready (there's an integration test for this). @@ -2099,7 +2131,8 @@ func TestDiagnosticsService(t *testing.T) { Config: cfg, Clock: fakeClock, logger: log, - metricsRegistry: prometheus.NewRegistry(), + metricsRegistry: localRegistry, + SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer), Supervisor: NewSupervisor("supervisor-test", log), } @@ -2121,8 +2154,13 @@ func TestDiagnosticsService(t *testing.T) { Namespace: "test", Name: "global_metric_" + nonce, }) + additionalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "additional_metric_" + nonce, + }) require.NoError(t, process.metricsRegistry.Register(localMetric)) require.NoError(t, prometheus.Register(globalMetric)) + require.NoError(t, additionalRegistry.Register(additionalMetric)) // Test execution: query the metrics endpoint and check the tests metrics are here. diagAddr, err := process.DiagnosticAddr() @@ -2141,7 +2179,24 @@ func TestDiagnosticsService(t *testing.T) { // Test validation: check that the metrics server served both the local and global registry. require.Contains(t, string(body), "local_metric_"+nonce) require.Contains(t, string(body), "global_metric_"+nonce) + // the additional registry is not yet added + require.NotContains(t, string(body), "additional_metric_"+nonce) + // Test execution: add the additional registry and lookup again + process.AddGatherer(additionalRegistry) + + resp, err = http.Get(metricsURL.String()) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) + // the additional registry is not yet added + require.Contains(t, string(body), "additional_metric_"+nonce) // Fetch the liveness endpoint healthURL, err := url.Parse("http://" + diagAddr.String()) require.NoError(t, err)