Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/servenv/servenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func OnTermSync(f func()) {

// fireOnTermSyncHooks returns true iff all the hooks finish before the timeout.
func fireOnTermSyncHooks(timeout time.Duration) bool {
defer log.Flush()
log.Infof("Firing synchronous OnTermSync hooks and waiting up to %v for them", timeout)

timer := time.NewTimer(timeout)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ var commands = []commandGroup{
{
"Tablets", []command{
{"InitTablet", commandInitTablet,
"[-allow_update] [-allow_different_shard] [-allow_master_override] [-parent] [-db_name_override=<db name>] [-hostname=<hostname>] [-mysql_port=<port>] [-port=<port>] [-grpc_port=<port>] [-tags=tag1:value1,tag2:value2] -keyspace=<keyspace> -shard=<shard> <tablet alias> <tablet type>",
"DEPRECATED [-allow_update] [-allow_different_shard] [-allow_master_override] [-parent] [-db_name_override=<db name>] [-hostname=<hostname>] [-mysql_port=<port>] [-port=<port>] [-grpc_port=<port>] [-tags=tag1:value1,tag2:value2] -keyspace=<keyspace> -shard=<shard> <tablet alias> <tablet type>",
"Initializes a tablet in the topology.\n"},
{"GetTablet", commandGetTablet,
"<tablet alias>",
"Outputs a JSON structure that contains information about the Tablet."},
{"UpdateTabletAddrs", commandUpdateTabletAddrs,
{"DEPRECATED UpdateTabletAddrs", commandUpdateTabletAddrs,
"[-hostname <hostname>] [-ip-addr <ip addr>] [-mysql-port <mysql port>] [-vt-port <vt port>] [-grpc-port <grpc port>] <tablet alias> ",
"Updates the IP address and port numbers of a tablet."},
{"DeleteTablet", commandDeleteTablet,
Expand Down
91 changes: 45 additions & 46 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,6 @@ type ActionAgent struct {
// _shardSyncCancel is the function to stop the background shard sync goroutine.
_shardSyncCancel context.CancelFunc

// _tablet has the Tablet record we last read from the topology server.
_tablet *topodatapb.Tablet

// _disallowQueryService is set to the reason we should be
// disallowing queries from being served. It is set from changeCallback,
// and used by healthcheck. If empty, we should allow queries.
Expand Down Expand Up @@ -230,6 +227,11 @@ type ActionAgent struct {
_lockTablesTimer *time.Timer
// _isBackupRunning tells us whether there is a backup that is currently running
_isBackupRunning bool

pubMu sync.Mutex
// tablet has the Tablet record we last read from the topology server.
tablet *topodatapb.Tablet
isPublishing bool
}

// NewActionAgent creates a new ActionAgent and registers all the
Expand Down Expand Up @@ -326,6 +328,33 @@ func NewActionAgent(
agent.registerQueryService()
})

// optionally populate metadata records
if !*restoreFromBackup && *initPopulateMetadata {
// we use initialTablet here because it has the intended tabletType.
// the tablet returned by agent.Tablet() will have type UNKNOWN until
// we call updateState.
localMetadata := agent.getLocalMetadataValues(agent.initialTablet.Type)
if agent.Cnf != nil { // we are managing mysqld
// we'll use batchCtx here because we are still initializing and can't proceed unless this succeeds
if err := agent.MysqlDaemon.Wait(batchCtx, agent.Cnf); err != nil {
return nil, err
}
}
err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(agent.initialTablet))
if err != nil {
return nil, vterrors.Wrap(err, "failed to -init_populate_metadata")
}
}

// Update our state (need the action lock).
// We do this upfront to prevent this from racing with restoreFromBackup
// in case it gets launched.
if err := agent.lock(batchCtx); err != nil {
return nil, err
}
agent.updateState(batchCtx, agent.initialTablet, "Start")
agent.unlock()

// two cases then:
// - restoreFromBackup is set: we restore, then initHealthCheck, all
// in the background
Expand All @@ -342,34 +371,6 @@ func NewActionAgent(
agent.initHealthCheck()
}()
} else {
// optionally populate metadata records
if *initPopulateMetadata {
// we use initialTablet here because it has the intended tabletType.
// the tablet returned by agent.Tablet() will have type UNKNOWN until we call
// refreshTablet
localMetadata := agent.getLocalMetadataValues(agent.initialTablet.Type)
if agent.Cnf != nil { // we are managing mysqld
// we'll use batchCtx here because we are still initializing and can't proceed unless this succeeds
if err := agent.MysqlDaemon.Wait(batchCtx, agent.Cnf); err != nil {
return nil, err
}
}
err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(agent.initialTablet))
if err != nil {
return nil, vterrors.Wrap(err, "failed to -init_populate_metadata")
}
}

// Update our state (need the action lock).
if err := agent.lock(batchCtx); err != nil {
return nil, err
}
if err := agent.refreshTablet(batchCtx, "Start"); err != nil {
agent.unlock()
return nil, err
}
agent.unlock()

// synchronously start health check if needed
agent.initHealthCheck()
}
Expand Down Expand Up @@ -423,9 +424,7 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias *
panic(vterrors.Wrap(err, "agent.lock() failed"))
}
defer agent.unlock()
if err := agent.refreshTablet(batchCtx, "Start"); err != nil {
panic(vterrors.Wrapf(err, "agent.refreshTablet(%v) failed", tabletAlias))
}
agent.updateState(batchCtx, agent.initialTablet, "Start")

return agent
}
Expand Down Expand Up @@ -475,9 +474,7 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias
panic(vterrors.Wrap(err, "agent.lock() failed"))
}
defer agent.unlock()
if err := agent.refreshTablet(batchCtx, "Start"); err != nil {
panic(vterrors.Wrapf(err, "agent.refreshTablet(%v) failed", tabletAlias))
}
agent.updateState(batchCtx, agent.initialTablet, "Start")

return agent
}
Expand All @@ -488,19 +485,19 @@ func (agent *ActionAgent) registerQueryRuleSources() {
}

func (agent *ActionAgent) setTablet(tablet *topodatapb.Tablet) {
agent.mutex.Lock()
agent._tablet = proto.Clone(tablet).(*topodatapb.Tablet)
agent.mutex.Unlock()
agent.pubMu.Lock()
agent.tablet = proto.Clone(tablet).(*topodatapb.Tablet)
agent.pubMu.Unlock()

// Notify the shard sync loop that the tablet state changed.
agent.notifyShardSync()
}

// Tablet reads the stored Tablet from the agent, protected by mutex.
// Tablet reads the stored Tablet from the agent.
func (agent *ActionAgent) Tablet() *topodatapb.Tablet {
agent.mutex.Lock()
tablet := proto.Clone(agent._tablet).(*topodatapb.Tablet)
agent.mutex.Unlock()
agent.pubMu.Lock()
tablet := proto.Clone(agent.tablet).(*topodatapb.Tablet)
agent.pubMu.Unlock()
return tablet
}

Expand Down Expand Up @@ -666,6 +663,9 @@ func (agent *ActionAgent) Start(ctx context.Context, dbcfgs *dbconfigs.DBConfigs
return err
}

// Initialize masterTermStartTime
agent.setMasterTermStartTime(logutil.ProtoToTime(agent.initialTablet.MasterTermStartTime))

// Verify the topology is correct.
agent.verifyTopology(ctx)

Expand Down Expand Up @@ -719,8 +719,7 @@ func (agent *ActionAgent) Start(ctx context.Context, dbcfgs *dbconfigs.DBConfigs

// Initialize the current tablet to match our current running
// state: Has most fields filled in, but type is UNKNOWN.
// Subsequent calls to updateState or refreshTablet
// will then work as expected.
// Subsequent calls to updateState will then work as expected.
startingTablet := proto.Clone(agent.initialTablet).(*topodatapb.Tablet)
startingTablet.Type = topodatapb.TabletType_UNKNOWN
agent.setTablet(startingTablet)
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletmanager/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"html/template"
"time"

"github.com/golang/protobuf/proto"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/health"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -182,8 +181,8 @@ func (agent *ActionAgent) runHealthCheck() {
func (agent *ActionAgent) runHealthCheckLocked() {
agent.checkLock()
// read the current tablet record and tablet control
tablet := agent.Tablet()
agent.mutex.Lock()
tablet := proto.Clone(agent._tablet).(*topodatapb.Tablet)
shouldBeServing := agent._disallowQueryService == ""
runUpdateStream := agent._enableUpdateStream
ignoreErrorExpr := agent._ignoreHealthErrorExpr
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func TestTabletControl(t *testing.T) {
// now refresh the tablet state, as the resharding process would do
agent.RefreshState(ctx)

// QueryService changed back from SERVING to NOT_SERVING since refreshTablet()
// QueryService changed back from SERVING to NOT_SERVING since RefreshState()
// re-read the topology and saw that REPLICA is still not allowed to serve.
if _, err := expectBroadcastData(agent.QueryServiceControl, true, "", 19); err != nil {
t.Fatal(err)
Expand Down
15 changes: 7 additions & 8 deletions go/vt/vttablet/tabletmanager/init_tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
}); err != nil {
return vterrors.Wrap(err, "InitTablet cannot GetOrCreateShard shard")
}
var masterTermStartTime time.Time
if si.MasterAlias != nil && topoproto.TabletAliasEqual(si.MasterAlias, agent.TabletAlias) {
// We're marked as master in the shard record, which could mean the master
// tablet process was just restarted. However, we need to check if a new
Expand All @@ -110,7 +111,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// Update the master term start time (current value is 0) because we
// assume that we are actually the MASTER and in case of a tiebreak,
// vtgate should prefer us.
agent.setMasterTermStartTime(time.Now())
masterTermStartTime = time.Now()
case err == nil:
if oldTablet.Type == topodatapb.TabletType_MASTER {
// We're marked as master in the shard record,
Expand All @@ -119,9 +120,9 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// Read the master term start time from tablet.
// If it is nil, it might mean that we are upgrading, so use current time instead
if oldTablet.MasterTermStartTime != nil {
agent.setMasterTermStartTime(oldTablet.GetMasterTermStartTime())
masterTermStartTime = oldTablet.GetMasterTermStartTime()
} else {
agent.setMasterTermStartTime(time.Now())
masterTermStartTime = time.Now()
}
}
default:
Expand All @@ -141,7 +142,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
if oldMasterTermStartTime.After(currentShardTime) {
tabletType = topodatapb.TabletType_MASTER
// read the master term start time from tablet
agent.setMasterTermStartTime(oldMasterTermStartTime)
masterTermStartTime = oldTablet.GetMasterTermStartTime()
}
}
default:
Expand Down Expand Up @@ -220,8 +221,8 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
DbNameOverride: *initDbNameOverride,
Tags: initTags,
}
if !agent.masterTermStartTime().IsZero() {
tablet.MasterTermStartTime = logutil.TimeToProto(agent.masterTermStartTime())
if !masterTermStartTime.IsZero() {
tablet.MasterTermStartTime = logutil.TimeToProto(masterTermStartTime)
}
if port != 0 {
tablet.PortMap["vt"] = port
Expand Down Expand Up @@ -265,7 +266,5 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
default:
return vterrors.Wrap(err, "CreateTablet failed")
}

agent.setTablet(tablet)
return nil
}
7 changes: 4 additions & 3 deletions go/vt/vttablet/tabletmanager/init_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -295,7 +296,7 @@ func TestInitTablet(t *testing.T) {
if ti.Type != topodatapb.TabletType_MASTER {
t.Errorf("wrong tablet type: %v", ti.Type)
}
ter1 := agent._masterTermStartTime
ter1 := logutil.ProtoToTime(ti.Tablet.MasterTermStartTime)
if ter1.IsZero() {
t.Fatalf("MASTER tablet should have a masterTermStartTime set")
}
Expand All @@ -317,7 +318,7 @@ func TestInitTablet(t *testing.T) {
if ti.Type != topodatapb.TabletType_MASTER {
t.Errorf("wrong tablet type: %v", ti.Type)
}
ter2 := agent._masterTermStartTime
ter2 := logutil.ProtoToTime(ti.Tablet.MasterTermStartTime)
if ter2.IsZero() || !ter2.Equal(ter1) {
t.Fatalf("After a restart, masterTermStartTime must be equal to the previous time saved in the tablet record. Previous timestamp: %v current timestamp: %v", ter1, ter2)
}
Expand All @@ -342,7 +343,7 @@ func TestInitTablet(t *testing.T) {
if len(ti.Tags) != 1 || ti.Tags["aaa"] != "bbb" {
t.Errorf("wrong tablet tags: %v", ti.Tags)
}
ter3 := agent._masterTermStartTime
ter3 := logutil.ProtoToTime(ti.Tablet.MasterTermStartTime)
if ter3.IsZero() || !ter3.Equal(ter2) {
t.Fatalf("After a restart, masterTermStartTime must be set to the previous time saved in the tablet record. Previous timestamp: %v current timestamp: %v", ter2, ter3)
}
Expand Down
38 changes: 7 additions & 31 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,16 @@ func (agent *ActionAgent) RestoreData(ctx context.Context, logger logutil.Logger
}

func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil.Logger, waitForBackupInterval time.Duration, deleteBeforeRestore bool) error {
// change type to RESTORE (using UpdateTabletFields so it's
// always authorized)
var originalType topodatapb.TabletType
if _, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error {
originalType = tablet.Type
tablet.Type = topodatapb.TabletType_RESTORE
return nil
}); err != nil {
return vterrors.Wrap(err, "Cannot change type to RESTORE")
}

// let's update our internal state (stop query service and other things)
if err := agent.refreshTablet(ctx, "restore from backup"); err != nil {
return vterrors.Wrap(err, "failed to update state before restore")
}
tablet := agent.Tablet()
originalType, tablet.Type = tablet.Type, topodatapb.TabletType_RESTORE
agent.updateState(ctx, tablet, "restore from backup")

// Try to restore. Depending on the reason for failure, we may be ok.
// If we're not ok, return an error and the agent will log.Fatalf,
// causing the process to be restarted and the restore retried.
// Record local metadata values based on the original type.
localMetadata := agent.getLocalMetadataValues(originalType)
tablet := agent.Tablet()

keyspace := tablet.Keyspace
keyspaceInfo, err := agent.TopoServer.GetKeyspace(ctx, keyspace)
Expand Down Expand Up @@ -157,11 +145,8 @@ func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil.
// alter replication here.
default:
// If anything failed, we should reset the original tablet type
agent.TopoServer.UpdateTabletFields(context.Background(), tablet.Alias, func(tablet *topodatapb.Tablet) error {
tablet.Type = originalType
return nil
})
agent.refreshTablet(ctx, "failed for restore from backup")
tablet.Type = originalType
agent.updateState(ctx, tablet, "failed for restore from backup")
return vterrors.Wrap(err, "Can't restore backup")
}

Expand All @@ -175,17 +160,8 @@ func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil.
}

// Change type back to original type if we're ok to serve.
if _, err := agent.TopoServer.UpdateTabletFields(context.Background(), tablet.Alias, func(tablet *topodatapb.Tablet) error {
tablet.Type = originalType
return nil
}); err != nil {
return vterrors.Wrapf(err, "Cannot change type back to %v", originalType)
}

// let's update our internal state (start query service and other things)
if err := agent.refreshTablet(context.Background(), "after restore from backup"); err != nil {
return vterrors.Wrap(err, "failed to update state after backup")
}
tablet.Type = originalType
agent.updateState(ctx, tablet, "after restore from backup")

return nil
}
Expand Down
Loading