Skip to content
Merged
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func InitCluster(t *testing.T, cellNames []string) *VitessCluster {
globalConfig.topoPort, globalConfig.hostname, globalConfig.tmpDir)
vc.Vtctld = vtctld
assert.NotNil(t, vc.Vtctld)
// use first cell as `-cell` and all cells as `-cells_to_watch`
vc.Vtctld.Setup(cellNames[0], "-cells_to_watch", strings.Join(cellNames, ","))
// use first cell as `-cell`
vc.Vtctld.Setup(cellNames[0])

vc.Vtctl = cluster.VtctlProcessInstance(globalConfig.topoPort, globalConfig.hostname)
assert.NotNil(t, vc.Vtctl)
Expand Down
166 changes: 107 additions & 59 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package discovery

import (
"fmt"
"math/rand"
"runtime/debug"
"strings"
"time"

"vitess.io/vitess/go/vt/topo/topoproto"
Expand All @@ -34,6 +37,10 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

var (
tabletPickerRetryDelay = 30 * time.Second
)

// TabletPicker gives a simplified API for picking tablets.
type TabletPicker struct {
ts *topo.Server
Expand All @@ -49,6 +56,21 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr)
}
var missingFields []string
if keyspace == "" {
missingFields = append(missingFields, "Keyspace")
}
if shard == "" {
missingFields = append(missingFields, "Shard")
}
if len(cells) == 0 {
missingFields = append(missingFields, "Cells")
}
if len(missingFields) > 0 {
log.Errorf("missing picker fields %s", debug.Stack())
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", ")))
}
return &TabletPicker{
ts: ts,
cells: cells,
Expand All @@ -62,84 +84,110 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp
// All tablets that belong to tp.cells are evaluated and one is
// chosen at random
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
candidates := tp.getAllTablets(ctx)
if len(candidates) == 0 {
return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for cells:%v, keyspace/shard:%v/%v, tablet types:%v", tp.cells, tp.keyspace, tp.shard, tp.tabletTypes)
}
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
// or the context is canceled
for {
idx := 0
// if there is only one candidate we use that, otherwise we find one randomly
if len(candidates) > 1 {
idx = rand.Intn(len(candidates))
select {
case <-ctx.Done():
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
default:
}
alias := candidates[idx]
// get tablet
ti, err := tp.ts.GetTablet(ctx, alias)
if err != nil {
log.Warningf("unable to get tablet for alias %v", alias)
candidates = append(candidates[:idx], candidates[idx+1:]...)
if len(candidates) == 0 {
break
}
continue
}
if !topoproto.IsTypeInList(ti.Tablet.Type, tp.tabletTypes) {
// tablet is not of one of the desired types
candidates := tp.getMatchingTablets(ctx)
if len(candidates) == 0 {
// if no candidates were found, sleep and try again
log.Infof("No tablet found for streaming, sleeping for %d", tabletPickerRetryDelay)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I was planning to add something similar.

time.Sleep(tabletPickerRetryDelay)
continue
}

// try to connect to tablet
conn, err := tabletconn.GetDialer()(ti.Tablet, true)
if err != nil {
log.Warningf("unable to connect to tablet for alias %v", alias)
candidates = append(candidates[:idx], candidates[idx+1:]...)
if len(candidates) == 0 {
break
// try at most len(candidate) times to find a healthy tablet
for i := 0; i < len(candidates); i++ {
idx := rand.Intn(len(candidates))
ti := candidates[idx]
// get tablet
// try to connect to tablet
conn, err := tabletconn.GetDialer()(ti.Tablet, true)
if err != nil {
log.Warningf("unable to connect to tablet for alias %v", ti.Alias)
candidates = append(candidates[:idx], candidates[idx+1:]...)
if len(candidates) == 0 {
break
}
continue
}
continue
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
return ti.Tablet, nil
}
_ = conn.Close(ctx)
return ti.Tablet, nil
}
return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "can't find any healthy source tablet for keyspace/shard:%v/%v tablet types:%v", tp.keyspace, tp.shard, tp.tabletTypes)
}

func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias {
// getMatchingTablets returns a list of TabletInfo for tablets
// that match the cells, keyspace, shard and tabletTypes for this TabletPicker
func (tp *TabletPicker) getMatchingTablets(ctx context.Context) []*topo.TabletInfo {
// Special handling for MASTER tablet type
// Since there is only one master, we ignore cell and find the master
result := make([]*topodatapb.TabletAlias, 0)
aliases := make([]*topodatapb.TabletAlias, 0)
if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_MASTER {
si, err := tp.ts.GetShard(ctx, tp.keyspace, tp.shard)
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard)
if err != nil {
return result
return nil
}
result = append(result, si.MasterAlias)
return result
}
actualCells := make([]string, 0)
for _, cell := range tp.cells {
// check if cell is actually an alias
// non-blocking read so that this is fast
alias, err := tp.ts.GetCellsAlias(ctx, cell, false)
if err != nil {
// either cellAlias doesn't exist or it isn't a cell alias at all. In that case assume it is a cell
actualCells = append(actualCells, cell)
} else {
actualCells = append(actualCells, alias.Cells...)
aliases = append(aliases, si.MasterAlias)
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
// check if cell is actually an alias
// non-blocking read so that this is fast
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false)
if err != nil {
// either cellAlias doesn't exist or it isn't a cell alias at all. In that case assume it is a cell
actualCells = append(actualCells, cell)
} else {
actualCells = append(actualCells, alias.Cells...)
}
}
}
for _, cell := range actualCells {
sri, err := tp.ts.GetShardReplication(ctx, cell, tp.keyspace, tp.shard)
if err != nil {
log.Warningf("error %v from GetShardReplication for %v %v %v", err, cell, tp.keyspace, tp.shard)
continue
for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
// match cell, keyspace and shard
sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard)
if err != nil {
log.Warningf("error %v from GetShardReplication for %v %v %v", err, cell, tp.keyspace, tp.shard)
continue
}

for _, node := range sri.Nodes {
aliases = append(aliases, node.TabletAlias)
}
}
}

for _, node := range sri.Nodes {
result = append(result, node.TabletAlias)
if len(aliases) == 0 {
return nil
}
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
if err != nil {
log.Warningf("error fetching tablets from topo: %v", err)
return nil
}
tablets := make([]*topo.TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)]
if !ok {
// tablet disappeared on us (GetTabletMap ignores
// topo.ErrNoNode), just echo a warning
log.Warningf("failed to load tablet %v", tabletAlias)
} else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) {
tablets = append(tablets, tabletInfo)
}
}
return result
return tablets
}

func init() {
Expand Down
45 changes: 40 additions & 5 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,55 @@ func TestPickUsingCellAlias(t *testing.T) {
assert.True(t, picked2)
}

func TestTabletAppearsDuringSleep(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell"})
tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica")
require.NoError(t, err)

tabletPickerRetryDelay = 11 * time.Millisecond
defer func() {
tabletPickerRetryDelay = 30 * time.Second
}()

result := make(chan *topodatapb.Tablet)
// start picker first, then add tablet
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
tablet, err := tp.PickForStreaming(ctx)
assert.NoError(t, err)
result <- tablet
}()

want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true)
defer deleteTablet(te, want)
got := <-result
require.NotNil(t, got, "Tablet should not be nil")
assert.True(t, proto.Equal(want, got), "Pick: %v, want %v", got, want)
}

func TestPickError(t *testing.T) {
te := newPickerTestEnv(t, []string{"cell"})
_, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "badtype")
assert.EqualError(t, err, "failed to parse list of tablet types: badtype")

tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly")
tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica")
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
tabletPickerRetryDelay = 11 * time.Millisecond
defer func() {
tabletPickerRetryDelay = 30 * time.Second
}()
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
// no tablets
_, err = tp.PickForStreaming(ctx)
require.EqualError(t, err, "no tablets available for cells:[cell], keyspace/shard:ks/0, tablet types:[REPLICA RDONLY]")
defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_REPLICA, "cell", false, false))
require.EqualError(t, err, "context has expired")
// no tablets of the correct type
defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true))
ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
_, err = tp.PickForStreaming(ctx)
require.EqualError(t, err, "can't find any healthy source tablet for keyspace/shard:ks/0 tablet types:[REPLICA RDONLY]")
require.EqualError(t, err, "context has expired")
}

type pickerTestEnv struct {
Expand Down
48 changes: 27 additions & 21 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ func TestShardMigrateMainflow(t *testing.T) {
t.Errorf("SwitchWrites err: %v, want %v", err, want)
}
verifyQueries(t, tme.allDBClients)

//-------------------------------------------------------------------------------------------------------------------
// Test SwitchWrites cancelation on failure.

Expand Down Expand Up @@ -607,9 +606,9 @@ func TestShardMigrateMainflow(t *testing.T) {
tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks' and workflow = 'test'", resultid2, nil)
tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (1, 2)", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (2)", &sqltypes.Result{}, nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", runningResult(1), nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", runningResult(2), nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", runningResult2(1), nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", runningResult2(2), nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", runningResult2(2), nil)

deleteReverseReplicaion()
}
Expand All @@ -622,7 +621,6 @@ func TestShardMigrateMainflow(t *testing.T) {
}

verifyQueries(t, tme.allDBClients)

checkServedTypes(t, tme.ts, "ks:-40", 1)
checkServedTypes(t, tme.ts, "ks:40-", 1)
checkServedTypes(t, tme.ts, "ks:-80", 2)
Expand Down Expand Up @@ -656,9 +654,9 @@ func TestShardMigrateMainflow(t *testing.T) {
tme.dbTargetClients[0].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select id from _vt.vreplication where id = 2", resultid2, nil)
tme.dbTargetClients[1].addQuery("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", &sqltypes.Result{}, nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 1", stoppedResult2(1), nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult2(2), nil)
tme.dbTargetClients[0].addQuery("select * from _vt.vreplication where id = 2", stoppedResult2(2), nil)
}
waitForCatchup()

Expand All @@ -685,12 +683,12 @@ func TestShardMigrateMainflow(t *testing.T) {
startReverseVReplication := func() {
tme.dbSourceClients[0].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[0].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 3", runningResult2(3), nil)
tme.dbSourceClients[0].addQuery("select * from _vt.vreplication where id = 4", runningResult2(4), nil)
tme.dbSourceClients[1].addQuery("select id from _vt.vreplication where db_name = 'vt_ks'", resultid34, nil)
tme.dbSourceClients[1].addQuery("update _vt.vreplication set state = 'Running', message = '' where id in (3, 4)", &sqltypes.Result{}, nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult(4), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 3", runningResult2(3), nil)
tme.dbSourceClients[1].addQuery("select * from _vt.vreplication where id = 4", runningResult2(4), nil)
}
startReverseVReplication()

Expand Down Expand Up @@ -1751,18 +1749,26 @@ func checkIsMasterServing(t *testing.T, ts *topo.Server, keyspaceShard string, w
}
}

func stoppedResult(id int) *sqltypes.Result {
func getResult(id int, state string, keyspace string, shard string) *sqltypes.Result {
return sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|state",
"int64|varchar"),
fmt.Sprintf("%d|Stopped", id),
"id|state|cell|tablet_types|source",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("%d|%s|cell1|MASTER|keyspace:\"%s\" shard:\"%s\"", id, state, keyspace, shard),
)
}

func stoppedResult2(id int) *sqltypes.Result {
return getResult(id, "Stopped", "ks", "-40")
}

func runningResult2(id int) *sqltypes.Result {
return getResult(id, "Running", "ks", "-40")
}

func stoppedResult(id int) *sqltypes.Result {
return getResult(id, "Stopped", "", "")
}

func runningResult(id int) *sqltypes.Result {
return sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|state",
"int64|varchar"),
fmt.Sprintf("%d|Running", id),
)
return getResult(id, "Running", "", "")
}