From 3164d5494c7ee439526fc9e9245354905e880e94 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Thu, 26 Oct 2023 21:23:20 +0100 Subject: [PATCH] Use the correct error when inspecting Kubernetes session (#33904) * Use the correct error when inspecting Kubernetes session This PR fixes the audit log report for Kubernetes sessions that returned errors. The previous functions was using the incorrect error variable. Signed-off-by: Tiago Silva * append error to the session recorder * correct session leave code * avoid emisleading debug error when owner joins the session --------- Signed-off-by: Tiago Silva --- lib/kube/proxy/forwarder.go | 9 +- lib/kube/proxy/sess.go | 82 +++++---- lib/kube/proxy/sess_test.go | 155 ++++++++++++++++++ .../proxy/testing/kube_server/kube_mock.go | 58 +++++-- 4 files changed, 263 insertions(+), 41 deletions(-) create mode 100644 lib/kube/proxy/sess_test.go diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index e6cc6a2088bb6..61388fcee5110 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -1349,7 +1349,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ client := &websocketClientStreams{stream} party := newParty(*ctx, stream.Mode, client) - err = session.join(party) + err = session.join(party, true /* emitSessionJoinEvent */) if err != nil { return trace.Wrap(err) } @@ -1694,6 +1694,9 @@ func exitCode(err error) (errMsg, code string) { return } errMsg = kubeStatusErr.ErrStatus.Message + if errMsg == "" { + errMsg = string(kubeStatusErr.ErrStatus.Reason) + } code = strconv.Itoa(int(kubeStatusErr.ErrStatus.Code)) } else if errors.As(err, &kubeExecErr) { if kubeExecErr.Err != nil { @@ -1796,7 +1799,9 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http. } f.setSession(session.id, session) - err = session.join(party) + // When Teleport attaches the original session creator terminal streams to the + // session, we don't wan't to emmit session.join event since it won't be required. + err = session.join(party, false /* emitSessionJoinEvent */) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go index d09eeeb416dbd..4242a68b605c2 100644 --- a/lib/kube/proxy/sess.go +++ b/lib/kube/proxy/sess.go @@ -608,12 +608,24 @@ func (s *session) launch() error { s.io.On() if err = executor.StreamWithContext(s.streamContext, options); err != nil { + s.reportErrorToSessionRecorder(err) s.log.WithError(err).Warning("Executor failed while streaming.") return trace.Wrap(err) } return nil } +// reportErrorToSessionRecorder reports the error to the session recorder +// if it is set. +func (s *session) reportErrorToSessionRecorder(err error) { + if err == nil { + return + } + if s.recorder != nil { + fmt.Fprintf(s.recorder, "\n---\nSession exited with error: %v\n", err) + } +} + func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, eventPodMeta apievents.KubernetesPodMetadata) (func(error), error) { s.mu.Lock() defer s.mu.Unlock() @@ -778,8 +790,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, if errExec != nil { execEvent.Code = events.ExecFailureCode - execEvent.Error, execEvent.ExitCode = exitCode(err) - + execEvent.Error, execEvent.ExitCode = exitCode(errExec) } if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil { @@ -833,7 +844,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, } // join attempts to connect a party to the session. -func (s *session) join(p *party) error { +func (s *session) join(p *party, emitJoinEvent bool) error { if p.Ctx.User.GetName() != s.ctx.User.GetName() { roles := p.Ctx.Checker.Roles() @@ -863,29 +874,11 @@ func (s *session) join(p *party) error { return trace.Wrap(err) } - sessionJoinEvent := &apievents.SessionJoin{ - Metadata: apievents.Metadata{ - Type: events.SessionJoinEvent, - Code: events.SessionJoinCode, - ClusterName: s.ctx.teleportCluster.name, - }, - KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{ - KubernetesCluster: s.ctx.kubeClusterName, - KubernetesUsers: []string{}, - KubernetesGroups: []string{}, - KubernetesLabels: s.ctx.kubeClusterLabels, - }, - SessionMetadata: apievents.SessionMetadata{ - SessionID: s.id.String(), - }, - UserMetadata: p.Ctx.eventUserMetaWithLogin("root"), - ConnectionMetadata: apievents.ConnectionMetadata{ - RemoteAddr: s.params.ByName("podName"), - }, - } - - if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil { - s.forwarder.log.WithError(err).Warn("Failed to emit event.") + // we only want to emit the session.join when someone tries to join a session via + // tsh kube join and not when the original session owner terminal streams are + // connected to the Kubernetes session. + if emitJoinEvent { + s.emitSessionJoinEvent(p) } recentWrites := s.io.GetRecentHistory() @@ -974,6 +967,39 @@ func (s *session) BroadcastMessage(format string, args ...any) { } } +// emitSessionJoinEvent emits a session.join audit event when a user joins +// the session. +// This function requires that the session must be active, otherwise audit logger +// will discard the event. +func (s *session) emitSessionJoinEvent(p *party) { + sessionJoinEvent := &apievents.SessionJoin{ + Metadata: apievents.Metadata{ + Type: events.SessionJoinEvent, + Code: events.SessionJoinCode, + ClusterName: s.ctx.teleportCluster.name, + }, + KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{ + KubernetesCluster: s.ctx.kubeClusterName, + // joining moderators, obervers and peers don't have any + // kubernetes metadata configured. + KubernetesUsers: []string{}, + KubernetesGroups: []string{}, + KubernetesLabels: s.ctx.kubeClusterLabels, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: s.id.String(), + }, + UserMetadata: p.Ctx.eventUserMetaWithLogin("root"), + ConnectionMetadata: apievents.ConnectionMetadata{ + RemoteAddr: s.params.ByName("podName"), + }, + } + + if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil { + s.forwarder.log.WithError(err).Warn("Failed to emit event.") + } +} + // leave removes a party from the session and returns if the party was still active // in the session. If the party wasn't found, it returns false, nil. func (s *session) leave(id uuid.UUID) (bool, error) { @@ -1007,8 +1033,8 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) { sessionLeaveEvent := &apievents.SessionLeave{ Metadata: apievents.Metadata{ - Type: events.SessionJoinEvent, - Code: events.SessionJoinCode, + Type: events.SessionLeaveEvent, + Code: events.SessionLeaveCode, ClusterName: s.ctx.teleportCluster.name, }, SessionMetadata: apievents.SessionMetadata{ diff --git a/lib/kube/proxy/sess_test.go b/lib/kube/proxy/sess_test.go new file mode 100644 index 0000000000000..104aaa04f986a --- /dev/null +++ b/lib/kube/proxy/sess_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2021 Gravitational, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "bytes" + "context" + "io" + "net/http" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/remotecommand" + + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/events" + testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server" +) + +func TestSessionEndError(t *testing.T) { + t.Parallel() + var ( + eventsResult []apievents.AuditEvent + eventsResultMutex sync.Mutex + ) + const ( + errorMessage = "request denied" + errorCode = http.StatusForbidden + ) + kubeMock, err := testingkubemock.NewKubeAPIMock( + testingkubemock.WithExecError( + metav1.Status{ + Status: metav1.StatusFailure, + Message: errorMessage, + Reason: metav1.StatusReasonForbidden, + Code: errorCode, + }, + ), + ) + require.NoError(t, err) + t.Cleanup(func() { kubeMock.Close() }) + + // creates a Kubernetes service with a configured cluster pointing to mock api server + testCtx := SetupTestContext( + context.Background(), + t, + TestConfig{ + Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}}, + // collect all audit events + OnEvent: func(event apievents.AuditEvent) { + eventsResultMutex.Lock() + defer eventsResultMutex.Unlock() + eventsResult = append(eventsResult, event) + }, + }, + ) + + t.Cleanup(func() { require.NoError(t, testCtx.Close()) }) + + // create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified) + user, _ := testCtx.CreateUserAndRole( + testCtx.Context, + t, + username, + RoleSpec{ + Name: roleName, + KubeUsers: roleKubeUsers, + KubeGroups: roleKubeGroups, + }) + + // generate a kube client with user certs for auth + _, userRestConfig := testCtx.GenTestKubeClientTLSCert( + t, + user.GetName(), + kubeCluster, + ) + require.NoError(t, err) + + var ( + stdinWrite = &bytes.Buffer{} + stdout = &bytes.Buffer{} + stderr = &bytes.Buffer{} + ) + + _, err = stdinWrite.Write(stdinContent) + require.NoError(t, err) + + streamOpts := remotecommand.StreamOptions{ + Stdin: io.NopCloser(stdinWrite), + Stdout: stdout, + Stderr: stderr, + Tty: false, + } + + req, err := generateExecRequest( + generateExecRequestConfig{ + addr: testCtx.KubeServiceAddress(), + podName: podName, + podNamespace: podNamespace, + containerName: podContainerName, + cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod + options: streamOpts, + }, + ) + require.NoError(t, err) + + exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL()) + require.NoError(t, err) + err = exec.StreamWithContext(testCtx.Context, streamOpts) + require.Error(t, err) + + // check that the session is ended with an error in audit log. + require.EventuallyWithT(t, func(t *assert.CollectT) { + eventsResultMutex.Lock() + defer eventsResultMutex.Unlock() + hasSessionEndEvent := false + hasSessionExecEvent := false + for _, event := range eventsResult { + if event.GetType() == events.SessionEndEvent { + hasSessionEndEvent = true + } + if event.GetType() != events.ExecEvent { + continue + } + + execEvent, ok := event.(*apievents.Exec) + assert.True(t, ok) + assert.Equal(t, events.ExecFailureCode, execEvent.GetCode()) + assert.Equal(t, strconv.Itoa(errorCode), execEvent.ExitCode) + assert.Equal(t, errorMessage, execEvent.Error) + hasSessionExecEvent = true + } + assert.Truef(t, hasSessionEndEvent, "session end event not found in audit log") + assert.Truef(t, hasSessionExecEvent, "session exec event not found in audit log") + }, 10*time.Second, 1*time.Second) +} diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go index 26c4ac9215110..78c1e34131fc9 100644 --- a/lib/kube/proxy/testing/kube_server/kube_mock.go +++ b/lib/kube/proxy/testing/kube_server/kube_mock.go @@ -89,16 +89,27 @@ const ( PortForwardPayload = "Portforward handler message" ) +// Option is a functional option for KubeMockServer +type Option func(*KubeMockServer) + +// WithExecError sets the error to be returned by the Exec call +func WithExecError(status metav1.Status) Option { + return func(s *KubeMockServer) { + s.execPodError = &status + } +} + type KubeMockServer struct { - router *httprouter.Router - log *log.Entry - server *httptest.Server - TLS *tls.Config - Addr net.Addr - URL string - CA []byte - deletedPods map[string][]string - mu sync.Mutex + router *httprouter.Router + log *log.Entry + server *httptest.Server + TLS *tls.Config + Addr net.Addr + URL string + CA []byte + deletedPods map[string][]string + mu sync.Mutex + execPodError *metav1.Status } // NewKubeAPIMock creates Kubernetes API server for handling exec calls. @@ -108,12 +119,17 @@ type KubeMockServer struct { // The output returns the container followed by a dump of the data received from stdin. // More endpoints can be configured // TODO(tigrato): add support for other endpoints -func NewKubeAPIMock() (*KubeMockServer, error) { +func NewKubeAPIMock(opts ...Option) (*KubeMockServer, error) { s := &KubeMockServer{ router: httprouter.New(), log: log.NewEntry(log.New()), deletedPods: make(map[string][]string), } + + for _, o := range opts { + o(s) + } + s.setup() if err := http2.ConfigureServer(s.server.Config, &http2.Server{}); err != nil { return nil, err @@ -174,9 +190,29 @@ func (s *KubeMockServer) formatResponseError(rw http.ResponseWriter, respErr err } } +func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr error, status *metav1.Status) { + data, err := runtime.Encode(kubeCodecs.LegacyCodec(), status) + if err != nil { + s.log.Warningf("Failed encoding error into kube Status object: %v", err) + trace.WriteError(rw, respErr) + return + } + rw.Header().Set(responsewriters.ContentTypeHeader, "application/json") + // Always write InternalServerError, that's the only code that kubectl will + // parse the Status object for. The Status object has the real status code + // embedded. + rw.WriteHeader(int(status.Code)) + if _, err := rw.Write(data); err != nil { + s.log.Warningf("Failed writing kube error response body: %v", err) + } +} + func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) { q := req.URL.Query() - + if s.execPodError != nil { + s.writeResponseError(w, nil, s.execPodError) + return nil, nil + } request := remoteCommandRequest{ podNamespace: p.ByName("podNamespace"), podName: p.ByName("podName"),