diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go
index 73fadf04145..90a5e8c20ea 100644
--- a/go/vt/worker/restartable_result_reader.go
+++ b/go/vt/worker/restartable_result_reader.go
@@ -53,6 +53,8 @@ type RestartableResultReader struct {
chunk chunk
// allowMultipleRetries is true if we are allowed to retry more than once.
allowMultipleRetries bool
+ // txID is non-zero when this reader streams its results inside an open transaction
+ txID int64
query string
@@ -82,6 +84,15 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t
allowMultipleRetries: allowMultipleRetries,
}
+ err := tryToConnect(r)
+ if err != nil {
+ return nil, err
+ }
+ return r, nil
+}
+
+func tryToConnect(r *RestartableResultReader) error {
// If the initial connection fails we retry once.
// Note: The first retry will be the second attempt.
attempt := 0
@@ -97,15 +108,35 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t
err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err)
goto retry
}
- return r, nil
+ return nil
retry:
if !retryable || attempt > 1 {
- return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err)
+ return fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err)
}
statsRetryCount.Add(1)
- logger.Infof("retrying after error: %v", err)
+ r.logger.Infof("retrying after error: %v", err)
}
+}
+
+// NewTransactionalRestartableResultReader behaves like NewRestartableResultReader,
+// but streams all of its results from within the given transaction.
+func NewTransactionalRestartableResultReader(ctx context.Context, logger logutil.Logger, tp tabletProvider, td *tabletmanagerdatapb.TableDefinition, chunk chunk, allowMultipleRetries bool, txID int64) (*RestartableResultReader, error) {
+ r := &RestartableResultReader{
+ ctx: ctx,
+ logger: logger,
+ tp: tp,
+ td: td,
+ chunk: chunk,
+ allowMultipleRetries: allowMultipleRetries,
+ txID: txID,
+ }
+ err := tryToConnect(r)
+ if err != nil {
+ return nil, err
+ }
+ return r, nil
}
// getTablet (re)sets the tablet which is used for the streaming query.
@@ -145,11 +176,21 @@ func (r *RestartableResultReader) getTablet() (bool, error) {
func (r *RestartableResultReader) startStream() (bool, error) {
// Start the streaming query.
r.generateQuery()
- stream := queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{
- Keyspace: r.tablet.Keyspace,
- Shard: r.tablet.Shard,
- TabletType: r.tablet.Type,
- }, r.query, make(map[string]*querypb.BindVariable), nil)
+ var stream sqltypes.ResultStream
+
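+ // A zero txID means this reader is not bound to a transaction, so we use
+ // the plain streaming executor; otherwise we stream through the
+ // transactional variant so the rows are read inside the already-open
+ // transaction.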
+ if r.txID == 0 {
+ stream = queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{
+ Keyspace: r.tablet.Keyspace,
+ Shard: r.tablet.Shard,
+ TabletType: r.tablet.Type,
+ }, r.query, make(map[string]*querypb.BindVariable), nil)
+ } else {
+ stream = queryservice.ExecuteWithTransactionalStreamer(r.ctx, r.conn, &querypb.Target{
+ Keyspace: r.tablet.Keyspace,
+ Shard: r.tablet.Shard,
+ TabletType: r.tablet.Type,
+ }, r.query, make(map[string]*querypb.BindVariable), r.txID, nil)
+ }
// Read the fields information.
cols, err := stream.Recv()
@@ -387,4 +428,4 @@ func greaterThanTupleWhereClause(columns []string, row []sqltypes.Value) []strin
clauses = append(clauses, b.String())
return clauses
-}
+}
\ No newline at end of file
diff --git a/go/vt/worker/result_merger.go b/go/vt/worker/result_merger.go
index 68250d06ad0..87213242b94 100644
--- a/go/vt/worker/result_merger.go
+++ b/go/vt/worker/result_merger.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
+ "golang.org/x/net/context"
"vitess.io/vitess/go/vt/vterrors"
"github.com/golang/protobuf/proto"
@@ -177,6 +178,15 @@ func (rm *ResultMerger) Next() (*sqltypes.Result, error) {
return result, nil
}
+// Close closes all inputs. It is part of the ResultReader interface.
+func (rm *ResultMerger) Close(ctx context.Context) {
+ for _, i := range rm.inputs {
+ i.Close(ctx)
+ }
+}
+
func (rm *ResultMerger) deleteInput(deleteMe ResultReader) {
for i, input := range rm.inputs {
if input == deleteMe {
diff --git a/go/vt/worker/result_merger_test.go b/go/vt/worker/result_merger_test.go
index 10b37f3261f..66bfa65eddf 100644
--- a/go/vt/worker/result_merger_test.go
+++ b/go/vt/worker/result_merger_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"
+ "golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -110,6 +111,10 @@ func (f *fakeResultReader) Fields() []*querypb.Field {
return f.fields
}
+// Close is a no-op; the fake reader holds no resources. It is part of the ResultReader interface.
+func (f *fakeResultReader) Close(ctx context.Context) {
+}
+
// Next returns the next fake result. It is part of the ResultReader interface.
func (f *fakeResultReader) Next() (*sqltypes.Result, error) {
if f.rowsReturned == f.rowsTotal {
@@ -387,6 +392,10 @@ func (m *memoryResultReader) Next() (*sqltypes.Result, error) {
return result, nil
}
+func (m *memoryResultReader) Close(ctx context.Context) {
+ // intentionally blank: the in-memory reader holds nothing that needs closing
+}
+
// benchmarkResult is a package level variable whose sole purpose is to
// reference output from the Benchmark* functions below.
// This was suggested by http://dave.cheney.net/2013/06/30/how-to-write-benchmarks-in-go
diff --git a/go/vt/worker/result_reader.go b/go/vt/worker/result_reader.go
index 51ccda42367..efc0fc1f7bb 100644
--- a/go/vt/worker/result_reader.go
+++ b/go/vt/worker/result_reader.go
@@ -17,6 +17,8 @@ limitations under the License.
package worker
import (
+ "context"
+
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -39,4 +41,5 @@ type ResultReader interface {
// It returns the next result on the stream.
// It will return io.EOF if the stream ended.
Next() (*sqltypes.Result, error)
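+ // Close frees the resources held by the reader. Implementations that
+ // hold no resources may make this a no-op.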
+ Close(ctx context.Context)
}
diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go
index 3cc7638d4a1..8537be596e2 100644
--- a/go/vt/worker/split_clone.go
+++ b/go/vt/worker/split_clone.go
@@ -24,14 +24,16 @@ import (
"sync"
"time"
+ "vitess.io/vitess/go/vt/vttablet/tabletconn"
+
+ "vitess.io/vitess/go/vt/binlog/binlogplayer"
+
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/net/context"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
- "vitess.io/vitess/go/sync2"
- "vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/throttler"
@@ -64,28 +66,30 @@ var servingTypes = []topodatapb.TabletType{topodatapb.TabletType_MASTER, topodat
type SplitCloneWorker struct {
StatusWorker
- wr *wrangler.Wrangler
- cloneType cloneType
- cell string
- destinationKeyspace string
- shard string
- online bool
- offline bool
+ wr *wrangler.Wrangler
+ cloneType cloneType
+ cell string
+ destinationKeyspace string
+ shard string
+ online bool
+ offline bool
+ useConsistentSnapshot bool
// verticalSplit only: List of tables which should be split out.
tables []string
// horizontalResharding only: List of tables which will be skipped.
- excludeTables []string
- chunkCount int
- minRowsPerChunk int
- sourceReaderCount int
- writeQueryMaxRows int
- writeQueryMaxSize int
- destinationWriterCount int
- minHealthyRdonlyTablets int
- maxTPS int64
- maxReplicationLag int64
- cleaner *wrangler.Cleaner
- tabletTracker *TabletTracker
+ excludeTables []string
+ chunkCount int
+ minRowsPerChunk int
+ sourceReaderCount int
+ writeQueryMaxRows int
+ writeQueryMaxSize int
+ destinationWriterCount int
+ minHealthyTablets int
+ tabletType topodatapb.TabletType
+ maxTPS int64
+ maxReplicationLag int64
+ cleaner *wrangler.Cleaner
+ tabletTracker *TabletTracker
// populated during WorkerStateInit, read-only after that
destinationKeyspaceInfo *topo.KeyspaceInfo
@@ -101,6 +105,9 @@ type SplitCloneWorker struct {
// populated during WorkerStateFindTargets, read-only after that
sourceTablets []*topodatapb.Tablet
+ lastPos string // GTID position at which the consistent-snapshot transactions were opened on the source
+ transactions []int64
+
// shardWatchers contains a TopologyWatcher for each source and destination
// shard. It updates the list of tablets in the healthcheck if replicas are
// added/removed.
@@ -118,15 +125,15 @@ type SplitCloneWorker struct {
// Throttlers will be added/removed during WorkerStateClone(Online|Offline).
throttlers map[string]*throttler.Throttler
- // offlineSourceAliases has the list of tablets (per source shard) we took
+ // sourceAliases has the list of tablets (per source shard) we took
// offline for the WorkerStateCloneOffline phase.
// Populated shortly before WorkerStateCloneOffline, read-only after that.
- offlineSourceAliases []*topodatapb.TabletAlias
+ sourceAliases []*topodatapb.TabletAlias
// formattedOfflineSourcesMu guards all fields in this group.
formattedOfflineSourcesMu sync.Mutex
// formattedOfflineSources is a space separated list of
- // "offlineSourceAliases". It is used by the StatusAs* methods to output the
+ // "sourceAliases". It is used by the StatusAs* methods to output the
// used source tablets during the offline clone phase.
formattedOfflineSources string
@@ -140,20 +147,20 @@ type SplitCloneWorker struct {
}
// newSplitCloneWorker returns a new worker object for the SplitClone command.
-func newSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) {
- return newCloneWorker(wr, horizontalResharding, cell, keyspace, shard, online, offline, nil /* tables */, excludeTables, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets, maxTPS, maxReplicationLag)
+func newSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) {
+ return newCloneWorker(wr, horizontalResharding, cell, keyspace, shard, online, offline, nil /* tables */, excludeTables, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets, tabletType, maxTPS, maxReplicationLag, useConsistentSnapshot)
}
// newVerticalSplitCloneWorker returns a new worker object for the
// VerticalSplitClone command.
-func newVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, tables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) {
- return newCloneWorker(wr, verticalSplit, cell, keyspace, shard, online, offline, tables, nil /* excludeTables */, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets, maxTPS, maxReplicationLag)
+func newVerticalSplitCloneWorker(wr *wrangler.Wrangler, cell, keyspace, shard string, online, offline bool, tables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) {
+ return newCloneWorker(wr, verticalSplit, cell, keyspace, shard, online, offline, tables, nil /* excludeTables */, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets, tabletType, maxTPS, maxReplicationLag, useConsistentSnapshot)
}
// newCloneWorker returns a new SplitCloneWorker object which is used both by
// the SplitClone and VerticalSplitClone command.
// TODO(mberlin): Rename SplitCloneWorker to cloneWorker.
-func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, shard string, online, offline bool, tables, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyRdonlyTablets int, maxTPS, maxReplicationLag int64) (Worker, error) {
+func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace, shard string, online, offline bool, tables, excludeTables []string, chunkCount, minRowsPerChunk, sourceReaderCount, writeQueryMaxRows, writeQueryMaxSize, destinationWriterCount, minHealthyTablets int, tabletType topodatapb.TabletType, maxTPS, maxReplicationLag int64, useConsistentSnapshot bool) (Worker, error) {
if cloneType != horizontalResharding && cloneType != verticalSplit {
return nil, fmt.Errorf("unknown cloneType: %v This is a bug. Please report", cloneType)
}
@@ -183,8 +190,8 @@ func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace,
if destinationWriterCount <= 0 {
return nil, fmt.Errorf("destination_writer_count must be > 0: %v", destinationWriterCount)
}
- if minHealthyRdonlyTablets < 0 {
- return nil, fmt.Errorf("min_healthy_rdonly_tablets must be >= 0: %v", minHealthyRdonlyTablets)
+ if minHealthyTablets < 0 {
+ return nil, fmt.Errorf("min_healthy_tablets must be >= 0: %v", minHealthyTablets)
}
if maxTPS != throttler.MaxRateModuleDisabled {
wr.Logger().Infof("throttling enabled and set to a max of %v transactions/second", maxTPS)
@@ -195,35 +202,39 @@ func newCloneWorker(wr *wrangler.Wrangler, cloneType cloneType, cell, keyspace,
if maxReplicationLag <= 0 {
return nil, fmt.Errorf("max_replication_lag must be >= 1s: %v", maxReplicationLag)
}
+ if tabletType != topodatapb.TabletType_REPLICA && tabletType != topodatapb.TabletType_RDONLY {
+ return nil, fmt.Errorf("tablet_type must be RDONLY or REPLICA: %v", topodatapb.TabletType_name[int32(tabletType)])
+ }
scw := &SplitCloneWorker{
- StatusWorker: NewStatusWorker(),
- wr: wr,
- cloneType: cloneType,
- cell: cell,
- destinationKeyspace: keyspace,
- shard: shard,
- online: online,
- offline: offline,
- tables: tables,
- excludeTables: excludeTables,
- chunkCount: chunkCount,
- minRowsPerChunk: minRowsPerChunk,
- sourceReaderCount: sourceReaderCount,
- writeQueryMaxRows: writeQueryMaxRows,
- writeQueryMaxSize: writeQueryMaxSize,
- destinationWriterCount: destinationWriterCount,
- minHealthyRdonlyTablets: minHealthyRdonlyTablets,
- maxTPS: maxTPS,
- maxReplicationLag: maxReplicationLag,
- cleaner: &wrangler.Cleaner{},
- tabletTracker: NewTabletTracker(),
- throttlers: make(map[string]*throttler.Throttler),
-
- destinationDbNames: make(map[string]string),
-
+ StatusWorker: NewStatusWorker(),
+ wr: wr,
+ cloneType: cloneType,
+ cell: cell,
+ destinationKeyspace: keyspace,
+ shard: shard,
+ online: online,
+ offline: offline,
+ tables: tables,
+ excludeTables: excludeTables,
+ chunkCount: chunkCount,
+ minRowsPerChunk: minRowsPerChunk,
+ sourceReaderCount: sourceReaderCount,
+ writeQueryMaxRows: writeQueryMaxRows,
+ writeQueryMaxSize: writeQueryMaxSize,
+ destinationWriterCount: destinationWriterCount,
+ minHealthyTablets: minHealthyTablets,
+ maxTPS: maxTPS,
+ maxReplicationLag: maxReplicationLag,
+ cleaner: &wrangler.Cleaner{},
+ tabletTracker: NewTabletTracker(),
+ tabletType: tabletType,
tableStatusListOnline: &tableStatusList{},
tableStatusListOffline: &tableStatusList{},
+ useConsistentSnapshot: useConsistentSnapshot,
+
+ throttlers: make(map[string]*throttler.Throttler),
+ destinationDbNames: make(map[string]string),
}
scw.initializeEventDescriptor()
return scw, nil
@@ -476,9 +487,15 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error {
time.Sleep(1 * time.Second)
}
- // 4a: Take source tablets out of serving for an exact snapshot.
- if err := scw.findOfflineSourceTablets(ctx); err != nil {
- return vterrors.Wrap(err, "findSourceTablets() failed")
+ // 4a: Make sure the sources are producing a stable view of the data
+ if scw.useConsistentSnapshot {
+ if err := scw.findTransactionalSources(ctx); err != nil {
+ return vterrors.Wrap(err, "findSourceTablets() failed")
+ }
+ } else {
+ if err := scw.findOfflineSourceTablets(ctx); err != nil {
+ return vterrors.Wrap(err, "findSourceTablets() failed")
+ }
}
if err := checkDone(ctx); err != nil {
return err
@@ -489,6 +506,10 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error {
if err := scw.clone(ctx, WorkerStateCloneOffline); err != nil {
return vterrors.Wrap(err, "offline clone() failed")
}
+ if err := scw.setUpVReplication(ctx); err != nil {
+ return fmt.Errorf("failed to set up replication: %v", err)
+ }
+
d := time.Since(start)
if err := checkDone(ctx); err != nil {
return err
@@ -583,6 +604,10 @@ func (scw *SplitCloneWorker) initShardsForHorizontalResharding(ctx context.Conte
scw.destinationShards = os.Left
}
+ if scw.useConsistentSnapshot && len(scw.sourceShards) > 1 {
+ return fmt.Errorf("cannot use consistent snapshot against multiple source shards")
+ }
+
return nil
}
@@ -705,20 +730,20 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
scw.setState(WorkerStateFindTargets)
// find an appropriate tablet in the source shards
- scw.offlineSourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards))
+ scw.sourceAliases = make([]*topodatapb.TabletAlias, len(scw.sourceShards))
for i, si := range scw.sourceShards {
var err error
- scw.offlineSourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY)
+ scw.sourceAliases[i], err = FindWorkerTablet(ctx, scw.wr, scw.cleaner, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyTablets, scw.tabletType)
if err != nil {
return vterrors.Wrapf(err, "FindWorkerTablet() failed for %v/%v/%v", scw.cell, si.Keyspace(), si.ShardName())
}
- scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.offlineSourceAliases[i]), si.Keyspace(), si.ShardName())
+ scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[i]), si.Keyspace(), si.ShardName())
}
- scw.setFormattedOfflineSources(scw.offlineSourceAliases)
+ scw.setFormattedOfflineSources(scw.sourceAliases)
// get the tablet info for them, and stop their replication
- scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.offlineSourceAliases))
- for i, alias := range scw.offlineSourceAliases {
+ scw.sourceTablets = make([]*topodatapb.Tablet, len(scw.sourceAliases))
+ for i, alias := range scw.sourceAliases {
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
ti, err := scw.wr.TopoServer().GetTablet(shortCtx, alias)
cancel()
@@ -740,6 +765,44 @@ func (scw *SplitCloneWorker) findOfflineSourceTablets(ctx context.Context) error
return nil
}
+// findTransactionalSources phase:
+// - find a single healthy source tablet
+// - stop replication on it and open the consistent-snapshot transactions
+func (scw *SplitCloneWorker) findTransactionalSources(ctx context.Context) error {
+ scw.setState(WorkerStateFindTargets)
+
+ if len(scw.sourceShards) > 1 {
+ return fmt.Errorf("consistent snapshot can only be used with a single source shard")
+ }
+ var err error
+
+ // find an appropriate tablet in the source shard
+ si := scw.sourceShards[0]
+ scw.sourceAliases = make([]*topodatapb.TabletAlias, 1)
+ scw.sourceAliases[0], err = FindHealthyTablet(ctx, scw.wr, scw.tsc, scw.cell, si.Keyspace(), si.ShardName(), scw.minHealthyTablets, scw.tabletType)
+ if err != nil {
+ return fmt.Errorf("FindHealthyTablet() failed for %v/%v/%v: %v", scw.cell, si.Keyspace(), si.ShardName(), err)
+ }
+ scw.wr.Logger().Infof("Using tablet %v as source for %v/%v", topoproto.TabletAliasString(scw.sourceAliases[0]), si.Keyspace(), si.ShardName())
+ scw.setFormattedOfflineSources(scw.sourceAliases)
+
+ // get the tablet info
+ scw.sourceTablets = make([]*topodatapb.Tablet, 1)
+ shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
+ ti, err := scw.wr.TopoServer().GetTablet(shortCtx, scw.sourceAliases[0])
+ cancel()
+ if err != nil {
+ return fmt.Errorf("cannot read tablet %v: %v", topoproto.TabletAliasString(scw.sourceAliases[0]), err)
+ }
+ scw.sourceTablets[0] = ti.Tablet
+
+ // stop replication and create transactions to work on
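+ // Each source reader will get its own transaction. CreateConsistentTransactions
+ // is expected to open all of them at the same replication position, so every
+ // reader observes an identical snapshot of the data.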
+ txs, gtid, err := CreateConsistentTransactions(ctx, ti, scw.wr, scw.cleaner, scw.sourceReaderCount)
+ if err != nil {
+ return vterrors.Wrap(err, "could not create consistent transactions")
+ }
+ scw.wr.Logger().Infof("created %v transactions", len(txs))
+ scw.lastPos = gtid
+ scw.transactions = txs
+ return nil
+}
+
// findDestinationMasters finds for each destination shard the current master.
func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error {
scw.setState(WorkerStateFindTargets)
@@ -748,8 +811,9 @@ func (scw *SplitCloneWorker) findDestinationMasters(ctx context.Context) error {
scw.wr.Logger().Infof("Finding a MASTER tablet for each destination shard...")
for _, si := range scw.destinationShards {
waitCtx, waitCancel := context.WithTimeout(ctx, *waitForHealthyTabletsTimeout)
- defer waitCancel()
- if err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER); err != nil {
+ err := scw.tsc.WaitForTablets(waitCtx, scw.cell, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
+ waitCancel()
+ if err != nil {
return vterrors.Wrapf(err, "cannot find MASTER tablet for destination shard for %v/%v (in cell: %v)", si.Keyspace(), si.ShardName(), scw.cell)
}
masters := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
@@ -775,18 +839,18 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t
var wg sync.WaitGroup
rec := concurrency.AllErrorRecorder{}
- if scw.minHealthyRdonlyTablets > 0 && len(shardInfos) > 0 {
- scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyRdonlyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName())
+ if scw.minHealthyTablets > 0 && len(shardInfos) > 0 {
+ scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName())
}
for _, si := range shardInfos {
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()
- // We wait for --min_healthy_rdonly_tablets because we will use several
+ // We wait for --min_healthy_tablets because we will use several
// tablets per shard to spread reading the chunks of rows across as many
// tablets as possible.
- if _, err := waitForHealthyTablets(ctx, scw.wr, scw.tsc, scw.cell, keyspace, shard, scw.minHealthyRdonlyTablets, timeout, topodatapb.TabletType_RDONLY); err != nil {
+ if _, err := waitForHealthyTablets(ctx, scw.wr, scw.tsc, scw.cell, keyspace, shard, scw.minHealthyTablets, timeout, scw.tabletType); err != nil {
rec.RecordError(err)
}
}(si.Keyspace(), si.ShardName())
@@ -795,6 +859,228 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t
return rec.Error()
}
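+// findFirstSourceTablet returns the tablet used to read the schema and the
+// chunk boundaries: the tablet taken offline during the offline phase, or
+// any healthy serving tablet otherwise.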
+func (scw *SplitCloneWorker) findFirstSourceTablet(ctx context.Context, state StatusWorkerState) (*topodatapb.Tablet, error) {
+ if state == WorkerStateCloneOffline {
+ // Use the first source tablet which we took offline.
+ return scw.sourceTablets[0], nil
+ }
+
+ // Pick any healthy serving source tablet.
+ si := scw.sourceShards[0]
+ tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), scw.tabletType)
+ if len(tablets) == 0 {
+ // We fail fast on this problem and don't retry because at the start all tablets should be healthy.
+ return nil, fmt.Errorf("no healthy %v tablet in source shard (%v) available (required to find out the schema)", topodatapb.TabletType_name[int32(scw.tabletType)], topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()))
+ }
+ return tablets[0].Tablet, nil
+}
+
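+// getCounters returns the stats counters and the table status list that
+// belong to the given clone phase (online or offline).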
+func (scw *SplitCloneWorker) getCounters(state StatusWorkerState) ([]*stats.CountersWithSingleLabel, *tableStatusList) {
+ switch state {
+ case WorkerStateCloneOnline:
+ return []*stats.CountersWithSingleLabel{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters},
+ scw.tableStatusListOnline
+ case WorkerStateCloneOffline:
+ return []*stats.CountersWithSingleLabel{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters},
+ scw.tableStatusListOffline
+ default:
+ panic("should not happen")
+ }
+}
+
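+// startExecutor runs a single destination writer: it consumes SQL statements
+// from insertChannel and executes them against the given shard, throttled by
+// that shard's throttler.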
+func (scw *SplitCloneWorker) startExecutor(ctx context.Context, wg *sync.WaitGroup, keyspace, shard string, insertChannel chan string, threadID int, processError func(string, ...interface{})) {
+ defer wg.Done()
+ t := scw.getThrottler(keyspace, shard)
+ defer t.ThreadFinished(threadID)
+
+ executor := newExecutor(scw.wr, scw.tsc, t, keyspace, shard, threadID)
+ if err := executor.fetchLoop(ctx, insertChannel); err != nil {
+ processError("executer.FetchLoop failed: %v", err)
+ }
+}
+
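+// mergeOrSingle returns the single reader unchanged, or wraps multiple
+// readers in a ResultMerger keyed on the table's primary key columns.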
+func mergeOrSingle(readers []ResultReader, td *tabletmanagerdatapb.TableDefinition) (ResultReader, error) {
+ if len(readers) == 1 {
+ return readers[0], nil
+ }
+
+ sourceReader, err := NewResultMerger(readers, len(td.PrimaryKeyColumns))
+ if err != nil {
+ return nil, err
+ }
+
+ return sourceReader, nil
+}
+
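+// getSourceResultReader builds one reader per source shard (transactional
+// when the offline phase uses a consistent snapshot) and merges them into a
+// single ordered stream if there is more than one.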
+func (scw *SplitCloneWorker) getSourceResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk, txID int64) (ResultReader, error) {
+ sourceReaders := make([]ResultReader, len(scw.sourceShards))
+ for shardIndex, si := range scw.sourceShards {
+ var sourceResultReader ResultReader
+ var err error
+ if state == WorkerStateCloneOffline && scw.useConsistentSnapshot {
+ if txID < 1 {
+ return nil, fmt.Errorf("tried using consistent snapshot without a valid transaction")
+ }
+ tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType)
+ sourceResultReader, err = NewTransactionalRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, false, txID)
+ if err != nil {
+ return nil, fmt.Errorf("NewTransactionalRestartableResultReader for source: %v failed: %v", tp.description(), err)
+ }
+ } else {
+ var tp tabletProvider
+ allowMultipleRetries := true
+ if state == WorkerStateCloneOffline {
+ tp = newSingleTabletProvider(ctx, scw.wr.TopoServer(), scw.sourceAliases[shardIndex])
+ // allowMultipleRetries is false so that we do not keep retrying
+ // the same tablet alias for hours. This guards us against the
+ // situation that an offline tablet gets restarted and serves again.
+ // In that case we cannot use it because its replication is no
+ // longer stopped at the same point as we took it offline initially.
+ allowMultipleRetries = false
+ } else {
+ tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), scw.tabletType)
+ }
+ sourceResultReader, err = NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries)
+ if err != nil {
+ return nil, fmt.Errorf("NewRestartableResultReader for source: %v failed: %v", tp.description(), err)
+ }
+ }
+ // TODO: if creating a later reader fails here, the readers already created are never closed.
+ sourceReaders[shardIndex] = sourceResultReader
+ }
+ return mergeOrSingle(sourceReaders, td)
+}
+
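+// getDestinationResultReader builds one reader per destination shard, always
+// against the shard master, and merges them if there is more than one.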
+func (scw *SplitCloneWorker) getDestinationResultReader(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, state StatusWorkerState, chunk chunk) (ResultReader, error) {
+ destReaders := make([]ResultReader, len(scw.destinationShards))
+ for shardIndex, si := range scw.destinationShards {
+ tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
+ destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */)
+ if err != nil {
+ return nil, fmt.Errorf("NewRestartableResultReader for destination: %v failed: %v", tp.description(), err)
+ }
+ destReaders[shardIndex] = destResultReader
+ }
+ return mergeOrSingle(destReaders, td)
+}
+
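+// cloneAChunk copies and reconciles one chunk of one table: it opens source
+// and destination readers, diffs the two streams with RowDiffer2, and sends
+// the reconciling statements to the insert channels.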
+func (scw *SplitCloneWorker) cloneAChunk(ctx context.Context, td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk, processError func(string, ...interface{}), state StatusWorkerState, tableStatusList *tableStatusList, keyResolver keyspaceIDResolver, start time.Time, insertChannels []chan string, txID int64, statsCounters []*stats.CountersWithSingleLabel) {
+ errPrefix := fmt.Sprintf("table=%v chunk=%v", td.Name, chunk)
+
+ var err error
+
+ if err := checkDone(ctx); err != nil {
+ processError("%v: Context expired while this thread was waiting for its turn. Context error: %v", errPrefix, err)
+ return
+ }
+
+ tableStatusList.threadStarted(tableIndex)
+ defer tableStatusList.threadDone(tableIndex)
+
+ if state == WorkerStateCloneOnline {
+ // Wait for enough healthy tablets (they might have become unhealthy
+ // and their replication lag might have increased since we started.)
+ if err := scw.waitForTablets(ctx, scw.sourceShards, *retryDuration); err != nil {
+ processError("%v: No healthy source tablets found (gave up after %v): %v", errPrefix, time.Since(start), err)
+ return
+ }
+ }
+
+ // Set up readers for the diff. There will be one reader for every
+ // source and destination shard.
+ sourceReader, err := scw.getSourceResultReader(ctx, td, state, chunk, txID)
+ if err != nil {
+ processError("%v NewResultMerger for source tablets failed: %v", errPrefix, err)
+ return
+ }
+ defer sourceReader.Close(ctx)
+ destReader, err := scw.getDestinationResultReader(ctx, td, state, chunk)
+ if err != nil {
+ processError("%v NewResultMerger for destinations tablets failed: %v", errPrefix, err)
+ return
+ }
+ defer destReader.Close(ctx)
+ dbNames := make([]string, len(scw.destinationShards))
+ for i, si := range scw.destinationShards {
+ keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())
+ dbNames[i] = scw.destinationDbNames[keyspaceAndShard]
+ }
+ // Compare the data and reconcile any differences.
+ differ, err := NewRowDiffer2(ctx, sourceReader, destReader, td, tableStatusList, tableIndex,
+ scw.destinationShards, keyResolver,
+ insertChannels, ctx.Done(), dbNames, scw.writeQueryMaxRows, scw.writeQueryMaxSize, statsCounters)
+ if err != nil {
+ processError("%v: NewRowDiffer2 failed: %v", errPrefix, err)
+ return
+ }
+ // Ignore the diff report because all diffs should get reconciled.
+ _ /* DiffReport */, err = differ.Diff()
+ if err != nil {
+ processError("%v: RowDiffer2 failed: %v", errPrefix, err)
+ return
+ }
+}
+
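+// workUnit describes one (table, chunk) pair to clone; producers enqueue
+// them and the consumer goroutines in startCloningData drain them.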
+type workUnit struct {
+ td *tabletmanagerdatapb.TableDefinition
+ chunk chunk
+ threadID int
+ resolver keyspaceIDResolver
+}
+
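+// startCloningData starts sourceReaderCount consumer goroutines and then
+// produces one work unit per (table, chunk) pair for them to clone.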
+func (scw *SplitCloneWorker) startCloningData(ctx context.Context, state StatusWorkerState, sourceSchemaDefinition *tabletmanagerdatapb.SchemaDefinition,
+ processError func(string, ...interface{}), firstSourceTablet *topodatapb.Tablet, tableStatusList *tableStatusList,
+ start time.Time, statsCounters []*stats.CountersWithSingleLabel, insertChannels []chan string, wg *sync.WaitGroup) error {
+
+ workPipeline := make(chan workUnit, 10) // We'll use a small buffer so producers do not run too far ahead of consumers
+ queryService, err := tabletconn.GetDialer()(firstSourceTablet, true)
+ if err != nil {
+ return fmt.Errorf("failed to create queryService: %v", err)
+ }
+ defer queryService.Close(ctx)
+
+ // Let's start the work consumers
+ for i := 0; i < scw.sourceReaderCount; i++ {
+ var txID int64
+ if scw.useConsistentSnapshot && state == WorkerStateCloneOffline {
+ txID = scw.transactions[i]
+ } else {
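+ // -1 marks "no transaction": only the consistent-snapshot offline
+ // path hands a real transaction ID to the source readers.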
+ txID = -1
+ }
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for work := range workPipeline {
+ scw.cloneAChunk(ctx, work.td, work.threadID, work.chunk, processError, state, tableStatusList, work.resolver, start, insertChannels, txID, statsCounters)
+ }
+ }()
+ }
+
+ // And now let's start producing work units
+ for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
+ td = reorderColumnsPrimaryKeyFirst(td)
+
+ keyResolver, err := scw.createKeyResolver(td)
+ if err != nil {
+ return fmt.Errorf("cannot resolve sharding keys for keyspace %v: %v", scw.destinationKeyspace, err)
+ }
+
+ // TODO(mberlin): We're going to chunk *all* source shards based on the MIN
+ // and MAX values of the *first* source shard. Is this going to be a problem?
+ chunks, err := generateChunks(ctx, scw.wr, firstSourceTablet, td, scw.chunkCount, scw.minRowsPerChunk)
+ if err != nil {
+ return fmt.Errorf("failed to split table into chunks: %v", err)
+ }
+ tableStatusList.setThreadCount(tableIndex, len(chunks))
+
+ for _, c := range chunks {
+ workPipeline <- workUnit{td: td, chunk: c, threadID: tableIndex, resolver: keyResolver}
+ }
+ }
+
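+ // Closing the pipeline lets the consumers drain the remaining work units
+ // and exit; the caller waits on wg for that.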
+ close(workPipeline)
+
+ return nil
+}
+
// copy phase:
// - copy the data from source tablets to destination masters (with replication on)
// Assumes that the schema has already been created on each destination tablet
@@ -809,31 +1095,13 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
statsStateDurationsNs.Set(string(state), time.Now().Sub(start).Nanoseconds())
}()
- var firstSourceTablet *topodatapb.Tablet
- if state == WorkerStateCloneOffline {
- // Use the first source tablet which we took offline.
- firstSourceTablet = scw.sourceTablets[0]
- } else {
- // Pick any healthy serving source tablet.
- si := scw.sourceShards[0]
- tablets := scw.tsc.GetHealthyTabletStats(si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY)
- if len(tablets) == 0 {
- // We fail fast on this problem and don't retry because at the start all tablets should be healthy.
- return fmt.Errorf("no healthy RDONLY tablet in source shard (%v) available (required to find out the schema)", topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()))
- }
- firstSourceTablet = tablets[0].Tablet
- }
- var statsCounters []*stats.CountersWithSingleLabel
- var tableStatusList *tableStatusList
- switch state {
- case WorkerStateCloneOnline:
- statsCounters = []*stats.CountersWithSingleLabel{statsOnlineInsertsCounters, statsOnlineUpdatesCounters, statsOnlineDeletesCounters, statsOnlineEqualRowsCounters}
- tableStatusList = scw.tableStatusListOnline
- case WorkerStateCloneOffline:
- statsCounters = []*stats.CountersWithSingleLabel{statsOfflineInsertsCounters, statsOfflineUpdatesCounters, statsOfflineDeletesCounters, statsOfflineEqualRowsCounters}
- tableStatusList = scw.tableStatusListOffline
+ firstSourceTablet, err := scw.findFirstSourceTablet(ctx, state)
+ if err != nil {
+ return err
}
+ statsCounters, tableStatusList := scw.getCounters(state)
+
// The throttlers exist only for the duration of this clone() call.
// That means a SplitClone invocation with both online and offline phases
// will create throttlers for each phase.
@@ -849,22 +1117,19 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
scw.wr.Logger().Infof("Source tablet 0 has %v tables to copy", len(sourceSchemaDefinition.TableDefinitions))
tableStatusList.initialize(sourceSchemaDefinition)
- // In parallel, setup the channels to send SQL data chunks to for each destination tablet:
- //
- // mu protects the context for cancelation, and firstError
- mu := sync.Mutex{}
var firstError error
ctx, cancelCopy := context.WithCancel(ctx)
defer cancelCopy()
processError := func(format string, args ...interface{}) {
- mu.Lock()
+ // In theory two goroutines could both see firstError as nil and race to
+ // set it. That is acceptable here: logging and cancelCopy() are safe to
+ // call concurrently, and we only need *some* first error to be recorded.
+ scw.wr.Logger().Errorf(format, args...)
if firstError == nil {
- scw.wr.Logger().Errorf(format, args...)
firstError = fmt.Errorf(format, args...)
cancelCopy()
}
- mu.Unlock()
}
// NOTE: Code below this point must *not* use "return" to exit this Go routine
@@ -877,6 +1142,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
// races between "defer throttler.ThreadFinished()" (must be executed first)
// and "defer scw.closeThrottlers()". Otherwise, vtworker will panic.
+ // In parallel, setup the channels to send SQL data chunks to for each destination tablet:
insertChannels := make([]chan string, len(scw.destinationShards))
destinationWaitGroup := sync.WaitGroup{}
for shardIndex, si := range scw.destinationShards {
@@ -888,168 +1154,41 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
for j := 0; j < scw.destinationWriterCount; j++ {
destinationWaitGroup.Add(1)
- go func(keyspace, shard string, insertChannel chan string, throttler *throttler.Throttler, threadID int) {
- defer destinationWaitGroup.Done()
- defer throttler.ThreadFinished(threadID)
-
- executor := newExecutor(scw.wr, scw.tsc, throttler, keyspace, shard, threadID)
- if err := executor.fetchLoop(ctx, insertChannel); err != nil {
- processError("executer.FetchLoop failed: %v", err)
- }
- }(si.Keyspace(), si.ShardName(), insertChannels[shardIndex], scw.getThrottler(si.Keyspace(), si.ShardName()), j)
+ go scw.startExecutor(ctx, &destinationWaitGroup, si.Keyspace(), si.ShardName(), insertChannels[shardIndex], j, processError)
}
}
// Now for each table, read data chunks and send them to all
// insertChannels
- sourceWaitGroup := sync.WaitGroup{}
- sema := sync2.NewSemaphore(scw.sourceReaderCount, 0)
- for tableIndex, td := range sourceSchemaDefinition.TableDefinitions {
- td = reorderColumnsPrimaryKeyFirst(td)
-
- keyResolver, err := scw.createKeyResolver(td)
- if err != nil {
- processError("cannot resolve sharding keys for keyspace %v: %v", scw.destinationKeyspace, err)
- break
- }
-
- // TODO(mberlin): We're going to chunk *all* source shards based on the MIN
- // and MAX values of the *first* source shard. Is this going to be a problem?
- chunks, err := generateChunks(ctx, scw.wr, firstSourceTablet, td, scw.chunkCount, scw.minRowsPerChunk)
- if err != nil {
- processError("failed to split table into chunks: %v", err)
- break
- }
- tableStatusList.setThreadCount(tableIndex, len(chunks))
-
- for _, c := range chunks {
- sourceWaitGroup.Add(1)
- go func(td *tabletmanagerdatapb.TableDefinition, tableIndex int, chunk chunk) {
- defer sourceWaitGroup.Done()
- errPrefix := fmt.Sprintf("table=%v chunk=%v", td.Name, chunk)
-
- // We need our own error per Go routine to avoid races.
- var err error
-
- sema.Acquire()
- defer sema.Release()
+ readers := sync.WaitGroup{}
- if err := checkDone(ctx); err != nil {
- processError("%v: Context expired while this thread was waiting for its turn. Context error: %v", errPrefix, err)
- return
- }
-
- tableStatusList.threadStarted(tableIndex)
- defer tableStatusList.threadDone(tableIndex)
-
- if state == WorkerStateCloneOnline {
- // Wait for enough healthy tablets (they might have become unhealthy
- // and their replication lag might have increased since we started.)
- if err := scw.waitForTablets(ctx, scw.sourceShards, *retryDuration); err != nil {
- processError("%v: No healthy source tablets found (gave up after %v): %v", errPrefix, time.Since(start), err)
- return
- }
- }
-
- // Set up readers for the diff. There will be one reader for every
- // source and destination shard.
- sourceReaders := make([]ResultReader, len(scw.sourceShards))
- destReaders := make([]ResultReader, len(scw.destinationShards))
- for shardIndex, si := range scw.sourceShards {
- var tp tabletProvider
- allowMultipleRetries := true
- if state == WorkerStateCloneOffline {
- tp = newSingleTabletProvider(ctx, scw.wr.TopoServer(), scw.offlineSourceAliases[shardIndex])
- // allowMultipleRetries is false to avoid that we'll keep retrying
- // on the same tablet alias for hours. This guards us against the
- // situation that an offline tablet gets restarted and serves again.
- // In that case we cannot use it because its replication is no
- // longer stopped at the same point as we took it offline initially.
- allowMultipleRetries = false
- } else {
- tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY)
- }
- sourceResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries)
- if err != nil {
- processError("%v: NewRestartableResultReader for source: %v failed: %v", errPrefix, tp.description(), err)
- return
- }
- defer sourceResultReader.Close(ctx)
- sourceReaders[shardIndex] = sourceResultReader
- }
-
- for shardIndex, si := range scw.destinationShards {
- tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
- destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */)
- if err != nil {
- processError("%v: NewRestartableResultReader for destination: %v failed: %v", errPrefix, tp.description(), err)
- return
- }
- defer destResultReader.Close(ctx)
- destReaders[shardIndex] = destResultReader
- }
-
- var sourceReader ResultReader
- var destReader ResultReader
- if len(sourceReaders) >= 2 {
- sourceReader, err = NewResultMerger(sourceReaders, len(td.PrimaryKeyColumns))
- if err != nil {
- processError("%v: NewResultMerger for source tablets failed: %v", errPrefix, err)
- return
- }
- } else {
- sourceReader = sourceReaders[0]
- }
- if len(destReaders) >= 2 {
- destReader, err = NewResultMerger(destReaders, len(td.PrimaryKeyColumns))
- if err != nil {
- processError("%v: NewResultMerger for destination tablets failed: %v", errPrefix, err)
- return
- }
- } else {
- destReader = destReaders[0]
- }
-
- dbNames := make([]string, len(scw.destinationShards))
- for i, si := range scw.destinationShards {
- keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName())
- dbNames[i] = scw.destinationDbNames[keyspaceAndShard]
- }
- // Compare the data and reconcile any differences.
- differ, err := NewRowDiffer2(ctx, sourceReader, destReader, td, tableStatusList, tableIndex,
- scw.destinationShards, keyResolver,
- insertChannels, ctx.Done(), dbNames, scw.writeQueryMaxRows, scw.writeQueryMaxSize, statsCounters)
- if err != nil {
- processError("%v: NewRowDiffer2 failed: %v", errPrefix, err)
- return
- }
- // Ignore the diff report because all diffs should get reconciled.
- _ /* DiffReport */, err = differ.Diff()
- if err != nil {
- processError("%v: RowDiffer2 failed: %v", errPrefix, err)
- return
- }
- }(td, tableIndex, c)
- }
+ err = scw.startCloningData(ctx, state, sourceSchemaDefinition, processError, firstSourceTablet, tableStatusList, start, statsCounters, insertChannels, &readers)
+ if err != nil {
+ return fmt.Errorf("failed to startCloningData : %v", err)
}
- sourceWaitGroup.Wait()
+ readers.Wait()
for shardIndex := range scw.destinationShards {
close(insertChannels[shardIndex])
}
destinationWaitGroup.Wait()
- if firstError != nil {
- return firstError
- }
- if state == WorkerStateCloneOffline {
- // Create and populate the vreplication table to give filtered replication
- // a starting point.
- queries := make([]string, 0, 4)
- queries = append(queries, binlogplayer.CreateVReplicationTable()...)
+ return firstError
+}
- // get the current position from the sources
- sourcePositions := make([]string, len(scw.sourceShards))
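+// setUpVReplication creates and populates the vreplication table on every
+// destination master so that filtered replication can start from the
+// recorded source position.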
+func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error {
+ wg := sync.WaitGroup{}
+ // Create and populate the vreplication table to give filtered replication
+ // a starting point.
+ queries := make([]string, 0, 4)
+ queries = append(queries, binlogplayer.CreateVReplicationTable()...)
+
+ // get the current position from the sources
+ sourcePositions := make([]string, len(scw.sourceShards))
+
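+ // With a consistent snapshot the position was captured when the source
+ // transactions were opened; otherwise we ask each offline source tablet
+ // for its current replication position.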
+ if scw.useConsistentSnapshot {
+ sourcePositions[0] = scw.lastPos
+ } else {
for shardIndex := range scw.sourceShards {
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
status, err := scw.wr.TabletManagerClient().SlaveStatus(shortCtx, scw.sourceTablets[shardIndex])
@@ -1059,46 +1198,60 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
}
sourcePositions[shardIndex] = status.Position
}
+ }
+ cancelableCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ rec := concurrency.AllErrorRecorder{}
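+ // handleError records the error and cancels the shared context so all
+ // other destination goroutines stop early.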
+ handleError := func(e error) {
+ rec.RecordError(e)
+ cancel()
+ }
- for _, si := range scw.destinationShards {
- destinationWaitGroup.Add(1)
- go func(keyspace, shard string, kr *topodatapb.KeyRange) {
- defer destinationWaitGroup.Done()
- scw.wr.Logger().Infof("Making and populating vreplication table")
-
- exc := newExecutor(scw.wr, scw.tsc, nil, keyspace, shard, 0)
- for shardIndex, src := range scw.sourceShards {
- bls := &binlogdatapb.BinlogSource{
- Keyspace: src.Keyspace(),
- Shard: src.ShardName(),
- }
- if scw.tables == nil {
- bls.KeyRange = kr
- } else {
- bls.Tables = scw.tables
- }
- // TODO(mberlin): Fill in scw.maxReplicationLag once the adapative
- // throttler is enabled by default.
- qr, err := exc.vreplicationExec(ctx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix()))
- if err != nil {
- processError("vreplication queries failed: %v", err)
- break
- }
- if err := scw.wr.SourceShardAdd(ctx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil {
- processError("could not add source shard: %v", err)
- break
- }
+ for _, si := range scw.destinationShards {
+ wg.Add(1)
+ go func(keyspace, shard string, kr *topodatapb.KeyRange) {
+ defer wg.Done()
+ scw.wr.Logger().Infof("Making and populating vreplication table")
+
+ exc := newExecutor(scw.wr, scw.tsc, nil, keyspace, shard, 0)
+ for shardIndex, src := range scw.sourceShards {
+ // Check whether an error occurred in any other goroutine:
+ select {
+ case <-cancelableCtx.Done():
+ return // Error somewhere, terminate
+ default:
}
- // refreshState will cause the destination to become non-serving because
- // it's now participating in the resharding workflow.
- if err := exc.refreshState(ctx); err != nil {
- processError("RefreshState failed on tablet %v/%v: %v", keyspace, shard, err)
+
+ bls := &binlogdatapb.BinlogSource{
+ Keyspace: src.Keyspace(),
+ Shard: src.ShardName(),
}
- }(si.Keyspace(), si.ShardName(), si.KeyRange)
- }
- destinationWaitGroup.Wait()
- } // clonePhase == offline
- return firstError
+ if scw.tables == nil {
+ bls.KeyRange = kr
+ } else {
+ bls.Tables = scw.tables
+ }
+ // TODO(mberlin): Fill in scw.maxReplicationLag once the adaptive throttler is enabled by default.
+ qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix()))
+ if err != nil {
+ handleError(fmt.Errorf("vreplication queries failed: %v", err))
+ cancel()
+ return
+ }
+ if err := scw.wr.SourceShardAdd(cancelableCtx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil {
+ handleError(fmt.Errorf("could not add source shard: %v", err))
+ break
+ }
+ }
+ // refreshState will cause the destination to become non-serving because
+ // it's now participating in the resharding workflow.
+ if err := exc.refreshState(ctx); err != nil {
+ handleError(fmt.Errorf("RefreshState failed on tablet %v/%v: %v", keyspace, shard, err))
+ }
+ }(si.Keyspace(), si.ShardName(), si.KeyRange)
+ }
+ wg.Wait()
+
+ return rec.Error()
}
func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topodatapb.Tablet) (*tabletmanagerdatapb.SchemaDefinition, error) {
@@ -1201,4 +1354,4 @@ func (scw *SplitCloneWorker) closeThrottlers() {
t.Close()
delete(scw.throttlers, keyspaceAndShard)
}
-}
+}
\ No newline at end of file
diff --git a/go/vt/worker/split_clone_cmd.go b/go/vt/worker/split_clone_cmd.go
index 770af7797c5..e3d1085560d 100644
--- a/go/vt/worker/split_clone_cmd.go
+++ b/go/vt/worker/split_clone_cmd.go
@@ -25,12 +25,13 @@ import (
"strings"
"sync"
- "vitess.io/vitess/go/vt/vterrors"
-
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/concurrency"
+ "vitess.io/vitess/go/vt/proto/topodata"
+ "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
+ "vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/wrangler"
)
@@ -82,14 +83,22 @@ const splitCloneHTML2 = `
[splitCloneHTML2 form markup unrecoverable in this extract: two template lines replaced by eight new ones; the diff is truncated here]