diff --git a/go/vt/servenv/servenv.go b/go/vt/servenv/servenv.go index 27d1b3d4bf1..871528a5cbb 100644 --- a/go/vt/servenv/servenv.go +++ b/go/vt/servenv/servenv.go @@ -158,6 +158,7 @@ func OnTermSync(f func()) { // fireOnTermSyncHooks returns true iff all the hooks finish before the timeout. func fireOnTermSyncHooks(timeout time.Duration) bool { + defer log.Flush() log.Infof("Firing synchronous OnTermSync hooks and waiting up to %v for them", timeout) timer := time.NewTimer(timeout) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 2acf9a970f9..524cd2b84ba 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -164,12 +164,12 @@ var commands = []commandGroup{ { "Tablets", []command{ {"InitTablet", commandInitTablet, - "[-allow_update] [-allow_different_shard] [-allow_master_override] [-parent] [-db_name_override=] [-hostname=] [-mysql_port=] [-port=] [-grpc_port=] [-tags=tag1:value1,tag2:value2] -keyspace= -shard= ", + "DEPRECATED [-allow_update] [-allow_different_shard] [-allow_master_override] [-parent] [-db_name_override=] [-hostname=] [-mysql_port=] [-port=] [-grpc_port=] [-tags=tag1:value1,tag2:value2] -keyspace= -shard= ", "Initializes a tablet in the topology.\n"}, {"GetTablet", commandGetTablet, "", "Outputs a JSON structure that contains information about the Tablet."}, - {"UpdateTabletAddrs", commandUpdateTabletAddrs, + {"DEPRECATED UpdateTabletAddrs", commandUpdateTabletAddrs, "[-hostname ] [-ip-addr ] [-mysql-port ] [-vt-port ] [-grpc-port ] ", "Updates the IP address and port numbers of a tablet."}, {"DeleteTablet", commandDeleteTablet, diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go index 95f391d6ab1..836fb1399ef 100644 --- a/go/vt/vttablet/tabletmanager/action_agent.go +++ b/go/vt/vttablet/tabletmanager/action_agent.go @@ -184,9 +184,6 @@ type ActionAgent struct { // _shardSyncCancel is the function to stop the background shard sync goroutine. _shardSyncCancel context.CancelFunc - // _tablet has the Tablet record we last read from the topology server. - _tablet *topodatapb.Tablet - // _disallowQueryService is set to the reason we should be // disallowing queries from being served. It is set from changeCallback, // and used by healthcheck. If empty, we should allow queries. @@ -230,6 +227,11 @@ type ActionAgent struct { _lockTablesTimer *time.Timer // _isBackupRunning tells us whether there is a backup that is currently running _isBackupRunning bool + + pubMu sync.Mutex + // tablet has the Tablet record we last read from the topology server. + tablet *topodatapb.Tablet + isPublishing bool } // NewActionAgent creates a new ActionAgent and registers all the @@ -326,6 +328,33 @@ func NewActionAgent( agent.registerQueryService() }) + // optionally populate metadata records + if !*restoreFromBackup && *initPopulateMetadata { + // we use initialTablet here because it has the intended tabletType. + // the tablet returned by agent.Tablet() will have type UNKNOWN until + // we call updateState. + localMetadata := agent.getLocalMetadataValues(agent.initialTablet.Type) + if agent.Cnf != nil { // we are managing mysqld + // we'll use batchCtx here because we are still initializing and can't proceed unless this succeeds + if err := agent.MysqlDaemon.Wait(batchCtx, agent.Cnf); err != nil { + return nil, err + } + } + err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(agent.initialTablet)) + if err != nil { + return nil, vterrors.Wrap(err, "failed to -init_populate_metadata") + } + } + + // Update our state (need the action lock). + // We do this upfront to prevent this from racing with restoreFromBackup + // in case it gets launched. + if err := agent.lock(batchCtx); err != nil { + return nil, err + } + agent.updateState(batchCtx, agent.initialTablet, "Start") + agent.unlock() + // two cases then: // - restoreFromBackup is set: we restore, then initHealthCheck, all // in the background @@ -342,34 +371,6 @@ func NewActionAgent( agent.initHealthCheck() }() } else { - // optionally populate metadata records - if *initPopulateMetadata { - // we use initialTablet here because it has the intended tabletType. - // the tablet returned by agent.Tablet() will have type UNKNOWN until we call - // refreshTablet - localMetadata := agent.getLocalMetadataValues(agent.initialTablet.Type) - if agent.Cnf != nil { // we are managing mysqld - // we'll use batchCtx here because we are still initializing and can't proceed unless this succeeds - if err := agent.MysqlDaemon.Wait(batchCtx, agent.Cnf); err != nil { - return nil, err - } - } - err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(agent.initialTablet)) - if err != nil { - return nil, vterrors.Wrap(err, "failed to -init_populate_metadata") - } - } - - // Update our state (need the action lock). - if err := agent.lock(batchCtx); err != nil { - return nil, err - } - if err := agent.refreshTablet(batchCtx, "Start"); err != nil { - agent.unlock() - return nil, err - } - agent.unlock() - // synchronously start health check if needed agent.initHealthCheck() } @@ -423,9 +424,7 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias * panic(vterrors.Wrap(err, "agent.lock() failed")) } defer agent.unlock() - if err := agent.refreshTablet(batchCtx, "Start"); err != nil { - panic(vterrors.Wrapf(err, "agent.refreshTablet(%v) failed", tabletAlias)) - } + agent.updateState(batchCtx, agent.initialTablet, "Start") return agent } @@ -475,9 +474,7 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias panic(vterrors.Wrap(err, "agent.lock() failed")) } defer agent.unlock() - if err := agent.refreshTablet(batchCtx, "Start"); err != nil { - panic(vterrors.Wrapf(err, "agent.refreshTablet(%v) failed", tabletAlias)) - } + agent.updateState(batchCtx, agent.initialTablet, "Start") return agent } @@ -488,19 +485,19 @@ func (agent *ActionAgent) registerQueryRuleSources() { } func (agent *ActionAgent) setTablet(tablet *topodatapb.Tablet) { - agent.mutex.Lock() - agent._tablet = proto.Clone(tablet).(*topodatapb.Tablet) - agent.mutex.Unlock() + agent.pubMu.Lock() + agent.tablet = proto.Clone(tablet).(*topodatapb.Tablet) + agent.pubMu.Unlock() // Notify the shard sync loop that the tablet state changed. agent.notifyShardSync() } -// Tablet reads the stored Tablet from the agent, protected by mutex. +// Tablet reads the stored Tablet from the agent. func (agent *ActionAgent) Tablet() *topodatapb.Tablet { - agent.mutex.Lock() - tablet := proto.Clone(agent._tablet).(*topodatapb.Tablet) - agent.mutex.Unlock() + agent.pubMu.Lock() + tablet := proto.Clone(agent.tablet).(*topodatapb.Tablet) + agent.pubMu.Unlock() return tablet } @@ -666,6 +663,9 @@ func (agent *ActionAgent) Start(ctx context.Context, dbcfgs *dbconfigs.DBConfigs return err } + // Initialize masterTermStartTime + agent.setMasterTermStartTime(logutil.ProtoToTime(agent.initialTablet.MasterTermStartTime)) + // Verify the topology is correct. agent.verifyTopology(ctx) @@ -719,8 +719,7 @@ func (agent *ActionAgent) Start(ctx context.Context, dbcfgs *dbconfigs.DBConfigs // Initialize the current tablet to match our current running // state: Has most field filled in, but type is UNKNOWN. - // Subsequents calls to updateState or refreshTablet - // will then work as expected. + // Subsequents calls to updateState will then work as expected. startingTablet := proto.Clone(agent.initialTablet).(*topodatapb.Tablet) startingTablet.Type = topodatapb.TabletType_UNKNOWN agent.setTablet(startingTablet) diff --git a/go/vt/vttablet/tabletmanager/healthcheck.go b/go/vt/vttablet/tabletmanager/healthcheck.go index 3bbb9992b71..65edb4f15db 100644 --- a/go/vt/vttablet/tabletmanager/healthcheck.go +++ b/go/vt/vttablet/tabletmanager/healthcheck.go @@ -33,7 +33,6 @@ import ( "html/template" "time" - "github.com/golang/protobuf/proto" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/health" "vitess.io/vitess/go/vt/log" @@ -182,8 +181,8 @@ func (agent *ActionAgent) runHealthCheck() { func (agent *ActionAgent) runHealthCheckLocked() { agent.checkLock() // read the current tablet record and tablet control + tablet := agent.Tablet() agent.mutex.Lock() - tablet := proto.Clone(agent._tablet).(*topodatapb.Tablet) shouldBeServing := agent._disallowQueryService == "" runUpdateStream := agent._enableUpdateStream ignoreErrorExpr := agent._ignoreHealthErrorExpr diff --git a/go/vt/vttablet/tabletmanager/healthcheck_test.go b/go/vt/vttablet/tabletmanager/healthcheck_test.go index 9085f4820e9..e50cb14d1c2 100644 --- a/go/vt/vttablet/tabletmanager/healthcheck_test.go +++ b/go/vt/vttablet/tabletmanager/healthcheck_test.go @@ -670,7 +670,7 @@ func TestTabletControl(t *testing.T) { // now refresh the tablet state, as the resharding process would do agent.RefreshState(ctx) - // QueryService changed back from SERVING to NOT_SERVING since refreshTablet() + // QueryService changed back from SERVING to NOT_SERVING since RefreshState() // re-read the topology and saw that REPLICA is still not allowed to serve. if _, err := expectBroadcastData(agent.QueryServiceControl, true, "", 19); err != nil { t.Fatal(err) diff --git a/go/vt/vttablet/tabletmanager/init_tablet.go b/go/vt/vttablet/tabletmanager/init_tablet.go index ea4edc985a6..f1ca125ea64 100644 --- a/go/vt/vttablet/tabletmanager/init_tablet.go +++ b/go/vt/vttablet/tabletmanager/init_tablet.go @@ -96,6 +96,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { }); err != nil { return vterrors.Wrap(err, "InitTablet cannot GetOrCreateShard shard") } + var masterTermStartTime time.Time if si.MasterAlias != nil && topoproto.TabletAliasEqual(si.MasterAlias, agent.TabletAlias) { // We're marked as master in the shard record, which could mean the master // tablet process was just restarted. However, we need to check if a new @@ -110,7 +111,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { // Update the master term start time (current value is 0) because we // assume that we are actually the MASTER and in case of a tiebreak, // vtgate should prefer us. - agent.setMasterTermStartTime(time.Now()) + masterTermStartTime = time.Now() case err == nil: if oldTablet.Type == topodatapb.TabletType_MASTER { // We're marked as master in the shard record, @@ -119,9 +120,9 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { // Read the master term start time from tablet. // If it is nil, it might mean that we are upgrading, so use current time instead if oldTablet.MasterTermStartTime != nil { - agent.setMasterTermStartTime(oldTablet.GetMasterTermStartTime()) + masterTermStartTime = oldTablet.GetMasterTermStartTime() } else { - agent.setMasterTermStartTime(time.Now()) + masterTermStartTime = time.Now() } } default: @@ -141,7 +142,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { if oldMasterTermStartTime.After(currentShardTime) { tabletType = topodatapb.TabletType_MASTER // read the master term start time from tablet - agent.setMasterTermStartTime(oldMasterTermStartTime) + masterTermStartTime = oldTablet.GetMasterTermStartTime() } } default: @@ -220,8 +221,8 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { DbNameOverride: *initDbNameOverride, Tags: initTags, } - if !agent.masterTermStartTime().IsZero() { - tablet.MasterTermStartTime = logutil.TimeToProto(agent.masterTermStartTime()) + if !masterTermStartTime.IsZero() { + tablet.MasterTermStartTime = logutil.TimeToProto(masterTermStartTime) } if port != 0 { tablet.PortMap["vt"] = port @@ -265,7 +266,5 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { default: return vterrors.Wrap(err, "CreateTablet failed") } - - agent.setTablet(tablet) return nil } diff --git a/go/vt/vttablet/tabletmanager/init_tablet_test.go b/go/vt/vttablet/tabletmanager/init_tablet_test.go index 9936d402133..897d004a9b6 100644 --- a/go/vt/vttablet/tabletmanager/init_tablet_test.go +++ b/go/vt/vttablet/tabletmanager/init_tablet_test.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/history" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -295,7 +296,7 @@ func TestInitTablet(t *testing.T) { if ti.Type != topodatapb.TabletType_MASTER { t.Errorf("wrong tablet type: %v", ti.Type) } - ter1 := agent._masterTermStartTime + ter1 := logutil.ProtoToTime(ti.Tablet.MasterTermStartTime) if ter1.IsZero() { t.Fatalf("MASTER tablet should have a masterTermStartTime set") } @@ -317,7 +318,7 @@ func TestInitTablet(t *testing.T) { if ti.Type != topodatapb.TabletType_MASTER { t.Errorf("wrong tablet type: %v", ti.Type) } - ter2 := agent._masterTermStartTime + ter2 := logutil.ProtoToTime(ti.Tablet.MasterTermStartTime) if ter2.IsZero() || !ter2.Equal(ter1) { t.Fatalf("After a restart, masterTermStartTime must be equal to the previous time saved in the tablet record. Previous timestamp: %v current timestamp: %v", ter1, ter2) } @@ -342,7 +343,7 @@ func TestInitTablet(t *testing.T) { if len(ti.Tags) != 1 || ti.Tags["aaa"] != "bbb" { t.Errorf("wrong tablet tags: %v", ti.Tags) } - ter3 := agent._masterTermStartTime + ter3 := logutil.ProtoToTime(ti.Tablet.MasterTermStartTime) if ter3.IsZero() || !ter3.Equal(ter2) { t.Fatalf("After a restart, masterTermStartTime must be set to the previous time saved in the tablet record. Previous timestamp: %v current timestamp: %v", ter2, ter3) } diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index b1ae16391ba..aed6a4a864e 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -62,28 +62,16 @@ func (agent *ActionAgent) RestoreData(ctx context.Context, logger logutil.Logger } func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil.Logger, waitForBackupInterval time.Duration, deleteBeforeRestore bool) error { - // change type to RESTORE (using UpdateTabletFields so it's - // always authorized) var originalType topodatapb.TabletType - if _, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error { - originalType = tablet.Type - tablet.Type = topodatapb.TabletType_RESTORE - return nil - }); err != nil { - return vterrors.Wrap(err, "Cannot change type to RESTORE") - } - - // let's update our internal state (stop query service and other things) - if err := agent.refreshTablet(ctx, "restore from backup"); err != nil { - return vterrors.Wrap(err, "failed to update state before restore") - } + tablet := agent.Tablet() + originalType, tablet.Type = tablet.Type, topodatapb.TabletType_RESTORE + agent.updateState(ctx, tablet, "restore from backup") // Try to restore. Depending on the reason for failure, we may be ok. // If we're not ok, return an error and the agent will log.Fatalf, // causing the process to be restarted and the restore retried. // Record local metadata values based on the original type. localMetadata := agent.getLocalMetadataValues(originalType) - tablet := agent.Tablet() keyspace := tablet.Keyspace keyspaceInfo, err := agent.TopoServer.GetKeyspace(ctx, keyspace) @@ -157,11 +145,8 @@ func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil. // alter replication here. default: // If anything failed, we should reset the original tablet type - agent.TopoServer.UpdateTabletFields(context.Background(), tablet.Alias, func(tablet *topodatapb.Tablet) error { - tablet.Type = originalType - return nil - }) - agent.refreshTablet(ctx, "failed for restore from backup") + tablet.Type = originalType + agent.updateState(ctx, tablet, "failed for restore from backup") return vterrors.Wrap(err, "Can't restore backup") } @@ -175,17 +160,8 @@ func (agent *ActionAgent) restoreDataLocked(ctx context.Context, logger logutil. } // Change type back to original type if we're ok to serve. - if _, err := agent.TopoServer.UpdateTabletFields(context.Background(), tablet.Alias, func(tablet *topodatapb.Tablet) error { - tablet.Type = originalType - return nil - }); err != nil { - return vterrors.Wrapf(err, "Cannot change type back to %v", originalType) - } - - // let's update our internal state (start query service and other things) - if err := agent.refreshTablet(context.Background(), "after restore from backup"); err != nil { - return vterrors.Wrap(err, "failed to update state after backup") - } + tablet.Type = originalType + agent.updateState(ctx, tablet, "after restore from backup") return nil } diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index b6d88215628..049518a44b8 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -27,7 +27,6 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/vt/hook" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topotools" @@ -76,28 +75,29 @@ func (agent *ActionAgent) changeTypeLocked(ctx context.Context, tabletType topod return fmt.Errorf("Tablet: %v, is already drained", agent.TabletAlias) } - agentMasterTermStartTime := time.Time{} + tablet := agent.Tablet() + tablet.Type = tabletType // If we have been told we're master, set master term start time to Now + // and save it topo immediately. if tabletType == topodatapb.TabletType_MASTER { - agentMasterTermStartTime = time.Now() - } - // change our type in the topology, and set masterTermStartTime on tablet record if applicable - _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType, logutil.TimeToProto(agentMasterTermStartTime)) - if err != nil { - return err - } - // We only update agent's masterTermStartTime if we were able to update the topo. - // This ensures that in case of a failure, we are never in a situation where the - // tablet's timestamp is ahead of the topo's timestamp. - if tabletType == topodatapb.TabletType_MASTER { + agentMasterTermStartTime := time.Now() + tablet.MasterTermStartTime = logutil.TimeToProto(agentMasterTermStartTime) + + // change our type in the topology, and set masterTermStartTime on tablet record if applicable + _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, tabletType, tablet.MasterTermStartTime) + if err != nil { + return err + } + // We only update agent's masterTermStartTime if we were able to update the topo. + // This ensures that in case of a failure, we are never in a situation where the + // tablet's timestamp is ahead of the topo's timestamp. agent.setMasterTermStartTime(agentMasterTermStartTime) + } else { + agent.setMasterTermStartTime(time.Time{}) } - // let's update our internal state (stop query service and other things) - // refreshTablet will invoke broadcastHealth if needed. - if err := agent.refreshTablet(ctx, "ChangeType"); err != nil { - return err - } + // updateState will invoke broadcastHealth if needed. + agent.updateState(ctx, tablet, "ChangeType") // Let's see if we need to fix semi-sync acking. if err := agent.fixSemiSyncAndReplication(agent.Tablet().Type); err != nil { @@ -127,14 +127,7 @@ func (agent *ActionAgent) ExecuteHook(ctx context.Context, hk *hook.Hook) *hook. // Execute the hooks topotools.ConfigureTabletHook(hk, agent.TabletAlias) - hr := hk.Execute() - - // We never know what the hook did, so let's refresh our state. - if err := agent.refreshTablet(ctx, "ExecuteHook"); err != nil { - log.Errorf("refreshTablet after ExecuteHook failed: %v", err) - } - - return hr + return hk.Execute() } // RefreshState reload the tablet record from the topo server. diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 21e16ec1384..f07bb3f28a9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" @@ -491,19 +490,7 @@ func (agent *ActionAgent) SetMaster(ctx context.Context, parentAlias *topodatapb } defer agent.unlock() - if err := agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, waitPosition, forceStartSlave); err != nil { - return err - } - - // Always refresh the tablet, even if we may not have changed it. - // It's possible that we changed it earlier but failed to refresh. - // Note that we do this outside setMasterLocked() because this should never - // be done as part of setMasterRepairReplication(). - if err := agent.refreshTablet(ctx, "SetMaster"); err != nil { - return err - } - - return nil + return agent.setMasterLocked(ctx, parentAlias, timeCreatedNS, waitPosition, forceStartSlave) } func (agent *ActionAgent) setMasterRepairReplication(ctx context.Context, parentAlias *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartSlave bool) (err error) { @@ -542,16 +529,11 @@ func (agent *ActionAgent) setMasterLocked(ctx context.Context, parentAlias *topo // steps fail below. // Note it is important to check for MASTER here so that we don't // unintentionally change the type of RDONLY tablets - _, err = agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error { - if tablet.Type == topodatapb.TabletType_MASTER { - tablet.Type = topodatapb.TabletType_REPLICA - tablet.MasterTermStartTime = nil - return nil - } - return topo.NewError(topo.NoUpdateNeeded, agent.TabletAlias.String()) - }) - if err != nil { - return err + tablet := agent.Tablet() + if tablet.Type == topodatapb.TabletType_MASTER { + tablet.Type = topodatapb.TabletType_REPLICA + tablet.MasterTermStartTime = nil + agent.updateState(ctx, tablet, "setMasterLocked") } // See if we were replicating at all, and should be replicating. @@ -647,25 +629,14 @@ func (agent *ActionAgent) SlaveWasRestarted(ctx context.Context, parent *topodat // Only change type of former MASTER tablets. // Don't change type of RDONLY - typeChanged := false - if _, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error { - if tablet.Type == topodatapb.TabletType_MASTER { - tablet.Type = topodatapb.TabletType_REPLICA - tablet.MasterTermStartTime = nil - typeChanged = true - return nil - } - return topo.NewError(topo.NoUpdateNeeded, agent.TabletAlias.String()) - }); err != nil { - return err - } - - if typeChanged { - if err := agent.refreshTablet(ctx, "SlaveWasRestarted"); err != nil { - return err - } - agent.runHealthCheckLocked() + tablet := agent.Tablet() + if tablet.Type != topodatapb.TabletType_MASTER { + return nil } + tablet.Type = topodatapb.TabletType_MASTER + tablet.MasterTermStartTime = nil + agent.updateState(ctx, tablet, "SlaveWasRestarted") + agent.runHealthCheckLocked() return nil } diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index 92df4a2f5b6..86c57cc2211 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -291,7 +291,10 @@ func (agent *ActionAgent) notifyShardSync() { func (agent *ActionAgent) setMasterTermStartTime(t time.Time) { agent.mutex.Lock() agent._masterTermStartTime = t - agent._replicationDelay = 0 + // Reset replication delay ony if we're the master. + if !t.IsZero() { + agent._replicationDelay = 0 + } agent.mutex.Unlock() // Notify the shard sync loop that the tablet state changed. diff --git a/go/vt/vttablet/tabletmanager/state_change.go b/go/vt/vttablet/tabletmanager/state_change.go index 17a2b768ba2..011d2ad35ea 100644 --- a/go/vt/vttablet/tabletmanager/state_change.go +++ b/go/vt/vttablet/tabletmanager/state_change.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/golang/protobuf/proto" "golang.org/x/net/context" "vitess.io/vitess/go/event" "vitess.io/vitess/go/trace" @@ -35,6 +36,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/events" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" @@ -51,6 +53,8 @@ var ( // vtgate to gracefully redirect traffic elsewhere, before we begin actually // rejecting queries for that target type. gracePeriod = flag.Duration("serving_state_grace_period", 0, "how long to pause after broadcasting health to vtgate, before enforcing a new serving state") + + publishRetryInterval = flag.Duration("publish_retry_interval", 30*time.Second, "how long vttablet waits to retry publishing the tablet record") ) // Query rules from blacklist @@ -154,9 +158,7 @@ func (agent *ActionAgent) refreshTablet(ctx context.Context, reason string) erro tablet = updatedTablet } // Also refresh masterTermStartTime - if tablet.MasterTermStartTime != nil { - agent.setMasterTermStartTime(logutil.ProtoToTime(tablet.MasterTermStartTime)) - } + agent.setMasterTermStartTime(logutil.ProtoToTime(tablet.MasterTermStartTime)) agent.updateState(ctx, tablet, reason) log.Infof("Done with post-action state refresh") return nil @@ -172,6 +174,7 @@ func (agent *ActionAgent) updateState(ctx context.Context, newTablet *topodatapb log.Infof("Running tablet callback because: %v", reason) agent.changeCallback(ctx, oldTablet, newTablet) agent.setTablet(newTablet) + agent.publishState(ctx) event.Dispatch(&events.StateChange{ OldTablet: *oldTablet, NewTablet: *newTablet, @@ -374,3 +377,62 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl agent.broadcastHealth() } } + +func (agent *ActionAgent) publishState(ctx context.Context) { + agent.pubMu.Lock() + defer agent.pubMu.Unlock() + log.Infof("Publishing state: %v", agent.tablet) + // If retry is in progress, there's nothing to do. + if agent.isPublishing { + return + } + // Common code path: publish immediately. + ctx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + _, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error { + if err := topotools.CheckOwnership(tablet, agent.tablet); err != nil { + log.Error(err) + return topo.NewError(topo.NoUpdateNeeded, "") + } + *tablet = *proto.Clone(agent.tablet).(*topodatapb.Tablet) + return nil + }) + if err != nil { + log.Errorf("Unable to publish state to topo, will keep retrying: %v", err) + agent.isPublishing = true + // Keep retrying until success. + go agent.retryPublish() + } +} + +func (agent *ActionAgent) retryPublish() { + agent.pubMu.Lock() + defer func() { + agent.isPublishing = false + agent.pubMu.Unlock() + }() + + for { + // Retry immediately the first time because the previous failure might have been + // due to an expired context. + ctx, cancel := context.WithTimeout(agent.batchCtx, *topo.RemoteOperationTimeout) + _, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error { + if err := topotools.CheckOwnership(tablet, agent.tablet); err != nil { + log.Error(err) + return topo.NewError(topo.NoUpdateNeeded, "") + } + *tablet = *proto.Clone(agent.tablet).(*topodatapb.Tablet) + return nil + }) + cancel() + if err != nil { + log.Errorf("Unable to publish state to topo, will keep retrying: %v", err) + agent.pubMu.Unlock() + time.Sleep(*publishRetryInterval) + agent.pubMu.Lock() + continue + } + log.Infof("Published state: %v", agent.tablet) + return + } +} diff --git a/go/vt/vttablet/tabletmanager/state_change_test.go b/go/vt/vttablet/tabletmanager/state_change_test.go new file mode 100644 index 00000000000..ae5a1fd18aa --- /dev/null +++ b/go/vt/vttablet/tabletmanager/state_change_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletmanager + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" +) + +func TestPublishState(t *testing.T) { + defer func(saved time.Duration) { *publishRetryInterval = saved }(*publishRetryInterval) + *publishRetryInterval = 1 * time.Millisecond + + // This flow doesn't test the failure scenario, which + // we can't do using memorytopo, but we do test the retry + // code path. + + ctx := context.Background() + agent := createTestAgent(ctx, t, nil) + ttablet, err := agent.TopoServer.GetTablet(ctx, agent.TabletAlias) + require.NoError(t, err) + assert.Equal(t, agent.Tablet(), ttablet.Tablet) + + tab1 := agent.Tablet() + tab1.Keyspace = "tab1" + agent.setTablet(tab1) + agent.publishState(ctx) + ttablet, err = agent.TopoServer.GetTablet(ctx, agent.TabletAlias) + require.NoError(t, err) + assert.Equal(t, tab1, ttablet.Tablet) + + tab2 := agent.Tablet() + tab2.Keyspace = "tab2" + agent.setTablet(tab2) + agent.retryPublish() + ttablet, err = agent.TopoServer.GetTablet(ctx, agent.TabletAlias) + require.NoError(t, err) + assert.Equal(t, tab2, ttablet.Tablet) + + // If hostname doesn't match, it should not update. + tab3 := agent.Tablet() + tab3.Hostname = "tab3" + agent.setTablet(tab3) + agent.publishState(ctx) + ttablet, err = agent.TopoServer.GetTablet(ctx, agent.TabletAlias) + require.NoError(t, err) + assert.Equal(t, tab2, ttablet.Tablet) + + // Same for retryPublish. + agent.retryPublish() + ttablet, err = agent.TopoServer.GetTablet(ctx, agent.TabletAlias) + require.NoError(t, err) + assert.Equal(t, tab2, ttablet.Tablet) +} diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go index a125910a9eb..edf89088bfd 100644 --- a/go/vt/worker/topo_utils.go +++ b/go/vt/worker/topo_utils.go @@ -27,7 +27,6 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/wrangler" @@ -123,44 +122,12 @@ func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrang wr.Logger().Infof("Changing tablet %v to '%v'", topoproto.TabletAliasString(tabletAlias), topodatapb.TabletType_DRAINED) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - err = wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED) - cancel() - if err != nil { - return nil, err - } - - ourURL := servenv.ListeningURL.String() - wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", ourURL, topoproto.TabletAliasString(tabletAlias)) - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - _, err = wr.TopoServer().UpdateTabletFields(shortCtx, tabletAlias, func(tablet *topodatapb.Tablet) error { - if tablet.Tags == nil { - tablet.Tags = make(map[string]string) - } - tablet.Tags["worker"] = ourURL - tablet.Tags["drain_reason"] = "Used by vtworker" - return nil - }) - cancel() - if err != nil { + defer cancel() + if err := wr.ChangeSlaveType(shortCtx, tabletAlias, topodatapb.TabletType_DRAINED); err != nil { return nil, err } - // Using "defer" here because we remove the tag *before* calling - // ChangeSlaveType back, so we need to record this tag change after the change - // slave type change in the cleaner. - defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "worker", "") - defer wrangler.RecordTabletTagAction(cleaner, tabletAlias, "drain_reason", "") - // Record a clean-up action to take the tablet back to tabletAlias. wrangler.RecordChangeSlaveTypeAction(cleaner, tabletAlias, topodatapb.TabletType_DRAINED, tabletType) - - // We refresh the destination vttablet reloads the worker URL when it reloads the tablet. - shortCtx, cancel = context.WithTimeout(ctx, *remoteActionsTimeout) - wr.RefreshTabletState(shortCtx, tabletAlias) - if err != nil { - return nil, err - } - cancel() - return tabletAlias, nil } diff --git a/go/vt/wrangler/cleaner.go b/go/vt/wrangler/cleaner.go index 7a0fd4c7d4a..d21dac9be8b 100644 --- a/go/vt/wrangler/cleaner.go +++ b/go/vt/wrangler/cleaner.go @@ -30,25 +30,16 @@ import ( ) const ( - // - // ChangeSlaveTypeAction CleanerFunction - // // ChangeSlaveTypeActionName is the name of the action to change a slave type // (can be used to find such an action by name) ChangeSlaveTypeActionName = "ChangeSlaveTypeAction" - // - // TabletTagAction CleanerFunction - // + // TabletTagActionName is the name of the Tag action TabletTagActionName = "TabletTagAction" - // - // StartSlaveAction CleanerAction - // + // StartSlaveActionName is the name of the slave start action StartSlaveActionName = "StartSlaveAction" - // - // VReplication CleanerAction - // + // VReplicationActionName is the name of the action to execute VReplication commands VReplicationActionName = "VReplicationAction" ) @@ -147,25 +138,6 @@ func RecordChangeSlaveTypeAction(cleaner *Cleaner, tabletAlias *topodatapb.Table }) } -// RecordTabletTagAction records a new action to set / remove a tag -// into the specified Cleaner -func RecordTabletTagAction(cleaner *Cleaner, tabletAlias *topodatapb.TabletAlias, name, value string) { - cleaner.Record(TabletTagActionName, topoproto.TabletAliasString(tabletAlias), func(ctx context.Context, wr *Wrangler) error { - _, err := wr.TopoServer().UpdateTabletFields(ctx, tabletAlias, func(tablet *topodatapb.Tablet) error { - if tablet.Tags == nil { - tablet.Tags = make(map[string]string) - } - if value != "" { - tablet.Tags[name] = value - } else { - delete(tablet.Tags, name) - } - return nil - }) - return err - }) -} - // RecordStartSlaveAction records a new action to restart binlog replication on a server // into the specified Cleaner func RecordStartSlaveAction(cleaner *Cleaner, tablet *topodatapb.Tablet) {