From bcf3d68fb14bf51a28f1b6e3d78a2e80049dc1b5 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Thu, 18 Apr 2024 19:02:39 +0200 Subject: [PATCH 1/4] Fix time.After in joinserver --- lib/joinserver/joinserver.go | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/joinserver/joinserver.go b/lib/joinserver/joinserver.go index 4347c50245069..09fcb270e9522 100644 --- a/lib/joinserver/joinserver.go +++ b/lib/joinserver/joinserver.go @@ -74,37 +74,35 @@ func NewJoinServiceGRPCServer(joinServiceClient joinServiceClient) *JoinServiceG // sts:GetCallerIdentity request with the challenge string. Finally, the signed // cluster certs are sent on the server stream. func (s *JoinServiceGRPCServer) RegisterUsingIAMMethod(srv proto.JoinService_RegisterUsingIAMMethodServer) error { - ctx := srv.Context() - // Enforce a timeout on the entire RPC so that misbehaving clients cannot // hold connections open indefinitely. - timeout := s.clock.After(iamJoinRequestTimeout) + timeout := s.clock.NewTimer(iamJoinRequestTimeout) + defer timeout.Stop() // The only way to cancel a blocked Send or Recv on the server side without // adding an interceptor to the entire gRPC service is to return from the // handler https://github.com/grpc/grpc-go/issues/465#issuecomment-179414474 errCh := make(chan error, 1) go func() { - errCh <- s.registerUsingIAMMethod(ctx, srv) + errCh <- s.registerUsingIAMMethod(srv) }() select { case err := <-errCh: // Completed before the deadline, return the error (may be nil). return trace.Wrap(err) - case <-timeout: + case <-timeout.Chan(): nodeAddr := "" - if peerInfo, ok := peer.FromContext(ctx); ok { + if peerInfo, ok := peer.FromContext(srv.Context()); ok { nodeAddr = peerInfo.Addr.String() } logrus.Warnf("IAM join attempt timed out, node at (%s) is misbehaving or did not close the connection after encountering an error.", nodeAddr) // Returning here should cancel any blocked Send or Recv operations. return trace.LimitExceeded("RegisterUsingIAMMethod timed out after %s, terminating the stream on the server", iamJoinRequestTimeout) - case <-ctx.Done(): - return trace.Wrap(ctx.Err()) } } -func (s *JoinServiceGRPCServer) registerUsingIAMMethod(ctx context.Context, srv proto.JoinService_RegisterUsingIAMMethodServer) error { +func (s *JoinServiceGRPCServer) registerUsingIAMMethod(srv proto.JoinService_RegisterUsingIAMMethodServer) error { + ctx := srv.Context() // Call RegisterUsingIAMMethod with a callback to get the challenge response // from the gRPC client. certs, err := s.joinServiceClient.RegisterUsingIAMMethod(ctx, func(challenge string) (*proto.RegisterUsingIAMMethodRequest, error) { @@ -146,33 +144,30 @@ func (s *JoinServiceGRPCServer) registerUsingIAMMethod(ctx context.Context, srv // attested data document with the challenge string. Finally, the signed // cluster certs are sent on the server stream. func (s *JoinServiceGRPCServer) RegisterUsingAzureMethod(srv proto.JoinService_RegisterUsingAzureMethodServer) error { - ctx := srv.Context() - // Enforce a timeout on the entire RPC so that misbehaving clients cannot // hold connections open indefinitely. - timeout := s.clock.After(azureJoinRequestTimeout) + timeout := s.clock.NewTimer(azureJoinRequestTimeout) + defer timeout.Stop() // The only way to cancel a blocked Send or Recv on the server side without // adding an interceptor to the entire gRPC service is to return from the // handler https://github.com/grpc/grpc-go/issues/465#issuecomment-179414474 errCh := make(chan error, 1) go func() { - errCh <- s.registerUsingAzureMethod(ctx, srv) + errCh <- s.registerUsingAzureMethod(srv) }() select { case err := <-errCh: // Completed before the deadline, return the error (may be nil). return trace.Wrap(err) - case <-timeout: + case <-timeout.Chan(): nodeAddr := "" - if peerInfo, ok := peer.FromContext(ctx); ok { + if peerInfo, ok := peer.FromContext(srv.Context()); ok { nodeAddr = peerInfo.Addr.String() } logrus.Warnf("Azure join attempt timed out, node at (%s) is misbehaving or did not close the connection after encountering an error.", nodeAddr) // Returning here should cancel any blocked Send or Recv operations. return trace.LimitExceeded("RegisterUsingAzureMethod timed out after %s, terminating the stream on the server", azureJoinRequestTimeout) - case <-ctx.Done(): - return trace.Wrap(ctx.Err()) } } @@ -198,7 +193,8 @@ func setClientRemoteAddr(ctx context.Context, req *types.RegisterUsingTokenReque return nil } -func (s *JoinServiceGRPCServer) registerUsingAzureMethod(ctx context.Context, srv proto.JoinService_RegisterUsingAzureMethodServer) error { +func (s *JoinServiceGRPCServer) registerUsingAzureMethod(srv proto.JoinService_RegisterUsingAzureMethodServer) error { + ctx := srv.Context() certs, err := s.joinServiceClient.RegisterUsingAzureMethod(ctx, func(challenge string) (*proto.RegisterUsingAzureMethodRequest, error) { err := srv.Send(&proto.RegisterUsingAzureMethodResponse{ Challenge: challenge, From a286d94877749a797b140d04e4398f3eaefdb008 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Thu, 18 Apr 2024 21:41:13 +0200 Subject: [PATCH 2/4] Log failed cluster joins --- lib/auth/join.go | 24 +++++++++++++++++++++--- lib/auth/join_iam.go | 22 +++++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/lib/auth/join.go b/lib/auth/join.go index 28505456c45e9..b5348fbfa9e19 100644 --- a/lib/auth/join.go +++ b/lib/auth/join.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/gravitational/trace" + "github.com/sirupsen/logrus" "google.golang.org/grpc/peer" "github.com/gravitational/teleport/api/client/proto" @@ -125,14 +126,31 @@ func setRemoteAddrFromContext(ctx context.Context, req *types.RegisterUsingToken // // If the token includes a specific join method, the rules for that join method // will be checked. -func (a *Server) RegisterUsingToken(ctx context.Context, req *types.RegisterUsingTokenRequest) (*proto.Certs, error) { - log.Infof("Node %q [%v] is trying to join with role: %v.", req.NodeName, req.HostID, req.Role) +func (a *Server) RegisterUsingToken(ctx context.Context, req *types.RegisterUsingTokenRequest) (_ *proto.Certs, err error) { if err := req.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } + method := a.tokenJoinMethod(ctx, req.Token) + defer func() { + if err == nil { + return + } + level := logrus.WarnLevel + if trace.IsAccessDenied(err) { + level = logrus.DebugLevel + } + log.WithFields(logrus.Fields{ + "node_name": req.NodeName, + "host_id": req.HostID, + "role": req.Role, + "method": method, + logrus.ErrorKey: err, + }).Log(level, "Agent has failed to join the cluster.") + }() + var joinAttributeSrc joinAttributeSourcer - switch method := a.tokenJoinMethod(ctx, req.Token); method { + switch method { case types.JoinMethodEC2: if err := a.checkEC2JoinRequest(ctx, req); err != nil { return nil, trace.Wrap(err) diff --git a/lib/auth/join_iam.go b/lib/auth/join_iam.go index c12b38b154b9e..93a9837d40bbd 100644 --- a/lib/auth/join_iam.go +++ b/lib/auth/join_iam.go @@ -35,6 +35,7 @@ import ( "github.com/aws/aws-sdk-go/service/sts" "github.com/coreos/go-semver/semver" "github.com/gravitational/trace" + "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/client" @@ -339,7 +340,7 @@ func withFips(fips bool) iamRegisterOption { // The caller must provide a ChallengeResponseFunc which returns a // *types.RegisterUsingTokenRequest with a signed sts:GetCallerIdentity request // including the challenge as a signed header. -func (a *Server) RegisterUsingIAMMethod(ctx context.Context, challengeResponse client.RegisterIAMChallengeResponseFunc, opts ...iamRegisterOption) (*proto.Certs, error) { +func (a *Server) RegisterUsingIAMMethod(ctx context.Context, challengeResponse client.RegisterIAMChallengeResponseFunc, opts ...iamRegisterOption) (_ *proto.Certs, err error) { cfg := defaultIAMRegisterConfig(a.fips) for _, opt := range opts { opt(cfg) @@ -359,11 +360,30 @@ func (a *Server) RegisterUsingIAMMethod(ctx context.Context, challengeResponse c return nil, trace.Wrap(err) } + var method types.JoinMethod = "unknown" + defer func() { + if err == nil { + return + } + level := logrus.WarnLevel + if trace.IsAccessDenied(err) { + level = logrus.DebugLevel + } + log.WithFields(logrus.Fields{ + "node_name": req.RegisterUsingTokenRequest.NodeName, + "host_id": req.RegisterUsingTokenRequest.HostID, + "role": req.RegisterUsingTokenRequest.Role, + "method": method, + logrus.ErrorKey: err, + }).Log(level, "Agent has failed to join the cluster.") + }() + // perform common token checks provisionToken, err := a.checkTokenJoinRequestCommon(ctx, req.RegisterUsingTokenRequest) if err != nil { return nil, trace.Wrap(err) } + method = provisionToken.GetJoinMethod() // check that the GetCallerIdentity request is valid and matches the token if err := a.checkIAMRequest(ctx, challenge, req, cfg); err != nil { From 864d0adbf8685bf66bac9c1fe835467c89339312 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 19 Apr 2024 21:07:51 +0200 Subject: [PATCH 3/4] Instrument teleport instance connector breakers --- api/breaker/breaker.go | 30 +++++++++++++----- api/breaker/breaker_test.go | 4 +-- lib/service/breaker/breaker.go | 56 ++++++++++++++++++++++++++++++++++ lib/service/connect.go | 14 +++++---- 4 files changed, 88 insertions(+), 16 deletions(-) create mode 100644 lib/service/breaker/breaker.go diff --git a/api/breaker/breaker.go b/api/breaker/breaker.go index ec5d464b5c7a7..5b58ade0df553 100644 --- a/api/breaker/breaker.go +++ b/api/breaker/breaker.go @@ -129,6 +129,9 @@ type Config struct { OnTripped func() // OnStandby will be called when the CircuitBreaker returns to the StateStandby state OnStandBy func() + // OnExecute will be called once for each execution, and given the result + // and the current state of the breaker state + OnExecute func(success bool, state State) // IsSuccessful is used by the CircuitBreaker to determine if the executed function was successful or not IsSuccessful func(v interface{}, err error) bool // TrippedErrorMessage is an optional message to use as the error message when the CircuitBreaker @@ -136,6 +139,12 @@ type Config struct { TrippedErrorMessage string } +// Clone returns a clone of the Config. +func (c *Config) Clone() Config { + // the current Config can just be copied without issues + return *c +} + // TripFn determines if the CircuitBreaker should be tripped based // on the state of the provided Metrics. A return value of true will // cause the CircuitBreaker to transition into the StateTripped state @@ -323,8 +332,9 @@ func (c *CircuitBreaker) beforeExecution() (uint64, error) { generation, state := c.currentState(now) - switch { - case state == StateTripped: + if state == StateTripped { + c.cfg.OnExecute(false, StateTripped) + if c.cfg.TrippedErrorMessage != "" { return generation, trace.ConnectionProblem(nil, c.cfg.TrippedErrorMessage) } @@ -350,19 +360,21 @@ func (c *CircuitBreaker) afterExecution(prior uint64, v interface{}, err error) } if c.cfg.IsSuccessful(v, err) { - c.success(state, now) + c.successLocked(state, now) } else { - c.failure(state, now) + c.failureLocked(state, now) } } -// success tallies a successful execution and migrates to StateStandby +// successLocked tallies a successful execution and migrates to StateStandby // if in another state and criteria has been met to transition -func (c *CircuitBreaker) success(state State, t time.Time) { +func (c *CircuitBreaker) successLocked(state State, t time.Time) { switch state { case StateStandby: + c.cfg.OnExecute(true, StateStandby) c.metrics.success() case StateRecovering: + c.cfg.OnExecute(true, StateRecovering) c.metrics.success() if c.metrics.ConsecutiveSuccesses >= c.cfg.RecoveryLimit { c.setState(StateStandby, t) @@ -371,17 +383,19 @@ func (c *CircuitBreaker) success(state State, t time.Time) { } } -// failure tallies a failed execution and migrate to StateTripped +// failureLocked tallies a failed execution and migrate to StateTripped // if in another state and criteria has been met to transition -func (c *CircuitBreaker) failure(state State, t time.Time) { +func (c *CircuitBreaker) failureLocked(state State, t time.Time) { c.metrics.failure() switch state { case StateRecovering: + c.cfg.OnExecute(false, StateRecovering) if c.cfg.Recover(c.metrics) { c.setState(StateTripped, t) } case StateStandby: + c.cfg.OnExecute(false, StateStandby) if c.cfg.Trip(c.metrics) { c.setState(StateTripped, t) go c.cfg.OnTripped() diff --git a/api/breaker/breaker_test.go b/api/breaker/breaker_test.go index 00803dc2f5f32..fa607330fea18 100644 --- a/api/breaker/breaker_test.go +++ b/api/breaker/breaker_test.go @@ -259,7 +259,7 @@ func TestCircuitBreaker_success(t *testing.T) { cb.state = tt.initialState generation, state := cb.currentState(clock.Now()) - cb.success(tt.successState, clock.Now()) + cb.successLocked(tt.successState, clock.Now()) require.Equal(t, tt.expectedState, cb.state) if tt.expectedState != state { require.NotEqual(t, generation, cb.generation) @@ -341,7 +341,7 @@ func TestCircuitBreaker_failure(t *testing.T) { cb.state = tt.initialState generation, state := cb.currentState(clock.Now()) - cb.failure(tt.failureState, clock.Now()) + cb.failureLocked(tt.failureState, clock.Now()) require.Equal(t, tt.expectedState, cb.state) if tt.expectedState != state { require.NotEqual(t, generation, cb.generation) diff --git a/lib/service/breaker/breaker.go b/lib/service/breaker/breaker.go new file mode 100644 index 0000000000000..d1955f4d83ab6 --- /dev/null +++ b/lib/service/breaker/breaker.go @@ -0,0 +1,56 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package breaker + +import ( + "strconv" + "sync" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/breaker" + "github.com/gravitational/teleport/api/types" +) + +var connectorExecutions = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: "breaker", + Name: "connector_executions_total", + Help: "Client requests per system role, state of the breaker and success as interpreted by the breaker.", +}, []string{"role", "state", "success"}) + +var registerOnce sync.Once + +func ensureRegistered() { + registerOnce.Do(func() { + prometheus.MustRegister(connectorExecutions) + }) +} + +// InstrumentBreakerForConnector returns a copy of a [breaker.Config] that +// counts client "executions" (i.e. requests or streams) that go through the +// breaker, attributing the count to the given system role. +func InstrumentBreakerForConnector(role types.SystemRole, cfg breaker.Config) breaker.Config { + ensureRegistered() + + cfg = cfg.Clone() + cfg.OnExecute = func(success bool, state breaker.State) { + connectorExecutions.WithLabelValues(role.String(), state.String(), strconv.FormatBool(success)).Inc() + } + return cfg +} diff --git a/lib/service/connect.go b/lib/service/connect.go index aedd030d5c344..6a07d53507fbe 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc" "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/breaker" apiclient "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" apidefaults "github.com/gravitational/teleport/api/defaults" @@ -49,6 +50,7 @@ import ( "github.com/gravitational/teleport/lib/observability/metrics" "github.com/gravitational/teleport/lib/openssh" "github.com/gravitational/teleport/lib/reversetunnelclient" + servicebreaker "github.com/gravitational/teleport/lib/service/breaker" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" @@ -453,7 +455,7 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec GetHostCredentials: client.HostCredentials, Clock: process.Clock, JoinMethod: process.Config.JoinMethod, - CircuitBreakerConfig: process.Config.CircuitBreakerConfig, + CircuitBreakerConfig: breaker.NoopBreakerConfig(), FIPS: process.Config.FIPS, Insecure: lib.IsInsecureDevMode(), } @@ -1093,7 +1095,7 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client logger.DebugContext(process.ExitContext(), "Attempting to discover reverse tunnel address.") logger.DebugContext(process.ExitContext(), "Attempting to connect to Auth Server through tunnel.") - tunnelClient, pingResponse, err := process.newClientThroughTunnel(tlsConfig, sshClientConfig) + tunnelClient, pingResponse, err := process.newClientThroughTunnel(tlsConfig, sshClientConfig, identity.ID.Role) if err != nil { process.logger.ErrorContext(process.ExitContext(), "Node failed to establish connection to Teleport Proxy. We have tried the following endpoints:") process.logger.ErrorContext(process.ExitContext(), "- connecting to auth server directly", "error", directErr) @@ -1118,7 +1120,7 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client logger := process.logger.With("proxy-server", proxyServer.String()) logger.DebugContext(process.ExitContext(), "Attempting to connect to Auth Server through tunnel.") - tunnelClient, pingResponse, err := process.newClientThroughTunnel(tlsConfig, sshClientConfig) + tunnelClient, pingResponse, err := process.newClientThroughTunnel(tlsConfig, sshClientConfig, identity.ID.Role) if err != nil { return nil, nil, trace.Errorf("Failed to connect to Proxy Server through tunnel: %v", err) } @@ -1137,7 +1139,7 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client return nil, nil, trace.NotImplemented("could not find connection strategy for config version %s", process.Config.Version) } -func (process *TeleportProcess) newClientThroughTunnel(tlsConfig *tls.Config, sshConfig *ssh.ClientConfig) (*auth.Client, *proto.PingResponse, error) { +func (process *TeleportProcess) newClientThroughTunnel(tlsConfig *tls.Config, sshConfig *ssh.ClientConfig, role types.SystemRole) (*auth.Client, *proto.PingResponse, error) { dialer, err := reversetunnelclient.NewTunnelAuthDialer(reversetunnelclient.TunnelAuthDialerConfig{ Resolver: process.resolver, ClientConfig: sshConfig, @@ -1154,7 +1156,7 @@ func (process *TeleportProcess) newClientThroughTunnel(tlsConfig *tls.Config, ss Credentials: []apiclient.Credentials{ apiclient.LoadTLS(tlsConfig), }, - CircuitBreakerConfig: process.Config.CircuitBreakerConfig, + CircuitBreakerConfig: servicebreaker.InstrumentBreakerForConnector(role, process.Config.CircuitBreakerConfig), DialTimeout: process.Config.Testing.ClientTimeout, }) if err != nil { @@ -1201,7 +1203,7 @@ func (process *TeleportProcess) newClientDirect(authServers []utils.NetAddr, tls apiclient.LoadTLS(tlsConfig), }, DialTimeout: process.Config.Testing.ClientTimeout, - CircuitBreakerConfig: process.Config.CircuitBreakerConfig, + CircuitBreakerConfig: servicebreaker.InstrumentBreakerForConnector(role, process.Config.CircuitBreakerConfig), DialOpts: dialOpts, }, cltParams...) if err != nil { From a8e86689ede5773b04a39da65ed3cab18f46eee4 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 19 Apr 2024 22:14:18 +0200 Subject: [PATCH 4/4] Address comments, fix nil panic --- api/breaker/breaker.go | 15 ++++++++++++--- lib/service/connect.go | 2 ++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/api/breaker/breaker.go b/api/breaker/breaker.go index 5b58ade0df553..a7cc26fe47e71 100644 --- a/api/breaker/breaker.go +++ b/api/breaker/breaker.go @@ -125,12 +125,17 @@ type Config struct { // StateStandby to StateTripped. This is required to be supplied, failure to do so will result in an error // creating the CircuitBreaker. Trip TripFn - // OnTripped will be called when the CircuitBreaker enters the StateTripped state + // OnTripped will be called when the CircuitBreaker enters the StateTripped + // state; this callback is called while holding a lock, so it should return + // quickly. OnTripped func() - // OnStandby will be called when the CircuitBreaker returns to the StateStandby state + // OnStandby will be called when the CircuitBreaker returns to the + // StateStandby state; this callback is called while holding a lock, so it + // should return quickly. OnStandBy func() // OnExecute will be called once for each execution, and given the result - // and the current state of the breaker state + // and the current state of the breaker state; this callback is called while + // holding a lock, so it should return quickly. OnExecute func(success bool, state State) // IsSuccessful is used by the CircuitBreaker to determine if the executed function was successful or not IsSuccessful func(v interface{}, err error) bool @@ -262,6 +267,10 @@ func (c *Config) CheckAndSetDefaults() error { c.OnStandBy = func() {} } + if c.OnExecute == nil { + c.OnExecute = func(bool, State) {} + } + if c.IsSuccessful == nil { c.IsSuccessful = NonNilErrorIsSuccess } diff --git a/lib/service/connect.go b/lib/service/connect.go index 6a07d53507fbe..d63696762d322 100644 --- a/lib/service/connect.go +++ b/lib/service/connect.go @@ -455,6 +455,8 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec GetHostCredentials: client.HostCredentials, Clock: process.Clock, JoinMethod: process.Config.JoinMethod, + // this circuit breaker is used for a client that only does a few + // requests before closing CircuitBreakerConfig: breaker.NoopBreakerConfig(), FIPS: process.Config.FIPS, Insecure: lib.IsInsecureDevMode(),