diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go index 6e29e2cce46eb..cf78751645f15 100644 --- a/lib/events/filesessions/fileasync.go +++ b/lib/events/filesessions/fileasync.go @@ -581,6 +581,17 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error) reader: nil, } + defer func() { + // If we get an error, that signals that the upload goroutine (encrypted or nonencrypted) + // failed to start, so we must manually close the upload as the defers in the goroutine will + // never be called. + if err != nil { + if err := upload.Close(); err != nil { + log.WarnContext(ctx, "Failed to close upload.", "error", err) + } + } + }() + if err := u.uploadEncrypted(ctx, upload); err != nil { if !errors.Is(err, errSkipEncryptedUpload) { u.emitEvent(events.UploadEvent{ @@ -604,9 +615,6 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error) upload.checkpointFile, err = os.OpenFile(u.checkpointFilePath(sessionID), os.O_RDWR|os.O_CREATE, 0o600) if err != nil { - if err := upload.Close(); err != nil { - log.WarnContext(ctx, "Failed to close upload.", "error", err) - } return trace.ConvertSystemError(err) } @@ -635,6 +643,10 @@ func (u *Uploader) uploadEncrypted(ctx context.Context, up *upload) error { log := u.log.With(fieldSessionID, up.sessionID) header, err := events.ParsePartHeader(up.file) if err != nil { + // Empty upload files are not treated as a session error. + if errors.Is(err, io.EOF) { + return trace.Wrap(err) + } return trace.Wrap(sessionError{err}) } @@ -768,6 +780,7 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error { for { event, err := up.reader.Read(ctx) if err != nil { + // Note that empty upload files are not treated as a session error. if errors.Is(err, io.EOF) { break } diff --git a/lib/events/filesessions/fileasync_chaos_test.go b/lib/events/filesessions/fileasync_chaos_test.go index 943e7b1775ddf..3452a9b8f1e56 100644 --- a/lib/events/filesessions/fileasync_chaos_test.go +++ b/lib/events/filesessions/fileasync_chaos_test.go @@ -1,5 +1,3 @@ -//go:build !race - /* * Teleport * Copyright (C) 2023 Gravitational, Inc. @@ -41,15 +39,7 @@ import ( // TestChaosUpload introduces failures in all stages of the async // upload process and verifies that the system is working correctly. -// -// Data race detector slows down the test significantly (10x+), -// that is why the test is skipped when tests are running with -// `go test -race` flag or `go test -short` flag func TestChaosUpload(t *testing.T) { - if testing.Short() { - t.Skip("Skipping chaos test in short mode.") - } - ctx := t.Context() eventsC := make(chan events.UploadEvent, 100) @@ -114,7 +104,7 @@ func TestChaosUpload(t *testing.T) { uploader, err := NewUploader(UploaderConfig{ ScanDir: scanDir, CorruptedDir: corruptedDir, - ScanPeriod: 3 * time.Second, + ScanPeriod: 100 * time.Millisecond, Streamer: faultyStreamer, Clock: clockwork.NewRealClock(), })