Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"sync"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
Expand All @@ -46,6 +48,9 @@ type vstreamManager struct {
resolver *srvtopo.Resolver
toposerv srvtopo.Server
cell string

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
}

// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
Expand Down Expand Up @@ -119,10 +124,19 @@ type journalEvent struct {
}

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
return &vstreamManager{
resolver: resolver,
toposerv: serv,
cell: cell,
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
[]string{"Keyspace", "ShardName", "TabletType"}),
vstreamsLag: exporter.NewGaugesWithMultiLabels(
"VStreamsLag",
"Difference between event current time and the binlog event timestamp",
[]string{"Keyspace", "ShardName", "TabletType"}),
}
}

Expand Down Expand Up @@ -528,10 +542,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
Filter: vs.filter,
TableLastPKs: sgtid.TablePKs,
}
var vstreamCreatedOnce sync.Once
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}

vstreamCreatedOnce.Do(func() {
vs.vsm.vstreamsCreated.Add(labels, 1)
})

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -627,6 +648,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
default:
sendevents = append(sendevents, event)
}
lag := event.CurrentTime/1e9 - event.Timestamp
vs.vsm.vstreamsLag.Set(labels, lag)

}
if len(sendevents) != 0 {
eventss = append(eventss, sendevents)
Expand Down
56 changes: 55 additions & 1 deletion go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,59 @@ func TestVStreamMulti(t *testing.T) {
}
}

func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cell := "aa"
ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(hc, st, cell)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, st, ks, "20-40", sbc1.Tablet())

send0 := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 15 * 1e9},
}
sbc0.AddVStreamEvents(send0, nil)

send1 := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
}
sbc1.AddVStreamEvents(send1, nil)

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}, {
Keyspace: ks,
Shard: "20-40",
Gtid: "pos",
}},
}
ch := startVStream(ctx, t, vsm, vgtid, nil)
<-ch
<-ch
wantVStreamsCreated := make(map[string]int64)
wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")

wantVStreamsLag := make(map[string]int64)
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")
}

func TestVStreamRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -1157,7 +1210,8 @@ func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid
func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants ...*binlogdatapb.VStreamResponse) {
t.Helper()
for i, want := range wants {
got := <-ch
val := <-ch
got := proto.Clone(val).(*binlogdatapb.VStreamResponse)
require.NotNil(t, got)
for _, event := range got.Events {
event.Timestamp = 0
Expand Down