From 4975fbd78dadba317dc270c5e0f3157806d82a8f Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 23 Sep 2025 16:11:48 +0200 Subject: [PATCH] Cherry-pick e5593a8a123035e13cdf8cb3b6cfcca43745404d with conflicts --- go/vt/vtgate/vstream_manager_test.go | 129 +++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 4e10e60c758..9b7a6305e03 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -382,10 +382,139 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1 assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches") +<<<<<<< HEAD wantVStreamsLag := make(map[string]int64) wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5 wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7 assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") +======= + wantVStreamsCreated := map[string]int64{ + expectedLabels1: 1, + expectedLabels2: 1, + } + waitForMetricsMatch(t, vsm.vstreamsCreated.Counts, wantVStreamsCreated) + + wantVStreamsLag := map[string]int64{ + expectedLabels1: 5, + expectedLabels2: 7, + } + waitForMetricsMatch(t, vsm.vstreamsLag.Counts, wantVStreamsLag) + + wantVStreamsCount := map[string]int64{ + expectedLabels1: 1, + expectedLabels2: 1, + } + waitForMetricsMatch(t, vsm.vstreamsCount.Counts, wantVStreamsCount) + + wantVEventsCount := map[string]int64{ + expectedLabels1: 2, + expectedLabels2: 2, + } + waitForMetricsMatch(t, vsm.vstreamsEventsStreamed.Counts, wantVEventsCount) + + wantVStreamsEndedWithErrors := map[string]int64{ + expectedLabels1: 0, + expectedLabels2: 0, + } + waitForMetricsMatch(t, vsm.vstreamsEndedWithErrors.Counts, wantVStreamsEndedWithErrors) +} + +func waitForMetricsMatch(t *testing.T, getActual func() map[string]int64, want map[string]int64) { + deadline := time.Now().Add(1 * time.Second) + for time.Now().Before(deadline) { + if reflect.DeepEqual(getActual(), want) { + return + } + time.Sleep(10 * time.Millisecond) + } + assert.Equal(t, want, getActual(), "metrics did not match within timeout") +} + +func TestVStreamsMetricsErrors(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Use a unique cell to avoid parallel tests interfering with each other's metrics + cell := "ac" + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsCreated.ResetAll() + vsm.vstreamsLag.ResetAll() + vsm.vstreamsCount.ResetAll() + vsm.vstreamsEventsStreamed.ResetAll() + vsm.vstreamsEndedWithErrors.ResetAll() + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + + const wantErr = "Invalid arg message" + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr)) + + expectedEvents := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"}, + {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9}, + } + sbc1.AddVStreamEvents(expectedEvents, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + + results := make([]*binlogdatapb.VStreamResponse, 0) + + var err error + ch := make(chan *binlogdatapb.VStreamResponse) + wg := sync.WaitGroup{} + wg.Go(func() { + for res := range ch { + results = append(results, res) + } + }) + wg.Go(func() { + err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + close(ch) + }) + wg.Wait() + + require.Error(t, err) + require.ErrorContains(t, err, wantErr) + + // Because there's essentially a race condition between the two streams, + // we may get 0 or 1 results, depending on whether the error from + // sbc0 or the events from sbc1 come first. + require.LessOrEqual(t, len(results), 1) + if len(results) == 1 { + require.Len(t, results[0].Events, 2) + } + + // When we verify the metrics, we should see that the -20 stream had an error, + // while the 20-40 stream might have one too (if the error from -20 came first), + // or might not (if the events from 20-40 came first). + // So we only verify the -20 metrics exactly, while the 20-40 metrics are + // verified to be at least 0 or 1 as appropriate. + + errorCounts := vsm.vstreamsEndedWithErrors.Counts() + require.Contains(t, errorCounts, "TestVStream.-20.PRIMARY") + require.Contains(t, errorCounts, "TestVStream.20-40.PRIMARY") + + require.Equal(t, int64(1), errorCounts["TestVStream.-20.PRIMARY"]) + require.LessOrEqual(t, errorCounts["TestVStream.20-40.PRIMARY"], int64(1)) +>>>>>>> e5593a8a12 (Deflake `TestVStreamsMetricsErrors`. (#18679)) } func TestVStreamRetriableErrors(t *testing.T) {