From f6f37d0e6680cc8ddf6bb029dbf1239dd1d3bca6 Mon Sep 17 00:00:00 2001 From: twthorn Date: Mon, 24 Feb 2025 18:00:19 -0500 Subject: [PATCH 1/7] Add more vstream metrics for vstream manager Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 41 +++++++++--- go/vt/vtgate/vstream_manager_test.go | 97 ++++++++++++++++++++++++++-- 2 files changed, 121 insertions(+), 17 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index ada1ddb131c..57016cc0a7d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -51,8 +51,11 @@ type vstreamManager struct { toposerv srvtopo.Server cell string - vstreamsCreated *stats.CountersWithMultiLabels - vstreamsLag *stats.GaugesWithMultiLabels + vstreamsCreated *stats.CountersWithMultiLabels + vstreamsLag *stats.GaugesWithMultiLabels + vstreamsCount *stats.CountersWithMultiLabels + vstreamsEventsStreamed *stats.CountersWithMultiLabels + vstreamsEndedWithErrors *stats.CountersWithMultiLabels } // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set @@ -151,10 +154,22 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", - []string{"Keyspace", "ShardName", "TabletType"}), + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), vstreamsLag: exporter.NewGaugesWithMultiLabels( "VStreamsLag", "Difference between event current time and the binlog event timestamp", + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + vstreamsCount: exporter.NewCountersWithMultiLabels( + "VStreamsCount", + "Number of active vstreams", + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels( + "VStreamsEventsStreamed", + "Number of vstreams events sent", + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels( + "VStreamsEndedWithErrors", + "Number of vstreams ended with errors", []string{"Keyspace", "ShardName", "TabletType"}), } } @@ -378,11 +393,17 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard vs.wg.Add(1) go func() { defer vs.wg.Done() + + labels := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} + vs.vsm.vstreamsEndedWithErrors.Add(labels, 0) + err := vs.streamFromTablet(ctx, sgtid) // Set the error on exit. First one wins. if err != nil { log.Errorf("Error in vstream for %+v: %s", sgtid, err) + + vs.vsm.vstreamsEndedWithErrors.Add(labels, 1) vs.once.Do(func() { vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid)) vs.cancel() @@ -613,18 +634,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha TableLastPKs: sgtid.TablePKs, Options: options, } - var vstreamCreatedOnce sync.Once + + labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String(), tablet.Hostname} + vs.vsm.vstreamsCreated.Add(labels, 1) + vs.vsm.vstreamsCount.Add(labels, 1) + log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 - labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()} - - vstreamCreatedOnce.Do(func() { - vs.vsm.vstreamsCreated.Add(labels, 1) - }) - select { case <-ctx.Done(): return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", @@ -646,6 +665,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { + vs.vsm.vstreamsEventsStreamed.Add(labels, 1) switch event.Type { case binlogdatapb.VEventType_FIELD: // Update table names and send. @@ -762,6 +782,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } return nil }) + vs.vsm.vstreamsCount.Add(labels, -1) // If stream was ended (by a journal event), return nil without checking for error. select { case <-journalDone: diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index e209e15fb3d..f34345a767e 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) { } } -func TestVStreamsCreatedAndLagMetrics(t *testing.T) { +func TestVStreamsMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cell := "aa" @@ -346,9 +346,11 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cell) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() - sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + hostname1 := "host1" + hostname2 := "host2" + sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) - sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) send0 := []*binlogdatapb.VEvent{ @@ -377,15 +379,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { ch := startVStream(ctx, t, vsm, vgtid, nil) <-ch <-ch + expectedLabels1Prefix := "TestVStream.-20.PRIMARY" + expectedLabels2Prefix := "TestVStream.20-40.PRIMARY" + expectedLabels1 := expectedLabels1Prefix + "." + hostname1 + expectedLabels2 := expectedLabels2Prefix + "." + hostname2 wantVStreamsCreated := make(map[string]int64) - wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1 - wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1 + wantVStreamsCreated[expectedLabels1] = 1 + wantVStreamsCreated[expectedLabels2] = 1 assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches") wantVStreamsLag := make(map[string]int64) - wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5 - wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7 + wantVStreamsLag[expectedLabels1] = 5 + wantVStreamsLag[expectedLabels2] = 7 assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") + + wantVStreamsCount := make(map[string]int64) + wantVStreamsCount[expectedLabels1] = 1 + wantVStreamsCount[expectedLabels2] = 1 + assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches") + + wantVStreamsEventsStreamed := make(map[string]int64) + wantVStreamsEventsStreamed[expectedLabels1] = 2 + wantVStreamsEventsStreamed[expectedLabels2] = 2 + assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches") + + wantVStreamsEndedWithErrors := make(map[string]int64) + wantVStreamsEndedWithErrors[expectedLabels1Prefix] = 0 + wantVStreamsEndedWithErrors[expectedLabels2Prefix] = 0 + assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") +} + +func TestVStreamsMetricsErrors(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsCreated.ResetAll() + vsm.vstreamsLag.ResetAll() + hostname1 := "host1" + hostname2 := "host2" + sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + + const wantErr = "Invalid arg message" + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr)) + + send1 := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"}, + {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9}, + } + sbc1.AddVStreamEvents(send1, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + + if err == nil || !strings.Contains(err.Error(), wantErr) { + t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + } + close(done) + }() + <-done + + expectedLabels1 := "TestVStream.-20.PRIMARY" + expectedLabels2 := "TestVStream.20-40.PRIMARY" + + wantVStreamsEndedWithErrors := make(map[string]int64) + wantVStreamsEndedWithErrors[expectedLabels1] = 1 + wantVStreamsEndedWithErrors[expectedLabels2] = 1 + assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") } func TestVStreamRetriableErrors(t *testing.T) { From a9c08db4559d97da63887f85c312903806a59f0d Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 25 Feb 2025 11:00:50 -0500 Subject: [PATCH 2/7] Fix unit tests by resetting metrics Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index f34345a767e..61ef0e115b0 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -346,6 +346,9 @@ func TestVStreamsMetrics(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cell) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() + vsm.vstreamsCount.ResetAll() + vsm.vstreamsEventsStreamed.ResetAll() + vsm.vstreamsEndedWithErrors.ResetAll() hostname1 := "host1" hostname2 := "host2" sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -420,6 +423,9 @@ func TestVStreamsMetricsErrors(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cell) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() + vsm.vstreamsCount.ResetAll() + vsm.vstreamsEventsStreamed.ResetAll() + vsm.vstreamsEndedWithErrors.ResetAll() hostname1 := "host1" hostname2 := "host2" sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) From ea6caffb5893fa19a8b08feac98df5c66b5c7e6c Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 25 Feb 2025 12:12:56 -0500 Subject: [PATCH 3/7] Prevent go routine leak in tests Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 61ef0e115b0..adc4e7f1c21 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -466,6 +466,7 @@ func TestVStreamsMetricsErrors(t *testing.T) { } close(done) }() + <-ch <-done expectedLabels1 := "TestVStream.-20.PRIMARY" From 02268f7e6e7498747dc9217b8d7694c0af7b6acb Mon Sep 17 00:00:00 2001 From: twthorn Date: Wed, 26 Feb 2025 13:27:43 -0500 Subject: [PATCH 4/7] Make metrics shard level Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 38 ++++++++++++++-------------- go/vt/vtgate/vstream_manager_test.go | 22 ++++++---------- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 57016cc0a7d..349b62125e1 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -146,6 +146,7 @@ type journalEvent struct { func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") + labels := []string{"Keyspace", "ShardName", "TabletType"} return &vstreamManager{ resolver: resolver, @@ -154,23 +155,23 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", - []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + labels), vstreamsLag: exporter.NewGaugesWithMultiLabels( "VStreamsLag", "Difference between event current time and the binlog event timestamp", - []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + labels), vstreamsCount: exporter.NewCountersWithMultiLabels( "VStreamsCount", "Number of active vstreams", - []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + labels), vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels( "VStreamsEventsStreamed", - "Number of vstreams events sent", - []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + "Number of events sent across all vstreams", + labels), vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels( "VStreamsEndedWithErrors", - "Number of vstreams ended with errors", - []string{"Keyspace", "ShardName", "TabletType"}), + "Number of vstreams that ended with errors", + labels), } } @@ -394,16 +395,18 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard go func() { defer vs.wg.Done() - labels := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} - vs.vsm.vstreamsEndedWithErrors.Add(labels, 0) + labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} + vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 0) + vs.vsm.vstreamsCreated.Add(labelValues, 1) + vs.vsm.vstreamsCount.Add(labelValues, 1) err := vs.streamFromTablet(ctx, sgtid) // Set the error on exit. First one wins. if err != nil { log.Errorf("Error in vstream for %+v: %s", sgtid, err) - - vs.vsm.vstreamsEndedWithErrors.Add(labels, 1) + vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 1) + vs.vsm.vstreamsCount.Add(labelValues, -1) vs.once.Do(func() { vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid)) vs.cancel() @@ -524,6 +527,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // It will be closed when all journal events converge. var journalDone chan struct{} ignoreTablets := make([]*topodatapb.TabletAlias, 0) + labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} errCount := 0 for { @@ -634,11 +638,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha TableLastPKs: sgtid.TablePKs, Options: options, } - - labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String(), tablet.Hostname} - vs.vsm.vstreamsCreated.Add(labels, 1) - vs.vsm.vstreamsCount.Add(labels, 1) - log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. @@ -665,7 +664,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { - vs.vsm.vstreamsEventsStreamed.Add(labels, 1) switch event.Type { case binlogdatapb.VEventType_FIELD: // Update table names and send. @@ -775,14 +773,13 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha sendevents = append(sendevents, event) } lag := event.CurrentTime/1e9 - event.Timestamp - vs.vsm.vstreamsLag.Set(labels, lag) + vs.vsm.vstreamsLag.Set(labelValues, lag) } if len(sendevents) != 0 { eventss = append(eventss, sendevents) } return nil }) - vs.vsm.vstreamsCount.Add(labels, -1) // If stream was ended (by a journal event), return nil without checking for error. select { case <-journalDone: @@ -814,6 +811,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) } + } // shouldRetry determines whether we should exit immediately or retry the vstream. @@ -859,6 +857,7 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) { func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() defer vs.mu.Unlock() + labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} // Send all chunks while holding the lock. for _, events := range eventss { @@ -911,6 +910,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e case <-ctx.Done(): return nil case vs.eventCh <- events: + vs.vsm.vstreamsEventsStreamed.Add(labelValues, int64(len(events))) } } return nil diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index adc4e7f1c21..98c9bd3742f 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -349,11 +349,9 @@ func TestVStreamsMetrics(t *testing.T) { vsm.vstreamsCount.ResetAll() vsm.vstreamsEventsStreamed.ResetAll() vsm.vstreamsEndedWithErrors.ResetAll() - hostname1 := "host1" - hostname2 := "host2" - sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) - sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) send0 := []*binlogdatapb.VEvent{ @@ -382,10 +380,8 @@ func TestVStreamsMetrics(t *testing.T) { ch := startVStream(ctx, t, vsm, vgtid, nil) <-ch <-ch - expectedLabels1Prefix := "TestVStream.-20.PRIMARY" - expectedLabels2Prefix := "TestVStream.20-40.PRIMARY" - expectedLabels1 := expectedLabels1Prefix + "." + hostname1 - expectedLabels2 := expectedLabels2Prefix + "." + hostname2 + expectedLabels1 := "TestVStream.-20.PRIMARY" + expectedLabels2 := "TestVStream.20-40.PRIMARY" wantVStreamsCreated := make(map[string]int64) wantVStreamsCreated[expectedLabels1] = 1 wantVStreamsCreated[expectedLabels2] = 1 @@ -407,8 +403,8 @@ func TestVStreamsMetrics(t *testing.T) { assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches") wantVStreamsEndedWithErrors := make(map[string]int64) - wantVStreamsEndedWithErrors[expectedLabels1Prefix] = 0 - wantVStreamsEndedWithErrors[expectedLabels2Prefix] = 0 + wantVStreamsEndedWithErrors[expectedLabels1] = 0 + wantVStreamsEndedWithErrors[expectedLabels2] = 0 assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") } @@ -426,11 +422,9 @@ func TestVStreamsMetricsErrors(t *testing.T) { vsm.vstreamsCount.ResetAll() vsm.vstreamsEventsStreamed.ResetAll() vsm.vstreamsEndedWithErrors.ResetAll() - hostname1 := "host1" - hostname2 := "host2" - sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) - sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) const wantErr = "Invalid arg message" From bc9b72b03d1f2d7b50e24563d6c936bf4dd6f4d4 Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 27 Feb 2025 13:33:13 -0500 Subject: [PATCH 5/7] Only count errors not due to client Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 9 ++++++++- go/vt/vtgate/vstream_manager_test.go | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 349b62125e1..26e25235ae2 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -18,6 +18,7 @@ package vtgate import ( "context" + "errors" "fmt" "io" "regexp" @@ -405,7 +406,13 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard // Set the error on exit. First one wins. if err != nil { log.Errorf("Error in vstream for %+v: %s", sgtid, err) - vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 1) + // Get the original/base error. + uerr := vterrors.UnwrapAll(err) + if !errors.Is(uerr, context.Canceled) && !errors.Is(uerr, context.DeadlineExceeded) { + // The client did not intentionally end the stream so this was an error in the + // vstream itself. + vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 1) + } vs.vsm.vstreamsCount.Add(labelValues, -1) vs.once.Do(func() { vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid)) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 98c9bd3742f..78014b69d5c 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -456,7 +456,7 @@ func TestVStreamsMetricsErrors(t *testing.T) { }) if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + require.ErrorContains(t, err, wantErr) } close(done) }() @@ -468,7 +468,7 @@ func TestVStreamsMetricsErrors(t *testing.T) { wantVStreamsEndedWithErrors := make(map[string]int64) wantVStreamsEndedWithErrors[expectedLabels1] = 1 - wantVStreamsEndedWithErrors[expectedLabels2] = 1 + wantVStreamsEndedWithErrors[expectedLabels2] = 0 assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") } From 599851338d8d2f117edfc485443eb68b50e87a01 Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 27 Feb 2025 13:35:43 -0500 Subject: [PATCH 6/7] Use reset instead of adding zero Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 26e25235ae2..ed4e6c92331 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -397,7 +397,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard defer vs.wg.Done() labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} - vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 0) + vs.vsm.vstreamsEndedWithErrors.Reset(labelValues) vs.vsm.vstreamsCreated.Add(labelValues, 1) vs.vsm.vstreamsCount.Add(labelValues, 1) From 239367b408ea7a34ca0b5309d639a30354a18e39 Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 27 Feb 2025 14:08:57 -0500 Subject: [PATCH 7/7] Switch back to add zero Signed-off-by: twthorn --- go/vt/vtgate/vstream_manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index ed4e6c92331..9328b50dd25 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -397,7 +397,8 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard defer vs.wg.Done() labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} - vs.vsm.vstreamsEndedWithErrors.Reset(labelValues) + // Initialize vstreamsEndedWithErrors metric to zero. + vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 0) vs.vsm.vstreamsCreated.Add(labelValues, 1) vs.vsm.vstreamsCount.Add(labelValues, 1)