Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,24 @@ func (u *upload) removeFiles() error {

var errSkipEncryptedUpload = errors.New("skip encrypted upload")

func (u *Uploader) uploadEncryptedRecording(ctx context.Context, sessionID string, in io.Reader) error {
func (u *Uploader) uploadEncryptedRecording(ctx context.Context, sessionID string, in io.ReadSeeker) error {
if u.cfg.EncryptedRecordingUploader == nil {
return trace.Wrap(errSkipEncryptedUpload, "no encrypted uploader configured")
}

header, err := events.ParsePartHeader(in)
if err != nil {
return trace.Wrap(err)
}

if header.Flags&events.ProtoStreamFlagEncrypted == 0 {
return trace.Wrap(errSkipEncryptedUpload, "recording not encrypted")
}

if _, err := in.Seek(0, 0); err != nil {
return trace.Wrap(err, "resetting recording for plaintext upload")
}

// The upload parts in the given reader are each ~128KB. Usually these parts are consumed and reconstructed
// by Auth in 5MB chunks to meet the minimum upload size of upload providers like S3. Since these uploads
// are proxied directly to the uploader from the agent here (see link below), this agent needs to combine
Expand Down
13 changes: 12 additions & 1 deletion lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func TestUploadOK(t *testing.T) {
// read the upload and make sure the data is equal
outEvents := p.readEvents(ctx, t, event.UploadID)
require.Equal(t, inEvents, outEvents)

// regression: ensure there is a single upload for the captured session
uploads, err := p.memUploader.ListUploads(ctx)
require.NoError(t, err)
require.Len(t, uploads, 1)
}

// TestUploadParallel verifies several parallel uploads that have to wait
Expand All @@ -93,7 +98,8 @@ func TestUploadParallel(t *testing.T) {

sessions := make(map[string][]apievents.AuditEvent)

for range 5 {
const sessionCount = 5
for range sessionCount {
sessionEvents := eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: 1024})
sid := sessionEvents[0].(events.SessionMetadataGetter).GetSessionID()

Expand Down Expand Up @@ -125,6 +131,11 @@ func TestUploadParallel(t *testing.T) {

delete(sessions, event.SessionID)
}

// regression: ensure there is a single upload for each captured session
uploads, err := p.memUploader.ListUploads(ctx)
require.NoError(t, err)
require.Len(t, uploads, sessionCount)
}

// TestMovesCorruptedUploads verifies that the uploader moves corrupted uploads
Expand Down
Loading