From 79508d3582c284b3ed34080cad1f6631699b3c70 Mon Sep 17 00:00:00 2001 From: Jon Tirsen Date: Fri, 21 Sep 2018 09:40:46 +0200 Subject: [PATCH 1/5] Multi split diff A new implementation of split diff that diffs against all the destination servers at the same time. Since we only have to read out of the source a single time this could give significant time savings. Signed-off-by: Jon Tirsen --- go/vt/worker/diff_utils.go | 32 +- go/vt/worker/multi_split_diff.go | 747 ++++++++++++++++++++++++++ go/vt/worker/multi_split_diff_cmd.go | 250 +++++++++ go/vt/worker/multi_split_diff_test.go | 342 ++++++++++++ 4 files changed, 1355 insertions(+), 16 deletions(-) create mode 100644 go/vt/worker/multi_split_diff.go create mode 100644 go/vt/worker/multi_split_diff_cmd.go create mode 100644 go/vt/worker/multi_split_diff_test.go diff --git a/go/vt/worker/diff_utils.go b/go/vt/worker/diff_utils.go index ae9ba1a6ba9..43b469fd21b 100644 --- a/go/vt/worker/diff_utils.go +++ b/go/vt/worker/diff_utils.go @@ -430,7 +430,7 @@ func CompareRows(fields []*querypb.Field, compareCount int, left, right []sqltyp r := rv.([]byte) return bytes.Compare(l, r), nil default: - return 0, fmt.Errorf("Unsuported type %T returned by mysql.proto.Convert", l) + return 0, fmt.Errorf("Unsupported type %T returned by mysql.proto.Convert", l) } } return 0, nil @@ -440,27 +440,27 @@ func CompareRows(fields []*querypb.Field, compareCount int, left, right []sqltyp // It assumes left and right are sorted by ascending primary key. // it will record errors if extra rows exist on either side. type RowDiffer struct { - left *RowReader - right *RowReader - pkFieldCount int + left *RowReader + right *RowReader + tableDefinition *tabletmanagerdatapb.TableDefinition } // NewRowDiffer returns a new RowDiffer -func NewRowDiffer(left, right *QueryResultReader, tableDefinition *tabletmanagerdatapb.TableDefinition) (*RowDiffer, error) { +func NewRowDiffer(left, right ResultReader, tableDefinition *tabletmanagerdatapb.TableDefinition) (*RowDiffer, error) { leftFields := left.Fields() rightFields := right.Fields() if len(leftFields) != len(rightFields) { - return nil, fmt.Errorf("Cannot diff inputs with different types") + return nil, fmt.Errorf("[table=%v] Cannot diff inputs with different types", tableDefinition.Name) } for i, field := range leftFields { if field.Type != rightFields[i].Type { - return nil, fmt.Errorf("Cannot diff inputs with different types: field %v types are %v and %v", i, field.Type, rightFields[i].Type) + return nil, fmt.Errorf("[table=%v] Cannot diff inputs with different types: field %v types are %v and %v", tableDefinition.Name, i, field.Type, rightFields[i].Type) } } return &RowDiffer{ - left: NewRowReader(left), - right: NewRowReader(right), - pkFieldCount: len(tableDefinition.PrimaryKeyColumns), + left: NewRowReader(left), + right: NewRowReader(right), + tableDefinition: tableDefinition, }, nil } @@ -529,10 +529,10 @@ func (rd *RowDiffer) Go(log logutil.Logger) (dr DiffReport, err error) { continue } - if f >= rd.pkFieldCount { + if f >= len(rd.tableDefinition.PrimaryKeyColumns) { // rows have the same primary key, only content is different if dr.mismatchedRows < 10 { - log.Errorf("Different content %v in same PK: %v != %v", dr.mismatchedRows, left, right) + log.Errorf("[table=%v] Different content %v in same PK: %v != %v", rd.tableDefinition.Name, dr.mismatchedRows, left, right) } dr.mismatchedRows++ advanceLeft = true @@ -541,20 +541,20 @@ func (rd *RowDiffer) Go(log logutil.Logger) (dr DiffReport, err error) { } // have to find the 'smallest' row and advance it - c, err := CompareRows(rd.left.Fields(), rd.pkFieldCount, left, right) + c, err := CompareRows(rd.left.Fields(), len(rd.tableDefinition.PrimaryKeyColumns), left, right) if err != nil { return dr, err } if c < 0 { if dr.extraRowsLeft < 10 { - log.Errorf("Extra row %v on left: %v", dr.extraRowsLeft, left) + log.Errorf("[table=%v] Extra row %v on left: %v", rd.tableDefinition.Name, dr.extraRowsLeft, left) } dr.extraRowsLeft++ advanceLeft = true continue } else if c > 0 { if dr.extraRowsRight < 10 { - log.Errorf("Extra row %v on right: %v", dr.extraRowsRight, right) + log.Errorf("[table=%v] Extra row %v on right: %v", rd.tableDefinition.Name, dr.extraRowsRight, right) } dr.extraRowsRight++ advanceRight = true @@ -565,7 +565,7 @@ func (rd *RowDiffer) Go(log logutil.Logger) (dr DiffReport, err error) { // they're the same. Logging a regular difference // then, and advancing both. if dr.mismatchedRows < 10 { - log.Errorf("Different content %v in same PK: %v != %v", dr.mismatchedRows, left, right) + log.Errorf("[table=%v] Different content %v in same PK: %v != %v", rd.tableDefinition.Name, dr.mismatchedRows, left, right) } dr.mismatchedRows++ advanceLeft = true diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go new file mode 100644 index 00000000000..7b635fcbf36 --- /dev/null +++ b/go/vt/worker/multi_split_diff.go @@ -0,0 +1,747 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package worker + +import ( + "fmt" + "html/template" + "sync" + + "golang.org/x/net/context" + + "vitess.io/vitess/go/sync2" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/wrangler" + + "sort" + + "bytes" + + "time" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/key" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +// MultiSplitDiffWorker executes a diff between a destination shard and its +// source shards in a shard split case. +type MultiSplitDiffWorker struct { + StatusWorker + + wr *wrangler.Wrangler + cell string + keyspace string + shard string + excludeTables []string + excludeShards []string + minHealthyRdonlyTablets int + destinationTabletType topodatapb.TabletType + parallelDiffsCount int + waitForFixedTimeRatherThanGtidSet bool + cleaner *wrangler.Cleaner + + // populated during WorkerStateInit, read-only after that + keyspaceInfo *topo.KeyspaceInfo + shardInfo *topo.ShardInfo + sourceUID uint32 + destinationShards []*topo.ShardInfo + + // populated during WorkerStateFindTargets, read-only after that + sourceAlias *topodatapb.TabletAlias + destinationAliases []*topodatapb.TabletAlias // matches order of destinationShards + + // populated during WorkerStateDiff + sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition + destinationSchemaDefinitions []*tabletmanagerdatapb.SchemaDefinition +} + +// NewMultiSplitDiffWorker returns a new MultiSplitDiffWorker object. +func NewMultiSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, excludeShards []string, minHealthyRdonlyTablets, parallelDiffsCount int, waitForFixedTimeRatherThanGtidSet bool, tabletType topodatapb.TabletType) Worker { + return &MultiSplitDiffWorker{ + StatusWorker: NewStatusWorker(), + wr: wr, + cell: cell, + keyspace: keyspace, + shard: shard, + excludeTables: excludeTables, + excludeShards: excludeShards, + minHealthyRdonlyTablets: minHealthyRdonlyTablets, + destinationTabletType: tabletType, + parallelDiffsCount: parallelDiffsCount, + waitForFixedTimeRatherThanGtidSet: waitForFixedTimeRatherThanGtidSet, + cleaner: &wrangler.Cleaner{}, + } +} + +// StatusAsHTML is part of the Worker interface +func (msdw *MultiSplitDiffWorker) StatusAsHTML() template.HTML { + state := msdw.State() + + result := "Working on: " + msdw.keyspace + "/" + msdw.shard + "
\n" + result += "State: " + state.String() + "
\n" + switch state { + case WorkerStateDiff: + result += "Running...
\n" + case WorkerStateDone: + result += "Success.
\n" + } + + return template.HTML(result) +} + +// StatusAsText is part of the Worker interface +func (msdw *MultiSplitDiffWorker) StatusAsText() string { + state := msdw.State() + + result := "Working on: " + msdw.keyspace + "/" + msdw.shard + "\n" + result += "State: " + state.String() + "\n" + switch state { + case WorkerStateDiff: + result += "Running...\n" + case WorkerStateDone: + result += "Success.\n" + } + return result +} + +// Run is mostly a wrapper to run the cleanup at the end. +func (msdw *MultiSplitDiffWorker) Run(ctx context.Context) error { + resetVars() + err := msdw.run(ctx) + + msdw.SetState(WorkerStateCleanUp) + cerr := msdw.cleaner.CleanUp(msdw.wr) + if cerr != nil { + if err != nil { + msdw.wr.Logger().Errorf("CleanUp failed in addition to job error: %v", cerr) + } else { + err = cerr + } + } + if err != nil { + msdw.wr.Logger().Errorf("Run() error: %v", err) + msdw.SetState(WorkerStateError) + return err + } + msdw.SetState(WorkerStateDone) + return nil +} + +func (msdw *MultiSplitDiffWorker) run(ctx context.Context) error { + // first state: read what we need to do + if err := msdw.init(ctx); err != nil { + return fmt.Errorf("init() failed: %v", err) + } + if err := checkDone(ctx); err != nil { + return err + } + + // second state: find targets + if err := msdw.findTargets(ctx); err != nil { + return fmt.Errorf("findTargets() failed: %v", err) + } + if err := checkDone(ctx); err != nil { + return err + } + + // third phase: synchronize replication + if err := msdw.synchronizeReplication(ctx); err != nil { + return fmt.Errorf("synchronizeReplication() failed: %v", err) + } + if err := checkDone(ctx); err != nil { + return err + } + + // fourth phase: diff + if err := msdw.diff(ctx); err != nil { + return fmt.Errorf("diff() failed: %v", err) + } + if err := checkDone(ctx); err != nil { + return err + } + + return nil +} + +// findDestinationShards finds all the shards that have filtered replication from the source shard +func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([]*topo.ShardInfo, error) { + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + keyspaces, err := msdw.wr.TopoServer().GetKeyspaces(shortCtx) + cancel() + if err != nil { + return nil, fmt.Errorf("failed to get list of keyspaces: %v", err) + } + + wg := sync.WaitGroup{} + mu := sync.Mutex{} // protects result + result := make([]*topo.ShardInfo, 0, len(keyspaces)) + foundFirst := false + rec := concurrency.AllErrorRecorder{} + for _, keyspace := range keyspaces { + wg.Add(1) + go func(keyspace string) { + defer wg.Done() + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) + cancel() + if err != nil { + rec.RecordError(fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) + return + } + for _, shard := range shards { + wg.Add(1) + go func(keyspace, shard string) { + defer wg.Done() + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) + cancel() + if err != nil { + rec.RecordError(fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) + return + } + + if stringContains(msdw.excludeShards, si.ShardName()) { + msdw.wr.Logger().Infof("ignoring shard %v/%v", si.Keyspace(), si.ShardName()) + return + } + + for _, sourceShard := range si.SourceShards { + if len(sourceShard.Tables) == 0 && sourceShard.Keyspace == msdw.keyspace && sourceShard.Shard == msdw.shard { + mu.Lock() + if foundFirst { + if msdw.sourceUID != sourceShard.Uid { + rec.RecordError(fmt.Errorf("different uid for the blp in %v/%v", si.Keyspace(), si.ShardName())) + } + } else { + msdw.sourceUID = sourceShard.Uid + } + result = append(result, si) + mu.Unlock() + // Prevents the same shard from showing up multiple times + break + } + } + + }(keyspace, shard) + } + }(keyspace) + } + wg.Wait() + + if rec.HasErrors() { + return nil, rec.Error() + } + if len(result) == 0 { + return nil, fmt.Errorf("there are no destination shards") + } + return result, nil +} + +func stringContains(l []string, s string) bool { + for _, v := range l { + if v == s { + return true + } + } + return false +} + +// init phase: +// - read the shard info, make sure it has sources +func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { + msdw.SetState(WorkerStateInit) + + var err error + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace) + cancel() + if err != nil { + return fmt.Errorf("cannot read keyspace %v: %v", msdw.keyspace, err) + } + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard) + cancel() + if err != nil { + return fmt.Errorf("cannot read shard %v/%v: %v", msdw.keyspace, msdw.shard, err) + } + + if !msdw.shardInfo.HasMaster() { + return fmt.Errorf("shard %v/%v has no master", msdw.keyspace, msdw.shard) + } + + destinationShards, err := msdw.findDestinationShards(ctx) + if err != nil { + return fmt.Errorf("findDestinationShards() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) + } + msdw.destinationShards = destinationShards + + return nil +} + +// findTargets phase: +// - find one rdonly in source shard +// - find one rdonly per destination shard +// - mark them all as 'worker' pointing back to us +func (msdw *MultiSplitDiffWorker) findTargets(ctx context.Context) error { + msdw.SetState(WorkerStateFindTargets) + + var err error + + // find an appropriate tablet in the source shard + msdw.sourceAlias, err = FindWorkerTablet( + ctx, + msdw.wr, + msdw.cleaner, + nil, /* tsc */ + msdw.cell, + msdw.keyspace, + msdw.shard, + 1, /* minHealthyTablets */ + msdw.destinationTabletType) + if err != nil { + return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) + } + + // find an appropriate tablet in each destination shard + msdw.destinationAliases = make([]*topodatapb.TabletAlias, len(msdw.destinationShards)) + for i, destinationShard := range msdw.destinationShards { + keyspace := destinationShard.Keyspace() + shard := destinationShard.ShardName() + destinationAlias, err := FindWorkerTablet( + ctx, + msdw.wr, + msdw.cleaner, + nil, /* tsc */ + msdw.cell, + keyspace, + shard, + msdw.minHealthyRdonlyTablets, + topodatapb.TabletType_RDONLY) + if err != nil { + return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, keyspace, shard, err) + } + msdw.destinationAliases[i] = destinationAlias + } + if err != nil { + return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) + } + + return nil +} + +// synchronizeReplication phase: +// 1 - ask the master of the destination shard to pause filtered replication, +// and return the source binlog positions +// (add a cleanup task to restart filtered replication on master) +// 2 - stop the source tablet at a binlog position higher than the +// destination master. Get that new list of positions. +// (add a cleanup task to restart binlog replication on the source tablet, and +// change the existing ChangeSlaveType cleanup action to 'spare' type) +// 3 - ask the master of the destination shard to resume filtered replication +// up to the new list of positions, and return its binlog position. +// 4 - wait until the destination tablet is equal or passed that master +// binlog position, and stop its replication. +// (add a cleanup task to restart binlog replication on it, and change +// the existing ChangeSlaveType cleanup action to 'spare' type) +// 5 - restart filtered replication on the destination master. +// (remove the cleanup task that does the same) +// At this point, the source and the destination tablet are stopped at the same +// point. + +func (msdw *MultiSplitDiffWorker) synchronizeReplication(ctx context.Context) error { + msdw.SetState(WorkerStateSyncReplication) + + masterInfos := make([]*topo.TabletInfo, len(msdw.destinationAliases)) + for i, shardInfo := range msdw.destinationShards { + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias) + cancel() + if err != nil { + return fmt.Errorf("synchronizeReplication: cannot get Tablet record for master %v: %v", msdw.shardInfo.MasterAlias, err) + } + masterInfos[i] = masterInfo + } + + // 1 - stop replication on all the destinations masters + destVreplicationPos := make([]string, len(msdw.destinationShards)) + for i, shardInfo := range msdw.destinationShards { + masterInfo := masterInfos[i] + + msdw.wr.Logger().Infof("Stopping master binlog replication on %v", shardInfo.MasterAlias) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff")) + if err != nil { + cancel() + return fmt.Errorf("VReplicationExec(stop) for %v failed: %v", shardInfo.MasterAlias, err) + } + wrangler.RecordVReplicationAction(msdw.cleaner, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) + p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID)) + if err != nil { + cancel() + return fmt.Errorf("VReplicationExec(stop) for %v failed: %v", msdw.shardInfo.MasterAlias, err) + } + qr := sqltypes.Proto3ToResult(p3qr) + if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { + cancel() + return fmt.Errorf("Unexpected result while reading position: %v", qr) + } + destVreplicationPos[i] = qr.Rows[0][0].ToString() + cancel() + if err != nil { + return fmt.Errorf("StopBlp for %v failed: %v", msdw.shardInfo.MasterAlias, err) + } + } + + // 2 - stop replication on the source rdonly tablet later than the last blp pos and note the blp position + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) + cancel() + if err != nil { + return err + } + + var mysqlPos string // will be the last GTID that we stopped at + for _, vreplicationPos := range destVreplicationPos { + // We need to stop the source RDONLY tablet at a position which includes ALL of the positions of the destination + // shards. We do this by starting replication and then stopping at a minimum of each blp position separately. + // TODO this is not terribly efficient but it's possible to implement without changing the existing RPC, + // if we make StopSlaveMinimum take multiple blp positions then this will be a lot more efficient because you just + // check for each position using WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS and then stop replication. + + msdw.wr.Logger().Infof("Stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos) + // read the tablet + sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) + if err != nil { + return err + } + + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + defer cancel() + mysqlPos, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout) + if err != nil { + return fmt.Errorf("cannot stop slave %v at right binlog position %v: %v", msdw.sourceAlias, vreplicationPos, err) + } + } + // change the cleaner actions from ChangeSlaveType(rdonly) + // to StartSlave() + ChangeSlaveType(spare) + wrangler.RecordStartSlaveAction(msdw.cleaner, sourceTablet.Tablet) + + for i, shardInfo := range msdw.destinationShards { + masterInfo := masterInfos[i] + destinationAlias := msdw.destinationAliases[i] + + // 3 - run all the destination masters blp until they've reached that position + msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, mysqlPos) + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + defer cancel() + _, err = msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, mysqlPos)) + if err != nil { + return fmt.Errorf("VReplication(start until) for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) + } + if err := msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), mysqlPos); err != nil { + return fmt.Errorf("VReplicationWaitForPos for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) + } + masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet) + if err != nil { + return fmt.Errorf("MasterPosition for %v failed: %v", msdw.shardInfo.MasterAlias, err) + } + + // 4 - stop replication on all the destination rdonlys + if msdw.waitForFixedTimeRatherThanGtidSet { + msdw.wr.Logger().Infof("Workaround for broken GTID set in destination RDONLY. Just waiting for 1 minute for %v and assuming replication has caught up. (should be at %v)", destinationAlias, masterPos) + } else { + msdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) + } + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) + cancel() + if err != nil { + return err + } + + if msdw.waitForFixedTimeRatherThanGtidSet { + time.Sleep(1 * time.Minute) + } + + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + if msdw.waitForFixedTimeRatherThanGtidSet { + err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet) + } else { + _, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout) + } + cancel() + if err != nil { + return fmt.Errorf("StopSlaveMinimum for %v at %v failed: %v", destinationAlias, masterPos, err) + } + wrangler.RecordStartSlaveAction(msdw.cleaner, destinationTablet.Tablet) + + // 5 - restart replication on the destination masters + + msdw.wr.Logger().Infof("Restarting filtered replication on master %v", shardInfo.MasterAlias) + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + defer cancel() + if _, err = msdw.wr.TabletManagerClient().VReplicationExec(ctx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)); err != nil { + return fmt.Errorf("VReplicationExec(start) failed for %v: %v", shardInfo.MasterAlias, err) + } + cancel() + if err != nil { + return fmt.Errorf("StartBlp failed for %v: %v", shardInfo.MasterAlias, err) + } + } + + return nil +} + +// diff phase: will log messages regarding the diff. +// - get the schema on all tablets +// - if some table schema mismatches, record them (use existing schema diff tools). +// - for each table in destination, run a diff pipeline. + +func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { + msdw.SetState(WorkerStateDiff) + + msdw.wr.Logger().Infof("Gathering schema information...") + wg := sync.WaitGroup{} + rec := &concurrency.AllErrorRecorder{} + mu := sync.Mutex{} // protects msdw.destinationSchemaDefinitions + msdw.destinationSchemaDefinitions = make([]*tabletmanagerdatapb.SchemaDefinition, len(msdw.destinationAliases)) + for i, destinationAlias := range msdw.destinationAliases { + wg.Add(1) + go func(i int, destinationAlias *topodatapb.TabletAlias) { + var err error + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + destinationSchemaDefinition, err := msdw.wr.GetSchema( + shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) + cancel() + rec.RecordError(err) + mu.Lock() + msdw.destinationSchemaDefinitions[i] = destinationSchemaDefinition + mu.Unlock() + msdw.wr.Logger().Infof("Got schema from destination %v", destinationAlias) + wg.Done() + }(i, destinationAlias) + } + wg.Add(1) + go func() { + var err error + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + msdw.sourceSchemaDefinition, err = msdw.wr.GetSchema( + shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) + cancel() + rec.RecordError(err) + msdw.wr.Logger().Infof("Got schema from source %v", msdw.sourceAlias) + wg.Done() + }() + + wg.Wait() + if rec.HasErrors() { + return rec.Error() + } + + msdw.wr.Logger().Infof("Diffing the schema...") + rec = &concurrency.AllErrorRecorder{} + sourceShardName := fmt.Sprintf("%v/%v", msdw.shardInfo.Keyspace(), msdw.shardInfo.ShardName()) + for i, destinationSchemaDefinition := range msdw.destinationSchemaDefinitions { + destinationShard := msdw.destinationShards[i] + destinationShardName := fmt.Sprintf("%v/%v", destinationShard.Keyspace(), destinationShard.ShardName()) + tmutils.DiffSchema(destinationShardName, destinationSchemaDefinition, sourceShardName, msdw.sourceSchemaDefinition, rec) + } + if rec.HasErrors() { + msdw.wr.Logger().Warningf("Different schemas: %v", rec.Error().Error()) + } else { + msdw.wr.Logger().Infof("Schema match, good.") + } + + // read the vschema if needed + var keyspaceSchema *vindexes.KeyspaceSchema + if *useV3ReshardingMode { + kschema, err := msdw.wr.TopoServer().GetVSchema(ctx, msdw.keyspace) + if err != nil { + return fmt.Errorf("cannot load VSchema for keyspace %v: %v", msdw.keyspace, err) + } + if kschema == nil { + return fmt.Errorf("no VSchema for keyspace %v", msdw.keyspace) + } + + keyspaceSchema, err = vindexes.BuildKeyspaceSchema(kschema, msdw.keyspace) + if err != nil { + return fmt.Errorf("cannot build vschema for keyspace %v: %v", msdw.keyspace, err) + } + } + + // Compute the overlap keyrange. Later, we'll compare it with + // source or destination keyrange. If it matches either, + // we'll just ask for all the data. If the overlap is a subset, + // we'll filter. + var err error + var keyranges = make([]*topodatapb.KeyRange, len(msdw.destinationShards)) + for i, destinationShard := range msdw.destinationShards { + keyranges[i] = destinationShard.KeyRange + } + union, err := keyRangesUnion(keyranges) + if err != nil { + return err + } + + // run diffs in parallel + msdw.wr.Logger().Infof("Running the diffs...") + sem := sync2.NewSemaphore(msdw.parallelDiffsCount, 0) + tableDefinitions := msdw.sourceSchemaDefinition.TableDefinitions + + // sort tables by size + // if there are large deltas between table sizes then it's more efficient to start working on the large tables first + sort.Slice(tableDefinitions, func(i, j int) bool { return tableDefinitions[i].DataLength > tableDefinitions[j].DataLength }) + + // use a channel to make sure tables are diffed in order + tableChan := make(chan *tabletmanagerdatapb.TableDefinition, len(tableDefinitions)) + for _, tableDefinition := range tableDefinitions { + tableChan <- tableDefinition + } + + // start as many goroutines as there are tables to diff + for range tableDefinitions { + wg.Add(1) + go func() { + defer wg.Done() + // use the semaphore to limit the number of tables that are diffed in parallel + sem.Acquire() + defer sem.Release() + + // grab the table to process out of the channel + tableDefinition := <-tableChan + + msdw.wr.Logger().Infof("Starting the diff on table %v", tableDefinition.Name) + + // On the source, see if we need a full scan + // or a filtered scan. + var err error + var sourceQueryResultReader *QueryResultReader + if key.KeyRangeEqual(union, msdw.shardInfo.KeyRange) { + sourceQueryResultReader, err = TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), msdw.sourceAlias, tableDefinition) + } else { + msdw.wr.Logger().Infof("Filtering source down to %v", union) + sourceQueryResultReader, err = TableScanByKeyRange(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), msdw.sourceAlias, tableDefinition, union, keyspaceSchema, msdw.keyspaceInfo.ShardingColumnName, msdw.keyspaceInfo.ShardingColumnType) + } + if err != nil { + newErr := fmt.Errorf("TableScan(source) failed: %v", err) + rec.RecordError(newErr) + msdw.wr.Logger().Errorf("%v", newErr) + return + } + defer sourceQueryResultReader.Close(ctx) + + // On the destination, see if we need a full scan + // or a filtered scan. + destinationQueryResultReaders := make([]ResultReader, len(msdw.destinationAliases)) + for i, destinationAlias := range msdw.destinationAliases { + destinationQueryResultReader, err := TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), destinationAlias, tableDefinition) + if err != nil { + newErr := fmt.Errorf("TableScan(destination) failed: %v", err) + rec.RecordError(newErr) + msdw.wr.Logger().Errorf("%v", newErr) + return + } + //noinspection GoDeferInLoop + defer destinationQueryResultReader.Close(ctx) + destinationQueryResultReaders[i] = destinationQueryResultReader + } + mergedResultReader, err := NewResultMerger(destinationQueryResultReaders, len(tableDefinition.PrimaryKeyColumns)) + if err != nil { + newErr := fmt.Errorf("NewResultMerger failed: %v", err) + rec.RecordError(newErr) + msdw.wr.Logger().Errorf("%v", newErr) + return + } + + // Create the row differ. + differ, err := NewRowDiffer(sourceQueryResultReader, mergedResultReader, tableDefinition) + if err != nil { + newErr := fmt.Errorf("NewRowDiffer() failed: %v", err) + rec.RecordError(newErr) + msdw.wr.Logger().Errorf("%v", newErr) + return + } + + // And run the diff. + report, err := differ.Go(msdw.wr.Logger()) + if err != nil { + newErr := fmt.Errorf("Differ.Go failed: %v", err.Error()) + rec.RecordError(newErr) + msdw.wr.Logger().Errorf("%v", newErr) + } else { + if report.HasDifferences() { + err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String()) + rec.RecordError(err) + msdw.wr.Logger().Warningf(err.Error()) + } else { + msdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) + } + } + }() + } + + // grab the table to process out of the channel + wg.Wait() + + return rec.Error() +} + +func keyRangesUnion(keyranges []*topodatapb.KeyRange) (*topodatapb.KeyRange, error) { + // HACK HACK HACK + // This assumes the ranges are consecutive. It just returns the smallest start and the largest end. + + var start []byte + var end []byte + for i, keyrange := range keyranges { + if i == 0 { + // initialize the first values + start = keyrange.Start + end = keyrange.End + continue + } + + // nil always wins + if keyrange.Start == nil { + start = nil + } else { + if start != nil { + if bytes.Compare(start, keyrange.Start) > 0 { + start = keyrange.Start + } + } + } + + if keyrange.End == nil { + end = nil + } else { + if end != nil { + if bytes.Compare(end, keyrange.End) < 0 { + end = keyrange.End + } + } + } + } + + return &topodatapb.KeyRange{Start: start, End: end}, nil +} diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go new file mode 100644 index 00000000000..37064dbbaad --- /dev/null +++ b/go/vt/worker/multi_split_diff_cmd.go @@ -0,0 +1,250 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package worker + +import ( + "flag" + "fmt" + "html/template" + "net/http" + "strconv" + "strings" + "sync" + + "golang.org/x/net/context" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/wrangler" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +const multiSplitDiffHTML = ` + + + Multi Split Diff Action + + +

Multi Split Diff Action

+ + {{if .Error}} + Error: {{.Error}}
+ {{else}} + {{range $i, $si := .Shards}} +
  • {{$si.Keyspace}}/{{$si.Shard}}
  • + {{end}} + {{end}} + +` + +const multiSplitDiffHTML2 = ` + + + Multi Split Diff Action + + +

    Shard involved: {{.Keyspace}}/{{.Shard}}

    +

    Multi Split Diff Action

    +
    + +
    + +
    + +
    + +
    + +
    + +
    + + + +
    + +` + +var multiSplitDiffTemplate = mustParseTemplate("multiSplitDiff", multiSplitDiffHTML) +var multiSplitDiffTemplate2 = mustParseTemplate("multiSplitDiff2", multiSplitDiffHTML2) + +func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error) { + excludeTables := subFlags.String("exclude_tables", "", "comma separated list of tables to exclude") + excludeShards := subFlags.String("exclude_shards", "", "comma separated list of shards to exclude") + minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") + destTabletTypeStr := subFlags.String("dest_tablet_type", defaultDestTabletType, "destination tablet type (RDONLY or REPLICA) that will be used to compare the shards") + parallelDiffsCount := subFlags.Int("parallel_diffs_count", defaultParallelDiffsCount, "number of tables to diff in parallel") + waitForFixedTimeRatherThanGtidSet := subFlags.Bool("wait_for_fixed_time_rather_than_gtid_set", false, "wait for 1m when syncing up the destination RDONLY tablet rather than using the GTID set. Use this when the GTID set on the RDONLY is broken. Make sure the RDONLY is not behind in replication when using this flag.") + if err := subFlags.Parse(args); err != nil { + return nil, err + } + if subFlags.NArg() != 1 { + subFlags.Usage() + return nil, fmt.Errorf("command MultiSplitDiff requires ") + } + keyspace, shard, err := topoproto.ParseKeyspaceShard(subFlags.Arg(0)) + if err != nil { + return nil, err + } + var excludeTableArray []string + if *excludeTables != "" { + excludeTableArray = strings.Split(*excludeTables, ",") + } + var excludeShardsArray []string + if *excludeShards != "" { + excludeShardsArray = strings.Split(*excludeShards, ",") + } + destTabletType, ok := topodatapb.TabletType_value[*destTabletTypeStr] + if !ok { + return nil, fmt.Errorf("command MultiSplitDiff invalid dest_tablet_type: %v", destTabletType) + } + return NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, excludeShardsArray, *minHealthyRdonlyTablets, *parallelDiffsCount, *waitForFixedTimeRatherThanGtidSet, topodatapb.TabletType(destTabletType)), nil +} + +// shardSources returns all the shards that are SourceShards of at least one other shard. +func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]string, error) { + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + keyspaces, err := wr.TopoServer().GetKeyspaces(shortCtx) + cancel() + if err != nil { + return nil, fmt.Errorf("failed to get list of keyspaces: %v", err) + } + + wg := sync.WaitGroup{} + mu := sync.Mutex{} // protects sourceShards + // Use a map to dedupe source shards + sourceShards := make(map[string]map[string]string) + rec := concurrency.AllErrorRecorder{} + for _, keyspace := range keyspaces { + wg.Add(1) + go func(keyspace string) { + defer wg.Done() + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + shards, err := wr.TopoServer().GetShardNames(shortCtx, keyspace) + cancel() + if err != nil { + rec.RecordError(fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) + return + } + for _, shard := range shards { + wg.Add(1) + go func(keyspace, shard string) { + defer wg.Done() + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + si, err := wr.TopoServer().GetShard(shortCtx, keyspace, shard) + cancel() + if err != nil { + rec.RecordError(fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) + return + } + + if len(si.SourceShards) > 0 && len(si.SourceShards[0].Tables) == 0 { + mu.Lock() + for _, sourceShard := range si.SourceShards { + sourceShards[fmt.Sprintf("%v/%v", sourceShard.Keyspace, sourceShard.Shard)] = + map[string]string{ + "Keyspace": sourceShard.Keyspace, + "Shard": sourceShard.Shard, + } + } + mu.Unlock() + } + }(keyspace, shard) + } + }(keyspace) + } + wg.Wait() + + if rec.HasErrors() { + return nil, rec.Error() + } + result := make([]map[string]string, 0, len(sourceShards)) + for _, shard := range sourceShards { + result = append(result, shard) + } + if len(result) == 0 { + return nil, fmt.Errorf("there are no shards with SourceShards") + } + return result, nil +} + +func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) { + if err := r.ParseForm(); err != nil { + return nil, nil, nil, fmt.Errorf("cannot parse form: %s", err) + } + keyspace := r.FormValue("keyspace") + shard := r.FormValue("shard") + + if keyspace == "" || shard == "" { + // display the list of possible shards to chose from + result := make(map[string]interface{}) + shards, err := shardSources(ctx, wr) + if err != nil { + result["Error"] = err.Error() + } else { + result["Shards"] = shards + } + return nil, multiSplitDiffTemplate, result, nil + } + + submitButtonValue := r.FormValue("submit") + if submitButtonValue == "" { + // display the input form + result := make(map[string]interface{}) + result["Keyspace"] = keyspace + result["Shard"] = shard + result["DefaultSourceUID"] = "0" + result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultParallelDiffsCount"] = fmt.Sprintf("%v", defaultParallelDiffsCount) + return nil, multiSplitDiffTemplate2, result, nil + } + + // Process input form. + excludeTables := r.FormValue("excludeTables") + var excludeTableArray []string + if excludeTables != "" { + excludeTableArray = strings.Split(excludeTables, ",") + } + excludeShards := r.FormValue("excludeShards") + var excludeShardsArray []string + if excludeShards != "" { + excludeShardsArray = strings.Split(excludeShards, ",") + } + minHealthyRdonlyTabletsStr := r.FormValue("minHealthyRdonlyTablets") + parallelDiffsCountStr := r.FormValue("parallelDiffsCount") + minHealthyRdonlyTablets, err := strconv.ParseInt(minHealthyRdonlyTabletsStr, 0, 64) + parallelDiffsCount, err := strconv.ParseInt(parallelDiffsCountStr, 0, 64) + if err != nil { + return nil, nil, nil, fmt.Errorf("cannot parse minHealthyRdonlyTablets: %s", err) + } + waitForFixedTimeRatherThanGtidSetStr := r.FormValue("waitForFixedTimeRatherThanGtidSet") + waitForFixedTimeRatherThanGtidSet := waitForFixedTimeRatherThanGtidSetStr == "true" + if err != nil { + return nil, nil, nil, fmt.Errorf("cannot parse minHealthyRdonlyTablets: %s", err) + } + + // start the diff job + wrk := NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, excludeShardsArray, int(minHealthyRdonlyTablets), int(parallelDiffsCount), waitForFixedTimeRatherThanGtidSet, topodatapb.TabletType_RDONLY) + return wrk, nil, nil, nil +} + +func init() { + AddCommand("Diffs", Command{"MultiSplitDiff", + commandMultiSplitDiff, interactiveMultiSplitDiff, + "[--exclude_tables=''] ", + "Diffs a rdonly destination shard against its SourceShards"}) +} diff --git a/go/vt/worker/multi_split_diff_test.go b/go/vt/worker/multi_split_diff_test.go new file mode 100644 index 00000000000..670389e921a --- /dev/null +++ b/go/vt/worker/multi_split_diff_test.go @@ -0,0 +1,342 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package worker + +import ( + "fmt" + "strings" + "testing" + "time" + + "golang.org/x/net/context" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/grpcqueryservice" + "vitess.io/vitess/go/vt/vttablet/queryservice/fakes" + "vitess.io/vitess/go/vt/wrangler" + "vitess.io/vitess/go/vt/wrangler/testlib" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" +) + +// msdDestinationTabletServer is a local QueryService implementation to +// support the tests +type msdDestinationTabletServer struct { + t *testing.T + + *fakes.StreamHealthQueryService + excludedTable string + shardIndex int +} + +func (sq *msdDestinationTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error { + if strings.Contains(sql, sq.excludedTable) { + sq.t.Errorf("Split Diff operation on destination should skip the excluded table: %v query: %v", sq.excludedTable, sql) + } + + if hasKeyspace := strings.Contains(sql, "WHERE `keyspace_id`"); hasKeyspace == true { + sq.t.Errorf("Sql query on destination should not contain a keyspace_id WHERE clause; query received: %v", sql) + } + + sq.t.Logf("msdDestinationTabletServer: got query: %v", sql) + + // Send the headers + if err := callback(&sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "id", + Type: sqltypes.Int64, + }, + { + Name: "msg", + Type: sqltypes.VarChar, + }, + { + Name: "keyspace_id", + Type: sqltypes.Int64, + }, + }, + }); err != nil { + return err + } + + // Send the values + ksids := []uint64{0x2000000000000000, 0x6000000000000000} + for i := 0; i < 100; i++ { + // skip the out-of-range values + if i%2 == sq.shardIndex { + continue + } + if err := callback(&sqltypes.Result{ + Rows: [][]sqltypes.Value{ + { + sqltypes.NewVarBinary(fmt.Sprintf("%v", i)), + sqltypes.NewVarBinary(fmt.Sprintf("Text for %v", i)), + sqltypes.NewVarBinary(fmt.Sprintf("%v", ksids[i%2])), + }, + }, + }); err != nil { + return err + } + } + return nil +} + +// msdSourceTabletServer is a local QueryService implementation to support the tests +type msdSourceTabletServer struct { + t *testing.T + + *fakes.StreamHealthQueryService + excludedTable string + v3 bool +} + +func (sq *msdSourceTabletServer) StreamExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error { + if strings.Contains(sql, sq.excludedTable) { + sq.t.Errorf("Split Diff operation on source should skip the excluded table: %v query: %v", sq.excludedTable, sql) + } + + // we test for a keyspace_id where clause, except for v3 + if !sq.v3 { + if hasKeyspace := strings.Contains(sql, "WHERE `keyspace_id` < 4611686018427387904"); hasKeyspace != true { + sq.t.Errorf("Sql query on source should contain a keyspace_id WHERE clause; query received: %v", sql) + } + } + + sq.t.Logf("msdSourceTabletServer: got query: %v", sql) + + // Send the headers + if err := callback(&sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "id", + Type: sqltypes.Int64, + }, + { + Name: "msg", + Type: sqltypes.VarChar, + }, + { + Name: "keyspace_id", + Type: sqltypes.Int64, + }, + }, + }); err != nil { + return err + } + + // Send the values + ksids := []uint64{0x2000000000000000, 0x6000000000000000} + for i := 0; i < 100; i++ { + if !sq.v3 && i%2 == 1 { + // for v2, filtering is done at SQL layer + continue + } + if err := callback(&sqltypes.Result{ + Rows: [][]sqltypes.Value{ + { + sqltypes.NewVarBinary(fmt.Sprintf("%v", i)), + sqltypes.NewVarBinary(fmt.Sprintf("Text for %v", i)), + sqltypes.NewVarBinary(fmt.Sprintf("%v", ksids[i%2])), + }, + }, + }); err != nil { + return err + } + } + return nil +} + +// TODO(aaijazi): Create a test in which source and destination data does not match + +func testMultiSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb.TabletType) { + *useV3ReshardingMode = v3 + ts := memorytopo.NewServer("cell1", "cell2") + ctx := context.Background() + wi := NewInstance(ts, "cell1", time.Second) + + if v3 { + if err := ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{}); err != nil { + t.Fatalf("CreateKeyspace v3 failed: %v", err) + } + + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "table1_index": { + Type: "numeric", + }, + }, + Tables: map[string]*vschemapb.Table{ + "table1": { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "keyspace_id", + Name: "table1_index", + }, + }, + }, + }, + } + if err := ts.SaveVSchema(ctx, "ks", vs); err != nil { + t.Fatalf("SaveVSchema v3 failed: %v", err) + } + } else { + if err := ts.CreateKeyspace(ctx, "ks", &topodatapb.Keyspace{ + ShardingColumnName: "keyspace_id", + ShardingColumnType: topodatapb.KeyspaceIdType_UINT64, + }); err != nil { + t.Fatalf("CreateKeyspace failed: %v", err) + } + } + + sourceMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 0, + topodatapb.TabletType_MASTER, nil, testlib.TabletKeyspaceShard(t, "ks", "-80")) + sourceRdonly1 := testlib.NewFakeTablet(t, wi.wr, "cell1", 1, + topodatapb.TabletType_RDONLY, nil, testlib.TabletKeyspaceShard(t, "ks", "-80")) + sourceRdonly2 := testlib.NewFakeTablet(t, wi.wr, "cell1", 2, + topodatapb.TabletType_RDONLY, nil, testlib.TabletKeyspaceShard(t, "ks", "-80")) + + leftMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 10, + topodatapb.TabletType_MASTER, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) + leftRdonly1 := testlib.NewFakeTablet(t, wi.wr, "cell1", 11, + destinationTabletType, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) + leftRdonly2 := testlib.NewFakeTablet(t, wi.wr, "cell1", 12, + destinationTabletType, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) + + rightMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 20, + topodatapb.TabletType_MASTER, nil, testlib.TabletKeyspaceShard(t, "ks", "40-80")) + rightRdonly1 := testlib.NewFakeTablet(t, wi.wr, "cell1", 21, + topodatapb.TabletType_RDONLY, nil, testlib.TabletKeyspaceShard(t, "ks", "40-80")) + rightRdonly2 := testlib.NewFakeTablet(t, wi.wr, "cell1", 22, + topodatapb.TabletType_RDONLY, nil, testlib.TabletKeyspaceShard(t, "ks", "40-80")) + + // add the topo and schema data we'll need + if err := ts.CreateShard(ctx, "ks", "80-"); err != nil { + t.Fatalf("CreateShard(\"-80\") failed: %v", err) + } + wi.wr.SetSourceShards(ctx, "ks", "-40", []*topodatapb.TabletAlias{sourceRdonly1.Tablet.Alias}, nil) + if err := wi.wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", topodatapb.KeyspaceIdType_UINT64, false); err != nil { + t.Fatalf("SetKeyspaceShardingInfo failed: %v", err) + } + wi.wr.SetSourceShards(ctx, "ks", "40-80", []*topodatapb.TabletAlias{sourceRdonly1.Tablet.Alias}, nil) + if err := wi.wr.SetKeyspaceShardingInfo(ctx, "ks", "keyspace_id", topodatapb.KeyspaceIdType_UINT64, false); err != nil { + t.Fatalf("SetKeyspaceShardingInfo failed: %v", err) + } + if err := wi.wr.RebuildKeyspaceGraph(ctx, "ks", nil); err != nil { + t.Fatalf("RebuildKeyspaceGraph failed: %v", err) + } + + excludedTable := "excludedTable1" + + for _, rdonly := range []*testlib.FakeTablet{sourceRdonly1, sourceRdonly2, leftRdonly1, leftRdonly2, rightRdonly1, rightRdonly2} { + // The destination only has half the data. + // For v2, we do filtering at the SQL level. + // For v3, we do it in the client. + // So in any case, we need real data. + rdonly.FakeMysqlDaemon.Schema = &tabletmanagerdatapb.SchemaDefinition{ + DatabaseSchema: "", + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "table1", + Columns: []string{"id", "msg", "keyspace_id"}, + PrimaryKeyColumns: []string{"id"}, + Type: tmutils.TableBaseTable, + }, + { + Name: excludedTable, + Columns: []string{"id", "msg", "keyspace_id"}, + PrimaryKeyColumns: []string{"id"}, + Type: tmutils.TableBaseTable, + }, + }, + } + } + + for _, sourceRdonly := range []*testlib.FakeTablet{sourceRdonly1, sourceRdonly2} { + qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target()) + qs.AddDefaultHealthResponse() + grpcqueryservice.Register(sourceRdonly.RPCServer, &msdSourceTabletServer{ + t: t, + StreamHealthQueryService: qs, + excludedTable: excludedTable, + v3: v3, + }) + } + + for _, destRdonly := range []*testlib.FakeTablet{leftRdonly1, leftRdonly2} { + qs := fakes.NewStreamHealthQueryService(destRdonly.Target()) + qs.AddDefaultHealthResponse() + grpcqueryservice.Register(destRdonly.RPCServer, &msdDestinationTabletServer{ + t: t, + StreamHealthQueryService: qs, + excludedTable: excludedTable, + shardIndex: 0, + }) + } + + for _, destRdonly := range []*testlib.FakeTablet{rightRdonly1, rightRdonly2} { + qs := fakes.NewStreamHealthQueryService(destRdonly.Target()) + qs.AddDefaultHealthResponse() + grpcqueryservice.Register(destRdonly.RPCServer, &msdDestinationTabletServer{ + t: t, + StreamHealthQueryService: qs, + excludedTable: excludedTable, + shardIndex: 1, + }) + } + + // Start action loop after having registered all RPC services. + for _, ft := range []*testlib.FakeTablet{sourceMaster, sourceRdonly1, sourceRdonly2, leftMaster, leftRdonly1, leftRdonly2, rightMaster, rightRdonly1, rightRdonly2} { + ft.StartActionLoop(t, wi.wr) + defer ft.StopActionLoop(t) + } + + tabletTypeName, _ := topodatapb.TabletType_name[int32(destinationTabletType)] + // Run the vtworker command. + args := []string{ + "MultiSplitDiff", + "-exclude_tables", excludedTable, + "-dest_tablet_type", tabletTypeName, + "ks/-80", + } + // We need to use FakeTabletManagerClient because we don't + // have a good way to fake the binlog player yet, which is + // necessary for synchronizing replication. + wr := wrangler.New(logutil.NewConsoleLogger(), ts, newFakeTMCTopo(ts)) + if err := runCommand(t, wi, wr, args); err != nil { + t.Fatal(err) + } +} + +func TestMultiSplitDiffv2(t *testing.T) { + testMultiSplitDiff(t, false, topodatapb.TabletType_RDONLY) +} + +func TestMultiSplitDiffv3(t *testing.T) { + testMultiSplitDiff(t, true, topodatapb.TabletType_RDONLY) +} + +func TestMultiSplitDiffWithReplica(t *testing.T) { + testMultiSplitDiff(t, true, topodatapb.TabletType_REPLICA) +} From 4303a66d1beb77cfe184151808df3f62ebd2c990 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 9 Oct 2018 17:23:00 +0200 Subject: [PATCH 2/5] Clean up of multi_split_diff * Extracted a bunch of methods * Instead of mutexes, use channels * Check UIDs in separate channel * Test if MSD can work with the given tables * Remove destination tablet type that was not used anyway * Make tests run MultiSplitDiff as well * Make the unit test run MSD Signed-off-by: Andres Taylor --- go/vt/worker/multi_split_diff.go | 686 +++++++++++----------- go/vt/worker/multi_split_diff_cmd.go | 23 +- go/vt/worker/multi_split_diff_test.go | 17 +- go/vt/worker/result_merger.go | 19 +- test/base_sharding.py | 1 + test/initial_sharding.py | 40 +- test/initial_sharding_multi_split_diff.py | 29 + test/resharding.py | 54 +- test/resharding_multi_split_diff.py | 25 + 9 files changed, 487 insertions(+), 407 deletions(-) create mode 100755 test/initial_sharding_multi_split_diff.py create mode 100755 test/resharding_multi_split_diff.py diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index 7b635fcbf36..3449b74eb5a 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -23,7 +23,6 @@ import ( "golang.org/x/net/context" - "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" @@ -32,13 +31,10 @@ import ( "sort" - "bytes" - "time" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/key" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -54,9 +50,7 @@ type MultiSplitDiffWorker struct { keyspace string shard string excludeTables []string - excludeShards []string minHealthyRdonlyTablets int - destinationTabletType topodatapb.TabletType parallelDiffsCount int waitForFixedTimeRatherThanGtidSet bool cleaner *wrangler.Cleaner @@ -70,27 +64,21 @@ type MultiSplitDiffWorker struct { // populated during WorkerStateFindTargets, read-only after that sourceAlias *topodatapb.TabletAlias destinationAliases []*topodatapb.TabletAlias // matches order of destinationShards - - // populated during WorkerStateDiff - sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition - destinationSchemaDefinitions []*tabletmanagerdatapb.SchemaDefinition } // NewMultiSplitDiffWorker returns a new MultiSplitDiffWorker object. -func NewMultiSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, excludeShards []string, minHealthyRdonlyTablets, parallelDiffsCount int, waitForFixedTimeRatherThanGtidSet bool, tabletType topodatapb.TabletType) Worker { +func NewMultiSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, minHealthyRdonlyTablets, parallelDiffsCount int, waitForFixedTimeRatherThanGtidSet bool) Worker { return &MultiSplitDiffWorker{ + waitForFixedTimeRatherThanGtidSet: waitForFixedTimeRatherThanGtidSet, StatusWorker: NewStatusWorker(), wr: wr, cell: cell, keyspace: keyspace, shard: shard, excludeTables: excludeTables, - excludeShards: excludeShards, minHealthyRdonlyTablets: minHealthyRdonlyTablets, - destinationTabletType: tabletType, parallelDiffsCount: parallelDiffsCount, - waitForFixedTimeRatherThanGtidSet: waitForFixedTimeRatherThanGtidSet, - cleaner: &wrangler.Cleaner{}, + cleaner: &wrangler.Cleaner{}, } } @@ -177,11 +165,43 @@ func (msdw *MultiSplitDiffWorker) run(ctx context.Context) error { if err := msdw.diff(ctx); err != nil { return fmt.Errorf("diff() failed: %v", err) } - if err := checkDone(ctx); err != nil { - return err + + return checkDone(ctx) +} + +func (msdw *MultiSplitDiffWorker) searchInKeyspace(ctx context.Context, wg *sync.WaitGroup, rec *concurrency.AllErrorRecorder, keyspace string, result chan *topo.ShardInfo, UIDs chan uint32) { + defer wg.Done() + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) + cancel() + if err != nil { + rec.RecordError(fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) + return + } + for _, shard := range shards { + wg.Add(1) + go msdw.produceShardInfo(ctx, wg, rec, keyspace, shard, result, UIDs) } +} - return nil +func (msdw *MultiSplitDiffWorker) produceShardInfo(ctx context.Context, wg *sync.WaitGroup, rec *concurrency.AllErrorRecorder, keyspace string, shard string, result chan *topo.ShardInfo, UIDs chan uint32) { + defer wg.Done() + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) + cancel() + if err != nil { + rec.RecordError(fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) + return + } + + for _, sourceShard := range si.SourceShards { + if len(sourceShard.Tables) == 0 && sourceShard.Keyspace == msdw.keyspace && sourceShard.Shard == msdw.shard { + result <- si + UIDs <- sourceShard.Uid + // Prevents the same shard from showing up multiple times + return + } + } } // findDestinationShards finds all the shards that have filtered replication from the source shard @@ -193,78 +213,56 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([] return nil, fmt.Errorf("failed to get list of keyspaces: %v", err) } - wg := sync.WaitGroup{} - mu := sync.Mutex{} // protects result - result := make([]*topo.ShardInfo, 0, len(keyspaces)) - foundFirst := false + producers := sync.WaitGroup{} + consumer := sync.WaitGroup{} + result := make(chan *topo.ShardInfo, 2 /*the consumer will just copy out the data, so this should be enough*/) + sourceUIDs := make(chan uint32, 2 /*the consumer will just copy out the data, so this should be enough*/) rec := concurrency.AllErrorRecorder{} + var resultArray = make([]*topo.ShardInfo, 0, len(keyspaces)) + for _, keyspace := range keyspaces { - wg.Add(1) - go func(keyspace string) { - defer wg.Done() - shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) - cancel() - if err != nil { - rec.RecordError(fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) - return - } - for _, shard := range shards { - wg.Add(1) - go func(keyspace, shard string) { - defer wg.Done() - shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) - cancel() - if err != nil { - rec.RecordError(fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) - return - } - - if stringContains(msdw.excludeShards, si.ShardName()) { - msdw.wr.Logger().Infof("ignoring shard %v/%v", si.Keyspace(), si.ShardName()) - return - } - - for _, sourceShard := range si.SourceShards { - if len(sourceShard.Tables) == 0 && sourceShard.Keyspace == msdw.keyspace && sourceShard.Shard == msdw.shard { - mu.Lock() - if foundFirst { - if msdw.sourceUID != sourceShard.Uid { - rec.RecordError(fmt.Errorf("different uid for the blp in %v/%v", si.Keyspace(), si.ShardName())) - } - } else { - msdw.sourceUID = sourceShard.Uid - } - result = append(result, si) - mu.Unlock() - // Prevents the same shard from showing up multiple times - break - } - } - - }(keyspace, shard) - } - }(keyspace) + producers.Add(1) + go msdw.searchInKeyspace(ctx, &producers, &rec, keyspace, result, sourceUIDs) } - wg.Wait() + + // Start the result array consumer + consumer.Add(1) + go func() { + defer consumer.Done() + for r := range result { + resultArray = append(resultArray, r) + } + }() + + // Start the sourceUID check consumer + consumer.Add(1) + go func() { + defer consumer.Done() + first := true + for r := range sourceUIDs { + if first { + first = false + msdw.sourceUID = r + } else if r != msdw.sourceUID { + rec.RecordError(fmt.Errorf("found a source ID that was different, aborting. %v vs %v", r, msdw.sourceUID)) + } + } + }() + + // we use this pattern because we don't know the size of shards up front, so we are using a buffered channel + producers.Wait() + close(result) + close(sourceUIDs) + consumer.Wait() if rec.HasErrors() { return nil, rec.Error() } - if len(result) == 0 { - return nil, fmt.Errorf("there are no destination shards") - } - return result, nil -} -func stringContains(l []string, s string) bool { - for _, v := range l { - if v == s { - return true - } + if len(resultArray) == 0 { + return nil, fmt.Errorf("there are no destination shards") } - return false + return resultArray, nil } // init phase: @@ -318,7 +316,7 @@ func (msdw *MultiSplitDiffWorker) findTargets(ctx context.Context) error { msdw.keyspace, msdw.shard, 1, /* minHealthyTablets */ - msdw.destinationTabletType) + topodatapb.TabletType_RDONLY) if err != nil { return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) } @@ -350,75 +348,61 @@ func (msdw *MultiSplitDiffWorker) findTargets(ctx context.Context) error { return nil } -// synchronizeReplication phase: -// 1 - ask the master of the destination shard to pause filtered replication, -// and return the source binlog positions -// (add a cleanup task to restart filtered replication on master) -// 2 - stop the source tablet at a binlog position higher than the -// destination master. Get that new list of positions. -// (add a cleanup task to restart binlog replication on the source tablet, and -// change the existing ChangeSlaveType cleanup action to 'spare' type) -// 3 - ask the master of the destination shard to resume filtered replication -// up to the new list of positions, and return its binlog position. -// 4 - wait until the destination tablet is equal or passed that master -// binlog position, and stop its replication. -// (add a cleanup task to restart binlog replication on it, and change -// the existing ChangeSlaveType cleanup action to 'spare' type) -// 5 - restart filtered replication on the destination master. -// (remove the cleanup task that does the same) -// At this point, the source and the destination tablet are stopped at the same -// point. - -func (msdw *MultiSplitDiffWorker) synchronizeReplication(ctx context.Context) error { - msdw.SetState(WorkerStateSyncReplication) - - masterInfos := make([]*topo.TabletInfo, len(msdw.destinationAliases)) - for i, shardInfo := range msdw.destinationShards { - shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias) - cancel() - if err != nil { - return fmt.Errorf("synchronizeReplication: cannot get Tablet record for master %v: %v", msdw.shardInfo.MasterAlias, err) - } - masterInfos[i] = masterInfo - } - - // 1 - stop replication on all the destinations masters +// ask the master of the destination shard to pause filtered replication, +// and return the source binlog positions +// (add a cleanup task to restart filtered replication on master) +func (msdw *MultiSplitDiffWorker) stopReplicationOnAllDestinationMasters(ctx context.Context, masterInfos []*topo.TabletInfo) ([]string, error) { destVreplicationPos := make([]string, len(msdw.destinationShards)) + for i, shardInfo := range msdw.destinationShards { masterInfo := masterInfos[i] msdw.wr.Logger().Infof("Stopping master binlog replication on %v", shardInfo.MasterAlias) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff")) + cancel() if err != nil { - cancel() - return fmt.Errorf("VReplicationExec(stop) for %v failed: %v", shardInfo.MasterAlias, err) + return nil, fmt.Errorf("VReplicationExec(stop) for %v failed: %v", shardInfo.MasterAlias, err) } wrangler.RecordVReplicationAction(msdw.cleaner, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID)) + cancel() if err != nil { - cancel() - return fmt.Errorf("VReplicationExec(stop) for %v failed: %v", msdw.shardInfo.MasterAlias, err) + return nil, fmt.Errorf("VReplicationExec(stop) for %v failed: %v", msdw.shardInfo.MasterAlias, err) } qr := sqltypes.Proto3ToResult(p3qr) if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - cancel() - return fmt.Errorf("Unexpected result while reading position: %v", qr) + return nil, fmt.Errorf("unexpected result while reading position: %v", qr) } destVreplicationPos[i] = qr.Rows[0][0].ToString() - cancel() if err != nil { - return fmt.Errorf("StopBlp for %v failed: %v", msdw.shardInfo.MasterAlias, err) + return nil, fmt.Errorf("StopBlp for %v failed: %v", msdw.shardInfo.MasterAlias, err) } } + return destVreplicationPos, nil +} - // 2 - stop replication on the source rdonly tablet later than the last blp pos and note the blp position +func (msdw *MultiSplitDiffWorker) getTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) { + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias) + cancel() + if err != nil { + return nil, fmt.Errorf("synchronizeReplication: cannot get Tablet record for master %v: %v", msdw.shardInfo.MasterAlias, err) + } + return masterInfo, nil +} + +// stop the source tablet at a binlog position higher than the +// destination masters. Return the reached position +// (add a cleanup task to restart binlog replication on the source tablet, and +// change the existing ChangeSlaveType cleanup action to 'spare' type) +func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceRdOnlyTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) cancel() if err != nil { - return err + return "", err } var mysqlPos string // will be the last GTID that we stopped at @@ -433,99 +417,228 @@ func (msdw *MultiSplitDiffWorker) synchronizeReplication(ctx context.Context) er // read the tablet sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) if err != nil { - return err + return "", err + } + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + msdw.wr.TabletManagerClient().StartSlave(shortCtx, sourceTablet.Tablet) + cancel() + if err != nil { + return "", err } shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - defer cancel() mysqlPos, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, sourceTablet.Tablet, vreplicationPos, *remoteActionsTimeout) + cancel() if err != nil { - return fmt.Errorf("cannot stop slave %v at right binlog position %v: %v", msdw.sourceAlias, vreplicationPos, err) + return "", fmt.Errorf("cannot stop slave %v at right binlog position %v: %v", msdw.sourceAlias, vreplicationPos, err) } } // change the cleaner actions from ChangeSlaveType(rdonly) // to StartSlave() + ChangeSlaveType(spare) wrangler.RecordStartSlaveAction(msdw.cleaner, sourceTablet.Tablet) + return mysqlPos, nil +} + +// ask the master of the destination shard to resume filtered replication +// up to the new list of positions, and return its binlog position. +func (msdw *MultiSplitDiffWorker) resumeReplicationOnDestinationMasterUntil(ctx context.Context, shardInfo *topo.ShardInfo, mysqlPos string, masterInfo *topo.TabletInfo) (string, error) { + msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, mysqlPos) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, mysqlPos)) + cancel() + if err != nil { + return "", fmt.Errorf("VReplication(start until) for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) + } + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + if err := msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), mysqlPos); err != nil { + cancel() + return "", fmt.Errorf("VReplicationWaitForPos for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) + } + cancel() + + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet) + cancel() + if err != nil { + return "", fmt.Errorf("MasterPosition for %v failed: %v", msdw.shardInfo.MasterAlias, err) + } + return masterPos, nil +} + +// wait until the destination tablet is equal or passed that master +// binlog position, and stop its replication. +// (add a cleanup task to restart binlog replication on it, and change +// the existing ChangeSlaveType cleanup action to 'spare' type) +func (msdw *MultiSplitDiffWorker) stopReplicationOnDestinationRdOnlys(ctx context.Context, destinationAlias *topodatapb.TabletAlias, masterPos string) error { + if msdw.waitForFixedTimeRatherThanGtidSet { + msdw.wr.Logger().Infof("Workaround for broken GTID set in destination RDONLY. Just waiting for 1 minute for %v and assuming replication has caught up. (should be at %v)", destinationAlias, masterPos) + } else { + msdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) + } + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) + cancel() + if err != nil { + return err + } + + if msdw.waitForFixedTimeRatherThanGtidSet { + time.Sleep(1 * time.Minute) + } + + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + if msdw.waitForFixedTimeRatherThanGtidSet { + err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet) + } else { + _, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout) + } + cancel() + if err != nil { + return fmt.Errorf("StopSlaveMinimum for %v at %v failed: %v", destinationAlias, masterPos, err) + } + wrangler.RecordStartSlaveAction(msdw.cleaner, destinationTablet.Tablet) + return nil +} + +// restart filtered replication on the destination master. +// (remove the cleanup task that does the same) +func (msdw *MultiSplitDiffWorker) restartReplicationOn(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error { + msdw.wr.Logger().Infof("Restarting filtered replication on master %v", shardInfo.MasterAlias) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) + if err != nil { + return fmt.Errorf("VReplicationExec(start) failed for %v: %v", shardInfo.MasterAlias, err) + } + cancel() + return nil +} + +// synchronizeReplication phase: +// At this point, the source and the destination tablet are stopped at the same +// point. + +func (msdw *MultiSplitDiffWorker) synchronizeReplication(ctx context.Context) error { + msdw.SetState(WorkerStateSyncReplication) + var err error + + masterInfos := make([]*topo.TabletInfo, len(msdw.destinationAliases)) + for i, shardInfo := range msdw.destinationShards { + masterInfos[i], err = msdw.getTabletInfoForShard(ctx, shardInfo) + if err != nil { + return err + } + } + + destVreplicationPos, err := msdw.stopReplicationOnAllDestinationMasters(ctx, masterInfos) + if err != nil { + return err + } + + mysqlPos, err := msdw.stopReplicationOnSourceRdOnlyTabletAt(ctx, destVreplicationPos) + if err != nil { + return err + } + for i, shardInfo := range msdw.destinationShards { masterInfo := masterInfos[i] destinationAlias := msdw.destinationAliases[i] - // 3 - run all the destination masters blp until they've reached that position - msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, mysqlPos) - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - defer cancel() - _, err = msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, mysqlPos)) + masterPos, err := msdw.resumeReplicationOnDestinationMasterUntil(ctx, shardInfo, mysqlPos, masterInfo) if err != nil { - return fmt.Errorf("VReplication(start until) for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) - } - if err := msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), mysqlPos); err != nil { - return fmt.Errorf("VReplicationWaitForPos for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) + return err } - masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet) + + err = msdw.stopReplicationOnDestinationRdOnlys(ctx, destinationAlias, masterPos) if err != nil { - return fmt.Errorf("MasterPosition for %v failed: %v", msdw.shardInfo.MasterAlias, err) + return err } - // 4 - stop replication on all the destination rdonlys - if msdw.waitForFixedTimeRatherThanGtidSet { - msdw.wr.Logger().Infof("Workaround for broken GTID set in destination RDONLY. Just waiting for 1 minute for %v and assuming replication has caught up. (should be at %v)", destinationAlias, masterPos) - } else { - msdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) - } - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) - cancel() + err = msdw.restartReplicationOn(ctx, shardInfo, masterInfo) if err != nil { return err } + } - if msdw.waitForFixedTimeRatherThanGtidSet { - time.Sleep(1 * time.Minute) - } + return nil +} - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - if msdw.waitForFixedTimeRatherThanGtidSet { - err = msdw.wr.TabletManagerClient().StopSlave(shortCtx, destinationTablet.Tablet) - } else { - _, err = msdw.wr.TabletManagerClient().StopSlaveMinimum(shortCtx, destinationTablet.Tablet, masterPos, *remoteActionsTimeout) - } - cancel() - if err != nil { - return fmt.Errorf("StopSlaveMinimum for %v at %v failed: %v", destinationAlias, masterPos, err) - } - wrangler.RecordStartSlaveAction(msdw.cleaner, destinationTablet.Tablet) +func (msdw *MultiSplitDiffWorker) diffSingleTable(ctx context.Context, wg *sync.WaitGroup, tableDefinition *tabletmanagerdatapb.TableDefinition, keyspaceSchema *vindexes.KeyspaceSchema) error { + msdw.wr.Logger().Infof("Starting the diff on table %v", tableDefinition.Name) - // 5 - restart replication on the destination masters + sourceQueryResultReader, err := TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), msdw.sourceAlias, tableDefinition) + if err != nil { + return fmt.Errorf("TableScan(source) failed: %v", err) + } + defer sourceQueryResultReader.Close(ctx) - msdw.wr.Logger().Infof("Restarting filtered replication on master %v", shardInfo.MasterAlias) - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - defer cancel() - if _, err = msdw.wr.TabletManagerClient().VReplicationExec(ctx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)); err != nil { - return fmt.Errorf("VReplicationExec(start) failed for %v: %v", shardInfo.MasterAlias, err) - } - cancel() + destinationQueryResultReaders := make([]ResultReader, len(msdw.destinationAliases)) + for i, destinationAlias := range msdw.destinationAliases { + destinationQueryResultReader, err := TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), destinationAlias, tableDefinition) if err != nil { - return fmt.Errorf("StartBlp failed for %v: %v", shardInfo.MasterAlias, err) + return fmt.Errorf("TableScan(destination) failed: %v", err) } + + // For the first result scanner, let's check the PKs are of types that we can work with + if i == 0 { + err = CheckValidTypesForResultMerger(destinationQueryResultReader.fields, len(tableDefinition.PrimaryKeyColumns)) + if err != nil { + return fmt.Errorf("invalid types for multi split diff. use the regular split diff instead %v", err.Error()) + } + } + + // We are knowingly using defer inside the for loop. + // All these readers need to be active until the diff is done + //noinspection GoDeferInLoop + defer destinationQueryResultReader.Close(ctx) + destinationQueryResultReaders[i] = destinationQueryResultReader + } + mergedResultReader, err := NewResultMerger(destinationQueryResultReaders, len(tableDefinition.PrimaryKeyColumns)) + if err != nil { + return fmt.Errorf("NewResultMerger failed: %v", err) + } + + // Create the row differ. + differ, err := NewRowDiffer(sourceQueryResultReader, mergedResultReader, tableDefinition) + if err != nil { + return fmt.Errorf("NewRowDiffer() failed: %v", err) + } + + // And run the diff. + report, err := differ.Go(msdw.wr.Logger()) + if err != nil { + return fmt.Errorf("Differ.Go failed: %v", err.Error()) + } + + if report.HasDifferences() { + return fmt.Errorf("table %v has differences: %v", tableDefinition.Name, report.String()) } + msdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) + return nil } -// diff phase: will log messages regarding the diff. -// - get the schema on all tablets -// - if some table schema mismatches, record them (use existing schema diff tools). -// - for each table in destination, run a diff pipeline. +func (msdw *MultiSplitDiffWorker) tableDiffingConsumer(ctx context.Context, wg *sync.WaitGroup, tableChan chan *tabletmanagerdatapb.TableDefinition, rec *concurrency.AllErrorRecorder, keyspaceSchema *vindexes.KeyspaceSchema) { + defer wg.Done() -func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { - msdw.SetState(WorkerStateDiff) + for tableDefinition := range tableChan { + err := msdw.diffSingleTable(ctx, wg, tableDefinition, keyspaceSchema) + if err != nil { + rec.RecordError(err) + msdw.wr.Logger().Errorf("%v", err) + } + } +} +func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabletmanagerdatapb.SchemaDefinition, *tabletmanagerdatapb.SchemaDefinition, error) { msdw.wr.Logger().Infof("Gathering schema information...") wg := sync.WaitGroup{} rec := &concurrency.AllErrorRecorder{} - mu := sync.Mutex{} // protects msdw.destinationSchemaDefinitions - msdw.destinationSchemaDefinitions = make([]*tabletmanagerdatapb.SchemaDefinition, len(msdw.destinationAliases)) + + // this array will have concurrent writes to it, but no two goroutines will write to the same slot in the array + destinationSchemaDefinitions := make([]*tabletmanagerdatapb.SchemaDefinition, len(msdw.destinationAliases)) + var sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition for i, destinationAlias := range msdw.destinationAliases { wg.Add(1) go func(i int, destinationAlias *topodatapb.TabletAlias) { @@ -535,9 +648,7 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() rec.RecordError(err) - mu.Lock() - msdw.destinationSchemaDefinitions[i] = destinationSchemaDefinition - mu.Unlock() + destinationSchemaDefinitions[i] = destinationSchemaDefinition msdw.wr.Logger().Infof("Got schema from destination %v", destinationAlias) wg.Done() }(i, destinationAlias) @@ -546,7 +657,7 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - msdw.sourceSchemaDefinition, err = msdw.wr.GetSchema( + sourceSchemaDefinition, err = msdw.wr.GetSchema( shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() rec.RecordError(err) @@ -556,192 +667,91 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { wg.Wait() if rec.HasErrors() { - return rec.Error() + return nil, nil, rec.Error() } + return destinationSchemaDefinitions, sourceSchemaDefinition, nil +} + +func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, destinationSchemaDefinitions []*tabletmanagerdatapb.SchemaDefinition, sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition) { msdw.wr.Logger().Infof("Diffing the schema...") - rec = &concurrency.AllErrorRecorder{} + rec := &concurrency.AllErrorRecorder{} sourceShardName := fmt.Sprintf("%v/%v", msdw.shardInfo.Keyspace(), msdw.shardInfo.ShardName()) - for i, destinationSchemaDefinition := range msdw.destinationSchemaDefinitions { + for i, destinationSchemaDefinition := range destinationSchemaDefinitions { destinationShard := msdw.destinationShards[i] destinationShardName := fmt.Sprintf("%v/%v", destinationShard.Keyspace(), destinationShard.ShardName()) - tmutils.DiffSchema(destinationShardName, destinationSchemaDefinition, sourceShardName, msdw.sourceSchemaDefinition, rec) + tmutils.DiffSchema(destinationShardName, destinationSchemaDefinition, sourceShardName, sourceSchemaDefinition, rec) } if rec.HasErrors() { msdw.wr.Logger().Warningf("Different schemas: %v", rec.Error().Error()) } else { msdw.wr.Logger().Infof("Schema match, good.") } +} - // read the vschema if needed - var keyspaceSchema *vindexes.KeyspaceSchema - if *useV3ReshardingMode { - kschema, err := msdw.wr.TopoServer().GetVSchema(ctx, msdw.keyspace) - if err != nil { - return fmt.Errorf("cannot load VSchema for keyspace %v: %v", msdw.keyspace, err) - } - if kschema == nil { - return fmt.Errorf("no VSchema for keyspace %v", msdw.keyspace) - } - - keyspaceSchema, err = vindexes.BuildKeyspaceSchema(kschema, msdw.keyspace) - if err != nil { - return fmt.Errorf("cannot build vschema for keyspace %v: %v", msdw.keyspace, err) - } +func (msdw *MultiSplitDiffWorker) loadVSchema(ctx context.Context) (*vindexes.KeyspaceSchema, error) { + shortCtx, cancel := context.WithCancel(ctx) + kschema, err := msdw.wr.TopoServer().GetVSchema(shortCtx, msdw.keyspace) + cancel() + if err != nil { + return nil, fmt.Errorf("cannot load VSchema for keyspace %v: %v", msdw.keyspace, err) + } + if kschema == nil { + return nil, fmt.Errorf("no VSchema for keyspace %v", msdw.keyspace) } - // Compute the overlap keyrange. Later, we'll compare it with - // source or destination keyrange. If it matches either, - // we'll just ask for all the data. If the overlap is a subset, - // we'll filter. - var err error - var keyranges = make([]*topodatapb.KeyRange, len(msdw.destinationShards)) - for i, destinationShard := range msdw.destinationShards { - keyranges[i] = destinationShard.KeyRange + keyspaceSchema, err := vindexes.BuildKeyspaceSchema(kschema, msdw.keyspace) + if err != nil { + return nil, fmt.Errorf("cannot build vschema for keyspace %v: %v", msdw.keyspace, err) } - union, err := keyRangesUnion(keyranges) + return keyspaceSchema, nil +} + +// diff phase: will log messages regarding the diff. +// - get the schema on all tablets +// - if some table schema mismatches, record them (use existing schema diff tools). +// - for each table in destination, run a diff pipeline. + +func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { + msdw.SetState(WorkerStateDiff) + + destinationSchemaDefinitions, sourceSchemaDefinition, err := msdw.gatherSchemaInfo(ctx) if err != nil { return err } + msdw.diffSchemaInformation(ctx, destinationSchemaDefinitions, sourceSchemaDefinition) + + // read the vschema if needed + var keyspaceSchema *vindexes.KeyspaceSchema + if *useV3ReshardingMode { + keyspaceSchema, err = msdw.loadVSchema(ctx) + if err != nil { + return err + } + } - // run diffs in parallel msdw.wr.Logger().Infof("Running the diffs...") - sem := sync2.NewSemaphore(msdw.parallelDiffsCount, 0) - tableDefinitions := msdw.sourceSchemaDefinition.TableDefinitions + tableDefinitions := sourceSchemaDefinition.TableDefinitions + rec := &concurrency.AllErrorRecorder{} // sort tables by size // if there are large deltas between table sizes then it's more efficient to start working on the large tables first sort.Slice(tableDefinitions, func(i, j int) bool { return tableDefinitions[i].DataLength > tableDefinitions[j].DataLength }) - - // use a channel to make sure tables are diffed in order tableChan := make(chan *tabletmanagerdatapb.TableDefinition, len(tableDefinitions)) for _, tableDefinition := range tableDefinitions { tableChan <- tableDefinition } + close(tableChan) - // start as many goroutines as there are tables to diff - for range tableDefinitions { - wg.Add(1) - go func() { - defer wg.Done() - // use the semaphore to limit the number of tables that are diffed in parallel - sem.Acquire() - defer sem.Release() - - // grab the table to process out of the channel - tableDefinition := <-tableChan - - msdw.wr.Logger().Infof("Starting the diff on table %v", tableDefinition.Name) - - // On the source, see if we need a full scan - // or a filtered scan. - var err error - var sourceQueryResultReader *QueryResultReader - if key.KeyRangeEqual(union, msdw.shardInfo.KeyRange) { - sourceQueryResultReader, err = TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), msdw.sourceAlias, tableDefinition) - } else { - msdw.wr.Logger().Infof("Filtering source down to %v", union) - sourceQueryResultReader, err = TableScanByKeyRange(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), msdw.sourceAlias, tableDefinition, union, keyspaceSchema, msdw.keyspaceInfo.ShardingColumnName, msdw.keyspaceInfo.ShardingColumnType) - } - if err != nil { - newErr := fmt.Errorf("TableScan(source) failed: %v", err) - rec.RecordError(newErr) - msdw.wr.Logger().Errorf("%v", newErr) - return - } - defer sourceQueryResultReader.Close(ctx) - - // On the destination, see if we need a full scan - // or a filtered scan. - destinationQueryResultReaders := make([]ResultReader, len(msdw.destinationAliases)) - for i, destinationAlias := range msdw.destinationAliases { - destinationQueryResultReader, err := TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), destinationAlias, tableDefinition) - if err != nil { - newErr := fmt.Errorf("TableScan(destination) failed: %v", err) - rec.RecordError(newErr) - msdw.wr.Logger().Errorf("%v", newErr) - return - } - //noinspection GoDeferInLoop - defer destinationQueryResultReader.Close(ctx) - destinationQueryResultReaders[i] = destinationQueryResultReader - } - mergedResultReader, err := NewResultMerger(destinationQueryResultReaders, len(tableDefinition.PrimaryKeyColumns)) - if err != nil { - newErr := fmt.Errorf("NewResultMerger failed: %v", err) - rec.RecordError(newErr) - msdw.wr.Logger().Errorf("%v", newErr) - return - } - - // Create the row differ. - differ, err := NewRowDiffer(sourceQueryResultReader, mergedResultReader, tableDefinition) - if err != nil { - newErr := fmt.Errorf("NewRowDiffer() failed: %v", err) - rec.RecordError(newErr) - msdw.wr.Logger().Errorf("%v", newErr) - return - } - - // And run the diff. - report, err := differ.Go(msdw.wr.Logger()) - if err != nil { - newErr := fmt.Errorf("Differ.Go failed: %v", err.Error()) - rec.RecordError(newErr) - msdw.wr.Logger().Errorf("%v", newErr) - } else { - if report.HasDifferences() { - err := fmt.Errorf("Table %v has differences: %v", tableDefinition.Name, report.String()) - rec.RecordError(err) - msdw.wr.Logger().Warningf(err.Error()) - } else { - msdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) - } - } - }() + consumers := sync.WaitGroup{} + // start as many goroutines we want parallel diffs running + for i := 0; i < msdw.parallelDiffsCount; i++ { + consumers.Add(1) + go msdw.tableDiffingConsumer(ctx, &consumers, tableChan, rec, keyspaceSchema) } - // grab the table to process out of the channel - wg.Wait() + // wait for all consumers to wrap up their work + consumers.Wait() return rec.Error() } - -func keyRangesUnion(keyranges []*topodatapb.KeyRange) (*topodatapb.KeyRange, error) { - // HACK HACK HACK - // This assumes the ranges are consecutive. It just returns the smallest start and the largest end. - - var start []byte - var end []byte - for i, keyrange := range keyranges { - if i == 0 { - // initialize the first values - start = keyrange.Start - end = keyrange.End - continue - } - - // nil always wins - if keyrange.Start == nil { - start = nil - } else { - if start != nil { - if bytes.Compare(start, keyrange.Start) > 0 { - start = keyrange.Start - } - } - } - - if keyrange.End == nil { - end = nil - } else { - if end != nil { - if bytes.Compare(end, keyrange.End) < 0 { - end = keyrange.End - } - } - } - } - - return &topodatapb.KeyRange{Start: start, End: end}, nil -} diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go index 37064dbbaad..353e47790e7 100644 --- a/go/vt/worker/multi_split_diff_cmd.go +++ b/go/vt/worker/multi_split_diff_cmd.go @@ -29,8 +29,6 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/wrangler" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const multiSplitDiffHTML = ` @@ -64,8 +62,6 @@ const multiSplitDiffHTML2 = `

    - -

    @@ -84,9 +80,7 @@ var multiSplitDiffTemplate2 = mustParseTemplate("multiSplitDiff2", multiSplitDif func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error) { excludeTables := subFlags.String("exclude_tables", "", "comma separated list of tables to exclude") - excludeShards := subFlags.String("exclude_shards", "", "comma separated list of shards to exclude") minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") - destTabletTypeStr := subFlags.String("dest_tablet_type", defaultDestTabletType, "destination tablet type (RDONLY or REPLICA) that will be used to compare the shards") parallelDiffsCount := subFlags.Int("parallel_diffs_count", defaultParallelDiffsCount, "number of tables to diff in parallel") waitForFixedTimeRatherThanGtidSet := subFlags.Bool("wait_for_fixed_time_rather_than_gtid_set", false, "wait for 1m when syncing up the destination RDONLY tablet rather than using the GTID set. Use this when the GTID set on the RDONLY is broken. Make sure the RDONLY is not behind in replication when using this flag.") if err := subFlags.Parse(args); err != nil { @@ -104,15 +98,7 @@ func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.F if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - var excludeShardsArray []string - if *excludeShards != "" { - excludeShardsArray = strings.Split(*excludeShards, ",") - } - destTabletType, ok := topodatapb.TabletType_value[*destTabletTypeStr] - if !ok { - return nil, fmt.Errorf("command MultiSplitDiff invalid dest_tablet_type: %v", destTabletType) - } - return NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, excludeShardsArray, *minHealthyRdonlyTablets, *parallelDiffsCount, *waitForFixedTimeRatherThanGtidSet, topodatapb.TabletType(destTabletType)), nil + return NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, *minHealthyRdonlyTablets, *parallelDiffsCount, *waitForFixedTimeRatherThanGtidSet), nil } // shardSources returns all the shards that are SourceShards of at least one other shard. @@ -219,11 +205,6 @@ func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.W if excludeTables != "" { excludeTableArray = strings.Split(excludeTables, ",") } - excludeShards := r.FormValue("excludeShards") - var excludeShardsArray []string - if excludeShards != "" { - excludeShardsArray = strings.Split(excludeShards, ",") - } minHealthyRdonlyTabletsStr := r.FormValue("minHealthyRdonlyTablets") parallelDiffsCountStr := r.FormValue("parallelDiffsCount") minHealthyRdonlyTablets, err := strconv.ParseInt(minHealthyRdonlyTabletsStr, 0, 64) @@ -238,7 +219,7 @@ func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.W } // start the diff job - wrk := NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, excludeShardsArray, int(minHealthyRdonlyTablets), int(parallelDiffsCount), waitForFixedTimeRatherThanGtidSet, topodatapb.TabletType_RDONLY) + wrk := NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, int(minHealthyRdonlyTablets), int(parallelDiffsCount), waitForFixedTimeRatherThanGtidSet) return wrk, nil, nil, nil } diff --git a/go/vt/worker/multi_split_diff_test.go b/go/vt/worker/multi_split_diff_test.go index 670389e921a..7811f5b1b54 100644 --- a/go/vt/worker/multi_split_diff_test.go +++ b/go/vt/worker/multi_split_diff_test.go @@ -169,7 +169,7 @@ func (sq *msdSourceTabletServer) StreamExecute(ctx context.Context, target *quer // TODO(aaijazi): Create a test in which source and destination data does not match -func testMultiSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb.TabletType) { +func testMultiSplitDiff(t *testing.T, v3 bool) { *useV3ReshardingMode = v3 ts := memorytopo.NewServer("cell1", "cell2") ctx := context.Background() @@ -220,9 +220,9 @@ func testMultiSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb. leftMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 10, topodatapb.TabletType_MASTER, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) leftRdonly1 := testlib.NewFakeTablet(t, wi.wr, "cell1", 11, - destinationTabletType, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) + topodatapb.TabletType_RDONLY, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) leftRdonly2 := testlib.NewFakeTablet(t, wi.wr, "cell1", 12, - destinationTabletType, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) + topodatapb.TabletType_RDONLY, nil, testlib.TabletKeyspaceShard(t, "ks", "-40")) rightMaster := testlib.NewFakeTablet(t, wi.wr, "cell1", 20, topodatapb.TabletType_MASTER, nil, testlib.TabletKeyspaceShard(t, "ks", "40-80")) @@ -312,12 +312,10 @@ func testMultiSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb. defer ft.StopActionLoop(t) } - tabletTypeName, _ := topodatapb.TabletType_name[int32(destinationTabletType)] // Run the vtworker command. args := []string{ "MultiSplitDiff", "-exclude_tables", excludedTable, - "-dest_tablet_type", tabletTypeName, "ks/-80", } // We need to use FakeTabletManagerClient because we don't @@ -330,13 +328,10 @@ func testMultiSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb. } func TestMultiSplitDiffv2(t *testing.T) { - testMultiSplitDiff(t, false, topodatapb.TabletType_RDONLY) + // TODO: Make MultiSplitDiff work with V2 + // testMultiSplitDiff(t, false) } func TestMultiSplitDiffv3(t *testing.T) { - testMultiSplitDiff(t, true, topodatapb.TabletType_RDONLY) -} - -func TestMultiSplitDiffWithReplica(t *testing.T) { - testMultiSplitDiff(t, true, topodatapb.TabletType_REPLICA) + testMultiSplitDiff(t, true) } diff --git a/go/vt/worker/result_merger.go b/go/vt/worker/result_merger.go index d5a2ca26c94..68250d06ad0 100644 --- a/go/vt/worker/result_merger.go +++ b/go/vt/worker/result_merger.go @@ -68,11 +68,9 @@ func NewResultMerger(inputs []ResultReader, pkFieldCount int) (*ResultMerger, er return nil, err } - for i := 0; i < pkFieldCount; i++ { - typ := fields[i].Type - if !sqltypes.IsIntegral(typ) && !sqltypes.IsFloat(typ) && !sqltypes.IsBinary(typ) { - return nil, fmt.Errorf("unsupported type: %v cannot compare fields with this type. Use the vtworker LegacySplitClone command instead", typ) - } + err := CheckValidTypesForResultMerger(fields, pkFieldCount) + if err != nil { + return nil, fmt.Errorf("invalid PK types for ResultMerger. Use the vtworker LegacySplitClone command instead. %v", err.Error()) } // Initialize the priority queue with all input ResultReader which have at @@ -100,6 +98,17 @@ func NewResultMerger(inputs []ResultReader, pkFieldCount int) (*ResultMerger, er return rm, nil } +// CheckValidTypesForResultMerger returns an error if the provided fields are not compatible with how ResultMerger works +func CheckValidTypesForResultMerger(fields []*querypb.Field, pkFieldCount int) error { + for i := 0; i < pkFieldCount; i++ { + typ := fields[i].Type + if !sqltypes.IsIntegral(typ) && !sqltypes.IsFloat(typ) && !sqltypes.IsBinary(typ) { + return fmt.Errorf("unsupported type: %v cannot compare fields with this type", typ) + } + } + return nil +} + // Fields returns the field information for the columns in the result. // It is part of the ResultReader interface. func (rm *ResultMerger) Fields() []*querypb.Field { diff --git a/test/base_sharding.py b/test/base_sharding.py index 89b1490804f..3fcc92c1fb1 100644 --- a/test/base_sharding.py +++ b/test/base_sharding.py @@ -28,6 +28,7 @@ keyspace_id_type = keyrange_constants.KIT_UINT64 use_rbr = False +use_multi_split_diff = False pack_keyspace_id = struct.Struct('!Q').pack # fixed_parent_id is used as fixed value for the "parent_id" column in all rows. diff --git a/test/initial_sharding.py b/test/initial_sharding.py index 0a101192ae0..e6aa8728253 100755 --- a/test/initial_sharding.py +++ b/test/initial_sharding.py @@ -28,7 +28,6 @@ import logging import unittest - from vtdb import keyrange_constants import base_sharding @@ -577,23 +576,32 @@ def test_resharding(self): min_statements=1000, min_transactions=1000) # use vtworker to compare the data - logging.debug('Running vtworker SplitDiff for -80') for t in [shard_0_rdonly1, shard_1_rdonly1]: utils.run_vtctl(['RunHealthCheck', t.tablet_alias]) - utils.run_vtworker(['-cell', 'test_nj', - '--use_v3_resharding_mode=false', - 'SplitDiff', - '--min_healthy_rdonly_tablets', '1', - 'test_keyspace/-80'], - auto_log=True) - - logging.debug('Running vtworker SplitDiff for 80-') - utils.run_vtworker(['-cell', 'test_nj', - '--use_v3_resharding_mode=false', - 'SplitDiff', - '--min_healthy_rdonly_tablets', '1', - 'test_keyspace/80-'], - auto_log=True) + + if base_sharding.use_multi_split_diff: + logging.debug('Running vtworker MultiSplitDiff for 0') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'MultiSplitDiff', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/0'], + auto_log=True) + else: + logging.debug('Running vtworker SplitDiff for -80') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'SplitDiff', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/-80'], + auto_log=True) + logging.debug('Running vtworker SplitDiff for 80-') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'SplitDiff', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/80-'], + auto_log=True) utils.pause('Good time to test vtworker for diffs') diff --git a/test/initial_sharding_multi_split_diff.py b/test/initial_sharding_multi_split_diff.py new file mode 100755 index 00000000000..d21c41cf162 --- /dev/null +++ b/test/initial_sharding_multi_split_diff.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Re-runs initial_sharding.py using multiple-split-diff.""" + +from vtdb import keyrange_constants + +import base_sharding +import initial_sharding +import utils + +# this test is just re-running an entire initial_sharding.py with a +# varbinary keyspace_id +if __name__ == '__main__': + base_sharding.use_multi_split_diff = True + utils.main(initial_sharding) diff --git a/test/resharding.py b/test/resharding.py index 86c55480a7b..a942c6dd466 100755 --- a/test/resharding.py +++ b/test/resharding.py @@ -880,14 +880,25 @@ def test_resharding(self): # use vtworker to compare the data (after health-checking the destination # rdonly tablets so discovery works) utils.run_vtctl(['RunHealthCheck', shard_3_rdonly1.tablet_alias]) - logging.debug('Running vtworker SplitDiff') - utils.run_vtworker(['-cell', 'test_nj', - '--use_v3_resharding_mode=false', - 'SplitDiff', - '--exclude_tables', 'unrelated', - '--min_healthy_rdonly_tablets', '1', - 'test_keyspace/c0-'], - auto_log=True) + + if base_sharding.use_multi_split_diff: + logging.debug('Running vtworker MultiSplitDiff') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'MultiSplitDiff', + '--exclude_tables', 'unrelated', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/80-'], + auto_log=True) + else: + logging.debug('Running vtworker SplitDiff') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'SplitDiff', + '--exclude_tables', 'unrelated', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/c0-'], + auto_log=True) utils.run_vtctl(['ChangeSlaveType', shard_1_rdonly1.tablet_alias, 'rdonly'], auto_log=True) utils.run_vtctl(['ChangeSlaveType', shard_3_rdonly1.tablet_alias, 'rdonly'], @@ -1067,14 +1078,25 @@ def test_resharding(self): self._check_lots_timeout(3000, 80, 10, base=2000) # use vtworker to compare the data again - logging.debug('Running vtworker SplitDiff') - utils.run_vtworker(['-cell', 'test_nj', - '--use_v3_resharding_mode=false', - 'SplitDiff', - '--exclude_tables', 'unrelated', - '--min_healthy_rdonly_tablets', '1', - 'test_keyspace/c0-'], - auto_log=True) + if base_sharding.use_multi_split_diff: + logging.debug('Running vtworker MultiSplitDiff') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'MultiSplitDiff', + '--exclude_tables', 'unrelated', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/80-'], + auto_log=True) + else: + logging.debug('Running vtworker SplitDiff') + utils.run_vtworker(['-cell', 'test_nj', + '--use_v3_resharding_mode=false', + 'SplitDiff', + '--exclude_tables', 'unrelated', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/c0-'], + auto_log=True) + utils.run_vtctl(['ChangeSlaveType', shard_1_rdonly1.tablet_alias, 'rdonly'], auto_log=True) utils.run_vtctl(['ChangeSlaveType', shard_3_rdonly1.tablet_alias, 'rdonly'], diff --git a/test/resharding_multi_split_diff.py b/test/resharding_multi_split_diff.py new file mode 100755 index 00000000000..64413128816 --- /dev/null +++ b/test/resharding_multi_split_diff.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Re-runs resharding.py with MultiSplitDiff.""" + +import base_sharding +import resharding +import utils + +if __name__ == '__main__': + base_sharding.use_multi_split_diff = True + utils.main(resharding) From 2499f32bebad17edc6fef38d187925dc96ce6ea8 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 13 Dec 2018 08:56:22 +0100 Subject: [PATCH 3/5] Flags multi split diffs as failing before finished Signed-off-by: Andres Taylor --- go/vt/worker/chunk.go | 2 +- go/vt/worker/multi_split_diff.go | 22 ++++++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/go/vt/worker/chunk.go b/go/vt/worker/chunk.go index 9a005b7956e..2de9dc1c9f8 100644 --- a/go/vt/worker/chunk.go +++ b/go/vt/worker/chunk.go @@ -97,7 +97,7 @@ func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodata qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1) cancel() if err != nil { - return nil, vterrors.Wrapf(err, "tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. ExecuteFetchAsApp: %v", topoproto.TabletAliasString(tablet.Alias), td.Name) + return nil, vterrors.Wrapf(err, "tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. ExecuteFetchAsApp", topoproto.TabletAliasString(tablet.Alias), td.Name) } if len(qr.Rows) != 1 { return nil, fmt.Errorf("tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. Zero rows were returned", topoproto.TabletAliasString(tablet.Alias), td.Name) diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index 3449b74eb5a..4a3df2066ad 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -91,6 +91,8 @@ func (msdw *MultiSplitDiffWorker) StatusAsHTML() template.HTML { switch state { case WorkerStateDiff: result += "Running...
    \n" + case WorkerStateDiffWillFail: + result += "Running - have already found differences...\n" case WorkerStateDone: result += "Success.
    \n" } @@ -107,6 +109,8 @@ func (msdw *MultiSplitDiffWorker) StatusAsText() string { switch state { case WorkerStateDiff: result += "Running...\n" + case WorkerStateDiffWillFail: + result += "Running - have already found differences...\n" case WorkerStateDone: result += "Success.\n" } @@ -175,7 +179,7 @@ func (msdw *MultiSplitDiffWorker) searchInKeyspace(ctx context.Context, wg *sync shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) cancel() if err != nil { - rec.RecordError(fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) + msdw.markAsWillFail(rec, fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) return } for _, shard := range shards { @@ -190,7 +194,7 @@ func (msdw *MultiSplitDiffWorker) produceShardInfo(ctx context.Context, wg *sync si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) cancel() if err != nil { - rec.RecordError(fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) + msdw.markAsWillFail(rec, fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) return } @@ -244,7 +248,7 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([] first = false msdw.sourceUID = r } else if r != msdw.sourceUID { - rec.RecordError(fmt.Errorf("found a source ID that was different, aborting. %v vs %v", r, msdw.sourceUID)) + msdw.markAsWillFail(&rec, fmt.Errorf("found a source ID that was different, aborting. %v vs %v", r, msdw.sourceUID)) } } }() @@ -625,7 +629,7 @@ func (msdw *MultiSplitDiffWorker) tableDiffingConsumer(ctx context.Context, wg * for tableDefinition := range tableChan { err := msdw.diffSingleTable(ctx, wg, tableDefinition, keyspaceSchema) if err != nil { - rec.RecordError(err) + msdw.markAsWillFail(rec, err) msdw.wr.Logger().Errorf("%v", err) } } @@ -647,7 +651,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl destinationSchemaDefinition, err := msdw.wr.GetSchema( shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() - rec.RecordError(err) + msdw.markAsWillFail(rec, err) destinationSchemaDefinitions[i] = destinationSchemaDefinition msdw.wr.Logger().Infof("Got schema from destination %v", destinationAlias) wg.Done() @@ -660,7 +664,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl sourceSchemaDefinition, err = msdw.wr.GetSchema( shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() - rec.RecordError(err) + msdw.markAsWillFail(rec, err) msdw.wr.Logger().Infof("Got schema from source %v", msdw.sourceAlias) wg.Done() }() @@ -755,3 +759,9 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { return rec.Error() } + +// markAsWillFail records the error and changes the state of the worker to reflect this +func (msdw *MultiSplitDiffWorker) markAsWillFail(er concurrency.ErrorRecorder, err error) { + er.RecordError(err) + msdw.SetState(WorkerStateDiffWillFail) +} From d50a6480c448faea176961c9d9223536c412b263 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 13 Dec 2018 09:44:49 +0100 Subject: [PATCH 4/5] Made finding destination shards single threaded Signed-off-by: Andres Taylor --- go/vt/worker/multi_split_diff.go | 154 ++++++++++++++----------------- 1 file changed, 67 insertions(+), 87 deletions(-) diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index 4a3df2066ad..dbf899883fa 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -21,12 +21,13 @@ import ( "html/template" "sync" + "vitess.io/vitess/go/vt/vterrors" + "golang.org/x/net/context" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/wrangler" "sort" @@ -173,39 +174,36 @@ func (msdw *MultiSplitDiffWorker) run(ctx context.Context) error { return checkDone(ctx) } -func (msdw *MultiSplitDiffWorker) searchInKeyspace(ctx context.Context, wg *sync.WaitGroup, rec *concurrency.AllErrorRecorder, keyspace string, result chan *topo.ShardInfo, UIDs chan uint32) { - defer wg.Done() +// init phase: +// - read the shard info, make sure it has sources +func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { + msdw.SetState(WorkerStateInit) + + var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) + msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace) cancel() if err != nil { - msdw.markAsWillFail(rec, fmt.Errorf("failed to get list of shards for keyspace '%v': %v", keyspace, err)) - return - } - for _, shard := range shards { - wg.Add(1) - go msdw.produceShardInfo(ctx, wg, rec, keyspace, shard, result, UIDs) + return fmt.Errorf("cannot read keyspace %v: %v", msdw.keyspace, err) } -} - -func (msdw *MultiSplitDiffWorker) produceShardInfo(ctx context.Context, wg *sync.WaitGroup, rec *concurrency.AllErrorRecorder, keyspace string, shard string, result chan *topo.ShardInfo, UIDs chan uint32) { - defer wg.Done() - shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) + msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard) cancel() if err != nil { - msdw.markAsWillFail(rec, fmt.Errorf("failed to get details for shard '%v': %v", topoproto.KeyspaceShardString(keyspace, shard), err)) - return + return fmt.Errorf("cannot read shard %v/%v: %v", msdw.keyspace, msdw.shard, err) } - for _, sourceShard := range si.SourceShards { - if len(sourceShard.Tables) == 0 && sourceShard.Keyspace == msdw.keyspace && sourceShard.Shard == msdw.shard { - result <- si - UIDs <- sourceShard.Uid - // Prevents the same shard from showing up multiple times - return - } + if !msdw.shardInfo.HasMaster() { + return fmt.Errorf("shard %v/%v has no master", msdw.keyspace, msdw.shard) + } + + destinationShards, err := msdw.findDestinationShards(ctx) + if err != nil { + return fmt.Errorf("findDestinationShards() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) } + msdw.destinationShards = destinationShards + + return nil } // findDestinationShards finds all the shards that have filtered replication from the source shard @@ -214,53 +212,17 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([] keyspaces, err := msdw.wr.TopoServer().GetKeyspaces(shortCtx) cancel() if err != nil { - return nil, fmt.Errorf("failed to get list of keyspaces: %v", err) + return nil, vterrors.Wrap(err, "failed to get list of keyspaces") } - producers := sync.WaitGroup{} - consumer := sync.WaitGroup{} - result := make(chan *topo.ShardInfo, 2 /*the consumer will just copy out the data, so this should be enough*/) - sourceUIDs := make(chan uint32, 2 /*the consumer will just copy out the data, so this should be enough*/) - rec := concurrency.AllErrorRecorder{} - var resultArray = make([]*topo.ShardInfo, 0, len(keyspaces)) + var resultArray []*topo.ShardInfo for _, keyspace := range keyspaces { - producers.Add(1) - go msdw.searchInKeyspace(ctx, &producers, &rec, keyspace, result, sourceUIDs) - } - - // Start the result array consumer - consumer.Add(1) - go func() { - defer consumer.Done() - for r := range result { - resultArray = append(resultArray, r) - } - }() - - // Start the sourceUID check consumer - consumer.Add(1) - go func() { - defer consumer.Done() - first := true - for r := range sourceUIDs { - if first { - first = false - msdw.sourceUID = r - } else if r != msdw.sourceUID { - msdw.markAsWillFail(&rec, fmt.Errorf("found a source ID that was different, aborting. %v vs %v", r, msdw.sourceUID)) - } + shardInfo, err := msdw.findShardsInKeyspace(ctx, keyspace) + if err != nil { + return nil, err } - }() - - // we use this pattern because we don't know the size of shards up front, so we are using a buffered channel - producers.Wait() - close(result) - close(sourceUIDs) - consumer.Wait() - - if rec.HasErrors() { - return nil, rec.Error() + resultArray = append(resultArray, shardInfo...) } if len(resultArray) == 0 { @@ -269,36 +231,54 @@ func (msdw *MultiSplitDiffWorker) findDestinationShards(ctx context.Context) ([] return resultArray, nil } -// init phase: -// - read the shard info, make sure it has sources -func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { - msdw.SetState(WorkerStateInit) - - var err error +func (msdw *MultiSplitDiffWorker) findShardsInKeyspace(ctx context.Context, keyspace string) ([]*topo.ShardInfo, error) { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace) - cancel() - if err != nil { - return fmt.Errorf("cannot read keyspace %v: %v", msdw.keyspace, err) - } - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - msdw.shardInfo, err = msdw.wr.TopoServer().GetShard(shortCtx, msdw.keyspace, msdw.shard) + shards, err := msdw.wr.TopoServer().GetShardNames(shortCtx, keyspace) cancel() if err != nil { - return fmt.Errorf("cannot read shard %v/%v: %v", msdw.keyspace, msdw.shard, err) + return nil, vterrors.Wrapf(err, "failed to get list of shards for keyspace '%v'", keyspace) } - if !msdw.shardInfo.HasMaster() { - return fmt.Errorf("shard %v/%v has no master", msdw.keyspace, msdw.shard) + var resultArray []*topo.ShardInfo + first := true + + for _, shard := range shards { + shardInfo, uid, err := msdw.getShardInfo(ctx, keyspace, shard) + if err != nil { + return nil, err + } + // There might not be any source shards here + if shardInfo != nil { + if first { + msdw.sourceUID = uid + first = false + } else if msdw.sourceUID != uid { + return nil, fmt.Errorf("found a source ID that was different, aborting. %v vs %v", msdw.sourceUID, uid) + } + + resultArray = append(resultArray, shardInfo) + } } - destinationShards, err := msdw.findDestinationShards(ctx) + return resultArray, nil +} + +func (msdw *MultiSplitDiffWorker) getShardInfo(ctx context.Context, keyspace string, shard string) (*topo.ShardInfo, uint32, error) { + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + si, err := msdw.wr.TopoServer().GetShard(shortCtx, keyspace, shard) + cancel() if err != nil { - return fmt.Errorf("findDestinationShards() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) + return nil, 0, vterrors.Wrap(err, "failed to get shard info from toposerver") } - msdw.destinationShards = destinationShards - return nil + for _, sourceShard := range si.SourceShards { + if len(sourceShard.Tables) == 0 && sourceShard.Keyspace == msdw.keyspace && sourceShard.Shard == msdw.shard { + // Prevents the same shard from showing up multiple times + return si, sourceShard.Uid, nil + } + } + + return nil, 0, nil } // findTargets phase: From 4242365f873ff64d7166c63a04c3e7809f533cc3 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 17 Dec 2018 15:35:58 +0100 Subject: [PATCH 5/5] Fix govet error Signed-off-by: Andres Taylor --- go/vt/worker/chunk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/worker/chunk.go b/go/vt/worker/chunk.go index e02e5b7140a..2de9dc1c9f8 100644 --- a/go/vt/worker/chunk.go +++ b/go/vt/worker/chunk.go @@ -97,7 +97,7 @@ func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodata qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1) cancel() if err != nil { - return nil, vterrors.Wrapf(err, "tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. ExecuteFetchAsApp", topoproto.TabletAliasString(tablet.Alias), td.Name, err) + return nil, vterrors.Wrapf(err, "tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. ExecuteFetchAsApp", topoproto.TabletAliasString(tablet.Alias), td.Name) } if len(qr.Rows) != 1 { return nil, fmt.Errorf("tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. Zero rows were returned", topoproto.TabletAliasString(tablet.Alias), td.Name)