Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions lib/metrics/gatherers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Teleport
* Copyright (C) 2025 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package metrics

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// SyncGatherers wraps a [prometheus.Gatherers] so it can be accessed
// in a thread-safe fashion. Gatherer can be added with AddGatherer.
type SyncGatherers struct {
mutex sync.Mutex
gatherers prometheus.Gatherers
}

// NewSyncGatherers returns a thread-safe [prometheus.Gatherers].
func NewSyncGatherers(gatherers ...prometheus.Gatherer) *SyncGatherers {
return &SyncGatherers{
gatherers: gatherers,
}
}

// AddGatherer adds a gatherer to the SyncGatherers slice.
func (s *SyncGatherers) AddGatherer(g prometheus.Gatherer) {
s.mutex.Lock()
defer s.mutex.Unlock()

s.gatherers = append(s.gatherers, g)
}

// Gather implements [prometheus.Gatherer].
func (s *SyncGatherers) Gather() ([]*dto.MetricFamily, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.gatherers == nil {
return nil, nil
}
return s.gatherers.Gather()
}
10 changes: 1 addition & 9 deletions lib/service/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/gravitational/teleport"
Expand Down Expand Up @@ -68,15 +67,8 @@ func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerCon
// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the
// in-process one.
func (process *TeleportProcess) newMetricsHandler() http.Handler {
// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
gatherers := prometheus.Gatherers{
process.metricsRegistry,
prometheus.DefaultGatherer,
}

metricsHandler := promhttp.InstrumentMetricHandler(
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
process.metricsRegistry, promhttp.HandlerFor(process, promhttp.HandlerOpts{
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
// In this case, we log the error but continue collecting metrics. The first collected metric will win
// (the one from the local metrics registry takes precedence).
Expand Down
11 changes: 11 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ import (
kubeproxy "github.com/gravitational/teleport/lib/kube/proxy"
"github.com/gravitational/teleport/lib/labels"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/metrics"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/multiplexer"
"github.com/gravitational/teleport/lib/observability/tracing"
Expand Down Expand Up @@ -718,6 +719,12 @@ type TeleportProcess struct {
// Teleport's metric service.
metricsRegistry *prometheus.Registry

// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
// optionally other systems can add their gatherers, this can be used if they
// need to add and remove metrics seevral times (e.g. hosted plugin metrics).
*metrics.SyncGatherers

// state is the process state machine tracking if the process is healthy or not.
state *processState

Expand Down Expand Up @@ -1291,6 +1298,10 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {
cloudLabels: cloudLabels,
TracingProvider: tracing.NoopProvider(),
metricsRegistry: metricsRegistry,
SyncGatherers: metrics.NewSyncGatherers(
metricsRegistry,
prometheus.DefaultGatherer,
),
}

process.registerExpectedServices(cfg)
Expand Down
59 changes: 57 additions & 2 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
"github.com/gravitational/teleport/lib/events/athena"
"github.com/gravitational/teleport/lib/integrations/externalauditstorage"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/metrics"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/modules/modulestest"
"github.com/gravitational/teleport/lib/multiplexer"
Expand Down Expand Up @@ -1624,14 +1625,18 @@ func TestDebugService(t *testing.T) {

log := logtest.NewLogger()

localRegistry := prometheus.NewRegistry()
additionalRegistry := prometheus.NewRegistry()

// In this test we don't want to spin a whole process and have to wait for
// every service to report ready (there's an integration test for this).
// So we craft a minimal process with only the debug service in it.
process := &TeleportProcess{
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: prometheus.NewRegistry(),
metricsRegistry: localRegistry,
SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
Supervisor: NewSupervisor("supervisor-test", log),
}

Expand Down Expand Up @@ -1683,8 +1688,13 @@ func TestDebugService(t *testing.T) {
Namespace: "test",
Name: "global_metric_" + nonce,
})
additionalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "additional_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))
require.NoError(t, additionalRegistry.Register(additionalMetric))

// Test execution: hit the metrics endpoint.
resp, err := httpClient.Get("http://debug/metrics")
Expand All @@ -1698,6 +1708,26 @@ func TestDebugService(t *testing.T) {
// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
// the additional registry is not yet added
require.NotContains(t, string(body), "additional_metric_"+nonce)

// Test execution: add the additional registry and lookup again
process.AddGatherer(additionalRegistry)

// Test execution: hit the metrics endpoint.
resp, err = httpClient.Get("http://debug/metrics")
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
// Metric has been added
require.Contains(t, string(body), "additional_metric_"+nonce)
}

type mockInstanceMetadata struct {
Expand Down Expand Up @@ -2091,6 +2121,8 @@ func TestDiagnosticsService(t *testing.T) {
}

log := logtest.NewLogger()
localRegistry := prometheus.NewRegistry()
additionalRegistry := prometheus.NewRegistry()

// In this test we don't want to spin a whole process and have to wait for
// every service to report ready (there's an integration test for this).
Expand All @@ -2099,7 +2131,8 @@ func TestDiagnosticsService(t *testing.T) {
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: prometheus.NewRegistry(),
metricsRegistry: localRegistry,
SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
Supervisor: NewSupervisor("supervisor-test", log),
}

Expand All @@ -2121,8 +2154,13 @@ func TestDiagnosticsService(t *testing.T) {
Namespace: "test",
Name: "global_metric_" + nonce,
})
additionalMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "additional_metric_" + nonce,
})
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))
require.NoError(t, additionalRegistry.Register(additionalMetric))

// Test execution: query the metrics endpoint and check the tests metrics are here.
diagAddr, err := process.DiagnosticAddr()
Expand All @@ -2141,7 +2179,24 @@ func TestDiagnosticsService(t *testing.T) {
// Test validation: check that the metrics server served both the local and global registry.
require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
// the additional registry is not yet added
require.NotContains(t, string(body), "additional_metric_"+nonce)

// Test execution: add the additional registry and lookup again
process.AddGatherer(additionalRegistry)

resp, err = http.Get(metricsURL.String())
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

require.Contains(t, string(body), "local_metric_"+nonce)
require.Contains(t, string(body), "global_metric_"+nonce)
// the additional registry is not yet added
require.Contains(t, string(body), "additional_metric_"+nonce)
// Fetch the liveness endpoint
healthURL, err := url.Parse("http://" + diagAddr.String())
require.NoError(t, err)
Expand Down
Loading