Skip to content

Commit

Permalink
Refactor non-interactive sessions out of proxy/sess.go (#12497)
Browse files Browse the repository at this point in the history
xacrimon authored May 10, 2022
1 parent fd750dd commit bc09235
Showing 2 changed files with 207 additions and 204 deletions.
79 changes: 74 additions & 5 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
@@ -797,10 +797,6 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
return nil, trace.NotFound("session %v not found", sessionID)
}

if !session.tty {
return nil, trace.NotFound("session %v is not interactive", sessionID)
}

ws, err := f.upgrader.Upgrade(w, req, nil)
if err != nil {
return nil, trace.Wrap(err)
@@ -951,6 +947,75 @@ func (f *Forwarder) AcquireConnectionLock(ctx context.Context, user string, role
return nil
}

// execNonInteractive handles all exec sessions without a TTY.
func (f *Forwarder) execNonInteractive(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params, request remoteCommandRequest, proxy *remoteCommandProxy, sess *clusterSession) (resp interface{}, err error) {
defer proxy.Close()
roles, err := getRolesByName(f, ctx.Context.Identity.GetIdentity().Groups)
if err != nil {
return nil, trace.Wrap(err)
}

var policySets []*types.SessionTrackerPolicySet
for _, role := range roles {
policySet := role.GetSessionPolicySet()
policySets = append(policySets, &policySet)
}

authorizer := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind)
canStart, _, err := authorizer.FulfilledFor(nil)
if err != nil {
return nil, trace.Wrap(err)
}
if !canStart {
return nil, trace.AccessDenied("insufficient permissions to launch non-interactive session")
}

eventPodMeta := request.eventPodMeta(request.context, sess.creds)
event := &apievents.Exec{
Metadata: apievents.Metadata{
Type: events.ExecEvent,
ClusterName: f.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: f.cfg.ServerID,
ServerNamespace: f.cfg.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: uuid.NewString(),
WithMFA: ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: ctx.eventUserMeta(),
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: req.RemoteAddr,
LocalAddr: sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
CommandMetadata: apievents.CommandMetadata{
Command: strings.Join(request.cmd, " "),
},
KubernetesClusterMetadata: ctx.eventClusterMeta(),
KubernetesPodMetadata: eventPodMeta,
}

if err := f.cfg.StreamEmitter.EmitAuditEvent(f.ctx, event); err != nil {
f.log.WithError(err).Warn("Failed to emit exec event.")
}

executor, err := f.getExecutor(*ctx, sess, req)
if err != nil {
f.log.WithError(err).Warning("Failed creating executor.")
return nil, trace.Wrap(err)
}

streamOptions := proxy.options()
if err = executor.Stream(streamOptions); err != nil {
f.log.WithError(err).Warning("Executor failed while streaming.")
return nil, trace.Wrap(err)
}

return nil, nil
}

// exec forwards all exec requests to the target server, captures
// all output from the session
func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp interface{}, err error) {
@@ -1005,6 +1070,11 @@ func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Requ
return f.remoteExec(ctx, w, req, p, sess, request, proxy)
}

if !request.tty {
resp, err = f.execNonInteractive(ctx, w, req, p, request, proxy, sess)
return
}

client := newKubeProxyClientStreams(proxy)
party := newParty(*ctx, types.SessionPeerMode, client)
session, err := newSession(*ctx, f, req, p, party, sess)
@@ -1035,7 +1105,6 @@ func (f *Forwarder) remoteExec(ctx *authContext, w http.ResponseWriter, req *htt
return nil, trace.Wrap(err)
}
streamOptions := proxy.options()

if err = executor.Stream(streamOptions); err != nil {
f.log.WithError(err).Warning("Executor failed while streaming.")
return nil, trace.Wrap(err)
332 changes: 133 additions & 199 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"

@@ -43,7 +42,6 @@ import (
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/remotecommand"
utilexec "k8s.io/client-go/util/exec"
)

const sessionRecorderID = "session-recorder"
@@ -285,9 +283,6 @@ type session struct {

emitter apievents.Emitter

// tty is set if the session is using a TTY.
tty bool

podName string

started bool
@@ -327,7 +322,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
}

q := req.URL.Query()
tty := utils.AsBool(q.Get("tty"))
accessEvaluator := auth.NewSessionAccessEvaluator(policySets, types.KubernetesSessionKind)

io := srv.NewTermManager()
@@ -345,7 +339,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
state: types.SessionState_SessionStatePending,
accessEvaluator: accessEvaluator,
emitter: events.NewDiscardEmitter(),
tty: tty,
terminalSizeQueue: newMultiResizeQueue(),
started: false,
sess: sess,
@@ -454,7 +447,6 @@ func (s *session) launch() error {
stdin: utils.AsBool(q.Get("stdin")),
stdout: utils.AsBool(q.Get("stdout")),
stderr: utils.AsBool(q.Get("stderr")),
tty: utils.AsBool(q.Get("tty")),
httpRequest: s.req,
httpResponseWriter: nil,
context: s.req.Context(),
@@ -489,48 +481,46 @@ func (s *session) launch() error {
return trace.Wrap(err)
}

if request.tty {
termParams := tsession.TerminalParams{
W: 100,
H: 100,
}
termParams := tsession.TerminalParams{
W: 100,
H: 100,
}

sessionStartEvent := &apievents.SessionStart{
Metadata: apievents.Metadata{
Type: events.SessionStartEvent,
Code: events.SessionStartCode,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
ServerHostname: s.sess.teleportCluster.name,
ServerAddr: s.sess.kubeAddress,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
TerminalSize: termParams.Serialize(),
KubernetesClusterMetadata: s.ctx.eventClusterMeta(),
KubernetesPodMetadata: eventPodMeta,
InitialCommand: q["command"],
SessionRecording: s.ctx.recordingConfig.GetMode(),
}
sessionStartEvent := &apievents.SessionStart{
Metadata: apievents.Metadata{
Type: events.SessionStartEvent,
Code: events.SessionStartCode,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
ServerHostname: s.sess.teleportCluster.name,
ServerAddr: s.sess.kubeAddress,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
TerminalSize: termParams.Serialize(),
KubernetesClusterMetadata: s.ctx.eventClusterMeta(),
KubernetesPodMetadata: eventPodMeta,
InitialCommand: q["command"],
SessionRecording: s.ctx.recordingConfig.GetMode(),
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionStartEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
}
if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionStartEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
}

go func() {
@@ -564,7 +554,7 @@ func (s *session) launch() error {
Stdin: s.io,
Stdout: s.io,
Stderr: s.io,
Tty: request.tty,
Tty: true,
TerminalSizeQueue: s.terminalSizeQueue,
}

@@ -585,7 +575,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
s.started = true
sessionStart := s.forwarder.cfg.Clock.Now().UTC()

if !s.sess.noAuditEvents && s.tty {
if !s.sess.noAuditEvents {
s.terminalSizeQueue.callback = func(resize *remotecommand.TerminalSize) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -639,37 +629,33 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
s.terminalSizeQueue.callback = func(resize *remotecommand.TerminalSize) {}
}

if !s.sess.noAuditEvents && request.tty {
streamer, err := s.forwarder.newStreamer(&s.ctx)
if err != nil {
return nil, trace.Wrap(err)
}

recorder, err := events.NewAuditWriter(events.AuditWriterConfig{
// Audit stream is using server context, not session context,
// to make sure that session is uploaded even after it is closed
Context: s.forwarder.ctx,
Streamer: streamer,
Clock: s.forwarder.cfg.Clock,
SessionID: tsession.ID(s.id.String()),
ServerID: s.forwarder.cfg.ServerID,
Namespace: s.forwarder.cfg.Namespace,
RecordOutput: s.ctx.recordingConfig.GetMode() != types.RecordOff,
Component: teleport.Component(teleport.ComponentSession, teleport.ComponentProxyKube),
ClusterName: s.forwarder.cfg.ClusterName,
})
streamer, err := s.forwarder.newStreamer(&s.ctx)
if err != nil {
return nil, trace.Wrap(err)
}

s.recorder = recorder
s.emitter = recorder
if err != nil {
return nil, trace.Wrap(err)
}
recorder, err := events.NewAuditWriter(events.AuditWriterConfig{
// Audit stream is using server context, not session context,
// to make sure that session is uploaded even after it is closed
Context: s.forwarder.ctx,
Streamer: streamer,
Clock: s.forwarder.cfg.Clock,
SessionID: tsession.ID(s.id.String()),
ServerID: s.forwarder.cfg.ServerID,
Namespace: s.forwarder.cfg.Namespace,
RecordOutput: s.ctx.recordingConfig.GetMode() != types.RecordOff,
Component: teleport.Component(teleport.ComponentSession, teleport.ComponentProxyKube),
ClusterName: s.forwarder.cfg.ClusterName,
})

s.io.AddWriter(sessionRecorderID, recorder)
} else if !s.sess.noAuditEvents {
s.emitter = s.forwarder.cfg.StreamEmitter
s.recorder = recorder
s.emitter = recorder
if err != nil {
return nil, trace.Wrap(err)
}

s.io.AddWriter(sessionRecorderID, recorder)

// If the identity is verified with an MFA device, we enabled MFA-based presence for the session.
if s.PresenceEnabled {
go func() {
@@ -704,123 +690,76 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
}
}

if request.tty {
sessionDataEvent := &apievents.SessionData{
Metadata: apievents.Metadata{
Type: events.SessionDataEvent,
Code: events.SessionDataCode,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
// Bytes transmitted from user to pod.
BytesTransmitted: s.io.CountRead(),
// Bytes received from pod by user.
BytesReceived: s.io.CountWritten(),
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionDataEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit session data event.")
}

sessionEndEvent := &apievents.SessionEnd{
Metadata: apievents.Metadata{
Type: events.SessionEndEvent,
Code: events.SessionEndCode,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
Interactive: true,
Participants: s.allParticipants(),
StartTime: sessionStart,
EndTime: s.forwarder.cfg.Clock.Now().UTC(),
KubernetesClusterMetadata: s.ctx.eventClusterMeta(),
KubernetesPodMetadata: eventPodMeta,
InitialCommand: request.cmd,
SessionRecording: s.ctx.recordingConfig.GetMode(),
}
sessionDataEvent := &apievents.SessionData{
Metadata: apievents.Metadata{
Type: events.SessionDataEvent,
Code: events.SessionDataCode,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
// Bytes transmitted from user to pod.
BytesTransmitted: s.io.CountRead(),
// Bytes received from pod by user.
BytesReceived: s.io.CountWritten(),
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionEndEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit session end event.")
}
} else {
// send an exec event
execEvent := &apievents.Exec{
Metadata: apievents.Metadata{
Type: events.ExecEvent,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
CommandMetadata: apievents.CommandMetadata{
Command: strings.Join(request.cmd, " "),
},
KubernetesClusterMetadata: s.ctx.eventClusterMeta(),
KubernetesPodMetadata: eventPodMeta,
}
if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionDataEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit session data event.")
}

if err != nil {
execEvent.Code = events.ExecFailureCode
execEvent.Error = err.Error()
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
execEvent.ExitCode = fmt.Sprintf("%d", exitErr.ExitStatus())
}
} else {
execEvent.Code = events.ExecCode
}
sessionEndEvent := &apievents.SessionEnd{
Metadata: apievents.Metadata{
Type: events.SessionEndEvent,
Code: events.SessionEndCode,
ClusterName: s.forwarder.cfg.ClusterName,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: s.forwarder.cfg.ServerID,
ServerNamespace: s.forwarder.cfg.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
WithMFA: s.ctx.Identity.GetIdentity().MFAVerified,
},
UserMetadata: apievents.UserMetadata{
User: s.ctx.User.GetName(),
Login: s.ctx.User.GetName(),
Impersonator: s.ctx.Identity.GetIdentity().Impersonator,
},
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.req.RemoteAddr,
LocalAddr: s.sess.kubeAddress,
Protocol: events.EventProtocolKube,
},
Interactive: true,
Participants: s.allParticipants(),
StartTime: sessionStart,
EndTime: s.forwarder.cfg.Clock.Now().UTC(),
KubernetesClusterMetadata: s.ctx.eventClusterMeta(),
KubernetesPodMetadata: eventPodMeta,
InitialCommand: request.cmd,
SessionRecording: s.ctx.recordingConfig.GetMode(),
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
}
if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionEndEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit session end event.")
}
}, nil
}
@@ -901,12 +840,9 @@ func (s *session) join(p *party) error {
stringID := p.ID.String()
s.parties[p.ID] = p
s.partiesHistorical[p.ID] = p
s.terminalSizeQueue.add(stringID, p.Client.resizeQueue())

if s.tty {
s.terminalSizeQueue.add(stringID, p.Client.resizeQueue())
}

if s.tty && p.Mode == types.SessionPeerMode {
if p.Mode == types.SessionPeerMode {
s.io.AddReader(stringID, p.Client.stdinStream())
}

@@ -943,8 +879,6 @@ func (s *session) join(p *party) error {
s.log.WithError(err).Warning("Failed to launch Kubernetes session.")
}
}()
} else if !s.tty {
return trace.AccessDenied("insufficient permissions to launch non-interactive session")
} else if len(s.parties) == 1 {
base := "Waiting for required participants..."

@@ -960,7 +894,7 @@ func (s *session) join(p *party) error {
}

func (s *session) BroadcastMessage(format string, args ...interface{}) {
if s.accessEvaluator.IsModerated() && s.tty {
if s.accessEvaluator.IsModerated() {
s.io.BroadcastMessage(fmt.Sprintf(format, args...))
}
}

0 comments on commit bc09235

Please sign in to comment.