diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 4e50ea12af3..d19eb42f95d 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -1431,7 +1431,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t action, workflow, output) } if err != nil { - t.Fatalf("Reshard %s command failed with %+v\n", action, err) + t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output) } } diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 94f0fb25448..79bb8dabdfe 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -226,7 +226,7 @@ func insertRow(keyspace, table string, id int) { vtgateConn.ExecuteFetch("begin", 1000, false) _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) if err != nil { - log.Infof("error inserting row %d: %v", id, err) + log.Errorf("error inserting row %d: %v", id, err) } vtgateConn.ExecuteFetch("commit", 1000, false) } @@ -390,13 +390,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven defer vc.TearDown() defaultCell := vc.Cells[vc.CellNames[0]] - vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + require.NoError(t, err) vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() verifyClusterHealth(t, vc) - vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + _, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + require.NoError(t, err) ctx := context.Background() vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) @@ -515,6 +517,196 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven return ne } +// Validate that we can resume a VStream when the keyspace has been resharded +// while not streaming. Ensure that there we successfully transition from the +// old shards -- which are in the VGTID from the previous stream -- and that +// we miss no row events during the process. +func TestMultiVStreamsKeyspaceReshard(t *testing.T) { + ctx := context.Background() + ks := "testks" + wf := "multiVStreamsKeyspaceReshard" + baseTabletID := 100 + tabletType := topodatapb.TabletType_PRIMARY.String() + oldShards := "-80,80-" + newShards := "-40,40-80,80-c0,c0-" + oldShardRowEvents, newShardRowEvents := 0, 0 + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + defaultCell := vc.Cells[vc.CellNames[0]] + ogdr := defaultReplicas + defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets + defer func(dr int) { defaultReplicas = dr }(ogdr) + + // For our sequences etc. + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil) + require.NoError(t, err) + + // Setup the keyspace with our old/original shards. + keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil) + require.NoError(t, err) + + // Add the new shards. + err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts) + require.NoError(t, err) + + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + require.NoError(t, err) + defer vstreamConn.Close() + + // Ensure that we're starting with a clean slate. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false) + require.NoError(t, err) + + // Coordinate go-routines. + streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer streamCancel() + done := make(chan struct{}) + + // First goroutine that keeps inserting rows into the table being streamed until the + // stream context is cancelled. + go func() { + id := 1 + for { + select { + case <-streamCtx.Done(): + // Give the VStream a little catch-up time before telling it to stop + // via the done channel. + time.Sleep(10 * time.Second) + close(done) + return + default: + insertRow(ks, "customer", id) + time.Sleep(250 * time.Millisecond) + id++ + } + } + }() + + // Create the Reshard workflow and wait for it to finish the copy phase. + reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String()) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", // Match all keyspaces just to be more realistic. + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // Only stream the customer table and its sequence backing table. + Match: "/customer.*", + }}, + } + flags := &vtgatepb.VStreamFlags{} + + // Stream events but stop once we have a VGTID with positions for the old/original shards. + var newVGTID *binlogdatapb.VGtid + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.GetRowEvent().GetShard() + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "0": + // We expect some for the sequence backing table, but don't care. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + case binlogdatapb.VEventType_VGTID: + newVGTID = ev.GetVgtid() + if len(newVGTID.GetShardGtids()) == 3 { + // We want a VGTID with a position for the global shard and the old shards. + canStop := true + for _, sg := range newVGTID.GetShardGtids() { + if sg.GetGtid() == "" { + canStop = false + } + } + if canStop { + return + } + } + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + } + select { + case <-streamCtx.Done(): + return + default: + } + } + }() + + // Confirm that we have shard GTIDs for the global shard and the old/original shards. + require.Len(t, newVGTID.GetShardGtids(), 3) + + // Switch the traffic to the new shards. + reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) + + // Now start a new VStream from our previous VGTID which only has the old/original shards. + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.RowEvent.Shard + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "-40", "40-80", "80-c0", "c0-": + newShardRowEvents++ + case "0": + // Again, we expect some for the sequence backing table, but don't care. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + } + select { + case <-done: + return + default: + } + } + }() + + // We should have a mix of events across the old and new shards. + require.NotZero(t, oldShardRowEvents) + require.NotZero(t, newShardRowEvents) + + // The number of row events streamed by the VStream API should match the number of rows inserted. + customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") + customerCount, err := customerResult.Rows[0][0].ToInt64() + require.NoError(t, err) + require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents)) +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go index c2babeca29b..75b88fc9644 100644 --- a/go/vt/topo/faketopo/faketopo.go +++ b/go/vt/topo/faketopo/faketopo.go @@ -21,12 +21,11 @@ import ( "strings" "sync" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - - "vitess.io/vitess/go/vt/topo" ) // FakeFactory implements the Factory interface. This is supposed to be used only for testing diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index cf6a13d2b9f..2907216a124 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -26,8 +26,11 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" @@ -541,21 +544,36 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var err error cells := vs.getCells() - tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...) + tpo := vs.tabletPickerOptions + resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace) if err != nil { - log.Errorf(err.Error()) - return err + return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace) + } + if resharded { + // The non-serving tablet in the old / non-serving shard will contain all of + // the GTIDs that we need before transitioning to the new shards along with + // the journal event that will then allow us to automatically transition to + // the new shards (provided the stop_on_reshard option is not set). + tpo.IncludeNonServingTablets = true } + tabletPickerErr := func(err error) error { + tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)", + vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ",")) + log.Errorf("%v", tperr) + return tperr + } + tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...) + if err != nil { + return tabletPickerErr(err) + } // Create a child context with a stricter timeout when picking a tablet. // This will prevent hanging in the case no tablets are found. tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout) defer tpCancel() - tablet, err := tp.PickForStreaming(tpCtx) if err != nil { - log.Errorf(err.Error()) - return err + return tabletPickerErr(err) } tabletAliasString := topoproto.TabletAliasString(tablet.Alias) log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)", @@ -803,7 +821,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e if err := vs.getError(); err != nil { return err } - // convert all gtids to vgtids. This should be done here while holding the lock. + // Convert all gtids to vgtids. This should be done here while holding the lock. for j, event := range events { if event.Type == binlogdatapb.VEventType_GTID { // Update the VGtid and send that instead. @@ -990,3 +1008,56 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar close(je.done) return je, nil } + +// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed +// since the last VStream as indicated by the shard definitions provided in the VGTID. +func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) { + shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) + if err != nil || len(shards) == 0 { + return false, err + } + + // First check the typical case, where the VGTID shards match the serving shards. + // In that case it's NOT possible that an applicable reshard has happened because + // the VGTID contains shards that are all serving. + reshardPossible := false + ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids)) + for _, s := range vs.vgtid.ShardGtids { + if s.GetKeyspace() == keyspace { + ksShardGTIDs = append(ksShardGTIDs, s) + } + } + for _, s := range ksShardGTIDs { + shard := shards[s.GetShard()] + if shard == nil { + return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace) + } + if !shard.GetIsPrimaryServing() { + reshardPossible = true + break + } + } + if !reshardPossible { + return false, nil + } + + // Now that we know there MAY have been an applicable reshard, let's make a + // definitive determination by looking at the shard keyranges. + // All we care about are the shard info records now. + sis := maps.Values(shards) + for i := range sis { + for j := range sis { + if sis[i].ShardName() == sis[j].ShardName() && key.KeyRangeEqual(sis[i].GetKeyRange(), sis[j].GetKeyRange()) { + // It's the same shard so skip it. + continue + } + if key.KeyRangeIntersect(sis[i].GetKeyRange(), sis[j].GetKeyRange()) { + // We have different shards with overlapping keyranges so we know + // that a reshard has happened. + return true, nil + } + } + } + + return false, nil +} diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 04abdb351a8..c447882c3a8 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -25,12 +25,12 @@ import ( "testing" "time" - "google.golang.org/protobuf/proto" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -45,8 +45,6 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - - "vitess.io/vitess/go/test/utils" ) var mu sync.Mutex diff --git a/test/config.json b/test/config.json index 53ef22a5e8f..3fd00df4126 100644 --- a/test/config.json +++ b/test/config.json @@ -1130,6 +1130,15 @@ "RetryMax": 1, "Tags": [] }, + "multi_vstreams_keyspace_reshard": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultiVStreamsKeyspaceReshard", "-timeout", "15m"], + "Command": [], + "Manual": false, + "Shard": "vstream", + "RetryMax": 1, + "Tags": [] + }, "vstream_failover": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "VStreamFailover"],