4 changes: 4 additions & 0 deletions constants.go
@@ -144,6 +144,10 @@ const (
// ComponentDiagnostic is a diagnostic service
ComponentDiagnostic = "diag"

// ComponentDiagnosticHealth is the health monitor used by the diagnostic
// and debug services.
ComponentDiagnosticHealth = "diag:health"

// ComponentDebug is the debug service, which exposes debugging
// configuration over a Unix socket.
ComponentDebug = "debug"
93 changes: 93 additions & 0 deletions lib/service/diagnostic.go
@@ -0,0 +1,93 @@
// Teleport
// Copyright (C) 2025 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package service

import (
"log/slog"
"net/http"

"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/lib/srv/debug"
)

type diagnosticHandlerConfig struct {
enableMetrics bool
enableProfiling bool
enableHealth bool
enableLogLeveler bool
}

func (process *TeleportProcess) newDiagnosticHandler(config diagnosticHandlerConfig, logger *slog.Logger) (http.Handler, error) {
if process.state == nil {
return nil, trace.BadParameter("teleport process state machine has not yet been initialized (this is a bug)")
}

mux := http.NewServeMux()

if config.enableMetrics {
mux.Handle("/metrics", process.newMetricsHandler())
}

if config.enableProfiling {
debug.RegisterProfilingHandlers(mux, logger)
}

if config.enableHealth {
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
roundtrip.ReplyJSON(w, http.StatusOK, map[string]any{"status": "ok"})
})
mux.HandleFunc("/readyz", process.state.readinessHandler())
}

if config.enableLogLeveler {
debug.RegisterLogLevelHandlers(mux, logger, process.Config)
}

return mux, nil
}

// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the
// in-process one.
func (process *TeleportProcess) newMetricsHandler() http.Handler {
// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
gatherers := prometheus.Gatherers{
process.metricsRegistry,
prometheus.DefaultGatherer,
}

metricsHandler := promhttp.InstrumentMetricHandler(
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
// In this case, we log the error but continue collecting metrics. The first collected metric will win
// (the one from the local metrics registry takes precedence).
// As we move more things to the local registry, especially in other tools like tbot, we will have fewer
// conflicts in tests.
ErrorHandling: promhttp.ContinueOnError,
ErrorLog: promHTTPLogAdapter{
ctx: process.ExitContext(),
Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics),
},
}),
)
return metricsHandler
}
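
The comment in newMetricsHandler above notes that duplicate metric names across the in-process and global registries are tolerated: with ContinueOnError the collision is only logged and the first gathered sample (the local one) wins. Below is a minimal standalone sketch of that gathering setup, separate from this PR; the metric name, port, and the whole program are illustrative only.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Local registry, playing the role of process.metricsRegistry in the PR.
	local := prometheus.NewRegistry()

	// Register a counter with the same name in both the local and the
	// global (default) registry to provoke a duplicate-name collision.
	opts := prometheus.CounterOpts{Name: "hypothetical_duplicate_total", Help: "illustrative counter"}
	local.MustRegister(prometheus.NewCounter(opts))
	prometheus.MustRegister(prometheus.NewCounter(opts))

	// Gather from both registries, local first so its sample takes precedence.
	gatherers := prometheus.Gatherers{local, prometheus.DefaultGatherer}

	// ContinueOnError mirrors the PR: the duplicate is logged, scraping continues.
	handler := promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
		ErrorHandling: promhttp.ContinueOnError,
	})

	http.Handle("/metrics", handler)
	log.Fatal(http.ListenAndServe("localhost:2112", nil))
}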
188 changes: 81 additions & 107 deletions lib/service/service.go
@@ -35,7 +35,6 @@ import (
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/http/pprof"
"os"
"os/signal"
"path/filepath"
@@ -50,11 +49,9 @@ import (
awssession "github.com/aws/aws-sdk-go/aws/session"
"github.com/google/renameio/v2"
"github.com/google/uuid"
"github.com/gravitational/roundtrip"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
@@ -149,7 +146,6 @@ import (
alpncommon "github.com/gravitational/teleport/lib/srv/alpnproxy/common"
"github.com/gravitational/teleport/lib/srv/app"
"github.com/gravitational/teleport/lib/srv/db"
"github.com/gravitational/teleport/lib/srv/debug"
"github.com/gravitational/teleport/lib/srv/desktop"
"github.com/gravitational/teleport/lib/srv/ingress"
"github.com/gravitational/teleport/lib/srv/regular"
@@ -467,6 +463,9 @@ type TeleportProcess struct {
// Both the metricsRegistry and the default global registry are gathered by
// Teleport's metrics service.
metricsRegistry *prometheus.Registry

// state is the process state machine tracking if the process is healthy or not.
state *processState
}

type keyPairKey struct {
@@ -1226,6 +1225,12 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {

serviceStarted := false

ps, err := process.newProcessStateMachine()
if err != nil {
return nil, trace.Wrap(err, "failed to initialize process state machine")
}
process.state = ps

if !cfg.DiagnosticAddr.IsEmpty() {
if err := process.initDiagnosticService(); err != nil {
return nil, trace.Wrap(err)
@@ -1244,12 +1249,8 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
process.initPyroscope(address)
}

if cfg.DebugService.Enabled {
if err := process.initDebugService(); err != nil {
return nil, trace.Wrap(err)
}
} else {
warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentDebug), process.logger)
if err := process.initDebugService(cfg.DebugService.Enabled); err != nil {
return nil, trace.Wrap(err)
}

// Create a process wide key generator that will be shared. This is so the
@@ -3303,11 +3304,16 @@ func (l promHTTPLogAdapter) Println(v ...interface{}) {
// initMetricsService starts the metrics service currently serving metrics for
// prometheus consumption
func (process *TeleportProcess) initMetricsService() error {
mux := http.NewServeMux()
mux.Handle("/metrics", process.newMetricsHandler())

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentMetrics, process.id))

config := diagnosticHandlerConfig{
enableMetrics: true,
}
mux, err := process.newDiagnosticHandler(config, logger)
if err != nil {
return trace.Wrap(err)
}

listener, err := process.importOrCreateListener(ListenerMetrics, process.Config.Metrics.ListenAddr.Addr)
if err != nil {
return trace.Wrap(err)
@@ -3389,78 +3395,17 @@ func (process *TeleportProcess) initMetricsService() error {
return nil
}

// newMetricsHandler creates a new metrics handler serving metrics both from the global prometheus registry and the
// in-process one.
func (process *TeleportProcess) newMetricsHandler() http.Handler {
// We gather metrics both from the in-process registry (preferred metrics registration method)
// and the global registry (used by some Teleport services and many dependencies).
gatherers := prometheus.Gatherers{
process.metricsRegistry,
prometheus.DefaultGatherer,
}

metricsHandler := promhttp.InstrumentMetricHandler(
process.metricsRegistry, promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{
// Errors can happen if metrics are registered with identical names in both the local and the global registry.
// In this case, we log the error but continue collecting metrics. The first collected metric will win
// (the one from the local metrics registry takes precedence).
// As we move more things to the local registry, especially in other tools like tbot, we will have fewer
// conflicts in tests.
ErrorHandling: promhttp.ContinueOnError,
ErrorLog: promHTTPLogAdapter{
ctx: process.ExitContext(),
Logger: process.logger.With(teleport.ComponentKey, teleport.ComponentMetrics),
},
}),
)
return metricsHandler
}

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService() error {
mux := http.NewServeMux()

// support legacy metrics collection in the diagnostic service.
// metrics will otherwise be served by the metrics service if it's enabled
// in the config.
if !process.Config.Metrics.Enabled {
mux.Handle("/metrics", process.newMetricsHandler())
}

if process.Config.Debug {
process.logger.InfoContext(process.ExitContext(), "Adding diagnostic debugging handlers. To connect with profiler, use `go tool pprof <listen_address>`.", "listen_address", process.Config.DiagnosticAddr.Addr)

noWriteTimeout := func(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
rc := http.NewResponseController(w) //nolint:bodyclose // bodyclose gets really confused about NewResponseController
if err := rc.SetWriteDeadline(time.Time{}); err == nil {
// don't let the pprof handlers know about the WriteTimeout
r = r.WithContext(context.WithValue(r.Context(), http.ServerContextKey, nil))
}
h(w, r)
}
}

mux.HandleFunc("/debug/pprof/", noWriteTimeout(pprof.Index))
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", noWriteTimeout(pprof.Profile))
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", noWriteTimeout(pprof.Trace))
}

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
roundtrip.ReplyJSON(w, http.StatusOK, map[string]interface{}{"status": "ok"})
})

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDiagnostic, process.id))

// newProcessStateMachine creates a state machine tracking the Teleport process
// state. The state machine is then used by the diagnostics or the debug service
// to evaluate the process health.
func (process *TeleportProcess) newProcessStateMachine() (*processState, error) {
logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDiagnosticHealth, process.id))
// Create a state machine that will process and update the internal state of
// Teleport based off Events. Use this state machine to return return the
// Teleport based off Events. Use this state machine to return the
// status from the /readyz endpoint.
ps, err := newProcessState(process)
if err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}

process.RegisterFunc("readyz.monitor", func() error {
@@ -3482,29 +3427,27 @@ func (process *TeleportProcess) initDiagnosticService() error {
}
}
})
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
switch ps.getState() {
// 503
case stateDegraded:
roundtrip.ReplyJSON(w, http.StatusServiceUnavailable, map[string]interface{}{
"status": "teleport is in a degraded state, check logs for details",
})
// 400
case stateRecovering:
roundtrip.ReplyJSON(w, http.StatusBadRequest, map[string]interface{}{
"status": "teleport is recovering from a degraded state, check logs for details",
})
case stateStarting:
roundtrip.ReplyJSON(w, http.StatusBadRequest, map[string]interface{}{
"status": "teleport is starting and hasn't joined the cluster yet",
})
// 200
case stateOK:
roundtrip.ReplyJSON(w, http.StatusOK, map[string]interface{}{
"status": "ok",
})
}
})
return ps, nil
}
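
The /readyz route in newDiagnosticHandler delegates to process.state.readinessHandler(), whose body is not shown in this diff. What follows is only a hedged reconstruction from the inline handler removed above; the method below is assumed, not copied from the PR.

// readinessHandler (sketch): maps the process state to an HTTP status and
// JSON body, mirroring the inline /readyz handler this PR removes.
func (ps *processState) readinessHandler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		switch ps.getState() {
		case stateDegraded: // 503
			roundtrip.ReplyJSON(w, http.StatusServiceUnavailable, map[string]any{
				"status": "teleport is in a degraded state, check logs for details",
			})
		case stateRecovering: // 400
			roundtrip.ReplyJSON(w, http.StatusBadRequest, map[string]any{
				"status": "teleport is recovering from a degraded state, check logs for details",
			})
		case stateStarting: // 400
			roundtrip.ReplyJSON(w, http.StatusBadRequest, map[string]any{
				"status": "teleport is starting and hasn't joined the cluster yet",
			})
		case stateOK: // 200
			roundtrip.ReplyJSON(w, http.StatusOK, map[string]any{"status": "ok"})
		}
	}
}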

// initDiagnosticService starts diagnostic service currently serving healthz
// and prometheus endpoints
func (process *TeleportProcess) initDiagnosticService() error {
logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDiagnostic, process.id))

config := diagnosticHandlerConfig{
// support legacy metrics collection in the diagnostic service.
// metrics will otherwise be served by the metrics service if it's enabled
// in the config.
enableMetrics: !process.Config.Metrics.Enabled,
enableProfiling: process.Config.Debug,
enableHealth: true,
enableLogLeveler: false,
}
mux, err := process.newDiagnosticHandler(config, logger)
if err != nil {
return trace.Wrap(err)
}

listener, err := process.importOrCreateListener(ListenerDiagnostic, process.Config.DiagnosticAddr.Addr)
if err != nil {
@@ -3564,17 +3507,48 @@ func (process *TeleportProcess) initDiagnosticService() error {
}

// initDebugService starts debug service serving endpoints used for
// troubleshooting the instance.
func (process *TeleportProcess) initDebugService() error {
// troubleshooting the instance. This service is always active; users can
// disable its sensitive pprof and log-setting endpoints, but the liveness
// and readiness ones remain available.
func (process *TeleportProcess) initDebugService(exposeDebugRoutes bool) error {
if process.state == nil {
return trace.BadParameter("teleport process state machine has not yet been initialized (this is a bug)")
}

logger := process.logger.With(teleport.ComponentKey, teleport.Component(teleport.ComponentDebug, process.id))

// Unix socket creation can fail on paths too long. Depending on the UNIX implementation,
// socket paths cannot exceed 104 or 108 chars.
listener, err := process.importOrCreateListener(ListenerDebug, filepath.Join(process.Config.DataDir, teleport.DebugServiceSocketName))
if err != nil {
if exposeDebugRoutes {
// If the debug service is enabled in the config, this is a hard failure
return trace.Wrap(err)
} else {
// If the debug service was disabled in the config, we issue a warning and will have to continue.
logger.WarnContext(process.ExitContext(),
"Failed to open the debug socket. teleport-update will not be able to accurately check Teleport health.",
"error", err,
)
return nil
}
}

// Users can disable the debug service for compliance reasons but not the health
// routes because the updater relies on them.
config := diagnosticHandlerConfig{
enableMetrics: exposeDebugRoutes,
enableProfiling: exposeDebugRoutes,
enableHealth: true,
enableLogLeveler: exposeDebugRoutes,
}
mux, err := process.newDiagnosticHandler(config, logger)
if err != nil {
return trace.Wrap(err)
}

server := &http.Server{
Handler: debug.NewServeMux(logger, process.Config),
Handler: mux,
ReadTimeout: apidefaults.DefaultIOTimeout,
ReadHeaderTimeout: defaults.ReadHeadersTimeout,
WriteTimeout: apidefaults.DefaultIOTimeout,
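
Because the health routes stay enabled on the debug listener even when exposeDebugRoutes is false, an external checker such as teleport-update can probe readiness over the Unix socket. A sketch of such a probe follows, separate from this PR; the data directory, socket file name, and URL host are assumptions used only for illustration.

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"path/filepath"
)

func main() {
	// Assumed paths: the real socket lives at
	// <data_dir>/<teleport.DebugServiceSocketName> per the diff above.
	socketPath := filepath.Join("/var/lib/teleport", "debug.sock")

	// HTTP client that dials the Unix socket instead of TCP.
	client := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var d net.Dialer
				return d.DialContext(ctx, "unix", socketPath)
			},
		},
	}

	// The host part of the URL is ignored; only the /readyz path matters.
	resp, err := client.Get("http://debug/readyz")
	if err != nil {
		log.Fatalf("querying readiness over the debug socket: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Printf("%s: %s\n", resp.Status, body)
}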