diff --git a/lib/events/filesessions/fileasync_chaos_test.go b/lib/events/filesessions/fileasync_chaos_test.go index 73c32f846dd6e..1d114752a47e6 100644 --- a/lib/events/filesessions/fileasync_chaos_test.go +++ b/lib/events/filesessions/fileasync_chaos_test.go @@ -51,10 +51,9 @@ func TestChaosUpload(t *testing.T) { t.Skip("Skipping chaos test in short mode.") } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - clock := clockwork.NewFakeClock() eventsC := make(chan events.UploadEvent, 100) memUploader := eventstest.NewMemoryUploader(eventsC) streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{ @@ -111,19 +110,17 @@ func TestChaosUpload(t *testing.T) { }) require.NoError(t, err) - scanPeriod := 10 * time.Second + scanPeriod := 3 * time.Second uploader, err := NewUploader(UploaderConfig{ ScanDir: scanDir, CorruptedDir: corruptedDir, ScanPeriod: scanPeriod, Streamer: faultyStreamer, - Clock: clock, + Clock: clockwork.NewRealClock(), }) require.NoError(t, err) - go uploader.Serve(ctx) - // wait until uploader blocks on the clock - clock.BlockUntil(1) + go uploader.Serve(ctx) defer uploader.Close() fileStreamer, err := NewStreamer(scanDir) @@ -164,15 +161,6 @@ func TestChaosUpload(t *testing.T) { }() } - // initiate concurrent scans - scansCh := make(chan error, parallelStreams) - for i := 0; i < parallelStreams; i++ { - go func() { - _, err := uploader.Scan(ctx) - scansCh <- trace.Wrap(err) - }() - } - // wait for all streams to be completed streams := make(map[string]streamState) for i := 0; i < parallelStreams; i++ { @@ -185,32 +173,19 @@ func TestChaosUpload(t *testing.T) { } } - // wait for all scans to be completed - for i := 0; i < parallelStreams; i++ { - select { - case err := <-scansCh: - require.NoError(t, err, trace.DebugReport(err)) - case <-ctx.Done(): - t.Fatalf("Timeout waiting for parallel scan complete, try `go test -v` to get more logs for details") - } - } + require.Len(t, streams, parallelStreams) for i := 0; i < parallelStreams; i++ { - // do scans to catch remaining uploads - _, err = uploader.Scan(ctx) - require.NoError(t, err) - - // wait for the upload events - var event events.UploadEvent select { - case event = <-eventsC: + case event := <-eventsC: require.NoError(t, event.Error) - state, ok := streams[event.SessionID] - require.True(t, ok) + require.Contains(t, streams, event.SessionID, "missing stream for session") + + state := streams[event.SessionID] outEvents := readStream(ctx, t, event.UploadID, memUploader) require.Equal(t, len(state.events), len(outEvents), fmt.Sprintf("event: %v", event)) case <-ctx.Done(): - t.Fatalf("Timeout waiting for async upload, try `go test -v` to get more logs for details") + t.Fatal("Timeout waiting for async upload, try `go test -v` to get more logs for details") } } } diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go index 5ff38cbc2b988..2f452808710e7 100644 --- a/lib/events/filesessions/fileasync_test.go +++ b/lib/events/filesessions/fileasync_test.go @@ -644,8 +644,10 @@ func emitStream(ctx context.Context, t *testing.T, streamer events.Streamer, inE // readStream reads and decodes the audit stream from uploadID func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *eventstest.MemoryUploader) []apievents.AuditEvent { + t.Helper() + parts, err := uploader.GetParts(uploadID) - require.Nil(t, err) + require.NoError(t, err) var outEvents []apievents.AuditEvent var reader *events.ProtoReader @@ -654,10 +656,11 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev reader = events.NewProtoReader(bytes.NewReader(part)) } else { err := reader.Reset(bytes.NewReader(part)) - require.Nil(t, err) + require.NoError(t, err) } out, err := reader.ReadAll(ctx) - require.Nil(t, err, "part crash %#v", part) + require.NoError(t, err, "part crash %#v", part) + outEvents = append(outEvents, out...) } return outEvents