Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 146 additions & 8 deletions lib/services/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package services
import (
"context"
"log/slog"
"time"

"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -55,6 +57,14 @@ type GenericReconcilerConfig[K comparable, T any] struct {
OnDelete func(context.Context, T) error
// Logger emits log messages.
Logger *slog.Logger
// MetricsSubsystem is the subsystem used when creating the reconciler
// metrics. e.g. "entra_sync" will give metrics such as:
// "teleport_entra_sync_reconciliation_total"
// This must be set when MetricsRegistry is non-nil.
MetricsSubsystem string
// MetricsRegistry is used to register the reconciler metrics If nil,
// metrics are not registered. If non-nil, MetricsSubsystem must be set.
MetricsRegistry prometheus.Registerer
// AllowOriginChanges is a flag that allows the reconciler to change the
// origin value of a reconciled resource. By default, origin changes are
// disallowed to enforce segregation between of resources from different
Expand Down Expand Up @@ -88,17 +98,68 @@ func (c *GenericReconcilerConfig[K, T]) CheckAndSetDefaults() error {
if c.Logger == nil {
c.Logger = slog.With(teleport.ComponentKey, "reconciler")
}
if c.MetricsRegistry != nil && c.MetricsSubsystem == "" {
return trace.BadParameter("if MetricsRegistry is non-nil, MetricsSubsystem is required (this is a bug)")
}
return nil
}

type reconcilerMetrics struct {
reconciliationTotal *prometheus.CounterVec
reconciliationDuration *prometheus.HistogramVec
}

const (
metricLabelResult = "result"
metricLabelResultSuccess = "success"
metricLabelResultError = "error"
metricLabelResultNoop = "noop"
metricLabelOperation = "operation"
metricLabelOperationCreate = "create"
metricLabelOperationUpdate = "update"
metricLabelOperationDelete = "delete"
metricLabelKind = "kind"
)

func newReconcilerMetrics(subsystem string) *reconcilerMetrics {
return &reconcilerMetrics{
reconciliationTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: teleport.MetricNamespace,
Subsystem: subsystem,
Name: "reconciliation_total",
Help: "Total number of individual resource reconciliations.",
}, []string{metricLabelKind, metricLabelOperation, metricLabelResult}),
reconciliationDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: teleport.MetricNamespace,
Subsystem: subsystem,
Name: "reconciliation_duration_seconds",
Help: "The duration of individual resource reconciliation in seconds.",
}, []string{metricLabelKind, metricLabelOperation}),
}
}

func (m *reconcilerMetrics) register(r prometheus.Registerer) error {
return trace.NewAggregate(
r.Register(m.reconciliationTotal),
r.Register(m.reconciliationDuration),
)
}

// NewGenericReconciler creates a new GenericReconciler with provided configuration.
func NewGenericReconciler[K comparable, T any](cfg GenericReconcilerConfig[K, T]) (*GenericReconciler[K, T], error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
m := newReconcilerMetrics(cfg.MetricsSubsystem)
if cfg.MetricsRegistry != nil {
if err := m.register(cfg.MetricsRegistry); err != nil {
return nil, trace.Wrap(err, "registering metrics")
}
}
return &GenericReconciler[K, T]{
cfg: cfg,
logger: cfg.Logger,
cfg: cfg,
logger: cfg.Logger,
metrics: m,
}, nil
}

Expand All @@ -108,8 +169,9 @@ func NewGenericReconciler[K comparable, T any](cfg GenericReconcilerConfig[K, T]
// It's used in combination with watchers by agents (app, database, desktop)
// to enable dynamically registered resources.
type GenericReconciler[K comparable, T any] struct {
cfg GenericReconcilerConfig[K, T]
logger *slog.Logger
cfg GenericReconcilerConfig[K, T]
logger *slog.Logger
metrics *reconcilerMetrics
}

// Reconcile reconciles currently registered resources with new resources and
Expand Down Expand Up @@ -155,14 +217,35 @@ func (r *GenericReconciler[K, T]) processRegisteredResource(ctx context.Context,
return trace.Wrap(err)
}
r.logger.InfoContext(ctx, "Resource was removed, deleting", "kind", kind, "name", key)
if err := r.cfg.OnDelete(ctx, registered); err != nil {
start := time.Now()
err = r.cfg.OnDelete(ctx, registered)
r.metrics.reconciliationDuration.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
}).Observe(time.Since(start).Seconds())
if err != nil {
if trace.IsNotFound(err) {
r.logger.Log(ctx, logutils.TraceLevel, "Failed to delete resource", "kind", kind, "name", key, "err", err)
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
metricLabelResult: metricLabelResultNoop,
}).Inc()
return nil
}
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
metricLabelResult: metricLabelResultError,
}).Inc()
return trace.Wrap(err, "failed to delete %v %v", kind, key)
}

r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
metricLabelResult: metricLabelResultSuccess,
}).Inc()
return nil
}

Expand All @@ -179,9 +262,27 @@ func (r *GenericReconciler[K, T]) processNewResource(ctx context.Context, curren
}
if r.cfg.Matcher(newT) {
r.logger.InfoContext(ctx, "New resource matches, creating", "kind", kind, "name", key)
if err := r.cfg.OnCreate(ctx, newT); err != nil {
start := time.Now()
err = r.cfg.OnCreate(ctx, newT)
r.metrics.reconciliationDuration.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationCreate,
}).Observe(time.Since(start).Seconds())

if err != nil {
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationCreate,
metricLabelResult: metricLabelResultError,
}).Inc()
return trace.Wrap(err, "failed to create %v %v", kind, key)
}
r.metrics.reconciliationTotal.With(
prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationCreate,
metricLabelResult: metricLabelResultSuccess,
}).Inc()
return nil
}
r.logger.DebugContext(ctx, "New resource doesn't match, not creating", "kind", kind, "name", key)
Expand Down Expand Up @@ -215,19 +316,56 @@ func (r *GenericReconciler[K, T]) processNewResource(ctx context.Context, curren
if r.cfg.CompareResources(newT, registered) != Equal {
if r.cfg.Matcher(newT) {
r.logger.InfoContext(ctx, "Existing resource updated, updating", "name", key)
if err := r.cfg.OnUpdate(ctx, newT, registered); err != nil {
start := time.Now()
err := r.cfg.OnUpdate(ctx, newT, registered)
r.metrics.reconciliationDuration.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationUpdate,
}).Observe(time.Since(start).Seconds())
if err != nil {
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationUpdate,
metricLabelResult: metricLabelResultError,
}).Inc()
return trace.Wrap(err, "failed to update %v %v", kind, key)
}
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationUpdate,
metricLabelResult: metricLabelResultSuccess,
}).Inc()
return nil
}
r.logger.InfoContext(ctx, "Existing resource updated and no longer matches, deleting", "name", key)
if err := r.cfg.OnDelete(ctx, registered); err != nil {
start := time.Now()
err := r.cfg.OnDelete(ctx, registered)
r.metrics.reconciliationDuration.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
}).Observe(time.Since(start).Seconds())
if err != nil {
if trace.IsNotFound(err) {
r.logger.Log(ctx, logutils.TraceLevel, "Failed to delete resource", "kind", kind, "name", key, "err", err)
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
metricLabelResult: metricLabelResultNoop,
}).Inc()
return nil
}
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
metricLabelResult: metricLabelResultError,
}).Inc()
return trace.Wrap(err, "failed to delete %v %v", kind, key)
}
r.metrics.reconciliationTotal.With(prometheus.Labels{
metricLabelKind: kind,
metricLabelOperation: metricLabelOperationDelete,
metricLabelResult: metricLabelResultSuccess,
}).Inc()
return nil
}

Expand Down
Loading