From 11177847cfb7ee2fe74f99c3ee4b10deef08e15a Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 22 Jan 2019 16:31:20 +0100 Subject: [PATCH 1/4] Make SplitClone work with consistent snapshot Signed-off-by: Andres Taylor --- go/cmd/vtworkerclient/vtworkerclient.go | 2 +- go/vt/worker/restartable_result_reader.go | 61 +- go/vt/worker/result_merger.go | 10 + go/vt/worker/result_merger_test.go | 9 + go/vt/worker/result_reader.go | 3 + go/vt/worker/split_clone.go | 750 +++++++++++++--------- go/vt/worker/split_clone_cmd.go | 52 +- go/vt/worker/split_clone_test.go | 2 +- go/vt/worker/vertical_split_clone_cmd.go | 20 +- 9 files changed, 595 insertions(+), 314 deletions(-) diff --git a/go/cmd/vtworkerclient/vtworkerclient.go b/go/cmd/vtworkerclient/vtworkerclient.go index 0e1874453ba..997da727657 100644 --- a/go/cmd/vtworkerclient/vtworkerclient.go +++ b/go/cmd/vtworkerclient/vtworkerclient.go @@ -55,7 +55,7 @@ func main() { logutil.LogEvent(logger, e) }) if err != nil { - log.Error(err) + log.Errorf("%+v", err) os.Exit(1) } } diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go index 73fadf04145..84649be867d 100644 --- a/go/vt/worker/restartable_result_reader.go +++ b/go/vt/worker/restartable_result_reader.go @@ -23,6 +23,8 @@ import ( "strings" "time" + "github.com/golang/glog" + "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" @@ -53,6 +55,8 @@ type RestartableResultReader struct { chunk chunk // allowMultipleRetries is true if we are allowed to retry more than once. 
allowMultipleRetries bool + // if we are running inside a transaction, this will hold a non-zero value + txID int64 query string @@ -82,6 +86,15 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t allowMultipleRetries: allowMultipleRetries, } + err := tryToConnect(r) + if err != nil { + return nil, err + } + return r, nil +} + +func tryToConnect(r *RestartableResultReader) error { + // If the initial connection fails we retry once. // Note: The first retry will be the second attempt. attempt := 0 @@ -97,15 +110,35 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err) goto retry } - return r, nil + return nil retry: if !retryable || attempt > 1 { - return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err) + return fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err) } statsRetryCount.Add(1) - logger.Infof("retrying after error: %v", err) + glog.Infof("retrying after error: %v", err) + } + +} + +// NewTransactionalRestartableResultReader does the same thing that NewRestartableResultReader does, +// but works inside of a single transaction +func NewTransactionalRestartableResultReader(ctx context.Context, logger logutil.Logger, tp tabletProvider, td *tabletmanagerdatapb.TableDefinition, chunk chunk, allowMultipleRetries bool, txID int64) (*RestartableResultReader, error) { + r := &RestartableResultReader{ + ctx: ctx, + logger: logger, + tp: tp, + td: td, + chunk: chunk, + allowMultipleRetries: allowMultipleRetries, + txID: txID, } + err := tryToConnect(r) + if err != nil { + return nil, err + } + return r, nil } // getTablet (re)sets the tablet which is used for the streaming query. 
@@ -145,11 +178,21 @@ func (r *RestartableResultReader) getTablet() (bool, error) { func (r *RestartableResultReader) startStream() (bool, error) { // Start the streaming query. r.generateQuery() - stream := queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{ - Keyspace: r.tablet.Keyspace, - Shard: r.tablet.Shard, - TabletType: r.tablet.Type, - }, r.query, make(map[string]*querypb.BindVariable), nil) + var stream sqltypes.ResultStream + + if r.txID == 0 { + stream = queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{ + Keyspace: r.tablet.Keyspace, + Shard: r.tablet.Shard, + TabletType: r.tablet.Type, + }, r.query, make(map[string]*querypb.BindVariable), nil) + } else { + stream = queryservice.ExecuteWithTransactionalStreamer(r.ctx, r.conn, &querypb.Target{ + Keyspace: r.tablet.Keyspace, + Shard: r.tablet.Shard, + TabletType: r.tablet.Type, + }, r.query, make(map[string]*querypb.BindVariable), r.txID, nil) + } // Read the fields information. cols, err := stream.Recv() @@ -387,4 +430,4 @@ func greaterThanTupleWhereClause(columns []string, row []sqltypes.Value) []strin clauses = append(clauses, b.String()) return clauses -} +} \ No newline at end of file diff --git a/go/vt/worker/result_merger.go b/go/vt/worker/result_merger.go index 68250d06ad0..87213242b94 100644 --- a/go/vt/worker/result_merger.go +++ b/go/vt/worker/result_merger.go @@ -21,6 +21,7 @@ import ( "fmt" "io" + "golang.org/x/net/context" "vitess.io/vitess/go/vt/vterrors" "github.com/golang/protobuf/proto" @@ -177,6 +178,15 @@ func (rm *ResultMerger) Next() (*sqltypes.Result, error) { return result, nil } +// Close closes all inputs +func (rm *ResultMerger) Close(ctx context.Context) { + for _, i := range rm.inputs { + i.Close(ctx) + } +} + + + func (rm *ResultMerger) deleteInput(deleteMe ResultReader) { for i, input := range rm.inputs { if input == deleteMe { diff --git a/go/vt/worker/result_merger_test.go b/go/vt/worker/result_merger_test.go index 10b37f3261f..66bfa65eddf 
100644 --- a/go/vt/worker/result_merger_test.go +++ b/go/vt/worker/result_merger_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -110,6 +111,10 @@ func (f *fakeResultReader) Fields() []*querypb.Field { return f.fields } +// Close closes nothing +func (f *fakeResultReader) Close(ctx context.Context) { +} + // Next returns the next fake result. It is part of the ResultReader interface. func (f *fakeResultReader) Next() (*sqltypes.Result, error) { if f.rowsReturned == f.rowsTotal { @@ -387,6 +392,10 @@ func (m *memoryResultReader) Next() (*sqltypes.Result, error) { return result, nil } +func (m *memoryResultReader) Close(ctx context.Context) { + // intentionally blank. we have nothing we need to close +} + // benchmarkResult is a package level variable whose sole purpose is to // reference output from the Benchmark* functions below. // This was suggested by http://dave.cheney.net/2013/06/30/how-to-write-benchmarks-in-go diff --git a/go/vt/worker/result_reader.go b/go/vt/worker/result_reader.go index 51ccda42367..efc0fc1f7bb 100644 --- a/go/vt/worker/result_reader.go +++ b/go/vt/worker/result_reader.go @@ -17,6 +17,8 @@ limitations under the License. package worker import ( + "context" + "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -39,4 +41,5 @@ type ResultReader interface { // It returns the next result on the stream. // It will return io.EOF if the stream ended. 
Next() (*sqltypes.Result, error) + Close(ctx context.Context) } diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index 3cc7638d4a1..8eb5fefa2b6 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -24,14 +24,16 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" "vitess.io/vitess/go/event" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/sync2" - "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/throttler" @@ -64,28 +66,30 @@ var servingTypes = []topodatapb.TabletType{topodatapb.TabletType_MASTER, topodat type SplitCloneWorker struct { StatusWorker - wr *wrangler.Wrangler - cloneType cloneType - cell string - destinationKeyspace string - shard string - online bool - offline bool + wr *wrangler.Wrangler + cloneType cloneType + cell string + destinationKeyspace string + shard string + online bool + offline bool + useConsistentSnapshot bool // verticalSplit only: List of tables which should be split out. tables []string // horizontalResharding only: List of tables which will be skipped. 
- excludeTables []string - chunkCount int - minRowsPerChunk int - sourceReaderCount int - writeQueryMaxRows int - writeQueryMaxSize int - destinationWriterCount int - minHealthyRdonlyTablets int - maxTPS int64 - maxReplicationLag int64 - cleaner *wrangler.Cleaner - tabletTracker *TabletTracker + excludeTables []string + chunkCount int + minRowsPerChunk int + sourceReaderCount int + writeQueryMaxRows int + writeQueryMaxSize int + destinationWriterCount int + minHealthyTablets int + tabletType topodatapb.TabletType + maxTPS int64 + maxReplicationLag int64 + cleaner *wrangler.Cleaner + tabletTracker *TabletTracker // populated during WorkerStateInit, read-only after that destinationKeyspaceInfo *topo.KeyspaceInfo @@ -101,6 +105,9 @@ type SplitCloneWorker struct { // populated during WorkerStateFindTargets, read-only after that sourceTablets []*topodatapb.Tablet + lastPos string // contains the GTID position for the source + transactions []int64 + // shardWatchers contains a TopologyWatcher for each source and destination // shard. It updates the list of tablets in the healthcheck if replicas are // added/removed. @@ -118,15 +125,15 @@ type SplitCloneWorker struct { // Throttlers will be added/removed during WorkerStateClone(Online|Offline). throttlers map[string]*throttler.Throttler - // offlineSourceAliases has the list of tablets (per source shard) we took + // sourceAliases has the list of tablets (per source shard) we took // offline for the WorkerStateCloneOffline phase. // Populated shortly before WorkerStateCloneOffline, read-only after that. - offlineSourceAliases []*topodatapb.TabletAlias + sourceAliases []*topodatapb.TabletAlias // formattedOfflineSourcesMu guards all fields in this group. formattedOfflineSourcesMu sync.Mutex // formattedOfflineSources is a space separated list of - // "offlineSourceAliases". It is used by the StatusAs* methods to output the + // "sourceAliases". 
It is used by the StatusAs* methods to output the // used source tablets during the offline clone phase. formattedOfflineSources string @@ -140,20 +147,20 @@ type SplitCloneWorker struct { } // newSplitCloneWorker returns a new worker object for the SplitClone command. -func newSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) { - return newCloneWorker(wr, horizontalResharding, cell, keyspace, shard, online, offline, nil /* tables */, excludeTables, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets, maxTPS, maxReplicationLag) +func newSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) { + return newCloneWorker(wr, horizontalResharding, cell, keyspace, shard, online, offline, nil /* tables */, excludeTables, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets, tabletType, maxTPS, maxReplicationLag, useConsistentSnapshot) } // newVerticalSplitCloneWorker returns a new worker object for the // VerticalSplitClone command. 
-func newVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, tables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) { - return newCloneWorker(wr, verticalSplit, cell, keyspace, shard, online, offline, tables, nil /* excludeTables */, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets, maxTPS, maxReplicationLag) +func newVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, tables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) { + return newCloneWorker(wr, verticalSplit, cell, keyspace, shard, online, offline, tables, nil /* excludeTables */, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets, tabletType, maxTPS, maxReplicationLag, useConsistentSnapshot) } // newCloneWorker returns a new SplitCloneWorker object which is used both by // the SplitClone and VerticalSplitClone command. // TODO(mberlin): Rename SplitCloneWorker to cloneWorker. 
-func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, shard string, online, offline bool, tables, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) { +func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, shard string, online, offline bool, tables, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) { if cloneType != horizontalResharding && cloneType != verticalSplit { return nil, fmt.Errorf("unknown cloneType: %v This is a bug. Please report", cloneType) } @@ -183,8 +190,8 @@ func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, if destinationWriterCount <= 0 { return nil, fmt.Errorf("destination_writer_count must be > 0: %v", destinationWriterCount) } - if minHealthyRdonlyTablets < 0 { - return nil, fmt.Errorf("min_healthy_rdonly_tablets must be >= 0: %v", minHealthyRdonlyTablets) + if minHealthyTablets < 0 { + return nil, fmt.Errorf("min_healthy_tablets must be >= 0: %v", minHealthyTablets) } if maxTPS != throttler.MaxRateModuleDisabled { wr.Logger().Infof("throttling enabled and set to a max of %v transactions/second", maxTPS) @@ -195,35 +202,39 @@ func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, if maxReplicationLag <= 0 { return nil, fmt.Errorf("max_replication_lag must be >= 1s: %v", maxReplicationLag) } + if tabletType != topodatapb.TabletType_REPLICA && tabletType != topodatapb.TabletType_RDONLY { + return nil, fmt.Errorf("tablet_type must be RDONLY or REPLICA: %v", topodatapb.TabletType_name[int32(tabletType)]) + } scw := &SplitCloneWorker{ - StatusWorker: NewStatusWorker(), - 
wr: wr, - cloneType: cloneType, - cell: cell, - destinationKeyspace: keyspace, - shard: shard, - online: online, - offline: offline, - tables: tables, - excludeTables: excludeTables, - chunkCount: chunkCount, - minRowsPerChunk: minRowsPerChunk, - sourceReaderCount: sourceReaderCount, - writeQueryMaxRows: writeQueryMaxRows, - writeQueryMaxSize: writeQueryMaxSize, - destinationWriterCount: destinationWriterCount, - minHealthyRdonlyTablets: minHealthyRdonlyTablets, - maxTPS: maxTPS, - maxReplicationLag: maxReplicationLag, - cleaner: &wrangler.Cleaner{}, - tabletTracker: NewTabletTracker(), - throttlers: make(map[string]*throttler.Throttler), - - destinationDbNames: make(map[string]string), - + StatusWorker: NewStatusWorker(), + wr: wr, + cloneType: cloneType, + cell: cell, + destinationKeyspace: keyspace, + shard: shard, + online: online, + offline: offline, + tables: tables, + excludeTables: excludeTables, + chunkCount: chunkCount, + minRowsPerChunk: minRowsPerChunk, + sourceReaderCount: sourceReaderCount, + writeQueryMaxRows: writeQueryMaxRows, + writeQueryMaxSize: writeQueryMaxSize, + destinationWriterCount: destinationWriterCount, + minHealthyTablets: minHealthyTablets, + maxTPS: maxTPS, + maxReplicationLag: maxReplicationLag, + cleaner: &wrangler.Cleaner{}, + tabletTracker: NewTabletTracker(), + tabletType: tabletType, tableStatusListOnline: &tableStatusList{}, tableStatusListOffline: &tableStatusList{}, + useConsistentSnapshot: useConsistentSnapshot, + + throttlers: make(map[string]*throttler.Throttler), + destinationDbNames: make(map[string]string), } scw.initializeEventDescriptor() return scw, nil @@ -424,6 +435,29 @@ func (scw *SplitCloneWorker) Run(ctx context.Context) error { return nil } +func (scw *SplitCloneWorker) disableUniquenessCheckOnDestinationTablets(ctx context.Context) error { + for _, si := range scw.destinationShards { + tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) + if len(tablets) != 1 
{ + return fmt.Errorf("should have exactly one MASTER tablet, have %v", len(tablets)) + } + tablet := tablets[0].Tablet + cmd := "SET UNIQUE_CHECKS=0" + scw.wr.Logger().Infof("disabling uniqueness checks on %v", topoproto.TabletAliasString(tablet.Alias)) + _, err := scw.wr.TabletManagerClient().ExecuteFetchAsApp(ctx, tablet, true, []byte(cmd), 0) + if err != nil { + return fmt.Errorf("should have exactly one MASTER tablet, have %v", len(tablets)) + } + scw.cleaner.Record("EnableUniquenessChecks", topoproto.TabletAliasString(tablet.Alias), func(ctx context.Context, wr *wrangler.Wrangler) error { + cmd := "SET UNIQUE_CHECKS=1" + _, err := scw.wr.TabletManagerClient().ExecuteFetchAsApp(ctx, tablet, true, []byte(cmd), 0) + return err + }) + } + + return nil +} + func (scw *SplitCloneWorker) run(ctx context.Context) error { // Phase 1: read what we need to do. if err := scw.init(ctx); err != nil { @@ -476,9 +510,15 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error { time.Sleep(1 * time.Second) } - // 4a: Take source tablets out of serving for an exact snapshot. 
- if err := scw.findOfflineSourceTablets(ctx); err != nil { - return vterrors.Wrap(err, "findSourceTablets() failed") + // 4a: Make sure the sources are producing a stable view of the data + if scw.useConsistentSnapshot { + if err := scw.findTransactionalSources(ctx); err != nil { + return vterrors.Wrap(err, "findSourceTablets() failed") + } + } else { + if err := scw.findOfflineSourceTablets(ctx); err != nil { + return vterrors.Wrap(err, "findSourceTablets() failed") + } } if err := checkDone(ctx); err != nil { return err @@ -489,6 +529,10 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error { if err := scw.clone(ctx, WorkerStateCloneOffline); err != nil { return vterrors.Wrap(err, "offline clone() failed") } + if err := scw.setUpVReplication(ctx); err != nil { + return fmt.Errorf("failed to set up replication: %v", err) + } + d := time.Since(start) if err := checkDone(ctx); err != nil { return err @@ -583,6 +627,10 @@ func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Conte scw.destinationShards = os.Left } + if scw.useConsistentSnapshot && len(scw.sourceShards) > 1 { + return fmt.Errorf("cannot use consistent snapshot against multiple source shards") + } + return nil } @@ -705,20 +753,20 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error scw.setState(WorkerStateFindTargets) // find an appropriate tablet in the source shards - scw.offlineSourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards)) + scw.sourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards)) for i, si := range scw.sourceShards { var err error - scw.offlineSourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY) + scw.sourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyTablets, scw.tabletType) if err != nil { 
return vterrors.Wrapf(err, "FindWorkerTablet() failed for %v/%v/%v", scw.cell, si.Keyspace(), si.ShardName()) } - scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.offlineSourceAliases[i]), si.Keyspace(), si.ShardName()) + scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[i]), si.Keyspace(), si.ShardName()) } - scw.setFormattedOfflineSources(scw.offlineSourceAliases) + scw.setFormattedOfflineSources(scw.sourceAliases) // get the tablet info for them, and stop their replication - scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.offlineSourceAliases)) - for i, alias := range scw.offlineSourceAliases { + scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases)) + for i, alias := range scw.sourceAliases { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias) cancel() @@ -740,6 +788,44 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error return nil } +// findTransactionalSources phase: +// - get the aliases of all the source tablets +func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error { + scw.setState(WorkerStateFindTargets) + + if len(scw.sourceShards) > 1 { + return fmt.Errorf("consistent snapshot can only be used with a single source shard") + } + var err error + + // find an appropriate tablet in the source shard + si := scw.sourceShards[0] + scw.sourceAliases = make([]*topodatapb.TabletAlias, 1) + scw.sourceAliases[0], err = FindHealthyTablet(ctx, scw.wr, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyTablets, scw.tabletType) + if err != nil { + return fmt.Errorf("FindHealthyTablet() failed for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err) + } + scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[0]), si.Keyspace(), si.ShardName()) + 
scw.setFormattedOfflineSources(scw.sourceAliases) + + // get the tablet info + scw.sourceTablets = make([]*topodatapb.Tablet, 1) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0]) + cancel() + if err != nil { + return fmt.Errorf("cannot read tablet %v: %v", topoproto.TabletAliasString(scw.sourceAliases[0]), err) + } + scw.sourceTablets[0] = ti.Tablet + + // stop replication and create transactions to work on + txs, gtid, err := CreateConsistentTransactions(ctx, ti, scw.wr, scw.cleaner, scw.sourceReaderCount) + scw.wr.Logger().Infof("created %v transactions", len(txs)) + scw.lastPos = gtid + scw.transactions = txs + return nil +} + // findDestinationMasters finds for each destination shard the current master. func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error { scw.setState(WorkerStateFindTargets) @@ -748,8 +834,9 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error { scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...") for _, si := range scw.destinationShards { waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout) - defer waitCancel() - if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil { + err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) + waitCancel() + if err != nil { return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v (in cell: %v)", si.Keyspace(), si.ShardName(), scw.cell) } masters := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) @@ -775,18 +862,18 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t var wg sync.WaitGroup rec := concurrency.AllErrorRecorder{} - if scw.minHealthyRdonlyTablets > 0 && len(shardInfos) > 0 { 
- scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyRdonlyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName()) + if scw.minHealthyTablets > 0 && len(shardInfos) > 0 { + scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName()) } for _, si := range shardInfos { wg.Add(1) go func(keyspace, shard string) { defer wg.Done() - // We wait for --min_healthy_rdonly_tablets because we will use several + // We wait for --min_healthy_tablets because we will use several // tablets per shard to spread reading the chunks of rows across as many // tablets as possible. - if _, err := waitForHealthyTablets(ctx, scw.wr, scw.tsc, scw.cell, keyspace, shard, scw.minHealthyRdonlyTablets, timeout, topodatapb.TabletType_RDONLY); err != nil { + if _, err := waitForHealthyTablets(ctx, scw.wr, scw.tsc, scw.cell, keyspace, shard, scw.minHealthyTablets, timeout, scw.tabletType); err != nil { rec.RecordError(err) } }(si.Keyspace(), si.ShardName()) @@ -795,6 +882,228 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t return rec.Error() } +func (scw *SplitCloneWorker) findFirstSourceTablet(ctx context.Context, state StatusWorkerState) (*topodatapb.Tablet, error) { + if state == WorkerStateCloneOffline { + // Use the first source tablet which we took offline. + return scw.sourceTablets[0], nil + } + + // Pick any healthy serving source tablet. + si := scw.sourceShards[0] + tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), scw.tabletType) + if len(tablets) == 0 { + // We fail fast on this problem and don't retry because at the start all tablets should be healthy. 
+ return nil, fmt.Errorf("no healthy %v tablet in source shard (%v) available (required to find out the schema)", topodatapb.TabletType_name[int32(scw.tabletType)], topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())) + } + return tablets[0].Tablet, nil +} + +func (scw *SplitCloneWorker) getCounters(state StatusWorkerState) ([]*stats.CountersWithSingleLabel, *tableStatusList) { + switch state { + case WorkerStateCloneOnline: + return []*stats.CountersWithSingleLabel{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters}, + scw.tableStatusListOnline + case WorkerStateCloneOffline: + return []*stats.CountersWithSingleLabel{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters}, + scw.tableStatusListOffline + default: + panic("should not happen") + } +} + +func (scw *SplitCloneWorker) startExecutor(ctx context.Context, wg *sync.WaitGroup, keyspace, shard string, insertChannel chan string, threadID int, processError func(string, ...interface{})) { + defer wg.Done() + t := scw.getThrottler(keyspace, shard) + //defer t.ThreadFinished(threadID) + + executor := newExecutor(scw.wr, scw.tsc, t, keyspace, shard, threadID) + if err := executor.fetchLoop(ctx, insertChannel); err != nil { + processError("executer.FetchLoop failed: %v", err) + } +} + +func mergeOrSingle(readers []ResultReader, td *tabletmanagerdatapb.TableDefinition) (ResultReader, error) { + if len(readers) == 1 { + return readers[0], nil + } + + sourceReader, err := NewResultMerger(readers, len(td.PrimaryKeyColumns)) + if err != nil { + return nil, err + } + + return sourceReader, nil +} + +func (scw *SplitCloneWorker) getSourceResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk, txID int64) (ResultReader, error) { + sourceReaders := make([]ResultReader, len(scw.sourceShards)) + for shardIndex, si := range scw.sourceShards 
{ + var sourceResultReader ResultReader + var err error + if state == WorkerStateCloneOffline && scw.useConsistentSnapshot { + if txID < 1 { + return nil, fmt.Errorf("tried using consistent snapshot without a valid transaction") + } + tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType) + sourceResultReader, err = NewTransactionalRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, false, txID) + } else { + var tp tabletProvider + allowMultipleRetries := true + if state == WorkerStateCloneOffline { + tp = newSingleTabletProvider(ctx, scw.wr.TopoServer(), scw.sourceAliases[shardIndex]) + // allowMultipleRetries is false to avoid that we'll keep retrying + // on the same tablet alias for hours. This guards us against the + // situation that an offline tablet gets restarted and serves again. + // In that case we cannot use it because its replication is no + // longer stopped at the same point as we took it offline initially. + allowMultipleRetries = false + } else { + tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType) + } + sourceResultReader, err = NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries) + if err != nil { + return nil, fmt.Errorf("NewRestartableResultReader for source: %v failed: %v", tp.description(), err) + } + } + // TODO: We could end up in a situation where some readers have been created but not all. 
In this situation, we would not close up all readers + sourceReaders[shardIndex] = sourceResultReader + } + return mergeOrSingle(sourceReaders, td) +} + +func (scw *SplitCloneWorker) getDestinationResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk) (ResultReader, error) { + destReaders := make([]ResultReader, len(scw.destinationShards)) + for shardIndex, si := range scw.destinationShards { + tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) + destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */) + if err != nil { + return nil, fmt.Errorf("NewRestartableResultReader for destination: %v failed: %v", tp.description(), err) + } + destReaders[shardIndex] = destResultReader + } + return mergeOrSingle(destReaders, td) +} + +func (scw *SplitCloneWorker) cloneAChunk(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk, processError func(string, ...interface{}), state StatusWorkerState, tableStatusList *tableStatusList, keyResolver keyspaceIDResolver, start time.Time, insertChannels []chan string, txID int64, statsCounters []*stats.CountersWithSingleLabel) { + errPrefix := fmt.Sprintf("table=%v chunk=%v", td.Name, chunk) + + var err error + + if err := checkDone(ctx); err != nil { + processError("%v: Context expired while this thread was waiting for its turn. Context error: %v", errPrefix, err) + return + } + + tableStatusList.threadStarted(tableIndex) + defer tableStatusList.threadDone(tableIndex) + + if state == WorkerStateCloneOnline { + // Wait for enough healthy tablets (they might have become unhealthy + // and their replication lag might have increased since we started.) 
+ if err := scw.waitForTablets(ctx, scw.sourceShards, *retryDuration); err != nil { + processError("%v: No healthy source tablets found (gave up after %v): %v", errPrefix, time.Since(start), err) + return + } + } + + // Set up readers for the diff. There will be one reader for every + // source and destination shard. + sourceReader, err := scw.getSourceResultReader(ctx, td, state, chunk, txID) + if err != nil { + processError("%v NewResultMerger for source tablets failed: %v", errPrefix, err) + return + } + defer sourceReader.Close(ctx) + destReader, err := scw.getDestinationResultReader(ctx, td, state, chunk) + if err != nil { + processError("%v NewResultMerger for destinations tablets failed: %v", errPrefix, err) + return + } + defer destReader.Close(ctx) + dbNames := make([]string, len(scw.destinationShards)) + for i, si := range scw.destinationShards { + keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) + dbNames[i] = scw.destinationDbNames[keyspaceAndShard] + } + // Compare the data and reconcile any differences. + differ, err := NewRowDiffer2(ctx, sourceReader, destReader, td, tableStatusList, tableIndex, + scw.destinationShards, keyResolver, + insertChannels, ctx.Done(), dbNames, scw.writeQueryMaxRows, scw.writeQueryMaxSize, statsCounters) + if err != nil { + processError("%v: NewRowDiffer2 failed: %v", errPrefix, err) + return + } + // Ignore the diff report because all diffs should get reconciled. 
+ _ /* DiffReport */, err = differ.Diff() + if err != nil { + processError("%v: RowDiffer2 failed: %v", errPrefix, err) + return + } +} + +type workUnit struct { + td *tabletmanagerdatapb.TableDefinition + chunk chunk + threadID int + resolver keyspaceIDResolver +} + +func (scw *SplitCloneWorker) startCloningData(ctx context.Context, state StatusWorkerState, sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition, + processError func(string, ...interface{}), firstSourceTablet *topodatapb.Tablet, tableStatusList *tableStatusList, + start time.Time, statsCounters []*stats.CountersWithSingleLabel, insertChannels []chan string, wg *sync.WaitGroup) error { + + workPipeline := make(chan workUnit, 10) // We'll use a small buffer so producers do not run too far ahead of consumers + queryService, err := tabletconn.GetDialer()(firstSourceTablet, true) + if err != nil { + return fmt.Errorf("failed to create queryService: %v", err) + } + defer queryService.Close(ctx) + + // Let's start the work consumers + for i := 0; i < scw.sourceReaderCount; i++ { + var txID int64 + if scw.useConsistentSnapshot && state == WorkerStateCloneOffline { + txID = scw.transactions[i] + } else { + txID = -1 + } + + wg.Add(1) + go func() { + defer wg.Done() + for work := range workPipeline { + scw.cloneAChunk(ctx, work.td, work.threadID, work.chunk, processError, state, tableStatusList, work.resolver, start, insertChannels, txID, statsCounters) + } + }() + } + + // And now let's start producing work units + for tableIndex, td := range sourceSchemaDefinition.TableDefinitions { + td = reorderColumnsPrimaryKeyFirst(td) + + keyResolver, err := scw.createKeyResolver(td) + if err != nil { + return fmt.Errorf("cannot resolve sharding keys for keyspace %v: %v", scw.destinationKeyspace, err) + } + + // TODO(mberlin): We're going to chunk *all* source shards based on the MIN + // and MAX values of the *first* source shard. Is this going to be a problem? 
+ chunks, err := generateChunks(ctx, scw.wr, firstSourceTablet, td, scw.chunkCount, scw.minRowsPerChunk) + if err != nil { + return fmt.Errorf("failed to split table into chunks: %v", err) + } + tableStatusList.setThreadCount(tableIndex, len(chunks)) + + for _, c := range chunks { + workPipeline <- workUnit{td: td, chunk: c, threadID: tableIndex, resolver: keyResolver} + } + } + + close(workPipeline) + + return nil +} + // copy phase: // - copy the data from source tablets to destination masters (with replication on) // Assumes that the schema has already been created on each destination tablet @@ -809,31 +1118,13 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) statsStateDurationsNs.Set(string(state), time.Now().Sub(start).Nanoseconds()) }() - var firstSourceTablet *topodatapb.Tablet - if state == WorkerStateCloneOffline { - // Use the first source tablet which we took offline. - firstSourceTablet = scw.sourceTablets[0] - } else { - // Pick any healthy serving source tablet. - si := scw.sourceShards[0] - tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY) - if len(tablets) == 0 { - // We fail fast on this problem and don't retry because at the start all tablets should be healthy. 
- return fmt.Errorf("no healthy RDONLY tablet in source shard (%v) available (required to find out the schema)", topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())) - } - firstSourceTablet = tablets[0].Tablet - } - var statsCounters []*stats.CountersWithSingleLabel - var tableStatusList *tableStatusList - switch state { - case WorkerStateCloneOnline: - statsCounters = []*stats.CountersWithSingleLabel{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters} - tableStatusList = scw.tableStatusListOnline - case WorkerStateCloneOffline: - statsCounters = []*stats.CountersWithSingleLabel{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters} - tableStatusList = scw.tableStatusListOffline + var firstSourceTablet, err = scw.findFirstSourceTablet(ctx, state) + if err != nil { + return err } + statsCounters, tableStatusList := scw.getCounters(state) + // The throttlers exist only for the duration of this clone() call. // That means a SplitClone invocation with both online and offline phases // will create throttlers for each phase. 
@@ -849,22 +1140,19 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) scw.wr.Logger().Infof("Source tablet 0 has %v tables to copy", len(sourceSchemaDefinition.TableDefinitions)) tableStatusList.initialize(sourceSchemaDefinition) - // In parallel, setup the channels to send SQL data chunks to for each destination tablet: - // - // mu protects the context for cancelation, and firstError - mu := sync.Mutex{} var firstError error ctx, cancelCopy := context.WithCancel(ctx) defer cancelCopy() processError := func(format string, args ...interface{}) { - mu.Lock() + // in theory we could have two threads see firstError as null and both write to the variable + // that should not cause any problems though - canceling and logging is concurrently safe, + // and overwriting the variable will not cause any problems + scw.wr.Logger().Errorf(format, args...) if firstError == nil { - scw.wr.Logger().Errorf(format, args...) firstError = fmt.Errorf(format, args...) cancelCopy() } - mu.Unlock() } // NOTE: Code below this point must *not* use "return" to exit this Go routine @@ -877,6 +1165,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) // races between "defer throttler.ThreadFinished()" (must be executed first) // and "defer scw.closeThrottlers()". Otherwise, vtworker will panic. 
+ // In parallel, setup the channels to send SQL data chunks to for each destination tablet: insertChannels := make([]chan string, len(scw.destinationShards)) destinationWaitGroup := sync.WaitGroup{} for shardIndex, si := range scw.destinationShards { @@ -888,168 +1177,41 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) for j := 0; j < scw.destinationWriterCount; j++ { destinationWaitGroup.Add(1) - go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) { - defer destinationWaitGroup.Done() - defer throttler.ThreadFinished(threadID) - - executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID) - if err := executor.fetchLoop(ctx, insertChannel); err != nil { - processError("executer.FetchLoop failed: %v", err) - } - }(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], scw.getThrottler(si.Keyspace(), si.ShardName()), j) + go scw.startExecutor(ctx, &destinationWaitGroup, si.Keyspace(), si.ShardName(), insertChannels[shardIndex], j, processError) } } // Now for each table, read data chunks and send them to all // insertChannels - sourceWaitGroup := sync.WaitGroup{} - sema := sync2.NewSemaphore(scw.sourceReaderCount, 0) - for tableIndex, td := range sourceSchemaDefinition.TableDefinitions { - td = reorderColumnsPrimaryKeyFirst(td) - - keyResolver, err := scw.createKeyResolver(td) - if err != nil { - processError("cannot resolve sharding keys for keyspace %v: %v", scw.destinationKeyspace, err) - break - } - - // TODO(mberlin): We're going to chunk *all* source shards based on the MIN - // and MAX values of the *first* source shard. Is this going to be a problem? 
- chunks, err := generateChunks(ctx, scw.wr, firstSourceTablet, td, scw.chunkCount, scw.minRowsPerChunk) - if err != nil { - processError("failed to split table into chunks: %v", err) - break - } - tableStatusList.setThreadCount(tableIndex, len(chunks)) - - for _, c := range chunks { - sourceWaitGroup.Add(1) - go func(td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk) { - defer sourceWaitGroup.Done() - errPrefix := fmt.Sprintf("table=%v chunk=%v", td.Name, chunk) - - // We need our own error per Go routine to avoid races. - var err error - - sema.Acquire() - defer sema.Release() - - if err := checkDone(ctx); err != nil { - processError("%v: Context expired while this thread was waiting for its turn. Context error: %v", errPrefix, err) - return - } - - tableStatusList.threadStarted(tableIndex) - defer tableStatusList.threadDone(tableIndex) - - if state == WorkerStateCloneOnline { - // Wait for enough healthy tablets (they might have become unhealthy - // and their replication lag might have increased since we started.) - if err := scw.waitForTablets(ctx, scw.sourceShards, *retryDuration); err != nil { - processError("%v: No healthy source tablets found (gave up after %v): %v", errPrefix, time.Since(start), err) - return - } - } - - // Set up readers for the diff. There will be one reader for every - // source and destination shard. - sourceReaders := make([]ResultReader, len(scw.sourceShards)) - destReaders := make([]ResultReader, len(scw.destinationShards)) - for shardIndex, si := range scw.sourceShards { - var tp tabletProvider - allowMultipleRetries := true - if state == WorkerStateCloneOffline { - tp = newSingleTabletProvider(ctx, scw.wr.TopoServer(), scw.offlineSourceAliases[shardIndex]) - // allowMultipleRetries is false to avoid that we'll keep retrying - // on the same tablet alias for hours. This guards us against the - // situation that an offline tablet gets restarted and serves again. 
- // In that case we cannot use it because its replication is no - // longer stopped at the same point as we took it offline initially. - allowMultipleRetries = false - } else { - tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY) - } - sourceResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries) - if err != nil { - processError("%v: NewRestartableResultReader for source: %v failed: %v", errPrefix, tp.description(), err) - return - } - defer sourceResultReader.Close(ctx) - sourceReaders[shardIndex] = sourceResultReader - } - - for shardIndex, si := range scw.destinationShards { - tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) - destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */) - if err != nil { - processError("%v: NewRestartableResultReader for destination: %v failed: %v", errPrefix, tp.description(), err) - return - } - defer destResultReader.Close(ctx) - destReaders[shardIndex] = destResultReader - } - - var sourceReader ResultReader - var destReader ResultReader - if len(sourceReaders) >= 2 { - sourceReader, err = NewResultMerger(sourceReaders, len(td.PrimaryKeyColumns)) - if err != nil { - processError("%v: NewResultMerger for source tablets failed: %v", errPrefix, err) - return - } - } else { - sourceReader = sourceReaders[0] - } - if len(destReaders) >= 2 { - destReader, err = NewResultMerger(destReaders, len(td.PrimaryKeyColumns)) - if err != nil { - processError("%v: NewResultMerger for destination tablets failed: %v", errPrefix, err) - return - } - } else { - destReader = destReaders[0] - } + readers := sync.WaitGroup{} - dbNames := make([]string, len(scw.destinationShards)) - for i, si := range scw.destinationShards { - keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) 
- dbNames[i] = scw.destinationDbNames[keyspaceAndShard] - } - // Compare the data and reconcile any differences. - differ, err := NewRowDiffer2(ctx, sourceReader, destReader, td, tableStatusList, tableIndex, - scw.destinationShards, keyResolver, - insertChannels, ctx.Done(), dbNames, scw.writeQueryMaxRows, scw.writeQueryMaxSize, statsCounters) - if err != nil { - processError("%v: NewRowDiffer2 failed: %v", errPrefix, err) - return - } - // Ignore the diff report because all diffs should get reconciled. - _ /* DiffReport */, err = differ.Diff() - if err != nil { - processError("%v: RowDiffer2 failed: %v", errPrefix, err) - return - } - }(td, tableIndex, c) - } + err = scw.startCloningData(ctx, state, sourceSchemaDefinition, processError, firstSourceTablet, tableStatusList, start, statsCounters, insertChannels, &readers) + if err != nil { + return fmt.Errorf("failed to startCloningData : %v", err) } - sourceWaitGroup.Wait() + readers.Wait() for shardIndex := range scw.destinationShards { close(insertChannels[shardIndex]) } destinationWaitGroup.Wait() - if firstError != nil { - return firstError - } - if state == WorkerStateCloneOffline { - // Create and populate the vreplication table to give filtered replication - // a starting point. - queries := make([]string, 0, 4) - queries = append(queries, binlogplayer.CreateVReplicationTable()...) + return firstError +} + +func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { + wg := sync.WaitGroup{} + // Create and populate the vreplication table to give filtered replication + // a starting point. + queries := make([]string, 0, 4) + queries = append(queries, binlogplayer.CreateVReplicationTable()...) 
- // get the current position from the sources - sourcePositions := make([]string, len(scw.sourceShards)) + // get the current position from the sources + sourcePositions := make([]string, len(scw.sourceShards)) + + if scw.useConsistentSnapshot { + sourcePositions[0] = scw.lastPos + } else { for shardIndex := range scw.sourceShards { shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex]) @@ -1059,46 +1221,60 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) } sourcePositions[shardIndex] = status.Position } + } + cancelableCtx, cancel := context.WithCancel(ctx) + rec := concurrency.AllErrorRecorder{} + handleError := func(e error) { + rec.RecordError(e) + cancel() + } - for _, si := range scw.destinationShards { - destinationWaitGroup.Add(1) - go func(keyspace, shard string, kr *topodatapb.KeyRange) { - defer destinationWaitGroup.Done() - scw.wr.Logger().Infof("Making and populating vreplication table") - - exc := newExecutor(scw.wr, scw.tsc, nil, keyspace, shard, 0) - for shardIndex, src := range scw.sourceShards { - bls := &binlogdatapb.BinlogSource{ - Keyspace: src.Keyspace(), - Shard: src.ShardName(), - } - if scw.tables == nil { - bls.KeyRange = kr - } else { - bls.Tables = scw.tables - } - // TODO(mberlin): Fill in scw.maxReplicationLag once the adapative - // throttler is enabled by default. 
- qr, err := exc.vreplicationExec(ctx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix())) - if err != nil { - processError("vreplication queries failed: %v", err) - break - } - if err := scw.wr.SourceShardAdd(ctx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil { - processError("could not add source shard: %v", err) - break - } + for _, si := range scw.destinationShards { + wg.Add(1) + go func(keyspace, shard string, kr *topodatapb.KeyRange) { + defer wg.Done() + scw.wr.Logger().Infof("Making and populating vreplication table") + + exc := newExecutor(scw.wr, scw.tsc, nil, keyspace, shard, 0) + for shardIndex, src := range scw.sourceShards { + // Check if any error occurred in any other gorouties: + select { + case <-cancelableCtx.Done(): + return // Error somewhere, terminate + default: } - // refreshState will cause the destination to become non-serving because - // it's now participating in the resharding workflow. - if err := exc.refreshState(ctx); err != nil { - processError("RefreshState failed on tablet %v/%v: %v", keyspace, shard, err) + + bls := &binlogdatapb.BinlogSource{ + Keyspace: src.Keyspace(), + Shard: src.ShardName(), } - }(si.Keyspace(), si.ShardName(), si.KeyRange) - } - destinationWaitGroup.Wait() - } // clonePhase == offline - return firstError + if scw.tables == nil { + bls.KeyRange = kr + } else { + bls.Tables = scw.tables + } + // TODO(mberlin): Fill in scw.maxReplicationLag once the adapative throttler is enabled by default. 
+ qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix())) + if err != nil { + handleError(fmt.Errorf("vreplication queries failed: %v", err)) + cancel() + return + } + if err := scw.wr.SourceShardAdd(cancelableCtx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil { + handleError(fmt.Errorf("could not add source shard: %v", err)) + break + } + } + // refreshState will cause the destination to become non-serving because + // it's now participating in the resharding workflow. + if err := exc.refreshState(ctx); err != nil { + handleError(fmt.Errorf("RefreshState failed on tablet %v/%v: %v", keyspace, shard, err)) + } + }(si.Keyspace(), si.ShardName(), si.KeyRange) + } + wg.Wait() + + return rec.Error() } func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topodatapb.Tablet) (*tabletmanagerdatapb.SchemaDefinition, error) { @@ -1201,4 +1377,4 @@ func (scw *SplitCloneWorker) closeThrottlers() { t.Close() delete(scw.throttlers, keyspaceAndShard) } -} +} \ No newline at end of file diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go index 770af7797c5..f349fb25f89 100644 --- a/go/vt/worker/split_clone_cmd.go +++ b/go/vt/worker/split_clone_cmd.go @@ -25,12 +25,13 @@ import ( "strings" "sync" - "vitess.io/vitess/go/vt/vterrors" - "golang.org/x/net/context" "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/wrangler" ) @@ -82,14 +83,22 @@ const splitCloneHTML2 = `

- -
+ + +
+ +


+ + ?
@@ -108,7 +117,9 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS writeQueryMaxRows := subFlags.Int("write_query_max_rows", defaultWriteQueryMaxRows, "maximum number of rows per write query") writeQueryMaxSize := subFlags.Int("write_query_max_size", defaultWriteQueryMaxSize, "maximum size (in bytes) per write query") destinationWriterCount := subFlags.Int("destination_writer_count", defaultDestinationWriterCount, "number of concurrent RPCs to execute on the destination") - minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy RDONLY tablets in the source and destination shard at start") + tabletTypeStr := subFlags.String("tablet_type", "RDONLY", "tablet type to use (RDONLY or REPLICA)") + minHealthyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy tablets in the source and destination shard at start") + useConsistentSnapshot := subFlags.Bool("use_consistent_snapshot", defaultUseConsistentSnapshot, "Instead of pausing replication on the source, uses transactions with consistent snapshot to have a stable view of the data.") maxTPS := subFlags.Int64("max_tps", defaultMaxTPS, "rate limit of maximum number of (write) transactions/second on the destination (unlimited by default)") maxReplicationLag := subFlags.Int64("max_replication_lag", defaultMaxReplicationLag, "if set, the adapative throttler will be enabled and automatically adjust the write rate to keep the lag below the set value in seconds (disabled by default)") if err := subFlags.Parse(args); err != nil { @@ -116,7 +127,7 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS } if subFlags.NArg() != 1 { subFlags.Usage() - return nil, fmt.Errorf("command SplitClone requires ") + return nil, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "command SplitClone requires ") } keyspace, shard, err := 
topoproto.ParseKeyspaceShard(subFlags.Arg(0)) @@ -127,7 +138,11 @@ func commandSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *flag.FlagS if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - worker, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, excludeTableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyRdonlyTablets, *maxTPS, *maxReplicationLag) + tabletType, ok := topodata.TabletType_value[*tabletTypeStr] + if !ok { + return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "command SplitClone invalid tablet_type: %v", tabletType) + } + worker, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, excludeTableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyTablets, topodata.TabletType(tabletType), *maxTPS, *maxReplicationLag, *useConsistentSnapshot) if err != nil { return nil, vterrors.Wrap(err, "cannot create split clone worker") } @@ -173,7 +188,7 @@ func keyspacesWithOverlappingShards(ctx context.Context, wr *wrangler.Wrangler) return nil, rec.Error() } if len(result) == 0 { - return nil, fmt.Errorf("There are no keyspaces with overlapping shards") + return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "there are no keyspaces with overlapping shards") } return result, nil } @@ -212,9 +227,10 @@ func interactiveSplitClone(ctx context.Context, wi *Instance, wr *wrangler.Wrang result["DefaultWriteQueryMaxRows"] = fmt.Sprintf("%v", defaultWriteQueryMaxRows) result["DefaultWriteQueryMaxSize"] = fmt.Sprintf("%v", defaultWriteQueryMaxSize) result["DefaultDestinationWriterCount"] = fmt.Sprintf("%v", defaultDestinationWriterCount) - result["DefaultMinHealthyRdonlyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) + result["DefaultMinHealthyTablets"] = fmt.Sprintf("%v", defaultMinHealthyTablets) 
result["DefaultMaxTPS"] = fmt.Sprintf("%v", defaultMaxTPS) result["DefaultMaxReplicationLag"] = fmt.Sprintf("%v", defaultMaxReplicationLag) + result["DefaultUseConsistentSnapshot"] = fmt.Sprintf("%v", defaultUseConsistentSnapshot) return nil, splitCloneTemplate2, result, nil } @@ -257,10 +273,15 @@ func interactiveSplitClone(ctx context.Context, wi *Instance, wr *wrangler.Wrang if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot parse destinationWriterCount") } - minHealthyRdonlyTabletsStr := r.FormValue("minHealthyRdonlyTablets") - minHealthyRdonlyTablets, err := strconv.ParseInt(minHealthyRdonlyTabletsStr, 0, 64) + minHealthyTabletsStr := r.FormValue("minHealthyTablets") + minHealthyTablets, err := strconv.ParseInt(minHealthyTabletsStr, 0, 64) if err != nil { - return nil, nil, nil, vterrors.Wrap(err, "cannot parse minHealthyRdonlyTablets") + return nil, nil, nil, vterrors.Wrap(err, "cannot parse minHealthyTablets") + } + tabletTypeStr := r.FormValue("tabletType") + tabletType, ok := topodata.TabletType_value[tabletTypeStr] + if !ok { + return nil, nil, nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "command SplitClone invalid tablet_type: %v", tabletType) } maxTPSStr := r.FormValue("maxTPS") maxTPS, err := strconv.ParseInt(maxTPSStr, 0, 64) @@ -273,8 +294,11 @@ func interactiveSplitClone(ctx context.Context, wi *Instance, wr *wrangler.Wrang return nil, nil, nil, vterrors.Wrap(err, "cannot parse maxReplicationLag") } + useConsistentSnapshotStr := r.FormValue("useConsistentSnapshot") + useConsistentSnapshot := useConsistentSnapshotStr == "true" + // start the clone job - wrk, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, excludeTableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyRdonlyTablets), maxTPS, maxReplicationLag) + wrk, err := newSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, 
excludeTableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyTablets), topodata.TabletType(tabletType), maxTPS, maxReplicationLag, useConsistentSnapshot) if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot create worker") } @@ -286,4 +310,4 @@ func init() { commandSplitClone, interactiveSplitClone, "[--online=false] [--offline=false] [--exclude_tables=''] ", "Replicates the data and creates configuration for a horizontal split."}) -} +} \ No newline at end of file diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 6e219171ed2..2d8994f11b1 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -522,7 +522,7 @@ func TestSplitCloneV2_Offline(t *testing.T) { // Run the vtworker command. if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } } diff --git a/go/vt/worker/vertical_split_clone_cmd.go b/go/vt/worker/vertical_split_clone_cmd.go index 3d812003eac..f221ee5d677 100644 --- a/go/vt/worker/vertical_split_clone_cmd.go +++ b/go/vt/worker/vertical_split_clone_cmd.go @@ -25,6 +25,7 @@ import ( "strings" "sync" + "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" @@ -107,6 +108,7 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl writeQueryMaxSize := subFlags.Int("write_query_max_size", defaultWriteQueryMaxSize, "maximum size (in bytes) per write query") destinationWriterCount := subFlags.Int("destination_writer_count", defaultDestinationWriterCount, "number of concurrent RPCs to execute on the destination") minHealthyRdonlyTablets := subFlags.Int("min_healthy_rdonly_tablets", defaultMinHealthyTablets, "minimum number of healthy RDONLY tablets before taking out one") + tabletTypeStr := subFlags.String("tablet_type", "RDONLY", "tablet 
type to use (RDONLY or REPLICA)") maxTPS := subFlags.Int64("max_tps", defaultMaxTPS, "if non-zero, limit copy to maximum number of (write) transactions/second on the destination (unlimited by default)") maxReplicationLag := subFlags.Int64("max_replication_lag", defaultMaxReplicationLag, "if set, the adapative throttler will be enabled and automatically adjust the write rate to keep the lag below the set value in seconds (disabled by default)") if err := subFlags.Parse(args); err != nil { @@ -127,7 +129,12 @@ func commandVerticalSplitClone(wi *Instance, wr *wrangler.Wrangler, subFlags *fl if *tables != "" { tableArray = strings.Split(*tables, ",") } - worker, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, tableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyRdonlyTablets, *maxTPS, *maxReplicationLag) + tabletType, ok := topodata.TabletType_value[*tabletTypeStr] + if !ok { + return nil, fmt.Errorf("command SplitClone invalid tablet_type: %v", tabletType) + } + + worker, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, *online, *offline, tableArray, *chunkCount, *minRowsPerChunk, *sourceReaderCount, *writeQueryMaxRows, *writeQueryMaxSize, *destinationWriterCount, *minHealthyRdonlyTablets, topodata.TabletType(tabletType), *maxTPS, *maxReplicationLag, /*useConsistentSnapshot*/false) if err != nil { return nil, vterrors.Wrap(err, "cannot create worker") } @@ -265,6 +272,12 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot parse maxReplicationLag") } + tabletTypeStr := r.FormValue("tabletType") + tabletType, ok := topodata.TabletType_value[tabletTypeStr] + if !ok { + return nil, nil, nil, fmt.Errorf("cannot parse tabletType: %v", tabletType) + } + // Figure out the shard shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) 
@@ -282,7 +295,10 @@ func interactiveVerticalSplitClone(ctx context.Context, wi *Instance, wr *wrangl } // start the clone job - wrk, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, tableArray, int(chunkCount), int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), int(destinationWriterCount), int(minHealthyRdonlyTablets), maxTPS, maxReplicationLag) + wrk, err := newVerticalSplitCloneWorker(wr, wi.cell, keyspace, shard, online, offline, tableArray, int(chunkCount), + int(minRowsPerChunk), int(sourceReaderCount), int(writeQueryMaxRows), int(writeQueryMaxSize), + int(destinationWriterCount), int(minHealthyRdonlyTablets), topodata.TabletType(tabletType), maxTPS, + maxReplicationLag, false) if err != nil { return nil, nil, nil, vterrors.Wrap(err, "cannot create worker") } From 1d8fb2046208dcd25211948888f672c381e0c0a8 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Fri, 25 Jan 2019 13:36:58 +0100 Subject: [PATCH 2/4] Close test resources Signed-off-by: Andres Taylor --- go/vt/worker/split_clone_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 2d8994f11b1..7e73330b273 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -27,7 +27,6 @@ import ( "time" "golang.org/x/net/context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" @@ -291,6 +290,11 @@ func (tc *splitCloneTestCase) tearDown() { for _, ft := range tc.tablets { ft.StopActionLoop(tc.t) + ft.RPCServer.Stop() + ft.FakeMysqlDaemon.Close() + ft.Agent = nil + ft.RPCServer = nil + ft.FakeMysqlDaemon = nil } tc.leftMasterFakeDb.VerifyAllExecutedOrFail() tc.leftReplicaFakeDb.VerifyAllExecutedOrFail() From 4b4da3ade9129efa104b495c5b376dcd0951ab5a Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 28 Jan 2019 12:35:38 +0100 Subject: [PATCH 3/4] Minor cleanups 
Signed-off-by: Andres Taylor --- go/cmd/vtworkerclient/vtworkerclient.go | 2 +- go/vt/worker/restartable_result_reader.go | 4 +--- go/vt/worker/split_clone.go | 23 ----------------------- 3 files changed, 2 insertions(+), 27 deletions(-) diff --git a/go/cmd/vtworkerclient/vtworkerclient.go b/go/cmd/vtworkerclient/vtworkerclient.go index 997da727657..0e1874453ba 100644 --- a/go/cmd/vtworkerclient/vtworkerclient.go +++ b/go/cmd/vtworkerclient/vtworkerclient.go @@ -55,7 +55,7 @@ func main() { logutil.LogEvent(logger, e) }) if err != nil { - log.Errorf("%+v", err) + log.Error(err) os.Exit(1) } } diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go index 84649be867d..90a5e8c20ea 100644 --- a/go/vt/worker/restartable_result_reader.go +++ b/go/vt/worker/restartable_result_reader.go @@ -23,8 +23,6 @@ import ( "strings" "time" - "github.com/golang/glog" - "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" @@ -117,7 +115,7 @@ func tryToConnect(r *RestartableResultReader) error { return fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err) } statsRetryCount.Add(1) - glog.Infof("retrying after error: %v", err) + log.Infof("retrying after error: %v", err) } } diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index 8eb5fefa2b6..8537be596e2 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -435,29 +435,6 @@ func (scw *SplitCloneWorker) Run(ctx context.Context) error { return nil } -func (scw *SplitCloneWorker) disableUniquenessCheckOnDestinationTablets(ctx context.Context) error { - for _, si := range scw.destinationShards { - tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) - if len(tablets) != 1 { - return fmt.Errorf("should have exactly one MASTER tablet, have %v", len(tablets)) - } - tablet := tablets[0].Tablet - cmd := "SET UNIQUE_CHECKS=0" - scw.wr.Logger().Infof("disabling uniqueness 
checks on %v", topoproto.TabletAliasString(tablet.Alias)) - _, err := scw.wr.TabletManagerClient().ExecuteFetchAsApp(ctx, tablet, true, []byte(cmd), 0) - if err != nil { - return fmt.Errorf("should have exactly one MASTER tablet, have %v", len(tablets)) - } - scw.cleaner.Record("EnableUniquenessChecks", topoproto.TabletAliasString(tablet.Alias), func(ctx context.Context, wr *wrangler.Wrangler) error { - cmd := "SET UNIQUE_CHECKS=1" - _, err := scw.wr.TabletManagerClient().ExecuteFetchAsApp(ctx, tablet, true, []byte(cmd), 0) - return err - }) - } - - return nil -} - func (scw *SplitCloneWorker) run(ctx context.Context) error { // Phase 1: read what we need to do. if err := scw.init(ctx); err != nil { From c00469e3bdcf83d99f6a2e7de8747a48c31c041f Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 29 Jan 2019 06:32:40 +0100 Subject: [PATCH 4/4] Make consistent snapshot the default Signed-off-by: Andres Taylor --- go/vt/worker/split_clone_cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go index f349fb25f89..e3d1085560d 100644 --- a/go/vt/worker/split_clone_cmd.go +++ b/go/vt/worker/split_clone_cmd.go @@ -98,7 +98,7 @@ const splitCloneHTML2 = ` - ?
+ ?