diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go
index ada1ddb131c..9328b50dd25 100644
--- a/go/vt/vtgate/vstream_manager.go
+++ b/go/vt/vtgate/vstream_manager.go
@@ -18,6 +18,7 @@ package vtgate
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"regexp"
@@ -51,8 +52,11 @@ type vstreamManager struct {
 	toposerv srvtopo.Server
 	cell     string
 
-	vstreamsCreated *stats.CountersWithMultiLabels
-	vstreamsLag     *stats.GaugesWithMultiLabels
+	vstreamsCreated         *stats.CountersWithMultiLabels
+	vstreamsLag             *stats.GaugesWithMultiLabels
+	vstreamsCount           *stats.GaugesWithMultiLabels // active streams; goes up and down, so it must be a gauge
+	vstreamsEventsStreamed  *stats.CountersWithMultiLabels
+	vstreamsEndedWithErrors *stats.CountersWithMultiLabels
 }
 
 // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
@@ -143,6 +147,7 @@ type journalEvent struct {
 
 func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
 	exporter := servenv.NewExporter(cell, "VStreamManager")
+	labels := []string{"Keyspace", "ShardName", "TabletType"}
 
 	return &vstreamManager{
 		resolver: resolver,
@@ -151,11 +156,23 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
 		vstreamsCreated: exporter.NewCountersWithMultiLabels(
 			"VStreamsCreated",
 			"Number of vstreams created",
-			[]string{"Keyspace", "ShardName", "TabletType"}),
+			labels),
 		vstreamsLag: exporter.NewGaugesWithMultiLabels(
 			"VStreamsLag",
 			"Difference between event current time and the binlog event timestamp",
-			[]string{"Keyspace", "ShardName", "TabletType"}),
+			labels),
+		vstreamsCount: exporter.NewGaugesWithMultiLabels(
+			"VStreamsCount",
+			"Number of active vstreams",
+			labels),
+		vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels(
+			"VStreamsEventsStreamed",
+			"Number of events sent across all vstreams",
+			labels),
+		vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels(
+			"VStreamsEndedWithErrors",
+			"Number of vstreams that ended with errors",
+			labels),
 	}
 }
 
@@ -378,11 +395,26 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
 	vs.wg.Add(1)
 	go func() {
 		defer vs.wg.Done()
+		labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
+		// Initialize vstreamsEndedWithErrors metric to zero.
+		vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 0)
+		vs.vsm.vstreamsCreated.Add(labelValues, 1)
+		// The stream counts as active until this goroutine exits, with or without an error.
+		vs.vsm.vstreamsCount.Add(labelValues, 1)
+		defer vs.vsm.vstreamsCount.Add(labelValues, -1)
+
 		err := vs.streamFromTablet(ctx, sgtid)
 
 		// Set the error on exit. First one wins.
 		if err != nil {
 			log.Errorf("Error in vstream for %+v: %s", sgtid, err)
+			// Get the original/base error.
+			uerr := vterrors.UnwrapAll(err)
+			if !errors.Is(uerr, context.Canceled) && !errors.Is(uerr, context.DeadlineExceeded) {
+				// The client did not intentionally end the stream so this was an error in the
+				// vstream itself.
+				vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 1)
+			}
 			vs.once.Do(func() {
 				vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
 				vs.cancel()
@@ -503,6 +535,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 	// It will be closed when all journal events converge.
 	var journalDone chan struct{}
 	ignoreTablets := make([]*topodatapb.TabletAlias, 0)
+	labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
 
 	errCount := 0
 	for {
@@ -613,18 +646,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 			TableLastPKs: sgtid.TablePKs,
 			Options:      options,
 		}
-		var vstreamCreatedOnce sync.Once
 		log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
 		err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
 			// We received a valid event. Reset error count.
errCount = 0 - labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()} - - vstreamCreatedOnce.Do(func() { - vs.vsm.vstreamsCreated.Add(labels, 1) - }) - select { case <-ctx.Done(): return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", @@ -755,7 +781,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha sendevents = append(sendevents, event) } lag := event.CurrentTime/1e9 - event.Timestamp - vs.vsm.vstreamsLag.Set(labels, lag) + vs.vsm.vstreamsLag.Set(labelValues, lag) } if len(sendevents) != 0 { eventss = append(eventss, sendevents) @@ -793,6 +819,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) } + } // shouldRetry determines whether we should exit immediately or retry the vstream. @@ -838,6 +865,7 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) { func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() defer vs.mu.Unlock() + labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} // Send all chunks while holding the lock. 
for _, events := range eventss { @@ -890,6 +918,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e case <-ctx.Done(): return nil case vs.eventCh <- events: + vs.vsm.vstreamsEventsStreamed.Add(labelValues, int64(len(events))) } } return nil diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index e209e15fb3d..78014b69d5c 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) { } } -func TestVStreamsCreatedAndLagMetrics(t *testing.T) { +func TestVStreamsMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cell := "aa" @@ -346,9 +346,12 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cell) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() + vsm.vstreamsCount.ResetAll() + vsm.vstreamsEventsStreamed.ResetAll() + vsm.vstreamsEndedWithErrors.ResetAll() sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) - sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) send0 := []*binlogdatapb.VEvent{ @@ -377,15 +380,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { ch := startVStream(ctx, t, vsm, vgtid, nil) <-ch <-ch + expectedLabels1 := "TestVStream.-20.PRIMARY" + expectedLabels2 := "TestVStream.20-40.PRIMARY" wantVStreamsCreated := make(map[string]int64) - wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1 - wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1 + wantVStreamsCreated[expectedLabels1] = 1 + wantVStreamsCreated[expectedLabels2] = 1 assert.Equal(t, 
wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches") wantVStreamsLag := make(map[string]int64) - wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5 - wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7 + wantVStreamsLag[expectedLabels1] = 5 + wantVStreamsLag[expectedLabels2] = 7 assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") + + wantVStreamsCount := make(map[string]int64) + wantVStreamsCount[expectedLabels1] = 1 + wantVStreamsCount[expectedLabels2] = 1 + assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches") + + wantVStreamsEventsStreamed := make(map[string]int64) + wantVStreamsEventsStreamed[expectedLabels1] = 2 + wantVStreamsEventsStreamed[expectedLabels2] = 2 + assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches") + + wantVStreamsEndedWithErrors := make(map[string]int64) + wantVStreamsEndedWithErrors[expectedLabels1] = 0 + wantVStreamsEndedWithErrors[expectedLabels2] = 0 + assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") +} + +func TestVStreamsMetricsErrors(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsCreated.ResetAll() + vsm.vstreamsLag.ResetAll() + vsm.vstreamsCount.ResetAll() + vsm.vstreamsEventsStreamed.ResetAll() + vsm.vstreamsEndedWithErrors.ResetAll() + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", 
sbc1.Tablet()) + + const wantErr = "Invalid arg message" + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr)) + + send1 := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"}, + {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9}, + } + sbc1.AddVStreamEvents(send1, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + + if err == nil || !strings.Contains(err.Error(), wantErr) { + require.ErrorContains(t, err, wantErr) + } + close(done) + }() + <-ch + <-done + + expectedLabels1 := "TestVStream.-20.PRIMARY" + expectedLabels2 := "TestVStream.20-40.PRIMARY" + + wantVStreamsEndedWithErrors := make(map[string]int64) + wantVStreamsEndedWithErrors[expectedLabels1] = 1 + wantVStreamsEndedWithErrors[expectedLabels2] = 0 + assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") } func TestVStreamRetriableErrors(t *testing.T) {