Skip to content
This repository was archived by the owner on Dec 16, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,59 @@ func (fbs *FilterByShard) isIncluded(tablet *topodatapb.Tablet) bool {
}
return false
}

// FilterByKeyspace is a TabletRecorder filter that filters tablets by
// keyspace
type FilterByKeyspace struct {
tr TabletRecorder

keyspaces map[string]bool
}

// NewFilterByKeyspace creates a new FilterByKeyspace on top of an existing
// TabletRecorder. Each filter is a keyspace entry. All tablets that match
// a keyspace will be forwarded to the underlying TabletRecorder.
func NewFilterByKeyspace(tr TabletRecorder, selectedKeyspaces []string) *FilterByKeyspace {
m := make(map[string]bool)
for _, keyspace := range selectedKeyspaces {
m[keyspace] = true
}

return &FilterByKeyspace{
tr: tr,
keyspaces: m,
}
}

// AddTablet is part of the TabletRecorder interface.
func (fbk *FilterByKeyspace) AddTablet(tablet *topodatapb.Tablet, name string) {
if fbk.isIncluded(tablet) {
fbk.tr.AddTablet(tablet, name)
}
}

// RemoveTablet is part of the TabletRecorder interface.
func (fbk *FilterByKeyspace) RemoveTablet(tablet *topodatapb.Tablet) {
if fbk.isIncluded(tablet) {
fbk.tr.RemoveTablet(tablet)
}
}

// ReplaceTablet is part of the TabletRecorder interface.
func (fbk *FilterByKeyspace) ReplaceTablet(old *topodatapb.Tablet, new *topodatapb.Tablet, name string) {
if old.Keyspace != new.Keyspace {
log.Errorf("Error replacing old tablet in %v with new tablet in %v", old.Keyspace, new.Keyspace)
return
}

if fbk.isIncluded(new) {
fbk.tr.ReplaceTablet(old, new, name)
}
}

// isIncluded returns true if the tablet's keyspace should be
// forwarded to the underlying TabletRecorder.
func (fbk *FilterByKeyspace) isIncluded(tablet *topodatapb.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}
95 changes: 95 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package discovery

import (
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -410,3 +411,97 @@ func TestFilterByShard(t *testing.T) {
}
}
}

var (
testFilterByKeyspace = []struct {
keyspace string
expected bool
}{
{"ks1", true},
{"ks2", true},
{"ks3", false},
{"ks4", true},
{"ks5", true},
{"ks6", false},
{"ks7", false},
}
testKeyspacesToWatch = []string{"ks1", "ks2", "ks4", "ks5"}
testCell = "testCell"
testShard = "testShard"
testHostName = "testHostName"
)

func TestFilterByKeyspace(t *testing.T) {
hc := NewFakeHealthCheck()
tr := NewFilterByKeyspace(hc, testKeyspacesToWatch)
ts := memorytopo.NewServer(testCell)
tw := NewCellTabletsWatcher(context.Background(), ts, tr, testCell, 10*time.Minute, true, 5)

for _, test := range testFilterByKeyspace {
// Add a new tablet to the topology.
port := rand.Int31n(1000)
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: testCell,
Uid: rand.Uint32(),
},
Hostname: testHostName,
PortMap: map[string]int32{
"vt": port,
},
Keyspace: test.keyspace,
Shard: testShard,
}

got := tr.isIncluded(tablet)
if got != test.expected {
t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected)
}

if err := ts.CreateTablet(context.Background(), tablet); err != nil {
t.Errorf("CreateTablet failed: %v", err)
}

tw.loadTablets()
key := TabletToMapKey(tablet)
allTablets := hc.GetAllTablets()

if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tablet) != test.expected {
t.Errorf("Error adding tablet - got %v; want %v", ok, test.expected)
}

// Replace the tablet we added above
tabletReplacement := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: testCell,
Uid: rand.Uint32(),
},
Hostname: testHostName,
PortMap: map[string]int32{
"vt": port,
},
Keyspace: test.keyspace,
Shard: testShard,
}
got = tr.isIncluded(tabletReplacement)
if got != test.expected {
t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected)
}
if err := ts.CreateTablet(context.Background(), tabletReplacement); err != nil {
t.Errorf("CreateTablet failed: %v", err)
}

tw.loadTablets()
key = TabletToMapKey(tabletReplacement)
allTablets = hc.GetAllTablets()

if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tabletReplacement) != test.expected {
t.Errorf("Error replacing tablet - got %v; want %v", ok, test.expected)
}

// Delete the tablet
if err := ts.DeleteTablet(context.Background(), tabletReplacement.Alias); err != nil {
t.Fatalf("DeleteTablet failed: %v", err)
}
}
}
2 changes: 2 additions & 0 deletions go/vt/vtgate/gateway/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func createDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
tr = fbs
} else if len(KeyspacesToWatch) > 0 {
tr = discovery.NewFilterByKeyspace(dg.hc, KeyspacesToWatch)
}

ctw := discovery.NewCellTabletsWatcher(ctx, topoServer, tr, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency)
Expand Down