Skip to content
14 changes: 8 additions & 6 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ func InitTabletMap(ts *topo.Server, tpb *vttestpb.VTTestTopology, mysqld mysqlct

ctx := context.Background()

// Register the tablet manager client factory for tablet manager
// Do this before any tablets are created so that they respect the protocol,
// otherwise it defaults to grpc
tmclient.RegisterTabletManagerClientFactory("internal", func() tmclient.TabletManagerClient {
return &internalTabletManagerClient{}
})
*tmclient.TabletManagerProtocol = "internal"

// iterate through the keyspaces
wr := wrangler.New(logutil.NewConsoleLogger(), ts, nil)
var uid uint32 = 1
Expand Down Expand Up @@ -246,12 +254,6 @@ func InitTabletMap(ts *topo.Server, tpb *vttestpb.VTTestTopology, mysqld mysqlct
tabletconn.RegisterDialer("internal", dialer)
*tabletconn.TabletProtocol = "internal"

// Register the tablet manager client factory for tablet manager
tmclient.RegisterTabletManagerClientFactory("internal", func() tmclient.TabletManagerClient {
return &internalTabletManagerClient{}
})
*tmclient.TabletManagerProtocol = "internal"

// run healthcheck on all vttablets
tmc := tmclient.NewTabletManagerClient()
for _, tablet := range tabletMap {
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletmanager/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletservermock"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

// needed so that grpc client is registered
_ "vitess.io/vitess/go/vt/vttablet/grpctmclient"
)

func TestHealthRecordDeduplication(t *testing.T) {
Expand Down Expand Up @@ -717,7 +720,7 @@ func TestStateChangeImmediateHealthBroadcast(t *testing.T) {
// Run TER to turn us into a proper master, wait for it to finish.
agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 19 * time.Second
if err := agent.TabletExternallyReparented(ctx, "unused_id"); err != nil {
t.Fatal(err)
t.Fatalf("TabletExternallyReparented failed: %v", err)
}
<-agent.finalizeReparentCtx.Done()
ti, err := agent.TopoServer.GetTablet(ctx, tabletAlias)
Expand Down
135 changes: 95 additions & 40 deletions go/vt/vttablet/tabletmanager/rpc_external_reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
// timestamp to the current time.
agent.setExternallyReparentedTime(startTime)

if topoproto.TabletAliasEqual(si.MasterAlias, tablet.Alias) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This short circuit was important because YouTube's equivalent of Orchestrator would call TabletExternallyReparented every 10 seconds on the master, even if nothing has changed. We don't do that yet in the Orchestrator integration, but it was always intended and the integration is incomplete without that constantly repeating signal.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!! Why did it do that? Was the intent to have it basically be a self healing loop?

// We may get called on the current master even when nothing has changed.
// If the global shard record is already updated, it means we successfully
// finished a previous reparent to this tablet.
return nil
}

// Create a reusable Reparent event with available info.
ev := &events.Reparent{
ShardInfo: *si,
Expand All @@ -105,16 +98,19 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
}()
event.DispatchUpdate(ev, "starting external from tablet (fast)")

// Execute state change to master by force-updating only the local copy of the
// tablet record. The actual record in topo will be updated later.
log.Infof("fastTabletExternallyReparented: executing change callback for state change to MASTER")
newTablet := proto.Clone(tablet).(*topodatapb.Tablet)
newTablet.Type = topodatapb.TabletType_MASTER

// This is where updateState will block for gracePeriod, while it gives
// vtgate a chance to stop sending replica queries.
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented")

// We may get called on the current master multiple times in order to fix incomplete external reparents.
// We update the tablet here only if it is not currently master
if tablet.Type != topodatapb.TabletType_MASTER {
log.Infof("fastTabletExternallyReparented: executing change callback for state change to MASTER")
// Execute state change to master by force-updating only the local copy of the
// tablet record. The actual record in topo will be updated later.
newTablet := proto.Clone(tablet).(*topodatapb.Tablet)
newTablet.Type = topodatapb.TabletType_MASTER

// This is where updateState will block for gracePeriod, while it gives
// vtgate a chance to stop sending replica queries.
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented")
}
// Start the finalize stage with a background context, but connect the trace.
bgCtx, cancel := context.WithTimeout(agent.batchCtx, *finalizeReparentTimeout)
bgCtx = trace.CopySpan(bgCtx, ctx)
Expand All @@ -135,25 +131,34 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
}

// finalizeTabletExternallyReparented performs slow, synchronized reconciliation
// tasks that ensure topology is self-consistent, and then marks the reparent as
// finished by updating the global shard record.
// tasks that ensure topology is self-consistent.
// It first updates new and old master tablet records, then updates
// the global shard record, then refreshes the old master.
// After that it attempts to detect and clean up any lingering old masters.
// Note that an up-to-date shard record does not necessarily mean that
// the reparent completed all the actions successfully
func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context, si *topo.ShardInfo, ev *events.Reparent) (err error) {
var wg sync.WaitGroup
var errs concurrency.AllErrorRecorder
var oldMasterTablet *topodatapb.Tablet
oldMasterAlias := si.MasterAlias

// Update the tablet records concurrently.
// Update the new and old master tablet records concurrently.
event.DispatchUpdate(ev, "updating old and new master tablet records")
log.Infof("finalizeTabletExternallyReparented: updating tablet records")
wg.Add(1)
go func() {
defer wg.Done()
// Update our own record to master.
log.Infof("finalizeTabletExternallyReparented: updating tablet record for new master: %v", agent.TabletAlias)
// Update our own record to master if needed
_, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias,
func(tablet *topodatapb.Tablet) error {
tablet.Type = topodatapb.TabletType_MASTER
return nil
if tablet.Type != topodatapb.TabletType_MASTER {
tablet.Type = topodatapb.TabletType_MASTER
return nil
}
// returning NoUpdateNeeded avoids unnecessary calls to UpdateTablet
return topo.NewError(topo.NoUpdateNeeded, agent.TabletAlias.String())
})
if err != nil {
errs.RecordError(err)
Expand All @@ -164,27 +169,32 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context
wg.Add(1)
go func() {
defer wg.Done()
log.Infof("finalizeTabletExternallyReparented: updating tablet record for old master: %v", oldMasterAlias)

// Forcibly demote the old master in topology, since we can't rely on the
// old master to be up to change its own record.
var err error
oldMasterTablet, err = agent.TopoServer.UpdateTabletFields(ctx, oldMasterAlias,
func(tablet *topodatapb.Tablet) error {
tablet.Type = topodatapb.TabletType_REPLICA
return nil
if tablet.Type == topodatapb.TabletType_MASTER {
tablet.Type = topodatapb.TabletType_REPLICA
return nil
}
// returning NoUpdateNeeded avoids unnecessary calls to UpdateTablet
return topo.NewError(topo.NoUpdateNeeded, oldMasterAlias.String())
})
if err != nil {
errs.RecordError(err)
return
}

// We now know more about the old master, so add it to event data.
ev.OldMaster = *oldMasterTablet
// oldMasterTablet will be nil if no update was needed
if oldMasterTablet != nil {
ev.OldMaster = *oldMasterTablet
}
}()
}

tablet := agent.Tablet()

// Wait for the tablet records to be updated. At that point, any rebuild will
// see the new master, so we're ready to mark the reparent as done in the
// global shard record.
Expand All @@ -193,44 +203,48 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context
return errs.Error()
}

masterTablet := agent.Tablet()

event.DispatchUpdate(ev, "updating global shard record")
log.Infof("finalizeTabletExternallyReparented: updating global shard record if needed")
wg.Add(1)
go func() {
defer wg.Done()

// Update the master field in the global shard record. We don't use a lock
// here anymore. The lock was only to ensure that the global shard record
// didn't get modified between the time when we read it and the time when we
// write it back. Now we use an update loop pattern to do that instead.
event.DispatchUpdate(ev, "updating global shard record")
log.Infof("finalizeTabletExternallyReparented: updating global shard record if needed")
_, err = agent.TopoServer.UpdateShardFields(ctx, tablet.Keyspace, tablet.Shard, func(currentSi *topo.ShardInfo) error {
if topoproto.TabletAliasEqual(currentSi.MasterAlias, tablet.Alias) {
return topo.NewError(topo.NoUpdateNeeded, tablet.Alias.String())
_, err = agent.TopoServer.UpdateShardFields(ctx, masterTablet.Keyspace, masterTablet.Shard, func(currentSi *topo.ShardInfo) error {
if topoproto.TabletAliasEqual(currentSi.MasterAlias, masterTablet.Alias) {
// returning NoUpdateNeeded avoids unnecessary calls to UpdateTablet
return topo.NewError(topo.NoUpdateNeeded, masterTablet.Alias.String())
}
if !topoproto.TabletAliasEqual(currentSi.MasterAlias, oldMasterAlias) {
log.Warningf("old master alias (%v) not found in the global Shard record i.e. it has changed in the meantime."+
" We're not overwriting the value with the new master (%v) because the current value is probably newer."+
" (initial Shard record = %#v, current Shard record = %#v)",
oldMasterAlias, tablet.Alias, si, currentSi)
oldMasterAlias, masterTablet.Alias, si, currentSi)
// returning NoUpdateNeeded avoids unnecessary calls to UpdateTablet
return topo.NewError(topo.NoUpdateNeeded, oldMasterAlias.String())
}
currentSi.MasterAlias = tablet.Alias
currentSi.MasterAlias = masterTablet.Alias
return nil
})
if err != nil {
errs.RecordError(err)
}
}()
if !topoproto.TabletAliasIsZero(oldMasterAlias) {

tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
if !topoproto.TabletAliasIsZero(oldMasterAlias) && oldMasterTablet != nil {
wg.Add(1)
go func() {
defer wg.Done()

// Tell the old master to re-read its tablet record and change its state.
// We don't need to put error into errs if this fails, but we need to wait
// for it to make sure that old master tablet is not stuck in the MASTER
// state.
tmc := tmclient.NewTabletManagerClient()
if err := tmc.RefreshState(ctx, oldMasterTablet); err != nil {
log.Warningf("Error calling RefreshState on old master %v: %v", topoproto.TabletAliasString(oldMasterTablet.Alias), err)
}
Expand All @@ -242,6 +256,47 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context
return errs.Error()
}

// Look for any other tablets claiming to be master and fix them up on a best-effort basis
tabletMap, err := agent.TopoServer.GetTabletMapForShard(ctx, masterTablet.Keyspace, masterTablet.Shard)
if err != nil {
log.Errorf("ignoring error %v from GetTabletMapForShard so that we can process any partial results", err)
}

for _, tabletInfo := range tabletMap {
alias := tabletInfo.Tablet.Alias
if !topoproto.TabletAliasEqual(alias, agent.TabletAlias) && !topoproto.TabletAliasEqual(alias, oldMasterAlias) && tabletInfo.Tablet.Type == topodatapb.TabletType_MASTER {
log.Infof("finalizeTabletExternallyReparented: updating tablet record for another old master: %v", alias)
wg.Add(1)
go func(alias *topodatapb.TabletAlias) {
defer wg.Done()
var err error
tab, err := agent.TopoServer.UpdateTabletFields(ctx, alias,
func(tablet *topodatapb.Tablet) error {
if tablet.Type == topodatapb.TabletType_MASTER {
tablet.Type = topodatapb.TabletType_REPLICA
return nil
}
return topo.NewError(topo.NoUpdateNeeded, alias.String())
})
if err != nil {
errs.RecordError(err)
return
}
// tab will be nil if no update was needed
if tab != nil {
log.Infof("finalizeTabletExternallyReparented: Refresh state for tablet: %v", topoproto.TabletAliasString(tab.Alias))
if err := tmc.RefreshState(ctx, tab); err != nil {
log.Warningf("Error calling RefreshState on old master %v: %v", topoproto.TabletAliasString(tab.Alias), err)
}
}
}(alias)
}
}
wg.Wait()
if errs.HasErrors() {
return errs.Error()
}

event.DispatchUpdate(ev, "finished")
return nil
}
Expand Down
Loading