Skip to content
This repository was archived by the owner on Dec 16, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 9 additions & 20 deletions go/vt/discovery/tablet_stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
log "github.com/golang/glog"
querypb "github.com/youtube/vitess/go/vt/proto/query"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
)

Expand All @@ -40,10 +41,6 @@ type TabletStatsCache struct {
// cell is the cell we are keeping all tablets for.
// Note we keep track of all master tablets in all cells.
cell string

// cell to region mapping function
cellToRegion func(cell string) string

// mu protects the entries map. It does not protect individual
// entries in the map.
mu sync.RWMutex
Expand All @@ -68,8 +65,8 @@ type tabletStatsCacheEntry struct {
// Note we do the registration in this code to guarantee we call
// SetListener with sendDownEvents=true, as we need these events
// to maintain the integrity of our cache.
func NewTabletStatsCache(hc HealthCheck, cell string, cellToRegion func(cell string) string) *TabletStatsCache {
return newTabletStatsCache(hc, cell, cellToRegion, true /* setListener */)
func NewTabletStatsCache(hc HealthCheck, cell string) *TabletStatsCache {
return newTabletStatsCache(hc, cell, true /* setListener */)
}

// NewTabletStatsCacheDoNotSetListener is identical to NewTabletStatsCache
Expand All @@ -79,22 +76,14 @@ func NewTabletStatsCache(hc HealthCheck, cell string, cellToRegion func(cell str
// When the caller sets its own listener on "hc", they must make sure that they
// set the parameter "sendDownEvents" to "true" or this cache won't properly
// remove tablets whose tablet type changes.
func NewTabletStatsCacheDoNotSetListener(cell string, cellToRegion func(cell string) string) *TabletStatsCache {
return newTabletStatsCache(nil, cell, cellToRegion, false /* setListener */)
}

// UpdateCellsToRegions is mainly for testing purpose, the `cellsToRegions` mapping should be provided in the constructor
func (tc *TabletStatsCache) UpdateCellsToRegions(cellsToRegions map[string]string) {
tc.cellToRegion = func(cell string) string {
return cellsToRegions[cell]
}
func NewTabletStatsCacheDoNotSetListener(cell string) *TabletStatsCache {
return newTabletStatsCache(nil, cell, false /* setListener */)
}

func newTabletStatsCache(hc HealthCheck, cell string, cellToRegion func(cell string) string, setListener bool) *TabletStatsCache {
func newTabletStatsCache(hc HealthCheck, cell string, setListener bool) *TabletStatsCache {
tc := &TabletStatsCache{
cell: cell,
cellToRegion: cellToRegion,
entries: make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry),
cell: cell,
entries: make(map[string]map[string]map[topodatapb.TabletType]*tabletStatsCacheEntry),
}

if setListener {
Expand Down Expand Up @@ -155,7 +144,7 @@ func (tc *TabletStatsCache) getOrCreateEntry(target *querypb.Target) *tabletStat

// StatsUpdate is part of the HealthCheckStatsListener interface.
func (tc *TabletStatsCache) StatsUpdate(ts *TabletStats) {
if ts.Target.TabletType != topodatapb.TabletType_MASTER && ts.Tablet.Alias.Cell != tc.cell && tc.cellToRegion(ts.Tablet.Alias.Cell) != tc.cellToRegion(tc.cell) {
if ts.Target.TabletType != topodatapb.TabletType_MASTER && ts.Tablet.Alias.Cell != tc.cell && topo.GetRegionByCell(ts.Tablet.Alias.Cell) != topo.GetRegionByCell(tc.cell) {
// this is for a non-master tablet in a different cell and a different region, drop it
return
}
Expand Down
49 changes: 49 additions & 0 deletions go/vt/discovery/tablet_stats_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import (

// TestTabletStatsCache tests the functionality of the TabletStatsCache class.
func TestTabletStatsCache(t *testing.T) {
defer topo.UpdateCellsToRegions(map[string]string{})
topo.UpdateCellsToRegions(map[string]string{
"cell": "region1",
"cell1": "region1",
"cell2": "region2",
})

// We want to unit test TabletStatsCache without a full-blown
// HealthCheck object, so we can't call NewTabletStatsCache.
// So we just construct this object here.
Expand Down Expand Up @@ -207,4 +214,46 @@ func TestTabletStatsCache(t *testing.T) {
if len(a) != 1 || !ts1.DeepEqual(&a[0]) {
t.Errorf("unexpected result: %v", a)
}

// add a third tablet as slave in diff cell, same region
tablet3 := topo.NewTablet(12, "cell1", "host3")
ts3 := &TabletStats{
Key: "t3",
Tablet: tablet3,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
}
tsc.StatsUpdate(ts3)
// check it's there
a = tsc.GetTabletStats("k", "s", topodatapb.TabletType_REPLICA)
if len(a) != 1 {
t.Errorf("unexpected result: %v", a)
}
a = tsc.GetHealthyTabletStats("k", "s", topodatapb.TabletType_REPLICA)
if len(a) != 1 {
t.Errorf("unexpected result: %v", a)
}

// add a 4th slave tablet in a diff cell, diff region
tablet4 := topo.NewTablet(13, "cell2", "host4")
ts4 := &TabletStats{
Key: "t4",
Tablet: tablet4,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 10, CpuUsage: 0.2},
}
tsc.StatsUpdate(ts4)
// check it's *NOT* there
a = tsc.GetTabletStats("k", "s", topodatapb.TabletType_REPLICA)
if len(a) != 1 {
t.Errorf("unexpected result: %v", a)
}
a = tsc.GetHealthyTabletStats("k", "s", topodatapb.TabletType_REPLICA)
if len(a) != 1 {
t.Errorf("unexpected result: %v", a)
}
}
2 changes: 1 addition & 1 deletion go/vt/discovery/tablet_stats_cache_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestWaitForTablets(t *testing.T) {
createFakeConn(tablet, input)

hc := NewHealthCheck(1*time.Millisecond, 1*time.Hour)
tsc := NewTabletStatsCache(hc, "cell", func(cell string) string { return cell })
tsc := NewTabletStatsCache(hc, "cell")
hc.AddTablet(tablet, "")

// this should time out
Expand Down
10 changes: 0 additions & 10 deletions go/vt/topo/cell_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,6 @@ func (ts *Server) GetCellInfo(ctx context.Context, cell string, strongRead bool)
return ci, nil
}

// GetRegionByCell gets the region from a given cell.
// It returns an error if the cell's CellInfo cannot be read.
func (ts *Server) GetRegionByCell(ctx context.Context, cell string) (string, error) {
	// strongRead=false: a possibly stale read is fine here, as the
	// cell->region mapping is expected to change rarely.
	cellInfo, err := ts.GetCellInfo(ctx, cell, false)
	if err != nil {
		return "", err
	}
	return cellInfo.Region, nil
}

// CreateCellInfo creates a new CellInfo with the provided content.
func (ts *Server) CreateCellInfo(ctx context.Context, cell string, ci *topodatapb.CellInfo) error {
// Pack the content.
Expand Down
54 changes: 37 additions & 17 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ type SrvTopoServer interface {
WatchSrvVSchema(ctx context.Context, cell string) (*WatchSrvVSchemaData, <-chan *WatchSrvVSchemaData, CancelFunc)
}

// cellsToRegionsMap is a process-wide, mutex-guarded cache of the
// cell->region mapping, plus the topo server used to lazily fill it.
type cellsToRegionsMap struct {
	// mu protects ts and cellsToRegions.
	mu sync.Mutex
	// topo server in use
	ts *Server
	// cellsToRegions contains all cell->region mappings
	cellsToRegions map[string]string
}

var (
// topoImplementation is the flag for which implementation to use.
topoImplementation = flag.String("topo_implementation", "zookeeper", "the topology implementation to use")
Expand All @@ -192,6 +200,11 @@ var (

// factories has the factories for the Conn objects.
factories = make(map[string]Factory)

// regions has the cell to region map with mutex protecting it
regions = cellsToRegionsMap{
cellsToRegions: make(map[string]string),
}
)

// RegisterFactory registers a Factory for an implementation for a Server.
Expand Down Expand Up @@ -247,6 +260,9 @@ func Open() *Server {
if err != nil {
log.Exitf("Failed to open topo server (%v,%v,%v): %v", *topoImplementation, *topoGlobalServerAddress, *topoGlobalRoot, err)
}
regions.mu.Lock()
defer regions.mu.Unlock()
regions.ts = ts
return ts
}

Expand Down Expand Up @@ -294,26 +310,30 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
return conn, nil
}

// CellToRegionMapper function is a wrapper around topo.Server#GetRegionByCell with caching and error handling
func (ts *Server) CellToRegionMapper() func(cell string) string {

memoize := make(map[string]string)
// GetRegionByCell returns the region group this `cell` belongs to, if there's none, it returns the `cell` as region.
func GetRegionByCell(cell string) string {
regions.mu.Lock()
defer regions.mu.Unlock()
if region, ok := regions.cellsToRegions[cell]; ok {
return region
}
// lazily get the region from cell info if `regions.ts` is available
ctx := context.Background()

return func(cell string) string {
if ts == nil {
return cell
}
if region, ok := memoize[cell]; ok {
return region
if regions.ts != nil {
info, err := regions.ts.GetCellInfo(ctx, cell, false)
if err == nil && info.Region != "" {
regions.cellsToRegions[cell] = info.Region
return info.Region
}
if region, err := ts.GetRegionByCell(ctx, cell); err == nil {
memoize[cell] = region
return region
}
// for backward compatibility, when region isn't available, it's the same as given cell
return cell
}
return cell
}

// UpdateCellsToRegions replaces the global cell->region map that the topo
// server initialization builds. It is meant for testing purposes only.
func UpdateCellsToRegions(cellsToRegions map[string]string) {
	regions.mu.Lock()
	regions.cellsToRegions = cellsToRegions
	regions.mu.Unlock()
}

// Close will close all connections to underlying topo Server.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/gateway/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type discoveryGateway struct {
func createDiscoveryGateway(hc discovery.HealthCheck, topoServer *topo.Server, serv topo.SrvTopoServer, cell string, retryCount int) Gateway {
dg := &discoveryGateway{
hc: hc,
tsc: discovery.NewTabletStatsCacheDoNotSetListener(cell, topoServer.CellToRegionMapper()),
tsc: discovery.NewTabletStatsCacheDoNotSetListener(cell),
topoServer: topoServer,
srvTopoServer: serv,
localCell: cell,
Expand Down Expand Up @@ -277,11 +277,11 @@ func shuffleTablets(cell string, tablets []discovery.TabletStats) {
sameCell, diffCell, sameCellMax := 0, 0, -1
length := len(tablets)

//move all same cell tablets to the front, this is O(n)
// move all same cell tablets to the front, this is O(n)
for {
sameCellMax = diffCell - 1
sameCell := nextTablet(cell, tablets, sameCell, length, true)
diffCell := nextTablet(cell, tablets, diffCell, length, false)
sameCell = nextTablet(cell, tablets, sameCell, length, true)
diffCell = nextTablet(cell, tablets, diffCell, length, false)
// either no more diffs or no more same cells should stop the iteration
if sameCell < 0 || diffCell < 0 {
break
Expand Down
81 changes: 80 additions & 1 deletion go/vt/vtgate/gateway/discoverygateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,91 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) {
}
}

func TestShuffleTablets(t *testing.T) {
defer topo.UpdateCellsToRegions(map[string]string{})
topo.UpdateCellsToRegions(map[string]string{
"cell1": "region1",
"cell2": "region1",
})

ts1 := discovery.TabletStats{
Key: "t1",
Tablet: topo.NewTablet(10, "cell1", "host1"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}

ts2 := discovery.TabletStats{
Key: "t2",
Tablet: topo.NewTablet(10, "cell1", "host2"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}

ts3 := discovery.TabletStats{
Key: "t3",
Tablet: topo.NewTablet(10, "cell2", "host3"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}

ts4 := discovery.TabletStats{
Key: "t4",
Tablet: topo.NewTablet(10, "cell2", "host4"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}

sameCellTablets := []discovery.TabletStats{ts1, ts2}
diffCellTablets := []discovery.TabletStats{ts3, ts4}
mixedTablets := []discovery.TabletStats{ts1, ts2, ts3, ts4}
// repeat shuffling 10 times and everytime the same cell tablets should be in the front
for i := 0; i < 10; i++ {
shuffleTablets("cell1", sameCellTablets)
if (len(sameCellTablets) != 2) ||
(sameCellTablets[0].Key != "t1" && sameCellTablets[0].Key != "t2") ||
(sameCellTablets[1].Key != "t1" && sameCellTablets[1].Key != "t2") {
t.Errorf("should shuffle in only same cell tablets, got %+v", sameCellTablets)
}

shuffleTablets("cell1", diffCellTablets)
if (len(diffCellTablets) != 2) ||
(diffCellTablets[0].Key != "t3" && diffCellTablets[0].Key != "t4") ||
(diffCellTablets[1].Key != "t3" && diffCellTablets[1].Key != "t4") {
t.Errorf("should shuffle in only diff cell tablets, got %+v", diffCellTablets)
}

shuffleTablets("cell1", mixedTablets)
if len(mixedTablets) != 4 {
t.Errorf("should have 4 tablets, got %+v", mixedTablets)
}

if (mixedTablets[0].Key != "t1" && mixedTablets[0].Key != "t2") ||
(mixedTablets[1].Key != "t1" && mixedTablets[1].Key != "t2") {
t.Errorf("should have same cell tablets in the front, got %+v", mixedTablets)
}

if (mixedTablets[2].Key != "t3" && mixedTablets[2].Key != "t4") ||
(mixedTablets[3].Key != "t3" && mixedTablets[3].Key != "t4") {
t.Errorf("should have diff cell tablets in the rear, got %+v", mixedTablets)
}
}
}

func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) {
keyspace := "ks"
shard := "0"
hc := discovery.NewFakeHealthCheck()
dg := createDiscoveryGateway(hc, nil, nil, "local", 2).(*discoveryGateway)
dg.tsc.UpdateCellsToRegions(map[string]string{
topo.UpdateCellsToRegions(map[string]string{
"local-west": "local",
"local-east": "local",
"local": "local",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/binlog_players.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func newBinlogPlayerController(ts *topo.Server, vtClientFactory func() binlogpla
// of whether the BinlogPlayerController is Start()'d or Stop()'d.
// Use Close() after Stop() to finally close them and free their resources.
healthCheck: healthCheck,
tabletStatsCache: discovery.NewTabletStatsCache(healthCheck, cell, ts.CellToRegionMapper()),
tabletStatsCache: discovery.NewTabletStatsCache(healthCheck, cell),
shardReplicationWatcher: discovery.NewShardReplicationWatcher(ts, healthCheck, cell, sourceShard.Keyspace, sourceShard.Shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency),
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/legacy_split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (scw *LegacySplitCloneWorker) findTargets(ctx context.Context) error {

// Initialize healthcheck and add destination shards to it.
scw.healthCheck = discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout)
scw.tsc = discovery.NewTabletStatsCache(scw.healthCheck, scw.cell, scw.wr.TopoServer().CellToRegionMapper())
scw.tsc = discovery.NewTabletStatsCache(scw.healthCheck, scw.cell)
for _, si := range scw.destinationShards {
watcher := discovery.NewShardReplicationWatcher(scw.wr.TopoServer(), scw.healthCheck,
scw.cell, si.Keyspace(), si.ShardName(),
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (scw *SplitCloneWorker) init(ctx context.Context) error {

// Initialize healthcheck and add destination shards to it.
scw.healthCheck = discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout)
scw.tsc = discovery.NewTabletStatsCacheDoNotSetListener(scw.cell, scw.wr.TopoServer().CellToRegionMapper())
scw.tsc = discovery.NewTabletStatsCacheDoNotSetListener(scw.cell)
// We set sendDownEvents=true because it's required by TabletStatsCache.
scw.healthCheck.SetListener(scw, true /* sendDownEvents */)

Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/topo_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func FindHealthyRdonlyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *di
if tsc == nil {
// No healthcheck instance provided. Create one.
healthCheck := discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout)
tsc = discovery.NewTabletStatsCache(healthCheck, cell, wr.TopoServer().CellToRegionMapper())
tsc = discovery.NewTabletStatsCache(healthCheck, cell)
watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
defer watcher.Stop()
defer healthCheck.Close()
Expand Down
Loading