diff --git a/config/mycnf/default-fast.cnf b/config/mycnf/default-fast.cnf index 16aebb3c77f..ec7f92595f8 100644 --- a/config/mycnf/default-fast.cnf +++ b/config/mycnf/default-fast.cnf @@ -28,7 +28,7 @@ key_buffer_size = 2M log-error = {{.ErrorLogPath}} long_query_time = 2 max_allowed_packet = 16M -max_connections = 100 +max_connections = 200 net_write_timeout = 60 pid-file = {{.PidFile}} port = {{.MysqlPort}} diff --git a/go/vt/worker/executor.go b/go/vt/worker/executor.go index bc9cedfce87..757cdaf00e9 100644 --- a/go/vt/worker/executor.go +++ b/go/vt/worker/executor.go @@ -220,7 +220,7 @@ func (e *executor) checkError(ctx context.Context, err error, isRetry bool, mast e.wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", tabletString, err) statsRetryCount.Add(1) statsRetryCounters.Add(retryCategoryReadOnly, 1) - case errNo == "2002" || errNo == "2006" || errNo == "2013": + case errNo == "2002" || errNo == "2006" || errNo == "2013" || errNo == "1053": // Note: // "2006" happens if the connection is already dead. Retrying a query in // this case is safe. @@ -229,6 +229,7 @@ func (e *executor) checkError(ctx context.Context, err error, isRetry bool, mast // it was aborted. If we retry the query and get a duplicate entry error, we // assume that the previous execution was successful and ignore the error. // See below for the handling of duplicate entry error "1062". 
+ // "1053" (ER_SERVER_SHUTDOWN) means MySQL is shutting down. Retrying the query on another (or the restarted) tablet is safe. e.wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", tabletString, err) statsRetryCount.Add(1) statsRetryCounters.Add(retryCategoryConnectionError, 1) diff --git a/go/vt/worker/restartable_result_reader.go b/go/vt/worker/restartable_result_reader.go index 1a629e57236..d21110d418e 100644 --- a/go/vt/worker/restartable_result_reader.go +++ b/go/vt/worker/restartable_result_reader.go @@ -82,14 +82,30 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t allowMultipleRetries: allowMultipleRetries, } - // If the initial connection fails, we do not restart. - if _ /* retryable */, err := r.getTablet(); err != nil { - return nil, vterrors.Wrap(err, "tablet=unknown") - } - if _ /* retryable */, err := r.startStream(); err != nil { - return nil, vterrors.Wrapf(err, "tablet=%v", topoproto.TabletAliasString(r.tablet.Alias)) + // If the initial connection fails we retry once. + // Note: The first retry will be the second attempt. + attempt := 0 + for { + attempt++ + var err error + var retryable bool + if retryable, err = r.getTablet(); err != nil { + err = fmt.Errorf("tablet=unknown: %v", err) + goto retry + } + if retryable, err = r.startStream(); err != nil { + err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err) + goto retry + } + return r, nil + + retry: + if !retryable || attempt > 1 { + return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err) + } + statsRetryCount.Add(1) + logger.Infof("retrying after error: %v", err) } - return r, nil } // getTablet (re)sets the tablet which is used for the streaming query. @@ -172,7 +188,7 @@ func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) { retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration) defer retryCancel() - // Note: The first retry will be the second attempt. 
+ // The first retry is the second attempt because we already tried once in Next() attempt := 1 start := time.Now() for { diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index 6fe32e9a26d..ff18878cb74 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -433,29 +433,20 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error { return err } - // Phase 2a: Find destination master tablets. + // Phase 2: Find destination master tablets. if err := scw.findDestinationMasters(ctx); err != nil { return vterrors.Wrap(err, "findDestinationMasters() failed") } if err := checkDone(ctx); err != nil { return err } - // Phase 2b: Wait for minimum number of destination tablets (required for the - // diff). Note that while we wait for the minimum number, we'll always use - // *all* available RDONLY tablets from each destination shard. - if err := scw.waitForTablets(ctx, scw.destinationShards, *waitForHealthyTabletsTimeout); err != nil { - return vterrors.Wrap(err, "waitForDestinationTablets(destinationShards) failed") - } - if err := checkDone(ctx); err != nil { - return err - } // Phase 3: (optional) online clone. if scw.online { scw.wr.Logger().Infof("Online clone will be run now.") // 3a: Wait for minimum number of source tablets (required for the diff). if err := scw.waitForTablets(ctx, scw.sourceShards, *waitForHealthyTabletsTimeout); err != nil { - return vterrors.Wrap(err, "waitForDestinationTablets(sourceShards) failed") + return vterrors.Wrap(err, "waitForTablets(sourceShards) failed") } // 3b: Clone the data. 
start := time.Now() @@ -784,7 +775,7 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t var wg sync.WaitGroup rec := concurrency.AllErrorRecorder{} - if len(shardInfos) > 0 { + if scw.minHealthyRdonlyTablets > 0 && len(shardInfos) > 0 { scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyRdonlyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName()) } @@ -976,7 +967,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) // longer stopped at the same point as we took it offline initially. allowMultipleRetries = false } else { - tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName()) + tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY) } sourceResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries) if err != nil { @@ -987,16 +978,8 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState) sourceReaders[shardIndex] = sourceResultReader } - // Wait for enough healthy tablets (they might have become unhealthy - // and their replication lag might have increased due to a previous - // chunk pipeline.) 
- if err := scw.waitForTablets(ctx, scw.destinationShards, *retryDuration); err != nil { - processError("%v: No healthy destination tablets found (gave up after %v): ", errPrefix, time.Since(start), err) - return - } - for shardIndex, si := range scw.destinationShards { - tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName()) + tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER) destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */) if err != nil { processError("%v: NewRestartableResultReader for destination: %v failed: %v", errPrefix, tp.description(), err) diff --git a/go/vt/worker/split_clone_test.go b/go/vt/worker/split_clone_test.go index 184e87d1dc1..395555b32ae 100644 --- a/go/vt/worker/split_clone_test.go +++ b/go/vt/worker/split_clone_test.go @@ -57,7 +57,7 @@ const ( ) var ( - errReadOnly = errors.New("The MariaDB server is running with the --read-only option so it cannot execute this statement (errno 1290) during query:") + errReadOnly = errors.New("the MariaDB server is running with the --read-only option so it cannot execute this statement (errno 1290) during query: ") errStreamingQueryTimeout = errors.New("vttablet: generic::unknown: error: the query was killed either because it timed out or was canceled: (errno 2013) (sqlstate HY000) during query: ") ) @@ -76,9 +76,9 @@ type splitCloneTestCase struct { // Destination tablets. leftMasterFakeDb *fakesqldb.DB - leftMasterQs *fakes.StreamHealthQueryService + leftMasterQs *testQueryService rightMasterFakeDb *fakesqldb.DB - rightMasterQs *fakes.StreamHealthQueryService + rightMasterQs *testQueryService // leftReplica is used by the reparent test. leftReplica *testlib.FakeTablet @@ -92,14 +92,23 @@ type splitCloneTestCase struct { // defaultWorkerArgs are the full default arguments to run SplitClone. 
defaultWorkerArgs []string + + // Used to restore the default values after the test run + defaultExecuteFetchRetryTime time.Duration + defaultRetryDuration time.Duration } func (tc *splitCloneTestCase) setUp(v3 bool) { - tc.setUpWithConcurreny(v3, 10, 2, splitCloneTestRowsCount) + tc.setUpWithConcurrency(v3, 10, 2, splitCloneTestRowsCount) } -func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQueryMaxRows, rowsCount int) { +func (tc *splitCloneTestCase) setUpWithConcurrency(v3 bool, concurrency, writeQueryMaxRows, rowsCount int) { *useV3ReshardingMode = v3 + + // Reset some retry flags for the tests that change that + tc.defaultRetryDuration = *retryDuration + tc.defaultExecuteFetchRetryTime = *executeFetchRetryTime + tc.ts = memorytopo.NewServer("cell1", "cell2") ctx := context.Background() tc.wi = NewInstance(tc.ts, "cell1", time.Second) @@ -245,12 +254,13 @@ func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQue } // Fake stream health reponses because vtworker needs them to find the master. 
- tc.leftMasterQs = fakes.NewStreamHealthQueryService(leftMaster.Target()) - tc.leftMasterQs.AddDefaultHealthResponse() + shqs := fakes.NewStreamHealthQueryService(leftMaster.Target()) + shqs.AddDefaultHealthResponse() + tc.leftMasterQs = newTestQueryService(tc.t, leftMaster.Target(), shqs, 0, 2, topoproto.TabletAliasString(leftMaster.Tablet.Alias), false /* omitKeyspaceID */) tc.leftReplicaQs = fakes.NewStreamHealthQueryService(leftReplica.Target()) - tc.leftReplicaQs.AddDefaultHealthResponse() - tc.rightMasterQs = fakes.NewStreamHealthQueryService(rightMaster.Target()) - tc.rightMasterQs.AddDefaultHealthResponse() + shqs = fakes.NewStreamHealthQueryService(rightMaster.Target()) + shqs.AddDefaultHealthResponse() + tc.rightMasterQs = newTestQueryService(tc.t, rightMaster.Target(), shqs, 1, 2, topoproto.TabletAliasString(rightMaster.Tablet.Alias), false /* omitKeyspaceID */) grpcqueryservice.Register(leftMaster.RPCServer, tc.leftMasterQs) grpcqueryservice.Register(leftReplica.RPCServer, tc.leftReplicaQs) grpcqueryservice.Register(rightMaster.RPCServer, tc.rightMasterQs) @@ -276,6 +286,9 @@ func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQue } func (tc *splitCloneTestCase) tearDown() { + *retryDuration = tc.defaultRetryDuration + *executeFetchRetryTime = tc.defaultExecuteFetchRetryTime + for _, ft := range tc.tablets { ft.StopActionLoop(tc.t) } @@ -519,7 +532,7 @@ func TestSplitCloneV2_Offline(t *testing.T) { // get processed concurrently while the other pending ones are blocked. 
func TestSplitCloneV2_Offline_HighChunkCount(t *testing.T) { tc := &splitCloneTestCase{t: t} - tc.setUpWithConcurreny(false /* v3 */, 10, 5 /* writeQueryMaxRows */, 1000 /* rowsCount */) + tc.setUpWithConcurrency(false /* v3 */, 10, 5 /* writeQueryMaxRows */, 1000 /* rowsCount */) defer tc.tearDown() args := make([]string, len(tc.defaultWorkerArgs)) @@ -545,6 +558,9 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) { tc.setUp(false /* v3 */) defer tc.tearDown() + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + // Ensure that this test uses only the first tablet. This makes it easier // to verify that the restart actually happened for that tablet. // SplitClone will ignore the second tablet because we set its replication lag @@ -580,9 +596,12 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) { // of the streaming query does not succeed here and instead vtworker will fail. func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) { tc := &splitCloneTestCase{t: t} - tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount) + tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + // Ensure that this test uses only the first tablet. tc.sourceRdonlyQs[1].AddHealthResponseWithSecondsBehindMaster(3600) @@ -617,7 +636,7 @@ func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) { // reading the last row. func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) { tc := &splitCloneTestCase{t: t} - tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount) + tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() // In the online phase we won't enable filtered replication. Don't expect it. 
@@ -672,7 +691,7 @@ func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) { // available. func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) { tc := &splitCloneTestCase{t: t} - tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount) + tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() // In the online phase we won't enable filtered replication. Don't expect it. @@ -692,15 +711,9 @@ func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) { tc.sourceRdonlyQs[0].AddHealthResponseWithNotServing() }) - // Only wait 1 ms between retries, so that the test passes faster. - *executeFetchRetryTime = 1 * time.Millisecond // Let vtworker keep retrying and give up rather quickly because the test // will be blocked until it finally fails. - defaultRetryDuration := *retryDuration *retryDuration = 500 * time.Millisecond - defer func() { - *retryDuration = defaultRetryDuration - }() // Run the vtworker command. args := []string{"SplitClone", @@ -757,14 +770,10 @@ func TestSplitCloneV2_Online_Offline(t *testing.T) { // When the online clone inserted the last rows, modify the destination test // query service such that it will return them as well. tc.leftMasterFakeDb.GetEntry(29).AfterFunc = func() { - for i := range []int{0, 1} { - tc.leftRdonlyQs[i].addGeneratedRows(100, 200) - } + tc.leftMasterQs.addGeneratedRows(100, 200) } tc.rightMasterFakeDb.GetEntry(29).AfterFunc = func() { - for i := range []int{0, 1} { - tc.rightRdonlyQs[i].addGeneratedRows(100, 200) - } + tc.rightMasterQs.addGeneratedRows(100, 200) } // Run the vtworker command. @@ -789,7 +798,7 @@ func TestSplitCloneV2_Offline_Reconciliation(t *testing.T) { tc := &splitCloneTestCase{t: t} // We reduce the parallelism to 1 to test the order of expected // insert/update/delete statements on the destination master. 
- tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount) + tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() // We assume that an Online Clone ran before which copied the rows 100-199 @@ -807,15 +816,13 @@ func TestSplitCloneV2_Offline_Reconciliation(t *testing.T) { qs.addGeneratedRows(100, 190) } - for i := range []int{0, 1} { - // The destination has rows 100-190 with the source in common. - // Rows 191-200 are extraenous on the destination. - tc.leftRdonlyQs[i].addGeneratedRows(100, 200) - tc.rightRdonlyQs[i].addGeneratedRows(100, 200) - // But some data is outdated data and must be updated. - tc.leftRdonlyQs[i].modifyFirstRows(2) - tc.rightRdonlyQs[i].modifyFirstRows(2) - } + // The destination has rows 100-190 with the source in common. + // Rows 191-200 are extraneous on the destination. + tc.leftMasterQs.addGeneratedRows(100, 200) + tc.rightMasterQs.addGeneratedRows(100, 200) + // But some data is outdated data and must be updated. + tc.leftMasterQs.modifyFirstRows(2) + tc.rightMasterQs.modifyFirstRows(2) // The destination tablets should see inserts, updates and deletes. // Clear the entries added by setUp() because the reconcilation will @@ -893,11 +900,12 @@ func TestSplitCloneV2_RetryDueToReadonly(t *testing.T) { tc.setUp(false /* v3 */) defer tc.tearDown() + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + // Provoke a retry to test the error handling. tc.leftMasterFakeDb.AddExpectedQueryAtIndex(0, "INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", errReadOnly) tc.rightMasterFakeDb.AddExpectedQueryAtIndex(0, "INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", errReadOnly) - // Only wait 1 ms between retries, so that the test passes faster. - *executeFetchRetryTime = 1 * time.Millisecond // Run the vtworker command. 
if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil { @@ -922,6 +930,9 @@ func TestSplitCloneV2_RetryDueToReparent(t *testing.T) { tc.setUp(false /* v3 */) defer tc.tearDown() + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + // Provoke a reparent just before the copy finishes. // leftReplica will take over for the last, 30th, insert and the vreplication checkpoint. tc.leftReplicaFakeDb.AddExpectedQuery("INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", nil) @@ -960,9 +971,6 @@ func TestSplitCloneV2_RetryDueToReparent(t *testing.T) { // => vtworker has no MASTER to go to and will keep retrying. } - // Only wait 1 ms between retries, so that the test passes faster. - *executeFetchRetryTime = 1 * time.Millisecond - // Run the vtworker command. if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil { t.Fatal(err) @@ -982,11 +990,15 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { tc.setUp(false /* v3 */) defer tc.tearDown() + // Only wait 1 ms between retries, so that the test passes faster. + *executeFetchRetryTime = 1 * time.Millisecond + // leftReplica will take over for the last, 30th, insert and the vreplication checkpoint. tc.leftReplicaFakeDb.AddExpectedQuery("INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", nil) // During the 29th write, let the MASTER disappear. 
tc.leftMasterFakeDb.GetEntry(28).AfterFunc = func() { + t.Logf("setting MASTER tablet to REPLICA") tc.leftMasterQs.UpdateType(topodatapb.TabletType_REPLICA) tc.leftMasterQs.AddDefaultHealthResponse() } @@ -1013,7 +1025,9 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { defer cancel() for { - if statsRetryCounters.Counts()[retryCategoryNoMasterAvailable] >= 1 { + retries := statsRetryCounters.Counts()[retryCategoryNoMasterAvailable] + if retries >= 1 { + t.Logf("retried on no MASTER %v times", retries) break } @@ -1027,13 +1041,11 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { // Make leftReplica the new MASTER. tc.leftReplica.Agent.TabletExternallyReparented(ctx, "1") + t.Logf("resetting tablet back to MASTER") tc.leftReplicaQs.UpdateType(topodatapb.TabletType_MASTER) tc.leftReplicaQs.AddDefaultHealthResponse() }() - // Only wait 1 ms between retries, so that the test passes faster. - *executeFetchRetryTime = 1 * time.Millisecond - // Run the vtworker command. if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil { t.Fatal(err) diff --git a/go/vt/worker/tablet_provider.go b/go/vt/worker/tablet_provider.go index f03602af4c6..2d42bcb7fc6 100644 --- a/go/vt/worker/tablet_provider.go +++ b/go/vt/worker/tablet_provider.go @@ -75,21 +75,22 @@ func (p *singleTabletProvider) description() string { // shardTabletProvider returns a random healthy RDONLY tablet for a given // keyspace and shard. It uses the HealthCheck module to retrieve the tablets. 
type shardTabletProvider struct { - tsc *discovery.TabletStatsCache - tracker *TabletTracker - keyspace string - shard string + tsc *discovery.TabletStatsCache + tracker *TabletTracker + keyspace string + shard string + tabletType topodatapb.TabletType } -func newShardTabletProvider(tsc *discovery.TabletStatsCache, tracker *TabletTracker, keyspace, shard string) *shardTabletProvider { - return &shardTabletProvider{tsc, tracker, keyspace, shard} +func newShardTabletProvider(tsc *discovery.TabletStatsCache, tracker *TabletTracker, keyspace, shard string, tabletType topodatapb.TabletType) *shardTabletProvider { + return &shardTabletProvider{tsc, tracker, keyspace, shard, tabletType} } func (p *shardTabletProvider) getTablet() (*topodatapb.Tablet, error) { // Pick any healthy serving tablet. - tablets := p.tsc.GetHealthyTabletStats(p.keyspace, p.shard, topodatapb.TabletType_RDONLY) + tablets := p.tsc.GetHealthyTabletStats(p.keyspace, p.shard, p.tabletType) if len(tablets) == 0 { - return nil, fmt.Errorf("%v: no healthy RDONLY tablets available", p.description()) + return nil, fmt.Errorf("%v: no healthy %v tablets available", p.description(), p.tabletType) } return p.tracker.Track(tablets), nil } diff --git a/go/vt/worker/vertical_split_clone_test.go b/go/vt/worker/vertical_split_clone_test.go index 9f31da571da..0d2ac0953ad 100644 --- a/go/vt/worker/vertical_split_clone_test.go +++ b/go/vt/worker/vertical_split_clone_test.go @@ -144,24 +144,20 @@ func TestVerticalSplitClone(t *testing.T) { sourceRdonlyQs.addGeneratedRows(verticalSplitCloneTestMin, verticalSplitCloneTestMax) grpcqueryservice.Register(sourceRdonly.RPCServer, sourceRdonlyQs) - // Set up destination rdonly which will be used as input for the diff during the clone. 
- destRdonlyShqs := fakes.NewStreamHealthQueryService(destRdonly.Target()) - destRdonlyShqs.AddDefaultHealthResponse() - destRdonlyQs := newTestQueryService(t, destRdonly.Target(), destRdonlyShqs, 0, 1, topoproto.TabletAliasString(destRdonly.Tablet.Alias), true /* omitKeyspaceID */) + // Set up destination master which will be used as input for the diff during the clone. + destMasterShqs := fakes.NewStreamHealthQueryService(destMaster.Target()) + destMasterShqs.AddDefaultHealthResponse() + destMasterQs := newTestQueryService(t, destMaster.Target(), destMasterShqs, 0, 1, topoproto.TabletAliasString(destMaster.Tablet.Alias), true /* omitKeyspaceID */) // This tablet is empty and does not return any rows. - grpcqueryservice.Register(destRdonly.RPCServer, destRdonlyQs) + grpcqueryservice.Register(destMaster.RPCServer, destMasterQs) - // Fake stream health reponses because vtworker needs them to find the master. - qs := fakes.NewStreamHealthQueryService(destMaster.Target()) - qs.AddDefaultHealthResponse() - grpcqueryservice.Register(destMaster.RPCServer, qs) // Only wait 1 ms between retries, so that the test passes faster *executeFetchRetryTime = (1 * time.Millisecond) // When the online clone inserted the last rows, modify the destination test // query service such that it will return them as well. destMasterFakeDb.GetEntry(29).AfterFunc = func() { - destRdonlyQs.addGeneratedRows(verticalSplitCloneTestMin, verticalSplitCloneTestMax) + destMasterQs.addGeneratedRows(verticalSplitCloneTestMin, verticalSplitCloneTestMax) } // Start action loop after having registered all RPC services. 
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 5b49bd07fd9..d65e721ba3f 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -874,6 +874,7 @@ func (wr *Wrangler) showVerticalResharding(ctx context.Context, keyspace, shard } func (wr *Wrangler) cancelVerticalResharding(ctx context.Context, keyspace, shard string) error { + wr.Logger().Infof("Cancel vertical resharding in keyspace %v", keyspace) destinationShard, err := wr.ts.GetShard(ctx, keyspace, shard) if err != nil { return err @@ -895,11 +896,14 @@ func (wr *Wrangler) cancelVerticalResharding(ctx context.Context, keyspace, shar if _, err := wr.tmc.VReplicationExec(ctx, destinationMasterTabletInfo.Tablet, binlogplayer.DeleteVReplication(destinationShard.SourceShards[0].Uid)); err != nil { return err } - _, err = wr.ts.UpdateShardFields(ctx, destinationShard.Keyspace(), destinationShard.ShardName(), func(si *topo.ShardInfo) error { + if _, err = wr.ts.UpdateShardFields(ctx, destinationShard.Keyspace(), destinationShard.ShardName(), func(si *topo.ShardInfo) error { si.SourceShards = nil return nil - }) - return err + }); err != nil { + return err + } + // set destination master back to serving + return wr.refreshMasters(ctx, []*topo.ShardInfo{destinationShard}) } // MigrateServedFrom is used during vertical splits to migrate a diff --git a/test/vertical_split.py b/test/vertical_split.py index 7b33c7923fb..4f9d64b64e0 100755 --- a/test/vertical_split.py +++ b/test/vertical_split.py @@ -427,6 +427,8 @@ def test_vertical_split(self): # test Cancel first utils.run_vtctl(['CancelResharding', 'destination_keyspace/0'], auto_log=True) self.check_no_binlog_player(destination_master) + # master should be in serving state after cancel + utils.check_tablet_query_service(self, destination_master, True, False) # redo VerticalSplitClone utils.run_vtworker(['--cell', 'test_nj', diff --git a/test/worker.py b/test/worker.py index d3f373ba6d9..b84567efb8b 100755 --- 
a/test/worker.py +++ b/test/worker.py @@ -448,12 +448,6 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): Raises: AssertionError if things didn't go as expected. """ - if mysql_down: - logging.debug('Shutting down mysqld on destination masters.') - utils.wait_procs( - [shard_0_master.shutdown_mysql(), - shard_1_master.shutdown_mysql()]) - worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg( ['--cell', 'test_nj', '--use_v3_resharding_mode=false'], auto_log=True) @@ -471,14 +465,13 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): '--destination_writer_count', '1', '--min_healthy_rdonly_tablets', '1', '--max_tps', '9999'] - if not mysql_down: - # Make the clone as slow as necessary such that there is enough time to - # run PlannedReparent in the meantime. - # TODO(mberlin): Once insert_values is fixed to uniformly distribute the - # rows across shards when sorted by primary key, remove - # --chunk_count 2, --min_rows_per_chunk 1 and set - # --source_reader_count back to 1. - args.extend(['--source_reader_count', '2', + # Make the clone as slow as necessary such that there is enough time to + # run PlannedReparent in the meantime. + # TODO(mberlin): Once insert_values is fixed to uniformly distribute the + # rows across shards when sorted by primary key, remove + # --chunk_count 2, --min_rows_per_chunk 1 and set + # --source_reader_count back to 1. + args.extend(['--source_reader_count', '2', '--chunk_count', '2', '--min_rows_per_chunk', '1', '--write_query_max_rows', '1']) @@ -486,6 +479,23 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): workerclient_proc = utils.run_vtworker_client_bg(args, worker_rpc_port) if mysql_down: + # vtworker is blocked at this point. This is a good time to test that its + # throttler server is reacting to RPCs. 
+ self.check_throttler_service('localhost:%d' % worker_rpc_port, + ['test_keyspace/-80', 'test_keyspace/80-'], + 9999) + + utils.poll_for_vars( + 'vtworker', worker_port, + 'WorkerState == cloning the data (online)', + condition_fn=lambda v: v.get('WorkerState') == 'cloning the' + ' data (online)') + + logging.debug('Worker is in copy state, Shutting down mysqld on destination masters.') + utils.wait_procs( + [shard_0_master.shutdown_mysql(), + shard_1_master.shutdown_mysql()]) + # If MySQL is down, we wait until vtworker retried at least once to make # sure it reached the point where a write failed due to MySQL being down. # There should be two retries at least, one for each destination shard. @@ -493,13 +503,7 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): 'vtworker', worker_port, 'WorkerRetryCount >= 2', condition_fn=lambda v: v.get('WorkerRetryCount') >= 2) - logging.debug('Worker has retried at least twice, starting reparent now') - - # vtworker is blocked at this point. This is a good time to test that its - # throttler server is reacting to RPCs. - self.check_throttler_service('localhost:%d' % worker_rpc_port, - ['test_keyspace/-80', 'test_keyspace/80-'], - 9999) + logging.debug('Worker has retried at least once per shard, starting reparent now') # Bring back masters. Since we test with semi-sync now, we need at least # one replica for the new master. This test is already quite expensive,