Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,11 @@ func TestVStreamsMetricsErrors(t *testing.T) {
const wantErr = "Invalid arg message"
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))

send1 := []*binlogdatapb.VEvent{
expectedEvents := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
}
sbc1.AddVStreamEvents(send1, nil)
sbc1.AddVStreamEvents(expectedEvents, nil)

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Expand All @@ -516,32 +516,52 @@ func TestVStreamsMetricsErrors(t *testing.T) {
}},
}

vstreamCtx, vstreamCancel := context.WithCancel(ctx)
defer vstreamCancel()

results := make([]*binlogdatapb.VStreamResponse, 0)
err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
results = append(results, &binlogdatapb.VStreamResponse{Events: events})

if len(results) == 2 {
// We should never actually see 2 responses come in
vstreamCancel()
var err error
ch := make(chan *binlogdatapb.VStreamResponse)
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for res := range ch {
results = append(results, res)
}
}()
func() {
defer wg.Done()

return nil
})
err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
ch <- &binlogdatapb.VStreamResponse{Events: events}
return nil
})
close(ch)
}()
wg.Wait()

require.Error(t, err)
require.ErrorContains(t, err, wantErr)

if err == nil || !strings.Contains(err.Error(), wantErr) {
require.ErrorContains(t, err, wantErr)
// Because there's essentially a race condition between the two streams,
// we may get 0 or 1 results, depending on whether the error from
// sbc0 or the events from sbc1 come first.
require.LessOrEqual(t, len(results), 1)
if len(results) == 1 {
require.Len(t, results[0].Events, 2)
}

expectedLabels1 := "TestVStream.-20.PRIMARY"
expectedLabels2 := "TestVStream.20-40.PRIMARY"
// When we verify the metrics, we should see that the -20 stream had an error,
// while the 20-40 stream might have one too (if the error from -20 came first),
// or might not (if the events from 20-40 came first).
// So we only verify the -20 metrics exactly, while the 20-40 metrics are
// verified to be at least 0 or 1 as appropriate.

errorCounts := vsm.vstreamsEndedWithErrors.Counts()
require.Contains(t, errorCounts, "TestVStream.-20.PRIMARY")
require.Contains(t, errorCounts, "TestVStream.20-40.PRIMARY")

wantVStreamsEndedWithErrors := make(map[string]int64)
wantVStreamsEndedWithErrors[expectedLabels1] = 1
wantVStreamsEndedWithErrors[expectedLabels2] = 0
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
require.Equal(t, int64(1), errorCounts["TestVStream.-20.PRIMARY"])
require.LessOrEqual(t, errorCounts["TestVStream.20-40.PRIMARY"], int64(1))
}

func TestVStreamErrorInCallback(t *testing.T) {
Expand Down
Loading