Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti
return nil
}

tm.tmState.Open(tm.BatchCtx)
tm.tmState.Open()
return nil
}

Expand Down Expand Up @@ -473,7 +473,7 @@ func (tm *TabletManager) checkMastership(ctx context.Context, si *topo.ShardInfo
tablet.MasterTermStartTime = oldTablet.MasterTermStartTime
})
} else {
log.Warningf("Shard master alias matches, but existing tablet is not master. Switching to master with the shard's master term start time: %v", oldTablet.MasterTermStartTime)
log.Warningf("Shard master alias matches, but existing tablet is not master. Switching from %v to master with the shard's master term start time: %v", oldTablet.Type, si.MasterTermStartTime)
tm.tmState.UpdateTablet(func(tablet *topodatapb.Tablet) {
tablet.Type = topodatapb.TabletType_MASTER
tablet.MasterTermStartTime = si.MasterTermStartTime
Expand Down Expand Up @@ -599,7 +599,7 @@ func (tm *TabletManager) handleRestore(ctx context.Context) (bool, error) {
if *restoreFromBackup {
go func() {
// Open the state manager after restore is done.
defer tm.tmState.Open(ctx)
defer tm.tmState.Open()

// restoreFromBackup will just be a regular action
// (same as if it was triggered remotely)
Expand Down
65 changes: 65 additions & 0 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,71 @@ func TestStartDoesNotUpdateReplicationDataForTabletInWrongShard(t *testing.T) {
assert.Equal(t, 0, len(tablets))
}

// TestCheckTabletTypeResets checks that the tablet manager keeps its internal
// tablet type and its display state in agreement while reconciling with the
// topology: first when the tablet record was left as RESTORE, and then when
// the shard record names this tablet as master.
func TestCheckTabletTypeResets(t *testing.T) {
	// Use a short keyspace rebuild retry interval for the duration of the
	// test and put the previous value back on exit.
	origRetryInterval := rebuildKeyspaceRetryInterval
	defer func() { rebuildKeyspaceRetryInterval = origRetryInterval }()
	rebuildKeyspaceRetryInterval = 10 * time.Millisecond

	var (
		ctx  = context.Background()
		cell = "cell1"
	)
	topoSrv := memorytopo.NewServer(cell)
	tabletAlias := &topodatapb.TabletAlias{Cell: "cell1", Uid: 1}

	// Step 1: bring up a tablet manager so the topology records exist, and
	// confirm that the stored tablet record is a REPLICA before stopping it.
	tm := newTestTM(t, topoSrv, 1, "ks", "0")
	initialTablet := tm.Tablet()
	ensureSrvKeyspace(t, topoSrv, cell, "ks")
	record, err := topoSrv.GetTablet(ctx, tabletAlias)
	require.NoError(t, err)
	assert.Equal(t, topodatapb.TabletType_REPLICA, record.Type)
	tm.Stop()

	// Step 2: overwrite the stored tablet type with RESTORE and start the
	// tablet manager again.
	_, err = topoSrv.UpdateTabletFields(ctx, tabletAlias, func(rec *topodatapb.Tablet) error {
		rec.Type = topodatapb.TabletType_RESTORE
		return nil
	})
	require.NoError(t, err)
	require.NoError(t, tm.Start(initialTablet, 0))
	assert.Equal(t, tm.tmState.tablet.Type, tm.tmState.displayState.tablet.Type)
	record, err = topoSrv.GetTablet(ctx, tabletAlias)
	require.NoError(t, err)
	// The stored record is expected to be back at the initial tablet type.
	assert.Equal(t, topodatapb.TabletType_REPLICA, record.Type)

	// Step 3: point the shard's master alias at this tablet and run the
	// init steps again. The shard record then claims we are the master while
	// the tablet record does not; the tablet is expected to become MASTER
	// and take over the shard record's master term start time.
	now := time.Now()
	_, err = topoSrv.UpdateShardFields(ctx, "ks", "0", func(shard *topo.ShardInfo) error {
		shard.MasterAlias = tabletAlias
		shard.MasterTermStartTime = logutil.TimeToProto(now)
		// Read the value back so the final comparison uses exactly what
		// the shard record holds.
		now = shard.GetMasterTermStartTime()
		return nil
	})
	require.NoError(t, err)
	shardInfo, err := tm.createKeyspaceShard(ctx)
	require.NoError(t, err)
	require.NoError(t, tm.checkMastership(ctx, shardInfo))
	assert.Equal(t, tm.tmState.tablet.Type, tm.tmState.displayState.tablet.Type)
	require.NoError(t, tm.initTablet(ctx))
	assert.Equal(t, tm.tmState.tablet.Type, tm.tmState.displayState.tablet.Type)
	record, err = topoSrv.GetTablet(ctx, tabletAlias)
	require.NoError(t, err)
	assert.Equal(t, topodatapb.TabletType_MASTER, record.Type)
	assert.Equal(t, now, record.GetMasterTermStartTime())
	tm.Stop()
}

func newTestTM(t *testing.T, ts *topo.Server, uid int, keyspace, shard string) *TabletManager {
t.Helper()
ctx := context.Background()
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletmanager/tm_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,28 @@ type tmState struct {
}

// newTMState builds the state tracker for tm around the given tablet.
//
// The returned tmState holds the caller's tablet pointer directly, while its
// display state receives an independent clone of it. A cancellable context
// derived from tm.BatchCtx is created here and stored, together with its
// cancel function, on the returned state.
func newTMState(tm *TabletManager, tablet *topodatapb.Tablet) *tmState {
	stateCtx, stop := context.WithCancel(tm.BatchCtx)
	state := &tmState{
		tm:     tm,
		tablet: tablet,
		ctx:    stateCtx,
		cancel: stop,
	}
	// The display side works on its own copy of the tablet record.
	state.displayState.tablet = proto.Clone(tablet).(*topodatapb.Tablet)
	return state
}

func (ts *tmState) Open(ctx context.Context) {
func (ts *tmState) Open() {
ts.mu.Lock()
defer ts.mu.Unlock()
if ts.isOpen {
return
}

ts.ctx, ts.cancel = context.WithCancel(ctx)
ts.isOpen = true
ts.updateLocked(ts.ctx)
ts.publishStateLocked(ctx)
ts.publishStateLocked(ts.ctx)
}

func (ts *tmState) Close() {
Expand Down Expand Up @@ -192,18 +194,19 @@ func (ts *tmState) UpdateTablet(update func(tablet *topodatapb.Tablet)) {
ts.mu.Lock()
defer ts.mu.Unlock()
update(ts.tablet)
ts.publishForDisplay()
}

func (ts *tmState) updateLocked(ctx context.Context) {
span, ctx := trace.NewSpan(ctx, "tmState.update")
defer span.Finish()
ts.publishForDisplay()

if !ts.isOpen {
return
}

terTime := logutil.ProtoToTime(ts.tablet.MasterTermStartTime)
ts.publishForDisplay()

// Disable TabletServer first so the nonserving state gets advertised
// before other services are shutdown.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/tm_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestStateOpenClose(t *testing.T) {
savedCtx := tm.tmState.ctx
tm.tmState.mu.Unlock()

tm.tmState.Open(context.Background())
tm.tmState.Open()

tm.tmState.mu.Lock()
assert.Equal(t, savedCtx, tm.tmState.ctx)
Expand Down