Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions lib/kube/proxy/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Teleport
* Copyright (C) 2024 Gravitational, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package proxy

import (
"net/http"

kubeerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// kubernetesSessionTerminatedByModerator is the message that is sent to the
	// client when the session is terminated by the moderator.
	kubernetesSessionTerminatedByModerator = "Session terminated by moderator."
	// sessionTerminatedByModeratorReason is the machine-readable StatusReason
	// attached to the Kubernetes Status payload returned to the client when a
	// moderator forcibly ends the session.
	sessionTerminatedByModeratorReason = metav1.StatusReason("SessionTerminatedByModerator")
)

// sessionTerminatedByModeratorErr is the canned Kubernetes StatusError written
// back to the client's error stream when a moderator terminates the session.
// It mirrors the shape of errors produced by the Kubernetes API server so
// standard clients (kubectl) render it correctly.
var sessionTerminatedByModeratorErr = &kubeerrors.StatusError{
	ErrStatus: metav1.Status{
		Status:  metav1.StatusFailure,
		Code:    http.StatusUnauthorized,
		Reason:  sessionTerminatedByModeratorReason,
		Message: kubernetesSessionTerminatedByModerator,
		Details: &metav1.StatusDetails{
			Causes: []metav1.StatusCause{
				{
					Type:    metav1.CauseTypeForbidden,
					Message: kubernetesSessionTerminatedByModerator,
				},
			},
		},
	},
}

// isSessionTerminatedError returns true if the error is a session terminated error.
// This is required because StreamWithContext wraps the error into a new error string
// and we lose the type information to forward the error to the client.
// isSessionTerminatedError reports whether err is the session-terminated error
// raised when a moderator forcibly closes the session.
// StreamWithContext wraps the original error into a fresh error built from its
// string, so the concrete type information is lost by the time the error
// reaches us; matching on the exact message is the only signal left to forward
// the proper status to the client.
func isSessionTerminatedError(err error) bool {
	return err != nil && err.Error() == kubernetesSessionTerminatedByModerator
}
129 changes: 52 additions & 77 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,27 +1206,34 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ

client := &websocketClientStreams{stream}
party := newParty(*ctx, stream.Mode, client)
defer party.CloseConnection()

err = session.join(party, true /* emitSessionJoinEvent */)
if err != nil {
return trace.Wrap(err)
}
closeC := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer close(closeC)
defer wg.Done()
select {
case <-stream.Done():
party.InformClose()
case <-party.closeC:
party.InformClose(trace.BadParameter("websocket connection closed"))
case <-closeC:
return
}
}()
<-party.closeC

err = <-party.closeC
close(closeC)

if _, err := session.leave(party.ID); err != nil {
f.log.WithError(err).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
}
<-closeC
return nil
wg.Wait()

return trace.Wrap(err)
}(); err != nil {
writeErr := ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error()), time.Now().Add(time.Second*10))
if writeErr != nil {
Expand Down Expand Up @@ -1433,12 +1440,10 @@ func (f *Forwarder) acquireConnectionLock(ctx context.Context, user string, role
}

// execNonInteractive handles all exec sessions without a TTY.
func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, request remoteCommandRequest, proxy *remoteCommandProxy, sess *clusterSession) (resp any, err error) {
defer proxy.Close()

func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, request remoteCommandRequest, proxy *remoteCommandProxy, sess *clusterSession) error {
roles, err := getRolesByName(f, ctx.Context.Identity.GetIdentity().Groups)
if err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}

var policySets []*types.SessionTrackerPolicySet
Expand All @@ -1450,10 +1455,10 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,
authorizer := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind, ctx.User.GetName())
canStart, _, err := authorizer.FulfilledFor(nil)
if err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
if !canStart {
return nil, trace.AccessDenied("insufficient permissions to launch non-interactive session")
return trace.AccessDenied("insufficient permissions to launch non-interactive session")
}

eventPodMeta := request.eventPodMeta(request.context, sess.kubeAPICreds)
Expand Down Expand Up @@ -1494,6 +1499,7 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,

if err := f.cfg.Emitter.EmitAuditEvent(f.ctx, sessionStartEvent); err != nil {
f.log.WithError(err).Warn("Failed to emit event.")
return trace.Wrap(err)
}

execEvent := &apievents.Exec{
Expand Down Expand Up @@ -1547,29 +1553,22 @@ func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter,
execEvent.Error, execEvent.ExitCode = exitCode(err)

f.log.WithError(err).Warning("Failed creating executor.")
return nil, trace.Wrap(err)
return trace.Wrap(err)
}

streamOptions := proxy.options()
err = executor.StreamWithContext(req.Context(), streamOptions)
// send the status back to the client when forwarding mode is enabled
// sendStatus sends a payload even if the error is nil to make sure the client
// receives the status and can close the connection.
if sendErr := proxy.sendStatus(err); sendErr != nil {
f.log.WithError(sendErr).Warning("Failed to send status. Exec command was aborted by client.")
}
if err != nil {
execEvent.Code = events.ExecFailureCode
execEvent.Error, execEvent.ExitCode = exitCode(err)

f.log.WithError(err).Warning("Executor failed while streaming.")
// do not return the error otherwise the fwd.withAuth interceptor will try to write it into a hijacked connection
return nil, nil
return trace.Wrap(err)
}

execEvent.Code = events.ExecCode

return nil, nil
return nil
}

func exitCode(err error) (errMsg, code string) {
Expand Down Expand Up @@ -1665,80 +1664,56 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
return nil, trace.Wrap(err)
}

proxy, err := createRemoteCommandProxy(request)
if err != nil {
return nil, trace.Wrap(err)
}
// proxy.Close closes the underlying connection and releases the resources.
defer proxy.Close()
if sess.noAuditEvents {
// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
}
return upgradeRequestToRemoteCommandProxy(request,
Comment thread
tigrato marked this conversation as resolved.
func(proxy *remoteCommandProxy) error {
if sess.noAuditEvents {
// We're forwarding this to another kubernetes_service instance, let it handle multiplexing.
return f.remoteExec(authCtx, w, req, p, sess, request, proxy)
}

if !request.tty {
resp, err = f.execNonInteractive(authCtx, w, req, p, request, proxy, sess)
if err != nil {
// will hang waiting for the response.
proxy.sendStatus(err)
}
return nil, nil
}
if !request.tty {
return f.execNonInteractive(authCtx, w, req, p, request, proxy, sess)
}

client := newKubeProxyClientStreams(proxy)
party := newParty(*authCtx, types.SessionPeerMode, client)
session, err := newSession(*authCtx, f, req, p, party, sess)
if err != nil {
// This error must be forwarded to SPDY error stream, otherwise the client
// will hang waiting for the response.
proxy.sendStatus(err)
return nil, nil
}
client := newKubeProxyClientStreams(proxy)
party := newParty(*authCtx, types.SessionPeerMode, client)
session, err := newSession(*authCtx, f, req, p, party, sess)
if err != nil {
return trace.Wrap(err)
}

f.setSession(session.id, session)
// When Teleport attaches the original session creator terminal streams to the
// session, we don't wan't to emmit session.join event since it won't be required.
err = session.join(party, false /* emitSessionJoinEvent */)
if err != nil {
// This error must be forwarded to SPDY error stream, otherwise the client
// will hang waiting for the response.
proxy.sendStatus(err)
return nil, nil
}
f.setSession(session.id, session)
// When Teleport attaches the original session creator terminal streams to the
// session, we don't want to emit session.join event since it won't be required.
if err = session.join(party, false /* emitSessionJoinEvent */); err != nil {
return trace.Wrap(err)
}

<-party.closeC
err = <-party.closeC

if _, err := session.leave(party.ID); err != nil {
f.log.WithError(err).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
}
if _, errLeave := session.leave(party.ID); errLeave != nil {
f.log.WithError(errLeave).Debugf("Participant %q was unable to leave session %s", party.ID, session.id)
}

return nil, nil
return trace.Wrap(err)
},
)
}

// remoteExec forwards an exec request to a remote cluster.
func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, sess *clusterSession, request remoteCommandRequest, proxy *remoteCommandProxy) (resp any, err error) {
defer proxy.Close()

func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, sess *clusterSession, request remoteCommandRequest, proxy *remoteCommandProxy) error {
executor, err := f.getExecutor(sess, req)
if err != nil {
f.log.WithError(err).Warning("Failed creating executor.")
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
streamOptions := proxy.options()
err = executor.StreamWithContext(req.Context(), streamOptions)
// send the status back to the client when forwarding mode is enabled
// sendStatus sends a payload even if the error is nil to make sure the client
// receives the status and can close the connection.
if sendErr := proxy.sendStatus(err); sendErr != nil {
f.log.WithError(sendErr).Warning("Failed to send status. Exec command was aborted by client.")
}
if err != nil {
f.log.WithError(err).Warning("Executor failed while streaming.")
// do not return the error otherwise the fwd.withAuth interceptor will try to write it into a hijacked connection
return nil, nil
}

return nil, nil
return trace.Wrap(err)
}

// portForward starts port forwarding to the remote cluster
Expand Down
3 changes: 3 additions & 0 deletions lib/kube/proxy/moderated_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ func TestModeratedSessions(t *testing.T) {
if errors.Is(err, io.ErrClosedPipe) {
return nil
}
if tt.args.moderatorForcedClose && isSessionTerminatedError(err) {
return nil
}
return trace.Wrap(err)
})
// wait for every go-routine to finish without errors returned.
Expand Down
32 changes: 24 additions & 8 deletions lib/kube/proxy/remotecommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package proxy
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -29,6 +30,7 @@ import (
log "github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
Expand Down Expand Up @@ -94,7 +96,7 @@ func (req remoteCommandRequest) eventPodMeta(ctx context.Context, creds kubeCred
return meta
}

func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, error) {
func upgradeRequestToRemoteCommandProxy(req remoteCommandRequest, exec func(*remoteCommandProxy) error) (any, error) {
var (
proxy *remoteCommandProxy
err error
Expand All @@ -107,11 +109,21 @@ func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, er
if err != nil {
return nil, trace.Wrap(err)
}
defer proxy.Close()

if proxy.resizeStream != nil {
proxy.resizeQueue = newTermQueue(req.context, req.onResize)
go proxy.resizeQueue.handleResizeEvents(proxy.resizeStream)
}
return proxy, nil
err = exec(proxy)
if err := proxy.sendStatus(err); err != nil {
log.Warningf("Failed to send status: %v", err)
}
// return rsp=nil, err=nil to indicate that the request has been handled
// by the hijacked connection. If we return an error, the request will be
// considered unhandled and the middleware will try to write the error
// or response into the hijacked connection, which will fail.
return nil /* rsp */, nil /* err */
}

func createSPDYStreams(req remoteCommandRequest) (*remoteCommandProxy, error) {
Expand Down Expand Up @@ -228,11 +240,12 @@ func (s *remoteCommandProxy) sendStatus(err error) error {
Status: metav1.StatusSuccess,
}})
}
if statusErr, ok := err.(*apierrors.StatusError); ok {
var statusErr *apierrors.StatusError
if errors.As(err, &statusErr) {
return s.writeStatus(statusErr)
}

if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
var exitErr utilexec.ExitError
if errors.As(err, &exitErr) && exitErr.Exited() {
rc := exitErr.ExitStatus()
return s.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Expand Down Expand Up @@ -262,6 +275,8 @@ func (s *remoteCommandProxy) sendStatus(err error) error {
Message: err.Error(),
},
})
} else if isSessionTerminatedError(err) {
return s.writeStatus(sessionTerminatedByModeratorErr)
}

err = trace.BadParameter("error executing command in container: %v", err)
Expand Down Expand Up @@ -405,11 +420,12 @@ func waitStreamReply(ctx context.Context, replySent <-chan struct{}, notify chan
// as json in the error channel.
func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
bs, err := json.Marshal(status.Status())
st := status.Status()
data, err := runtime.Encode(globalKubeCodecs.LegacyCodec(), &st)
if err != nil {
return err
return trace.Wrap(err)
}
_, err = stream.Write(bs)
_, err = stream.Write(data)
return err
}
}
Expand Down
Loading