diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 2795302d1e1..67f1199b548 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -615,8 +615,30 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets return hc.waitForTablets(ctx, targets, true) } +// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces +func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target { + filteredTargets := make([]*query.Target, 0) + + // Keep them all if there are no keyspaces to watch + if len(KeyspacesToWatch) == 0 { + return append(filteredTargets, targets...) + } + + // Let's remove from the target shards that are not in the keyspaceToWatch list. + for _, target := range targets { + for _, keyspaceToWatch := range keyspaces { + if target.Keyspace == keyspaceToWatch { + filteredTargets = append(filteredTargets, target) + } + } + } + return filteredTargets +} + // waitForTablets is the internal method that polls for tablets. func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error { + targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets) + for { // We nil targets as we find them. allPresent := true @@ -648,6 +670,11 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query. select { case <-ctx.Done(): timer.Stop() + for _, target := range targets { + if target != nil { + log.Infof("couldn't find tablets for target: %v", target) + } + } return ctx.Err() case <-timer.C: } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 98bb5c57aea..a4b2483d47a 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -44,6 +44,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/proto/query" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -425,6 +426,94 @@ func TestHealthCheckTimeout(t *testing.T) { mustMatch(t, want, result, "Wrong TabletHealth data") } +func TestWaitForAllServingTablets(t *testing.T) { + ts := memorytopo.NewServer("cell") + hc := createTestHc(ts) + defer hc.Close() + tablet := createTestTablet(0, "cell", "a") + tablet.Type = topodatapb.TabletType_REPLICA + targets := []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + input := make(chan *querypb.StreamHealthResponse) + createFakeConn(tablet, input) + + // create a channel and subscribe to healthcheck + resultChan := hc.Subscribe() + hc.AddTablet(tablet) + // there will be a first result, get and discard it + <-resultChan + // empty + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := hc.WaitForAllServingTablets(ctx, targets) + assert.NotNil(t, err, "error should not be nil") + + shr := &querypb.StreamHealthResponse{ + TabletAlias: tablet.Alias, + Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, + Serving: true, + TabletExternallyReparentedTimestamp: 0, + RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2}, + } + + input <- shr + <-resultChan + // // check it's there + + targets = []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + + err = hc.WaitForAllServingTablets(ctx, targets) + assert.Nil(t, err, "error should be nil. Targets are found") + + targets = []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + { + Keyspace: "newkeyspace", + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + + err = hc.WaitForAllServingTablets(ctx, targets) + assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace") + + targets = []*query.Target{ + { + Keyspace: tablet.Keyspace, + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + { + Keyspace: "newkeyspace", + Shard: tablet.Shard, + TabletType: tablet.Type, + }, + } + + KeyspacesToWatch = []string{tablet.Keyspace} + + err = hc.WaitForAllServingTablets(ctx, targets) + assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered") + + KeyspacesToWatch = []string{} +} + // TestGetHealthyTablets tests the functionality of GetHealthyTabletStats. func TestGetHealthyTablets(t *testing.T) { ts := memorytopo.NewServer("cell") diff --git a/go/vt/discovery/legacy_tablet_stats_cache_wait.go b/go/vt/discovery/legacy_tablet_stats_cache_wait.go index d984ff7d1ce..976aa9e7760 100644 --- a/go/vt/discovery/legacy_tablet_stats_cache_wait.go +++ b/go/vt/discovery/legacy_tablet_stats_cache_wait.go @@ -21,6 +21,7 @@ import ( "golang.org/x/net/context" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -84,6 +85,11 @@ func (tc *LegacyTabletStatsCache) waitForTablets(ctx context.Context, targets [] timer := time.NewTimer(waitAvailableTabletInterval) select { case <-ctx.Done(): + for _, target := range targets { + if target != nil { + log.Infof("couldn't find tablets for target: %v", target) + } + } timer.Stop() return ctx.Err() case <-timer.C: diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index d5fd6e0e637..bb65952916a 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -207,7 +207,8 @@ func (dg *DiscoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWai return err } - return dg.tsc.WaitForAllServingTablets(ctx, targets) + filteredTargets := discovery.FilterTargetsByKeyspaces(discovery.KeyspacesToWatch, targets) + return dg.tsc.WaitForAllServingTablets(ctx, filteredTargets) } // Close shuts down underlying connections. diff --git a/go/vt/vtgate/discoverygateway_test.go b/go/vt/vtgate/discoverygateway_test.go index 9e33fd69041..b855d18d647 100644 --- a/go/vt/vtgate/discoverygateway_test.go +++ b/go/vt/vtgate/discoverygateway_test.go @@ -19,6 +19,7 @@ package vtgate import ( "fmt" "testing" + "time" "vitess.io/vitess/go/vt/log" @@ -32,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/topotools" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -132,6 +134,82 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) { } } +func TestDiscoveryGatewayWaitForTablets(t *testing.T) { + keyspace := "ks" + shard := "0" + cell := "local" + hc := discovery.NewFakeLegacyHealthCheck() + ts := memorytopo.NewServer("local") + srvTopo := srvtopotest.NewPassthroughSrvTopoServer() + srvTopo.TopoServer = ts + srvTopo.SrvKeyspaceNames = []string{keyspace} + srvTopo.SrvKeyspace = &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodata.TabletType_MASTER, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: shard, + }, + }, + }, + { + ServedType: topodata.TabletType_REPLICA, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: shard, + }, + }, + }, + { + ServedType: topodata.TabletType_RDONLY, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: shard, + }, + }, + }, + }, + } + + dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2) + + // replica should only use local ones + hc.Reset() + dg.tsc.ResetForTesting() + hc.AddTestTablet(cell, "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil) + hc.AddTestTablet(cell, "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 5, nil) + ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) + err := dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER}) + if err != nil { + t.Errorf("want %+v, got %+v", nil, err) + } + + // fails if there are no available tablets for the desired TabletType + err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_RDONLY}) + if err == nil { + t.Errorf("expected error, got nil") + } + + // errors because there is no primary on ks2 + ctx, _ = context.WithTimeout(context.Background(), 1*time.Second) + srvTopo.SrvKeyspaceNames = []string{keyspace, "ks2"} + err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER}) + if err == nil { + t.Errorf("expected error, got nil") + } + + discovery.KeyspacesToWatch = []string{keyspace} + // does not wait for ks2 if it's not part of the filter + ctx, _ = context.WithTimeout(context.Background(), 1*time.Second) + err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER}) + if err != nil { + t.Errorf("want %+v, got %+v", nil, err) + } + + discovery.KeyspacesToWatch = []string{} +} + func TestShuffleTablets(t *testing.T) { ts1 := discovery.LegacyTabletStats{ Key: "t1", diff --git a/go/vt/vttablet/tabletconn/tablet_conn.go b/go/vt/vttablet/tabletconn/tablet_conn.go index 2580c34fad0..97a6ea2ee1b 100644 --- a/go/vt/vttablet/tabletconn/tablet_conn.go +++ b/go/vt/vttablet/tabletconn/tablet_conn.go @@ -18,6 +18,7 @@ package tabletconn import ( "flag" + "sync" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" @@ -50,9 +51,14 @@ type TabletDialer func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) var dialers = make(map[string]TabletDialer) +// mu This mutex helps us prevent data races when registering / getting dialers +var mu sync.Mutex + // RegisterDialer is meant to be used by TabletDialer implementations // to self register. func RegisterDialer(name string, dialer TabletDialer) { + mu.Lock() + defer mu.Unlock() if _, ok := dialers[name]; ok { log.Fatalf("Dialer %s already exists", name) } @@ -61,6 +67,8 @@ func RegisterDialer(name string, dialer TabletDialer) { // GetDialer returns the dialer to use, described by the command line flag func GetDialer() TabletDialer { + mu.Lock() + defer mu.Unlock() td, ok := dialers[*TabletProtocol] if !ok { log.Exitf("No dialer registered for tablet protocol %s", *TabletProtocol)