diff --git a/lib/auth/accesspoint/accesspoint.go b/lib/auth/accesspoint/accesspoint.go index 4904076472e36..36aace8da5cd9 100644 --- a/lib/auth/accesspoint/accesspoint.go +++ b/lib/auth/accesspoint/accesspoint.go @@ -23,17 +23,15 @@ package accesspoint import ( "context" - "log/slog" "slices" "time" "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" oteltrace "go.opentelemetry.io/otel/trace" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/cache" "github.com/gravitational/teleport/lib/observability/tracing" "github.com/gravitational/teleport/lib/services" @@ -63,6 +61,8 @@ type Config struct { // TracingProvider is the provider to be used for exporting // traces. No-op tracers will be used if no provider is set. TracingProvider *tracing.Provider + // Registerer is used to register prometheus metrics. + Registerer prometheus.Registerer // The following services are provided to the Cache to allow it to // populate its resource collections. They will either be the local service @@ -130,27 +130,11 @@ func NewCache(cfg Config) (*cache.Cache, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - slog.DebugContext(cfg.Context, "Creating in-memory backend cache.", "cache_name", cfg.CacheName) - mem, err := memory.New(memory.Config{ - Context: cfg.Context, - EventsOff: !cfg.EventsSystem, - Mirror: true, - }) - if err != nil { - return nil, trace.Wrap(err) - } + var tracer oteltrace.Tracer if cfg.TracingProvider != nil { tracer = cfg.TracingProvider.Tracer(teleport.ComponentCache) } - reporter, err := backend.NewReporter(backend.ReporterConfig{ - Component: teleport.ComponentCache, - Backend: mem, - Tracer: tracer, - }) - if err != nil { - return nil, trace.Wrap(err) - } component := slices.Clone(cfg.CacheName) if cfg.ProcessID != "" { @@ -161,14 +145,13 @@ func NewCache(cfg Config) (*cache.Cache, error) { metricComponent := append(slices.Clone(cfg.CacheName), teleport.ComponentCache) cacheCfg := cache.Config{ - Context: cfg.Context, - Backend: reporter, - Component: teleport.Component(component...), - MetricComponent: teleport.Component(metricComponent...), - Tracer: tracer, - MaxRetryPeriod: cfg.MaxRetryPeriod, - Unstarted: cfg.Unstarted, - + Context: cfg.Context, + Component: teleport.Component(component...), + MetricComponent: teleport.Component(metricComponent...), + Tracer: tracer, + Registerer: cfg.Registerer, + MaxRetryPeriod: cfg.MaxRetryPeriod, + Unstarted: cfg.Unstarted, Access: cfg.Access, AccessLists: cfg.AccessLists, AccessMonitoringRules: cfg.AccessMonitoringRules, diff --git a/lib/backend/atomicwrite.go b/lib/backend/atomicwrite.go index a59cf84f70cb7..b3eaf37fedd8d 100644 --- a/lib/backend/atomicwrite.go +++ b/lib/backend/atomicwrite.go @@ -218,6 +218,8 @@ const ( // MaxAtomicWriteSize is the maximum number of conditional actions that may // be applied via a single atomic write. The exact number is subject to change // but must always be less than the minimum value supported across all backends. + // + // NOTE: The buckets for backendmetrics.AtomicWriteSize must stay in sync with this constant.
MaxAtomicWriteSize = 64 ) diff --git a/lib/backend/backendmetrics/metrics.go b/lib/backend/backendmetrics/metrics.go new file mode 100644 index 0000000000000..40881c020b604 --- /dev/null +++ b/lib/backend/backendmetrics/metrics.go @@ -0,0 +1,255 @@ +// Teleport +// Copyright (C) 2025 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +package backendmetrics + +import ( + "github.com/gravitational/trace" + "github.com/prometheus/client_golang/prometheus" + + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/lib/observability/metrics" +) + +var ( + Requests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendRequests, + Help: "Number of requests to the backend (reads, writes, and keepalives)", + }, + []string{teleport.ComponentLabel, teleport.TagReq, teleport.TagRange}, + ) + Watchers = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: teleport.MetricBackendWatchers, + Help: "Number of active backend watchers", + }, + []string{teleport.ComponentLabel}, + ) + WatcherQueues = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: teleport.MetricBackendWatcherQueues, + Help: "Watcher queue sizes", + }, + []string{teleport.ComponentLabel}, + ) + WriteRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendWriteRequests, + Help: "Number of write requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + Writes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendWrites, + Help: "Number of individual items written to the backend", + }, + []string{teleport.ComponentLabel}, + ) + WriteRequestsFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendWriteFailedRequests, + Help: "Number of failed write requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + WriteRequestsFailedPrecondition = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendWriteFailedPreconditionRequests, + Help: "Number of write requests that failed due to a precondition (existence, revision, value, etc)", + }, + []string{teleport.ComponentLabel}, + ) + AtomicWriteRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendAtomicWriteRequests, + Help: "Number of atomic write requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + AtomicWriteRequestsFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendAtomicWriteFailedRequests, + Help: "Number of failed atomic write requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + AtomicWriteConditionFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, +
Name: teleport.MetricBackendAtomicWriteConditionFailed, + Help: "Number of times an atomic write request results in condition failure", + }, + []string{teleport.ComponentLabel}, + ) + AtomicWriteLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendAtomicWriteHistogram, + Help: "Latency for backend atomic write operations", + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^15 == 32.768 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), + }, + []string{teleport.ComponentLabel}, + ) + AtomicWriteSize = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendAtomicWriteSize, + Help: "Atomic write batch size", + // buckets of the form 1, 2, 4, 8, 16, etc... + Buckets: prometheus.ExponentialBuckets(1, 2, 8), + }, + []string{teleport.ComponentLabel}, + ) + AtomicWriteContention = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendAtomicWriteContention, + Help: "Number of times atomic write requests experience contention", + }, + []string{teleport.ComponentLabel}, + ) + BatchWriteRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendBatchWriteRequests, + Help: "Number of batch write requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + BatchWriteRequestsFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendBatchFailedWriteRequests, + Help: "Number of failed write requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + ReadRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendReadRequests, + Help: "Number of read requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + Reads = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Name: teleport.MetricBackendReads, + Help: "Number of individual items read from the backend", + }, + []string{teleport.ComponentLabel}, + ) + ReadRequestsFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendFailedReadRequests, + Help: "Number of failed read requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + StreamingRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: "backend", + Name: "stream_requests", + Help: "Number of inflight stream requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + StreamingRequestsFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: "backend", + Name: "stream_requests_failed", + Help: "Number of failed stream requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + BatchReadRequests = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendBatchReadRequests, + Help: "Number of read requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + BatchReadRequestsFailed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: teleport.MetricBackendBatchFailedReadRequests, + Help: "Number of failed read requests to the backend", + }, + []string{teleport.ComponentLabel}, + ) + WriteLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: teleport.MetricBackendWriteHistogram, + 
Help: "Latency for backend write operations", + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^15 == 32.768 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), + }, + []string{teleport.ComponentLabel}, + ) + BatchWriteLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: teleport.MetricBackendBatchWriteHistogram, + Help: "Latency for backend batch write operations", + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^15 == 32.768 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), + }, + []string{teleport.ComponentLabel}, + ) + BatchReadLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: teleport.MetricBackendBatchReadHistogram, + Help: "Latency for batch read operations", + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^15 == 32.768 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), + }, + []string{teleport.ComponentLabel}, + ) + ReadLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: teleport.MetricBackendReadHistogram, + Help: "Latency for read operations", + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^15 == 32.768 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), + }, + []string{teleport.ComponentLabel}, + ) +) + +// RegisterCollectors ensures all backend metrics are registered +// with the provided [prometheus.Registerer]. +func RegisterCollectors(reg prometheus.Registerer) error { + return trace.Wrap(metrics.RegisterCollectors(reg, + Watchers, WatcherQueues, Requests, WriteRequests, + WriteRequestsFailed, BatchWriteRequests, BatchWriteRequestsFailed, ReadRequests, + ReadRequestsFailed, BatchReadRequests, BatchReadRequestsFailed, WriteLatencies, + WriteRequestsFailedPrecondition, + AtomicWriteRequests, AtomicWriteRequestsFailed, AtomicWriteConditionFailed, AtomicWriteLatencies, + AtomicWriteContention, AtomicWriteSize, Reads, Writes, + BatchWriteLatencies, BatchReadLatencies, ReadLatencies, + StreamingRequests, StreamingRequestsFailed, + )) +} diff --git a/lib/backend/buffer.go b/lib/backend/buffer.go index 5af31c114d52e..2cbc892e60fc7 100644 --- a/lib/backend/buffer.go +++ b/lib/backend/buffer.go @@ -32,6 +32,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/backend/backendmetrics" logutils "github.com/gravitational/teleport/lib/utils/log" ) @@ -203,7 +204,7 @@ func (c *CircularBuffer) fanOutEvent(r Event) { var watchersToDelete []*BufferWatcher c.watchers.walkPath(r.Item.Key.String(), func(watcher *BufferWatcher) { if watcher.MetricComponent != "" { - watcherQueues.WithLabelValues(watcher.MetricComponent).Set(float64(len(watcher.eventsC))) + backendmetrics.WatcherQueues.WithLabelValues(watcher.MetricComponent).Set(float64(len(watcher.eventsC))) } if !watcher.emit(r) { watchersToDelete = append(watchersToDelete, watcher) diff --git a/lib/backend/dynamo/atomicwrite.go b/lib/backend/dynamo/atomicwrite.go index 4de200e5b97b1..a30bbf8a9344a 100644 --- a/lib/backend/dynamo/atomicwrite.go +++ b/lib/backend/dynamo/atomicwrite.go @@ -34,6 +34,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" + 
"github.com/gravitational/teleport/lib/backend/backendmetrics" ) const ( @@ -250,7 +251,7 @@ TxnLoop: } if i > 0 { - backend.AtomicWriteContention.WithLabelValues(teleport.ComponentDynamoDB).Add(float64(i)) + backendmetrics.AtomicWriteContention.WithLabelValues(teleport.ComponentDynamoDB).Add(float64(i)) } if n := i + 1; n > 2 { diff --git a/lib/backend/firestore/atomicwrite.go b/lib/backend/firestore/atomicwrite.go index 3dee9de1b8770..6f4877dd04848 100644 --- a/lib/backend/firestore/atomicwrite.go +++ b/lib/backend/firestore/atomicwrite.go @@ -29,6 +29,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/backend/backendmetrics" ) func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.ConditionalAction) (revision string, err error) { @@ -146,7 +147,7 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona } if n > 1 { - backend.AtomicWriteContention.WithLabelValues(teleport.ComponentFirestore).Add(float64(n - 1)) + backendmetrics.AtomicWriteContention.WithLabelValues(teleport.ComponentFirestore).Add(float64(n - 1)) } if n > 2 { diff --git a/lib/backend/pgbk/atomicwrite.go b/lib/backend/pgbk/atomicwrite.go index aaaada25d68a9..20e60d050919c 100644 --- a/lib/backend/pgbk/atomicwrite.go +++ b/lib/backend/pgbk/atomicwrite.go @@ -26,6 +26,7 @@ import ( "github.com/jackc/pgx/v5/pgtype/zeronull" "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/backend/backendmetrics" pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common" ) @@ -131,7 +132,7 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona }) if attempts > 1 { - backend.AtomicWriteContention.WithLabelValues(b.GetName()).Add(float64(attempts - 1)) + backendmetrics.AtomicWriteContention.WithLabelValues(b.GetName()).Add(float64(attempts - 1)) } if attempts > 2 { diff --git a/lib/backend/report.go b/lib/backend/report.go index f061a2c8822de..fa43ee613bf66 100644 --- a/lib/backend/report.go +++ b/lib/backend/report.go @@ -23,7 +23,6 @@ import ( "errors" "iter" "log/slog" - "math" "slices" "strings" "time" @@ -38,7 +37,7 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/observability/metrics" + "github.com/gravitational/teleport/lib/backend/backendmetrics" "github.com/gravitational/teleport/lib/observability/tracing" logutils "github.com/gravitational/teleport/lib/utils/log" ) @@ -57,6 +56,8 @@ type ReporterConfig struct { TopRequestsCount int // Tracer is used to create spans Tracer oteltrace.Tracer + // Registerer is used to register prometheus metrics. + Registerer prometheus.Registerer } // CheckAndSetDefaults checks and sets @@ -73,6 +74,9 @@ func (r *ReporterConfig) CheckAndSetDefaults() error { if r.Tracer == nil { r.Tracer = tracing.NoopTracer(teleport.ComponentBackend) } + if r.Registerer == nil { + r.Registerer = prometheus.DefaultRegisterer + } return nil } @@ -97,18 +101,17 @@ type Reporter struct { // NewReporter returns a new Reporter. func NewReporter(cfg ReporterConfig) (*Reporter, error) { - err := metrics.RegisterPrometheusCollectors(prometheusCollectors...) 
- if err != nil { + if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - if err := cfg.CheckAndSetDefaults(); err != nil { + if err := backendmetrics.RegisterCollectors(cfg.Registerer); err != nil { return nil, trace.Wrap(err) } cache, err := lru.NewWithEvict(cfg.TopRequestsCount, func(labels topRequestsCacheKey, value struct{}) { // Evict the key from requests metric. - requests.DeleteLabelValues(labels.component, labels.key, labels.isRange) + backendmetrics.Requests.DeleteLabelValues(labels.component, labels.key, labels.isRange) }) if err != nil { return nil, trace.Wrap(err) @@ -140,12 +143,12 @@ func (s *Reporter) GetRange(ctx context.Context, startKey, endKey Key, limit int start := s.Clock().Now() res, err := s.Backend.GetRange(ctx, startKey, endKey, limit) - batchReadLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - batchReadRequests.WithLabelValues(s.Component).Inc() + backendmetrics.BatchReadLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.BatchReadRequests.WithLabelValues(s.Component).Inc() if err != nil { - batchReadRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.BatchReadRequestsFailed.WithLabelValues(s.Component).Inc() } else { - reads.WithLabelValues(s.Component).Add(float64(len(res.Items))) + backendmetrics.Reads.WithLabelValues(s.Component).Add(float64(len(res.Items))) } s.trackRequest(ctx, types.OpGet, startKey, endKey) end := s.Clock().Now() @@ -173,13 +176,13 @@ func (s *Reporter) Items(ctx context.Context, params ItemsParams) iter.Seq2[Item var count int defer func() { s.trackRequest(ctx, types.OpGet, params.StartKey, params.EndKey) - streamingRequests.WithLabelValues(s.Component).Inc() - reads.WithLabelValues(s.Component).Add(float64(count)) + backendmetrics.StreamingRequests.WithLabelValues(s.Component).Inc() + backendmetrics.Reads.WithLabelValues(s.Component).Add(float64(count)) }() for item, err := range s.Backend.Items(ctx, params) { if err != nil { - streamingRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.StreamingRequestsFailed.WithLabelValues(s.Component).Inc() } count++ @@ -204,15 +207,15 @@ func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) { start := s.Clock().Now() lease, err := s.Backend.Create(ctx, i) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if trace.IsAlreadyExists(err) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpPut, i.Key, Key{}) return lease, err @@ -233,12 +236,12 @@ func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) { start := s.Clock().Now() lease, err := s.Backend.Put(ctx, i) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + 
backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpPut, i.Key, Key{}) return lease, err @@ -258,15 +261,15 @@ func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) { start := s.Clock().Now() lease, err := s.Backend.Update(ctx, i) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if trace.IsNotFound(err) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpPut, i.Key, Key{}) return lease, err @@ -286,15 +289,15 @@ func (s *Reporter) ConditionalUpdate(ctx context.Context, i Item) (*Lease, error start := s.Clock().Now() lease, err := s.Backend.ConditionalUpdate(ctx, i) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if errors.Is(err, ErrIncorrectRevision) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpPut, i.Key, Key{}) return lease, err @@ -313,11 +316,11 @@ func (s *Reporter) Get(ctx context.Context, key Key) (*Item, error) { start := s.Clock().Now() item, err := s.Backend.Get(ctx, key) - readLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - readRequests.WithLabelValues(s.Component).Inc() - reads.WithLabelValues(s.Component).Inc() + backendmetrics.ReadLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.ReadRequests.WithLabelValues(s.Component).Inc() + backendmetrics.Reads.WithLabelValues(s.Component).Inc() if err != nil && !trace.IsNotFound(err) { - readRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.ReadRequestsFailed.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpGet, key, Key{}) return item, err @@ -337,15 +340,15 @@ func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWit start := s.Clock().Now() lease, err := s.Backend.CompareAndSwap(ctx, expected, replaceWith) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - 
writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if trace.IsNotFound(err) || trace.IsCompareFailed(err) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpPut, expected.Key, Key{}) return lease, err @@ -364,15 +367,15 @@ func (s *Reporter) Delete(ctx context.Context, key Key) error { start := s.Clock().Now() err := s.Backend.Delete(ctx, key) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if trace.IsNotFound(err) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpDelete, key, Key{}) return err @@ -392,15 +395,15 @@ func (s *Reporter) ConditionalDelete(ctx context.Context, key Key, revision stri start := s.Clock().Now() err := s.Backend.ConditionalDelete(ctx, key, revision) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if trace.IsNotFound(err) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpDelete, key, Key{}) return err @@ -424,18 +427,18 @@ func (s *Reporter) AtomicWrite(ctx context.Context, condacts []ConditionalAction revision, err = s.Backend.AtomicWrite(ctx, condacts) elapsed := s.Clock().Since(start).Seconds() - writeLatencies.WithLabelValues(s.Component).Observe(elapsed) - atomicWriteLatencies.WithLabelValues(s.Component).Observe(elapsed) + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(elapsed) + backendmetrics.AtomicWriteLatencies.WithLabelValues(s.Component).Observe(elapsed) - writeRequests.WithLabelValues(s.Component).Inc() - atomicWriteRequests.WithLabelValues(s.Component).Inc() - atomicWriteSize.WithLabelValues(s.Component).Observe(float64(len(condacts))) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() + backendmetrics.AtomicWriteRequests.WithLabelValues(s.Component).Inc() + 
backendmetrics.AtomicWriteSize.WithLabelValues(s.Component).Observe(float64(len(condacts))) if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() - atomicWriteRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.AtomicWriteRequestsFailed.WithLabelValues(s.Component).Inc() if errors.Is(err, ErrConditionFailed) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() - atomicWriteConditionFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.AtomicWriteConditionFailed.WithLabelValues(s.Component).Inc() } } @@ -454,7 +457,7 @@ func (s *Reporter) AtomicWrite(ctx context.Context, condacts []ConditionalAction } if err == nil { - writes.WithLabelValues(s.Component).Add(float64(writeTotal)) + backendmetrics.Writes.WithLabelValues(s.Component).Add(float64(writeTotal)) } return } @@ -473,10 +476,10 @@ func (s *Reporter) DeleteRange(ctx context.Context, startKey, endKey Key) error start := s.Clock().Now() err := s.Backend.DeleteRange(ctx, startKey, endKey) - batchWriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - batchWriteRequests.WithLabelValues(s.Component).Inc() + backendmetrics.BatchWriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.BatchWriteRequests.WithLabelValues(s.Component).Inc() if err != nil && !trace.IsNotFound(err) { - batchWriteRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.BatchWriteRequestsFailed.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpDelete, startKey, endKey) return err @@ -499,15 +502,15 @@ func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time start := s.Clock().Now() err := s.Backend.KeepAlive(ctx, lease, expires) - writeLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) - writeRequests.WithLabelValues(s.Component).Inc() + backendmetrics.WriteLatencies.WithLabelValues(s.Component).Observe(s.Clock().Since(start).Seconds()) + backendmetrics.WriteRequests.WithLabelValues(s.Component).Inc() if err != nil { - writeRequestsFailed.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailed.WithLabelValues(s.Component).Inc() if trace.IsNotFound(err) { - writeRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() + backendmetrics.WriteRequestsFailedPrecondition.WithLabelValues(s.Component).Inc() } } else { - writes.WithLabelValues(s.Component).Inc() + backendmetrics.Writes.WithLabelValues(s.Component).Inc() } s.trackRequest(ctx, types.OpPut, lease.Key, Key{}) return err @@ -582,7 +585,7 @@ func (s *Reporter) trackRequest(ctx context.Context, opType types.OpType, key Ke s.topRequestsCache.Get(cacheKey) } - counter, err := requests.GetMetricWithLabelValues(s.Component, keyLabel, rangeSuffix) + counter, err := backendmetrics.Requests.GetMetricWithLabelValues(s.Component, keyLabel, rangeSuffix) if err != nil { slog.WarnContext(ctx, "Failed to get prometheus counter", "error", err) return @@ -665,8 +668,8 @@ func NewReporterWatcher(ctx context.Context, component string, w Watcher) *Repor } func (r *ReporterWatcher) watch(ctx context.Context) { - watchers.WithLabelValues(r.Component).Inc() - defer watchers.WithLabelValues(r.Component).Dec() + backendmetrics.Watchers.WithLabelValues(r.Component).Inc() + defer backendmetrics.Watchers.WithLabelValues(r.Component).Dec() select { case 
<-r.Done(): return @@ -675,227 +678,5 @@ func (r *ReporterWatcher) watch(ctx context.Context) { } } -var ( - requests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendRequests, - Help: "Number of requests to the backend (reads, writes, and keepalives)", - }, - []string{teleport.ComponentLabel, teleport.TagReq, teleport.TagRange}, - ) - watchers = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: teleport.MetricBackendWatchers, - Help: "Number of active backend watchers", - }, - []string{teleport.ComponentLabel}, - ) - watcherQueues = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: teleport.MetricBackendWatcherQueues, - Help: "Watcher queue sizes", - }, - []string{teleport.ComponentLabel}, - ) - writeRequests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendWriteRequests, - Help: "Number of write requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - writes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendWrites, - Help: "Number of individual items written to the backend", - }, - []string{teleport.ComponentLabel}, - ) - writeRequestsFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendWriteFailedRequests, - Help: "Number of failed write requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - writeRequestsFailedPrecondition = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendWriteFailedPreconditionRequests, - Help: "Number of write requests that failed due to a precondition (existence, revision, value, etc)", - }, - []string{teleport.ComponentLabel}, - ) - atomicWriteRequests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendAtomicWriteRequests, - Help: "Number of atomic write requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - atomicWriteRequestsFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendAtomicWriteFailedRequests, - Help: "Number of failed atomic write requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - atomicWriteConditionFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendAtomicWriteConditionFailed, - Help: "Number of times an atomic write request results in condition failure", - }, - []string{teleport.ComponentLabel}, - ) - atomicWriteLatencies = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendAtomicWriteHistogram, - Help: "Latency for backend atomic write operations", - // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 - // highest bucket start of 0.001 sec * 2^15 == 32.768 sec - Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), - }, - []string{teleport.ComponentLabel}, - ) - atomicWriteSize = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendAtomicWriteSize, - Help: "Atomic write batch size", - // buckets of the form 1, 2, 4, 8, 16, etc... 
- Buckets: prometheus.ExponentialBuckets(1, 2, int(math.Sqrt(MaxAtomicWriteSize))), - }, - []string{teleport.ComponentLabel}, - ) - AtomicWriteContention = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendAtomicWriteContention, - Help: "Number of times atomic write requests experience contention", - }, - []string{teleport.ComponentLabel}, - ) - batchWriteRequests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendBatchWriteRequests, - Help: "Number of batch write requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - batchWriteRequestsFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendBatchFailedWriteRequests, - Help: "Number of failed write requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - readRequests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendReadRequests, - Help: "Number of read requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - reads = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Name: teleport.MetricBackendReads, - Help: "Number of individual items read from the backend", - }, - []string{teleport.ComponentLabel}, - ) - readRequestsFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendFailedReadRequests, - Help: "Number of failed read requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - streamingRequests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Subsystem: "backend", - Name: "stream_requests", - Help: "Number of inflight stream requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - streamingRequestsFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: teleport.MetricNamespace, - Subsystem: "backend", - Name: "stream_requests_failed", - Help: "Number of failed stream requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - batchReadRequests = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendBatchReadRequests, - Help: "Number of read requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - batchReadRequestsFailed = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: teleport.MetricBackendBatchFailedReadRequests, - Help: "Number of failed read requests to the backend", - }, - []string{teleport.ComponentLabel}, - ) - writeLatencies = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: teleport.MetricBackendWriteHistogram, - Help: "Latency for backend write operations", - // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 - // highest bucket start of 0.001 sec * 2^15 == 32.768 sec - Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), - }, - []string{teleport.ComponentLabel}, - ) - batchWriteLatencies = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: teleport.MetricBackendBatchWriteHistogram, - Help: "Latency for backend batch write operations", - // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 - // highest bucket start of 0.001 sec * 2^15 == 32.768 sec - Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), - }, - []string{teleport.ComponentLabel}, - ) - batchReadLatencies = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: teleport.MetricBackendBatchReadHistogram, - Help: "Latency for batch 
read operations", - // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 - // highest bucket start of 0.001 sec * 2^15 == 32.768 sec - Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), - }, - []string{teleport.ComponentLabel}, - ) - readLatencies = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: teleport.MetricBackendReadHistogram, - Help: "Latency for read operations", - // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 - // highest bucket start of 0.001 sec * 2^15 == 32.768 sec - Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), - }, - []string{teleport.ComponentLabel}, - ) - - prometheusCollectors = []prometheus.Collector{ - watchers, watcherQueues, requests, writeRequests, - writeRequestsFailed, batchWriteRequests, batchWriteRequestsFailed, readRequests, - readRequestsFailed, batchReadRequests, batchReadRequestsFailed, writeLatencies, - writeRequestsFailedPrecondition, - atomicWriteRequests, atomicWriteRequestsFailed, atomicWriteConditionFailed, atomicWriteLatencies, - AtomicWriteContention, atomicWriteSize, reads, writes, - batchWriteLatencies, batchReadLatencies, readLatencies, - } -) +// TODO(tross): Remove once crdb is converted to use backendmetrics directly. +var AtomicWriteContention = backendmetrics.AtomicWriteContention diff --git a/lib/backend/report_test.go b/lib/backend/report_test.go index 41217fcf27213..42b79ba8a6825 100644 --- a/lib/backend/report_test.go +++ b/lib/backend/report_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/backend/backendmetrics" ) func TestReporterTopRequestsLimit(t *testing.T) { @@ -44,7 +45,7 @@ func TestReporterTopRequestsLimit(t *testing.T) { countTopRequests := func() int { ch := make(chan prometheus.Metric) go func() { - requests.Collect(ch) + backendmetrics.Requests.Collect(ch) close(ch) }() @@ -54,7 +55,7 @@ func TestReporterTopRequestsLimit(t *testing.T) { } return int(count) } - t.Cleanup(requests.Reset) + t.Cleanup(backendmetrics.Requests.Reset) // At first, the metric should have no values. 
require.Equal(t, 0, countTopRequests()) diff --git a/lib/cache/access_list.go b/lib/cache/access_list.go index 1fb08925b3558..a0add3ae224cf 100644 --- a/lib/cache/access_list.go +++ b/lib/cache/access_list.go @@ -41,6 +41,7 @@ func newAccessListCollection(upstream services.AccessLists, w types.WatchKind) ( return &collection[*accesslist.AccessList, accessListIndex]{ store: newStore( + types.KindAccessList, (*accesslist.AccessList).Clone, map[accessListIndex]func(*accesslist.AccessList) string{ accessListNameIndex: func(al *accesslist.AccessList) string { @@ -163,6 +164,7 @@ func newAccessListMemberCollection(upstream services.AccessLists, w types.WatchK return &collection[*accesslist.AccessListMember, accessListMemberIndex]{ store: newStore( + types.KindAccessListMember, (*accesslist.AccessListMember).Clone, map[accessListMemberIndex]func(*accesslist.AccessListMember) string{ accessListMemberNameIndex: func(r *accesslist.AccessListMember) string { @@ -330,6 +332,7 @@ func newAccessListReviewCollection(upstream services.AccessLists, w types.WatchK return &collection[*accesslist.Review, accessListReviewIndex]{ store: newStore( + types.KindAccessListReview, (*accesslist.Review).Clone, map[accessListReviewIndex]func(*accesslist.Review) string{ accessListReviewNameIndex: func(r *accesslist.Review) string { diff --git a/lib/cache/access_monitoring_rule.go b/lib/cache/access_monitoring_rule.go index 84ff672e55c77..11fcfc789b1b0 100644 --- a/lib/cache/access_monitoring_rule.go +++ b/lib/cache/access_monitoring_rule.go @@ -39,6 +39,7 @@ func newAccessMonitoringRuleCollection(upstream services.AccessMonitoringRules, return &collection[*accessmonitoringrulesv1.AccessMonitoringRule, accessMonitoringRuleIndex]{ store: newStore( + types.KindAccessMonitoringRule, proto.CloneOf[*accessmonitoringrulesv1.AccessMonitoringRule], map[accessMonitoringRuleIndex]func(*accessmonitoringrulesv1.AccessMonitoringRule) string{ accessMonitoringRuleNameIndex: func(r *accessmonitoringrulesv1.AccessMonitoringRule) string { diff --git a/lib/cache/app.go b/lib/cache/app.go index 6785406bfa904..52910e8ab63db 100644 --- a/lib/cache/app.go +++ b/lib/cache/app.go @@ -37,6 +37,7 @@ func newAppCollection(p services.Apps, w types.WatchKind) (*collection[types.App return &collection[types.Application, appIndex]{ store: newStore( + types.KindApp, func(a types.Application) types.Application { return a.Copy() }, @@ -110,6 +111,7 @@ func newAppServerCollection(p services.Presence, w types.WatchKind) (*collection return &collection[types.AppServer, appServerIndex]{ store: newStore( + types.KindAppServer, types.AppServer.Copy, map[appServerIndex]func(types.AppServer) string{ appServerNameIndex: func(u types.AppServer) string { diff --git a/lib/cache/auth_server.go b/lib/cache/auth_server.go index cf9a5bcf736a0..db673329ebefb 100644 --- a/lib/cache/auth_server.go +++ b/lib/cache/auth_server.go @@ -35,6 +35,7 @@ func newAuthServerCollection(p services.Presence, w types.WatchKind) (*collectio return &collection[types.Server, authServerIndex]{ store: newStore( + types.KindAuthServer, types.Server.DeepCopy, map[authServerIndex]func(types.Server) string{ authServerNameIndex: types.Server.GetName, diff --git a/lib/cache/auto_update.go b/lib/cache/auto_update.go index 196793e142518..c83c4caaf9faf 100644 --- a/lib/cache/auto_update.go +++ b/lib/cache/auto_update.go @@ -41,6 +41,7 @@ func newAutoUpdateConfigCollection(upstream services.AutoUpdateServiceGetter, w return &collection[*autoupdatev1.AutoUpdateConfig, autoUpdateConfigIndex]{ store: 
newStore( + types.KindAutoUpdateConfig, proto.CloneOf[*autoupdatev1.AutoUpdateConfig], map[autoUpdateConfigIndex]func(*autoupdatev1.AutoUpdateConfig) string{ autoUpdateConfigNameIndex: func(r *autoupdatev1.AutoUpdateConfig) string { @@ -107,6 +108,7 @@ func newAutoUpdateVersionCollection(upstream services.AutoUpdateServiceGetter, w return &collection[*autoupdatev1.AutoUpdateVersion, autoUpdateVersionIndex]{ store: newStore( + types.KindAutoUpdateVersion, proto.CloneOf[*autoupdatev1.AutoUpdateVersion], map[autoUpdateVersionIndex]func(*autoupdatev1.AutoUpdateVersion) string{ autoUpdateVersionNameIndex: func(r *autoupdatev1.AutoUpdateVersion) string { @@ -169,6 +171,7 @@ func newAutoUpdateRolloutCollection(upstream services.AutoUpdateServiceGetter, w return &collection[*autoupdatev1.AutoUpdateAgentRollout, autoUpdateAgentRolloutIndex]{ store: newStore( + types.KindAutoUpdateAgentRollout, proto.CloneOf[*autoupdatev1.AutoUpdateAgentRollout], map[autoUpdateAgentRolloutIndex]func(*autoupdatev1.AutoUpdateAgentRollout) string{ autoUpdateAgentRolloutNameIndex: func(r *autoupdatev1.AutoUpdateAgentRollout) string { @@ -231,6 +234,7 @@ func newAutoUpdateAgentReportCollection(upstream services.AutoUpdateServiceGette return &collection[*autoupdatev1.AutoUpdateAgentReport, autoUpdateAgentReportIndex]{ store: newStore( + types.KindAutoUpdateAgentReport, proto.CloneOf[*autoupdatev1.AutoUpdateAgentReport], map[autoUpdateAgentReportIndex]func(*autoupdatev1.AutoUpdateAgentReport) string{ autoUpdateAgentReportNameIndex: func(r *autoupdatev1.AutoUpdateAgentReport) string { diff --git a/lib/cache/cache.go b/lib/cache/cache.go index bb35179ee3b17..66b5a65f84fcd 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -44,6 +44,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/backend/backendmetrics" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/observability/tracing" @@ -678,8 +679,6 @@ type Config struct { // WorkloadIdentity is the upstream Workload Identities service that we're // caching WorkloadIdentity services.WorkloadIdentities - // Backend is a backend for local cache - Backend backend.Backend // MaxRetryPeriod is the maximum period between cache retries on failures MaxRetryPeriod time.Duration // WatcherInitTimeout is the maximum acceptable delay for an @@ -713,6 +712,8 @@ type Config struct { neverOK bool // Tracer is used to create spans Tracer oteltrace.Tracer + // Registerer is used to register prometheus metrics. + Registerer prometheus.Registerer // Unstarted indicates that the cache should not be started during New. The // cache is usable before it's started, but it will always hit the backend.
Unstarted bool @@ -743,9 +744,6 @@ func (c *Config) CheckAndSetDefaults() error { if c.Events == nil { return trace.BadParameter("missing Events parameter") } - if c.Backend == nil { - return trace.BadParameter("missing Backend parameter") - } if c.Context == nil { c.Context = context.Background() } @@ -786,6 +784,9 @@ func (c *Config) CheckAndSetDefaults() error { if c.Tracer == nil { c.Tracer = tracing.NoopTracer(c.Component) } + if c.Registerer == nil { + c.Registerer = prometheus.DefaultRegisterer + } if c.FanoutShards == 0 { c.FanoutShards = 1 } @@ -818,7 +819,15 @@ const ( // New creates a new instance of Cache func New(config Config) (*Cache, error) { - if err := metrics.RegisterPrometheusCollectors( + if err := config.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } + + if err := backendmetrics.RegisterCollectors(config.Registerer); err != nil { + return nil, trace.Wrap(err) + } + + if err := metrics.RegisterCollectors(config.Registerer, cacheEventsReceived, cacheStaleEventsReceived, cacheHealth, @@ -827,10 +836,6 @@ func New(config Config) (*Cache, error) { return nil, trace.Wrap(err) } - if err := config.CheckAndSetDefaults(); err != nil { - return nil, trace.Wrap(err) - } - ctx, cancel := context.WithCancel(config.Context) fnCache, err := utils.NewFnCache(utils.FnCacheConfig{ TTL: time.Second, diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index b6e6a64e38694..d9e0f732bbf55 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -98,12 +98,10 @@ func TestNodesDontCacheHighVolumeResources(t *testing.T) { // testPack contains pack of // services used for test run type testPack struct { - dataDir string - backend *backend.Wrapper - eventsC chan Event - cache *Cache - cacheBackend backend.Backend - + dataDir string + backend *backend.Wrapper + eventsC chan Event + cache *Cache eventsS *proxyEvents trustS *local.CA provisionerS *local.ProvisioningService @@ -251,15 +249,6 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { } p.backend = backend.NewWrapper(bk) - p.cacheBackend, err = memory.New( - memory.Config{ - Context: ctx, - Mirror: true, - }) - if err != nil { - return nil, trace.Wrap(err) - } - p.eventsC = make(chan Event, eventBufferSize) clusterConfig, err := local.NewClusterConfigurationService(p.backend) @@ -437,7 +426,6 @@ func newPack(dir string, setupConfig func(c Config) Config, opts ...packOption) p.cache, err = New(setupConfig(Config{ Context: ctx, - Backend: p.cacheBackend, Events: p.eventsS, ClusterConfig: p.clusterConfigS, Provisioner: p.provisionerS, @@ -694,23 +682,14 @@ func TestCompletenessInit(t *testing.T) { require.NoError(t, p.trustS.UpsertCertAuthority(ctx, ca)) } - for i := 0; i < inits; i++ { - var err error - - p.cacheBackend, err = memory.New( - memory.Config{ - Context: ctx, - Mirror: true, - }) - require.NoError(t, err) - + for range inits { // simulate bad connection to auth server p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable")) p.eventsS.closeWatchers() + var err error p.cache, err = New(ForAuth(Config{ Context: ctx, - Backend: p.cacheBackend, Events: p.eventsS, ClusterConfig: p.clusterConfigS, Provisioner: p.provisionerS, @@ -772,8 +751,6 @@ func TestCompletenessInit(t *testing.T) { require.NoError(t, p.cache.Close()) p.cache = nil - require.NoError(t, p.cacheBackend.Close()) - p.cacheBackend = nil } } @@ -797,7 +774,6 @@ func TestCompletenessReset(t *testing.T) { var err error p.cache, err = New(ForAuth(Config{ Context: ctx, - Backend: 
p.cacheBackend, Events: p.eventsS, ClusterConfig: p.clusterConfigS, Provisioner: p.provisionerS, @@ -954,7 +930,6 @@ func TestListResources_NodesTTLVariant(t *testing.T) { p.cache, err = New(ForAuth(Config{ Context: ctx, - Backend: p.cacheBackend, Events: p.eventsS, ClusterConfig: p.clusterConfigS, Provisioner: p.provisionerS, @@ -1052,7 +1027,6 @@ func initStrategy(t *testing.T) { var err error p.cache, err = New(ForAuth(Config{ Context: ctx, - Backend: p.cacheBackend, Events: p.eventsS, ClusterConfig: p.clusterConfigS, Provisioner: p.provisionerS, diff --git a/lib/cache/cert_authority.go b/lib/cache/cert_authority.go index 1c081e1a083a5..7bef43ceef7e1 100644 --- a/lib/cache/cert_authority.go +++ b/lib/cache/cert_authority.go @@ -41,6 +41,7 @@ func newCertAuthorityCollection(t services.Trust, w types.WatchKind) (*collectio return &collection[types.CertAuthority, certAuthorityIndex]{ store: newStore( + types.KindCertAuthority, types.CertAuthority.Clone, map[certAuthorityIndex]func(types.CertAuthority) string{ certAuthorityIDIndex: func(ca types.CertAuthority) string { diff --git a/lib/cache/cert_authority_test.go b/lib/cache/cert_authority_test.go index 07a5732271778..3862ece1168ba 100644 --- a/lib/cache/cert_authority_test.go +++ b/lib/cache/cert_authority_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/services/suite" ) @@ -81,10 +80,6 @@ func TestNodeCAFiltering(t *testing.T) { err = p.clusterConfigS.UpsertClusterName(clusterName) require.NoError(t, err) - nodeCacheBackend, err := memory.New(memory.Config{}) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, nodeCacheBackend.Close()) }) - // this mimics a cache for a node pulling events from the auth server via WatchEvents nodeCache, err := New(ForNode(Config{ Events: p.cache, @@ -97,7 +92,6 @@ func TestNodeCAFiltering(t *testing.T) { SAMLIdPServiceProviders: p.cache.SAMLIdPServiceProviders, UserGroups: p.cache.UserGroups, StaticHostUsers: p.cache.StaticHostUsers, - Backend: nodeCacheBackend, })) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, nodeCache.Close()) }) diff --git a/lib/cache/cluster_config.go b/lib/cache/cluster_config.go index 4e763376a879f..cfcf645ad3433 100644 --- a/lib/cache/cluster_config.go +++ b/lib/cache/cluster_config.go @@ -41,6 +41,7 @@ func newClusterNameCollection(c services.ClusterConfiguration, w types.WatchKind return &collection[types.ClusterName, clusterNameIndex]{ store: newStore( + types.KindClusterName, types.ClusterName.Clone, map[clusterNameIndex]func(types.ClusterName) string{ clusterNameDefaultIndex: types.ClusterName.GetName, @@ -103,6 +104,7 @@ func newClusterAuditConfigCollection(c services.ClusterConfiguration, w types.Wa return &collection[types.ClusterAuditConfig, clusterAuditConfigIndex]{ store: newStore( + types.KindClusterAuditConfig, types.ClusterAuditConfig.Clone, map[clusterAuditConfigIndex]func(types.ClusterAuditConfig) string{ clusterAuditConfigNameIndex: types.ClusterAuditConfig.GetName, @@ -169,6 +171,7 @@ func newClusterNetworkingConfigCollection(c services.ClusterConfiguration, w typ return &collection[types.ClusterNetworkingConfig, clusterNetworkingConfigIndex]{ store: newStore( + types.KindClusterNetworkingConfig, types.ClusterNetworkingConfig.Clone, map[clusterNetworkingConfigIndex]func(types.ClusterNetworkingConfig) string{ 
clusterNetworkingConfigNameIndex: types.ClusterNetworkingConfig.GetName, @@ -231,6 +234,7 @@ func newAuthPreferenceCollection(c services.ClusterConfiguration, w types.WatchK return &collection[types.AuthPreference, authPreferenceIndex]{ store: newStore( + types.KindClusterAuthPreference, types.AuthPreference.Clone, map[authPreferenceIndex]func(types.AuthPreference) string{ authPreferenceNameIndex: types.AuthPreference.GetName, @@ -287,6 +291,7 @@ func newSessionRecordingConfigCollection(c services.ClusterConfiguration, w type return &collection[types.SessionRecordingConfig, sessionRecordingConfigIndex]{ store: newStore( + types.KindSessionRecordingConfig, types.SessionRecordingConfig.Clone, map[sessionRecordingConfigIndex]func(types.SessionRecordingConfig) string{ sessionRecordingConfigNameIndex: types.SessionRecordingConfig.GetName, @@ -343,6 +348,7 @@ func newAccessGraphSettingsCollection(upstream services.ClusterConfiguration, w return &collection[*clusterconfigv1.AccessGraphSettings, accessGraphSettingsIndex]{ store: newStore( + types.KindAccessGraphSettings, proto.CloneOf[*clusterconfigv1.AccessGraphSettings], map[accessGraphSettingsIndex]func(*clusterconfigv1.AccessGraphSettings) string{ accessGraphSettingsNameIndex: func(r *clusterconfigv1.AccessGraphSettings) string { diff --git a/lib/cache/crown_jewel.go b/lib/cache/crown_jewel.go index a1a5e6ebd92d3..258afcb1a5c98 100644 --- a/lib/cache/crown_jewel.go +++ b/lib/cache/crown_jewel.go @@ -39,6 +39,7 @@ func newCrownJewelCollection(upstream services.CrownJewels, w types.WatchKind) ( return &collection[*crownjewelv1.CrownJewel, crownJewelIndex]{ store: newStore( + types.KindCrownJewel, proto.CloneOf[*crownjewelv1.CrownJewel], map[crownJewelIndex]func(*crownjewelv1.CrownJewel) string{ crownJewelNameIndex: func(r *crownjewelv1.CrownJewel) string { diff --git a/lib/cache/database.go b/lib/cache/database.go index 5bea3d8e351e4..ab04f9789a51a 100644 --- a/lib/cache/database.go +++ b/lib/cache/database.go @@ -42,6 +42,7 @@ func newDatabaseCollection(p services.Databases, w types.WatchKind) (*collection return &collection[types.Database, databaseIndex]{ store: newStore( + types.KindDatabase, func(d types.Database) types.Database { return d.Copy() }, @@ -123,6 +124,7 @@ func newDatabaseServerCollection(p services.Presence, w types.WatchKind) (*colle return &collection[types.DatabaseServer, databaseServerIndex]{ store: newStore( + types.KindDatabaseServer, types.DatabaseServer.Copy, map[databaseServerIndex]func(types.DatabaseServer) string{ databaseServerNameIndex: func(u types.DatabaseServer) string { @@ -183,6 +185,7 @@ func newDatabaseServiceCollection(p services.Presence, w types.WatchKind) (*coll return &collection[types.DatabaseService, databaseServiceIndex]{ store: newStore( + types.KindDatabaseService, types.DatabaseService.Clone, map[databaseServiceIndex]func(types.DatabaseService) string{ databaseServiceNameIndex: types.DatabaseService.GetName, @@ -230,6 +233,7 @@ func newDatabaseObjectCollection(upstream services.DatabaseObjects, w types.Watc return &collection[*dbobjectv1.DatabaseObject, databaseObjectIndex]{ store: newStore( + types.KindDatabaseObject, proto.CloneOf[*dbobjectv1.DatabaseObject], map[databaseObjectIndex]func(*dbobjectv1.DatabaseObject) string{ databaseObjectNameIndex: func(r *dbobjectv1.DatabaseObject) string { diff --git a/lib/cache/discovery_config.go b/lib/cache/discovery_config.go index c5b0a7ec60cab..34dc27e44f872 100644 --- a/lib/cache/discovery_config.go +++ b/lib/cache/discovery_config.go @@ -38,6 +38,7 @@ 
diff --git a/lib/cache/discovery_config.go b/lib/cache/discovery_config.go
index c5b0a7ec60cab..34dc27e44f872 100644
--- a/lib/cache/discovery_config.go
+++ b/lib/cache/discovery_config.go
@@ -38,6 +38,7 @@ func newDiscoveryConfigCollection(upstream services.DiscoveryConfigs, w types.Wa
 	return &collection[*discoveryconfig.DiscoveryConfig, discoveryConfigIndex]{
 		store: newStore(
+			types.KindDiscoveryConfig,
 			(*discoveryconfig.DiscoveryConfig).Clone,
 			map[discoveryConfigIndex]func(*discoveryconfig.DiscoveryConfig) string{
 				discoveryConfigNameIndex: func(r *discoveryconfig.DiscoveryConfig) string {
diff --git a/lib/cache/git_server.go b/lib/cache/git_server.go
index ce909aa8df858..a1bf6a358b8a7 100644
--- a/lib/cache/git_server.go
+++ b/lib/cache/git_server.go
@@ -40,6 +40,7 @@ func newGitServerCollection(upstream services.GitServerGetter, w types.WatchKind
 	return &collection[types.Server, gitServerIndex]{
 		store: newStore(
+			types.KindGitServer,
 			types.Server.DeepCopy,
 			map[gitServerIndex]func(types.Server) string{
 				gitServerNameIndex: types.Server.GetName,
diff --git a/lib/cache/health_check_config.go b/lib/cache/health_check_config.go
index eaba2b0669763..f9fd59d51a28e 100644
--- a/lib/cache/health_check_config.go
+++ b/lib/cache/health_check_config.go
@@ -42,6 +42,7 @@ func newHealthCheckConfigCollection(upstream services.HealthCheckConfigReader, w
 	return &collection[*healthcheckconfigv1.HealthCheckConfig, healthCheckConfigIndex]{
 		store: newStore(
+			types.KindHealthCheckConfig,
 			proto.CloneOf[*healthcheckconfigv1.HealthCheckConfig],
 			map[healthCheckConfigIndex]func(*healthcheckconfigv1.HealthCheckConfig) string{
 				healthCheckConfigNameIndex: func(r *healthcheckconfigv1.HealthCheckConfig) string {
diff --git a/lib/cache/identitycenter.go b/lib/cache/identitycenter.go
index 77a5e017cb95e..f5025df522914 100644
--- a/lib/cache/identitycenter.go
+++ b/lib/cache/identitycenter.go
@@ -41,6 +41,7 @@ func newIdentityCenterAccountCollection(ic services.IdentityCenter, w types.Watc
 	return &collection[*identitycenterv1.Account, identityCenterAccountIndex]{
 		store: newStore(
+			types.KindIdentityCenterAccount,
 			proto.CloneOf[*identitycenterv1.Account],
 			map[identityCenterAccountIndex]func(*identitycenterv1.Account) string{
 				identityCenterAccountNameIndex: func(r *identitycenterv1.Account) string {
@@ -150,6 +151,7 @@ func newIdentityCenterAccountAssignmentCollection(ic services.IdentityCenter, w 
 	return &collection[*identitycenterv1.AccountAssignment, identityCenterAccountAssignmentIndex]{
 		store: newStore(
+			types.KindIdentityCenterAccountAssignment,
 			proto.CloneOf[*identitycenterv1.AccountAssignment],
 			map[identityCenterAccountAssignmentIndex]func(*identitycenterv1.AccountAssignment) string{
 				identityCenterAccountAssignmentNameIndex: func(r *identitycenterv1.AccountAssignment) string {
@@ -262,6 +264,7 @@ func newIdentityCenterPrincipalAssignmentCollection(upstream services.IdentityCe
 	return &collection[*identitycenterv1.PrincipalAssignment, identityCenterPrincipalAssignmentIndex]{
 		store: newStore(
+			types.KindIdentityCenterPrincipalAssignment,
 			proto.CloneOf[*identitycenterv1.PrincipalAssignment],
 			map[identityCenterPrincipalAssignmentIndex]func(*identitycenterv1.PrincipalAssignment) string{
 				identityCenterPrincipalAssignmentNameIndex: func(r *identitycenterv1.PrincipalAssignment) string {
diff --git a/lib/cache/installer.go b/lib/cache/installer.go
index cb7e7ac2c7326..d563acc6e5764 100644
--- a/lib/cache/installer.go
+++ b/lib/cache/installer.go
@@ -36,6 +36,7 @@ func newInstallerCollection(upstream services.ClusterConfiguration, w types.Watc
 	return &collection[types.Installer, installerIndex]{
 		store: newStore(
+			types.KindInstaller,
 			types.Installer.Clone,
 			map[installerIndex]func(types.Installer) string{
 				installerNameIndex: types.Installer.GetName,
diff --git a/lib/cache/integrations.go b/lib/cache/integrations.go
index 0bc5fed705250..a2131da498923 100644
--- a/lib/cache/integrations.go
+++ b/lib/cache/integrations.go
@@ -36,6 +36,7 @@ func newIntegrationCollection(upstream services.Integrations, w types.WatchKind)
 	return &collection[types.Integration, integrationIndex]{
 		store: newStore(
+			types.KindIntegration,
 			types.Integration.Clone,
 			map[integrationIndex]func(types.Integration) string{
 				integrationNameIndex: types.Integration.GetName,
diff --git a/lib/cache/kube.go b/lib/cache/kube.go
index eda9fe0bfd983..70310183a4842 100644
--- a/lib/cache/kube.go
+++ b/lib/cache/kube.go
@@ -39,6 +39,7 @@ func newKubernetesServerCollection(p services.Presence, w types.WatchKind) (*col
 	return &collection[types.KubeServer, kubeServerIndex]{
 		store: newStore(
+			types.KindKubeServer,
 			types.KubeServer.Copy,
 			map[kubeServerIndex]func(types.KubeServer) string{
 				kubeServerNameIndex: func(u types.KubeServer) string {
@@ -99,6 +100,7 @@ func newKubernetesClusterCollection(k services.Kubernetes, w types.WatchKind) (*
 	return &collection[types.KubeCluster, kubeClusterIndex]{
 		store: newStore(
+			types.KindKubernetesCluster,
 			types.KubeCluster.Copy,
 			map[kubeClusterIndex]func(types.KubeCluster) string{
 				kubeClusterNameIndex: types.KubeCluster.GetName,
@@ -178,6 +180,7 @@ func newKubernetesWaitingContainerCollection(upstream services.KubeWaitingContai
 	return &collection[*kubewaitingcontainerv1.KubernetesWaitingContainer, kubeWaitingContainerIndex]{
 		store: newStore(
+			types.KindKubeWaitingContainer,
 			proto.CloneOf[*kubewaitingcontainerv1.KubernetesWaitingContainer],
 			map[kubeWaitingContainerIndex]func(*kubewaitingcontainerv1.KubernetesWaitingContainer) string{
 				kubeWaitingContainerNameIndex: func(u *kubewaitingcontainerv1.KubernetesWaitingContainer) string {
diff --git a/lib/cache/lock.go b/lib/cache/lock.go
index ee069af768f79..ce1f975db9d37 100644
--- a/lib/cache/lock.go
+++ b/lib/cache/lock.go
@@ -37,6 +37,7 @@ func newLockCollection(upstream services.Access, w types.WatchKind) (*collection
 	return &collection[types.Lock, lockIndex]{
 		store: newStore(
+			types.KindLock,
 			types.Lock.Clone,
 			map[lockIndex]func(types.Lock) string{
 				lockNameIndex: types.Lock.GetName,
diff --git a/lib/cache/network_restrictions.go b/lib/cache/network_restrictions.go
index e37cf1d54dc6b..a61c5ffdcdee1 100644
--- a/lib/cache/network_restrictions.go
+++ b/lib/cache/network_restrictions.go
@@ -36,6 +36,7 @@ func newNetworkingRestrictionCollection(upstream services.Restrictions, w types.
 	return &collection[types.NetworkRestrictions, networkingRestrictionIndex]{
 		store: newStore(
+			types.KindNetworkRestrictions,
 			types.NetworkRestrictions.Clone,
 			map[networkingRestrictionIndex]func(types.NetworkRestrictions) string{
 				networkingRestrictionNameIndex: types.NetworkRestrictions.GetName,
diff --git a/lib/cache/node.go b/lib/cache/node.go
index 197451df475c5..9b30819004eab 100644
--- a/lib/cache/node.go
+++ b/lib/cache/node.go
@@ -38,6 +38,7 @@ func newNodeCollection(p services.Presence, w types.WatchKind) (*collection[type
 	return &collection[types.Server, nodeIndex]{
 		store: newStore(
+			types.KindNode,
 			types.Server.DeepCopy,
 			map[nodeIndex]func(types.Server) string{
 				nodeNameIndex: types.Server.GetName,
diff --git a/lib/cache/notification.go b/lib/cache/notification.go
index 310dbf9c34f5b..01dd96a0d27ab 100644
--- a/lib/cache/notification.go
+++ b/lib/cache/notification.go
@@ -38,6 +38,7 @@ func newUserNotificationCollection(upstream services.NotificationGetter, w types
 	return &collection[*notificationsv1.Notification, userNotificationIndex]{
 		store: newStore(
+			types.KindNotification,
 			proto.CloneOf[*notificationsv1.Notification],
 			map[userNotificationIndex]func(*notificationsv1.Notification) string{
 				userNotificationNameIndex: func(r *notificationsv1.Notification) string {
@@ -95,6 +96,7 @@ func newGlobalNotificationCollection(upstream services.NotificationGetter, w typ
 	return &collection[*notificationsv1.GlobalNotification, globalNotificationIndex]{
 		store: newStore(
+			types.KindGlobalNotification,
 			proto.CloneOf[*notificationsv1.GlobalNotification],
 			map[globalNotificationIndex]func(*notificationsv1.GlobalNotification) string{
 				globalNotificationNameIndex: func(r *notificationsv1.GlobalNotification) string {
diff --git a/lib/cache/okta.go b/lib/cache/okta.go
index 259d9746ff95c..ee3f833f9db00 100644
--- a/lib/cache/okta.go
+++ b/lib/cache/okta.go
@@ -36,6 +36,7 @@ func newOktaImportRuleCollection(upstream services.Okta, w types.WatchKind) (*co
 	return &collection[types.OktaImportRule, oktaImportRuleIndex]{
 		store: newStore(
+			types.KindOktaImportRule,
 			types.OktaImportRule.Clone,
 			map[oktaImportRuleIndex]func(types.OktaImportRule) string{
 				oktaImportRuleNameIndex: types.OktaImportRule.GetName,
@@ -119,6 +120,7 @@ func newOktaImportAssignmentCollection(upstream services.Okta, w types.WatchKind
 	return &collection[types.OktaAssignment, oktaAssignmentIndex]{
 		store: newStore(
+			types.KindOktaAssignment,
 			types.OktaAssignment.Copy,
 			map[oktaAssignmentIndex]func(types.OktaAssignment) string{
 				oktaAssignmentNameIndex: types.OktaAssignment.GetName,
diff --git a/lib/cache/plugin_static_credentials.go b/lib/cache/plugin_static_credentials.go
index 3ce8d34c960eb..b979ade3f6bc1 100644
--- a/lib/cache/plugin_static_credentials.go
+++ b/lib/cache/plugin_static_credentials.go
@@ -36,6 +36,7 @@ func newPluginStaticCredentialsCollection(upstream services.PluginStaticCredenti
 	return &collection[types.PluginStaticCredentials, pluginStaticCredentialsIndex]{
 		store: newStore(
+			types.KindPluginStaticCredentials,
 			types.PluginStaticCredentials.Clone,
 			map[pluginStaticCredentialsIndex]func(types.PluginStaticCredentials) string{
 				pluginStaticCredentialsNameIndex: types.PluginStaticCredentials.GetName,
diff --git a/lib/cache/provisioning.go b/lib/cache/provisioning.go
index 6b53e38ce490f..0b5795a205d11 100644
--- a/lib/cache/provisioning.go
+++ b/lib/cache/provisioning.go
@@ -40,6 +40,7 @@ func newPrincipalStateCollection(upstream services.ProvisioningStates, w types.W
 	return &collection[*provisioningv1.PrincipalState, principalStateIndex]{
 		store: newStore(
+			types.KindProvisioningPrincipalState,
 			proto.CloneOf[*provisioningv1.PrincipalState],
 			map[principalStateIndex]func(*provisioningv1.PrincipalState) string{
 				principalStateNameIndex: func(r *provisioningv1.PrincipalState) string {
diff --git a/lib/cache/proxy_server.go b/lib/cache/proxy_server.go
index ae8a0a766ac52..73d78ebfbe021 100644
--- a/lib/cache/proxy_server.go
+++ b/lib/cache/proxy_server.go
@@ -36,6 +36,7 @@ func newProxyServerCollection(p services.Presence, w types.WatchKind) (*collecti
 	return &collection[types.Server, proxyServerIndex]{
 		store: newStore(
+			types.KindProxy,
 			types.Server.DeepCopy,
 			map[proxyServerIndex]func(types.Server) string{
 				proxyServerNameIndex: types.Server.GetName,
diff --git a/lib/cache/remote_cluster.go b/lib/cache/remote_cluster.go
index b2995bc3969c1..40d478622a397 100644
--- a/lib/cache/remote_cluster.go
+++ b/lib/cache/remote_cluster.go
@@ -38,6 +38,7 @@ func newTunnelConnectionCollection(upstream services.Trust, w types.WatchKind) (
 	return &collection[types.TunnelConnection, tunnelConnectionIndex]{
 		store: newStore(
+			types.KindTunnelConnection,
 			types.TunnelConnection.Clone,
 			map[tunnelConnectionIndex]func(types.TunnelConnection) string{
 				tunnelConnectionNameIndex: func(tc types.TunnelConnection) string {
@@ -125,6 +126,7 @@ func newRemoteClusterCollection(upstream services.Trust, w types.WatchKind) (*co
 	return &collection[types.RemoteCluster, remoteClusterIndex]{
 		store: newStore(
+			types.KindRemoteCluster,
 			types.RemoteCluster.Clone,
 			map[remoteClusterIndex]func(types.RemoteCluster) string{
 				remoteClusterNameIndex: types.RemoteCluster.GetName,
diff --git a/lib/cache/reverse_tunnel.go b/lib/cache/reverse_tunnel.go
index 5a99578876380..9c7fde6641b2c 100644
--- a/lib/cache/reverse_tunnel.go
+++ b/lib/cache/reverse_tunnel.go
@@ -37,6 +37,7 @@ func newReverseTunnelCollection(upstream services.Presence, w types.WatchKind) (
 	return &collection[types.ReverseTunnel, reverseTunnelIndex]{
 		store: newStore(
+			types.KindReverseTunnel,
 			types.ReverseTunnel.Clone,
 			map[reverseTunnelIndex]func(types.ReverseTunnel) string{
 				reverseTunnelNameIndex: types.ReverseTunnel.GetName,
diff --git a/lib/cache/role.go b/lib/cache/role.go
index e8b17bd2077b5..0000884a5ea67 100644
--- a/lib/cache/role.go
+++ b/lib/cache/role.go
@@ -36,6 +36,7 @@ func newRoleCollection(a services.Access, w types.WatchKind) (*collection[types.
 	return &collection[types.Role, roleIndex]{
 		store: newStore(
+			types.KindRole,
 			types.Role.Clone,
 			map[roleIndex]func(types.Role) string{
 				roleNameIndex: types.Role.GetName,
diff --git a/lib/cache/saml_idp.go b/lib/cache/saml_idp.go
index 952e1caa8564b..98e9afd5637b7 100644
--- a/lib/cache/saml_idp.go
+++ b/lib/cache/saml_idp.go
@@ -36,6 +36,7 @@ func newSAMLIdPServiceProviderCollection(upstream services.SAMLIdPServiceProvide
 	return &collection[types.SAMLIdPServiceProvider, samlIdPServiceProviderIndex]{
 		store: newStore(
+			types.KindSAMLIdPServiceProvider,
 			types.SAMLIdPServiceProvider.Copy,
 			map[samlIdPServiceProviderIndex]func(types.SAMLIdPServiceProvider) string{
 				samlIdPServiceProviderNameIndex: types.SAMLIdPServiceProvider.GetName,
diff --git a/lib/cache/security_report.go b/lib/cache/security_report.go
index abfcba742384e..39b938c28d460 100644
--- a/lib/cache/security_report.go
+++ b/lib/cache/security_report.go
@@ -38,6 +38,7 @@ func newAuditQueryCollection(upstream services.SecReports, w types.WatchKind) (*
 	return &collection[*secreports.AuditQuery, auditQueryIndex]{
 		store: newStore(
+			types.KindAuditQuery,
 			(*secreports.AuditQuery).Clone,
 			map[auditQueryIndex]func(*secreports.AuditQuery) string{
 				auditQueryNameIndex: func(r *secreports.AuditQuery) string {
@@ -149,6 +150,7 @@ func newSecurityReportCollection(upstream services.SecReports, w types.WatchKind
 	return &collection[*secreports.Report, securityReportIndex]{
 		store: newStore(
+			types.KindSecurityReport,
 			(*secreports.Report).Clone,
 			map[securityReportIndex]func(*secreports.Report) string{
 				securityReportNameIndex: func(r *secreports.Report) string {
@@ -261,6 +263,7 @@ func newSecurityReportStateCollection(upstream services.SecReports, w types.Watc
 	return &collection[*secreports.ReportState, securityReportStateIndex]{
 		store: newStore(
+			types.KindSecurityReportState,
 			(*secreports.ReportState).Clone,
 			map[securityReportStateIndex]func(*secreports.ReportState) string{
 				securityReportStateNameIndex: func(r *secreports.ReportState) string {
diff --git a/lib/cache/spiffe_federation.go b/lib/cache/spiffe_federation.go
index 6f08a9727e7fe..b781205431768 100644
--- a/lib/cache/spiffe_federation.go
+++ b/lib/cache/spiffe_federation.go
@@ -40,6 +40,7 @@ func newSPIFFEFederationCollection(upstream services.SPIFFEFederations, w types.
 	return &collection[*machineidv1.SPIFFEFederation, spiffeFederationIndex]{
 		store: newStore(
+			types.KindSPIFFEFederation,
 			proto.CloneOf[*machineidv1.SPIFFEFederation],
 			map[spiffeFederationIndex]func(*machineidv1.SPIFFEFederation) string{
 				spiffeFederationNameIndex: func(r *machineidv1.SPIFFEFederation) string {
diff --git a/lib/cache/static_host_user.go b/lib/cache/static_host_user.go
index 0a56181e2d30a..9c59b5bc37340 100644
--- a/lib/cache/static_host_user.go
+++ b/lib/cache/static_host_user.go
@@ -39,6 +39,7 @@ func newStaticHostUserCollection(upstream services.StaticHostUser, w types.Watch
 	return &collection[*userprovisioningv2.StaticHostUser, staticHostUserIndex]{
 		store: newStore(
+			types.KindStaticHostUser,
 			proto.CloneOf[*userprovisioningv2.StaticHostUser],
 			map[staticHostUserIndex]func(*userprovisioningv2.StaticHostUser) string{
 				staticHostUserNameIndex: func(shu *userprovisioningv2.StaticHostUser) string {
diff --git a/lib/cache/store.go b/lib/cache/store.go
index d9c338d13b193..ccb4b99bb6b0e 100644
--- a/lib/cache/store.go
+++ b/lib/cache/store.go
@@ -18,15 +18,17 @@ package cache
 
 import (
 	"iter"
-	"reflect"
+	"time"
 
 	"github.com/gravitational/trace"
 
+	"github.com/gravitational/teleport/lib/backend/backendmetrics"
 	"github.com/gravitational/teleport/lib/utils/sortcache"
 )
 
 // store persists cached resources directly in memory.
 type store[T any, I comparable] struct {
+	kind    string
 	cache   *sortcache.SortCache[T, I]
 	clone   func(T) T
 	indexes map[I]func(T) string
@@ -34,8 +36,9 @@ type store[T any, I comparable] struct {
 
 // newStore creates a store that will index the resource
 // based on the provided indexes.
-func newStore[T any, I comparable](clone func(T) T, indexes map[I]func(T) string) *store[T, I] {
+func newStore[T any, I comparable](kind string, clone func(T) T, indexes map[I]func(T) string) *store[T, I] {
 	return &store[T, I]{
+		kind:    kind,
 		clone:   clone,
 		indexes: indexes,
 		cache: sortcache.New(sortcache.Config[T, I]{
@@ -46,21 +49,33 @@
 // clear removes all items from the store.
 func (s *store[T, I]) clear() error {
+	start := time.Now()
 	s.cache.Clear()
+	backendmetrics.BatchWriteLatencies.WithLabelValues("cache").Observe(time.Since(start).Seconds())
+	backendmetrics.BatchWriteRequests.WithLabelValues("cache").Inc()
+	backendmetrics.Requests.WithLabelValues("cache", s.kind, "true").Inc()
 	return nil
 }
 
 // put adds a new item, or updates an existing item.
 func (s *store[T, I]) put(t T) error {
+	start := time.Now()
 	s.cache.Put(s.clone(t))
+	backendmetrics.WriteLatencies.WithLabelValues("cache").Observe(time.Since(start).Seconds())
+	backendmetrics.WriteRequests.WithLabelValues("cache").Inc()
+	backendmetrics.Requests.WithLabelValues("cache", s.kind, "false").Inc()
 	return nil
 }
 
 // delete removes the provided item if any of the indexes match.
 func (s *store[T, I]) delete(t T) error {
+	start := time.Now()
 	for idx, transform := range s.indexes {
 		s.cache.Delete(idx, transform(t))
 	}
+	backendmetrics.WriteLatencies.WithLabelValues("cache").Observe(time.Since(start).Seconds())
+	backendmetrics.WriteRequests.WithLabelValues("cache").Inc()
+	backendmetrics.Requests.WithLabelValues("cache", s.kind, "false").Inc()
 	return nil
 }
 
@@ -76,9 +91,14 @@ func (s *store[T, I]) len() int {
 // It is the responsibility of the caller to clone the resource
 // before propagating it further.
 func (s *store[T, I]) get(index I, key string) (T, error) {
+	start := time.Now()
 	t, ok := s.cache.Get(index, key)
+	backendmetrics.ReadLatencies.WithLabelValues("cache").Observe(time.Since(start).Seconds())
+	backendmetrics.ReadRequests.WithLabelValues("cache").Inc()
+	backendmetrics.Requests.WithLabelValues("cache", s.kind, "false").Inc()
 	if !ok {
-		return t, trace.NotFound("%v %q does not exist", reflect.TypeFor[T](), key)
+		backendmetrics.ReadRequestsFailed.WithLabelValues("cache").Inc()
+		return t, trace.NotFound("%q %q does not exist", s.kind, key)
 	}
 
 	return t, nil
@@ -90,7 +110,19 @@ func (s *store[T, I]) get(index I, key string) (T, error) {
 // It is the responsibility of the caller to clone the resource
 // before propagating it further.
 func (s *store[T, I]) resources(index I, start, stop string) iter.Seq[T] {
-	return s.cache.Ascend(index, start, stop)
+	return func(yield func(T) bool) {
+		defer func() {
+			backendmetrics.StreamingRequests.WithLabelValues("cache").Inc()
+			backendmetrics.Requests.WithLabelValues("cache", s.kind, "false").Inc()
+		}()
+
+		for t := range s.cache.Ascend(index, start, stop) {
+			backendmetrics.ReadRequests.WithLabelValues("cache").Inc()
+			if !yield(t) {
+				return
+			}
+		}
+	}
 }
 
 // count returns the number of items that exist in the provided range.
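With this change every store operation is timed and counted against the shared backendmetrics collectors under a fixed "cache" component label, with the resource kind carried in the request label. An illustrative test-style sketch (the "widget" kind is hypothetical, the int store mirrors the one in store_test.go, and the counters are process-global, so exact values depend on what else has run):

```go
package cache

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/gravitational/teleport/lib/backend/backendmetrics"
)

func ExampleStoreMetrics() {
	s := newStore(
		"widget", // hypothetical kind label
		func(i int) int { return i },
		map[string]func(int) string{"numbers": strconv.Itoa},
	)

	_ = s.put(7)                 // WriteRequests{component="cache"} +1
	_, _ = s.get("numbers", "7") // ReadRequests and ReadLatencies +1

	for range s.resources("numbers", "", "") {
		break // early exit: the deferred block still bumps StreamingRequests
	}

	// 1 on a fresh process; unregistered collectors still accumulate values.
	fmt.Println(testutil.ToFloat64(backendmetrics.WriteRequests.WithLabelValues("cache")))
}
```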
diff --git a/lib/cache/store_test.go b/lib/cache/store_test.go
index 215d0540b8368..0c8a0ed61e57f 100644
--- a/lib/cache/store_test.go
+++ b/lib/cache/store_test.go
@@ -27,6 +27,7 @@ import (
 func TestResourceStore(t *testing.T) {
 	store := newStore(
+		"int",
 		func(i int) int { return i },
 		map[string]func(i int) string{
 			"numbers": strconv.Itoa,
@@ -43,7 +44,7 @@ func TestResourceStore(t *testing.T) {
 	require.Equal(t, 0, zero)
 
 	n, err := store.get("numbers", "1000")
-	require.ErrorIs(t, err, &trace.NotFoundError{Message: `int "1000" does not exist`})
+	require.ErrorIs(t, err, &trace.NotFoundError{Message: `"int" "1000" does not exist`})
 	require.Equal(t, 0, n)
 
 	v, err := store.get("characters", "1c")
@@ -58,12 +59,12 @@ func TestResourceStore(t *testing.T) {
 	require.NoError(t, store.delete(0))
 
 	_, err = store.get("numbers", "0")
-	require.ErrorIs(t, err, &trace.NotFoundError{Message: `int "0" does not exist`})
+	require.ErrorIs(t, err, &trace.NotFoundError{Message: `"int" "0" does not exist`})
 
 	require.NoError(t, store.clear())
 
 	_, err = store.get("numbers", "0")
-	require.ErrorIs(t, err, &trace.NotFoundError{Message: `int "0" does not exist`})
+	require.ErrorIs(t, err, &trace.NotFoundError{Message: `"int" "0" does not exist`})
 
 	require.Zero(t, store.len())
 }
diff --git a/lib/cache/tokens.go b/lib/cache/tokens.go
index 4c23521eac4ec..fb40242565aac 100644
--- a/lib/cache/tokens.go
+++ b/lib/cache/tokens.go
@@ -36,6 +36,7 @@ func newStaticTokensCollection(c services.ClusterConfiguration, w types.WatchKin
 	return &collection[types.StaticTokens, staticTokensIndex]{
 		store: newStore(
+			types.KindStaticTokens,
 			types.StaticTokens.Clone,
 			map[staticTokensIndex]func(types.StaticTokens) string{
 				staticTokensNameIndex: types.StaticTokens.GetName,
@@ -91,6 +92,7 @@ func newProvisionTokensCollection(p services.Provisioner, w types.WatchKind) (*c
 	return &collection[types.ProvisionToken, provisionTokenIndex]{
 		store: newStore(
+			types.KindToken,
 			types.ProvisionToken.Clone,
 			map[provisionTokenIndex]func(types.ProvisionToken) string{
 				provisionTokenStoreNameIndex: types.ProvisionToken.GetName,
diff --git a/lib/cache/user_group.go b/lib/cache/user_group.go
index c6c1919054be4..3e8ac63a66ec2 100644
--- a/lib/cache/user_group.go
+++ b/lib/cache/user_group.go
@@ -38,6 +38,7 @@ func newUserGroupCollection(u services.UserGroups, w types.WatchKind) (*collecti
 	return &collection[types.UserGroup, userGroupIndex]{
 		store: newStore(
+			types.KindUserGroup,
 			types.UserGroup.Clone,
 			map[userGroupIndex]func(types.UserGroup) string{
 				userGroupNameIndex: types.UserGroup.GetName,
diff --git a/lib/cache/user_login_state.go b/lib/cache/user_login_state.go
index 1e5c28f40a4ed..4663f52a40ea9 100644
--- a/lib/cache/user_login_state.go
+++ b/lib/cache/user_login_state.go
@@ -38,6 +38,7 @@ func newUserLoginStateCollection(upstream services.UserLoginStates, w types.Watc
 	return &collection[*userloginstate.UserLoginState, userLoginStateIndex]{
 		store: newStore(
+			types.KindUserLoginState,
 			(*userloginstate.UserLoginState).Clone,
 			map[userLoginStateIndex]func(*userloginstate.UserLoginState) string{
 				userLoginStateNameIndex: func(r *userloginstate.UserLoginState) string {
diff --git a/lib/cache/user_task.go b/lib/cache/user_task.go
index b491a78340b27..db950d410ffeb 100644
--- a/lib/cache/user_task.go
+++ b/lib/cache/user_task.go
@@ -39,6 +39,7 @@ func newUserTaskCollection(upstream services.UserTasks, w types.WatchKind) (*col
 	return &collection[*usertasksv1.UserTask, userTaskIndex]{
 		store: newStore(
+			types.KindUserTask,
 			proto.CloneOf[*usertasksv1.UserTask],
 			map[userTaskIndex]func(*usertasksv1.UserTask) string{
 				userTaskNameIndex: func(r *usertasksv1.UserTask) string {
diff --git a/lib/cache/users.go b/lib/cache/users.go
index ccf1e5e9bbf7e..a09b5e10b697f 100644
--- a/lib/cache/users.go
+++ b/lib/cache/users.go
@@ -40,6 +40,7 @@ func newUserCollection(u services.UsersService, w types.WatchKind) (*collection[
 	return &collection[types.User, userIndex]{
 		store: newStore(
+			types.KindUser,
 			types.User.Clone,
 			map[userIndex]func(types.User) string{
 				userNameIndex: types.User.GetName,
diff --git a/lib/cache/web_session.go b/lib/cache/web_session.go
index 49859441dd006..ff35bad2f8fa6 100644
--- a/lib/cache/web_session.go
+++ b/lib/cache/web_session.go
@@ -38,6 +38,7 @@ func newWebSessionCollection(upstream types.WebSessionInterface, w types.WatchKi
 	return &collection[types.WebSession, webSessionIndex]{
 		store: newStore(
+			types.KindWebSession,
 			types.WebSession.Copy,
 			map[webSessionIndex]func(types.WebSession) string{
 				webSessionNameIndex: types.WebSession.GetName,
@@ -116,6 +117,7 @@ func newAppSessionCollection(upstream services.AppSession, w types.WatchKind) (*
 	return &collection[types.WebSession, appSessionIndex]{
 		store: newStore(
+			types.KindAppSession,
 			types.WebSession.Copy,
 			map[appSessionIndex]func(types.WebSession) string{
 				appSessionNameIndex: types.WebSession.GetName,
@@ -252,6 +254,7 @@ func newSnowflakeSessionCollection(upstream services.SnowflakeSession, w types.W
 	return &collection[types.WebSession, snowflakeSessionIndex]{
 		store: newStore(
+			types.KindSnowflakeSession,
 			types.WebSession.Copy,
 			map[snowflakeSessionIndex]func(types.WebSession) string{
 				snowflakeSessionNameIndex: types.WebSession.GetName,
diff --git a/lib/cache/web_tokens.go b/lib/cache/web_tokens.go
index 7ac90fc92ef5f..6446049d464a1 100644
--- a/lib/cache/web_tokens.go
+++ b/lib/cache/web_tokens.go
@@ -35,6 +35,7 @@ func newWebTokenCollection(upstream types.WebTokenInterface, w types.WatchKind) 
 	return &collection[types.WebToken, webTokenIndex]{
 		store: newStore(
+			types.KindWebToken,
 			types.WebToken.Clone,
 			map[webTokenIndex]func(types.WebToken) string{
 				webTokenNameIndex: types.WebToken.GetName,
diff --git a/lib/cache/web_ui_config.go b/lib/cache/web_ui_config.go
index b542fc65ac07b..493701c731bd0 100644
--- a/lib/cache/web_ui_config.go
+++ b/lib/cache/web_ui_config.go
@@ -36,6 +36,7 @@ func newWebUIConfigCollection(upstream services.ClusterConfiguration, w types.Wa
 	return &collection[types.UIConfig, webUIConfigIndex]{
 		store: newStore(
+			types.KindUIConfig,
 			types.UIConfig.Clone,
 			map[webUIConfigIndex]func(types.UIConfig) string{
 				webUIConfigNameIndex: types.UIConfig.GetName,
diff --git a/lib/cache/windows_desktop.go b/lib/cache/windows_desktop.go
index e602f03f868f9..e3e4023199c33 100644
--- a/lib/cache/windows_desktop.go
+++ b/lib/cache/windows_desktop.go
@@ -38,6 +38,7 @@ func newWindowsDesktopServiceCollection(p services.Presence, w types.WatchKind) 
 	return &collection[types.WindowsDesktopService, windowsDesktopServiceIndex]{
 		store: newStore(
+			types.KindWindowsDesktopService,
 			types.WindowsDesktopService.Clone,
 			map[windowsDesktopServiceIndex]func(types.WindowsDesktopService) string{
 				windowsDesktopServiceNameIndex: types.WindowsDesktopService.GetName,
@@ -168,6 +169,7 @@ func newWindowsDesktopCollection(d services.WindowsDesktops, w types.WatchKind) 
 	return &collection[types.WindowsDesktop, windowsDesktopIndex]{
 		store: newStore(
+			types.KindWindowsDesktop,
 			types.WindowsDesktop.Copy,
 			map[windowsDesktopIndex]func(types.WindowsDesktop) string{
 				windowsDesktopNameIndex: func(u types.WindowsDesktop) string {
@@ -293,6 +295,7 @@ func newDynamicWindowsDesktopCollection(upstream services.DynamicWindowsDesktops
 	return &collection[types.DynamicWindowsDesktop, dynamicWindowsDesktopIndex]{
 		store: newStore(
+			types.KindDynamicWindowsDesktop,
 			types.DynamicWindowsDesktop.Copy,
 			map[dynamicWindowsDesktopIndex]func(types.DynamicWindowsDesktop) string{
 				dynamicWindowsDesktopNameIndex: types.DynamicWindowsDesktop.GetName,
diff --git a/lib/cache/workload_identity.go b/lib/cache/workload_identity.go
index ac8a258b47946..34a17997e8de9 100644
--- a/lib/cache/workload_identity.go
+++ b/lib/cache/workload_identity.go
@@ -40,6 +40,7 @@ func newWorkloadIdentityCollection(upstream services.WorkloadIdentities, w types
 	return &collection[*workloadidentityv1pb.WorkloadIdentity, workloadIdentityIndex]{
 		store: newStore(
+			types.KindWorkloadIdentity,
 			proto.CloneOf[*workloadidentityv1pb.WorkloadIdentity],
 			map[workloadIdentityIndex]func(*workloadidentityv1pb.WorkloadIdentity) string{
 				workloadIdentityNameIndex: func(r *workloadidentityv1pb.WorkloadIdentity) string {
diff --git a/lib/observability/metrics/prometheus.go b/lib/observability/metrics/prometheus.go
index 8085be0ef19b7..d190c7ca1f5dc 100644
--- a/lib/observability/metrics/prometheus.go
+++ b/lib/observability/metrics/prometheus.go
@@ -34,6 +34,13 @@ import (
 // - returns an error if a collector does not fulfill the consistency and
 //   uniqueness criteria
 func RegisterPrometheusCollectors(collectors ...prometheus.Collector) error {
+	return RegisterCollectors(prometheus.DefaultRegisterer, collectors...)
+}
+
+// RegisterCollectors registers the collectors in the registry. Any errors
+// for already registered collectors are ignored. Errors are only returned
+// if a collector does not fulfill the consistency and uniqueness criteria.
+func RegisterCollectors(registry prometheus.Registerer, collectors ...prometheus.Collector) error {
 	var errs []error
 	for _, c := range collectors {
-		if err := prometheus.Register(c); err != nil {
+		if err := registry.Register(c); err != nil {
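Making the registerer injectable is what lets the cache and the backend reporter feed a per-process registry rather than the global default. A short usage sketch with a private registry (the counter name is hypothetical; the client_golang APIs are standard):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/gravitational/teleport/lib/observability/metrics"
)

func main() {
	reg := prometheus.NewRegistry() // e.g. one registry per process or per test

	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_total", // hypothetical metric
		Help: "Example counter.",
	})

	// Registering the same collector twice is tolerated: the
	// AlreadyRegisteredError is swallowed, per the doc comment above.
	if err := metrics.RegisterCollectors(reg, c, c); err != nil {
		panic(err)
	}

	c.Inc()
	mfs, _ := reg.Gather()
	fmt.Println(len(mfs)) // 1
}
```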
diff --git a/lib/service/service.go b/lib/service/service.go
index 671259ab0ab0a..e6ea773e3d15c 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -2660,6 +2660,7 @@ func (process *TeleportProcess) newAccessCacheForServices(cfg accesspoint.Config
 	cfg.ProcessID = process.id
 	cfg.TracingProvider = process.TracingProvider
 	cfg.MaxRetryPeriod = process.Config.CachePolicy.MaxRetryPeriod
+	cfg.Registerer = process.metricsRegistry
 
 	cfg.Access = services.Access
 	cfg.AccessLists = services.AccessLists
@@ -6361,9 +6362,10 @@ func (process *TeleportProcess) initAuthStorage() (backend.Backend, error) {
 	}
 
 	reporter, err := backend.NewReporter(backend.ReporterConfig{
-		Component: teleport.ComponentBackend,
-		Backend:   backend.NewSanitizer(bk),
-		Tracer:    process.TracingProvider.Tracer(teleport.ComponentBackend),
+		Component:  teleport.ComponentBackend,
+		Backend:    backend.NewSanitizer(bk),
+		Tracer:     process.TracingProvider.Tracer(teleport.ComponentBackend),
+		Registerer: process.metricsRegistry,
 	})
 	if err != nil {
 		return nil, trace.Wrap(err)
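The same registry is now handed to backend.NewReporter. A minimal wiring sketch, assuming the in-memory backend and that ReporterConfig fields not shown here (such as Tracer) may be left to their defaults; only fields visible in this diff are used:

```go
package main

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/gravitational/teleport"
	"github.com/gravitational/teleport/lib/backend"
	"github.com/gravitational/teleport/lib/backend/memory"
)

func main() {
	reg := prometheus.NewRegistry() // stands in for process.metricsRegistry

	mem, err := memory.New(memory.Config{Context: context.Background()})
	if err != nil {
		panic(err)
	}

	// Mirrors the initAuthStorage wiring above: backend metrics are
	// registered on reg instead of the global default registerer.
	reporter, err := backend.NewReporter(backend.ReporterConfig{
		Component:  teleport.ComponentBackend,
		Backend:    mem,
		Registerer: reg,
	})
	if err != nil {
		panic(err)
	}
	defer reporter.Close()

	_ = reporter // use as a backend.Backend from here on
}
```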