142 changes: 140 additions & 2 deletions lib/cache/inventory/inventory_cache.go
@@ -33,6 +33,7 @@ import (
"github.com/charlievieth/strcase"
"github.com/coreos/go-semver/semver"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
"rsc.io/ordered"
@@ -44,6 +45,7 @@ import (
"github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/cache"
"github.com/gravitational/teleport/lib/expression"
"github.com/gravitational/teleport/lib/observability/metrics"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils/set"
"github.com/gravitational/teleport/lib/utils/sortcache"
@@ -56,8 +58,82 @@ const (

// botInstancePrefix is the backend prefix for bot instances.
botInstancePrefix = "bot_instance"

// metricsSubsystem is the prometheus subsystem name for inventory cache metrics.
metricsSubsystem = "inventory_cache"
// metricsVersionLabel is the label name for version metrics.
metricsVersionLabel = "version"
)

type inventoryCacheMetrics struct {
// instancesTotal is the total number of teleport instances in the cache.
instancesTotal prometheus.Gauge

// botInstancesTotal is the total number of bot instances in the cache.
botInstancesTotal prometheus.Gauge

// instancesByVersion is the number of instances by teleport version.
instancesByVersion *prometheus.GaugeVec

// initDurationSeconds is how long it took to initialize and populate the cache.
initDurationSeconds prometheus.Gauge

// requests is the total number of requests made to the cache (the number of times ListUnifiedInstances was called).
requests prometheus.Counter
}

// newInventoryCacheMetrics creates the inventory cache metrics.
func newInventoryCacheMetrics(reg *metrics.Registry) *inventoryCacheMetrics {
var namespace, subsystem string
if reg != nil {
namespace = reg.Namespace()
subsystem = reg.Subsystem()
}

return &inventoryCacheMetrics{
instancesTotal: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "instances_total",
Help: "Total number of teleport instances in the inventory cache.",
}),
botInstancesTotal: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "bot_instances_total",
Help: "Total number of bot instances in the inventory cache.",
}),
instancesByVersion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "instances_by_version",
Help: "Number of teleport instances by teleport version.",
}, []string{metricsVersionLabel}),
initDurationSeconds: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "init_duration_seconds",
Help: "Time taken to initialize and populate the inventory cache.",
}),
requests: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requests_total",
Help: "Total number of requests to the inventory cache.",
}),
}
}

// register registers the metrics with the provided registerer.
func (m *inventoryCacheMetrics) register(reg prometheus.Registerer) error {
return trace.NewAggregate(
reg.Register(m.instancesTotal),
reg.Register(m.botInstancesTotal),
reg.Register(m.instancesByVersion),
reg.Register(m.initDurationSeconds),
reg.Register(m.requests),
)
}
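
The namespace/subsystem plumbing above determines the fully-qualified names that Prometheus exports. The following standalone sketch (not part of the PR) mirrors one of the GaugeOpts and prints the resulting name; the value "teleport" is an assumption for teleport.MetricNamespace, and everything else is illustrative:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Mirrors one of the gauges above. The registry wrapper in the PR supplies
	// Namespace/Subsystem; Prometheus joins them into namespace_subsystem_name.
	instancesTotal := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "teleport",        // assumed value of teleport.MetricNamespace
		Subsystem: "inventory_cache", // metricsSubsystem from the const block above
		Name:      "instances_total",
		Help:      "Total number of teleport instances in the inventory cache.",
	})

	reg := prometheus.NewRegistry()
	if err := reg.Register(instancesTotal); err != nil {
		panic(err)
	}
	instancesTotal.Set(42)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		// Prints: teleport_inventory_cache_instances_total
		fmt.Println(mf.GetName())
	}
}
```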

var (
// unifiedExpressionParser is a cached unified expression parser
unifiedExpressionParser *typical.Parser[*unifiedFilterEnvironment, bool]
@@ -266,6 +342,9 @@ type InventoryCacheConfig struct {
TargetVersion string

Logger *slog.Logger

// MetricsRegistry is the registry for prometheus metrics.
MetricsRegistry *metrics.Registry
}

func (c *InventoryCacheConfig) CheckAndSetDefaults() error {
Expand Down Expand Up @@ -303,13 +382,28 @@ type InventoryCache struct {

// cache is the unified sortcache that holds both teleport and bot instances.
cache *sortcache.SortCache[*inventoryInstance, inventoryIndex]

// metrics holds the prometheus metrics for the inventory cache.
metrics *inventoryCacheMetrics
}

func NewInventoryCache(cfg InventoryCacheConfig) (*InventoryCache, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

var reg *metrics.Registry
if cfg.MetricsRegistry != nil {
reg = cfg.MetricsRegistry.Wrap(metricsSubsystem)
}

m := newInventoryCacheMetrics(reg)
if reg != nil {
if err := m.register(reg); err != nil {
cfg.Logger.ErrorContext(context.Background(), "Failed to register inventory cache metrics", "error", err)
}
}

ctx, cancel := context.WithCancel(context.Background())

ic := &InventoryCache{
@@ -328,8 +422,9 @@ func NewInventoryCache(cfg InventoryCacheConfig) (*InventoryCache, error) {
// Create a channel that will close when the initialization is done.
done: make(chan struct{}),

ctx: ctx,
cancel: cancel,
ctx: ctx,
cancel: cancel,
metrics: m,
}

go func() {
@@ -352,6 +447,34 @@ func (ic *InventoryCache) Close() error {
return nil
}

// updateMetrics iterates through the cache and updates all prometheus metrics.
func (ic *InventoryCache) updateMetrics() {
var instanceCount, botInstanceCount float64
versionCounts := make(map[string]float64)

for item := range ic.cache.Ascend(inventoryIDIndex, "", "") {
if item.isInstance() {
instanceCount++

// Count versions
version := item.instance.Spec.Version
if version != "" {
versionCounts[version]++
}
} else {
botInstanceCount++
}
}

ic.metrics.instancesTotal.Set(instanceCount)
ic.metrics.botInstancesTotal.Set(botInstanceCount)

ic.metrics.instancesByVersion.Reset()
for version, count := range versionCounts {
ic.metrics.instancesByVersion.WithLabelValues(version).Set(count)
}
}
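
The Reset call before repopulating instancesByVersion is what keeps per-version labels from going stale when the last instance on a version disappears from the cache. A minimal, hypothetical test sketch (the prometheus/testutil helpers are real; the metric values and versions are invented) illustrates the behavior this relies on:

```go
package inventorysketch

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// TestInstancesByVersionReset shows why updateMetrics calls Reset before
// repopulating the per-version gauge: without it, a version that no longer
// exists in the cache would keep exporting its last value.
func TestInstancesByVersionReset(t *testing.T) {
	byVersion := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "instances_by_version",
		Help: "Number of teleport instances by teleport version.",
	}, []string{"version"})

	// First refresh: two versions present in the (hypothetical) cache.
	byVersion.WithLabelValues("17.0.0").Set(3)
	byVersion.WithLabelValues("16.4.2").Set(1)

	// Second refresh: every 16.4.2 instance has expired from the cache.
	byVersion.Reset()
	byVersion.WithLabelValues("17.0.0").Set(4)

	if got := testutil.CollectAndCount(byVersion); got != 1 {
		t.Fatalf("expected 1 labeled series after Reset, got %d", got)
	}
	if got := testutil.ToFloat64(byVersion.WithLabelValues("17.0.0")); got != 4 {
		t.Fatalf("expected 17.0.0 gauge to be 4, got %v", got)
	}
}
```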

// calculateReadsPerSecond calculates the rate limit to use for backend reads based on cluster size.
// The curve is intentionally capped to stay below the 90s watcher grace period even in extremely large clusters.
// With this implementation, these are some of the expected rate limits and corresponding total times based on cluster size:
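
The table of expected rates and total times is elided from this diff, so no numbers are reproduced here. As a rough illustration of how a reads-per-second value from such a curve would pace the backend reads, here is a sketch using golang.org/x/time/rate (already imported by this file); the function name, curve value, and page callback are all placeholders:

```go
package main

import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// paginate paces backend page reads with a limiter derived from a
// reads-per-second curve like the one calculateReadsPerSecond produces.
func paginate(ctx context.Context, readsPerSecond float64, readPage func(context.Context) (more bool, err error)) error {
	limiter := rate.NewLimiter(rate.Limit(readsPerSecond), 1)
	for {
		// Block until the limiter allows the next backend read; a capped curve
		// keeps the total init time under the 90s watcher grace period.
		if err := limiter.Wait(ctx); err != nil {
			return err
		}
		more, err := readPage(ctx)
		if err != nil {
			return err
		}
		if !more {
			return nil
		}
	}
}

func main() {
	// Placeholder curve value; the real numbers come from calculateReadsPerSecond.
	_ = paginate(context.Background(), 10, func(context.Context) (bool, error) {
		time.Sleep(time.Millisecond) // stand-in for a backend page read
		return false, nil
	})
}
```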
@@ -408,6 +531,8 @@ func (ic *InventoryCache) initializeAndWatchWithRetry(ctx context.Context) {

// initializeAndWatch initializes the inventory cache and begins watching for instance and bot_instance backend events.
func (ic *InventoryCache) initializeAndWatch(ctx context.Context) error {
initStart := time.Now()

// Wait for primary cache to be ready.
if err := ic.waitForPrimaryCacheInit(ctx); err != nil {
return trace.Wrap(err, "Failed to wait for primary cache init")
@@ -434,6 +559,11 @@ func (ic *InventoryCache) initializeAndWatch(ctx context.Context) error {
return trace.Wrap(err, "failed to populate inventory cache")
}

// Record how long it took to initialize the cache
ic.metrics.initDurationSeconds.Set(time.Since(initStart).Seconds())

ic.updateMetrics()

// Mark cache as healthy.
ic.healthy.Store(true)
ic.cfg.Logger.InfoContext(ctx, "Inventory cache init succeeded")
@@ -561,6 +691,9 @@ func (ic *InventoryCache) populateBotInstances(ctx context.Context, limiter *rat

// processEvents processes events from the watcher.
func (ic *InventoryCache) processEvents(ctx context.Context, watcher types.Watcher) {
metricsTicker := time.NewTicker(30 * time.Second)
defer metricsTicker.Stop()

for {
select {
case <-ctx.Done():
@@ -570,6 +703,9 @@ func (ic *InventoryCache) processEvents(ctx context.Context, watcher types.Watch
if err := ic.processEvent(event); err != nil {
ic.cfg.Logger.WarnContext(ctx, "Failed to process event", "error", err)
}

case <-metricsTicker.C:
ic.updateMetrics()
}
}
}
@@ -664,6 +800,8 @@ func (ic *InventoryCache) ListUnifiedInstances(ctx context.Context, req *invento
return nil, trace.ConnectionProblem(nil, "inventory cache is not yet healthy, please try again in a few minutes'")
}

ic.metrics.requests.Inc()

if req.PageSize <= 0 {
req.PageSize = defaults.DefaultChunkSize
}
11 changes: 8 additions & 3 deletions lib/service/service.go
@@ -731,7 +731,7 @@ type TeleportProcess struct {
//
// Both the metricsRegistry and the default global registry are gathered by
// Teleport's metric service.
metricsRegistry *prometheus.Registry
metricsRegistry *metrics.Registry

// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
@@ -1124,7 +1124,11 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {
// We must create the registry in NewTeleport, as opposed to the config,
// because some tests are running multiple Teleport instances from the same
// config and reusing the same registry causes them to fail.
metricsRegistry := prometheus.NewRegistry()
rootMetricRegistry := prometheus.NewRegistry()
metricsRegistry, err := metrics.NewRegistry(rootMetricRegistry, teleport.MetricNamespace, "")
if err != nil {
return nil, trace.Wrap(err, "creating metrics registry")
}

// If FIPS mode was requested make sure binary is build against BoringCrypto.
if cfg.FIPS {
@@ -1331,7 +1335,7 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {
TracingProvider: tracing.NoopProvider(),
metricsRegistry: metricsRegistry,
SyncGatherers: metrics.NewSyncGatherers(
metricsRegistry,
rootMetricRegistry,
prometheus.DefaultGatherer,
),
}
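
The switch from a bare *prometheus.Registry to *metrics.Registry is what lets components such as the inventory cache call Wrap and get subsystem-scoped metric names without threading strings around. The sketch below shows what such a wrapper could look like; it is written only to illustrate the pattern, and the real type in lib/observability/metrics almost certainly differs in detail (validation, error handling, how Wrap composes subsystems, the full prometheus.Registerer surface):

```go
package metricsketch

import "github.com/prometheus/client_golang/prometheus"

// Registry pairs a root prometheus.Registerer with a namespace/subsystem.
// This is an illustrative stand-in, not the real lib/observability/metrics type.
type Registry struct {
	root      prometheus.Registerer
	namespace string
	subsystem string
}

// NewRegistry mirrors the constructor used in NewTeleport above.
func NewRegistry(root prometheus.Registerer, namespace, subsystem string) (*Registry, error) {
	return &Registry{root: root, namespace: namespace, subsystem: subsystem}, nil
}

func (r *Registry) Namespace() string { return r.namespace }
func (r *Registry) Subsystem() string { return r.subsystem }

// Wrap returns a child registry scoped to the given subsystem, which is how
// process.metricsRegistry.Wrap("inventory_cache") ends up producing
// teleport_inventory_cache_* metric names.
func (r *Registry) Wrap(subsystem string) *Registry {
	return &Registry{root: r.root, namespace: r.namespace, subsystem: subsystem}
}

// Register forwards to the root registerer so everything still shows up in the
// process-wide gatherer.
func (r *Registry) Register(c prometheus.Collector) error {
	return r.root.Register(c)
}
```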
@@ -2500,6 +2504,7 @@ func (process *TeleportProcess) initAuthService() error {
Inventory: as.Services,
BotInstanceBackend: as.Services,
Logger: process.logger.With(teleport.ComponentKey, "inventory.cache"),
MetricsRegistry: process.metricsRegistry.Wrap("inventory_cache"),
})
if err != nil {
return trace.Wrap(err, "creating inventory cache")
8 changes: 6 additions & 2 deletions lib/service/service_test.go
@@ -1635,6 +1635,8 @@ func TestDebugService(t *testing.T) {
log := logtest.NewLogger()

localRegistry := prometheus.NewRegistry()
processRegistry, err := metrics.NewRegistry(localRegistry, teleport.MetricNamespace, "")
require.NoError(t, err)
additionalRegistry := prometheus.NewRegistry()

// In this test we don't want to spin a whole process and have to wait for
@@ -1646,7 +1648,7 @@
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: localRegistry,
metricsRegistry: processRegistry,
SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
Supervisor: supervisor,
}
@@ -2130,6 +2132,8 @@ func TestDiagnosticsService(t *testing.T) {

log := logtest.NewLogger()
localRegistry := prometheus.NewRegistry()
processRegistry, err := metrics.NewRegistry(localRegistry, teleport.MetricNamespace, "")
require.NoError(t, err)
additionalRegistry := prometheus.NewRegistry()

// In this test we don't want to spin a whole process and have to wait for
@@ -2141,7 +2145,7 @@
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: localRegistry,
metricsRegistry: processRegistry,
SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
Supervisor: supervisor,
}