diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index 8cf5a6d29ef75..a3da067a96053 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -1177,7 +1177,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ client := &websocketClientStreams{stream} party := newParty(*ctx, stream.Mode, client) - err = session.join(party) + err = session.join(party, true /* emitSessionJoinEvent */) if err != nil { return trace.Wrap(err) } @@ -1519,6 +1519,9 @@ func exitCode(err error) (errMsg, code string) { return } errMsg = kubeStatusErr.ErrStatus.Message + if errMsg == "" { + errMsg = string(kubeStatusErr.ErrStatus.Reason) + } code = strconv.Itoa(int(kubeStatusErr.ErrStatus.Code)) } else if errors.As(err, &kubeExecErr) { if kubeExecErr.Err != nil { @@ -1604,7 +1607,9 @@ func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Requ } f.setSession(session.id, session) - err = session.join(party) + // When Teleport attaches the original session creator terminal streams to the + // session, we don't want to emit a session.join event since it won't be required. + err = session.join(party, false /* emitSessionJoinEvent */) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index 100dffe147780..c27edc8d6eb7c 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -601,12 +601,24 @@ func (s *session) launch() error { s.io.On() if err = executor.StreamWithContext(s.streamContext, options); err != nil { + s.reportErrorToSessionRecorder(err) s.log.WithError(err).Warning("Executor failed while streaming.") return trace.Wrap(err) } return nil } +// reportErrorToSessionRecorder reports the error to the session recorder +// if it is set. 
+func (s *session) reportErrorToSessionRecorder(err error) { + if err == nil { + return + } + if s.recorder != nil { + fmt.Fprintf(s.recorder, "\n---\nSession exited with error: %v\n", err) + } +} + func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, eventPodMeta apievents.KubernetesPodMetadata) (func(error), error) { s.mu.Lock() defer s.mu.Unlock() @@ -771,8 +783,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, if errExec != nil { execEvent.Code = events.ExecFailureCode - execEvent.Error, execEvent.ExitCode = exitCode(err) - + execEvent.Error, execEvent.ExitCode = exitCode(errExec) } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil { @@ -826,7 +837,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, } // join attempts to connect a party to the session. -func (s *session) join(p *party) error { +func (s *session) join(p *party, emitJoinEvent bool) error { if p.Ctx.User.GetName() != s.ctx.User.GetName() { roles := p.Ctx.Checker.Roles() @@ -856,29 +867,11 @@ func (s *session) join(p *party) error { return trace.Wrap(err) } - sessionJoinEvent := &apievents.SessionJoin{ - Metadata: apievents.Metadata{ - Type: events.SessionJoinEvent, - Code: events.SessionJoinCode, - ClusterName: s.ctx.teleportCluster.name, - }, - KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{ - KubernetesCluster: s.ctx.kubeClusterName, - KubernetesUsers: []string{}, - KubernetesGroups: []string{}, - KubernetesLabels: s.ctx.kubeClusterLabels, - }, - SessionMetadata: apievents.SessionMetadata{ - SessionID: s.id.String(), - }, - UserMetadata: p.Ctx.eventUserMetaWithLogin("root"), - ConnectionMetadata: apievents.ConnectionMetadata{ - RemoteAddr: s.params.ByName("podName"), - }, - } - - if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit event.") + // we only want to emit the 
session.join when someone tries to join a session via + tsh kube join and not when the original session owner terminal streams are + connected to the Kubernetes session. + if emitJoinEvent { + s.emitSessionJoinEvent(p) + } recentWrites := s.io.GetRecentHistory() @@ -967,6 +960,39 @@ func (s *session) BroadcastMessage(format string, args ...any) { } } +// emitSessionJoinEvent emits a session.join audit event when a user joins +// the session. +// This function requires that the session must be active, otherwise the audit +// logger will discard the event. +func (s *session) emitSessionJoinEvent(p *party) { + sessionJoinEvent := &apievents.SessionJoin{ + Metadata: apievents.Metadata{ + Type: events.SessionJoinEvent, + Code: events.SessionJoinCode, + ClusterName: s.ctx.teleportCluster.name, + }, + KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{ + KubernetesCluster: s.ctx.kubeClusterName, + // joining moderators, observers and peers don't have any + // kubernetes metadata configured. + KubernetesUsers: []string{}, + KubernetesGroups: []string{}, + KubernetesLabels: s.ctx.kubeClusterLabels, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: s.id.String(), + }, + UserMetadata: p.Ctx.eventUserMetaWithLogin("root"), + ConnectionMetadata: apievents.ConnectionMetadata{ + RemoteAddr: s.params.ByName("podName"), + }, + } + + if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil { + s.forwarder.log.WithError(err).Warn("Failed to emit event.") + } +} + // leave removes a party from the session and returns if the party was still active // in the session. If the party wasn't found, it returns false, nil. 
func (s *session) leave(id uuid.UUID) (bool, error) { @@ -1000,8 +1026,8 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) { sessionLeaveEvent := &apievents.SessionLeave{ Metadata: apievents.Metadata{ - Type: events.SessionJoinEvent, - Code: events.SessionJoinCode, + Type: events.SessionLeaveEvent, + Code: events.SessionLeaveCode, ClusterName: s.ctx.teleportCluster.name, }, SessionMetadata: apievents.SessionMetadata{ diff --git a/lib/kube/proxy/sess_test.go b/lib/kube/proxy/sess_test.go new file mode 100644 index 0000000000000..7d482473e82af --- /dev/null +++ b/lib/kube/proxy/sess_test.go @@ -0,0 +1,153 @@ +/* +Copyright 2021 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package proxy + +import ( + "bytes" + "context" + "io" + "net/http" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/remotecommand" + + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/events" + testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server" +) + +func TestSessionEndError(t *testing.T) { + t.Parallel() + var ( + eventsResult []apievents.AuditEvent + eventsResultMutex sync.Mutex + ) + const ( + errorMessage = "request denied" + errorCode = http.StatusForbidden + ) + kubeMock, err := testingkubemock.NewKubeAPIMock( + testingkubemock.WithExecError( + metav1.Status{ + Status: metav1.StatusFailure, + Message: errorMessage, + Reason: metav1.StatusReasonForbidden, + Code: errorCode, + }, + ), + ) + require.NoError(t, err) + t.Cleanup(func() { kubeMock.Close() }) + + // creates a Kubernetes service with a configured cluster pointing to mock api server + testCtx := SetupTestContext( + context.Background(), + t, + TestConfig{ + Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}}, + // collect all audit events + OnEvent: func(event apievents.AuditEvent) { + eventsResultMutex.Lock() + defer eventsResultMutex.Unlock() + eventsResult = append(eventsResult, event) + }, + }, + ) + + t.Cleanup(func() { require.NoError(t, testCtx.Close()) }) + + // create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified) + user, _ := testCtx.CreateUserAndRole( + testCtx.Context, + t, + username, + RoleSpec{ + Name: roleName, + KubeUsers: roleKubeUsers, + KubeGroups: roleKubeGroups, + }) + + // generate a kube client with user certs for auth + _, userRestConfig := testCtx.GenTestKubeClientTLSCert( + t, + user.GetName(), + kubeCluster, + ) + require.NoError(t, err) + + var ( + stdinWrite = 
&bytes.Buffer{} + stdout = &bytes.Buffer{} + stderr = &bytes.Buffer{} + ) + + _, err = stdinWrite.Write(stdinContent) + require.NoError(t, err) + + streamOpts := remotecommand.StreamOptions{ + Stdin: io.NopCloser(stdinWrite), + Stdout: stdout, + Stderr: stderr, + Tty: false, + } + + req, err := generateExecRequest( + testCtx.KubeServiceAddress(), + podName, + podNamespace, + podContainerName, + containerCommmandExecute, // placeholder for commands to execute in the dummy pod + streamOpts, + ) + require.NoError(t, err) + + exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL()) + require.NoError(t, err) + err = exec.StreamWithContext(testCtx.Context, streamOpts) + require.Error(t, err) + + // check that the session is ended with an error in audit log. + require.EventuallyWithT(t, func(t *assert.CollectT) { + eventsResultMutex.Lock() + defer eventsResultMutex.Unlock() + hasSessionEndEvent := false + hasSessionExecEvent := false + for _, event := range eventsResult { + if event.GetType() == events.SessionEndEvent { + hasSessionEndEvent = true + } + if event.GetType() != events.ExecEvent { + continue + } + + execEvent, ok := event.(*apievents.Exec) + assert.True(t, ok) + assert.Equal(t, events.ExecFailureCode, execEvent.GetCode()) + assert.Equal(t, strconv.Itoa(errorCode), execEvent.ExitCode) + assert.Equal(t, errorMessage, execEvent.Error) + hasSessionExecEvent = true + } + assert.Truef(t, hasSessionEndEvent, "session end event not found in audit log") + assert.Truef(t, hasSessionExecEvent, "session exec event not found in audit log") + }, 10*time.Second, 1*time.Second) +} diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index 26c4ac9215110..78c1e34131fc9 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -89,16 +89,27 @@ const ( PortForwardPayload = "Portforward handler message" ) +// Option is a functional 
option for KubeMockServer +type Option func(*KubeMockServer) + +// WithExecError sets the error to be returned by the Exec call +func WithExecError(status metav1.Status) Option { + return func(s *KubeMockServer) { + s.execPodError = &status + } +} + type KubeMockServer struct { - router *httprouter.Router - log *log.Entry - server *httptest.Server - TLS *tls.Config - Addr net.Addr - URL string - CA []byte - deletedPods map[string][]string - mu sync.Mutex + router *httprouter.Router + log *log.Entry + server *httptest.Server + TLS *tls.Config + Addr net.Addr + URL string + CA []byte + deletedPods map[string][]string + mu sync.Mutex + execPodError *metav1.Status } // NewKubeAPIMock creates Kubernetes API server for handling exec calls. @@ -108,12 +119,17 @@ type KubeMockServer struct { // The output returns the container followed by a dump of the data received from stdin. // More endpoints can be configured // TODO(tigrato): add support for other endpoints -func NewKubeAPIMock() (*KubeMockServer, error) { +func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) { s := &KubeMockServer{ router: httprouter.New(), log: log.NewEntry(log.New()), deletedPods: make(map[string][]string), } + + for _, o := range opts { + o(s) + } + s.setup() if err := http2.ConfigureServer(s.server.Config, &http2.Server{}); err != nil { return nil, err @@ -174,9 +190,29 @@ func (s *KubeMockServer) formatResponseError(rw http.ResponseWriter, respErr err } } +func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr error, status *metav1.Status) { + data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status) + if err != nil { + s.log.Warningf("Failed encoding error into kube Status object: %v", err) + trace.WriteError(rw, respErr) + return + } + rw.Header().Set(responsewriters.ContentTypeHeader, "application/json") + // Write the status code carried inside the Status object; the client
+ // decodes the Status object from the response body, which embeds the real + // status code and error message. + rw.WriteHeader(int(status.Code)) + if _, err := rw.Write(data); err != nil { + s.log.Warningf("Failed writing kube error response body: %v", err) + } +} + func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) { q := req.URL.Query() - + if s.execPodError != nil { + s.writeResponseError(w, nil, s.execPodError) + return nil, nil + } request := remoteCommandRequest{ podNamespace: p.ByName("podNamespace"), podName: p.ByName("podName"),