diff --git a/lib/auth/keystore/health/active.go b/lib/auth/keystore/health/active.go new file mode 100644 index 0000000000000..58d208014f52f --- /dev/null +++ b/lib/auth/keystore/health/active.go @@ -0,0 +1,338 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package health + +import ( + "bytes" + "context" + "crypto" + "crypto/ecdsa" + "crypto/ed25519" + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "log/slog" + "slices" + "strings" + "sync" + "time" + + "github.com/gravitational/trace" + "golang.org/x/crypto/ssh" + + "github.com/gravitational/teleport/api/types" +) + +// KeyManager allows getting a signer for each CA we expect to health check for. +type KeyManager interface { + GetTLSSigner(context.Context, types.CertAuthority) (crypto.Signer, error) + GetSSHSigner(context.Context, types.CertAuthority) (ssh.Signer, error) + GetJWTSigner(context.Context, types.CertAuthority) (crypto.Signer, error) +} + +// ActiveHealthCheckConfig contains values for configuring an [ActiveHealthChecker]. +type ActiveHealthCheckConfig struct { + // Interval is the duration waited between signing requests when there have + // been no failures. + Interval time.Duration + // FailureInterval is the duration waited between calls after a failure + // occurs. + FailureInterval time.Duration + // Callback should be a non-blocking call that is passed the result of + // each signing request made by the health checker. + // The first callback will occur only after receiving a cert authority event. + Callback func(error) + // ResourceC is a channel for receiving cert authority events. + // It is expected that the full list of cert authorities is provided + // in each event. + ResourceC <-chan []types.CertAuthority + // KeyManager allows getting signers for each CA we expect to make requests against. + KeyManager KeyManager + // Logger configures a structured logger to use. + Logger *slog.Logger +} + +// NewActiveHealthChecker constructs an [ActiveHealthChecker] instance. +func NewActiveHealthChecker(c ActiveHealthCheckConfig) (*ActiveHealthChecker, error) { + if c.Callback == nil { + return nil, trace.BadParameter("health check callback must be specified") + } + if c.Interval == 0 { + c.Interval = time.Minute + } + if c.FailureInterval == 0 { + c.FailureInterval = time.Second * 10 + } + if c.Logger == nil { + c.Logger = slog.New(slog.DiscardHandler) + } + + return &ActiveHealthChecker{ + interval: c.Interval, + failureInterval: c.FailureInterval, + callback: c.Callback, + c: c.ResourceC, + firstEvent: make(chan struct{}, 1), + m: c.KeyManager, + logger: c.Logger, + signers: make([]*healthSigner, 0), + healthFn: sign, + }, nil +} + +// ActiveHealthChecker makes signing requests to CAs and reports errors back to +// the configured callback. CAs are health checked one at a time at the given +// interval. +type ActiveHealthChecker struct { + interval time.Duration + failureInterval time.Duration + healthFn func(*healthSigner) error + + // firstEvent is sent on and closed after receiving the first message on c. + // If c is closed before any message is received firstEvent is closed without + // being sent on. + firstEvent chan struct{} + c <-chan []types.CertAuthority + + m KeyManager + callback func(error) + logger *slog.Logger + + // mu protects signers. + mu sync.RWMutex + signers []*healthSigner +} + +// Run executes the main health checking loop, iterating over CAs and making +// signing requests. +func (c *ActiveHealthChecker) Run(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + err := c.watch(ctx) + c.logger.ErrorContext(ctx, "CA event watcher exited", "error", err) + cancel() + }() + + select { + case _, ok := <-c.firstEvent: + if !ok { + return trace.Errorf("failed to start active health checker: failed to receive first CA event") + } + case <-ctx.Done(): + return ctx.Err() + } + + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + var ( + signer *healthSigner + err error + ) + for { + signer, err = c.step(signer) + c.callback(err) + if err != nil { + ticker.Reset(c.failureInterval) + } else { + ticker.Reset(c.interval) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +func (c *ActiveHealthChecker) step(curr *healthSigner) (*healthSigner, error) { + if !c.exists(curr) { + n, err := c.nextSigner(curr) + if err != nil { + return nil, trace.Wrap(err) + } + curr = n + } + if curr != nil { + err := c.healthFn(curr) + if err != nil { + return curr, trace.Wrap(err) + } + } + next, err := c.nextSigner(curr) + if err != nil { + return nil, trace.Wrap(err) + } + return next, nil +} + +func (c *ActiveHealthChecker) exists(s *healthSigner) bool { + if s == nil { + return false + } + + c.mu.RLock() + defer c.mu.RUnlock() + for _, signer := range c.signers { + if signer.Equal(s) { + return true + } + } + return false +} + +func (c *ActiveHealthChecker) nextSigner(last *healthSigner) (*healthSigner, error) { + c.mu.RLock() + defer c.mu.RUnlock() + n := len(c.signers) + if n == 0 { + return nil, trace.Errorf("no signers present") + } + if last == nil { + return c.signers[0], nil + } + for i := range c.signers { + if last.id <= c.signers[i].id { + continue + } + return c.signers[i], nil + } + return c.signers[0], nil +} + +func (c *ActiveHealthChecker) watch(ctx context.Context) error { + once := sync.Once{} + for { + select { + case <-ctx.Done(): + return ctx.Err() + case cas, ok := <-c.c: + if !ok { + once.Do(func() { close(c.firstEvent) }) + return trace.Errorf("CA resource watcher closed") + } + c.mu.Lock() + var signers []*healthSigner + for _, ca := range cas { + signers = append(signers, c.getHealthSigners(ctx, ca)...) + } + slices.SortStableFunc(signers, func(a, b *healthSigner) int { + return strings.Compare(a.id, b.id) + }) + c.signers = signers + c.mu.Unlock() + once.Do(func() { + c.firstEvent <- struct{}{} + close(c.firstEvent) + }) + } + } +} + +func (c *ActiveHealthChecker) getHealthSigners(ctx context.Context, ca types.CertAuthority) []*healthSigner { + var signers []*healthSigner + ks := ca.GetActiveKeys() + if len(ks.TLS) > 0 { + signer, err := c.m.GetTLSSigner(ctx, ca) + if err == nil { + signers = append(signers, &healthSigner{ + crypto: signer, + id: ca.GetID().String() + "-tls", + }) + } + } + if len(ks.SSH) > 0 { + signer, err := c.m.GetSSHSigner(ctx, ca) + if err == nil { + signers = append(signers, &healthSigner{ + ssh: signer, + id: ca.GetID().String() + "-ssh", + }) + } + } + if len(ks.JWT) > 0 { + signer, err := c.m.GetJWTSigner(ctx, ca) + if err == nil { + signers = append(signers, &healthSigner{ + crypto: signer, + id: ca.GetID().String() + "-jwt", + }) + } + } + return signers +} + +// healthSigner wraps a crypto OR ssh signer with an ID. The ID is the CA ID plus +// a suffix to indicate the signer type of "-tls", "-ssh", "-jwt". This suffix +// is necessary to differentiate signers associated with the same CA. +type healthSigner struct { + id string + crypto crypto.Signer + ssh ssh.Signer +} + +// sign performs a signing request given a healthSigner. +func sign(s *healthSigner) error { + msg := []byte("healthcheck") + if s.crypto != nil { + var ( + digest []byte + opts crypto.SignerOpts + ) + switch pub := s.crypto.Public().(type) { + case *ecdsa.PublicKey, *rsa.PublicKey: + h := sha256.Sum256(msg) + digest = h[:] + opts = crypto.SHA256 + case ed25519.PublicKey: + digest = msg + opts = &ed25519.Options{} + default: + return trace.Errorf("failed signing with crypto signer: %s unexpected key type %T", s.id, pub) + } + _, err := s.crypto.Sign(rand.Reader, digest, opts) + if err != nil { + return trace.Wrap(err, "failed signing with crypto signer: %s", s.id) + } + } else if s.ssh != nil { + _, err := s.ssh.Sign(rand.Reader, msg) + if err != nil { + return trace.Wrap(err, "failed signing with ssh signer: %s", s.id) + } + } else { + return trace.Errorf("unable to test key signing: missing signer: %s", s.id) + } + return nil +} + +type keycompare interface { + Equal(crypto.PublicKey) bool +} + +// Equal compares healthSigner a's public key to healthSigner b's public key. +func (a *healthSigner) Equal(b *healthSigner) bool { + if a.crypto != nil && b.crypto != nil { + kc, ok := a.crypto.Public().(keycompare) + return ok && kc.Equal(b.crypto.Public()) + } else if a.ssh != nil && b.ssh != nil { + return bytes.Equal(a.ssh.PublicKey().Marshal(), b.ssh.PublicKey().Marshal()) + } + return false +} diff --git a/lib/auth/keystore/health/active_test.go b/lib/auth/keystore/health/active_test.go new file mode 100644 index 0000000000000..24326f187551d --- /dev/null +++ b/lib/auth/keystore/health/active_test.go @@ -0,0 +1,83 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package health + +import ( + "context" + "crypto" + "log/slog" + "testing" + "testing/synctest" + + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport/api/types" +) + +type mockKM struct { + KeyManager + crypto.Signer +} + +func (m *mockKM) GetTLSSigner(_ context.Context, ca types.CertAuthority) (crypto.Signer, error) { + return m, nil +} + +func (m *mockKM) Public() crypto.PublicKey { + return m +} + +func (m *mockKM) Equal(o crypto.PublicKey) bool { + unwrap, ok := o.(*mockKM) + return ok && unwrap == m +} + +func TestActiveHealthCheckerSync(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ca := &types.CertAuthorityV2{} + err := ca.SetActiveKeys(types.CAKeySet{ + TLS: []*types.TLSKeyPair{{ + Cert: []byte{0}, + }}, + }) + require.NoError(t, err) + + ch := make(chan []types.CertAuthority, 1) + ch <- []types.CertAuthority{ca} + calls := make([]error, 0) + + hc, err := NewActiveHealthChecker(ActiveHealthCheckConfig{ + Callback: func(err error) { + calls = append(calls, err) + }, + ResourceC: ch, + KeyManager: &mockKM{}, + Logger: slog.Default(), + }) + require.NoError(t, err) + hc.healthFn = func(_ *healthSigner) error { + return nil + } + + go hc.Run(t.Context()) + synctest.Wait() + require.Len(t, calls, 1) + require.NoError(t, calls[0]) + }) +} diff --git a/lib/auth/keystore/manager.go b/lib/auth/keystore/manager.go index 00ab351553e9d..8e12a857db541 100644 --- a/lib/auth/keystore/manager.go +++ b/lib/auth/keystore/manager.go @@ -479,6 +479,13 @@ func (m *Manager) GetTLSCertAndSigner(ctx context.Context, ca types.CertAuthorit return cert, signer, trace.Wrap(err) } +// GetTLSSigner selects a usable TLS keypair from the given CA and returns a +// [crypto.Signer]. +func (m *Manager) GetTLSSigner(ctx context.Context, ca types.CertAuthority) (crypto.Signer, error) { + _, signer, err := m.getTLSCertAndSigner(ctx, ca.GetActiveKeys()) + return signer, trace.Wrap(err) +} + // GetAdditionalTrustedTLSCertAndSigner selects a usable TLS keypair from the given CA // and returns the PEM-encoded TLS certificate and a [crypto.Signer]. func (m *Manager) GetAdditionalTrustedTLSCertAndSigner(ctx context.Context, ca types.CertAuthority) ([]byte, crypto.Signer, error) { diff --git a/lib/reversetunnel/srv.go b/lib/reversetunnel/srv.go index 00c3c58f98c06..bc05edeff5829 100644 --- a/lib/reversetunnel/srv.go +++ b/lib/reversetunnel/srv.go @@ -1319,7 +1319,8 @@ func newLeafCluster(srv *server, domainName string, sconn ssh.Conn) (*leafCluste return nil, trace.Wrap(err) } - leafClusterWatcher, err := services.NewCertAuthorityWatcher(srv.ctx, services.CertAuthorityWatcherConfig{ + //nolint:staticcheck // SA1019 This should be updated to use [services.NewCertAuthorityWatcher] + leafClusterWatcher, err := services.DeprecatedNewCertAuthorityWatcher(srv.ctx, services.CertAuthorityWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentProxy, Logger: srv.logger, diff --git a/lib/service/service.go b/lib/service/service.go index 8d560a60bc1c3..722e586b6c96a 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -91,6 +91,7 @@ import ( "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/keygen" "github.com/gravitational/teleport/lib/auth/keystore" + "github.com/gravitational/teleport/lib/auth/keystore/health" "github.com/gravitational/teleport/lib/auth/machineid/machineidv1" "github.com/gravitational/teleport/lib/auth/recordingencryption" "github.com/gravitational/teleport/lib/auth/recordingmetadata" @@ -2834,6 +2835,38 @@ func (process *TeleportProcess) initAuthService() error { process.ExpectService(teleport.ComponentAuth) process.RegisterFunc("auth.heartbeat", heartbeat.Run) + if cfg.Auth.KeyStore.HealthCheck != nil && + cfg.Auth.KeyStore.HealthCheck.Active != nil && + cfg.Auth.KeyStore.HealthCheck.Active.Enabled { + + cawatcher, err := services.NewCertAuthorityWatcher(process.ExitContext(), services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentAuth, + Logger: process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentAuth, process.id)), + Client: authServer.Services, + }, + AuthorityGetter: authServer.Services, + Types: types.CertAuthTypes, + LoadKeys: true, + ResourceC: make(chan []types.CertAuthority, 64), + }) + if err != nil { + return trace.Wrap(err) + } + keystoreHealth, err := health.NewActiveHealthChecker(health.ActiveHealthCheckConfig{ + Callback: process.OnHeartbeat(teleport.ComponentKeyStore), + KeyManager: authServer.GetKeyStore(), + Logger: logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentKeyStore, "health")), + ResourceC: cawatcher.ResourcesC, + }) + if err != nil { + return trace.Wrap(err) + } + process.RegisterFunc("auth.keystore_health", func() error { + return trace.Wrap(keystoreHealth.Run(process.ExitContext())) + }) + } + spiffeFedSyncer, err := machineidv1.NewSPIFFEFederationSyncer(machineidv1.SPIFFEFederationSyncerConfig{ Backend: b, Store: authServer.Services.SPIFFEFederations, @@ -4873,7 +4906,8 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } - caWatcher, err := services.NewCertAuthorityWatcher(process.ExitContext(), services.CertAuthorityWatcherConfig{ + //nolint:staticcheck // SA1019 This should be updated to use [services.NewCertAuthorityWatcher] + caWatcher, err := services.DeprecatedNewCertAuthorityWatcher(process.ExitContext(), services.CertAuthorityWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentProxy, Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentProxy), diff --git a/lib/service/servicecfg/auth.go b/lib/service/servicecfg/auth.go index 6288b1bb1e600..e909df4af84b4 100644 --- a/lib/service/servicecfg/auth.go +++ b/lib/service/servicecfg/auth.go @@ -202,6 +202,8 @@ type KeystoreConfig struct { GCPKMS GCPKMSConfig // AWSKMS holds configuration parameter specific to AWS KMS keystores. AWSKMS *AWSKMSConfig + // HealthCheck holds configuration parameters for keystore health checking. + HealthCheck *KeystoreHealthCheck } // CheckAndSetDefaults checks that required parameters of the config are @@ -334,3 +336,15 @@ type MultiRegionKeyStore struct { // ReplicaRegions is a list of regions keys will be replicated to. ReplicaRegions []string `yaml:"replica_regions"` } + +// KeystoreHealthCheck contains configuration for keystore health checking. +type KeystoreHealthCheck struct { + // Active configures active health checking for a keystore. + Active *KeystoreActiveHealthCheck `yaml:"active"` +} + +// KeystoreActiveHealthCheck contains configuration for keystore active health checking. +type KeystoreActiveHealthCheck struct { + // Enabled enables active health checking. + Enabled bool `yaml:"enabled"` +} diff --git a/lib/services/readonly/readonly.go b/lib/services/readonly/readonly.go index 0c7142a872990..3100073cb03db 100644 --- a/lib/services/readonly/readonly.go +++ b/lib/services/readonly/readonly.go @@ -536,3 +536,40 @@ type DynamicWindowsDesktop interface { } var _ DynamicWindowsDesktop = types.DynamicWindowsDesktop(nil) + +// CertAuthority represents a teleport certificate authority. +type CertAuthority interface { + // ResourceWithSecrets sets common resource properties + types.ResourceWithSecrets + // GetID returns certificate authority ID - + // combined type and name + GetID() types.CertAuthID + // GetType returns user or host certificate authority + GetType() types.CertAuthType + // GetClusterName returns cluster name this cert authority + // is associated with + GetClusterName() string + + GetActiveKeys() types.CAKeySet + GetAdditionalTrustedKeys() types.CAKeySet + + GetTrustedSSHKeyPairs() []*types.SSHKeyPair + GetTrustedTLSKeyPairs() []*types.TLSKeyPair + GetTrustedJWTKeyPairs() []*types.JWTKeyPair + + // CombinedMapping is used to specify combined mapping from legacy property Roles + // and new property RoleMap + CombinedMapping() types.RoleMap + // GetRoleMap returns role map property + GetRoleMap() types.RoleMap + // GetRoles returns a list of roles assumed by users signed by this CA + GetRoles() []string + // String returns human readable version of the CertAuthority + String() string + // GetRotation returns rotation state. + GetRotation() types.Rotation + // AllKeyTypes returns the set of all different key types in the CA. + AllKeyTypes() []string + // Clone returns a copy of the cert authority object. + Clone() types.CertAuthority +} diff --git a/lib/services/watcher.go b/lib/services/watcher.go index b4d8c70596fea..ac03dc4483dcb 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -649,6 +649,11 @@ type GenericWatcherConfig[T any, R any] struct { ResourceDiffer func(old, new T) bool // ResourceKey defines how the resources should be keyed. ResourceKey func(resource T) string + // DeleteKey defines how a deleted resource key is derived. A delete event + // typically sends a stripped down resource representation with an underlying + // type of [types.ResourceHeader]. + // If unspecified the key will be derived from the resource.Description + resource.GetName + DeleteKey func(types.Resource) string // ResourcesC is a channel used to report the current resource set. It receives // a fresh list at startup and subsequently a list of all known resources // whenever an addition or deletion is detected. @@ -674,6 +679,9 @@ type GenericWatcherConfig[T any, R any] struct { // [GenericWatcher.CurrentResourcesWithFilter] manually to retrieve the active // resource set. DisableUpdateBroadcast bool + // LoadSecrets specifies whether sensitive data will be loaded into memory. + // This is only applicable to certain types like [types.CertAuthority]. + LoadSecrets bool } // CheckAndSetDefaults checks parameters and sets default values. @@ -805,7 +813,10 @@ type genericCollector[T any, R any] struct { // resourceKinds specifies the resource kind to watch. func (g *genericCollector[T, R]) resourceKinds() []types.WatchKind { - return []types.WatchKind{{Kind: g.ResourceKind}} + return []types.WatchKind{{ + Kind: g.ResourceKind, + LoadSecrets: g.LoadSecrets, + }} } // getResources gets the list of current resources. @@ -904,7 +915,11 @@ func (g *genericCollector[T, R]) processEventsAndUpdateCurrent(ctx context.Conte switch event.Type { case types.OpDelete: // On delete events, the server description is populated with the host ID. - delete(g.current, event.Resource.GetMetadata().Description+event.Resource.GetName()) + key := event.Resource.GetMetadata().Description + event.Resource.GetName() + if g.DeleteKey != nil { + key = g.DeleteKey(event.Resource) + } + delete(g.current, key) // Always broadcast when a resource is deleted. updated = true case types.OpPut: @@ -1246,6 +1261,10 @@ type CertAuthorityWatcherConfig struct { AuthorityGetter // Types restricts which cert authority types are retrieved via the AuthorityGetter. Types []types.CertAuthType + // LoadKeys determines whether private keys will be included. + LoadKeys bool + // ResourceC receives an up-to-date list of all cert authority resources. + ResourceC chan []types.CertAuthority } // CheckAndSetDefaults checks parameters and sets default values. @@ -1266,8 +1285,47 @@ func (cfg *CertAuthorityWatcherConfig) CheckAndSetDefaults() error { return nil } -// NewCertAuthorityWatcher returns a new instance of CertAuthorityWatcher. -func NewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig) (*CertAuthorityWatcher, error) { +// NewCertAuthorityWatcher returns a new cert authority watcher instance. +func NewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig) (*GenericWatcher[types.CertAuthority, readonly.CertAuthority], error) { + if err := cfg.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } + getter := cfg.AuthorityGetter + w, err := NewGenericResourceWatcher(ctx, GenericWatcherConfig[types.CertAuthority, readonly.CertAuthority]{ + ResourceKind: types.KindCertAuthority, + ResourceWatcherConfig: cfg.ResourceWatcherConfig, + ResourceGetter: func(ctx context.Context) ([]types.CertAuthority, error) { + var cas []types.CertAuthority + for _, t := range cfg.Types { + innerCAs, err := getter.GetCertAuthorities(ctx, t, cfg.LoadKeys) + if err != nil { + return nil, trace.Wrap(err) + } + cas = append(cas, innerCAs...) + } + return cas, nil + }, + ResourceKey: func(resource types.CertAuthority) string { + return resource.GetSubKind() + "/" + resource.GetName() + }, + DeleteKey: func(resource types.Resource) string { + return resource.GetSubKind() + "/" + resource.GetName() + }, + ResourcesC: cfg.ResourceC, + CloneFunc: types.CertAuthority.Clone, + ReadOnlyFunc: func(resource types.CertAuthority) readonly.CertAuthority { + return resource + }, + LoadSecrets: cfg.LoadKeys, + }) + return w, trace.Wrap(err) +} + +// DeprecatedNewCertAuthorityWatcher returns a new instance of CertAuthorityWatcher. +// +// Deprecated: This has been replaced by NewCertAuthorityWatcher which uses the +// newer generic watcher under the hood. +func DeprecatedNewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig) (*CertAuthorityWatcher, error) { if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } diff --git a/lib/services/watcher_test.go b/lib/services/watcher_test.go index ca8974969ee9a..c33bde93821f6 100644 --- a/lib/services/watcher_test.go +++ b/lib/services/watcher_test.go @@ -23,6 +23,7 @@ import ( "crypto/x509/pkix" "errors" "fmt" + "slices" "sort" "sync" "testing" @@ -774,6 +775,80 @@ func TestCertAuthorityWatcher(t *testing.T) { caService := local.NewCAService(bk) w, err := services.NewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: "test", + MaxRetryPeriod: 200 * time.Millisecond, + Client: &client{ + Trust: caService, + Events: local.NewEventsService(bk), + }, + Clock: clock, + }, + Types: []types.CertAuthType{types.HostCA, types.UserCA, types.DatabaseCA, types.OpenSSHCA}, + AuthorityGetter: caService, + ResourceC: make(chan []types.CertAuthority, 8), + }) + require.NoError(t, err) + t.Cleanup(w.Close) + + waitForEvent := func(t *testing.T, caTypes []types.CertAuthType) { + select { + case cas := <-w.ResourcesC: + for _, caType := range caTypes { + require.True(t, slices.ContainsFunc(cas, func(ca types.CertAuthority) bool { + return ca.GetType() == caType + })) + } + require.Empty(t, w.ResourcesC) + require.Len(t, cas, len(caTypes)) + case <-time.After(time.Second * 2): + t.Fatal("timed out waiting for event") + } + } + + select { + case changeset := <-w.ResourcesC: + require.Empty(t, changeset) + case <-w.Done(): + t.Fatal("Watcher has unexpectedly exited.") + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for the first event.") + } + + // Create a CA and ensure we receive the event. + ca := newCertAuthority(t, "test", types.HostCA) + require.NoError(t, caService.UpsertCertAuthority(ctx, ca)) + waitForEvent(t, []types.CertAuthType{types.HostCA}) + + ca = newCertAuthority(t, "test", types.DatabaseCA) + require.NoError(t, caService.UpsertCertAuthority(ctx, ca)) + waitForEvent(t, []types.CertAuthType{types.HostCA, types.DatabaseCA}) + + require.NoError(t, caService.DeleteCertAuthority(ctx, ca.GetID())) + waitForEvent(t, []types.CertAuthType{types.HostCA}) +} + +func TestDeprecatedCertAuthorityWatcher(t *testing.T) { + t.Parallel() + + ctx := context.Background() + clock := clockwork.NewFakeClock() + + bk, err := memory.New(memory.Config{ + Context: ctx, + Clock: clock, + }) + require.NoError(t, err) + + type client struct { + services.Trust + types.Events + } + + caService := local.NewCAService(bk) + //nolint:staticcheck // SA1019 This test should be deleted after all uses of + // [services.DeprecatedNewCertAuthorityWatcher] are removed. + w, err := services.DeprecatedNewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: "test", MaxRetryPeriod: 200 * time.Millisecond, diff --git a/lib/srv/regular/sshserver_test.go b/lib/srv/regular/sshserver_test.go index aee8e4f89d349..4a87ed2b6230d 100644 --- a/lib/srv/regular/sshserver_test.go +++ b/lib/srv/regular/sshserver_test.go @@ -3234,7 +3234,8 @@ func newGitServerWatcher(ctx context.Context, t *testing.T, client *authclient.C } func newCertAuthorityWatcher(ctx context.Context, t *testing.T, client types.Events) *services.CertAuthorityWatcher { - caWatcher, err := services.NewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ + //nolint:staticcheck // SA1019 This should be updated to use [services.NewCertAuthorityWatcher] + caWatcher, err := services.DeprecatedNewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: "test", Client: client, diff --git a/lib/web/apiserver_test.go b/lib/web/apiserver_test.go index bf6b3533c9232..e0b7c862ef151 100644 --- a/lib/web/apiserver_test.go +++ b/lib/web/apiserver_test.go @@ -423,7 +423,8 @@ func newWebSuiteWithConfig(t *testing.T, cfg webSuiteConfig) *WebSuite { }) require.NoError(t, err) - caWatcher, err := services.NewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{ + //nolint:staticcheck // SA1019 This should be updated to use [services.NewCertAuthorityWatcher] + caWatcher, err := services.DeprecatedNewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentProxy, Client: s.proxyClient, @@ -8624,7 +8625,8 @@ func createProxy(ctx context.Context, t *testing.T, proxyID string, node *regula require.NoError(t, err) t.Cleanup(proxyLockWatcher.Close) - proxyCAWatcher, err := services.NewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ + //nolint:staticcheck // SA1019 This should be updated to use [services.NewCertAuthorityWatcher] + proxyCAWatcher, err := services.DeprecatedNewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentProxy, Client: client,