From 50ad606749406f674a374425d31e418e13651823 Mon Sep 17 00:00:00 2001
From: Zac Bergquist
Date: Tue, 17 Oct 2023 10:56:47 -0600
Subject: [PATCH] Deflake TestChaosUpload

This test attempted to verify that the uploader is able to operate
correctly even when faults are injected into the system. It ended up
being flaky for a number of reasons, but the biggest issue was that it
wasn't simulating a real-world scenario.

1. It used a fake clock but never advanced it, so the uploader's scan
   loop didn't correctly "tick" and only ran once.
2. It invoked many uploader scans in parallel while also running the
   uploader's own scan loop in the background. In any real deployment we
   only have one scan loop running, so this test created an environment
   that never exists in the wild.

Updates #33099
---
 .../filesessions/fileasync_chaos_test.go  | 45 +++++--------
 lib/events/filesessions/fileasync_test.go |  9 ++--
 2 files changed, 16 insertions(+), 38 deletions(-)

diff --git a/lib/events/filesessions/fileasync_chaos_test.go b/lib/events/filesessions/fileasync_chaos_test.go
index 73c32f846dd6e..1d114752a47e6 100644
--- a/lib/events/filesessions/fileasync_chaos_test.go
+++ b/lib/events/filesessions/fileasync_chaos_test.go
@@ -51,10 +51,9 @@ func TestChaosUpload(t *testing.T) {
 		t.Skip("Skipping chaos test in short mode.")
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	clock := clockwork.NewFakeClock()
 	eventsC := make(chan events.UploadEvent, 100)
 	memUploader := eventstest.NewMemoryUploader(eventsC)
 	streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
@@ -111,19 +110,17 @@ func TestChaosUpload(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	scanPeriod := 10 * time.Second
+	scanPeriod := 3 * time.Second
 	uploader, err := NewUploader(UploaderConfig{
 		ScanDir:      scanDir,
 		CorruptedDir: corruptedDir,
 		ScanPeriod:   scanPeriod,
 		Streamer:     faultyStreamer,
-		Clock:        clock,
+		Clock:        clockwork.NewRealClock(),
 	})
 	require.NoError(t, err)
-	go uploader.Serve(ctx)
-	// wait until uploader blocks on the clock
-	clock.BlockUntil(1)
 
+	go uploader.Serve(ctx)
 	defer uploader.Close()
 
 	fileStreamer, err := NewStreamer(scanDir)
@@ -164,15 +161,6 @@ func TestChaosUpload(t *testing.T) {
 		}()
 	}
 
-	// initiate concurrent scans
-	scansCh := make(chan error, parallelStreams)
-	for i := 0; i < parallelStreams; i++ {
-		go func() {
-			_, err := uploader.Scan(ctx)
-			scansCh <- trace.Wrap(err)
-		}()
-	}
-
 	// wait for all streams to be completed
 	streams := make(map[string]streamState)
 	for i := 0; i < parallelStreams; i++ {
@@ -185,32 +173,19 @@ func TestChaosUpload(t *testing.T) {
 		}
 	}
 
-	// wait for all scans to be completed
-	for i := 0; i < parallelStreams; i++ {
-		select {
-		case err := <-scansCh:
-			require.NoError(t, err, trace.DebugReport(err))
-		case <-ctx.Done():
-			t.Fatalf("Timeout waiting for parallel scan complete, try `go test -v` to get more logs for details")
-		}
-	}
+	require.Len(t, streams, parallelStreams)
 
 	for i := 0; i < parallelStreams; i++ {
-		// do scans to catch remaining uploads
-		_, err = uploader.Scan(ctx)
-		require.NoError(t, err)
-
-		// wait for the upload events
-		var event events.UploadEvent
 		select {
-		case event = <-eventsC:
+		case event := <-eventsC:
 			require.NoError(t, event.Error)
-			state, ok := streams[event.SessionID]
-			require.True(t, ok)
+			require.Contains(t, streams, event.SessionID, "missing stream for session")
+
+			state := streams[event.SessionID]
 			outEvents := readStream(ctx, t, event.UploadID, memUploader)
 			require.Equal(t, len(state.events), len(outEvents), fmt.Sprintf("event: %v", event))
 		case <-ctx.Done():
-			t.Fatalf("Timeout waiting for async upload, try `go test -v` to get more logs for details")
+			t.Fatal("Timeout waiting for async upload, try `go test -v` to get more logs for details")
 		}
 	}
 }
diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go
index 5ff38cbc2b988..2f452808710e7 100644
--- a/lib/events/filesessions/fileasync_test.go
+++ b/lib/events/filesessions/fileasync_test.go
@@ -644,8 +644,10 @@ func emitStream(ctx context.Context, t *testing.T, streamer events.Streamer, inE
 
 // readStream reads and decodes the audit stream from uploadID
 func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *eventstest.MemoryUploader) []apievents.AuditEvent {
+	t.Helper()
+
 	parts, err := uploader.GetParts(uploadID)
-	require.Nil(t, err)
+	require.NoError(t, err)
 
 	var outEvents []apievents.AuditEvent
 	var reader *events.ProtoReader
@@ -654,10 +656,11 @@ func readStream(ctx context.Context, t *testing.T, uploadID string, uploader *ev
 			reader = events.NewProtoReader(bytes.NewReader(part))
 		} else {
 			err := reader.Reset(bytes.NewReader(part))
-			require.Nil(t, err)
+			require.NoError(t, err)
 		}
 		out, err := reader.ReadAll(ctx)
-		require.Nil(t, err, "part crash %#v", part)
+		require.NoError(t, err, "part crash %#v", part)
+
 		outEvents = append(outEvents, out...)
 	}
 	return outEvents
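
For context on item 1 in the commit message: a clockwork fake clock never
moves on its own, so anything parked on it stays parked until the test calls
Advance. The standalone Go sketch below (not part of the patch; it assumes
only github.com/jonboulle/clockwork, the clock package this test already
imports, and reuses the 3-second period for illustration) reproduces the
stall in isolation. Removing the BlockUntil/Advance pair leaves the goroutine
blocked forever, which is effectively the state the old test's scan loop was
stuck in after its first run.

package main

import (
	"fmt"
	"time"

	"github.com/jonboulle/clockwork"
)

func main() {
	clock := clockwork.NewFakeClock()
	scanned := make(chan struct{})

	// Stand-in for the uploader's scan loop: park on the fake clock
	// for one scan period, then run a single "scan".
	go func() {
		<-clock.After(3 * time.Second)
		fmt.Println("scan ran")
		close(scanned)
	}()

	// BlockUntil returns once one goroutine is waiting on the fake
	// clock; Advance then fires its pending timer. Without these two
	// calls, <-scanned would block forever.
	clock.BlockUntil(1)
	clock.Advance(3 * time.Second)
	<-scanned
}

The patch sidesteps this class of bug entirely by handing the uploader a
real clock and a short 3-second ScanPeriod, so the scan loop ticks on its
own without the test having to drive time forward.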