Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.0-20170215233205-553a64147049
github.com/google/btree v1.0.0 // indirect
github.com/gorilla/websocket v0.0.0-20160912153041-2d1e4548da23
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down
181 changes: 103 additions & 78 deletions go/vt/proto/topodata/topodata.pb.go

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions go/vt/vttablet/tabletmanager/action_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ type ActionAgent struct {
// It's only set once in NewActionAgent() and never modified after that.
orc *orcClient

// shardSyncChan is a channel for informing the shard sync goroutine that
// it should wake up and recheck the tablet state, to make sure it and the
// shard record are in sync.
//
// Call agent.notifyShardSync() instead of sending directly to this channel.
shardSyncChan chan struct{}

// shardSyncCancel is the function to stop the background shard sync goroutine.
shardSyncCancel context.CancelFunc

// mutex protects all the following fields (those that start with '_').
// Only hold the mutex while updating the fields, and for nothing else.
mutex sync.Mutex
Expand Down Expand Up @@ -199,8 +209,8 @@ type ActionAgent struct {
// _replicationDelay is the replication delay as of the last time we got it.
_replicationDelay time.Duration

// last time we ran TabletExternallyReparented
_tabletExternallyReparentedTime time.Time
// _masterTermStartTime is the time at which our term as master began.
_masterTermStartTime time.Time

// _ignoreHealthErrorExpr can be set by RPC to selectively disable certain
// healthcheck errors. It should only be accessed while holding actionMutex.
Expand Down Expand Up @@ -438,6 +448,9 @@ func (agent *ActionAgent) setTablet(tablet *topodatapb.Tablet) {
agent.mutex.Lock()
agent._tablet = proto.Clone(tablet).(*topodatapb.Tablet)
agent.mutex.Unlock()

// Notify the shard sync loop that the tablet state changed.
agent.notifyShardSync()
}

// Tablet reads the stored Tablet from the agent, protected by mutex.
Expand Down Expand Up @@ -672,6 +685,10 @@ func (agent *ActionAgent) Start(ctx context.Context, mysqlHost string, mysqlPort
startingTablet.Type = topodatapb.TabletType_UNKNOWN
agent.setTablet(startingTablet)

// Start a background goroutine to watch and update the shard record,
// to make sure it and our tablet record are in sync.
agent.startShardSync()

return nil
}

Expand Down Expand Up @@ -699,6 +716,7 @@ func (agent *ActionAgent) Close() {
// while taking lameduck into account. However, this may be useful for tests,
// when you want to clean up an agent immediately.
func (agent *ActionAgent) Stop() {
agent.stopShardSync()
if agent.UpdateStream != nil {
agent.UpdateStream.Disable()
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/init_tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error {
// There's no existing tablet record, so we can assume
// no one has left us a message to step down.
tabletType = topodatapb.TabletType_MASTER
// Update the TER timestamp (current value is 0) because we
// Update the master term start time (current value is 0) because we
// assume that we are actually the MASTER and in case of a tiebreak,
// vtgate should prefer us.
agent.setExternallyReparentedTime(time.Now())
agent.setMasterTermStartTime(time.Now())
case err == nil:
if oldTablet.Type == topodatapb.TabletType_MASTER {
// We're marked as master in the shard record,
// and our existing tablet record agrees.
tabletType = topodatapb.TabletType_MASTER
// Same comment as above. Update tiebreaking timestamp to now.
agent.setExternallyReparentedTime(time.Now())
agent.setMasterTermStartTime(time.Now())
}
default:
return vterrors.Wrap(err, "InitTablet failed to read existing tablet record")
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletmanager/init_tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ func TestInitTablet(t *testing.T) {
if string(ti.KeyRange.Start) != "" || string(ti.KeyRange.End) != "\xc0" {
t.Errorf("wrong KeyRange for tablet: %v", ti.KeyRange)
}
if got := agent._tabletExternallyReparentedTime; !got.IsZero() {
t.Fatalf("REPLICA tablet should not have an ExternallyReparentedTimestamp set: %v", got)
if got := agent._masterTermStartTime; !got.IsZero() {
t.Fatalf("REPLICA tablet should not have a MasterTermStartTime set: %v", got)
}

// 2. Update shard's master to our alias, then try to init again.
Expand All @@ -288,7 +288,7 @@ func TestInitTablet(t *testing.T) {
if ti.Type != topodatapb.TabletType_REPLICA {
t.Errorf("wrong tablet type: %v", ti.Type)
}
if got := agent._tabletExternallyReparentedTime; !got.IsZero() {
if got := agent._masterTermStartTime; !got.IsZero() {
t.Fatalf("REPLICA tablet should not have an ExternallyReparentedTimestamp set: %v", got)
}

Expand All @@ -308,7 +308,7 @@ func TestInitTablet(t *testing.T) {
if ti.Type != topodatapb.TabletType_MASTER {
t.Errorf("wrong tablet type: %v", ti.Type)
}
ter1 := agent._tabletExternallyReparentedTime
ter1 := agent._masterTermStartTime
if ter1.IsZero() {
t.Fatalf("MASTER tablet should have an ExternallyReparentedTimestamp set")
}
Expand All @@ -330,7 +330,7 @@ func TestInitTablet(t *testing.T) {
if ti.Type != topodatapb.TabletType_MASTER {
t.Errorf("wrong tablet type: %v", ti.Type)
}
ter2 := agent._tabletExternallyReparentedTime
ter2 := agent._masterTermStartTime
if ter2.IsZero() || !ter2.After(ter1) {
t.Fatalf("After a restart, ExternallyReparentedTimestamp must be set to the current time. Previous timestamp: %v current timestamp: %v", ter1, ter2)
}
Expand All @@ -355,7 +355,7 @@ func TestInitTablet(t *testing.T) {
if len(ti.Tags) != 1 || ti.Tags["aaa"] != "bbb" {
t.Errorf("wrong tablet tags: %v", ti.Tags)
}
ter3 := agent._tabletExternallyReparentedTime
ter3 := agent._masterTermStartTime
if ter3.IsZero() || !ter3.After(ter2) {
t.Fatalf("After a restart, ExternallyReparentedTimestamp must be set to the current time. Previous timestamp: %v current timestamp: %v", ter2, ter3)
}
Expand Down
18 changes: 3 additions & 15 deletions go/vt/vttablet/tabletmanager/rpc_external_reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
return err
}

// The external failover tool told us that we are still the MASTER. Update the
// timestamp to the current time.
agent.setExternallyReparentedTime(startTime)
// The external failover tool told us that we are still the MASTER.
// Update the timestamp to the current time (start a new term).
agent.setMasterTermStartTime(startTime)

// Create a reusable Reparent event with available info.
ev := &events.Reparent{
Expand Down Expand Up @@ -301,15 +301,3 @@ func (agent *ActionAgent) finalizeTabletExternallyReparented(ctx context.Context
event.DispatchUpdate(ev, "finished")
return nil
}

// setExternallyReparentedTime records t as the most recent moment at which
// this tablet was told that it is the master, and resets the stored
// replication delay to zero.
//
// The timestamp acts as a tiebreaker: if another tablet claims to be master
// and offers a more recent time, that tablet will be trusted over us.
//
// Both fields are guarded by agent.mutex, which is held only for the
// duration of the two assignments.
func (agent *ActionAgent) setExternallyReparentedTime(t time.Time) {
	agent.mutex.Lock()
	agent._replicationDelay = 0
	agent._tabletExternallyReparentedTime = t
	agent.mutex.Unlock()
}
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletmanager/rpc_external_reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ func TestTabletExternallyReparentedAlwaysUpdatesTimestamp(t *testing.T) {
if err := agent.TabletExternallyReparented(ctx, "unused_id"); err != nil {
t.Fatal(err)
}
if agent._tabletExternallyReparentedTime.IsZero() {
t.Fatalf("externally_reparented_time should have been updated")
if agent._masterTermStartTime.IsZero() {
t.Fatalf("master_term_start_time should have been updated")
}

// Run RPC again and verify that the timestamp was updated.
ter1 := agent._tabletExternallyReparentedTime
ter1 := agent._masterTermStartTime
if err := agent.TabletExternallyReparented(ctx, "unused_id"); err != nil {
t.Fatal(err)
}
ter2 := agent._tabletExternallyReparentedTime
ter2 := agent._masterTermStartTime
if ter1 == ter2 {
t.Fatalf("subsequent TER call did not update the timestamp: %v = %v", ter1, ter2)
t.Fatalf("subsequent TER call did not update the master_term_start_time: %v = %v", ter1, ter2)
}
}
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (agent *ActionAgent) InitMaster(ctx context.Context) (string, error) {
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
agent.setExternallyReparentedTime(startTime)
agent.setMasterTermStartTime(startTime)

// Change our type to master if not already
if _, err := agent.TopoServer.UpdateTabletFields(ctx, agent.TabletAlias, func(tablet *topodatapb.Tablet) error {
Expand Down Expand Up @@ -450,7 +450,7 @@ func (agent *ActionAgent) PromoteSlaveWhenCaughtUp(ctx context.Context, position
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
agent.setExternallyReparentedTime(startTime)
agent.setMasterTermStartTime(startTime)

if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_MASTER); err != nil {
return "", err
Expand Down Expand Up @@ -671,7 +671,7 @@ func (agent *ActionAgent) PromoteSlave(ctx context.Context) (string, error) {
if err := agent.MysqlDaemon.SetReadOnly(false); err != nil {
return "", err
}
agent.setExternallyReparentedTime(startTime)
agent.setMasterTermStartTime(startTime)

if _, err := topotools.ChangeType(ctx, agent.TopoServer, agent.TabletAlias, topodatapb.TabletType_MASTER); err != nil {
return "", err
Expand Down
Loading