From c10bad814d24fabf26616453fa977ba3b6d3c60d Mon Sep 17 00:00:00 2001 From: Yassine Bounekhla Date: Sat, 7 Feb 2026 05:30:09 +0800 Subject: [PATCH] add prometheus metrics to inventory cache --- lib/cache/inventory/inventory_cache.go | 142 ++++++++++++++++++++++++- lib/service/service.go | 11 +- lib/service/service_test.go | 8 +- 3 files changed, 154 insertions(+), 7 deletions(-) diff --git a/lib/cache/inventory/inventory_cache.go b/lib/cache/inventory/inventory_cache.go index 6541fe51863cf..463e32ba70e85 100644 --- a/lib/cache/inventory/inventory_cache.go +++ b/lib/cache/inventory/inventory_cache.go @@ -33,6 +33,7 @@ import ( "github.com/charlievieth/strcase" "github.com/coreos/go-semver/semver" "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" "google.golang.org/protobuf/proto" "rsc.io/ordered" @@ -44,6 +45,7 @@ import ( "github.com/gravitational/teleport/api/utils" "github.com/gravitational/teleport/lib/cache" "github.com/gravitational/teleport/lib/expression" + "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/utils/set" "github.com/gravitational/teleport/lib/utils/sortcache" @@ -56,8 +58,82 @@ const ( // botInstancePrefix is the backend prefix for bot instances. botInstancePrefix = "bot_instance" + + metricsSubsystem = "inventory_cache" + // metricsVersionLabel is the label name for version metrics. + metricsVersionLabel = "version" ) +type inventoryCacheMetrics struct { + // instancesTotal is the total number of teleport instances in the cache. + instancesTotal prometheus.Gauge + + // botInstancesTotal is the total number of bot instances in the cache. + botInstancesTotal prometheus.Gauge + + // instancesByVersion is the number of instances by teleport version. + instancesByVersion *prometheus.GaugeVec + + // initDurationSeconds is how long it took to initialize and populate the cache. + initDurationSeconds prometheus.Gauge + + // requests is the total number of requests made to the cache (times ListUnifiedInstances was called). + requests prometheus.Counter +} + +// newInventoryCacheMetrics creates the inventory cache metrics. 
+func newInventoryCacheMetrics(reg *metrics.Registry) *inventoryCacheMetrics { + var namespace, subsystem string + if reg != nil { + namespace = reg.Namespace() + subsystem = reg.Subsystem() + } + + return &inventoryCacheMetrics{ + instancesTotal: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "instances_total", + Help: "Total number of teleport instances in the inventory cache.", + }), + botInstancesTotal: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "bot_instances_total", + Help: "Total number of bot instances in the inventory cache.", + }), + instancesByVersion: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "instances_by_version", + Help: "Number of teleport instances by teleport version.", + }, []string{metricsVersionLabel}), + initDurationSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "init_duration_seconds", + Help: "Time taken to initialize and populate the inventory cache.", + }), + requests: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "requests_total", + Help: "Total number of requests to the inventory cache.", + }), + } +} + +// register registers the metrics with the provided registerer. +func (m *inventoryCacheMetrics) register(reg prometheus.Registerer) error { + return trace.NewAggregate( + reg.Register(m.instancesTotal), + reg.Register(m.botInstancesTotal), + reg.Register(m.instancesByVersion), + reg.Register(m.initDurationSeconds), + reg.Register(m.requests), + ) +} + var ( // unifiedExpressionParser is a cached unified expression parser unifiedExpressionParser *typical.Parser[*unifiedFilterEnvironment, bool] @@ -266,6 +342,9 @@ type InventoryCacheConfig struct { TargetVersion string Logger *slog.Logger + + // MetricsRegistry is the registry for prometheus metrics. + MetricsRegistry *metrics.Registry } func (c *InventoryCacheConfig) CheckAndSetDefaults() error { @@ -303,6 +382,9 @@ type InventoryCache struct { // cache is the unified sortcache that holds both teleport and bot instances. cache *sortcache.SortCache[*inventoryInstance, inventoryIndex] + + // metrics holds the prometheus metrics for the inventory cache. + metrics *inventoryCacheMetrics } func NewInventoryCache(cfg InventoryCacheConfig) (*InventoryCache, error) { @@ -310,6 +392,18 @@ func NewInventoryCache(cfg InventoryCacheConfig) (*InventoryCache, error) { return nil, trace.Wrap(err) } + var reg *metrics.Registry + if cfg.MetricsRegistry != nil { + reg = cfg.MetricsRegistry.Wrap(metricsSubsystem) + } + + m := newInventoryCacheMetrics(reg) + if reg != nil { + if err := m.register(reg); err != nil { + cfg.Logger.ErrorContext(context.Background(), "Failed to register inventory cache metrics", "error", err) + } + } + ctx, cancel := context.WithCancel(context.Background()) ic := &InventoryCache{ @@ -328,8 +422,9 @@ func NewInventoryCache(cfg InventoryCacheConfig) (*InventoryCache, error) { // Create a channel that will close when the initialization is done. done: make(chan struct{}), - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, + metrics: m, } go func() { @@ -352,6 +447,34 @@ func (ic *InventoryCache) Close() error { return nil } +// updateMetrics iterates through the cache and updates all prometheus metrics. 
+func (ic *InventoryCache) updateMetrics() { + var instanceCount, botInstanceCount float64 + versionCounts := make(map[string]float64) + + for item := range ic.cache.Ascend(inventoryIDIndex, "", "") { + if item.isInstance() { + instanceCount++ + + // Count versions + version := item.instance.Spec.Version + if version != "" { + versionCounts[version]++ + } + } else { + botInstanceCount++ + } + } + + ic.metrics.instancesTotal.Set(instanceCount) + ic.metrics.botInstancesTotal.Set(botInstanceCount) + + ic.metrics.instancesByVersion.Reset() + for version, count := range versionCounts { + ic.metrics.instancesByVersion.WithLabelValues(version).Set(count) + } +} + // calculateReadsPerSecond calculates the rate limit to use for backend reads based on cluster size. // The curve is intentionalled capped to stay below the 90s watcher grace period even in extremely large clusters. // With this implementation, these are some of the expected rate limits and corresponding total times based on cluster size: @@ -408,6 +531,8 @@ func (ic *InventoryCache) initializeAndWatchWithRetry(ctx context.Context) { // initializeAndWatch initializes the inventory cache and begins watching for instance and bot_instance backend events. func (ic *InventoryCache) initializeAndWatch(ctx context.Context) error { + initStart := time.Now() + // Wait for primary cache to be ready. if err := ic.waitForPrimaryCacheInit(ctx); err != nil { return trace.Wrap(err, "Failed to wait for primary cache init") @@ -434,6 +559,11 @@ func (ic *InventoryCache) initializeAndWatch(ctx context.Context) error { return trace.Wrap(err, "failed to populate inventory cache") } + // Record how long it took to initialize the cache + ic.metrics.initDurationSeconds.Set(time.Since(initStart).Seconds()) + + ic.updateMetrics() + // Mark cache as healthy. ic.healthy.Store(true) ic.cfg.Logger.InfoContext(ctx, "Inventory cache init succeeded") @@ -561,6 +691,9 @@ func (ic *InventoryCache) populateBotInstances(ctx context.Context, limiter *rat // processEvents processes events from the watcher. func (ic *InventoryCache) processEvents(ctx context.Context, watcher types.Watcher) { + metricsTicker := time.NewTicker(30 * time.Second) + defer metricsTicker.Stop() + for { select { case <-ctx.Done(): @@ -570,6 +703,9 @@ func (ic *InventoryCache) processEvents(ctx context.Context, watcher types.Watch if err := ic.processEvent(event); err != nil { ic.cfg.Logger.WarnContext(ctx, "Failed to process event", "error", err) } + + case <-metricsTicker.C: + ic.updateMetrics() } } } @@ -664,6 +800,8 @@ func (ic *InventoryCache) ListUnifiedInstances(ctx context.Context, req *invento return nil, trace.ConnectionProblem(nil, "inventory cache is not yet healthy, please try again in a few minutes'") } + ic.metrics.requests.Inc() + if req.PageSize <= 0 { req.PageSize = defaults.DefaultChunkSize } diff --git a/lib/service/service.go b/lib/service/service.go index 564617214a67b..98a7032cfc3b6 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -731,7 +731,7 @@ type TeleportProcess struct { // // Both the metricsRegistry and the default global registry are gathered by // Teleport's metric service. - metricsRegistry *prometheus.Registry + metricsRegistry *metrics.Registry // We gather metrics both from the in-process registry (preferred metrics registration method) // and the global registry (used by some Teleport services and many dependencies). 
@@ -1124,7 +1124,11 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) { // We must create the registry in NewTeleport, as opposed to the config, // because some tests are running multiple Teleport instances from the same // config and reusing the same registry causes them to fail. - metricsRegistry := prometheus.NewRegistry() + rootMetricRegistry := prometheus.NewRegistry() + metricsRegistry, err := metrics.NewRegistry(rootMetricRegistry, teleport.MetricNamespace, "") + if err != nil { + return nil, trace.Wrap(err, "creating metrics registry") + } // If FIPS mode was requested make sure binary is build against BoringCrypto. if cfg.FIPS { @@ -1331,7 +1335,7 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) { TracingProvider: tracing.NoopProvider(), metricsRegistry: metricsRegistry, SyncGatherers: metrics.NewSyncGatherers( - metricsRegistry, + rootMetricRegistry, prometheus.DefaultGatherer, ), } @@ -2500,6 +2504,7 @@ func (process *TeleportProcess) initAuthService() error { Inventory: as.Services, BotInstanceBackend: as.Services, Logger: process.logger.With(teleport.ComponentKey, "inventory.cache"), + MetricsRegistry: process.metricsRegistry.Wrap("inventory_cache"), }) if err != nil { return trace.Wrap(err, "creating inventory cache") diff --git a/lib/service/service_test.go b/lib/service/service_test.go index f4cf8dd561173..efc3878a29140 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -1635,6 +1635,8 @@ func TestDebugService(t *testing.T) { log := logtest.NewLogger() localRegistry := prometheus.NewRegistry() + processRegistry, err := metrics.NewRegistry(localRegistry, teleport.MetricNamespace, "") + require.NoError(t, err) additionalRegistry := prometheus.NewRegistry() // In this test we don't want to spin a whole process and have to wait for @@ -1646,7 +1648,7 @@ func TestDebugService(t *testing.T) { Config: cfg, Clock: fakeClock, logger: log, - metricsRegistry: localRegistry, + metricsRegistry: processRegistry, SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer), Supervisor: supervisor, } @@ -2130,6 +2132,8 @@ func TestDiagnosticsService(t *testing.T) { log := logtest.NewLogger() localRegistry := prometheus.NewRegistry() + processRegistry, err := metrics.NewRegistry(localRegistry, teleport.MetricNamespace, "") + require.NoError(t, err) additionalRegistry := prometheus.NewRegistry() // In this test we don't want to spin a whole process and have to wait for @@ -2141,7 +2145,7 @@ func TestDiagnosticsService(t *testing.T) { Config: cfg, Clock: fakeClock, logger: log, - metricsRegistry: localRegistry, + metricsRegistry: processRegistry, SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer), Supervisor: supervisor, }
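
The metrics above follow the standard client_golang pattern: plain gauges and a counter for totals, plus a GaugeVec keyed by version that updateMetrics resets and repopulates on every pass, so versions that drop out of the cache also drop out of the exported series. A minimal, self-contained sketch of that same pattern (hypothetical values; the namespace and subsystem are assumptions, the real ones come from teleport.MetricNamespace and the registry wrapper):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as instancesByVersion in the patch: one series per version label.
	instancesByVersion := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "teleport",        // assumption: teleport.MetricNamespace
		Subsystem: "inventory_cache", // matches metricsSubsystem in the patch
		Name:      "instances_by_version",
		Help:      "Number of teleport instances by teleport version.",
	}, []string{"version"})
	reg.MustRegister(instancesByVersion)

	// One simulated updateMetrics pass: clear stale label values, then set fresh counts.
	instancesByVersion.Reset()
	for version, count := range map[string]float64{"17.2.0": 3, "18.1.0": 5} {
		instancesByVersion.WithLabelValues(version).Set(count)
	}

	// Read back a single child gauge; prints 5.
	fmt.Println(testutil.ToFloat64(instancesByVersion.WithLabelValues("18.1.0")))
}

Because the GaugeVec is rebuilt from scratch rather than adjusted incrementally, the 30s ticker in processEvents keeps the per-version series bounded by the versions currently present in the cache.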
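
The *metrics.Registry type threaded through InventoryCacheConfig and TeleportProcess is not part of this patch; only its NewRegistry, Wrap, Namespace, and Subsystem calls appear here. As an illustration of the wiring those calls imply, a hypothetical minimal equivalent (not the real lib/observability/metrics implementation, which may differ, for example in how nested Wrap calls combine subsystems):

package metrics

import "github.com/prometheus/client_golang/prometheus"

// Registry pairs a prometheus.Registerer with a namespace and subsystem so
// constructors like newInventoryCacheMetrics can build fully-qualified metric
// names without hard-coding them.
type Registry struct {
	prometheus.Registerer
	namespace string
	subsystem string
}

// NewRegistry wraps root with the given namespace and subsystem.
func NewRegistry(root prometheus.Registerer, namespace, subsystem string) (*Registry, error) {
	return &Registry{Registerer: root, namespace: namespace, subsystem: subsystem}, nil
}

func (r *Registry) Namespace() string { return r.namespace }
func (r *Registry) Subsystem() string { return r.subsystem }

// Wrap derives a child registry scoped to the given subsystem, keeping the
// namespace and the underlying registerer.
func (r *Registry) Wrap(subsystem string) *Registry {
	return &Registry{Registerer: r.Registerer, namespace: r.namespace, subsystem: subsystem}
}

Note that initAuthService already wraps the process registry with "inventory_cache" before NewInventoryCache wraps it again with metricsSubsystem; whether that yields a doubled subsystem in the metric names depends on how the real Wrap combines them.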