diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go index 98d2129f898..a22cb0cdbdc 100644 --- a/examples/local/vstream_client.go +++ b/examples/local/vstream_client.go @@ -23,13 +23,16 @@ import ( "log" "time" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/sqltypes" _ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient" _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" "vitess.io/vitess/go/vt/vtgate/vtgateconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) /* @@ -38,7 +41,7 @@ import ( */ func main() { ctx := context.Background() - streamCustomer := true + streamCustomer := false var vgtid *binlogdatapb.VGtid if streamCustomer { vgtid = &binlogdatapb.VGtid{ @@ -54,11 +57,21 @@ func main() { Gtid: "", }}} } else { + lastPK := sqltypes.Result{ + Fields: []*querypb.Field{{Name: "customer_id", Type: querypb.Type_INT64, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG | querypb.MySqlFlag_BINARY_FLAG)}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt64(5)}}, + } vgtid = &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "commerce", + Keyspace: "customer", Shard: "0", Gtid: "", + TablePKs: []*binlogdatapb.TableLastPK{ + { + TableName: "customer", + Lastpk: sqltypes.ResultToProto3(&lastPK), + }, + }, }}} } filter := &binlogdatapb.Filter{ diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 08553969a50..8d2b7b9223c 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -21,12 +21,16 @@ import ( "fmt" "io" "regexp" + "slices" "strings" "sync" "time" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/srvtopo" @@ -180,6 +184,58 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta TabletOrder: flags.GetTabletOrder(), }, } + keyspaces := make(map[string]bool) + for _, sgtid := range vgtid.ShardGtids { + if len(sgtid.TablePKs) > 0 { + keyspaces[sgtid.GetKeyspace()] = true + } else { + keyspaces[sgtid.GetKeyspace()] = false + } + } + for keyspace := range keyspaces { + reshard, servingShards, err := vs.keyspaceHasBeenResharded(ctx, keyspace) + if err != nil { + return err + } + if !reshard { + continue + } + if flags.StopOnReshard { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace %s has been resharded", keyspace) + } + if keyspaces[keyspace] { // Last tablepk values were provided + // So we can try to resume the copy phase for the tables. + // Remove the old shards from the vgtid and add the new shards. + newsgtids := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids)) + var lastPKs []*binlogdatapb.TableLastPK + for _, cursgtid := range vs.vgtid.ShardGtids { + if cursgtid.Keyspace == keyspace { + if (cursgtid.TablePKs == nil) || (lastPKs != nil && !slices.Equal(cursgtid.TablePKs, lastPKs)) { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace %s has been resharded and empty or different table last PK values were found across shards so we cannot resume", + keyspace) + } + lastPKs = cursgtid.TablePKs + continue + } + newsgtids = append(newsgtids, cursgtid) + } + // Add the new ones. + for _, shard := range servingShards { + newsgtid := &binlogdatapb.ShardGtid{ + Keyspace: keyspace, + Gtid: "current", // Start a copy phase, resuming from the lastPKs seen + Shard: shard.ShardName(), + TablePKs: lastPKs, // The resume point per table + } + newsgtids = append(newsgtids, newsgtid) + } + vs.vgtid.ShardGtids = newsgtids + } else { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace %s has been resharded but no last tablepk values have been provided so we cannot resume", + keyspace) + } + } + return vs.stream(ctx) } @@ -737,7 +793,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e if err := vs.getError(); err != nil { return err } - // convert all gtids to vgtids. This should be done here while holding the lock. + // Convert all gtids to vgtids. This should be done here while holding the lock. for j, event := range events { if event.Type == binlogdatapb.VEventType_GTID { // Update the VGtid and send that instead. @@ -752,6 +808,11 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e var foundIndex = -1 eventTablePK := event.LastPKEvent.TableLastPK for idx, pk := range sgtid.TablePKs { + // There was no matching row on the given shard. This can happen + // when resuming after a Reshard. + if pk == nil { + continue + } if pk.TableName == eventTablePK.TableName { foundIndex = idx break @@ -921,3 +982,38 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar close(je.done) return je, nil } + +func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, []*topo.ShardInfo, error) { + smap, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil) + if err != nil || len(smap) == 0 { + return false, nil, err + } + ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids)) + for _, sg := range vs.vgtid.ShardGtids { + if sg.Keyspace == keyspace { + ksShardGTIDs = append(ksShardGTIDs, sg) + } + } + reshard := false + shards := maps.Values(smap) + servingShards := make([]*topo.ShardInfo, 0, len(shards)) + if len(smap) != len(ksShardGTIDs) { + for i, shard := range shards { + if shard.IsPrimaryServing { + servingShards = append(servingShards, shard) + } + if !reshard { + for n, s := range shards { + if i == n { // Same shard + continue + } + if key.KeyRangeIntersect(shard.GetKeyRange(), s.GetKeyRange()) { + reshard = true + break + } + } + } + } + } + return reshard, servingShards, nil +}