diff --git a/lib/service/diagnostic.go b/lib/service/diagnostic.go
index 2dc844dd14005..01dfd60b62411 100644
--- a/lib/service/diagnostic.go
+++ b/lib/service/diagnostic.go
@@ -21,7 +21,6 @@ import (
 	"net/http"
 
 	"github.com/gravitational/roundtrip"
-	"github.com/gravitational/trace"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	"github.com/gravitational/teleport"
@@ -36,10 +35,6 @@ type diagnosticHandlerConfig struct {
 }
 
 func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerConfig, logger *slog.Logger) (http.Handler, error) {
-	if process.state == nil {
-		return nil, trace.BadParameter("teleport process state machine has not yet been initialized (this is a bug)")
-	}
-
 	mux := http.NewServeMux()
 
 	if config.enableMetrics {
@@ -54,7 +49,7 @@ func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerCon
 		mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
 			roundtrip.ReplyJSON(w, http.StatusOK, map[string]any{"status": "ok"})
 		})
-		mux.HandleFunc("/readyz", process.state.readinessHandler())
+		mux.HandleFunc("/readyz", process.HandleReadiness)
 	}
 
 	if config.enableLogLeveler {
diff --git a/lib/service/service.go b/lib/service/service.go
index ef8e3a3547f3f..0557d56383f81 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -733,9 +733,6 @@ type TeleportProcess struct {
 	// need to add and remove metrics seevral times (e.g. hosted plugin metrics).
 	*metrics.SyncGatherers
 
-	// state is the process state machine tracking if the process is healthy or not.
-	state *processState
-
 	tsrv reversetunnelclient.Server
 }
 
@@ -1174,14 +1171,17 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {
 		}
 	}
 
-	supervisor := NewSupervisor(processID, cfg.Logger)
-	store, err := storage.NewProcessStorage(supervisor.ExitContext(), filepath.Join(cfg.DataDir, teleport.ComponentProcess))
+	if cfg.Clock == nil {
+		cfg.Clock = clockwork.NewRealClock()
+	}
+
+	supervisor, err := NewSupervisor(processID, cfg.Logger, cfg.Clock)
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-
-	if cfg.Clock == nil {
-		cfg.Clock = clockwork.NewRealClock()
+	store, err := storage.NewProcessStorage(supervisor.ExitContext(), filepath.Join(cfg.DataDir, teleport.ComponentProcess))
+	if err != nil {
+		return nil, trace.Wrap(err)
 	}
 
 	// full heartbeat announces are on average every 2/3 * 6/7 of the default
@@ -1460,12 +1460,6 @@ func NewTeleport(cfg *servicecfg.Config) (_ *TeleportProcess, err error) {
 
 	serviceStarted := false
 
-	ps, err := process.newProcessStateMachine()
-	if err != nil {
-		return nil, trace.Wrap(err, "failed to initialize process state machine")
-	}
-	process.state = ps
-
 	if !cfg.DiagnosticAddr.IsEmpty() {
 		if err := process.initDiagnosticService(); err != nil {
 			return nil, trace.Wrap(err)
@@ -4006,42 +4000,6 @@ func (process *TeleportProcess) initMetricsService() error {
 	return nil
 }
 
-// newProcessStateMachine creates a state machine tracking the Teleport process
-// state. The state machine is then used by the diagnostics or the debug service
-// to evaluate the process health.
-func (process *TeleportProcess) newProcessStateMachine() (*processState, error) {
-	logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDiagnosticHealth, process.id))
-	// Create a state machine that will process and update the internal state of
-	// Teleport based off Events. Use this state machine to return the
-	// status from the /readyz endpoint.
-	ps, err := newProcessState(process)
-	if err != nil {
-		return nil, trace.Wrap(err)
-	}
-
-	process.RegisterFunc("readyz.monitor", func() error {
-		// Start loop to monitor for events that are used to update Teleport state.
-		ctx, cancel := context.WithCancel(process.GracefulExitContext())
-		defer cancel()
-
-		eventCh := make(chan Event, 1024)
-		process.ListenForEvents(ctx, TeleportDegradedEvent, eventCh)
-		process.ListenForEvents(ctx, TeleportOKEvent, eventCh)
-		process.ListenForEvents(ctx, TeleportStartingEvent, eventCh)
-
-		for {
-			select {
-			case e := <-eventCh:
-				ps.update(e)
-			case <-ctx.Done():
-				logger.DebugContext(process.ExitContext(), "Teleport is exiting, returning.")
-				return nil
-			}
-		}
-	})
-	return ps, nil
-}
-
 // initDiagnosticService starts diagnostic service currently serving healthz
 // and prometheus endpoints
 func (process *TeleportProcess) initDiagnosticService() error {
@@ -4123,10 +4081,6 @@ func (process *TeleportProcess) initDiagnosticService() error {
 // disable its sensitive pprof and log-setting endpoints, but the liveness
 // and readiness ones are always active.
 func (process *TeleportProcess) initDebugService(exposeDebugRoutes bool) error {
-	if process.state == nil {
-		return trace.BadParameter("teleport process state machine has not yet been initialized (this is a bug)")
-	}
-
 	logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDebug, process.id))
 
 	// Unix socket creation can fail on paths too long. Depending on the UNIX implementation,
diff --git a/lib/service/service_test.go b/lib/service/service_test.go
index d99b8b8f479dc..fc1451fd17848 100644
--- a/lib/service/service_test.go
+++ b/lib/service/service_test.go
@@ -294,8 +294,11 @@ func TestMonitor(t *testing.T) {
 	require.NoError(t, err)
 
 	// this simulates events that happened to be broadcast before the
-	// readyz.monitor started listening for events
+	// process was started
+	process.ExpectService("dummy")
+	process.ExpectService(teleport.ComponentAuth)
 	process.BroadcastEvent(Event{Name: TeleportOKEvent, Payload: teleport.ComponentAuth})
+	process.BroadcastEvent(Event{Name: TeleportOKEvent, Payload: "dummy"})
 
 	require.NoError(t, process.Start())
 	t.Cleanup(func() {
@@ -918,10 +921,12 @@ func TestSetupProxyTLSConfig(t *testing.T) {
 			cfg.Proxy.ACME.Enabled = tc.acmeEnabled
 			cfg.DataDir = makeTempDir(t)
 			cfg.Proxy.PublicAddrs = utils.MustParseAddrList("localhost")
+			supervisor, err := NewSupervisor("process-id", cfg.Logger, cfg.Clock)
+			require.NoError(t, err)
 			process := TeleportProcess{
 				Config: cfg,
 				// Setting Supervisor so that `ExitContext` can be called.
-				Supervisor: NewSupervisor("process-id", cfg.Logger),
+				Supervisor: supervisor,
 			}
 			tls, err := process.setupProxyTLSConfig(
 				&Connector{},
@@ -1286,8 +1291,10 @@ func TestProxyGRPCServers(t *testing.T) {
 
 	// Create a new Teleport process to initialize the gRPC servers with KubeProxy
 	// enabled.
+	supervisor, err := NewSupervisor(hostID, logtest.NewLogger(), clock)
+	require.NoError(t, err)
 	process := &TeleportProcess{
-		Supervisor: NewSupervisor(hostID, logtest.NewLogger()),
+		Supervisor: supervisor,
 		Config: &servicecfg.Config{
 			Proxy: servicecfg.ProxyConfig{
 				Kube: servicecfg.KubeProxyConfig{
@@ -1629,19 +1636,18 @@ func TestDebugService(t *testing.T) {
 	// In this test we don't want to spin a whole process and have to wait for
 	// every service to report ready (there's an integration test for this).
 	// So we craft a minimal process with only the debug service in it.
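Note on the construction pattern the tests below switch to: NewSupervisor now takes an explicit clockwork.Clock (passing nil falls back to a real clock inside the constructor) and returns an error, because it registers the process-state Prometheus gauge up front. A minimal sketch of the new convention; newTestSupervisor is a hypothetical helper, not part of this change, and the logtest import path is inferred from the tests in this file:

```go
package service

import (
	"testing"

	"github.com/jonboulle/clockwork"
	"github.com/stretchr/testify/require"

	"github.com/gravitational/teleport/lib/utils/log/logtest"
)

// newTestSupervisor is a hypothetical helper showing the new NewSupervisor
// calling convention: the clock is injected (so tests can drive recovery
// timing with a fake clock) and the error from Prometheus collector
// registration must be handled.
func newTestSupervisor(t *testing.T) *LocalSupervisor {
	t.Helper()
	supervisor, err := NewSupervisor("test-process", logtest.NewLogger(), clockwork.NewFakeClock())
	require.NoError(t, err)
	return supervisor
}
```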
+	supervisor, err := NewSupervisor("supervisor-test", log, fakeClock)
+	require.NoError(t, err)
 	process := &TeleportProcess{
 		Config:          cfg,
 		Clock:           fakeClock,
 		logger:          log,
 		metricsRegistry: localRegistry,
 		SyncGatherers:   metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
-		Supervisor:      NewSupervisor("supervisor-test", log),
+		Supervisor:      supervisor,
 	}
 
-	fakeState, err := newProcessState(process)
-	require.NoError(t, err)
-	fakeState.update(Event{TeleportOKEvent, "dummy"})
-	process.state = fakeState
+	process.BroadcastEvent(Event{TeleportOKEvent, "dummy"})
 
 	httpClient := &http.Client{
 		Timeout: 10 * time.Second,
@@ -2125,19 +2131,18 @@ func TestDiagnosticsService(t *testing.T) {
 	// In this test we don't want to spin a whole process and have to wait for
 	// every service to report ready (there's an integration test for this).
 	// So we craft a minimal process with only the debug service in it.
+	supervisor, err := NewSupervisor("supervisor-test", log, fakeClock)
+	require.NoError(t, err)
 	process := &TeleportProcess{
 		Config:          cfg,
 		Clock:           fakeClock,
 		logger:          log,
 		metricsRegistry: localRegistry,
 		SyncGatherers:   metrics.NewSyncGatherers(localRegistry, prometheus.DefaultGatherer),
-		Supervisor:      NewSupervisor("supervisor-test", log),
+		Supervisor:      supervisor,
 	}
 
-	fakeState, err := newProcessState(process)
-	require.NoError(t, err)
-	fakeState.update(Event{TeleportOKEvent, "dummy"})
-	process.state = fakeState
+	process.BroadcastEvent(Event{TeleportOKEvent, "dummy"})
 
 	require.NoError(t, process.initDiagnosticService())
 	require.NoError(t, process.Start())
diff --git a/lib/service/state.go b/lib/service/state.go
index dd6cf3eba431a..e4580314120c6 100644
--- a/lib/service/state.go
+++ b/lib/service/state.go
@@ -26,13 +26,11 @@ import (
 	"time"
 
 	"github.com/gravitational/roundtrip"
-	"github.com/gravitational/trace"
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/gravitational/teleport"
 	"github.com/gravitational/teleport/lib/client/debug"
 	"github.com/gravitational/teleport/lib/defaults"
-	"github.com/gravitational/teleport/lib/observability/metrics"
 )
 
 type componentStateEnum byte
@@ -64,9 +62,8 @@ func init() {
 
 // processState tracks the state of the Teleport process.
 type processState struct {
-	process *TeleportProcess
-	mu      sync.Mutex
-	states  map[string]*componentState
+	mu     sync.Mutex
+	states map[string]*componentState
 }
 
 type componentState struct {
@@ -74,44 +71,43 @@ type componentState struct {
 	state        componentStateEnum
 }
 
-// newProcessState returns a new FSM that tracks the state of the Teleport process.
-func newProcessState(process *TeleportProcess) (*processState, error) {
-	err := metrics.RegisterPrometheusCollectors(stateGauge)
-	if err != nil {
-		return nil, trace.Wrap(err)
-	}
+type updateResult int
 
-	return &processState{
-		process: process,
-		states:  make(map[string]*componentState),
-	}, nil
-}
+const (
+	_ updateResult = iota
 
-// update the state of a Teleport component.
-func (f *processState) update(event Event) {
+	updateStarting
+	updateStarted
+	updateDegraded
+	updateRecovering
+	updateRecovered
+)
+
+// update the state of a Teleport component. It returns a value describing the
+// transition that the event caused in the component's state, or 0 if the
+// component's state did not change.
+func (f *processState) update(now time.Time, event, component string) updateResult {
 	f.mu.Lock()
 	defer f.mu.Unlock()
 	defer f.updateGauge()
 
-	component, ok := event.Payload.(string)
-	if !ok {
-		f.process.logger.ErrorContext(f.process.ExitContext(), "Received event broadcast without component name, this is a bug!", "event", event.Name)
-		return
-	}
 	s, ok := f.states[component]
 	if !ok {
+		if f.states == nil {
+			f.states = make(map[string]*componentState)
+		}
 		// Register a new component.
-		s = &componentState{recoveryTime: f.process.Clock.Now(), state: stateStarting}
+		s = &componentState{recoveryTime: now, state: stateStarting}
 		f.states[component] = s
 	}
 
-	switch event.Name {
+	switch event {
 	case TeleportStartingEvent:
-		f.process.logger.DebugContext(f.process.ExitContext(), "Teleport component is starting", "component", component)
+		return updateStarting
 	// If a degraded event was received, always change the state to degraded.
 	case TeleportDegradedEvent:
 		s.state = stateDegraded
-		f.process.logger.InfoContext(f.process.ExitContext(), "Detected Teleport component is running in a degraded state.", "component", component)
+		return updateDegraded
 	// If the current state is degraded, and a OK event has been
 	// received, change the state to recovering. If the current state is
 	// recovering and a OK events is received, if it's been longer
@@ -121,18 +117,19 @@ func (f *processState) update(event Event) {
 		switch s.state {
 		case stateStarting:
 			s.state = stateOK
-			f.process.logger.DebugContext(f.process.ExitContext(), "Teleport component has started.", "component", component)
+			return updateStarted
 		case stateDegraded:
 			s.state = stateRecovering
-			s.recoveryTime = f.process.Clock.Now()
-			f.process.logger.InfoContext(f.process.ExitContext(), "Teleport component is recovering from a degraded state.", "component", component)
+			s.recoveryTime = now
+			return updateRecovering
 		case stateRecovering:
-			if f.process.Clock.Since(s.recoveryTime) > defaults.HeartbeatCheckPeriod*2 {
+			if now.Sub(s.recoveryTime) > defaults.HeartbeatCheckPeriod*2 {
 				s.state = stateOK
-				f.process.logger.InfoContext(f.process.ExitContext(), "Teleport component has recovered from a degraded state.", "component", component)
+				return updateRecovered
 			}
 		}
 	}
+	return 0
 }
 
 // getStateLocked returns the overall process state based on the state of
@@ -178,32 +175,30 @@ func (f *processState) getState() componentStateEnum {
 	return f.getStateLocked()
 }
 
-func (f *processState) readinessHandler() http.HandlerFunc {
-	return func(w http.ResponseWriter, r *http.Request) {
-		switch f.getState() {
-		// 503
-		case stateDegraded:
-			roundtrip.ReplyJSON(w, http.StatusServiceUnavailable, debug.Readiness{
-				Status: "teleport is in a degraded state, check logs for details",
-				PID:    os.Getpid(),
-			})
-		// 400
-		case stateRecovering:
-			roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
-				Status: "teleport is recovering from a degraded state, check logs for details",
-				PID:    os.Getpid(),
-			})
-		case stateStarting:
-			roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
-				Status: "teleport is starting and hasn't joined the cluster yet",
-				PID:    os.Getpid(),
-			})
-		// 200
-		case stateOK:
-			roundtrip.ReplyJSON(w, http.StatusOK, debug.Readiness{
-				Status: "ok",
-				PID:    os.Getpid(),
-			})
-		}
+func (f *processState) handleReadiness(w http.ResponseWriter, r *http.Request) {
+	switch f.getState() {
+	// 503
+	case stateDegraded:
+		roundtrip.ReplyJSON(w, http.StatusServiceUnavailable, debug.Readiness{
+			Status: "teleport is in a degraded state, check logs for details",
+			PID:    os.Getpid(),
+		})
+	// 400
+	case stateRecovering:
+		roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
+			Status: "teleport is recovering from a degraded state, check logs for details",
+			PID:    os.Getpid(),
+		})
+	case stateStarting:
+		roundtrip.ReplyJSON(w, http.StatusBadRequest, debug.Readiness{
+			Status: "teleport is starting and hasn't joined the cluster yet",
+			PID:    os.Getpid(),
+		})
+	// 200
+	case stateOK:
+		roundtrip.ReplyJSON(w, http.StatusOK, debug.Readiness{
+			Status: "ok",
+			PID:    os.Getpid(),
+		})
 	}
 }
diff --git a/lib/service/state_test.go b/lib/service/state_test.go
index eab50636d6f8e..42014db8257c4 100644
--- a/lib/service/state_test.go
+++ b/lib/service/state_test.go
@@ -19,7 +19,6 @@ package service
 
 import (
-	"context"
 	"testing"
 
 	"github.com/jonboulle/clockwork"
@@ -103,56 +102,26 @@ func TestProcessStateStarting(t *testing.T) {
 	slowComponent := teleport.Component("slow-component")
 
 	log := logtest.NewLogger()
-	supervisor := NewSupervisor("test-process-state", log)
+	fakeClock := clockwork.NewFakeClock()
+	supervisor, err := NewSupervisor("test-process-state", log, fakeClock)
+	require.NoError(t, err)
 	process := &TeleportProcess{
 		Supervisor: supervisor,
-		Clock:      clockwork.NewFakeClock(),
+		Clock:      fakeClock,
 		logger:     log,
 	}
 
-	ps, err := newProcessState(process)
-	require.NoError(t, err)
-	process.state = ps
-
-	eventProcessor := newFakeEventProcessor(t.Context(), process)
+	ps := &process.Supervisor.(*LocalSupervisor).processState
 
 	require.Equal(t, stateStarting, ps.getState(), "no services are running, we are starting")
 
 	process.OnHeartbeat(component)(nil)
-	eventProcessor.processEvent()
 	require.Equal(t, stateOK, ps.getState(), "a single service is running, we are healthy")
 
 	process.ExpectService(slowComponent)
-	eventProcessor.processEvent()
 	require.Equal(t, stateStarting, ps.getState(), "we know about a second service starting, we should be in starting state")
 
 	process.OnHeartbeat(component)(nil)
-	eventProcessor.processEvent()
 	require.Equal(t, stateStarting, ps.getState(), "we know about a second service starting, we should still be in starting state")
 
 	process.OnHeartbeat(slowComponent)(nil)
-	eventProcessor.processEvent()
 	require.Equal(t, stateOK, ps.getState(), "two services are running, we are healthy")
 }
-
-// fakeEventProcessor synchronously processes events in tests. The real TeleportProcess
-// has a dedicated routine for that, but testing with asynchronous event processing
-// would be troublesome and flaky.
-type fakeEventProcessor struct {
-	eventCh chan Event
-	ps      *processState
-}
-
-func newFakeEventProcessor(ctx context.Context, process *TeleportProcess) *fakeEventProcessor {
-	eventCh := make(chan Event, 1024)
-	process.ListenForEvents(ctx, TeleportDegradedEvent, eventCh)
-	process.ListenForEvents(ctx, TeleportOKEvent, eventCh)
-	process.ListenForEvents(ctx, TeleportStartingEvent, eventCh)
-	return &fakeEventProcessor{
-		eventCh: eventCh,
-		ps:      process.state,
-	}
-}
-
-func (f *fakeEventProcessor) processEvent() {
-	evt := <-f.eventCh
-	f.ps.update(evt)
-}
diff --git a/lib/service/supervisor.go b/lib/service/supervisor.go
index 2a930366379cf..efe126139c11a 100644
--- a/lib/service/supervisor.go
+++ b/lib/service/supervisor.go
@@ -23,11 +23,13 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"net/http"
 	"slices"
 	"sync"
 	"time"
 
 	"github.com/gravitational/trace"
+	"github.com/jonboulle/clockwork"
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/gravitational/teleport"
@@ -105,6 +107,9 @@ type Supervisor interface {
 	// GracefulExitContext returns context that will be closed when
 	// a graceful or hard TeleportExitEvent is broadcast.
 	GracefulExitContext() context.Context
+
+	// HandleReadiness is the HTTP handler for "/readyz".
+	HandleReadiness(http.ResponseWriter, *http.Request)
 }
 
 // EventMapping maps a sequence of incoming
@@ -166,10 +171,22 @@ type LocalSupervisor struct {
 
 	// log specifies the logger
 	log *slog.Logger
+
+	// clock is used as a source of time, to support fake clocks in tests.
+	clock clockwork.Clock
+
+	// processState is the process state machine tracking if the process is
+	// healthy or not.
+	processState processState
 }
 
 // NewSupervisor returns new instance of initialized supervisor
-func NewSupervisor(id string, parentLog *slog.Logger) Supervisor {
+func NewSupervisor(id string, parentLog *slog.Logger, clock clockwork.Clock) (*LocalSupervisor, error) {
+	// used by processState
+	if err := metrics.RegisterPrometheusCollectors(stateGauge); err != nil {
+		return nil, trace.Wrap(err)
+	}
+
 	ctx := context.TODO()
 	closeContext, cancel := context.WithCancel(ctx)
 
@@ -180,6 +197,10 @@ func NewSupervisor(id string, parentLog *slog.Logger) Supervisor {
 	// in the event of graceful exit must also terminate in the event of an immediate exit.
 	gracefulExitContext, signalGracefulExit := context.WithCancel(exitContext)
 
+	if clock == nil {
+		clock = clockwork.NewRealClock()
+	}
+
 	srv := &LocalSupervisor{
 		state: stateCreated,
 		id:    id,
@@ -197,9 +218,12 @@ func NewSupervisor(id string, parentLog *slog.Logger) Supervisor {
 		signalGracefulExit: signalGracefulExit,
 
 		log: parentLog.With(teleport.ComponentKey, teleport.Component(teleport.ComponentProcess, id)),
+
+		clock: clock,
 	}
+
 	go srv.fanOut()
-	return srv
+	return srv, nil
 }
 
 // Event is a special service event that can be generated
@@ -403,13 +427,23 @@ func (s *LocalSupervisor) GracefulExitContext() context.Context {
 	return s.gracefulExitContext
 }
 
+// HandleReadiness implements [Supervisor].
+func (s *LocalSupervisor) HandleReadiness(w http.ResponseWriter, r *http.Request) {
+	s.processState.handleReadiness(w, r)
+}
+
 // BroadcastEvent generates event and broadcasts it to all
 // subscribed parties.
 func (s *LocalSupervisor) BroadcastEvent(event Event) {
 	s.Lock()
 	defer s.Unlock()
 
-	if event.Name == TeleportExitEvent {
+	// some events have additional handling here because they affect the
+	// behavior or status of the process as a whole: Exit begins the shutdown by
+	// causing contexts to be closed, and Degraded/OK/Starting update the
+	// process' health.
+	switch event.Name {
+	case TeleportExitEvent:
 		// if exit event includes a context payload, it is a "graceful" exit, and
 		// we need to hold off closing the supervisor's exit context until after
 		// the graceful context has closed. If not, it is an immediate exit.
@@ -425,6 +459,25 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) {
 		} else {
 			s.signalExit()
 		}
+	case TeleportDegradedEvent, TeleportOKEvent, TeleportStartingEvent:
+		componentName, ok := event.Payload.(string)
+		if !ok {
+			s.log.ErrorContext(s.closeContext, "Received event broadcast without component name, this is a bug!", "event", event.Name)
+			break
+		}
+		updateResult := s.processState.update(s.clock.Now(), event.Name, componentName)
+		switch updateResult {
+		case updateStarting:
+			s.log.DebugContext(s.closeContext, "Teleport component is starting", "component", componentName)
+		case updateStarted:
+			s.log.DebugContext(s.closeContext, "Teleport component has started.", "component", componentName)
+		case updateDegraded:
+			s.log.InfoContext(s.closeContext, "Detected Teleport component is running in a degraded state.", "component", componentName)
+		case updateRecovering:
+			s.log.InfoContext(s.closeContext, "Teleport component is recovering from a degraded state.", "component", componentName)
+		case updateRecovered:
+			s.log.InfoContext(s.closeContext, "Teleport component has recovered from a degraded state.", "component", componentName)
+		}
 	}
 
 	s.events[event.Name] = event
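Taken together, readiness now flows BroadcastEvent -> processState.update -> HandleReadiness entirely inside the supervisor, with no monitoring goroutine in between, which is also why events broadcast before Start are no longer lost (see the TestMonitor change above). An illustrative in-package sketch of probing /readyz against a bare supervisor, not part of this change; the logtest import path is inferred from the tests above:

```go
package service

import (
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/jonboulle/clockwork"
	"github.com/stretchr/testify/require"

	"github.com/gravitational/teleport/lib/utils/log/logtest"
)

// TestReadinessFlow is a hypothetical sketch: because BroadcastEvent updates
// the state machine synchronously, each probe reflects the last event
// immediately.
func TestReadinessFlow(t *testing.T) {
	supervisor, err := NewSupervisor("readyz-sketch", logtest.NewLogger(), clockwork.NewFakeClock())
	require.NoError(t, err)

	probe := func() int {
		w := httptest.NewRecorder()
		supervisor.HandleReadiness(w, httptest.NewRequest(http.MethodGet, "/readyz", nil))
		return w.Code
	}

	// No component has reported yet: the process is still starting (400).
	require.Equal(t, http.StatusBadRequest, probe())

	// A healthy component flips readiness to 200...
	supervisor.BroadcastEvent(Event{Name: TeleportOKEvent, Payload: "dummy"})
	require.Equal(t, http.StatusOK, probe())

	// ...and a degraded report flips it to 503.
	supervisor.BroadcastEvent(Event{Name: TeleportDegradedEvent, Payload: "dummy"})
	require.Equal(t, http.StatusServiceUnavailable, probe())
}
```

Moving the gauge registration into NewSupervisor keeps the handler itself dependency-free: HandleReadiness only reads state that BroadcastEvent has already computed.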