Skip to content
Merged
11 changes: 9 additions & 2 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,16 @@ func (u *UploadCompleter) checkUploads(ctx context.Context) error {
}
}

if _, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String()); err == nil {
switch _, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String()); {
case err == nil: // session is still in progress, continue to other uploads
continue
} else if !trace.IsNotFound(err) {
case trace.IsNotFound(err): // upload abandoned, complete upload
case trace.IsAccessDenied(err): // upload abandoned, complete upload
// Treat access denied errors as not found errors, since we expect
// to get them if the auth server is v9.2.3 or earlier, since only
// node, proxy, and kube roles had permissions to create trackers.
// DELETE IN 11.0.0
default: // aka err != nil
return trace.Wrap(err)
}

Expand Down
97 changes: 66 additions & 31 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package events

import (
"context"
"fmt"
"strings"
"testing"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -85,42 +87,49 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
// completes uploads that have lived past the configured grace period.
// DELETE IN 11.0.0
func TestUploadCompleterWithGracePeriod(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &mockAuditLog{}

sessionID := session.NewID()
sessionTrackerService := &mockSessionTrackerService{}
for _, sts := range []services.SessionTrackerService{
&mockSessionTrackerService{},
// Session tracker services will return an access denied error to
// db, app, and desktop sessions if the auth server is between v9.2.2
// and v9.2.3. We should handle these cases as if the tracker could
// not be found, and depend on the grace period to handle the completion.
&mockSessionTrackerServiceAccessDenied{},
} {
t.Run(fmt.Sprintf("%T", sts), func(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)
uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: &mockAuditLog{},
SessionTracker: sts,
Clock: clock,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)
t.Cleanup(uc.Close)

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)
upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)

err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

// Even if session tracker is not found, the completer
// should wait until the grace period to complete it
clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)
// Even if session tracker is not found, the completer
// should wait until the grace period to complete it
clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
})
}
}

// TestUploadCompleterEmitsSessionEnd verifies that the upload completer
Expand Down Expand Up @@ -248,3 +257,29 @@ func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, se
func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error {
return trace.NotImplemented("UpdatePresence is not implemented")
}

type mockSessionTrackerServiceAccessDenied struct{}

func (m *mockSessionTrackerServiceAccessDenied) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
return nil, trace.AccessDenied("access denied")
}

func (m *mockSessionTrackerServiceAccessDenied) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
return nil, trace.AccessDenied("access denied")
}

func (m *mockSessionTrackerServiceAccessDenied) CreateSessionTracker(ctx context.Context, st types.SessionTracker) (types.SessionTracker, error) {
return nil, trace.AccessDenied("access denied")
}

func (m *mockSessionTrackerServiceAccessDenied) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error {
return trace.AccessDenied("access denied")
}

func (m *mockSessionTrackerServiceAccessDenied) RemoveSessionTracker(ctx context.Context, sessionID string) error {
return trace.AccessDenied("access denied")
}

func (m *mockSessionTrackerServiceAccessDenied) UpdatePresence(ctx context.Context, sessionID, user string) error {
return trace.AccessDenied("access denied")
}
11 changes: 10 additions & 1 deletion lib/srv/app/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,16 @@ func (s *Server) createTracker(sess *sessionChunk, identity *tlsca.Identity) err

s.log.Debugf("Creating tracker for session chunk %v", sess.id)
tracker, err := srv.NewSessionTracker(s.closeContext, trackerSpec, s.c.AuthClient)
if err != nil {
switch {
case err == nil:
case trace.IsAccessDenied(err):
// Ignore access denied errors, which we may get if the auth
// server is v9.2.3 or earlier, since only node, proxy, and
// kube roles had permission to create session trackers.
// DELETE IN 11.0.0
s.log.Debugf("Insufficient permissions to create session tracker, skipping session tracking for session chunk %v", sess.id)
return nil
default: // aka err != nil
return trace.Wrap(err)
}

Expand Down
11 changes: 10 additions & 1 deletion lib/srv/db/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,16 @@ func (s *Server) trackSession(ctx context.Context, sessionCtx *common.Session) e

s.log.Debugf("Creating tracker for session %v", sessionCtx.ID)
tracker, err := srv.NewSessionTracker(s.closeContext, trackerSpec, s.cfg.AuthClient)
if err != nil {
switch {
case err == nil:
case trace.IsAccessDenied(err):
// Ignore access denied errors, which we may get if the auth
// server is v9.2.3 or earlier, since only node, proxy, and
// kube roles had permission to create session trackers.
// DELETE IN 11.0.0
s.log.Debugf("Insufficient permissions to create session tracker, skipping session tracking for session %v", sessionCtx.ID)
return nil
default: // aka err != nil
return trace.Wrap(err)
}

Expand Down
11 changes: 10 additions & 1 deletion lib/srv/desktop/windows_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,16 @@ func (s *WindowsService) trackSession(ctx context.Context, id *tlsca.Identity, w

s.cfg.Log.Debugf("Creating tracker for session %v", sessionID)
tracker, err := srv.NewSessionTracker(ctx, trackerSpec, s.cfg.AuthClient)
if err != nil {
switch {
case err == nil:
case trace.IsAccessDenied(err):
// Ignore access denied errors, which we may get if the auth
// server is v9.2.3 or earlier, since only node, proxy, and
// kube roles had permission to create session trackers.
// DELETE IN 11.0.0
s.cfg.Log.Debugf("Insufficient permissions to create session tracker, skipping session tracking for session %v", sessionID)
return nil
default: // aka err != nil
return trace.Wrap(err)
}

Expand Down