Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func (agent *ActionAgent) ChangeType(ctx context.Context, tabletType topodatapb.
}
defer agent.unlock()

// We don't want to allow multiple callers to claim a tablet as drained. There is a race that could happen during
// horizontal resharding where two vtworkers will try to DRAIN the same tablet. This check prevents that race from
// causing errors.
if tabletType == topodatapb.TabletType_DRAINED && agent.Tablet().Type == topodatapb.TabletType_DRAINED {
return fmt.Errorf("Tablet: %v, is already drained", agent.TabletAlias)
}
// change our type in the topology
_, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType)
if err != nil {
Expand Down
25 changes: 20 additions & 5 deletions go/vt/worker/split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,29 @@ func (sdw *SplitDiffWorker) findTargets(ctx context.Context) error {
// find an appropriate tablet in the source shard
for _, ss := range sdw.shardInfo.SourceShards {
if ss.Uid == sdw.sourceUID {
sdw.sourceAlias, err = FindWorkerTablet(ctx, sdw.wr, sdw.cleaner, nil /* tsc */, sdw.cell, sdw.keyspace, ss.Shard, sdw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY)
if err != nil {
return fmt.Errorf("FindWorkerTablet() failed for %v/%v/%v: %v", sdw.cell, sdw.keyspace, ss.Shard, err)
// During a horizontal shard split, multiple workers will race to get
// an RDONLY tablet in the source shard. When this happens, concurrent calls
// to FindWorkerTablet could attempt to set the same tablet to the DRAINED state. Only
// one of these calls to FindWorkerTablet will succeed and the rest will fail.
// The following makes sure we keep trying to find a worker tablet when this error occurs.
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
for {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this loop will help.

The context gets passed in, and the downstream calls are themselves looping waiting for context to expire. I see one difference in waitForHealthyTablets where a waitForHealthyTabletsTimeout gets added, but its default value is same as remoteActionsTimeout.

Can you check the error that caused the race? Then we can identify the code path, and ideally fix it at the point where the error originated.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Sugu - I've validated in my local environment that this fixes the problem.

The loop by itself does not help. The fix is the loop + the change where now when you try to call drain on a tablet that is already drained, you will get an error.

Before this change, the race allowed two vtworkers to use the same tablet, and if I recall correctly one of the actual errors you get is when trying to synchronizeReplication. Both workers are trying to StopBlp and they get errors.

select {
case <-shortCtx.Done():
return fmt.Errorf("Could not find healthy table for %v/%v%v: after: %v, aborting", sdw.cell, sdw.keyspace, ss.Shard, *remoteActionsTimeout)
default:
sdw.sourceAlias, err = FindWorkerTablet(ctx, sdw.wr, sdw.cleaner, nil /* tsc */, sdw.cell, sdw.keyspace, ss.Shard, sdw.minHealthyRdonlyTablets, topodatapb.TabletType_RDONLY)
if err != nil {
sdw.wr.Logger().Infof("FindWorkerTablet() failed for %v/%v/%v: %v retrying...", sdw.cell, sdw.keyspace, ss.Shard, err)
continue
}
cancel()
return nil
}
}
}
}

return nil
panic("unreachable")
}

// synchronizeReplication phase:
Expand Down
23 changes: 15 additions & 8 deletions go/vt/worker/topo_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,17 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
return nil, err
}

// We add the tag before calling ChangeSlaveType, so the destination
// vttablet reloads the worker URL when it reloads the tablet.
wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED)
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED)
cancel()
if err != nil {
return nil, err
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... the tag comment right below this explicitly states that the intent is to put the tag on before changing the slave type, so either we need to rethink this part or at least change the comment now that it's no longer true.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good catch. I think we can solve this by refreshing tablet state after adding this tag. Let's get @sougou input on this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at implementation of ChangeSlaveType and it has a built-in refresh, which is probably why the tag change is done before.

The refresh that @rafael talks about is a higher level one, and I think it's for reacting to change in serving state. But since this is a lower level function that others can use independent of serving state, we should preserve the original order.

ourURL := servenv.ListeningURL.String()
wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias))
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
_, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error {
if tablet.Tags == nil {
tablet.Tags = make(map[string]string)
Expand All @@ -142,16 +148,17 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang
defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "worker", "")
defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "drain_reason", "")

wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED)
// Record a clean-up action to take the tablet back to tabletAlias.
wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType)

// We refresh the tablet so the destination vttablet reloads the worker URL when it reloads the tablet record.
shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout)
err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED)
cancel()
wr.RefreshTabletState(shortCtx, tabletAlias)
if err != nil {
return nil, err
}
cancel()

// Record a clean-up action to take the tablet back to rdonly.
wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, topodatapb.TabletType_RDONLY)
return tabletAlias, nil
}

Expand Down
13 changes: 9 additions & 4 deletions go/vt/workflow/resharding/horizontal_resharding_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,13 @@ func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([]string,
}

func initCheckpointFromShards(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string) (*workflowpb.WorkflowCheckpoint, error) {
if len(vtworkers) != len(sourceShards) {
return nil, fmt.Errorf("there are %v vtworkers, %v source shards: the number should be same", len(vtworkers), len(sourceShards))
if len(vtworkers) != len(destinationShards) {
return nil, fmt.Errorf("there are %v vtworkers, %v destination shards: the number should be same", len(vtworkers), len(destinationShards))
}

splitRatio := len(destinationShards) / len(sourceShards)
if minHealthyRdonlyTabletsVal, err := strconv.Atoi(minHealthyRdonlyTablets); err != nil || minHealthyRdonlyTabletsVal < splitRatio {
return nil, fmt.Errorf("there are not enough rdonly tablets in source shards. You need at least %v, it got: %v", splitRatio, minHealthyRdonlyTablets)
}

tasks := make(map[string]*workflowpb.Task)
Expand Down Expand Up @@ -280,7 +285,7 @@ func initCheckpointFromShards(keyspace string, vtworkers, sourceShards, destinat
"keyspace": keyspace,
"destination_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"vtworker": vtworkers[0],
"vtworker": vtworkers[i],
}
})
initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string {
Expand Down Expand Up @@ -382,7 +387,7 @@ func (hw *HorizontalReshardingWorkflow) runWorkflow() error {
}

diffTasks := hw.GetTasks(phaseDiff)
diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Sequential, hw.enableApprovals)
diffRunner := NewParallelRunner(hw.ctx, hw.rootUINode, hw.checkpointWriter, diffTasks, hw.runSplitDiff, Parallel, hw.enableApprovals)
if err := diffRunner.Run(); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func TestHorizontalResharding(t *testing.T) {
// Run the manager in the background.
wg, _, cancel := startManager(m)
// Create the workflow.
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + testVtworkers, "-enable_approvals=false"})
vtworkersParameter := testVtworkers + "," + testVtworkers
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-enable_approvals=false", "-min_healthy_rdonly_tablets=2"})
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
}
Expand Down Expand Up @@ -101,7 +102,7 @@ func setupFakeVtworker(keyspace, vtworkers string) *fakevtworkerclient.FakeVtwor
flag.Set("vtworker_client_protocol", "fake")
fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient()
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=1", keyspace + "/0"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/-80"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"Reset"}, "", nil)
Expand Down
12 changes: 12 additions & 0 deletions go/vt/wrangler/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ func (wr *Wrangler) ChangeSlaveType(ctx context.Context, tabletAlias *topodatapb
return wr.tmc.ChangeType(ctx, ti.Tablet, tabletType)
}

// RefreshTabletState asks the tablet identified by tabletAlias to reload
// its state, e.g. after its record was updated in the topology.
func (wr *Wrangler) RefreshTabletState(ctx context.Context, tabletAlias *topodatapb.TabletAlias) error {
	// Read the tablet record from the topology; it carries the endpoint
	// plus the keyspace and shard assignment needed to reach the tablet.
	tabletInfo, err := wr.ts.GetTablet(ctx, tabletAlias)
	if err != nil {
		return err
	}

	// Issue the RefreshState RPC against the tablet itself.
	return wr.tmc.RefreshState(ctx, tabletInfo.Tablet)
}

// ExecuteFetchAsDba executes a query remotely using the DBA pool
func (wr *Wrangler) ExecuteFetchAsDba(ctx context.Context, tabletAlias *topodatapb.TabletAlias, query string, maxRows int, disableBinlogs bool, reloadSchema bool) (*querypb.QueryResult, error) {
ti, err := wr.ts.GetTablet(ctx, tabletAlias)
Expand Down
6 changes: 6 additions & 0 deletions test/tabletmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,12 @@ def test_health_check_drained_state_does_not_shutdown_query_service(self):
# implementation.) The tablet will stay healthy, and the
# query service is still running.
utils.run_vtctl(['ChangeSlaveType', tablet_62044.tablet_alias, 'drained'])
# Trying to drain the same tablet again should return an error.
try:
utils.run_vtctl(['ChangeSlaveType', tablet_62044.tablet_alias, 'drained'])
except Exception as e:
s = str(e)
self.assertIn("already drained", s)
utils.run_vtctl(['StopSlave', tablet_62044.tablet_alias])
# Trigger healthcheck explicitly to avoid waiting for the next interval.
utils.run_vtctl(['RunHealthCheck', tablet_62044.tablet_alias])
Expand Down