diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index 2ac5da8548a..ba0413969a4 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -62,6 +62,12 @@ func (agent *ActionAgent) ChangeType(ctx context.Context, tabletType topodatapb. } defer agent.unlock() + // We don't want to allow multiple callers to claim a tablet as drained. There is a race that could happen during + // horizontal resharding where two vtworkers will try to DRAIN the same tablet. This check prevents that race from + // causing errors. + if tabletType == topodatapb.TabletType_DRAINED && agent.Tablet().Type == topodatapb.TabletType_DRAINED { + return fmt.Errorf("Tablet: %v, is already drained", agent.TabletAlias) + } // change our type in the topology _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType) if err != nil { diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index 0729718226f..e15656873b8 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -239,14 +239,29 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error { // find an appropriate tablet in the source shard for _, ss := range sdw.shardInfo.SourceShards { if ss.Uid == sdw.sourceUID { - sdw.sourceAlias, err = FindWorkerTablet(ctx, sdw.wr, sdw.cleaner, nil /* tsc */, sdw.cell, sdw.keyspace, ss.Shard, sdw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY) - if err != nil { - return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", sdw.cell, sdw.keyspace, ss.Shard, err) + // During a horizontal shard split, multiple workers will race to get + // a RDONLY tablet in the source shard. When this happens, concurrent calls + // to FindWorkerTablet could attempt to set the same tablet to the DRAINED state. Only + // one of these calls to FindWorkerTablet will succeed and the rest will fail. 
+ // The following makes sure we keep trying to find a worker tablet when this error occurs. + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + for { + select { + case <-shortCtx.Done(): + return fmt.Errorf("Could not find healthy table for %v/%v%v: after: %v, aborting", sdw.cell, sdw.keyspace, ss.Shard, *remoteActionsTimeout) + default: + sdw.sourceAlias, err = FindWorkerTablet(ctx, sdw.wr, sdw.cleaner, nil /* tsc */, sdw.cell, sdw.keyspace, ss.Shard, sdw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY) + if err != nil { + sdw.wr.Logger().Infof("FindWorkerTablet() failed for %v/%v/%v: %v retrying...", sdw.cell, sdw.keyspace, ss.Shard, err) + continue + } + cancel() + return nil + } } } } - - return nil + panic("unreachable") } // synchronizeReplication phase: diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go index 7e0140e791b..6c078352b88 100644 --- a/go/vt/worker/topo_utils.go +++ b/go/vt/worker/topo_utils.go @@ -119,11 +119,17 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang return nil, err } - // We add the tag before calling ChangeSlaveType, so the destination - // vttablet reloads the worker URL when it reloads the tablet. 
+ wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED) + shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED) + cancel() + if err != nil { + return nil, err + } + ourURL := servenv.ListeningURL.String() wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias)) - shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) + shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) _, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error { if tablet.Tags == nil { tablet.Tags = make(map[string]string) } @@ -142,16 +148,17 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "worker", "") defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "drain_reason", "") - wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED) + // Record a clean-up action to take the tablet back to its original type. + wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType) + + // We refresh the tablet state so the destination vttablet reloads the worker URL when it reloads the tablet. shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED) - cancel() + wr.RefreshTabletState(shortCtx, tabletAlias) if err != nil { return nil, err } + cancel() 
- wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, topodatapb.TabletType_RDONLY) return tabletAlias, nil } diff --git a/go/vt/workflow/resharding/horizontal_resharding_workflow.go b/go/vt/workflow/resharding/horizontal_resharding_workflow.go index 5152da21bfa..39a86cfb580 100644 --- a/go/vt/workflow/resharding/horizontal_resharding_workflow.go +++ b/go/vt/workflow/resharding/horizontal_resharding_workflow.go @@ -248,8 +248,13 @@ func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([]string, } func initCheckpointFromShards(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) { - if len(vtworkers) != len(sourceShards) { - return nil, fmt.Errorf("there are %v vtworkers, %v source shards: the number should be same", len(vtworkers), len(sourceShards)) + if len(vtworkers) != len(destinationShards) { + return nil, fmt.Errorf("there are %v vtworkers, %v destination shards: the number should be same", len(vtworkers), len(destinationShards)) + } + + splitRatio := len(destinationShards) / len(sourceShards) + if minHealthyRdonlyTabletsVal, err := strconv.Atoi(minHealthyRdonlyTablets); err != nil || minHealthyRdonlyTabletsVal < splitRatio { + return nil, fmt.Errorf("there are not enough rdonly tablets in source shards. 
You need at least %v, it got: %v", splitRatio, minHealthyRdonlyTablets) } tasks := make(map[string]*workflowpb.Task) @@ -280,7 +285,7 @@ func initCheckpointFromShards(keyspace string, vtworkers, sourceShards, destinat "keyspace": keyspace, "destination_shard": shard, "dest_tablet_type": splitDiffDestTabletType, - "vtworker": vtworkers[0], + "vtworker": vtworkers[i], } }) initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string { @@ -382,7 +387,7 @@ func (hw *HorizontalReshardingWorkflow) runWorkflow() error { } diffTasks := hw.GetTasks(phaseDiff) - diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Sequential, hw.enableApprovals) + diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Parallel, hw.enableApprovals) if err := diffRunner.Run(); err != nil { return err } diff --git a/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go b/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go index 56170203801..cfc9f13195b 100644 --- a/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go +++ b/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go @@ -66,7 +66,8 @@ func TestHorizontalResharding(t *testing.T) { // Run the manager in the background. wg, _, cancel := startManager(m) // Create the workflow. 
- uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + testVtworkers, "-enable_approvals=false"}) + vtworkersParameter := testVtworkers + "," + testVtworkers + uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-enable_approvals=false", "-min_healthy_rdonly_tablets=2"}) if err != nil { t.Fatalf("cannot create resharding workflow: %v", err) } @@ -101,7 +102,7 @@ func setupFakeVtworker(keyspace, vtworkers string) *fakevtworkerclient.FakeVtwor flag.Set("vtworker_client_protocol", "fake") fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient() fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil) - fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=1", keyspace + "/0"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}, "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/-80"}, "", nil) fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil) diff --git a/go/vt/wrangler/tablet.go b/go/vt/wrangler/tablet.go index 7b8b7393f13..285c279578b 100644 --- a/go/vt/wrangler/tablet.go +++ b/go/vt/wrangler/tablet.go @@ -161,6 +161,18 @@ func (wr *Wrangler) ChangeSlaveType(ctx context.Context, tabletAlias *topodatapb return wr.tmc.ChangeType(ctx, ti.Tablet, tabletType) } +// RefreshTabletState refreshes the state of the given tablet. +func (wr *Wrangler) RefreshTabletState(ctx context.Context, tabletAlias *topodatapb.TabletAlias) error { + // Load tablet to find endpoint, and keyspace and shard assignment. 
+ ti, err := wr.ts.GetTablet(ctx, tabletAlias) + if err != nil { + return err + } + + // and ask the tablet to refresh itself + return wr.tmc.RefreshState(ctx, ti.Tablet) +} + // ExecuteFetchAsDba executes a query remotely using the DBA pool func (wr *Wrangler) ExecuteFetchAsDba(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string, maxRows int, disableBinlogs bool, reloadSchema bool) (*querypb.QueryResult, error) { ti, err := wr.ts.GetTablet(ctx, tabletAlias) diff --git a/test/tabletmanager.py b/test/tabletmanager.py index 2cfc236aabc..68a5bf9f645 100755 --- a/test/tabletmanager.py +++ b/test/tabletmanager.py @@ -507,6 +507,12 @@ def test_health_check_drained_state_does_not_shutdown_query_service(self): # implementation.) The tablet will stay healthy, and the # query service is still running. utils.run_vtctl(['ChangeSlaveType', tablet_62044.tablet_alias, 'drained']) + # Trying to drain the same tablet again should raise an error. + try: + utils.run_vtctl(['ChangeSlaveType', tablet_62044.tablet_alias, 'drained']) + except Exception as e: + s = str(e) + self.assertIn("already drained", s) utils.run_vtctl(['StopSlave', tablet_62044.tablet_alias]) # Trigger healthcheck explicitly to avoid waiting for the next interval. utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias])