diff --git a/go/vt/discovery/tablet_stats_cache.go b/go/vt/discovery/tablet_stats_cache.go index e2fdc2a2f60..8838dfc70ac 100644 --- a/go/vt/discovery/tablet_stats_cache.go +++ b/go/vt/discovery/tablet_stats_cache.go @@ -22,6 +22,7 @@ import ( log "github.com/golang/glog" querypb "github.com/youtube/vitess/go/vt/proto/query" topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" + "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/topoproto" ) @@ -40,10 +41,6 @@ type TabletStatsCache struct { // cell is the cell we are keeping all tablets for. // Note we keep track of all master tablets in all cells. cell string - - // cell to region mapping function - cellToRegion func(cell string) string - // mu protects the entries map. It does not protect individual // entries in the map. mu sync.RWMutex @@ -68,8 +65,8 @@ type tabletStatsCacheEntry struct { // Note we do the registration in this code to guarantee we call // SetListener with sendDownEvents=true, as we need these events // to maintain the integrity of our cache. -func NewTabletStatsCache(hc HealthCheck, cell string, cellToRegion func(cell string) string) *TabletStatsCache { - return newTabletStatsCache(hc, cell, cellToRegion, true /* setListener */) +func NewTabletStatsCache(hc HealthCheck, cell string) *TabletStatsCache { + return newTabletStatsCache(hc, cell, true /* setListener */) } // NewTabletStatsCacheDoNotSetListener is identical to NewTabletStatsCache @@ -79,22 +76,14 @@ func NewTabletStatsCache(hc HealthCheck, cell string, cellToRegion func(cell str // When the caller sets its own listener on "hc", they must make sure that they // set the parameter "sendDownEvents" to "true" or this cache won't properly // remove tablets whose tablet type changes. -func NewTabletStatsCacheDoNotSetListener(cell string, cellToRegion func(cell string) string) *TabletStatsCache { - return newTabletStatsCache(nil, cell, cellToRegion, false /* setListener */) -} - -// UpdateCellsToRegions is mainly for testing purpose, the `cellsToRegions` mapping should be provided in the constructor -func (tc *TabletStatsCache) UpdateCellsToRegions(cellsToRegions map[string]string) { - tc.cellToRegion = func(cell string) string { - return cellsToRegions[cell] - } +func NewTabletStatsCacheDoNotSetListener(cell string) *TabletStatsCache { + return newTabletStatsCache(nil, cell, false /* setListener */) } -func newTabletStatsCache(hc HealthCheck, cell string, cellToRegion func(cell string) string, setListener bool) *TabletStatsCache { +func newTabletStatsCache(hc HealthCheck, cell string, setListener bool) *TabletStatsCache { tc := &TabletStatsCache{ - cell: cell, - cellToRegion: cellToRegion, - entries: make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry), + cell: cell, + entries: make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry), } if setListener { @@ -155,7 +144,7 @@ func (tc *TabletStatsCache) getOrCreateEntry(target *querypb.Target) *tabletStat // StatsUpdate is part of the HealthCheckStatsListener interface. func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) { - if ts.Target.TabletType != topodatapb.TabletType_MASTER && ts.Tablet.Alias.Cell != tc.cell && tc.cellToRegion(ts.Tablet.Alias.Cell) != tc.cellToRegion(tc.cell) { + if ts.Target.TabletType != topodatapb.TabletType_MASTER && ts.Tablet.Alias.Cell != tc.cell && topo.GetRegionByCell(ts.Tablet.Alias.Cell) != topo.GetRegionByCell(tc.cell) { // this is for a non-master tablet in a different cell and a different region, drop it return } diff --git a/go/vt/discovery/tablet_stats_cache_test.go b/go/vt/discovery/tablet_stats_cache_test.go index 1e132634155..5a849acb303 100644 --- a/go/vt/discovery/tablet_stats_cache_test.go +++ b/go/vt/discovery/tablet_stats_cache_test.go @@ -27,6 +27,13 @@ import ( // TestTabletStatsCache tests the functionality of the TabletStatsCache class. func TestTabletStatsCache(t *testing.T) { + defer topo.UpdateCellsToRegions(map[string]string{}) + topo.UpdateCellsToRegions(map[string]string{ + "cell": "region1", + "cell1": "region1", + "cell2": "region2", + }) + // We want to unit test TabletStatsCache without a full-blown // HealthCheck object, so we can't call NewTabletStatsCache. // So we just construct this object here. @@ -207,4 +214,46 @@ func TestTabletStatsCache(t *testing.T) { if len(a) != 1 || !ts1.DeepEqual(&a[0]) { t.Errorf("unexpected result: %v", a) } + + // add a third tablet as slave in diff cell, same region + tablet3 := topo.NewTablet(12, "cell1", "host3") + ts3 := &TabletStats{ + Key: "t3", + Tablet: tablet3, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2}, + } + tsc.StatsUpdate(ts3) + // check it's there + a = tsc.GetTabletStats("k", "s", topodatapb.TabletType_REPLICA) + if len(a) != 1 { + t.Errorf("unexpected result: %v", a) + } + a = tsc.GetHealthyTabletStats("k", "s", topodatapb.TabletType_REPLICA) + if len(a) != 1 { + t.Errorf("unexpected result: %v", a) + } + + // add a 4th slave tablet in a diff cell, diff region + tablet4 := topo.NewTablet(13, "cell2", "host4") + ts4 := &TabletStats{ + Key: "t4", + Tablet: tablet4, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2}, + } + tsc.StatsUpdate(ts4) + // check it's *NOT* there + a = tsc.GetTabletStats("k", "s", topodatapb.TabletType_REPLICA) + if len(a) != 1 { + t.Errorf("unexpected result: %v", a) + } + a = tsc.GetHealthyTabletStats("k", "s", topodatapb.TabletType_REPLICA) + if len(a) != 1 { + t.Errorf("unexpected result: %v", a) + } } diff --git a/go/vt/discovery/tablet_stats_cache_wait_test.go b/go/vt/discovery/tablet_stats_cache_wait_test.go index b7c7e09b83d..c89f52f2c2c 100644 --- a/go/vt/discovery/tablet_stats_cache_wait_test.go +++ b/go/vt/discovery/tablet_stats_cache_wait_test.go @@ -131,7 +131,7 @@ func TestWaitForTablets(t *testing.T) { createFakeConn(tablet, input) hc := NewHealthCheck(1*time.Millisecond, 1*time.Hour) - tsc := NewTabletStatsCache(hc, "cell", func(cell string) string { return cell }) + tsc := NewTabletStatsCache(hc, "cell") hc.AddTablet(tablet, "") // this should time out diff --git a/go/vt/topo/cell_info.go b/go/vt/topo/cell_info.go index 43a8c5a8283..67d8db61306 100644 --- a/go/vt/topo/cell_info.go +++ b/go/vt/topo/cell_info.go @@ -80,16 +80,6 @@ func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool) return ci, nil } -// GetRegionByCell gets the region from a given cell -func (ts *Server) GetRegionByCell(ctx context.Context, cell string) (string, error) { - - cellInfo, err := ts.GetCellInfo(ctx, cell, false) - if err == nil { - return cellInfo.Region, nil - } - return "", err -} - // CreateCellInfo creates a new CellInfo with the provided content. func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatapb.CellInfo) error { // Pack the content. diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 7d600330e1a..fed3930f027 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -178,6 +178,14 @@ type SrvTopoServer interface { WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, CancelFunc) } +type cellsToRegionsMap struct { + mu sync.Mutex + // topo server in use + ts *Server + // cellsToRegions contains all cell->region mappings + cellsToRegions map[string]string +} + var ( // topoImplementation is the flag for which implementation to use. topoImplementation = flag.String("topo_implementation", "zookeeper", "the topology implementation to use") @@ -192,6 +200,11 @@ var ( // factories has the factories for the Conn objects. factories = make(map[string]Factory) + + // regions has the cell to region map with mutex protecting it + regions = cellsToRegionsMap{ + cellsToRegions: make(map[string]string), + } ) // RegisterFactory registers a Factory for an implementation for a Server. @@ -247,6 +260,9 @@ func Open() *Server { if err != nil { log.Exitf("Failed to open topo server (%v,%v,%v): %v", *topoImplementation, *topoGlobalServerAddress, *topoGlobalRoot, err) } + regions.mu.Lock() + defer regions.mu.Unlock() + regions.ts = ts return ts } @@ -294,26 +310,30 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { return conn, nil } -// CellToRegionMapper function is a wrapper around topo.Server#GetRegionByCell with caching and error handling -func (ts *Server) CellToRegionMapper() func(cell string) string { - - memoize := make(map[string]string) +// GetRegionByCell returns the region group this `cell` belongs to, if there's none, it returns the `cell` as region. +func GetRegionByCell(cell string) string { + regions.mu.Lock() + defer regions.mu.Unlock() + if region, ok := regions.cellsToRegions[cell]; ok { + return region + } + // lazily get the region from cell info if `regions.ts` is available ctx := context.Background() - - return func(cell string) string { - if ts == nil { - return cell - } - if region, ok := memoize[cell]; ok { - return region + if regions.ts != nil { + info, err := regions.ts.GetCellInfo(ctx, cell, false) + if err == nil && info.Region != "" { + regions.cellsToRegions[cell] = info.Region + return info.Region } - if region, err := ts.GetRegionByCell(ctx, cell); err == nil { - memoize[cell] = region - return region - } - // for backward compatibility, when region isn't available, it's the same as given cell - return cell } + return cell +} + +// UpdateCellsToRegions overwrites the global map built by topo server init, and is meant for testing purpose only. +func UpdateCellsToRegions(cellsToRegions map[string]string) { + regions.mu.Lock() + defer regions.mu.Unlock() + regions.cellsToRegions = cellsToRegions } // Close will close all connections to underlying topo Server. diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index b3523070072..ab9822b6452 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -86,7 +86,7 @@ type discoveryGateway struct { func createDiscoveryGateway(hc discovery.HealthCheck, topoServer *topo.Server, serv topo.SrvTopoServer, cell string, retryCount int) Gateway { dg := &discoveryGateway{ hc: hc, - tsc: discovery.NewTabletStatsCacheDoNotSetListener(cell, topoServer.CellToRegionMapper()), + tsc: discovery.NewTabletStatsCacheDoNotSetListener(cell), topoServer: topoServer, srvTopoServer: serv, localCell: cell, @@ -277,11 +277,11 @@ func shuffleTablets(cell string, tablets []discovery.TabletStats) { sameCell, diffCell, sameCellMax := 0, 0, -1 length := len(tablets) - //move all same cell tablets to the front, this is O(n) + // move all same cell tablets to the front, this is O(n) for { sameCellMax = diffCell - 1 - sameCell := nextTablet(cell, tablets, sameCell, length, true) - diffCell := nextTablet(cell, tablets, diffCell, length, false) + sameCell = nextTablet(cell, tablets, sameCell, length, true) + diffCell = nextTablet(cell, tablets, diffCell, length, false) // either no more diffs or no more same cells should stop the iteration if sameCell < 0 || diffCell < 0 { break diff --git a/go/vt/vtgate/gateway/discoverygateway_test.go b/go/vt/vtgate/gateway/discoverygateway_test.go index f93d3d603fb..24eceb7b61d 100644 --- a/go/vt/vtgate/gateway/discoverygateway_test.go +++ b/go/vt/vtgate/gateway/discoverygateway_test.go @@ -127,12 +127,91 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) { } } +func TestShuffleTablets(t *testing.T) { + defer topo.UpdateCellsToRegions(map[string]string{}) + topo.UpdateCellsToRegions(map[string]string{ + "cell1": "region1", + "cell2": "region1", + }) + + ts1 := discovery.TabletStats{ + Key: "t1", + Tablet: topo.NewTablet(10, "cell1", "host1"), + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + } + + ts2 := discovery.TabletStats{ + Key: "t2", + Tablet: topo.NewTablet(10, "cell1", "host2"), + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + } + + ts3 := discovery.TabletStats{ + Key: "t3", + Tablet: topo.NewTablet(10, "cell2", "host3"), + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + } + + ts4 := discovery.TabletStats{ + Key: "t4", + Tablet: topo.NewTablet(10, "cell2", "host4"), + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Up: true, + Serving: true, + Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + } + + sameCellTablets := []discovery.TabletStats{ts1, ts2} + diffCellTablets := []discovery.TabletStats{ts3, ts4} + mixedTablets := []discovery.TabletStats{ts1, ts2, ts3, ts4} + // repeat shuffling 10 times and everytime the same cell tablets should be in the front + for i := 0; i < 10; i++ { + shuffleTablets("cell1", sameCellTablets) + if (len(sameCellTablets) != 2) || + (sameCellTablets[0].Key != "t1" && sameCellTablets[0].Key != "t2") || + (sameCellTablets[1].Key != "t1" && sameCellTablets[1].Key != "t2") { + t.Errorf("should shuffle in only same cell tablets, got %+v", sameCellTablets) + } + + shuffleTablets("cell1", diffCellTablets) + if (len(diffCellTablets) != 2) || + (diffCellTablets[0].Key != "t3" && diffCellTablets[0].Key != "t4") || + (diffCellTablets[1].Key != "t3" && diffCellTablets[1].Key != "t4") { + t.Errorf("should shuffle in only diff cell tablets, got %+v", diffCellTablets) + } + + shuffleTablets("cell1", mixedTablets) + if len(mixedTablets) != 4 { + t.Errorf("should have 4 tablets, got %+v", mixedTablets) + } + + if (mixedTablets[0].Key != "t1" && mixedTablets[0].Key != "t2") || + (mixedTablets[1].Key != "t1" && mixedTablets[1].Key != "t2") { + t.Errorf("should have same cell tablets in the front, got %+v", mixedTablets) + } + + if (mixedTablets[2].Key != "t3" && mixedTablets[2].Key != "t4") || + (mixedTablets[3].Key != "t3" && mixedTablets[3].Key != "t4") { + t.Errorf("should have diff cell tablets in the rear, got %+v", mixedTablets) + } + } +} + func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) { keyspace := "ks" shard := "0" hc := discovery.NewFakeHealthCheck() dg := createDiscoveryGateway(hc, nil, nil, "local", 2).(*discoveryGateway) - dg.tsc.UpdateCellsToRegions(map[string]string{ + topo.UpdateCellsToRegions(map[string]string{ "local-west": "local", "local-east": "local", "local": "local", diff --git a/go/vt/vttablet/tabletmanager/binlog_players.go b/go/vt/vttablet/tabletmanager/binlog_players.go index c1ce2c63c92..872dfb0d405 100644 --- a/go/vt/vttablet/tabletmanager/binlog_players.go +++ b/go/vt/vttablet/tabletmanager/binlog_players.go @@ -131,7 +131,7 @@ func newBinlogPlayerController(ts *topo.Server, vtClientFactory func() binlogpla // of whether the BinlogPlayerController is Start()'d or Stop()'d. // Use Close() after Stop() to finally close them and free their resources. healthCheck: healthCheck, - tabletStatsCache: discovery.NewTabletStatsCache(healthCheck, cell, ts.CellToRegionMapper()), + tabletStatsCache: discovery.NewTabletStatsCache(healthCheck, cell), shardReplicationWatcher: discovery.NewShardReplicationWatcher(ts, healthCheck, cell, sourceShard.Keyspace, sourceShard.Shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency), } } diff --git a/go/vt/worker/legacy_split_clone.go b/go/vt/worker/legacy_split_clone.go index 36fe7ab25d2..745cac36d12 100644 --- a/go/vt/worker/legacy_split_clone.go +++ b/go/vt/worker/legacy_split_clone.go @@ -374,7 +374,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error { // Initialize healthcheck and add destination shards to it. scw.healthCheck = discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout) - scw.tsc = discovery.NewTabletStatsCache(scw.healthCheck, scw.cell, scw.wr.TopoServer().CellToRegionMapper()) + scw.tsc = discovery.NewTabletStatsCache(scw.healthCheck, scw.cell) for _, si := range scw.destinationShards { watcher := discovery.NewShardReplicationWatcher(scw.wr.TopoServer(), scw.healthCheck, scw.cell, si.Keyspace(), si.ShardName(), diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index 4e367a19311..47987550615 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -555,7 +555,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error { // Initialize healthcheck and add destination shards to it. scw.healthCheck = discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout) - scw.tsc = discovery.NewTabletStatsCacheDoNotSetListener(scw.cell, scw.wr.TopoServer().CellToRegionMapper()) + scw.tsc = discovery.NewTabletStatsCacheDoNotSetListener(scw.cell) // We set sendDownEvents=true because it's required by TabletStatsCache. scw.healthCheck.SetListener(scw, true /* sendDownEvents */) diff --git a/go/vt/worker/topo_utils.go b/go/vt/worker/topo_utils.go index 9d054b92a97..edb3e9eafcb 100644 --- a/go/vt/worker/topo_utils.go +++ b/go/vt/worker/topo_utils.go @@ -50,7 +50,7 @@ func FindHealthyRdonlyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *di if tsc == nil { // No healthcheck instance provided. Create one. healthCheck := discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout) - tsc = discovery.NewTabletStatsCache(healthCheck, cell, wr.TopoServer().CellToRegionMapper()) + tsc = discovery.NewTabletStatsCache(healthCheck, cell) watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) defer watcher.Stop() defer healthCheck.Close() diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index e87a82c89be..699a9dea0a7 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -507,7 +507,7 @@ func (wr *Wrangler) waitForDrainInCell(ctx context.Context, cell, keyspace, shar // Create the healthheck module, with a cache. hc := discovery.NewHealthCheck(healthcheckRetryDelay, healthCheckTimeout) defer hc.Close() - tsc := discovery.NewTabletStatsCache(hc, cell, wr.TopoServer().CellToRegionMapper()) + tsc := discovery.NewTabletStatsCache(hc, cell) // Create a tablet watcher. watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), hc, cell, keyspace, shard, healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)