Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,17 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error)
reader: nil,
}

defer func() {
// If we get an error, that signals that the upload goroutine (encrypted or nonencrypted)
// failed to start, so we must manually close the upload as the defers in the goroutine will
// never be called.
if err != nil {
if err := upload.Close(); err != nil {
log.WarnContext(ctx, "Failed to close upload.", "error", err)
}
}
}()

if err := u.uploadEncrypted(ctx, upload); err != nil {
if !errors.Is(err, errSkipEncryptedUpload) {
u.emitEvent(events.UploadEvent{
Expand All @@ -604,9 +615,6 @@ func (u *Uploader) startUpload(ctx context.Context, fileName string) (err error)

upload.checkpointFile, err = os.OpenFile(u.checkpointFilePath(sessionID), os.O_RDWR|os.O_CREATE, 0o600)
if err != nil {
if err := upload.Close(); err != nil {
log.WarnContext(ctx, "Failed to close upload.", "error", err)
}
return trace.ConvertSystemError(err)
}

Expand Down Expand Up @@ -637,6 +645,10 @@ func (u *Uploader) uploadEncrypted(ctx context.Context, up *upload) error {
log := u.log.With(fieldSessionID, up.sessionID)
header, err := events.ParsePartHeader(up.file)
if err != nil {
// Empty upload files are not treated as a session error.
if errors.Is(err, io.EOF) {
return trace.Wrap(err)
}
return trace.Wrap(sessionError{err})
}

Expand Down Expand Up @@ -772,6 +784,7 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
for {
event, err := up.reader.Read(ctx)
if err != nil {
// Note that empty upload files are not treated as a session error.
if errors.Is(err, io.EOF) {
break
}
Expand Down
21 changes: 5 additions & 16 deletions lib/events/filesessions/fileasync_chaos_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build !race

/*
* Teleport
* Copyright (C) 2023 Gravitational, Inc.
Expand Down Expand Up @@ -41,17 +39,8 @@ import (

// TestChaosUpload introduces failures in all stages of the async
// upload process and verifies that the system is working correctly.
//
// Data race detector slows down the test significantly (10x+),
// that is why the test is skipped when tests are running with
// `go test -race` flag or `go test -short` flag
func TestChaosUpload(t *testing.T) {
if testing.Short() {
t.Skip("Skipping chaos test in short mode.")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx := t.Context()

eventsC := make(chan events.UploadEvent, 100)
memUploader := eventstest.NewMemoryUploader(eventstest.MemoryUploaderConfig{
Expand Down Expand Up @@ -115,7 +104,7 @@ func TestChaosUpload(t *testing.T) {
uploader, err := NewUploader(UploaderConfig{
ScanDir: scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: 3 * time.Second,
ScanPeriod: 100 * time.Millisecond,
Streamer: faultyStreamer,
Clock: clockwork.NewRealClock(),
})
Expand All @@ -136,7 +125,7 @@ func TestChaosUpload(t *testing.T) {
err error
}
streamsCh := make(chan streamState, parallelStreams)
for i := 0; i < parallelStreams; i++ {
for range parallelStreams {
go func() {
inEvents := eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: 4096})
sid := inEvents[0].(events.SessionMetadataGetter).GetSessionID()
Expand Down Expand Up @@ -166,7 +155,7 @@ func TestChaosUpload(t *testing.T) {

// wait for all streams to be completed
streams := make(map[string]streamState)
for i := 0; i < parallelStreams; i++ {
for range parallelStreams {
select {
case status := <-streamsCh:
require.NoError(t, status.err)
Expand All @@ -178,7 +167,7 @@ func TestChaosUpload(t *testing.T) {

require.Len(t, streams, parallelStreams)

for i := 0; i < parallelStreams; i++ {
for range parallelStreams {
select {
case event := <-eventsC:
require.NoError(t, event.Error)
Expand Down
Loading