diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 5e44d08fbca..748f5be526b 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -19,6 +19,7 @@ package vtgate import ( "context" "fmt" + "reflect" "strings" "sync" "sync/atomic" @@ -382,30 +383,47 @@ func TestVStreamsMetrics(t *testing.T) { <-ch expectedLabels1 := "TestVStream.-20.PRIMARY" expectedLabels2 := "TestVStream.20-40.PRIMARY" - wantVStreamsCreated := make(map[string]int64) - wantVStreamsCreated[expectedLabels1] = 1 - wantVStreamsCreated[expectedLabels2] = 1 - assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches") - - wantVStreamsLag := make(map[string]int64) - wantVStreamsLag[expectedLabels1] = 5 - wantVStreamsLag[expectedLabels2] = 7 - assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") - - wantVStreamsCount := make(map[string]int64) - wantVStreamsCount[expectedLabels1] = 1 - wantVStreamsCount[expectedLabels2] = 1 - assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches") - - wantVStreamsEventsStreamed := make(map[string]int64) - wantVStreamsEventsStreamed[expectedLabels1] = 2 - wantVStreamsEventsStreamed[expectedLabels2] = 2 - assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches") - wantVStreamsEndedWithErrors := make(map[string]int64) - wantVStreamsEndedWithErrors[expectedLabels1] = 0 - wantVStreamsEndedWithErrors[expectedLabels2] = 0 - assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") + wantVStreamsCreated := map[string]int64{ + expectedLabels1: 1, + expectedLabels2: 1, + } + waitForMetricsMatch(t, vsm.vstreamsCreated.Counts, wantVStreamsCreated) + + wantVStreamsLag := map[string]int64{ + expectedLabels1: 5, + expectedLabels2: 7, + } + waitForMetricsMatch(t, vsm.vstreamsLag.Counts, wantVStreamsLag) + + wantVStreamsCount := map[string]int64{ + expectedLabels1: 1, + expectedLabels2: 1, + } + waitForMetricsMatch(t, vsm.vstreamsCount.Counts, wantVStreamsCount) + + wantVEventsCount := map[string]int64{ + expectedLabels1: 2, + expectedLabels2: 2, + } + waitForMetricsMatch(t, vsm.vstreamsEventsStreamed.Counts, wantVEventsCount) + + wantVStreamsEndedWithErrors := map[string]int64{ + expectedLabels1: 0, + expectedLabels2: 0, + } + waitForMetricsMatch(t, vsm.vstreamsEndedWithErrors.Counts, wantVStreamsEndedWithErrors) +} + +func waitForMetricsMatch(t *testing.T, getActual func() map[string]int64, want map[string]int64) { + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + if reflect.DeepEqual(getActual(), want) { + return + } + time.Sleep(10 * time.Millisecond) + } + assert.Equal(t, want, getActual(), "metrics did not match within timeout") } func TestVStreamsMetricsErrors(t *testing.T) {