Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,139 @@
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")

<<<<<<< HEAD

Check failure on line 385 in go/vt/vtgate/vstream_manager_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

expected statement, found '<<'

Check failure on line 385 in go/vt/vtgate/vstream_manager_test.go

View workflow job for this annotation

GitHub Actions / Code Coverage

expected statement, found '<<'
wantVStreamsLag := make(map[string]int64)
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
=======
wantVStreamsCreated := map[string]int64{
expectedLabels1: 1,
expectedLabels2: 1,
}
waitForMetricsMatch(t, vsm.vstreamsCreated.Counts, wantVStreamsCreated)

wantVStreamsLag := map[string]int64{
expectedLabels1: 5,
expectedLabels2: 7,
}
waitForMetricsMatch(t, vsm.vstreamsLag.Counts, wantVStreamsLag)

wantVStreamsCount := map[string]int64{
expectedLabels1: 1,
expectedLabels2: 1,
}
waitForMetricsMatch(t, vsm.vstreamsCount.Counts, wantVStreamsCount)

wantVEventsCount := map[string]int64{
expectedLabels1: 2,
expectedLabels2: 2,
}
waitForMetricsMatch(t, vsm.vstreamsEventsStreamed.Counts, wantVEventsCount)

wantVStreamsEndedWithErrors := map[string]int64{
expectedLabels1: 0,
expectedLabels2: 0,
}
waitForMetricsMatch(t, vsm.vstreamsEndedWithErrors.Counts, wantVStreamsEndedWithErrors)
}

func waitForMetricsMatch(t *testing.T, getActual func() map[string]int64, want map[string]int64) {
deadline := time.Now().Add(1 * time.Second)
for time.Now().Before(deadline) {
if reflect.DeepEqual(getActual(), want) {
return
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, want, getActual(), "metrics did not match within timeout")
}

func TestVStreamsMetricsErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Use a unique cell to avoid parallel tests interfering with each other's metrics
cell := "ac"
ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(ctx, hc, st, cell)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
vsm.vstreamsCount.ResetAll()
vsm.vstreamsEventsStreamed.ResetAll()
vsm.vstreamsEndedWithErrors.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())

const wantErr = "Invalid arg message"
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))

expectedEvents := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
}
sbc1.AddVStreamEvents(expectedEvents, nil)

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}, {
Keyspace: ks,
Shard: "20-40",
Gtid: "pos",
}},
}

results := make([]*binlogdatapb.VStreamResponse, 0)

var err error
ch := make(chan *binlogdatapb.VStreamResponse)
wg := sync.WaitGroup{}
wg.Go(func() {
for res := range ch {
results = append(results, res)
}
})
wg.Go(func() {
err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
ch <- &binlogdatapb.VStreamResponse{Events: events}
return nil
})
close(ch)
})
wg.Wait()

require.Error(t, err)
require.ErrorContains(t, err, wantErr)

// Because there's essentially a race condition between the two streams,
// we may get 0 or 1 results, depending on whether the error from
// sbc0 or the events from sbc1 come first.
require.LessOrEqual(t, len(results), 1)
if len(results) == 1 {
require.Len(t, results[0].Events, 2)
}

// When we verify the metrics, we should see that the -20 stream had an error,
// while the 20-40 stream might have one too (if the error from -20 came first),
// or might not (if the events from 20-40 came first).
// So we only verify the -20 metrics exactly, while the 20-40 metrics are
// verified to be at least 0 or 1 as appropriate.

errorCounts := vsm.vstreamsEndedWithErrors.Counts()
require.Contains(t, errorCounts, "TestVStream.-20.PRIMARY")
require.Contains(t, errorCounts, "TestVStream.20-40.PRIMARY")

require.Equal(t, int64(1), errorCounts["TestVStream.-20.PRIMARY"])
require.LessOrEqual(t, errorCounts["TestVStream.20-40.PRIMARY"], int64(1))
>>>>>>> e5593a8a12 (Deflake `TestVStreamsMetricsErrors`. (#18679))
}

func TestVStreamRetriableErrors(t *testing.T) {
Expand Down
Loading