Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,17 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error)
reader: nil,
}

defer func() {
// If we get an error, that signals that the upload goroutine (encrypted or nonencrypted)
// failed to start, so we must manually close the upload as the defers in the goroutine will
// never be called.
if err != nil {
Comment thread
Joerger marked this conversation as resolved.
if err := upload.Close(); err != nil {
log.WarnContext(ctx, "Failed to close upload.", "error", err)
}
}
}()

if err := u.uploadEncrypted(ctx, upload); err != nil {
if !errors.Is(err, errSkipEncryptedUpload) {
u.emitEvent(events.UploadEvent{
Expand All @@ -604,9 +615,6 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error)

upload.checkpointFile, err = os.OpenFile(u.checkpointFilePath(sessionID), os.O_RDWR|os.O_CREATE, 0o600)
if err != nil {
if err := upload.Close(); err != nil {
log.WarnContext(ctx, "Failed to close upload.", "error", err)
}
return trace.ConvertSystemError(err)
}

Expand Down Expand Up @@ -635,6 +643,10 @@ func (u *Uploader) uploadEncrypted(ctx context.Context, up *upload) error {
log := u.log.With(fieldSessionID, up.sessionID)
header, err := events.ParsePartHeader(up.file)
if err != nil {
// Empty upload files are not treated as a session error.
if errors.Is(err, io.EOF) {
return trace.Wrap(err)
}
return trace.Wrap(sessionError{err})
}

Expand Down Expand Up @@ -768,6 +780,7 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
for {
event, err := up.reader.Read(ctx)
if err != nil {
// Note that empty upload files are not treated as a session error.
if errors.Is(err, io.EOF) {
break
}
Expand Down
12 changes: 1 addition & 11 deletions lib/events/filesessions/fileasync_chaos_test.go
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it feasible to backport these changes to v17? I just got a hit on v17. I tried backporting this PR myself, but it doesn't backport cleanly.

Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build !race

/*
* Teleport
* Copyright (C) 2023 Gravitational, Inc.
Expand Down Expand Up @@ -41,15 +39,7 @@ import (

// TestChaosUpload introduces failures in all stages of the async
// upload process and verifies that the system is working correctly.
//
// Data race detector slows down the test significantly (10x+),
// that is why the test is skipped when tests are running with
// `go test -race` flag or `go test -short` flag
func TestChaosUpload(t *testing.T) {
if testing.Short() {
t.Skip("Skipping chaos test in short mode.")
}

ctx := t.Context()

eventsC := make(chan events.UploadEvent, 100)
Expand Down Expand Up @@ -114,7 +104,7 @@ func TestChaosUpload(t *testing.T) {
uploader, err := NewUploader(UploaderConfig{
ScanDir: scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: 3 * time.Second,
ScanPeriod: 100 * time.Millisecond,
Streamer: faultyStreamer,
Clock: clockwork.NewRealClock(),
})
Expand Down
Loading