diff --git a/lib/kube/proxy/errors.go b/lib/kube/proxy/errors.go
new file mode 100644
index 0000000000000..8258eb41a5da5
--- /dev/null
+++ b/lib/kube/proxy/errors.go
@@ -0,0 +1,62 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package proxy
+
+import (
+	"net/http"
+
+	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+const (
+	// kubernetesSessionTerminatedByModerator is the message that is sent to the
+	// client when the session is terminated by the moderator.
+	kubernetesSessionTerminatedByModerator = "Session terminated by moderator."
+	sessionTerminatedByModeratorReason     = metav1.StatusReason("SessionTerminatedByModerator")
+)
+
+var sessionTerminatedByModeratorErr = &kubeerrors.StatusError{
+	ErrStatus: metav1.Status{
+		Status:  metav1.StatusFailure,
+		Code:    http.StatusUnauthorized,
+		Reason:  sessionTerminatedByModeratorReason,
+		Message: kubernetesSessionTerminatedByModerator,
+		Details: &metav1.StatusDetails{
+			Causes: []metav1.StatusCause{
+				{
+					Type:    metav1.CauseTypeForbidden,
+					Message: kubernetesSessionTerminatedByModerator,
+				},
+			},
+		},
+	},
+}
+
+// isSessionTerminatedError returns true if the error is a session terminated error.
+// This is required because StreamWithContext wraps the error into a new error string
+// and we lose the type information needed to forward the error to the client.
+func isSessionTerminatedError(err error) bool {
+	if err == nil {
+		return false
+	}
+	// This check is required because the error is wrapped into a new error string
+	// by StreamWithContext and we lose the type information.
+	return err.Error() == kubernetesSessionTerminatedByModerator
+}
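Note on the string comparison above: once the executor reconstructs an error from the remote error stream, the concrete `*StatusError` type is gone and only the message text survives, so `errors.As` can no longer match it. A minimal runnable sketch of that failure mode — the `fmt.Errorf` call merely stands in for the flattening the streaming layer performs, it is not the actual client-go internals:

```go
package main

import (
	"errors"
	"fmt"

	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const terminatedMsg = "Session terminated by moderator."

// isSessionTerminatedError matches by message, since that is all that
// survives the flattening.
func isSessionTerminatedError(err error) bool {
	return err != nil && err.Error() == terminatedMsg
}

func main() {
	orig := &kubeerrors.StatusError{ErrStatus: metav1.Status{Message: terminatedMsg}}

	// Simulate the executor flattening the error into a plain string.
	flattened := fmt.Errorf("%s", orig.Error())

	var statusErr *kubeerrors.StatusError
	fmt.Println(errors.As(flattened, &statusErr))    // false: the type is lost
	fmt.Println(isSessionTerminatedError(flattened)) // true: the message survives
}
```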
diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go
index c0eaa49b8b4af..cfaac2ff212c9 100644
--- a/lib/kube/proxy/forwarder.go
+++ b/lib/kube/proxy/forwarder.go
@@ -1206,27 +1206,34 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
 		client := &websocketClientStreams{stream}
 		party := newParty(*ctx, stream.Mode, client)
+		defer party.CloseConnection()
 
 		err = session.join(party, true /* emitSessionJoinEvent */)
 		if err != nil {
 			return trace.Wrap(err)
 		}
 
 		closeC := make(chan struct{})
+		var wg sync.WaitGroup
+		wg.Add(1)
 		go func() {
-			defer close(closeC)
+			defer wg.Done()
 			select {
 			case <-stream.Done():
-				party.InformClose()
-			case <-party.closeC:
+				party.InformClose(trace.BadParameter("websocket connection closed"))
+			case <-closeC:
 				return
 			}
 		}()
-		<-party.closeC
+
+		err = <-party.closeC
+		close(closeC)
+
 		if _, err := session.leave(party.ID); err != nil {
 			f.log.WithError(err).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
 		}
-		<-closeC
-		return nil
+		wg.Wait()
+
+		return trace.Wrap(err)
 	}(); err != nil {
 		writeErr := ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()), time.Now().Add(time.Second*10))
 		if writeErr != nil {
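The hunk above switches `party.closeC` from a plain close-notification channel to a channel that carries the reason the party was closed (the full type change is in sess.go below). A minimal sketch of the pattern — the type names are illustrative, not the Teleport declarations: a buffered channel of capacity 1 records the error, closing it afterwards wakes every waiter, and `sync.Once` makes the whole thing idempotent.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type party struct {
	closeC    chan error
	closeOnce sync.Once
}

// InformClose records why the party must leave, then closes the channel so
// all waiters wake up.
func (p *party) InformClose(err error) {
	p.closeOnce.Do(func() {
		p.closeC <- err // never blocks: capacity 1, sent at most once
		close(p.closeC)
	})
}

func main() {
	p := &party{closeC: make(chan error, 1)}
	p.InformClose(errors.New("websocket connection closed"))
	p.InformClose(nil) // no-op: the Once already fired

	fmt.Println(<-p.closeC) // the recorded error
	fmt.Println(<-p.closeC) // <nil>: channel is closed and drained
}
```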
@@ -1433,12 +1440,10 @@ func (f *Forwarder) acquireConnectionLock(ctx context.Context, user string, role
 }
 
 // execNonInteractive handles all exec sessions without a TTY.
-func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, request remoteCommandRequest, proxy *remoteCommandProxy, sess *clusterSession) (resp any, err error) {
-	defer proxy.Close()
-
+func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, request remoteCommandRequest, proxy *remoteCommandProxy, sess *clusterSession) error {
 	roles, err := getRolesByName(f, ctx.Context.Identity.GetIdentity().Groups)
 	if err != nil {
-		return nil, trace.Wrap(err)
+		return trace.Wrap(err)
 	}
 
 	var policySets []*types.SessionTrackerPolicySet
@@ -1450,10 +1455,10 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,
 	authorizer := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind, ctx.User.GetName())
 	canStart, _, err := authorizer.FulfilledFor(nil)
 	if err != nil {
-		return nil, trace.Wrap(err)
+		return trace.Wrap(err)
 	}
 	if !canStart {
-		return nil, trace.AccessDenied("insufficient permissions to launch non-interactive session")
+		return trace.AccessDenied("insufficient permissions to launch non-interactive session")
 	}
 
 	eventPodMeta := request.eventPodMeta(request.context, sess.kubeAPICreds)
@@ -1494,6 +1499,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,
 
 	if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, sessionStartEvent); err != nil {
 		f.log.WithError(err).Warn("Failed to emit event.")
+		return trace.Wrap(err)
 	}
 
 	execEvent := &apievents.Exec{
@@ -1547,29 +1553,22 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,
 		execEvent.Error, execEvent.ExitCode = exitCode(err)
 
 		f.log.WithError(err).Warning("Failed creating executor.")
-		return nil, trace.Wrap(err)
+		return trace.Wrap(err)
 	}
 
 	streamOptions := proxy.options()
 	err = executor.StreamWithContext(req.Context(), streamOptions)
-	// send the status back to the client when forwarding mode is enabled
-	// sendStatus sends a payload even if the error is nil to make sure the client
-	// receives the status and can close the connection.
-	if sendErr := proxy.sendStatus(err); sendErr != nil {
-		f.log.WithError(sendErr).Warning("Failed to send status. Exec command was aborted by client.")
-	}
 	if err != nil {
 		execEvent.Code = events.ExecFailureCode
 		execEvent.Error, execEvent.ExitCode = exitCode(err)
 		f.log.WithError(err).Warning("Executor failed while streaming.")
-		// do not return the error otherwise the fwd.withAuth interceptor will try to write it into a hijacked connection
-		return nil, nil
+		return trace.Wrap(err)
 	}
 
 	execEvent.Code = events.ExecCode
-	return nil, nil
+	return nil
 }
 
 func exitCode(err error) (errMsg, code string) {
@@ -1665,80 +1664,56 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
 		return nil, trace.Wrap(err)
 	}
 
-	proxy, err := createRemoteCommandProxy(request)
-	if err != nil {
-		return nil, trace.Wrap(err)
-	}
-	// proxy.Close closes the underlying connection and releases the resources.
-	defer proxy.Close()
-
-	if sess.noAuditEvents {
-		// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
-		return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
-	}
+	return upgradeRequestToRemoteCommandProxy(request,
+		func(proxy *remoteCommandProxy) error {
+			if sess.noAuditEvents {
+				// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
+				return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
+			}
 
-	if !request.tty {
-		resp, err = f.execNonInteractive(authCtx, w, req, p, request, proxy, sess)
-		if err != nil {
-			// This error must be forwarded to SPDY error stream, otherwise the client
-			// will hang waiting for the response.
-			proxy.sendStatus(err)
-		}
-		return nil, nil
-	}
+			if !request.tty {
+				return f.execNonInteractive(authCtx, w, req, p, request, proxy, sess)
+			}
 
-	client := newKubeProxyClientStreams(proxy)
-	party := newParty(*authCtx, types.SessionPeerMode, client)
-	session, err := newSession(*authCtx, f, req, p, party, sess)
-	if err != nil {
-		// This error must be forwarded to SPDY error stream, otherwise the client
-		// will hang waiting for the response.
-		proxy.sendStatus(err)
-		return nil, nil
-	}
+			client := newKubeProxyClientStreams(proxy)
+			party := newParty(*authCtx, types.SessionPeerMode, client)
+			session, err := newSession(*authCtx, f, req, p, party, sess)
+			if err != nil {
+				return trace.Wrap(err)
+			}
 
-	f.setSession(session.id, session)
-	// When Teleport attaches the original session creator terminal streams to the
-	// session, we don't wan't to emmit session.join event since it won't be required.
-	err = session.join(party, false /* emitSessionJoinEvent */)
-	if err != nil {
-		// This error must be forwarded to SPDY error stream, otherwise the client
-		// will hang waiting for the response.
-		proxy.sendStatus(err)
-		return nil, nil
-	}
+			f.setSession(session.id, session)
+			// When Teleport attaches the original session creator terminal streams to the
+			// session, we don't want to emit session.join event since it won't be required.
+			if err = session.join(party, false /* emitSessionJoinEvent */); err != nil {
+				return trace.Wrap(err)
+			}
 
-	<-party.closeC
+
+			err = <-party.closeC
 
-	if _, err := session.leave(party.ID); err != nil {
-		f.log.WithError(err).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
-	}
+			if _, errLeave := session.leave(party.ID); errLeave != nil {
+				f.log.WithError(errLeave).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
+			}
 
-	return nil, nil
+			return trace.Wrap(err)
+		},
+	)
 }
 
 // remoteExec forwards an exec request to a remote cluster.
-func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, sess *clusterSession, request remoteCommandRequest, proxy *remoteCommandProxy) (resp any, err error) {
-	defer proxy.Close()
-
+func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, sess *clusterSession, request remoteCommandRequest, proxy *remoteCommandProxy) error {
 	executor, err := f.getExecutor(sess, req)
 	if err != nil {
 		f.log.WithError(err).Warning("Failed creating executor.")
-		return nil, trace.Wrap(err)
+		return trace.Wrap(err)
 	}
 	streamOptions := proxy.options()
 	err = executor.StreamWithContext(req.Context(), streamOptions)
-	// send the status back to the client when forwarding mode is enabled
-	// sendStatus sends a payload even if the error is nil to make sure the client
-	// receives the status and can close the connection.
-	if sendErr := proxy.sendStatus(err); sendErr != nil {
-		f.log.WithError(sendErr).Warning("Failed to send status. Exec command was aborted by client.")
-	}
 	if err != nil {
 		f.log.WithError(err).Warning("Executor failed while streaming.")
-		// do not return the error otherwise the fwd.withAuth interceptor will try to write it into a hijacked connection
-		return nil, nil
 	}
-	return nil, nil
+	return trace.Wrap(err)
 }
 
 // portForward starts port forwarding to the remote cluster
diff --git a/lib/kube/proxy/moderated_sessions_test.go b/lib/kube/proxy/moderated_sessions_test.go
index e0a1a4c6c973d..d16f37291a74c 100644
--- a/lib/kube/proxy/moderated_sessions_test.go
+++ b/lib/kube/proxy/moderated_sessions_test.go
@@ -459,6 +459,9 @@ func TestModeratedSessions(t *testing.T) {
 				if errors.Is(err, io.ErrClosedPipe) {
 					return nil
 				}
+				if tt.args.moderatorForcedClose && isSessionTerminatedError(err) {
+					return nil
+				}
 				return trace.Wrap(err)
 			})
 			// wait for every go-routine to finish without errors returned.
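The exec path above no longer calls `proxy.sendStatus` itself: it returns errors from inside a callback and relies on `upgradeRequestToRemoteCommandProxy` (introduced in remotecommand.go below) to own the hijacked connection and the final status write. A minimal, self-contained sketch of that shape — `conn`, `sendStatus`, and `upgradeAndRun` are illustrative stand-ins, not Teleport or client-go APIs:

```go
package main

import (
	"errors"
	"fmt"
)

// conn stands in for the hijacked exec connection and its error stream.
type conn struct{}

func (c *conn) Close() { fmt.Println("conn closed") }

// sendStatus writes the final status frame; a nil error means success.
func (c *conn) sendStatus(err error) error {
	if err != nil {
		fmt.Println("status sent to client:", err)
		return nil
	}
	fmt.Println("status sent to client: success")
	return nil
}

// upgradeAndRun mirrors the shape of upgradeRequestToRemoteCommandProxy: it
// owns the connection lifetime and the final status write, so exec callbacks
// can simply return their error instead of writing to the stream themselves.
func upgradeAndRun(exec func(*conn) error) (any, error) {
	c := &conn{}
	defer c.Close()

	if err := c.sendStatus(exec(c)); err != nil {
		fmt.Println("failed to send status:", err)
	}
	// nil/nil: the response already went out on the hijacked connection, so
	// the HTTP middleware above must not attempt to write anything else.
	return nil, nil
}

func main() {
	upgradeAndRun(func(*conn) error { return errors.New("command failed") })
}
```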
diff --git a/lib/kube/proxy/remotecommand.go b/lib/kube/proxy/remotecommand.go
index 1f64f40fe45aa..4857e9e469910 100644
--- a/lib/kube/proxy/remotecommand.go
+++ b/lib/kube/proxy/remotecommand.go
@@ -19,6 +19,7 @@ package proxy
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -29,6 +30,7 @@ import (
 	log "github.com/sirupsen/logrus"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
 	"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
@@ -94,7 +96,7 @@ func (req remoteCommandRequest) eventPodMeta(ctx context.Context, creds kubeCred
 	return meta
 }
 
-func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, error) {
+func upgradeRequestToRemoteCommandProxy(req remoteCommandRequest, exec func(*remoteCommandProxy) error) (any, error) {
 	var (
 		proxy *remoteCommandProxy
 		err   error
@@ -107,11 +109,21 @@ func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, er
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
+	defer proxy.Close()
+
 	if proxy.resizeStream != nil {
 		proxy.resizeQueue = newTermQueue(req.context, req.onResize)
 		go proxy.resizeQueue.handleResizeEvents(proxy.resizeStream)
 	}
-	return proxy, nil
+	err = exec(proxy)
+	if err := proxy.sendStatus(err); err != nil {
+		log.Warningf("Failed to send status: %v", err)
+	}
+	// return rsp=nil, err=nil to indicate that the request has been handled
+	// by the hijacked connection. If we return an error, the request will be
+	// considered unhandled and the middleware will try to write the error
+	// or response into the hijacked connection, which will fail.
+	return nil /* rsp */, nil /* err */
 }
 
 func createSPDYStreams(req remoteCommandRequest) (*remoteCommandProxy, error) {
@@ -228,11 +240,12 @@ func (s *remoteCommandProxy) sendStatus(err error) error {
 			Status: metav1.StatusSuccess,
 		}})
 	}
-	if statusErr, ok := err.(*apierrors.StatusError); ok {
+	var statusErr *apierrors.StatusError
+	if errors.As(err, &statusErr) {
 		return s.writeStatus(statusErr)
 	}
-
-	if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
+	var exitErr utilexec.ExitError
+	if errors.As(err, &exitErr) && exitErr.Exited() {
 		rc := exitErr.ExitStatus()
 		return s.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
 			Status: metav1.StatusFailure,
@@ -262,6 +275,8 @@ func (s *remoteCommandProxy) sendStatus(err error) error {
 			Message: err.Error(),
 		},
 		})
+	} else if isSessionTerminatedError(err) {
+		return s.writeStatus(sessionTerminatedByModeratorErr)
 	}
 
 	err = trace.BadParameter("error executing command in container: %v", err)
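The `sendStatus` hunk replaces direct type assertions with `errors.As`. The difference matters once errors pass through wrappers such as `trace.Wrap` or `fmt.Errorf` with `%w`: an assertion only matches the outermost value, while `errors.As` walks the wrap chain. A small runnable demonstration:

```go
package main

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	base := &apierrors.StatusError{ErrStatus: metav1.Status{Message: "exec failed"}}
	wrapped := fmt.Errorf("streaming: %w", base)

	// Direct type assertion fails on the wrapped error.
	_, ok := wrapped.(*apierrors.StatusError)
	fmt.Println(ok) // false

	// errors.As finds the StatusError anywhere in the chain.
	var statusErr *apierrors.StatusError
	fmt.Println(errors.As(wrapped, &statusErr)) // true
}
```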
@@ -405,11 +420,12 @@ func waitStreamReply(ctx context.Context, replySent <-chan struct{}, notify chan
 // as json in the error channel.
 func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
 	return func(status *apierrors.StatusError) error {
-		bs, err := json.Marshal(status.Status())
+		st := status.Status()
+		data, err := runtime.Encode(globalKubeCodecs.LegacyCodec(), &st)
 		if err != nil {
-			return err
+			return trace.Wrap(err)
 		}
-		_, err = stream.Write(bs)
+		_, err = stream.Write(data)
 		return err
 	}
 }
diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go
index f060e45ab858a..bab1fcefb5086 100644
--- a/lib/kube/proxy/sess.go
+++ b/lib/kube/proxy/sess.go
@@ -180,7 +180,7 @@ func (p *kubeProxyClientStreams) Close() error {
 		p.sizeQueue.Close()
 	}
 	p.wg.Wait()
-	return trace.Wrap(p.proxy.Close())
+	return nil
 }
 
 // multiResizeQueue is a merged queue of multiple terminal size queues.
@@ -272,7 +272,7 @@ type party struct {
 	ID        uuid.UUID
 	Client    remoteClient
 	Mode      types.SessionParticipantMode
-	closeC    chan struct{}
+	closeC    chan error
 	closeOnce sync.Once
 }
 
@@ -283,13 +283,14 @@ func newParty(ctx authContext, mode types.SessionParticipantMode, client remoteC
 		ID:     uuid.New(),
 		Client: client,
 		Mode:   mode,
-		closeC: make(chan struct{}),
+		closeC: make(chan error, 1),
 	}
 }
 
 // InformClose informs the party that he must leave the session.
-func (p *party) InformClose() {
+func (p *party) InformClose(err error) {
 	p.closeOnce.Do(func() {
+		p.closeC <- err
 		close(p.closeC)
 	})
 }
@@ -372,6 +373,8 @@ type session struct {
 	// decremented when he leaves - it waits until the session leave events
 	// are emitted for every party before returning.
 	partiesWg sync.WaitGroup
+	// terminationErr is set when the session is terminated.
+	terminationErr error
 }
 
@@ -612,13 +615,24 @@ func (s *session) launch() error {
 	s.io.On()
 	if err = executor.StreamWithContext(s.streamContext, options); err != nil {
+		s.setTerminationErr(err)
 		s.reportErrorToSessionRecorder(err)
 		s.log.WithError(err).Warning("Executor failed while streaming.")
+		return trace.Wrap(err)
 	}
 
 	return nil
 }
 
+func (s *session) setTerminationErr(err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.terminationErr != nil {
+		return
+	}
+	s.terminationErr = err
+}
+
 // reportErrorToSessionRecorder reports the error to the session recorder
 // if it is set.
@@ -626,7 +640,7 @@ func (s *session) reportErrorToSessionRecorder(err error) {
 		return
 	}
 	if s.recorder != nil {
-		fmt.Fprintf(s.recorder, "\n---\nSession exited with error: %v\n", err)
+		fmt.Fprintf(s.recorder, "\r\n---\r\nSession exited with error: %v\r\n", err)
 	}
 }
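`setTerminationErr` deliberately keeps only the first recorded cause: the executor goroutine and a moderator's force-terminate can race to set the reason, and the first writer wins; the `leave`/`Close` hunks below then hand that value to `InformClose` so every party learns why it was disconnected. A minimal sketch of the first-write-wins behavior, with illustrative names:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type session struct {
	mu             sync.Mutex
	terminationErr error
}

// setTerminationErr records the first termination cause and ignores the rest.
func (s *session) setTerminationErr(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.terminationErr != nil {
		return // keep the first recorded cause
	}
	s.terminationErr = err
}

func main() {
	s := &session{}
	s.setTerminationErr(errors.New("session terminated by moderator"))
	s.setTerminationErr(errors.New("executor failed")) // ignored: too late
	fmt.Println(s.terminationErr)                      // the moderator error
}
```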
@@ -748,12 +762,6 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	for _, party := range s.parties {
-		if err := party.Client.sendStatus(errExec); err != nil {
-			s.forwarder.log.WithError(err).Warning("Failed to send status. Exec command was aborted by client.")
-		}
-	}
-
 	serverMetadata := apievents.ServerMetadata{
 		ServerID:        s.forwarder.cfg.HostID,
 		ServerNamespace: s.forwarder.cfg.Namespace,
@@ -913,6 +921,7 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
 			c := p.Client.forceTerminate()
 			select {
 			case <-c:
+				s.setTerminationErr(sessionTerminatedByModeratorErr)
 				go func() {
 					s.log.Debugf("Received force termination request")
 					err := s.Close()
@@ -1055,14 +1064,7 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) {
 		errs = append(errs, trace.Wrap(err))
 	}
 
-	party.InformClose()
-	defer func() {
-		if err := party.Client.Close(); err != nil {
-			s.log.WithError(err).Error("Error closing party")
-			errs = append(errs, trace.Wrap(err))
-		}
-	}()
-
+	party.InformClose(s.terminationErr)
 	if len(s.parties) == 0 || id == s.initiator {
 		go func() {
 			// Currently, Teleport closes the session when the initiator exits.
@@ -1163,9 +1165,10 @@ func (s *session) Close() error {
 		s.log.WithError(err).Debug("Failed to close session tracker")
 	}
 	s.mu.Lock()
+	terminationErr := s.terminationErr
 	// terminate all active parties in the session.
 	for _, party := range s.parties {
-		party.InformClose()
+		party.InformClose(terminationErr)
 	}
 	recorder := s.recorder
 	s.mu.Unlock()