Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions lib/service/diagnostic.go
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be able to backport this to v17? Is it feasible? I just got a TestDebugService hit on v17.

Copy link
Copy Markdown
Contributor

@hugoShaka hugoShaka Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote against backporting this. Each time we changed readiness we broke things, and this happened many times. v17 should be kept stable. This is a high risk change for a low reward.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a great track record in touching readiness without breaking it and v17 is supposed to be stable; unless the test proves to be especially flaky in v17 I'd prefer not to touch anything. 😬

The backport would also not be clean without including something like #61620 (partial #59667 and #59907), which adds to the risk of either getting a tweaked backport wrong or to add more changes than necessary to a stable release.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understandable! Thanks.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net/http"

"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/gravitational/teleport"
Expand All @@ -36,10 +35,6 @@ type diagnosticHandlerConfig struct {
}

func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerConfig, logger *slog.Logger) (http.Handler, error) {
if process.state == nil {
return nil, trace.BadParameter("teleport process state machine has not yet been initialized (this is a bug)")
}

mux := http.NewServeMux()

if config.enableMetrics {
Expand All @@ -54,7 +49,7 @@ func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerCon
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
roundtrip.ReplyJSON(w, http.StatusOK, map[string]any{"status": "ok"})
})
mux.HandleFunc("/readyz", process.state.readinessHandler())
mux.HandleFunc("/readyz", process.HandleReadiness)
}

if config.enableLogLeveler {
Expand Down
62 changes: 8 additions & 54 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,6 @@ type TeleportProcess struct {
// need to add and remove metrics several times (e.g. hosted plugin metrics).
*metrics.SyncGatherers

// state is the process state machine tracking if the process is healthy or not.
state *processState

tsrv reversetunnelclient.Server
}

Expand Down Expand Up @@ -1174,14 +1171,17 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {
}
}

supervisor := NewSupervisor(processID, cfg.Logger)
store, err := storage.NewProcessStorage(supervisor.ExitContext(), filepath.Join(cfg.DataDir, teleport.ComponentProcess))
if cfg.Clock == nil {
cfg.Clock = clockwork.NewRealClock()
}

supervisor, err := NewSupervisor(processID, cfg.Logger, cfg.Clock)
if err != nil {
return nil, trace.Wrap(err)
}

if cfg.Clock == nil {
cfg.Clock = clockwork.NewRealClock()
store, err := storage.NewProcessStorage(supervisor.ExitContext(), filepath.Join(cfg.DataDir, teleport.ComponentProcess))
if err != nil {
return nil, trace.Wrap(err)
}

// full heartbeat announces are on average every 2/3 * 6/7 of the default
Expand Down Expand Up @@ -1460,12 +1460,6 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {

serviceStarted := false

ps, err := process.newProcessStateMachine()
if err != nil {
return nil, trace.Wrap(err, "failed to initialize process state machine")
}
process.state = ps

if !cfg.DiagnosticAddr.IsEmpty() {
if err := process.initDiagnosticService(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -4006,42 +4000,6 @@ func (process *TeleportProcess) initMetricsService() error {
return nil
}

// newProcessStateMachine creates a state machine tracking the Teleport process
// state. The state machine is then used by the diagnostics or the debug service
// to evaluate the process health.
//
// It registers a long-running "readyz.monitor" supervisor function that feeds
// component health events into the returned *processState until the process
// begins a graceful exit.
func (process *TeleportProcess) newProcessStateMachine() (*processState, error) {
logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDiagnosticHealth, process.id))
// Create a state machine that will process and update the internal state of
// Teleport based off Events. Use this state machine to return the
// status from the /readyz endpoint.
ps, err := newProcessState(process)
if err != nil {
return nil, trace.Wrap(err)
}

process.RegisterFunc("readyz.monitor", func() error {
// Start loop to monitor for events that are used to update Teleport state.
// The context is derived from GracefulExitContext so the monitor keeps
// running through a normal reload and only stops on graceful shutdown.
ctx, cancel := context.WithCancel(process.GracefulExitContext())
defer cancel()

// Buffer of 1024 absorbs bursts of broadcasts so slow state updates
// don't block the event broadcaster.
eventCh := make(chan Event, 1024)
process.ListenForEvents(ctx, TeleportDegradedEvent, eventCh)
process.ListenForEvents(ctx, TeleportOKEvent, eventCh)
process.ListenForEvents(ctx, TeleportStartingEvent, eventCh)

for {
select {
case e := <-eventCh:
// Fold each health-related event into the per-component state.
ps.update(e)
case <-ctx.Done():
logger.DebugContext(process.ExitContext(), "Teleport is exiting, returning.")
return nil
}
}
})
return ps, nil
}

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService() error {
Expand Down Expand Up @@ -4123,10 +4081,6 @@ func (process *TeleportProcess) initDiagnosticService() error {
// disable its sensitive pprof and log-setting endpoints, but the liveness
// and readiness ones are always active.
func (process *TeleportProcess) initDebugService(exposeDebugRoutes bool) error {
if process.state == nil {
return trace.BadParameter("teleport process state machine has not yet been initialized (this is a bug)")
}

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDebug, process.id))

// Unix socket creation can fail on paths too long. Depending on the UNIX implementation,
Expand Down
31 changes: 18 additions & 13 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,11 @@ func TestMonitor(t *testing.T) {
require.NoError(t, err)

// this simulates events that happened to be broadcast before the
// readyz.monitor started listening for events
// process was started
process.ExpectService("dummy")
process.ExpectService(teleport.ComponentAuth)
process.BroadcastEvent(Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth})
process.BroadcastEvent(Event{Name: TeleportOKEvent, Payload: "dummy"})

require.NoError(t, process.Start())
t.Cleanup(func() {
Expand Down Expand Up @@ -918,10 +921,12 @@ func TestSetupProxyTLSConfig(t *testing.T) {
cfg.Proxy.ACME.Enabled = tc.acmeEnabled
cfg.DataDir = makeTempDir(t)
cfg.Proxy.PublicAddrs = utils.MustParseAddrList("localhost")
supervisor, err := NewSupervisor("process-id", cfg.Logger, cfg.Clock)
require.NoError(t, err)
process := TeleportProcess{
Config: cfg,
// Setting Supervisor so that `ExitContext` can be called.
Supervisor: NewSupervisor("process-id", cfg.Logger),
Supervisor: supervisor,
}
tls, err := process.setupProxyTLSConfig(
&Connector{},
Expand Down Expand Up @@ -1286,8 +1291,10 @@ func TestProxyGRPCServers(t *testing.T) {

// Create a new Teleport process to initialize the gRPC servers with KubeProxy
// enabled.
supervisor, err := NewSupervisor(hostID, logtest.NewLogger(), clock)
require.NoError(t, err)
process := &TeleportProcess{
Supervisor: NewSupervisor(hostID, logtest.NewLogger()),
Supervisor: supervisor,
Config: &servicecfg.Config{
Proxy: servicecfg.ProxyConfig{
Kube: servicecfg.KubeProxyConfig{
Expand Down Expand Up @@ -1629,19 +1636,18 @@ func TestDebugService(t *testing.T) {
// In this test we don't want to spin a whole process and have to wait for
// every service to report ready (there's an integration test for this).
// So we craft a minimal process with only the debug service in it.
supervisor, err := NewSupervisor("supervisor-test", log, fakeClock)
require.NoError(t, err)
process := &TeleportProcess{
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: localRegistry,
SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
Supervisor: NewSupervisor("supervisor-test", log),
Supervisor: supervisor,
}

fakeState, err := newProcessState(process)
require.NoError(t, err)
fakeState.update(Event{TeleportOKEvent, "dummy"})
process.state = fakeState
process.BroadcastEvent(Event{TeleportOKEvent, "dummy"})

httpClient := &http.Client{
Timeout: 10 * time.Second,
Expand Down Expand Up @@ -2125,19 +2131,18 @@ func TestDiagnosticsService(t *testing.T) {
// In this test we don't want to spin a whole process and have to wait for
// every service to report ready (there's an integration test for this).
// So we craft a minimal process with only the debug service in it.
supervisor, err := NewSupervisor("supervisor-test", log, fakeClock)
require.NoError(t, err)
process := &TeleportProcess{
Config: cfg,
Clock: fakeClock,
logger: log,
metricsRegistry: localRegistry,
SyncGatherers: metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
Supervisor: NewSupervisor("supervisor-test", log),
Supervisor: supervisor,
}

fakeState, err := newProcessState(process)
require.NoError(t, err)
fakeState.update(Event{TeleportOKEvent, "dummy"})
process.state = fakeState
process.BroadcastEvent(Event{TeleportOKEvent, "dummy"})

require.NoError(t, process.initDiagnosticService())
require.NoError(t, process.Start())
Expand Down
113 changes: 54 additions & 59 deletions lib/service/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ import (
"time"

"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/client/debug"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/observability/metrics"
)

type componentStateEnum byte
Expand Down Expand Up @@ -64,54 +62,52 @@ func init() {

// processState tracks the state of the Teleport process.
type processState struct {
process *TeleportProcess
mu sync.Mutex
states map[string]*componentState
mu sync.Mutex
states map[string]*componentState
}

type componentState struct {
recoveryTime time.Time
state componentStateEnum
}

// newProcessState returns a new FSM that tracks the state of the Teleport process.
func newProcessState(process *TeleportProcess) (*processState, error) {
err := metrics.RegisterPrometheusCollectors(stateGauge)
if err != nil {
return nil, trace.Wrap(err)
}
type updateResult int

return &processState{
process: process,
states: make(map[string]*componentState),
}, nil
}
const (
_ updateResult = iota

// update the state of a Teleport component.
func (f *processState) update(event Event) {
updateStarting
updateStarted
updateDegraded
updateRecovering
updateRecovered
)

// update the state of a Teleport component. Returns a value depending on what
// changed in the state of the component if something changed, or 0 if nothing
// changed.
func (f *processState) update(now time.Time, event, component string) updateResult {
f.mu.Lock()
defer f.mu.Unlock()
defer f.updateGauge()

component, ok := event.Payload.(string)
if !ok {
f.process.logger.ErrorContext(f.process.ExitContext(), "Received event broadcast without component name, this is a bug!", "event", event.Name)
return
}
s, ok := f.states[component]
if !ok {
if f.states == nil {
f.states = make(map[string]*componentState)
}
// Register a new component.
s = &componentState{recoveryTime: f.process.Clock.Now(), state: stateStarting}
s = &componentState{recoveryTime: now, state: stateStarting}
f.states[component] = s
}

switch event.Name {
switch event {
case TeleportStartingEvent:
f.process.logger.DebugContext(f.process.ExitContext(), "Teleport component is starting", "component", component)
return updateStarting
// If a degraded event was received, always change the state to degraded.
case TeleportDegradedEvent:
s.state = stateDegraded
f.process.logger.InfoContext(f.process.ExitContext(), "Detected Teleport component is running in a degraded state.", "component", component)
return updateDegraded
// If the current state is degraded, and a OK event has been
// received, change the state to recovering. If the current state is
// recovering and a OK events is received, if it's been longer
Expand All @@ -121,18 +117,19 @@ func (f *processState) update(event Event) {
switch s.state {
case stateStarting:
s.state = stateOK
f.process.logger.DebugContext(f.process.ExitContext(), "Teleport component has started.", "component", component)
return updateStarted
case stateDegraded:
s.state = stateRecovering
s.recoveryTime = f.process.Clock.Now()
f.process.logger.InfoContext(f.process.ExitContext(), "Teleport component is recovering from a degraded state.", "component", component)
s.recoveryTime = now
return updateRecovering
case stateRecovering:
if f.process.Clock.Since(s.recoveryTime) > defaults.HeartbeatCheckPeriod*2 {
if now.Sub(s.recoveryTime) > defaults.HeartbeatCheckPeriod*2 {
s.state = stateOK
f.process.logger.InfoContext(f.process.ExitContext(), "Teleport component has recovered from a degraded state.", "component", component)
return updateRecovered
}
}
}
return 0
}

// getStateLocked returns the overall process state based on the state of
Expand Down Expand Up @@ -178,32 +175,30 @@ func (f *processState) getState() componentStateEnum {
return f.getStateLocked()
}

func (f *processState) readinessHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
switch f.getState() {
// 503
case stateDegraded:
roundtrip.ReplyJSON(w, http.StatusServiceUnavailable, debug.Readiness{
Status: "teleport is in a degraded state, check logs for details",
PID: os.Getpid(),
})
// 400
case stateRecovering:
roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
Status: "teleport is recovering from a degraded state, check logs for details",
PID: os.Getpid(),
})
case stateStarting:
roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
Status: "teleport is starting and hasn't joined the cluster yet",
PID: os.Getpid(),
})
// 200
case stateOK:
roundtrip.ReplyJSON(w, http.StatusOK, debug.Readiness{
Status: "ok",
PID: os.Getpid(),
})
}
// handleReadiness serves the /readyz endpoint: it maps the aggregated process
// state to an HTTP status code and replies with a JSON debug.Readiness payload
// containing a human-readable status string and the process PID.
func (f *processState) handleReadiness(w http.ResponseWriter, r *http.Request) {
switch f.getState() {
// 503: at least one component is degraded; check logs for details.
case stateDegraded:
roundtrip.ReplyJSON(w, http.StatusServiceUnavailable, debug.Readiness{
Status: "teleport is in a degraded state, check logs for details",
PID: os.Getpid(),
})
// 400: not ready yet — either recovering from degradation or still starting.
case stateRecovering:
roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
Status: "teleport is recovering from a degraded state, check logs for details",
PID: os.Getpid(),
})
case stateStarting:
roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
Status: "teleport is starting and hasn't joined the cluster yet",
PID: os.Getpid(),
})
// 200: all components report healthy.
case stateOK:
roundtrip.ReplyJSON(w, http.StatusOK, debug.Readiness{
Status: "ok",
PID: os.Getpid(),
})
}
}
Loading
Loading