From bc092353915e759fc662e395408353498e7caa2f Mon Sep 17 00:00:00 2001 From: Joel Date: Tue, 10 May 2022 07:09:10 +0200 Subject: [PATCH] Refactor non-interactive sessions out of proxy/sess.go (#12497) --- lib/kube/proxy/forwarder.go | 79 ++++++++- lib/kube/proxy/sess.go | 332 +++++++++++++++--------------------- 2 files changed, 207 insertions(+), 204 deletions(-) diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 2282716beffe0..31049202794e1 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -797,10 +797,6 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ return nil, trace.NotFound("session %v not found", sessionID) } - if !session.tty { - return nil, trace.NotFound("session %v is not interactive", sessionID) - } - ws, err := f.upgrader.Upgrade(w, req, nil) if err != nil { return nil, trace.Wrap(err) @@ -951,6 +947,75 @@ func (f *Forwarder) AcquireConnectionLock(ctx context.Context, user string, role return nil } +// execNonInteractive handles all exec sessions without a TTY. +func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, request remoteCommandRequest, proxy *remoteCommandProxy, sess *clusterSession) (resp interface{}, err error) { + defer proxy.Close() + roles, err := getRolesByName(f, ctx.Context.Identity.GetIdentity().Groups) + if err != nil { + return nil, trace.Wrap(err) + } + + var policySets []*types.SessionTrackerPolicySet + for _, role := range roles { + policySet := role.GetSessionPolicySet() + policySets = append(policySets, &policySet) + } + + authorizer := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind) + canStart, _, err := authorizer.FulfilledFor(nil) + if err != nil { + return nil, trace.Wrap(err) + } + if !canStart { + return nil, trace.AccessDenied("insufficient permissions to launch non-interactive session") + } + + eventPodMeta := request.eventPodMeta(request.context, sess.creds) + event := &apievents.Exec{ + Metadata: apievents.Metadata{ + Type: events.ExecEvent, + ClusterName: f.cfg.ClusterName, + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: f.cfg.ServerID, + ServerNamespace: f.cfg.Namespace, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: uuid.NewString(), + WithMFA: ctx.Identity.GetIdentity().MFAVerified, + }, + UserMetadata: ctx.eventUserMeta(), + ConnectionMetadata: apievents.ConnectionMetadata{ + RemoteAddr: req.RemoteAddr, + LocalAddr: sess.kubeAddress, + Protocol: events.EventProtocolKube, + }, + CommandMetadata: apievents.CommandMetadata{ + Command: strings.Join(request.cmd, " "), + }, + KubernetesClusterMetadata: ctx.eventClusterMeta(), + KubernetesPodMetadata: eventPodMeta, + } + + if err := f.cfg.StreamEmitter.EmitAuditEvent(f.ctx, event); err != nil { + f.log.WithError(err).Warn("Failed to emit exec event.") + } + + executor, err := f.getExecutor(*ctx, sess, req) + if err != nil { + f.log.WithError(err).Warning("Failed creating executor.") + return nil, trace.Wrap(err) + } + + streamOptions := proxy.options() + if err = executor.Stream(streamOptions); err != nil { + f.log.WithError(err).Warning("Executor failed while streaming.") + return nil, trace.Wrap(err) + } + + return nil, nil +} + // exec forwards all exec requests to the target server, captures // all output from the session func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp interface{}, err error) { @@ -1005,6 +1070,11 @@ func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Requ return f.remoteExec(ctx, w, req, p, sess, request, proxy) } + if !request.tty { + resp, err = f.execNonInteractive(ctx, w, req, p, request, proxy, sess) + return + } + client := newKubeProxyClientStreams(proxy) party := newParty(*ctx, types.SessionPeerMode, client) session, err := newSession(*ctx, f, req, p, party, sess) @@ -1035,7 +1105,6 @@ func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *htt return nil, trace.Wrap(err) } streamOptions := proxy.options() - if err = executor.Stream(streamOptions); err != nil { f.log.WithError(err).Warning("Executor failed while streaming.") return nil, trace.Wrap(err) diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index c23c7f99aed60..c2238d7b5e1df 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -23,7 +23,6 @@ import ( "net/http" "net/url" "reflect" - "strings" "sync" "time" @@ -43,7 +42,6 @@ import ( "github.com/julienschmidt/httprouter" log "github.com/sirupsen/logrus" "k8s.io/client-go/tools/remotecommand" - utilexec "k8s.io/client-go/util/exec" ) const sessionRecorderID = "session-recorder" @@ -285,9 +283,6 @@ type session struct { emitter apievents.Emitter - // tty is set if the session is using a TTY. - tty bool - podName string started bool @@ -327,7 +322,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params } q := req.URL.Query() - tty := utils.AsBool(q.Get("tty")) accessEvaluator := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind) io := srv.NewTermManager() @@ -345,7 +339,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params state: types.SessionState_SessionStatePending, accessEvaluator: accessEvaluator, emitter: events.NewDiscardEmitter(), - tty: tty, terminalSizeQueue: newMultiResizeQueue(), started: false, sess: sess, @@ -454,7 +447,6 @@ func (s *session) launch() error { stdin: utils.AsBool(q.Get("stdin")), stdout: utils.AsBool(q.Get("stdout")), stderr: utils.AsBool(q.Get("stderr")), - tty: utils.AsBool(q.Get("tty")), httpRequest: s.req, httpResponseWriter: nil, context: s.req.Context(), @@ -489,48 +481,46 @@ func (s *session) launch() error { return trace.Wrap(err) } - if request.tty { - termParams := tsession.TerminalParams{ - W: 100, - H: 100, - } + termParams := tsession.TerminalParams{ + W: 100, + H: 100, + } - sessionStartEvent := &apievents.SessionStart{ - Metadata: apievents.Metadata{ - Type: events.SessionStartEvent, - Code: events.SessionStartCode, - ClusterName: s.forwarder.cfg.ClusterName, - }, - ServerMetadata: apievents.ServerMetadata{ - ServerID: s.forwarder.cfg.ServerID, - ServerNamespace: s.forwarder.cfg.Namespace, - ServerHostname: s.sess.teleportCluster.name, - ServerAddr: s.sess.kubeAddress, - }, - SessionMetadata: apievents.SessionMetadata{ - SessionID: s.id.String(), - WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, - }, - UserMetadata: apievents.UserMetadata{ - User: s.ctx.User.GetName(), - Login: s.ctx.User.GetName(), - Impersonator: s.ctx.Identity.GetIdentity().Impersonator, - }, - ConnectionMetadata: apievents.ConnectionMetadata{ - RemoteAddr: s.req.RemoteAddr, - LocalAddr: s.sess.kubeAddress, - Protocol: events.EventProtocolKube, - }, - TerminalSize: termParams.Serialize(), - KubernetesClusterMetadata: s.ctx.eventClusterMeta(), - KubernetesPodMetadata: eventPodMeta, - InitialCommand: q["command"], - SessionRecording: s.ctx.recordingConfig.GetMode(), - } + sessionStartEvent := &apievents.SessionStart{ + Metadata: apievents.Metadata{ + Type: events.SessionStartEvent, + Code: events.SessionStartCode, + ClusterName: s.forwarder.cfg.ClusterName, + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: s.forwarder.cfg.ServerID, + ServerNamespace: s.forwarder.cfg.Namespace, + ServerHostname: s.sess.teleportCluster.name, + ServerAddr: s.sess.kubeAddress, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: s.id.String(), + WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, + }, + UserMetadata: apievents.UserMetadata{ + User: s.ctx.User.GetName(), + Login: s.ctx.User.GetName(), + Impersonator: s.ctx.Identity.GetIdentity().Impersonator, + }, + ConnectionMetadata: apievents.ConnectionMetadata{ + RemoteAddr: s.req.RemoteAddr, + LocalAddr: s.sess.kubeAddress, + Protocol: events.EventProtocolKube, + }, + TerminalSize: termParams.Serialize(), + KubernetesClusterMetadata: s.ctx.eventClusterMeta(), + KubernetesPodMetadata: eventPodMeta, + InitialCommand: q["command"], + SessionRecording: s.ctx.recordingConfig.GetMode(), + } - if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionStartEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit event.") - } + if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionStartEvent); err != nil { + s.forwarder.log.WithError(err).Warn("Failed to emit event.") } go func() { @@ -564,7 +554,7 @@ func (s *session) launch() error { Stdin: s.io, Stdout: s.io, Stderr: s.io, - Tty: request.tty, + Tty: true, TerminalSizeQueue: s.terminalSizeQueue, } @@ -585,7 +575,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, s.started = true sessionStart := s.forwarder.cfg.Clock.Now().UTC() - if !s.sess.noAuditEvents && s.tty { + if !s.sess.noAuditEvents { s.terminalSizeQueue.callback = func(resize *remotecommand.TerminalSize) { s.mu.Lock() defer s.mu.Unlock() @@ -639,37 +629,33 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, s.terminalSizeQueue.callback = func(resize *remotecommand.TerminalSize) {} } - if !s.sess.noAuditEvents && request.tty { - streamer, err := s.forwarder.newStreamer(&s.ctx) - if err != nil { - return nil, trace.Wrap(err) - } - - recorder, err := events.NewAuditWriter(events.AuditWriterConfig{ - // Audit stream is using server context, not session context, - // to make sure that session is uploaded even after it is closed - Context: s.forwarder.ctx, - Streamer: streamer, - Clock: s.forwarder.cfg.Clock, - SessionID: tsession.ID(s.id.String()), - ServerID: s.forwarder.cfg.ServerID, - Namespace: s.forwarder.cfg.Namespace, - RecordOutput: s.ctx.recordingConfig.GetMode() != types.RecordOff, - Component: teleport.Component(teleport.ComponentSession, teleport.ComponentProxyKube), - ClusterName: s.forwarder.cfg.ClusterName, - }) + streamer, err := s.forwarder.newStreamer(&s.ctx) + if err != nil { + return nil, trace.Wrap(err) + } - s.recorder = recorder - s.emitter = recorder - if err != nil { - return nil, trace.Wrap(err) - } + recorder, err := events.NewAuditWriter(events.AuditWriterConfig{ + // Audit stream is using server context, not session context, + // to make sure that session is uploaded even after it is closed + Context: s.forwarder.ctx, + Streamer: streamer, + Clock: s.forwarder.cfg.Clock, + SessionID: tsession.ID(s.id.String()), + ServerID: s.forwarder.cfg.ServerID, + Namespace: s.forwarder.cfg.Namespace, + RecordOutput: s.ctx.recordingConfig.GetMode() != types.RecordOff, + Component: teleport.Component(teleport.ComponentSession, teleport.ComponentProxyKube), + ClusterName: s.forwarder.cfg.ClusterName, + }) - s.io.AddWriter(sessionRecorderID, recorder) - } else if !s.sess.noAuditEvents { - s.emitter = s.forwarder.cfg.StreamEmitter + s.recorder = recorder + s.emitter = recorder + if err != nil { + return nil, trace.Wrap(err) } + s.io.AddWriter(sessionRecorderID, recorder) + // If the identity is verified with an MFA device, we enabled MFA-based presence for the session. if s.PresenceEnabled { go func() { @@ -704,123 +690,76 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, } } - if request.tty { - sessionDataEvent := &apievents.SessionData{ - Metadata: apievents.Metadata{ - Type: events.SessionDataEvent, - Code: events.SessionDataCode, - ClusterName: s.forwarder.cfg.ClusterName, - }, - ServerMetadata: apievents.ServerMetadata{ - ServerID: s.forwarder.cfg.ServerID, - ServerNamespace: s.forwarder.cfg.Namespace, - }, - SessionMetadata: apievents.SessionMetadata{ - SessionID: s.id.String(), - WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, - }, - UserMetadata: apievents.UserMetadata{ - User: s.ctx.User.GetName(), - Login: s.ctx.User.GetName(), - Impersonator: s.ctx.Identity.GetIdentity().Impersonator, - }, - ConnectionMetadata: apievents.ConnectionMetadata{ - RemoteAddr: s.req.RemoteAddr, - LocalAddr: s.sess.kubeAddress, - Protocol: events.EventProtocolKube, - }, - // Bytes transmitted from user to pod. - BytesTransmitted: s.io.CountRead(), - // Bytes received from pod by user. - BytesReceived: s.io.CountWritten(), - } - - if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionDataEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit session data event.") - } - - sessionEndEvent := &apievents.SessionEnd{ - Metadata: apievents.Metadata{ - Type: events.SessionEndEvent, - Code: events.SessionEndCode, - ClusterName: s.forwarder.cfg.ClusterName, - }, - ServerMetadata: apievents.ServerMetadata{ - ServerID: s.forwarder.cfg.ServerID, - ServerNamespace: s.forwarder.cfg.Namespace, - }, - SessionMetadata: apievents.SessionMetadata{ - SessionID: s.id.String(), - WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, - }, - UserMetadata: apievents.UserMetadata{ - User: s.ctx.User.GetName(), - Login: s.ctx.User.GetName(), - Impersonator: s.ctx.Identity.GetIdentity().Impersonator, - }, - ConnectionMetadata: apievents.ConnectionMetadata{ - RemoteAddr: s.req.RemoteAddr, - LocalAddr: s.sess.kubeAddress, - Protocol: events.EventProtocolKube, - }, - Interactive: true, - Participants: s.allParticipants(), - StartTime: sessionStart, - EndTime: s.forwarder.cfg.Clock.Now().UTC(), - KubernetesClusterMetadata: s.ctx.eventClusterMeta(), - KubernetesPodMetadata: eventPodMeta, - InitialCommand: request.cmd, - SessionRecording: s.ctx.recordingConfig.GetMode(), - } + sessionDataEvent := &apievents.SessionData{ + Metadata: apievents.Metadata{ + Type: events.SessionDataEvent, + Code: events.SessionDataCode, + ClusterName: s.forwarder.cfg.ClusterName, + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: s.forwarder.cfg.ServerID, + ServerNamespace: s.forwarder.cfg.Namespace, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: s.id.String(), + WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, + }, + UserMetadata: apievents.UserMetadata{ + User: s.ctx.User.GetName(), + Login: s.ctx.User.GetName(), + Impersonator: s.ctx.Identity.GetIdentity().Impersonator, + }, + ConnectionMetadata: apievents.ConnectionMetadata{ + RemoteAddr: s.req.RemoteAddr, + LocalAddr: s.sess.kubeAddress, + Protocol: events.EventProtocolKube, + }, + // Bytes transmitted from user to pod. + BytesTransmitted: s.io.CountRead(), + // Bytes received from pod by user. + BytesReceived: s.io.CountWritten(), + } - if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionEndEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit session end event.") - } - } else { - // send an exec event - execEvent := &apievents.Exec{ - Metadata: apievents.Metadata{ - Type: events.ExecEvent, - ClusterName: s.forwarder.cfg.ClusterName, - }, - ServerMetadata: apievents.ServerMetadata{ - ServerID: s.forwarder.cfg.ServerID, - ServerNamespace: s.forwarder.cfg.Namespace, - }, - SessionMetadata: apievents.SessionMetadata{ - SessionID: s.id.String(), - WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, - }, - UserMetadata: apievents.UserMetadata{ - User: s.ctx.User.GetName(), - Login: s.ctx.User.GetName(), - Impersonator: s.ctx.Identity.GetIdentity().Impersonator, - }, - ConnectionMetadata: apievents.ConnectionMetadata{ - RemoteAddr: s.req.RemoteAddr, - LocalAddr: s.sess.kubeAddress, - Protocol: events.EventProtocolKube, - }, - CommandMetadata: apievents.CommandMetadata{ - Command: strings.Join(request.cmd, " "), - }, - KubernetesClusterMetadata: s.ctx.eventClusterMeta(), - KubernetesPodMetadata: eventPodMeta, - } + if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionDataEvent); err != nil { + s.forwarder.log.WithError(err).Warn("Failed to emit session data event.") + } - if err != nil { - execEvent.Code = events.ExecFailureCode - execEvent.Error = err.Error() - if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() { - execEvent.ExitCode = fmt.Sprintf("%d", exitErr.ExitStatus()) - } - } else { - execEvent.Code = events.ExecCode - } + sessionEndEvent := &apievents.SessionEnd{ + Metadata: apievents.Metadata{ + Type: events.SessionEndEvent, + Code: events.SessionEndCode, + ClusterName: s.forwarder.cfg.ClusterName, + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: s.forwarder.cfg.ServerID, + ServerNamespace: s.forwarder.cfg.Namespace, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: s.id.String(), + WithMFA: s.ctx.Identity.GetIdentity().MFAVerified, + }, + UserMetadata: apievents.UserMetadata{ + User: s.ctx.User.GetName(), + Login: s.ctx.User.GetName(), + Impersonator: s.ctx.Identity.GetIdentity().Impersonator, + }, + ConnectionMetadata: apievents.ConnectionMetadata{ + RemoteAddr: s.req.RemoteAddr, + LocalAddr: s.sess.kubeAddress, + Protocol: events.EventProtocolKube, + }, + Interactive: true, + Participants: s.allParticipants(), + StartTime: sessionStart, + EndTime: s.forwarder.cfg.Clock.Now().UTC(), + KubernetesClusterMetadata: s.ctx.eventClusterMeta(), + KubernetesPodMetadata: eventPodMeta, + InitialCommand: request.cmd, + SessionRecording: s.ctx.recordingConfig.GetMode(), + } - if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit event.") - } + if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionEndEvent); err != nil { + s.forwarder.log.WithError(err).Warn("Failed to emit session end event.") } }, nil } @@ -901,12 +840,9 @@ func (s *session) join(p *party) error { stringID := p.ID.String() s.parties[p.ID] = p s.partiesHistorical[p.ID] = p + s.terminalSizeQueue.add(stringID, p.Client.resizeQueue()) - if s.tty { - s.terminalSizeQueue.add(stringID, p.Client.resizeQueue()) - } - - if s.tty && p.Mode == types.SessionPeerMode { + if p.Mode == types.SessionPeerMode { s.io.AddReader(stringID, p.Client.stdinStream()) } @@ -943,8 +879,6 @@ func (s *session) join(p *party) error { s.log.WithError(err).Warning("Failed to launch Kubernetes session.") } }() - } else if !s.tty { - return trace.AccessDenied("insufficient permissions to launch non-interactive session") } else if len(s.parties) == 1 { base := "Waiting for required participants..." @@ -960,7 +894,7 @@ func (s *session) join(p *party) error { } func (s *session) BroadcastMessage(format string, args ...interface{}) { - if s.accessEvaluator.IsModerated() && s.tty { + if s.accessEvaluator.IsModerated() { s.io.BroadcastMessage(fmt.Sprintf(format, args...)) } }