diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index 89198a2281b..34a90cd5640 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -124,8 +124,8 @@ func InitCluster(t *testing.T, cellNames []string) *VitessCluster { globalConfig.topoPort, globalConfig.hostname, globalConfig.tmpDir) vc.Vtctld = vtctld assert.NotNil(t, vc.Vtctld) - // use first cell as `-cell` and all cells as `-cells_to_watch` - vc.Vtctld.Setup(cellNames[0], "-cells_to_watch", strings.Join(cellNames, ",")) + // use first cell as `-cell` + vc.Vtctld.Setup(cellNames[0]) vc.Vtctl = cluster.VtctlProcessInstance(globalConfig.topoPort, globalConfig.hostname) assert.NotNil(t, vc.Vtctl) diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index e1bc0325cb2..98bb5c57aea 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -48,6 +48,9 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +var connMap map[string]*fakeConn +var connMapMu sync.Mutex + func init() { tabletconn.RegisterDialer("fake_gateway", tabletDialer) @@ -55,6 +58,7 @@ func init() { if err := flag.Set("tablet_protocol", "fake_gateway"); err != nil { log.Errorf("failed to set flag \"tablet_protocol\" to \"fake_gateway\":%v", err) } + connMap = make(map[string]*fakeConn) } func TestHealthCheck(t *testing.T) { @@ -85,9 +89,10 @@ func TestHealthCheck(t *testing.T) { mustMatch(t, want, result, "Wrong TabletHealth data") shr := &querypb.StreamHealthResponse{ - TabletAlias: tablet.Alias, - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, + TabletAlias: tablet.Alias, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: true, + TabletExternallyReparentedTimestamp: 0, RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.5}, } @@ -323,10 +328,11 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) { RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, } want = &TabletHealth{ - Tablet: tablet, - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, - Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + Tablet: tablet, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + MasterTermStartTime: 0, } input <- shr @@ -549,11 +555,13 @@ func TestGetHealthyTablets(t *testing.T) { // second tablet turns into a master shr2 = &querypb.StreamHealthResponse{ - TabletAlias: tablet2.Alias, - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, - Serving: true, + TabletAlias: tablet2.Alias, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, + Serving: true, + TabletExternallyReparentedTimestamp: 10, - RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 0, CpuUsage: 0.2}, + + RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 0, CpuUsage: 0.2}, } input2 <- shr2 // wait for result @@ -766,6 +774,9 @@ func TestDebugURLFormatting(t *testing.T) { } func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) { + connMapMu.Lock() + defer connMapMu.Unlock() + key := TabletToMapKey(tablet) if qs, ok := connMap[key]; ok { return qs, nil @@ -794,6 +805,8 @@ type fakeConn struct { } func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthResponse) *fakeConn { + connMapMu.Lock() + defer connMapMu.Unlock() key := TabletToMapKey(tablet) conn := &fakeConn{ QueryService: fakes.ErrorQueryService, @@ -866,6 +879,8 @@ func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.Strea tablet: tablet, fixedResult: fixedResult, } + connMapMu.Lock() + defer connMapMu.Unlock() connMap[key] = conn return conn } diff --git a/go/vt/discovery/legacy_healthcheck_flaky_test.go b/go/vt/discovery/legacy_healthcheck_flaky_test.go index 733b8392fcc..af73844746c 100644 --- a/go/vt/discovery/legacy_healthcheck_flaky_test.go +++ b/go/vt/discovery/legacy_healthcheck_flaky_test.go @@ -26,31 +26,13 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/log" - "golang.org/x/net/context" - "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/status" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vttablet/queryservice" - "vitess.io/vitess/go/vt/vttablet/tabletconn" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/status" + "vitess.io/vitess/go/vt/topo" ) -var connMap map[string]*fakeConn - -func init() { - tabletconn.RegisterDialer("fake_discovery", discoveryDialer) - - //log error - if err := flag.Set("tablet_protocol", "fake_discovery"); err != nil { - log.Errorf("flag.Set(\"tablet_protocol\", \"fake_discovery\") failed : %v", err) - } - connMap = make(map[string]*fakeConn) -} - func testChecksum(t *testing.T, want, got int64) { t.Helper() if want != got { @@ -88,18 +70,20 @@ func TestLegacyHealthCheck(t *testing.T) { // one tablet after receiving a StreamHealthResponse shr := &querypb.StreamHealthResponse{ - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, - Serving: true, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, + Serving: true, + TabletExternallyReparentedTimestamp: 10, RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, } want = &LegacyTabletStats{ - Key: "a,vt:1", - Tablet: tablet, - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, - Up: true, - Serving: true, - Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + Key: "a,vt:1", + Tablet: tablet, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + TabletExternallyReparentedTimestamp: 10, } input <- shr @@ -119,12 +103,13 @@ func TestLegacyHealthCheck(t *testing.T) { Cell: "cell", Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, TabletsStats: LegacyTabletStatsList{{ - Key: "a,vt:1", - Tablet: tablet, - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, - Up: true, - Serving: true, - Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + Key: "a,vt:1", + Tablet: tablet, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + TabletExternallyReparentedTimestamp: 10, }}, }} @@ -135,10 +120,12 @@ func TestLegacyHealthCheck(t *testing.T) { // TabletType changed, should get both old and new event shr = &querypb.StreamHealthResponse{ - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: true, + TabletExternallyReparentedTimestamp: 0, - RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.5}, + + RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.5}, } input <- shr t.Logf(`input <- {{Keyspace: "k", Shard: "s", TabletType: REPLICA}, Serving: true, TabletExternallyReparentedTimestamp: 0, {SecondsBehindMaster: 1, CpuUsage: 0.5}}`) @@ -658,11 +645,3 @@ func newListener() *listener { func (l *listener) StatsUpdate(ts *LegacyTabletStats) { l.output <- ts } - -func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { - key := TabletToMapKey(tablet) - if qs, ok := connMap[key]; ok { - return qs, nil - } - return nil, fmt.Errorf("tablet %v not found", key) -} diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index 6b760c42f02..27362382b9c 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -17,7 +17,10 @@ limitations under the License. package discovery import ( + "fmt" "math/rand" + "strings" + "sync" "time" "vitess.io/vitess/go/vt/topo/topoproto" @@ -34,6 +37,25 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +var ( + tabletPickerRetryDelay = 30 * time.Second + muTabletPickerRetryDelay sync.Mutex +) + +// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment +func GetTabletPickerRetryDelay() time.Duration { + muTabletPickerRetryDelay.Lock() + defer muTabletPickerRetryDelay.Unlock() + return tabletPickerRetryDelay +} + +// SetTabletPickerRetryDelay synchronizes reads for tabletPickerRetryDelay. Used in tests only at the moment +func SetTabletPickerRetryDelay(delay time.Duration) { + muTabletPickerRetryDelay.Lock() + defer muTabletPickerRetryDelay.Unlock() + tabletPickerRetryDelay = delay +} + // TabletPicker gives a simplified API for picking tablets. type TabletPicker struct { ts *topo.Server @@ -49,6 +71,21 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp if err != nil { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse list of tablet types: %v", tabletTypesStr) } + var missingFields []string + if keyspace == "" { + missingFields = append(missingFields, "Keyspace") + } + if shard == "" { + missingFields = append(missingFields, "Shard") + } + if len(cells) == 0 { + missingFields = append(missingFields, "Cells") + } + if len(missingFields) > 0 { + //log.Errorf("missing picker fields %s", debug.Stack()) //FIXME: remove after all tests run + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", "))) + } return &TabletPicker{ ts: ts, cells: cells, @@ -62,84 +99,112 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp // All tablets that belong to tp.cells are evaluated and one is // chosen at random func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { - candidates := tp.getAllTablets(ctx) - if len(candidates) == 0 { - return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "no tablets available for cells:%v, keyspace/shard:%v/%v, tablet types:%v", tp.cells, tp.keyspace, tp.shard, tp.tabletTypes) - } + // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found + // or the context is canceled for { - idx := 0 - // if there is only one candidate we use that, otherwise we find one randomly - if len(candidates) > 1 { - idx = rand.Intn(len(candidates)) - } - alias := candidates[idx] - // get tablet - ti, err := tp.ts.GetTablet(ctx, alias) - if err != nil { - log.Warningf("unable to get tablet for alias %v", alias) - candidates = append(candidates[:idx], candidates[idx+1:]...) - if len(candidates) == 0 { - break - } - continue + select { + case <-ctx.Done(): + return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: } - if !topoproto.IsTypeInList(ti.Tablet.Type, tp.tabletTypes) { - // tablet is not of one of the desired types + candidates := tp.getMatchingTablets(ctx) + if len(candidates) == 0 { + // if no candidates were found, sleep and try again + log.Infof("No tablet found for streaming, sleeping for %d seconds", int(GetTabletPickerRetryDelay()/1e9)) + time.Sleep(GetTabletPickerRetryDelay()) continue } - - // try to connect to tablet - conn, err := tabletconn.GetDialer()(ti.Tablet, true) - if err != nil { - log.Warningf("unable to connect to tablet for alias %v", alias) - candidates = append(candidates[:idx], candidates[idx+1:]...) - if len(candidates) == 0 { - break + // try at most len(candidate) times to find a healthy tablet + for i := 0; i < len(candidates); i++ { + idx := rand.Intn(len(candidates)) + ti := candidates[idx] + // get tablet + // try to connect to tablet + conn, err := tabletconn.GetDialer()(ti.Tablet, true) + if err != nil { + log.Warningf("unable to connect to tablet for alias %v", ti.Alias) + candidates = append(candidates[:idx], candidates[idx+1:]...) + if len(candidates) == 0 { + break + } + continue } - continue + // OK to use ctx here because it is not actually used by the underlying Close implementation + _ = conn.Close(ctx) + return ti.Tablet, nil } - _ = conn.Close(ctx) - return ti.Tablet, nil } - return nil, vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "can't find any healthy source tablet for keyspace/shard:%v/%v tablet types:%v", tp.keyspace, tp.shard, tp.tabletTypes) } -func (tp *TabletPicker) getAllTablets(ctx context.Context) []*topodatapb.TabletAlias { +// getMatchingTablets returns a list of TabletInfo for tablets +// that match the cells, keyspace, shard and tabletTypes for this TabletPicker +func (tp *TabletPicker) getMatchingTablets(ctx context.Context) []*topo.TabletInfo { // Special handling for MASTER tablet type // Since there is only one master, we ignore cell and find the master - result := make([]*topodatapb.TabletAlias, 0) + aliases := make([]*topodatapb.TabletAlias, 0) if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_MASTER { - si, err := tp.ts.GetShard(ctx, tp.keyspace, tp.shard) + shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard) if err != nil { - return result + return nil } - result = append(result, si.MasterAlias) - return result - } - actualCells := make([]string, 0) - for _, cell := range tp.cells { - // check if cell is actually an alias - // non-blocking read so that this is fast - alias, err := tp.ts.GetCellsAlias(ctx, cell, false) - if err != nil { - // either cellAlias doesn't exist or it isn't a cell alias at all. In that case assume it is a cell - actualCells = append(actualCells, cell) - } else { - actualCells = append(actualCells, alias.Cells...) + aliases = append(aliases, si.MasterAlias) + } else { + actualCells := make([]string, 0) + for _, cell := range tp.cells { + // check if cell is actually an alias + // non-blocking read so that this is fast + shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false) + if err != nil { + // either cellAlias doesn't exist or it isn't a cell alias at all. In that case assume it is a cell + actualCells = append(actualCells, cell) + } else { + actualCells = append(actualCells, alias.Cells...) + } } - } - for _, cell := range actualCells { - sri, err := tp.ts.GetShardReplication(ctx, cell, tp.keyspace, tp.shard) - if err != nil { - log.Warningf("error %v from GetShardReplication for %v %v %v", err, cell, tp.keyspace, tp.shard) - continue + for _, cell := range actualCells { + shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + // match cell, keyspace and shard + sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard) + if err != nil { + //log.Errorf("missing shard in topo %s", debug.Stack()) //FIXME: remove after all tests run + + //log.Warningf("error %v from GetShardReplication for %v %v %v", err, cell, tp.keyspace, tp.shard) + continue + } + + for _, node := range sri.Nodes { + aliases = append(aliases, node.TabletAlias) + } } + } - for _, node := range sri.Nodes { - result = append(result, node.TabletAlias) + if len(aliases) == 0 { + return nil + } + shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout) + defer cancel() + tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) + if err != nil { + log.Warningf("error fetching tablets from topo: %v", err) + return nil + } + tablets := make([]*topo.TabletInfo, 0, len(aliases)) + for _, tabletAlias := range aliases { + tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)] + if !ok { + // tablet disappeared on us (GetTabletMap ignores + // topo.ErrNoNode), just echo a warning + log.Warningf("failed to load tablet %v", tabletAlias) + } else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) { + tablets = append(tablets, tabletInfo) } } - return result + return tablets } func init() { diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 76b30703545..b1e26898d20 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -275,20 +275,58 @@ func TestPickUsingCellAlias(t *testing.T) { assert.True(t, picked2) } +func TestTabletAppearsDuringSleep(t *testing.T) { + te := newPickerTestEnv(t, []string{"cell"}) + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") + require.NoError(t, err) + + delay := GetTabletPickerRetryDelay() + defer func() { + SetTabletPickerRetryDelay(delay) + }() + SetTabletPickerRetryDelay(11 * time.Millisecond) + + result := make(chan *topodatapb.Tablet) + // start picker first, then add tablet + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + tablet, err := tp.PickForStreaming(ctx) + assert.NoError(t, err) + result <- tablet + }() + + want := addTablet(te, 100, topodatapb.TabletType_REPLICA, "cell", true, true) + defer deleteTablet(te, want) + got := <-result + require.NotNil(t, got, "Tablet should not be nil") + assert.True(t, proto.Equal(want, got), "Pick: %v, want %v", got, want) +} + func TestPickError(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) _, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "badtype") assert.EqualError(t, err, "failed to parse list of tablet types: badtype") - tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica,rdonly") + tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + delay := GetTabletPickerRetryDelay() + defer func() { + SetTabletPickerRetryDelay(delay) + }() + SetTabletPickerRetryDelay(11 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) defer cancel() + // no tablets _, err = tp.PickForStreaming(ctx) - require.EqualError(t, err, "no tablets available for cells:[cell], keyspace/shard:ks/0, tablet types:[REPLICA RDONLY]") - defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_REPLICA, "cell", false, false)) + require.EqualError(t, err, "context has expired") + // no tablets of the correct type + defer deleteTablet(te, addTablet(te, 200, topodatapb.TabletType_RDONLY, "cell", true, true)) + ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() _, err = tp.PickForStreaming(ctx) - require.EqualError(t, err, "can't find any healthy source tablet for keyspace/shard:ks/0 tablet types:[REPLICA RDONLY]") + require.EqualError(t, err, "context has expired") } type pickerTestEnv struct { diff --git a/go/vt/worker/legacy_split_clone_test.go b/go/vt/worker/legacy_split_clone_test.go index 35fd5e9190f..5084273fd73 100644 --- a/go/vt/worker/legacy_split_clone_test.go +++ b/go/vt/worker/legacy_split_clone_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "golang.org/x/net/context" "vitess.io/vitess/go/mysql" @@ -179,7 +181,8 @@ func (tc *legacySplitCloneTestCase) setUp(v3 bool) { qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(sourceRdonly.RPCServer, &legacyTestQueryService{ - t: tc.t, + t: tc.t, + StreamHealthQueryService: qs, }) } @@ -295,6 +298,12 @@ func (sq *legacyTestQueryService) StreamExecute(ctx context.Context, target *que } func TestLegacySplitCloneV2(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &legacySplitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -306,6 +315,12 @@ func TestLegacySplitCloneV2(t *testing.T) { } func TestLegacySplitCloneV2_Throttled(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &legacySplitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -345,6 +360,12 @@ func TestLegacySplitCloneV2_Throttled(t *testing.T) { // TestLegacySplitCloneV2 with the additional twist that the destination masters // fail the first write because they are read-only and succeed after that. func TestLegacySplitCloneV2_RetryDueToReadonly(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &legacySplitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -374,6 +395,12 @@ func TestLegacySplitCloneV2_RetryDueToReadonly(t *testing.T) { // even in a period where no MASTER tablet is available according to the // HealthCheck instance. func TestLegacySplitCloneV2_NoMasterAvailable(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &legacySplitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -447,6 +474,12 @@ func TestLegacySplitCloneV2_NoMasterAvailable(t *testing.T) { } func TestLegacySplitCloneV3(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &legacySplitCloneTestCase{t: t} tc.setUp(true /* v3 */) defer tc.tearDown() diff --git a/go/vt/worker/multi_split_diff_test.go b/go/vt/worker/multi_split_diff_test.go index 0b9c0b85f96..6561ccffebc 100644 --- a/go/vt/worker/multi_split_diff_test.go +++ b/go/vt/worker/multi_split_diff_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -277,7 +279,8 @@ func testMultiSplitDiff(t *testing.T, v3 bool) { qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(sourceRdonly.RPCServer, &msdSourceTabletServer{ - t: t, + t: t, + StreamHealthQueryService: qs, excludedTable: excludedTable, v3: v3, @@ -288,7 +291,8 @@ func testMultiSplitDiff(t *testing.T, v3 bool) { qs := fakes.NewStreamHealthQueryService(destRdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(destRdonly.RPCServer, &msdDestinationTabletServer{ - t: t, + t: t, + StreamHealthQueryService: qs, excludedTable: excludedTable, shardIndex: 0, @@ -299,7 +303,8 @@ func testMultiSplitDiff(t *testing.T, v3 bool) { qs := fakes.NewStreamHealthQueryService(destRdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(destRdonly.RPCServer, &msdDestinationTabletServer{ - t: t, + t: t, + StreamHealthQueryService: qs, excludedTable: excludedTable, shardIndex: 1, @@ -333,5 +338,11 @@ func TestMultiSplitDiffv2(t *testing.T) { } func TestMultiSplitDiffv3(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + testMultiSplitDiff(t, true) } diff --git a/go/vt/worker/split_clone_flaky_test.go b/go/vt/worker/split_clone_flaky_test.go index ce287485d79..ff2b9b8af42 100644 --- a/go/vt/worker/split_clone_flaky_test.go +++ b/go/vt/worker/split_clone_flaky_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/vtgate/evalengine" "golang.org/x/net/context" @@ -335,15 +337,16 @@ func newTestQueryService(t *testing.T, target querypb.Target, shqs *fakes.Stream fields = v3Fields } return &testQueryService{ - t: t, - target: target, + t: t, + target: target, + shardIndex: shardIndex, + shardCount: shardCount, + alias: alias, + omitKeyspaceID: omitKeyspaceID, + fields: fields, + forceError: make(map[int64]int), + StreamHealthQueryService: shqs, - shardIndex: shardIndex, - shardCount: shardCount, - alias: alias, - omitKeyspaceID: omitKeyspaceID, - fields: fields, - forceError: make(map[int64]int), } } @@ -521,6 +524,12 @@ var v3Fields = []*querypb.Field{ // TestSplitCloneV2_Offline tests the offline phase with an empty destination. func TestSplitCloneV2_Offline(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -536,6 +545,12 @@ func TestSplitCloneV2_Offline(t *testing.T) { // --source_reader_count=10, at most 10 out of the 1000 chunk pipeplines will // get processed concurrently while the other pending ones are blocked. func TestSplitCloneV2_Offline_HighChunkCount(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUpWithConcurrency(false /* v3 */, 10, 5 /* writeQueryMaxRows */, 1000 /* rowsCount */) defer tc.tearDown() @@ -559,6 +574,12 @@ func TestSplitCloneV2_Offline_HighChunkCount(t *testing.T) { // TestSplitCloneV2_Offline but forces SplitClone to restart the streaming // query on the source before reading the last row. func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -600,6 +621,12 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) { // TestSplitCloneV2_Offline_RestartStreamingQuery. However, the first restart // of the streaming query does not succeed here and instead vtworker will fail. func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() @@ -640,6 +667,12 @@ func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) { // query on the source *and* failover to a different source tablet before // reading the last row. func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() @@ -695,6 +728,12 @@ func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) { // restartable_result_reader.go where we keep retrying while no tablet may be // available. func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount) defer tc.tearDown() @@ -743,6 +782,11 @@ func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) { // TestSplitCloneV2_Online tests the online phase with an empty destination. func TestSplitCloneV2_Online(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -768,6 +812,12 @@ func TestSplitCloneV2_Online(t *testing.T) { } func TestSplitCloneV2_Online_Offline(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -800,6 +850,12 @@ func TestSplitCloneV2_Online_Offline(t *testing.T) { // TestSplitCloneV2_Offline, but the destination has existing data which must be // reconciled. func TestSplitCloneV2_Offline_Reconciliation(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} // We reduce the parallelism to 1 to test the order of expected // insert/update/delete statements on the destination master. @@ -860,6 +916,12 @@ func TestSplitCloneV2_Offline_Reconciliation(t *testing.T) { } func TestSplitCloneV2_Throttled(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -901,6 +963,12 @@ func TestSplitCloneV2_Throttled(t *testing.T) { // TestSplitCloneV2 with the additional twist that the destination masters // fail the first write because they are read-only and succeed after that. func TestSplitCloneV2_RetryDueToReadonly(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -931,6 +999,12 @@ func TestSplitCloneV2_RetryDueToReadonly(t *testing.T) { // even in a period where no MASTER tablet is available according to the // HealthCheck instance. func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(false /* v3 */) defer tc.tearDown() @@ -998,6 +1072,12 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) { } func TestSplitCloneV3(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + tc := &splitCloneTestCase{t: t} tc.setUp(true /* v3 */) defer tc.tearDown() diff --git a/go/vt/worker/split_diff_test.go b/go/vt/worker/split_diff_test.go index 5c587fa0158..3dc938c0da1 100644 --- a/go/vt/worker/split_diff_test.go +++ b/go/vt/worker/split_diff_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -169,6 +171,12 @@ func (sq *sourceTabletServer) StreamExecute(ctx context.Context, target *querypb // TODO(aaijazi): Create a test in which source and destination data does not match func testSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb.TabletType) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + *useV3ReshardingMode = v3 ts := memorytopo.NewServer("cell1", "cell2") ctx := context.Background() @@ -265,7 +273,8 @@ func testSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb.Table qs := fakes.NewStreamHealthQueryService(sourceRdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(sourceRdonly.RPCServer, &sourceTabletServer{ - t: t, + t: t, + StreamHealthQueryService: qs, excludedTable: excludedTable, v3: v3, @@ -276,7 +285,8 @@ func testSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb.Table qs := fakes.NewStreamHealthQueryService(destRdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(destRdonly.RPCServer, &destinationTabletServer{ - t: t, + t: t, + StreamHealthQueryService: qs, excludedTable: excludedTable, }) diff --git a/go/vt/worker/vertical_split_clone_test.go b/go/vt/worker/vertical_split_clone_test.go index e138dce21c1..fd096d967a9 100644 --- a/go/vt/worker/vertical_split_clone_test.go +++ b/go/vt/worker/vertical_split_clone_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "golang.org/x/net/context" "vitess.io/vitess/go/mysql" @@ -60,6 +62,12 @@ func createVerticalSplitCloneDestinationFakeDb(t *testing.T, name string, insert // to the destination and the offline phase won't copy any rows as the source // has not changed in the meantime. func TestVerticalSplitClone(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") ctx := context.Background() wi := NewInstance(ts, "cell1", time.Second) diff --git a/go/vt/worker/vertical_split_diff_test.go b/go/vt/worker/vertical_split_diff_test.go index ab286da6e97..c0d494b162b 100644 --- a/go/vt/worker/vertical_split_diff_test.go +++ b/go/vt/worker/vertical_split_diff_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -92,6 +94,12 @@ func (sq *verticalDiffTabletServer) StreamExecute(ctx context.Context, target *q // TODO(aaijazi): Create a test in which source and destination data does not match func TestVerticalSplitDiff(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") ctx := context.Background() wi := NewInstance(ts, "cell1", time.Second) @@ -171,7 +179,8 @@ func TestVerticalSplitDiff(t *testing.T) { qs := fakes.NewStreamHealthQueryService(rdonly.Target()) qs.AddDefaultHealthResponse() grpcqueryservice.Register(rdonly.RPCServer, &verticalDiffTabletServer{ - t: t, + t: t, + StreamHealthQueryService: qs, }) } diff --git a/go/vt/wrangler/testlib/apply_schema_flaky_test.go b/go/vt/wrangler/testlib/apply_schema_flaky_test.go index 534d2fff8e2..014ca8cd755 100644 --- a/go/vt/wrangler/testlib/apply_schema_flaky_test.go +++ b/go/vt/wrangler/testlib/apply_schema_flaky_test.go @@ -19,6 +19,9 @@ package testlib import ( "strings" "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "golang.org/x/net/context" @@ -39,6 +42,12 @@ import ( // Only if the flag is specified, potentially long running schema changes are // allowed. func TestApplySchema_AllowLongUnavailability(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + cell := "cell1" db := fakesqldb.New(t) defer db.Close() diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index 493732d41e4..4ec02b6943e 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "golang.org/x/net/context" "vitess.io/vitess/go/mysql" @@ -42,6 +44,12 @@ import ( ) func TestBackupRestore(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + // Initialize our environment ctx := context.Background() db := fakesqldb.New(t) @@ -214,6 +222,12 @@ func TestBackupRestore(t *testing.T) { } func TestRestoreUnreachableMaster(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + // Initialize our environment ctx := context.Background() db := fakesqldb.New(t) diff --git a/go/vt/wrangler/testlib/copy_schema_shard_test.go b/go/vt/wrangler/testlib/copy_schema_shard_test.go index 35523a7dd26..6b99d9acd19 100644 --- a/go/vt/wrangler/testlib/copy_schema_shard_test.go +++ b/go/vt/wrangler/testlib/copy_schema_shard_test.go @@ -18,6 +18,9 @@ package testlib import ( "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "golang.org/x/net/context" @@ -44,6 +47,12 @@ func TestCopySchemaShard_UseShardAsSource(t *testing.T) { } func copySchema(t *testing.T, useShardAsSource bool) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) diff --git a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go index c24c1dbe6eb..c6a7566011d 100644 --- a/go/vt/wrangler/testlib/emergency_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/emergency_reparent_shard_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -35,6 +37,12 @@ import ( ) func TestEmergencyReparentShard(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -149,6 +157,12 @@ func TestEmergencyReparentShard(t *testing.T) { // TestEmergencyReparentShardMasterElectNotBest tries to emergency reparent // to a host that is not the latest in replication position. func TestEmergencyReparentShardMasterElectNotBest(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) diff --git a/go/vt/wrangler/testlib/external_reparent_test.go b/go/vt/wrangler/testlib/external_reparent_test.go index 72d58ceaa44..f6612f3fa23 100644 --- a/go/vt/wrangler/testlib/external_reparent_test.go +++ b/go/vt/wrangler/testlib/external_reparent_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/discovery" + "github.com/stretchr/testify/assert" "golang.org/x/net/context" "vitess.io/vitess/go/vt/logutil" @@ -38,6 +40,12 @@ import ( // TestTabletExternallyReparentedBasic tests the base cases for TER func TestTabletExternallyReparentedBasic(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -124,6 +132,12 @@ func TestTabletExternallyReparentedBasic(t *testing.T) { } func TestTabletExternallyReparentedToReplica(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -200,6 +214,12 @@ func TestTabletExternallyReparentedToReplica(t *testing.T) { // that if mysql is restarted on the master-elect tablet and has a different // port, we pick it up correctly. func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -279,6 +299,12 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) { // TestTabletExternallyReparentedContinueOnUnexpectedMaster makes sure // that we ignore mysql's master if the flag is set func TestTabletExternallyReparentedContinueOnUnexpectedMaster(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -351,6 +377,12 @@ func TestTabletExternallyReparentedContinueOnUnexpectedMaster(t *testing.T) { } func TestTabletExternallyReparentedRerun(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -439,6 +471,12 @@ func TestTabletExternallyReparentedRerun(t *testing.T) { } func TestRPCTabletExternallyReparentedDemotesMasterToConfiguredTabletType(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + flag.Set("disable_active_reparents", "true") defer flag.Set("disable_active_reparents", "false") diff --git a/go/vt/wrangler/testlib/migrate_served_from_test.go b/go/vt/wrangler/testlib/migrate_served_from_test.go index 7105f298419..c31f5a2dcb6 100644 --- a/go/vt/wrangler/testlib/migrate_served_from_test.go +++ b/go/vt/wrangler/testlib/migrate_served_from_test.go @@ -19,6 +19,9 @@ package testlib import ( "reflect" "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "golang.org/x/net/context" @@ -35,6 +38,12 @@ import ( ) func TestMigrateServedFrom(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) diff --git a/go/vt/wrangler/testlib/migrate_served_types_test.go b/go/vt/wrangler/testlib/migrate_served_types_test.go index 74d574be9c9..fa34f1cb911 100644 --- a/go/vt/wrangler/testlib/migrate_served_types_test.go +++ b/go/vt/wrangler/testlib/migrate_served_types_test.go @@ -20,6 +20,9 @@ import ( "flag" "strings" "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "golang.org/x/net/context" @@ -244,6 +247,12 @@ func TestMigrateServedTypes(t *testing.T) { } func TestMultiShardMigrateServedTypes(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + // TODO(b/26388813): Remove the next two lines once vtctl WaitForDrain is integrated in the vtctl MigrateServed* commands. flag.Set("wait_for_drain_sleep_rdonly", "0s") flag.Set("wait_for_drain_sleep_replica", "0s") diff --git a/go/vt/wrangler/testlib/permissions_test.go b/go/vt/wrangler/testlib/permissions_test.go index 903fbf8ff57..17919953572 100644 --- a/go/vt/wrangler/testlib/permissions_test.go +++ b/go/vt/wrangler/testlib/permissions_test.go @@ -19,6 +19,9 @@ package testlib import ( "strings" "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "golang.org/x/net/context" @@ -34,6 +37,12 @@ import ( ) func TestPermissions(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + // Initialize our environment ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") diff --git a/go/vt/wrangler/testlib/planned_reparent_shard_test.go b/go/vt/wrangler/testlib/planned_reparent_shard_test.go index deab660c594..59f10c4dc26 100644 --- a/go/vt/wrangler/testlib/planned_reparent_shard_test.go +++ b/go/vt/wrangler/testlib/planned_reparent_shard_test.go @@ -20,6 +20,9 @@ import ( "context" "errors" "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,6 +39,12 @@ import ( ) func TestPlannedReparentShardNoMasterProvided(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -139,6 +148,12 @@ func TestPlannedReparentShardNoMasterProvided(t *testing.T) { } func TestPlannedReparentShardNoError(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -257,6 +272,12 @@ func TestPlannedReparentShardNoError(t *testing.T) { } func TestPlannedReparentNoMaster(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -275,6 +296,12 @@ func TestPlannedReparentNoMaster(t *testing.T) { // TestPlannedReparentShardWaitForPositionFail simulates a failure of the WaitForPosition call // on the desired new master tablet func TestPlannedReparentShardWaitForPositionFail(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -369,6 +396,12 @@ func TestPlannedReparentShardWaitForPositionFail(t *testing.T) { // TestPlannedReparentShardWaitForPositionTimeout simulates a context timeout // during the WaitForPosition call to the desired new master func TestPlannedReparentShardWaitForPositionTimeout(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -461,6 +494,12 @@ func TestPlannedReparentShardWaitForPositionTimeout(t *testing.T) { } func TestPlannedReparentShardRelayLogError(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -530,6 +569,12 @@ func TestPlannedReparentShardRelayLogError(t *testing.T) { // is not replicating to start with (IO_Thread is not running) and we // simulate an error from the attempt to start replication func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -600,6 +645,12 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) { // TestPlannedReparentShardPromoteReplicaFail simulates a failure of the PromoteReplica call // on the desired new master tablet func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) @@ -728,6 +779,12 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) { // Simulate failure of previous PRS and oldMaster is ReadOnly // Verify that master correctly gets set to ReadWrite func TestPlannedReparentShardSameMaster(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) vp := NewVtctlPipe(t, ts) diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go index 31ccc53e729..739a1922bd4 100644 --- a/go/vt/wrangler/testlib/reparent_utils_test.go +++ b/go/vt/wrangler/testlib/reparent_utils_test.go @@ -18,6 +18,9 @@ package testlib import ( "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "golang.org/x/net/context" @@ -33,6 +36,12 @@ import ( ) func TestShardReplicationStatuses(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) @@ -103,6 +112,12 @@ func TestShardReplicationStatuses(t *testing.T) { } func TestReparentTablet(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + ctx := context.Background() ts := memorytopo.NewServer("cell1", "cell2") wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) diff --git a/go/vt/wrangler/testlib/version_test.go b/go/vt/wrangler/testlib/version_test.go index c65acc823b3..1d16c6fe0ae 100644 --- a/go/vt/wrangler/testlib/version_test.go +++ b/go/vt/wrangler/testlib/version_test.go @@ -22,6 +22,9 @@ import ( "net/http" "strings" "testing" + "time" + + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -55,6 +58,12 @@ func expvarHandler(gitRev *string) func(http.ResponseWriter, *http.Request) { } func TestVersion(t *testing.T) { + delay := discovery.GetTabletPickerRetryDelay() + defer func() { + discovery.SetTabletPickerRetryDelay(delay) + }() + discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) + // We need to run this test with the /debug/vars version of the // plugin. wrangler.ResetDebugVarsGetVersion() diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index eff923af02e..e24040f34dc 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -66,6 +66,17 @@ type testShardMigraterEnv struct { testMigraterEnv } +// tablet picker requires these to be set, otherwise it errors out. also the values need to match an existing +// tablet, otherwise it sleeps until it retries, causing tests to timeout and hence break +// we set these for each new migater env to be the first source shard +// the tests don't depend on which tablet is picked, so this works for now +type testTabletPickerChoice struct { + keyspace string + shard string +} + +var tpChoice *testTabletPickerChoice + func newTestTableMigrater(ctx context.Context, t *testing.T) *testMigraterEnv { return newTestTableMigraterCustom(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"}, "select * %s") } @@ -91,6 +102,11 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, } tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange) } + tpChoiceTablet := tme.sourceMasters[0].Tablet + tpChoice = &testTabletPickerChoice{ + keyspace: tpChoiceTablet.Keyspace, + shard: tpChoiceTablet.Shard, + } for _, shard := range targetShards { tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks2", shard))) tabletID += 10 @@ -209,6 +225,12 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe } tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange) } + tpChoiceTablet := tme.sourceMasters[0].Tablet + tpChoice = &testTabletPickerChoice{ + keyspace: tpChoiceTablet.Keyspace, + shard: tpChoiceTablet.Shard, + } + for _, shard := range targetShards { tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard))) tabletID += 10 diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 3a0538e3d56..b506aba2128 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -559,7 +559,6 @@ func TestShardMigrateMainflow(t *testing.T) { t.Errorf("SwitchWrites err: %v, want %v", err, want) } verifyQueries(t, tme.allDBClients) - //------------------------------------------------------------------------------------------------------------------- // Test SwitchWrites cancelation on failure. @@ -622,7 +621,6 @@ func TestShardMigrateMainflow(t *testing.T) { } verifyQueries(t, tme.allDBClients) - checkServedTypes(t, tme.ts, "ks:-40", 1) checkServedTypes(t, tme.ts, "ks:40-", 1) checkServedTypes(t, tme.ts, "ks:-80", 2) @@ -1751,18 +1749,18 @@ func checkIsMasterServing(t *testing.T, ts *topo.Server, keyspaceShard string, w } } -func stoppedResult(id int) *sqltypes.Result { +func getResult(id int, state string, keyspace string, shard string) *sqltypes.Result { return sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|state", - "int64|varchar"), - fmt.Sprintf("%d|Stopped", id), + "id|state|cell|tablet_types|source", + "int64|varchar|varchar|varchar|varchar"), + fmt.Sprintf("%d|%s|cell1|MASTER|keyspace:\"%s\" shard:\"%s\"", id, state, keyspace, shard), ) } +func stoppedResult(id int) *sqltypes.Result { + return getResult(id, "Stopped", tpChoice.keyspace, tpChoice.shard) +} + func runningResult(id int) *sqltypes.Result { - return sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|state", - "int64|varchar"), - fmt.Sprintf("%d|Running", id), - ) + return getResult(id, "Running", tpChoice.keyspace, tpChoice.shard) }