diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 179b6f1059f..87184e0e751 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -44,9 +44,9 @@ const ( // not throttled. NotThrottled time.Duration = 0 - // ZeroRateNoProgess can be used to set maxRate to 0. In this case, the + // ZeroRateNoProgress can be used to set maxRate to 0. In this case, the // throttler won't let any requests through until the rate is increased again. - ZeroRateNoProgess = 0 + ZeroRateNoProgress = 0 // MaxRateModuleDisabled can be set in NewThrottler() to disable throttling // by a fixed rate. @@ -262,7 +262,7 @@ func (t *Throttler) updateMaxRate() { return } - if maxRate != ZeroRateNoProgess && maxRate < int64(threadsRunning) { + if maxRate != ZeroRateNoProgress && maxRate < int64(threadsRunning) { log.Warningf("Set maxRate is less than the number of threads (%v). To prevent threads from starving, maxRate was increased from: %v to: %v.", threadsRunning, maxRate, threadsRunning) maxRate = int64(threadsRunning) } diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index b8d6faf6363..b735e3c04e4 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -324,7 +324,7 @@ func TestThreadFinished(t *testing.T) { func TestThrottle_MaxRateIsZero(t *testing.T) { fc := &fakeClock{} // 1 Thread, 0 QPS. - throttler, _ := newThrottlerWithClock("test", "queries", 1, ZeroRateNoProgess, ReplicationLagModuleDisabled, fc.now) + throttler, _ := newThrottlerWithClock("test", "queries", 1, ZeroRateNoProgress, ReplicationLagModuleDisabled, fc.now) defer throttler.Close() fc.setNow(1000 * time.Millisecond) diff --git a/go/vt/worker/defaults.go b/go/vt/worker/defaults.go index 7bf60295f71..adf2ccf3546 100644 --- a/go/vt/worker/defaults.go +++ b/go/vt/worker/defaults.go @@ -46,11 +46,12 @@ const ( // StreamExecute response. As of 06/2015, the default for it was 32 kB. 
// Note that higher values for this flag --destination_pack_count will // increase memory consumption in vtworker, vttablet and mysql. - defaultDestinationPackCount = 10 - defaultDestinationWriterCount = 20 - defaultMinHealthyRdonlyTablets = 2 - defaultDestTabletType = "RDONLY" - defaultParallelDiffsCount = 8 - defaultMaxTPS = throttler.MaxRateModuleDisabled - defaultMaxReplicationLag = throttler.ReplicationLagModuleDisabled + defaultDestinationPackCount = 10 + defaultDestinationWriterCount = 20 + defaultMinHealthyTablets = 2 + defaultDestTabletType = "RDONLY" + defaultParallelDiffsCount = 8 + defaultMaxTPS = throttler.MaxRateModuleDisabled + defaultMaxReplicationLag = throttler.ReplicationLagModuleDisabled + defaultUseConsistentSnapshot = false ) diff --git a/go/vt/worker/diff_utils.go b/go/vt/worker/diff_utils.go index 551e12281fa..4ff6ebc92ad 100644 --- a/go/vt/worker/diff_utils.go +++ b/go/vt/worker/diff_utils.go @@ -27,6 +27,8 @@ import ( "time" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + "vitess.io/vitess/go/vt/wrangler" "golang.org/x/net/context" @@ -36,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -157,8 +160,8 @@ func (qrr *QueryResultReader) Fields() []*querypb.Field { } // Close closes the connection to the tablet. 
-func (qrr *QueryResultReader) Close(ctx context.Context) error { - return qrr.conn.Close(ctx) +func (qrr *QueryResultReader) Close(ctx context.Context) { + qrr.conn.Close(ctx) } // v3KeyRangeFilter is a sqltypes.ResultStream implementation that filters @@ -266,6 +269,16 @@ func TransactionalTableScan(ctx context.Context, log logutil.Logger, ts *topo.Se return NewTransactionalQueryResultReaderForTablet(ctx, ts, tabletAlias, sql, txID) } +// CreateTargetFrom is a helper function +func CreateTargetFrom(tablet *topodatapb.Tablet) *query.Target { + return &query.Target{ + Cell: tablet.Alias.Cell, + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + } +} + // TableScanByKeyRange returns a QueryResultReader that gets all the // rows from a table that match the supplied KeyRange, ordered by // Primary Key. The returned columns are ordered with the Primary Key @@ -637,3 +650,129 @@ func (rd *RowDiffer) Go(log logutil.Logger) (dr DiffReport, err error) { advanceRight = true } } + +// createTransactions returns an array of transactions that all share the same view of the data. 
+// It will check that no new transactions have been seen between the creation of the underlying transactions, +// to guarantee that all TransactionalTableScanner are pointing to the same point +func createTransactions(ctx context.Context, numberOfScanners int, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, queryService queryservice.QueryService, target *query.Target, tabletInfo *topodatapb.Tablet) ([]int64, error) { + scanners := make([]int64, numberOfScanners) + for i := 0; i < numberOfScanners; i++ { + + tx, err := queryService.Begin(ctx, target, &query.ExecuteOptions{ + // Make sure our tx is not killed by tx sniper + Workload: query.ExecuteOptions_DBA, + TransactionIsolation: query.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY, + }) + if err != nil { + return nil, fmt.Errorf("could not open transaction on %v\n%v", topoproto.TabletAliasString(tabletInfo.Alias), err) + } + + // Remember to rollback the transactions + cleaner.Record("CloseTransaction", topoproto.TabletAliasString(tabletInfo.Alias), func(ctx context.Context, wr *wrangler.Wrangler) error { + queryService, err := tabletconn.GetDialer()(tabletInfo, true) + if err != nil { + return err + } + return queryService.Rollback(ctx, target, tx) + }) + + scanners[i] = tx + } + + return scanners, nil +} + +// TableScanner is a simple abstraction that allows a TableScanner user to remain impervious +// by the transactionality of the connection +type TableScanner interface { + ScanTable(ctx context.Context, td *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error) +} + +// TransactionalTableScanner works inside of a transaction set up with CONSISTENT SNAPSHOT +type TransactionalTableScanner struct { + wr *wrangler.Wrangler + cleaner *wrangler.Cleaner + tabletAlias *topodatapb.TabletAlias + queryService queryservice.QueryService + tx int64 +} + +// ScanTable performs a full table scan, ordered by the primary keys, if any +func (tt TransactionalTableScanner) ScanTable(ctx context.Context, td 
*tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error) { + return TransactionalTableScan(ctx, tt.wr.Logger(), tt.wr.TopoServer(), tt.tabletAlias, tt.tx, td) +} + +// NonTransactionalTableScanner just passes through the queries, and relies on paused replication traffic taking care of the consistent snapshot part +type NonTransactionalTableScanner struct { + wr *wrangler.Wrangler + cleaner *wrangler.Cleaner + tabletAlias *topodatapb.TabletAlias + queryService queryservice.QueryService +} + +// ScanTable performs a full table scan, ordered by the primary keys, if any +func (ntts NonTransactionalTableScanner) ScanTable(ctx context.Context, td *tabletmanagerdatapb.TableDefinition) (*QueryResultReader, error) { + return TableScan(ctx, ntts.wr.Logger(), ntts.wr.TopoServer(), ntts.tabletAlias, td) +} + +// CreateConsistentTableScanners will momentarily stop updates on the tablet, and then create connections that are all +// consistent snapshots of the same point in the transaction history +func CreateConsistentTableScanners(ctx context.Context, tablet *topo.TabletInfo, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, numberOfScanners int) ([]TableScanner, string, error) { + txs, gtid, err := CreateConsistentTransactions(ctx, tablet, wr, cleaner, numberOfScanners) + if err != nil { + return nil, "", err + } + + queryService, err := tabletconn.GetDialer()(tablet.Tablet, true) + defer queryService.Close(ctx) + + scanners := make([]TableScanner, numberOfScanners) + for i, tx := range txs { + scanners[i] = TransactionalTableScanner{ + wr: wr, + cleaner: cleaner, + tabletAlias: tablet.Alias, + queryService: queryService, + tx: tx, + } + } + + return scanners, gtid, nil +} + +// CreateConsistentTransactions creates a number of consistent snapshot transactions, +// all starting from the same spot in the tx log +func CreateConsistentTransactions(ctx context.Context, tablet *topo.TabletInfo, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, numberOfScanners int) 
([]int64, string, error) { + tm := tmclient.NewTabletManagerClient() + defer tm.Close() + + // Lock all tables with a read lock to pause replication + err := tm.LockTables(ctx, tablet.Tablet) + if err != nil { + return nil, "", fmt.Errorf("could not lock tables on %v\n%v", topoproto.TabletAliasString(tablet.Tablet.Alias), err) + } + defer func() { + tm := tmclient.NewTabletManagerClient() + defer tm.Close() + tm.UnlockTables(ctx, tablet.Tablet) + wr.Logger().Infof("tables unlocked on %v", topoproto.TabletAliasString(tablet.Tablet.Alias)) + }() + + wr.Logger().Infof("tables locked on %v", topoproto.TabletAliasString(tablet.Tablet.Alias)) + target := CreateTargetFrom(tablet.Tablet) + + // Create transactions + queryService, err := tabletconn.GetDialer()(tablet.Tablet, true) + defer queryService.Close(ctx) + connections, err := createTransactions(ctx, numberOfScanners, wr, cleaner, queryService, target, tablet.Tablet) + if err != nil { + return nil, "", fmt.Errorf("failed to create transactions on %v: %v", topoproto.TabletAliasString(tablet.Tablet.Alias), err) + } + wr.Logger().Infof("transactions created on %v", topoproto.TabletAliasString(tablet.Tablet.Alias)) + executedGtid, err := tm.MasterPosition(ctx, tablet.Tablet) + if err != nil { + return nil, "", fmt.Errorf("could not read executed GTID set on %v\n%v", topoproto.TabletAliasString(tablet.Tablet.Alias), err) + } + + return connections, executedGtid, nil +} diff --git a/go/vt/worker/legacy_split_clone_cmd.go b/go/vt/worker/legacy_split_clone_cmd.go index ff48c291d4b..ab73cca3903 100644 --- a/go/vt/worker/legacy_split_clone_cmd.go +++ b/go/vt/worker/legacy_split_clone_cmd.go @@ -92,7 +92,7 @@ func commandLegacySplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag sourceReaderCount := subFlags.Int("source_reader_count", defaultSourceReaderCount, "number of concurrent streaming queries to use on the source") destinationPackCount := subFlags.Int("destination_pack_count", defaultDestinationPackCount, 
"number of packets to pack in one destination insert") destinationWriterCount := subFlags.Int("destination_writer_count", defaultDestinationWriterCount, "number of concurrent RPCs to execute on the destination") - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") + minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy RDONLY tablets before taking out one") maxTPS := subFlags.Int64("max_tps", defaultMaxTPS, "if non-zero, limit copy to maximum number of (write) transactions/second on the destination (unlimited by default)") if err := subFlags.Parse(args); err != nil { return nil, err @@ -146,7 +146,7 @@ func interactiveLegacySplitClone(ctx context.Context, wi *Instance, wr *wrangler result["DefaultSourceReaderCount"] = fmt.Sprintf("%v", defaultSourceReaderCount) result["DefaultDestinationPackCount"] = fmt.Sprintf("%v", defaultDestinationPackCount) result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount) - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) result["DefaultMaxTPS"] = fmt.Sprintf("%v", defaultMaxTPS) return nil, legacySplitCloneTemplate2, result, nil } diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index 71ee4c22c28..002eb70e0f5 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -24,16 +24,20 @@ import ( "time" "golang.org/x/net/context" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" + 
"vitess.io/vitess/go/vt/wrangler" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/vt/wrangler" ) // MultiSplitDiffWorker executes a diff between a destination shard and its @@ -46,10 +50,12 @@ type MultiSplitDiffWorker struct { keyspace string shard string excludeTables []string - minHealthyRdonlyTablets int + minHealthyTablets int parallelDiffsCount int waitForFixedTimeRatherThanGtidSet bool cleaner *wrangler.Cleaner + useConsistentSnapshot bool + tabletType topodatapb.TabletType // populated during WorkerStateInit, read-only after that keyspaceInfo *topo.KeyspaceInfo @@ -58,23 +64,27 @@ type MultiSplitDiffWorker struct { destinationShards []*topo.ShardInfo // populated during WorkerStateFindTargets, read-only after that - sourceAlias *topodatapb.TabletAlias - destinationAliases []*topodatapb.TabletAlias // matches order of destinationShards + sourceAlias *topodatapb.TabletAlias + destinationAliases []*topodatapb.TabletAlias // matches order of destinationShards + sourceScanners []TableScanner + destinationScanners [][]TableScanner } // NewMultiSplitDiffWorker returns a new MultiSplitDiffWorker object. 
-func NewMultiSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, minHealthyRdonlyTablets, parallelDiffsCount int, waitForFixedTimeRatherThanGtidSet bool) Worker { +func NewMultiSplitDiffWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, excludeTables []string, minHealthyTablets, parallelDiffsCount int, waitForFixedTimeRatherThanGtidSet bool, useConsistentSnapshot bool, tabletType topodatapb.TabletType) Worker { return &MultiSplitDiffWorker{ - waitForFixedTimeRatherThanGtidSet: waitForFixedTimeRatherThanGtidSet, StatusWorker: NewStatusWorker(), wr: wr, cell: cell, keyspace: keyspace, shard: shard, excludeTables: excludeTables, - minHealthyRdonlyTablets: minHealthyRdonlyTablets, + minHealthyTablets: minHealthyTablets, parallelDiffsCount: parallelDiffsCount, cleaner: &wrangler.Cleaner{}, + useConsistentSnapshot: useConsistentSnapshot, + waitForFixedTimeRatherThanGtidSet: waitForFixedTimeRatherThanGtidSet, + tabletType: tabletType, } } @@ -154,8 +164,8 @@ func (msdw *MultiSplitDiffWorker) run(ctx context.Context) error { } // third phase: synchronize replication - if err := msdw.synchronizeReplication(ctx); err != nil { - return fmt.Errorf("synchronizeReplication() failed: %v", err) + if err := msdw.synchronizeSrcAndDestTxState(ctx); err != nil { + return fmt.Errorf("synchronizeSrcAndDestTxState() failed: %v", err) } if err := checkDone(ctx); err != nil { return err @@ -174,6 +184,12 @@ func (msdw *MultiSplitDiffWorker) run(ctx context.Context) error { func (msdw *MultiSplitDiffWorker) init(ctx context.Context) error { msdw.SetState(WorkerStateInit) + if msdw.useConsistentSnapshot { + msdw.wr.Logger().Infof("splitting using consistent snapshot") + } else { + msdw.wr.Logger().Infof("splitting using STOP SLAVE") + } + var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) msdw.keyspaceInfo, err = msdw.wr.TopoServer().GetKeyspace(shortCtx, msdw.keyspace) @@ -282,47 +298,34 @@ func (msdw 
*MultiSplitDiffWorker) getShardInfo(ctx context.Context, keyspace str // - mark them all as 'worker' pointing back to us func (msdw *MultiSplitDiffWorker) findTargets(ctx context.Context) error { msdw.SetState(WorkerStateFindTargets) - var err error - // find an appropriate tablet in the source shard - msdw.sourceAlias, err = FindWorkerTablet( - ctx, - msdw.wr, - msdw.cleaner, - nil, /* tsc */ - msdw.cell, - msdw.keyspace, - msdw.shard, - 1, /* minHealthyTablets */ - topodatapb.TabletType_RDONLY) + var finderFunc func(keyspace string, shard string) (*topodatapb.TabletAlias, error) + if msdw.tabletType == topodatapb.TabletType_RDONLY { + finderFunc = func(keyspace string, shard string) (*topodatapb.TabletAlias, error) { + return FindWorkerTablet(ctx, msdw.wr, msdw.cleaner, nil /*tsc*/, msdw.cell, keyspace, shard, 1, topodatapb.TabletType_RDONLY) + } + } else { + finderFunc = func(keyspace string, shard string) (*topodatapb.TabletAlias, error) { + return FindHealthyTablet(ctx, msdw.wr, nil /*tsc*/, msdw.cell, keyspace, shard, 1, msdw.tabletType) + } + } + + msdw.sourceAlias, err = finderFunc(msdw.keyspace, msdw.shard) if err != nil { - return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) + return fmt.Errorf("finding source failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) } - // find an appropriate tablet in each destination shard msdw.destinationAliases = make([]*topodatapb.TabletAlias, len(msdw.destinationShards)) for i, destinationShard := range msdw.destinationShards { keyspace := destinationShard.Keyspace() shard := destinationShard.ShardName() - destinationAlias, err := FindWorkerTablet( - ctx, - msdw.wr, - msdw.cleaner, - nil, /* tsc */ - msdw.cell, - keyspace, - shard, - msdw.minHealthyRdonlyTablets, - topodatapb.TabletType_RDONLY) + destinationAlias, err := finderFunc(keyspace, shard) if err != nil { - return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, keyspace, 
shard, err) + return fmt.Errorf("finding destination failed for %v/%v/%v: %v", msdw.cell, keyspace, shard, err) } msdw.destinationAliases[i] = destinationAlias } - if err != nil { - return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", msdw.cell, msdw.keyspace, msdw.shard, err) - } return nil } @@ -330,22 +333,22 @@ func (msdw *MultiSplitDiffWorker) findTargets(ctx context.Context) error { // ask the master of the destination shard to pause filtered replication, // and return the source binlog positions // (add a cleanup task to restart filtered replication on master) -func (msdw *MultiSplitDiffWorker) stopReplicationOnAllDestinationMasters(ctx context.Context, masterInfos []*topo.TabletInfo) ([]string, error) { +func (msdw *MultiSplitDiffWorker) stopVreplicationOnAll(ctx context.Context, tabletInfo []*topo.TabletInfo) ([]string, error) { destVreplicationPos := make([]string, len(msdw.destinationShards)) for i, shardInfo := range msdw.destinationShards { - masterInfo := masterInfos[i] + tablet := tabletInfo[i].Tablet - msdw.wr.Logger().Infof("Stopping master binlog replication on %v", shardInfo.MasterAlias) + msdw.wr.Logger().Infof("stopping master binlog replication on %v", shardInfo.MasterAlias) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff")) + _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.StopVReplication(msdw.sourceUID, "for split diff")) cancel() if err != nil { return nil, fmt.Errorf("VReplicationExec(stop) for %v failed: %v", shardInfo.MasterAlias, err) } - wrangler.RecordVReplicationAction(msdw.cleaner, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) + wrangler.RecordVReplicationAction(msdw.cleaner, tablet, binlogplayer.StartVReplication(msdw.sourceUID)) shortCtx, cancel = context.WithTimeout(ctx, 
*remoteActionsTimeout) - p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID)) + p3qr, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, tablet, binlogplayer.ReadVReplicationPos(msdw.sourceUID)) cancel() if err != nil { return nil, fmt.Errorf("VReplicationExec(stop) for %v failed: %v", msdw.shardInfo.MasterAlias, err) @@ -362,12 +365,12 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnAllDestinationMasters(ctx con return destVreplicationPos, nil } -func (msdw *MultiSplitDiffWorker) getTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) { +func (msdw *MultiSplitDiffWorker) getMasterTabletInfoForShard(ctx context.Context, shardInfo *topo.ShardInfo) (*topo.TabletInfo, error) { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) masterInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, shardInfo.MasterAlias) cancel() if err != nil { - return nil, fmt.Errorf("synchronizeReplication: cannot get Tablet record for master %v: %v", msdw.shardInfo.MasterAlias, err) + return nil, fmt.Errorf("synchronizeSrcAndDestTxState: cannot get Tablet record for master %v: %v", msdw.shardInfo.MasterAlias, err) } return masterInfo, nil } @@ -376,7 +379,7 @@ func (msdw *MultiSplitDiffWorker) getTabletInfoForShard(ctx context.Context, sha // destination masters. 
Return the reached position // (add a cleanup task to restart binlog replication on the source tablet, and // change the existing ChangeSlaveType cleanup action to 'spare' type) -func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceRdOnlyTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) { +func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceTabletAt(ctx context.Context, destVreplicationPos []string) (string, error) { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) cancel() @@ -392,12 +395,8 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceRdOnlyTabletAt(ctx cont // if we make StopSlaveMinimum take multiple blp positions then this will be a lot more efficient because you just // check for each position using WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS and then stop replication. - msdw.wr.Logger().Infof("Stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos) - // read the tablet - sourceTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) - if err != nil { - return "", err - } + msdw.wr.Logger().Infof("stopping slave %v at a minimum of %v", msdw.sourceAlias, vreplicationPos) + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) msdw.wr.TabletManagerClient().StartSlave(shortCtx, sourceTablet.Tablet) cancel() @@ -420,21 +419,22 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnSourceRdOnlyTabletAt(ctx cont } // ask the master of the destination shard to resume filtered replication -// up to the new list of positions, and return its binlog position. 
-func (msdw *MultiSplitDiffWorker) resumeReplicationOnDestinationMasterUntil(ctx context.Context, shardInfo *topo.ShardInfo, mysqlPos string, masterInfo *topo.TabletInfo) (string, error) { - msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, mysqlPos) +// up to the specified source position, and return the destination position. +func (msdw *MultiSplitDiffWorker) stopVreplicationAt(ctx context.Context, shardInfo *topo.ShardInfo, sourcePosition string, masterInfo *topo.TabletInfo) (string, error) { + msdw.wr.Logger().Infof("Restarting master %v until it catches up to %v", shardInfo.MasterAlias, sourcePosition) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, mysqlPos)) + _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplicationUntil(msdw.sourceUID, sourcePosition)) cancel() if err != nil { - return "", fmt.Errorf("VReplication(start until) for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) + return "", fmt.Errorf("VReplication(start until) for %v until %v failed: %v", shardInfo.MasterAlias, sourcePosition, err) } + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - if err := msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), mysqlPos); err != nil { - cancel() - return "", fmt.Errorf("VReplicationWaitForPos for %v until %v failed: %v", shardInfo.MasterAlias, mysqlPos, err) - } + err = msdw.wr.TabletManagerClient().VReplicationWaitForPos(shortCtx, masterInfo.Tablet, int(msdw.sourceUID), sourcePosition) cancel() + if err != nil { + return "", fmt.Errorf("VReplicationWaitForPos for %v until %v failed: %v", shardInfo.MasterAlias, sourcePosition, err) + } shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) 
masterPos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, masterInfo.Tablet) @@ -449,11 +449,11 @@ func (msdw *MultiSplitDiffWorker) resumeReplicationOnDestinationMasterUntil(ctx // binlog position, and stop its replication. // (add a cleanup task to restart binlog replication on it, and change // the existing ChangeSlaveType cleanup action to 'spare' type) -func (msdw *MultiSplitDiffWorker) stopReplicationOnDestinationRdOnlys(ctx context.Context, destinationAlias *topodatapb.TabletAlias, masterPos string) error { +func (msdw *MultiSplitDiffWorker) stopReplicationAt(ctx context.Context, destinationAlias *topodatapb.TabletAlias, masterPos string) error { if msdw.waitForFixedTimeRatherThanGtidSet { - msdw.wr.Logger().Infof("Workaround for broken GTID set in destination RDONLY. Just waiting for 1 minute for %v and assuming replication has caught up. (should be at %v)", destinationAlias, masterPos) + msdw.wr.Logger().Infof("workaround for broken GTID set in destination RDONLY. Just waiting for 1 minute for %v and assuming replication has caught up. (should be at %v)", destinationAlias, masterPos) } else { - msdw.wr.Logger().Infof("Waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) + msdw.wr.Logger().Infof("waiting for destination tablet %v to catch up to %v", destinationAlias, masterPos) } shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) destinationTablet, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) @@ -482,8 +482,8 @@ func (msdw *MultiSplitDiffWorker) stopReplicationOnDestinationRdOnlys(ctx contex // restart filtered replication on the destination master. 
// (remove the cleanup task that does the same) -func (msdw *MultiSplitDiffWorker) restartReplicationOn(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error { - msdw.wr.Logger().Infof("Restarting filtered replication on master %v", shardInfo.MasterAlias) +func (msdw *MultiSplitDiffWorker) startVreplication(ctx context.Context, shardInfo *topo.ShardInfo, masterInfo *topo.TabletInfo) error { + msdw.wr.Logger().Infof("restarting filtered replication on master %v", shardInfo.MasterAlias) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) _, err := msdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.StartVReplication(msdw.sourceUID)) if err != nil { @@ -493,67 +493,164 @@ func (msdw *MultiSplitDiffWorker) restartReplicationOn(ctx context.Context, shar return nil } -// synchronizeReplication phase: -// At this point, the source and the destination tablet are stopped at the same -// point. +func (msdw *MultiSplitDiffWorker) createNonTransactionalTableScanners(ctx context.Context, queryService queryservice.QueryService, source *topo.TabletInfo) ([]TableScanner, error) { + // If we are not using consistent snapshot, we'll use the NonTransactionalTableScanner, + // which does not have any instance state and so can be used by all connections + scanners := make([]TableScanner, msdw.parallelDiffsCount) + scanner := NonTransactionalTableScanner{ + queryService: queryService, + cleaner: msdw.cleaner, + wr: msdw.wr, + tabletAlias: source.Alias, + } -func (msdw *MultiSplitDiffWorker) synchronizeReplication(ctx context.Context) error { + for i := 0; i < msdw.parallelDiffsCount; i++ { + scanners[i] = scanner + } + + return scanners, nil +} + +// synchronizeSrcAndDestTxState phase: +// After this point, the source and the destination tablet are stopped at the same point. 
+func (msdw *MultiSplitDiffWorker) synchronizeSrcAndDestTxState(ctx context.Context) error { msdw.SetState(WorkerStateSyncReplication) var err error + // 1. Find all the tablets we will need to work with masterInfos := make([]*topo.TabletInfo, len(msdw.destinationAliases)) for i, shardInfo := range msdw.destinationShards { - masterInfos[i], err = msdw.getTabletInfoForShard(ctx, shardInfo) + masterInfos[i], err = msdw.getMasterTabletInfoForShard(ctx, shardInfo) if err != nil { return err } } - destVreplicationPos, err := msdw.stopReplicationOnAllDestinationMasters(ctx, masterInfos) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + source, err := msdw.wr.TopoServer().GetTablet(shortCtx, msdw.sourceAlias) + cancel() + + var sourcePosition string + + // 2. Stop replication on destination + destVreplicationPos, err := msdw.stopVreplicationOnAll(ctx, masterInfos) if err != nil { return err } - mysqlPos, err := msdw.stopReplicationOnSourceRdOnlyTabletAt(ctx, destVreplicationPos) - if err != nil { - return err + // 3. 
Pause updates on the source and create consistent snapshot connections + if msdw.useConsistentSnapshot { + connections, pos, err := CreateConsistentTableScanners(ctx, source, msdw.wr, msdw.cleaner, msdw.parallelDiffsCount) + if err != nil { + return fmt.Errorf("failed to create transactional connections %v", err.Error()) + } + msdw.sourceScanners = connections + sourcePosition = pos + } else { + sourcePosition, err = msdw.stopReplicationOnSourceTabletAt(ctx, destVreplicationPos) + if err != nil { + return fmt.Errorf("failed to stop replication on source %v", err.Error()) + } + + queryService, err := tabletconn.GetDialer()(source.Tablet, true) + if err != nil { + return fmt.Errorf("failed to instantiate query service for %v: %v", source.Tablet, err.Error()) + } + msdw.sourceScanners, err = msdw.createNonTransactionalTableScanners(ctx, queryService, source) + if err != nil { + return fmt.Errorf("failed to create table scanners %v", err.Error()) + } } + msdw.destinationScanners = make([][]TableScanner, msdw.parallelDiffsCount) + + // 4. 
Make sure all replicas have caught up with the master for i, shardInfo := range msdw.destinationShards { masterInfo := masterInfos[i] destinationAlias := msdw.destinationAliases[i] - masterPos, err := msdw.resumeReplicationOnDestinationMasterUntil(ctx, shardInfo, mysqlPos, masterInfo) + destinationPosition, err := msdw.stopVreplicationAt(ctx, shardInfo, sourcePosition, masterInfo) if err != nil { return err } - err = msdw.stopReplicationOnDestinationRdOnlys(ctx, destinationAlias, masterPos) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + destTabletInfo, err := msdw.wr.TopoServer().GetTablet(shortCtx, destinationAlias) + cancel() if err != nil { - return err + return fmt.Errorf("waitForDestinationTabletToReach: cannot get Tablet record for master %v: %v", msdw.shardInfo.MasterAlias, err) + } + + queryService, err := tabletconn.GetDialer()(source.Tablet, true) + + if msdw.useConsistentSnapshot { + // loop to wait for the destinationAlias tablet in shardInfo to have reached destinationPosition + err = msdw.waitForDestinationTabletToReach(ctx, destTabletInfo.Tablet, destinationPosition) + if err != nil { + return err + } + + scanners, _, err := CreateConsistentTableScanners(ctx, destTabletInfo, msdw.wr, msdw.cleaner, msdw.parallelDiffsCount) + if err != nil { + return fmt.Errorf("failed to create transactional destination connections") + } + for j, scanner := range scanners { + msdw.destinationScanners[j] = append(msdw.destinationScanners[j], scanner) + } + } else { + err = msdw.stopReplicationAt(ctx, destinationAlias, destinationPosition) + if err != nil { + return fmt.Errorf("failed to stop replication on %v at position %v: %v", destinationAlias, destinationPosition, err.Error()) + } + msdw.destinationScanners[i], err = msdw.createNonTransactionalTableScanners(ctx, queryService, destTabletInfo) + if err != nil { + return fmt.Errorf("failed to stop create table scanners for %v using %v : %v", destTabletInfo, queryService, err.Error()) + } } - 
err = msdw.restartReplicationOn(ctx, shardInfo, masterInfo) + err = msdw.startVreplication(ctx, shardInfo, masterInfo) if err != nil { - return err + return fmt.Errorf("failed to restart vreplication for shard %v on tablet %v: %v", shardInfo, masterInfo, err.Error()) } - } + } return nil } -func (msdw *MultiSplitDiffWorker) diffSingleTable(ctx context.Context, wg *sync.WaitGroup, tableDefinition *tabletmanagerdatapb.TableDefinition, keyspaceSchema *vindexes.KeyspaceSchema) error { +func (msdw *MultiSplitDiffWorker) waitForDestinationTabletToReach(ctx context.Context, tablet *topodatapb.Tablet, mysqlPos string) error { + for i := 0; i < 20; i++ { + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + pos, err := msdw.wr.TabletManagerClient().MasterPosition(shortCtx, tablet) + cancel() + if err != nil { + return fmt.Errorf("get MasterPosition for %v failed: %v", tablet, err) + } + + if pos == mysqlPos { + return nil + } + time.Sleep(time.Second) + } + return fmt.Errorf("failed to reach transaction position after multiple attempts") +} + +func (msdw *MultiSplitDiffWorker) diffSingleTable(ctx context.Context, wg *sync.WaitGroup, tableDefinition *tabletmanagerdatapb.TableDefinition, keyspaceSchema *vindexes.KeyspaceSchema, sourceScanner TableScanner, destinationScanners []TableScanner) error { msdw.wr.Logger().Infof("Starting the diff on table %v", tableDefinition.Name) - sourceQueryResultReader, err := TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), msdw.sourceAlias, tableDefinition) + if len(destinationScanners) != len(msdw.destinationAliases) { + return fmt.Errorf("did not receive the expected amount of destination connections") + } + + sourceQueryResultReader, err := sourceScanner.ScanTable(ctx, tableDefinition) if err != nil { return fmt.Errorf("TableScan(source) failed: %v", err) } defer sourceQueryResultReader.Close(ctx) destinationQueryResultReaders := make([]ResultReader, len(msdw.destinationAliases)) - for i, destinationAlias := range 
msdw.destinationAliases { - destinationQueryResultReader, err := TableScan(ctx, msdw.wr.Logger(), msdw.wr.TopoServer(), destinationAlias, tableDefinition) + for i := range msdw.destinationAliases { + scanner := destinationScanners[i] + destinationQueryResultReader, err := scanner.ScanTable(ctx, tableDefinition) if err != nil { return fmt.Errorf("TableScan(destination) failed: %v", err) } @@ -593,16 +690,16 @@ func (msdw *MultiSplitDiffWorker) diffSingleTable(ctx context.Context, wg *sync. return fmt.Errorf("table %v has differences: %v", tableDefinition.Name, report.String()) } - msdw.wr.Logger().Infof("Table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) + msdw.wr.Logger().Infof("table %v checks out (%v rows processed, %v qps)", tableDefinition.Name, report.processedRows, report.processingQPS) return nil } -func (msdw *MultiSplitDiffWorker) tableDiffingConsumer(ctx context.Context, wg *sync.WaitGroup, tableChan chan *tabletmanagerdatapb.TableDefinition, rec *concurrency.AllErrorRecorder, keyspaceSchema *vindexes.KeyspaceSchema) { +func (msdw *MultiSplitDiffWorker) tableDiffingConsumer(ctx context.Context, wg *sync.WaitGroup, tableChan chan *tabletmanagerdatapb.TableDefinition, rec *concurrency.AllErrorRecorder, keyspaceSchema *vindexes.KeyspaceSchema, sourceScanner TableScanner, destinationScanners []TableScanner) { defer wg.Done() for tableDefinition := range tableChan { - err := msdw.diffSingleTable(ctx, wg, tableDefinition, keyspaceSchema) + err := msdw.diffSingleTable(ctx, wg, tableDefinition, keyspaceSchema, sourceScanner, destinationScanners) if err != nil { msdw.markAsWillFail(rec, err) msdw.wr.Logger().Errorf("%v", err) @@ -611,7 +708,7 @@ func (msdw *MultiSplitDiffWorker) tableDiffingConsumer(ctx context.Context, wg * } func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabletmanagerdatapb.SchemaDefinition, *tabletmanagerdatapb.SchemaDefinition, error) { - 
msdw.wr.Logger().Infof("Gathering schema information...") + msdw.wr.Logger().Infof("gathering schema information...") wg := sync.WaitGroup{} rec := &concurrency.AllErrorRecorder{} @@ -630,7 +727,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl msdw.markAsWillFail(rec, err) } destinationSchemaDefinitions[i] = destinationSchemaDefinition - msdw.wr.Logger().Infof("Got schema from destination %v", destinationAlias) + msdw.wr.Logger().Infof("got schema from destination %v", destinationAlias) wg.Done() }(i, destinationAlias) } @@ -644,7 +741,7 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl if err != nil { msdw.markAsWillFail(rec, err) } - msdw.wr.Logger().Infof("Got schema from source %v", msdw.sourceAlias) + msdw.wr.Logger().Infof("got schema from source %v", msdw.sourceAlias) wg.Done() }() @@ -656,8 +753,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl return destinationSchemaDefinitions, sourceSchemaDefinition, nil } -func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, destinationSchemaDefinitions []*tabletmanagerdatapb.SchemaDefinition, sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition) { - msdw.wr.Logger().Infof("Diffing the schema...") +func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, destinationSchemaDefinitions []*tabletmanagerdatapb.SchemaDefinition, sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition) error { + msdw.wr.Logger().Infof("diffing the schema...") rec := &concurrency.AllErrorRecorder{} sourceShardName := fmt.Sprintf("%v/%v", msdw.shardInfo.Keyspace(), msdw.shardInfo.ShardName()) for i, destinationSchemaDefinition := range destinationSchemaDefinitions { @@ -666,10 +763,12 @@ func (msdw *MultiSplitDiffWorker) diffSchemaInformation(ctx context.Context, des tmutils.DiffSchema(destinationShardName, destinationSchemaDefinition, sourceShardName, sourceSchemaDefinition, rec) } 
if rec.HasErrors() { - msdw.wr.Logger().Warningf("Different schemas: %v", rec.Error().Error()) - } else { - msdw.wr.Logger().Infof("Schema match, good.") + msdw.wr.Logger().Warningf("different schemas: %v", rec.Error().Error()) + return rec.Error() } + + msdw.wr.Logger().Infof("schema match, good.") + return nil } func (msdw *MultiSplitDiffWorker) loadVSchema(ctx context.Context) (*vindexes.KeyspaceSchema, error) { @@ -702,7 +801,10 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { if err != nil { return err } - msdw.diffSchemaInformation(ctx, destinationSchemaDefinitions, sourceSchemaDefinition) + err = msdw.diffSchemaInformation(ctx, destinationSchemaDefinitions, sourceSchemaDefinition) + if err != nil { + return fmt.Errorf("schema comparison failed: %v", err) + } // read the vschema if needed var keyspaceSchema *vindexes.KeyspaceSchema @@ -713,7 +815,7 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { } } - msdw.wr.Logger().Infof("Running the diffs...") + msdw.wr.Logger().Infof("running the diffs...") tableDefinitions := sourceSchemaDefinition.TableDefinitions rec := &concurrency.AllErrorRecorder{} @@ -730,7 +832,7 @@ func (msdw *MultiSplitDiffWorker) diff(ctx context.Context) error { // start as many goroutines we want parallel diffs running for i := 0; i < msdw.parallelDiffsCount; i++ { consumers.Add(1) - go msdw.tableDiffingConsumer(ctx, &consumers, tableChan, rec, keyspaceSchema) + go msdw.tableDiffingConsumer(ctx, &consumers, tableChan, rec, keyspaceSchema, msdw.sourceScanners[i], msdw.destinationScanners[i]) } // wait for all consumers to wrap up their work diff --git a/go/vt/worker/multi_split_diff_cmd.go b/go/vt/worker/multi_split_diff_cmd.go index 353e47790e7..44562b29658 100644 --- a/go/vt/worker/multi_split_diff_cmd.go +++ b/go/vt/worker/multi_split_diff_cmd.go @@ -25,6 +25,8 @@ import ( "strings" "sync" + "vitess.io/vitess/go/vt/proto/topodata" + "golang.org/x/net/context" "vitess.io/vitess/go/vt/concurrency" 
"vitess.io/vitess/go/vt/topo/topoproto" @@ -60,14 +62,22 @@ const multiSplitDiffHTML2 = `

+ + +

- -
+ +


+ + ?
@@ -79,10 +89,12 @@ var multiSplitDiffTemplate = mustParseTemplate("multiSplitDiff", multiSplitDiffH var multiSplitDiffTemplate2 = mustParseTemplate("multiSplitDiff2", multiSplitDiffHTML2) func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error) { + tabletTypeStr := subFlags.String("tablet_type", "RDONLY", "type of tablet used") excludeTables := subFlags.String("exclude_tables", "", "comma separated list of tables to exclude") - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") + minHealthyTablets := subFlags.Int("min_healthy_tablets", defaultMinHealthyTablets, "minimum number of healthy tablets before taking out one") parallelDiffsCount := subFlags.Int("parallel_diffs_count", defaultParallelDiffsCount, "number of tables to diff in parallel") waitForFixedTimeRatherThanGtidSet := subFlags.Bool("wait_for_fixed_time_rather_than_gtid_set", false, "wait for 1m when syncing up the destination RDONLY tablet rather than using the GTID set. Use this when the GTID set on the RDONLY is broken. 
Make sure the RDONLY is not behind in replication when using this flag.") + useConsistentSnapshot := subFlags.Bool("use_consistent_snapshot", defaultUseConsistentSnapshot, "Instead of pausing replication on the source, uses transactions with consistent snapshot to have a stable view of the data.") if err := subFlags.Parse(args); err != nil { return nil, err } @@ -98,7 +110,13 @@ func commandMultiSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.F if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - return NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, *minHealthyRdonlyTablets, *parallelDiffsCount, *waitForFixedTimeRatherThanGtidSet), nil + + tabletType, ok := topodata.TabletType_value[*tabletTypeStr] + if !ok { + return nil, fmt.Errorf("failed to find this tablet type %v", tabletTypeStr) + } + + return NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, *minHealthyTablets, *parallelDiffsCount, *waitForFixedTimeRatherThanGtidSet, *useConsistentSnapshot, topodata.TabletType(tabletType)), nil } // shardSources returns all the shards that are SourceShards of at least one other shard. 
@@ -168,7 +186,7 @@ func shardSources(ctx context.Context, wr *wrangler.Wrangler) ([]map[string]stri return result, nil } -func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.Wrangler, w http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) { +func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.Wrangler, _ http.ResponseWriter, r *http.Request) (Worker, *template.Template, map[string]interface{}, error) { if err := r.ParseForm(); err != nil { return nil, nil, nil, fmt.Errorf("cannot parse form: %s", err) } @@ -194,7 +212,7 @@ func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.W result["Keyspace"] = keyspace result["Shard"] = shard result["DefaultSourceUID"] = "0" - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultMinHealthyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) result["DefaultParallelDiffsCount"] = fmt.Sprintf("%v", defaultParallelDiffsCount) return nil, multiSplitDiffTemplate2, result, nil } @@ -205,21 +223,26 @@ func interactiveMultiSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.W if excludeTables != "" { excludeTableArray = strings.Split(excludeTables, ",") } - minHealthyRdonlyTabletsStr := r.FormValue("minHealthyRdonlyTablets") + minHealthyTabletsStr := r.FormValue("minHealthyTablets") parallelDiffsCountStr := r.FormValue("parallelDiffsCount") - minHealthyRdonlyTablets, err := strconv.ParseInt(minHealthyRdonlyTabletsStr, 0, 64) + minHealthyTablets, err := strconv.ParseInt(minHealthyTabletsStr, 0, 64) parallelDiffsCount, err := strconv.ParseInt(parallelDiffsCountStr, 0, 64) if err != nil { - return nil, nil, nil, fmt.Errorf("cannot parse minHealthyRdonlyTablets: %s", err) + return nil, nil, nil, fmt.Errorf("cannot parse minHealthyTablets: %s", err) } waitForFixedTimeRatherThanGtidSetStr := r.FormValue("waitForFixedTimeRatherThanGtidSet") 
waitForFixedTimeRatherThanGtidSet := waitForFixedTimeRatherThanGtidSetStr == "true" - if err != nil { - return nil, nil, nil, fmt.Errorf("cannot parse minHealthyRdonlyTablets: %s", err) + useConsistentSnapshotStr := r.FormValue("useConsistentSnapshot") + useConsistentSnapshot := useConsistentSnapshotStr == "true" + + tabletTypeStr := r.FormValue("tabletType") + tabletType, ok := topodata.TabletType_value[tabletTypeStr] + if !ok { + return nil, nil, nil, fmt.Errorf("cannot parse tabletType: %s", tabletTypeStr) } // start the diff job - wrk := NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, int(minHealthyRdonlyTablets), int(parallelDiffsCount), waitForFixedTimeRatherThanGtidSet) + wrk := NewMultiSplitDiffWorker(wr, wi.cell, keyspace, shard, excludeTableArray, int(minHealthyTablets), int(parallelDiffsCount), waitForFixedTimeRatherThanGtidSet, useConsistentSnapshot, topodata.TabletType(tabletType)) return wrk, nil, nil, nil } diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go index 73fadf04145..6fa8a814901 100644 --- a/go/vt/worker/restartable_result_reader.go +++ b/go/vt/worker/restartable_result_reader.go @@ -23,6 +23,8 @@ import ( "strings" "time" + "github.com/golang/glog" + "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" @@ -53,6 +55,8 @@ type RestartableResultReader struct { chunk chunk // allowMultipleRetries is true if we are allowed to retry more than once. allowMultipleRetries bool + // if we are running inside a transaction, this will hold a non-zero value + txID int64 query string @@ -82,6 +86,15 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t allowMultipleRetries: allowMultipleRetries, } + err := tryToConnect(r) + if err != nil { + return nil, err + } + return r, nil +} + +func tryToConnect(r *RestartableResultReader) error { + // If the initial connection fails we retry once. // Note: The first retry will be the second attempt. 
attempt := 0 @@ -97,15 +110,35 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err) goto retry } - return r, nil + return nil retry: if !retryable || attempt > 1 { - return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err) + return fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err) } statsRetryCount.Add(1) - logger.Infof("retrying after error: %v", err) + glog.Infof("retrying after error: %v", err) } + +} + +// NewTransactionalRestartableResultReader does the same thing that NewRestartableResultReader does, +// but works inside of a single transaction +func NewTransactionalRestartableResultReader(ctx context.Context, logger logutil.Logger, tp tabletProvider, td *tabletmanagerdatapb.TableDefinition, chunk chunk, allowMultipleRetries bool, txID int64) (*RestartableResultReader, error) { + r := &RestartableResultReader{ + ctx: ctx, + logger: logger, + tp: tp, + td: td, + chunk: chunk, + allowMultipleRetries: allowMultipleRetries, + txID: txID, + } + err := tryToConnect(r) + if err != nil { + return nil, err + } + return r, nil } // getTablet (re)sets the tablet which is used for the streaming query. @@ -145,11 +178,21 @@ func (r *RestartableResultReader) getTablet() (bool, error) { func (r *RestartableResultReader) startStream() (bool, error) { // Start the streaming query. 
r.generateQuery() - stream := queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{ - Keyspace: r.tablet.Keyspace, - Shard: r.tablet.Shard, - TabletType: r.tablet.Type, - }, r.query, make(map[string]*querypb.BindVariable), nil) + var stream sqltypes.ResultStream + + if r.txID == 0 { + stream = queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{ + Keyspace: r.tablet.Keyspace, + Shard: r.tablet.Shard, + TabletType: r.tablet.Type, + }, r.query, make(map[string]*querypb.BindVariable), nil) + } else { + stream = queryservice.ExecuteWithTransactionalStreamer(r.ctx, r.conn, &querypb.Target{ + Keyspace: r.tablet.Keyspace, + Shard: r.tablet.Shard, + TabletType: r.tablet.Type, + }, r.query, make(map[string]*querypb.BindVariable), r.txID, nil) + } // Read the fields information. cols, err := stream.Recv() diff --git a/go/vt/worker/result_merger.go b/go/vt/worker/result_merger.go index 68250d06ad0..5e4c83544b1 100644 --- a/go/vt/worker/result_merger.go +++ b/go/vt/worker/result_merger.go @@ -21,9 +21,9 @@ import ( "fmt" "io" - "vitess.io/vitess/go/vt/vterrors" - "github.com/golang/protobuf/proto" + "golang.org/x/net/context" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/sqltypes" @@ -115,6 +115,13 @@ func (rm *ResultMerger) Fields() []*querypb.Field { return rm.fields } +// Close closes all inputs +func (rm *ResultMerger) Close(ctx context.Context) { + for _, i := range rm.inputs { + i.Close(ctx) + } +} + // Next returns the next Result in the sorted, merged stream. // It is part of the ResultReader interface. 
func (rm *ResultMerger) Next() (*sqltypes.Result, error) { diff --git a/go/vt/worker/result_merger_test.go b/go/vt/worker/result_merger_test.go index 10b37f3261f..c450bf7dedc 100644 --- a/go/vt/worker/result_merger_test.go +++ b/go/vt/worker/result_merger_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -110,6 +112,9 @@ func (f *fakeResultReader) Fields() []*querypb.Field { return f.fields } +func (f *fakeResultReader) Close(ctx context.Context) { +} + // Next returns the next fake result. It is part of the ResultReader interface. func (f *fakeResultReader) Next() (*sqltypes.Result, error) { if f.rowsReturned == f.rowsTotal { @@ -387,6 +392,10 @@ func (m *memoryResultReader) Next() (*sqltypes.Result, error) { return result, nil } +func (m *memoryResultReader) Close(ctx context.Context) { + // intentionally blank. we have nothing we need to close +} + // benchmarkResult is a package level variable whose sole purpose is to // reference output from the Benchmark* functions below. // This was suggested by http://dave.cheney.net/2013/06/30/how-to-write-benchmarks-in-go diff --git a/go/vt/worker/result_reader.go b/go/vt/worker/result_reader.go index 51ccda42367..231e497b054 100644 --- a/go/vt/worker/result_reader.go +++ b/go/vt/worker/result_reader.go @@ -17,6 +17,7 @@ limitations under the License. package worker import ( + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -39,4 +40,5 @@ type ResultReader interface { // It returns the next result on the stream. // It will return io.EOF if the stream ended. 
Next() (*sqltypes.Result, error) + Close(ctx context.Context) } diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index 3cc7638d4a1..0772270e59a 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -24,14 +24,16 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" "vitess.io/vitess/go/event" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/sync2" - "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/throttler" @@ -64,28 +66,30 @@ var servingTypes = []topodatapb.TabletType{topodatapb.TabletType_MASTER, topodat type SplitCloneWorker struct { StatusWorker - wr *wrangler.Wrangler - cloneType cloneType - cell string - destinationKeyspace string - shard string - online bool - offline bool + wr *wrangler.Wrangler + cloneType cloneType + cell string + destinationKeyspace string + shard string + online bool + offline bool + useConsistentSnapshot bool // verticalSplit only: List of tables which should be split out. tables []string // horizontalResharding only: List of tables which will be skipped. 
- excludeTables []string - chunkCount int - minRowsPerChunk int - sourceReaderCount int - writeQueryMaxRows int - writeQueryMaxSize int - destinationWriterCount int - minHealthyRdonlyTablets int - maxTPS int64 - maxReplicationLag int64 - cleaner *wrangler.Cleaner - tabletTracker *TabletTracker + excludeTables []string + chunkCount int + minRowsPerChunk int + sourceReaderCount int + writeQueryMaxRows int + writeQueryMaxSize int + destinationWriterCount int + minHealthyTablets int + tabletType topodatapb.TabletType + maxTPS int64 + maxReplicationLag int64 + cleaner *wrangler.Cleaner + tabletTracker *TabletTracker // populated during WorkerStateInit, read-only after that destinationKeyspaceInfo *topo.KeyspaceInfo @@ -101,6 +105,9 @@ type SplitCloneWorker struct { // populated during WorkerStateFindTargets, read-only after that sourceTablets []*topodatapb.Tablet + lastPos string // contains the GTID position for the source + transactions []int64 + // shardWatchers contains a TopologyWatcher for each source and destination // shard. It updates the list of tablets in the healthcheck if replicas are // added/removed. @@ -118,15 +125,15 @@ type SplitCloneWorker struct { // Throttlers will be added/removed during WorkerStateClone(Online|Offline). throttlers map[string]*throttler.Throttler - // offlineSourceAliases has the list of tablets (per source shard) we took + // sourceAliases has the list of tablets (per source shard) we took // offline for the WorkerStateCloneOffline phase. // Populated shortly before WorkerStateCloneOffline, read-only after that. - offlineSourceAliases []*topodatapb.TabletAlias + sourceAliases []*topodatapb.TabletAlias // formattedOfflineSourcesMu guards all fields in this group. formattedOfflineSourcesMu sync.Mutex // formattedOfflineSources is a space separated list of - // "offlineSourceAliases". It is used by the StatusAs* methods to output the + // "sourceAliases". 
It is used by the StatusAs* methods to output the // used source tablets during the offline clone phase. formattedOfflineSources string @@ -140,20 +147,20 @@ type SplitCloneWorker struct { } // newSplitCloneWorker returns a new worker object for the SplitClone command. -func newSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) { - return newCloneWorker(wr, horizontalResharding, cell, keyspace, shard, online, offline, nil /* tables */, excludeTables, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets, maxTPS, maxReplicationLag) +func newSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) { + return newCloneWorker(wr, horizontalResharding, cell, keyspace, shard, online, offline, nil /* tables */, excludeTables, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets, tabletType, maxTPS, maxReplicationLag, useConsistentSnapshot) } // newVerticalSplitCloneWorker returns a new worker object for the // VerticalSplitClone command. 
-func newVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, tables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) { - return newCloneWorker(wr, verticalSplit, cell, keyspace, shard, online, offline, tables, nil /* excludeTables */, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets, maxTPS, maxReplicationLag) +func newVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, tables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) { + return newCloneWorker(wr, verticalSplit, cell, keyspace, shard, online, offline, tables, nil /* excludeTables */, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets, tabletType, maxTPS, maxReplicationLag, useConsistentSnapshot) } // newCloneWorker returns a new SplitCloneWorker object which is used both by // the SplitClone and VerticalSplitClone command. // TODO(mberlin): Rename SplitCloneWorker to cloneWorker. 
-func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, shard string, online, offline bool, tables, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) { +func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, shard string, online, offline bool, tables, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) { if cloneType != horizontalResharding && cloneType != verticalSplit { return nil, fmt.Errorf("unknown cloneType: %v This is a bug. Please report", cloneType) } @@ -183,8 +190,8 @@ func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, if destinationWriterCount <= 0 { return nil, fmt.Errorf("destination_writer_count must be > 0: %v", destinationWriterCount) } - if minHealthyRdonlyTablets < 0 { - return nil, fmt.Errorf("min_healthy_rdonly_tablets must be >= 0: %v", minHealthyRdonlyTablets) + if minHealthyTablets < 0 { + return nil, fmt.Errorf("min_healthy_tablets must be >= 0: %v", minHealthyTablets) } if maxTPS != throttler.MaxRateModuleDisabled { wr.Logger().Infof("throttling enabled and set to a max of %v transactions/second", maxTPS) @@ -195,35 +202,39 @@ func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, if maxReplicationLag <= 0 { return nil, fmt.Errorf("max_replication_lag must be >= 1s: %v", maxReplicationLag) } + if tabletType != topodatapb.TabletType_REPLICA && tabletType != topodatapb.TabletType_RDONLY { + return nil, fmt.Errorf("tablet_type must be RDONLY or REPLICA: %v", topodatapb.TabletType_name[int32(tabletType)]) + } scw := &SplitCloneWorker{ - StatusWorker: NewStatusWorker(), - 
wr: wr, - cloneType: cloneType, - cell: cell, - destinationKeyspace: keyspace, - shard: shard, - online: online, - offline: offline, - tables: tables, - excludeTables: excludeTables, - chunkCount: chunkCount, - minRowsPerChunk: minRowsPerChunk, - sourceReaderCount: sourceReaderCount, - writeQueryMaxRows: writeQueryMaxRows, - writeQueryMaxSize: writeQueryMaxSize, - destinationWriterCount: destinationWriterCount, - minHealthyRdonlyTablets: minHealthyRdonlyTablets, - maxTPS: maxTPS, - maxReplicationLag: maxReplicationLag, - cleaner: &wrangler.Cleaner{}, - tabletTracker: NewTabletTracker(), - throttlers: make(map[string]*throttler.Throttler), + StatusWorker: NewStatusWorker(), + wr: wr, + cloneType: cloneType, + cell: cell, + destinationKeyspace: keyspace, + shard: shard, + online: online, + offline: offline, + tables: tables, + excludeTables: excludeTables, + chunkCount: chunkCount, + minRowsPerChunk: minRowsPerChunk, + sourceReaderCount: sourceReaderCount, + writeQueryMaxRows: writeQueryMaxRows, + writeQueryMaxSize: writeQueryMaxSize, + destinationWriterCount: destinationWriterCount, + minHealthyTablets: minHealthyTablets, + maxTPS: maxTPS, + maxReplicationLag: maxReplicationLag, + cleaner: &wrangler.Cleaner{}, + tabletTracker: NewTabletTracker(), + throttlers: make(map[string]*throttler.Throttler), destinationDbNames: make(map[string]string), tableStatusListOnline: &tableStatusList{}, tableStatusListOffline: &tableStatusList{}, + useConsistentSnapshot: useConsistentSnapshot, } scw.initializeEventDescriptor() return scw, nil @@ -424,6 +435,29 @@ func (scw *SplitCloneWorker) Run(ctx context.Context) error { return nil } +func (scw *SplitCloneWorker) disableUniquenessCheckOnDestinationTablets(ctx context.Context) error { + for _, si := range scw.destinationShards { + tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) + if len(tablets) != 1 { + return fmt.Errorf("should have exactly one MASTER tablet, have %v", 
len(tablets)) + } + tablet := tablets[0].Tablet + cmd := "SET UNIQUE_CHECKS=0" + scw.wr.Logger().Infof("disabling uniqueness checks on %v", topoproto.TabletAliasString(tablet.Alias)) + _, err := scw.wr.TabletManagerClient().ExecuteFetchAsApp(ctx, tablet, true, []byte(cmd), 0) + if err != nil { + return fmt.Errorf("failed to disable uniqueness checks on %v: %v", topoproto.TabletAliasString(tablet.Alias), err) + } + scw.cleaner.Record("EnableUniquenessChecks", topoproto.TabletAliasString(tablet.Alias), func(ctx context.Context, wr *wrangler.Wrangler) error { + cmd := "SET UNIQUE_CHECKS=1" + _, err := scw.wr.TabletManagerClient().ExecuteFetchAsApp(ctx, tablet, true, []byte(cmd), 0) + return err + }) + } + + return nil +} + func (scw *SplitCloneWorker) run(ctx context.Context) error { // Phase 1: read what we need to do. if err := scw.init(ctx); err != nil { @@ -476,9 +510,15 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error { time.Sleep(1 * time.Second) } - // 4a: Take source tablets out of serving for an exact snapshot. 
- if err := scw.findOfflineSourceTablets(ctx); err != nil { - return vterrors.Wrap(err, "findSourceTablets() failed") + // 4a: Make sure the sources are producing a stable view of the data + if scw.useConsistentSnapshot { + if err := scw.findTransactionalSources(ctx); err != nil { + return vterrors.Wrap(err, "findSourceTablets() failed") + } + } else { + if err := scw.findOfflineSourceTablets(ctx); err != nil { + return vterrors.Wrap(err, "findSourceTablets() failed") + } } if err := checkDone(ctx); err != nil { return err @@ -489,6 +529,10 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error { if err := scw.clone(ctx, WorkerStateCloneOffline); err != nil { return vterrors.Wrap(err, "offline clone() failed") } + if err := scw.setUpVReplication(ctx); err != nil { + return fmt.Errorf("failed to set up replication: %v", err) + } + d := time.Since(start) if err := checkDone(ctx); err != nil { return err @@ -583,6 +627,10 @@ func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Conte scw.destinationShards = os.Left } + if scw.useConsistentSnapshot && len(scw.sourceShards) > 1 { + return fmt.Errorf("cannot use consistent snapshot against multiple source shards") + } + return nil } @@ -705,20 +753,20 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error scw.setState(WorkerStateFindTargets) // find an appropriate tablet in the source shards - scw.offlineSourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards)) + scw.sourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards)) for i, si := range scw.sourceShards { var err error - scw.offlineSourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY) + scw.sourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyTablets, scw.tabletType) if err != nil { 
return vterrors.Wrapf(err, "FindWorkerTablet() failed for %v/%v/%v", scw.cell, si.Keyspace(), si.ShardName()) } - scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.offlineSourceAliases[i]), si.Keyspace(), si.ShardName()) + scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[i]), si.Keyspace(), si.ShardName()) } - scw.setFormattedOfflineSources(scw.offlineSourceAliases) + scw.setFormattedOfflineSources(scw.sourceAliases) // get the tablet info for them, and stop their replication - scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.offlineSourceAliases)) - for i, alias := range scw.offlineSourceAliases { + scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) + for i, alias := range scw.sourceAliases { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) cancel() @@ -740,6 +788,44 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error return nil } +// findTransactionalSources phase: +// - get the aliases of all the source tablets +func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error { + scw.setState(WorkerStateFindTargets) + + if len(scw.sourceShards) > 1 { + return fmt.Errorf("consistent snapshot can only be used with a single source shard") + } + var err error + + // find an appropriate tablet in the source shard + si := scw.sourceShards[0] + scw.sourceAliases = make([]*topodatapb.TabletAlias, 1) + scw.sourceAliases[0], err = FindHealthyTablet(ctx, scw.wr, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyTablets, scw.tabletType) + if err != nil { + return fmt.Errorf("FindHealthyTablet() failed for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err) + } + scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[0]), si.Keyspace(), si.ShardName()) + 
scw.setFormattedOfflineSources(scw.sourceAliases) + + // get the tablet info + scw.sourceTablets = make([]*topodatapb.Tablet, 1) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0]) + cancel() + if err != nil { + return fmt.Errorf("cannot read tablet %v: %v", topoproto.TabletAliasString(scw.sourceAliases[0]), err) + } + scw.sourceTablets[0] = ti.Tablet + + // stop replication and create transactions to work on + txs, gtid, err := CreateConsistentTransactions(ctx, ti, scw.wr, scw.cleaner, scw.sourceReaderCount) + if err != nil { + return fmt.Errorf("error creating consistent transactions: %v", err) + } + scw.wr.Logger().Infof("created %v transactions", len(txs)) + scw.lastPos = gtid + scw.transactions = txs + return nil +} + // findDestinationMasters finds for each destination shard the current master. func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error { scw.setState(WorkerStateFindTargets) @@ -748,9 +834,10 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error { scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") for _, si := range scw.destinationShards { waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout) - defer waitCancel() - if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil { - return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v (in cell: %v)", si.Keyspace(), si.ShardName(), scw.cell) + err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) + waitCancel() + if err != nil { + return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v (in cell: %v)", si.Keyspace(), si.ShardName(), scw.cell) } masters := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) if len(masters) == 0 { @@ -775,18 +862,18 @@ func (scw *SplitCloneWorker) 
waitForTablets(ctx context.Context, shardInfos []*t var wg sync.WaitGroup rec := concurrency.AllErrorRecorder{} - if scw.minHealthyRdonlyTablets > 0 && len(shardInfos) > 0 { - scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyRdonlyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName()) + if scw.minHealthyTablets > 0 && len(shardInfos) > 0 { + scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName()) } for _, si := range shardInfos { wg.Add(1) go func(keyspace, shard string) { defer wg.Done() - // We wait for --min_healthy_rdonly_tablets because we will use several + // We wait for --min_healthy_tablets because we will use several // tablets per shard to spread reading the chunks of rows across as many // tablets as possible. - if _, err := waitForHealthyTablets(ctx, scw.wr, scw.tsc, scw.cell, keyspace, shard, scw.minHealthyRdonlyTablets, timeout, topodatapb.TabletType_RDONLY); err != nil { + if _, err := waitForHealthyTablets(ctx, scw.wr, scw.tsc, scw.cell, keyspace, shard, scw.minHealthyTablets, timeout, scw.tabletType); err != nil { rec.RecordError(err) } }(si.Keyspace(), si.ShardName()) @@ -795,6 +882,228 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t return rec.Error() } +func (scw *SplitCloneWorker) findFirstSourceTablet(ctx context.Context, state StatusWorkerState) (*topodatapb.Tablet, error) { + if state == WorkerStateCloneOffline { + // Use the first source tablet which we took offline. + return scw.sourceTablets[0], nil + } + + // Pick any healthy serving source tablet. + si := scw.sourceShards[0] + tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), scw.tabletType) + if len(tablets) == 0 { + // We fail fast on this problem and don't retry because at the start all tablets should be healthy. 
+ return nil, fmt.Errorf("no healthy %v tablet in source shard (%v) available (required to find out the schema)", topodatapb.TabletType_name[int32(scw.tabletType)], topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())) + } + return tablets[0].Tablet, nil +} + +func (scw *SplitCloneWorker) getCounters(state StatusWorkerState) ([]*stats.CountersWithSingleLabel, *tableStatusList) { + switch state { + case WorkerStateCloneOnline: + return []*stats.CountersWithSingleLabel{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters}, + scw.tableStatusListOnline + case WorkerStateCloneOffline: + return []*stats.CountersWithSingleLabel{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters}, + scw.tableStatusListOffline + default: + panic("should not happen") + } +} + +func (scw *SplitCloneWorker) startExecutor(ctx context.Context, wg *sync.WaitGroup, keyspace, shard string, insertChannel chan string, threadID int, processError func(string, ...interface{})) { + defer wg.Done() + t := scw.getThrottler(keyspace, shard) + //defer t.ThreadFinished(threadID) + + executor := newExecutor(scw.wr, scw.tsc, t, keyspace, shard, threadID) + if err := executor.fetchLoop(ctx, insertChannel); err != nil { + processError("executer.FetchLoop failed: %v", err) + } +} + +func mergeOrSingle(readers []ResultReader, td *tabletmanagerdatapb.TableDefinition) (ResultReader, error) { + if len(readers) == 1 { + return readers[0], nil + } + + sourceReader, err := NewResultMerger(readers, len(td.PrimaryKeyColumns)) + if err != nil { + return nil, err + } + + return sourceReader, nil +} + +func (scw *SplitCloneWorker) getSourceResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk, txID int64) (ResultReader, error) { + sourceReaders := make([]ResultReader, len(scw.sourceShards)) + for shardIndex, si := range scw.sourceShards 
{ + var sourceResultReader ResultReader + var err error + if state == WorkerStateCloneOffline && scw.useConsistentSnapshot { + if txID < 1 { + return nil, fmt.Errorf("tried using consistent snapshot without a valid transaction") + } + tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType) + sourceResultReader, err = NewTransactionalRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, false, txID) + if err != nil { + return nil, fmt.Errorf("NewTransactionalRestartableResultReader for source: %v failed: %v", tp.description(), err) + } + } else { + var tp tabletProvider + allowMultipleRetries := true + if state == WorkerStateCloneOffline { + tp = newSingleTabletProvider(ctx, scw.wr.TopoServer(), scw.sourceAliases[shardIndex]) + // allowMultipleRetries is false to avoid that we'll keep retrying + // on the same tablet alias for hours. This guards us against the + // situation that an offline tablet gets restarted and serves again. + // In that case we cannot use it because its replication is no + // longer stopped at the same point as we took it offline initially. + allowMultipleRetries = false + } else { + tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType) + } + sourceResultReader, err = NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries) + if err != nil { + return nil, fmt.Errorf("NewRestartableResultReader for source: %v failed: %v", tp.description(), err) + } + } + // TODO: We could end up in a situation where some readers have been created but not all. 
In this situation, we would not close up all readers + sourceReaders[shardIndex] = sourceResultReader + } + return mergeOrSingle(sourceReaders, td) +} + +func (scw *SplitCloneWorker) getDestinationResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk) (ResultReader, error) { + destReaders := make([]ResultReader, len(scw.destinationShards)) + for shardIndex, si := range scw.destinationShards { + tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) + destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */) + if err != nil { + return nil, fmt.Errorf("NewRestartableResultReader for destination: %v failed: %v", tp.description(), err) + } + destReaders[shardIndex] = destResultReader + } + return mergeOrSingle(destReaders, td) +} + +func (scw *SplitCloneWorker) cloneAChunk(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk, processError func(string, ...interface{}), state StatusWorkerState, tableStatusList *tableStatusList, keyResolver keyspaceIDResolver, start time.Time, insertChannels []chan string, txID int64, statsCounters []*stats.CountersWithSingleLabel) { + errPrefix := fmt.Sprintf("table=%v chunk=%v", td.Name, chunk) + + var err error + + if err := checkDone(ctx); err != nil { + processError("%v: Context expired while this thread was waiting for its turn. Context error: %v", errPrefix, err) + return + } + + tableStatusList.threadStarted(tableIndex) + defer tableStatusList.threadDone(tableIndex) + + if state == WorkerStateCloneOnline { + // Wait for enough healthy tablets (they might have become unhealthy + // and their replication lag might have increased since we started.) 
+ if err := scw.waitForTablets(ctx, scw.sourceShards, *retryDuration); err != nil { + processError("%v: No healthy source tablets found (gave up after %v): %v", errPrefix, time.Since(start), err) + return + } + } + + // Set up readers for the diff. There will be one reader for every + // source and destination shard. + sourceReader, err := scw.getSourceResultReader(ctx, td, state, chunk, txID) + if err != nil { + processError("%v NewResultMerger for source tablets failed: %v", errPrefix, err) + return + } + defer sourceReader.Close(ctx) + destReader, err := scw.getDestinationResultReader(ctx, td, state, chunk) + if err != nil { + processError("%v NewResultMerger for destinations tablets failed: %v", errPrefix, err) + return + } + defer destReader.Close(ctx) + dbNames := make([]string, len(scw.destinationShards)) + for i, si := range scw.destinationShards { + keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) + dbNames[i] = scw.destinationDbNames[keyspaceAndShard] + } + // Compare the data and reconcile any differences. + differ, err := NewRowDiffer2(ctx, sourceReader, destReader, td, tableStatusList, tableIndex, + scw.destinationShards, keyResolver, + insertChannels, ctx.Done(), dbNames, scw.writeQueryMaxRows, scw.writeQueryMaxSize, statsCounters) + if err != nil { + processError("%v: NewRowDiffer2 failed: %v", errPrefix, err) + return + } + // Ignore the diff report because all diffs should get reconciled. 
+ _ /* DiffReport */, err = differ.Diff() + if err != nil { + processError("%v: RowDiffer2 failed: %v", errPrefix, err) + return + } +} + +type workUnit struct { + td *tabletmanagerdatapb.TableDefinition + chunk chunk + threadID int + resolver keyspaceIDResolver +} + +func (scw *SplitCloneWorker) startCloningData(ctx context.Context, state StatusWorkerState, sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition, + processError func(string, ...interface{}), firstSourceTablet *topodatapb.Tablet, tableStatusList *tableStatusList, + start time.Time, statsCounters []*stats.CountersWithSingleLabel, insertChannels []chan string, wg *sync.WaitGroup) error { + + workPipeline := make(chan workUnit, 10) // We'll use a small buffer so producers do not run too far ahead of consumers + queryService, err := tabletconn.GetDialer()(firstSourceTablet, true) + if err != nil { + return fmt.Errorf("failed to create queryService: %v", err) + } + defer queryService.Close(ctx) + + // Let's start the work consumers + for i := 0; i < scw.sourceReaderCount; i++ { + var txID int64 + if scw.useConsistentSnapshot && state == WorkerStateCloneOffline { + txID = scw.transactions[i] + } else { + txID = -1 + } + + wg.Add(1) + go func() { + defer wg.Done() + for work := range workPipeline { + scw.cloneAChunk(ctx, work.td, work.threadID, work.chunk, processError, state, tableStatusList, work.resolver, start, insertChannels, txID, statsCounters) + } + }() + } + + // And now let's start producing work units + for tableIndex, td := range sourceSchemaDefinition.TableDefinitions { + td = reorderColumnsPrimaryKeyFirst(td) + + keyResolver, err := scw.createKeyResolver(td) + if err != nil { + return fmt.Errorf("cannot resolve sharding keys for keyspace %v: %v", scw.destinationKeyspace, err) + } + + // TODO(mberlin): We're going to chunk *all* source shards based on the MIN + // and MAX values of the *first* source shard. Is this going to be a problem? 
+ chunks, err := generateChunks(ctx, scw.wr, firstSourceTablet, td, scw.chunkCount, scw.minRowsPerChunk) + if err != nil { + return fmt.Errorf("failed to split table into chunks: %v", err) + } + tableStatusList.setThreadCount(tableIndex, len(chunks)) + + for _, c := range chunks { + workPipeline <- workUnit{td: td, chunk: c, threadID: tableIndex, resolver: keyResolver} + } + } + + close(workPipeline) + + return nil +} + // copy phase: // - copy the data from source tablets to destination masters (with replication on) // Assumes that the schema has already been created on each destination tablet @@ -809,31 +1118,13 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) statsStateDurationsNs.Set(string(state), time.Now().Sub(start).Nanoseconds()) }() - var firstSourceTablet *topodatapb.Tablet - if state == WorkerStateCloneOffline { - // Use the first source tablet which we took offline. - firstSourceTablet = scw.sourceTablets[0] - } else { - // Pick any healthy serving source tablet. - si := scw.sourceShards[0] - tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY) - if len(tablets) == 0 { - // We fail fast on this problem and don't retry because at the start all tablets should be healthy. 
- return fmt.Errorf("no healthy RDONLY tablet in source shard (%v) available (required to find out the schema)", topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())) - } - firstSourceTablet = tablets[0].Tablet - } - var statsCounters []*stats.CountersWithSingleLabel - var tableStatusList *tableStatusList - switch state { - case WorkerStateCloneOnline: - statsCounters = []*stats.CountersWithSingleLabel{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters} - tableStatusList = scw.tableStatusListOnline - case WorkerStateCloneOffline: - statsCounters = []*stats.CountersWithSingleLabel{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters} - tableStatusList = scw.tableStatusListOffline + var firstSourceTablet, err = scw.findFirstSourceTablet(ctx, state) + if err != nil { + return err } + statsCounters, tableStatusList := scw.getCounters(state) + // The throttlers exist only for the duration of this clone() call. // That means a SplitClone invocation with both online and offline phases // will create throttlers for each phase. 
@@ -849,22 +1140,19 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) scw.wr.Logger().Infof("Source tablet 0 has %v tables to copy", len(sourceSchemaDefinition.TableDefinitions)) tableStatusList.initialize(sourceSchemaDefinition) - // In parallel, setup the channels to send SQL data chunks to for each destination tablet: - // - // mu protects the context for cancelation, and firstError - mu := sync.Mutex{} var firstError error ctx, cancelCopy := context.WithCancel(ctx) defer cancelCopy() processError := func(format string, args ...interface{}) { - mu.Lock() + // in theory we could have two threads see firstError as null and both write to the variable + // that should not cause any problems though - canceling and logging is concurrently safe, + // and overwriting the variable will not cause any problems + scw.wr.Logger().Errorf(format, args...) if firstError == nil { - scw.wr.Logger().Errorf(format, args...) firstError = fmt.Errorf(format, args...) cancelCopy() } - mu.Unlock() } // NOTE: Code below this point must *not* use "return" to exit this Go routine @@ -877,6 +1165,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) // races between "defer throttler.ThreadFinished()" (must be executed first) // and "defer scw.closeThrottlers()". Otherwise, vtworker will panic. 
+ // In parallel, setup the channels to send SQL data chunks to for each destination tablet: insertChannels := make([]chan string, len(scw.destinationShards)) destinationWaitGroup := sync.WaitGroup{} for shardIndex, si := range scw.destinationShards { @@ -888,168 +1177,41 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) for j := 0; j < scw.destinationWriterCount; j++ { destinationWaitGroup.Add(1) - go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) { - defer destinationWaitGroup.Done() - defer throttler.ThreadFinished(threadID) - - executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID) - if err := executor.fetchLoop(ctx, insertChannel); err != nil { - processError("executer.FetchLoop failed: %v", err) - } - }(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], scw.getThrottler(si.Keyspace(), si.ShardName()), j) + go scw.startExecutor(ctx, &destinationWaitGroup, si.Keyspace(), si.ShardName(), insertChannels[shardIndex], j, processError) } } // Now for each table, read data chunks and send them to all // insertChannels - sourceWaitGroup := sync.WaitGroup{} - sema := sync2.NewSemaphore(scw.sourceReaderCount, 0) - for tableIndex, td := range sourceSchemaDefinition.TableDefinitions { - td = reorderColumnsPrimaryKeyFirst(td) - - keyResolver, err := scw.createKeyResolver(td) - if err != nil { - processError("cannot resolve sharding keys for keyspace %v: %v", scw.destinationKeyspace, err) - break - } - - // TODO(mberlin): We're going to chunk *all* source shards based on the MIN - // and MAX values of the *first* source shard. Is this going to be a problem? 
- chunks, err := generateChunks(ctx, scw.wr, firstSourceTablet, td, scw.chunkCount, scw.minRowsPerChunk) - if err != nil { - processError("failed to split table into chunks: %v", err) - break - } - tableStatusList.setThreadCount(tableIndex, len(chunks)) - - for _, c := range chunks { - sourceWaitGroup.Add(1) - go func(td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk) { - defer sourceWaitGroup.Done() - errPrefix := fmt.Sprintf("table=%v chunk=%v", td.Name, chunk) + readers := sync.WaitGroup{} - // We need our own error per Go routine to avoid races. - var err error - - sema.Acquire() - defer sema.Release() - - if err := checkDone(ctx); err != nil { - processError("%v: Context expired while this thread was waiting for its turn. Context error: %v", errPrefix, err) - return - } - - tableStatusList.threadStarted(tableIndex) - defer tableStatusList.threadDone(tableIndex) - - if state == WorkerStateCloneOnline { - // Wait for enough healthy tablets (they might have become unhealthy - // and their replication lag might have increased since we started.) - if err := scw.waitForTablets(ctx, scw.sourceShards, *retryDuration); err != nil { - processError("%v: No healthy source tablets found (gave up after %v): %v", errPrefix, time.Since(start), err) - return - } - } - - // Set up readers for the diff. There will be one reader for every - // source and destination shard. - sourceReaders := make([]ResultReader, len(scw.sourceShards)) - destReaders := make([]ResultReader, len(scw.destinationShards)) - for shardIndex, si := range scw.sourceShards { - var tp tabletProvider - allowMultipleRetries := true - if state == WorkerStateCloneOffline { - tp = newSingleTabletProvider(ctx, scw.wr.TopoServer(), scw.offlineSourceAliases[shardIndex]) - // allowMultipleRetries is false to avoid that we'll keep retrying - // on the same tablet alias for hours. This guards us against the - // situation that an offline tablet gets restarted and serves again. 
- // In that case we cannot use it because its replication is no - // longer stopped at the same point as we took it offline initially. - allowMultipleRetries = false - } else { - tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY) - } - sourceResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries) - if err != nil { - processError("%v: NewRestartableResultReader for source: %v failed: %v", errPrefix, tp.description(), err) - return - } - defer sourceResultReader.Close(ctx) - sourceReaders[shardIndex] = sourceResultReader - } - - for shardIndex, si := range scw.destinationShards { - tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) - destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */) - if err != nil { - processError("%v: NewRestartableResultReader for destination: %v failed: %v", errPrefix, tp.description(), err) - return - } - defer destResultReader.Close(ctx) - destReaders[shardIndex] = destResultReader - } - - var sourceReader ResultReader - var destReader ResultReader - if len(sourceReaders) >= 2 { - sourceReader, err = NewResultMerger(sourceReaders, len(td.PrimaryKeyColumns)) - if err != nil { - processError("%v: NewResultMerger for source tablets failed: %v", errPrefix, err) - return - } - } else { - sourceReader = sourceReaders[0] - } - if len(destReaders) >= 2 { - destReader, err = NewResultMerger(destReaders, len(td.PrimaryKeyColumns)) - if err != nil { - processError("%v: NewResultMerger for destination tablets failed: %v", errPrefix, err) - return - } - } else { - destReader = destReaders[0] - } - - dbNames := make([]string, len(scw.destinationShards)) - for i, si := range scw.destinationShards { - keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) - dbNames[i] = 
scw.destinationDbNames[keyspaceAndShard] - } - // Compare the data and reconcile any differences. - differ, err := NewRowDiffer2(ctx, sourceReader, destReader, td, tableStatusList, tableIndex, - scw.destinationShards, keyResolver, - insertChannels, ctx.Done(), dbNames, scw.writeQueryMaxRows, scw.writeQueryMaxSize, statsCounters) - if err != nil { - processError("%v: NewRowDiffer2 failed: %v", errPrefix, err) - return - } - // Ignore the diff report because all diffs should get reconciled. - _ /* DiffReport */, err = differ.Diff() - if err != nil { - processError("%v: RowDiffer2 failed: %v", errPrefix, err) - return - } - }(td, tableIndex, c) - } + err = scw.startCloningData(ctx, state, sourceSchemaDefinition, processError, firstSourceTablet, tableStatusList, start, statsCounters, insertChannels, &readers) + if err != nil { + return fmt.Errorf("failed to startCloningData : %v", err) } - sourceWaitGroup.Wait() + readers.Wait() for shardIndex := range scw.destinationShards { close(insertChannels[shardIndex]) } destinationWaitGroup.Wait() - if firstError != nil { - return firstError - } - if state == WorkerStateCloneOffline { - // Create and populate the vreplication table to give filtered replication - // a starting point. - queries := make([]string, 0, 4) - queries = append(queries, binlogplayer.CreateVReplicationTable()...) + return firstError +} + +func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { + wg := sync.WaitGroup{} + // Create and populate the vreplication table to give filtered replication + // a starting point. + queries := make([]string, 0, 4) + queries = append(queries, binlogplayer.CreateVReplicationTable()...) 
- // get the current position from the sources - sourcePositions := make([]string, len(scw.sourceShards)) + // get the current position from the sources + sourcePositions := make([]string, len(scw.sourceShards)) + + if scw.useConsistentSnapshot { + sourcePositions[0] = scw.lastPos + } else { for shardIndex := range scw.sourceShards { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex]) @@ -1059,46 +1221,60 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) } sourcePositions[shardIndex] = status.Position } + } + cancelableCtx, cancel := context.WithCancel(ctx) + rec := concurrency.AllErrorRecorder{} + handleError := func(e error) { + rec.RecordError(e) + cancel() + } - for _, si := range scw.destinationShards { - destinationWaitGroup.Add(1) - go func(keyspace, shard string, kr *topodatapb.KeyRange) { - defer destinationWaitGroup.Done() - scw.wr.Logger().Infof("Making and populating vreplication table") - - exc := newExecutor(scw.wr, scw.tsc, nil, keyspace, shard, 0) - for shardIndex, src := range scw.sourceShards { - bls := &binlogdatapb.BinlogSource{ - Keyspace: src.Keyspace(), - Shard: src.ShardName(), - } - if scw.tables == nil { - bls.KeyRange = kr - } else { - bls.Tables = scw.tables - } - // TODO(mberlin): Fill in scw.maxReplicationLag once the adapative - // throttler is enabled by default. 
- qr, err := exc.vreplicationExec(ctx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix())) - if err != nil { - processError("vreplication queries failed: %v", err) - break - } - if err := scw.wr.SourceShardAdd(ctx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil { - processError("could not add source shard: %v", err) - break - } + for _, si := range scw.destinationShards { + wg.Add(1) + go func(keyspace, shard string, kr *topodatapb.KeyRange) { + defer wg.Done() + scw.wr.Logger().Infof("Making and populating vreplication table") + + exc := newExecutor(scw.wr, scw.tsc, nil, keyspace, shard, 0) + for shardIndex, src := range scw.sourceShards { + // Check if any error occurred in any other gorouties: + select { + case <-cancelableCtx.Done(): + return // Error somewhere, terminate + default: + } + + bls := &binlogdatapb.BinlogSource{ + Keyspace: src.Keyspace(), + Shard: src.ShardName(), } - // refreshState will cause the destination to become non-serving because - // it's now participating in the resharding workflow. - if err := exc.refreshState(ctx); err != nil { - processError("RefreshState failed on tablet %v/%v: %v", keyspace, shard, err) + if scw.tables == nil { + bls.KeyRange = kr + } else { + bls.Tables = scw.tables } - }(si.Keyspace(), si.ShardName(), si.KeyRange) - } - destinationWaitGroup.Wait() - } // clonePhase == offline - return firstError + // TODO(mberlin): Fill in scw.maxReplicationLag once the adapative throttler is enabled by default. 
+ qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix())) + if err != nil { + handleError(fmt.Errorf("vreplication queries failed: %v", err)) + cancel() + return + } + if err := scw.wr.SourceShardAdd(cancelableCtx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil { + handleError(fmt.Errorf("could not add source shard: %v", err)) + break + } + } + // refreshState will cause the destination to become non-serving because + // it's now participating in the resharding workflow. + if err := exc.refreshState(ctx); err != nil { + handleError(fmt.Errorf("RefreshState failed on tablet %v/%v: %v", keyspace, shard, err)) + } + }(si.Keyspace(), si.ShardName(), si.KeyRange) + } + wg.Wait() + + return rec.Error() } func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topodatapb.Tablet) (*tabletmanagerdatapb.SchemaDefinition, error) { diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go index 9285ae2f4cb..7f98a4800ba 100644 --- a/go/vt/worker/split_clone_cmd.go +++ b/go/vt/worker/split_clone_cmd.go @@ -25,12 +25,12 @@ import ( "strings" "sync" - "vitess.io/vitess/go/vt/vterrors" - "golang.org/x/net/context" "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/wrangler" ) @@ -82,14 +82,22 @@ const splitCloneHTML2 = `

- -
+ + +
+ +


+ + ?
@@ -108,7 +116,9 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS writeQueryMaxRows := subFlags.Int("write_query_max_rows", defaultWriteQueryMaxRows, "maximum number of rows per write query") writeQueryMaxSize := subFlags.Int("write_query_max_size", defaultWriteQueryMaxSize, "maximum size (in bytes) per write query") destinationWriterCount := subFlags.Int("destination_writer_count", defaultDestinationWriterCount, "number of concurrent RPCs to execute on the destination") - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets in the source and destination shard at start") + tabletTypeStr := subFlags.String("tablet_type", "RDONLY", "tablet type to use (RDONLY or REPLICA)") + minHealthyTablets := subFlags.Int("min_healthy_tablets", defaultMinHealthyTablets, "minimum number of healthy tablets in the source and destination shard at start") + useConsistentSnapshot := subFlags.Bool("use_consistent_snapshot", defaultUseConsistentSnapshot, "Instead of pausing replication on the source, uses transactions with consistent snapshot to have a stable view of the data.") maxTPS := subFlags.Int64("max_tps", defaultMaxTPS, "rate limit of maximum number of (write) transactions/second on the destination (unlimited by default)") maxReplicationLag := subFlags.Int64("max_replication_lag", defaultMaxReplicationLag, "if set, the adapative throttler will be enabled and automatically adjust the write rate to keep the lag below the set value in seconds (disabled by default)") if err := subFlags.Parse(args); err != nil { @@ -127,7 +137,11 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - worker, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, excludeTableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, 
*writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyRdonlyTablets, *maxTPS, *maxReplicationLag) +	tabletType, ok := topodata.TabletType_value[*tabletTypeStr] +	if !ok { +		return nil, fmt.Errorf("command SplitClone invalid tablet_type: %v", *tabletTypeStr) +	} +	worker, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, excludeTableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyTablets, topodata.TabletType(tabletType), *maxTPS, *maxReplicationLag, *useConsistentSnapshot) if err != nil { return nil, vterrors.Wrap(err, "cannot create split clone worker") } @@ -212,9 +226,10 @@ func interactiveSplitClone(ctx context.Context, wi *Instance, wr *wrangler.Wrang result["DefaultWriteQueryMaxRows"] = fmt.Sprintf("%v", defaultWriteQueryMaxRows) result["DefaultWriteQueryMaxSize"] = fmt.Sprintf("%v", defaultWriteQueryMaxSize) result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount) - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultMinHealthyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) result["DefaultMaxTPS"] = fmt.Sprintf("%v", defaultMaxTPS) result["DefaultMaxReplicationLag"] = fmt.Sprintf("%v", defaultMaxReplicationLag) + result["DefaultUseConsistentSnapshot"] = fmt.Sprintf("%v", defaultUseConsistentSnapshot) return nil, splitCloneTemplate2, result, nil } @@ -257,10 +272,15 @@ func interactiveSplitClone(ctx context.Context, wi *Instance, wr *wrangler.Wrang if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot parse destinationWriterCount") } - minHealthyRdonlyTabletsStr := r.FormValue("minHealthyRdonlyTablets") - minHealthyRdonlyTablets, err := strconv.ParseInt(minHealthyRdonlyTabletsStr, 0, 64) + minHealthyTabletsStr := r.FormValue("minHealthyTablets") + minHealthyTablets, err := strconv.ParseInt(minHealthyTabletsStr, 0, 64) if
err != nil { - return nil, nil, nil, vterrors.Wrap(err, "cannot parse minHealthyRdonlyTablets") + return nil, nil, nil, vterrors.Wrap(err, "cannot parse minHealthyTablets") + } + tabletTypeStr := r.FormValue("tabletType") + tabletType, ok := topodata.TabletType_value[tabletTypeStr] + if !ok { + return nil, nil, nil, fmt.Errorf("command SplitClone invalid tablet_type: %v", tabletTypeStr) + } maxTPSStr := r.FormValue("maxTPS") maxTPS, err := strconv.ParseInt(maxTPSStr, 0, 64) @@ -273,8 +293,11 @@ func interactiveSplitClone(ctx context.Context, wi *Instance, wr *wrangler.Wrang return nil, nil, nil, vterrors.Wrap(err, "cannot parse maxReplicationLag") } + useConsistentSnapshotStr := r.FormValue("useConsistentSnapshot") + useConsistentSnapshot := useConsistentSnapshotStr == "true" + // start the clone job - wrk, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, excludeTableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyRdonlyTablets), maxTPS, maxReplicationLag) + wrk, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, excludeTableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyTablets), topodata.TabletType(tabletType), maxTPS, maxReplicationLag, useConsistentSnapshot) if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot create worker") } diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 6e219171ed2..cf72167b22b 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -576,7 +576,7 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) { // Run the vtworker command. // We require only 1 instead of the default 2 replicas.
- args := []string{"SplitClone", "--min_healthy_rdonly_tablets", "1"} + args := []string{"SplitClone", "--min_healthy_tablets", "1"} args = append(args, tc.defaultWorkerArgs[1:]...) if err := runCommand(t, tc.wi, tc.wi.wr, args); err != nil { t.Fatal(err) @@ -615,7 +615,7 @@ func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) { defer tc.rightMasterFakeDb.DeleteAllEntries() // Run the vtworker command. - args := []string{"SplitClone", "--min_healthy_rdonly_tablets", "1"} + args := []string{"SplitClone", "--min_healthy_tablets", "1"} args = append(args, tc.defaultWorkerArgs[1:]...) if err := runCommand(t, tc.wi, tc.wi.wr, args); err == nil || !strings.Contains(err.Error(), "first retry to restart the streaming query on the same tablet failed. We're failing at this point") { t.Fatalf("worker should have failed because all tablets became unavailable and it gave up retrying. err: %v", err) @@ -662,7 +662,7 @@ func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) { args := []string{"SplitClone", "-offline=false", // We require only 1 instead of the default 2 replicas. - "--min_healthy_rdonly_tablets", "1"} + "--min_healthy_tablets", "1"} args = append(args, tc.defaultWorkerArgs[2:]...) if err := runCommand(t, tc.wi, tc.wi.wr, args); err != nil { t.Fatal(err) @@ -719,7 +719,7 @@ func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) { args := []string{"SplitClone", "-offline=false", // We require only 1 instead of the default 2 replicas. - "--min_healthy_rdonly_tablets", "1"} + "--min_healthy_tablets", "1"} args = append(args, tc.defaultWorkerArgs[2:]...) if err := runCommand(t, tc.wi, tc.wi.wr, args); err == nil || !strings.Contains(err.Error(), "failed to restart the streaming connection") { t.Fatalf("worker should have failed because all tablets became unavailable and it gave up retrying. 
err: %v", err) diff --git a/go/vt/worker/split_diff_cmd.go b/go/vt/worker/split_diff_cmd.go index 39178646f8f..8ac36c675e6 100644 --- a/go/vt/worker/split_diff_cmd.go +++ b/go/vt/worker/split_diff_cmd.go @@ -67,7 +67,7 @@ const splitDiffHTML2 = `
-
+

@@ -83,7 +83,7 @@ var splitDiffTemplate2 = mustParseTemplate("splitDiff2", splitDiffHTML2) func commandSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error) { sourceUID := subFlags.Int("source_uid", 0, "uid of the source shard to run the diff against") excludeTables := subFlags.String("exclude_tables", "", "comma separated list of tables to exclude") - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") + minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy RDONLY tablets before taking out one") destTabletTypeStr := subFlags.String("dest_tablet_type", defaultDestTabletType, "destination tablet type (RDONLY or REPLICA) that will be used to compare the shards") parallelDiffsCount := subFlags.Int("parallel_diffs_count", defaultParallelDiffsCount, "number of tables to diff in parallel") if err := subFlags.Parse(args); err != nil { @@ -196,7 +196,7 @@ func interactiveSplitDiff(ctx context.Context, wi *Instance, wr *wrangler.Wrangl result["Keyspace"] = keyspace result["Shard"] = shard result["DefaultSourceUID"] = "0" - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultMinHealthyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) result["DefaultParallelDiffsCount"] = fmt.Sprintf("%v", defaultParallelDiffsCount) return nil, splitDiffTemplate2, result, nil } diff --git a/go/vt/worker/vertical_split_clone_cmd.go b/go/vt/worker/vertical_split_clone_cmd.go index c2d5a0b074c..6adcbe5c0e5 100644 --- a/go/vt/worker/vertical_split_clone_cmd.go +++ b/go/vt/worker/vertical_split_clone_cmd.go @@ -25,11 +25,11 @@ import ( "strings" "sync" - "vitess.io/vitess/go/vt/vterrors" - "golang.org/x/net/context" "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/proto/topodata" 
"vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/wrangler" ) @@ -81,8 +81,14 @@ const verticalSplitCloneHTML2 = `

- -
+ + +
+ +

@@ -106,7 +112,8 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl writeQueryMaxRows := subFlags.Int("write_query_max_rows", defaultWriteQueryMaxRows, "maximum number of rows per write query") writeQueryMaxSize := subFlags.Int("write_query_max_size", defaultWriteQueryMaxSize, "maximum size (in bytes) per write query") destinationWriterCount := subFlags.Int("destination_writer_count", defaultDestinationWriterCount, "number of concurrent RPCs to execute on the destination") - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") + minHealthyTablets := subFlags.Int("min_healthy_tablets", defaultMinHealthyTablets, "minimum number of healthy tablets of specified type before taking out one") + tabletTypeStr := subFlags.String("tablet_type", "RDONLY", "tablet type to use (RDONLY or REPLICA)") maxTPS := subFlags.Int64("max_tps", defaultMaxTPS, "if non-zero, limit copy to maximum number of (write) transactions/second on the destination (unlimited by default)") maxReplicationLag := subFlags.Int64("max_replication_lag", defaultMaxReplicationLag, "if set, the adapative throttler will be enabled and automatically adjust the write rate to keep the lag below the set value in seconds (disabled by default)") if err := subFlags.Parse(args); err != nil { @@ -127,7 +134,11 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl if *tables != "" { tableArray = strings.Split(*tables, ",") } - worker, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, tableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyRdonlyTablets, *maxTPS, *maxReplicationLag) + tabletType, ok := topodata.TabletType_value[*tabletTypeStr] + if !ok { + return nil, fmt.Errorf("command SplitClone invalid tablet_type: %v", 
*tabletTypeStr) +	} +	worker, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, tableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyTablets, topodata.TabletType(tabletType), *maxTPS, *maxReplicationLag, false) if err != nil { return nil, vterrors.Wrap(err, "cannot create worker") } @@ -208,7 +219,7 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl result["DefaultWriteQueryMaxRows"] = fmt.Sprintf("%v", defaultWriteQueryMaxRows) result["DefaultWriteQueryMaxSize"] = fmt.Sprintf("%v", defaultWriteQueryMaxSize) result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount) - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultMinHealthyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) result["DefaultMaxTPS"] = fmt.Sprintf("%v", defaultMaxTPS) result["DefaultMaxReplicationLag"] = fmt.Sprintf("%v", defaultMaxReplicationLag) return nil, verticalSplitCloneTemplate2, result, nil @@ -250,10 +261,10 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot parse destinationWriterCount") } - minHealthyRdonlyTabletsStr := r.FormValue("minHealthyRdonlyTablets") - minHealthyRdonlyTablets, err := strconv.ParseInt(minHealthyRdonlyTabletsStr, 0, 64) + minHealthyTabletsStr := r.FormValue("minHealthyTablets") + minHealthyTablets, err := strconv.ParseInt(minHealthyTabletsStr, 0, 64) if err != nil { - return nil, nil, nil, vterrors.Wrap(err, "cannot parse minHealthyRdonlyTablets") + return nil, nil, nil, vterrors.Wrap(err, "cannot parse minHealthyTablets") } maxTPSStr := r.FormValue("maxTPS") maxTPS, err := strconv.ParseInt(maxTPSStr, 0, 64) @@ -265,6 +276,11 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl if err != nil {
return nil, nil, nil, vterrors.Wrap(err, "cannot parse maxReplicationLag") } + tabletTypeStr := r.FormValue("tabletType") + tabletType, ok := topodata.TabletType_value[tabletTypeStr] + if !ok { + return nil, nil, nil, fmt.Errorf("cannot parse tabletType: %v", tabletTypeStr) + } // Figure out the shard shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) @@ -282,7 +298,7 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl } // start the clone job - wrk, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, tableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyRdonlyTablets), maxTPS, maxReplicationLag) + wrk, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, tableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyTablets), topodata.TabletType(tabletType), maxTPS, maxReplicationLag, false) if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot create worker") } diff --git a/go/vt/worker/vertical_split_diff_cmd.go b/go/vt/worker/vertical_split_diff_cmd.go index 7eebf106f5a..82a8b2af10f 100644 --- a/go/vt/worker/vertical_split_diff_cmd.go +++ b/go/vt/worker/vertical_split_diff_cmd.go @@ -62,7 +62,7 @@ const verticalSplitDiffHTML2 = `
-
+

@@ -75,7 +75,7 @@ var verticalSplitDiffTemplate = mustParseTemplate("verticalSplitDiff", verticalS var verticalSplitDiffTemplate2 = mustParseTemplate("verticalSplitDiff2", verticalSplitDiffHTML2) func commandVerticalSplitDiff(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) (Worker, error) { - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyRdonlyTablets, "minimum number of healthy RDONLY tablets before taking out one") + minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy RDONLY tablets before taking out one") parallelDiffsCount := subFlags.Int("parallel_diffs_count", defaultParallelDiffsCount, "number of tables to diff in parallel") destTabletTypeStr := subFlags.String("dest_tablet_type", defaultDestTabletType, "destination tablet type (RDONLY or REPLICA) that will be used to compare the shards") if err := subFlags.Parse(args); err != nil { @@ -183,7 +183,7 @@ func interactiveVerticalSplitDiff(ctx context.Context, wi *Instance, wr *wrangle result := make(map[string]interface{}) result["Keyspace"] = keyspace result["Shard"] = shard - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyRdonlyTablets) + result["DefaultMinHealthyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) result["DefaultParallelDiffsCount"] = fmt.Sprintf("%v", defaultParallelDiffsCount) return nil, verticalSplitDiffTemplate2, result, nil } diff --git a/go/vt/workflow/resharding/tasks.go b/go/vt/workflow/resharding/tasks.go index c545a08ab7e..7e2d514069f 100644 --- a/go/vt/workflow/resharding/tasks.go +++ b/go/vt/workflow/resharding/tasks.go @@ -69,7 +69,7 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo keyspace := t.Attributes["keyspace"] sourceShard := t.Attributes["source_shard"] worker := t.Attributes["vtworker"] - minHealthyRdonlyTablets := t.Attributes["min_healthy_rdonly_tablets"] + 
minHealthyTablets := t.Attributes["min_healthy_tablets"] splitCmd := t.Attributes["split_cmd"] sourceKeyspaceShard := topoproto.KeyspaceShardString(keyspace, sourceShard) @@ -79,7 +79,7 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo return err } - args := []string{splitCmd, "--min_healthy_rdonly_tablets=" + minHealthyRdonlyTablets, sourceKeyspaceShard} + args := []string{splitCmd, "--min_healthy_tablets=" + minHealthyTablets, sourceKeyspaceShard} _, err := automation.ExecuteVtworker(hw.ctx, worker, args) return err } diff --git a/go/vt/workflow/resharding/workflow.go b/go/vt/workflow/resharding/workflow.go index c0fe36df72b..3541dc53825 100644 --- a/go/vt/workflow/resharding/workflow.go +++ b/go/vt/workflow/resharding/workflow.go @@ -74,7 +74,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses") sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards") destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards") - minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards") + minHealthyTablets := subFlags.String("min_healthy_tablets", "1", "Minimum number of healthy tablets required in source shards") splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)") splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation") phaseEnaableApprovalsDesc := fmt.Sprintf("Comma separated phases that require explicit approval in the UI to execute. 
Phase names are: %v", strings.Join(WorkflowPhases(), ",")) @@ -83,7 +83,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) if err := subFlags.Parse(args); err != nil { return err } - if *keyspace == "" || *vtworkersStr == "" || *minHealthyRdonlyTablets == "" || *splitCmd == "" { + if *keyspace == "" || *vtworkersStr == "" || *minHealthyTablets == "" || *splitCmd == "" { return fmt.Errorf("Keyspace name, min healthy rdonly tablets, split command, and vtworkers information must be provided for horizontal resharding") } @@ -103,13 +103,13 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string) } } - err := validateWorkflow(m, *keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets) + err := validateWorkflow(m, *keyspace, vtworkers, sourceShards, destinationShards, *minHealthyTablets) if err != nil { return err } w.Name = fmt.Sprintf("Reshard shards %v into shards %v of keyspace %v.", *keyspace, *sourceShardsStr, *destinationShardsStr) - checkpoint, err := initCheckpoint(*keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType) + checkpoint, err := initCheckpoint(*keyspace, vtworkers, sourceShards, destinationShards, *minHealthyTablets, *splitCmd, *splitDiffDestTabletType) if err != nil { return err } @@ -268,7 +268,7 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha } // initCheckpoint initialize the checkpoint for the horizontal workflow. 
-func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) { +func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) { tasks := make(map[string]*workflowpb.Task) initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string { return map[string]string{ @@ -279,11 +279,11 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards }) initTasks(tasks, phaseClone, sourceShards, func(i int, shard string) map[string]string { return map[string]string{ - "keyspace": keyspace, - "source_shard": shard, - "min_healthy_rdonly_tablets": minHealthyRdonlyTablets, - "split_cmd": splitCmd, - "vtworker": vtworkers[i], + "keyspace": keyspace, + "source_shard": shard, + "min_healthy_tablets": minHealthyTablets, + "split_cmd": splitCmd, + "vtworker": vtworkers[i], } }) initTasks(tasks, phaseWaitForFilteredReplication, destinationShards, func(i int, shard string) map[string]string { diff --git a/go/vt/workflow/resharding/workflow_test.go b/go/vt/workflow/resharding/workflow_test.go index 631adc975b6..a2f279fe96a 100644 --- a/go/vt/workflow/resharding/workflow_test.go +++ b/go/vt/workflow/resharding/workflow_test.go @@ -66,20 +66,20 @@ func TestSourceDestShards(t *testing.T) { _, _, cancel := workflow.StartManager(m) // Create the workflow. 
vtworkersParameter := testVtworkers + "," + testVtworkers - _, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2", "-source_shards=0", "-destination_shards=-40,40-"}) + _, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_tablets=2", "-source_shards=0", "-destination_shards=-40,40-"}) want := "the specified destination shard test_keyspace/-40 is not in any overlapping shard" if err == nil || err.Error() != want { t.Errorf("workflow error: %v, want %s", err, want) } - _, err = m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2", "-source_shards=0", "-destination_shards=-80,40-"}) + _, err = m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_tablets=2", "-source_shards=0", "-destination_shards=-80,40-"}) want = "the specified destination shard test_keyspace/40- is not in any overlapping shard" if err == nil || err.Error() != want { t.Errorf("workflow error: %v, want %s", err, want) } - _, err = m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2", "-source_shards=-20", "-destination_shards=-80,80-"}) + _, err = m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_tablets=2", "-source_shards=-20", "-destination_shards=-80,80-"}) want = "the specified source shard test_keyspace/-20 is not in any overlapping shard" if err == nil || err.Error() != 
want { @@ -110,7 +110,7 @@ func TestHorizontalResharding(t *testing.T) { wg, _, cancel := workflow.StartManager(m) // Create the workflow. vtworkersParameter := testVtworkers + "," + testVtworkers - uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2", "-source_shards=0", "-destination_shards=-80,80-"}) + uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_tablets=2", "-source_shards=0", "-destination_shards=-80,80-"}) if err != nil { t.Fatalf("cannot create resharding workflow: %v", err) } @@ -145,7 +145,7 @@ func setupFakeVtworker(keyspace, vtworkers string) *fakevtworkerclient.FakeVtwor flag.Set("vtworker_client_protocol", "fake") fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient() fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil) - fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_tablets=2", keyspace + "/0"}, "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/-80"}, "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil) diff --git a/test/binlog.py b/test/binlog.py index cb677f051ab..31b6ea6bfd5 100755 --- a/test/binlog.py +++ b/test/binlog.py @@ -110,7 +110,7 @@ def setUpModule(): 'SplitClone', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', 'test_keyspace/0'], auto_log=True) 
dst_master.wait_for_binlog_player_count(1) diff --git a/test/initial_sharding.py b/test/initial_sharding.py index cb4fb3428b7..bf21c628ba3 100755 --- a/test/initial_sharding.py +++ b/test/initial_sharding.py @@ -418,7 +418,7 @@ def test_resharding(self): '--exclude_tables', 'unrelated', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', 'test_keyspace/0'], worker_rpc_port) utils.wait_procs([workerclient_proc]) @@ -449,7 +449,7 @@ def test_resharding(self): '--exclude_tables', 'unrelated', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', 'test_keyspace/0'], worker_rpc_port) utils.wait_procs([workerclient_proc]) diff --git a/test/merge_sharding.py b/test/merge_sharding.py index 72e71e4dbfd..097fbdc72e5 100755 --- a/test/merge_sharding.py +++ b/test/merge_sharding.py @@ -280,7 +280,7 @@ def test_merge_sharding(self): '--offline=false', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', 'test_keyspace/-80'], worker_rpc_port) utils.wait_procs([workerclient_proc]) @@ -307,7 +307,7 @@ def test_merge_sharding(self): ['SplitClone', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', 'test_keyspace/-80'], worker_rpc_port) utils.wait_procs([workerclient_proc]) diff --git a/test/resharding.py b/test/resharding.py index a942c6dd466..2b153b8b611 100755 --- a/test/resharding.py +++ b/test/resharding.py @@ -706,7 +706,7 @@ def test_resharding(self): '--exclude_tables', 'unrelated', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', '--max_tps', '9999', 'test_keyspace/80-'], worker_rpc_port) @@ -731,7 +731,7 @@ def test_resharding(self): '--exclude_tables', 'unrelated', '--chunk_count', '10', '--min_rows_per_chunk', '1', - 
'--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', '--max_tps', '9999', 'test_keyspace/80-'], worker_rpc_port) @@ -758,7 +758,7 @@ def test_resharding(self): '--exclude_tables', 'unrelated', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', '--max_tps', '9999', 'test_keyspace/80-'], worker_rpc_port) @@ -793,7 +793,7 @@ def test_resharding(self): '--exclude_tables', 'unrelated', '--chunk_count', '10', '--min_rows_per_chunk', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', '--max_tps', '9999', 'test_keyspace/80-'], worker_rpc_port) diff --git a/test/worker.py b/test/worker.py index b84567efb8b..158d6ce3df5 100755 --- a/test/worker.py +++ b/test/worker.py @@ -463,7 +463,7 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): args = ['SplitClone', '--offline=false', '--destination_writer_count', '1', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', '--max_tps', '9999'] # Make the clone as slow as necessary such that there is enough time to # run PlannedReparent in the meantime. @@ -570,7 +570,7 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): '--use_v3_resharding_mode=false', 'SplitClone', '--online=false', - '--min_healthy_rdonly_tablets', '1', + '--min_healthy_tablets', '1', 'test_keyspace/0'], auto_log=True) # Make sure that everything is caught up to the same replication point @@ -672,7 +672,7 @@ def split_clone_fails_not_enough_health_rdonly_tablets(self): '--wait_for_healthy_rdonly_tablets_timeout', '1s', '--use_v3_resharding_mode=false', 'SplitClone', - '--min_healthy_rdonly_tablets', '2', + '--min_healthy_tablets', '2', 'test_keyspace/0'], auto_log=True, expect_fail=True)