Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/local/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ source $script_root/env.sh

init_db_sql_file="$VTROOT/config/init_db.sql"

export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf
export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf:$VTROOT/config/mycnf/rbr.cnf

case "$MYSQL_FLAVOR" in
"MySQL56")
Expand Down
2 changes: 1 addition & 1 deletion go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func ParseKeyRangeParts(start, end string) (*topodatapb.KeyRange, error) {
// KeyRangeString prints a topodatapb.KeyRange
func KeyRangeString(k *topodatapb.KeyRange) string {
if k == nil {
return "<nil>"
return "-"
}
return hex.EncodeToString(k.Start) + "-" + hex.EncodeToString(k.End)
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,20 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string)
return result, nil
}

// GetOnlyShard returns the single ShardInfo of an unsharded keyspace.
func (ts *Server) GetOnlyShard(ctx context.Context, keyspace string) (*ShardInfo, error) {
allShards, err := ts.FindAllShardsInKeyspace(ctx, keyspace)
if err != nil {
return nil, err
}
if len(allShards) == 1 {
for _, s := range allShards {
return s, nil
}
}
return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "keyspace %s must have one and only one shard: %v", keyspace, allShards)
}

// DeleteKeyspace wraps the underlying Conn.Delete
// and dispatches the event.
func (ts *Server) DeleteKeyspace(ctx context.Context, keyspace string) error {
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ var commands = []commandGroup{
{"ValidateKeyspace", commandValidateKeyspace,
"[-ping-tablets] <keyspace name>",
"Validates that all nodes reachable from the specified keyspace are consistent."},
{"SplitClone", commandSplitClone,
"<keyspace> <from_shards> <to_shards>",
"Start the SplitClone process to perform horizontal resharding. Example: SplitClone ks '0' '-80,80-'"},
{"VerticalSplitClone", commandVerticalSplitClone,
"<from_keyspace> <to_keyspace> <tables>",
"Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"},
{"MigrateServedTypes", commandMigrateServedTypes,
"[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] <keyspace/shard> <served tablet type>",
"Migrates a serving type from the source shard to the shards that it replicates to. This command also rebuilds the serving graph. The <keyspace/shard> argument can specify any of the shards involved in the migration."},
Expand Down Expand Up @@ -1718,6 +1724,32 @@ func commandValidateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlag
return wr.ValidateKeyspace(ctx, keyspace, *pingTablets)
}

func commandSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 3 {
return fmt.Errorf("three arguments are required: keyspace, from_shards, to_shards")
}
keyspace := subFlags.Arg(0)
from := strings.Split(subFlags.Arg(1), ",")
to := strings.Split(subFlags.Arg(2), ",")
return wr.SplitClone(ctx, keyspace, from, to)
}

func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 3 {
return fmt.Errorf("three arguments are required: from_keyspace, to_keyspace, tables")
}
fromKeyspace := subFlags.Arg(0)
toKeyspace := subFlags.Arg(1)
tables := strings.Split(subFlags.Arg(2), ",")
return wr.VerticalSplitClone(ctx, fromKeyspace, toKeyspace, tables)
}

func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update")
reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward. Use in case of trouble")
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,13 @@ func buildQuery(tableName, filter string) string {
}

func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
sel, fromTable, err := analyzeSelectFrom(rule.Filter)
query := rule.Filter
if query == "" {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewTableIdent(rule.Match))
query = buf.String()
}
sel, fromTable, err := analyzeSelectFrom(query)
if err != nil {
return nil, err
}
Expand All @@ -162,7 +168,7 @@ func buildTablePlan(rule *binlogdatapb.Rule, tableKeys map[string][]string, last
if !expr.TableName.IsEmpty() {
return nil, fmt.Errorf("unsupported qualifier for '*' expression: %v", sqlparser.String(expr))
}
sendRule.Filter = rule.Filter
sendRule.Filter = query
tablePlan := &TablePlan{
TargetName: rule.Match,
SendRule: sendRule,
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
// play is not resumable. If pausePos is set, play returns without updating the vreplication state.
func (vp *vplayer) play(ctx context.Context) error {
if !vp.stopPos.IsZero() && vp.startPos.AtLeast(vp.stopPos) {
return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
if vp.saveStop {
return vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stop position %v already reached: %v", vp.startPos, vp.stopPos))
}
return nil
}

plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.tableKeys, vp.copyState)
Expand Down Expand Up @@ -324,8 +327,10 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
}
if !vp.pos.Equal(vp.stopPos) && vp.pos.AtLeast(vp.stopPos) {
// Code is unreachable, but bad data can cause this to happen.
if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("next event position %v exceeds stop pos %v, exiting without applying", vp.pos, vp.stopPos)); err != nil {
return err
if vp.saveStop {
if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("next event position %v exceeds stop pos %v, exiting without applying", vp.pos, vp.stopPos)); err != nil {
return err
}
}
return io.EOF
}
Expand Down
102 changes: 102 additions & 0 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -90,6 +91,107 @@ func (wr *Wrangler) SetKeyspaceShardingInfo(ctx context.Context, keyspace, shard
return wr.ts.UpdateKeyspace(ctx, ki)
}

// SplitClone initiates a SplitClone workflow.
func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to []string) error {
var fromShards, toShards []*topo.ShardInfo
for _, shard := range from {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
return vterrors.Wrapf(err, "GetShard(%s) failed", shard)
}
fromShards = append(fromShards, si)
}
for _, shard := range to {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
return vterrors.Wrapf(err, "GetShard(%s) failed", shard)
}
toShards = append(toShards, si)
}
// TODO(sougou): validate from and to shards.

for _, dest := range toShards {
master, err := wr.ts.GetTablet(ctx, dest.MasterAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias)
}
var ids []uint64
for _, source := range fromShards {
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
Filter: key.KeyRangeString(dest.KeyRange),
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: keyspace,
Shard: source.ShardName(),
Filter: filter,
}
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName())
qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
if err := wr.SourceShardAdd(ctx, keyspace, dest.ShardName(), uint32(qr.InsertId), keyspace, source.ShardName(), source.Shard.KeyRange, nil); err != nil {
return vterrors.Wrapf(err, "SourceShardAdd(%s, %s) failed", dest.ShardName(), source.ShardName())
}
ids = append(ids, qr.InsertId)
}
// Start vreplication only if all metadata was successfully created.
for _, id := range ids {
cmd := fmt.Sprintf("update _vt.vreplication set state='%s' where id=%d", binlogplayer.VReplicationInit, id)
if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
}
}
return wr.refreshMasters(ctx, toShards)
}

// VerticalSplitClone initiates a VerticalSplitClone workflow.
func (wr *Wrangler) VerticalSplitClone(ctx context.Context, fromKeyspace, toKeyspace string, tables []string) error {
source, err := wr.ts.GetOnlyShard(ctx, fromKeyspace)
if err != nil {
return vterrors.Wrapf(err, "GetOnlyShard(%s) failed", fromKeyspace)
}
dest, err := wr.ts.GetOnlyShard(ctx, toKeyspace)
if err != nil {
return vterrors.Wrapf(err, "GetOnlyShard(%s) failed", toKeyspace)
}
// TODO(sougou): validate from and to shards.

master, err := wr.ts.GetTablet(ctx, dest.MasterAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", dest.MasterAlias)
}
filter := &binlogdatapb.Filter{}
for _, table := range tables {
filter.Rules = append(filter.Rules, &binlogdatapb.Rule{
Match: table,
})
}
bls := &binlogdatapb.BinlogSource{
Keyspace: fromKeyspace,
Shard: source.ShardName(),
Filter: filter,
}
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName())
qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
if err := wr.SourceShardAdd(ctx, toKeyspace, dest.ShardName(), uint32(qr.InsertId), fromKeyspace, source.ShardName(), nil, tables); err != nil {
return vterrors.Wrapf(err, "SourceShardAdd(%s, %s) failed", dest.ShardName(), source.ShardName())
}
// Start vreplication only if metadata was successfully created.
cmd = fmt.Sprintf("update _vt.vreplication set state='%s' where id=%d", binlogplayer.VReplicationInit, qr.InsertId)
if _, err = wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
}
return wr.refreshMasters(ctx, []*topo.ShardInfo{dest})
}

// ShowResharding shows all resharding related metadata for the keyspace/shard.
func (wr *Wrangler) ShowResharding(ctx context.Context, keyspace, shard string) (err error) {
ki, err := wr.ts.GetKeyspace(ctx, keyspace)
Expand Down