Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion integration/helpers/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,9 @@ func (i *TeleInstance) Start() error {
if i.Config.WindowsDesktop.Enabled {
expectedEvents = append(expectedEvents, service.WindowsDesktopReady)
}

if i.Config.Relay.Enabled {
expectedEvents = append(expectedEvents, service.RelayReady)
}
if i.Config.Discovery.Enabled {
expectedEvents = append(expectedEvents, service.DiscoveryReady)
}
Expand Down
66 changes: 66 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2606,6 +2606,72 @@ func (process *TeleportProcess) initAuthService() error {
}
}

// We mark the process state as starting until the auth backend is ready.
// If cache is enabled, this will wait for the cache to be populated.
// We don't want auth to say its ready until its cache is populated,
// else a rollout might progress too quickly and cause backend pressure
// and outages.
{
component := teleport.Component(teleport.ComponentAuth, "backend")
process.ExpectService(component)
process.RegisterFunc("auth.wait-for-event-stream", func() error {
start := process.Clock.Now()

retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
First: 0,
Driver: retryutils.NewLinearDriver(5 * time.Second),
Max: time.Minute,
Jitter: retryutils.DefaultJitter,
Clock: process.Clock,
})
if err != nil {
return trace.Wrap(err, "creating the backend watch retry (this is a bug)")
}
log := process.logger.With(teleport.ComponentLabel, component)
var attempt int

// The wait logic is retried until it is successful or a termination is triggered.
for {
attempt++
select {
case <-process.GracefulExitContext().Done():
return trace.Wrap(err, "context canceled while backing off (attempt %d)", attempt)
case <-retry.After():
retry.Inc()
}

w, err := authServer.NewWatcher(process.ExitContext(), types.Watch{
Name: "auth.wait-for-backend",
Kinds: []types.WatchKind{
{Kind: types.KindClusterName},
},
})
if err != nil {
log.ErrorContext(process.ExitContext(), "Failed to create watcher", "kind", types.KindClusterName, "attempt", attempt, "error", err)
continue
}

select {
case evt := <-w.Events():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can add a fallback safety timeout when it will force to start the service even if a cache is not fully healthy - like 5m or 10m

Imagine scenario when one of not important collection for the teleport functionality is broken like one of the db_server element exceeds the max gRPC message size.

So if a bad resource is created the auth servers keep running but the db access flow is degraded. If the auth pods are then restarted during troubleshooting, the whole flow will break and users will lose access to the Teleport cluster entirely.

Copy link
Copy Markdown
Contributor Author

@hugoShaka hugoShaka Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine scenario when one of not important collection for the teleport functionality is broken like one of the db_server element exceeds the max gRPC message size.

auth cannot break because of the max grpc message size because it doesn't send grpc message when creating its cache. gRPC message size is checked on the receiver side (proxy, node, ...), auth doesn't establish a grpc client against itself.

I would argue the opposite: if we have an auth that cannot build a valid cache, we should never let the rollout proceed and send clients to this auth.

This happened 3 months ago for a very large financial customer: they were running with broken cache for a week and they complained about constantly being disconnected from the UI and being unable to access any resource. A Teleport auth unable to establish watcher is a completely dysfunctional Teleport instance and should not be allowed to serve requests. Keeping the pod unready would have made the issue visible.

w.Close()
if evt.Type != types.OpInit {
log.ErrorContext(process.ExitContext(), "expected init event, got something else (this is a bug)", "kind", types.KindClusterName, "attempt", attempt, "error", err, "event_type", evt.Type)
continue
}
process.logger.With(teleport.ComponentLabel, component).InfoContext(process.ExitContext(), "auth backend initialized", "duration", process.Clock.Since(start).String())
process.OnHeartbeat(component)(nil)
return nil
case <-w.Done():
log.ErrorContext(process.ExitContext(), "watcher closed while waiting for backend init", "kind", types.KindClusterName, "attempt", attempt, "error", w.Error())
continue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we OnHeartbeat(component)(w.Error()) or is it better to not actively unready the process in this scenario?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that for any reason w.Error() returns nil and we accidentally turn ready

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The syncRotationStateCycle uses trace.ConnectionProblem(watcher.Error(), "watcher has disconnected") (probably because the watcher from storage always returns nil from error 😳) which is guaranteed to be non-nil, so that would be an option, I don't think it would be a good idea tho because from what I'm seeing in the state machine logic we would never recover without at least two TeleportOKEvents spaced at least by 10 seconds, so anything that's only ever sending one TeleportOKEvent must not send anything but TeleportOKEvent (or exactly one TeleportDegradedEvent and then die).

case <-process.GracefulExitContext().Done():
w.Close()
return trace.Wrap(err, "context canceled while waiting for backendf init event")
}
}
})
}

// Register TLS endpoint of the auth service
tlsConfig, err := connector.ServerTLSConfig(cfg.CipherSuites)
if err != nil {
Expand Down
88 changes: 46 additions & 42 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1538,32 +1538,30 @@ func TestDebugService(t *testing.T) {
fakeClock := clockwork.NewFakeClock()

dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DebugService.Enabled = true
cfg := &servicecfg.Config{
Clock: fakeClock,
DataDir: dataDir,
}
cfg.Clock = fakeClock
cfg.DataDir = dataDir
cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.SSH.Enabled = false
cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig()

process, err := NewTeleport(cfg)
require.NoError(t, err)
log := logtest.NewLogger()

require.NoError(t, process.Start())
t.Cleanup(func() {
require.NoError(t, process.Close())
require.NoError(t, process.Wait())
})
// In this test we don't want to spin a whole process and have to wait for
// every service to report ready (there's an integration test for this).
// So we craft a minimal process with only the debug service in it.
process := &TeleportProcess{
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: prometheus.NewRegistry(),
Supervisor: NewSupervisor("supervisor-test", log),
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, TeleportOKEvent)
fakeState, err := newProcessState(process)
require.NoError(t, err)
fakeState.update(Event{TeleportOKEvent, "dummy"})
process.state = fakeState

httpClient := &http.Client{
Timeout: 10 * time.Second,
Expand All @@ -1574,6 +1572,9 @@ func TestDebugService(t *testing.T) {
},
}

require.NoError(t, process.initDebugService(true))
require.NoError(t, process.Start())

// Testing the debug listener.
// Fetch a random path, it should return 404 error.
req, err := httpClient.Get("http://debug/random")
Expand Down Expand Up @@ -2003,27 +2004,35 @@ func TestMetricsService(t *testing.T) {
// health routes.
func TestDiagnosticsService(t *testing.T) {
t.Parallel()
// Test setup: create a new teleport process

fakeClock := clockwork.NewFakeClock()
dataDir := makeTempDir(t)
cfg := servicecfg.MakeDefaultConfig()
cfg.DataDir = dataDir
cfg.SetAuthServerAddress(utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"})
cfg.Auth.Enabled = true
cfg.Proxy.Enabled = false
cfg.SSH.Enabled = false
cfg.DebugService.Enabled = false
cfg.Auth.StorageConfig.Params["path"] = dataDir
cfg.Auth.ListenAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg.DiagnosticAddr = utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"}
cfg := &servicecfg.Config{
Clock: fakeClock,
DataDir: dataDir,
DiagnosticAddr: utils.NetAddr{AddrNetwork: "tcp", Addr: "127.0.0.1:0"},
}

// Test setup: Create and start the Teleport service.
process, err := NewTeleport(cfg)
log := logtest.NewLogger()

// In this test we don't want to spin a whole process and have to wait for
// every service to report ready (there's an integration test for this).
// So we craft a minimal process with only the debug service in it.
process := &TeleportProcess{
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: prometheus.NewRegistry(),
Supervisor: NewSupervisor("supervisor-test", log),
}

fakeState, err := newProcessState(process)
require.NoError(t, err)
fakeState.update(Event{TeleportOKEvent, "dummy"})
process.state = fakeState

require.NoError(t, process.initDiagnosticService())
require.NoError(t, process.Start())
t.Cleanup(func() {
assert.NoError(t, process.Close())
assert.NoError(t, process.Wait())
})

// Test setup: create our test metrics.
nonce := strings.ReplaceAll(uuid.NewString(), "-", "")
Expand All @@ -2038,11 +2047,6 @@ func TestDiagnosticsService(t *testing.T) {
require.NoError(t, process.metricsRegistry.Register(localMetric))
require.NoError(t, prometheus.Register(globalMetric))

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)
_, err = process.WaitForEvent(ctx, TeleportOKEvent)
require.NoError(t, err)

// Test execution: query the metrics endpoint and check the tests metrics are here.
diagAddr, err := process.DiagnosticAddr()
require.NoError(t, err)
Expand Down
Loading