Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
client := &websocketClientStreams{stream}
party := newParty(*ctx, stream.Mode, client)

err = session.join(party)
err = session.join(party, true /* emitSessionJoinEvent */)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -1519,6 +1519,9 @@ func exitCode(err error) (errMsg, code string) {
return
}
errMsg = kubeStatusErr.ErrStatus.Message
if errMsg == "" {
errMsg = string(kubeStatusErr.ErrStatus.Reason)
}
code = strconv.Itoa(int(kubeStatusErr.ErrStatus.Code))
} else if errors.As(err, &kubeExecErr) {
if kubeExecErr.Err != nil {
Expand Down Expand Up @@ -1604,7 +1607,9 @@ func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Requ
}

f.setSession(session.id, session)
err = session.join(party)
// When Teleport attaches the original session creator terminal streams to the
// session, we don't wan't to emmit session.join event since it won't be required.
err = session.join(party, false /* emitSessionJoinEvent */)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
82 changes: 54 additions & 28 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,24 @@ func (s *session) launch() error {

s.io.On()
if err = executor.StreamWithContext(s.streamContext, options); err != nil {
s.reportErrorToSessionRecorder(err)
s.log.WithError(err).Warning("Executor failed while streaming.")
return trace.Wrap(err)
}
return nil
}

// reportErrorToSessionRecorder reports the error to the session recorder
// if it is set.
func (s *session) reportErrorToSessionRecorder(err error) {
if err == nil {
return
}
if s.recorder != nil {
fmt.Fprintf(s.recorder, "\n---\nSession exited with error: %v\n", err)
}
}

func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, eventPodMeta apievents.KubernetesPodMetadata) (func(error), error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -771,8 +783,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,

if errExec != nil {
execEvent.Code = events.ExecFailureCode
execEvent.Error, execEvent.ExitCode = exitCode(err)

execEvent.Error, execEvent.ExitCode = exitCode(errExec)
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil {
Expand Down Expand Up @@ -826,7 +837,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
}

// join attempts to connect a party to the session.
func (s *session) join(p *party) error {
func (s *session) join(p *party, emitJoinEvent bool) error {
if p.Ctx.User.GetName() != s.ctx.User.GetName() {
roles := p.Ctx.Checker.Roles()

Expand Down Expand Up @@ -856,29 +867,11 @@ func (s *session) join(p *party) error {
return trace.Wrap(err)
}

sessionJoinEvent := &apievents.SessionJoin{
Metadata: apievents.Metadata{
Type: events.SessionJoinEvent,
Code: events.SessionJoinCode,
ClusterName: s.ctx.teleportCluster.name,
},
KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{
KubernetesCluster: s.ctx.kubeClusterName,
KubernetesUsers: []string{},
KubernetesGroups: []string{},
KubernetesLabels: s.ctx.kubeClusterLabels,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
},
UserMetadata: p.Ctx.eventUserMetaWithLogin("root"),
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.params.ByName("podName"),
},
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
// we only want to emit the session.join when someone tries to join a session via
// tsh kube join and not when the original session owner terminal streams are
// connected to the Kubernetes session.
if emitJoinEvent {
s.emitSessionJoinEvent(p)
}

recentWrites := s.io.GetRecentHistory()
Expand Down Expand Up @@ -967,6 +960,39 @@ func (s *session) BroadcastMessage(format string, args ...any) {
}
}

// emitSessionJoinEvent emits a session.join audit event when a user joins
// the session.
// This function requires that the session must be active, otherwise audit logger
// will discard the event.
func (s *session) emitSessionJoinEvent(p *party) {
sessionJoinEvent := &apievents.SessionJoin{
Metadata: apievents.Metadata{
Type: events.SessionJoinEvent,
Code: events.SessionJoinCode,
ClusterName: s.ctx.teleportCluster.name,
},
KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{
KubernetesCluster: s.ctx.kubeClusterName,
// joining moderators, obervers and peers don't have any
// kubernetes metadata configured.
KubernetesUsers: []string{},
KubernetesGroups: []string{},
KubernetesLabels: s.ctx.kubeClusterLabels,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: s.id.String(),
},
UserMetadata: p.Ctx.eventUserMetaWithLogin("root"),
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.params.ByName("podName"),
},
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
}
}

// leave removes a party from the session and returns if the party was still active
// in the session. If the party wasn't found, it returns false, nil.
func (s *session) leave(id uuid.UUID) (bool, error) {
Expand Down Expand Up @@ -1000,8 +1026,8 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) {

sessionLeaveEvent := &apievents.SessionLeave{
Metadata: apievents.Metadata{
Type: events.SessionJoinEvent,
Code: events.SessionJoinCode,
Type: events.SessionLeaveEvent,
Code: events.SessionLeaveCode,
ClusterName: s.ctx.teleportCluster.name,
},
SessionMetadata: apievents.SessionMetadata{
Expand Down
153 changes: 153 additions & 0 deletions lib/kube/proxy/sess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright 2021 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
"bytes"
"context"
"io"
"net/http"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/remotecommand"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
)

func TestSessionEndError(t *testing.T) {
t.Parallel()
var (
eventsResult []apievents.AuditEvent
eventsResultMutex sync.Mutex
)
const (
errorMessage = "request denied"
errorCode = http.StatusForbidden
)
kubeMock, err := testingkubemock.NewKubeAPIMock(
testingkubemock.WithExecError(
metav1.Status{
Status: metav1.StatusFailure,
Message: errorMessage,
Reason: metav1.StatusReasonForbidden,
Code: errorCode,
},
),
)
require.NoError(t, err)
t.Cleanup(func() { kubeMock.Close() })

// creates a Kubernetes service with a configured cluster pointing to mock api server
testCtx := SetupTestContext(
context.Background(),
t,
TestConfig{
Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
// collect all audit events
OnEvent: func(event apievents.AuditEvent) {
eventsResultMutex.Lock()
defer eventsResultMutex.Unlock()
eventsResult = append(eventsResult, event)
},
},
)

t.Cleanup(func() { require.NoError(t, testCtx.Close()) })

// create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified)
user, _ := testCtx.CreateUserAndRole(
testCtx.Context,
t,
username,
RoleSpec{
Name: roleName,
KubeUsers: roleKubeUsers,
KubeGroups: roleKubeGroups,
})

// generate a kube client with user certs for auth
_, userRestConfig := testCtx.GenTestKubeClientTLSCert(
t,
user.GetName(),
kubeCluster,
)
require.NoError(t, err)

var (
stdinWrite = &bytes.Buffer{}
stdout = &bytes.Buffer{}
stderr = &bytes.Buffer{}
)

_, err = stdinWrite.Write(stdinContent)
require.NoError(t, err)

streamOpts := remotecommand.StreamOptions{
Stdin: io.NopCloser(stdinWrite),
Stdout: stdout,
Stderr: stderr,
Tty: false,
}

req, err := generateExecRequest(
testCtx.KubeServiceAddress(),
podName,
podNamespace,
podContainerName,
containerCommmandExecute, // placeholder for commands to execute in the dummy pod
streamOpts,
)
require.NoError(t, err)

exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL())
require.NoError(t, err)
err = exec.StreamWithContext(testCtx.Context, streamOpts)
require.Error(t, err)

// check that the session is ended with an error in audit log.
require.EventuallyWithT(t, func(t *assert.CollectT) {
eventsResultMutex.Lock()
defer eventsResultMutex.Unlock()
hasSessionEndEvent := false
hasSessionExecEvent := false
for _, event := range eventsResult {
if event.GetType() == events.SessionEndEvent {
hasSessionEndEvent = true
}
if event.GetType() != events.ExecEvent {
continue
}

execEvent, ok := event.(*apievents.Exec)
assert.True(t, ok)
assert.Equal(t, events.ExecFailureCode, execEvent.GetCode())
assert.Equal(t, strconv.Itoa(errorCode), execEvent.ExitCode)
assert.Equal(t, errorMessage, execEvent.Error)
hasSessionExecEvent = true
}
assert.Truef(t, hasSessionEndEvent, "session end event not found in audit log")
assert.Truef(t, hasSessionExecEvent, "session exec event not found in audit log")
}, 10*time.Second, 1*time.Second)
}
Loading