Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ func waitForMetricsMatch(t *testing.T, getActual func() map[string]int64, want m
func TestVStreamsMetricsErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Use a unique cell to avoid parallel tests interfering with each other's metrics
cell := "ac"
ks := "TestVStream"
Expand All @@ -637,11 +638,11 @@ func TestVStreamsMetricsErrors(t *testing.T) {
const wantErr = "Invalid arg message"
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))

send1 := []*binlogdatapb.VEvent{
expectedEvents := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
}
sbc1.AddVStreamEvents(send1, nil)
sbc1.AddVStreamEvents(expectedEvents, nil)

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Expand All @@ -654,30 +655,49 @@ func TestVStreamsMetricsErrors(t *testing.T) {
Gtid: "pos",
}},
}
ch := make(chan *binlogdatapb.VStreamResponse)
done := make(chan struct{})

results := make([]*binlogdatapb.VStreamResponse, 0)

var err error
go func() {
ch := make(chan *binlogdatapb.VStreamResponse)
wg := sync.WaitGroup{}
wg.Go(func() {
for res := range ch {
results = append(results, res)
}
})
wg.Go(func() {
err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
ch <- &binlogdatapb.VStreamResponse{Events: events}
return nil
})
close(done)
}()
<-ch
<-done
close(ch)
})
wg.Wait()

if err == nil || !strings.Contains(err.Error(), wantErr) {
require.ErrorContains(t, err, wantErr)
require.Error(t, err)
require.ErrorContains(t, err, wantErr)

// Because there's essentially a race condition between the two streams,
// we may get 0 or 1 results, depending on whether the error from
// sbc0 or the events from sbc1 come first.
require.LessOrEqual(t, len(results), 1)
if len(results) == 1 {
require.Len(t, results[0].Events, 2)
}

expectedLabels1 := "TestVStream.-20.PRIMARY"
expectedLabels2 := "TestVStream.20-40.PRIMARY"
// When we verify the metrics, we should see that the -20 stream had an error,
// while the 20-40 stream might have one too (if the error from -20 came first),
// or might not (if the events from 20-40 came first).
// So we only verify the -20 metrics exactly, while the 20-40 metrics are
// verified to be at least 0 or 1 as appropriate.

errorCounts := vsm.vstreamsEndedWithErrors.Counts()
require.Contains(t, errorCounts, "TestVStream.-20.PRIMARY")
require.Contains(t, errorCounts, "TestVStream.20-40.PRIMARY")

wantVStreamsEndedWithErrors := make(map[string]int64)
wantVStreamsEndedWithErrors[expectedLabels1] = 1
wantVStreamsEndedWithErrors[expectedLabels2] = 0
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
require.Equal(t, int64(1), errorCounts["TestVStream.-20.PRIMARY"])
require.LessOrEqual(t, errorCounts["TestVStream.20-40.PRIMARY"], int64(1))
}

func TestVStreamRetriableErrors(t *testing.T) {
Expand Down
Loading