diff --git a/doc/releasenotes/16_0_0_summary.md b/doc/releasenotes/16_0_0_summary.md index 01b6acac821..6550c052e69 100644 --- a/doc/releasenotes/16_0_0_summary.md +++ b/doc/releasenotes/16_0_0_summary.md @@ -15,6 +15,13 @@ In [PR #11103](https://github.com/vitessio/vitess/pull/11103) we introduced the ability to resume a `VTGate` [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/). This is useful when a [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/) is interrupted due to e.g. a network failure or a server restart. The `VStream` copy operation can be resumed by specifying each table's last seen primary key value in the `VStream` request. Please see the [`VStream` docs](https://vitess.io/docs/16.0/reference/vreplication/vstream/) for more details. +### New TabletPicker Options and Default Cell Behavior + +In [PR 11771](https://github.com/vitessio/vitess/pull/11771) we allow for default cell alias fallback during tablet selection for VStreams when client +does not specify list of cells. In addition, we add the option for local cell preference during tablet selection. +The local cell preference takes precedence over tablet type.See PR description for examples. If a client wants to specify local cell preference in the gRPC request, +they can pass in a new "local:" tag with the rest of the cells under VStreamFlags. e.g. "local:,cella,cellb". + ### Tablet throttler The tablet throttler can now be configured dynamically. Configuration is now found in the topo service, and applies to all tablets in all shards and cells of a given keyspace. For backwards compatibility `v16` still supports `vttablet`-based command line flags for throttler ocnfiguration. diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index cb0449c6191..0286661c2ab 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -46,6 +46,7 @@ var ( muTabletPickerRetryDelay sync.Mutex globalTPStats *tabletPickerStats inOrderHint = "in_order:" + localPreferenceHint = "local:" ) // GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment @@ -64,12 +65,13 @@ func SetTabletPickerRetryDelay(delay time.Duration) { // TabletPicker gives a simplified API for picking tablets. type TabletPicker struct { - ts *topo.Server - cells []string - keyspace string - shard string - tabletTypes []topodatapb.TabletType - inOrder bool + ts *topo.Server + cells []string + keyspace string + shard string + tabletTypes []topodatapb.TabletType + inOrder bool + localPreference string } // NewTabletPicker returns a TabletPicker. @@ -92,19 +94,60 @@ func NewTabletPicker(ts *topo.Server, cells []string, keyspace, shard, tabletTyp return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, fmt.Sprintf("Missing required field(s) for tablet picker: %s", strings.Join(missingFields, ", "))) } + + localPreference := "" + if strings.HasPrefix(cells[0], localPreferenceHint) { + localPreference = cells[0][len(localPreferenceHint):] + cells = cells[1:] + // Add the local cell to the list of cells + // This may result in the local cell appearing twice if it already exists as part of an alias, + // but cells will get deduped during tablet selection. See GetMatchingTablets() -> tp.dedupeCells() + cells = append(cells, localPreference) + } + return &TabletPicker{ - ts: ts, - cells: cells, - keyspace: keyspace, - shard: shard, - tabletTypes: tabletTypes, - inOrder: inOrder, + ts: ts, + cells: cells, + keyspace: keyspace, + shard: shard, + tabletTypes: tabletTypes, + inOrder: inOrder, + localPreference: localPreference, }, nil } +func (tp *TabletPicker) prioritizeTablets(candidates []*topo.TabletInfo) (sameCell, allOthers []*topo.TabletInfo) { + for _, c := range candidates { + if c.Alias.Cell == tp.localPreference { + sameCell = append(sameCell, c) + } else { + allOthers = append(allOthers, c) + } + } + + return sameCell, allOthers +} + +func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo.TabletInfo { + // Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes + orderMap := map[topodatapb.TabletType]int{} + for i, t := range tp.tabletTypes { + orderMap[t] = i + } + sort.Slice(candidates, func(i, j int) bool { + if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] { + // identical tablet types: randomize order of tablets for this type + return rand.Intn(2) == 0 // 50% chance + } + return orderMap[candidates[i].Type] < orderMap[candidates[j].Type] + }) + + return candidates +} + // PickForStreaming picks an available tablet. // All tablets that belong to tp.cells are evaluated and one is -// chosen at random. +// chosen at random, unless local preference is given func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) { rand.Seed(time.Now().UnixNano()) // keep trying at intervals (tabletPickerRetryDelay) until a tablet is found @@ -116,19 +159,28 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table default: } candidates := tp.GetMatchingTablets(ctx) - if tp.inOrder { - // Sort candidates slice such that tablets appear in same tablet type order as in tp.tabletTypes - orderMap := map[topodatapb.TabletType]int{} - for i, t := range tp.tabletTypes { - orderMap[t] = i + // we'd like to prioritize same cell tablets + if tp.localPreference != "" { + sameCellCandidates, allOtherCandidates := tp.prioritizeTablets(candidates) + + // order same cell and all others by tablet type separately + // combine with same cell in front + if tp.inOrder { + sameCellCandidates = tp.orderByTabletType(sameCellCandidates) + allOtherCandidates = tp.orderByTabletType(allOtherCandidates) + } else { + // Randomize same cell candidates + rand.Shuffle(len(sameCellCandidates), func(i, j int) { + sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i] + }) + // Randomize all other candidates + rand.Shuffle(len(allOtherCandidates), func(i, j int) { + allOtherCandidates[i], allOtherCandidates[j] = allOtherCandidates[j], allOtherCandidates[i] + }) } - sort.Slice(candidates, func(i, j int) bool { - if orderMap[candidates[i].Type] == orderMap[candidates[j].Type] { - // identical tablet types: randomize order of tablets for this type - return rand.Intn(2) == 0 // 50% chance - } - return orderMap[candidates[i].Type] < orderMap[candidates[j].Type] - }) + candidates = append(sameCellCandidates, allOtherCandidates...) + } else if tp.inOrder { + candidates = tp.orderByTabletType(candidates) } else { // Randomize candidates rand.Shuffle(len(candidates), func(i, j int) { @@ -204,6 +256,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn actualCells = append(actualCells, cell) } } + // Just in case a cell was passed in addition to its alias. + // Can happen if localPreference is not "". See NewTabletPicker + actualCells = tp.dedupeCells(actualCells) + for _, cell := range actualCells { shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() @@ -246,6 +302,19 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn return tablets } +func (tp *TabletPicker) dedupeCells(cells []string) []string { + keys := make(map[string]bool) + dedupedCells := []string{} + + for _, c := range cells { + if _, value := keys[c]; !value { + keys[c] = true + dedupedCells = append(dedupedCells, c) + } + } + return dedupedCells +} + func init() { // TODO(sougou): consolidate this call to be once per process. rand.Seed(time.Now().UnixNano()) diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index ed071af13ad..e24982a5175 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -18,9 +18,12 @@ package discovery import ( "context" + "os" "testing" "time" + _flag "vitess.io/vitess/go/internal/flag" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -364,6 +367,156 @@ func TestPickUsingCellAlias(t *testing.T) { assert.True(t, picked2) } +func TestPickLocalPreferences(t *testing.T) { + type tablet struct { + id uint32 + typ topodatapb.TabletType + cell string + } + + type testCase struct { + name string + + //inputs + tablets []tablet + inCells []string + inTabletTypes string + + //expected + tpLocalPreference string + tpCells []string + wantTablets []uint32 + } + + tcases := []testCase{ + { + "local preference", + []tablet{ + {101, topodatapb.TabletType_REPLICA, "cell1"}, + {102, topodatapb.TabletType_REPLICA, "cell2"}, + {103, topodatapb.TabletType_REPLICA, "cell2"}, + }, + []string{"local:cell2", "cell1"}, + "replica", + "cell2", + []string{"cell1", "cell2"}, + []uint32{102, 103}, + }, + { + "local preference with cell alias", + []tablet{ + {101, topodatapb.TabletType_REPLICA, "cell1"}, + {102, topodatapb.TabletType_REPLICA, "cell2"}, + }, + []string{"local:cell2", "cella"}, + "replica", + "cell2", + []string{"cella", "cell2"}, + []uint32{102}, + }, + { + "local preference with tablet type ordering, replica", + []tablet{ + {101, topodatapb.TabletType_REPLICA, "cell1"}, + {102, topodatapb.TabletType_REPLICA, "cell1"}, + {103, topodatapb.TabletType_PRIMARY, "cell2"}, + {104, topodatapb.TabletType_REPLICA, "cell2"}, + }, + []string{"local:cell2", "cella"}, + "in_order:replica,primary", + "cell2", + []string{"cella", "cell2"}, + []uint32{104}, + }, + { + "no local preference with tablet type ordering, primary", + []tablet{ + {101, topodatapb.TabletType_REPLICA, "cell1"}, + {102, topodatapb.TabletType_PRIMARY, "cell1"}, + {103, topodatapb.TabletType_REPLICA, "cell2"}, + {104, topodatapb.TabletType_REPLICA, "cell2"}, + }, + []string{"cell2", "cella"}, + "in_order:primary,replica", + "", + []string{"cella", "cell2"}, + []uint32{102}, + }, + { + "local preference with tablet type ordering, primary in local", + []tablet{ + {101, topodatapb.TabletType_REPLICA, "cell1"}, + {102, topodatapb.TabletType_REPLICA, "cell1"}, + {103, topodatapb.TabletType_PRIMARY, "cell2"}, + {104, topodatapb.TabletType_REPLICA, "cell2"}, + }, + []string{"local:cell2", "cella"}, + "in_order:primary,replica", + "cell2", + []string{"cella", "cell2"}, + []uint32{103}, + }, + { + "local preference with tablet type ordering, primary not local", + []tablet{ + {101, topodatapb.TabletType_PRIMARY, "cell1"}, + {102, topodatapb.TabletType_REPLICA, "cell1"}, + {103, topodatapb.TabletType_REPLICA, "cell2"}, + {104, topodatapb.TabletType_REPLICA, "cell2"}, + }, + []string{"local:cell2", "cella"}, + "in_order:primary,replica", + "cell2", + []string{"cella", "cell2"}, + []uint32{103, 104}, // replicas are picked because primary is not in the local cell/cell alias + }, + { + "local preference with tablet type ordering, primary in local's alias", + []tablet{ + {101, topodatapb.TabletType_PRIMARY, "cell1"}, + {102, topodatapb.TabletType_REPLICA, "cell1"}, + }, + []string{"local:cell2", "cella"}, + "in_order:primary,replica", + "cell2", + []string{"cella", "cell2"}, + []uint32{101}, // primary found since there are no tablets in cell/cell alias + }, + } + + ctx := context.Background() + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + cells := []string{"cell1", "cell2"} + te := newPickerTestEnv(t, cells) + var testTablets []*topodatapb.Tablet + for _, tab := range tcase.tablets { + testTablets = append(testTablets, addTablet(te, int(tab.id), tab.typ, tab.cell, true, true)) + } + defer func() { + for _, tab := range testTablets { + deleteTablet(t, te, tab) + } + }() + tp, err := NewTabletPicker(te.topoServ, tcase.inCells, te.keyspace, te.shard, tcase.inTabletTypes) + require.NoError(t, err) + require.Equal(t, tp.localPreference, tcase.tpLocalPreference) + require.ElementsMatch(t, tp.cells, tcase.tpCells) + var selectedTablets []uint32 + selectedTabletMap := make(map[uint32]bool) + for i := 0; i < 20; i++ { + tab, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + selectedTabletMap[tab.Alias.Uid] = true + } + for uid := range selectedTabletMap { + selectedTablets = append(selectedTablets, uid) + } + require.ElementsMatch(t, selectedTablets, tcase.wantTablets) + }) + } +} + func TestTabletAppearsDuringSleep(t *testing.T) { te := newPickerTestEnv(t, []string{"cell"}) tp, err := NewTabletPicker(te.topoServ, te.cells, te.keyspace, te.shard, "replica") @@ -428,6 +581,11 @@ type pickerTestEnv struct { topoServ *topo.Server } +func TestMain(m *testing.M) { + _flag.ParseFlagsForTest() + os.Exit(m.Run()) +} + func newPickerTestEnv(t *testing.T, cells []string) *pickerTestEnv { ctx := context.Background() diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index a2a70a1f2d0..ad459364fa0 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -161,7 +161,7 @@ func (tw *TopologyWatcher) loadTablets() { return default: } - log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) + log.Errorf("cannot get tablets for cell:%v: %v", tw.cell, err) return } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 6efe0fb5e7a..5af4f0cc757 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -415,18 +415,52 @@ func (vs *vstream) alignStreams(ctx context.Context, event *binlogdatapb.VEvent, } } -func (vs *vstream) getCells() []string { +// getCells determines the availability zones to select tablets from. +// 2 scenarios: +// +// 1. No cells specified by the client via the gRPC request. +// Tablets from the local cell of the VTGate AND the cell alias that this cell belongs to will be selected by default +// Local cell will take precedence. +// +// 2. Cells are specified by the client via the gRPC request +// These cels will take precendence over the default local cell and its alias +// and only tablets belonging to the specified cells will be selected. +// If the "local:" tag is passed in as an option in the list of optCells, +// the local cell of the VTGate will take precedence over any other cell specified. +func (vs *vstream) getCells(ctx context.Context) []string { var cells []string if vs.optCells != "" { - for _, cell := range strings.Split(strings.TrimSpace(vs.optCells), ",") { + for i, cell := range strings.Split(strings.TrimSpace(vs.optCells), ",") { + // if the local tag is passed in, we must give local cell priority + // during tablet selection. Append the VTGate's local cell to the list of cells + if i == 0 && cell == "local:" { + cells = append(cells, fmt.Sprintf("local:%s", vs.vsm.cell)) + continue + } cells = append(cells, strings.TrimSpace(cell)) } } + // if no override provided in gRPC request, perform cell alias fallback + if len(cells) == 0 { + log.Info("No cells provided by client, falling back to local cell and alias...\n") + // append the alias this cell belongs to, otherwise appends the vtgate's cell + alias := topo.GetAliasByCell(ctx, vs.ts, vs.vsm.cell) + // an alias was actually found + if alias != vs.vsm.cell { + // send in the vtgate's cell for local cell preference + cells = append(cells, fmt.Sprintf("local:%s", vs.vsm.cell)) + } + cells = append(cells, alias) + } + if len(cells) == 0 { // use the vtgate's cell by default cells = append(cells, vs.vsm.cell) } + + log.Info("Cells for tablet selection: %v\n", cells) + return cells } @@ -451,7 +485,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha var eventss [][]*binlogdatapb.VEvent var err error - cells := vs.getCells() + cells := vs.getCells(ctx) tp, err := discovery.NewTabletPicker(vs.ts, cells, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()) if err != nil { log.Errorf(err.Error()) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index d71008bc6ae..be85c853154 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -274,6 +274,74 @@ func TestVStreamChunks(t *testing.T) { assert.Equal(t, int32(100), ddlCount.Get()) } +func TestVStreamManagerGetCells(t *testing.T) { + type testArgs struct { + name string + optCells string + cellAlias bool + resultCells []string + } + + tcases := []*testArgs{ + {"default-local", "", false, []string{"aa"}}, + {"default-local-cell-alias", "", true, []string{"local:aa", "region1"}}, + {"with-opt-cells", "local:,bb,cc", true, []string{"local:aa", "bb", "cc"}}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(hc, st, "aa") + ts, _ := st.GetTopoServer() + + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + vs := &vstream{ + vgtid: nil, + tabletType: topodatapb.TabletType_PRIMARY, + optCells: tcase.optCells, + filter: nil, + send: nil, + resolver: nil, + journaler: nil, + minimizeSkew: false, + stopOnReshard: true, + skewTimeoutSeconds: 0, + timestamps: nil, + vsm: vsm, + eventCh: nil, + heartbeatInterval: 0, + ts: ts, + } + + if tcase.cellAlias { + cellsAlias := &topodatapb.CellsAlias{ + Cells: []string{"aa", "bb"}, + } + assert.Nil(t, ts.CreateCellsAlias(context.Background(), "region1", cellsAlias), "failed to create cell alias") + defer cleanupGetCellTests(t, ts, "region1") + } + + got := vs.getCells(ctx) + assert.Equal(t, len(got), len(tcase.resultCells)) + assert.Equal(t, got, tcase.resultCells) + }) + } +} + +func cleanupGetCellTests(t *testing.T, ts *topo.Server, region string) { + if region != "" { + if err := ts.DeleteCellsAlias(context.Background(), region); err != nil { + t.Logf("DeleteCellsAlias(%s) failed: %v", region, err) + } + } +} + func TestVStreamMulti(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()