diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 96665b38c30..0ea4e6fd68f 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -498,11 +498,11 @@ func TestVStreamsMetricsErrors(t *testing.T) { const wantErr = "Invalid arg message" sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr)) - send1 := []*binlogdatapb.VEvent{ + expectedEvents := []*binlogdatapb.VEvent{ {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"}, {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9}, } - sbc1.AddVStreamEvents(send1, nil) + sbc1.AddVStreamEvents(expectedEvents, nil) vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ @@ -516,32 +516,52 @@ func TestVStreamsMetricsErrors(t *testing.T) { }}, } - vstreamCtx, vstreamCancel := context.WithCancel(ctx) - defer vstreamCancel() - results := make([]*binlogdatapb.VStreamResponse, 0) - err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - results = append(results, &binlogdatapb.VStreamResponse{Events: events}) - if len(results) == 2 { - // We should never actually see 2 responses come in - vstreamCancel() + var err error + ch := make(chan *binlogdatapb.VStreamResponse) + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + for res := range ch { + results = append(results, res) } + }() + func() { + defer wg.Done() - return nil - }) + err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + close(ch) + }() + wg.Wait() + + require.Error(t, err) + require.ErrorContains(t, err, wantErr) - if err == nil || !strings.Contains(err.Error(), wantErr) { - require.ErrorContains(t, err, wantErr) + // Because there's essentially a race condition between the two streams, + // we may get 0 or 1 results, depending on whether the error from + // sbc0 or the events from sbc1 come first. + require.LessOrEqual(t, len(results), 1) + if len(results) == 1 { + require.Len(t, results[0].Events, 2) } - expectedLabels1 := "TestVStream.-20.PRIMARY" - expectedLabels2 := "TestVStream.20-40.PRIMARY" + // When we verify the metrics, we should see that the -20 stream had an error, + // while the 20-40 stream might have one too (if the error from -20 came first), + // or might not (if the events from 20-40 came first). + // So we only verify the -20 metrics exactly, while the 20-40 metrics are + // verified to be at least 0 or 1 as appropriate. + + errorCounts := vsm.vstreamsEndedWithErrors.Counts() + require.Contains(t, errorCounts, "TestVStream.-20.PRIMARY") + require.Contains(t, errorCounts, "TestVStream.20-40.PRIMARY") - wantVStreamsEndedWithErrors := make(map[string]int64) - wantVStreamsEndedWithErrors[expectedLabels1] = 1 - wantVStreamsEndedWithErrors[expectedLabels2] = 0 - assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") + require.Equal(t, int64(1), errorCounts["TestVStream.-20.PRIMARY"]) + require.LessOrEqual(t, errorCounts["TestVStream.20-40.PRIMARY"], int64(1)) } func TestVStreamErrorInCallback(t *testing.T) {