diff --git a/lib/auth/middleware.go b/lib/auth/middleware.go index 18eb90a751bd8..70a16394d565c 100644 --- a/lib/auth/middleware.go +++ b/lib/auth/middleware.go @@ -55,6 +55,7 @@ import ( "github.com/gravitational/teleport/lib/limiter" "github.com/gravitational/teleport/lib/multiplexer" "github.com/gravitational/teleport/lib/observability/metrics" + grpcmetrics "github.com/gravitational/teleport/lib/observability/metrics/grpc" "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" logutils "github.com/gravitational/teleport/lib/utils/log" @@ -162,7 +163,7 @@ func NewTLSServer(ctx context.Context, cfg TLSServerConfig) (*TLSServer, error) } // sets up gRPC metrics interceptor - grpcMetrics := metrics.CreateGRPCServerMetrics(cfg.Metrics.GRPCServerLatency, prometheus.Labels{teleport.TagServer: "teleport-auth"}) + grpcMetrics := grpcmetrics.CreateGRPCServerMetrics(cfg.Metrics.GRPCServerLatency, prometheus.Labels{teleport.TagServer: "teleport-auth"}) err = metrics.RegisterPrometheusCollectors(grpcMetrics) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/msgraph/client.go b/lib/msgraph/client.go index 7a061416c26b5..4f74ed1531ce3 100644 --- a/lib/msgraph/client.go +++ b/lib/msgraph/client.go @@ -43,6 +43,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/defaults" + "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/utils" ) @@ -98,6 +99,9 @@ type Config struct { // GraphEndpoint specifies root domain of the Graph API. GraphEndpoint string Logger *slog.Logger + // MetricsRegistry configures where metrics should be registered. + // When nil, metrics are created but not registered. + MetricsRegistry *metrics.Registry } // SetDefaults sets the default values for optional fields. @@ -120,6 +124,9 @@ func (cfg *Config) SetDefaults() { if cfg.Logger == nil { cfg.Logger = slog.With(teleport.ComponentKey, "msgraph") } + if cfg.MetricsRegistry == nil { + cfg.MetricsRegistry = metrics.NoopRegistry() + } } // Validate checks that required fields are set. @@ -144,6 +151,7 @@ type Client struct { baseURL *url.URL pageSize int logger *slog.Logger + metrics *clientMetrics } // NewClient returns a new client for the given config. @@ -156,6 +164,13 @@ func NewClient(cfg Config) (*Client, error) { if err != nil { return nil, trace.Wrap(err) } + + m := newMetrics(cfg.MetricsRegistry) + // gracefully handle not being given a metric registry + if err := m.register(cfg.MetricsRegistry); err != nil { + cfg.Logger.ErrorContext(context.Background(), "Failed to register metrics.", "error", err) + } + return &Client{ httpClient: cfg.HTTPClient, tokenProvider: cfg.TokenProvider, @@ -164,6 +179,7 @@ func NewClient(cfg Config) (*Client, error) { baseURL: base.JoinPath(graphVersion), pageSize: cfg.PageSize, logger: cfg.Logger, + metrics: m, }, nil } @@ -201,7 +217,8 @@ func (c *Client) request(ctx context.Context, method string, uri string, header } var lastErr error - for i := 0; i < maxRetries; i++ { + var start time.Time + for range maxRetries { if retryAfter > 0 { select { case <-c.clock.After(retryAfter): @@ -231,10 +248,13 @@ func (c *Client) request(ctx context.Context, method string, uri string, header // https://learn.microsoft.com/en-us/graph/best-practices-concept#reliability-and-support req.Header.Set("client-request-id", requestID) + start = c.clock.Now() resp, err := c.httpClient.Do(req) if err != nil { return nil, trace.Wrap(err) // hard I/O error, bail } + c.metrics.requestDuration.WithLabelValues(method).Observe(c.clock.Since(start).Seconds()) + c.metrics.requestTotal.WithLabelValues(method, strconv.Itoa(resp.StatusCode)) if resp.StatusCode >= 200 && resp.StatusCode < 400 { return resp, nil diff --git a/lib/msgraph/metrics.go b/lib/msgraph/metrics.go new file mode 100644 index 0000000000000..d7bca2ee9f223 --- /dev/null +++ b/lib/msgraph/metrics.go @@ -0,0 +1,68 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package msgraph + +import ( + "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" + + "github.com/gravitational/teleport/lib/observability/metrics" +) + +type clientMetrics struct { + // requestsTotal keeps track of the number of requests done by the client + // This metric is labeled by status code. + requestTotal *prometheus.CounterVec + // requestDuration keeps track of the request duration, in seconds. + requestDuration *prometheus.HistogramVec +} + +const ( + metricsLabelStatus = "status" + metricsLabelsMethod = "method" +) + +func newMetrics(reg *metrics.Registry) *clientMetrics { + var namespace, subsystem string + if reg != nil { + namespace = reg.Namespace() + subsystem = reg.Subsystem() + } + return &clientMetrics{ + requestTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_total", + Help: "Total number of requests made to MS Graph", + }, []string{metricsLabelsMethod, metricsLabelStatus}), + requestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_duration_seconds", + Help: "Request to MS Graph duration in seconds.", + }, []string{metricsLabelsMethod}), + } +} + +func (metrics *clientMetrics) register(r prometheus.Registerer) error { + return trace.NewAggregate( + r.Register(metrics.requestTotal), + r.Register(metrics.requestDuration), + ) +} diff --git a/lib/observability/metrics/gatherers.go b/lib/observability/metrics/gatherers.go new file mode 100644 index 0000000000000..e1cb59e260d6d --- /dev/null +++ b/lib/observability/metrics/gatherers.go @@ -0,0 +1,58 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// SyncGatherers wraps a [prometheus.Gatherers] so it can be accessed +// in a thread-safe fashion. Gatherer can be added with AddGatherer. +type SyncGatherers struct { + mutex sync.Mutex + gatherers prometheus.Gatherers +} + +// NewSyncGatherers returns a thread-safe [prometheus.Gatherers]. +func NewSyncGatherers(gatherers ...prometheus.Gatherer) *SyncGatherers { + return &SyncGatherers{ + gatherers: gatherers, + } +} + +// AddGatherer adds a gatherer to the SyncGatherers slice. +func (s *SyncGatherers) AddGatherer(g prometheus.Gatherer) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.gatherers = append(s.gatherers, g) +} + +// Gather implements [prometheus.Gatherer]. +func (s *SyncGatherers) Gather() ([]*dto.MetricFamily, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.gatherers == nil { + return nil, nil + } + return s.gatherers.Gather() +} diff --git a/lib/observability/metrics/grpc/grpc.go b/lib/observability/metrics/grpc/grpc.go new file mode 100644 index 0000000000000..426b7c30f988c --- /dev/null +++ b/lib/observability/metrics/grpc/grpc.go @@ -0,0 +1,65 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package grpcmetrics + +import ( + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/prometheus/client_golang/prometheus" +) + +// CreateGRPCServerMetrics creates server gRPC metrics configuration that is to be registered and used by the caller +// in an openmetrics unary and/or stream interceptor +func CreateGRPCServerMetrics( + latencyEnabled bool, labels prometheus.Labels, +) *grpcprom.ServerMetrics { + serverOpts := []grpcprom.ServerMetricsOption{ + grpcprom.WithServerCounterOptions(grpcprom.WithConstLabels(labels)), + } + if latencyEnabled { + histOpts := grpcHistogramOpts(labels) + serverOpts = append( + serverOpts, grpcprom.WithServerHandlingTimeHistogram(histOpts...), + ) + } + return grpcprom.NewServerMetrics(serverOpts...) +} + +// CreateGRPCClientMetrics creates client gRPC metrics configuration that is to be registered and used by the caller +// in an openmetrics unary and/or stream interceptor +func CreateGRPCClientMetrics( + latencyEnabled bool, labels prometheus.Labels, +) *grpcprom.ClientMetrics { + clientOpts := []grpcprom.ClientMetricsOption{ + grpcprom.WithClientCounterOptions(grpcprom.WithConstLabels(labels)), + } + if latencyEnabled { + histOpts := grpcHistogramOpts(labels) + clientOpts = append( + clientOpts, grpcprom.WithClientHandlingTimeHistogram(histOpts...), + ) + } + return grpcprom.NewClientMetrics(clientOpts...) +} + +func grpcHistogramOpts(labels prometheus.Labels) []grpcprom.HistogramOption { + return []grpcprom.HistogramOption{ + grpcprom.WithHistogramBuckets(prometheus.ExponentialBuckets(0.001, 2, 16)), + grpcprom.WithHistogramConstLabels(labels), + } +} diff --git a/lib/observability/metrics/prometheus.go b/lib/observability/metrics/prometheus.go index d190c7ca1f5dc..440a4078c696c 100644 --- a/lib/observability/metrics/prometheus.go +++ b/lib/observability/metrics/prometheus.go @@ -23,7 +23,6 @@ import ( "runtime" "github.com/gravitational/trace" - grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" "github.com/gravitational/teleport" @@ -70,44 +69,3 @@ func BuildCollector() prometheus.Collector { func() float64 { return 1 }, ) } - -// CreateGRPCServerMetrics creates server gRPC metrics configuration that is to be registered and used by the caller -// in an openmetrics unary and/or stream interceptor -func CreateGRPCServerMetrics( - latencyEnabled bool, labels prometheus.Labels, -) *grpcprom.ServerMetrics { - serverOpts := []grpcprom.ServerMetricsOption{ - grpcprom.WithServerCounterOptions(grpcprom.WithConstLabels(labels)), - } - if latencyEnabled { - histOpts := grpcHistogramOpts(labels) - serverOpts = append( - serverOpts, grpcprom.WithServerHandlingTimeHistogram(histOpts...), - ) - } - return grpcprom.NewServerMetrics(serverOpts...) -} - -// CreateGRPCClientMetrics creates client gRPC metrics configuration that is to be registered and used by the caller -// in an openmetrics unary and/or stream interceptor -func CreateGRPCClientMetrics( - latencyEnabled bool, labels prometheus.Labels, -) *grpcprom.ClientMetrics { - clientOpts := []grpcprom.ClientMetricsOption{ - grpcprom.WithClientCounterOptions(grpcprom.WithConstLabels(labels)), - } - if latencyEnabled { - histOpts := grpcHistogramOpts(labels) - clientOpts = append( - clientOpts, grpcprom.WithClientHandlingTimeHistogram(histOpts...), - ) - } - return grpcprom.NewClientMetrics(clientOpts...) -} - -func grpcHistogramOpts(labels prometheus.Labels) []grpcprom.HistogramOption { - return []grpcprom.HistogramOption{ - grpcprom.WithHistogramBuckets(prometheus.ExponentialBuckets(0.001, 2, 16)), - grpcprom.WithHistogramConstLabels(labels), - } -} diff --git a/lib/observability/metrics/registry.go b/lib/observability/metrics/registry.go new file mode 100644 index 0000000000000..8c4f447577bd3 --- /dev/null +++ b/lib/observability/metrics/registry.go @@ -0,0 +1,129 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package metrics + +import ( + "cmp" + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +// Registry is a [prometheus.Registerer] for a Teleport process that +// allows propagating additional information such as: +// - the metric namespace (`teleport`, `teleport_bot`, `teleport_plugins`) +// - an optional subsystem +// +// This should be passed anywhere that needs to register a metric. +type Registry struct { + prometheus.Registerer + + namespace string + subsystem string +} + +// Namespace returns the namespace that should be used by metrics registered +// in this Registry. Common namespaces are "teleport", "tbot", and +// "teleport_plugins". +func (r *Registry) Namespace() string { + return r.namespace +} + +// Subsystem is the subsystem base that should be used by metrics registered in +// this Registry. Subsystem parts can be added with WrapWithSubsystem. +func (r *Registry) Subsystem() string { + return r.subsystem +} + +// Wrap wraps a Registry by adding a component to its subsystem. +// This should be used before passing a registry to a sub-component. +// Example usage: +// +// rootReg := prometheus.NewRegistry() +// process.AddGatherer(rootReg) +// reg, err := NewRegistry(rootReg, "teleport_plugins", "") +// go runFooService(ctx, log, reg.Wrap("foo")) +// go runBarService(ctx, log, reg.Wrap("bar")) +func (r *Registry) Wrap(subsystem string) *Registry { + if r.subsystem != "" && subsystem != "" { + subsystem = r.subsystem + "_" + subsystem + } else { + subsystem = cmp.Or(r.subsystem, subsystem) + } + + newReg := &Registry{ + Registerer: r.Registerer, + namespace: r.namespace, + subsystem: subsystem, + } + return newReg +} + +// NewRegistry creates a new Registry wrapping a prometheus registry. +// This should only be called when starting the service management routines such +// as: service.NewTeleport(), tbot.New(), or the hosted plugin manager. +// Services and sub-services should take the registry as a parameter, like they +// already do for the logger. +// Example usage: +// +// rootReg := prometheus.NewRegistry() +// process.AddGatherer(rootReg) +// reg, err := NewRegistry(rootReg, "teleport_plugins", "") +// go runFooService(ctx, log, reg.Wrap("foo")) +// go runBarService(ctx, log, reg.Wrap("bar")) +func NewRegistry(reg prometheus.Registerer, namespace, subsystem string) (*Registry, error) { + if reg == nil { + return nil, errors.New("nil prometheus.Registerer (this is a bug)") + } + if namespace == "" { + return nil, errors.New("namespace is required (this is a bug)") + } + return &Registry{ + Registerer: reg, + namespace: namespace, + subsystem: subsystem, + }, nil +} + +// NoopRegistry returns a Registry that doesn't register metrics. +// This can be used in tests, or to provide backward compatibility when a nil +// Registry is passed. +func NoopRegistry() *Registry { + return &Registry{ + Registerer: noopRegistry{}, + namespace: "noop", + subsystem: "", + } +} + +type noopRegistry struct{} + +// Register implements [prometheus.Registerer]. +func (b noopRegistry) Register(collector prometheus.Collector) error { + return nil +} + +// MustRegister implements [prometheus.Registerer]. +func (b noopRegistry) MustRegister(collector ...prometheus.Collector) { +} + +// Unregister implements [prometheus.Registerer]. +func (b noopRegistry) Unregister(collector prometheus.Collector) bool { + return true +} diff --git a/lib/observability/metrics/registry_test.go b/lib/observability/metrics/registry_test.go new file mode 100644 index 0000000000000..7bfc166a75f90 --- /dev/null +++ b/lib/observability/metrics/registry_test.go @@ -0,0 +1,72 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWrap(t *testing.T) { + testNamespace := "namespace" + tests := []struct { + name string + existingSubsystem string + wrappingSubsystem string + expectedSubsystem string + }{ + { + name: "empty subsystem + empty subsystem", + existingSubsystem: "", + wrappingSubsystem: "", + expectedSubsystem: "", + }, + { + name: "empty subsystem + non-empty subsystem", + existingSubsystem: "", + wrappingSubsystem: "test", + expectedSubsystem: "test", + }, + { + name: "non-empty subsystem + empty subsystem", + existingSubsystem: "test", + wrappingSubsystem: "", + expectedSubsystem: "test", + }, + { + name: "non-empty subsystem + non-empty subsystem", + existingSubsystem: "test", + wrappingSubsystem: "test", + expectedSubsystem: "test_test", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r := &Registry{ + Registerer: nil, + namespace: testNamespace, + subsystem: test.existingSubsystem, + } + result := r.Wrap(test.wrappingSubsystem) + require.Equal(t, test.expectedSubsystem, result.subsystem) + require.Equal(t, testNamespace, result.namespace) + }) + } +} diff --git a/lib/service/connect.go b/lib/service/connect.go index 12a7bbd6040bf..f7d6953ae0ac3 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -57,6 +57,7 @@ import ( "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/join/joinclient" "github.com/gravitational/teleport/lib/observability/metrics" + grpcmetrics "github.com/gravitational/teleport/lib/observability/metrics/grpc" "github.com/gravitational/teleport/lib/openssh" "github.com/gravitational/teleport/lib/reversetunnelclient" servicebreaker "github.com/gravitational/teleport/lib/service/breaker" @@ -1505,7 +1506,7 @@ func (process *TeleportProcess) newClientDirect(authServers []utils.NetAddr, tls var dialOpts []grpc.DialOption if role == types.RoleProxy { - grpcMetrics := metrics.CreateGRPCClientMetrics(process.Config.Metrics.GRPCClientLatency, prometheus.Labels{teleport.TagClient: "teleport-proxy"}) + grpcMetrics := grpcmetrics.CreateGRPCClientMetrics(process.Config.Metrics.GRPCClientLatency, prometheus.Labels{teleport.TagClient: "teleport-proxy"}) if err := metrics.RegisterPrometheusCollectors(grpcMetrics); err != nil { return nil, nil, trace.Wrap(err) } diff --git a/lib/service/diagnostic.go b/lib/service/diagnostic.go index 3d912d9915e29..2dc844dd14005 100644 --- a/lib/service/diagnostic.go +++ b/lib/service/diagnostic.go @@ -22,7 +22,6 @@ import ( "github.com/gravitational/roundtrip" "github.com/gravitational/trace" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/gravitational/teleport" @@ -68,15 +67,8 @@ func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerCon // newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the // in-process one. func (process *TeleportProcess) newMetricsHandler() http.Handler { - // We gather metrics both from the in-process registry (preferred metrics registration method) - // and the global registry (used by some Teleport services and many dependencies). - gatherers := prometheus.Gatherers{ - process.metricsRegistry, - prometheus.DefaultGatherer, - } - metricsHandler := promhttp.InstrumentMetricHandler( - process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{ + process.metricsRegistry, promhttp.HandlerFor(process, promhttp.HandlerOpts{ // Errors can happen if metrics are registered with identical names in both the local and the global registry. // In this case, we log the error but continue collecting metrics. The first collected metric will win // (the one from the local metrics registry takes precedence). diff --git a/lib/service/service.go b/lib/service/service.go index b992c404ba0d0..45a993fdfed4f 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -143,6 +143,7 @@ import ( "github.com/gravitational/teleport/lib/limiter" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/multiplexer" + "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/observability/tracing" "github.com/gravitational/teleport/lib/openssh" "github.com/gravitational/teleport/lib/pam" @@ -719,9 +720,15 @@ type TeleportProcess struct { // conflicts. // // Both the metricsRegistry and the default global registry are gathered by - // Telepeort's metric service. + // Teleport's metric service. metricsRegistry *prometheus.Registry + // We gather metrics both from the in-process registry (preferred metrics registration method) + // and the global registry (used by some Teleport services and many dependencies). + // optionally other systems can add their gatherers, this can be used if they + // need to add and remove metrics seevral times (e.g. hosted plugin metrics). + *metrics.SyncGatherers + // state is the process state machine tracking if the process is healthy or not. state *processState @@ -993,6 +1000,12 @@ func (process *TeleportProcess) getIdentity(role types.SystemRole) (i *state.Ide return i, nil } +// MetricsRegistry returns the process-scoped metrics registry. +// New metrics must register against this and not the global prometheus registry. +func (process *TeleportProcess) MetricsRegistry() prometheus.Registerer { + return process.metricsRegistry +} + // Process is a interface for processes type Process interface { // Closer closes all resources used by the process @@ -1084,14 +1097,10 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) { } }() - // Use the custom metrics registry if specified, else create a new one. - // We must create the registry in NewTeleport, as opposed to ApplyConfig(), + // We must create the registry in NewTeleport, as opposed to the config, // because some tests are running multiple Teleport instances from the same - // config. - metricsRegistry := cfg.MetricsRegistry - if metricsRegistry == nil { - metricsRegistry = prometheus.NewRegistry() - } + // config and reusing the same registry causes them to fail. + metricsRegistry := prometheus.NewRegistry() // If FIPS mode was requested make sure binary is build against BoringCrypto. if cfg.FIPS { @@ -1294,6 +1303,10 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) { cloudLabels: cloudLabels, TracingProvider: tracing.NoopProvider(), metricsRegistry: metricsRegistry, + SyncGatherers: metrics.NewSyncGatherers( + metricsRegistry, + prometheus.DefaultGatherer, + ), } process.registerExpectedServices(cfg) diff --git a/lib/service/service_test.go b/lib/service/service_test.go index d0365ab7eddfc..61d8bc74265bc 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -72,6 +72,7 @@ import ( "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/modules/modulestest" "github.com/gravitational/teleport/lib/multiplexer" + "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/reversetunnelclient" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/services" @@ -1626,6 +1627,9 @@ func TestDebugService(t *testing.T) { log := logtest.NewLogger() + localRegistry := prometheus.NewRegistry() + additionalRegistry := prometheus.NewRegistry() + // In this test we don't want to spin a whole process and have to wait for // every service to report ready (there's an integration test for this). // So we craft a minimal process with only the debug service in it. @@ -1633,7 +1637,8 @@ func TestDebugService(t *testing.T) { Config: cfg, Clock: fakeClock, logger: log, - metricsRegistry: prometheus.NewRegistry(), + metricsRegistry: localRegistry, + SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer), Supervisor: NewSupervisor("supervisor-test", log), } @@ -1685,8 +1690,13 @@ func TestDebugService(t *testing.T) { Namespace: "test", Name: "global_metric_" + nonce, }) + additionalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "additional_metric_" + nonce, + }) require.NoError(t, process.metricsRegistry.Register(localMetric)) require.NoError(t, prometheus.Register(globalMetric)) + require.NoError(t, additionalRegistry.Register(additionalMetric)) // Test execution: hit the metrics endpoint. resp, err := httpClient.Get("http://debug/metrics") @@ -1700,6 +1710,26 @@ func TestDebugService(t *testing.T) { // Test validation: check that the metrics server served both the local and global registry. require.Contains(t, string(body), "local_metric_"+nonce) require.Contains(t, string(body), "global_metric_"+nonce) + // the additional registry is not yet added + require.NotContains(t, string(body), "additional_metric_"+nonce) + + // Test execution: add the additional registry and lookup again + process.AddGatherer(additionalRegistry) + + // Test execution: hit the metrics endpoint. + resp, err = httpClient.Get("http://debug/metrics") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + // Test validation: check that the metrics server served both the local and global registry. + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) + // Metric has been added + require.Contains(t, string(body), "additional_metric_"+nonce) } type mockInstanceMetadata struct { @@ -2093,6 +2123,8 @@ func TestDiagnosticsService(t *testing.T) { } log := logtest.NewLogger() + localRegistry := prometheus.NewRegistry() + additionalRegistry := prometheus.NewRegistry() // In this test we don't want to spin a whole process and have to wait for // every service to report ready (there's an integration test for this). @@ -2101,7 +2133,8 @@ func TestDiagnosticsService(t *testing.T) { Config: cfg, Clock: fakeClock, logger: log, - metricsRegistry: prometheus.NewRegistry(), + metricsRegistry: localRegistry, + SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer), Supervisor: NewSupervisor("supervisor-test", log), } @@ -2123,8 +2156,13 @@ func TestDiagnosticsService(t *testing.T) { Namespace: "test", Name: "global_metric_" + nonce, }) + additionalMetric := prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "test", + Name: "additional_metric_" + nonce, + }) require.NoError(t, process.metricsRegistry.Register(localMetric)) require.NoError(t, prometheus.Register(globalMetric)) + require.NoError(t, additionalRegistry.Register(additionalMetric)) // Test execution: query the metrics endpoint and check the tests metrics are here. diagAddr, err := process.DiagnosticAddr() @@ -2143,7 +2181,24 @@ func TestDiagnosticsService(t *testing.T) { // Test validation: check that the metrics server served both the local and global registry. require.Contains(t, string(body), "local_metric_"+nonce) require.Contains(t, string(body), "global_metric_"+nonce) + // the additional registry is not yet added + require.NotContains(t, string(body), "additional_metric_"+nonce) + // Test execution: add the additional registry and lookup again + process.AddGatherer(additionalRegistry) + + resp, err = http.Get(metricsURL.String()) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + + require.Contains(t, string(body), "local_metric_"+nonce) + require.Contains(t, string(body), "global_metric_"+nonce) + // the additional registry is not yet added + require.Contains(t, string(body), "additional_metric_"+nonce) // Fetch the liveness endpoint healthURL, err := url.Parse("http://" + diagAddr.String()) require.NoError(t, err) diff --git a/lib/service/servicecfg/config.go b/lib/service/servicecfg/config.go index 90f869895744b..5564d86e74aea 100644 --- a/lib/service/servicecfg/config.go +++ b/lib/service/servicecfg/config.go @@ -33,7 +33,6 @@ import ( "github.com/ghodss/yaml" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/prometheus/client_golang/prometheus" "golang.org/x/crypto/ssh" "github.com/gravitational/teleport" @@ -265,12 +264,6 @@ type Config struct { // protocol. DatabaseREPLRegistry dbrepl.REPLRegistry - // MetricsRegistry is the prometheus metrics registry used by the Teleport process to register its metrics. - // As of today, not every Teleport metric is registered against this registry. Some Teleport services - // and Teleport dependencies are using the global registry. - // Both the MetricsRegistry and the default global registry are gathered by Teleport's metric service. - MetricsRegistry *prometheus.Registry - // token is either the token needed to join the auth server, or a path pointing to a file // that contains the token // diff --git a/lib/services/reconciler.go b/lib/services/reconciler.go index 70376f30557aa..a7cae8d16f3db 100644 --- a/lib/services/reconciler.go +++ b/lib/services/reconciler.go @@ -21,11 +21,14 @@ package services import ( "context" "log/slog" + "time" "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/observability/metrics" logutils "github.com/gravitational/teleport/lib/utils/log" ) @@ -55,6 +58,17 @@ type GenericReconcilerConfig[K comparable, T any] struct { OnDelete func(context.Context, T) error // Logger emits log messages. Logger *slog.Logger + // Metrics is an optional ReconcilerMetrics created by the caller. + // The caller is responsible for registering the metrics. + // Metrics can be nil, in this case the generic reconciler will generate its + // own metrics, which won't be registered. + // Passing a metrics struct might look like a cumbersome API but we have 2 challenges: + // - some parts of Teleport are using one-shot reconcilers. Registering + // metrics on every run would fail and we would lose the past reconciliation + // data. + // - we have many reconcilers in Teleport and making the caller create the + // metrics beforehand allows them to specify the metric subsystem. + Metrics *ReconcilerMetrics // AllowOriginChanges is a flag that allows the reconciler to change the // origin value of a reconciled resource. By default, origin changes are // disallowed to enforce segregation between of resources from different @@ -88,17 +102,82 @@ func (c *GenericReconcilerConfig[K, T]) CheckAndSetDefaults() error { if c.Logger == nil { c.Logger = slog.With(teleport.ComponentKey, "reconciler") } + if c.Metrics == nil { + var err error + // If we are not given metrics, we create our own so we don't + // panic when trying to increment/observe. + c.Metrics, err = NewReconcilerMetrics(metrics.NoopRegistry().Wrap("unknown")) + if err != nil { + return trace.Wrap(err) + } + } return nil } +// ReconcilerMetrics is a set of metrics that the reconciler will update during +// its reconciliation cycle. +type ReconcilerMetrics struct { + reconciliationTotal *prometheus.CounterVec + reconciliationDuration *prometheus.HistogramVec +} + +const ( + metricLabelResult = "result" + metricLabelResultSuccess = "success" + metricLabelResultError = "error" + metricLabelResultNoop = "noop" + metricLabelOperation = "operation" + metricLabelOperationCreate = "create" + metricLabelOperationUpdate = "update" + metricLabelOperationDelete = "delete" + metricLabelKind = "kind" +) + +// NewReconcilerMetrics creates subsystem-scoped metrics for the reconciler. +// The caller is responsible for registering them into an appropriate registry. +// The same ReconcilerMetrics can be used across different reconcilers. +// The metrics subsystem cannot be empty. +func NewReconcilerMetrics(reg *metrics.Registry) (*ReconcilerMetrics, error) { + if reg == nil { + return nil, trace.BadParameter("missing metrics registry (this is a bug)") + } + if reg.Subsystem() == "" { + return nil, trace.BadParameter("missing metrics subsystem (this is a bug)") + } + return &ReconcilerMetrics{ + reconciliationTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: reg.Namespace(), + Subsystem: reg.Subsystem(), + Name: "reconciliation_total", + Help: "Total number of individual resource reconciliations.", + }, []string{metricLabelKind, metricLabelOperation, metricLabelResult}), + reconciliationDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: reg.Namespace(), + Subsystem: reg.Subsystem(), + Name: "reconciliation_duration_seconds", + Help: "The duration of individual resource reconciliation in seconds.", + }, []string{metricLabelKind, metricLabelOperation}), + }, nil +} + +// Register metrics in the specified [prometheus.Registerer], returns an error +// if any metric fails, but still tries to register every metric before returning. +func (m *ReconcilerMetrics) Register(r prometheus.Registerer) error { + return trace.NewAggregate( + r.Register(m.reconciliationTotal), + r.Register(m.reconciliationDuration), + ) +} + // NewGenericReconciler creates a new GenericReconciler with provided configuration. func NewGenericReconciler[K comparable, T any](cfg GenericReconcilerConfig[K, T]) (*GenericReconciler[K, T], error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } return &GenericReconciler[K, T]{ - cfg: cfg, - logger: cfg.Logger, + cfg: cfg, + logger: cfg.Logger, + metrics: cfg.Metrics, }, nil } @@ -108,8 +187,9 @@ func NewGenericReconciler[K comparable, T any](cfg GenericReconcilerConfig[K, T] // It's used in combination with watchers by agents (app, database, desktop) // to enable dynamically registered resources. type GenericReconciler[K comparable, T any] struct { - cfg GenericReconcilerConfig[K, T] - logger *slog.Logger + cfg GenericReconcilerConfig[K, T] + logger *slog.Logger + metrics *ReconcilerMetrics } // Reconcile reconciles currently registered resources with new resources and @@ -155,14 +235,35 @@ func (r *GenericReconciler[K, T]) processRegisteredResource(ctx context.Context, return trace.Wrap(err) } r.logger.InfoContext(ctx, "Resource was removed, deleting", "kind", kind, "name", key) - if err := r.cfg.OnDelete(ctx, registered); err != nil { + start := time.Now() + err = r.cfg.OnDelete(ctx, registered) + r.metrics.reconciliationDuration.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + }).Observe(time.Since(start).Seconds()) + if err != nil { if trace.IsNotFound(err) { r.logger.Log(ctx, logutils.TraceLevel, "Failed to delete resource", "kind", kind, "name", key, "err", err) + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + metricLabelResult: metricLabelResultNoop, + }).Inc() return nil } + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + metricLabelResult: metricLabelResultError, + }).Inc() return trace.Wrap(err, "failed to delete %v %v", kind, key) } + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + metricLabelResult: metricLabelResultSuccess, + }).Inc() return nil } @@ -179,9 +280,27 @@ func (r *GenericReconciler[K, T]) processNewResource(ctx context.Context, curren } if r.cfg.Matcher(newT) { r.logger.InfoContext(ctx, "New resource matches, creating", "kind", kind, "name", key) - if err := r.cfg.OnCreate(ctx, newT); err != nil { + start := time.Now() + err = r.cfg.OnCreate(ctx, newT) + r.metrics.reconciliationDuration.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationCreate, + }).Observe(time.Since(start).Seconds()) + + if err != nil { + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationCreate, + metricLabelResult: metricLabelResultError, + }).Inc() return trace.Wrap(err, "failed to create %v %v", kind, key) } + r.metrics.reconciliationTotal.With( + prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationCreate, + metricLabelResult: metricLabelResultSuccess, + }).Inc() return nil } r.logger.DebugContext(ctx, "New resource doesn't match, not creating", "kind", kind, "name", key) @@ -215,19 +334,56 @@ func (r *GenericReconciler[K, T]) processNewResource(ctx context.Context, curren if r.cfg.CompareResources(newT, registered) != Equal { if r.cfg.Matcher(newT) { r.logger.InfoContext(ctx, "Existing resource updated, updating", "name", key) - if err := r.cfg.OnUpdate(ctx, newT, registered); err != nil { + start := time.Now() + err := r.cfg.OnUpdate(ctx, newT, registered) + r.metrics.reconciliationDuration.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationUpdate, + }).Observe(time.Since(start).Seconds()) + if err != nil { + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationUpdate, + metricLabelResult: metricLabelResultError, + }).Inc() return trace.Wrap(err, "failed to update %v %v", kind, key) } + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationUpdate, + metricLabelResult: metricLabelResultSuccess, + }).Inc() return nil } r.logger.InfoContext(ctx, "Existing resource updated and no longer matches, deleting", "name", key) - if err := r.cfg.OnDelete(ctx, registered); err != nil { + start := time.Now() + err := r.cfg.OnDelete(ctx, registered) + r.metrics.reconciliationDuration.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + }).Observe(time.Since(start).Seconds()) + if err != nil { if trace.IsNotFound(err) { r.logger.Log(ctx, logutils.TraceLevel, "Failed to delete resource", "kind", kind, "name", key, "err", err) + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + metricLabelResult: metricLabelResultNoop, + }).Inc() return nil } + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + metricLabelResult: metricLabelResultError, + }).Inc() return trace.Wrap(err, "failed to delete %v %v", kind, key) } + r.metrics.reconciliationTotal.With(prometheus.Labels{ + metricLabelKind: kind, + metricLabelOperation: metricLabelOperationDelete, + metricLabelResult: metricLabelResultSuccess, + }).Inc() return nil } diff --git a/lib/tbot/services/legacyspiffe/workload_api.go b/lib/tbot/services/legacyspiffe/workload_api.go index 159fbefe43839..b5ddbec8aefab 100644 --- a/lib/tbot/services/legacyspiffe/workload_api.go +++ b/lib/tbot/services/legacyspiffe/workload_api.go @@ -51,6 +51,7 @@ import ( machineidv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/machineid/v1" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/observability/metrics" + grpcmetrics "github.com/gravitational/teleport/lib/observability/metrics/grpc" "github.com/gravitational/teleport/lib/tbot/bot" "github.com/gravitational/teleport/lib/tbot/client" "github.com/gravitational/teleport/lib/tbot/internal" @@ -174,7 +175,7 @@ func (s *WorkloadAPIService) Run(ctx context.Context) error { defer s.client.Close() s.log.DebugContext(ctx, "Completed pre-run initialization") - srvMetrics := metrics.CreateGRPCServerMetrics( + srvMetrics := grpcmetrics.CreateGRPCServerMetrics( true, prometheus.Labels{ teleport.TagServer: "tbot-spiffe-workload-api", }, diff --git a/lib/tbot/services/workloadidentity/workload_api.go b/lib/tbot/services/workloadidentity/workload_api.go index 23a26f00d936c..2dc7bacb98cf9 100644 --- a/lib/tbot/services/workloadidentity/workload_api.go +++ b/lib/tbot/services/workloadidentity/workload_api.go @@ -46,6 +46,7 @@ import ( apiclient "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/observability/metrics" + grpcmetrics "github.com/gravitational/teleport/lib/observability/metrics/grpc" "github.com/gravitational/teleport/lib/tbot/bot" "github.com/gravitational/teleport/lib/tbot/client" "github.com/gravitational/teleport/lib/tbot/internal" @@ -168,7 +169,7 @@ func (s *WorkloadAPIService) Run(ctx context.Context) error { defer s.client.Close() s.log.DebugContext(ctx, "Completed pre-run initialization") - srvMetrics := metrics.CreateGRPCServerMetrics( + srvMetrics := grpcmetrics.CreateGRPCServerMetrics( true, prometheus.Labels{ teleport.TagServer: "tbot-workload-identity-api", }, diff --git a/lib/tbot/tbot.go b/lib/tbot/tbot.go index 4ead51ad18e6f..aaba54081fbed 100644 --- a/lib/tbot/tbot.go +++ b/lib/tbot/tbot.go @@ -35,6 +35,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/modules" "github.com/gravitational/teleport/lib/observability/metrics" + grpcmetrics "github.com/gravitational/teleport/lib/observability/metrics/grpc" "github.com/gravitational/teleport/lib/tbot/bot" "github.com/gravitational/teleport/lib/tbot/bot/connection" "github.com/gravitational/teleport/lib/tbot/config" @@ -58,7 +59,7 @@ import ( var tracer = otel.Tracer("github.com/gravitational/teleport/lib/tbot") -var clientMetrics = metrics.CreateGRPCClientMetrics( +var clientMetrics = grpcmetrics.CreateGRPCClientMetrics( false, prometheus.Labels{}, )