Skip to content
192 changes: 105 additions & 87 deletions go/vt/proto/topodata/topodata.pb.go

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions go/vt/topotools/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"errors"
"fmt"

"github.com/golang/protobuf/proto"
"golang.org/x/net/context"

"vitess.io/vitess/go/vt/hook"
Expand All @@ -46,6 +47,7 @@ import (

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
)

// ConfigureTabletHook configures the right parameters for a hook
Expand All @@ -61,11 +63,27 @@ func ConfigureTabletHook(hk *hook.Hook, tabletAlias *topodatapb.TabletAlias) {
// transitions need to be forced from time to time.
//
// If successful, the updated tablet record is returned.
func ChangeType(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, newType topodatapb.TabletType) (*topodatapb.Tablet, error) {
return ts.UpdateTabletFields(ctx, tabletAlias, func(tablet *topodatapb.Tablet) error {
func ChangeType(ctx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, newType topodatapb.TabletType, masterTermStartTime *vttime.Time) (*topodatapb.Tablet, error) {
	// A master term start time is only kept on the record when the target
	// type is MASTER; for every other type it is stored as nil.
	termStart := masterTermStartTime
	if newType != topodatapb.TabletType_MASTER {
		termStart = nil
	}

	// latest holds the tablet value most recently handed to the update
	// callback, so it can be returned whether the record was rewritten or
	// was already in the requested state.
	var latest *topodatapb.Tablet
	applyChange := func(current *topodatapb.Tablet) error {
		latest = current
		alreadyApplied := current.Type == newType && proto.Equal(current.MasterTermStartTime, termStart)
		if alreadyApplied {
			// Signal that the stored record already matches the request.
			return topo.NewError(topo.NoUpdateNeeded, topoproto.TabletAliasString(tabletAlias))
		}
		current.Type, current.MasterTermStartTime = newType, termStart
		return nil
	}

	if _, err := ts.UpdateTabletFields(ctx, tabletAlias, applyChange); err != nil {
		return nil, err
	}
	return latest, nil
}

// CheckOwnership returns nil iff the Hostname and port match on oldTablet and
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctld/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func TestAPI(t *testing.T) {
"db_name_override": "",
"tags": {},
"mysql_hostname":"",
"mysql_port":0
"mysql_port":0,
"master_term_start_time":null
}`},
{"GET", "tablets/nonexistent-999", "", "404 page not found"},
{"POST", "tablets/cell1-100?action=TestTabletAction", "", `{
Expand Down
41 changes: 32 additions & 9 deletions go/vt/vttablet/tabletmanager/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,29 @@ func TestStateChangeImmediateHealthBroadcast(t *testing.T) {
t.Fatalf("TabletExternallyReparented failed: %v", err)
}
<-agent.finalizeReparentCtx.Done()
// It is not enough to wait for finalizeReparentCtx to be done, we have to wait for shard_sync to finish
startTime := time.Now()
for {
if time.Since(startTime) > 10*time.Second /* timeout */ {
si, err := agent.TopoServer.GetShard(ctx, agent.Tablet().Keyspace, agent.Tablet().Shard)
if err != nil {
t.Fatalf("GetShard(%v, %v) failed: %v", agent.Tablet().Keyspace, agent.Tablet().Shard, err)
}
if !topoproto.TabletAliasEqual(si.MasterAlias, agent.Tablet().Alias) {
t.Fatalf("ShardInfo should have MasterAlias %v but has %v", topoproto.TabletAliasString(agent.Tablet().Alias), topoproto.TabletAliasString(si.MasterAlias))
}
}
si, err := agent.TopoServer.GetShard(ctx, agent.Tablet().Keyspace, agent.Tablet().Shard)
if err != nil {
t.Fatalf("GetShard(%v, %v) failed: %v", agent.Tablet().Keyspace, agent.Tablet().Shard, err)
}
if topoproto.TabletAliasEqual(si.MasterAlias, agent.Tablet().Alias) {
break
} else {
time.Sleep(100 * time.Millisecond /* interval at which to re-check the shard record */)
}
}

ti, err := agent.TopoServer.GetTablet(ctx, tabletAlias)
if err != nil {
t.Fatalf("GetTablet failed: %v", err)
Expand Down Expand Up @@ -795,9 +818,8 @@ func TestStateChangeImmediateHealthBroadcast(t *testing.T) {
}
// Consume health broadcast sent out due to QueryService state change from
// (MASTER, SERVING) to (MASTER, NOT_SERVING).
// Since we didn't run healthcheck again yet, the broadcast data contains the
// cached replication lag of 20 instead of 21.
if _, err := expectBroadcastData(agent.QueryServiceControl, false, "", 20); err != nil {
// RefreshState on MASTER always sets the replicationDelay to 0
if _, err := expectBroadcastData(agent.QueryServiceControl, false, "", 0); err != nil {
t.Fatal(err)
}
if err := expectStateChange(agent.QueryServiceControl, false, topodatapb.TabletType_MASTER); err != nil {
Expand Down Expand Up @@ -845,8 +867,9 @@ func TestStateChangeImmediateHealthBroadcast(t *testing.T) {
t.Errorf("Query service should not be running")
}
// Since we didn't run healthcheck again yet, the broadcast data contains the
// cached replication lag of 22 instead of 23.
if _, err := expectBroadcastData(agent.QueryServiceControl, true, "", 22); err != nil {
// cached replication lag of 0. This is because
// RefreshState on MASTER always sets the replicationDelay to 0
if _, err := expectBroadcastData(agent.QueryServiceControl, true, "", 0); err != nil {
t.Fatal(err)
}
if err := expectStateChange(agent.QueryServiceControl, true, topodatapb.TabletType_MASTER); err != nil {
Expand Down Expand Up @@ -907,7 +930,7 @@ func TestBackupStateChange(t *testing.T) {
agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 16 * time.Second

// change to BACKUP, query service will turn off
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_BACKUP); err != nil {
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_BACKUP, nil); err != nil {
t.Fatal(err)
}
if err := agent.RefreshState(ctx); err != nil {
Expand All @@ -921,7 +944,7 @@ func TestBackupStateChange(t *testing.T) {
}
// change back to REPLICA, query service should not start
// because replication delay > unhealthyThreshold
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_REPLICA); err != nil {
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_REPLICA, nil); err != nil {
t.Fatal(err)
}
if err := agent.RefreshState(ctx); err != nil {
Expand Down Expand Up @@ -961,7 +984,7 @@ func TestRestoreStateChange(t *testing.T) {
agent.HealthReporter.(*fakeHealthCheck).reportReplicationDelay = 16 * time.Second

// change to RESTORE, query service will turn off
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_RESTORE); err != nil {
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_RESTORE, nil); err != nil {
t.Fatal(err)
}
if err := agent.RefreshState(ctx); err != nil {
Expand All @@ -975,7 +998,7 @@ func TestRestoreStateChange(t *testing.T) {
}
// change back to REPLICA, query service should not start
// because replication delay > unhealthyThreshold
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_REPLICA); err != nil {
if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_REPLICA, nil); err != nil {
t.Fatal(err)
}
if err := agent.RefreshState(ctx); err != nil {
Expand Down
34 changes: 31 additions & 3 deletions go/vt/vttablet/tabletmanager/init_tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,33 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// We're marked as master in the shard record,
// and our existing tablet record agrees.
tabletType = topodatapb.TabletType_MASTER
// Same comment as above. Update tiebreaking timestamp to now.
agent.setMasterTermStartTime(time.Now())
// Read the master term start time from tablet.
// If it is nil, it might mean that we are upgrading, so use current time instead
if oldTablet.MasterTermStartTime != nil {
agent.setMasterTermStartTime(logutil.ProtoToTime(oldTablet.MasterTermStartTime))
} else {
agent.setMasterTermStartTime(time.Now())
}
}
default:
return vterrors.Wrap(err, "InitTablet failed to read existing tablet record")
}
} else {
oldTablet, err := agent.TopoServer.GetTablet(ctx, agent.TabletAlias)
switch {
case topo.IsErrType(err, topo.NoNode):
// There's no existing tablet record, so there is nothing to do
case err == nil:
if oldTablet.Type == topodatapb.TabletType_MASTER {
// Our existing tablet type is master, but the shard record does not agree.
// Only take over if our master_term_start_time is after what is in the shard record
oldMasterTermStartTime := logutil.ProtoToTime(oldTablet.MasterTermStartTime)
currentShardTime := logutil.ProtoToTime(si.MasterTermStartTime)
if oldMasterTermStartTime.After(currentShardTime) {
tabletType = topodatapb.TabletType_MASTER
// read the master term start time from tablet
agent.setMasterTermStartTime(oldMasterTermStartTime)
}
}
default:
return vterrors.Wrap(err, "InitTablet failed to read existing tablet record")
Expand Down Expand Up @@ -176,6 +201,9 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
DbNameOverride: *initDbNameOverride,
Tags: initTags,
}
if !agent.masterTermStartTime().IsZero() {
tablet.MasterTermStartTime = logutil.TimeToProto(agent.masterTermStartTime())
}
if port != 0 {
tablet.PortMap["vt"] = port
}
Expand Down Expand Up @@ -219,9 +247,9 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
return vterrors.Wrap(err, "CreateTablet failed")
}

agent.setTablet(tablet)
// optionally populate metadata records
if *initPopulateMetadata {
agent.setTablet(tablet)
localMetadata := agent.getLocalMetadataValues(tablet.Type)
err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet))
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletmanager/init_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestInitTablet(t *testing.T) {
t.Errorf("wrong tablet type: %v", ti.Type)
}
if got := agent._masterTermStartTime; !got.IsZero() {
t.Fatalf("REPLICA tablet should not have an ExternallyReparentedTimestamp set: %v", got)
t.Fatalf("REPLICA tablet should not have a masterTermStartTime set: %v", got)
}

// 3. Delete the tablet record. The shard record still says that we are the
Expand All @@ -310,7 +310,7 @@ func TestInitTablet(t *testing.T) {
}
ter1 := agent._masterTermStartTime
if ter1.IsZero() {
t.Fatalf("MASTER tablet should have an ExternallyReparentedTimestamp set")
t.Fatalf("MASTER tablet should have a masterTermStartTime set")
}

// 4. Fix the tablet record to agree that we're master.
Expand All @@ -331,8 +331,8 @@ func TestInitTablet(t *testing.T) {
t.Errorf("wrong tablet type: %v", ti.Type)
}
ter2 := agent._masterTermStartTime
if ter2.IsZero() || !ter2.After(ter1) {
t.Fatalf("After a restart, ExternallyReparentedTimestamp must be set to the current time. Previous timestamp: %v current timestamp: %v", ter1, ter2)
if ter2.IsZero() || !ter2.Equal(ter1) {
t.Fatalf("After a restart, masterTermStartTime must be equal to the previous time saved in the tablet record. Previous timestamp: %v current timestamp: %v", ter1, ter2)
}

// 5. Subsequent inits will still start the vttablet as MASTER.
Expand All @@ -356,7 +356,7 @@ func TestInitTablet(t *testing.T) {
t.Errorf("wrong tablet tags: %v", ti.Tags)
}
ter3 := agent._masterTermStartTime
if ter3.IsZero() || !ter3.After(ter2) {
t.Fatalf("After a restart, ExternallyReparentedTimestamp must be set to the current time. Previous timestamp: %v current timestamp: %v", ter2, ter3)
if ter3.IsZero() || !ter3.Equal(ter2) {
t.Fatalf("After a restart, masterTermStartTime must be set to the previous time saved in the tablet record. Previous timestamp: %v current timestamp: %v", ter2, ter3)
}
}
18 changes: 13 additions & 5 deletions go/vt/vttablet/tabletmanager/rpc_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"time"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/vterrors"

"golang.org/x/net/context"
Expand Down Expand Up @@ -70,15 +71,22 @@ func (agent *ActionAgent) ChangeType(ctx context.Context, tabletType topodatapb.
if tabletType == topodatapb.TabletType_DRAINED && agent.Tablet().Type == topodatapb.TabletType_DRAINED {
return fmt.Errorf("Tablet: %v, is already drained", agent.TabletAlias)
}
// change our type in the topology
_, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType)

agentMasterTermStartTime := time.Time{}
// If we have been told we're master, set master term start time to Now
if tabletType == topodatapb.TabletType_MASTER {
agentMasterTermStartTime = time.Now()
}
// change our type in the topology, and set masterTermStartTime on tablet record if applicable
_, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType, logutil.TimeToProto(agentMasterTermStartTime))
if err != nil {
return err
}

// If we have been told we're master, update master term start time.
// We only update agent's masterTermStartTime if we were able to update the topo.
// This ensures that in case of a failure, we are never in a situation where the
// tablet's timestamp is ahead of the topo's timestamp.
if tabletType == topodatapb.TabletType_MASTER {
agent.setMasterTermStartTime(time.Now())
agent.setMasterTermStartTime(agentMasterTermStartTime)
}

// let's update our internal state (stop query service and other things)
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo
}
originalType = tablet.Type
// update our type to BACKUP
if _, err := topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topodatapb.TabletType_BACKUP); err != nil {
if _, err := topotools.ChangeType(ctx, agent.TopoServer, tablet.Alias, topodatapb.TabletType_BACKUP, nil); err != nil {
return err
}

Expand Down Expand Up @@ -118,8 +118,9 @@ func (agent *ActionAgent) Backup(ctx context.Context, concurrency int, logger lo
// context. It is also possible that the context already timed out during the
// above call to Backup. Thus we use the background context to get through to the finish.

// change our type back to the original value
_, err = topotools.ChangeType(bgCtx, agent.TopoServer, tablet.Alias, originalType)
// Change our type back to the original value.
// Original type could be master so pass in a real value for masterTermStartTime
_, err = topotools.ChangeType(bgCtx, agent.TopoServer, tablet.Alias, originalType, tablet.MasterTermStartTime)
if err != nil {
// failure in changing the topology type is probably worse,
// so returning that (we logged the snapshot error anyway)
Expand Down
35 changes: 8 additions & 27 deletions go/vt/vttablet/tabletmanager/rpc_external_reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"
"vitess.io/vitess/go/vt/vttablet/tmclient"

Expand Down Expand Up @@ -106,7 +108,7 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
// tablet record. The actual record in topo will be updated later.
newTablet := proto.Clone(tablet).(*topodatapb.Tablet)
newTablet.Type = topodatapb.TabletType_MASTER

newTablet.MasterTermStartTime = logutil.TimeToProto(agent.masterTermStartTime())
// This is where updateState will block for gracePeriod, while it gives
// vtgate a chance to stop sending replica queries.
agent.updateState(ctx, newTablet, "fastTabletExternallyReparented")
Expand Down Expand Up @@ -151,15 +153,7 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context
defer wg.Done()
log.Infof("finalizeTabletExternallyReparented: updating tablet record for new master: %v", agent.TabletAlias)
// Update our own record to master if needed
_, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias,
func(tablet *topodatapb.Tablet) error {
if tablet.Type != topodatapb.TabletType_MASTER {
tablet.Type = topodatapb.TabletType_MASTER
return nil
}
// returning NoUpdateNeeded avoids unnecessary calls to UpdateTablet
return topo.NewError(topo.NoUpdateNeeded, agent.TabletAlias.String())
})
_, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_MASTER, logutil.TimeToProto(agent.masterTermStartTime()))
if err != nil {
errs.RecordError(err)
}
Expand All @@ -174,16 +168,10 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context

// Forcibly demote the old master in topology, since we can't rely on the
// old master to be up to change its own record.
// Call UpdateTabletFields instead of ChangeType so that we can check the type
// before changing it and avoid unnecessary topo updates
var err error
oldMasterTablet, err = agent.TopoServer.UpdateTabletFields(ctx, oldMasterAlias,
func(tablet *topodatapb.Tablet) error {
if tablet.Type == topodatapb.TabletType_MASTER {
tablet.Type = topodatapb.TabletType_REPLICA
return nil
}
// returning NoUpdateNeeded avoids unnecessary calls to UpdateTablet
return topo.NewError(topo.NoUpdateNeeded, oldMasterAlias.String())
})
oldMasterTablet, err = topotools.ChangeType(ctx, agent.TopoServer, oldMasterAlias, topodatapb.TabletType_REPLICA, nil)
if err != nil {
errs.RecordError(err)
return
Expand Down Expand Up @@ -271,14 +259,7 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context
go func(alias *topodatapb.TabletAlias) {
defer wg.Done()
var err error
tab, err := agent.TopoServer.UpdateTabletFields(ctx, alias,
func(tablet *topodatapb.Tablet) error {
if tablet.Type == topodatapb.TabletType_MASTER {
tablet.Type = topodatapb.TabletType_REPLICA
return nil
}
return topo.NewError(topo.NoUpdateNeeded, alias.String())
})
tab, err := topotools.ChangeType(ctx, agent.TopoServer, alias, topodatapb.TabletType_REPLICA, nil)
if err != nil {
errs.RecordError(err)
return
Expand Down
Loading