From c742310081828365846a48975eb10d70de34ad01 Mon Sep 17 00:00:00 2001 From: hugoShaka Date: Fri, 3 Oct 2025 10:34:33 -0400 Subject: [PATCH] Expect the auth backend/cache to be initialized before turning ready --- integration/helpers/instance.go | 4 +- lib/service/service.go | 66 +++++++++++++++++++++++++ lib/service/service_test.go | 88 +++++++++++++++++---------------- 3 files changed, 115 insertions(+), 43 deletions(-) diff --git a/integration/helpers/instance.go b/integration/helpers/instance.go index de8ad7487d631..f04d337d66246 100644 --- a/integration/helpers/instance.go +++ b/integration/helpers/instance.go @@ -1329,7 +1329,9 @@ func (i *TeleInstance) Start() error { if i.Config.WindowsDesktop.Enabled { expectedEvents = append(expectedEvents, service.WindowsDesktopReady) } - + if i.Config.Relay.Enabled { + expectedEvents = append(expectedEvents, service.RelayReady) + } if i.Config.Discovery.Enabled { expectedEvents = append(expectedEvents, service.DiscoveryReady) } diff --git a/lib/service/service.go b/lib/service/service.go index 68240b91d1cfc..d90e18a0ff631 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2606,6 +2606,72 @@ func (process *TeleportProcess) initAuthService() error { } } + // We mark the process state as starting until the auth backend is ready. + // If cache is enabled, this will wait for the cache to be populated. + // We don't want auth to say its ready until its cache is populated, + // else a rollout might progress too quickly and cause backend pressure + // and outages. 
+ { + component := teleport.Component(teleport.ComponentAuth, "backend") + process.ExpectService(component) + process.RegisterFunc("auth.wait-for-event-stream", func() error { + start := process.Clock.Now() + + retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ + First: 0, + Driver: retryutils.NewLinearDriver(5 * time.Second), + Max: time.Minute, + Jitter: retryutils.DefaultJitter, + Clock: process.Clock, + }) + if err != nil { + return trace.Wrap(err, "creating the backend watch retry (this is a bug)") + } + log := process.logger.With(teleport.ComponentLabel, component) + var attempt int + + // The wait logic is retried until it is successful or a termination is triggered. + for { + attempt++ + select { + case <-process.GracefulExitContext().Done(): + return trace.Wrap(err, "context canceled while backing off (attempt %d)", attempt) + case <-retry.After(): + retry.Inc() + } + + w, err := authServer.NewWatcher(process.ExitContext(), types.Watch{ + Name: "auth.wait-for-backend", + Kinds: []types.WatchKind{ + {Kind: types.KindClusterName}, + }, + }) + if err != nil { + log.ErrorContext(process.ExitContext(), "Failed to create watcher", "kind", types.KindClusterName, "attempt", attempt, "error", err) + continue + } + + select { + case evt := <-w.Events(): + w.Close() + if evt.Type != types.OpInit { + log.ErrorContext(process.ExitContext(), "expected init event, got something else (this is a bug)", "kind", types.KindClusterName, "attempt", attempt, "error", err, "event_type", evt.Type) + continue + } + process.logger.With(teleport.ComponentLabel, component).InfoContext(process.ExitContext(), "auth backend initialized", "duration", process.Clock.Since(start).String()) + process.OnHeartbeat(component)(nil) + return nil + case <-w.Done(): + log.ErrorContext(process.ExitContext(), "watcher closed while waiting for backend init", "kind", types.KindClusterName, "attempt", attempt, "error", w.Error()) + continue + case <-process.GracefulExitContext().Done(): + 
w.Close() + return trace.Wrap(err, "context canceled while waiting for backend init event") + } + } + }) + } + // Register TLS endpoint of the auth service tlsConfig, err := connector.ServerTLSConfig(cfg.CipherSuites) if err != nil { return trace.Wrap(err) } diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 95b10216a4c5c..256eb987e4d63 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -1538,32 +1538,30 @@ func TestDebugService(t *testing.T) { fakeClock := clockwork.NewFakeClock() dataDir := makeTempDir(t) - cfg := servicecfg.MakeDefaultConfig() - cfg.DebugService.Enabled = true + cfg := &servicecfg.Config{ + Clock: fakeClock, + DataDir: dataDir, + } cfg.Clock = fakeClock cfg.DataDir = dataDir - cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} - cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}) - cfg.Auth.Enabled = true - cfg.Proxy.Enabled = false - cfg.Auth.StorageConfig.Params["path"] = dataDir - cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} - cfg.SSH.Enabled = false - cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig() - process, err := NewTeleport(cfg) - require.NoError(t, err) + log := logtest.NewLogger() - require.NoError(t, process.Start()) - t.Cleanup(func() { - require.NoError(t, process.Close()) - require.NoError(t, process.Wait()) - }) + // In this test we don't want to spin a whole process and have to wait for + // every service to report ready (there's an integration test for this). + // So we craft a minimal process with only the debug service in it. 
+ process := &TeleportProcess{ + Config: cfg, + Clock: fakeClock, + logger: log, + metricsRegistry: prometheus.NewRegistry(), + Supervisor: NewSupervisor("supervisor-test", log), + } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - t.Cleanup(cancel) - _, err = process.WaitForEvent(ctx, TeleportOKEvent) + fakeState, err := newProcessState(process) require.NoError(t, err) + fakeState.update(Event{TeleportOKEvent, "dummy"}) + process.state = fakeState httpClient := &http.Client{ Timeout: 10 * time.Second, @@ -1574,6 +1572,9 @@ func TestDebugService(t *testing.T) { }, } + require.NoError(t, process.initDebugService(true)) + require.NoError(t, process.Start()) + // Testing the debug listener. // Fetch a random path, it should return 404 error. req, err := httpClient.Get("http://debug/random") @@ -2003,27 +2004,35 @@ func TestMetricsService(t *testing.T) { // health routes. func TestDiagnosticsService(t *testing.T) { t.Parallel() - // Test setup: create a new teleport process + + fakeClock := clockwork.NewFakeClock() dataDir := makeTempDir(t) - cfg := servicecfg.MakeDefaultConfig() - cfg.DataDir = dataDir - cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}) - cfg.Auth.Enabled = true - cfg.Proxy.Enabled = false - cfg.SSH.Enabled = false - cfg.DebugService.Enabled = false - cfg.Auth.StorageConfig.Params["path"] = dataDir - cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} - cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"} + cfg := &servicecfg.Config{ + Clock: fakeClock, + DataDir: dataDir, + DiagnosticAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}, + } - // Test setup: Create and start the Teleport service. - process, err := NewTeleport(cfg) + log := logtest.NewLogger() + + // In this test we don't want to spin a whole process and have to wait for + // every service to report ready (there's an integration test for this). 
+ // So we craft a minimal process with only the diagnostic service in it. + process := &TeleportProcess{ + Config: cfg, + Clock: fakeClock, + logger: log, + metricsRegistry: prometheus.NewRegistry(), + Supervisor: NewSupervisor("supervisor-test", log), + } + + fakeState, err := newProcessState(process) require.NoError(t, err) + fakeState.update(Event{TeleportOKEvent, "dummy"}) + process.state = fakeState + + require.NoError(t, process.initDiagnosticService()) require.NoError(t, process.Start()) - t.Cleanup(func() { - assert.NoError(t, process.Close()) - assert.NoError(t, process.Wait()) - }) // Test setup: create our test metrics. nonce := strings.ReplaceAll(uuid.NewString(), "-", "") @@ -2038,11 +2047,6 @@ require.NoError(t, process.metricsRegistry.Register(localMetric)) require.NoError(t, prometheus.Register(globalMetric)) - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - t.Cleanup(cancel) - _, err = process.WaitForEvent(ctx, TeleportOKEvent) - require.NoError(t, err) - // Test execution: query the metrics endpoint and check the tests metrics are here. diagAddr, err := process.DiagnosticAddr() require.NoError(t, err)